理解 Julia 中的协程 Task

协程虽然是老概念,但直到近些年才被人们广泛接受。论坛里也有不少针对协程的提问,今天我看到了一个比较有意思的问题:请问协程与yield的关系

于是我决定讲一下协程到底是什么,以及各种语言里的 yield 的关系。


协程是一种非抢占式多任务处理的方式,简单来说就是某个函数可以在中途的某个点暂停执行(并且可以返回结果),然后可以从这个点开始恢复执行。

半协程也叫不对称协程,具有调用/被调用的关系,协程在执行途中只能返回到它的调用者。用半协程可以很方便地实现生成器,即每次需要下一个值的时候就让生成器恢复执行,然后它在中途返回结果,看起来就是在生成一串序列。举个例子,C# 的 IEnumerator 写法:

IEnumerator Routine(int n)
{    
    for(var i = 0; i < n; i++)
    {        
        yield return i;
    }
}

void Test()
{    
    foreach (var r in Routine(10))
    {
        Console.Writeline(r);
    }
}

Julia的协程 Task 属于对称协程。对称协程没有调用/被调用的关系,协程在执行途中返回时可以返回到任意协程(这里说返回可能不太合适,更接近切换)。在 Julia 里,一开始你有一个主 Task ,可以创建新的Task,然后切换到它。

底层的协程切换方法是 yieldto,它完全不考虑调度的问题(yield(t) 会将当前协程加入调度队列)。手动处理对称协程十分容易出错,因此尽量不要手动调度到某个协程

julia> m=current_task()
Task (runnable) @0x000000011692c490

julia> t=@task begin
           println(1)
           yieldto(m)
           println(2)
           yieldto(m)
       end
Task (runnable) @0x000000011692ed10

julia> begin
           yieldto(t)
           yieldto(t)
           println(3)
       end
1
2
3

Julia 有一个调度器,使用 schedule 可以将 Task 加入调度队列,使用无参数的 yield 可以切换到调度器。每次调度器运行时,会从队列中取出一个状态为 :runnable 的 Task 然后切换到它。

julia> t=@task begin
           println(1)
           yield()
           println(2)
           yield()
           println(3)
       end
Task (runnable) @0x000000011e8a73d0

julia> schedule(t);wait(t)
1
2
3

julia> t
Task (done) @0x000000011e8a73d0

每次 yield 都会把当前的 Task 加入队列,然后把控制权交给调度器,等它选择队列里的一个可执行的Task继续执行(如果没有其他可运行的 Task,那么当前的 Task 可以持续被调度到)。这里使用 wait 来阻塞主 Task 是为了防止 REPL 打印的提示符搞乱 t 的输出。

@async 宏相当于 @task 加上 schedule

那么,如何用 Task 模拟半协程来做生成器呢?

最底层的方案还是使用 yieldto。手动调度协程的时候可以提供一个值,这个值会成为目标协程上一次离开时调用的 yieldto 的返回值:

julia> m=current_task()
Task (runnable) @0x0000000104fd8490

julia> t=@task begin
           for i=1:5
               yieldto(m,i)
           end
           schedule(m)
       end
Task (runnable) @0x0000000104fdb610

julia> while true
           ret=yieldto(t)
           if istaskdone(t)
               break
           else
               println(ret)
           end
       end
1
2
3
4
5

t 中的 yieldto 传递的 i 会成为主 Task 里面的 yieldto 的返回值。由于我们的主 Task 使用了 yieldto 手动切换协程,在 t 结束运行之前必须再次将 m 加入调度队列,否则 m 就没有机会被调度了。

有没有简单点的方法?答案是使用Channel

julia> c=Channel{Int}() do ch
           for i=1:5
               put!(ch,i)
           end
       end
Channel{Int64}(sz_max:0,sz_curr:1)

julia> take!(c)
1

julia> take!(c)
2

julia> collect(c)
3-element Array{Int64,1}:
 3
 4
 5

简单得不像协程。Channel 的构造函数可以接受函数,使用这个函数创建 Task,我们在函数里向 Channel 放入数据就可以了。Channel 的默认容量是0,因此 put! 放入元素后就会立刻阻塞(并切换到调度器)。同样,take! 当没有元素时也是阻塞的。在这些函数内部大量使用了 waitnotify 等同步操作来调度协程,以实现生产者/消费者模型。一般情况下使用协程的时候,建议使用这些封装好的高层设施。

还有一个常见问题是:如何中断 Task?

其实没什么好方法,如果你能通过一些 flag 来让 Task 主动退出,那就这么做;如果做不到,可以让调度器在调度 Task 时抛出异常,从而中断执行:

julia> r=Ref{Task}()
Base.RefValue{Task}(#undef)

julia> c=Channel{Int}(taskref=r) do ch
           for i=1:5
               put!(ch,i)
           end
       end
Channel{Int64}(sz_max:0,sz_curr:1)

julia> t=r[]
Task (runnable) @0x0000000104fd9b10

julia> schedule(t,InterruptException();error=true);

julia> t
Task (failed) @0x0000000104fd9b10
InterruptException:
try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./task.jl:611
wait() at ./task.jl:668
wait(::Base.GenericCondition{ReentrantLock}) at ./condition.jl:106
put_unbuffered(::Channel{Int64}, ::Int64) at ./channels.jl:350
put! at ./channels.jl:325 [inlined]
#23 at ./REPL[45]:3 [inlined]
(::Base.var"#634#635"{var"#23#24",Channel{Int64}})() at ./channels.jl:129
6 个赞

这么说的话,Channel与协程的对称非对称有关系吗?
C++20标准中没有颁布Channel的实现,可不可以认为非对称协程不需要Channel

不应该说

非对称协程不需要Channel

因为关系是 Channel 需要对称协程支持,而不是反过来。

1 个赞