static class AtomicVariable extends Variable {
private static final long serialVersionUID = 420L;
AtomicReference<Object> value = new AtomicReference<>();
protected AtomicVariable(String name) {
super(name);
}
public AtomicVariable(String name, Object val) {
super(name);
value.set(val);
}
@Override
public Object getValue() {
return value.get();
}
@Override
public boolean setValue(Object val) {
if (isConstant())
return false;
value.set(val);
return true;
}
}
static class AtomicVariableFactory extends VariableFactory {
private static final long serialVersionUID = 410L;
@Override
public Variable createVariable(String name) {
return new AtomicVariable(name);
}
@Override
public Variable createVariable(String name, Object value) {
return new AtomicVariable(name, value);
}
}
I've not tested the above and it should give the equivalent of volatile memory access, at least ensuring the value is completely updated before trying to read it. There are other variations on the above with different memory semantics.package com.singularsys.jep.misc.parallelparse;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import com.singularsys.jep.Jep;
import com.singularsys.jep.JepComponent;
import com.singularsys.jep.JepException;
import com.singularsys.jep.JepMessages;
import com.singularsys.jep.JepRuntimeException;
import com.singularsys.jep.Variable;
import com.singularsys.jep.VariableTable;
/**
* A variable table backed by a ConcurrentHashMap that is safe for concurrent
* access. Inside the {@link #addVariable(String)} methods it uses the
* {@link ConcurrentHashMap#computeIfAbsent(Object, java.util.function.Function)}
* and {@link ConcurrentHashMap#compute(Object, java.util.function.BiFunction)}
* to atomically create variables when needed. As a consequence, the
* {@link #addVariable(String)} methods may block while a variable is created in
* another thread.
*/
public class ConcurrentVariableTable extends VariableTable {
private static final long serialVersionUID = 410L;
/** The table of variables. */
private final ConcurrentHashMap<String, Variable> ctable = new ConcurrentHashMap<>();
/**
* Default constructor.
*/
public ConcurrentVariableTable() {
}
/**
* Copy constructor, For each variable from the source table create a new variable.
* @param tbl soure table
*/
public ConcurrentVariableTable(VariableTable tbl) {
super(tbl);
}
@Override
public void init(Jep j) {
super.init(j);
}
@Override
public Variable addVariable(String name) {
return ctable.computeIfAbsent(name, n -> vf.createVariable(n));
}
@Override
public Variable addVariable(String name, Object value) throws JepException {
try {
var vbl = ctable.compute(name, (n, v) -> {
if (v == null) {
Variable v2 = vf.createVariable(n, value);
return v2;
} else {
boolean flag = v.setValue(value);
if (flag) {
return v;
} else {
throw new JepRuntimeException(new JepException(MessageFormat.format(
JepMessages.getString("VariableTable.AttemptToSetTheValueOfAConstantVariable"), name))); //$NON-NLS-1$
}
}
});
return vbl;
} catch (JepRuntimeException e) {
throw (JepException) e.getCause();
}
}
@Override
public Variable addConstant(String name, Object value) throws JepException {
try {
var vbl = ctable.compute(name, (n, v) -> {
if (v == null) {
Variable v2 = vf.createVariable(n, value);
v2.setValidValue(true);
v2.setIsConstant(true);
return v2;
} else {
throw new JepRuntimeException(new JepException(MessageFormat.format(
JepMessages.getString("VariableTable.AttemptToSetTheValueOfAConstantVariable"), name))); //$NON-NLS-1$
}
});
return vbl;
} catch (JepRuntimeException e) {
throw (JepException) e.getCause();
}
}
@Override
public Variable getVariable(String name) {
return ctable.get(name);
}
@Override
public void clear() {
ctable.clear();
}
@Override
public boolean isEmpty() {
return ctable.isEmpty();
}
@Override
public Set<String> keySet() {
return ctable.keySet();
}
@Override
public Collection<Variable> getVariables() {
return ctable.values();
}
@Override
public Variable remove(String varname) {
return ctable.remove(varname);
}
@Override
public Variable remove(Variable var) {
return ctable.remove(var.getName());
}
@Override
public Collection<Variable> values() {
return ctable.values();
}
@Override
public int size() {
return ctable.size();
}
@Override
public boolean containsKey(String key) {
return ctable.containsKey(key);
}
@Override
public boolean containsVariable(Variable value) {
return ctable.containsValue(value);
}
@Override
public JepComponent getLightWeightInstance() {
return new ConcurrentVariableTable();
}
@Override
public void copyConstantsFrom(VariableTable vt) {
for (Variable var : vt.getVariables()) {
if (var.isConstant()) {
Variable existing = ctable.get(var.getName());
if (existing != null) {
existing.setIsConstant(false);
existing.setValue(var.getValue());
existing.setValidValue(var.hasValidValue());
existing.hookKeys().forEach(existing::removeHook);
var.hookKeys().forEach(key -> existing.setHook(key, var.getHook(key)));
existing.setIsConstant(var.isConstant());
} else {
Variable v2 = vf.copyVariable(var);
ctable.put(var.getName(), v2);
}
}
}
}
@Override
public void copyVariablesFrom(VariableTable vt) {
for (Variable var : vt.getVariables()) {
Variable existing = ctable.get(var.getName());
if (existing != null) {
existing.setIsConstant(false);
existing.setValue(var.getValue());
existing.setValidValue(var.hasValidValue());
existing.hookKeys().forEach(existing::removeHook);
var.hookKeys().forEach(key -> existing.setHook(key, var.getHook(key)));
existing.setIsConstant(var.isConstant());
} else {
Variable v2 = vf.copyVariable(var);
ctable.put(var.getName(), v2);
}
}
}
@Override
public void clearValues() {
super.clearValues();
}
@Override
public void removeNonConstants() {
table.entrySet().removeIf(ent -> !ent.getValue().isConstant());
}
}
Jep jep = new MultipleEquationParsingJep(
new ParallelConfigurableParser(new StandardConfigurableParser()),
new ConcurrentVariableTable());
List<String> equations = new ArrayList<>(List.of("x=1", "y=2", "z=x+y"));
// Service for executing threads
ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
// Used to wait for all threads to complete
CountDownLatch latch = new CountDownLatch(equations.size());
// The nodes created by the threads
ConcurrentLinkedQueue<Node> nodes = new ConcurrentLinkedQueue<>();
// Used to record failures
AtomicInteger failures = new AtomicInteger();
// loop over the equations, starting a new thread for each
for (String eqn : equations) {
service.execute(() -> {
try {
// Parse the equation - calls the ParallelConfigurableParser.parse method
var node = jep.parse(eqn);
// Add the node to the list
nodes.add(node);
} catch (ParseException e) {
failures.incrementAndGet();
}
latch.countDown();
});
}
// Wait for all threads to complete
latch.await();
// Print the parsed nodes. Order is random
for (var node : nodes) {
jep.println(node);
}