多线程操作进程的全局变量字典出错

以下是我的代码,我建立一个模块qmod,里面有个全局变量eginedata,是多线程共用的。其中的方法run_with_params是打算多线程运行,并把线程内创建的user_data存到全局变量enginedata中,按理说如果存成功了,下面运行pop!(enginedata,params)肯定能正常运行。

# code
module qmod

mutable struct userparam
    strategy_name::String
    params::String
end

global enginedata = Dict{String,userparam}()

function run_with_params(strategy_name::String, params::String, external_data)
    global enginedata
    user_data = userparam(strategy_name, params)
    enginedata[params] = user_data
    pop!(enginedata,params)
    return nothing
end
export run_with_params

end

我写了另外一个模块,用来实现多线程调用。

# code
module st01
qmod_path = "D:/strategy/julia/"
if qmod_path in LOAD_PATH
else
    push!(LOAD_PATH, qmod_path)
end
using qmod

function config_generator(;continues::Dict{String,<:Vector}=Dict{String, Vector{Number}}(), 
                    discrete::Dict{String, <:Vector}=Dict{String, Vector{Number}}())
    allparam = Dict{String, Vector{Number}}()
    for paramname in keys(continues)
        down, up, cen, width = continues[paramname]
        datavector = collect(cen:width:up)
        popfirst!(datavector)
        push!(datavector, collect(cen:-width:down)...)
        allparam[paramname] = copy(datavector)
    end
    for paramname in keys(discrete)
        datavector = discrete[paramname]
        allparam[paramname] = copy(datavector)
    end
    paramname = Tuple(keys(allparam))
    parvect = [allparam[key] for key in paramname]
    pargrid = vec(collect(Iterators.product(parvect...)))
    configset = Vector{Dict{String,Number}}()
    for point in pargrid
        configi = Dict{String,Number}()
        for i in eachindex(paramname)
            configi[paramname[i]] = point[i]
        end
        push!(configset, copy(configi))
    end
    return configset
end


function do_work()
    continues = Dict("delta"=>[1,20,10,1],"short"=>[1,30,10,1],"period"=>[1,60,5,1])
    configset = config_generator(continues=continues)
    strategy_name = "SMA"
    Threads.@threads for params in configset
#    for params in configset    
        long = params["short"] + params["delta"]
        short = params["short"]
        period = params["period"]
        params_str = string(short,"_",long,"_",period)
        external_data = Vector{Int}()
        run_with_params(strategy_name, params_str, external_data)
    end
    println(myid(), " th job has done.")
end
end

然后在Julia中按如下方式运行:


然后对于有的params就会报这个错误:

以上代码如果不用Threads.@threads来多线程运行,那不会报错。现在问题就很奇怪,难道线程内对enginedata的赋值不会及时生效?
如果不会立马生效,那应该无论什么params都会报错才对。现在只是某些进程的某些params报错。

在 Julia 中使用线程时需要注意以下这些特定的限制和警告:

  • 如果多个线程同时使用基本容器类型,且至少有一个线程修改容器时,需要手动加锁(常见示例包括 push! 数组,或将项插入 Dict)。
  • 任务开始在某个线程上运行后(例如通过@spawn),它会在阻塞后始终在同一线程上重新启动。 将来这个限制将被移除,任务会在线程之间迁移。
  • @threads 当前使用静态调度,使用所有线程并为每个线程分配相等的迭代计数。将来,默认时间表可能会更改为动态的。
  • @spawn 使用的时间表是不确定的,不应依赖。
  • 计算绑定、非内存分配任务可以防止垃圾回收在其他正在分配内存的线程中运行。 在这些情况下,可能需要手动调用 GC.safepoint() 以允许 GC 运行。该限制在未来会被移除。
  • 避免并行运行顶层操作,例如,includeeval 评估类型、方法和模块定义。
  • 请注意,如果启用线程,则库注册的终结器可能会中断。 这可能需要在整个生态系统中进行一些过渡工作,然后才能放心地广泛采用线程。 有关更多详细信息,请参阅下一节。

