请问Julia怎么分块读取300G以上的大文件?是按字节分块吗?
在尝试使用多线程的时候,发现
Threads.@threads for i in eachline(s.txt) 不能使用,由于文件巨大,我的目的是想使用多线程按行处理任务,但是不知道如何解决。一个思路是利用单线程按行读取文件,加入进通道,然后再多线程处理,不知道大神们可有更好的方法?
你可以把你具体的需求写明白么。
速度上有要求么?
单线程还是多线程?
读取的目的是什么?处理数据?还是做parsing?
处理数据的话,每一行数据都可以独立处理么?
每一行都可以独立处理的话,可以给一行数据的例子么?
以及你现在做到了什么程度,需要达到什么样的程度?
直接这样问问题就好像在问“编程怎么编一样”,会很难回答的。
一个思路是利用单线程按行读取文件,加入进通道,然后再多线程处理,不知道大神们可有更好的方法?
你这个思路现在做到什么程度了?有代码吗?
暂时没有,遇到了问题
a=Channel()
@async for i in eachline(“1.txt”)
put!(a,i)
end
for s in a
println(s)
end
最后channel 无法关闭
测试数据
1, hello
2, world
3,
4, hello
5, world
6, hello
7, world
8, hello
9, world
10, hello
先来一个单线程版本的,确保你能理解每一行代码在做什么工作:
datapath = "test.txt"
function main(filename; buffer_lines = 3)
open(filename) do io
buffer = IOBuffer()
while !eof(io)
readlines!(buffer, io; nlines=buffer_lines)
process(String(take!(buffer)))
end
end
end
function process(lines::AbstractString)
for line in split(lines, "\n")
isempty(line) || println(line)
end
end
function readlines!(dst::IO, src::IO; nlines=3, keep=true)
for _ in 1:nlines
eof(src) && break
write(dst, readline(src; keep=keep))
end
end
main(datapath) # 按顺序打印每一行
1 个赞
多线程版本只需要将多创建一些buffer就行了:
function main(filename; buffer_lines = 3)
open(filename) do io
- buffer = IOBuffer()
+ buffers = [IOBuffer() for i in 1:Threads.nthreads()]
while !eof(io)
- readlines!(buffer, io; nlines=buffer_lines))
+ # 单线程读取
+ for buffer in buffers
+ readlines!(buffer, io; nlines=buffer_lines))
+ end
- process(String(take!(buffer)))
+ # 多线程处理
+ Threads.@threads for buffer in buffers
+ process(String(take!(buffer)))
+ end
end
end
end
设置4个线程的话,打印出来的结果是这样的(顺序可能会变):
main(datapath; buffer_lines=1)
4, hello
10, hello
7, world
1, hello
8, hello
9, world
5, world
2, world
3,
6, hello
1 个赞
多谢,我好好看看
设置一个原子值(用以记录已读行数或offset)就可以解决乱序的问题了。让多个线程每次都只读取offset之后的固定大小的内容。
还可以为这些固定大小的块都编上序号。这样在后续处理的时候就能按原样拼接好了。
1 个赞