关于并行计算的Bug?

我有如下代码,目的是对不同的workers分配任务,并给出进度

# bug.jl

# 一个简单的计算任务
function cal_one_task(size)
    a = rand(size)
    sum(a)
end

# 管理不同worker上的任务,并给出进度
function auto_manger(task)
    # 总共的任务数量
    num_of_task = length(task)
    # 获取远程进程数量
    numof_workers = length(workers())
    result = []
    workersFuture = [Future(1) for i in 1:numof_workers]
    # 记录完成任务ID在列表中的位置
    c = Channel{Int64}(numof_workers)

    progress = 0.0

    # 同时启动numof_workers数量的进程
    for i in 1:numof_workers
        @async begin; put!(workersFuture[i], remotecall_fetch(cal_one_task, workers()[i], task[i])); put!(c, i); end
    end

    # i记录了下次数据在result中存放的位置
    # (i - 1)即为已经记录的数据数量
    # 并且(i - 1 + numof_workers)为已经启动的进程数
    # 当已经启动的进程数量小于num_of_task 时,不断记录已经完成任务的进程的结果,并在完成任务的进程再启动计算任务
    # 当启动num_of_task 数量的进程后,等待剩余正在运行的进程(最后一个)并返回结果
    i = 1
    test_num = 0
    while  true
            
        # 完成任务的进程序号
        j = take!(c)
        # 取出结果
        push!(result, fetch(workersFuture[j]))
        workersFuture[j] = Future(1)
        println("up $i #")


        if i < num_of_task
            @async begin; put!(workersFuture[j], remotecall_fetch(cal_one_task, workers()[j], task[i + numof_workers])); put!(c, j); end;
            # test_num = test_num + 1
            # println("test_num $test_num")
            ## 进度条
            if (i - 1)/num_of_task > 0.001 + progress
                progress = i/num_of_task
                print("\r")
                print(round(progress*100, digits = 1))
                print("%")
            end
            ## 进度条
        else
            break
        end
        i = i + 1
    end

    close(c)
    print("  all done @")
    println(now())
    result
end

接着只要在REPL中运行下述命令就可以启动计算

using Distributed
using Dates
@everywhere include("./bug.jl")
my_tasks = [10^8 for i in 1:10];
auto_manger(my_tasks)

令人匪夷所思的事情是上述程序总是卡在最后一个任务上,一直处于j = take!(c)的等待状态

更令人匪夷所思的事情是,当我去掉下面的#时,即@async后打印一下test_num这个变量,

            # test_num = test_num + 1
            # println("test_num $test_num")

bug好像就没了??

  • 卡在最后 卡住
  • 正常结束 正常运行

在while loop里面这一行:

@async begin
    put!(workersFuture[j], remotecall_fetch(cal_one_task, workers()[j], task[i + numof_workers]))
    put!(c, j)
end

角标i是从1开始的,那么task[i + numof_workers]最后一项是不是越界了?显示backtrace卡在take!那里,是因为在某个worker上出错了,main process在等结果,永远等不到。为了确认这个问题,不妨我们把@async改成@sync,我看到的错误是

ERROR: BoundsError: attempt to access 10-element Vector{Int64} at index [11]

我猜之所以加了println没问题了,只是一个假象,因为你的while loop里面的写法,@async+只要i==num_of_task就在main process上跳出,那个报错的worker其实还在那里,只不过没人理而已。

你觉得呢?

numof_workers大于1的时候,i + numof_workers确实可能大于num_of_task
但如果按我给出的示例,workers()仅含一个元素。numof_workers = 1,在@asunc前有if i < num_of_task,因此i + numof_workers = i + 1 <= num_of_task,这种情况下不应该发生越界,但程序还是在无限等待

根据henry2004y的提示,我先修复了numof_workers大于1时存在的bug,然后我把注意力放在越界问题上,发现了一些有趣的现象:

更改后的代码

# bug.jl

# 一个简单的计算任务
function cal_one_task(size)
    a = rand(size)
    sum(a)
end

