之前写了一个读取超大 JSON 文件的封装,但有些情况下需要逐行读取文件,因此这里提供一个逐行读取超大文件的封装;如需读取大 JSON 文件,请点击传送门。
代码中已经写了详细的注释,这里就不再详细介绍,直接上代码(代码参考自 https://www.cnblogs.com/metoy/p/4470418.html,略有改动):
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Reads a large text file line by line using a fixed pool of worker threads.
 *
 * <p>The file is cut into slices whose ends are moved forward to the next
 * {@code '\n'} or {@code '\r'} byte, so that no line is split between two
 * slices. Each slice is memory-mapped and scanned by one task; every non-empty
 * line is decoded and passed to the {@link IHandle} callback.
 *
 * <p>Notes for callers:
 * <ul>
 * <li>{@link #start()} is asynchronous: it submits the slice tasks and returns.
 * The task that finishes last prints the statistics, closes the file and shuts
 * the thread pool down.</li>
 * <li>The callback is invoked from the pool threads, possibly concurrently, and
 * lines are not delivered in file order.</li>
 * <li>Lines are split on the raw bytes 0x0A and 0x0D before decoding, and empty
 * lines are neither delivered nor counted.</li>
 * <li>An instance is meant to be started once.</li>
 * </ul>
 */
public class BigFileReader {
    /** Largest region a single {@code FileChannel.map} call accepts. */
    private static final long MAX_MAP_SIZE = Integer.MAX_VALUE;

    private final int threadSize;
    private final Charset charset;
    private final int bufferSize;
    private final IHandle handle;
    private final ExecutorService executorService;
    private final long fileLength;
    private final RandomAccessFile rAccessFile;
    private final Set<StartEndPair> startEndPairs;
    /** Number of slice tasks that have not finished yet. */
    private final AtomicLong pendingSlices = new AtomicLong(0);
    /** Number of lines delivered to the callback so far. */
    private final AtomicLong counter = new AtomicLong(0);
    /** Written in start() before any task is submitted, read by the last task. */
    private long startTime;

    /**
     * Validates the configuration, opens the file and creates the thread pool.
     *
     * @param file       file to read
     * @param handle     callback receiving every non-empty line
     * @param charset    charset name used to decode lines, or {@code null} for
     *                   the platform default
     * @param bufferSize size in bytes of each worker's read buffer, must be positive
     * @param threadSize number of worker threads, must be positive
     * @throws IllegalArgumentException if an argument is invalid, the charset is
     *                                  unknown, or the file cannot be opened
     */
    private BigFileReader(File file, IHandle handle, String charset, int bufferSize, int threadSize) {
        if (handle == null) {
            throw new IllegalArgumentException("handle must not be null");
        }
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize);
        }
        if (threadSize <= 0) {
            throw new IllegalArgumentException("threadSize must be positive: " + threadSize);
        }
        // Resolve the charset once so that a bad name fails here instead of
        // failing for every line inside the worker threads.
        this.charset = (charset == null) ? Charset.defaultCharset() : Charset.forName(charset);
        this.fileLength = file.length();
        this.handle = handle;
        this.bufferSize = bufferSize;
        this.threadSize = threadSize;
        RandomAccessFile opened;
        try {
            opened = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException e) {
            // Fail fast rather than leaving a null file behind for a later NPE.
            throw new IllegalArgumentException("cannot open file for reading: " + file, e);
        }
        this.rAccessFile = opened;
        this.executorService = Executors.newFixedThreadPool(threadSize);
        this.startEndPairs = new HashSet<BigFileReader.StartEndPair>();
    }

    /**
     * Starts reading: computes the slices and submits one task per slice.
     *
     * <p>Returns as soon as the tasks are submitted. When the last task ends,
     * the elapsed time and line count are printed and all resources are
     * released. If the slices cannot be computed, the error is printed and the
     * resources are released immediately.
     */
    public void start() {
        // A file shorter than the thread count would give a slice size of 0.
        long everySize = Math.max(1L, this.fileLength / this.threadSize);
        try {
            calculateStartEnd(0, everySize);
        } catch (IOException e) {
            e.printStackTrace();
            shutdown();
            return;
        }
        this.startTime = System.currentTimeMillis();
        if (startEndPairs.isEmpty()) {
            // Empty file: nothing to submit, so finish right away.
            finish();
            return;
        }
        // There can be more slices than threads, so completion is tracked with
        // a counter instead of a barrier that would block the pool threads.
        pendingSlices.set(startEndPairs.size());
        for (StartEndPair pair : startEndPairs) {
            System.out.println("分配分片:" + pair);
            this.executorService.execute(new SliceReaderTask(pair));
        }
    }

    /**
     * Splits the range from {@code start} to the end of the file into slices of
     * roughly {@code size} bytes and stores them in {@code startEndPairs}.
     *
     * <p>Each slice end is moved forward until it sits on a {@code '\n'} or
     * {@code '\r'} byte (or on the last byte of the file).
     *
     * @param start offset of the first byte to slice
     * @param size  minimum slice length in bytes, expected to be at least 1
     * @throws IOException if seeking or reading the file fails
     */
    private void calculateStartEnd(long start, long size) throws IOException {
        final long lastIndex = fileLength - 1;
        long sliceStart = start;
        while (sliceStart <= lastIndex) {
            StartEndPair pair = new StartEndPair();
            pair.start = sliceStart;
            long endPosition = sliceStart + size - 1;
            // Last slice: it simply runs to the end of the file.
            if (endPosition >= lastIndex) {
                pair.end = lastIndex;
                startEndPairs.add(pair);
                return;
            }
            rAccessFile.seek(endPosition);
            // read() advances the file pointer, so no further seek is needed
            // while scanning forward for the end of the current line.
            int current = rAccessFile.read();
            while (current != '\n' && current != '\r') {
                endPosition++;
                if (endPosition >= lastIndex) {
                    endPosition = lastIndex;
                    break;
                }
                current = rAccessFile.read();
            }
            pair.end = endPosition;
            startEndPairs.add(pair);
            sliceStart = endPosition + 1;
        }
    }

    /** Prints the statistics and releases the file and the thread pool. */
    private void finish() {
        System.out.println("use time: " + (System.currentTimeMillis() - startTime));
        System.out.println("all line: " + counter.get());
        shutdown();
    }

    /** Closes the file and shuts the thread pool down. */
    private void shutdown() {
        try {
            this.rAccessFile.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.executorService.shutdown();
    }

    /**
     * Decodes one line and passes it to the callback; empty lines are skipped.
     *
     * @param bytes raw bytes of the line, without the line terminator
     */
    private void handle(byte[] bytes) {
        if (bytes.length == 0) {
            return;
        }
        String line = new String(bytes, charset);
        if (line.length() > 0) {
            this.handle.handle(line);
            counter.incrementAndGet();
        }
    }

    /**
     * Inclusive byte range of one slice of the file.
     */
    private static class StartEndPair {
        /** Offset of the first byte of the slice. */
        public long start;
        /** Offset of the last byte of the slice (inclusive). */
        public long end;

        @Override
        public String toString() {
            return "star=" + start + ";end=" + end;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + (int) (end ^ (end >>> 32));
            result = prime * result + (int) (start ^ (start >>> 32));
            return result;
        }

        /**
         * Two pairs are equal when they describe the same byte range.
         *
         * @param obj object to compare with
         * @return {@code true} if {@code obj} is a pair with the same start and end
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            StartEndPair other = (StartEndPair) obj;
            return end == other.end && start == other.start;
        }
    }

    /**
     * Task that reads one slice and feeds its lines to the callback.
     */
    private class SliceReaderTask implements Runnable {
        private final long start;
        private final long sliceSize;

        public SliceReaderTask(StartEndPair pair) {
            this.start = pair.start;
            this.sliceSize = pair.end - pair.start + 1;
        }

        @Override
        public void run() {
            try {
                readSlice();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Always report completion, even after a failure, so that the
                // final clean-up runs exactly once when the last task ends.
                if (pendingSlices.decrementAndGet() == 0) {
                    finish();
                }
            }
        }

        /**
         * Maps the slice and splits it into lines.
         *
         * @throws IOException if the file region cannot be mapped
         */
        private void readSlice() throws IOException {
            // Allocated here rather than in the constructor so that only the
            // running tasks hold a buffer.
            byte[] readBuff = new byte[bufferSize];
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            long mappedSoFar = 0;
            while (mappedSoFar < sliceSize) {
                // A single mapping is limited in size, so a very large slice is
                // mapped window by window; a line may continue across windows.
                long windowSize = Math.min(sliceSize - mappedSoFar, MAX_MAP_SIZE);
                MappedByteBuffer window = rAccessFile.getChannel()
                        .map(MapMode.READ_ONLY, start + mappedSoFar, windowSize);
                while (window.hasRemaining()) {
                    int readLength = Math.min(window.remaining(), readBuff.length);
                    window.get(readBuff, 0, readLength);
                    for (int i = 0; i < readLength; i++) {
                        byte tmp = readBuff[i];
                        if (tmp == '\n' || tmp == '\r') {
                            handle(bos.toByteArray());
                            bos.reset();
                        } else {
                            bos.write(tmp);
                        }
                    }
                }
                mappedSoFar += windowSize;
            }
            // Last line of the slice when it has no trailing terminator.
            if (bos.size() > 0) {
                handle(bos.toByteArray());
            }
        }
    }

    /**
     * Builder for {@link BigFileReader}.
     *
     * <p>Defaults: one thread, platform default charset, 1 MiB read buffer.
     */
    public static class Builder {
        private int threadSize = 1;
        private String charset = null;
        private int bufferSize = 1024 * 1024;
        private IHandle handle;
        private File file;

        /**
         * @param file   path of the file to read
         * @param handle callback receiving every non-empty line
         * @throws IllegalArgumentException if the file does not exist
         */
        public Builder(String file, IHandle handle) {
            this.file = new File(file);
            if (!this.file.exists()) {
                throw new IllegalArgumentException("文件不存在!");
            }
            this.handle = handle;
        }

        /**
         * Sets the number of worker threads.
         *
         * @param size number of threads, must be positive
         * @return this builder
         */
        public Builder withThreadSize(int size) {
            this.threadSize = size;
            return this;
        }

        /**
         * Misspelled variant of {@link #withThreadSize(int)}, kept so that
         * existing callers keep compiling.
         *
         * @param size number of threads, must be positive
         * @return this builder
         */
        public Builder withTreahdSize(int size) {
            return withThreadSize(size);
        }

        /**
         * Sets the charset used to decode lines.
         *
         * @param charset charset name, or {@code null} for the platform default
         * @return this builder
         */
        public Builder withCharset(String charset) {
            this.charset = charset;
            return this;
        }

        /**
         * Sets the size of each worker's read buffer.
         *
         * @param bufferSize buffer size in bytes, must be positive
         * @return this builder
         */
        public Builder withBufferSize(int bufferSize) {
            this.bufferSize = bufferSize;
            return this;
        }

        /**
         * Creates the reader.
         *
         * @return a reader ready to be started
         * @throws IllegalArgumentException if the configuration is invalid or
         *                                  the file cannot be opened
         */
        public BigFileReader build() {
            return new BigFileReader(this.file, this.handle, this.charset, this.bufferSize, this.threadSize);
        }
    }
}
/**
 * Callback that processes the lines read from a file, one line per call.
 */
public interface IHandle {
    /**
     * Processes a single line.
     *
     * @param line the line to process
     */
    void handle(String line);
}
测试代码:
/**
 * Demo entry point: prints every line of {@code d:/bigFile.txt} to standard
 * output, reading with 10 threads, the GBK charset and a 1 MiB buffer.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    IHandle printer = new IHandle() {
        @Override
        public void handle(String line) {
            System.out.println(line);
        }
    };
    BigFileReader bigFileReader = new BigFileReader.Builder("d:/bigFile.txt", printer)
            .withTreahdSize(10)
            .withCharset("gbk")
            .withBufferSize(1024 * 1024)
            .build();
    // The reader closes the underlying file itself once reading is finished.
    bigFileReader.start();
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/5338.html