Skip to content

Commit 297a959

Browse files
committed
Add WFRedisSubscriber.
1 parent bfb57e6 commit 297a959

5 files changed

Lines changed: 398 additions & 4 deletions

File tree

CMakeLists_Headers.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ set(INCLUDE_HEADERS
7979
src/server/WFDnsServer.h
8080
src/client/WFMySQLConnection.h
8181
src/client/WFDnsClient.h
82+
src/client/WFRedisSubscriber.h
8283
src/client/WFHttpChunkedClient.h
8384
src/manager/DnsCache.h
8485
src/manager/WFGlobal.h

src/client/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ set(SRC
55
WFMySQLConnection.cc
66
WFDnsClient.cc
77
WFHttpChunkedClient.cc
8+
WFRedisSubscriber.cc
89
)
910

1011
add_library(${PROJECT_NAME} OBJECT ${SRC})

src/client/WFHttpChunkedClient.h

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,26 +61,37 @@ class WFHttpChunkedTask : public WFGenericTask
6161
}
6262

6363
public:
64+
/* Timeout of waiting for the first package of each chunk. If not set,
65+
the max waiting time will be the global 'response_timeout'. */
6466
void set_watch_timeout(int timeout)
6567
{
6668
this->task->set_watch_timeout(timeout);
6769
}
6870

69-
void set_recv_timeout(int timeout)
71+
/* Timeout of receiving a complete chunk. */
72+
void set_receive_timeout(int timeout)
7073
{
7174
this->task->set_receive_timeout(timeout);
7275
}
7376

77+
/* Timeout of sending the HTTP request. */
7478
void set_send_timeout(int timeout)
75-
{
76-
this->task->set_send_timeout(timeout);
77-
}
79+
{
80+
this->task->set_send_timeout(timeout);
81+
}
7882

83+
/* Speicify HTTP keep alive timeout. */
7984
void set_keep_alive(int timeout)
8085
{
8186
this->task->set_keep_alive(timeout);
8287
}
8388

89+
/* Equal to 'set_receive_timeout()'. For compatibility purpose only. */
90+
void set_recv_timeout(int timeout)
91+
{
92+
this->set_receive_timeout(timeout);
93+
}
94+
8495
public:
8596
void set_ssl_ctx(SSL_CTX *ctx)
8697
{

src/client/WFRedisSubscriber.cc

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
Copyright (c) 2024 Sogou, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
16+
Author: Xie Han (xiehan@sogou-inc.com)
17+
*/
18+
19+
#include <errno.h>
20+
#include <string>
21+
#include <vector>
22+
#include <mutex>
23+
#include "URIParser.h"
24+
#include "RedisTaskImpl.inl"
25+
#include "WFRedisSubscriber.h"
26+
27+
int WFRedisSubscribeTask::sync_send(const std::string& command,
28+
const std::vector<std::string>& params)
29+
{
30+
std::string str("*" + std::to_string(1 + params.size()) + "\r\n");
31+
int ret;
32+
33+
str += "$" + std::to_string(command.size()) + "\r\n" + command + "\r\n";
34+
for (const std::string& p : params)
35+
str += "$" + std::to_string(p.size()) + "\r\n" + p + "\r\n";
36+
37+
this->mutex.lock();
38+
if (this->task)
39+
{
40+
ret = this->task->push(str.c_str(), str.size());
41+
if (ret == (int)str.size())
42+
ret = 0;
43+
else
44+
{
45+
if (ret >= 0)
46+
errno = ENOBUFS;
47+
ret = -1;
48+
}
49+
}
50+
else
51+
{
52+
errno = ENOENT;
53+
ret = -1;
54+
}
55+
56+
this->mutex.unlock();
57+
return ret;
58+
}
59+
60+
void WFRedisSubscribeTask::task_extract(WFRedisTask *task)
61+
{
62+
auto *t = (WFRedisSubscribeTask *)task->user_data;
63+
64+
if (t->extract)
65+
t->extract(t);
66+
}
67+
68+
void WFRedisSubscribeTask::task_callback(WFRedisTask *task)
69+
{
70+
auto *t = (WFRedisSubscribeTask *)task->user_data;
71+
72+
t->mutex.lock();
73+
t->task = NULL;
74+
t->mutex.unlock();
75+
76+
t->state = task->get_state();
77+
t->error = task->get_error();
78+
if (t->callback)
79+
t->callback(t);
80+
81+
t->release();
82+
}
83+
84+
int WFRedisSubscriber::init(const std::string& url, SSL_CTX *ssl_ctx)
85+
{
86+
if (URIParser::parse(url, this->uri) >= 0)
87+
{
88+
this->ssl_ctx = ssl_ctx;
89+
return 0;
90+
}
91+
92+
if (this->uri.state == URI_STATE_INVALID)
93+
errno = EINVAL;
94+
95+
return -1;
96+
}
97+
98+
WFRedisTask *
99+
WFRedisSubscriber::create_redis_task(const std::string& command,
100+
const std::vector<std::string>& params)
101+
{
102+
WFRedisTask *task = __WFRedisTaskFactory::create_subscribe_task(this->uri,
103+
WFRedisSubscribeTask::task_extract,
104+
WFRedisSubscribeTask::task_callback);
105+
this->set_ssl_ctx(task);
106+
task->get_req()->set_request(command, params);
107+
return task;
108+
}
109+
110+
WFRedisSubscribeTask *
111+
WFRedisSubscriber::create_subscribe_task(
112+
const std::vector<std::string>& channels,
113+
extract_t extract, callback_t callback)
114+
{
115+
WFRedisTask *task = this->create_redis_task("SUBSCRIBE", channels);
116+
return new WFRedisSubscribeTask(task, std::move(extract),
117+
std::move(callback));
118+
}
119+
120+
WFRedisSubscribeTask *
121+
WFRedisSubscriber::create_psubscribe_task(
122+
const std::vector<std::string>& patterns,
123+
extract_t extract, callback_t callback)
124+
{
125+
WFRedisTask *task = this->create_redis_task("PSUBSCRIBE", patterns);
126+
return new WFRedisSubscribeTask(task, std::move(extract),
127+
std::move(callback));
128+
}
129+

0 commit comments

Comments
 (0)