37 #ifndef FIX8_SESSION_HPP_
38 #define FIX8_SESSION_HPP_
40 #include <Poco/Net/StreamSocket.h>
62 : _beginString(beginString), _senderCompID(senderCompID), _targetCompID(targetCompID) {
make_id(); }
69 : _beginString(beginString), _senderCompID(senderCompID), _targetCompID(targetCompID) {
make_id(); }
/// Copy constructor.
/// Copies the begin string, sender comp id, target comp id and the _id member
/// straight from `from`; unlike the constructors above it does not call make_id().
/*! \param from SessionID to copy */
77 SessionID(
const SessionID& from) : _beginString(from._beginString), _senderCompID(from._senderCompID),
78 _targetCompID(from._targetCompID), _id(from._id) {}
/// A single client entry: a string paired with a Poco IP address.
// NOTE(review): the meaning of the string element is not visible here - confirm against the code that fills it.
181 using Client = std::tuple<f8String, Poco::Net::IPAddress>;
/// Collection of Client entries, hashed on an f8String key.
182 using Clients = std::unordered_map<f8String, Client>;
207 _start_day(-1), _end_day(-1) {}
210 int start_day=-1,
int end_day=-1) :
211 _start(start), _end(end), _duration(duration),
212 _utc_offset(utc_offset), _start_day(start_day), _end_day(end_day),
213 _toffset(static_cast<
Tickval::ticks>(_utc_offset) *
Tickval::minute)
218 _start(from._start), _end(from._end), _duration(from._duration),
219 _utc_offset(from._utc_offset), _start_day(from._start_day), _end_day(from._end_day),
220 _toffset(from._toffset)
247 bool test(
bool prev=
false)
const
258 if (now.
in_range(today + _start, today + _end))
268 const tm result(now.
get_tm());
274 if ( ((_start_day > _end_day && (result.tm_wday >= _start_day || result.tm_wday <= _end_day))
275 || (_start_day < _end_day && result.tm_wday >= _start_day && result.tm_wday <= _end_day))
276 && now.
in_range(today + _start, today + _end))
279 else if ( ((_start_day > _end_day && (result.tm_wday < _start_day && result.tm_wday > _end_day))
280 || (_start_day < _end_day && result.tm_wday >= _end_day))
281 && now > today +
_end)
307 bool always_seqnum_assign=
false,
bool silent_disconnect=
false,
bool no_chksum_flag=
false,
308 bool permissive_mode_flag=
false,
bool reliable=
false,
bool enforce_compids=
true,
372 _sch(sch), _reject_reason(reject_reason), _reject_text(reject_text)
536 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD) && !defined _MSC_VER && defined _GNU_SOURCE && defined __linux__
588 if (connection == _connection)
589 _connection =
nullptr;
605 : _begin(begin), _end(end), _interrupted_seqnum(interrupted_seqnum), _last(), _no_more_records() {}
609 os <<
"end:" << what.
_end <<
" last:" << what.
_last <<
" interrupted seqnum:"
/// Send a message supplied by pointer.
/*! \param msg Message to send
    \param destroy defaults to true
    \param custom_seqnum defaults to 0
    \param no_increment defaults to false
    \return bool result of the send */
// NOTE(review): presumably destroy controls whether msg is deleted after sending and
// custom_seqnum/no_increment adjust sequence number handling - confirm in the implementation.
629 F8API virtual bool send(
Message *msg,
bool destroy=
true,
const unsigned custom_seqnum=0,
const bool no_increment=
false);
/// Send a message supplied by reference; this overload has no destroy parameter.
/*! \param msg Message to send
    \param custom_seqnum defaults to 0
    \param no_increment defaults to false
    \return bool result of the send */
638 F8API virtual bool send(
Message& msg,
const unsigned custom_seqnum=0,
const bool no_increment=
false);
/// Send a vector of messages.
/*! \param msgs vector of Message pointers
    \param destroy defaults to true
    \return a size_t */
// NOTE(review): the return value is presumably the number of messages sent - confirm.
645 F8API virtual size_t send_batch(
const std::vector<Message *>& msgs,
bool destroy=
true);
687 {
return _logger ? _logger->
enqueue(what, lev, fl, value) :
false; }
/// Pass a message to the session logger through Logger::send.
/*! \param what text to log
    \param lev log level
    \param fl optional file/line string, defaults to nullptr
    \param value optional extra value, defaults to 0
    \return false when no logger is attached, otherwise the result of Logger::send */
