swoole Task Worker 篇

swoole_server->task投递一个异步任务到 task_worker 池中。此函数是非阻塞的,执行完毕会立即返回。Worker 进程可以继续处理新的请求。使用Task功能,必须先设置 task_worker_num,并且必须设置 Server 的 onTaskonFinish 事件回调函数。

注意事项:

  • 使用 task 必须为 Server 设置 onTaskonFinish 回调,否则 swoole_server->start 会失败
  • task 操作的次数必须小于 onTask 处理速度,如果投递容量超过处理能力,task 会塞满缓存区,导致 worker 进程发生阻塞。worker 进程将无法接收新的请求
  • 使用 addProcess 添加的用户进程中无法使用 task 投递任务,请使用 sendMessage 接口与 Task 工作进程通信

Task 实例

创建 Server

1
$serv = new swoole_server("127.0.0.1", 9501);

设置task_worker_num

设置task_worker_num是我们使用 task 中不可缺少的一点哦

1
2
3
4
$serv->set([
'worker_num' => 2,
'task_worker_num' => 1,
]);

客户端链接回调

1
2
3
$serv->on('Connect', function ($serv, $fd) {
echo "new client connected.\n";
});

接受客户端数据,使用 task

1
2
3
4
5
6
7
8
9
$serv->on('Receive', function ($serv, $fd, $fromId, $data) {
echo "worker received data: {$data}\n";
// 投递一个任务到task进程中
$serv->task($data);
// 通知客户端server收到数据了
$serv->send($fd, 'This is a message from server.');
// 为了校验task是否是异步的,这里和task进程内都输出内容,看看谁先输出
echo "worker continue run.\n";
});

设置 Server 的 onTaskonFinish 事件回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* $serv swoole_server
* $taskId 投递的任务id,因为task进程是由worker进程发起,所以多worker多task下,该值可能会相同
* $fromId 来自那个worker进程的id
* $data 要投递的任务数据
*/
$serv->on('Task', function ($serv, $taskId, $fromId, $data) {
echo "task start. --- from worker id: {$fromId}.\n";
for ($i=0; $i < 5; $i++) {
sleep(1);
echo "task runing. --- {$i}\n";
}
return "task end";
});
/**
* 只有在task进程中调用了finish方法或者return了结果,才会触发finish
*/
$serv->on('Finish', function ($serv, $taskId, $data) {
echo "finish received data '{$data}'\n";
});

关闭

1
2
3
$serv->on('Close', function ($serv, $fd) {
echo "Client {$fd} close connection\n";
});

执行程序

1
$serv->start();

客户端还是用我们上一篇简单的Server Client中的client.php。
执行查看 client 结果:

1
2
$ php client.php
This is a message from server.

执行查看 server 结果:

1
2
3
4
5
6
7
8
9
10
11
12
$ php index.php
new client connected.
Get Message From Client 1:hello server.
worker continue run.
task start. --- from worker id: 3.
Client 1 close connection
task runing. --- 0
task runing. --- 1
task runing. --- 2
task runing. --- 3
task runing. --- 4
finish received data 'task end'

从上边 server 执行的结果可以看出,task执行已经在 close 之后了,这样就知道什么是异步任务了吧。
总结一下应用场景:

  • 没有耗时任务的情况下,worker直接运行,无需开启task
  • 对于耗时的任务,可以在worker内调用task函数,把异步任务投递给task进程进行处理,task进程的数量取决于task_worker_num的配置
  • task进程内可以选择调用finish方法或者return,来通知worker进程此任务已完成,worker进程会在onFinish回调中对task的执行结果进一步处理。如果worker进程不关心任务的结果,finish就不需要做处理了。
坚持原创技术分享,您的支持将鼓励我继续创作!