# 管理不同worker上的任务,并给出进度
function auto_manger(task)
    # 总共的任务数量
    num_of_task = length(task)
    println(num_of_task)
    # println(task[num_of_task])
    # 获取远程进程数量
    numof_workers = length(workers())
    println(numof_workers)
    result = []
    workersFuture = [Future(1) for i in 1:numof_workers]
    # 记录完成任务ID在列表中的位置
    c = Channel{Int64}(numof_workers)

    progress = 0.0

    # 同时启动numof_workers数量的进程
    for i in 1:numof_workers
        # println("$num_of_task")
        @async begin; put!(workersFuture[i], remotecall_fetch(cal_one_task, workers()[i], task[i])); put!(c, i); end
        # println("first")
    end

    # i记录了下次数据在result中存放的位置
    # (i - 1)即为已经记录的数据数量
    # 并且(i - 1 + numof_workers)为已经启动的进程数
    # 当已经启动的进程数量小于num_of_task时,不断记录已经完成任务的进程的结果,并在完成任务的进程再启动完成任务
    # 当启动num_of_task数量的进程后,等待剩余正在运行的进程并返回结果
    i = 1
    # while (i - 1 + numof_workers) < num_of_task
    test_num = 0
    while  true
            
        # yield()
        # 完成任务的进程序号
        j = take!(c)
        # 取出结果
        push!(result, fetch(workersFuture[j]))
        # println(result[i])
        workersFuture[j] = Future(1)
        println("up $i #")
        # println(task[i + numof_workers].W)
        # println(workers()[j])
        # println((i - 1 + numof_workers) < num_of_task)
        # print(i + numof_workers)
        # print(", j $j \n")

        # 未启动过num_of_task的进程则继续启动
        if (i - 1 + numof_workers) < num_of_task
            @async begin; println(i); put!(workersFuture[j], remotecall_fetch(cal_one_task, workers()[j], task[i + numof_workers])); put!(c, j); end
            test_num = test_num + 1
            # println(i)
            # println("test_num $test_num")
            ## 进度条
            # if (i - 1)/num_of_task > 0.001 + progress
            #     progress = i/num_of_task
            #     print("\r")
            #     print(round(progress*100, digits = 1))
            #     print("%")
            # end
            ## 进度条
        # 未记录num_of_task数量的暑假则继续记录
        elseif i < num_of_task
            nothing
        else
            break
        end
        # println(length(result))
        i = i + 1



    end

    close(c)
    print("  all done @")
    println(now())
    result
end

注意我在第二个@async后添加了print(i),要求启动的task打印i的值

@async begin; println(i); put!(workersFuture[j], remotecall_fetch(cal_one_task, workers()[j], task[i + numof_workers])); put!(c, j); end

运行

仍然只使用一个worker:

using Distributed
using Dates
@everywhere include("./bug.jl")
my_tasks = [10^8 for i in 1:10];
auto_manger(my_tasks)

输出

理应输出

10
1
up 1 #
1
up 2 #
2
···

也就是说,在@async后,i 还没有来得及输入进task中,就立刻被+ 1
目前我也不知道该怎么解决了,
这就是异步操作:racehorse:

我没有细看了,但关于Julia的异步计算,

  • 如果在worker上出错,main process在wait,是不会显示错误信息的;

  • 就源代码的话,Julia中的Channel本身就自带了blocking的,所以从理论上来说,完全不需要自己单独写一个i的循环然后跳出,何况还是类似于两个Channel一起使用。所以可能的话,考虑使用一个Channel,然后循环并加上@async。可以参考一下文档中的例子。

  • 如果Channel是空的,take!的时候还是会继续等,等到有数据为止。

哦,解决的方法就是在本地计算i + numof_workers就好
比如说

the_task = task[i + numof_workers]
the_id = workers()[j]
the_j = j
@async begin; put!(workersFuture[the_j], remotecall_fetch(cal_one_task, the_id, the_task)); put!(c, the_j); end

目前算是可以正常运行,
这里我使用只在if判断语句中存在的临时变量the_taskthe_id不知道能不能防止本地更改参数值,导致远程worker输入参数被更改的问题

备案号:京ICP备17009874号-2