Marshalling Sessions with temporal operators


Thomas Allen

Aug 9, 2023, 12:30:25 PM
to Drools Usage
Hi folks,

Is it possible to use temporal operators while serializing and deserializing sessions?

I have a requirement to move sessions out of memory and load them back as required. To do this, I plan to marshal each session to a file and dispose of it once it is persisted, freeing any resources.

I have created a rule that, when the status of an event has not changed within a period of time, outputs a new event to a channel (for now, the channel stores it in a map that can be queried). The rule works as expected when I keep using the same session, but it does not fire when I use a session that has been deserialized by the marshaller.

Below is code that reproduces the issue.

rule
```
package org.example.drools.test;

import org.example.drools.test.domain.Event;

rule "Close event without status change x "
when
    $eventCreated: Event($eventId: eventId, status=="created")
    not(
        Event(
            this != $eventCreated,
            eventId==$eventId,
            status != "created",
            status != "closed",
            this after[0s, 5s] $eventCreated
        )
    )
then
    System.out.println("No status change in 5 secs");
    insert(new Event($eventCreated.getEventId(), "closed", "auto-closed"));
end

rule "Collect closed event"
when
    $eventClosed: Event(status=="closed")
then
    System.out.println("Sending " + $eventClosed + " to notification channel");
    channels['notification-channel'].send($eventClosed);

end

```
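
For readers unfamiliar with the `after` operator used in the rule above: per the Drools documentation, `this after[0s, 5s] $eventCreated` matches when the candidate event's timestamp falls between 0 and 5 seconds after the timestamp of `$eventCreated`. Below is a minimal plain-Java sketch of that interval check. `TemporalWindow` and `isAfter` are hypothetical names used for illustration only, not Drools APIs, and events are assumed to be point-in-time (no duration) with millisecond timestamps.

```java
import java.time.Duration;

public class TemporalWindow {

    /**
     * Plain-Java restatement of "candidate after[min, max] reference"
     * for point-in-time events. Hypothetical helper, not a Drools API.
     */
    public static boolean isAfter(long candidateTs, long referenceTs, Duration min, Duration max) {
        long distance = candidateTs - referenceTs;
        return distance >= min.toMillis() && distance <= max.toMillis();
    }

    public static void main(String[] args) {
        long created = 0L;                       // timestamp of the "created" event
        Duration min = Duration.ZERO;
        Duration max = Duration.ofSeconds(5);

        // A status change 3 s after creation lies inside the window.
        System.out.println(isAfter(created + 3_000, created, min, max));
        // A status change 6 s after creation lies outside the window.
        System.out.println(isAfter(created + 6_000, created, min, max));
    }
}
```

Because the pattern sits inside `not(...)`, the engine can only conclude that no such event exists once the window has elapsed, which is why the test below advances the pseudo clock before firing the rules again.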

Event POJO
```
package org.example.drools.test.domain;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.kie.api.definition.type.Role;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Role(Role.Type.EVENT)
public class Event implements Serializable {
    private String eventId;
    private String status;
    private String reason;
}

```
Minimal reproducible test
```
public void testingMarshalling() throws IOException, ClassNotFoundException, InterruptedException {
        KieServices kieServices = KieServices.Factory.get();
        KieContainer kieContainer = kieServices.getKieClasspathContainer();
        KieSessionConfiguration config = KieServices.Factory.get().newKieSessionConfiguration();
        config.setOption(ClockTypeOption.PSEUDO);

        KieSession kieSession = kieContainer.getKieBase().newKieSession(config, null);
        SessionPseudoClock clock = kieSession.getSessionClock();

        EventNotificationChannel notificationChannel = new EventNotificationChannel();
        kieSession.registerChannel("notification-channel", notificationChannel);

        String eventId = UUID.randomUUID().toString();

        Event eventCreated = new Event();
        eventCreated.setEventId(eventId);
        eventCreated.setStatus("created");

        kieSession.insert(eventCreated);

        long ruleFireCount = kieSession.fireAllRules();
        System.out.println(ruleFireCount);
        assertEquals(0, ruleFireCount);
        System.out.println("advancing time");

        clock.advanceTime(5, TimeUnit.SECONDS);
        System.out.println("Session clock -> " + new Date(kieSession.getSessionClock().getCurrentTime()));

        byte[] sessionBytes = serializeSession(kieSession);

        KieSession deserializeSession = deserializeSession(sessionBytes, kieContainer);

        System.out.println("Session clock -> " + new Date(deserializeSession.getSessionClock().getCurrentTime()));
        System.out.println("firing rules again");
        
        // ***REPLACE*** deserializeSession with kieSession to make rule fire
        ruleFireCount = deserializeSession.fireAllRules();

        System.out.println(ruleFireCount);
        assertEquals(2, ruleFireCount);

        Event event = notificationChannel.getClosedEvent(eventId);
        assertEquals("closed", event.getStatus());
        assertEquals("auto-closed", event.getReason());
    }

    private KieSession deserializeSession(byte[] sessionBytes, KieContainer kieContainer) throws IOException, ClassNotFoundException {
        Marshaller marshaller = MarshallerFactory.newMarshaller(kieContainer.getKieBase());
        ByteArrayInputStream bais = new ByteArrayInputStream(sessionBytes);
        return marshaller.unmarshall(bais);
    }

    private byte[] serializeSession(KieSession kieSession) throws IOException {
        KieMarshallers kieMarshallers = KieServices.get().getMarshallers();
        Marshaller marshaller = kieMarshallers.newMarshaller(kieSession.getKieBase());
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        marshaller.marshall(baos, kieSession);
        return baos.toByteArray();
    }

```
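
The `serializeSession` and `deserializeSession` helpers above work on byte arrays in memory. Since the stated goal is to keep sessions on disk, here is a minimal sketch of how those bytes might be written to and read from a file using only the JDK. `SessionFileStore`, `save`, and `load` are hypothetical names, and the byte array in `main` is only a stand-in for real marshalled session bytes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

public class SessionFileStore {

    /** Writes the bytes to a temp file in the target directory, then moves it into place. */
    public static void save(Path target, byte[] sessionBytes) {
        try {
            Path dir = target.toAbsolutePath().getParent();
            Path tmp = Files.createTempFile(dir, "ksession-", ".tmp");
            try {
                Files.write(tmp, sessionBytes);
                Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
            } finally {
                Files.deleteIfExists(tmp); // no-op when the move succeeded
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the previously stored bytes back into memory. */
    public static byte[] load(Path source) {
        try {
            return Files.readAllBytes(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempDirectory("sessions").resolve("ksession.out");
        byte[] original = {1, 2, 3}; // stand-in for serializeSession(kieSession)
        save(file, original);
        System.out.println(Arrays.equals(original, load(file)));
    }
}
```

Writing to a temporary file first means a failed write does not leave a half-written session file under the final name; whether that matters depends on how the files are used.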

One other thing I found was that the pseudo clock configured for the session in the test was also not persisted after serialization.

Any help on this would be great!

Thanks
Thomas

Toshiya Kobayashi

Aug 10, 2023, 4:52:33 AM
to Drools Usage
Hi Thomas, thank you for reporting.

I wrote a reproducer based on your code.

https://github.com/tkobayas/reproducers/tree/main/Ex-marshaller-temporal-8.42

Firstly, you need to provide `KieSessionConfiguration` when you unmarshall the ksession.

https://github.com/tkobayas/reproducers/blob/main/Ex-marshaller-temporal-8.42/src/test/java/org/example/drools/test/SerializeTest.java#L62

https://github.com/tkobayas/reproducers/blob/main/Ex-marshaller-temporal-8.42/src/test/java/org/example/drools/test/SerializeTest.java#L83-L87

With the config provided, `deserializeSession.getSessionClock()` returns a `SessionPseudoClock`, and the pseudo clock time is properly deserialized.

However, the rule in `SerializeTest` still fails to fire. It seems to be a bug, so I filed a JIRA.

https://issues.redhat.com/browse/DROOLS-7531

I'll investigate further and keep you posted.

Regards,
Toshiya


Toshiya Kobayashi

Aug 10, 2023, 4:59:21 AM
to Drools Usage
Also note that `EventNotificationChannel` is not marshalled, so you need to register it explicitly on the deserialized session.

https://github.com/tkobayas/reproducers/blob/main/Ex-marshaller-temporal-8.42/src/test/java/org/example/drools/test/SerializeTest.java#L67

I assume you would like channels to be marshalled and unmarshalled as well, so I filed an enhancement JIRA.

https://issues.redhat.com/browse/DROOLS-7532

Regards,
Toshiya



Thomas Allen

Aug 10, 2023, 6:00:16 AM
to Drools Usage

Hi Toshiya,

Thanks for getting back. Yes, I was hoping that anything that can be added to a session would be marshalled: inserted facts, globals, rules, channels, and config. Essentially, the state at the time of serialization would be the state after deserialization. That way, a deserialized session could be used immediately, without having to add back things like clocks and channels. Is it possible to know what is and what isn't serialized?

My plan was to load the session from the file system and either insert facts or fire rules. If I need to add items like channels back, that may be difficult when there are multiple channels, depending on the set of rules fired.

Also thank you for filing the JIRAs. I will keep a watch on them.

Best Regards
Thomas

Thomas Allen

Aug 31, 2023, 6:21:06 AM
to Drools Usage
Hi Toshiya,

Is there an estimate as to when the bug https://issues.redhat.com/browse/DROOLS-7531 will be fixed?

Thanks
Thomas

Toshiya Kobayashi

Sep 4, 2023, 4:36:42 AM
to Drools Usage
Sorry for the late reply.

I'm looking into the issue. I cannot give a target date yet, but I'll keep you posted.

Regards,
Toshiya



Toshiya Kobayashi

Sep 11, 2023, 3:22:02 AM
to Drools Usage
https://issues.redhat.com/browse/DROOLS-7531

The fix has been merged into the main branch.

It will be available in Drools 8.45.0.Final, which will likely be released in 2 or 3 weeks.

Regards,
Toshiya


Thomas Allen

Sep 11, 2023, 5:14:21 AM
to Drools Usage
Hi Toshiya,

Thanks for getting back and fixing this. I have built a snapshot version of the latest code to verify the fix. Am I right in thinking that globals are the same as channels, in that they need to be re-registered on the deserialized session?

Best Regards,
Thomas

Toshiya Kobayashi

Sep 13, 2023, 1:25:54 AM
to Drools Usage
Hi Thomas,

Yes, globals need to be re-registered on the deserialized session.

By the way, I talked with the Drools team: globals and channels are treated somewhat differently.

Globals are considered configuration parameters of the session, not part of its state, so they are supposed to be managed by clients. Hence, we won't file a JIRA to marshal globals.

Channels are considered part of the session state, so we will work on the JIRA (https://issues.redhat.com/browse/DROOLS-7532). However, I'm sorry that I cannot give an estimate for the implementation at the moment, so please also re-register channels on the client side for now.
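
Since both globals and channels currently have to be re-registered by the client, one way to keep this manageable with several channels is to record how each one is built and replay that after unmarshalling. The following is a minimal sketch of that idea in plain Java. `ReRegistration` is a hypothetical helper, not part of Drools, and the `Map` in `main` merely stands in for a session.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical helper: remembers how to rebuild each named channel or global. */
public class ReRegistration<T> {

    private final Map<String, Supplier<? extends T>> factories = new LinkedHashMap<>();

    /** Records a factory for the object registered under the given name. */
    public ReRegistration<T> add(String name, Supplier<? extends T> factory) {
        factories.put(name, factory);
        return this;
    }

    /** Invokes the registrar once per recorded name, in insertion order. */
    public void applyTo(BiConsumer<String, ? super T> registrar) {
        factories.forEach((name, factory) -> registrar.accept(name, factory.get()));
    }

    public static void main(String[] args) {
        ReRegistration<Object> channels = new ReRegistration<Object>()
                .add("notification-channel", Object::new); // stand-in for a real channel

        // Stand-in for a freshly unmarshalled session.
        Map<String, Object> fakeSession = new LinkedHashMap<>();
        channels.applyTo(fakeSession::put);

        System.out.println(fakeSession.keySet());
    }
}
```

With Drools on the classpath, the same helper could presumably be typed as `ReRegistration<Channel>` and applied with `deserializeSession::registerChannel` (the method used in the tests in this thread), or with `setGlobal` for globals; that wiring is an assumption and has not been tested here.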

Regards,
Toshiya



Thomas Allen

Oct 9, 2023, 10:18:11 AM
to Drools Usage
Hi Toshiya,

Is there any update on the release of 8.45.0.Final?

/Thomas

Thomas Allen

Oct 10, 2023, 5:35:34 AM
to Drools Usage
I am also facing another marshalling issue when running with the latest snapshot. I am trying to fire the same temporal rule as in the initial issue above, but this time I am serializing the session to a file, following the example here (https://github.com/tkobayas/kiegroup-examples/tree/master/Ex-marshaller-8.41/src/test/java/com/sample), and the rule never fires. I am using two separate tests, one to serialize and one to deserialize, because I want to ensure that the session can survive a JVM restart.

I can see that the fact is available after deserializing and that the clock is correct, but the rule fails to fire.

Here is the reproducer code:

POJO
```
package com.sample;

import org.kie.api.definition.type.Role;

import java.io.Serializable;


@Role(Role.Type.EVENT)
public class Event implements Serializable {

    private String eventId;
    private String status;
    private String reason;

    public Event() {
        super();
        // TODO Auto-generated constructor stub
    }

    public Event(String eventId, String status, String reason) {
        super();
        this.eventId = eventId;
        this.status = status;
        this.reason = reason;
    }

    public String getEventId() {
        return eventId;
    }

    public void setEventId(String eventId) {
        this.eventId = eventId;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getReason() {
        return reason;
    }

    public void setReason(String reason) {
        this.reason = reason;
    }

    @Override
    public String toString() {
        return "Event [eventId=" + eventId + ", status=" + status + ", reason=" + reason + "]";
    }

}
```

Channel
```
package com.sample;

import org.kie.api.runtime.Channel;

import java.util.HashMap;
import java.util.Map;

public class EventNotificationChannel implements Channel {

    private Map<String, Event> eventMap = new HashMap<>();
   
    @Override
    public void send(Object object) {
        System.out.println("send : " + object);
        Event event = (Event)object;
        eventMap.put(event.getEventId(), event);
    }

    public Event getClosedEvent(String eventId) {
        Event event = eventMap.get(eventId);
        if (event != null && event.getStatus().equals("closed")) {
            return event;
        } else {
            return null;
        }
    }

}

```

kmodule
```
<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns="http://jboss.org/kie/6.0.0/kmodule">
    <kbase name="testing-marshaller" default="true" eventProcessingMode="stream" equalsBehavior="equality" packages="com.sample"/>
</kmodule>
```

Rule
```
package com.sample;

import com.sample.Event;


rule "Close event without status change x "
when
    $eventCreated: Event($eventId: eventId, status=="created")
    not(
        Event(
            this != $eventCreated,
            eventId==$eventId,
            status != "created",
            status != "closed",
            this after[0s, 1m] $eventCreated
        )
    )
then
    System.out.println("No status change in 1 min");

    insert(new Event($eventCreated.getEventId(), "closed", "auto-closed"));
end

rule "Collect closed event"
when
    $eventClosed: Event(status=="closed")
then
    System.out.println("Sending " + $eventClosed + " to notification channel");
    channels['notification-channel'].send($eventClosed);

end
```

Tests

```
@Test
    public void testSerialize() {
        // load up the knowledge base
        KieServices ks = KieServices.get();
        KieContainer kContainer = ks.getKieClasspathContainer();

        KieSessionConfiguration config = KieServices.Factory.get().newKieSessionConfiguration();
        config.setOption(ClockTypeOption.PSEUDO);
        KieSession kieSession = kContainer.getKieBase().newKieSession(config,null);

        SessionPseudoClock clock = kieSession.getSessionClock();

        EventNotificationChannel notificationChannel = new EventNotificationChannel();
        kieSession.registerChannel("notification-channel", notificationChannel);

        String eventId = "beb0af1a-97f2-42bb-9af9-94990e38f221";


        Event eventCreated = new Event();
        eventCreated.setEventId(eventId);
        eventCreated.setStatus("created");

        kieSession.insert(eventCreated);
        System.out.println("Facts inserted - " + kieSession.getFactHandles());


        long ruleFireCount = kieSession.fireAllRules();
        System.out.println(ruleFireCount);
        assertEquals(0, ruleFireCount);
        System.out.println("advancing time");

        clock.advanceTime(61, TimeUnit.SECONDS);
        System.out.println("Session clock -> " + kieSession.getSessionClock());

        System.out.println("Session clock -> " + new Date(kieSession.getSessionClock().getCurrentTime()));

        // Serialize
        try (FileOutputStream out = new FileOutputStream("./ksession.out")) {

            ks.getMarshallers().newMarshaller(kieSession.getKieBase()).marshall(out, kieSession);

        } catch (IOException e) {
            e.printStackTrace();
        }

        kieSession.dispose();

        System.out.println("=== finished");
    }

    @Test
    public void testDeserialize() throws InterruptedException {
        // load up the knowledge base
        KieServices ks = KieServices.Factory.get();
        KieContainer kContainer = ks.getKieClasspathContainer();
        KieBase kBase = kContainer.getKieBase();
        KieSession deserializeSession = null;
        String eventId = "beb0af1a-97f2-42bb-9af9-94990e38f221";
        // Deserialize
        try (FileInputStream in = new FileInputStream("./ksession.out")) {

            KieSessionConfiguration config = KieServices.Factory.get().newKieSessionConfiguration();
            config.setOption(ClockTypeOption.PSEUDO);
            deserializeSession = ks.getMarshallers().newMarshaller(kBase).unmarshall(in, config, null);
            System.out.println("Session clock -> " + deserializeSession.getSessionClock());

            System.out.println("Session clock -> " + new Date(deserializeSession.getSessionClock().getCurrentTime()));

            EventNotificationChannel notificationChannel = new EventNotificationChannel();
            deserializeSession.registerChannel("notification-channel", notificationChannel);

            System.out.println("Facts available - " + deserializeSession.getFactHandles());


            System.out.println("firing rules again");

            int ruleFireCount = deserializeSession.fireAllRules();


            System.out.println(ruleFireCount);
            assertEquals(2, ruleFireCount);

            Event event = notificationChannel.getClosedEvent(eventId);
            assertEquals("closed", event.getStatus());
            assertEquals("auto-closed", event.getReason());
            deserializeSession.dispose();

        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }

    }

```
The tests above do the following: the serialize test creates a session, inserts a fact, advances the clock, and serializes the session. The deserialize test then unmarshalls the session and fires the rules.

One thing I have tried is moving the clock advance into the deserialize test. The session is then serialized at t=0, deserialized at t=0, and the clock is advanced afterwards, so the rules are fired at t=61. In that case I can see that the rule fires.

After this, I tried using the realtime clock (run the serialize test, wait 1 minute, then run the deserialize test) and found that the rule did not fire after deserializing. Again, I can see the clock advancing and the facts are available, but the rule fails to fire.

Is there something I am missing here?

Thanks
Thomas