fix8  version 1.4.0
Open Source C++ FIX Framework
FIX8::FilePersister Class Reference

#include <persist.hpp>

Inheritance diagram for FIX8::FilePersister:
FIX8::Persister

Public Member Functions

 FilePersister (unsigned rotnum=0)
 Ctor. More...
 
virtual F8API ~FilePersister ()
 Dtor. More...
 
virtual F8API bool initialise (const f8String &dbDir, const f8String &dbFname, bool purge=false)
 
virtual F8API bool put (const unsigned seqnum, const f8String &what)
 
virtual F8API bool put (const unsigned sender_seqnum, const unsigned target_seqnum)
 
virtual F8API bool get (const unsigned seqnum, f8String &to) const
 
virtual F8API unsigned get (const unsigned from, const unsigned to, Session &session, bool(Session::*)(const Session::SequencePair &with, Session::RetransmissionContext &rctx)) const
 
virtual F8API unsigned get_last_seqnum (unsigned &to) const
 
virtual F8API bool get (unsigned &sender_seqnum, unsigned &target_seqnum) const
 
virtual F8API unsigned find_nearest_highest_seqnum (const unsigned requested, const unsigned last) const
 
- Public Member Functions inherited from FIX8::Persister
 Persister ()=default
 Ctor. More...
 
virtual ~Persister ()
 Dtor. More...
 
 Persister (const Persister &)=delete
 
Persisteroperator= (const Persister &)=delete
 
virtual bool put (const f8String &key, const f8String &what)
 
virtual bool get (const f8String &key, f8String &to) const
 
virtual bool del (const f8String &key)
 
virtual bool purge ()
 
virtual void stop ()
 Stop the persister thread. More...
 

Private Types

using Index = std::map< uint32_t, Prec >
 

Private Attributes

f8String _dbFname
 
f8String _dbIname
 
int _fod
 
int _iod
 
unsigned _rotnum
 
bool _wasCreated
 
Index _index
 

Additional Inherited Members

- Public Types inherited from FIX8::Persister
enum  { MaxMsgLen = FIX8_MAX_MSG_LENGTH }
 Maximum length of persisted FIX message. More...
 
- Protected Attributes inherited from FIX8::Persister
bool _opened = false
 

Detailed Description

Definition at line 391 of file persist.hpp.

Member Typedef Documentation

using FIX8::FilePersister::Index = std::map<uint32_t, Prec>
private

Definition at line 398 of file persist.hpp.

Constructor & Destructor Documentation

FIX8::FilePersister::FilePersister ( unsigned  rotnum = 0)
inline

Ctor.

Definition at line 403 of file persist.hpp.

403 : _fod(-1), _iod(-1), _rotnum(rotnum), _wasCreated() {}
FilePersister::~FilePersister ( )
virtual

Dtor.

Definition at line 156 of file filepersist.cpp.

157 {
158  close(_fod);
159  close(_iod);
160 }

Member Function Documentation

unsigned FilePersister::find_nearest_highest_seqnum ( const unsigned  requested,
const unsigned  last 
) const
virtual

Find the nearest highest sequence number from the sequence to last provided.

Parameters
requestedsequence number to start
lasthighest sequence
Returns
the nearest sequence number or 0 if not found

Implements FIX8::Persister.

Definition at line 339 of file filepersist.cpp.

340 {
341  if (last)
342  {
343  for (unsigned startseqnum(requested); startseqnum <= last; ++startseqnum)
344  {
345  Index::const_iterator itr(_index.find(startseqnum));
346  if (itr != _index.end())
347  return itr->first;
348  }
349  }
350 
351  return 0;
352 }
bool FilePersister::get ( const unsigned  seqnum,
f8String to 
) const
virtual

Retrieve a persisted message.

Parameters
seqnumsequence number of message
totarget message string
Returns
true on success

Implements FIX8::Persister.

Definition at line 310 of file filepersist.cpp.

References FIX8_MAX_MSG_LENGTH, glout_error, and glout_warn.

