fix8  version 1.4.0
Open Source C++ FIX Framework
connection.cpp
Go to the documentation of this file.
1 //-----------------------------------------------------------------------------------------
2 /*
3 
4 Fix8 is released under the GNU LESSER GENERAL PUBLIC LICENSE Version 3.
5 
6 Fix8 Open Source FIX Engine.
7 Copyright (C) 2010-16 David L. Dight <fix@fix8.org>
8 
9 Fix8 is free software: you can redistribute it and / or modify it under the terms of the
10 GNU Lesser General Public License as published by the Free Software Foundation, either
11 version 3 of the License, or (at your option) any later version.
12 
13 Fix8 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
14 even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 
16 You should have received a copy of the GNU Lesser General Public License along with Fix8.
17 If not, see <http://www.gnu.org/licenses/>.
18 
19 BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE PROGRAM, TO
20 THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE
21 COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY
22 KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO
24 THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE,
25 YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
26 
27 IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL ANY COPYRIGHT
28 HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE THE PROGRAM AS PERMITTED
29 ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR
30 CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT
31 NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR
32 THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH
33 HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
34 
35 */
36 //-----------------------------------------------------------------------------------------
37 #include "precomp.hpp"
38 #include <fix8/f8includes.hpp>
39 
40 //-------------------------------------------------------------------------------------------------
41 using namespace FIX8;
42 using namespace std;
43 
44 //-------------------------------------------------------------------------------------------------
46 {
47  unsigned processed(0), dropped(0), invalid(0);
48  int retval(0);
49 
50  if (_pmodel == pm_coro)
51  {
52  f8String msg;
53 
54  try
55  {
56  reenter(_coro)
57  {
58  while (!_session.is_shutdown())
59  {
60  if (poll()) // it is a fair assumption that there is a complete message waiting to be read
61  {
62  if (read(msg)) // if our assumption is wrong we could block here
63  {
64  if (!_session.process(msg))
65  {
66  scout_warn << "Unhandled message: " << msg;
67  ++invalid;
68  }
69  else
70  ++processed;
71  }
72  else
73  ++invalid;
74  }
75  coro_yield;
76  }
77  }
78  }
79  catch (Poco::Net::NetException& e)
80  {
81  scout_error << e.what();
82  _session.do_state_change(States::st_session_terminated);
83  retval = -1;
84  }
85  catch (PeerResetConnection& e)
86  {
87  scout_error << e.what();
88  _session.do_state_change(States::st_session_terminated);
89  //_session.stop();
90  retval = -1;
91  }
92  catch (exception& e)
93  {
94  scout_error << e.what();
95  retval = -2;
96  ++invalid;
97  }
98  catch (...)
99  {
100  scout_error << "reader(coro): Unknown exception";
101  retval = -2;
102  }
103  }
104  else
105  {
106  while (!cancellation_token && !_session.is_shutdown())
107  {
108  f8String msg;
109 
110  try
111  {
112  if (read(msg)) // will block
113  {
114  if (_pmodel == pm_pipeline)
115  {
116  if (!_msg_queue.try_push (msg))
117  {
118  scout_warn << "FIXReader: message queue is full";
119  ++dropped;
120  }
121  else
122  ++processed;
123  }
124  else // _pmodel == pm_thread
125  {
126  if (!_session.process(msg))
127  {
128  scout_warn << "Unhandled message: " << msg;
129  ++invalid;
130  }
131  else
132  ++processed;
133  }
134  }
135  else
136  ++invalid;
137  }
138  catch (exception& e)
139  {
140  scout_error << e.what();
141  if (!_session.is_shutdown())
142  _session.do_state_change(States::st_session_terminated);
143  retval = -1;
144  ++invalid;
145  break;
146  }
147  }
148  scout_info << "FIXReader: " << processed << " messages processed, " << dropped << " dropped, "
149  << invalid << " invalid";
150  if (retval && errno)
151  {
152  scout_warn << "socket error=" << strerror(errno) << '(' << errno << ')';
153  }
154 
155  {
156  f8_scoped_lock guard(_start_mutex);
157  _started = false;
158  }
159  }
160  return retval;
161 }
162 
163 //-------------------------------------------------------------------------------------------------
165 {
166  int processed(0), ignored(0);
167 
168  for (; !_cancellation_token && !_session.is_shutdown();)
169  {
170  f8String *msg_ptr(0);
171 #if (FIX8_MPMC_SYSTEM == FIX8_MPMC_TBB)
172  f8String msg;
173  _msg_queue.pop (msg); // will block
174  if (msg.empty()) // means exit
175  break;
176  msg_ptr = &msg;
177 #else
178  _msg_queue.pop (msg_ptr); // will block
179  if (msg_ptr->empty()) // means exit
180  break;
181 #endif
182 
183  if (!_session.process(*msg_ptr))
184  {
185  scout_warn << "Unhandled message: " << *msg_ptr;
186  ++ignored;
187  }
188  else
189  ++processed;
190 #if (FIX8_MPMC_SYSTEM == FIX8_MPMC_FF)
191  _msg_queue.release(msg_ptr);
192 #endif
193  }
194 
195  scout_info << "FIXReaderCallback: " << processed << " messages processed, " << ignored << " ignored";
196 
197  return 0;
198 }
199 
200 //-------------------------------------------------------------------------------------------------
202 {
203  _bg_sz = 2 + _session.get_ctx()._beginStr.size() + 1 + 3;
204 }
205 
206 //-------------------------------------------------------------------------------------------------
207 bool FIXReader::read(f8String& to) // read a complete FIX message
208 {
209  char msg_buf[_max_msg_len] {};
210  int result(sockRead(msg_buf, _bg_sz));
211 
212  if (result == static_cast<int>(_bg_sz))
213  {
214  char bt;
215  size_t offs(_bg_sz);
216  do // get the last chrs of bodylength and ^A
217  {
218  if (sockRead(&bt, 1) != 1)
219  return false;
220  if (!isdigit(bt) && bt != default_field_separator)
221  throw IllegalMessage(msg_buf, FILE_LINE);
222  msg_buf[offs++] = bt;
223  }
224  while (bt != default_field_separator && offs < _max_msg_len);
225  to.assign(msg_buf, offs);
226 
228  unsigned result;
229  if ((result = MessageBase::extract_element(to.data(), static_cast<unsigned>(to.size()), tag, val)))
230  {
231  if (*tag != '8')
232  throw IllegalMessage(to, FILE_LINE);
233 
234  if (_session.get_ctx()._beginStr.compare(val)) // invalid FIX version
235  throw InvalidVersion(string(val));
236 
237  if ((result = MessageBase::extract_element(to.data() + result, static_cast<unsigned>(to.size()) - result, tag, val)))
238  {
239  if (*tag != '9')
240  throw IllegalMessage(to, FILE_LINE);
241 
242  const unsigned mlen(fast_atoi<unsigned>(val));
243  if (mlen == 0 || mlen > _max_msg_len - _bg_sz - _chksum_sz) // invalid msglen
244  throw InvalidBodyLength(mlen);
245 
246  // read the body
247  if ((result = sockRead(msg_buf, mlen) != static_cast<int>(mlen)))
248  return false;
249 
250  // read the checksum
251  if ((result = sockRead(msg_buf + mlen, _chksum_sz) != static_cast<int>(_chksum_sz)))
252  return false;
253 
254  to.append(msg_buf, mlen + _chksum_sz);
255  _session.update_received();
256  //string ts;
257  //cerr << GetTimeAsStringMS(ts, &_session.get_last_received(), 9) << endl;
258  return true;
259  }
260  }
261 
262  throw IllegalMessage(to, FILE_LINE);
263  }
264 
265  return false;
266 }
267 
268 //-------------------------------------------------------------------------------------------------
270 {
271  int result(0), processed(0), invalid(0);
272 
273  while (!cancellation_token && !_session.is_shutdown())
274  {
275  try
276  {
277  Message *inmsg(0);
278  _msg_queue.pop (inmsg); // will block
279  if (!inmsg)
280  break;
281  unique_ptr<Message> msg(inmsg);
282  _session.send_process(msg.get());
283  ++processed;
284  }
285  catch (PeerResetConnection& e)
286  {
287  scout_error << e.what();
288  result = -1;
289  break;
290  }
291  catch (Poco::Net::NetException& e)
292  {
293  scout_error << e.what();
294  ++invalid;
295  break;
296  }
297  catch (exception& e)
298  {
299  scout_error << e.what();
300  ++invalid;
301  break; //?
302  }
303  }
304 
305  scout_info << "FIXWriter: " << processed << " messages processed, " << invalid << " invalid";
306 
307  {
308  f8_scoped_lock guard(_start_mutex);
309  _started = false;
310  }
311  return result;
312 }
313 
314 //-------------------------------------------------------------------------------------------------
316 {
317  _writer.start();
318  _reader.start();
319 }
320 
321 //-------------------------------------------------------------------------------------------------
323 {
324  scout_debug << "Connection::stop()";
325  _writer.stop();
326  _writer.join();
327 
328  scout_debug << "Connection::stop() => _reader.stop()";
329  _reader.stop();
330  if (_reader.started())
331  _reader.socket()->shutdown();
332  _reader.join();
333 }
334 
335 //-------------------------------------------------------------------------------------------------
336 //-------------------------------------------------------------------------------------------------
338 {
339  unsigned attempts(0);
340  const LoginParameters& lparam(_session.get_login_parameters());
341  const Poco::Timespan timeout(lparam._connect_timeout, 0);
342 
343  while (attempts < (lparam._reliable ? 1 : lparam._login_retries))
344  {
345  try
346  {
347  if (_addr == Poco::Net::SocketAddress())
348  throw Poco::Net::InvalidAddressException("empty address");
349 
350  scout_info << "Trying to connect to: " << _addr.toString() << " (" << ++attempts << ')' << ( _secured ? " secured" : " not-secured");
351  _sock->connect(_addr, timeout);
352  if (lparam._recv_buf_sz)
353  set_recv_buf_sz(lparam._recv_buf_sz);
354  if (lparam._send_buf_sz)
355  set_send_buf_sz(lparam._send_buf_sz);
356  _sock->setLinger(false, 0);
357  _sock->setNoDelay(_no_delay);
358  scout_info << "Connection successful";
359  return _connected = true;
360  }
361  catch (Poco::Exception& e)
362  {
363  if (lparam._reliable)
364  {
365  scout_debug << "rethrowing Poco::Exception: " << e.displayText();
366  throw;
367  }
368  scout_error << "exception: " << e.displayText();
369  hypersleep<h_milliseconds>(lparam._login_retry_interval);
370  }
371  catch (exception& e)
372  {
373  if (lparam._reliable)
374  {
375  scout_debug << "rethrowing exception: " << e.what();
376  throw;
377  }
378  scout_error << "exception: " << e.what();
379  hypersleep<h_milliseconds>(lparam._login_retry_interval);
380  }
381  }
382 
383  scout_error << "Connection failed";
384  return false;
385 }
virtual F8API int execute(f8_thread_cancellation_token &cancellation_token)
Definition: connection.cpp:45
const size_t MAX_MSGTYPE_FIELD_LEN(32)
A message was decoded that had a Fix version not configured for this session.
#define FIX8_MAX_FLD_LENGTH
Definition: f8config.h:571
#define FILE_LINE
Definition: f8utils.hpp:68
F8API void stop()
Stop the reader and writer threads.
Definition: connection.cpp:322
A message was read that was in an illegal format.
A message was decoded with an invalid message body length.
F8API void set_preamble_sz()
Calculate the length of the Fix message preamble, e.g. "8=FIX.4.4^A9=".
Definition: connection.cpp:201
Thread cancellation token.
Definition: thread.hpp:206
bool read(f8String &to)
Definition: connection.cpp:207
int hypersleep< h_milliseconds >(unsigned amt)
Definition: hypersleep.hpp:105
F8API void start()
Start the reader and writer threads.
Definition: connection.cpp:315
static unsigned extract_element(const char *from, const unsigned sz, char *tag, char *val)
Definition: message.hpp:886
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
#define reenter(c)
Definition: yield.hpp:12
A connected peer has reset the connection (disconnected).
#define scout_warn
Definition: connection.hpp:756
const unsigned char default_field_separator(0x1)
default FIX field separator (^A)
#define scout_info
Definition: connection.hpp:755
#define coro_yield
Definition: yield.hpp:16
virtual F8API int execute(f8_thread_cancellation_token &cancellation_token)
Definition: connection.cpp:269
const char * what() const
Definition: f8exception.hpp:85
std::string f8String
Definition: f8types.hpp:47
F8API int callback_processor()
Definition: connection.cpp:164
#define scout_error
Definition: connection.hpp:757
#define scout_debug
Definition: connection.hpp:759