docs/tutorial-18-redis_subscriber.md
tutorial-18-redis_subscriber.cc
在Workflow中,一个客户端网络任务通常是向服务端发出一个请求并接收一个回复,而Redis订阅任务不同,它会先发出一个订阅请求,然后源源不断地接收服务端推送过来的消息,在这个过程中,客户端还可以新增或取消channels、patterns。
用于实现Redis订阅功能的任务是WFRedisSubscribeTask,与普通的Redis任务不同,它不从任务工厂产生,而是需要使用WFRedisSubscriber来创建。例如
WFRedisSubscriber suber;
if (suber.init(url) != 0)
{
std::cerr << "Subscriber init failed " << strerror(errno) << std::endl;
exit(1);
}
// ...
WFRedisSubscribeTask *task;
task = suber.create_subscribe_task(channels, extract, callback);
task->set_watch_timeout(1000000); // 1000秒
task->start();
// 这里可以使用task的相关接口改变订阅内容
// ...
task->release();
suber.deinit();
初始化WFRedisSubscriber需要使用Redis URL,这与普通Redis任务相同,不再赘述。创建订阅任务时,需要提供三个参数
这个例子中为watch_timeout设置了一个很长的时间,若这个时间较短,且服务端长时间未推送消息,则连接会因为超时而断开,订阅任务也会直接失败,请根据实际情况合理设置。
当任务处理完成后,需要通过task->release()来释放这个任务,这也是与其他任务的一个不同之处。
服务端推送的消息由创建任务时指定的extract函数处理。后续描述中,subscribe对应channel,psubscribe对应pattern。
更多详情可参阅redis文档。
处理消息的一个示例如下,简单地将内容打印到标准输出
void extract(WFRedisSubscribeTask *task)
{
auto *resp = task->get_resp();
protocol::RedisValue value;
resp->get_result(value);
if (value.is_array())
{
for (size_t i = 0; i < value.arr_size(); i++)
{
if (value[i].is_string())
std::cout << value[i].string_value();
else if (value[i].is_int())
std::cout << value[i].int_value();
else if (value[i].is_nil())
std::cout << "nil";
else
std::cout << "Unexpected value in array!";
std::cout << "\n";
}
}
else
std::cout << "Unexpected value!\n";
}
在任务过程中,可以通过下述接口新增或取消订阅,注意在带有channels或patterns参数的接口中,请勿传入空数组。
// ...
task->start();
// 新增订阅一组channels
task->subscribe(channels);
// 取消订阅一组channels
task->unsubscribe(channels);
// 取消订阅所有channels
task->unsubscribe();
// 新增订阅一组patterns
task->psubscribe(patterns);
// 取消订阅一组patterns
task->punsubscribe(patterns);
// 取消订阅所有patterns
task->punsubscribe();
task->release();
当所有channels和patterns都被取消订阅后,任务会直接结束,此后不能再新增订阅,请注意该细节。也可以直接通过task->quit()来主动结束任务。
此外,订阅模式下可以通过task->ping()或task->ping(message)向Redis服务器发起ping请求。当任务设置了较小的watch_timeout,但服务端可能长时间没有消息推送时,通过定时发出ping请求可以令服务端推送pong响应,此时任务便不会因为超时而失败。