53 _dbFname = ostr.str();
55 _dbIname = ostr.str();
59 if ((nof = !
exist(_dbFname)) || purge)
65 vector<string> dblst, idxlst;
66 dblst.push_back(_dbFname);
67 idxlst.push_back(_dbIname);
72 ostr << _dbFname <<
'.' << (ii + 1);
73 dblst.push_back(ostr.str());
75 idxlst.push_back(ostr.str());
78 for (
unsigned ii(_rotnum); ii; --ii)
80 rename (dblst[ii - 1].c_str(), dblst[ii].c_str());
81 rename (idxlst[ii - 1].c_str(), idxlst[ii].c_str());
86 if ((_fod = open(_dbFname.c_str(), O_RDWR | O_CREAT | O_TRUNC |
O_BINARY, 0600)) < 0)
88 glout_error <<
"Error: creating database: " << _dbFname <<
" (" << strerror(errno) <<
')';
91 if ((_iod = open(_dbIname.c_str(), O_RDWR | O_CREAT | O_TRUNC |
O_BINARY, 0600)) < 0)
93 glout_error <<
"Error: creating database index: " << _dbIname <<
" (" << strerror(errno) <<
')';
101 glout_info << (_rotnum ?
"Rotated and purged perist db" :
"Purged perist db");
106 if ((_fod = open(_dbFname.c_str(), O_RDWR |
O_BINARY)) < 0)
108 glout_error <<
"Error: opening existing database: " << _dbFname <<
" (" << strerror(errno) <<
')';
111 if ((_iod = open(_dbIname.c_str(), O_RDWR |
O_BINARY)) < 0)
113 glout_error <<
"Error: opening existing database index: " << _dbIname <<
" (" << strerror(errno) <<
')';
120 const ssize_t blrd(read(_iod, static_cast<void *>(&iprec),
sizeof(
IPrec)));
123 glout_error <<
"Error: reading existing database index: " << _dbIname <<
" (" << strerror(errno) <<
')';
134 if (!_index.insert({iprec._seq, iprec._prec}).second)
136 glout_warn <<
"Warning: inserting index record into database index: " << _dbIname <<
" (" << iprec <<
"). Ignoring.";
142 glout_info <<
"Database " << _dbFname <<
" indexed " << _index.size() <<
" records.";
146 if (get_last_seqnum(last))
148 glout_info << _dbFname <<
": Last sequence is " << last;
152 return _opened =
true;
165 return sequence = _index.empty() ? 0 : _index.rbegin()->first;
172 unsigned last_seq(0);
173 get_last_seqnum(last_seq);
174 unsigned recs_sent(0), startSeqNum(find_nearest_highest_seqnum (from, last_seq));
175 const unsigned finish(to == 0 ? last_seq : to);
178 if (!startSeqNum || from > finish)
186 Index::const_iterator itr(_index.find(startSeqNum));
187 if (itr != _index.end())
193 if (!itr->first || itr->first > finish)
195 if (lseek(_fod, itr->second._offset, SEEK_SET) < 0)
197 glout_error <<
"Error: could not seek to correct index location for get: " << _dbFname;
201 if (read (_fod, buff, itr->second._size) != itr->second._size)
203 glout_error <<
"Error: could not read message record for seqnum " << itr->first <<
" from: " << _dbFname;
209 if (!(session.*callback)(txresult, rctx))
211 glout_debug <<
"Retransmission callback signalled an error, not sending any more records from: " << _dbFname;
215 while(++itr != _index.end());
222 glout_error <<
"record not found (" << startSeqNum <<
')';
233 IPrec iprec(0, sender_seqnum, target_seqnum);
234 Index::iterator itr(_index.find(0));
235 if (itr == _index.end())
236 _index.insert({0, iprec._prec});
238 itr->second = iprec.
_prec;
240 if (lseek(_iod, 0, SEEK_SET) < 0)
242 glout_error <<
"Error: could not seek to 0 for seqnum persitence: " << _dbIname;
245 return write (_iod, static_cast<void *>(&iprec),
sizeof(
IPrec)) ==
sizeof(
IPrec);
251 if (!_opened || !seqnum)
254 if (_index.find(seqnum) != _index.end())
256 glout_error <<
"Error: seqnum " << seqnum <<
" already persisted in: " << _dbIname;
259 if (lseek(_iod, 0, SEEK_END) < 0)
261 glout_error <<
"Error: could not seek to index end for seqnum persitence: " << _dbIname;
265 if ((offset = lseek(_fod, 0, SEEK_END)) < 0)
267 glout_error <<
"Error: could not seek to end for seqnum persitence: " << _dbFname;
270 IPrec iprec(seqnum, offset, static_cast<unsigned>(what.size()));
271 if (write (_iod, static_cast<void *>(&iprec),
sizeof(
IPrec)) !=
sizeof(
IPrec))
273 glout_error <<
"Error: could not write index record for seqnum " << seqnum <<
" to: " << _dbIname;
276 if (write (_fod, what.data(),
static_cast<unsigned>(what.size())) !=
static_cast<ssize_t
>(what.size()))
278 glout_error <<
"Error: could not write record for seqnum " << seqnum <<
" to: " << _dbFname;
282 return _index.insert({seqnum, iprec.
_prec}).second;
293 glout_warn <<
"Warning: index is empty: " << _dbIname;
297 Index::const_iterator itr(_index.find(0));
298 if (itr == _index.end())
300 glout_error <<
"Error: index does not contain control record: " << _dbIname;
304 sender_seqnum = itr->second._offset;
305 target_seqnum = itr->second._size;
312 if (!_opened || !seqnum || _index.empty())
314 Index::const_iterator itr(_index.find(seqnum));
315 if (itr == _index.end())
317 glout_warn <<
"Warning: index does not contain seqnum: " << seqnum <<
" in: " << _dbIname;
321 if (lseek(_fod, itr->second._offset, SEEK_SET) < 0)
323 glout_error <<
"Error: could not seek to correct index location for get: " << _dbFname;
328 if (read (_fod, buff, itr->second._size) != itr->second._size)
330 glout_error <<
"Error: could not read message record for seqnum " << seqnum <<
" from: " << _dbFname;
334 to.assign(buff, itr->second._size);
343 for (
unsigned startseqnum(requested); startseqnum <= last; ++startseqnum)
345 Index::const_iterator itr(_index.find(startseqnum));
346 if (itr != _index.end())
Fix8 Base Session. User sessions are derived from this class.
std::pair< const unsigned, const f8String > SequencePair
virtual F8API bool initialise(const f8String &dbDir, const f8String &dbFname, bool purge=false)
Provides context to your retrans handler.
virtual F8API bool put(const unsigned seqnum, const f8String &what)
unsigned get_next_send_seq() const
static const int max_rotation
#define FIX8_MAX_MSG_LENGTH
F8API std::string & CheckAddTrailingSlash(std::string &source)
virtual F8API ~FilePersister()
Dtor.
virtual F8API unsigned find_nearest_highest_seqnum(const unsigned requested, const unsigned last) const
virtual F8API unsigned get_last_seqnum(unsigned &to) const
virtual F8API bool get(const unsigned seqnum, f8String &to) const
bool exist(const std::string &fname)