异步编程模拟多任务下载

Julia 异步编程模拟多任务下载。

规则:

  • 同时下载数目 ≤ 10
  • 一边下载(入栈)一边处理数据(出栈)
  • 正在下载的数目 + 本地已下载数目 ≤ 15

最近一些任务用到爬虫,一边学习一边把学到的方法分享出来。

参考链接

代码

  1. 模拟下载

    """下载网页"""
    function download(url)
        sleep(rand(1:5))
        println("page_$url downloaded")
        flush(stdout) ## 清除缓存,避免运行过程不打印
        return "page_$url"
    end
    """处理数据"""
    dosth(sth) = "process_$sth"
    
    # 数据和结果
    urls = 1:100  # 链接
    res = String[] # 处理结果
    
  2. 多任务下载及数据处理

     state = Channel(10) # 设置最大下载数目
     pages = Channel(15) # 正在下载 + 已下载的最大数目
     @sync begin
         # 多任务下载
         for url in urls
             @async begin
                 push!(state, url) ## “登记”后下载
                 push!(pages, download(url))
                 take!(state) ## 下载完毕,去掉“登记”
             end
         end
         # 数据处理
         for _ in eachindex(urls)
             page = take!(pages) # 读取下载数据
             println(dosth(page)) # 处理数据
         end
     end
    
  3. 本地容量设置为 Inf,保持下载数为 10

    state = Channel(10)
    pages = Channel(Inf) # 容量设置为 Inf
    urls = 1:50
    @sync for url in urls
        @async begin
            push!(state, url) ## “登记”后下载
            push!(pages, download(url))
            take!(state) ## 下载完毕,去掉“登记”
        end
    end
    
  4. 多线程执行 dosth (如果计算耗时)

    # env JULIA_NUM_THREADS=4 julia
    using .Threads
    for _ in eachindex(urls)
        page = take!(pages) # 读取数据
        println(dosth(page)) # 处理数据
    end
    
  5. 代码说明

    • @async 标记异步任务并丢给任务池
    • @sync 等待代码块的任务执行完毕
    • Channel 生成一个队列(先进先出)
    • @spawn 调用空闲线程来执行当前任务

踩坑点

队列 state 用于限制任务开始和结束,省略将带来问题,比如

pages = Channel(10)
@sync for url in urls
    @async put!(pages, download(url))
end

虽然 pages 限制了下载数目,但 put!(pages, download(url)) 先执行 download(url) 再入栈,download 不会被队列卡住

page = download(url) # 先执行
put!(pages, page) # 后入栈

这时候用 @async 丢入任务池,将同时执行 length(urls)download 任务,一边进行 put! 操作

而队列 state 的限制 download 的执行

# 丢入任务池后
push!(state, true) ## “登记”下载,如果没有空间,进入等待状态
push!(pages, download(url))
take!(state)
1 个赞

state主要是用来做数量限制的吗,我觉得可以直接用原子操作,判断数量是否大于10来实现目的

引入 state 是为了限制 download(url) 的执行,而不是避免 pages 的写入冲突。
假设去掉 state,直接用 async 丢入任务池。如果 download 没结束,push 操作会被延后,而不会被 pages 队列卡住,因此将一次性启动所有 download。