fix8  version 1.4.0
Open Source C++ FIX Framework
filepersist.cpp
Go to the documentation of this file.
1 //-----------------------------------------------------------------------------------------
2 /*
3 
4 Fix8 is released under the GNU LESSER GENERAL PUBLIC LICENSE Version 3.
5 
6 Fix8 Open Source FIX Engine.
7 Copyright (C) 2010-16 David L. Dight <fix@fix8.org>
8 
9 Fix8 is free software: you can redistribute it and / or modify it under the terms of the
10 GNU Lesser General Public License as published by the Free Software Foundation, either
11 version 3 of the License, or (at your option) any later version.
12 
13 Fix8 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
14 even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 
16 You should have received a copy of the GNU Lesser General Public License along with Fix8.
17 If not, see <http://www.gnu.org/licenses/>.
18 
19 BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE PROGRAM, TO
20 THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE
21 COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY
22 KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO
24 THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE,
25 YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
26 
27 IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL ANY COPYRIGHT
28 HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE THE PROGRAM AS PERMITTED
29 ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR
30 CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT
31 NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR
32 THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH
33 HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
34 
35 */
36 //-----------------------------------------------------------------------------------------
37 #include "precomp.hpp"
38 #include <fix8/f8includes.hpp>
39 
40 //-------------------------------------------------------------------------------------------------
41 using namespace FIX8;
42 using namespace std;
43 
44 //-------------------------------------------------------------------------------------------------
45 bool FilePersister::initialise(const f8String& dbDir, const f8String& dbFname, bool purge)
46 {
47  if (_opened)
48  return true;
49 
50  f8String odbdir(dbDir);
51  ostringstream ostr;
52  ostr << CheckAddTrailingSlash(odbdir) << dbFname;
53  _dbFname = ostr.str();
54  ostr << ".idx";
55  _dbIname = ostr.str();
56 
57  bool nof;
58 
59  if ((nof = !exist(_dbFname)) || purge)
60  {
61  if (purge)
62  {
63  if (_rotnum > 0)
64  {
65  vector<string> dblst, idxlst;
66  dblst.push_back(_dbFname);
67  idxlst.push_back(_dbIname);
68 
69  for (unsigned ii(0); ii < _rotnum && ii < Logger::max_rotation; ++ii)
70  {
71  ostringstream ostr;
72  ostr << _dbFname << '.' << (ii + 1);
73  dblst.push_back(ostr.str());
74  ostr << ".idx";
75  idxlst.push_back(ostr.str());
76  }
77 
78  for (unsigned ii(_rotnum); ii; --ii)
79  {
80  rename (dblst[ii - 1].c_str(), dblst[ii].c_str()); // ignore errors
81  rename (idxlst[ii - 1].c_str(), idxlst[ii].c_str()); // ignore errors
82  }
83  }
84  }
85 
86  if ((_fod = open(_dbFname.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0600)) < 0)
87  {
88  glout_error << "Error: creating database: " << _dbFname << " (" << strerror(errno) << ')';
89  return false;
90  }
91  if ((_iod = open(_dbIname.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0600)) < 0)
92  {
93  glout_error << "Error: creating database index: " << _dbIname << " (" << strerror(errno) << ')';
94  return false;
95  }
96 
97  _wasCreated = true;
98 
99  if (purge && !nof)
100  {
101  glout_info << (_rotnum ? "Rotated and purged perist db" : "Purged perist db");
102  }
103  }
104  else
105  {
106  if ((_fod = open(_dbFname.c_str(), O_RDWR | O_BINARY)) < 0)
107  {
108  glout_error << "Error: opening existing database: " << _dbFname << " (" << strerror(errno) << ')';
109  return false;
110  }
111  if ((_iod = open(_dbIname.c_str(), O_RDWR | O_BINARY)) < 0)
112  {
113  glout_error << "Error: opening existing database index: " << _dbIname << " (" << strerror(errno) << ')';
114  return false;
115  }
116 
117  IPrec iprec;
118  while (true)
119  {
120  const ssize_t blrd(read(_iod, static_cast<void *>(&iprec), sizeof(IPrec)));
121  if (blrd < 0)
122  {
123  glout_error << "Error: reading existing database index: " << _dbIname << " (" << strerror(errno) << ')';
124  return false;
125  }
126  else if (blrd == 0)
127  break; // eof
128 
129  if (iprec._seq == 0)
130  {
131  glout_info << iprec;
132  }
133 
134  if (!_index.insert({iprec._seq, iprec._prec}).second)
135  {
136  glout_warn << "Warning: inserting index record into database index: " << _dbIname << " (" << iprec << "). Ignoring.";
137  }
138  }
139 
140  if (_index.size())
141  {
142  glout_info << "Database " << _dbFname << " indexed " << _index.size() << " records.";
143  }
144 
145  unsigned last;
146  if (get_last_seqnum(last))
147  {
148  glout_info << _dbFname << ": Last sequence is " << last;
149  }
150  }
151 
152  return _opened = true;
153 }
154 
155 //-------------------------------------------------------------------------------------------------
157 {
158  close(_fod);
159  close(_iod);
160 }
161 
162 //-------------------------------------------------------------------------------------------------
163 unsigned FilePersister::get_last_seqnum(unsigned& sequence) const
164 {
165  return sequence = _index.empty() ? 0 : _index.rbegin()->first;
166 }
167 
168 //-------------------------------------------------------------------------------------------------
169 unsigned FilePersister::get(const unsigned from, const unsigned to, Session& session,
170  bool (Session::*callback)(const Session::SequencePair& with, Session::RetransmissionContext& rctx)) const
171 {
172  unsigned last_seq(0);
173  get_last_seqnum(last_seq);
174  unsigned recs_sent(0), startSeqNum(find_nearest_highest_seqnum (from, last_seq));
175  const unsigned finish(to == 0 ? last_seq : to);
176  Session::RetransmissionContext rctx(from, to, session.get_next_send_seq());
177 
178  if (!startSeqNum || from > finish)
179  {
180  glout_info << "No records found";
181  rctx._no_more_records = true;
182  (session.*callback)(Session::SequencePair(0, ""), rctx);
183  return 0;
184  }
185 
186  Index::const_iterator itr(_index.find(startSeqNum));
187  if (itr != _index.end())
188  {
189  char buff[FIX8_MAX_MSG_LENGTH];
190 
191  do
192  {
193  if (!itr->first || itr->first > finish)
194  break;
195  if (lseek(_fod, itr->second._offset, SEEK_SET) < 0)
196  {
197  glout_error << "Error: could not seek to correct index location for get: " << _dbFname;
198  break;
199  }
200 
201  if (read (_fod, buff, itr->second._size) != itr->second._size)
202  {
203  glout_error << "Error: could not read message record for seqnum " << itr->first << " from: " << _dbFname;
204  break;
205  }
206 
207  Session::SequencePair txresult(itr->first, f8String(buff, itr->second._size));
208  ++recs_sent;
209  if (!(session.*callback)(txresult, rctx))
210  {
211  glout_debug << "Retransmission callback signalled an error, not sending any more records from: " << _dbFname;
212  break;
213  }
214  }
215  while(++itr != _index.end());
216 
217  rctx._no_more_records = true;
218  (session.*callback)(Session::SequencePair(0, ""), rctx);
219  }
220  else
221  {
222  glout_error << "record not found (" << startSeqNum << ')';
223  }
224 
225  return recs_sent;
226 }
227 
228 //-------------------------------------------------------------------------------------------------
229 bool FilePersister::put(const unsigned sender_seqnum, const unsigned target_seqnum)
230 {
231  if (!_opened)
232  return false;
233  IPrec iprec(0, sender_seqnum, target_seqnum);
234  Index::iterator itr(_index.find(0));
235  if (itr == _index.end())
236  _index.insert({0, iprec._prec});
237  else
238  itr->second = iprec._prec;
239 
240  if (lseek(_iod, 0, SEEK_SET) < 0)
241  {
242  glout_error << "Error: could not seek to 0 for seqnum persitence: " << _dbIname;
243  return false;
244  }
245  return write (_iod, static_cast<void *>(&iprec), sizeof(IPrec)) == sizeof(IPrec);
246 }
247 
248 //-------------------------------------------------------------------------------------------------
249 bool FilePersister::put(const unsigned seqnum, const f8String& what)
250 {
251  if (!_opened || !seqnum)
252  return false;
253 
254  if (_index.find(seqnum) != _index.end())
255  {
256  glout_error << "Error: seqnum " << seqnum << " already persisted in: " << _dbIname;
257  return false;
258  }
259  if (lseek(_iod, 0, SEEK_END) < 0)
260  {
261  glout_error << "Error: could not seek to index end for seqnum persitence: " << _dbIname;
262  return false;
263  }
264  off_t offset;
265  if ((offset = lseek(_fod, 0, SEEK_END)) < 0)
266  {
267  glout_error << "Error: could not seek to end for seqnum persitence: " << _dbFname;
268  return false;
269  }
270  IPrec iprec(seqnum, offset, static_cast<unsigned>(what.size()));
271  if (write (_iod, static_cast<void *>(&iprec), sizeof(IPrec)) != sizeof(IPrec))
272  {
273  glout_error << "Error: could not write index record for seqnum " << seqnum << " to: " << _dbIname;
274  return false;
275  }
276  if (write (_fod, what.data(), static_cast<unsigned>(what.size())) != static_cast<ssize_t>(what.size()))
277  {
278  glout_error << "Error: could not write record for seqnum " << seqnum << " to: " << _dbFname;
279  return false;
280  }
281 
282  return _index.insert({seqnum, iprec._prec}).second;
283 }
284 
285 //-------------------------------------------------------------------------------------------------
286 bool FilePersister::get(unsigned& sender_seqnum, unsigned& target_seqnum) const
287 {
288  if (!_opened)
289  return false;
290 
291  if (_index.empty())
292  {
293  glout_warn << "Warning: index is empty: " << _dbIname;
294  return false;
295  }
296 
297  Index::const_iterator itr(_index.find(0));
298  if (itr == _index.end())
299  {
300  glout_error << "Error: index does not contain control record: " << _dbIname;
301  return false;
302  }
303 
304  sender_seqnum = itr->second._offset;
305  target_seqnum = itr->second._size;
306  return true;
307 }
308 
309 //-------------------------------------------------------------------------------------------------
310 bool FilePersister::get(const unsigned seqnum, f8String& to) const
311 {
312  if (!_opened || !seqnum || _index.empty())
313  return false;
314  Index::const_iterator itr(_index.find(seqnum));
315  if (itr == _index.end())
316  {
317  glout_warn << "Warning: index does not contain seqnum: " << seqnum << " in: " << _dbIname;
318  return false;
319  }
320 
321  if (lseek(_fod, itr->second._offset, SEEK_SET) < 0)
322  {
323  glout_error << "Error: could not seek to correct index location for get: " << _dbFname;
324  return false;
325  }
326 
327  char buff[FIX8_MAX_MSG_LENGTH];
328  if (read (_fod, buff, itr->second._size) != itr->second._size)
329  {
330  glout_error << "Error: could not read message record for seqnum " << seqnum << " from: " << _dbFname;
331  return false;
332  }
333 
334  to.assign(buff, itr->second._size);
335  return true;
336 }
337 
338 //---------------------------------------------------------------------------------------------------
339 unsigned FilePersister::find_nearest_highest_seqnum (const unsigned requested, const unsigned last) const
340 {
341  if (last)
342  {
343  for (unsigned startseqnum(requested); startseqnum <= last; ++startseqnum)
344  {
345  Index::const_iterator itr(_index.find(startseqnum));
346  if (itr != _index.end())
347  return itr->first;
348  }
349  }
350 
351  return 0;
352 }
353 
Prec _prec
Definition: persist.hpp:384
#define O_BINARY
Definition: persist.hpp:350
Fix8 Base Session. User sessions are derived from this class.
Definition: session.hpp:394
std::pair< const unsigned, const f8String > SequencePair
Definition: session.hpp:615
virtual F8API bool initialise(const f8String &dbDir, const f8String &dbFname, bool purge=false)
Definition: filepersist.cpp:45
Provides context to your retrans handler.
Definition: session.hpp:598
virtual F8API bool put(const unsigned seqnum, const f8String &what)
#define glout_error
Definition: logger.hpp:606
uint32_t _seq
Definition: persist.hpp:383
unsigned get_next_send_seq() const
Definition: session.hpp:750
#define glout_debug
Definition: logger.hpp:614
#define glout_warn
Definition: logger.hpp:604
static const int max_rotation
Definition: logger.hpp:173
#define FIX8_MAX_MSG_LENGTH
Definition: f8config.h:576
#define glout_info
Definition: logger.hpp:601
F8API std::string & CheckAddTrailingSlash(std::string &source)
virtual F8API ~FilePersister()
Dtor.
virtual F8API unsigned find_nearest_highest_seqnum(const unsigned requested, const unsigned last) const
virtual F8API unsigned get_last_seqnum(unsigned &to) const
virtual F8API bool get(const unsigned seqnum, f8String &to) const
bool exist(const std::string &fname)
Definition: f8utils.hpp:1068
std::string f8String
Definition: f8types.hpp:47