Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
00c0da0
Change interfaces name and make it compatible.
Barenboim Sep 19, 2025
78fd766
Optimize http parser. (#1781)
Barenboim Sep 23, 2025
fc1927f
No copying start line when parsing HTTP header. (#1782)
Barenboim Sep 24, 2025
e2e5f5c
Reject control characters in HTTP header. (#1783)
Barenboim Sep 25, 2025
ea69408
Remove unused functions.
Barenboim Sep 27, 2025
50d5717
Simplify codes.
Barenboim Sep 29, 2025
649a65e
Support range-based 'for' loop for ParallelWork. (#1786)
Barenboim Oct 9, 2025
527d22f
Remove 'WIN32' from CMake files.
Barenboim Oct 11, 2025
c3e0d46
Remove support for OpenSSL 1.0.x and below. (#1777)
Barenboim Oct 23, 2025
7a9c71b
Update tutorial-14-consul_cli.cc
Barenboim Oct 24, 2025
0c965d4
Update README's description of OpenSSL requirement.
Barenboim Oct 24, 2025
65f044b
Remove 'setsockopt()' on an accepted sockfd. (#1789)
Barenboim Oct 24, 2025
5246a41
Update English README.md. (#1790)
holmes1412 Oct 24, 2025
d08f097
Fix 'fdatasync' task and change interfaces name. (#1792)
Barenboim Oct 29, 2025
cb04492
Update crc32c module for Kafka message. (#1793)
Barenboim Nov 3, 2025
c41812b
Support URL longer than 2GB bytes. (#1796)
Barenboim Nov 21, 2025
46b129f
Include errno.h in ExecRequest.h (#1797)
kedixa Nov 27, 2025
fa51c22
Disable getting connection after server task callback. (#1803)
Barenboim Jan 9, 2026
5cde50c
Fix URI parser warning.
Barenboim Jan 9, 2026
078f618
Use C++17 to be compatible with latest GTest 1.17 (#1805)
bkmgit Jan 26, 2026
fa48b35
0.11.11 -> 1.0.0
Barenboim Feb 9, 2026
c46d6b8
Fix WFConsultClient::init() bug when specifying an SSL_CTX.
Barenboim Apr 1, 2026
8dc6502
Fix UpstreamPolicie: use size_t when counting for cur_idx. (#1809)
holmes1412 Apr 1, 2026
935f6d3
Fix KafkaMessage boundary checks (#1810)
kedixa Apr 2, 2026
08b7258
Poller max open files limited to 1M. (#1811)
Barenboim Apr 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set(CMAKE_SKIP_RPATH TRUE)

project(
workflow
VERSION 0.11.11
VERSION 1.0.0
LANGUAGES C CXX
)

Expand Down Expand Up @@ -62,16 +62,11 @@ message("CMAKE_CXX_FLAGS_RELEASE is ${CMAKE_CXX_FLAGS_RELEASE}")
message("CMAKE_CXX_FLAGS_RELWITHDEBINFO is ${CMAKE_CXX_FLAGS_RELWITHDEBINFO}")
message("CMAKE_CXX_FLAGS_MINSIZEREL is ${CMAKE_CXX_FLAGS_MINSIZEREL}")

if (WIN32)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /MP /wd4200")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP /wd4200 /std:c++14")
else ()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -fPIC -pipe -std=gnu90")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=c++11 -fno-exceptions -Wno-invalid-offsetof")
if (APPLE)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-deprecated-declarations")
endif()
endif ()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -fPIC -pipe -std=gnu90")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=c++11 -fno-exceptions -Wno-invalid-offsetof")
if (APPLE)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-deprecated-declarations")
endif()

add_subdirectory(src)

Expand Down
43 changes: 21 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ int main()
* This project has built-in **service governance** and **load balancing** features.
* Wiki link : [PaaS Architecture](https://github.com/sogou/workflow/wiki)

#### Compiling and running environment
#### Compiling and Running Environment

* This project supports `Linux`, `macOS`, `Windows`, `Android` and other operating systems.
* `Windows` version is currently released as an independent [branch](https://github.com/sogou/workflow/tree/windows), using `iocp` to implement asynchronous networking. All user interfaces are consistent with the `Linux` version.
* Supports all CPU platforms, including 32 or 64-bit `x86` processors, big-endian or little-endian `arm` processors, `loongson` processors.
* Master branch requires SSL and `OpenSSL 1.1` or above is recommended. Fully compatible with BoringSSL. If you don't like SSL, you may checkout the [nossl](https://github.com/sogou/workflow/tree/nossl) branch.
* Master branch requires `OpenSSL 1.1` or above, and BoringSSL is fully compatible. If you don't like SSL, you may checkout the [nossl](https://github.com/sogou/workflow/tree/nossl) branch.
* Uses the `C++11` standard and therefore, it should be compiled with a compiler which supports `C++11`. Does not rely on `boost` or `asio`.
* No other dependencies. However, if you need `Kafka` protocol, some compression libraries should be installed, including `lz4`, `zstd` and `snappy`.

### Get started (Linux, macOS):
### Get Started (Linux, macOS):
~~~sh
git clone https://github.com/sogou/workflow
cd workflow
Expand Down Expand Up @@ -134,9 +134,9 @@ If you want to use xmake to build workflow, you can see [xmake build document](d
* [Asynchronous MySQL client:mysql\_cli](docs/en/tutorial-12-mysql_cli.md)
* [Asynchronous Kafka client: kafka\_cli](docs/en/tutorial-13-kafka_cli.md)

#### Programming paradigm
#### Programming Paradigm

We believe that a typical back-end program=protocol+algorithm+workflow and should be developed completely independently.
Program = Protocol + Algorithm + Workflow

* Protocol
* In most cases, users use built-in common network protocols, such as HTTP, Redis or various rpc.
Expand All @@ -151,35 +151,34 @@ We believe that a typical back-end program=protocol+algorithm+workflow and shoul
* The typical workflow is a closed series-parallel graph. Complex business logic may be a non-closed DAG.
* The workflow graph can be constructed directly or dynamically generated based on the results of each step. All tasks are executed asynchronously.

Basic task, task factory and complex task
Structured Concurrency and Task Abstraction

* Our system contains six basic tasks: networking, file IO, CPU, GPU, timer, and counter.
* All tasks are generated by the task factory and automatically recycled after callback.
* Server task is one kind of special networking task, generated by the framework which calls the task factory, and handed over to the user through the process function.
* In most cases, the task generated by the user through the task factory is a complex task, which is transparent to the user.
* Our system contains five basic tasks: communication, computation, file IO, timer, and counter.
* All tasks are generated by the task factory, and users organize the concurrency structure by calling interfaces, such as series, parallel, DAG, etc.
* In most cases, the tasks generated by the user through the task factory is a complex task which encapsulates multiple asynchronous processes, but it is transparent to the user.
* For example, an HTTP request may include many asynchronous processes (DNS, redirection), but for user, it is just a networking task.
* File sorting seems to be an algorithm, but it actually includes many complex interaction processes between file IO and CPU computation.
* If you think of business logic as building circuits with well-designed electronic components, then each electronic component may be a complex circuit.
* The task abstraction mechanism greatly reduces the number of tasks users need to create and the depth of callbacks.
* Any task runs in a **SeriesWork** and the tasks in the same SeriesWork shares the series context, which simplifies data transfer between asynchronous tasks.

Asynchrony and encapsulation based on `C++11 std::function`
Callback and Memory Reclamation Mechanism

* Not based on user mode coroutines. Users need to know that they are writing asynchronous programs.
* All calls are executed asynchronously, and there is almost no operation that occupies a thread.
* Although we also provide some facilities with semi-synchronous interfaces, they are not core features.
* Explicit callback mechanism. Users are aware that they are writing asynchronous programs.
* **A set of object lifecycle mechanisms greatly simplifies memory management for asynchronous programs.**
* The lifecycle of any task created by the framework is from creation until the callback function finishes running. There is no risk of leakage.
* If a task is created but the user does not want to run it, the user needs to release it through the `dismiss()` interface.
* Any data in the task, such as the response of the network request, will also be recycled with the task. At this time, the user can use `std::move()` to move the required data.
* The project doesn’t use `std::shared_ptr` to manage memory.

* We try to avoid user's derivations, and encapsulate user behavior with `std::function` instead, including:
* The callback of any task.
* Any server's process. This conforms to the `FaaS` (Function as a Service) idea.
* The realization of an algorithm is simply a `std::function`. But the algorithm can also be implemented by derivation.
* If used deeply, one will find that everything can be derived.

Memory reclamation mechanism

* Every task will be automatically reclaimed after the callback. If a task is created but a user does not want to run it, the user needs to release it through the dismiss method.
* Any data in the task, such as the response of the network request, will also be recycled with the task. At this time, the user can use `std::move()` to move the required data.
* SeriesWork and ParallelWork are two kinds of framework objects, which are also recycled after their callback.
* When a series is a branch of a parallel, it will be recycled after the callback of the parallel that it belongs to.
* This project doesn’t use `std::shared_ptr` to manage memory.

#### Any other questions?
#### Any Other Questions?

You may check the [FAQ](https://github.com/sogou/workflow/issues/406) and [issues](https://github.com/sogou/workflow/issues) list first to see if you can find the answer.

Expand Down
2 changes: 1 addition & 1 deletion README_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ int main()
* 项目支持``Linux``,``macOS``,``Windows``,``Android``等操作系统。
* ``Windows``版以[windows](https://github.com/sogou/workflow/tree/windows)分支发布,使用``iocp``实现异步网络。用户接口与``Linux``版一致。
* 支持所有CPU平台,包括32或64位``x86``处理器,大端或小端``arm``处理器,国产``loongson``龙芯处理器实测支持。
* 需要依赖于``OpenSSL``,推荐``OpenSSL 1.1``及以上版本
* 需要依赖于``OpenSSL 1.1``或以上版本,也兼容BoringSSL
* 不喜欢SSL的用户可以使用[nossl](https://github.com/sogou/workflow/tree/nossl)分支,代码更简洁。
* 项目使用了``C++11``标准,需要用支持``C++11``的编译器编译。但不依赖``boost``或``asio``。
* 项目无其它依赖。如需使用``kafka``协议,需自行安装``lz4``,``zstd``和``snappy``几个压缩库。
Expand Down
15 changes: 5 additions & 10 deletions benchmark/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,11 @@ include_directories(${OPENSSL_INCLUDE_DIR} ${WORKFLOW_INCLUDE_DIR})
link_directories(${WORKFLOW_LIB_DIR})
find_library(WORKFLOW_LIB NAMES libworkflow.a workflow HINTS ${WORKFLOW_LIB_DIR})

if (WIN32)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /MP /wd4200")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP /wd4200 /std:c++14")
else ()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -fPIC -pipe -std=gnu90")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=c++11 -fno-exceptions -Wno-invalid-offsetof")
if (APPLE)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-deprecated-declarations")
endif()
endif ()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -fPIC -pipe -std=gnu90")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=c++11 -fno-exceptions -Wno-invalid-offsetof")
if (APPLE)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-deprecated-declarations")
endif()

set(BENCHMARK_LIST
benchmark-01-http_server
Expand Down
4 changes: 2 additions & 2 deletions docs/en/tutorial-09-http_file_server.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public:
~~~

Both pread and pwrite return WFFileIOTask. We do not distinguish between sort and psort, and we do not distinguish between client and server task. They all follow the same principle.
In addition to these two interfaces, preadv and pwritev return WFFileVIOTask; fsync and fdsync return WFFileSyncTask. You can see the details in the header file.
In addition to these two interfaces, preadv and pwritev return WFFileVIOTask; fsync and fdatasync return WFFileSyncTask. You can see the details in the header file.
The example uses the user\_data field of the task to save the global data of the service. For larger services, we recommend to use series context. You can see the [proxy examples](/tutorial/tutorial-05-http_proxy.cc) for details.

# Handling file reading results
Expand Down Expand Up @@ -199,4 +199,4 @@ Linux operating system supports a set of asynchronous IO system calls with high
We have implemented a set of posix aio interfaces to support other UNIX systems, and used the sigevent notification method of threads, but it is no longer in use because of its low efficiency.
Currently, for non-Linux systems, asynchronous IO is always simulated by multi-threading. When an IO task arrives, a thread is created in real time to execute IO tasks, and then a callback is used to return to the handler thread pool.
Multi-threaded IO is also the only choice in macOS, because macOS does not have good sigevent support and posix aio will not work in macOS.
Some UNIX systems do not support fdatasync. In this case, an fdsync task is equivalent to an fsync task.
Some UNIX systems do not support fdatasync. In this case, an fdatasync task is equivalent to an fsync task.
4 changes: 2 additions & 2 deletions docs/tutorial-09-http_file_server.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public:
};
~~~
无论是pread还是pwrite,返回的都是WFFileIOTask。这与不区分sort或psort,不区分client或server task是一个道理。
除这两个接口还有preadv和pwritev,返回WFFileVIOTask,以及fsync,fdsync,返回WFFileSyncTask。可以在头文件里查看。
除这两个接口还有preadv和pwritev,返回WFFileVIOTask,以及fsync,fdatasync,返回WFFileSyncTask。可以在头文件里查看。
示例用了task的user_data域保存服务的全局数据。但对于大服务,我们推荐使用series context。可以参考前面的[proxy示例](../tutorial/tutorial-05-http_proxy.cc)。

# 处理读文件结果
Expand Down Expand Up @@ -195,5 +195,5 @@ Linux操作系统支持一套效率很高,CPU占用非常少的异步IO系统
我们曾经实现过一套posix aio接口用于支持其它UNIX系统,并使用线程的sigevent通知方式,但由于其效率太低,已经不再使用了。
目前,对于非Linux系统,异步IO一律是用多线程实现,在IO任务到达时,实时创建线程执行IO任务,callback回到handler线程池。
多线程IO也是macOS下的唯一选择,因为macOS没有良好的sigevent支持,posix aio行不通。
某些UNIX系统不支持fdatasync调用,这种情况下,fdsync任务将等价于fsync任务
某些UNIX系统不支持fdatasync调用,这种情况下,fdatasync任务将等价于fsync任务

4 changes: 2 additions & 2 deletions src/client/WFConsulClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ class WFConsulClient
}

// with specific SSL_CTX
int init(const std::string& proxy_url, SSL_CTX *ctx_ctx)
int init(const std::string& proxy_url, SSL_CTX *ctx)
{
return this->init(proxy_url, protocol::ConsulConfig(), ssl_ctx);
return this->init(proxy_url, protocol::ConsulConfig(), ctx);
}

int init(const std::string& proxy_url, protocol::ConsulConfig config,
Expand Down
19 changes: 15 additions & 4 deletions src/client/WFHttpChunkedClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,37 @@ class WFHttpChunkedTask : public WFGenericTask
}

public:
/* Timeout of waiting for the first package of each chunk. If not set,
the max waiting time will be the global 'response_timeout'. */
void set_watch_timeout(int timeout)
{
this->task->set_watch_timeout(timeout);
}

void set_recv_timeout(int timeout)
/* Timeout of receiving a complete chunk. */
void set_receive_timeout(int timeout)
{
this->task->set_receive_timeout(timeout);
}

/* Timeout of sending the HTTP request. */
void set_send_timeout(int timeout)
{
this->task->set_send_timeout(timeout);
}
{
this->task->set_send_timeout(timeout);
}

/* Speicify HTTP keep alive timeout. */
void set_keep_alive(int timeout)
{
this->task->set_keep_alive(timeout);
}

/* Equal to 'set_receive_timeout()'. For compatibility purpose only. */
void set_recv_timeout(int timeout)
{
this->set_receive_timeout(timeout);
}

public:
void set_ssl_ctx(SSL_CTX *ctx)
{
Expand Down
10 changes: 8 additions & 2 deletions src/client/WFRedisSubscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ class WFRedisSubscribeTask : public WFGenericTask
the task is started or in 'extract'. */

/* Timeout of waiting for each message. Very useful. If not set,
the max waiting time will be the global 'response_timeout'*/
the max waiting time will be the global 'response_timeout'. */
void set_watch_timeout(int timeout)
{
this->task->set_watch_timeout(timeout);
}

/* Timeout of receiving a complete message. */
void set_recv_timeout(int timeout)
void set_receive_timeout(int timeout)
{
this->task->set_receive_timeout(timeout);
}
Expand All @@ -135,6 +135,12 @@ class WFRedisSubscribeTask : public WFGenericTask
this->task->set_keep_alive(timeout);
}

/* For compatibility purpose only. */
void set_recv_timeout(int timeout)
{
this->set_receive_timeout(timeout);
}

public:
/* Call 'set_extract' or 'set_callback' only before the task
is started, or in 'extract'. */
Expand Down
16 changes: 8 additions & 8 deletions src/factory/FileTaskImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ class WFFilefsyncTask : public WFFileSyncTask
}
};

class WFFilefdsyncTask : public WFFileSyncTask
class WFFilefdatasyncTask : public WFFileSyncTask
{
public:
WFFilefdsyncTask(int fd, IOService *service, fsync_callback_t&& cb) :
WFFilefdatasyncTask(int fd, IOService *service, fsync_callback_t&& cb) :
WFFileSyncTask(service, std::move(cb))
{
this->args.fd = fd;
Expand All @@ -140,7 +140,7 @@ class WFFilefdsyncTask : public WFFileSyncTask
protected:
virtual int prepare()
{
this->prep_fdsync(this->args.fd);
this->prep_fdatasync(this->args.fd);
return 0;
}
};
Expand Down Expand Up @@ -344,12 +344,12 @@ WFFileSyncTask *WFTaskFactory::create_fsync_task(int fd,
std::move(callback));
}

WFFileSyncTask *WFTaskFactory::create_fdsync_task(int fd,
fsync_callback_t callback)
WFFileSyncTask *WFTaskFactory::create_fdatasync_task(int fd,
fsync_callback_t callback)
{
return new WFFilefdsyncTask(fd,
WFGlobal::get_io_service(),
std::move(callback));
return new WFFilefdatasyncTask(fd,
WFGlobal::get_io_service(),
std::move(callback));
}

/* Factory functions with path name. */
Expand Down
3 changes: 0 additions & 3 deletions src/factory/MySQLTaskImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,6 @@ CommMessageOut *ComplexMySQLTask::message_out()
auth_switch_req->set_password(password_);
auth_switch_req->set_auth_plugin_name(std::move(conn->str));
auth_switch_req->set_seed(conn->seed);
#if OPENSSL_VERSION_NUMBER < 0x10100000L
WFGlobal::get_ssl_client_ctx();
#endif
break;

case ST_SHA256_PUBLIC_KEY_REQUEST:
Expand Down
9 changes: 1 addition & 8 deletions src/factory/WFGraphTask.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,11 @@ SubTask *WFGraphTask::done()

WFGraphTask::~WFGraphTask()
{
SeriesWork *series;
size_t i;

if (this->parallel)
{
for (i = 0; i < this->parallel->size(); i++)
{
series = this->parallel->series_at(i);
for (SeriesWork *series : *this->parallel)
series->unset_last_task();
}

this->parallel->dismiss();
}
}

Loading