数组在多线程中的元素锁

大家好,我有一个数组,然后想在不同的线程中进行修改,因为修改会有一定的概率修改到同一个元素,所以我想在修改时对要修改的元素加锁(不是对整个数组进行加锁)。请问这个在julia中该如何做的?

根据我的理解,所谓锁lock不是针对对象,而是针对read/write操作的,所以不存在什么数组锁,而是只有操作锁。Julia的多线程提供了atomic operation,即当一个线程执行一个操作时该片内存受到保护,必须等到执行完才能到下一个线程。

所以首先你可以阅读一下官方文档下关于atomic operation的介绍,尝试修改,有问题再问。

感谢回复

我是想让一个元素在一定的时间内不能被修改,原子操作好像不能实现这个功能。我今天看了看文档,发现用一个 Threads.Condition 数组可以实现这个功能。

恐怕不是你想要的。。。

能详细说一下吗?

我目前打算类似这样使用

a=zeros(256)
aLock=Vector{Threads.Condition}(undef,256)
for i in 1:256
    aLock[i]=Threads.Condition()
end

Threads.@threads for i in 1:100
    b=rand(UInt8)+1
    lock(aLock[b])
    a[b] = rand()
    unlock(aLock[b])
end

是否有什么问题?

锁的使用与具体场景有关,先说一下大概需要注意的点:

  • 底层 for 循环使用 lock 会带来非常大的性能开销,在使用前需要先确认是否有其他方式避免它,比如说: A[indices] .+= B 这种广播操作即使 indices 有重复元素,它在CPU单线程下也不会引起数据竞争,因此一个自然的方案就是构造一个更大的中间矩阵 B 来存储并行的中间结果(不需要锁),然后在单线程模式下将结果累加回 A.
  • 为了避免锁未被成功释放,在使用的时候尽量使用 lock(lk) do ... end 语句

无锁:

function no_lock_setindex!(X, n)
    R = 1:n*length(X)
    Threads.@threads for i in R
        i = mod(rand(R), length(X))+1
        X[i] += 1
    end
    X
end

X = zeros(100);
n = 10000
@time no_lock_setindex!(X, n); # 0.041815 seconds (74.23 k allocations: 3.985 MiB, 25.26% compilation time)
sum(X) == n*length(X) # false # 发生了数据竞争

单一锁:

function single_lock_setindex!(X, n, lk)
    R = 1:n*length(X)
    Threads.@threads for i in R
        # 由于锁的存在,这里本质上是一个串行计算过程
        lock(lk) do
            i = mod(rand(R), length(X))+1
            X[i] += 1
        end
    end
    X
end

X = zeros(100);
n = 10000;
@time single_lock_setindex!(X, n, ReentrantLock()); # 2.135054 seconds (6.88 M allocations: 149.004 MiB)
sum(X) == n*length(X) # true # 无竞争性写入

单一锁的局限很明显,很多不必要加锁的地方都统一加锁处理了,代码的运行本质上成为串行模式了。如果对于代码结构有更清楚认知的话,那么可以引入多个锁,来避免一些不必要的加锁从而允许一部分并行,例如:

function multi_lock_setindex!(X, n, lock_pool)
    R = 1:n*length(X)
    Threads.@threads for i in R
        i = mod(rand(R), length(X))+1
        lock(lock_pool[i]) do 
            X[i] += 1
        end
    end
    X
end

X = zeros(100);
n = 10000;
lockpool = [ReentrantLock() for _ in 1:length(X)];
@time multi_lock_setindex!(X, n, lockpool); # 0.074552 seconds (5.02 M allocations: 92.273 MiB)
sum(X) == n*length(X) # true # 无竞争性写入

lockpool 如何构造以及怎么使用,需要根据具体的代码来进行设计

4 个赞

非常详细,学到了,感谢。