我有如下代码,目的是对不同的workers分配任务,并给出进度
# bug.jl
# 一个简单的计算任务
function cal_one_task(size)
a = rand(size)
sum(a)
end
# 管理不同worker上的任务,并给出进度
function auto_manger(task)
# 总共的任务数量
num_of_task = length(task)
# 获取远程进程数量
numof_workers = length(workers())
result = []
workersFuture = [Future(1) for i in 1:numof_workers]
# 记录完成任务ID在列表中的位置
c = Channel{Int64}(numof_workers)
progress = 0.0
# 同时启动numof_workers数量的进程
for i in 1:numof_workers
@async begin; put!(workersFuture[i], remotecall_fetch(cal_one_task, workers()[i], task[i])); put!(c, i); end
end
# i记录了下次数据在result中存放的位置
# (i - 1)即为已经记录的数据数量
# 并且(i - 1 + numof_workers)为已经启动的进程数
# 当已经启动的进程数量小于num_of_task 时,不断记录已经完成任务的进程的结果,并在完成任务的进程再启动计算任务
# 当启动num_of_task 数量的进程后,等待剩余正在运行的进程(最后一个)并返回结果
i = 1
test_num = 0
while true
# 完成任务的进程序号
j = take!(c)
# 取出结果
push!(result, fetch(workersFuture[j]))
workersFuture[j] = Future(1)
println("up $i #")
if i < num_of_task
@async begin; put!(workersFuture[j], remotecall_fetch(cal_one_task, workers()[j], task[i + numof_workers])); put!(c, j); end;
# test_num = test_num + 1
# println("test_num $test_num")
## 进度条
if (i - 1)/num_of_task > 0.001 + progress
progress = i/num_of_task
print("\r")
print(round(progress*100, digits = 1))
print("%")
end
## 进度条
else
break
end
i = i + 1
end
close(c)
print(" all done @")
println(now())
result
end
接着只要在REPL中运行下述命令就可以启动计算
using Distributed
using Dates
@everywhere include("./bug.jl")
my_tasks = [10^8 for i in 1:10];
auto_manger(my_tasks)
令人匪夷所思的事情是上述程序总是卡在最后一个任务上,一直处于j = take!(c)
的等待状态
更令人匪夷所思的事情是,当我去掉下面的#
时,即@async
后打印一下test_num
这个变量,
# test_num = test_num + 1
# println("test_num $test_num")
bug好像就没了??
- 卡在最后

- 正常结束

在while loop里面这一行:
@async begin
put!(workersFuture[j], remotecall_fetch(cal_one_task, workers()[j], task[i + numof_workers]))
put!(c, j)
end
角标i
是从1开始的,那么task[i + numof_workers]
最后一项是不是越界了?显示backtrace卡在take!
那里,是因为在某个worker上出错了,main process在等结果,永远等不到。为了确认这个问题,不妨我们把@async
改成@sync
,我看到的错误是
ERROR: BoundsError: attempt to access 10-element Vector{Int64} at index [11]
我猜之所以加了println
没问题了,只是一个假象,因为你的while loop里面的写法,@async
+只要i==num_of_task
就在main process上跳出,那个报错的worker其实还在那里,只不过没人理而已。
你觉得呢?
在numof_workers
大于1的时候,i + numof_workers
确实可能大于num_of_task
但如果按我给出的示例,workers()
仅含一个元素。numof_workers = 1
,在@asunc
前有if i < num_of_task
,因此i + numof_workers = i + 1 <= num_of_task
,这种情况下不应该发生越界,但程序还是在无限等待
根据henry2004y的提示,我先修复了numof_workers大于1
时存在的bug,然后我把注意力放在越界问题上,发现了一些有趣的现象:
更改后的代码
# bug.jl
# 一个简单的计算任务
function cal_one_task(size)
a = rand(size)
sum(a)
end
# 管理不同worker上的任务,并给出进度
function auto_manger(task)
# 总共的任务数量
num_of_task = length(task)
println(num_of_task)
# println(task[num_of_task])
# 获取远程进程数量
numof_workers = length(workers())
println(numof_workers)
result = []
workersFuture = [Future(1) for i in 1:numof_workers]
# 记录完成任务ID在列表中的位置
c = Channel{Int64}(numof_workers)
progress = 0.0
# 同时启动numof_workers数量的进程
for i in 1:numof_workers
# println("$num_of_task")
@async begin; put!(workersFuture[i], remotecall_fetch(cal_one_task, workers()[i], task[i])); put!(c, i); end
# println("first")
end
# i记录了下次数据在result中存放的位置
# (i - 1)即为已经记录的数据数量
# 并且(i - 1 + numof_workers)为已经启动的进程数
# 当已经启动的进程数量小于num_of_task时,不断记录已经完成任务的进程的结果,并在完成任务的进程再启动完成任务
# 当启动num_of_task数量的进程后,等待剩余正在运行的进程并返回结果
i = 1
# while (i - 1 + numof_workers) < num_of_task
test_num = 0
while true
# yield()
# 完成任务的进程序号
j = take!(c)
# 取出结果
push!(result, fetch(workersFuture[j]))
# println(result[i])
workersFuture[j] = Future(1)
println("up $i #")
# println(task[i + numof_workers].W)
# println(workers()[j])
# println((i - 1 + numof_workers) < num_of_task)
# print(i + numof_workers)
# print(", j $j \n")
# 未启动过num_of_task的进程则继续启动
if (i - 1 + numof_workers) < num_of_task
@async begin; println(i); put!(workersFuture[j], remotecall_fetch(cal_one_task, workers()[j], task[i + numof_workers])); put!(c, j); end
test_num = test_num + 1
# println(i)
# println("test_num $test_num")
## 进度条
# if (i - 1)/num_of_task > 0.001 + progress
# progress = i/num_of_task
# print("\r")
# print(round(progress*100, digits = 1))
# print("%")
# end
## 进度条
# 未记录num_of_task数量的暑假则继续记录
elseif i < num_of_task
nothing
else
break
end
# println(length(result))
i = i + 1
end
close(c)
print(" all done @")
println(now())
result
end
注意我在第二个@async
后添加了print(i)
,要求启动的task打印i
的值
@async begin; println(i); put!(workersFuture[j], remotecall_fetch(cal_one_task, workers()[j], task[i + numof_workers])); put!(c, j); end
运行
仍然只使用一个worker:
using Distributed
using Dates
@everywhere include("./bug.jl")
my_tasks = [10^8 for i in 1:10];
auto_manger(my_tasks)
输出
理应输出
10
1
up 1 #
1
up 2 #
2
···
也就是说,在@async
后,i 还没有来得及输入进task中,就立刻被+ 1
了
目前我也不知道该怎么解决了,
这就是异步操作
哦,解决的方法就是在本地计算i + numof_workers
就好
比如说
the_task = task[i + numof_workers]
the_id = workers()[j]
the_j = j
@async begin; put!(workersFuture[the_j], remotecall_fetch(cal_one_task, the_id, the_task)); put!(c, the_j); end
目前算是可以正常运行,
这里我使用只在if
判断语句中存在的临时变量the_task
和the_id
不知道能不能防止本地更改参数值,导致远程worker输入参数被更改的问题