Single Jep in concurrency

BK

Oct 21, 2024, 11:02:47 AM
to Jep Java Users
Hi,

Can I use a single instance of Jep to evaluate expressions in a concurrent user environment?

Regards,
BK

Richard Morris

Oct 23, 2024, 1:09:28 PM
to Jep Java Users
In theory, yes, but with many caveats. 

You could use the same instance of Jep in multiple threads, but using the same Jep instance means using the same VariableTable and the same set of Variables. This is likely to cause problems. Firstly, if a Variable value is changed in one thread, other threads will see the changed value, possibly halfway through an evaluation. Consider one thread trying to evaluate the polynomial "a x^2 + b x + c": if another thread alters the value of x between the evaluation of the first and second terms, you will get a meaningless result. The problem is made worse because assignment of doubles is not guaranteed to be atomic in Java, so the high bytes could be set while the low bytes are unchanged.
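
To make the polynomial example concrete, here is a minimal sketch that simulates the interleaving deterministically. It uses only the Java standard library, not Jep; the class SharedVariableHazard and its methods are hypothetical names invented for this illustration, and the "other thread" is modelled as a callback that runs between the two terms.

```java
import java.util.function.DoubleSupplier;

/** Stdlib-only simulation of the shared-variable hazard; no Jep classes are used. */
final class SharedVariableHazard {

    /** Evaluates a*x^2 + b*x + c, re-reading x for each term as a tree-walking evaluator would. */
    static double evaluate(double a, double b, double c, DoubleSupplier x, Runnable betweenTerms) {
        double first = a * x.getAsDouble() * x.getAsDouble(); // a x^2
        betweenTerms.run(); // stands in for another thread getting scheduled here
        double second = b * x.getAsDouble();                  // b x
        return first + second + c;
    }

    /** x stays at 2 for the whole evaluation: 1*4 + 1*2 + 1. */
    static double consistentResult() {
        double[] x = {2.0};
        return evaluate(1, 1, 1, () -> x[0], () -> { });
    }

    /** x changes from 2 to 3 between the terms: 1*4 + 1*3 + 1. */
    static double tornResult() {
        double[] x = {2.0};
        return evaluate(1, 1, 1, () -> x[0], () -> x[0] = 3.0);
    }

    public static void main(String[] args) {
        System.out.println(consistentResult()); // 7.0, the value of the polynomial at x = 2
        System.out.println(tornResult());       // 8.0, neither p(2) = 7 nor p(3) = 13
    }
}
```

The second result matches no single value of x, which is the "meaningless result" described above.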

The approach taken in both methods discussed in the threads documentation is to give each thread its own VariableTable, with a set of Variables owned by that thread. This is achieved by using a lightweight version of Jep:

Jep j = new Jep();
ComponentSet cs = new LightWeightComponentSet(j);
Jep lwj = new Jep(cs);

This instance will have its own copy of the VariableTable and Evaluator, but either null instances or copies of the other components. It has a small memory footprint <1K. 
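
The "one table per thread" idea can be sketched with a ThreadLocal. The example below is an analogy using only the standard library: a plain HashMap stands in for the per-thread VariableTable, and PerThreadTables is a hypothetical name. Holding a lightweight Jep instance in a ThreadLocal would presumably follow the same pattern, but I have not tested that.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Stdlib-only analogy: each thread lazily gets its own private variable map. */
final class PerThreadTables {
    private static final ThreadLocal<Map<String, Double>> TABLE =
            ThreadLocal.withInitial(HashMap::new);

    /** Sets "x" in the calling thread's own table, then reads it back and squares it. */
    static double square(double x) {
        Map<String, Double> vars = TABLE.get(); // never shared with another thread
        vars.put("x", x);
        double v = vars.get("x");
        return v * v;
    }

    /** Runs square() for every input on a small pool; results keep the input order. */
    static double[] squaresInParallel(double... inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Double>> futures = new ArrayList<>();
            for (double in : inputs) {
                Callable<Double> task = () -> square(in);
                futures.add(pool.submit(task));
            }
            double[] out = new double[inputs.length];
            for (int i = 0; i < out.length; i++) {
                out[i] = futures.get(i).get();
            }
            return out;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because no two threads ever touch the same map, a write to "x" in one task cannot be observed by another task in mid-calculation.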

It might be possible to use the same Jep instance across multiple threads, but you would need to work out how you want to manage variables. One possibility is to use AtomicReferences to hold the values of variables, with a custom AtomicVariable class:

static class AtomicVariable extends Variable {
    private static final long serialVersionUID = 420L;

    // Holds the value so that reads and writes are atomic and visible across threads
    AtomicReference<Object> value = new AtomicReference<>();

    protected AtomicVariable(String name) {
        super(name);
    }

    public AtomicVariable(String name, Object val) {
        super(name);
        value.set(val);
    }

    @Override
    public Object getValue() {
        return value.get();
    }

    @Override
    public boolean setValue(Object val) {
        if (isConstant())
            return false;
        value.set(val);
        return true;
    }
}

and a corresponding VariableFactory:

static class AtomicVariableFactory extends VariableFactory {
    private static final long serialVersionUID = 410L;

    @Override
    public Variable createVariable(String name) {
        return new AtomicVariable(name);
    }

    @Override
    public Variable createVariable(String name, Object value) {
        return new AtomicVariable(name, value);
    }
}

I've not tested the above, but it should give the equivalent of volatile memory access, at least ensuring the value is completely updated before another thread tries to read it. There are other variations on the above with different memory semantics.
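
As a rough illustration of what "different memory semantics" could look like, the sketch below wraps an AtomicReference and exposes three access styles that the standard class offers: volatile (get/set), acquire/release (getAcquire/setRelease, available from Java 9), and an atomic read-modify-write (updateAndGet). ValueCell is a hypothetical stand-in and is not tied to Jep's Variable class.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

/** Hypothetical value holder showing the access modes AtomicReference provides. */
final class ValueCell {
    private final AtomicReference<Object> ref = new AtomicReference<>();

    // Volatile semantics: the strongest ordering of the plain read/write pairs shown here.
    Object getVolatile() { return ref.get(); }
    void setVolatile(Object v) { ref.set(v); }

    // Acquire/release semantics (Java 9+): weaker ordering guarantees than volatile.
    Object getAcquire() { return ref.getAcquire(); }
    void setRelease(Object v) { ref.setRelease(v); }

    // Atomic read-modify-write: no update from another thread can be lost in between.
    Object update(UnaryOperator<Object> f) { return ref.updateAndGet(f); }

    /** Small single-threaded walk-through: store 1.5, add 1.0 atomically, read it back. */
    static double demo() {
        ValueCell x = new ValueCell();
        x.setRelease(1.5);
        x.update(v -> (Double) v + 1.0);
        return (Double) x.getAcquire();
    }
}
```

Which mode is appropriate depends on how the threads coordinate; volatile access is the conservative choice if in doubt.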

The other problem with using the same Jep instance across multiple threads is the Evaluator classes. Each instance has its own stack so using it across multiple threads could easily corrupt the stack. This could simply be solved by creating a new Evaluator for each thread. 
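
The stack problem and its fix can be shown with a toy postfix evaluator. This is a stdlib-only sketch, not Jep's Evaluator: StackEvaluator is a hypothetical class whose instance-level stack makes it unsafe to share, so each parallel task creates its own instance.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.stream.Collectors;

/** Toy postfix evaluator with per-instance mutable state (the stack). */
final class StackEvaluator {
    // Two threads pushing and popping on the same instance would corrupt this stack.
    private final Deque<Double> stack = new ArrayDeque<>();

    /** Evaluates a whitespace-separated postfix expression supporting + and *. */
    double evaluate(String postfix) {
        stack.clear();
        for (String tok : postfix.trim().split("\\s+")) {
            if (tok.equals("+")) {
                double r = stack.pop();
                double l = stack.pop();
                stack.push(l + r);
            } else if (tok.equals("*")) {
                double r = stack.pop();
                double l = stack.pop();
                stack.push(l * r);
            } else {
                stack.push(Double.parseDouble(tok));
            }
        }
        return stack.pop();
    }

    /** Evaluates all expressions in parallel, giving every task a fresh evaluator. */
    static List<Double> evaluateAll(List<String> expressions) {
        return expressions.parallelStream()
                .map(e -> new StackEvaluator().evaluate(e))
                .collect(Collectors.toList());
    }
}
```

Creating one evaluator per task is the simplest safe choice; one per thread (for example via a ThreadLocal) would avoid some allocation at the cost of a little more bookkeeping.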

A final problem is the set of PFMC functions used. Most are stateless so can be used in multiple threads, but a few require special treatment. 

I could probably give a better answer if I knew your use case better. 

BK

Oct 27, 2024, 12:56:26 PM
to Jep Java Users
Thank you for the detailed explanation, Richard. This is really helpful for my implementation. I have another use case where I would like to use Jep for parsing expressions only, without requiring evaluation. I have a function table component set for the Jep instance in this case. Will there be any side effects if I use the same Jep instance for concurrent threads?

Regards,
BK

Richard Morris

Oct 28, 2024, 10:38:30 AM
to Jep Java Users
Using Jep to parse multiple equations in parallel presents a few problems.

Firstly, parsing often creates new variables, and this can cause concurrency errors when using the standard VariableTable in multiple threads. I created a ConcurrentVariableTable:

package com.singularsys.jep.misc.parallelparse;

import java.text.MessageFormat;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.singularsys.jep.Jep;
import com.singularsys.jep.JepComponent;
import com.singularsys.jep.JepException;
import com.singularsys.jep.JepMessages;
import com.singularsys.jep.JepRuntimeException;
import com.singularsys.jep.Variable;
import com.singularsys.jep.VariableTable;

/**
 * A variable table backed by a ConcurrentHashMap that is safe for concurrent
 * access. Inside the {@link #addVariable(String)} methods it uses the
 * {@link ConcurrentHashMap#computeIfAbsent(Object, java.util.function.Function)}
 * and {@link ConcurrentHashMap#compute(Object, java.util.function.BiFunction)}
 * to atomically create variables when needed. As a consequence, the
 * {@link #addVariable(String)} methods may block while a variable is created in
 * another thread.
 */
public class ConcurrentVariableTable extends VariableTable {
    private static final long serialVersionUID = 410L;

    /** The table of variables. */
    private final ConcurrentHashMap<String, Variable> ctable = new ConcurrentHashMap<>();

    /**
     * Default constructor.
     */
    public ConcurrentVariableTable() {
    }

    /**
     * Copy constructor. For each variable in the source table, create a new variable.
     * @param tbl source table
     */
    public ConcurrentVariableTable(VariableTable tbl) {
        super(tbl);
    }

    @Override
    public void init(Jep j) {
        super.init(j);
    }

    @Override
    public Variable addVariable(String name) {
        // Atomically create the variable only if no thread has created it yet
        return ctable.computeIfAbsent(name, n -> vf.createVariable(n));
    }

    @Override
    public Variable addVariable(String name, Object value) throws JepException {
        try {
            var vbl = ctable.compute(name, (n, v) -> {
                if (v == null) {
                    Variable v2 = vf.createVariable(n, value);
                    return v2;
                } else {
                    boolean flag = v.setValue(value);
                    if (flag) {
                        return v;
                    } else {
                        // Lambdas cannot throw checked exceptions, so wrap and unwrap below
                        throw new JepRuntimeException(new JepException(MessageFormat.format(
                                JepMessages.getString("VariableTable.AttemptToSetTheValueOfAConstantVariable"), name))); //$NON-NLS-1$
                    }
                }
            });
            return vbl;
        } catch (JepRuntimeException e) {
            throw (JepException) e.getCause();
        }
    }

    @Override
    public Variable addConstant(String name, Object value) throws JepException {
        try {
            var vbl = ctable.compute(name, (n, v) -> {
                if (v == null) {
                    Variable v2 = vf.createVariable(n, value);
                    v2.setValidValue(true);
                    v2.setIsConstant(true);
                    return v2;
                } else {
                    throw new JepRuntimeException(new JepException(MessageFormat.format(
                            JepMessages.getString("VariableTable.AttemptToSetTheValueOfAConstantVariable"), name))); //$NON-NLS-1$
                }
            });
            return vbl;
        } catch (JepRuntimeException e) {
            throw (JepException) e.getCause();
        }
    }

    @Override
    public Variable getVariable(String name) {
        return ctable.get(name);
    }

    @Override
    public void clear() {
        ctable.clear();
    }

    @Override
    public boolean isEmpty() {
        return ctable.isEmpty();
    }

    @Override
    public Set<String> keySet() {
        return ctable.keySet();
    }

    @Override
    public Collection<Variable> getVariables() {
        return ctable.values();
    }

    @Override
    public Variable remove(String varname) {
        return ctable.remove(varname);
    }

    @Override
    public Variable remove(Variable var) {
        return ctable.remove(var.getName());
    }

    @Override
    public Collection<Variable> values() {
        return ctable.values();
    }

    @Override
    public int size() {
        return ctable.size();
    }

    @Override
    public boolean containsKey(String key) {
        return ctable.containsKey(key);
    }

    @Override
    public boolean containsVariable(Variable value) {
        return ctable.containsValue(value);
    }

    @Override
    public JepComponent getLightWeightInstance() {
        return new ConcurrentVariableTable();
    }

    @Override
    public void copyConstantsFrom(VariableTable vt) {
        for (Variable var : vt.getVariables()) {
            if (var.isConstant()) {
                Variable existing = ctable.get(var.getName());
                if (existing != null) {
                    // Temporarily clear the constant flag so the value can be overwritten
                    existing.setIsConstant(false);
                    existing.setValue(var.getValue());
                    existing.setValidValue(var.hasValidValue());
                    existing.hookKeys().forEach(existing::removeHook);
                    var.hookKeys().forEach(key -> existing.setHook(key, var.getHook(key)));
                    existing.setIsConstant(var.isConstant());
                } else {
                    Variable v2 = vf.copyVariable(var);
                    ctable.put(var.getName(), v2);
                }
            }
        }
    }

    @Override
    public void copyVariablesFrom(VariableTable vt) {
        for (Variable var : vt.getVariables()) {
            Variable existing = ctable.get(var.getName());
            if (existing != null) {
                // Temporarily clear the constant flag so the value can be overwritten
                existing.setIsConstant(false);
                existing.setValue(var.getValue());
                existing.setValidValue(var.hasValidValue());
                existing.hookKeys().forEach(existing::removeHook);
                var.hookKeys().forEach(key -> existing.setHook(key, var.getHook(key)));
                existing.setIsConstant(var.isConstant());
            } else {
                Variable v2 = vf.copyVariable(var);
                ctable.put(var.getName(), v2);
            }
        }
    }

    @Override
    public void clearValues() {
        super.clearValues();
    }

    @Override
    public void removeNonConstants() {
        // Use ctable here: this class keeps its variables there, not in the inherited table field
        ctable.entrySet().removeIf(ent -> !ent.getValue().isConstant());
    }
}



That is backed by a ConcurrentHashMap and provides atomic operations for adding Variables. 
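
The key guarantee being relied on is that ConcurrentHashMap.computeIfAbsent applies its mapping function at most once per key, even when many threads ask for the same key at the same moment. The sketch below checks that with the standard library only; CreateOnce is a hypothetical name and a plain Object stands in for a Variable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Counts how often the factory runs when many threads request the same key at once. */
final class CreateOnce {
    static int factoryCalls(int threads) {
        ConcurrentHashMap<String, Object> table = new ConcurrentHashMap<>();
        AtomicInteger created = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Object>> results = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                Callable<Object> task = () -> {
                    start.await(); // line every thread up on the same key
                    return table.computeIfAbsent("x", name -> {
                        created.incrementAndGet();
                        return new Object(); // stand-in for creating a variable
                    });
                };
                results.add(pool.submit(task));
            }
            start.countDown(); // release all threads together
            Object first = results.get(0).get();
            for (Future<Object> f : results) {
                if (f.get() != first) {
                    throw new IllegalStateException("threads saw different objects");
                }
            }
            return created.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With a plain HashMap and a check-then-put sequence, two threads could each create their own object for "x"; here every thread receives the same instance.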

The second problem is that neither the JavaCC-based StandardParser nor the ConfigurableParser was built for multi-threaded use. There are a few options:
1) Create a new StandardParser for each thread.
2) Create a new StandardConfigurableParser for each thread.
3) Use the new ParallelConfigurableParser, below, which has a single parse method that can be used in multiple threads.
4) Use a new copy constructor of the ConfigurableParser to create new instances that reuse the same set of Matchers as a template.

These are listed from slowest to fastest. Given a set of 40400 equations with about 200 terms in each, Method 4 took 0.290s, Method 3 took 0.512s, Method 2 took 0.765s and Method 1 took 1.016s. The best non-parallel parsing speed for this set is 0.571s. The advantages of parallel parsing go up as the number of equations increases.
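
For a quick sense of scale, the timings can be turned into speedups relative to the 0.571s non-parallel baseline. The snippet below is just that arithmetic, using only the figures quoted above; ParseSpeedups is a hypothetical name.

```java
/** Speedup of each parallel method relative to the best non-parallel time quoted above. */
final class ParseSpeedups {
    static final double SERIAL_SECONDS = 0.571;

    /** Values above 1 mean faster than the non-parallel baseline, below 1 mean slower. */
    static double speedup(double parallelSeconds) {
        return SERIAL_SECONDS / parallelSeconds;
    }

    public static void main(String[] args) {
        double[] seconds = {1.016, 0.765, 0.512, 0.290}; // Methods 1 to 4
        for (int i = 0; i < seconds.length; i++) {
            System.out.printf("Method %d: %.2fx%n", i + 1, speedup(seconds[i]));
        }
    }
}
```

The ratios come out at roughly 0.56, 0.75, 1.12 and 1.97 for Methods 1 to 4, so on these figures only Methods 3 and 4 beat the non-parallel baseline for this data set.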

I'll post the implementation of the ParallelConfigurableParser in the next reply.

Richard Morris

Oct 28, 2024, 1:51:06 PM
to Jep Java Users
Here is the ParallelConfigurableParser:

package com.singularsys.jep.misc.parallelparse;

import java.io.BufferedReader;
import java.io.Reader;
import java.util.Iterator;
import java.util.List;

import com.singularsys.jep.JepMessages;
import com.singularsys.jep.ParseException;
import com.singularsys.jep.configurableparser.ConfigurableParser;
import com.singularsys.jep.configurableparser.GrammarParser;
import com.singularsys.jep.configurableparser.TokenFilter;
import com.singularsys.jep.configurableparser.Tokenizer;
import com.singularsys.jep.configurableparser.matchers.GrammarMatcher;
import com.singularsys.jep.configurableparser.matchers.OperatorTokenMatcher;
import com.singularsys.jep.configurableparser.matchers.TokenMatcher;
import com.singularsys.jep.configurableparser.tokens.Token;
import com.singularsys.jep.parser.Node;

/**
 * A version of the ConfigurableParser that can parse expressions
 * in multiple threads.
 * Only the {@link #parse(Reader)} method is supported; other
 * methods are not suitable for multithreaded use and will throw UnsupportedOperationException.
 * The {@link #parse(Reader)} method creates a new {@link Tokenizer},
 * {@link GrammarParser}, and new instances of each {@link TokenFilter},
 * and uses them to parse the input.
 * <p>
 * The individual {@link TokenMatcher} and {@link GrammarMatcher} objects
 * are assumed to be thread safe.
 */
public class ParallelConfigurableParser extends ConfigurableParser {

    private static final long serialVersionUID = 410L;

    /**
     * Default constructor with no matchers.
     */
    public ParallelConfigurableParser() {
    }

    /**
     * Copy constructor.
     * Reuses the TokenMatchers, TokenFilters, GrammarMatchers, TokenizerFactory,
     * GrammarParserFactory, OperatorTokenMatcher and SymbolTokenMatcher
     * from the base.
     * @param base the base ConfigurableParser
     */
    public ParallelConfigurableParser(ConfigurableParser base) {
        super();
        for (var tm : base.getTokenMatchers())
            addTokenMatcher(tm);
        for (var tf : base.getTokenFilters())
            addTokenFilter(tf);
        for (var gm : base.getGrammarMatchers())
            addGrammarMatcher(gm);
        this.tf = base.getTokenizerFactory();
        this.gpf = base.getGrammarParserFactory();
        this.otm = (OperatorTokenMatcher) base.getOperatorTokenMatcher();
        this.stm = base.getSymbolTokenMatcher();
    }

    /**
     * Parses the input stream.
     * Creates a new {@link Tokenizer},
     * {@link GrammarParser}, and new instances of each {@link TokenFilter},
     * and uses them to parse the input.
     */
    @Override
    public Node parse(Reader stream) throws ParseException {
        BufferedReader br;
        if (stream instanceof BufferedReader)
            br = (BufferedReader) stream;
        else
            br = new BufferedReader(stream);

        // A fresh tokenizer per call keeps scanning state local to the calling thread
        var tokenizer = tf.newInstance(this, br);

        List<Token> tokens = tokenizer.scan();
        if (tokens == null)
            throw new ParseException(JepMessages.getString("configurableparser.ConfigurableParser.EmptyInput")); //$NON-NLS-1$
        Iterator<Token> it = filter(tokens);
        // A fresh grammar parser per call, for the same reason
        GrammarParser sy = gpf.newInstance(this);
        Node node = sy.parse(it);
        if (node == null)
            throw new ParseException(JepMessages.getString("configurableparser.ConfigurableParser.EmptyInput")); //$NON-NLS-1$
        return node;
    }

    /**
     * @throws UnsupportedOperationException always
     */
    @Override
    public void restart(Reader stream) {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * @throws UnsupportedOperationException always
     */
    @Override
    public Node continueParse() throws ParseException {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<Token> scan(Reader stream) throws ParseException {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<Token> scan() throws ParseException {
        throw new UnsupportedOperationException("Not implemented");
    }

}

It simplifies the parsing-related methods of the ConfigurableParser, supporting a single method
Node parse(Reader stream)
for parsing the input. This method can be called from multiple threads.

The parser can then be used to parse expressions in multiple threads:

Jep jep = new MultipleEquationParsingJep(
        new ParallelConfigurableParser(new StandardConfigurableParser()),
        new ConcurrentVariableTable());

List<String> equations = new ArrayList<>(List.of("x=1", "y=2", "z=x+y"));

// Service for executing threads
ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
// Used to wait for all tasks to complete
CountDownLatch latch = new CountDownLatch(equations.size());
// The nodes created by the threads
ConcurrentLinkedQueue<Node> nodes = new ConcurrentLinkedQueue<>();
// Used to record failures
AtomicInteger failures = new AtomicInteger();

// Loop over the equations, submitting one task per equation to the pool
for (String eqn : equations) {
    service.execute(() -> {
        try {
            // Parse the equation - calls the ParallelConfigurableParser.parse method
            var node = jep.parse(eqn);
            // Add the node to the queue
            nodes.add(node);
        } catch (ParseException e) {
            failures.incrementAndGet();
        } finally {
            // Count down even on an unexpected exception so await() cannot hang
            latch.countDown();
        }
    });
}

// Wait for all tasks to complete
latch.await();
// Release the pool's threads once the work is done
service.shutdown();

// Print the parsed nodes. Order is random
for (var node : nodes) {
    jep.println(node);
}
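
If the random order is a problem, one possible variation is to collect results through ExecutorService.invokeAll, which returns its futures in the same order as the submitted tasks. The sketch below is stdlib-only and uses a hypothetical OrderedParallelParse class with a generic Function standing in for the parser; with Jep, the checked ParseException thrown by jep.parse would need to be handled or wrapped inside that function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Parses inputs in parallel while keeping results in input order. */
final class OrderedParallelParse {
    static <T> List<T> parseAll(List<String> equations, Function<String, T> parser, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<T>> tasks = new ArrayList<>();
            for (String eqn : equations) {
                tasks.add(() -> parser.apply(eqn));
            }
            List<T> results = new ArrayList<>();
            // invokeAll blocks until all tasks finish and preserves the task order
            for (Future<T> f : pool.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, parseAll(equations, String::toUpperCase, 3) returns the upper-cased strings in the original order; the upper-casing is only a placeholder for real parsing.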