在使用过程中假设我如下代码:
function task1()
while Control
...
end
end
t = Threads.@spawn task1()
请问我用什么作为循环控制的条件比较好呢,我现在是使用Atomic{Bool}类型作为判断,不知道这样会不会造成多线程的冲突(没有使用锁)
在使用过程中假设我如下代码:
function task1()
while Control
...
end
end
t = Threads.@spawn task1()
请问我用什么作为循环控制的条件比较好呢,我现在是使用Atomic{Bool}类型作为判断,不知道这样会不会造成多线程的冲突(没有使用锁)
对于Threads.Atomic
,可以用Threads.atomic_add!
这些函数,本身就是线程安全的。
你这里是想要从外部控制任务的中断,还是在内部(像单线程一样)自动跳出循环?
从外部的话,你可以通过比如说丢一个中断信号(异常)进去
julia> function task1()
while true
sleep(1)
print(".")
end
end
task1 (generic function with 1 method)
julia> t = Threads.@spawn task1()
Task (runnable) @0x00007f97ab995430
julia> schedule(t, ErrorException("stop task"), error=true)
Task (runnable) @0x00007f97ab995430
不知道有没有更好的手段,我在 Julia 镜像服务器的脚本代码里用这个作为超时控制
我也做过类似的,双保险,任务也可能在timeout
之内完成
function recvbytes(sock; timeout=0.001, interval=0.001)
res = nothing
ch = Channel{Bool}(1)
t = @async begin
isready(ch) && return
res = readavailable(sock)
end
times = Int(timeout / interval)
while times > 0
sleep(interval)
istaskdone(t) && return res
times -= 1
end
put!(ch, true)
return res
end
可以的话再多补充些背景,这个很依情况而定。
主要我一个任务读取数据,一个任务处理数据,读取数据是通过MATLAB的库读取的,不好直接放源码,没插设备运行会报错,我现在两个就任务都是通过ATOMIC控制,但是我现在调用Threads.atomic_xchg!(Control, false)
关闭处理数据的任务时,读取数据任务也会出错,并且不会正常退出。
试验后发现是Channel的原因,比如
t = Channel{Matrix}(1)
put!(t, ones(5,5))
take!(t)
以上语句能正常运行,但是,我连续两次put!(t, ones(5,5))
在第二次运行时REPL会卡住,我怀疑当我处理任务关闭后,中间缺少take!
过程可能会导致读取任务出错。
是不是我在每次put!(Channel, elem)
时都要先取出来在放入,或者有没有什么判断我能否放入的函数吗?
这个我觉得很好,我就可以直接通过交互界面来关闭某部分功能。请问上述语法可以直接运行而不需要考虑线程安全吗?
如果channel申明阶段大小为1可以直接通过isread来判断是否要put!,但是如果channel定义长度大于1,我们好像得自己唯一个放入元素量来判断能不能继续put!
这个的原因应该是 Channel
的容量为 1
导致的,当这个 channel 塞满之后,后续的 put!
任务会被阻塞直到处理数据的任务那边有一个 take!
取出来腾空 Channel
。可能在代码设计的时候要考虑如何确保数据会一直被处理(比如说超时的任务直接丢弃,之类的…)
一个任务读数据,一个任务处理数据,似乎是一条 pipeline
,那么用通道来传输数据应该是个不错的选择。你看看这种模式符不符合你的需要。
function readdata(chan)
while isopen(chan)
data = somework()
put!(chan, data)
yield
end
end
function process(chan::Channel)
while isopen(chan)
data = take!(chan)
process(data)
yield
end
end
chan = Channel{Any}(Inf)
Threads.@spawn readdata(chan)
Threads.@spawn process(chan)
好的,我现在是使用channel来传输数据,基本能够达到要求,谢谢了