首先我想要实现的功能需求是一开始在channel中放入一个task,然后take!这个task后会根据情况有个分支,消费掉不产生新的task与产生新的task,所以channel中的task是会加入与消费的。
c = Channel(100)
put!(c,0)
count = 0
while isready(c)
i = take!(c)
# 放这里打印只显示一次
@show i
count = count + 1
if count <= 2
@async foreach(i->put!(c,i),1:4)
end
end
@show count
输出打印为如下:
i = 0
count = 1
结果跟我想的不一样,然后我调整了下@show i的顺序就发生了奇妙的变化!
c = Channel(100)
put!(c,0)
count = 0
while isready(c)
i = take!(c)
count = count + 1
if count <= 2
@async foreach(i->put!(c,i),1:4)
end
# 修改到这里打印位置
@show i
end
@show count
这次的打印结果是符合预期的
i = 0
i = 1
i = 2
i = 3
i = 4
i = 1
i = 2
i = 3
i = 4
count = 9
所以为什么不同位置会影响执行呢,是单纯的打印出的问题还是,根本就没执行!
我看isready是判断是否task都完成返回布尔值的,哪里有问题呢
而且我没用for in来弄,是因为如文档中说的一样for in 会阻塞
我也尝试使用过在里面使用bind来绑定任务,结果只成功一次bind
c = Channel(100)
put!(c,0)
count = 0
for i in c
count = count + 1
if count <= 2
task = @async foreach(i->put!(c,i),1:4)
bind(c,task)
end
@show i
end
@show count
打印如下
i = 0
i = 1
i = 2
i = 3
i = 4
count = 5
由于我的task不是事先已知的数量,,尝试bind绑定失败,也无法把函数放入channel(f)中去。
所以有没有大佬知道该怎么让通道能够判断@async 执行方法全部完成后没有task的情况退出,至少目前我写的isready方式存在以上异常执行的问题!
我没看懂你要做什么,你可能对协程有什么误解?
首先,你放入的不是Task
,而是值(Int
)。Channel
是一种带同步功能的队列,类似管道,取出和放入元素都是可等待(阻塞)的。作为消费者,如果你读取的Channel
里没有值,那么你可以等待生产者放入值以后再返回;作为生产者,如果队列满了,你需要等待消费者取出值以后才能继续放入新值。
其次,你放入元素的时候为何要使用@async
?这个宏会创建一个Task
并且使用schedule
将它加入可执行队列,但是这个Task
如果没有被调度,是不会运行的。你的第一种情况下,循环执行到第二次isready(c)
的时候队列是空的,因此它立刻返回false
,你的循环就退出了。第二种情况下,由于你在调度了新建的Task
之后执行了IO操作(show
),此时在默认Task
进行控制台IO的时候你的新Task
被调度运行了,往队列里放入了元素,你的循环就继续运行下去了。举个例子:
julia> a=0
0
julia> begin
@async global a=1
println()
println(a)
end
1
julia> a=0
0
julia> begin
@async global a=1
println(a)
end
0
这样一来你的代码的正确性就建立在Task
被调度的时机上了,是不对的。
最后,bind
的作用是让Task
终止之后自动关闭对应的Channel
,我没看懂你想用它来做什么。for in
迭代的时候会阻塞也是很正常的,因为take!
会阻塞。
function iterate(c::Channel, state=nothing)
try
return (take!(c), nothing)
catch e
if isa(e, InvalidStateException) && e.state == :closed
return nothing
else
rethrow()
end
end
end
简单的Channel
用法可以这样写:
julia> c=Channel()
Channel{Any}(sz_max:0,sz_curr:0)
julia> consumer=@async begin
for i in c
println(i)
end
end
Task (runnable) @0x0000000010593990
julia> for i=1:5
put!(c,i)
end
1
2
3
4
5
julia> istaskdone(consumer)
false
julia> close(c)
julia> istaskdone(consumer)
true
只要你不关闭Channel
,阻塞的等待就不会返回,Task
也不会停止。
3 个赞
超级感谢您的回复,你的清晰解答让我了解了为何会这样,我有一个问题,就是如果我只使用@async对一个函数执行,那么我怎么判断这个函数什么时候执行完并作出响应的语句,特别是dfs_task函数里还有@async修饰的函数,这是一个递归函数,我想的是用队列把所有@async修饰的task加入,然后一个个取出来判断是否都执行完毕,由于由于队列一直在加入,所以可能当我循环判断完毕的时候当前length获取的那些数量的task都执行完毕,但是产生了新的一些task并没判断到,所以还是行不通!
# 这里是部分片段代码
global task_queue = []
task = @async dfs_task(reach,network,property,depth)
push!(task_queue,task)
done_result(task_queue)
function all_task_done(task_queue::Array)
has_all_done = true
len = length(task_queue)
for _ in 1:len
task = pop!(task_queue)
if !istaskdone(task)
push!(task_queue,task)
has_all_done = false
break
end
end
# @show has_all_done
return has_all_done
end
function done_result(task_queue::Array)
while true
if all_task_done(task_queue)
println("执行完毕")
break
end
end
end
跪求大佬,不知道如何才能使用协程来执行递归划分的函数,并且能对其监测执行完毕!
我是新手,刚接触julia,也是看着文档一步步学习,对于并行计算这块的编程经验不足,请大佬指点迷津!
先说好,协程不是并行手段。
根据我的理解,你是想等待一个可能动态添加的Task数组里的所有当前Task完成吗?如果是,你可以这样写:
julia> waitall(tasks)=foreach(wait,tasks)
waitall (generic function with 1 method)
julia> tasks=[@async sleep(10)]
1-element Array{Task,1}:
Task (runnable) @0x0000000034492570
julia> push!(tasks,(@async sleep(x) for x=1:5)...)
6-element Array{Task,1}:
Task (runnable) @0x0000000034492570
Task (runnable) @0x0000000034492850
Task (runnable) @0x0000000034492b30
Task (runnable) @0x0000000034492e10
Task (runnable) @0x00000000344930f0
Task (runnable) @0x00000000344933d0
julia> @time waitall(tasks)
6.900469 seconds (531 allocations: 32.793 KiB)
这个写法貌似可以!
大佬,你说如果一个递归划分的问题,如何对其进行多线程并行化,我查了些资料都是对循环进行并行化的,没什么对递归进行并行,有什么好的建议吗?
Julia的多线程支持还在试验阶段,你可以看一下Threads模块里面的东西。