49 const string copyright_short {
"Copyright (c) 2010-" };
57 #if defined(_MSC_VER) && !defined(BUILD_F8API)
62 "none",
"continuous",
"session_terminated",
63 "wait_for_logon",
"not_logged_in",
"logon_sent",
"logon_received",
"logoff_sent",
64 "logoff_received",
"test_request_sent",
"sequence_reset_sent",
65 "sequence_reset_received",
"resend_request_sent",
"resend_request_received"
70 #pragma warning(disable: 4273)
76 ostr << _beginString <<
':' << _senderCompID <<
"->" << _targetCompID;
83 return SessionID(_beginString(), _targetCompID(), _senderCompID());
90 if (_sid.SearchString(match, from, 4) == 4)
93 _sid.SubExpr(match, from, bstr, 0, 1);
94 _beginString.set(bstr);
95 _sid.SubExpr(match, from, scid, 0, 2);
96 _senderCompID.set(scid);
97 _sid.SubExpr(match, from, tcid, 0, 3);
98 _targetCompID.set(tcid);
107 _ctx(ctx), _connection(), _req_next_send_seq(), _req_next_receive_seq(),
108 _sid(sid), _sf(), _persist(persist), _logger(logger), _plogger(plogger),
109 _timer(*this, 10), _hb_processor(&
Session::heartbeat_service, true),
110 _session_scheduler(&
Session::activation_service, true), _schedule()
134 _ctx(ctx), _sci(sci), _connection(), _req_next_send_seq(), _req_next_receive_seq(),
135 _sf(), _persist(persist), _logger(logger), _plogger(plogger),
136 _timer(*this, 10), _hb_processor(&
Session::heartbeat_service, true),
137 _session_scheduler(&
Session::activation_service, true), _schedule()
168 const unsigned recv_seqnum,
const f8String davi)
280 bool remote_logged_out {};
285 const f8String::size_type fpos(from.find(
"34="));
286 if (fpos == f8String::npos)
288 slout_debug <<
"Session::process throwing for " << from;
294 bool retry_plog(
false);
305 glout_fatal <<
"Fatal: factory failed to generate a valid message";
310 cout << *msg << endl;
312 cout << *msg << endl;
316 goto application_call;
341 remote_logged_out =
true;
354 if (remote_logged_out)
361 return result && admin_result;
365 cerr << e.
what() << endl;
369 slout_debug <<
"process: f8exception" <<
' ' << seqnum <<
' ' << e.
what();
400 catch (Poco::Net::NetException& e)
402 slout_debug <<
"process:: Poco::Net::NetException";
458 if (msg->
Header()->
get(ost) && ost() > st())
508 glout_warn <<
"Inbound TargetCompID not recognised (" << tci <<
"), expecting (" <<
_sci <<
')';
523 glout_error <<
"Remote (" << sci <<
") not found (" <<
id <<
"). NOT authorised to proceed.";
527 if (!iserr && get<1>(itr->second) != Poco::Net::IPAddress()
530 glout_error <<
"Remote (" << get<0>(itr->second) <<
", " << sci <<
") NOT authorised to proceed ("
542 glout_info <<
"Remote (" << get<0>(itr->second) <<
", " << sci <<
") authorised to proceed ("
551 glout_warn <<
"Warning: no session logger defined for " << id;
556 glout_warn <<
"Warning: no protocol logger defined for " << id;
564 glout_warn <<
"Warning: no persister defined for " << id;
575 glout_error <<
"Error: SessionConfig object missing in session";
603 slout_info <<
"Client setting heartbeat interval to " << hbi();
615 slout_error <<
id <<
" Session unavailable. Login not accepted.";
672 if (!msg->
get(begin))
674 slout_warn <<
"handle_resend_request: can't obtain BeginSeqNo from request";
678 slout_warn <<
"handle_resend_request: can't obtain EndSeqNo from request";
681 if ((begin() > end() && end()) || begin() == 0)
685 const int nxt(static_cast<int>(
_next_send_seq)), nseq(begin() >= nxt ? begin() + 1 : nxt);
688 slout_debug <<
"handle_resend_request scenario #" << (nseq == nxt ? 7 : 8);
700 slout_warn <<
"resend request received during an existing resend sequence";
749 if (rctx.
_last + 1 < with.first)
752 slout_debug <<
"retrans_callback scenario #2, " << rctx;
757 if (with.first > rctx.
_begin)
760 slout_debug <<
"retrans_callback scenario #3, " << rctx;
764 rctx.
_last = with.first;
800 slout_info <<
"Session activation transitioned to " << (
_active ?
"active" :
"inactive");
829 ostr <<
"Remote has ignored my test request. Aborting session...";
837 catch (Poco::Net::NetException& e)
850 ostr <<
"Have not received anything from remote for ";
855 ostr <<
" secs. Sending test request";
881 if (!testReqID.empty())
893 *msg <<
new text(what);
918 *msg <<
new text(what);
951 *msg <<
new text(msgstr);
1053 if (fields_modified)
1055 slout_debug <<
"send_process: " << fields_modified <<
" header fields added/modified";
1063 size_t enclen(msg->encode(&ptr));
1064 const char *optr(ptr);
1065 if (msg->get_end_of_batch())
1075 slout_error <<
"Message write failed: " << enclen <<
" bytes";
1097 if (!msg->is_admin())
1114 catch (Poco::Exception& e)
1129 unsigned send_seqnum, receive_seqnum;
1132 slout_info <<
"Last sent: " << send_seqnum <<
", last received: " << receive_seqnum;
1140 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD) && !defined _MSC_VER && defined _GNU_SOURCE && defined __linux__
1145 sched_param param {};
1146 if (!pthread_getschedparam(
id, &policy, ¶m))
1147 return policy == SCHED_OTHER ?
"SCHED_OTHER" : policy == SCHED_RR ?
"SCHED_RR"
1148 : policy == SCHED_FIFO ?
"SCHED_FIFO" :
"UNKNOWN";
1150 ostr <<
"Could not get scheduler parameters: " <<
Str_error(errno);
1157 pthread_t thread(pthread_self());
1158 sched_param param { priority };
1160 slout_info <<
"Current scheduler policy: " << get_thread_policy_string(thread);
1162 if (pthread_setschedparam(thread, SCHED_RR, ¶m))
1164 slout_error <<
"Could not set new scheduler priority: " << get_thread_policy_string(thread)
1165 <<
" (" <<
Str_error(errno) <<
") " << priority;
1169 slout_info <<
"New scheduler policy: " << get_thread_policy_string(thread);
1175 const int num_cores(sysconf(_SC_NPROCESSORS_ONLN));
1176 if (core_id >= num_cores)
1184 CPU_SET(core_id, &cpuset);
1185 const int error(pthread_setaffinity_np(pthread_self(),
sizeof(cpu_set_t), &cpuset));
1188 slout_error <<
"Could not set thread affinity for core " << core_id <<
" (" <<
Str_error(errno) <<
')';
1190 slout_info <<
"Set thread affinity to " << core_id <<
" core for thread " << pthread_self();
1209 time_t now(time(0));
1211 struct tm *ptim(localtime (&now));
1214 localtime_r(&now, &tim);
1215 struct tm *ptim(&tim);
1218 ostr << endl << package_version <<
' ' << copyright_short << setw(2) << (ptim->tm_year - 100) << copyright_short2;
1224 #ifdef FIX8_HAVE_OPENSSL
1225 void Fix8CertificateHandler::onInvalidCertificate(
const void*, Poco::Net::VerificationErrorArgs& errorCert)
1227 const Poco::Net::X509Certificate& cert(errorCert.certificate());
1228 glout_warn <<
"WARNING: Certificate verification failed";
1229 glout_warn <<
"----------------------------------------";
1230 glout_warn <<
"Issuer Name: " << cert.issuerName();
1231 glout_warn <<
"Subject Name: " << cert.subjectName();
1232 glout_warn <<
"The certificate yielded the error: " << errorCert.errorMessage();
1233 glout_warn <<
"The error occurred in the certificate chain at position " << errorCert.errorDepth();
1234 errorCert.setIgnoreError(
true);
1237 void Fix8PassPhraseHandler::onPrivateKeyRequested(
const void*, std::string& privateKey)
1239 glout_warn <<
"warning: privatekey passphrase requested and ignored!";
1242 #endif // FIX8_HAVE_OPENSSL
1244 #if defined(_MSC_VER)
1245 #pragma warning(pop)
void stop()
Stop the logging thread.
const char Common_MsgByte_SEQUENCE_RESET('4')
unsigned _req_next_send_seq
A message was received with an invalid sending time.
unsigned get_hb_interval() const
F8API void update_persist_seqnums()
Force persister to sync next send/receive seqnums.
bool have(const unsigned short fnum) const
Field< int, Common_EncryptMethod > encrypt_method
const char Common_MsgByte_REJECT('3')
void set_hb_interval(const unsigned hb_interval)
const f8String Common_MsgType_BUSINESS_REJECT("j")
virtual bool activation_check(const unsigned seqnum, const Message *msg)
bool force_logoff() const
bool plog(const std::string &what, Logger::Level lev, const unsigned direction=0) const
f8_thread delegated async logging class
F8API bool sequence_check(const unsigned seqnum, const Message *msg)
Field< UTCTimestamp, Common_SendingTime > sending_time
F8API std::string Str_error(const int err, const char *str=0)
POSIX regex wrapper class.
const unsigned _interrupted_seqnum
const f8String Common_MsgType_LOGON("A")
F8API void compid_check(const unsigned seqnum, const Message *msg, const SessionID &id) const
Fix8 Base Session. User sessions are derived from this class.
virtual F8API bool handle_logon(const unsigned seqnum, const Message *msg)
size_t write_batch(const std::vector< Message * > &msgs, bool destroy)
const f8String Common_MsgType_SEQUENCE_RESET("4")
Field< Boolean, Common_GapFillFlag > gap_fill_flag
std::pair< const unsigned, const f8String > SequencePair
TimerEvent< Session > _hb_processor
F8API BaseField * remove(const unsigned short fnum, Presence::const_iterator itr)
const char Common_MsgByte_LOGON('A')
const unsigned short Common_TargetCompID(56)
Base (ABC) Persister class.
F8API void stop()
Stop the reader and writer threads.
F8API SessionID make_reverse_id() const
Provides context to your retrans handler.
virtual F8API bool process(const f8String &from)
virtual void modify_outbound(Message *msg)
Field< SeqNum, Common_MsgSeqNum > msg_seq_num
Field< Boolean, Common_PossDupFlag > poss_dup_flag
ProcessModel get_pmodel() const
f8_atomic< States::SessionStates > _state
#define FIX8_PACKAGE_VERSION
virtual void set_custom_seqnum(unsigned seqnum)
const size_t HEADER_CALC_OFFSET(32)
TimerEvent< Session > _session_scheduler
void do_state_change(const States::SessionStates new_state)
const sender_comp_id & get_senderCompID() const
bool has_flag(const Flags flg) const
Field< SeqNum, Common_EndSeqNo > end_seq_num
Session_Schedule * _schedule
F8API bool activation_service()
Session start/stop service thread method.
const f8String Common_MsgType_TEST_REQUEST("1")
const f8String Common_MsgType_HEARTBEAT("0")
const f8String Common_MsgType_LOGOUT("5")
Quickfix style sessionid.
virtual bool handle_admin(const unsigned seqnum, const Message *msg)
virtual bool is_admin() const
virtual F8API bool handle_resend_request(const unsigned seqnum, const Message *msg)
Complete Fix connection (reader and writer).
bool test(bool prev=false) const
static F8API const f8String copyright_string()
std::string _batchmsgs_buffer
LoginParameters _loginParameters
Field< UTCTimestamp, Common_OrigSendingTime > orig_sending_time
virtual F8API Message * generate_logout(const char *msgstr=nullptr)
#define FIX8_PACKAGE_NAME
const unsigned short Common_SenderCompID(49)
virtual F8API bool handle_sequence_reset(const unsigned seqnum, const Message *msg)
const f8String Common_MsgType_REJECT("3")
F8API Persister * create_persister(const XmlElement *from, const SessionID *sid=nullptr, bool flag=false) const
const char Common_MsgByte_RESEND_REQUEST('2')
virtual F8API bool handle_logout(const unsigned seqnum, const Message *msg)
#define FIX8_PACKAGE_BUGREPORT
virtual bool get(const unsigned seqnum, f8String &to) const =0
static F8API const std::vector< f8String > _state_names
string representation of Sessionstates
f8_atomic< unsigned > _next_receive_seq
virtual F8API bool send(Message *msg, bool destroy=true, const unsigned custom_seqnum=0, const bool no_increment=false)
f8_atomic< bool > _active
virtual bool handle_reject(const unsigned seqnum, const Message *msg)
bool _always_seqnum_assign
bool is_connected() const
const F8MetaCntx & ctx()
Compiler generated metadata object, accessed through this function.
const f8String & get_msgtype() const
Field< f8String, Common_TestReqID > test_request_id
MessageBase * Header() const
virtual bool put(const unsigned seqnum, const f8String &what)=0
const unsigned short Common_PossDupFlag(43)
A message was received with an out of sequence sequence number.
F8API void stop()
stop the session.
int hypersleep< h_milliseconds >(unsigned amt)
Field< f8String, Common_SenderCompID > sender_comp_id
virtual F8API Message * generate_logon(const unsigned heartbeat_interval, const f8String davi=f8String())
struct SessionConfig * _sf
F8API void start()
Start the reader and writer threads.
virtual bool handle_application(const unsigned seqnum, const Message *&msg)=0
#define FIX8_MAX_MSG_LENGTH
A message was received with an invalid sender/target compid id.
virtual F8API int modify_header(MessageBase *msg)
F8API void purge_thread_codes()
Remove dead threads from the thread code cache.
Field< f8String, Common_DefaultApplVerID > default_appl_ver_id
void set_scheduler(int priority)
virtual void stop()
Stop the persister thread.
const unsigned short Common_MsgSeqNum(34)
const char Common_MsgByte_HEARTBEAT('0')
Field< Boolean, Common_ResetSeqNumFlag > reset_seqnum_flag
virtual F8API bool handle_test_request(const unsigned seqnum, const Message *msg)
const char Common_MsgByte_TEST_REQUEST('1')
const target_comp_id & get_targetCompID() const
virtual F8API ~Session()
Dtor.
bool log(const std::string &what, Logger::Level lev, const char *fl=nullptr, unsigned value=0) const
unsigned _req_next_receive_seq
virtual F8API bool handle_heartbeat(const unsigned seqnum, const Message *msg)
A complete Fix message with header, body and trailer.
static Message * factory(const F8MetaCntx &ctx, const char *from, bool no_chksum=false, bool permissive_mode=false)
F8API bool send_process(Message *msg)
F8API Session(const F8MetaCntx &ctx, const SessionID &sid, Persister *persist=nullptr, Logger *logger=nullptr, Logger *plogger=nullptr)
const f8String Common_MsgType_RESEND_REQUEST("2")
An invalid message was requested or decoded.
virtual F8API Message * generate_reject(const unsigned seqnum, const char *what, const char *msgtype=nullptr)
Message * create_msg(const f8String &msg_type) const
virtual bool authenticate(SessionID &id, const Message *msg)
int hypersleep< h_seconds >(unsigned amt)
Poco::Net::SocketAddress get_peer_socket_address() const
F8API bool heartbeat_service()
Heartbeat generation service thread method.
F8API Session_Schedule * create_session_schedule(const XmlElement *from) const
virtual F8API Message * generate_business_reject(const unsigned seqnum, const Message *msg, const int reason, const char *what)
bool _reset_sequence_numbers
virtual F8API Message * generate_test_request(const f8String &testReqID)
const unsigned char default_field_separator(0x1)
default FIX field separator (^A)
Field< int, Common_HeartBtInt > heartbeat_interval
void set_affinity(int core_id)
unsigned get_custom_seqnum() const
F8API bool enforce(const unsigned seqnum, const Message *msg)
Field< f8String, Common_Text > text
bool get_ignore_logon_sequence_check_flag(const XmlElement *from, const bool def=false) const
unsigned get_hb_interval20pc() const
F8API void from_string(const f8String &from)
Create a sessionid string.
A class to contain regex matches using RegExp.
int send(const char *from, size_t sz)
Field< int, Common_BusinessRejectReason > business_reject_reason
std::thread::id thread_id_t
Field< SeqNum, Common_BeginSeqNo > begin_seq_num
virtual F8API Message * generate_heartbeat(const f8String &testReqID)
Field< SeqNum, Common_RefSeqNum > ref_seq_num
bool _permissive_mode_flag
bool is_legal(unsigned short fnum) const
virtual F8API size_t send_batch(const std::vector< Message * > &msgs, bool destroy=true)
Field< SeqNum, Common_NewSeqNo > new_seq_num
const char * what() const
Base class for all fix messages.
virtual F8API Message * generate_resend_request(const unsigned begin, const unsigned end=0)
const unsigned short Common_ResetSeqNumFlag(141)
F8API Logger * create_logger(const XmlElement *from, const Logtype ltype, const SessionID *sid=nullptr) const
f8_atomic< unsigned > _next_send_seq
virtual F8API Message * generate_sequence_reset(const unsigned newseqnum, const bool gapfillflag=false)
F8API int start(Connection *connection, bool wait=true, const unsigned send_seqnum=0, const unsigned recv_seqnum=0, const f8String davi=f8String())
virtual F8API bool handle_outbound_reject(const unsigned seqnum, const Message *msg, const char *errstr)
virtual F8API void recover_seqnums()
Recover next expected and next to send sequence numbers from persitence layer.
void atomic_init(States::SessionStates st)
static bool is_established(SessionStates ss)
Field< f8String, Common_RefMsgType > ref_msg_type
virtual void set_no_increment(bool flag=true)
virtual std::ostream & print(std::ostream &os) const =0
const char Common_MsgByte_LOGOUT('5')
Field< f8String, Common_TargetCompID > target_comp_id
virtual F8API bool retrans_callback(const SequencePair &with, RetransmissionContext &rctx)
virtual bool write(Message *from, bool destroy=true)
Could not open a logfile.