695 bool log(const std::string& what, Logger::Level lev, const char *fl=nullptr, unsigned value=0) const
696 {
   // No logger attached: nothing to send to, so report failure.
   if (!_logger)
      return false;
   return _logger->send(what, lev, fl, value);
}
704 {
return _plogger ? _plogger->
send(what, lev,
nullptr, direction) :
false; }
830 if (old_state != new_state)
855 static const f8String unknown(
"Unknown");
856 return state < _state_names.size() ? _state_names[state] : unknown;
// Level-specific stream logging macros for a session pointer x.
// Each expands to an if/else statement: when x->is_loggable(level) is false the
// empty statement is executed; otherwise a FIX8::log_stream is constructed around
// a logger_function bound to FIX8::Session::enqueue on x, with the level and
// FILE_LINE fixed and placeholders _1/_2 supplying the text and value arguments.
// Anything streamed after the macro (ssout_info(s) << ...) becomes part of the
// else branch, so the streamed operands are only evaluated when the level is loggable.
// x is used with -> and is not parenthesised in the expansion, so pass a plain
// pointer expression.
// Info level.
866 #define ssout_info(x) if (!x->is_loggable(FIX8::Logger::Info)); \
867 else FIX8::log_stream(FIX8::logger_function(std::bind(&FIX8::Session::enqueue, x, std::placeholders::_1, FIX8::Logger::Info, FILE_LINE, std::placeholders::_2)))
// Warn level.
868 #define ssout_warn(x) if (!x->is_loggable(FIX8::Logger::Warn)); \
869 else FIX8::log_stream(FIX8::logger_function(std::bind(&FIX8::Session::enqueue, x, std::placeholders::_1, FIX8::Logger::Warn, FILE_LINE, std::placeholders::_2)))
// Error level.
870 #define ssout_error(x) if (!x->is_loggable(FIX8::Logger::Error)); \
871 else FIX8::log_stream(FIX8::logger_function(std::bind(&FIX8::Session::enqueue, x, std::placeholders::_1, FIX8::Logger::Error, FILE_LINE, std::placeholders::_2)))
// Fatal level.
872 #define ssout_fatal(x) if (!x->is_loggable(FIX8::Logger::Fatal)); \
873 else FIX8::log_stream(FIX8::logger_function(std::bind(&FIX8::Session::enqueue, x, std::placeholders::_1, FIX8::Logger::Fatal, FILE_LINE, std::placeholders::_2)))
874 #if defined FIX8_DEBUG
// Debug-level stream logging macro for a session pointer x (FIX8_DEBUG builds).
// Same shape as the other ssout_* macros: does nothing when
// x->is_loggable(FIX8::Logger::Debug) is false, otherwise builds a FIX8::log_stream
// bound to FIX8::Session::enqueue on x. Text streamed after the macro belongs to
// the else branch and is only evaluated when Debug is loggable.
// The level is fully qualified as FIX8::Logger::Debug, matching the sibling macros,
// so the macro also expands correctly in code outside namespace FIX8.
875 #define ssout_debug(x) if (!x->is_loggable(FIX8::Logger::Debug)); \
876 else FIX8::log_stream(FIX8::logger_function(std::bind(&FIX8::Session::enqueue, x, std::placeholders::_1, FIX8::Logger::Debug, FILE_LINE, std::placeholders::_2)))
// Alternative ssout_debug that ignores x entirely.
// Expands to a conditional expression whose condition is always true. Because <<
// binds tighter than ?:, anything streamed after the macro attaches to the third
// operand, which is never evaluated.
// NOTE(review): presumably the definition selected when FIX8_DEBUG is not defined - confirm against the enclosing #if/#else.
878 #define ssout_debug(x) true ? null_insert() : null_insert()
// ssout(x) is shorthand for the Info-level macro ssout_info(x).
881 #define ssout(x) ssout_info(x)
// slout* variants apply the matching ssout* macro to `this`, so they can only be
// used where `this` is valid and points to an object providing is_loggable.
882 #define slout ssout(this)
883 #define slout_info ssout_info(this)
884 #define slout_warn ssout_warn(this)
885 #define slout_error ssout_error(this)
886 #define slout_fatal ssout_fatal(this)
887 #define slout_debug ssout_debug(this)
893 #endif // FIX8_SESSION_HPP_
unsigned _req_next_send_seq
Connection * get_connection()
F8API void update_persist_seqnums()
Force persister to sync next send/receive seqnums.
target_comp_id _targetCompID
virtual bool activation_check(const unsigned seqnum, const Message *msg)
bool plog(const std::string &what, Logger::Level lev, const unsigned direction=0) const
f8_thread delegated async logging class
friend std::ostream & operator<<(std::ostream &os, const SessionID &what)
F8API bool sequence_check(const unsigned seqnum, const Message *msg)
bool send(const std::string &what, Level lev=Logger::Info, const char *fl=nullptr, const unsigned val=0)
POSIX regex wrapper class.
const unsigned _interrupted_seqnum
F8API void compid_check(const unsigned seqnum, const Message *msg, const SessionID &id) const
Fix8 Base Session. User sessions are derived from this class.
virtual F8API bool handle_logon(const unsigned seqnum, const Message *msg)
void set_session_config(struct SessionConfig *sf)
SessionID(const f8String &from)
unsigned _login_retry_interval
virtual void state_change(const States::SessionStates before, const States::SessionStates after)
std::pair< const unsigned, const f8String > SequencePair
friend std::ostream & operator<<(std::ostream &os, const Session_Schedule &what)
TimerEvent< Session > _hb_processor
SessionID(const begin_string &beginString, const sender_comp_id &senderCompID, const target_comp_id &targetCompID)
Message instantiation table entry.
Base (ABC) Persister class.
void set_persister(Persister *pst)
default_appl_ver_id _davi
std::tuple< f8String, Poco::Net::IPAddress > Client
Class to hold client info settings for server sessions.
F8API SessionID make_reverse_id() const
States::SessionStates get_session_state() const
friend std::ostream & operator<<(std::ostream &os, const Schedule &what)
Provides context to your retrans handler.
virtual F8API bool process(const f8String &from)
virtual void modify_outbound(Message *msg)
const Tickval & get_last_received() const
bool same_sender_comp_id(const target_comp_id &targetCompID) const
Schedule & operator=(const Schedule &that)
void set_reset_sequence_numbers_flag(bool flag)
Field< f8String, Common_MsgType > msg_type
f8_atomic< States::SessionStates > _state
bool same_side_target_comp_id(const target_comp_id &targetCompID) const
LoginParameters & operator=(const LoginParameters &that)
const SessionID & get_sid() const
TimerEvent< Session > _session_scheduler
void do_state_change(const States::SessionStates new_state)
const sender_comp_id & get_senderCompID() const
const LoginParameters & get_login_parameters() const
std::function< Message *(bool)> _do
Session_Schedule * _schedule
F8API bool activation_service()
Session start/stop service thread method.
Quickfix style sessionid.
virtual bool handle_admin(const unsigned seqnum, const Message *msg)
const F8MetaCntx & get_ctx() const
virtual F8API bool handle_resend_request(const unsigned seqnum, const Message *msg)
Complete Fix connection (reader and writer).
bool test(bool prev=false) const
std::unordered_map< f8String, Client > Clients
static F8API const f8String copyright_string()
std::string _batchmsgs_buffer
LoginParameters _loginParameters
virtual F8API Message * generate_logout(const char *msgstr=nullptr)
friend std::ostream & operator<<(std::ostream &os, const RetransmissionContext &what)
virtual F8API bool handle_sequence_reset(const unsigned seqnum, const Message *msg)
unsigned get_next_send_seq() const
virtual F8API bool handle_logout(const unsigned seqnum, const Message *msg)
Schedule(Tickval start, Tickval end, Tickval duration=Tickval(), int utc_offset=0, int start_day=-1, int end_day=-1)
static F8API const std::vector< f8String > _state_names
String representation of SessionStates.
f8_atomic< unsigned > _next_receive_seq
virtual F8API bool send(Message *msg, bool destroy=true, const unsigned custom_seqnum=0, const bool no_increment=false)
f8_atomic< bool > _active
virtual bool handle_reject(const unsigned seqnum, const Message *msg)
bool _always_seqnum_assign
void clear_connection(const Connection *connection)
bool same_target_comp_id(const sender_comp_id &senderCompID) const
bool operator!=(const SessionID &that)
static const Message * detach(const Message *&msg)
const F8MetaCntx & ctx()
Compiler generated metadata object, accessed through this function.
LoginParameters(unsigned login_retry_interval, unsigned login_retries, const default_appl_ver_id &davi, unsigned connect_timeout, bool reset_seqnum=false, bool always_seqnum_assign=false, bool silent_disconnect=false, bool no_chksum_flag=false, bool permissive_mode_flag=false, bool reliable=false, bool enforce_compids=true, unsigned recv_buf_sz=0, unsigned send_buf_sz=0, unsigned hb_int=defaults::hb_interval, const Schedule &sch=Schedule(), const Clients &clients=Clients(), const f8String &pem_path=f8String())
void set_login_parameters(const LoginParameters &loginParamaters)
LoginParameters()=default
unsigned _connect_timeout
LoginParameters(const LoginParameters &from)
F8API void stop()
stop the session.
Field< f8String, Common_SenderCompID > sender_comp_id
virtual F8API Message * generate_logon(const unsigned heartbeat_interval, const f8String davi=f8String())
bool enqueue(const std::string &what, Logger::Level lev, const char *fl=nullptr, unsigned value=0) const
struct SessionConfig * _sf
virtual bool handle_application(const unsigned seqnum, const Message *&msg)=0
sender_comp_id _senderCompID
virtual F8API int modify_header(MessageBase *msg)
SessionID(const f8String &beginString, const f8String &senderCompID, const f8String &targetCompID)
integral_type has(const T sbit) const
void set_scheduler(int priority)
const Tickval & get_last_sent() const
virtual F8API bool handle_test_request(const unsigned seqnum, const Message *msg)
decltype(f8_time_point::min().time_since_epoch().count()) ticks
const target_comp_id & get_targetCompID() const
virtual F8API ~Session()
Dtor.
bool log(const std::string &what, Logger::Level lev, const char *fl=nullptr, unsigned value=0) const
unsigned _req_next_receive_seq
virtual F8API bool handle_heartbeat(const unsigned seqnum, const Message *msg)
A complete Fix message with header, body and trailer.
std::atomic< T > f8_atomic
generic spin_lock wrapper
F8API bool send_process(Message *msg)
F8API Session(const F8MetaCntx &ctx, const SessionID &sid, Persister *persist=nullptr, Logger *logger=nullptr, Logger *plogger=nullptr)
Timer event object to provide callback context with Timer.
bool enqueue(const std::string &what, Level lev=Logger::Info, const char *fl=nullptr, const unsigned val=0)
virtual F8API Message * generate_reject(const unsigned seqnum, const char *what, const char *msgtype=nullptr)
Timer< Session > & get_timer()
Schedule(const Schedule &from)
void update_sent()
Update the last sent time.
Message * create_msg(const f8String &msg_type) const
bool same_side_sender_comp_id(const sender_comp_id &senderCompID) const
virtual bool authenticate(SessionID &id, const Message *msg)
Session_Schedule(Schedule &sch, int reject_reason=0, const std::string &reject_text="Business messages are not accepted now.")
F8API bool heartbeat_service()
Heartbeat generation service thread method.
static const f8String & get_session_state_string(const States::SessionStates state)
const f8String & get_id() const
const std::string _reject_text
begin_string _beginString
bool operator==(const SessionID &that)
virtual F8API Message * generate_business_reject(const unsigned seqnum, const Message *msg, const int reason, const char *what)
bool _reset_sequence_numbers
virtual F8API Message * generate_test_request(const f8String &testReqID)
bool is_loggable(Logger::Level level) const
void set_affinity(int core_id)
bool in_range(const Tickval &a, const Tickval &b) const
F8API bool enforce(const unsigned seqnum, const Message *msg)
F8API void from_string(const f8String &from)
Create a sessionid string.
RetransmissionContext(const unsigned begin, const unsigned end, const unsigned interrupted_seqnum)
Tickval & adjust(ticks by)
std::thread::id thread_id_t
virtual F8API Message * generate_heartbeat(const f8String &testReqID)
bool _permissive_mode_flag
bool is_loggable(Level level) const
const begin_string & get_beginString() const
static bool is_live(SessionStates ss)
virtual ~SessionID()
Dtor.
virtual F8API size_t send_batch(const std::vector< Message * > &msgs, bool destroy=true)
Base class for all fix messages.
virtual F8API Message * generate_resend_request(const unsigned begin, const unsigned end=0)
const Val * find_ptr(const Key &key) const
f8_atomic< unsigned > _next_send_seq
virtual F8API Message * generate_sequence_reset(const unsigned newseqnum, const bool gapfillflag=false)
F8API int start(Connection *connection, bool wait=true, const unsigned send_seqnum=0, const unsigned recv_seqnum=0, const f8String davi=f8String())
virtual F8API bool handle_outbound_reject(const unsigned seqnum, const Message *msg, const char *errstr)
void update_received()
Update the last received time.
virtual F8API void recover_seqnums()
Recover next expected and next to send sequence numbers from persistence layer.
void atomic_init(States::SessionStates st)
static bool is_established(SessionStates ss)
SessionID(const SessionID &from)
void get_login_parameters(LoginParameters &loginParamaters) const
virtual F8API bool retrans_callback(const SequencePair &with, RetransmissionContext &rctx)