Julia 简单爬虫编写

项目介绍

我想试试 do everything in Julia,所以这里我简单写了一个爬虫,来爬取涩图

https://github.com/nesteiner/WebCrawl.jl

安装包

以下的包先安装好

  • HTTP 用于获取 HTTP
  • Gumbo 用于解析 HTTP
  • Cascadia 使用 CSS Selector 解析 HTTP
  • URIs 用于uri 的解析

我们要爬取的 url 在这里 未成年人请在父母陪同下点击

工具函数

Julia 原有的 joinpath 不能满足我们的需求,比如我们要合并这两个 url

合并后的结果应该是 https://example.com/hello/1 ,可 joinpath 合并后的结果是 /hello/1

这里我们使用 URIs 中的 resolvereference 来简单写一个 urljoin

function urljoin(base::AbstractString, ref::AbstractString)
  return string(
    resolvereference(base, ref)
  )
end

整体流程

首先我们定义一个全局的 CONFIG 对象,来设置一些参数,比如

  • 代理地址
  • 请求头
  • Cookie
  • User-Agent

我们再定义解析函数 parse ,由于处理方式与 scrapy 不太一样,没有必要将接口设计成一样

parse(startpage::String, dict::Dict{String, T}) where T <: Any

由于出现分页,我们需要开始递归解析页面,我们从 startpage 开始解析, dict 存储一些额外参数,如

  • 存储的文件夹位置
  • 图片的名称

调用解析函数时,我们还会处理图片下载请求

pipeline(image::String, path::String)

他将从 image 下载资源,存储到 path

详细代码

参考 test/runtests.jl ,我们这样调用接口

@testset "test fetch meitulu" begin
  dict = Dict("startfrom" => 1,
	      "startpath" => "/home/steiner/Downloads/evelyn/[MyGirl美媛馆] 性感嫩模Evelyn艾莉 - 女仆厨娘装制服诱惑系列写真 Vol.157")
  startpage = "https://www.meitu131.com/meinv/5287/index.html"
  try
    WebCrawl.parse(startpage, dict)
  catch error
    print(stderr, error)
  end

end

接下来在包模式下,写下 test 即可,成品就不展示了,怕过不了审
ps: 别忘了更改 startpath

疑问

在下载中,我们写下了如下代码

@sync for image in images
  directory = dict["startpath"]
  if !isdir(directory)
    mkdir(directory)
  end

  path = joinpath(directory, string(dict["startfrom"]) * ".jpg")
  dict["startfrom"] += 1

  url = image.attributes["src"]
  # @async pipeline(url, path)
  @async pipeline(url, path)
end

他的意思是将 for 中的 @async 操作全部同步处理,实际操作下来发现跟去除 @sync, @async

没有多少区别,于是我又设计了这样的代码

tasks = Task[]

for image in images
  task = () -> begin
    directory = dict["startpath"]
    if !isdir(directory)
      mkdir(directory)
    end

    path = joinpath(directory, string(dict["startfrom"]) * ".jpg")
    dict["startfrom"] += 1

    url = image.attributes["src"]
    # @async pipeline(url, path)
    pipeline(url, path)
  end

  push!(tasks, Task(task))
end

for task in tasks
  schedule(task)
end

for task in tasks
  wait(task)
end

其实效率还是一样的,我不知道如何通过异步加速下载过程,如果你了解的话,请务必告诉我

1 个赞

别的有什么好看的

异步的加速会不会和自己主机本身的调度相关,我们异步其实只是放了一个任务进去,但是执行几个这些应该还是要靠系统调度(个人猜想)

更新了异步爬虫的运行流程,大家可以去

看看
也可以运行下试试,试过了,贼快

“贼快贼快”。

我碰到了贼慢的服务器。
单线程情况下,一次请求,大概要60秒才可以返回数据。
8线程,大概要90秒。
12线程,服务器要死机了似乎,反应贼慢。

用的threads。
楼主,像碰到这种服务器很慢的情况:

  • async和threads,哪个更合适?(下载速度是不可能提上去了,就问哪种方法对本地电脑资源消耗少?)
  • 怎么安排async方法?

我觉得这种情况下应该用 async ,这样的话对资源消耗较少
我目前对 async 的理解就是epoll中的事件处理,理解还不是很深,暂时不明白你说的如何安排 async 方法,你可以具体说一说

