37 #ifndef FIX8_PERSIST_HPP_
38 #define FIX8_PERSIST_HPP_
40 #if defined FIX8_HAVE_BDB
43 #if defined HAVE_LIBMEMCACHED
44 # include <libmemcached/memcached.h>
46 #if defined FIX8_HAVE_LIBHIREDIS
47 # include <hiredis/hiredis.h>
77 virtual bool put(
const unsigned seqnum,
const f8String& what) = 0;
90 virtual bool put(
const unsigned sender_seqnum,
const unsigned target_seqnum) = 0;
96 virtual bool get(
const unsigned seqnum,
f8String& to)
const = 0;
117 virtual unsigned get(
const unsigned from,
const unsigned to,
Session& session,
129 virtual bool get(
unsigned& sender_seqnum,
unsigned& target_seqnum)
const = 0;
139 virtual bool purge() {
return true; }
146 #if defined FIX8_HAVE_BDB
149 class BDBPersister :
public Persister
151 f8_thread<BDBPersister> _thread;
163 char char_[
sizeof(unsigned)];
165 Ubuf(
const unsigned val) : int_(val) {}
166 Ubuf(
const Ubuf& from) : int_(from.int_) {}
169 unsigned dataBufLen_;
170 char dataBuf_[MaxMsgLen];
172 KeyDataBuffer() : keyBuf_(), dataBufLen_(), dataBuf_() {}
173 KeyDataBuffer(
const unsigned ival) : keyBuf_(ival), dataBufLen_(), dataBuf_() {}
174 KeyDataBuffer(
const unsigned ival,
const f8String& src) : keyBuf_(ival), dataBuf_()
175 { src.copy(dataBuf_, dataBufLen_ = src.size() > MaxMsgLen ? MaxMsgLen : src.size()); }
176 KeyDataBuffer(
const unsigned snd,
const unsigned trg) : keyBuf_(), dataBufLen_(2 * sizeof(unsigned))
178 unsigned *loc(reinterpret_cast<unsigned *>(dataBuf_));
182 KeyDataBuffer(
const KeyDataBuffer& from) : keyBuf_(from.keyBuf_), dataBufLen_(from.dataBufLen_)
183 { memcpy(dataBuf_, from.dataBuf_, dataBufLen_); }
185 bool empty()
const {
return dataBufLen_ == 0 && keyBuf_.int_ == 0; }
192 KeyDataPair(KeyDataBuffer& buf)
193 : _key(buf.keyBuf_.char_, sizeof(unsigned)), _data(buf.dataBuf_, buf.dataBufLen_)
195 _key.set_flags(DB_DBT_USERMEM);
196 _key.set_ulen(
sizeof(
unsigned));
197 _data.set_flags(DB_DBT_USERMEM);
198 _data.set_ulen(MaxMsgLen);
202 static int bt_compare_fcn(Db *db,
const Dbt *p1,
const Dbt *p2)
206 const unsigned& a((*reinterpret_cast<KeyDataBuffer *>(p1->get_data())).keyBuf_.int_);
207 const unsigned& b((*reinterpret_cast<KeyDataBuffer *>(p2->get_data())).keyBuf_.int_);
209 return a < b ? -1 : a > b ? 1 : 0;
214 bool write(
const KeyDataBuffer& what)
216 return _persist_queue.try_push(what);
219 f8_thread_cancellation_token _cancellation_token;
223 BDBPersister() : _thread(
std::ref(*this)), _dbEnv(0), _db(new Db(&_dbEnv, 0)), _wasCreated() {}
225 virtual ~BDBPersister();
238 F8API virtual unsigned find_nearest_highest_seqnum (
const unsigned requested,
const unsigned last)
const;
244 F8API virtual bool put(
const unsigned seqnum,
const f8String& what);
250 F8API virtual bool put(
const unsigned sender_seqnum,
const unsigned target_seqnum);
256 F8API virtual bool get(
const unsigned seqnum,
f8String& to)
const;
264 virtual unsigned get(
const unsigned from,
const unsigned to, Session& session,
270 F8API virtual unsigned get_last_seqnum(
unsigned& to)
const;
276 F8API virtual bool get(
unsigned& sender_seqnum,
unsigned& target_seqnum)
const;
279 void stop() { write(KeyDataBuffer()); _thread.join(); }
283 F8API int operator()();
285 f8_thread_cancellation_token& cancellation_token() {
return _cancellation_token; }
288 #endif // FIX8_HAVE_BDB
294 using Store = std::map<unsigned, const f8String>;
313 F8API virtual bool put(
const unsigned sender_seqnum,
const unsigned target_seqnum);
319 F8API virtual bool get(
const unsigned seqnum,
f8String& to)
const;
327 F8API virtual unsigned get(
const unsigned from,
const unsigned to,
Session& session,
339 F8API virtual bool get(
unsigned& sender_seqnum,
unsigned& target_seqnum)
const;
354 #pragma pack(push, 1)
375 {
return os <<
"offset:" << what.
_offset <<
" size:" << what.
_size; }
380 IPrec(
const uint32_t seq,
const off_t offset,
const int32_t size)
387 {
return os <<
"seq:" << what.
_seq <<
' ' << what.
_prec; }
398 using Index = std::map<uint32_t, Prec>;
403 FilePersister(
unsigned rotnum=0) : _fod(-1), _iod(-1), _rotnum(rotnum), _wasCreated() {}
425 F8API virtual bool put(
const unsigned sender_seqnum,
const unsigned target_seqnum);
431 F8API virtual bool get(
const unsigned seqnum,
f8String& to)
const;
439 F8API virtual unsigned get(
const unsigned from,
const unsigned to,
Session& session,
451 F8API virtual bool get(
unsigned& sender_seqnum,
unsigned& target_seqnum)
const;
461 #if defined HAVE_LIBMEMCACHED
462 class MemcachedPersister :
public Persister
465 memcached_st *_cache =
nullptr;
468 unsigned _server_count = 0;
472 MemcachedPersister() =
default;
475 F8API virtual ~MemcachedPersister();
482 F8API virtual bool initialise(
const f8String& config_str,
const f8String& key_base,
bool purge=
false);
488 F8API virtual bool put(
const unsigned seqnum,
const f8String& what);
494 F8API virtual bool put(
const unsigned sender_seqnum,
const unsigned target_seqnum);
500 F8API virtual bool get(
const unsigned seqnum,
f8String& to)
const;
508 F8API virtual unsigned get(
const unsigned from,
const unsigned to, Session& session,
514 F8API virtual unsigned get_last_seqnum(
unsigned& to)
const;
520 F8API virtual bool get(
unsigned& sender_seqnum,
unsigned& target_seqnum)
const;
526 F8API virtual unsigned find_nearest_highest_seqnum (
const unsigned requested,
const unsigned last)
const;
531 F8API virtual unsigned find_nearest_seqnum (
unsigned requested)
const;
549 bool get_from_cache(
const std::string &key, std::string &target)
const
552 memcached_return_t rc;
555 char *value(memcached_get(_cache, key.c_str(), key.size(), &value_length, &flags, &rc));
558 target.reserve(value_length);
559 target.assign(value, value + value_length);
571 bool put_to_cache(
const std::string &key,
const std::string &source)
573 return memcached_success(memcached_set(_cache, key.c_str(), key.size(), source.c_str(), source.size(), 0, 0));
579 const std::string generate_seq_key(
unsigned seqnum)
const
581 std::ostringstream ostr;
582 ostr << _key_base <<
':' << seqnum;
590 static std::string generate_ctrl_record(
unsigned sender_seqnum,
unsigned target_seqnum)
592 std::ostringstream ostr;
593 ostr << sender_seqnum <<
':' << target_seqnum;
602 static bool extract_ctrl_record(
const std::string& source,
unsigned &sender_seqnum,
unsigned &target_seqnum)
604 std::istringstream istr(source);
605 istr >> sender_seqnum;
607 istr >> target_seqnum;
612 #endif // HAVE_LIBMEMCACHED
615 #if defined FIX8_HAVE_LIBHIREDIS
616 class HiredisPersister :
public Persister
619 redisContext *_cache =
nullptr;
625 HiredisPersister() =
default;
628 F8API virtual ~HiredisPersister();
638 const f8String& key_base,
bool purge=
false);
644 F8API virtual bool put(
const unsigned seqnum,
const f8String& what);
650 F8API virtual bool put(
const unsigned sender_seqnum,
const unsigned target_seqnum);
656 F8API virtual bool get(
const unsigned seqnum,
f8String& to)
const;
664 F8API virtual unsigned get(
const unsigned from,
const unsigned to, Session& session,
670 F8API virtual unsigned get_last_seqnum(
unsigned& to)
const;
676 F8API virtual bool get(
unsigned& sender_seqnum,
unsigned& target_seqnum)
const;
682 F8API virtual unsigned find_nearest_highest_seqnum (
const unsigned requested,
const unsigned last)
const;
702 #endif // FIX8_HAVE_LIBHIREDIS
708 #endif // FIX8_PERSIST_HPP_
std::map< uint32_t, Prec > Index
virtual unsigned find_nearest_highest_seqnum(const unsigned requested, const unsigned last) const =0
friend std::ostream & operator<<(std::ostream &os, const IPrec &what)
virtual F8API unsigned find_nearest_highest_seqnum(const unsigned requested, const unsigned last) const
FilePersister(unsigned rotnum=0)
Ctor.
std::map< unsigned, const f8String > Store
Fix8 Base Session. User sessions are derived from this class.
virtual F8API unsigned get_last_seqnum(unsigned &to) const
virtual bool del(const f8String &key)
std::pair< const unsigned, const f8String > SequencePair
virtual F8API bool initialise(const f8String &dbDir, const f8String &dbFname, bool purge=false)
Base (ABC) Persister class.
virtual F8API bool put(const unsigned seqnum, const f8String &what)
Provides context to your retrans handler.
virtual F8API bool put(const unsigned seqnum, const f8String &what)
virtual ~Persister()
Dtor.
virtual bool put(const f8String &key, const f8String &what)
Persister & operator=(const Persister &)=delete
Prec(const off_t offset, const int32_t size)
virtual unsigned get_last_seqnum(unsigned &to) const =0
IPrec(const uint32_t seq, const off_t offset, const int32_t size)
virtual bool put(const unsigned seqnum, const f8String &what)=0
#define FIX8_MAX_MSG_LENGTH
virtual void stop()
Stop the persister thread.
friend std::ostream & operator<<(std::ostream &os, const Prec &what)
tbb::concurrent_bounded_queue< T > f8_concurrent_queue
Memory based message persister.
virtual ~MemoryPersister()
Dtor.
Prec & operator=(const Prec &that)
virtual F8API ~FilePersister()
Dtor.
virtual F8API unsigned find_nearest_highest_seqnum(const unsigned requested, const unsigned last) const
virtual F8API unsigned get_last_seqnum(unsigned &to) const