Job Running only on one node ???

yann.b...@gmail.com

Nov 28, 2018, 03:09:40
To: hazelcast-jet
Hello,

I'm currently trying Jet. My goal is to read lines from a CSV file and apply a heavy computation to each line.

To test my pipeline, I launch two nodes with jet-start.sh, then I submit my job with jet-submit.sh.

But I only see logs on one node! I understand that reading the file may be done on only one node, but I was expecting the other stages (filter, map) to be distributed.

I paste the code below, in case you spot something I missed.

Thanks in advance.

public class PocJet {

    public static void main(String[] args) {

        int nbInstances = 2;
        // alternative: create the instances locally instead of via jet-start.sh
        //List<JetInstance> jetInstances = IntStream.range(0, nbInstances).mapToObj(n -> Jet.newJetInstance()).collect(Collectors.toList());

        try {
            // create the Jet client
            JetInstance jetClient = JetBootstrap.getInstance();

            System.out.println("Prepare big map");

            IMapJet<String, List<String[]>> bigMap = jetClient.getMap("bigmap");

            // 1,000,000 lines in total, 20 lines per code -> 50,000 codes.
            // The purpose is to simulate a high-CPU process: for one entry,
            // test 20 lines; for each line, test 15 fields.
            for (int index = 0; index < 50000; index++) {
                List<String[]> dataList = new ArrayList<>(20);
                for (int line = 0; line < 20; line++) {
                    String[] data = new String[15];
                    dataList.add(data);
                    for (int dataIndex = 0; dataIndex < 15; dataIndex++) {
                        data[dataIndex] = UUID.randomUUID().toString();
                    }
                }
                bigMap.put("INV" + index, dataList);
            }

            System.out.println("Big map done");

            long begin = System.currentTimeMillis();
            jetClient.newJob(new PipelineCSV().create()).join();
            long total = System.currentTimeMillis() - begin;

            System.out.println("End in " + total + " ms");

            jetClient.shutdown();
        } finally {
            Jet.shutdownAll();
        }
    }
}

public class PipelineCSV implements Serializable {

    private Map<String, FileInfo> fileInfoMap;

    public Pipeline create() {

        Pipeline pipeline = Pipeline.create();

        fileInfoMap = new HashMap<>();

        //String inputDir = "/Users/yblazart/Dev/projects/bycode/pochzcjet/data";
        String inputDir = "/Volumes/Untitled/";
        BatchSource<FileData> source = Sources.filesBuilder(inputDir)
                .charset(UTF_8)
                .glob("position-big.csv")
                .sharedFileSystem(true)
                .build((file, line) -> {
                    // compute the header from the first line of each file
                    FileInfo fileInfo = fileInfoMap.computeIfAbsent(file,
                            key -> FileInfo.builder().lineCount(0L).sourceName(key).headers(line.split(",")).build());
                    return FileData.builder().line(line).lineNumber(fileInfo.getAndIncrementLineCount()).sourceFileName(file).build();
                });

        pipeline
                .drawFrom(source)
                .filter(fd -> fd.getLineNumber() > 0) // skip the header, managed in the source
                .map(fd -> fd.parseLine())            // parse on multiple threads
                .map(fd -> {
                    fd.getData()[2] = UUID.randomUUID().toString(); // dummy replacement of a value
                    return fd;
                })
                .mapUsingIMap("bigmap", (bigMap, fd) -> { // simulate a high-CPU computation
                    long index = fd.getLineNumber() % 50000;
                    List<String[]> liste = (List<String[]>) bigMap.get("INV" + index);
                    for (String[] data : liste) {
                        for (int dataIndex = 0; dataIndex < 15; dataIndex++) {
                            if (data[dataIndex].equals(fd.getData()[dataIndex])) {
                                System.out.println("GOTCHA !");
                            }
                        }
                    }
                    //if (fd.getLineNumber() % 1_000_000 == 0) {
                    System.out.println("Thread.currentThread() = " + Thread.currentThread());
                    System.out.println("fd = " + fd);
                    //}
                    return fd;
                })
                .drainTo(Sinks.logger());
        return pipeline;
    }
}
 
@Data
@Builder
public class FileInfo implements Serializable {

    private long lineCount;
    private String sourceName;
    private String[] headers;

    public long getAndIncrementLineCount() {
        return lineCount++;
    }
}

@Data
@Builder
public class FileData implements Serializable {

    private Long lineNumber;
    private String sourceFileName;
    private String line;
    private String[] data;

    public FileData parseLine() {
        data = line.split(",");
        line = null; // free the raw line once parsed
        return this;
    }
}
 

Can Gencer

Nov 28, 2018, 03:22:48
To: yann.b...@gmail.com, hazelc...@googlegroups.com
Hi Yann,

Currently a single file is read by only one node, and mapping/filtering is a local operation, so it stays on the same node. If you had multiple files as input, this wouldn't be an issue. Alternatively, you can partition the data using .groupingKey().mapUsingIMap(); this will also ensure that your map lookups are local.
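For intuition, here is a tiny, self-contained sketch (plain Java, no Jet APIs) of the invariant behind `.groupingKey()`: records with the same key are always routed to the member that owns that key's partition, which is also where the `bigmap` entry for that key lives, so the `mapUsingIMap()` lookup is local. The 271-partition count matches Hazelcast's default, but the hash-based routing below is a simplification, not the real partitioning scheme.

```java
public class GroupingKeySketch {
    // Simplified stand-in for Hazelcast's partition routing:
    // key -> partition -> owning member. Real Hazelcast uses 271
    // partitions and a different hash, but the invariant is the same.
    static int ownerOf(String key, int memberCount) {
        int partition = Math.abs(key.hashCode() % 271);
        return partition % memberCount;
    }

    public static void main(String[] args) {
        int members = 2;
        // Two lines that map to the same "INV" key
        // (lineNumber % 50000 is identical for both)...
        long lineA = 7, lineB = 7 + 50000;
        String keyA = "INV" + (lineA % 50000);
        String keyB = "INV" + (lineB % 50000);
        // ...are always routed to the same member, where the
        // "bigmap" entry for that key also lives.
        System.out.println(ownerOf(keyA, members) == ownerOf(keyB, members)); // true
    }
}
```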

yann.b...@gmail.com

Nov 28, 2018, 04:27:32
To: hazelcast-jet
Thanks for your quick response!

As you can see, I simulate a heavy CPU-consuming process. I need this process (my mapUsingIMap) to be distributed across the cluster (in the end I want one file per node).

How can I do that?

I was thinking of reading the file, drainTo an IMap, then having another pipeline drawFrom the IMap; that should distribute, shouldn't it? But in that case, what about backpressure? With a long CPU-bound process, I fear that lines will be inserted into the IMap faster than they are consumed, with a risk of OOM.

Am I right?

Thanks in advance.
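On the OOM worry: a pipeline applies backpressure from sink to source, whereas an intermediate IMap decouples producer from consumer and so has no such coupling. The classic way to couple their speeds is a bounded buffer; a minimal plain-Java illustration (queue capacity and line counts are arbitrary, not anything Jet uses internally):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureSketch {
    public static void main(String[] args) throws InterruptedException {
        // A bounded buffer: the producer blocks once 100 lines are in
        // flight, so a slow consumer can never cause unbounded growth.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                try {
                    queue.put("line-" + i); // blocks while the queue is full
                } catch (InterruptedException e) {
                    return;
                }
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                try {
                    queue.take(); // stands in for slow, CPU-heavy processing
                } catch (InterruptedException e) {
                    return;
                }
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        System.out.println("done, at most 100 lines were ever buffered");
    }
}
```

An unbounded IMap between two independent jobs behaves like this queue with no capacity limit, which is exactly the OOM scenario described above.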

Can Gencer

Nov 28, 2018, 04:32:27
To: yann Blazart, hazelc...@googlegroups.com
Hi Yann,

As I mentioned earlier, you can use .groupingKey().mapUsingIMap() to partition the data and distribute it across the network. The groupingKey() operation partitions the data by the supplied key, and each record is sent to the node that owns that key. You need to make sure the key function supplied to groupingKey() matches the key of the IMap entry you want to read. In your case the keyFn needs to be something like (fd -> "INV" + (fd.getLineNumber() % 50000)), since that's the key you use in the IMap.

Another option is to split your input into multiple files. If you have one file per node as input then it will be distributed automatically.
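If you go the multiple-files route, widening the glob passed to filesBuilder is enough for the source to pick up all the parts; the matching itself can be sanity-checked locally with the JDK's glob support (the part-file names below are hypothetical):

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobSketch {
    public static void main(String[] args) {
        // A wildcard glob such as "position-*.csv" would match every
        // split of the input, letting filesBuilder spread the reads.
        PathMatcher matcher =
                FileSystems.getDefault().getPathMatcher("glob:position-*.csv");

        System.out.println(matcher.matches(Paths.get("position-part1.csv"))); // true
        System.out.println(matcher.matches(Paths.get("position-part2.csv"))); // true
        System.out.println(matcher.matches(Paths.get("other.csv")));          // false
    }
}
```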

--
You received this message because you are subscribed to the Google Groups "hazelcast-jet" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast-je...@googlegroups.com.
To post to this group, send email to hazelc...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/34361299-5b06-4f97-a4f3-e54dafd0813e%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

yann.b...@gmail.com

Nov 28, 2018, 16:25:30
To: hazelcast-jet
Thanks a lot!! It worked:

.groupingKey(fd -> "INV" + (fd.getLineNumber() % 50000))
.mapUsingIMap("bigmap", (fd, lst) -> { // simulate a high-CPU computation

:)

But now I'm trying to find out why it is much slower than a plain single-threaded version.

For a 2 GB file, it takes 32 s with 4 Jet instances, but the following code takes 9.5 s:

System.out.println("Prepare big map");

Map<String, List<String[]>> bigMap = new HashMap<>(50000);

// 1,000,000 lines in total, 20 lines per code -> 50,000 codes
for (int index = 0; index < 50000; index++) {
    List<String[]> dataList = new ArrayList<>(20);
    for (int line = 0; line < 20; line++) {
        String[] data = new String[15];
        dataList.add(data);
        for (int dataIndex = 0; dataIndex < 15; dataIndex++) {
            data[dataIndex] = UUID.randomUUID().toString();
        }
    }
    bigMap.put("INV" + index, dataList);
}

System.out.println("Big map done");

long begin = System.currentTimeMillis();

String inputDir = "/Users/yblazart/Dev/work/";

File f = new File(inputDir, "position-big.csv");
Map<String, FileInfo> fileInfoMap = new HashMap<>();
AtomicLong ln = new AtomicLong();
Files.lines(f.toPath()).forEach(line -> {
    // compute the header from the first line
    FileInfo fileInfo = fileInfoMap.computeIfAbsent("a",
            key -> FileInfo.builder().lineCount(0L).sourceName(key).headers(line.split(",")).build());
    FileData fd = FileData.builder().line(line).lineNumber(fileInfo.getAndIncrementLineCount()).sourceFileName("a").build();
    if (fd.getLineNumber() == 0) return;
    fd.parseLine();
    fd.getData()[2] = "a";

    long index = fd.getLineNumber() % 50000;
    List<String[]> liste = (List<String[]>) bigMap.get("INV" + index);
    for (String[] data : liste) {
        for (int dataIndex = 0; dataIndex < 15; dataIndex++) {
            if (data[dataIndex].equals(fd.getData()[dataIndex])) {
                System.out.println("GOTCHA !");
            }
        }
    }
});

long total = System.currentTimeMillis() - begin;

System.out.println("End in " + total + " ms");