线程给计算类任务提速,协程给不耗 CPU 类任务加速(比如下载,读取文件等)

举个例子,这里用 downloaddosth 模拟下载和计算类任务。

using .Threads: @spawn
# 设置同时下载数目 ≤10
cache = Channel(10)
# 设置已下载(未处理)数目的最大值
stores = Channel(15)
# 自定义函数
function download(url)
    sleep(rand()*5)
    println("page_$url downloaded")
    flush(stdout) ## 清除缓存,避免运行过程不打印
    return "page_$url"
end
function dosth(data)
    sleep(rand())
    println("process_$data")
    flush(stdout)
end

下边开始工作,使用异步 + 多线程

t = time()
urls = [1:100;]
@sync begin
   # 多任务下载
   for url in urls
       @async begin
            push!(cache, url) ## “登记” 链接
            push!(stores, download(url))
            take!(cache) ## 下载完成且 stores 有空间存数据时,去掉“登记”
        end
   end
   # 数据处理
   @spawn for _ in 1:length(urls)
       data = take!(stores) # 读取下载数据
       dosth(data) # 处理数据
   end
end
print("总耗时\t",time()-t) # 49 s 单线程

几点解释:

  • @async 将循环体放入后台,导致所有任务同时开启
    • 队列的 push!take! 操作能用来卡住任务
    • 队列 cache 限制同时下载的任务
    • 队列 stores 限制已下载数目,可以设 Inf,这样能使同时下载任务保持为 10
  • @spawn 调用空闲线程进行工作(自动分配)
  • @sync 等待代码段中的“后台任务”运行结束
  • 对于上边例子,数据处理的 take! 也会卡后台任务,所以把 @sync 去掉也不影响效果

注意线程数要在运行 Julia 前设置,比如

env JULIA_NUM_THREADS=4 julia
## 或者
# export JULIA_NUM_THREADS=4
# julia

推荐设置为电脑物理线程数

# using Pkg;Pkg.add("Hwloc")
using Hwloc
Hwloc.num_physical_cores()

可以参考之前写的帖子

1 个赞

对异步的皮毛性了解

我不是计算机相关专业,查了点儿资料,皮毛。

异步,理解起来:让各任务在时间顺序上互相独立,谁也不依赖谁。

看了些Julia官方资料,初步理解是:async(或schedule)、多线程、并行、分布式都属于这类技术,它们实现不同层面(线程、进程、多核CPU核心、集群)的“异步”。async 应该是其中最轻量级的。

HTTP响应耗时制约爬取速度

从我个人不多的爬网页经验来看,等待服务器响应的时间应该占最多时间。咱们两个例子中响应时间差异挺大。

  • 你的例子中,服务器响应快(毫秒级、秒级)。async同时发出很多请求,服务器应付得过来。
  • 我的例子中,服务器响应慢(60秒级)。一分钟发送十几个请求,服务器可能就响应不动了(不知道为什么那么弱,体验上就是这样。数据量不大,每次响应不到100kb。)

服务器响应慢,是现实,只能接受。不讨论怎么爬得更快。

我的痛点

痛点在于:让本地电脑爬起来更轻松,也就是更省资源、不卡顿。

这几天运行多线程爬虫。感觉电脑明显变慢,怀疑变慢跟多线程爬虫有关系。因此,想换async试试

至于问的“如何安排async”,具体来说:怎么用async实现8个(大于8,服务器响应不过来)“线路”去爬?这8个之间互相独立。

不管在你的代码中,还是官方文档中,目前没看到怎么样让async能像threads那样指定同时执行的任务数量

谢谢朋友热情讨论。

谢谢朋友,等会儿消化下。

@async 是单个线程上的并行,开多线程本身可能 会占用更多 CPU。单纯爬数据可以只执行这部分

@sync for url in urls
   @async begin
        push!(cache, url)
        push!(stores, download(url))
        take!(cache)
    end
end

比较详细的原理介绍推荐这几个链接

https://juliacn.gitlab.io/JuliaZH.jl/manual/asynchronous-programming.html

https://docs.julialang.org/en/v1/base/parallel/

1 个赞

感谢大拿!

有费曼学习法的味儿。 :wink:

@sync for url in urls
   @async begin
        other_func()  # 这里能放其他函数?
        push!(cache, url)
        push!(stores, download(url))
        take!(cache)
    end
end