311 {
312  if (!_opened || !seqnum || _index.empty())
313  return false;
314  Index::const_iterator itr(_index.find(seqnum));
315  if (itr == _index.end())
316  {
317  glout_warn << "Warning: index does not contain seqnum: " << seqnum << " in: " << _dbIname;
318  return false;
319  }
320 
321  if (lseek(_fod, itr->second._offset, SEEK_SET) < 0)
322  {
323  glout_error << "Error: could not seek to correct index location for get: " << _dbFname;
324  return false;
325  }
326 
327  char buff[FIX8_MAX_MSG_LENGTH];
328  if (read (_fod, buff, itr->second._size) != itr->second._size)
329  {
330  glout_error << "Error: could not read message record for seqnum " << seqnum << " from: " << _dbFname;
331  return false;
332  }
333 
334  to.assign(buff, itr->second._size);
335  return true;
336 }
#define glout_error
Definition: logger.hpp:606
#define glout_warn
Definition: logger.hpp:604
#define FIX8_MAX_MSG_LENGTH
Definition: f8config.h:576
unsigned FilePersister::get ( const unsigned  from,
const unsigned  to,
Session session,
bool(Session::*)(const Session::SequencePair &with, Session::RetransmissionContext &rctx)  callback 
) const
virtual

Retrieve a range of persisted messages.

Parameters
fromstart at sequence number
toend sequence number
sessionsession containing callback method
callbackmethod to call with each retrieved message
Returns
number of messages retrieved

Implements FIX8::Persister.

Definition at line 169 of file filepersist.cpp.

References FIX8::Session::RetransmissionContext::_no_more_records, FIX8_MAX_MSG_LENGTH, FIX8::Session::get_next_send_seq(), glout_debug, glout_error, and glout_info.

171 {
172  unsigned last_seq(0);
173  get_last_seqnum(last_seq);
174  unsigned recs_sent(0), startSeqNum(find_nearest_highest_seqnum (from, last_seq));
175  const unsigned finish(to == 0 ? last_seq : to);
176  Session::RetransmissionContext rctx(from, to, session.get_next_send_seq());
177 
178  if (!startSeqNum || from > finish)
179  {
180  glout_info << "No records found";
181  rctx._no_more_records = true;
182  (session.*callback)(Session::SequencePair(0, ""), rctx);
183  return 0;
184  }
185 
186  Index::const_iterator itr(_index.find(startSeqNum));
187  if (itr != _index.end())
188  {
189  char buff[FIX8_MAX_MSG_LENGTH];
190 
191  do
192  {
193  if (!itr->first || itr->first > finish)
194  break;
195  if (lseek(_fod, itr->second._offset, SEEK_SET) < 0)
196  {
197  glout_error << "Error: could not seek to correct index location for get: " << _dbFname;
198  break;
199  }
200 
201  if (read (_fod, buff, itr->second._size) != itr->second._size)
202  {
203  glout_error << "Error: could not read message record for seqnum " << itr->first << " from: " << _dbFname;
204  break;
205  }
206 
207  Session::SequencePair txresult(itr->first, f8String(buff, itr->second._size));
208  ++recs_sent;
209  if (!(session.*callback)(txresult, rctx))
210  {
211  glout_debug << "Retransmission callback signalled an error, not sending any more records from: " << _dbFname;
212  break;
213  }
214  }
215  while(++itr != _index.end());
216 
217  rctx._no_more_records = true;
218  (session.*callback)(Session::SequencePair(0, ""), rctx);
219  }
220  else
221  {
222  glout_error << "record not found (" << startSeqNum << ')';
223  }
224 
225  return recs_sent;
226 }
std::pair< const unsigned, const f8String > SequencePair
Definition: session.hpp:615
Provides context to your retrans handler.
Definition: session.hpp:598
#define glout_error
Definition: logger.hpp:606
unsigned get_next_send_seq() const
Definition: session.hpp:750
#define glout_debug
Definition: logger.hpp:614
#define FIX8_MAX_MSG_LENGTH
Definition: f8config.h:576
#define glout_info
Definition: logger.hpp:601
virtual F8API unsigned find_nearest_highest_seqnum(const unsigned requested, const unsigned last) const
virtual F8API unsigned get_last_seqnum(unsigned &to) const
std::string f8String
Definition: f8types.hpp:47
bool FilePersister::get ( unsigned &  sender_seqnum,
unsigned &  target_seqnum 
) const
virtual

Retrieve a sequence control record.

Parameters
sender_seqnumsequence number of last sent message
target_seqnumsequence number of last received message
Returns
true on success

Implements FIX8::Persister.

Definition at line 286 of file filepersist.cpp.

References glout_error, and glout_warn.

