Hello,
I'm currently trying Jet. My purpose is to read lines from a csv file and apply heavy computation on each line.
When I test my pipeline, I launch two nodes with jet-start.sh, then I submit my job with jet-submit.sh.
But I only see logs on one node! I understand that the reading of the file could be done on only one node, but I was expecting the other stages (filter, map) to be distributed.
I've pasted the code below, in case you spot something. I'm disappointed.
Thanks in advance.
public class PocJet {

    /**
     * Entry point submitted via jet-submit.sh: pre-loads a large reference
     * IMap ("bigmap") into the cluster, then runs the CSV pipeline and
     * reports the elapsed wall-clock time.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Client handle provided by the Jet bootstrap (jet-submit.sh).
            JetInstance jetClient = JetBootstrap.getInstance();
            System.out.println("Prepare big map");
            IMapJet<String, List<String[]>> bigMap = jetClient.getMap("bigmap");
            // 50 000 entries x 20 rows x 15 UUID fields (~1M strings in total):
            // reference data looked up by the pipeline's mapUsingIMap stage to
            // simulate a CPU-heavy per-line computation.
            for (int index = 0; index < 50_000; index++) {
                List<String[]> dataList = new ArrayList<>(20);
                for (int line = 0; line < 20; line++) {
                    String[] data = new String[15];
                    dataList.add(data);
                    for (int dataIndex = 0; dataIndex < 15; dataIndex++) {
                        data[dataIndex] = UUID.randomUUID().toString();
                    }
                }
                bigMap.put("INV" + index, dataList);
            }
            System.out.println(" Big map done" );
            long begin = System.currentTimeMillis();
            // Blocks until the batch job completes.
            jetClient.newJob(new PipelineCSV().create()).join();
            long total = System.currentTimeMillis() - begin;
            // Fixed: printf was called with a pre-concatenated string, which
            // defeats the format-string API and emitted no newline.
            System.out.printf("End in %d ms%n", total);
            jetClient.shutdown();
        } finally {
            // Tear down any embedded instances this JVM may own.
            Jet.shutdownAll();
        }
    }
}
public class PipelineCSV implements Serializable {

    // Per-file bookkeeping (header row + running line counter). NOTE(review):
    // the source lambda is serialized and shipped to the members, so each
    // member (and each source processor) ends up with its OWN deserialized
    // copy of this map — it does NOT give a cluster-wide line count.
    private Map<String, FileInfo> fileInfoMap;

    /**
     * Builds the batch pipeline: read CSV lines, skip headers, parse fields,
     * then perform a simulated CPU-heavy lookup against the "bigmap" IMap.
     *
     * @return the assembled {@link Pipeline}, ready to submit as a job
     */
    public Pipeline create() {
        Pipeline pipeline = Pipeline.create();
        // Captured as a LOCAL variable so the lambda below serializes only the
        // map, not the enclosing PipelineCSV instance (capturing the field
        // directly would capture `this` and drag every field along).
        // ConcurrentHashMap because the file source may run with local
        // parallelism > 1 on a member.
        Map<String, FileInfo> infoByFile = new java.util.concurrent.ConcurrentHashMap<>();
        fileInfoMap = infoByFile;
        String inputDir = "/Volumes/Untitled/";
        BatchSource<FileData> source = Sources.filesBuilder(inputDir)
                .charset(UTF_8)
                .glob("position-big.csv")
                // sharedFileSystem(true) tells Jet every member sees the same
                // directory, so each FILE is read by exactly one member. With a
                // single matching file, the whole source — and the filter/map
                // stages fused onto it (no distributed edge in between) — runs
                // on ONE node. That is the likely reason logs appear on only
                // one member.
                .sharedFileSystem(true)
                .build((file, line) -> {
                    // First line seen for a file becomes its header record.
                    FileInfo fileInfo = infoByFile.computeIfAbsent(file,
                            key -> FileInfo.builder()
                                    .lineCount(0L)
                                    .sourceName(key)
                                    .headers(line.split(","))
                                    .build());
                    return FileData.builder()
                            .line(line)
                            .lineNumber(fileInfo.getAndIncrementLineCount())
                            .sourceFileName(file)
                            .build();
                });
        pipeline
                .drawFrom(source)
                .filter(fd -> fd.getLineNumber() > 0) // skip header (line 0), handled in source
                .map(fd -> fd.parseLine())            // split the raw line into fields
                .map(fd -> {
                    fd.getData()[2] = UUID.randomUUID().toString(); // dummy replacement of value
                    return fd;
                })
                .mapUsingIMap("bigmap", (bigMap, fd) -> { // simulate high compute cpu
                    long index = fd.getLineNumber() % 50000;
                    // Unchecked: IMap is untyped here; entries are written as
                    // List<String[]> by PocJet.
                    List<String[]> liste = (List<String[]>) bigMap.get("INV" + index);
                    for (String[] data : liste) {
                        for (int dataIndex = 0; dataIndex < 15; dataIndex++) {
                            if (data[dataIndex].equals(fd.getData()[dataIndex])) {
                                System.out.println("GOTCHA !");
                            }
                        }
                    }
                    System.out.println("Thread.currentThread() = " + Thread.currentThread());
                    System.out.println("fd = " + fd);
                    return fd;
                })
                .drainTo(Sinks.logger());
        return pipeline;
    }
}
/**
 * Per-source-file metadata captured by the CSV source: the header row and a
 * running count of lines emitted for that file.
 */
@Data
@Builder
public class FileInfo implements Serializable {
    // Running number of lines emitted so far for this file.
    private long lineCount;
    // Path/name of the source file this record describes.
    private String sourceName;
    // Column names taken from the file's first line. (Fixed: was the C-style
    // declaration `String headers[]`.)
    private String[] headers;

    /**
     * Returns the current line count, then increments it — line 0 is the
     * header. NOTE(review): a plain {@code long} post-increment is not atomic;
     * safe only while a single thread feeds each file — confirm the source's
     * local parallelism.
     *
     * @return the line number to assign to the line being read
     */
    public long getAndIncrementLineCount() {
        return lineCount++;
    }
}
/**
 * One CSV line flowing through the pipeline: raw text on arrival, parsed
 * fields after {@link #parseLine()}.
 */
@Data
@Builder
public class FileData implements Serializable {
    // 0-based line number within the source file (0 = header).
    private Long lineNumber;
    // File this line was read from.
    private String sourceFileName;
    // Raw CSV text; released (nulled) once parsed to free memory.
    private String line;
    // Parsed fields; null until parseLine() has run. (Fixed: was the C-style
    // declaration `String data[]`.)
    private String[] data;

    /**
     * Splits {@code line} into {@code data} and releases the raw text.
     * Idempotent: a second call is a no-op instead of throwing an NPE on the
     * nulled {@code line} (fixed). Uses a negative split limit so trailing
     * empty CSV fields are preserved rather than silently dropped (fixed).
     * NOTE(review): a plain comma split does not handle quoted fields —
     * confirm the input format never quotes commas.
     *
     * @return this instance, for use in a map stage
     */
    public FileData parseLine() {
        if (line != null) {
            data = line.split(",", -1);
            line = null;
        }
        return this;
    }
}