协程虽然是老概念,但直到近些年才被人们广泛接受。论坛里也有不少针对协程的提问,今天我看到了一个比较有意思的问题:请问协程与yield的关系
于是我决定讲一下协程到底是什么,以及各种语言里的 yield
的关系。
协程
是一种非抢占式多任务处理的方式,简单来说就是某个函数可以在中途的某个点暂停执行(并且可以返回结果),然后可以从这个点开始恢复执行。
半协程
也叫不对称协程
,具有调用/被调用的关系,协程在执行途中只能返回到它的调用者。用半协程可以很方便地实现生成器
,即每次需要下一个值的时候就让生成器恢复执行,然后它在中途返回结果,看起来就是在生成一串序列。举个例子,C# 的 IEnumerator
写法:
IEnumerator Routine(int n)
{
for(var i = 0; i < n; i++)
{
yield return i;
}
}
void Test()
{
foreach (var r in Routine(10))
{
Console.Writeline(r);
}
}
Julia的协程 Task
属于对称协程
。对称协程没有调用/被调用的关系,协程在执行途中返回时可以返回到任意协程(这里说返回可能不太合适,更接近切换)。在 Julia 里,一开始你有一个主 Task ,可以创建新的Task,然后切换到它。
底层的协程切换方法是 yieldto
,它完全不考虑调度的问题(yield(t)
会将当前协程加入调度队列)。手动处理对称协程十分容易出错,因此尽量不要手动调度到某个协程:
julia> m=current_task()
Task (runnable) @0x000000011692c490
julia> t=@task begin
println(1)
yieldto(m)
println(2)
yieldto(m)
end
Task (runnable) @0x000000011692ed10
julia> begin
yieldto(t)
yieldto(t)
println(3)
end
1
2
3
Julia 有一个调度器,使用 schedule
可以将 Task 加入调度队列,使用无参数的 yield
可以切换到调度器。每次调度器运行时,会从队列中取出一个状态为 :runnable
的 Task 然后切换到它。
julia> t=@task begin
println(1)
yield()
println(2)
yield()
println(3)
end
Task (runnable) @0x000000011e8a73d0
julia> schedule(t);wait(t)
1
2
3
julia> t
Task (done) @0x000000011e8a73d0
每次 yield
都会把当前的 Task 加入队列,然后把控制权交给调度器,等它选择队列里的一个可执行的Task继续执行(如果没有其他可运行的 Task,那么当前的 Task 可以持续被调度到)。这里使用 wait
来阻塞主 Task 是为了防止 REPL 打印的提示符搞乱 t 的输出。
@async
宏相当于 @task
加上 schedule
。
那么,如何用 Task 模拟半协程来做生成器呢?
最底层的方案还是使用 yieldto
。手动调度协程的时候可以提供一个值,这个值会成为目标协程上一次离开时调用的 yieldto
的返回值:
julia> m=current_task()
Task (runnable) @0x0000000104fd8490
julia> t=@task begin
for i=1:5
yieldto(m,i)
end
schedule(m)
end
Task (runnable) @0x0000000104fdb610
julia> while true
ret=yieldto(t)
if istaskdone(t)
break
else
println(ret)
end
end
1
2
3
4
5
t
中的 yieldto
传递的 i
会成为主 Task 里面的 yieldto
的返回值。由于我们的主 Task 使用了 yieldto
手动切换协程,在 t
结束运行之前必须再次将 m
加入调度队列,否则 m
就没有机会被调度了。
有没有简单点的方法?答案是使用Channel
:
julia> c=Channel{Int}() do ch
for i=1:5
put!(ch,i)
end
end
Channel{Int64}(sz_max:0,sz_curr:1)
julia> take!(c)
1
julia> take!(c)
2
julia> collect(c)
3-element Array{Int64,1}:
3
4
5
简单得不像协程。Channel
的构造函数可以接受函数,使用这个函数创建 Task
,我们在函数里向 Channel
放入数据就可以了。Channel
的默认容量是0,因此 put!
放入元素后就会立刻阻塞(并切换到调度器)。同样,take!
当没有元素时也是阻塞的。在这些函数内部大量使用了 wait
、notify
等同步操作来调度协程,以实现生产者/消费者模型。一般情况下使用协程的时候,建议使用这些封装好的高层设施。
还有一个常见问题是:如何中断 Task?
其实没什么好方法,如果你能通过一些 flag 来让 Task 主动退出,那就这么做;如果做不到,可以让调度器在调度 Task 时抛出异常,从而中断执行:
julia> r=Ref{Task}()
Base.RefValue{Task}(#undef)
julia> c=Channel{Int}(taskref=r) do ch
for i=1:5
put!(ch,i)
end
end
Channel{Int64}(sz_max:0,sz_curr:1)
julia> t=r[]
Task (runnable) @0x0000000104fd9b10
julia> schedule(t,InterruptException();error=true);
julia> t
Task (failed) @0x0000000104fd9b10
InterruptException:
try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./task.jl:611
wait() at ./task.jl:668
wait(::Base.GenericCondition{ReentrantLock}) at ./condition.jl:106
put_unbuffered(::Channel{Int64}, ::Int64) at ./channels.jl:350
put! at ./channels.jl:325 [inlined]
#23 at ./REPL[45]:3 [inlined]
(::Base.var"#634#635"{var"#23#24",Channel{Int64}})() at ./channels.jl:129