40 #if defined FIX8_HAVE_LIBHIREDIS
47 const f8String& key_base,
bool purge)
53 if (!(_cache = redisConnectWithTimeout(host.c_str(), port, timeout)) || _cache->err)
57 glout_error <<
"redis error connect: " << _cache->errstr <<
" for " << _key_base;
64 redisReply *reply(static_cast<redisReply*>(redisCommand(_cache,
"ZREMRANGEBYRANK %s 0 -1", _key_base.c_str())));
65 if (reply->type == REDIS_REPLY_ERROR)
67 glout_error <<
"redis error purge (ZREMRANGEBYRANK): " << reply->str <<
" for " << _key_base;
69 freeReplyObject(reply);
76 HiredisPersister::~HiredisPersister()
86 unsigned HiredisPersister::get_last_seqnum(
unsigned& sequence)
const
89 redisReply *reply(static_cast<redisReply*>(redisCommand(_cache,
"ZRANGE %s -1 -1 WITHSCORES", _key_base.c_str())));
90 if (reply->type == REDIS_REPLY_ERROR)
92 glout_error <<
"redis error ZRANGE: " << reply->str <<
" for " << _key_base;
94 else if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 2)
95 result = fast_atoi<unsigned>((*(reply->element + 1))->str);
98 glout_error <<
"redis error ZRANGE: unexpected type: " << reply->type <<
" for " << _key_base;
101 freeReplyObject(reply);
102 return sequence = result;
106 unsigned HiredisPersister::get(
const unsigned from,
const unsigned to,
Session& session,
109 unsigned last_seq(0);
110 get_last_seqnum(last_seq);
111 unsigned recs_sent(0), startSeqNum(from);
112 const unsigned finish(to == 0 ? last_seq : to);
115 if (!startSeqNum || from > finish)
123 redisReply *reply(static_cast<redisReply*>(redisCommand(_cache,
"ZRANGEBYSCORE %s %u %u WITHSCORES", _key_base.c_str(), startSeqNum, finish)));
124 if (reply->type == REDIS_REPLY_ERROR)
126 glout_error <<
"redis error ZRANGEBYSCORE: " << reply->str <<
" for " << _key_base;
128 else if (reply->type == REDIS_REPLY_ARRAY)
132 for (
unsigned ii(0); ii < reply->elements; ii += 2)
135 f8String((*(reply->element + ii))->str, (*(reply->element + ii))->len));
137 if (!(session.*callback)(txresult, rctx))
143 glout_error <<
"redis error ZRANGEBYSCORE: unexpected type: " << reply->type <<
" for " << _key_base;
146 freeReplyObject(reply);
155 bool HiredisPersister::put(
const unsigned sender_seqnum,
const unsigned target_seqnum)
161 redisReply *reply(static_cast<redisReply*>(redisCommand(_cache,
"ZREMRANGEBYSCORE %s 0 0", _key_base.c_str())));
162 freeReplyObject(reply);
163 reply =
static_cast<redisReply*
>(redisCommand(_cache,
"ZADD %s 0 %u:%u", _key_base.c_str(), sender_seqnum, target_seqnum));
164 if (reply->type == REDIS_REPLY_ERROR)
166 glout_error <<
"redis error ZADD: " << reply->str <<
" for " << _key_base;
170 freeReplyObject(reply);
175 bool HiredisPersister::put(
const unsigned seqnum,
const f8String& what)
177 if (!_cache || !seqnum)
181 redisReply *reply(static_cast<redisReply*>(redisCommand(_cache,
"ZADD %s %u %b", _key_base.c_str(),
182 seqnum, what.data(), what.size())));
183 if (reply->type == REDIS_REPLY_ERROR)
185 glout_error <<
"redis error ZADD: " << reply->str <<
" for " << _key_base;
189 freeReplyObject(reply);
200 redisReply *reply(static_cast<redisReply*>(redisCommand(_cache,
"SET %s%s %b", _key_base.c_str(),
201 key.c_str(), what.data(), what.size())));
202 if (reply->type == REDIS_REPLY_ERROR)
204 glout_error <<
"redis error SET: " << reply->str <<
" for " << _key_base <<
':' << key;
208 freeReplyObject(reply);
213 bool HiredisPersister::get(
unsigned& sender_seqnum,
unsigned& target_seqnum)
const
219 redisReply *reply(static_cast<redisReply*>(redisCommand(_cache,
"ZRANGEBYSCORE %s 0 0", _key_base.c_str())));
220 if (reply->type == REDIS_REPLY_ERROR)
222 glout_error <<
"redis error ZRANGEBYSCORE: " << reply->str <<
" for " << _key_base;
224 else if (reply->type == REDIS_REPLY_ARRAY)
226 if (reply->elements == 1)
228 istringstream istr((*reply->element)->str);
229 istr >> sender_seqnum;
231 istr >> target_seqnum;
237 glout_error <<
"redis error ZRANGEBYSCORE: unexpected type: " << reply->type <<
" for " << _key_base;
240 freeReplyObject(reply);
245 bool HiredisPersister::get(
const unsigned seqnum,
f8String& to)
const
247 if (!_cache || !seqnum)
251 redisReply *reply(static_cast<redisReply*>(redisCommand(_cache,
"ZRANGEBYSCORE %s %u %u", _key_base.c_str(), seqnum, seqnum)));
252 if (reply->type == REDIS_REPLY_ERROR)
254 glout_error <<
"redis error ZRANGEBYSCORE: " << reply->str <<
" for " << _key_base;
256 else if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 1)
258 to.assign((*reply->element)->str, (*reply->element)->len);
263 glout_error <<
"redis error ZRANGEBYSCORE: unexpected type: " << reply->type <<
" for " << _key_base;
266 freeReplyObject(reply);
277 redisReply *reply(static_cast<redisReply*>(redisCommand(_cache,
"GET %s%s", _key_base.c_str(), key.c_str())));
278 if (reply->type == REDIS_REPLY_ERROR)
280 glout_error <<
"redis error GET: " << reply->str <<
" for " << _key_base <<
':' << key;
282 else if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 1)
284 to.assign((*reply->element)->str, (*reply->element)->len);
289 glout_error <<
"redis error GET: unexpected type: " << reply->type <<
" for " << _key_base <<
':' << key;
292 freeReplyObject(reply);
297 bool HiredisPersister::del(
const f8String& key)
303 redisReply *reply(static_cast<redisReply*>(redisCommand(_cache,
"DEL %s%s", _key_base.c_str(), key.c_str())));
304 if (reply->type == REDIS_REPLY_ERROR)
306 glout_error <<
"redis error DEL: " << reply->str <<
" for " << _key_base <<
':' << key;
308 else if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 1)
314 glout_error <<
"redis error DEL: unexpected type: " << reply->type <<
" for " << _key_base <<
':' << key;
317 freeReplyObject(reply);
322 unsigned HiredisPersister::find_nearest_highest_seqnum (
const unsigned requested,
const unsigned last)
const
327 for (
unsigned startseqnum(requested); startseqnum <= last; ++startseqnum)
328 if (
get(startseqnum, target))
335 #endif // FIX8_HAVE_LIBHIREDIS
Fix8 Base Session. User sessions are derived from this class.
std::pair< const unsigned, const f8String > SequencePair
Provides context to your retrans handler.
unsigned get_next_send_seq() const