写了一个快速非支配排序算法,应该怎样提升效率

代码如下:

julia> function ≺(v₁::Vector{T}, v₂::Vector{T}) where T
           v₁==v₂ && return 0
           all(v₁.≤v₂) && return -1
           all(v₂.≤v₁) && return  1
           return 0
       end


julia> function nds(sols::Vector{T}) where T
           n = length(sols)

           d_m, i_d = zeros(Int, n), [[] for _ in 1:n]

           for p = 1:n-1, q = p+1:n
               flag = sols[p] ≺ sols[q]
               flag == -1 && (push!(i_d[p], q), d_m[q]+=1)
               flag ==  1 && (push!(i_d[q], p), d_m[p]+=1)
           end

           idx, fronts = 1, [findall(d_m .== 0)]
           while length(fronts[idx]) ≠ 0
               d_m[fronts[idx]] .-= 1

               @simd for i in fronts[idx]
                   length(i_d[i])>0&&(d_m[i_d[i]] .-= 1)
               end

               push!(fronts, findall(d_m .== 0))
               idx += 1
           end

           return fronts[1:end-1]
       end


julia> sols = [rand(Float32, 3) for _ in 1:10000];

julia> @time nds(sols);
 11.914778 seconds (242.02 M allocations: 12.337 GiB, 26.58% gc time)

