
# Understanding Parallel Computing in Julia from the Bottom Up

## A Few Basic Concepts

### `Task`

``````julia
julia> methods(Task)
# 1 method for generic function "(::Type)":
[1] Task(f) in Core at boot.jl:377

julia> methodswith(Task)
[2] serialize(s::Serialization.AbstractSerializer, t::Task) in Serialization at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.0/Serialization/src/Serialization.jl:427
[6] schedule(t::Task) in Base at event.jl:95
[7] schedule(t::Task, arg) in Base at event.jl:129
[8] show(io::IO, ::MIME{Symbol("text/plain")}, t::Task) in Base at show.jl:150
[11] yield(t::Task) in Base at event.jl:166
[12] yield(t::Task, x) in Base at event.jl:166
[13] yieldto(t::Task) in Base at event.jl:181
[14] yieldto(t::Task, x) in Base at event.jl:181

julia> fieldnames(Task)
(:parent, :storage, :state, :donenotify, :result, :exception, :backtrace, :logstate, :code)
``````

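`Task` has only one constructor, `Task(f)`. Its single argument `f` must be a function that takes no arguments. If you pass a function that does take arguments, the constructor still succeeds, but a `MethodError` is raised when the task actually runs: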

``````julia
julia> t = Task((x) -> x + 1)

julia> schedule(t)
MethodError: no method matching (::getfield(Main, Symbol("##11#12")))()
Closest candidates are:
  #11(::Any) at REPL[29]:1
``````
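A function that needs arguments can still run as a task if you capture those arguments in a zero-argument closure. The sketch below assumes Julia ≥ 1.3, where `fetch(::Task)` returns a task's result. The names `x` and `result` are only illustrative.

``````julia
x = 41
# Task(f) calls f() with no arguments, so wrap the one-argument
# computation in a closure that captures x instead.
t = Task(() -> x + 1)
schedule(t)          # enqueue t; it runs once the current task yields
result = fetch(t)    # wait for t to finish and return its result (Julia >= 1.3)
println(result)      # prints 42
``````

The `@task expr` macro used below is shorthand for exactly this pattern: it wraps `expr` in a zero-argument closure and passes it to `Task`.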

``````julia
julia> t = @task println("Hi")

julia> schedule(t)
Hi

julia> t
``````
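`@task` only creates a task. Nothing runs until the task is scheduled. The public predicates `istaskstarted` and `istaskdone` show this without reading internal fields such as `state`. Here is a small sketch, again assuming Julia ≥ 1.3 for `fetch`:

``````julia
t = @task sum(1:10)                 # created, but not yet scheduled
started_before = istaskstarted(t)   # false: nothing has run yet
schedule(t)                         # put t on the run queue
wait(t)                             # block the current task until t finishes
done_after = istaskdone(t)          # true
println((started_before, done_after, fetch(t)))   # prints (false, true, 55)
``````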

``````julia
t = @task begin
    inv(rand(2000, 2000))
end

begin
    schedule(t)                          # only appends t to Base.Workqueue
    println(length(Base.Workqueue))
    println(t.state)
    println("begin computing")
    println(sum(inv(randn(1500, 1500))))
    println("end computing")
    println(length(Base.Workqueue))      # the current task never yielded,
    println(t.state)                     # so t is still waiting in the queue
end

# 1
# queued
# begin computing
# 97.12983082590253
# end computing
# 1
# queued
``````
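The experiment above shows that `schedule` only puts the task in the queue. As long as the current task never yields, the queued task does not run, however long the current computation takes.

`Base.Workqueue` is an internal of Julia 1.0 and is not available in the same form in later versions. The sketch below therefore observes the same behavior through execution order alone. `events` is an illustrative name. Run it as a script, because at the REPL the prompt itself may yield between inputs.

``````julia
events = String[]
t = @task push!(events, "task ran")
schedule(t)                       # only enqueues t
push!(events, "after schedule")   # the current task keeps the CPU
yield()                           # now t gets a chance to run
wait(t)                           # make sure t has finished
println(events)                   # prints ["after schedule", "task ran"]
``````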

``````julia
julia> fieldnames(Condition)
(:waitq,)
``````

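`Condition` has only one field, `:waitq`, a `Vector` that records every task waiting on that condition.

- **Waiting.** Inside a task, calling `wait(c::Condition)` declares that the task is waiting for the condition. The task appends itself to the end of `c.waitq`, then takes the first task out of `Base.Workqueue` and switches to it.
- **Waking.** When the condition is met, calling `notify(c::Condition)` moves the waiting tasks back into `Base.Workqueue`, where they wait to be run again.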

``````julia
julia> c = Condition()
Condition(Any[])

julia> t = @task begin
           println("waiting for condition")
           wait(c)
           println("condition met")
       end

julia> schedule(t)
waiting for condition

julia> notify(c)
condition met
1
``````
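The same mechanism works with several waiters. Each `wait(c)` parks its task in `c.waitq`. A single `notify(c)` then reschedules all of them in the order in which they started waiting, and returns the number of tasks it woke. Here is a minimal sketch; `order` and `tasks` are illustrative names, and it should run as a script.

``````julia
c = Condition()
order = Int[]
tasks = [(@task begin
              wait(c)            # park this task on c.waitq and switch away
              push!(order, i)    # runs only after notify(c)
          end) for i in 1:3]
foreach(schedule, tasks)
yield()                          # let each task run until it blocks in wait(c)
n = notify(c)                    # wake all waiters; returns the number woken
foreach(wait, tasks)             # wait until every task has finished
println((n, order))              # prints (3, [1, 2, 3])
``````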

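In Julia 1.0, `yield()` is defined in terms of the same primitives. It appends the current task to the end of `Base.Workqueue` and then calls `wait()`, which switches to the first task in the queue:

``````julia
yield() = (enq_work(current_task()); wait())
``````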
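To check that these two primitives are really all `yield` needs, here is a hypothetical `my_yield` assembled from them. `Base.enq_work` is an unexported internal. Treat this as an illustration that may break across Julia versions, not something to use in real code.

``````julia
# Hypothetical re-implementation of yield() from the internals shown above.
# Base.enq_work is unexported and may change between Julia versions.
my_yield() = (Base.enq_work(current_task()); wait())

ran = Ref(false)
t = @task (ran[] = true)
schedule(t)      # t is now in the run queue, ahead of the current task
my_yield()       # re-enqueue ourselves at the back, then switch to t
println(ran[])   # prints true: t ran before control came back here
``````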

``````julia
julia> t1 = @task begin
           yield()
       end

julia> t2 = @task begin
           yield()
       end

julia> begin
           schedule(t1)
           schedule(t2)
           yield()
       end
``````
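The run queue is first-in, first-out, so the order in which two yielding tasks interleave can be predicted exactly. The sketch below records each step in a vector instead of printing. The names `trace`, `t1`, `t2` and the step labels are illustrative. Run it as a script so that the REPL does not yield between inputs.

``````julia
trace = String[]
t1 = @task begin
    push!(trace, "t1 step 1")
    yield()                    # t1 goes to the back of the run queue
    push!(trace, "t1 step 2")
end
t2 = @task begin
    push!(trace, "t2 step 1")
    yield()
    push!(trace, "t2 step 2")
end
schedule(t1)                   # queue: t1
schedule(t2)                   # queue: t1, t2
yield()                        # queue: t1, t2, main -> t1 starts
wait(t1)
wait(t2)
println(trace)
# prints ["t1 step 1", "t2 step 1", "t1 step 2", "t2 step 2"]
``````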

### `Channel`

A `Channel` is a pipe: some tasks write data into one end while other tasks read it from the other end. The structure of `Channel` is simple:

``````julia
mutable struct Channel{T} <: AbstractChannel{T}
    cond_take::Condition                 # waiting for data to become available
    cond_put::Condition                  # waiting for a writeable slot
    state::Symbol
    excp::Union{Exception, Nothing}      # exception to be thrown when state != :open

    data::Vector{T}
    sz_max::Int                          # maximum size of channel

    # Used when sz_max == 0, i.e., an unbuffered channel.
    waiters::Int
end
``````
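The following sketch is a small producer/consumer pair over a buffered channel. In terms of the Julia 1.0 fields above:

- a writer that finds the buffer full waits on `cond_put`;
- a reader that finds it empty waits on `cond_take`;
- `close` moves `state` from `:open` to `:closed`.

Later Julia versions reorganize the struct, but the `put!`/`take!`/`close` API used here is unchanged. `producer` and `received` are illustrative names.

``````julia
ch = Channel{Int}(2)          # buffered channel, sz_max == 2
producer = @async begin
    for i in 1:5
        put!(ch, i)           # blocks while the buffer is full
    end
    close(ch)                 # no more data; readers stop once the buffer drains
end
received = Int[]
for x in ch                   # repeatedly take!; blocks while the buffer is empty
    push!(received, x)
end
wait(producer)
println(received)             # prints [1, 2, 3, 4, 5]
``````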

## Multiprocessing

### 1. How is an object on a worker represented?

``````julia
mutable struct Future <: AbstractRemoteRef
    where::Int
    whence::Int
    id::Int
    v::Union{Some{Any}, Nothing}
end
``````

``````julia
mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef
    where::Int
    whence::Int
    id::Int
end
``````
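Roughly speaking, the fields mean the following:

- `where` is the id of the process that holds the actual value.
- `whence` is the id of the process that created the reference.
- `(whence, id)` together identify the reference across the cluster.

Both kinds of reference can be created without starting extra workers by targeting the current process (`myid()`, which is 1 on the master). That makes it easy to look at these fields in a sketch like this one; the variable names are illustrative.

``````julia
using Distributed

f  = remotecall(+, myid(), 1, 2)                   # a Future for 1 + 2 on this process
rc = RemoteChannel(() -> Channel{Int}(1), myid())  # a channel that lives on this process

f_fields  = (f.where, f.whence, f.id)    # where the value lives, who created the ref, ref id
rc_fields = (rc.where, rc.whence, rc.id)
println(f_fields)
println(rc_fields)

put!(rc, 10)                             # RemoteChannel supports the usual channel operations
println(take!(rc), " ", fetch(f))        # prints 10 3
``````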

### 2. How do you make a remote call?

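Julia provides a low-level function, `remotecall`, for making remote calls. It returns a `Future` immediately. Calling `fetch` on that `Future` then retrieves the value and stores it in the `Future`'s `v` field. This is where data moves between processes, and that transfer is usually the performance bottleneck of parallel computing. For example: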

``````julia
julia> using Distributed

julia> addprocs(4)
4-element Array{Int64,1}:
 2
 3
 4
 5

julia> m = remotecall(rand, 5, 2, 2)
Future(5, 1, 6, nothing)

julia> fetch(m)
2×2 Array{Float64,2}:
 0.109123  0.304667
 0.454125  0.197551
``````
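Because `remotecall` returns right away, the caller can keep working while the remote side computes, and only pays for the transfer when it calls `fetch`.

The sketch below targets the current process so it runs without `addprocs`. With real workers you would pass a worker id such as the `5` above, and the function would also have to be defined on that worker (for example with `@everywhere`). `slow_square` is a hypothetical helper whose `sleep` only stands in for a slow computation.

``````julia
using Distributed

slow_square(x) = (sleep(0.5); x^2)     # stand-in for an expensive computation

f = remotecall(slow_square, myid(), 7) # returns a Future immediately
println(isready(f))                    # very likely false: the call is still running
value = fetch(f)                       # blocks until the result is available
println(value)                         # prints 49
``````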

### 3. When does garbage collection happen?

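`Distributed` provides a `clear!` function that sets global variables on workers to `nothing`. Unless you introduce global variables, you rarely need to do this by hand.

`fetch` automatically calls `send_del_client`, which tells the owner that the reference is no longer needed so the value can be garbage collected. The manual also points out a catch: a remote reference takes up very little memory on the master, so the master's GC may not collect it right away. Calling `finalize` on the reference runs `send_del_client` immediately and sends the GC message to the worker.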
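Here is a sketch of both tools, again using the current process so that no workers are needed. `cached` is an illustrative global. Passing `mod=@__MODULE__` makes `clear!` act on the module where the script runs; the keyword defaults to `Main`.

``````julia
using Distributed

# finalize: release a remote reference explicitly instead of waiting for the
# master's GC, which may take a long time to notice such a small object.
rc = RemoteChannel(() -> Channel{Int}(1), myid())
put!(rc, 42)
first_value = take!(rc)
finalize(rc)            # run the reference's finalizer now

# clear!: reset a global on the listed processes to `nothing`.
cached = rand(1000)
Distributed.clear!(:cached, [myid()]; mod=@__MODULE__)
println(cached === nothing)   # prints true
``````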

TODO: a distributed parallel computing example