Julia 异步编程模拟多任务下载。
规则:
- 同时下载数目 ≤ 10
- 一边下载(入栈)一边处理数据(出栈)
- 正在下载的数目 + 本地已下载数目 ≤ 15
最近一些任务用到爬虫,一边学习一边把学到的方法分享出来。
参考链接
代码
-
模拟下载
"""下载网页""" function download(url) sleep(rand(1:5)) println("page_$url downloaded") flush(stdout) ## 清除缓存,避免运行过程不打印 return "page_$url" end """处理数据""" dosth(sth) = "process_$sth" # 数据和结果 urls = 1:100 # 链接 res = String[] # 处理结果
-
多任务下载及数据处理
state = Channel(10) # 设置最大下载数目 pages = Channel(15) # 正在下载 + 已下载的最大数目 @sync begin # 多任务下载 for url in urls @async begin push!(state, url) ## “登记”后下载 push!(pages, download(url)) take!(state) ## 下载完毕,去掉“登记” end end # 数据处理 for _ in eachindex(urls) page = take!(pages) # 读取下载数据 println(dosth(page)) # 处理数据 end end
-
本地容量设置为 Inf,保持下载数为 10
state = Channel(10) pages = Channel(Inf) # 容量设置为 Inf urls = 1:50 @sync for url in urls @async begin push!(state, url) ## “登记”后下载 push!(pages, download(url)) take!(state) ## 下载完毕,去掉“登记” end end
-
多线程执行
dosth
(如果计算耗时)# env JULIA_NUM_THREADS=4 julia using .Threads for _ in eachindex(urls) page = take!(pages) # 读取数据 println(dosth(page)) # 处理数据 end
-
代码说明
@async
标记异步任务并丢给任务池@sync
等待代码块的任务执行完毕Channel
生成一个队列(先进先出)@spawn
调用空闲线程来执行当前任务
踩坑点
队列 state
用于限制任务开始和结束,省略将带来问题,比如
pages = Channel(10)
@sync for url in urls
@async put!(pages, download(url))
end
虽然 pages
限制了下载数目,但 put!(pages, download(url))
先执行 download(url)
再入栈,download
不会被队列卡住
page = download(url) # 先执行
put!(pages, page) # 后入栈
这时候用 @async
丢入任务池,将同时执行 length(urls)
个 download
任务,一边进行 put!
操作
而队列 state
的限制 download
的执行
# 丢入任务池后
push!(state, true) ## “登记”下载,如果没有空间,进入等待状态
push!(pages, download(url))
take!(state)