fix8  version 1.4.0
Open Source C++ FIX Framework
persist.cpp
Go to the documentation of this file.
1 //-----------------------------------------------------------------------------------------
2 /*
3 
4 Fix8 is released under the GNU LESSER GENERAL PUBLIC LICENSE Version 3.
5 
6 Fix8 Open Source FIX Engine.
7 Copyright (C) 2010-16 David L. Dight <fix@fix8.org>
8 
9 Fix8 is free software: you can redistribute it and / or modify it under the terms of the
10 GNU Lesser General Public License as published by the Free Software Foundation, either
11 version 3 of the License, or (at your option) any later version.
12 
13 Fix8 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
14 even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 
16 You should have received a copy of the GNU Lesser General Public License along with Fix8.
17 If not, see <http://www.gnu.org/licenses/>.
18 
19 BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE PROGRAM, TO
20 THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE
21 COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY
22 KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO
24 THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE,
25 YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
26 
27 IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL ANY COPYRIGHT
28 HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE THE PROGRAM AS PERMITTED
29 ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR
30 CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT
31 NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR
32 THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH
33 HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
34 
35 */
36 //-----------------------------------------------------------------------------------------
37 #include "precomp.hpp"
38 #include <fix8/f8includes.hpp>
39 
40 //-------------------------------------------------------------------------------------------------
41 using namespace FIX8;
42 using namespace std;
43 
44 //-------------------------------------------------------------------------------------------------
46 
47 //-------------------------------------------------------------------------------------------------
48 #if defined FIX8_HAVE_BDB
49 
50 bool BDBPersister::initialise(const f8String& dbDir, const f8String& dbFname, bool purge)
51 {
52  if (_opened)
53  return true;
54 
55  _dbDir = dbDir;
56  _dbFname = dbFname;
57 
58  // Use concurrent db and default shared memory pool
59  _dbEnv.open(_dbDir.c_str(), DB_CREATE | DB_INIT_MPOOL | DB_INIT_CDB | DB_THREAD, 0);
60 
61  bool notFound(false);
62 
63  if (!purge)
64  {
65  try
66  {
67  _db->set_bt_compare(bt_compare_fcn);
68  _db->open(0, _dbFname.c_str(), 0, DB_BTREE, DB_THREAD, 0); // try and open existing if possible
69  unsigned last;
70  if (get_last_seqnum(last))
71  glout_info << _dbFname << ": Last sequence is " << last;
72  }
73  catch(DbException& dbe)
74  {
75  switch (dbe.get_errno())
76  {
77  case ENOENT:
78  case EACCES:
79  notFound = true;
80  break;
81  default:
82  glout_error << "Error: opening existing database: " << dbe.what() << " (" << dbe.get_errno() << ')';
83  return false;
84  }
85  }
86  }
87 
88  if (notFound || purge) // create a new one
89  {
90  try
91  {
92  _db->open(0, _dbFname.c_str(), 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
93  if (purge)
94  {
95  _db->truncate(0, 0, 0);
96  glout_info << "Purged perist db";
97  }
98  }
99  catch(DbException& dbe)
100  {
101  glout_error << "Error: creating new database: " << dbe.what() << " (" << dbe.get_errno() << ')';
102  return false;
103  }
104 
105  _wasCreated = true;
106  }
107 
108  _thread.start();
109  return _opened = true;
110 
111 }
112 
113 //-------------------------------------------------------------------------------------------------
114 BDBPersister::~BDBPersister()
115 {
116  stop();
117  if (_opened)
118  _db->close(0);
119  delete _db;
120  _dbEnv.close(0);
121 
122  //cout << "BDBPersister::~BDBPersister()" << endl;
123 }
124 
125 //-------------------------------------------------------------------------------------------------
126 unsigned BDBPersister::get_last_seqnum(unsigned& sequence) const
127 {
128  Dbc *cursorp;
129  _db->cursor (0, &cursorp, 0);
130 
131  KeyDataBuffer buffer;
132  KeyDataPair keyPair(buffer);
133  int retval(cursorp->get(&keyPair._key, &keyPair._data, DB_LAST));
134  cursorp->close();
135  if (retval)
136  {
137  glout_warn << "last record not found (" << db_strerror(retval) << ')';
138  return 0;
139  }
140  return sequence = buffer.keyBuf_.int_;
141 }
142 
143 //-------------------------------------------------------------------------------------------------
144 unsigned BDBPersister::get(const unsigned from, const unsigned to, Session& session,
145  bool (Session::*callback)(const Session::SequencePair& with, Session::RetransmissionContext& rctx)) const
146 {
147  unsigned last_seq(0);
148  get_last_seqnum(last_seq);
149  unsigned recs_sent(0), startSeqNum(find_nearest_highest_seqnum (from, last_seq));
150  const unsigned finish(to == 0 ? last_seq : to);
151  Session::RetransmissionContext rctx(from, to, session.get_next_send_seq());
152 
153  if (!startSeqNum || from > finish)
154  {
155  glout_warn << "No records found";
156  rctx._no_more_records = true;
157  (session.*callback)(Session::SequencePair(0, ""), rctx);
158  return 0;
159  }
160 
161  KeyDataBuffer buffer(startSeqNum);
162  KeyDataPair keyPair(buffer);
163  Dbc *cursorp;
164  _db->cursor (0, &cursorp, 0);
165  int retval;
166 
167  if ((retval = cursorp->get(&keyPair._key, &keyPair._data, DB_SET)) == 0)
168  {
169  do
170  {
171  const unsigned seqnum(buffer.keyBuf_.int_);
172  if (!seqnum || seqnum > finish)
173  break;
174  Session::SequencePair result(seqnum, buffer.dataBuf_);
175  ++recs_sent;
176  if (!(session.*callback)(result, rctx))
177  break;
178  }
179  while(cursorp->get(&keyPair._key, &keyPair._data, DB_NEXT) == 0);
180 
181  rctx._no_more_records = true;
182  (session.*callback)(Session::SequencePair(0, ""), rctx);
183  }
184  else
185  glout_warn << "record not found (" << db_strerror(retval) << ')';
186  cursorp->close();
187 
188  return recs_sent;
189 }
190 
191 //-------------------------------------------------------------------------------------------------
192 bool BDBPersister::put(const unsigned sender_seqnum, const unsigned target_seqnum)
193 {
194  if (!_opened)
195  return false;
196  KeyDataBuffer buffer(sender_seqnum, target_seqnum);
197  return write(buffer);
198 }
199 
200 //-------------------------------------------------------------------------------------------------
201 bool BDBPersister::put(const unsigned seqnum, const f8String& what)
202 {
203  if (!_opened || !seqnum)
204  return false;
205  KeyDataBuffer buffer(seqnum, what);
206  return write(buffer);
207 }
208 
209 //-------------------------------------------------------------------------------------------------
210 bool BDBPersister::get(unsigned& sender_seqnum, unsigned& target_seqnum) const
211 {
212  if (!_opened)
213  return false;
214  KeyDataBuffer buffer(0, 0);
215  KeyDataPair keyPair(buffer);
216  int retval(_db->get(0, &keyPair._key, &keyPair._data, 0));
217  if (retval)
218  {
219  glout_error << "Could not get control 0 " << '(' << db_strerror(retval) << ')';
220  return false;
221  }
222  unsigned *loc(reinterpret_cast<unsigned *>(buffer.dataBuf_));
223  sender_seqnum = *loc++;
224  target_seqnum = *loc;
225  return true;
226 }
227 
228 //-------------------------------------------------------------------------------------------------
229 bool BDBPersister::get(const unsigned seqnum, f8String& to) const
230 {
231  if (!_opened || !seqnum)
232  return false;
233  KeyDataBuffer buffer(seqnum);
234  KeyDataPair keyPair(buffer);
235  int retval(_db->get(0, &keyPair._key, &keyPair._data, 0));
236  if (retval)
237  {
238  glout_error << "Could not get " << seqnum << '(' << db_strerror(retval) << ')';
239  return false;
240  }
241  to.assign(buffer.dataBuf_);
242  return true;
243 }
244 
245 //---------------------------------------------------------------------------------------------------
246 unsigned BDBPersister::find_nearest_highest_seqnum (const unsigned requested, const unsigned last) const
247 {
248  if (_opened && last)
249  {
250  for (unsigned startseqnum(requested); startseqnum <= last; ++startseqnum)
251  {
252  KeyDataBuffer buffer(startseqnum);
253  KeyDataPair keyPair(buffer);
254  if (_db->get(0, &keyPair._key, &keyPair._data, 0) == 0)
255  return startseqnum;
256  }
257  }
258 
259  return 0;
260 }
261 
262 //-------------------------------------------------------------------------------------------------
263 int BDBPersister::operator()()
264 {
265  unsigned received(0), persisted(0);
266  bool stopping(false);
267 
268  for (;!_cancellation_token;)
269  {
270  KeyDataBuffer *msg_ptr(0);
271 
272 #if (FIX8_MPMC_SYSTEM == FIX8_MPMC_TBB)
273  KeyDataBuffer buffer;
274  if (stopping) // make sure we dequeue any pending msgs before exiting
275  {
276  if (!_persist_queue.try_pop(buffer))
277  break;
278  }
279  else
280  _persist_queue.pop (buffer); // will block
281  msg_ptr = &buffer;
282 
283  if (buffer.empty()) // means exit
284  {
285  stopping = true;
286  continue;
287  }
288 #else
289  _persist_queue.pop(msg_ptr); // will block
290  if (msg_ptr->empty()) // means exit
291  break;
292 #endif
293  //cout << "persisted..." << endl;
294 
295  ++received;
296 
297  if (msg_ptr)
298  {
299  KeyDataPair keyPair(*msg_ptr);
300  int retval(_db->put(0, &keyPair._key, &keyPair._data, 0)); // will overwrite if found
301  if (retval)
302  glout_error << "Could not add" << '(' << db_strerror(retval) << ')';
303  else
304  ++persisted;
305  }
306 #if (FIX8_MPMC_SYSTEM == FIX8_MPMC_FF)
307  _persist_queue.release(msg_ptr);
308 #endif
309  }
310 
311  //cout << "persister()() exited..." << endl;
312 
313  glout_info << received << " messages received, " << persisted << " messages persisted";
314 
315  return 0;
316 }
317 
318 #endif // FIX8_HAVE_BDB
319 
320 //-------------------------------------------------------------------------------------------------
321 //-------------------------------------------------------------------------------------------------
322 unsigned MemoryPersister::get(const unsigned from, const unsigned to, Session& session,
323  bool (Session::*callback)(const Session::SequencePair& with, Session::RetransmissionContext& rctx)) const
324 {
325  unsigned last_seq(0);
326  get_last_seqnum(last_seq);
327  unsigned recs_sent(0), startSeqNum(find_nearest_highest_seqnum (from, last_seq));
328  const unsigned finish(to == 0 ? last_seq : to);
329  Session::RetransmissionContext rctx(from, to, session.get_next_send_seq());
330 
331  if (!startSeqNum || from > finish)
332  {
333  glout_warn << "No records found";
334  rctx._no_more_records = true;
335  (session.*callback)(Session::SequencePair(0, ""), rctx);
336  return 0;
337  }
338 
339  Store::const_iterator itr(_store.find(startSeqNum));
340  if (itr != _store.end())
341  {
342  do
343  {
344  if (!itr->first || itr->first > finish)
345  break;
346  Session::SequencePair result(itr->first, itr->second);
347  ++recs_sent;
348  if (!(session.*callback)(result, rctx))
349  break;
350  }
351  while(++itr != _store.end());
352 
353  Session::SequencePair result(0, "");
354  rctx._no_more_records = true;
355  (session.*callback)(result, rctx);
356  }
357  else
358  glout_error << "record not found (" << startSeqNum << ')';
359 
360  return recs_sent;
361 }
362 
363 //-------------------------------------------------------------------------------------------------
364 bool MemoryPersister::put(const unsigned sender_seqnum, const unsigned target_seqnum)
365 {
366  const unsigned arr[2] { sender_seqnum, target_seqnum };
367  return _store.insert({0, f8String(reinterpret_cast<const char *>(arr), sizeof(arr))}).second;
368 }
369 
370 //-------------------------------------------------------------------------------------------------
371 bool MemoryPersister::put(const unsigned seqnum, const f8String& what)
372 {
373  return !seqnum ? false : _store.insert({seqnum, what}).second;
374 }
375 
376 //-------------------------------------------------------------------------------------------------
377 bool MemoryPersister::get(unsigned& sender_seqnum, unsigned& target_seqnum) const
378 {
379  Store::const_iterator itr(_store.find(0));
380  if (itr == _store.end())
381  return false;
382  const unsigned *loc(reinterpret_cast<const unsigned *>(&itr->second));
383  sender_seqnum = *loc++;
384  target_seqnum = *loc;
385  return true;
386 }
387 
388 //-------------------------------------------------------------------------------------------------
389 bool MemoryPersister::get(const unsigned seqnum, f8String& to) const
390 {
391  if (!seqnum)
392  return false;
393  Store::const_iterator itr(_store.find(seqnum));
394  if (itr == _store.end())
395  return false;
396  to = itr->second;
397  return true;
398 }
399 
400 //---------------------------------------------------------------------------------------------------
401 unsigned MemoryPersister::find_nearest_highest_seqnum (const unsigned requested, const unsigned last) const
402 {
403  if (last)
404  {
405  for (unsigned startseqnum(requested); startseqnum <= last; ++startseqnum)
406  {
407  Store::const_iterator itr(_store.find(startseqnum));
408  if (itr != _store.end())
409  return itr->first;
410  }
411  }
412 
413  return 0;
414 }
415 
416 //---------------------------------------------------------------------------------------------------
417 unsigned MemoryPersister::get_last_seqnum(unsigned& to) const
418 {
419  return to = (_store.empty() ? 0 : _store.rbegin()->first);
420 }
virtual F8API unsigned find_nearest_highest_seqnum(const unsigned requested, const unsigned last) const
Definition: persist.cpp:401
Fix8 Base Session. User sessions are derived from this class.
Definition: session.hpp:394
virtual F8API unsigned get_last_seqnum(unsigned &to) const
Definition: persist.cpp:417
std::pair< const unsigned, const f8String > SequencePair
Definition: session.hpp:615
virtual F8API bool put(const unsigned seqnum, const f8String &what)
Definition: persist.cpp:371
Provides context to your retrans handler.
Definition: session.hpp:598
const size_t max_global_filename_length(1024)
#define glout_error
Definition: logger.hpp:606
unsigned get_next_send_seq() const
Definition: session.hpp:750
#define glout_warn
Definition: logger.hpp:604
#define glout_info
Definition: logger.hpp:601
F8API char glob_log0[max_global_filename_length]
Definition: logger.cpp:46
virtual F8API bool get(const unsigned seqnum, f8String &to) const
Definition: persist.cpp:389
std::string f8String
Definition: f8types.hpp:47