fix8  version 1.4.0
Open Source C++ FIX Framework
thread.hpp
Go to the documentation of this file.
1 //-----------------------------------------------------------------------------------------
2 /*
3 
4 Fix8 is released under the GNU LESSER GENERAL PUBLIC LICENSE Version 3.
5 
6 Fix8 Open Source FIX Engine.
7 Copyright (C) 2010-16 David L. Dight <fix@fix8.org>
8 
9 Fix8 is free software: you can redistribute it and / or modify it under the terms of the
10 GNU Lesser General Public License as published by the Free Software Foundation, either
11 version 3 of the License, or (at your option) any later version.
12 
13 Fix8 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
14 even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 
16 You should have received a copy of the GNU Lesser General Public License along with Fix8.
17 If not, see <http://www.gnu.org/licenses/>.
18 
19 BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE PROGRAM, TO
20 THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE
21 COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY
22 KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO
24 THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE,
25 YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
26 
27 IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL ANY COPYRIGHT
28 HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE THE PROGRAM AS PERMITTED
29 ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR
30 CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT
31 NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR
32 THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH
33 HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
34 
35 */
36 //-----------------------------------------------------------------------------------------
37 #ifndef FIX8_THREAD_HPP_
38 #define FIX8_THREAD_HPP_
39 
40 //----------------------------------------------------------------------------------------
41 #include <atomic>
42 #include <memory>
43 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
44 #include<pthread.h>
45 #include<signal.h>
46 #elif (FIX8_THREAD_SYSTEM == FIX8_THREAD_STDTHREAD)
47 #include<thread>
48 #include<mutex>
49 #endif
50 
51 //----------------------------------------------------------------------------------------
52 namespace FIX8
53 {
54 
55 template<typename T> using f8_atomic = std::atomic <T>;
56 
57 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_STDTHREAD)
58  using thread_id_t = std::thread::id;
59 #elif (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
60  using thread_id_t = pthread_t;
61 #endif
62 
63 //----------------------------------------------------------------------------------------
66 {
67 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
68  pthread_attr_t _attr;
69  pthread_t _tid;
70 #elif (FIX8_THREAD_SYSTEM == FIX8_THREAD_STDTHREAD)
71  std::unique_ptr<std::thread> _thread;
72 #endif
73 
#if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
 /// pthread entry point. `what` is cast back to a T, the object is invoked, and the
 /// value it returns is handed back converted to a void *.
 template<typename T>
 static void *_run(void *what)
 {
  T *const callee(static_cast<T *>(what));
  return reinterpret_cast<void *>((*callee)());
 }
#else
 /// Non-pthread entry point. `what` is cast back to a T and the object is invoked;
 /// whatever the call returns is discarded.
 template<typename T>
 static void _run(void *what)
 {
  T *const callee(static_cast<T *>(what));
  (*callee)();
 }
#endif
81 
82 protected:
// Launch the thread, using _run<T> as the entry point and `sub` as its argument.
// pthread build: returns the result of pthread_create (created with the stored _attr).
// std::thread build: creates a new std::thread owned by _thread, then returns 0.
// sub: untyped pointer that _run<T> casts back to T * and invokes.
83  template<typename T>
84  int _start(void *sub)
85  {
86 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
87  return pthread_create(&_tid, &_attr, _run<T>, sub);
88 #elif (FIX8_THREAD_SYSTEM == FIX8_THREAD_STDTHREAD)
89  _thread.reset(new std::thread(_run<T>, sub));
90 #endif
// Reached only when the pthread branch above is not compiled in.
91  return 0;
92  }
93 
94 public:
97 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
98  : _attr(), _tid()
99  {
100  if (pthread_attr_init(&_attr))
101  throw f8_threadException("pthread_attr_init failure");
102 #else
103  {
104 #endif
105  }
106 
// Dtor. Calls join() first, then (pthread build only) destroys the attribute object _attr.
108  virtual ~_f8_threadcore()
109  {
110  join();
111 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
112  pthread_attr_destroy(&_attr);
113 #endif
114  }
115 
118  virtual int start() = 0; // ABC
119 
122  virtual void request_stop() = 0;
123 
126  virtual int join(int timeoutInMs = 0)
127  {
128 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
129  return getid() != get_threadid() ? pthread_join(_tid, nullptr) ? -1 : 0 : -1; // prevent self-join
130 #elif (FIX8_THREAD_SYSTEM == FIX8_THREAD_STDTHREAD)
131  if (_thread.get() && _thread->joinable() && getid() != get_threadid())
132  _thread->join();
133  return 0;
134 #endif
135  }
136 
139 #ifndef _MSC_VER
140 #ifdef __APPLE__
141  int yield() const { return sched_yield(); }
142 #else
143  int yield() const
144  {
145 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
146  return pthread_yield();
147 #elif (FIX8_THREAD_SYSTEM == FIX8_THREAD_STDTHREAD)
148  std::this_thread::yield();
149 #endif
150  return 0;
151  }
152 #endif
153 #endif
154 
158  {
159 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
160  return _tid;
161 #elif (FIX8_THREAD_SYSTEM == FIX8_THREAD_STDTHREAD)
162  return _thread.get() ? _thread->get_id() : std::thread::id();
163 #endif
164  }
165 
169  {
170 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
171  return pthread_self();
172 #elif (FIX8_THREAD_SYSTEM == FIX8_THREAD_STDTHREAD)
173  return std::this_thread::get_id();
174 #endif
175  }
176 
// Equality: true when both objects refer to the same thread id.
// pthread build compares the stored _tid values with pthread_equal; other builds
// compare the values returned by get_threadid().
180  bool operator==(const _f8_threadcore& that) const
181  {
182 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
183  return pthread_equal(_tid, that._tid);
184 #else
185  return get_threadid() == that.get_threadid();
186 #endif
187  }
188 
// Inequality: true when the two objects refer to different thread ids.
// pthread build negates pthread_equal on the stored _tid values; other builds compare
// the values returned by get_threadid().
192  bool operator!=(const _f8_threadcore& that) const
193  {
194 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
195  return !pthread_equal(_tid, that._tid);
196 #else
197  return get_threadid() != that.get_threadid();
198 #endif
199  }
200 
201  _f8_threadcore& operator=(const _f8_threadcore&) = delete;
202 };
203 
204 //----------------------------------------------------------------------------------------
207 {
// Two atomic ints: _stop_requested is 0 or 1, _thread_state holds a ThreadState value.
208  f8_atomic<int> _stop_requested, _thread_state;
209 
210 public:
// Ctor. Starts with no stop requested and the state set to Unknown.
212  f8_thread_cancellation_token() { _stop_requested = 0; _thread_state = Unknown; }
213 
// True once request_stop() has been called.
216  bool stop_requested() const { return _stop_requested == 1; }
217 
// Request a stop: the state is set to Stopping first, then the stop flag is raised.
219  void request_stop() { _thread_state = Stopping; _stop_requested = 1; }
220 
// Conversion to bool: same value as stop_requested().
223  operator bool() const { return stop_requested(); }
224 
// Logical not: true while no stop has been requested.
227  bool operator!() const { return !stop_requested(); }
228 
// Thread state enumerations.
230  enum ThreadState { Unknown, Running, Stopping, Stopped };
231 
// Current thread state, returned as an int holding a ThreadState value.
234  int thread_state() const { return _thread_state; }
235 
// Set the thread state.
238  void thread_state(ThreadState state) { _thread_state = state; }
239 };
240 
241 //----------------------------------------------------------------------------------------
243 
// Thread wrapper around an instance of T. The wrapped object supplies an int-returning
// member function to run (default T::operator()) and a member function returning its
// f8_thread_cancellation_token (default T::cancellation_token).
// NOTE(review): the constructor takes std::reference_wrapper, which is declared in
// <functional>; this header does not include <functional> itself - confirm it always
// arrives through another include.
244 template<typename T>
245 class f8_thread : public _f8_threadcore
246 {
 // Callable handed to the thread entry point: binds the target object to the two
 // member-function pointers.
247  class _helper
248  {
249  T& _what;
250  int (T::*_method)();
 // NOTE(review): listing line 251 is missing here; the index at the end of this page
 // gives it as the declaration of _cancellation_token_method.
252 
253  public:
254  _helper(T& what, int (T::*method)(), f8_thread_cancellation_token& (T::*cancellation_token_method)())
255  : _what(what), _method(method), _cancellation_token_method(cancellation_token_method) {}
 // NOTE(review): listing line 256 is missing here; presumably it declares the call
 // operator (int operator()()) that owns the body below - confirm against the header.
 // The body marks the token Running, calls the bound member function, marks the token
 // Stopped and returns the member function's result.
257  {
258  try
259  {
260  cancellation_token().thread_state(f8_thread_cancellation_token::Running);
261  const int ret((_what.*_method)());
262  cancellation_token().thread_state(f8_thread_cancellation_token::Stopped);
263  return ret;
264  }
 // Only std::exception-derived exceptions are caught: the state is set to Stopped
 // and the exception is rethrown.
265  catch(const std::exception&)
266  {
267  cancellation_token().thread_state(f8_thread_cancellation_token::Stopped);
268  throw;
269  }
270  }
 // Fetch the target object's cancellation token through the stored member pointer.
271  f8_thread_cancellation_token& cancellation_token() { return (_what.*_cancellation_token_method)(); }
272  }
273  _sub;
274 
275 public:
 // Ctor.
 // what: reference to the object whose member function the thread will run.
 // method: member function to run; defaults to T::operator().
 // cancellation_token_method: member function returning the token; defaults to
 // T::cancellation_token.
280  f8_thread(std::reference_wrapper<T> what, int (T::*method)()=&T::operator(),
281  f8_thread_cancellation_token& (T::*cancellation_token_method)()=&T::cancellation_token)
282  : _sub(what, method, cancellation_token_method) {}
283 
 // Dtor.
285  virtual ~f8_thread() {}
286 
 // Start the thread, passing the internal helper to _start. Returns _start's result.
289  int start() { return _start<_helper>(static_cast<void *>(&_sub)); }
290 
 // Ask the thread to stop by calling request_stop() on the target's cancellation token.
293  void request_stop() { _sub.cancellation_token().request_stop(); }
294 };
295 
296 //----------------------------------------------------------------------------------------
298 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
// Generic pthread_mutex wrapper exposing lock(), try_lock() and unlock().
299 class f8_mutex
300 {
301  pthread_mutex_t _pmutex;
302 
303 public:
 // NOTE(review): listing line 304 is missing here; presumably it is the constructor
 // header for the body below, which initialises the mutex with default attributes and
 // throws f8Exception when pthread_mutex_init reports an error.
305  {
306  if (pthread_mutex_init(&_pmutex, 0))
307  throw f8Exception("pthread_mutex_init failed");
308  }
309 
 // Dtor. Destroys the underlying pthread mutex.
310  ~f8_mutex() { pthread_mutex_destroy(&_pmutex); };
311 
 // lock()/unlock() ignore the pthread return codes; try_lock() returns true when
 // pthread_mutex_trylock returns 0.
312  void lock() { pthread_mutex_lock(&_pmutex); }
313  bool try_lock() { return pthread_mutex_trylock(&_pmutex) == 0; }
314  void unlock() { pthread_mutex_unlock(&_pmutex); }
315 };
316 #endif
317 //----------------------------------------------------------------------------------------
319 
320 #ifdef __APPLE__
// A simple spinlock that spins up to 100K times and then calls sched_yield() to back off.
// unlock() could simply store false, but that would assume the spinlock was locked to
// begin with. Instead unlock() waits for the locked -> unlocked transition to succeed, so
// a thread that unlocks something that is not locked spins; that is a logic error the
// caller should fix.
// The choice of 100K was arbitrary. The right value depends on the target and use case:
// it is however large the counter can grow before thread starvation occurs.
class f8_spin_lock
{
    bool _isLocked;

    /// Spin until _isLocked is atomically changed from `from` to `to`, yielding the
    /// processor after every 100000 failed attempts.
    void spin_until_swapped(bool from, bool to)
    {
        int spins(0);
        while (!__sync_bool_compare_and_swap(&_isLocked, from, to))
        {
            if (++spins >= 100000)
            {
                spins = 0;
                sched_yield();
            }
        }
    }

public:
    /// Ctor. The lock starts out unlocked.
    f8_spin_lock() : _isLocked(false) {}
    ~f8_spin_lock() {}

    /// Acquire the lock, spinning (with periodic yields) until it becomes available.
    void lock() { spin_until_swapped(false, true); }

    /// Make a single attempt to acquire the lock.
    /// @return true if the lock was free and is now held by the caller, false otherwise.
    bool try_lock() { return __sync_bool_compare_and_swap(&_isLocked, false, true); }

    /// Release the lock. Spins if the lock is not currently held (see the note above).
    void unlock() { spin_until_swapped(true, false); }
};
362 #else
363 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD)
365 {
366  pthread_spinlock_t _psl;
367 
368 public:
370  {
371  if (pthread_spin_init(&_psl, PTHREAD_PROCESS_PRIVATE))
372  throw f8Exception("pthread_spin_init failed");
373  }
374 
375  ~f8_spin_lock() { pthread_spin_destroy(&_psl); };
376 
377  void lock() { pthread_spin_lock(&_psl); }
378  bool try_lock() { return pthread_spin_trylock(&_psl) == 0; }
379  void unlock() { pthread_spin_unlock(&_psl); }
380 };
381 #elif (FIX8_THREAD_SYSTEM == FIX8_THREAD_STDTHREAD)
382 using f8_mutex = std::mutex;
/// Spin lock built on std::atomic_flag.
class f8_spin_lock
{
    // ATOMIC_FLAG_INIT does not compile under vs2013, so MSVC builds clear the flag in
    // the constructor instead of using the initialiser.
#ifdef _MSC_VER
    std::atomic_flag _sl;
#else
    std::atomic_flag _sl = ATOMIC_FLAG_INIT;
#endif

public:
    /// Ctor. The lock starts out unlocked.
    f8_spin_lock()
    {
#ifdef _MSC_VER
        _sl.clear(std::memory_order_relaxed);
#endif
    }
    ~f8_spin_lock() = default;