@async begin...end代码块内部,除了push!, take!以外,不能放其他代码么?

我试着放了诸如println之类的简单语句,打印会停不住…

演示代码

function download_meta_batchly(vect_fdId::Vector,dir::String)
    total_old = length(vect_fdId)
    vect_fdId=exclude_existed_info(vect_fdId,dir)

    total=length(vect_fdId)
    total_diff=total_old-total
    println("$total_diff files have already been downloaded.")

    seat_num = 2
    seat_list = Channel(seat_num)
    pages = Channel(Inf)

    # 70870/100=709, 70870/500=142, 70870/1000=71
    vect_t = collect(0:6:300) # 50 numbers
    i = 0
    
    for fdId in vect_fdId[1:6]   # 整个向量或列表里面有近7万条,我们只选前6条试验。
        i+=1 
        @async begin
# ——————————————块1:这块内容会先运行完6个循环。意思是:不等下面代码块,它会直接先跑完6遍。
            j=i
            if j>seat_num
                t = rand(10000:20000,1)[1]/3000
            else
                t=vect_t[j]
            end
            sleep(t)
            println("sleeped $t")
            println(fdId)
# ——————————————块2:由于服务器慢、且我们限制了channel容量。这块内容会“慢慢”执行。
            push!(seat_list, fdId)
            push!(pages,download_meta(fdId,dir))
            take!(seat_list)
# ——————————————块3:会被块2堵住
            println("└No. $j/$total, excluding $total_diff previously downloaded files.")
# ——————————————
        end
    end
end

输出结果

请注意(变量的):

  • 对于块1输出结果。sleep时间,跟我设想的不一样。我设想的是前seat_num个(seat_num为channel容量)任务按照0、6、12、18…这样的次序挨个发送请求。之后的,取随机数sleep。而实际运行,直接按随机数sleep。
  • 对于块3输出结果。本意是输出第j个任务的完成。但实际每个任务的j全部为6。

这两点都跟变量j有关。

10425 files have already been downloaded.
sleeped 3.783333333333333
166287a04f53dbfa3c25baa434eac89c
fdId: 166287a04f53dbfa3c25baa434eac89c
└downloading
sleeped 3.849
166287a01aafe7855b8bc25425bba49d
fdId: 166287a01aafe7855b8bc25425bba49d
└downloading
sleeped 4.1386666666666665
166287d117a81eaf7c3c1414fa6badc3
sleeped 4.359
1869602489138a0d40b7217477e891cb
sleeped 6.398666666666666
16628e5f4ca6afb186cd5604665bf5da
sleeped 6.5986666666666665
166283100b991aef4757b5a40279a8ad

└No. 6/70000, excluding 10425 previously downloaded files."
└No. 6/70000, excluding 10425 previously downloaded files."
└No. 6/70000, excluding 10425 previously downloaded files."
└No. 6/70000, excluding 10425 previously downloaded files."
└No. 6/70000, excluding 10425 previously downloaded files."
└No. 6/70000, excluding 10425 previously downloaded files."

理解

在不很熟悉@async情况下,尽量把代码包装成函数、然后用put!和take!对channel操作。否则,可能会有“意想不到”的结果。另外,@async块内变量的表现还须再理解下。

对这方面熟悉的朋友,也请出来聊一聊。

最新探索

欢迎收看最新探索进展 :sweat_smile:

最新理解:把代码块放在push!和take!之间,就可以保证每一趟循环中的代码严格按顺序执行。


function download_meta_batchly(vect_fdId::Vector,dir::String)
    total_old = length(vect_fdId)
    vect_fdId=exclude_existed_info(vect_fdId,dir)

    total=length(vect_fdId)
    total_diff=total_old-total
    println("$total_diff files have already been downloaded.")

    seat_num = 2
    seat_list = Channel(seat_num)
    # pages = Channel(Inf)

    # 70870/100=709, 70870/500=142, 70870/1000=71
    vect_t = collect(0:6:300) # 50 numbers
    j = 0
    
    for fdId in vect_fdId[1:4]
        @async begin
            push!(seat_list, fdId)

            j+=1 
            if j>seat_num
                t = rand(10000:20000,1)[1]/3000
            else
                t=vect_t[j]
            end
            sleep(t)
            println("sleeped $t")
            println(fdId)
            
            download_meta(fdId,dir)
            # push!(pages,download_meta(fdId,dir))
            println("└No. $j/$total, excluding $total_diff previously downloaded files.")

            take!(seat_list)
        end
    end