看了上面这段,我好像明白了,对dict的操作需要自己手工加锁。

更完整的实现其实是这样的:

module qmod

mutable struct userparam
    strategy_name::String
    params::String
    timepointer::Vector(Int)
end

global enginedata = Dict{String,userparam}()
global user_operate::Function

function set_callback(on_user::Function)
    global user_operate = on_user
end
export set_callback

function insert_timepointer(params::String, timep::Int)
    global enginedata
    user_data = enginedata[params]
    push!(user_data.timepointer, timep)
end
export insert_timepointer

function run_with_params(strategy_name::String, params::String, external_data)
    global enginedata
    user_data = userparam(strategy_name, params, Vector{Int}())
    enginedata[params] = user_data
    user_operate(params)
    pop!(enginedata,params)
    return nothing
end
export run_with_params

end

这个模块里面的insert_timepointer是我暴露给用户用的函数,这里我并不想把完整的userparam类型创建的user_data暴露给用户,怕用户会修改里面的值。所以我想到的是建立一个字典enginedata, 把params和user_data关联起来。这样我只需在回调函数里面user_operate里面把params传给用户,用户如果需要调用insert_timepointer,只需要按如下方式使用:

module st01
qmod_path = "D:/strategy/julia/"
if qmod_path in LOAD_PATH
else
    push!(LOAD_PATH, qmod_path)
end
using qmod

function on_user(params::String)
    insert_timepointer(params, 300)
end
set_callback(on_user)

function config_generator(;continues::Dict{String,<:Vector}=Dict{String, Vector{Number}}(), 
                    discrete::Dict{String, <:Vector}=Dict{String, Vector{Number}}())
    allparam = Dict{String, Vector{Number}}()
    for paramname in keys(continues)
        down, up, cen, width = continues[paramname]
        datavector = collect(cen:width:up)
        popfirst!(datavector)
        push!(datavector, collect(cen:-width:down)...)
        allparam[paramname] = copy(datavector)
    end
    for paramname in keys(discrete)
        datavector = discrete[paramname]
        allparam[paramname] = copy(datavector)
    end
    paramname = Tuple(keys(allparam))
    parvect = [allparam[key] for key in paramname]
    pargrid = vec(collect(Iterators.product(parvect...)))
    configset = Vector{Dict{String,Number}}()
    for point in pargrid
        configi = Dict{String,Number}()
        for i in eachindex(paramname)
            configi[paramname[i]] = point[i]
        end
        push!(configset, copy(configi))
    end
    return configset
end


function do_work()
    continues = Dict("delta"=>[1,20,10,1],"short"=>[1,30,10,1],"period"=>[1,60,5,1])
    configset = config_generator(continues=continues)
    strategy_name = "SMA"
    Threads.@threads for params in configset
#    for params in configset    
        long = params["short"] + params["delta"]
        short = params["short"]
        period = params["period"]
        params_str = string(short,"_",long,"_",period)
        external_data = Vector{Int}()
        run_with_params(strategy_name, params_str, external_data)
    end
    println(myid(), " th job has done.")
end
end

现在看来,如果是用dict来做这个关联,由于对dict的操作不是线程安全的,就会出现这个问题。不知道有没有别的办法来做这个关联?还是说最好的方式,就是把系统用的局部变量user_data暴露给用户?

    enginedata[params] = user_data
    user_operate(params)
    pop!(enginedata,params)

这里实在没明白这里在干嘛,set了之后,又 pop,感觉啥也没干啊,如果你多线程,这里 pop! 的时候,如果碰巧key相同,会有冲突的。

这个是我精简了的代码,为了更好地反映问题,中间有很多复杂的逻辑被我删掉了。我这个使用方式里面肯定会保证key是唯一的。不会有相同的key的。

明白,但看报错信息,最可能出错的地方就是这里了

原因我是搞明白了,就是在2楼里面贴了一段文档,大概就是说字典不是线程安全的,在添加新键值的时候要加锁。当然我虽然知道了问题,但不知道有没有除了加锁之外更好的解决方式。