fix8  version 1.4.0
Open Source C++ FIX Framework
FIX8::FIXReader Class Reference

Fix message reader. More...

#include <connection.hpp>

Inheritance diagram for FIX8::FIXReader:
FIX8::AsyncSocket< f8String >

Public Member Functions

 FIXReader (Poco::Net::StreamSocket *sock, Session &session, const ProcessModel pmodel=pm_pipeline)
 
virtual ~FIXReader ()
 Dtor. More...
 
virtual void start ()
 Start the processing threads. More...
 
virtual void quit ()
 Stop the processing threads and quit. More...
 
virtual void stop ()
 Send a message to the processing method instructing it to quit. More...
 
virtual F8API int execute (f8_thread_cancellation_token &cancellation_token)
 
int join ()
 
F8API void set_preamble_sz ()
 Calculate the length of the Fix message preamble, e.g. "8=FIX.4.4^A9=". More...
 
bool is_socket_error () const
 
bool poll (const Poco::Timespan &ts=Poco::Timespan()) const
 
f8_thread_cancellation_token & callback_cancellation_token ()
 
- Public Member Functions inherited from FIX8::AsyncSocket< f8String >
 AsyncSocket (Poco::Net::StreamSocket *sock, Session &session, const ProcessModel pmodel=pm_pipeline)
 
virtual ~AsyncSocket ()=default
 Dtor. More...
 
size_t queued () const
 
bool started () const
 
int operator() ()
 
virtual void request_stop ()
 Request that the processing thread stop. More...
 
Poco::Net::StreamSocket * socket ()
 
int join ()
 
f8_thread_cancellation_token & cancellation_token ()
 

Private Types

enum  { _max_msg_len = FIX8_MAX_MSG_LENGTH, _chksum_sz = 7 }
 

Private Member Functions

F8API int callback_processor ()
 
bool read (f8String &to)
 
int sockRead (char *where, const size_t sz)
 

Private Attributes

f8_atomic< bool > _socket_error
 
f8_thread< FIXReader > _callback_thread
 
f8_thread_cancellation_token _callback_cancellation_token
 
size_t _bg_sz
 

Additional Inherited Members

- Protected Attributes inherited from FIX8::AsyncSocket< f8String >
coroutine _coro
 
Poco::Net::StreamSocket * _sock
 
f8_concurrent_queue< f8String > _msg_queue
 
Session & _session
 
ProcessModel _pmodel
 
f8_thread_cancellation_token _cancellation_token
 
volatile bool _started
 
f8_mutex _start_mutex
 

Detailed Description

Fix message reader.

Definition at line 134 of file connection.hpp.

Member Enumeration Documentation

anonymous enum
private
Enumerator
_max_msg_len 
_chksum_sz 

Definition at line 136 of file connection.hpp.

Constructor & Destructor Documentation

FIX8::FIXReader::FIXReader ( Poco::Net::StreamSocket *  sock,
Session &  session,
const ProcessModel  pmodel = pm_pipeline 
)
inline

Ctor.

Parameters
sock  connected socket
session  session
pmodel  process model

Definition at line 245 of file connection.hpp.

References set_preamble_sz().

246  : AsyncSocket<f8String>(sock, session, pmodel), _callback_thread(std::ref(*this), &FIXReader::callback_processor)
247 #if FIX8_EXPERIMENTAL_BUFFERED_SOCKET_READ
248  , _read_buffer(), _read_buffer_rptr(_read_buffer), _read_buffer_wptr(_read_buffer)
249 #endif
250  , _bg_sz()
251  {
252  set_preamble_sz();
253  }
F8API void set_preamble_sz()
Calculate the length of the Fix message preamble, e.g. "8=FIX.4.4^A9=".
Definition: connection.cpp:201
F8API int callback_processor()
Definition: connection.cpp:164
f8_thread< FIXReader > _callback_thread
Definition: connection.hpp:139
virtual FIX8::FIXReader::~FIXReader ( )
inline virtual

Dtor.

Definition at line 256 of file connection.hpp.

References join(), and stop().

