1
基本上我必須使用多線程處理一個大型的csv文件,其中包含近100萬條記錄。ExecutorService多線程無法正常工作,但在調試模式下正常工作
我創建了一個類IngestionCallerThread
public class IngestionCallerThread {
public static void main(String[] args) {
try {
int count = 0;
InputStream ios = IngestionCallerThread.class.getClassLoader().getResourceAsStream("aa10.csv");
byte[] buff = new byte[8000];
int bytesRead = 0;
ByteArrayOutputStream bao = new ByteArrayOutputStream();
while ((bytesRead = ios.read(buff)) != -1) {
bao.write(buff, 0, bytesRead);
}
byte[] data = bao.toByteArray();
ByteArrayInputStream bin = new ByteArrayInputStream(data);
BufferedReader fileInputStreamBufferedReader = new BufferedReader(new InputStreamReader(bin));
while ((fileInputStreamBufferedReader.readLine()) != null) {
count++;
}
bin.reset();
int numberOfThreads = 12;
int rowsForEachThread = count/numberOfThreads;
int remRows = count % numberOfThreads;
int startPosition = 0;
System.out.println(count);
ExecutorService es = Executors.newCachedThreadPool();
for (int i = 0; i < numberOfThreads && startPosition < count; i++) {
if (remRows > 0 && i + 1 >= numberOfThreads)
rowsForEachThread = remRows;
IngestionThread ingThread = new IngestionThread(bin, startPosition, rowsForEachThread);
es.execute(ingThread);
startPosition = (startPosition + rowsForEachThread);
}
es.shutdown();
if (es.isTerminated()) {
System.out.println("Completed");
}
// t2.start();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
,我用它來打電話
public class IngestionThread implements Runnable {
InputStream is;
long startPosition;
long length;
public IngestionThread(InputStream targetStream, long position, long length) {
this.is = targetStream;
this.startPosition = position;
this.length = length;
}
@Override
public void run() {
// TODO Auto-generated method stub
int currentPosition = 0;
try {
is.reset();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
BufferedReader fileInputStreamBufferedReader = new BufferedReader(new InputStreamReader(is));
if (startPosition != 0) {
String line;
try {
while (((line = fileInputStreamBufferedReader.readLine())) != null) {
if (currentPosition + 1 == startPosition)
break;
currentPosition++;
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
try {
int execLength = 0;
String line;
while ((line = fileInputStreamBufferedReader.readLine()) != null && execLength < length) {
System.out.println(line);
execLength++;
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
我用的20條小csv文件測試,我已經實現了另一個運行的類。問題是當我調試類幾乎所有的記錄正在打印。但是當我運行這個類時,有時會讀取15條記錄,有時會讀取12條記錄。我不確定是什麼問題。任何幫助將非常感激。提前致謝。
修復了這個問題,它像一個魅力工作..非常感謝.. :) –