// Absolute path of the single access-log CSV that should be ingested.
Path sourceFile = Paths.get("/home/Benjamin/Desktop/access-logs/access.log-20181206.gz.csv").toAbsolutePath();
// Assemble the Jet pipeline.
Pipeline p = Pipeline.create();
// Source stage: scan the file's parent directory, use the file name as the glob so
// only that one file matches, and map each comma-separated line to an HTTPRequest.
// DATE_FORMAT is declared outside this snippet.
BatchStage<HTTPRequest> stage = p.drawFrom(
        filesBuilder(sourceFile.getParent().toString())
                .glob(sourceFile.getFileName().toString())
                .build((file, line) -> {
                    String[] columns = line.split(",");
                    HTTPRequest parsedRequest = new HTTPRequest();
                    parsedRequest.setSourceIP(columns[0]);
                    // Column 1 holds the request timestamp; it is stored as epoch milliseconds.
                    parsedRequest.setRequestDateTime(DATE_FORMAT.parse(columns[1]).getTime());
                    parsedRequest.setMethodType(columns[2]);
                    parsedRequest.setUrlCall(columns[3]);
                    parsedRequest.setHttpType(columns[4]);
                    parsedRequest.setHttpRespCode(columns[5]);
                    parsedRequest.setContentLength(columns[6]);
                    parsedRequest.setClientType(columns[7]);
                    parsedRequest.setUpstreamServerIP(columns[8]);
                    parsedRequest.setProcessingTime(columns[9]);
                    parsedRequest.setUpstreamConnectTime(columns[10]);
                    parsedRequest.setUpstreamResponseTime(columns[11]);
                    // The last column is trimmed before it is stored.
                    parsedRequest.setModule(columns[12].trim());
                    return parsedRequest;
                }));
// Absolute path of the directory holding the access-log CSV files.
Path sourceFile = Paths.get("/home/Benjamin/Desktop/access-logs/").toAbsolutePath();
// Assemble the Jet pipeline.
Pipeline p = Pipeline.create();
// Source stage: read every file in the directory matching *.csv and map each
// comma-separated line to an HTTPRequest. DATE_FORMAT is declared outside this snippet.
BatchStage<HTTPRequest> stage = p.drawFrom(
        filesBuilder(sourceFile.toString())
                .glob("*.csv")
                .build((file, line) -> {
                    String[] columns = line.split(",");
                    HTTPRequest parsedRequest = new HTTPRequest();
                    parsedRequest.setSourceIP(columns[0]);
                    // Column 1 holds the request timestamp; it is stored as epoch milliseconds.
                    parsedRequest.setRequestDateTime(DATE_FORMAT.parse(columns[1]).getTime());
                    parsedRequest.setMethodType(columns[2]);
                    parsedRequest.setUrlCall(columns[3]);
                    parsedRequest.setHttpType(columns[4]);
                    parsedRequest.setHttpRespCode(columns[5]);
                    parsedRequest.setContentLength(columns[6]);
                    parsedRequest.setClientType(columns[7]);
                    parsedRequest.setUpstreamServerIP(columns[8]);
                    parsedRequest.setProcessingTime(columns[9]);
                    parsedRequest.setUpstreamConnectTime(columns[10]);
                    parsedRequest.setUpstreamResponseTime(columns[11]);
                    // The last column is trimmed before it is stored.
                    parsedRequest.setModule(columns[12].trim());
                    return parsedRequest;
                }));