287 {
288  if (!_opened)
289  return false;
290 
291  if (_index.empty())
292  {
293  glout_warn << "Warning: index is empty: " << _dbIname;
294  return false;
295  }
296 
297  Index::const_iterator itr(_index.find(0));
298  if (itr == _index.end())
299  {
300  glout_error << "Error: index does not contain control record: " << _dbIname;
301  return false;
302  }
303 
304  sender_seqnum = itr->second._offset;
305  target_seqnum = itr->second._size;
306  return true;
307 }
#define glout_error
Definition: logger.hpp:606
#define glout_warn
Definition: logger.hpp:604
unsigned FilePersister::get_last_seqnum ( unsigned &  to) const
virtual

Retrieve sequence number of last peristed message.

Parameters
totarget sequence number
Returns
sequence number of last peristed message on success

Implements FIX8::Persister.

Definition at line 163 of file filepersist.cpp.

164 {
165  return sequence = _index.empty() ? 0 : _index.rbegin()->first;
166 }
bool FilePersister::initialise ( const f8String dbDir,
const f8String dbFname,
bool  purge = false 
)
virtual

Open existing database or create new database.

Parameters
dbDirdatabase directory
dbFnamedatabase name
purgeif true, empty database if found
Returns
true on success

Definition at line 45 of file filepersist.cpp.

References FIX8::IPrec::_seq, FIX8::CheckAddTrailingSlash(), FIX8::exist(), glout_error, glout_info, glout_warn, FIX8::Logger::max_rotation, and O_BINARY.

46 {
47  if (_opened)
48  return true;
49 
50  f8String odbdir(dbDir);
51  ostringstream ostr;
52  ostr << CheckAddTrailingSlash(odbdir) << dbFname;
53  _dbFname = ostr.str();
54  ostr << ".idx";
55  _dbIname = ostr.str();
56 
57  bool nof;
58 
59  if ((nof = !exist(_dbFname)) || purge)
60  {
61  if (purge)
62  {
63  if (_rotnum > 0)
64  {
65  vector<string> dblst, idxlst;
66  dblst.push_back(_dbFname);
67  idxlst.push_back(_dbIname);
68 
69  for (unsigned ii(0); ii < _rotnum && ii < Logger::max_rotation; ++ii)
70  {
71  ostringstream ostr;
72  ostr << _dbFname << '.' << (ii + 1);
73  dblst.push_back(ostr.str());
74  ostr << ".idx";
75  idxlst.push_back(ostr.str());
76  }
77 
78  for (unsigned ii(_rotnum); ii; --ii)
79  {
80  rename (dblst[ii - 1].c_str(), dblst[ii].c_str()); // ignore errors
81  rename (idxlst[ii - 1].c_str(), idxlst[ii].c_str()); // ignore errors
82  }
83  }
84  }
85 
86  if ((_fod = open(_dbFname.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0600)) < 0)
87  {
88  glout_error << "Error: creating database: " << _dbFname << " (" << strerror(errno) << ')';
89  return false;
90  }
91  if ((_iod = open(_dbIname.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0600)) < 0)
92  {
93  glout_error << "Error: creating database index: " << _dbIname << " (" << strerror(errno) << ')';
94  return false;
95  }
96 
97  _wasCreated = true;
98 
99  if (purge && !nof)
100  {
101  glout_info << (_rotnum ? "Rotated and purged perist db" : "Purged perist db");
102  }
103  }
104  else
105  {
106  if ((_fod = open(_dbFname.c_str(), O_RDWR | O_BINARY)) < 0)
107  {
108  glout_error << "Error: opening existing database: " << _dbFname << " (" << strerror(errno) << ')';
109  return false;
110  }
111  if ((_iod = open(_dbIname.c_str(), O_RDWR | O_BINARY)) < 0)
112  {
113  glout_error << "Error: opening existing database index: " << _dbIname << " (" << strerror(errno) << ')';
114  return false;
115  }
116 
117  IPrec iprec;
118  while (true)
119  {
120  const ssize_t blrd(read(_iod, static_cast<void *>(&iprec), sizeof(IPrec)));
121  if (blrd < 0)
122  {
123  glout_error << "Error: reading existing database index: " << _dbIname << " (" << strerror(errno) << ')';
124  return false;
125  }
126  else if (blrd == 0)
127  break; // eof
128 
129  if (iprec._seq == 0)
130  {
131  glout_info << iprec;
132  }
133 
134  if (!_index.insert({iprec._seq, iprec._prec}).second)
135  {
136  glout_warn << "Warning: inserting index record into database index: " << _dbIname << " (" << iprec << "). Ignoring.";
137  }
138  }
139 
140  if (_index.size())
141  {
142  glout_info << "Database " << _dbFname << " indexed " << _index.size() << " records.";
143  }
144 
145  unsigned last;
146  if (get_last_seqnum(last))
147  {
148  glout_info << _dbFname << ": Last sequence is " << last;
149  }
150  }
151 
152  return _opened = true;
153 }
virtual bool purge()
Definition: persist.hpp:139
#define O_BINARY
Definition: persist.hpp:350
#define glout_error
Definition: logger.hpp:606
uint32_t _seq
Definition: persist.hpp:383
#define glout_warn
Definition: logger.hpp:604
static const int max_rotation
Definition: logger.hpp:173
#define glout_info
Definition: logger.hpp:601
F8API std::string & CheckAddTrailingSlash(std::string &source)
virtual F8API unsigned get_last_seqnum(unsigned &to) const
bool exist(const std::string &fname)
Definition: f8utils.hpp:1068
std::string f8String
Definition: f8types.hpp:47
bool FilePersister::put ( const unsigned  seqnum,
const f8String what 
)
virtual