257  {
258  stop();
259  join();
260  }
virtual void stop()
Send a message to the processing method instructing it to quit.
Definition: connection.hpp:288

Member Function Documentation

f8_thread_cancellation_token& FIX8::FIXReader::callback_cancellation_token ( )
inline

Definition at line 335 of file connection.hpp.

References _callback_cancellation_token.

f8_thread_cancellation_token _callback_cancellation_token
Definition: connection.hpp:140
int FIXReader::callback_processor ( )
private

Process messages from inbound queue, calls session process method.

Returns
number of messages processed

Definition at line 164 of file connection.cpp.

References scout_info, and scout_warn.

165 {
166  int processed(0), ignored(0);
167 
169  {
170  f8String *msg_ptr(0);
171 #if (FIX8_MPMC_SYSTEM == FIX8_MPMC_TBB)
172  f8String msg;
173  _msg_queue.pop (msg); // will block
174  if (msg.empty()) // means exit
175  break;
176  msg_ptr = &msg;
177 #else
178  _msg_queue.pop (msg_ptr); // will block
179  if (msg_ptr->empty()) // means exit
180  break;
181 #endif
182 
183  if (!_session.process(*msg_ptr))
184  {
185  scout_warn << "Unhandled message: " << *msg_ptr;
186  ++ignored;
187  }
188  else
189  ++processed;
190 #if (FIX8_MPMC_SYSTEM == FIX8_MPMC_FF)
191  _msg_queue.release(msg_ptr);
192 #endif
193  }
194 
195  scout_info << "FIXReaderCallback: " << processed << " messages processed, " << ignored << " ignored";
196 
197  return 0;
198 }
virtual F8API bool process(const f8String &from)
Definition: session.cpp:277
f8_thread_cancellation_token _cancellation_token
Definition: connection.hpp:61
f8_concurrent_queue< f8String > _msg_queue
Definition: connection.hpp:58
#define scout_warn
Definition: connection.hpp:756
#define scout_info
Definition: connection.hpp:755
bool is_shutdown()
Definition: session.hpp:778
std::string f8String
Definition: f8types.hpp:47
int FIXReader::execute ( f8_thread_cancellation_token &  cancellation_token)
virtual

Reader thread method. Reads messages and places them on the queue for processing. Supports pipelined, threaded and coroutine process models.

Returns
0 on success

Reimplemented from FIX8::AsyncSocket< f8String >.

Definition at line 45 of file connection.cpp.

References coro_yield, FIX8::pm_coro, FIX8::pm_pipeline, reenter, scout_error, scout_info, scout_warn, FIX8::States::st_session_terminated, and FIX8::f8Exception::what().

Referenced by FIX8::Connection::reader_execute().

46 {
47  unsigned processed(0), dropped(0), invalid(0);
48  int retval(0);
49 
50  if (_pmodel == pm_coro)
51  {
52  f8String msg;
53 
54  try
55  {
56  reenter(_coro)
57  {
58  while (!_session.is_shutdown())
59  {
60  if (poll()) // it is a fair assumption that there is a complete message waiting to be read
61  {
62  if (read(msg)) // if our assumption is wrong we could block here
63  {
64  if (!_session.process(msg))
65  {
66  scout_warn << "Unhandled message: " << msg;
67  ++invalid;
68  }
69  else
70  ++processed;
71  }
72  else
73  ++invalid;
74  }
75  coro_yield;
76  }
77  }
78  }
79  catch (Poco::Net::NetException& e)
80  {
81  scout_error << e.what();
83  retval = -1;
84  }
85  catch (PeerResetConnection& e)
86  {
87  scout_error << e.what();
89  //_session.stop();
90  retval = -1;
91  }
92  catch (exception& e)
93  {
94  scout_error << e.what();
95  retval = -2;
96  ++invalid;
97  }
98  catch (...)
99  {
100  scout_error << "reader(coro): Unknown exception";
101  retval = -2;
102  }
103  }
104  else
105  {
106  while (!cancellation_token && !_session.is_shutdown())
107  {
108  f8String msg;
109 
110  try
111  {
112  if (read(msg)) // will block
113  {
114  if (_pmodel == pm_pipeline)
115  {
116  if (!_msg_queue.try_push (msg))
117  {
118  scout_warn << "FIXReader: message queue is full";
119  ++dropped;
120  }
121  else
122  ++processed;
123  }
124  else // _pmodel == pm_thread
125  {
126  if (!_session.process(msg))
127  {
128  scout_warn << "Unhandled message: " << msg;
129  ++invalid;
130  }
131  else
132  ++processed;
133  }
134  }
135  else
136  ++invalid;
137  }
138  catch (exception& e)
139  {
140  scout_error << e.what();
141  if (!_session.is_shutdown())
143  retval = -1;
144  ++invalid;
145  break;
146  }
147  }
148  scout_info << "FIXReader: " << processed << " messages processed, " << dropped << " dropped, "
149  << invalid << " invalid";
150  if (retval && errno)
151  {
152  scout_warn << "socket error=" << strerror(errno) << '(' << errno << ')';
153  }
154 
155  {
157  _started = false;
158  }
159  }
160  return retval;
161 }
virtual F8API bool process(const f8String &from)
Definition: session.cpp:277
void do_state_change(const States::SessionStates new_state)
Definition: session.hpp:827
bool read(f8String &to)
Definition: connection.cpp:207
#define reenter(c)
Definition: yield.hpp:12
f8_concurrent_queue< f8String > _msg_queue
Definition: connection.hpp:58
A connected peer has reset the connection (disconnected).
#define scout_warn
Definition: connection.hpp:756
#define scout_info
Definition: connection.hpp:755
#define coro_yield
Definition: yield.hpp:16
const char * what() const
Definition: f8exception.hpp:85
bool is_shutdown()
Definition: session.hpp:778
std::string f8String
Definition: f8types.hpp:47
#define scout_error
Definition: connection.hpp:757
bool poll(const Poco::Timespan &ts=Poco::Timespan()) const
Definition: connection.hpp:330
bool FIX8::FIXReader::is_socket_error ( ) const
inline

Check to see if the socket is in error

Returns
true if there was a socket error

Definition at line 325 of file connection.hpp.

References _socket_error.

Referenced by FIX8::Connection::is_socket_error().

325 { return _socket_error; }
f8_atomic< bool > _socket_error
Definition: connection.hpp:137
int FIX8::FIXReader::join ( )
inline

Wait till reader thread has finished.

Returns
0 on success

Definition at line 318 of file connection.hpp.

References FIX8::AsyncSocket< f8String >::_pmodel, FIX8::AsyncSocket< T >::join(), and FIX8::pm_coro.

Referenced by FIX8::Connection::join(), and ~FIXReader().

bool FIX8::FIXReader::poll ( const Poco::Timespan &  ts = Poco::Timespan()) const
inline

Check to see if there is any data waiting to be read

Parameters
ts  timeout
Returns
true if data ready

Definition at line 330 of file connection.hpp.

References FIX8::AsyncSocket< f8String >::_sock.

Referenced by FIX8::Connection::reader_poll().

331  {
332  return _sock->poll(ts, Poco::Net::Socket::SELECT_READ);
333  }
Poco::Net::StreamSocket * _sock
Definition: connection.hpp:57
virtual void FIX8::FIXReader::quit ( )
inline virtual

Stop the processing threads and quit.

Reimplemented from FIX8::AsyncSocket< f8String >.

Definition at line 276 of file connection.hpp.

References FIX8::AsyncSocket< f8String >::_pmodel, FIX8::_f8_threadcore::join(), FIX8::pm_coro, FIX8::pm_pipeline, FIX8::AsyncSocket< T >::quit(), and FIX8::f8_thread< T >::request_stop().

277  {
278  if (_pmodel == pm_pipeline)
279  {
280  _callback_thread.request_stop();
281  _callback_thread.join();
282  }
283  if (_pmodel != pm_coro)
284  AsyncSocket<f8String>::quit();
285  }
virtual void quit()
Stop the processing thread and quit.
Definition: connection.hpp:117
f8_thread< FIXReader > _callback_thread
Definition: connection.hpp:139
bool FIXReader::read ( f8String &  to)
private

Read a Fix message. Throws InvalidBodyLength, IllegalMessage.

Parameters
to  string to place message in
Returns
true on success

Definition at line 207 of file connection.cpp.

References FIX8::default_field_separator(), FIX8::MessageBase::extract_element(), FILE_LINE, FIX8_MAX_FLD_LENGTH, and FIX8::MAX_MSGTYPE_FIELD_LEN().

208 {
209  char msg_buf[_max_msg_len] {};
210  int result(sockRead(msg_buf, _bg_sz));
211 
212  if (result == static_cast<int>(_bg_sz))
213  {
214  char bt;
215  size_t offs(_bg_sz);
216  do // get the last chrs of bodylength and ^A
217  {
218  if (sockRead(&bt, 1) != 1)
219  return false;
220  if (!isdigit(bt) && bt != default_field_separator)
221  throw IllegalMessage(msg_buf, FILE_LINE);
222  msg_buf[offs++] = bt;
223  }
224  while (bt != default_field_separator && offs < _max_msg_len);
225  to.assign(msg_buf, offs);
226 
228  unsigned result;
229  if ((result = MessageBase::extract_element(to.data(), static_cast<unsigned>(to.size()), tag, val)))
230  {
231  if (*tag != '8')
232  throw IllegalMessage(to, FILE_LINE);
233 
234  if (_session.get_ctx()._beginStr.compare(val)) // invalid FIX version
235  throw InvalidVersion(string(val));
236 
237  if ((result = MessageBase::extract_element(to.data() + result, static_cast<unsigned>(to.size()) - result, tag, val)))
238  {
239  if (*tag != '9')
240  throw IllegalMessage(to, FILE_LINE);
241 
242  const unsigned mlen(fast_atoi<unsigned>(val));
243  if (mlen == 0 || mlen > _max_msg_len - _bg_sz - _chksum_sz) // invalid msglen
244  throw InvalidBodyLength(mlen);
245 
246  // read the body
247  if ((result = sockRead(msg_buf, mlen) != static_cast<int>(mlen)))
248  return false;
249 
250  // read the checksum
251  if ((result = sockRead(msg_buf + mlen, _chksum_sz) != static_cast<int>(_chksum_sz)))
252  return false;
253 
254  to.append(msg_buf, mlen + _chksum_sz);
256  //string ts;
257  //cerr << GetTimeAsStringMS(ts, &_session.get_last_received(), 9) << endl;
258  return true;
259  }
260  }
261 
262  throw IllegalMessage(to, FILE_LINE);
263  }
264 
265  return false;
266 }
const size_t MAX_MSGTYPE_FIELD_LEN(32)
A message was decoded that had a Fix version not configured for this session.
#define FIX8_MAX_FLD_LENGTH
Definition: f8config.h:571
#define FILE_LINE
Definition: f8utils.hpp:68
A message was read that was in an illegal format.
A message was decoded with an invalid message body length.
const F8MetaCntx & get_ctx() const
Definition: session.hpp:673
const f8String _beginStr
Fix header beginstring.
Definition: message.hpp:228
static unsigned extract_element(const char *from, const unsigned sz, char *tag, char *val)
Definition: message.hpp:886
const unsigned char default_field_separator(0x1)
default FIX field separator (^A)
void update_received()
Update the last received time.
Definition: session.hpp:718
int sockRead(char *where, const size_t sz)
Definition: connection.hpp:217
void FIXReader::set_preamble_sz ( )

Calculate the length of the Fix message preamble, e.g. "8=FIX.4.4^A9=".

Definition at line 201 of file connection.cpp.

Referenced by FIXReader().

202 {
203  _bg_sz = 2 + _session.get_ctx()._beginStr.size() + 1 + 3;
204 }
const F8MetaCntx & get_ctx() const
Definition: session.hpp:673
const f8String _beginStr
Fix header beginstring.
Definition: message.hpp:228
int FIX8::FIXReader::sockRead ( char *  where,
const size_t  sz 
)
inline private

Read bytes from the socket layer, throws PeerResetConnection.

Parameters
where  buffer to place bytes in
sz  number of bytes to read
Returns
number of bytes read

Definition at line 217 of file connection.hpp.

References FIX8::AsyncSocket< f8String >::_sock.

218  {
219  unsigned remaining(static_cast<unsigned>(sz)), rddone(0);
220  while (remaining > 0)
221  {
222  const int rdSz(_sock->receiveBytes(where + rddone, remaining));
223  if (rdSz <= 0)
224  {
225  if (errno == EAGAIN
226 #if defined EWOULDBLOCK && EAGAIN != EWOULDBLOCK
227  || errno == EWOULDBLOCK
228 #endif
229  )
230  continue;
231  throw PeerResetConnection("sockRead: connection gone");
232  }
233  rddone += rdSz;
234  remaining -= rdSz;
235  }
236  return rddone;
237  }
Poco::Net::StreamSocket * _sock
Definition: connection.hpp:57
virtual void FIX8::FIXReader::start ( )
inline virtual

Start the processing threads.

Reimplemented from FIX8::AsyncSocket< f8String >.

Definition at line 263 of file connection.hpp.

References FIX8::AsyncSocket< f8String >::_pmodel, FIX8::pm_coro, FIX8::pm_pipeline, FIX8::AsyncSocket< T >::start(), and FIX8::f8_thread< T >::start().

264  {
265  _socket_error = false;
266  if (_pmodel != pm_coro)
267  AsyncSocket<f8String>::start();
268  if (_pmodel == pm_pipeline)
269  {
270  if (_callback_thread.start())
271  _socket_error = true;
272  }
273  }
virtual void start()
Start the processing thread.
Definition: connection.hpp:96
f8_atomic< bool > _socket_error
Definition: connection.hpp:137
f8_thread< FIXReader > _callback_thread
Definition: connection.hpp:139
virtual void FIX8::FIXReader::stop ( )
inline virtual

Send a message to the processing method instructing it to quit.

Definition at line 288 of file connection.hpp.

References FIX8::AsyncSocket< f8String >::_msg_queue, FIX8::AsyncSocket< f8String >::_pmodel, FIX8::AsyncSocket< f8String >::_start_mutex, FIX8::AsyncSocket< f8String >::_started, glout_warn, FIX8::pm_coro, FIX8::pm_pipeline, FIX8::AsyncSocket< T >::request_stop(), and FIX8::f8_thread< T >::request_stop().

Referenced by ~FIXReader().

289  {
290  if (_started)
291  {
292  f8_scoped_lock guard(_start_mutex);
293  if (_started)
294  {
295  if (_pmodel == pm_pipeline)
296  {
297  const f8String from;
298  _msg_queue.try_push(from);
299  _callback_thread.request_stop();
300  }
301  if (_pmodel != pm_coro)
302  AsyncSocket<f8String>::request_stop();
303  }
304  }
305  else
306  {
307  glout_warn << "FIXReader: AsyncSocket already stopped.";
308  }
309  }
f8_scoped_lock_impl< f8_mutex > f8_scoped_lock
Definition: thread.hpp:451
#define glout_warn
Definition: logger.hpp:604
f8_concurrent_queue< f8String > _msg_queue
Definition: connection.hpp:58
virtual void request_stop()
Request that the processing thread stop.
Definition: connection.hpp:114
std::string f8String
Definition: f8types.hpp:47
f8_thread< FIXReader > _callback_thread
Definition: connection.hpp:139

Member Data Documentation

size_t FIX8::FIXReader::_bg_sz
private

Definition at line 151 of file connection.hpp.

f8_thread_cancellation_token FIX8::FIXReader::_callback_cancellation_token
private

Definition at line 140 of file connection.hpp.

Referenced by callback_cancellation_token().

f8_thread<FIXReader> FIX8::FIXReader::_callback_thread
private

Definition at line 139 of file connection.hpp.

f8_atomic<bool> FIX8::FIXReader::_socket_error
private

Definition at line 137 of file connection.hpp.

Referenced by is_socket_error().


The documentation for this class was generated from the following files: