关于协程中使用channel产生的异常执行问题

首先我想要实现的功能需求是一开始在channel中放入一个task,然后take!这个task后会根据情况有个分支,消费掉不产生新的task与产生新的task,所以channel中的task是会加入与消费的。

c = Channel(100)
put!(c,0)
count = 0
while isready(c)
    i = take!(c)
    # 放这里打印只显示一次
    @show i
    count = count + 1
    if count <= 2 
        @async foreach(i->put!(c,i),1:4)
    end
end
@show count

输出打印为如下:
i = 0
count = 1
结果跟我想的不一样,然后我调整了下@show i的顺序就发生了奇妙的变化!

c = Channel(100)
put!(c,0)
count = 0
while isready(c)
    i = take!(c)
    count = count + 1
    if count <= 2 
        @async foreach(i->put!(c,i),1:4)
    end
    # 修改到这里打印位置
    @show i
end
@show count

这次的打印结果是符合预期的
i = 0
i = 1
i = 2
i = 3
i = 4
i = 1
i = 2
i = 3
i = 4
count = 9

所以为什么不同位置会影响执行呢,是单纯的打印出的问题还是,根本就没执行!
我看isready是判断是否task都完成返回布尔值的,哪里有问题呢
而且我没用for in来弄,是因为如文档中说的一样for in 会阻塞
我也尝试使用过在里面使用bind来绑定任务,结果只成功一次bind

c = Channel(100)
put!(c,0)
count = 0
for i in c
    count = count + 1
    if count <= 2 
        task = @async foreach(i->put!(c,i),1:4)
        bind(c,task)
    end
    @show i
end
@show count

打印如下
i = 0
i = 1
i = 2
i = 3
i = 4
count = 5
由于我的task不是事先已知的数量,,尝试bind绑定失败,也无法把函数放入channel(f)中去。
所以有没有大佬知道该怎么让通道能够判断@async 执行方法全部完成后没有task的情况退出,至少目前我写的isready方式存在以上异常执行的问题!

我没看懂你要做什么,你可能对协程有什么误解?

首先,你放入的不是Task,而是值(Int)。Channel是一种带同步功能的队列,类似管道,取出和放入元素都是可等待(阻塞)的。作为消费者,如果你读取的Channel里没有值,那么你可以等待生产者放入值以后再返回;作为生产者,如果队列满了,你需要等待消费者取出值以后才能继续放入新值。

其次,你放入元素的时候为何要使用@async?这个宏会创建一个Task并且使用schedule将它加入可执行队列,但是这个Task如果没有被调度,是不会运行的。你的第一种情况下,循环执行到第二次isready(c)的时候队列是空的,因此它立刻返回false,你的循环就退出了。第二种情况下,由于你在调度了新建的Task之后执行了IO操作(show),此时在默认Task进行控制台IO的时候你的新Task被调度运行了,往队列里放入了元素,你的循环就继续运行下去了。举个例子:

julia> a=0
0

julia> begin
           @async global a=1
           println()
           println(a)
       end

1

julia> a=0
0

julia> begin
           @async global a=1
           println(a)
       end
0

这样一来你的代码的正确性就建立在Task被调度的时机上了,是不对的。

最后,bind的作用是让Task终止之后自动关闭对应的Channel,我没看懂你想用它来做什么。for in迭代的时候会阻塞也是很正常的,因为take!会阻塞。

function iterate(c::Channel, state=nothing)
    try
        return (take!(c), nothing)
    catch e
        if isa(e, InvalidStateException) && e.state == :closed
            return nothing
        else
            rethrow()
        end
    end
end

简单的Channel用法可以这样写:

julia> c=Channel()
Channel{Any}(sz_max:0,sz_curr:0)

julia> consumer=@async begin
           for i in c
               println(i)
           end
       end
Task (runnable) @0x0000000010593990

julia> for i=1:5
           put!(c,i)
       end
1
2
3
4
5

julia> istaskdone(consumer)
false

julia> close(c)

julia> istaskdone(consumer)
true

只要你不关闭Channel,阻塞的等待就不会返回,Task也不会停止。

3 个赞

超级感谢您的回复,你的清晰解答让我了解了为何会这样,我有一个问题,就是如果我只使用@async对一个函数执行,那么我怎么判断这个函数什么时候执行完并作出响应的语句,特别是dfs_task函数里还有@async修饰的函数,这是一个递归函数,我想的是用队列把所有@async修饰的task加入,然后一个个取出来判断是否都执行完毕,由于由于队列一直在加入,所以可能当我循环判断完毕的时候当前length获取的那些数量的task都执行完毕,但是产生了新的一些task并没判断到,所以还是行不通!

# 这里是部分片段代码
global task_queue = []
task = @async dfs_task(reach,network,property,depth)
push!(task_queue,task)
done_result(task_queue)
function all_task_done(task_queue::Array)
    has_all_done = true
    len = length(task_queue)
    for _ in 1:len
        task = pop!(task_queue)
        if !istaskdone(task)
            push!(task_queue,task)
            has_all_done = false
            break
        end
    end
#     @show has_all_done
    return has_all_done
end

function done_result(task_queue::Array)
    while true
        if all_task_done(task_queue) 
            println("执行完毕")
            break
        end
    end
end

跪求大佬,不知道如何才能使用协程来执行递归划分的函数,并且能对其监测执行完毕!
我是新手,刚接触julia,也是看着文档一步步学习,对于并行计算这块的编程经验不足,请大佬指点迷津!

先说好,协程不是并行手段。

根据我的理解,你是想等待一个可能动态添加的Task数组里的所有当前Task完成吗?如果是,你可以这样写:

julia> waitall(tasks)=foreach(wait,tasks)
waitall (generic function with 1 method)

julia> tasks=[@async sleep(10)]
1-element Array{Task,1}:
 Task (runnable) @0x0000000034492570

julia> push!(tasks,(@async sleep(x) for x=1:5)...)
6-element Array{Task,1}:
 Task (runnable) @0x0000000034492570
 Task (runnable) @0x0000000034492850
 Task (runnable) @0x0000000034492b30
 Task (runnable) @0x0000000034492e10
 Task (runnable) @0x00000000344930f0
 Task (runnable) @0x00000000344933d0

julia> @time waitall(tasks)
  6.900469 seconds (531 allocations: 32.793 KiB)

这个写法貌似可以!
大佬,你说如果一个递归划分的问题,如何对其进行多线程并行化,我查了些资料都是对循环进行并行化的,没什么对递归进行并行,有什么好的建议吗?

Julia的多线程支持还在试验阶段,你可以看一下Threads模块里面的东西。