48 #if defined FIX8_HAVE_BDB
// Opens the Berkeley DB environment and the database used by this persister.
// NOTE(review): this listing shows only some of the function's lines (the
// embedded line numbers jump), so braces, try blocks and several statements
// are not visible. The comments below describe the visible lines only.
//   dbDir, dbFname - presumably copied into _dbDir/_dbFname, which the
//                    visible calls use; the assignment itself is not shown.
//   purge          - when true, the create path further down is taken.
50 bool BDBPersister::initialise(
const f8String& dbDir,
const f8String& dbFname,
bool purge)
// Open the environment, creating it if necessary (DB_CREATE), with the
// memory pool (DB_INIT_MPOOL) and Concurrent Data Store (DB_INIT_CDB)
// subsystems and a free-threaded handle (DB_THREAD).
59 _dbEnv.open(_dbDir.c_str(), DB_CREATE | DB_INIT_MPOOL | DB_INIT_CDB | DB_THREAD, 0);
// Install the custom B-tree key comparison before the database is opened.
67 _db->set_bt_compare(bt_compare_fcn);
// First attempt: open the database without DB_CREATE, i.e. it must exist.
68 _db->open(0, _dbFname.c_str(), 0, DB_BTREE, DB_THREAD, 0);
// Log the highest stored sequence number when get_last_seqnum() is non-zero.
70 if (get_last_seqnum(last))
71 glout_info << _dbFname <<
": Last sequence is " << last;
// Handler for a failed open: branches on the Berkeley DB error code.
73 catch(DbException& dbe)
75 switch (dbe.get_errno())
82 glout_error <<
"Error: opening existing database: " << dbe.what() <<
" (" << dbe.get_errno() <<
')';
// Create path: taken when notFound is set or the caller requested a purge.
// (presumably notFound is set inside the switch above - not visible here)
88 if (notFound || purge)
92 _db->open(0, _dbFname.c_str(), 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
// Empty the database. NOTE(review): any condition guarding this call is
// not visible in this listing.
95 _db->truncate(0, 0, 0);
99 catch(DbException& dbe)
101 glout_error <<
"Error: creating new database: " << dbe.what() <<
" (" << dbe.get_errno() <<
')';
// Record that the persister is open and report success.
109 return _opened =
true;
114 BDBPersister::~BDBPersister()
// Fetches the last record in the database and stores its integer key in
// `sequence`, which is also the return value.
// NOTE(review): partial listing - the cursor declaration, the check on
// retval and any cursor close are not visible.
126 unsigned BDBPersister::get_last_seqnum(
unsigned& sequence)
const
// Open a cursor on the database.
129 _db->cursor (0, &cursorp, 0);
// keyPair is built from buffer; presumably its _key/_data members refer to
// buffer's storage so the fetch below fills buffer - confirm in KeyDataPair.
131 KeyDataBuffer buffer;
132 KeyDataPair keyPair(buffer);
// DB_LAST positions the cursor on the last key/data pair.
133 int retval(cursorp->get(&keyPair._key, &keyPair._data, DB_LAST));
// Warning emitted for a failed fetch (the guarding condition is not visible).
137 glout_warn <<
"last record not found (" << db_strerror(retval) <<
')';
140 return sequence = buffer.keyBuf_.int_;
// Walks stored records from `from` up to `to` and passes each one to a
// Session member-function callback.
// NOTE(review): partial listing - the rest of the parameter list (callback),
// the declarations of cursorp/retval/result/rctx, the loop opening and the
// return statements are not visible.
144 unsigned BDBPersister::get(
const unsigned from,
const unsigned to,
Session& session,
// Determine the highest stored sequence number.
147 unsigned last_seq(0);
148 get_last_seqnum(last_seq);
// startSeqNum: result of probing upwards from `from`, bounded by last_seq.
149 unsigned recs_sent(0), startSeqNum(find_nearest_highest_seqnum (from, last_seq));
// to == 0 means "through the last stored sequence number".
150 const unsigned finish(to == 0 ? last_seq : to);
// Nothing to replay: no start record found, or the requested range is empty.
// (the action taken under this condition is not visible)
153 if (!startSeqNum || from > finish)
161 KeyDataBuffer buffer(startSeqNum);
162 KeyDataPair keyPair(buffer);
164 _db->cursor (0, &cursorp, 0);
// DB_SET positions the cursor on the record whose key equals startSeqNum.
167 if ((retval = cursorp->get(&keyPair._key, &keyPair._data, DB_SET)) == 0)
// Check for a zero key or a key beyond the requested end.
171 const unsigned seqnum(buffer.keyBuf_.int_);
172 if (!seqnum || seqnum > finish)
// Hand the record to the session callback; a false return is tested here
// (presumably to end the replay - the branch body is not visible).
176 if (!(session.*callback)(result, rctx))
// Advance with DB_NEXT until the cursor fetch fails.
179 while(cursorp->get(&keyPair._key, &keyPair._data, DB_NEXT) == 0);
// Warning for a failed initial DB_SET lookup (presumably the else branch).
185 glout_warn <<
"record not found (" << db_strerror(retval) <<
')';
// Persists the sender/target sequence number pair.
// NOTE(review): partial listing - any guard before these statements is not
// visible.
192 bool BDBPersister::put(
const unsigned sender_seqnum,
const unsigned target_seqnum)
// Build a record from the two sequence numbers and pass it to write();
// the result of write() is returned unchanged.
196 KeyDataBuffer buffer(sender_seqnum, target_seqnum);
197 return write(buffer);
// Persists one message under its sequence number.
201 bool BDBPersister::put(
const unsigned seqnum,
const f8String& what)
// Guard: persister not opened, or sequence number 0.
// (the statement executed under this guard is not visible in this listing)
203 if (!_opened || !seqnum)
// Build a record from the sequence number and text and pass it to write();
// the result of write() is returned unchanged.
205 KeyDataBuffer buffer(seqnum, what);
206 return write(buffer);
// Reads the stored sender/target sequence number pair into the two
// out-parameters.
// NOTE(review): partial listing - the check on retval and the return
// statements are not visible.
210 bool BDBPersister::get(
unsigned& sender_seqnum,
unsigned& target_seqnum)
const
// The lookup record is built from (0, 0); the error message below calls it
// "control 0".
214 KeyDataBuffer buffer(0, 0);
215 KeyDataPair keyPair(buffer);
216 int retval(_db->get(0, &keyPair._key, &keyPair._data, 0));
219 glout_error <<
"Could not get control 0 " <<
'(' << db_strerror(retval) <<
')';
// The data buffer is read as two consecutive unsigned values: sender first,
// then target. Assumes dataBuf_ is laid out and aligned for that - confirm
// against KeyDataBuffer.
222 unsigned *loc(reinterpret_cast<unsigned *>(buffer.dataBuf_));
223 sender_seqnum = *loc++;
224 target_seqnum = *loc;
// Looks up the message stored under `seqnum` and copies it into `to`.
// NOTE(review): partial listing - the check on retval and the return
// statements are not visible.
229 bool BDBPersister::get(
const unsigned seqnum,
f8String& to)
const
// Guard: persister not opened, or sequence number 0.
// (the statement executed under this guard is not visible)
231 if (!_opened || !seqnum)
233 KeyDataBuffer buffer(seqnum);
234 KeyDataPair keyPair(buffer);
235 int retval(_db->get(0, &keyPair._key, &keyPair._data, 0));
238 glout_error <<
"Could not get " << seqnum <<
'(' << db_strerror(retval) <<
')';
// NOTE(review): if dataBuf_ is a char array this copies up to the first NUL
// byte rather than a stored length - confirm against KeyDataBuffer.
241 to.assign(buffer.dataBuf_);
// Probes the database for sequence numbers starting at `requested` and
// counting upwards, never going past `last`.
// NOTE(review): partial listing - what happens on a hit and the final
// return are not visible (presumably the first key found is returned).
246 unsigned BDBPersister::find_nearest_highest_seqnum (
const unsigned requested,
const unsigned last)
const
// One database lookup per candidate sequence number.
// NOTE(review): with last == UINT_MAX the condition `startseqnum <= last`
// can never become false, because the counter wraps.
250 for (
unsigned startseqnum(requested); startseqnum <= last; ++startseqnum)
252 KeyDataBuffer buffer(startseqnum);
253 KeyDataPair keyPair(buffer);
// A zero return from Db::get means the key exists.
254 if (_db->get(0, &keyPair._key, &keyPair._data, 0) == 0)
// Loop that takes records off _persist_queue and writes them to the
// database until _cancellation_token becomes true.
// NOTE(review): partial listing - the #else/#endif lines, the counter
// updates, the use of `stopping` and the return value are not visible.
263 int BDBPersister::operator()()
// Counters reported in the summary log line at the end.
265 unsigned received(0), persisted(0);
266 bool stopping(
false);
268 for (;!_cancellation_token;)
270 KeyDataBuffer *msg_ptr(0);
// FIX8_MPMC_TBB build: the queue carries KeyDataBuffer values; try_pop is
// attempted first and pop is used in a path taken when it fails
// (presumably a blocking wait - confirm against the queue type).
272 #if (FIX8_MPMC_SYSTEM == FIX8_MPMC_TBB)
273 KeyDataBuffer buffer;
276 if (!_persist_queue.try_pop(buffer))
280 _persist_queue.pop (buffer);
// Other builds: the queue carries pointers. An empty record is tested for
// (presumably a stop signal - the branch body is not visible).
289 _persist_queue.pop(msg_ptr);
290 if (msg_ptr->empty())
// Write the record to the database.
299 KeyDataPair keyPair(*msg_ptr);
300 int retval(_db->put(0, &keyPair._key, &keyPair._data, 0));
302 glout_error <<
"Could not add" <<
'(' << db_strerror(retval) <<
')';
// FIX8_MPMC_FF build: hand the record pointer back to the queue.
306 #if (FIX8_MPMC_SYSTEM == FIX8_MPMC_FF)
307 _persist_queue.release(msg_ptr);
313 glout_info << received <<
" messages received, " << persisted <<
" messages persisted";
318 #endif // FIX8_HAVE_BDB
// Range replay over the in-memory _store: passes stored records from `from`
// up to `to` to a Session member-function callback.
// NOTE(review): the function signature and several body lines are missing
// from this listing; the comments describe the visible lines only.
325 unsigned last_seq(0);
326 get_last_seqnum(last_seq);
// startSeqNum: result of probing upwards from `from`, bounded by last_seq.
327 unsigned recs_sent(0), startSeqNum(find_nearest_highest_seqnum (from, last_seq));
// to == 0 means "through the last stored sequence number".
328 const unsigned finish(to == 0 ? last_seq : to);
// Nothing to replay: no start record found, or the requested range is empty.
// (the action taken under this condition is not visible)
331 if (!startSeqNum || from > finish)
339 Store::const_iterator itr(_store.find(startSeqNum));
340 if (itr != _store.end())
// Check for key 0 or a key beyond the requested end.
344 if (!itr->first || itr->first > finish)
// Hand the record to the session callback; a false return is tested here
// (presumably to end the replay - the branch body is not visible).
348 if (!(session.*callback)(result, rctx))
// Advance until the end of the store.
351 while(++itr != _store.end());
// One further callback invocation after the loop; what `result` holds at
// this point is not visible in this listing.
355 (session.*callback)(result, rctx);
// Error log for a start key that is not in the store (presumably the else
// branch of the find() check above).
358 glout_error <<
"record not found (" << startSeqNum <<
')';
// Stores the sender/target sequence numbers under key 0 as the raw bytes of
// a two-element unsigned array (sizeof(arr) bytes).
// (the function signature is missing from this listing)
// NOTE(review): if Store is a std::map-like container, insert() leaves an
// existing key-0 entry untouched and `.second` is false, so a later call
// would neither update the stored pair nor report success - confirm this is
// intended.
366 const unsigned arr[2] { sender_seqnum, target_seqnum };
367 return _store.insert({0,
f8String(reinterpret_cast<const char *>(arr),
sizeof(arr))}).second;
// Stores `what` under `seqnum`. Sequence number 0 is rejected with false;
// otherwise the `.second` member of the insert() result is returned.
// (the function signature is missing from this listing)
373 return !seqnum ?
false : _store.insert({seqnum, what}).second;
// Reads the sender/target sequence numbers back from the key-0 entry.
// (the function signature and the return statements are missing from this
// listing)
379 Store::const_iterator itr(_store.find(0));
// Key 0 absent (the action taken is not visible).
380 if (itr == _store.end())
// NOTE(review): `&itr->second` is the address of the mapped object itself,
// not of the bytes it holds. The code that stores key 0 builds an f8String
// from the raw bytes of the two numbers, so if f8String is a std::string
// this reads the string object's internals rather than the stored values;
// the string's data pointer looks like what was intended - verify.
382 const unsigned *loc(reinterpret_cast<const unsigned *>(&itr->second));
383 sender_seqnum = *loc++;
384 target_seqnum = *loc;
// Looks up `seqnum` in the in-memory store and tests for a miss.
// (the function signature, the miss handling and the copy of the found
// value are missing from this listing)
393 Store::const_iterator itr(_store.find(seqnum));
394 if (itr == _store.end())
// Probes the in-memory store for sequence numbers starting at `requested`
// and counting upwards, never going past `last`.
// (the function signature, the action on a hit and the final return are
// missing from this listing; presumably the first key found is returned)
// NOTE(review): with last == UINT_MAX the condition `startseqnum <= last`
// can never become false, because the counter wraps.
405 for (
unsigned startseqnum(requested); startseqnum <= last; ++startseqnum)
407 Store::const_iterator itr(_store.find(startseqnum));
408 if (itr != _store.end())
// Stores in `to`, and returns, the key of the store's last element
// (rbegin()), or 0 when the store is empty.
// (the function signature is missing from this listing)
419 return to = (_store.empty() ? 0 : _store.rbegin()->first);
virtual F8API unsigned find_nearest_highest_seqnum(const unsigned requested, const unsigned last) const
Fix8 Base Session. User sessions are derived from this class.
virtual F8API unsigned get_last_seqnum(unsigned &to) const
std::pair< const unsigned, const f8String > SequencePair
virtual F8API bool put(const unsigned seqnum, const f8String &what)
Provides context to your retrans handler.
const size_t max_global_filename_length(1024)
unsigned get_next_send_seq() const
F8API char glob_log0[max_global_filename_length]
virtual F8API bool get(const unsigned seqnum, f8String &to) const