run:
Dec 28, 2018 12:06:19 PM com.hazelcast.core.LifecycleService
INFO: hz.client_0 [jet] [0.7] [3.10.5] HazelcastClient 3.10.5 (20180913 - 6ffa2ee) is STARTING
Dec 28, 2018 12:06:20 PM com.hazelcast.client.spi.ClientInvocationService
INFO: hz.client_0 [jet] [0.7] [3.10.5] Running with 2 response threads
Dec 28, 2018 12:06:20 PM com.hazelcast.core.LifecycleService
INFO: hz.client_0 [jet] [0.7] [3.10.5] HazelcastClient 3.10.5 (20180913 - 6ffa2ee) is STARTED
Dec 28, 2018 12:06:20 PM com.hazelcast.client.connection.ClientConnectionManager
INFO: hz.client_0 [jet] [0.7] [3.10.5] Trying to connect to [localhost]:6701 as owner member
Dec 28, 2018 12:06:20 PM com.hazelcast.client.connection.ClientConnectionManager
INFO: hz.client_0 [jet] [0.7] [3.10.5] Setting ClientConnection{alive=true, connectionId=1, channel=NioChannel{/127.0.0.1:41186->localhost/127.0.0.1:6701}, remoteEndpoint=[localhost]:6701, lastReadTime=2018-12-28 12:06:20.296, lastWriteTime=2018-12-28 12:06:20.295, closedTime=never, lastHeartbeatRequested=never, lastHeartbeatReceived=never, connected server version=3.10.5} as owner with principal ClientPrincipal{uuid='d116821b-3794-42f8-b370-6f08a1aceca8', ownerUuid='64959856-d40b-4a0e-8f70-8359de9d19a9'}
Dec 28, 2018 12:06:20 PM com.hazelcast.client.connection.ClientConnectionManager
INFO: hz.client_0 [jet] [0.7] [3.10.5] Authenticated with server [localhost]:6701, server version:3.10.5 Local address: /127.0.0.1:41186
Dec 28, 2018 12:06:20 PM com.hazelcast.client.spi.impl.ClientMembershipListener
INFO: hz.client_0 [jet] [0.7] [3.10.5]
Members [1] {
Member [localhost]:6701 - 64959856-d40b-4a0e-8f70-8359de9d19a9
}
Dec 28, 2018 12:06:20 PM com.hazelcast.core.LifecycleService
INFO: hz.client_0 [jet] [0.7] [3.10.5] HazelcastClient 3.10.5 (20180913 - 6ffa2ee) is CLIENT_CONNECTED
Dec 28, 2018 12:06:20 PM com.hazelcast.internal.diagnostics.Diagnostics
INFO: hz.client_0 [jet] [0.7] [3.10.5] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
digraph Pipeline {
"filesSource(/home/Benjamin/Desktop/access-logs/*.csv)" -> "filter";
"filter" -> "timestamp";
"timestamp" -> "sliding-window";
"timestamp" -> "sliding-window-2";
"timestamp" -> "sliding-window-3";
"timestamp" -> "sliding-window-4";
"timestamp" -> "sliding-window-5";
"sliding-window" -> "map";
"map" -> "remoteMapSink(mdstcnt)";
"sliding-window-2" -> "map-2";
"map-2" -> "remoteMapSink(httpcodecnt)";
"sliding-window-3" -> "map-3";
"map-3" -> "remoteMapSink(proctimeavg)";
"sliding-window-4" -> "map-4";
"map-4" -> "remoteMapSink(upstreamcnt)";
"sliding-window-5" -> "map-5";
"map-5" -> "remoteMapSink(clustercnt)";
}
Dec 28, 2018 12:06:20 PM com.hazelcast.core.LifecycleService
INFO: hz.client_0 [jet] [0.7] [3.10.5] HazelcastClient 3.10.5 (20180913 - 6ffa2ee) is SHUTTING_DOWN
Dec 28, 2018 12:06:20 PM com.hazelcast.client.connection.ClientConnectionManager
INFO: hz.client_0 [jet] [0.7] [3.10.5] Removed connection to endpoint: [localhost]:6701, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:41186->localhost/127.0.0.1:6701}, remoteEndpoint=[localhost]:6701, lastReadTime=2018-12-28 12:06:20.988, lastWriteTime=2018-12-28 12:06:20.602, closedTime=2018-12-28 12:06:20.991, lastHeartbeatRequested=never, lastHeartbeatReceived=never, connected server version=3.10.5}
Dec 28, 2018 12:06:21 PM com.hazelcast.core.LifecycleService
INFO: hz.client_0 [jet] [0.7] [3.10.5] HazelcastClient 3.10.5 (20180913 - 6ffa2ee) is SHUTDOWN
Exception in thread "main" java.util.concurrent.ExecutionException: com.hazelcast.jet.JetException: Exception in ProcessorTasklet{Compute-Daily-Txns/filesSource(/home/Benjamin/Desktop/access-logs/*.csv)#0}: java.lang.NumberFormatException: For input string: ""
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at com.hazelcast.jet.Job.lambda$join$0(Job.java:92)
at com.hazelcast.jet.impl.util.Util.uncheckRun(Util.java:111)
at com.hazelcast.jet.Job.join(Job.java:92)
at com.ben.mscit.HZJetIMDGConnector.main(HZJetIMDGConnector.java:134)
Caused by: com.hazelcast.jet.JetException: Exception in ProcessorTasklet{Compute-Daily-Txns/filesSource(/home/Benjamin/Desktop/access-logs/*.csv)#0}: java.lang.NumberFormatException: For input string: ""
at com.hazelcast.jet.impl.execution.TaskletExecutionService$BlockingWorker.run(TaskletExecutionService.java:254)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at ------ submitted from ------.(Unknown Source)
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolve(InvocationFuture.java:127)
at com.hazelcast.spi.impl.AbstractInvocationFuture$1.run(AbstractInvocationFuture.java:250)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:64)
at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:80)
Caused by: com.hazelcast.client.UndefinedErrorCodeException: Class name: java.lang.NumberFormatException, Message: For input string: ""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:601)
at java.lang.Long.parseLong(Long.java:631)
at java.text.DigitList.getLong(DigitList.java:195)
at java.text.DecimalFormat.parse(DecimalFormat.java:2051)
at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1869)
at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514)
at java.text.DateFormat.parse(DateFormat.java:364)
at com.ben.mscit.HZJetIMDGConnector.lambda$main$4bf612ba$1(HZJetIMDGConnector.java:58)
at com.hazelcast.jet.function.DistributedBiFunction.apply(DistributedBiFunction.java:41)
at com.hazelcast.jet.impl.connector.ReadFilesP.lambda$processFile$0(ReadFilesP.java:113)
at com.hazelcast.jet.Traverser.lambda$map$0(Traverser.java:58)
at com.hazelcast.jet.Traverser$5.next(Traverser.java:214)
at com.hazelcast.jet.impl.util.FlatMappingTraverser.next(FlatMappingTraverser.java:50)
at com.hazelcast.jet.core.AbstractProcessor.emitFromTraverser(AbstractProcessor.java:402)
at com.hazelcast.jet.core.AbstractProcessor.emitFromTraverser(AbstractProcessor.java:416)
at com.hazelcast.jet.impl.connector.ReadFilesP.complete(ReadFilesP.java:94)
at com.hazelcast.jet.impl.execution.ProcessorTasklet.stateMachineStep(ProcessorTasklet.java:350)
at com.hazelcast.jet.impl.execution.ProcessorTasklet.call(ProcessorTasklet.java:218)
at com.hazelcast.jet.impl.execution.ProcessorTasklet.call(ProcessorTasklet.java:211)
at com.hazelcast.jet.impl.execution.TaskletExecutionService$BlockingWorker.run(TaskletExecutionService.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
/home/Benjamin/.cache/netbeans/8.2/executor-snippets/run.xml:53: Java returned: 1
BUILD FAILED (total time: 2 seconds)
// Debug aid: report the file and the raw line whenever the date column is blank after trimming.
if (split[1].trim().length() == 0) {
    System.out.println("File: " + file + ", line: " + line);
}
--
You received this message because you are subscribed to the Google Groups "hazelcast-jet" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast-je...@googlegroups.com.
To post to this group, send email to hazelc...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/105a1210-9ca9-493d-a9cb-0ad9dc39a195%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "hazelcast-jet" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast-je...@googlegroups.com.
To post to this group, send email to hazelc...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/feedcdf6-fb85-43ba-9df2-c7748bfb325a%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
package test;
import com.ben.mscit.HTTPRequest;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import static com.hazelcast.jet.pipeline.Sources.filesBuilder;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
 * Reads every {@code file*} log in the access-log directory with a Hazelcast Jet
 * pipeline, parses each comma-separated line into an {@link HTTPRequest}, assigns
 * event timestamps from the request date-time column and logs the resulting items.
 *
 * @author benjamin
 */
public class MultipleFileReaderTestClass {

    /**
     * Per-thread parser for the request date-time column.
     * {@link SimpleDateFormat} is not thread-safe, so every thread gets its own
     * instance; keeping it in a {@link ThreadLocal} also avoids constructing a new
     * formatter for every single record.
     */
    private static final ThreadLocal<SimpleDateFormat> REQUEST_DATE_FORMAT =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss"));

    /**
     * Builds the pipeline, starts a Jet instance, runs the job to completion and
     * always shuts the instance down afterwards.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Path sourceFile = Paths.get("/home/Benjamin/Desktop/access-logs/").toAbsolutePath();
        //build pipeline
        Pipeline p = Pipeline.create();
        //read from flat file
        BatchStage<HTTPRequest> stage = p.drawFrom(
                filesBuilder(sourceFile.toString())
                        .glob("file*")
                        .build((file, line) -> {
                            String[] split = line.split(",");
                            HTTPRequest httpRequest = new HTTPRequest();
                            httpRequest.setSourceIP(split[0]);
                            // kept as raw text here; parsed when timestamps are assigned below
                            httpRequest.setRequestDateTime(split[1]);
                            httpRequest.setMethodType(split[2]);
                            httpRequest.setUrlCall(split[3]);
                            httpRequest.setHttpType(split[4]);
                            httpRequest.setHttpRespCode(split[5]);
                            httpRequest.setContentLength(split[6]);
                            httpRequest.setClientType(split[7]);
                            httpRequest.setUpstreamServerIP(split[8]);
                            httpRequest.setProcessingTime(split[9]);
                            httpRequest.setUpstreamConnectTime(split[10]);
                            httpRequest.setUpstreamResponseTime(split[11]);
                            httpRequest.setModule(split[12].trim());
                            return httpRequest;
                        }));
        // Reuse the calling thread's formatter instead of allocating one per event.
        stage.addTimestamps(
                (httpRequest) -> REQUEST_DATE_FORMAT.get().parse(httpRequest.getRequestDateTime()).getTime(),
                SECONDS.toMillis(5))
                .drainTo(Sinks.logger());
        JetInstance jet = Jet.newJetInstance();
        try {
            jet.newJob(p).join();
        } catch (Exception ex) {
            ex.printStackTrace(System.err);
        } finally {
            // shut the instance down whether the job succeeded or failed
            jet.shutdown();
        }
    }
}
// One SimpleDateFormat per thread: SimpleDateFormat is not thread-safe, so a single
// shared instance must not be used from lambdas that may run on several threads.
// Declared final because the holder itself is never reassigned.
private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT = ThreadLocal.withInitial(() ->
        new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss"));
// Usage inside the line-parsing lambda: fetch the calling thread's formatter, then parse.
httpRequest.setRequestDateTime(DATE_FORMAT.get().parse(split[1]).getTime());
--
You received this message because you are subscribed to the Google Groups "hazelcast-jet" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast-je...@googlegroups.com.
To post to this group, send email to hazelc...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/16d8c230-1ffb-47e8-8879-9673fe038255%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
// Formatter for date-times written as dd/MMM/yyyy:HH:mm:ss; java.time formatters are
// immutable and thread-safe, so one shared constant is sufficient.
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss");
2) Parse using DateTimeFormatter
httpRequest.setRequestDateTime(LocalDateTime.from(DATE_FORMAT.parse(split[1])).atZone(ZoneId.systemDefault()).toEpochSecond() * 1000);
I tested with the files you shared & it's working fine.
Thanks
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/CANVo842%2BfqYL-2JLfpCcdpxjfWer3gdQ2XirqNjLNp7XbaKm9A%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.
Gokhan Oner
Senior Solutions Architect, TechOps
350 Cambridge Ave #100, Palo Alto, CA 94306 USA
VALUE_MAP_DATE_FORMAT
// Formats a window timestamp as a double-quoted "HH:mm" string for the JSON-like map value.
// NOTE(review): SimpleDateFormat is not thread-safe and this single instance is shared;
// if it is used from pipeline lambdas that run on several threads, output may be
// corrupted — consider a ThreadLocal, as done for DATE_FORMAT. Confirm against usages.
private static final SimpleDateFormat VALUE_MAP_DATE_FORMAT = new SimpleDateFormat("\"HH:mm\"");
// Per-thread parser for the request date-time column (pattern dd/MMM/yyyy:HH:mm:ss).
private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT = ThreadLocal.withInitial(() -> new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss"));
// Builds a Jet pipeline that reads the access-log files, parses each line into an
// HTTPRequest and logs per-module hit counts per window.
// NOTE(review): this excerpt is truncated — the try block has no catch/finally and
// the method is not closed within the visible lines.
public static void main(String[] args) {
Path sourceFile = Paths.get("/home/Benjamin/Desktop/access-logs/").toAbsolutePath();
// build pipeline
Pipeline p = Pipeline.create();
// read every file matching "file*" in the directory; one HTTPRequest per comma-separated line
BatchStage<HTTPRequest> stage = p.drawFrom(
filesBuilder(sourceFile.toString())
.glob("file*")
.build((file, line) -> {
String[] split = line.split(",");
HTTPRequest httpRequest = new HTTPRequest();
httpRequest.setSourceIP(split[0]);
// each thread parses with its own SimpleDateFormat; the value is stored as epoch milliseconds
httpRequest.setRequestDateTime(DATE_FORMAT.get().parse(split[1]).getTime());
httpRequest.setMethodType(split[2]);
httpRequest.setUrlCall(split[3]);
httpRequest.setHttpType(split[4]);
httpRequest.setHttpRespCode(split[5]);
httpRequest.setContentLength(split[6]);
httpRequest.setClientType(split[7]);
httpRequest.setUpstreamServerIP(split[8]);
httpRequest.setProcessingTime(split[9]);
httpRequest.setUpstreamConnectTime(split[10]);
httpRequest.setUpstreamResponseTime(split[11]);
httpRequest.setModule(split[12].trim());
return httpRequest;
}));
try {
// keep only requests whose module and upstream server IP are longer than one character,
// timestamp them by request date-time (second argument is 10 s — presumably Jet's
// allowed lag; confirm), and group them into 5-minute tumbling windows
StageWithWindow<HTTPRequest> stageWithWindow = stage
.filter((httpRequest) -> httpRequest.getModule().length() > 1 && httpRequest.getUpstreamServerIP().length() > 1)
.addTimestamps(HTTPRequest::getRequestDateTime, SECONDS.toMillis(10))
.window(tumbling(MINUTES.toMillis(5)));
// module hits: count requests per module within each window and log them as
// (module, {"time":..., "value":...}) entries
stageWithWindow
.groupingKey(HTTPRequest::getModule)
.aggregate(counting())
.map((TimestampedEntry<String, Long> t) -> entry(t.getKey(), "{\"time\":" + VALUE_MAP_DATE_FORMAT.format(new Date(t.getTimestamp())) + ",\"value\":" + t.getValue() + "}"))
.drainTo(Sinks.logger());
}
Thanks Gokhan, this works as well. I am deeply grateful.
--
You received this message because you are subscribed to the Google Groups "hazelcast-jet" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast-je...@googlegroups.com.
To post to this group, send email to hazelc...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/3fc32c7e-87e3-4d72-9ad2-0bce3e82a9c4%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/CAHbLCXO5aqY9-M_g20PwE1b58h8srmg3SyF5n2aNnGKg4ms1Kg%40mail.gmail.com.
stage.window(tumbling(SECONDS.toMillis(5)))
.groupingKey(HTTPRequest::getModule)
.aggregate(counting())
.map((TimestampedEntry<String, Long> t) -> entry(t.getKey(), "{\"time\":" + VALUE_MAP_DATE_FORMAT.get().format(new Date(t.getTimestamp())) + ",\"value\":" + t.getValue() + "}"))
.drainTo(Sinks.logger());
// Read the raw lines of every "file*" file, parse each line with a SimpleDateFormat
// supplied by the context factory's create function, then assign event timestamps
// from the parsed request date-time.
StreamStage<HTTPRequest> stage = p.drawFrom(filesBuilder(sourceFile.toString())
                .glob("file*").build((file, line) -> line))
        .mapUsingContext(
                ContextFactory.withCreateFn(
                        jet -> new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss")),
                (timestampFormat, line) -> {
                    String[] columns = line.split(",");
                    HTTPRequest parsedRequest = new HTTPRequest();
                    parsedRequest.setSourceIP(columns[0]);
                    // Column 1 holds the request timestamp; it is stored as epoch milliseconds.
                    parsedRequest.setRequestDateTime(timestampFormat.parse(columns[1]).getTime());
                    parsedRequest.setMethodType(columns[2]);
                    parsedRequest.setUrlCall(columns[3]);
                    parsedRequest.setHttpType(columns[4]);
                    parsedRequest.setHttpRespCode(columns[5]);
                    parsedRequest.setContentLength(columns[6]);
                    parsedRequest.setClientType(columns[7]);
                    parsedRequest.setUpstreamServerIP(columns[8]);
                    parsedRequest.setProcessingTime(columns[9]);
                    parsedRequest.setUpstreamConnectTime(columns[10]);
                    parsedRequest.setUpstreamResponseTime(columns[11]);
                    // The last column is trimmed before it is stored.
                    parsedRequest.setModule(columns[12].trim());
                    return parsedRequest;
                })
        .addTimestamps(HTTPRequest::getRequestDateTime, SECONDS.toMillis(5));
--
You received this message because you are subscribed to the Google Groups "hazelcast-jet" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast-je...@googlegroups.com.
To post to this group, send email to hazelc...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/6e97fbdb-72c8-4391-a091-b0281d016d89%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
stage.window(tumbling(SECONDS.toMillis(5)))
.groupingKey(HTTPRequest::getModule)
.aggregate(counting())
.mapUsingContext(ContextFactory.withCreateFn((JetInstance t) -> new SimpleDateFormat("\"HH:mm\"")), (SimpleDateFormat fmt, TimestampedEntry<String, Long> u) -> entry(u.getKey(), "{\"time\":" + fmt.format(new Date(u.getTimestamp())) + ",\"value\":" + u.getValue() + "}"))
.drainTo(Sinks.logger());
--
You received this message because you are subscribed to the Google Groups "hazelcast-jet" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast-je...@googlegroups.com.
To post to this group, send email to hazelc...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/13bee072-417d-478a-baf9-3402d893f203%40googlegroups.com.
--
You received this message because you are subscribed to the Google Groups "hazelcast-jet" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast-je...@googlegroups.com.
To post to this group, send email to hazelc...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/c8b4ff49-1eb9-4960-931e-c188df8481b5%40googlegroups.com.