import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ChronicleQueueBuilder;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.lang.model.Byteable;
import net.openhft.lang.model.DataValueClasses;
public class ChronicleV3 {
    /** Size in bytes of the opaque payload appended after each event's fixed header. */
    private static final int MAX_BUFFER_SIZE = 256;

    /**
     * Flyweight view over an excerpt: a long id, an epoch-second timestamp and
     * its nanosecond-of-second part. Chronicle generates the implementation.
     */
    public interface IByteableEvent extends Byteable {
        void setId(long id);
        long getId();
        void setTimestamp(long epochSecond);
        long getTimestamp();
        void setTimestampSp(int nanosp);
        int getTimestampSp();
    }

    /**
     * Writes 10M timestamped events to an indexed Chronicle (v3) queue and
     * prints progress every 1M events, then the final counter value.
     */
    public static void main(String[] args) throws InterruptedException {
        long nbEvent = 10_000_000;
        String path = "./data/indexed";
        AtomicLong counter = new AtomicLong();
        // The payload is constant (all zeros), so allocate it once instead of
        // once per iteration as the original did.
        final byte[] data = new byte[MAX_BUFFER_SIZE];
        try (Chronicle queue = ChronicleQueueBuilder.indexed(path).build()) {
            final IByteableEvent eventToWrite = DataValueClasses.newDirectInstance(IByteableEvent.class);
            final ExcerptAppender appender = queue.createAppender();
            try {
                for (long i = 0L; i < nbEvent; i++) {
                    appender.startExcerpt(eventToWrite.maxSize() + MAX_BUFFER_SIZE);
                    Instant now = Instant.now();
                    // Bind the flyweight to the start of the excerpt, then fill the header fields.
                    eventToWrite.bytes(appender, 0);
                    eventToWrite.setTimestamp(now.getEpochSecond());
                    eventToWrite.setTimestampSp(now.getNano());
                    eventToWrite.setId(counter.getAndIncrement());
                    // Write the payload just after the fixed-size header, then commit the excerpt.
                    appender.position(eventToWrite.maxSize());
                    appender.write(data);
                    appender.finish();
                    if ((i % 1_000_000) == 0) {
                        System.out.println("i : " + i);
                    }
                }
            } finally {
                // Close the appender even when the loop throws; the original
                // only closed it on the success path.
                appender.close();
            }
        } catch (Exception e) {
            System.out.println("Counter KO : " + counter.get());
            e.printStackTrace();
        }
        System.out.println("Counter OK : " + counter.get());
    }
}
i : 0
i : 1000000
i : 2000000
i : 3000000
i : 4000000
i : 5000000
i : 6000000
i : 7000000
i : 8000000
i : 9000000
Counter OK : 10000000
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.wire.WireKey;
public class ChronicleV4 {
    /** Size in bytes of the opaque payload written with each document. */
    private static final int MAX_BUFFER_SIZE = 256;

    /** Keys of a serialized event document. */
    public enum EEventField implements WireKey {
        TIMESTAMP_EPOCH, TIMESTAMP_SP, ID, DATA
    }

    /**
     * Writes 10M timestamped documents to a single Chronicle (v4) queue and
     * prints progress every 1M events, then the final counter value.
     */
    public static void main(String[] args) {
        long nbEvent = 10_000_000;
        String basePath = "./data";
        AtomicLong counter = new AtomicLong();
        // The payload is constant (all zeros), so allocate it once instead of
        // once per iteration as the original did.
        final byte[] data = new byte[MAX_BUFFER_SIZE];
        // NOTE(review): on JVMs with limited virtual address space this run
        // died with OutOfMemoryError "Map failed"; per the vendor's advice,
        // consider ChronicleQueueBuilder.single(basePath).blockSize(...) to
        // reduce mapping pressure — confirm against the v4 builder API.
        try (ChronicleQueue queue = ChronicleQueueBuilder.single(basePath).build()) {
            final ExcerptAppender appender = queue.createAppender();
            for (long i = 0L; i < nbEvent; i++) {
                final Instant now = Instant.now();
                // Epoch seconds are a long: write them as int64. The original
                // used int32, which truncates and overflows in 2038.
                // getAndIncrement keeps ids 0-based, matching the V3 writer
                // (the original incrementAndGet started ids at 1).
                appender.writeDocument(w -> w.write(EEventField.TIMESTAMP_EPOCH).int64(now.getEpochSecond()) //
                        .write(EEventField.TIMESTAMP_SP).int32(now.getNano()) //
                        .write(EEventField.ID).int64(counter.getAndIncrement()) //
                        .write(EEventField.DATA).bytes(data));
                if ((i % 1_000_000) == 0) {
                    System.out.println("i : " + i);
                }
            }
        } catch (Exception e) {
            System.out.println("Counter KO : " + counter.get());
            e.printStackTrace();
        }
        System.out.println("Counter OK : " + counter.get());
    }
}
i : 0
i : 1000000
i : 2000000
i : 3000000
Counter KO : 3610744
net.openhft.chronicle.core.io.IORuntimeException: java.io.IOException: java.lang.OutOfMemoryError: Map failed
at net.openhft.chronicle.bytes.MappedBytes.writeCheckOffset(MappedBytes.java:133)
at net.openhft.chronicle.bytes.AbstractBytes.compareAndSwapInt(AbstractBytes.java:145)
at net.openhft.chronicle.wire.AbstractWire.writeHeader(AbstractWire.java:207)
at net.openhft.chronicle.wire.WireOut.writeHeader(WireOut.java:144)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreAppender.writingDocument(SingleChronicleQueueExcerpts.java:130)
at net.openhft.chronicle.wire.MarshallableOut.writeDocument(MarshallableOut.java:68)
at gist.ChronicleV4.main(ChronicleV4.java:31)
Caused by: java.io.IOException: java.lang.OutOfMemoryError: Map failed
at net.openhft.chronicle.core.OS.asAnIOException(OS.java:306)
at net.openhft.chronicle.core.OS.map(OS.java:282)
at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:153)
at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:113)
at net.openhft.chronicle.bytes.MappedBytes.writeCheckOffset(MappedBytes.java:130)
... 6 more
Caused by: java.lang.OutOfMemoryError: Map failed
at sun.nio.ch.FileChannelImpl.map0(Native Method)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at net.openhft.chronicle.core.OS.map0(OS.java:290)
at net.openhft.chronicle.core.OS.map(OS.java:278)
... 9 more
At the moment I would expect the single (v4) queue to use more heap. If you have a large queue, you can increase the block size to reduce memory usage. Single has been tuned for Linux, and we expect it to scale better on Linux, as it supports memory mappings larger than main memory.
Regards, Peter.
--
You received this message because you are subscribed to the Google Groups "Chronicle" group.
To unsubscribe from this group and stop receiving emails from it, send an email to java-chronicl...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--