Does Hazelcast Jet read/process multiple files?


Benjamin E.Ndugga

Dec 28, 2018, 4:12:32 AM
to hazelcast-jet

Hi, I have a new issue: I haven't managed to modify the logic to read multiple files in a directory.

Here is the code that reads a single file (this runs well):

Path sourceFile = Paths.get("/home/Benjamin/Desktop/access-logs/access.log-20181206.gz.csv").toAbsolutePath();

// build pipeline
Pipeline p = Pipeline.create();

// read from flat file
BatchStage<HTTPRequest> stage = p.drawFrom(
        filesBuilder(sourceFile.getParent().toString())
                .glob(sourceFile.getFileName().toString())
                .build((file, line) -> {
                    String[] split = line.split(",");

                    HTTPRequest httpRequest = new HTTPRequest();
                    httpRequest.setSourceIP(split[0]);
                    httpRequest.setRequestDateTime(DATE_FORMAT.parse(split[1]).getTime());
                    httpRequest.setMethodType(split[2]);
                    httpRequest.setUrlCall(split[3]);
                    httpRequest.setHttpType(split[4]);
                    httpRequest.setHttpRespCode(split[5]);
                    httpRequest.setContentLength(split[6]);
                    httpRequest.setClientType(split[7]);
                    httpRequest.setUpstreamServerIP(split[8]);
                    httpRequest.setProcessingTime(split[9]);
                    httpRequest.setUpstreamConnectTime(split[10]);
                    httpRequest.setUpstreamResponseTime(split[11]);
                    httpRequest.setModule(split[12].trim());

                    return httpRequest;
                }));



I then modified it to read multiple files, and it looks like this:


Path sourceFile = Paths.get("/home/Benjamin/Desktop/access-logs/").toAbsolutePath();

// build pipeline
Pipeline p = Pipeline.create();

// read from flat file
BatchStage<HTTPRequest> stage = p.drawFrom(
        filesBuilder(sourceFile.toString())
                .glob("*.csv")
                .build((file, line) -> {
                    String[] split = line.split(",");

                    HTTPRequest httpRequest = new HTTPRequest();
                    httpRequest.setSourceIP(split[0]);
                    httpRequest.setRequestDateTime(DATE_FORMAT.parse(split[1]).getTime());
                    httpRequest.setMethodType(split[2]);
                    httpRequest.setUrlCall(split[3]);
                    httpRequest.setHttpType(split[4]);
                    httpRequest.setHttpRespCode(split[5]);
                    httpRequest.setContentLength(split[6]);
                    httpRequest.setClientType(split[7]);
                    httpRequest.setUpstreamServerIP(split[8]);
                    httpRequest.setProcessingTime(split[9]);
                    httpRequest.setUpstreamConnectTime(split[10]);
                    httpRequest.setUpstreamResponseTime(split[11]);
                    httpRequest.setModule(split[12].trim());

                    return httpRequest;
                }));

I get the following error:

run:
Dec 28, 2018 12:06:19 PM com.hazelcast.core.LifecycleService
INFO: hz.client_0 [jet] [0.7] [3.10.5] HazelcastClient 3.10.5 (20180913 - 6ffa2ee) is STARTING
Dec 28, 2018 12:06:20 PM com.hazelcast.client.spi.ClientInvocationService
INFO: hz.client_0 [jet] [0.7] [3.10.5] Running with 2 response threads
Dec 28, 2018 12:06:20 PM com.hazelcast.core.LifecycleService
INFO: hz.client_0 [jet] [0.7] [3.10.5] HazelcastClient 3.10.5 (20180913 - 6ffa2ee) is STARTED
Dec 28, 2018 12:06:20 PM com.hazelcast.client.connection.ClientConnectionManager
INFO: hz.client_0 [jet] [0.7] [3.10.5] Trying to connect to [localhost]:6701 as owner member
Dec 28, 2018 12:06:20 PM com.hazelcast.client.connection.ClientConnectionManager
INFO: hz.client_0 [jet] [0.7] [3.10.5] Setting ClientConnection{alive=true, connectionId=1, channel=NioChannel{/127.0.0.1:41186->localhost/127.0.0.1:6701}, remoteEndpoint=[localhost]:6701, lastReadTime=2018-12-28 12:06:20.296, lastWriteTime=2018-12-28 12:06:20.295, closedTime=never, lastHeartbeatRequested=never, lastHeartbeatReceived=never, connected server version=3.10.5} as owner with principal ClientPrincipal{uuid='d116821b-3794-42f8-b370-6f08a1aceca8', ownerUuid='64959856-d40b-4a0e-8f70-8359de9d19a9'}
Dec 28, 2018 12:06:20 PM com.hazelcast.client.connection.ClientConnectionManager
INFO: hz.client_0 [jet] [0.7] [3.10.5] Authenticated with server [localhost]:6701, server version:3.10.5 Local address: /127.0.0.1:41186
Dec 28, 2018 12:06:20 PM com.hazelcast.client.spi.impl.ClientMembershipListener
INFO: hz.client_0 [jet] [0.7] [3.10.5]

Members [1] {
    Member [localhost]:6701 - 64959856-d40b-4a0e-8f70-8359de9d19a9
}

Dec 28, 2018 12:06:20 PM com.hazelcast.core.LifecycleService
INFO: hz.client_0 [jet] [0.7] [3.10.5] HazelcastClient 3.10.5 (20180913 - 6ffa2ee) is CLIENT_CONNECTED
Dec 28, 2018 12:06:20 PM com.hazelcast.internal.diagnostics.Diagnostics
INFO: hz.client_0 [jet] [0.7] [3.10.5] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
digraph Pipeline {
    "filesSource(/home/Benjamin/Desktop/access-logs/*.csv)" -> "filter";
    "filter" -> "timestamp";
    "timestamp" -> "sliding-window";
    "timestamp" -> "sliding-window-2";
    "timestamp" -> "sliding-window-3";
    "timestamp" -> "sliding-window-4";
    "timestamp" -> "sliding-window-5";
    "sliding-window" -> "map";
    "map" -> "remoteMapSink(mdstcnt)";
    "sliding-window-2" -> "map-2";
    "map-2" -> "remoteMapSink(httpcodecnt)";
    "sliding-window-3" -> "map-3";
    "map-3" -> "remoteMapSink(proctimeavg)";
    "sliding-window-4" -> "map-4";
    "map-4" -> "remoteMapSink(upstreamcnt)";
    "sliding-window-5" -> "map-5";
    "map-5" -> "remoteMapSink(clustercnt)";
}
Dec 28, 2018 12:06:20 PM com.hazelcast.core.LifecycleService
INFO: hz.client_0 [jet] [0.7] [3.10.5] HazelcastClient 3.10.5 (20180913 - 6ffa2ee) is SHUTTING_DOWN
Dec 28, 2018 12:06:20 PM com.hazelcast.client.connection.ClientConnectionManager
INFO: hz.client_0 [jet] [0.7] [3.10.5] Removed connection to endpoint: [localhost]:6701, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:41186->localhost/127.0.0.1:6701}, remoteEndpoint=[localhost]:6701, lastReadTime=2018-12-28 12:06:20.988, lastWriteTime=2018-12-28 12:06:20.602, closedTime=2018-12-28 12:06:20.991, lastHeartbeatRequested=never, lastHeartbeatReceived=never, connected server version=3.10.5}
Dec 28, 2018 12:06:21 PM com.hazelcast.core.LifecycleService
INFO: hz.client_0 [jet] [0.7] [3.10.5] HazelcastClient 3.10.5 (20180913 - 6ffa2ee) is SHUTDOWN
Exception in thread "main" java.util.concurrent.ExecutionException: com.hazelcast.jet.JetException: Exception in ProcessorTasklet{Compute-Daily-Txns/filesSource(/home/Benjamin/Desktop/access-logs/*.csv)#0}: java.lang.NumberFormatException: For input string: ""
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at com.hazelcast.jet.Job.lambda$join$0(Job.java:92)
    at com.hazelcast.jet.impl.util.Util.uncheckRun(Util.java:111)
    at com.hazelcast.jet.Job.join(Job.java:92)
    at com.ben.mscit.HZJetIMDGConnector.main(HZJetIMDGConnector.java:134)
Caused by: com.hazelcast.jet.JetException: Exception in ProcessorTasklet{Compute-Daily-Txns/filesSource(/home/Benjamin/Desktop/access-logs/*.csv)#0}: java.lang.NumberFormatException: For input string: ""
    at com.hazelcast.jet.impl.execution.TaskletExecutionService$BlockingWorker.run(TaskletExecutionService.java:254)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    at ------ submitted from ------.(Unknown Source)
    at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolve(InvocationFuture.java:127)
    at com.hazelcast.spi.impl.AbstractInvocationFuture$1.run(AbstractInvocationFuture.java:250)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:64)
    at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:80)
Caused by: com.hazelcast.client.UndefinedErrorCodeException: Class name: java.lang.NumberFormatException, Message: For input string: ""
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:601)
    at java.lang.Long.parseLong(Long.java:631)
    at java.text.DigitList.getLong(DigitList.java:195)
    at java.text.DecimalFormat.parse(DecimalFormat.java:2051)
    at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1869)
    at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514)
    at java.text.DateFormat.parse(DateFormat.java:364)
    at com.ben.mscit.HZJetIMDGConnector.lambda$main$4bf612ba$1(HZJetIMDGConnector.java:58)
    at com.hazelcast.jet.function.DistributedBiFunction.apply(DistributedBiFunction.java:41)
    at com.hazelcast.jet.impl.connector.ReadFilesP.lambda$processFile$0(ReadFilesP.java:113)
    at com.hazelcast.jet.Traverser.lambda$map$0(Traverser.java:58)
    at com.hazelcast.jet.Traverser$5.next(Traverser.java:214)
    at com.hazelcast.jet.impl.util.FlatMappingTraverser.next(FlatMappingTraverser.java:50)
    at com.hazelcast.jet.core.AbstractProcessor.emitFromTraverser(AbstractProcessor.java:402)
    at com.hazelcast.jet.core.AbstractProcessor.emitFromTraverser(AbstractProcessor.java:416)
    at com.hazelcast.jet.impl.connector.ReadFilesP.complete(ReadFilesP.java:94)
    at com.hazelcast.jet.impl.execution.ProcessorTasklet.stateMachineStep(ProcessorTasklet.java:350)
    at com.hazelcast.jet.impl.execution.ProcessorTasklet.call(ProcessorTasklet.java:218)
    at com.hazelcast.jet.impl.execution.ProcessorTasklet.call(ProcessorTasklet.java:211)
    at com.hazelcast.jet.impl.execution.TaskletExecutionService$BlockingWorker.run(TaskletExecutionService.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
/home/Benjamin/.cache/netbeans/8.2/executor-snippets/run.xml:53: Java returned: 1
BUILD FAILED (total time: 2 seconds)

All the files have the same format, and each one runs fine on its own when I change the file name in the first version of the code shown above.

Regards,
Benjamin E Ndugga

file2.csv
file1.csv

Ali Gurbuz

Dec 28, 2018, 8:32:26 AM
to Benjamin E.Ndugga, hazelcast-jet
Hi,

I couldn't reproduce the issue. It seems to come from a badly formatted line; can you add a check to pinpoint the problem?
Something like this:

if (split[1].trim().isEmpty()) {
    System.out.println("File: " + file + ", line: " + line);
}
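A slightly extended version of that check, as a self-contained sketch: besides an empty timestamp, it also reports lines with fewer than the 13 comma-separated fields that the mapping code reads (`split[0]` to `split[12]`). `LineCheck` and `problem` are hypothetical names for illustration; you could call it at the top of the `build((file, line) -> ...)` lambda and print the message.

```java
import java.util.Optional;

public class LineCheck {

    // The mapping code in this thread reads split[0] to split[12], i.e. 13 fields.
    static final int EXPECTED_FIELDS = 13;

    // Returns a message naming the file and line if the line cannot be mapped, else empty.
    static Optional<String> problem(String file, String line) {
        String[] split = line.split(",");
        if (split.length < EXPECTED_FIELDS) {
            return Optional.of("File: " + file + ", line: " + line + " -> only " + split.length + " fields");
        }
        if (split[1].trim().isEmpty()) {
            return Optional.of("File: " + file + ", line: " + line + " -> empty timestamp");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        problem("file1.csv", "10.0.0.1,,GET").ifPresent(System.out::println);
    }
}
```

Note that `String.split(",")` drops trailing empty strings, so a line whose last columns are empty is also reported as having too few fields.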
--
You received this message because you are subscribed to the Google Groups "hazelcast-jet" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast-je...@googlegroups.com.
To post to this group, send email to hazelc...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/105a1210-9ca9-493d-a9cb-0ad9dc39a195%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


--

Ali Gurbuz
Distinguished Engineer

Mahir İz Cad. No:35, Altunizade, İstanbul
a...@hazelcast.com 
+90 507 857 7815
skype: isbiroglu
@aligurbuz

Benjamin E.Ndugga

Dec 29, 2018, 5:29:43 AM
to hazelcast-jet
Hi Ali,

I have attached the sample project files. The test classes are the .java files, along with the helper class HTTPRequest, which holds the mapping of the CSV data. The files to be processed are the .csv files.

Kindly use these to reproduce the error: both files are well formatted and have the same number of entries. The issue is that Hazelcast Jet fails to read multiple files sequentially.

Regards,
Benjamin E Ndugga
MultipleFileReaderTestClass.java
SingleFileReaderTestClass.java
file1.csv
file2.csv
HTTPRequest.java

Ali Gurbuz

Dec 29, 2018, 8:25:03 AM
to Benjamin E.Ndugga, hazelcast-jet
Hi again, I've used the files you provided and reproduced the error. The cause is `SimpleDateFormat`, which is not thread-safe. With a single file, only one thread reads it, so there is no issue; with multiple files, each file is processed concurrently by a different thread, and they all share the same `SimpleDateFormat`. If you move the creation of the `SimpleDateFormat` into the lambda, the error goes away.
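A minimal sketch of the race outside Jet, using only the JDK: four threads parse the same `dd/MMM/yyyy:HH:mm:ss` timestamps, first through one shared `SimpleDateFormat`, then with a new instance per call (the fix described above). The class name, sample timestamps and thread count are made up, and `Locale.ENGLISH` and UTC are set so the results do not depend on the machine. The shared run is a race, so its failure count varies between runs and can occasionally be zero.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class SharedFormatRace {

    static final String[] SAMPLES = {
        "06/Dec/2018:10:15:30", "28/Dec/2018:23:59:59", "01/Jan/2019:00:00:01"
    };

    static SimpleDateFormat newFormat() {
        SimpleDateFormat f = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        f.setTimeZone(TimeZone.getTimeZone("UTC"));
        return f;
    }

    // Parses the samples 10,000 times on each of 4 threads and counts wrong results
    // or exceptions. 'formatFor' decides which SimpleDateFormat each parse call uses.
    static int countFailures(Supplier<SimpleDateFormat> formatFor) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            long[] expected = new long[SAMPLES.length];
            for (int i = 0; i < SAMPLES.length; i++) {
                expected[i] = newFormat().parse(SAMPLES[i]).getTime(); // single-threaded baseline
            }
            AtomicInteger failures = new AtomicInteger();
            List<Future<?>> tasks = new ArrayList<>();
            for (int t = 0; t < 4; t++) {
                tasks.add(pool.submit(() -> {
                    for (int n = 0; n < 10_000; n++) {
                        int i = n % SAMPLES.length;
                        try {
                            if (formatFor.get().parse(SAMPLES[i]).getTime() != expected[i]) {
                                failures.incrementAndGet();
                            }
                        } catch (ParseException | RuntimeException e) {
                            failures.incrementAndGet(); // e.g. NumberFormatException, as in the trace above
                        }
                    }
                }));
            }
            for (Future<?> task : tasks) {
                task.get();
            }
            return failures.get();
        } catch (ParseException | InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        SimpleDateFormat shared = newFormat();
        System.out.println("one shared instance, failures: " + countFailures(() -> shared));
        System.out.println("new instance per call, failures: " + countFailures(SharedFormatRace::newFormat));
    }
}
```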


Benjamin E.Ndugga

Dec 29, 2018, 9:14:49 AM
to hazelcast-jet
Hi Ali,

I have modified the logic based on your advice and come up with the code below. For every entry that the filesBuilder source emits in drawFrom, we add a timestamp by creating a new SimpleDateFormat instance.

This has worked well with no issues. However, I am not sure it is a good use of memory, since many SimpleDateFormat objects are created. Please advise if there is a better way of doing this.

package test;

import com.ben.mscit.HTTPRequest;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import static com.hazelcast.jet.pipeline.Sources.filesBuilder;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
 *
 * @author benjamin
 */
public class MultipleFileReaderTestClass {

    public static void main(String[] args) {

        Path sourceFile = Paths.get("/home/Benjamin/Desktop/access-logs/").toAbsolutePath();

        // build pipeline
        Pipeline p = Pipeline.create();

        // read from flat file
        BatchStage<HTTPRequest> stage = p.drawFrom(
                filesBuilder(sourceFile.toString())
                        .glob("file*")
                        .build((file, line) -> {
                            String[] split = line.split(",");

                            HTTPRequest httpRequest = new HTTPRequest();
                            httpRequest.setSourceIP(split[0]);
                            httpRequest.setRequestDateTime(split[1]);
                            httpRequest.setMethodType(split[2]);
                            httpRequest.setUrlCall(split[3]);
                            httpRequest.setHttpType(split[4]);
                            httpRequest.setHttpRespCode(split[5]);
                            httpRequest.setContentLength(split[6]);
                            httpRequest.setClientType(split[7]);
                            httpRequest.setUpstreamServerIP(split[8]);
                            httpRequest.setProcessingTime(split[9]);
                            httpRequest.setUpstreamConnectTime(split[10]);
                            httpRequest.setUpstreamResponseTime(split[11]);
                            httpRequest.setModule(split[12].trim());

                            return httpRequest;
                        }));

        stage
                .addTimestamps((httpRequest) -> new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss").parse(httpRequest.getRequestDateTime()).getTime(), SECONDS.toMillis(5))
                .drainTo(Sinks.logger());

        JetInstance jet = Jet.newJetInstance();

        try {
            jet.newJob(p).join();
        } catch (Exception ex) {
            ex.printStackTrace(System.err);
        } finally {
            jet.shutdown();
        }
    }
}

Regards,
Benjamin E Ndugga

Ali Gurbuz

Dec 29, 2018, 9:57:37 AM
to Benjamin E.Ndugga, hazelcast-jet
Yes, creating a `SimpleDateFormat` for each line is not best practice. You could use a thread-local instead:

private static ThreadLocal<SimpleDateFormat> DATE_FORMAT = ThreadLocal.withInitial(() ->
        new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss"));

httpRequest.setRequestDateTime(DATE_FORMAT.get().parse(split[1]).getTime());
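A self-contained sketch of the thread-local approach with the same timestamp format: each thread gets its own `SimpleDateFormat` on its first `get()` and reuses it afterwards, so the number of instances follows the number of threads rather than the number of lines. The class name, the `CREATED` counter, `Locale.ENGLISH` and the UTC zone are additions for the example.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadLocalFormat {

    // Counts how many SimpleDateFormat instances the ThreadLocal has created.
    static final AtomicInteger CREATED = new AtomicInteger();

    // One SimpleDateFormat per thread, created on that thread's first get() and reused after.
    static final ThreadLocal<SimpleDateFormat> DATE_FORMAT = ThreadLocal.withInitial(() -> {
        CREATED.incrementAndGet();
        SimpleDateFormat f = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        f.setTimeZone(TimeZone.getTimeZone("UTC"));
        return f;
    });

    static long toEpochMillis(String timestamp) throws ParseException {
        return DATE_FORMAT.get().parse(timestamp).getTime();
    }

    // Starts 'threads' threads that each parse 'timestamp' 'perThread' times and returns the
    // distinct results seen (-1 marks a failure); a single value means every parse agreed.
    static Set<Long> parseConcurrently(String timestamp, int threads, int perThread) {
        Set<Long> results = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    try {
                        results.add(toEpochMillis(timestamp));
                    } catch (ParseException | RuntimeException e) {
                        results.add(-1L);
                    }
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Set<Long> results = parseConcurrently("06/Dec/2018:10:15:30", 4, 10_000);
        System.out.println("distinct results: " + results + ", formats created: " + CREATED.get());
    }
}
```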



Gokhan Oner

Dec 29, 2018, 4:20:02 PM
to Ali Gurbuz, Benjamin E.Ndugga, hazelcast-jet
Benjamin,

You can also use the thread-safe DateTimeFormatter, like this:

1) Define a static date/time formatter:
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss");

2) Parse using the DateTimeFormatter:
httpRequest.setRequestDateTime(LocalDateTime.from(DATE_FORMAT.parse(split[1])).atZone(ZoneId.systemDefault()).toEpochSecond() * 1000);
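A self-contained sketch of those two steps. `DateTimeFormatter` is immutable and thread-safe, so one `static final` instance can be shared by every thread. `LocalDateTime.parse(text, DATE_FORMAT)` is a shorter equivalent of `LocalDateTime.from(DATE_FORMAT.parse(text))`, and `toInstant().toEpochMilli()` gives the same value as `toEpochSecond() * 1000` for these whole-second timestamps. The class name and `Locale.ENGLISH` (so that `MMM` matches `Dec` on any machine) are assumptions, and the zone is a parameter rather than `ZoneId.systemDefault()` so the result is reproducible.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class AccessLogTime {

    // Immutable and thread-safe: one instance can be shared by all threads.
    static final DateTimeFormatter DATE_FORMAT =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);

    // Parses e.g. "06/Dec/2018:10:15:30" and returns epoch milliseconds in the given zone.
    static long toEpochMillis(String timestamp, ZoneId zone) {
        return LocalDateTime.parse(timestamp, DATE_FORMAT)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("06/Dec/2018:10:15:30", ZoneId.systemDefault()));
    }
}
```

Calling `toEpochMillis(split[1], ZoneId.systemDefault())` inside the mapping lambda matches the behaviour of the one-liner above.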

I tested it with the files you shared and it's working fine.
Thanks


For more options, visit https://groups.google.com/d/optout.


--




Gokhan Oner

Senior Solutions Architect, TechOps


Hazelcast Inc.

350 Cambridge Ave #100, Palo Alto, CA 94306 USA

gok...@hazelcast.com

Benjamin E.Ndugga

Dec 29, 2018, 4:56:18 PM
to hazelcast-jet
 Hi Ali,

Thanks a bunch, this has worked well. I never knew that calling the parse function of the SimpleDateFormat class is not thread-safe. I wonder how you were able to catch that.

If you don't mind, please advise whether the following lines of code, which work on the BatchStage of type HTTPRequest, are thread-safe, especially the line that converts the results into map entries. It uses a different date format named VALUE_MAP_DATE_FORMAT.


private static final SimpleDateFormat VALUE_MAP_DATE_FORMAT = new SimpleDateFormat("\"HH:mm\"");

private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT = ThreadLocal.withInitial(() -> new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss"));

public static void main(String[] args) {

    Path sourceFile = Paths.get("/home/Benjamin/Desktop/access-logs/").toAbsolutePath();

    // build pipeline
    Pipeline p = Pipeline.create();

    // read from flat file
    BatchStage<HTTPRequest> stage = p.drawFrom(
            filesBuilder(sourceFile.toString())
                    .glob("file*")
                    .build((file, line) -> {
                        String[] split = line.split(",");

                        HTTPRequest httpRequest = new HTTPRequest();
                        httpRequest.setSourceIP(split[0]);
                        httpRequest.setRequestDateTime(DATE_FORMAT.get().parse(split[1]).getTime());
                        httpRequest.setMethodType(split[2]);
                        httpRequest.setUrlCall(split[3]);
                        httpRequest.setHttpType(split[4]);
                        httpRequest.setHttpRespCode(split[5]);
                        httpRequest.setContentLength(split[6]);
                        httpRequest.setClientType(split[7]);
                        httpRequest.setUpstreamServerIP(split[8]);
                        httpRequest.setProcessingTime(split[9]);
                        httpRequest.setUpstreamConnectTime(split[10]);
                        httpRequest.setUpstreamResponseTime(split[11]);
                        httpRequest.setModule(split[12].trim());

                        return httpRequest;
                    }));

    try {
        // looking at events in the last 10 secs, windowed at 5 minutes
        StageWithWindow<HTTPRequest> stageWithWindow = stage
                .filter((httpRequest) -> httpRequest.getModule().length() > 1 && httpRequest.getUpstreamServerIP().length() > 1)
                .addTimestamps(HTTPRequest::getRequestDateTime, SECONDS.toMillis(10))
                .window(tumbling(MINUTES.toMillis(5)));

        // module hits, looking at events in the last 10 secs, windowed at 5 minutes
        stageWithWindow
                .groupingKey(HTTPRequest::getModule)
                .aggregate(counting())
                .map((TimestampedEntry<String, Long> t) -> entry(t.getKey(), "{\"time\":" + VALUE_MAP_DATE_FORMAT.format(new Date(t.getTimestamp())) + ",\"value\":" + t.getValue() + "}"))
                .drainTo(Sinks.logger());
    }


Benjamin E.Ndugga

Dec 29, 2018, 4:57:08 PM
to hazelcast-jet


Thanks Gokhan,

This works as well; I am deeply grateful.

Gokhan Oner

Dec 29, 2018, 7:18:16 PM
to Benjamin E.Ndugga, hazelcast-jet
Benjamin

The same problem applies to VALUE_MAP_DATE_FORMAT. The map stage runs after the grouping, so it also runs in parallel for different grouped results. Either use a ThreadLocal version of that DateFormat, or use a DateTimeFormatter for this one as well.
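A sketch of the `DateTimeFormatter` option for the map stage: it formats a window timestamp (epoch milliseconds, as returned by `TimestampedEntry.getTimestamp()`) as `HH:mm` and builds the same `{"time":...,"value":...}` string as Benjamin's lambda. `WindowValueJson` and `toJson` are hypothetical names, and the zone is a parameter, whereas the original `SimpleDateFormat` uses the JVM default zone. The quotes around the time are added in the string rather than in the pattern.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class WindowValueJson {

    // Thread-safe, so one static instance can be used by parallel map stages.
    static final DateTimeFormatter HOUR_MINUTE = DateTimeFormatter.ofPattern("HH:mm");

    // Builds {"time":"HH:mm","value":N} for a windowed count.
    static String toJson(long timestampMillis, long count, ZoneId zone) {
        String time = HOUR_MINUTE.format(Instant.ofEpochMilli(timestampMillis).atZone(zone));
        return "{\"time\":\"" + time + "\",\"value\":" + count + "}";
    }

    public static void main(String[] args) {
        System.out.println(toJson(System.currentTimeMillis(), 42, ZoneId.systemDefault()));
    }
}
```

In the pipeline, the `.map(...)` lambda would then call something like `entry(t.getKey(), toJson(t.getTimestamp(), t.getValue(), ZoneId.systemDefault()))`.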

Thanks


Can Gencer

Dec 31, 2018, 5:03:41 AM
to Gokhan Oner, Benjamin E.Ndugga, hazelcast-jet
Instead of ThreadLocal, you should use the mapUsingContext (https://docs.hazelcast.org/docs/jet/0.7.2/javadoc/com/hazelcast/jet/pipeline/GeneralStage.html#mapUsingContext-com.hazelcast.jet.pipeline.ContextFactory-com.hazelcast.jet.function.DistributedBiFunction-) operator. You create a new context holding the DateFormat, and this ensures single-threaded access to it. A ThreadLocal in a lambda will be captured and serialised, so it might cause unexpected behaviour.
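One concrete way this can go wrong, shown in plain Java: Jet's `Distributed*` functional interfaces extend `Serializable`, and `ThreadLocal` does not implement `Serializable`. A serializable lambda that captures a local `ThreadLocal` variable therefore fails to serialize, while a lambda that only reads a `static` field captures nothing. `SerializableFunction` is a hypothetical stand-in for Jet's interfaces, and the class and method names are for illustration only.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.function.Function;

public class ThreadLocalCapture {

    // Hypothetical stand-in for a serializable functional interface such as Jet's.
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable { }

    // A static field is referenced directly by lambdas, never captured.
    static final ThreadLocal<SimpleDateFormat> STATIC_FORMAT =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss"));

    // True if Java serialization accepts the object.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // The lambda captures the local ThreadLocal, so it becomes part of the serialized form.
    static SerializableFunction<String, String> capturingLocal() {
        ThreadLocal<SimpleDateFormat> local =
                ThreadLocal.withInitial(() -> new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss"));
        return line -> local.get().toPattern() + " " + line;
    }

    // The lambda only reads the static field, so nothing is captured.
    static SerializableFunction<String, String> readingStatic() {
        return line -> STATIC_FORMAT.get().toPattern() + " " + line;
    }

    public static void main(String[] args) {
        System.out.println("captured local ThreadLocal serializes: " + serializes(capturingLocal()));
        System.out.println("static ThreadLocal field serializes: " + serializes(readingStatic()));
    }
}
```

Ali's snippet declares the `ThreadLocal` as a `static` field, which corresponds to the second case.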

Benjamin E.Ndugga

Jan 1, 2019, 3:23:53 PM
to hazelcast-jet
Hi Can Gencer,

I have read the documentation on mapUsingContext carefully, but I still can't see how to add a stage to the StreamStage that returns the value formatted with the new date format. Ideally we want to save the results in JSON format.

Here is the code; we would appreciate it if you could show us how to change the .map stage to use mapUsingContext and achieve the same thing. Please find the attached files.

stage.window(tumbling(SECONDS.toMillis(5)))
        .groupingKey(HTTPRequest::getModule)
        .aggregate(counting())
        .map((TimestampedEntry<String, Long> t) -> entry(t.getKey(), "{\"time\":" + VALUE_MAP_DATE_FORMAT.get().format(new Date(t.getTimestamp())) + ",\"value\":" + t.getValue() + "}"))
        .drainTo(Sinks.logger());


MultipleFileReaderTestClass.java
file1.csv
file2.csv
file3.csv
HTTPRequest.java

Ali Gurbuz

Jan 1, 2019, 4:58:12 PM
to Benjamin E.Ndugga, hazelcast-jet
You need to do it just after the source stage:

StreamStage<HTTPRequest> stage = p.drawFrom(filesBuilder(sourceFile.toString())
        .glob("file*").build((file, line) -> line))
        .mapUsingContext(ContextFactory.withCreateFn(
                jet -> new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss")),
                (format, line) -> {
                    String[] split = line.split(",");

                    HTTPRequest httpRequest = new HTTPRequest();
                    httpRequest.setSourceIP(split[0]);
                    httpRequest.setRequestDateTime(format.parse(split[1]).getTime());
                    httpRequest.setMethodType(split[2]);
                    httpRequest.setUrlCall(split[3]);
                    httpRequest.setHttpType(split[4]);
                    httpRequest.setHttpRespCode(split[5]);
                    httpRequest.setContentLength(split[6]);
                    httpRequest.setClientType(split[7]);
                    httpRequest.setUpstreamServerIP(split[8]);
                    httpRequest.setProcessingTime(split[9]);
                    httpRequest.setUpstreamConnectTime(split[10]);
                    httpRequest.setUpstreamResponseTime(split[11]);
                    httpRequest.setModule(split[12].trim());

                    return httpRequest;
                })
        .addTimestamps(HTTPRequest::getRequestDateTime, SECONDS.toMillis(5));




Benjamin E.Ndugga

Jan 2, 2019, 2:58:37 AM
to hazelcast-jet
Hi Ali and Team,

So is it safe to say that the best way to achieve this is by using mapUsingContext? I have also used the same approach in the next stage, which groups and counts.
I must add that it looks quite complex; I hope it is not resource-intensive.

It looks something like this:

stage.window(tumbling(SECONDS.toMillis(5)))
        .groupingKey(HTTPRequest::getModule)
        .aggregate(counting())
        .mapUsingContext(ContextFactory.withCreateFn((JetInstance t) -> new SimpleDateFormat("\"HH:mm\"")),
                (SimpleDateFormat fmt, TimestampedEntry<String, Long> u) ->
                        entry(u.getKey(), "{\"time\":" + fmt.format(new Date(u.getTimestamp())) + ",\"value\":" + u.getValue() + "}"))
        .drainTo(Sinks.logger());


Regards,
Benjamin E Ndugga

Can Gencer

Jan 2, 2019, 10:00:59 AM
to Benjamin E.Ndugga, hazelcast-jet
Hi Benjamin,

Yes, this is correct. The only additional complexity is that you need the context factory, but you can refactor your code to make the factory reusable. It is designed precisely for this use case. It will use fewer resources than creating a SimpleDateFormat for every object, since the same format object is reused for many entries.

You can also use the ContextFactory approach with a thread-safe date format such as the newer DateTimeFormatter, and mark the ContextFactory with "shareLocally" to declare it thread-safe. Unless shareLocally is set explicitly, mapUsingContext guarantees that only one thread accesses the provided context at a time.




Benjamin E.Ndugga

Jan 3, 2019, 4:20:07 AM
to hazelcast-jet
Hi Can Gencer,

It's a lot easier said than done. In my first version I had the SimpleDateFormat as a static final variable and only read from the object; with Hazelcast Jet, however, the logic has to change to suit the way it runs. Jet reads files simultaneously, and since SimpleDateFormat is not thread-safe, new objects have to be created.
How can we implement it so that there is only one object, used read-only, with the ContextFactory approach, while still being thread-safe?

Regards
Benjamin

Can Gencer

Jan 3, 2019, 5:53:42 AM
to Benjamin E.Ndugga, hazelcast-jet
Hi Ben,

You can't use SimpleDateFormat with only one instance, because it's not thread-safe. The ContextFactory approach that Ali posted earlier should work: only one SimpleDateFormat is created per processor instance (typically one per CPU core).
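The resource argument can be illustrated in plain Java. This is an analogy, not Jet's implementation: a fixed number of worker threads stand in for processor instances, and each calls a context factory once and reuses that `SimpleDateFormat` for every line it handles. The number of formats created therefore equals the number of workers, not the number of lines. `PerWorkerContext`, `FACTORY`, the round-robin split of lines and the sample timestamps are hypothetical; `availableProcessors()` stands in for the core count mentioned above.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.Supplier;

public class PerWorkerContext {

    // Counts how many times the factory has been called.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Plays the role of the context factory: called once per worker, not once per line.
    static final Supplier<SimpleDateFormat> FACTORY = () -> {
        CREATED.incrementAndGet();
        SimpleDateFormat f = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        f.setTimeZone(TimeZone.getTimeZone("UTC"));
        return f;
    };

    // Worker w parses lines w, w + workers, w + 2 * workers, ... with its own format,
    // so no SimpleDateFormat is ever used by two threads. Failed parses become -1.
    static long[] parseAll(List<String> lines, int workers) {
        AtomicLongArray out = new AtomicLongArray(lines.size());
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            int first = w;
            Thread thread = new Thread(() -> {
                SimpleDateFormat context = FACTORY.get(); // one instance per worker
                for (int i = first; i < lines.size(); i += workers) {
                    try {
                        out.set(i, context.parse(lines.get(i)).getTime());
                    } catch (ParseException e) {
                        out.set(i, -1L);
                    }
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        long[] result = new long[lines.size()];
        for (int i = 0; i < result.length; i++) {
            result[i] = out.get(i);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            lines.add(i % 2 == 0 ? "06/Dec/2018:10:15:30" : "01/Jan/2019:00:00:01");
        }
        int workers = Runtime.getRuntime().availableProcessors();
        parseAll(lines, workers);
        System.out.println(lines.size() + " lines parsed, " + CREATED.get() + " formats created");
    }
}
```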