Persist a message.

Parameters
seqnumsequence number of message
whatmessage string
Returns
true on success

Implements FIX8::Persister.

Definition at line 249 of file filepersist.cpp.

References FIX8::IPrec::_prec, and glout_error.

250 {
251  if (!_opened || !seqnum)
252  return false;
253 
254  if (_index.find(seqnum) != _index.end())
255  {
256  glout_error << "Error: seqnum " << seqnum << " already persisted in: " << _dbIname;
257  return false;
258  }
259  if (lseek(_iod, 0, SEEK_END) < 0)
260  {
261  glout_error << "Error: could not seek to index end for seqnum persitence: " << _dbIname;
262  return false;
263  }
264  off_t offset;
265  if ((offset = lseek(_fod, 0, SEEK_END)) < 0)
266  {
267  glout_error << "Error: could not seek to end for seqnum persitence: " << _dbFname;
268  return false;
269  }
270  IPrec iprec(seqnum, offset, static_cast<unsigned>(what.size()));
271  if (write (_iod, static_cast<void *>(&iprec), sizeof(IPrec)) != sizeof(IPrec))
272  {
273  glout_error << "Error: could not write index record for seqnum " << seqnum << " to: " << _dbIname;
274  return false;
275  }
276  if (write (_fod, what.data(), static_cast<unsigned>(what.size())) != static_cast<ssize_t>(what.size()))
277  {
278  glout_error << "Error: could not write record for seqnum " << seqnum << " to: " << _dbFname;
279  return false;
280  }
281 
282  return _index.insert({seqnum, iprec._prec}).second;
283 }
#define glout_error
Definition: logger.hpp:606
bool FilePersister::put ( const unsigned  sender_seqnum,
const unsigned  target_seqnum 
)
virtual

Persist a sequence control record.

Parameters
sender_seqnumsequence number of last sent message
target_seqnumsequence number of last received message
Returns
true on success

Implements FIX8::Persister.

Definition at line 229 of file filepersist.cpp.

References FIX8::IPrec::_prec, and glout_error.

230 {
231  if (!_opened)
232  return false;
233  IPrec iprec(0, sender_seqnum, target_seqnum);
234  Index::iterator itr(_index.find(0));
235  if (itr == _index.end())
236  _index.insert({0, iprec._prec});
237  else
238  itr->second = iprec._prec;
239 
240  if (lseek(_iod, 0, SEEK_SET) < 0)
241  {
242  glout_error << "Error: could not seek to 0 for seqnum persitence: " << _dbIname;
243  return false;
244  }
245  return write (_iod, static_cast<void *>(&iprec), sizeof(IPrec)) == sizeof(IPrec);
246 }
#define glout_error
Definition: logger.hpp:606

Member Data Documentation

f8String FIX8::FilePersister::_dbFname
private

Definition at line 393 of file persist.hpp.

f8String FIX8::FilePersister::_dbIname
private

Definition at line 393 of file persist.hpp.

int FIX8::FilePersister::_fod
private

Definition at line 394 of file persist.hpp.

Index FIX8::FilePersister::_index
private

Definition at line 399 of file persist.hpp.

int FIX8::FilePersister::_iod
private

Definition at line 394 of file persist.hpp.

unsigned FIX8::FilePersister::_rotnum
private

Definition at line 395 of file persist.hpp.

bool FIX8::FilePersister::_wasCreated
private

Definition at line 396 of file persist.hpp.


The documentation for this class was generated from the following files: