47 unsigned processed(0), dropped(0), invalid(0);
58 while (!_session.is_shutdown())
64 if (!_session.process(msg))
79 catch (Poco::Net::NetException& e)
106 while (!cancellation_token && !_session.is_shutdown())
116 if (!_msg_queue.try_push (msg))
118 scout_warn <<
"FIXReader: message queue is full";
126 if (!_session.process(msg))
141 if (!_session.is_shutdown())
148 scout_info <<
"FIXReader: " << processed <<
" messages processed, " << dropped <<
" dropped, "
149 << invalid <<
" invalid";
152 scout_warn <<
"socket error=" << strerror(errno) <<
'(' << errno <<
')';
166 int processed(0), ignored(0);
168 for (; !_cancellation_token && !_session.is_shutdown();)
171 #if (FIX8_MPMC_SYSTEM == FIX8_MPMC_TBB)
173 _msg_queue.pop (msg);
178 _msg_queue.pop (msg_ptr);
179 if (msg_ptr->empty())
183 if (!_session.process(*msg_ptr))
185 scout_warn <<
"Unhandled message: " << *msg_ptr;
190 #if (FIX8_MPMC_SYSTEM == FIX8_MPMC_FF)
191 _msg_queue.release(msg_ptr);
195 scout_info <<
"FIXReaderCallback: " << processed <<
" messages processed, " << ignored <<
" ignored";
203 _bg_sz = 2 + _session.get_ctx()._beginStr.size() + 1 + 3;
209 char msg_buf[_max_msg_len] {};
210 int result(sockRead(msg_buf, _bg_sz));
212 if (result == static_cast<int>(_bg_sz))
218 if (sockRead(&bt, 1) != 1)
222 msg_buf[offs++] = bt;
225 to.assign(msg_buf, offs);
234 if (_session.get_ctx()._beginStr.compare(val))
242 const unsigned mlen(fast_atoi<unsigned>(val));
243 if (mlen == 0 || mlen > _max_msg_len - _bg_sz - _chksum_sz)
247 if ((result = sockRead(msg_buf, mlen) != static_cast<int>(mlen)))
251 if ((result = sockRead(msg_buf + mlen, _chksum_sz) != static_cast<int>(_chksum_sz)))
254 to.append(msg_buf, mlen + _chksum_sz);
255 _session.update_received();
271 int result(0), processed(0), invalid(0);
273 while (!cancellation_token && !_session.is_shutdown())
278 _msg_queue.pop (inmsg);
281 unique_ptr<Message> msg(inmsg);
282 _session.send_process(msg.get());
291 catch (Poco::Net::NetException& e)
305 scout_info <<
"FIXWriter: " << processed <<
" messages processed, " << invalid <<
" invalid";
328 scout_debug <<
"Connection::stop() => _reader.stop()";
330 if (_reader.started())
331 _reader.socket()->shutdown();
339 unsigned attempts(0);
341 const Poco::Timespan timeout(lparam._connect_timeout, 0);
343 while (attempts < (lparam._reliable ? 1 : lparam._login_retries))
347 if (_addr == Poco::Net::SocketAddress())
348 throw Poco::Net::InvalidAddressException(
"empty address");
350 scout_info <<
"Trying to connect to: " << _addr.toString() <<
" (" << ++attempts <<
')' << ( _secured ?
" secured" :
" not-secured");
351 _sock->connect(_addr, timeout);
352 if (lparam._recv_buf_sz)
353 set_recv_buf_sz(lparam._recv_buf_sz);
354 if (lparam._send_buf_sz)
355 set_send_buf_sz(lparam._send_buf_sz);
356 _sock->setLinger(
false, 0);
357 _sock->setNoDelay(_no_delay);
359 return _connected =
true;
361 catch (Poco::Exception& e)
363 if (lparam._reliable)
365 scout_debug <<
"rethrowing Poco::Exception: " << e.displayText();
373 if (lparam._reliable)
375 scout_debug <<
"rethrowing exception: " << e.what();
virtual F8API int execute(f8_thread_cancellation_token &cancellation_token)
const size_t MAX_MSGTYPE_FIELD_LEN(32)
A message was decoded that had a Fix version not configured for this session.
#define FIX8_MAX_FLD_LENGTH
F8API void stop()
Stop the reader and writer threads.
A message was read that was in an illegal format.
A message was decoded with an invalid message body length.
F8API void set_preamble_sz()
Calculate the length of the Fix message preamble, e.g. "8=FIX.4.4^A9=".
Thread cancellation token.
int hypersleep< h_milliseconds >(unsigned amt)
F8API void start()
Start the reader and writer threads.
static unsigned extract_element(const char *from, const unsigned sz, char *tag, char *val)
A complete Fix message with header, body and trailer.
A connected peer has reset the connection (disconnected).
const unsigned char default_field_separator(0x1)
default FIX field separator (^A)
virtual F8API int execute(f8_thread_cancellation_token &cancellation_token)
const char * what() const
F8API int callback_processor()