37 #ifndef FIX8_CONNECTION_HPP_
38 #define FIX8_CONNECTION_HPP_
40 #include <Poco/Net/StreamSocket.h>
41 #include <Poco/Timespan.h>
42 #include <Poco/Net/NetException.h>
74 : _sock(sock), _session(session), _pmodel(pmodel), _started(false), _thread(
std::ref(*this)) {}
81 size_t queued()
const {
return _msg_queue.size(); }
142 #if FIX8_EXPERIMENTAL_BUFFERED_SOCKET_READ
144 char *_read_buffer_rptr, *_read_buffer_wptr;
158 #if FIX8_EXPERIMENTAL_BUFFERED_SOCKET_READ
163 int sockRead(
char *where,
size_t sz)
165 const size_t available_in_buffer(static_cast<size_t>(_read_buffer_wptr - _read_buffer_rptr));
166 if (available_in_buffer < sz)
168 sz = std::min((
size_t)(_read_buffer_wptr-_read_buffer_rptr), sz);
169 memcpy(where, _read_buffer_rptr, sz);
170 _read_buffer_rptr += sz;
172 if (static_cast<size_t>(_read_buffer_rptr - _read_buffer) >= shift)
174 memcpy(_read_buffer, &_read_buffer[shift],
sizeof(_read_buffer) - shift);
175 _read_buffer_rptr -= shift;
176 _read_buffer_wptr -= shift;
186 int realSockRead(
size_t sz,
size_t maxsz)
188 const size_t max_sz(_read_buffer +
sizeof(_read_buffer) - _read_buffer_wptr);
189 int maxremaining(std::min(maxsz, max_sz)), remaining(std::min(sz, max_sz));
190 char *ptr(_read_buffer_wptr), *eptr(_read_buffer +
sizeof(_read_buffer));
193 while (remaining > 0 && _read_buffer_wptr < eptr)
195 rdsz =
_sock->receiveBytes(_read_buffer_wptr, maxremaining);
199 #
if defined EWOULDBLOCK && EAGAIN != EWOULDBLOCK
200 || errno == EWOULDBLOCK
204 throw PeerResetConnection(
"sockRead: connection gone");
206 _read_buffer_wptr += rdsz;
208 maxremaining -= rdsz;
210 return _read_buffer_wptr - ptr;
219 unsigned remaining(static_cast<unsigned>(sz)), rddone(0);
220 while (remaining > 0)
222 const int rdSz(
_sock->receiveBytes(where + rddone, remaining));
226 #
if defined EWOULDBLOCK && EAGAIN != EWOULDBLOCK
227 || errno == EWOULDBLOCK
247 #if FIX8_EXPERIMENTAL_BUFFERED_SOCKET_READ
248 , _read_buffer(), _read_buffer_rptr(_read_buffer), _read_buffer_wptr(_read_buffer)
265 _socket_error =
false;
270 if (_callback_thread.
start())
271 _socket_error =
true;
281 _callback_thread.
join();
307 glout_warn <<
"FIXReader: AsyncSocket already stopped.";
330 bool poll(
const Poco::Timespan &ts = Poco::Timespan())
const
332 return _sock->poll(ts, Poco::Net::Socket::SELECT_READ);
369 std::unique_ptr<Message> msg(from);
379 size_t write_batch(
const std::vector<Message *>& msgs,
bool destroy)
383 if (msgs.size() == 1)
384 return write(msgs.front(), destroy) ? 1 : 0;
387 for (std::vector<Message *>::const_iterator itr(msgs.begin()), eitr(msgs.end()), litr(eitr-1); itr != eitr; ++itr)
402 for (std::vector<Message *>::const_iterator itr(msgs.begin()), eitr(msgs.end()); itr != eitr; ++itr)
404 std::unique_ptr<Message> smsg(*itr);
420 throw f8Exception(
"cannot send message directly if pipelining");
428 bool poll(
const Poco::Timespan &ts = Poco::Timespan())
const
430 return _sock->poll(ts, Poco::Net::Socket::SELECT_WRITE);
438 int send(
const char *
data,
size_t remaining,
bool nb=
false)
442 while (remaining > 0)
444 const int wrtSz(
_sock->sendBytes(data + wrdone, static_cast<int>(remaining)));
448 #
if defined EWOULDBLOCK && EAGAIN != EWOULDBLOCK
449 || errno == EWOULDBLOCK
489 glout_warn <<
"FIXWriter: AsyncSocket already stopped.";
539 : _sock(sock), _addr(addr), _connected(role ==
cn_acceptor), _session(session), _role(role), _pmodel(pmodel),
540 _hb_interval(hb_interval), _hb_interval20pc(hb_interval + hb_interval / 5),
541 _reader(sock, session, pmodel), _writer(sock, session, pmodel),
594 int send(
const char *from,
size_t sz) {
return _writer.
send(from, sz); }
604 { _hb_interval =
hb_interval; _hb_interval20pc = hb_interval + hb_interval / 5; }
635 const unsigned current_sz(sock->getReceiveBufferSize());
636 sock->setReceiveBufferSize(sz);
637 glout_info <<
"fd(" << sock->impl()->sockfd() <<
") ReceiveBufferSize old:" << current_sz
638 <<
" requested:" << sz <<
" new:" << sock->getReceiveBufferSize();
646 const unsigned current_sz(sock->getSendBufferSize());
647 sock->setSendBufferSize(sz);
648 glout_info <<
"fd(" << sock->impl()->sockfd() <<
") SendBufferSize old:" << current_sz
649 <<
" requested:" << sz <<
" new:" << sock->getSendBufferSize();
663 #if defined FIX8_HAVE_DECL_TCP_CORK && TCP_CORK != 0
664 _sock->setOption(IPPROTO_TCP, TCP_CORK, way ? 1 : 0);
679 bool reader_poll(
const Poco::Timespan &ts = Poco::Timespan())
const {
return _reader.
poll(ts); }
688 bool writer_poll(
const Poco::Timespan &ts = Poco::Timespan())
const {
return _writer.
poll(ts); }
739 bool reuse_addr=
false,
int linger=-1,
bool keepalive=
false,
bool secured=
false) :
742 _sock->setLinger(linger >= 0, linger);
743 _sock->setNoDelay(no_delay);
744 _sock->setReuseAddress(reuse_addr);
745 _sock->setKeepAlive(keepalive);
754 #define scout ssout_info((&_session))
755 #define scout_info ssout_info((&_session))
756 #define scout_warn ssout_warn((&_session))
757 #define scout_error ssout_error((&_session))
758 #define scout_fatal ssout_fatal((&_session))
759 #define scout_debug ssout_debug((&_session))
765 #endif // FIX8_CONNECTION_HPP_
virtual void start()
Start the processing threads.
f8_atomic< bool > _connected
Client (initiator) specialisation of Connection.
unsigned get_hb_interval() const
ClientConnection(Poco::Net::StreamSocket *sock, Poco::Net::SocketAddress &addr, Session &session, unsigned hb_interval, ProcessModel pmodel=pm_pipeline, bool no_delay=true, bool secured=false)
virtual int execute(f8_thread_cancellation_token &cancellation_token)
void set_hb_interval(const unsigned hb_interval)
f8_thread_cancellation_token & callback_cancellation_token()
virtual ~AsyncSocket()=default
Dtor.
virtual F8API int execute(f8_thread_cancellation_token &cancellation_token)
Fix8 Base Session. User sessions are derived from this class.
generic pthread_mutex wrapper
size_t write_batch(const std::vector< Message * > &msgs, bool destroy)
virtual ~ClientConnection()
Dtor.
static void set_recv_buf_sz(const unsigned sz, Poco::Net::Socket *sock)
AsyncSocket(Poco::Net::StreamSocket *sock, Session &session, const ProcessModel pmodel=pm_pipeline)
bool reader_poll(const Poco::Timespan &ts=Poco::Timespan()) const
bool write(Message *from, bool destroy)
F8API void stop()
Stop the reader and writer threads.
Poco::Net::StreamSocket * _sock
Server (acceptor) specialisation of Connection.
ProcessModel get_pmodel() const
virtual void quit()
Stop the processing thread and quit.
unsigned _hb_interval20pc
virtual void quit()
Stop the processing threads and quit.
int send(const f8String &from)
Thread wrapper. Ctor provides T instance and specifies ptr to member to call or defaults to operator(...
Role
Roles: acceptor, initiator or unknown.
Complete Fix connection (reader and writer).
F8API void set_preamble_sz()
Calculate the length of the Fix message preamble, e.g. "8=FIX.4.4^A9=".
Connection(Poco::Net::StreamSocket *sock, Poco::Net::SocketAddress &addr, Session &session, Role role, const ProcessModel pmodel, unsigned hb_interval, bool secured)
virtual void start()
Start the processing threads.
ProcessModel
Supported session process models.
virtual void start()
Start the processing thread.
Thread cancellation token.
f8_thread_cancellation_token _cancellation_token
f8_thread< AsyncSocket > _thread
bool is_socket_error() const
static void set_send_buf_sz(const unsigned sz, Poco::Net::Socket *sock)
void clear_connection(const Connection *connection)
bool is_connected() const
Half duplex async socket wrapper with thread.
virtual int join(int timeoutInMs=0)
virtual void quit()
Stop the processing threads and quit.
bool poll(const Poco::Timespan &ts=Poco::Timespan()) const
Poco::Net::StreamSocket * _sock
F8API void start()
Start the reader and writer threads.
#define FIX8_MAX_MSG_LENGTH
bool write(Message &from)
f8_thread_cancellation_token _callback_cancellation_token
const Poco::Net::SocketAddress & get_socket_address() const
Poco::Net::StreamSocket * socket()
f8_thread_cancellation_token & cancellation_token()
virtual ~FIXWriter()
Dtor.
f8_atomic< bool > _socket_error
virtual bool write(Message &from)
void set_end_of_batch(bool is_end_of_batch)
A complete Fix message with header, body and trailer.
std::atomic< T > f8_atomic
generic spin_lock wrapper
F8API bool send_process(Message *msg)
tbb::concurrent_bounded_queue< T > f8_concurrent_queue
virtual ~FIXReader()
Dtor.
f8_concurrent_queue< T > _msg_queue
void set_recv_buf_sz(const unsigned sz) const
A connected peer has reset the connection (disconnected).
Poco::Net::SocketAddress get_peer_socket_address() const
virtual void request_stop()
Start the processing thread.
Poco::Net::SocketAddress _addr
void set_tcp_cork_flag(bool way) const
bool writer_poll(const Poco::Timespan &ts=Poco::Timespan()) const
void set_send_buf_sz(const unsigned sz) const
size_t write_batch(const std::vector< Message * > &msgs, bool destroy)
virtual ~Connection()
Dtor.
unsigned get_hb_interval20pc() const
int send(const char *from, size_t sz)
virtual ~ServerConnection()
Dtor.
FIXWriter(Poco::Net::StreamSocket *sock, Session &session, const ProcessModel pmodel=pm_pipeline)
virtual F8API int execute(f8_thread_cancellation_token &cancellation_token)
FIXReader(Poco::Net::StreamSocket *sock, Session &session, const ProcessModel pmodel=pm_pipeline)
ServerConnection(Poco::Net::StreamSocket *sock, Poco::Net::SocketAddress &addr, Session &session, unsigned hb_interval, ProcessModel pmodel=pm_pipeline, bool no_delay=true, bool reuse_addr=false, int linger=-1, bool keepalive=false, bool secured=false)
int send(const char *data, size_t remaining, bool nb=false)
virtual void stop()
Send a message to the processing method instructing it to quit.
virtual void stop()
Send a message to the processing method instructing it to quit.
int sockRead(char *where, const size_t sz)
bool is_socket_error() const
F8API int callback_processor()
f8_thread< FIXReader > _callback_thread
virtual bool write(Message *from, bool destroy=true)
bool poll(const Poco::Timespan &ts=Poco::Timespan()) const