    /// Busy-wait until the flag is acquired.
    void lock()
    {
        while (_sl.test_and_set(std::memory_order_acquire))
        {
        }
    }

    /// Make a single attempt to acquire the flag.
    /// @return true if the flag was clear and is now set by the caller.
    bool try_lock()
    {
        const bool was_held(_sl.test_and_set(std::memory_order_acquire));
        return !was_held;
    }

    /// Release the lock by clearing the flag.
    void unlock() { _sl.clear(std::memory_order_release); }
};
404 #endif
405 #endif //__APPLE__
406 
407 template<typename T>
409 {
410  T *_local_mutex = nullptr;
411  bool _disabled = false;
412 
413 public:
414  f8_scoped_lock_impl() = default;
415  f8_scoped_lock_impl(T& mutex) { acquire(mutex); }
416  f8_scoped_lock_impl(T& mutex, bool disable) : _disabled(disable)
417  {
418  if (!_disabled)
419  acquire(mutex);
420  }
421 
422  ~f8_scoped_lock_impl() { release(); }
423 
424  f8_scoped_lock_impl(const f8_scoped_lock_impl&) = delete;
426 
// Lock `mutex` (blocking) and remember it so that release() can unlock it later.
// NOTE(review): any mutex pointer already stored is overwritten without being unlocked,
// and release() does nothing when _disabled is true - confirm that callers never rely on
// either case.
427  void acquire(T& mutex)
428  {
429  mutex.lock();
430  _local_mutex = &mutex;
431  }
432 
433  bool try_acquire(T& mutex)
434  {
435  bool result(mutex.try_lock());
436  if (result)
437  _local_mutex = &mutex;
438  return result;
439  }
440 
441  void release()
442  {
443  if (!_disabled && _local_mutex)
444  {
445  _local_mutex->unlock();
446  _local_mutex = nullptr;
447  }
448  }
449 };
450 
453 template<typename T> using dthread = f8_thread<T>;
456 
457 } // FIX8
458 
459 #endif // FIX8_THREAD_HPP_
bool try_lock()
Definition: thread.hpp:313
static thread_id_t getid()
Definition: thread.hpp:168
bool try_acquire(T &mutex)
Definition: thread.hpp:433
pthread_t _tid
Definition: thread.hpp:69
pthread_spinlock_t _psl
Definition: thread.hpp:366
virtual ~_f8_threadcore()
Dtor.
Definition: thread.hpp:108
virtual void request_stop()=0
generic pthread_mutex wrapper
Definition: thread.hpp:299
_f8_threadcore & operator=(const _f8_threadcore &)=delete
int yield() const
Definition: thread.hpp:143
ThreadState
Thread state enumerations.
Definition: thread.hpp:230
Thread wrapper. Ctor provides T instance and specifies ptr to member to call or defaults to operator(...
Definition: thread.hpp:245
virtual int start()=0
Thread cancellation token.
Definition: thread.hpp:206
void lock()
Definition: thread.hpp:312
virtual ~f8_thread()
Dtor.
Definition: thread.hpp:285
_helper(T &what, int(T::*method)(), f8_thread_cancellation_token &(T::*cancellation_token_method)())
Definition: thread.hpp:254
virtual int join(int timeoutInMs=0)
Definition: thread.hpp:126
Base exception class.
Definition: f8exception.hpp:49
f8_scoped_lock_impl(T &mutex)
Definition: thread.hpp:415
void thread_state(ThreadState state)
Definition: thread.hpp:238
f8_thread(std::reference_wrapper< T > what, int(T::*method)()=&T::operator(), f8_thread_cancellation_token &(T::*cancellation_token_method)()=&T::cancellation_token)
Definition: thread.hpp:280
A pthread attribute error occurred.
void request_stop()
Definition: thread.hpp:293
pthread_mutex_t _pmutex
Definition: thread.hpp:301
static void * _run(void *what)
Definition: thread.hpp:76
std::atomic< T > f8_atomic
Definition: thread.hpp:55
generic spin_lock wrapper
Definition: thread.hpp:364
f8_atomic< int > _thread_state
Definition: thread.hpp:208
void unlock()
Definition: thread.hpp:314
pthread wrapper abstract base
Definition: thread.hpp:65
bool operator!=(const _f8_threadcore &that) const
Definition: thread.hpp:192
bool operator==(const _f8_threadcore &that) const
Definition: thread.hpp:180
pthread_attr_t _attr
Definition: thread.hpp:68
f8_thread_cancellation_token & cancellation_token()
Definition: thread.hpp:271
std::thread::id thread_id_t
Definition: thread.hpp:58
f8_scoped_lock_impl(T &mutex, bool disable)
Definition: thread.hpp:416
int _start(void *sub)
Definition: thread.hpp:84
void acquire(T &mutex)
Definition: thread.hpp:427
f8_thread_cancellation_token &(T::* _cancellation_token_method)()
Definition: thread.hpp:251
thread_id_t get_threadid() const
Definition: thread.hpp:157
void request_stop()
Tell the thread to stop.
Definition: thread.hpp:219