# 大家平台可能跟我不一样,仅作参考
julia> versioninfo()
Julia Version 1.7.0-DEV.168
Commit dedf290c99 (2020-12-25 20:12 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: Intel(R) Core(TM) i7-9700K CPU @ 3.60GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-11.0.0 (ORCJIT, skylake)

建议尽量不用 push。

1赞

好的,我先尝试在这方面改进一下

我先给你改了一部分

function nds2(sols::Vector{T}) where T
    n = length(sols)

    d_m= zeros(Int, n)
    i_d= falses(n,n) # BitArray 

    for p = 1:n-1, q = p+1:n
        flag = sols[p] ≺ sols[q]
        if flag ==-1
            i_d[q,p]=true 
# 这里写 [q,p] 是为了 qs=findall(i_d[:,i])快一些,
# 要是这里写 [p,q], 就得写 qs=findall(i_d[i,:]) 会更慢。
            d_m[q]+=1 
        end
        if flag ==  1 
            i_d[p,q]=true
            d_m[p]+=1
        end
    end

    idx, fronts = 1, [findall(d_m .== 0)]
    while length(fronts[idx]) ≠ 0
        d_m[fronts[idx]] .-= 1

        @simd for i in fronts[idx]
             if any(i_d[:,i])
                 qs=findall(i_d[:,i])
                 d_m[qs] .-= 1
             end
             end

        push!(fronts, findall(d_m .== 0)) # 这一段 while 还在看怎么改。
        idx += 1
    end

    return fronts[1:end-1]
end
julia> @time nds2(sols);
  8.428517 seconds (187.59 M allocations: 11.403 GiB, 11.65% gc time)
2赞

又改了一个版本,现在不用 push 了,但是提升不理想

function nds3(sols::Vector{T}) where T
    n = length(sols)

    d_m= zeros(Int, n)
    i_d= falses(n,n)

    for p = 1:n-1, q = p+1:n
        flag = sols[p] ≺ sols[q]
        if flag ==-1
            i_d[q,p]=true
            d_m[q]+=1
        end
        if flag ==  1 
            i_d[p,q]=true
            d_m[p]+=1
        end
    end
    
    fronts=zeros(Int,length(d_m), length(d_m))
    idx=1
        tmp=findall(d_m .== 0)
    ll=length(tmp)
    @simd for i in 1:ll
        fronts[i,idx]=tmp[i]
    end
    while any(fronts[:,idx].>0)
        d_m[fronts[1:ll,idx]].-=1
        @simd for i in fronts[1:ll,idx]
            if any(i_d[:,i])
                 qs=findall(i_d[:,i])
                 d_m[qs] .-= 1
             end
        end
        idx += 1
        tmp=findall(d_m .== 0)
        ll=length(tmp)
        @simd for i in 1:length(tmp)
            fronts[i,idx]=tmp[i]
        end
    end

    return fronts[:,1:idx-1]
end
julia> @time nds3(sols);
  7.541670 seconds (187.59 M allocations: 12.155 GiB, 23.35% gc time)
C=nds3(sols);
cols=[count(c.>0) for c in eachcol(C)] 
D=[C[1:cols[i],i] for i in 1:size(C,2)] # 这个 D 和你原来return 的格式一样。
2赞

非常感谢,提升确实很明显,我消化一下!

julia> length(sols)
21903

julia> @time fronts = nds(sols);
 81.125545 seconds (1.17 G allocations: 60.395 GiB, 39.20% gc time)

julia> @time fronts = nds2(sols);
 27.681556 seconds (932.16 M allocations: 56.396 GiB, 20.41% gc time)

julia> @time fronts = nds3(sols);
 26.872932 seconds (932.17 M allocations: 60.177 GiB, 20.04% gc time)

其实我还想把第一个 for 循环改成这样的,改成这样之后速度能到5s多,但是结果不对。不知道我的理解是不是不太对,我感觉这里是顺序无关的啊。

5.185270 seconds (188.32 M allocations: 12.180 GiB, 57.94% gc time)

Threads.@threads for p = 1:n-1
         for q = p+1:n
        flag = sols[p] ≺ sols[q]
        if flag ==-1
            i_d[q,p]=true
            d_m[q]+=1
        end
        if flag ==  1 
            i_d[p,q]=true
            d_m[p]+=1
        end
    end
    end
1赞

我试了一下,结果应该是对的。结果中fronts的每一层元素顺序无所谓。

是对的的话就这样写吧,我还挺喜欢用 @threads 宏的,只要事先设置好环境变量 JULIA_NUM_THREADS 就可以了

julia> versioninfo()
Julia Version 1.5.3
Commit 788b2c77c1* (2020-11-09 13:37 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: AMD Ryzen 7 4800H with Radeon Graphics
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-10.0.1 (ORCJIT, znver2)
Environment:
  JULIA_NUM_THREADS = 16
  JULIA_PKG_SERVER = https://mirrors.bfsu.edu.cn/julia/static

:100: :100: :100: :100: :100: :100:

v₁.≤v₂ 会分配内存空间,另外这里 all 可以 short-circuit 来避免不比较的计算

- all(v₁.≤v₂) # 14.021479 seconds (243.27 M allocations: 12.424 GiB, 27.64% gc time)
+ all(t->(t[1]≤t[2]), zip(v₁, v₂)) # 4.556885 seconds (54.66 M allocations: 1.195 GiB, 12.80% gc time)

@simd for i in fronts[idx]
                   length(i_d[i])>0&&(d_m[i_d[i]] .-= 1)
               end

这部分的 @simd 不一定会生效,如果想要生效的话需要把 if 去掉


可以用 @inbounds 来取消不必要的下标验证

- d_m[fronts[idx]] .-= 1
+ @inbounds d_m[fronts[idx]] .-= 1
3赞

原来问题出在这里。学习了

3赞
function ≺(v₁::Vector{T}, v₂::Vector{T}) where T
           v₁==v₂ && return 0
           all(v₁.≤v₂) && return -1
           all(v₂.≤v₁) && return  1
           return 0
       end

感觉这里实际上把两种运算合在一起了,虽然理解上没差异,但是在实际计算的时候,很多时候可能并不需要两边都计算,而只需要返回 TrueFalse 就行了. 所以如果能够把计算拆分的话,计算量也许能够减半。 另外也许还可以将 == 的判断放在最后来做,因为这个的概率比较低??

这是个判断两个向量之间关系的定义–支配. 对于一个最小化问题来说, 假设v₁v₂是问题的两个解, 如果对于所有的 i 都满足v₁[i] ≤ v₂[i], 并至少存在一个 j 使得v₁[j] < v₂[j], 那么就称v₁ 支配与 v₂, 记作v₁ ≺ v₂, 这里返回 -1, 反过来v₂ ≺ v₁时, 返回 1, 其它情况称它们是相互非支配的, 返回0. 这也是==放到最前面的原因, 要不然会有bug.

比如:
v₁=[0.1, 0.2], v₂=[0.2, 0.2], 那么v₁ ≺ v₂, 返回 -1
v₁=[0.1, 0.2], v₂=[0.2, 0.1], 那么v₁ ⊀ v₂, 并且 v₂ ⊀ v₁, 返回 0
v₁=[0.1, 0.2], v₂=[0.0, 0.0], 那么 v₂ ≺ v₁, 返回 1

1赞

京ICP备17009874号-2