end
1 个赞

我的建议是,先阅读文档,大致了解原理,这样能更好活用。想象代码运行的过程,从哪走到哪,理解在做什么(并行计算讲义)。

简单说,@async 将任务放后台,@sync 等待后台任务结束(异步编程)。对 Channel 类型数据的 push! 如果当前队列已满,会造成任务阻塞,所以当前任务会进入等待,直到有数据被 take!,反之亦然(Task 文档)。

@sync for url in urls
   @async begin
        # position 1
        push!(cache, url)
        # position 2
        take!(cache)
        # position 3
    end
end

@async 将任务依次放到后台,但显然你不会希望让电脑同时执行所有内容,于是第一行 push!(cache, url) 阻塞任务进行,导致尽管任务都在后台开启,但大部分在这一步就进入等待,而不执行后边内容。所以代码应该放哪很明确了吧。


BTW,在执行异步或多线程的时候,println 打印会被滞后,如果希望即时显示进展,需要执行 flush(stdout),参见 Julia print statement not working in certain cases - Stack Overflow(@show 可能也行)

channel容量约束@async内的同时在执行的任务数,已经初步搞定。

感谢朋友。

朋友们, @nesteiner @RexWzh @YANTAN520

我这边下载网页内容时,经常报错。为了让爬虫程序遇到报错后自动重启,写了个while循环,里面套了个try catch结构。

同样的结构,多线程版的爬虫可以自动重启;而async版的会卡壳,即跑着跑着就不动了,也不报错,也不重启、也不结束。

遇到这种情况么?能给指指方向么?

这种情况我建议转战 scrapy , 那边资料应该多点
ps: 其实这个问题我不会做,你等等其他答案吧 :yum:

谢谢。

目前,有一个先能跑起来的,也不错了。代码够用就好。

异步思想的实现,可能会频繁用到,我也该多了解下。

没贴代码不好说。可能用 async 后没限制数目,导致所有任务并行开始?

@async版下载

function download_meta_batchly2(vect_fdId::Vector,dir::String)
    total_old = length(vect_fdId)
    vect_fdId=exclude_existed_info(vect_fdId,dir)

    total=length(vect_fdId)
    total_diff=total_old-total
    println("$total_diff files have already been downloaded.")

    seat_num = 8
    seat_list = Channel(seat_num)
    # pages = Channel(Inf)

    # 70870/100=709, 70870/500=142, 70870/1000=71
    vect_t = collect(0:6:300) # 50 numbers
    j = 0
    
    for fdId in vect_fdId
        @async begin
            push!(seat_list, fdId)

            j+=1 
            filepath = joinpath(dir,"info_$fdId.html")
            if ~isfile(filepath)
                if j>seat_num
                    t = rand(10000:20000,1)[1]/3000
                else
                    t=vect_t[j]
                end
                sleep(t)
                println("sleeped $t")

                download_meta(fdId,dir)
                # push!(pages,download_meta(fdId,dir))
                println("└No. $j/$total, excluding $total_diff previously downloaded files.")
            else
                println("└No. $j/$total already exists.")
            end

            take!(seat_list)
        end
    end
end

自动重启

function download_meta_for_all_dfId()
    filepath = joinpath(dir_data,"download.csv")
    vect_fdId = CSV.read(filepath, DataFrame)[:,"fdId"]
    dir = joinpath(dir_data,"download","info_all_fdId")

    flag=true
    while flag
        try
            download_meta_batchly2(vect_fdId,dir)  # 调用函数
            flag=false
        catch
            @warn "Downloading failed. Please, check and restart the job!"
            sleep(900)
        end
    end
end

问题
本来是想让爬虫遇错后,自动重启。但实际运行中,会卡壳,就是停在那,不动了,也不死机。

有个疑问
在想上面那个问题时,我产生了一个疑惑。
take!(seat_list)函数取出的是最末位的元素,这可能导致不可知情况。
比如,某个进程中 push!(seat_list, fdId)传入的fdId=1,这个1在seat_list最末位,但是到函数download_meta(fdId,dir)执行完毕时,这时seat_list最末位的元素可能是已经是2或者3或者别的了。
这时,如果把2、3或者其它元素拿出来,显然不是我们想要的效果。我们想拿出的是元素1。