多线程中如何控制任务的中断

在使用过程中假设我如下代码:

function task1()
   while Control
       ...
   end
end
t = Threads.@spawn task1()

请问我用什么作为循环控制的条件比较好呢,我现在是使用Atomic{Bool}类型作为判断,不知道这样会不会造成多线程的冲突(没有使用锁)

对于Threads.Atomic,可以用Threads.atomic_add!这些函数,本身就是线程安全的。

你这里是想要从外部控制任务的中断,还是在内部(像单线程一样)自动跳出循环?

从外部的话,你可以通过比如说丢一个中断信号(异常)进去

julia> function task1()
           while true
               sleep(1)
               print(".")
           end
       end
task1 (generic function with 1 method)

julia> t = Threads.@spawn task1()
Task (runnable) @0x00007f97ab995430


julia> schedule(t, ErrorException("stop task"), error=true)
Task (runnable) @0x00007f97ab995430

不知道有没有更好的手段,我在 Julia 镜像服务器的脚本代码里用这个作为超时控制

我也做过类似的,双保险,任务也可能在timeout之内完成

function recvbytes(sock; timeout=0.001, interval=0.001)
    res = nothing
    ch = Channel{Bool}(1)
    t = @async begin
        isready(ch) && return
        res = readavailable(sock)
    end
    times = Int(timeout / interval)
    while times > 0
        sleep(interval)
        istaskdone(t) && return res
        times -= 1
    end
    put!(ch, true)
    return res
end

可以的话再多补充些背景,这个很依情况而定。

主要我一个任务读取数据,一个任务处理数据,读取数据是通过MATLAB的库读取的,不好直接放源码,没插设备运行会报错,我现在两个就任务都是通过ATOMIC控制,但是我现在调用Threads.atomic_xchg!(Control, false)关闭处理数据的任务时,读取数据任务也会出错,并且不会正常退出。
试验后发现是Channel的原因,比如

t = Channel{Matrix}(1)
put!(t, ones(5,5))
take!(t)

以上语句能正常运行,但是,我连续两次put!(t, ones(5,5))在第二次运行时REPL会卡住,我怀疑当我处理任务关闭后,中间缺少take!过程可能会导致读取任务出错。
是不是我在每次put!(Channel, elem)时都要先取出来在放入,或者有没有什么判断我能否放入的函数吗?

这个我觉得很好,我就可以直接通过交互界面来关闭某部分功能。请问上述语法可以直接运行而不需要考虑线程安全吗?

如果channel申明阶段大小为1可以直接通过isread来判断是否要put!,但是如果channel定义长度大于1,我们好像得自己唯一个放入元素量来判断能不能继续put!

这个的原因应该是 Channel 的容量为 1 导致的,当这个 channel 塞满之后,后续的 put! 任务会被阻塞直到处理数据的任务那边有一个 take! 取出来腾空 Channel。可能在代码设计的时候要考虑如何确保数据会一直被处理(比如说超时的任务直接丢弃,之类的…)

一个任务读数据,一个任务处理数据,似乎是一条 pipeline,那么用通道来传输数据应该是个不错的选择。你看看这种模式符不符合你的需要。

function readdata(chan)
    while isopen(chan)
        data = somework()
        put!(chan, data)
        yield
    end
end

function process(chan::Channel)
    while isopen(chan)
        data = take!(chan)
        process(data)
        yield
    end
end

chan = Channel{Any}(Inf)
Threads.@spawn readdata(chan)
Threads.@spawn process(chan)
1 个赞

好的,我现在是使用channel来传输数据,基本能够达到要求,谢谢了