Stardust/Knowledge Base/Performance Tuning/Maintaining model coherence in a Stardust cluster using Hazelcast

Introduction

In a clustered configuration where Stardust is deployed on multiple nodes, we run into the issue of model coherence. If a model is uploaded, deleted or versioned on a particular node, the local engine caches on the other nodes are not updated automatically. The cluster then behaves inconsistently, which leads to problems during process execution. While it is possible to "reinit" the engine on each individual node using the "console" utility, it is preferable to automate the process so that all engine caches are flushed whenever a model change is made on any node. This can be achieved by configuring the Stardust engine to use Hazelcast to broadcast and listen for model changes. This technique is described below.
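
The broadcast-and-flush idea can be illustrated without Hazelcast or Stardust. The following is only a minimal sketch: the Topic and Node classes are hypothetical in-memory stand-ins (they are not Stardust or Hazelcast types), and a real cluster would deliver the message over the network rather than through a local list of listeners.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ModelCoherenceSketch {

    /** Hypothetical in-memory stand-in for a cluster-wide topic. */
    static class Topic {
        private final List<Consumer<String>> listeners = new ArrayList<>();

        void subscribe(Consumer<String> listener) {
            listeners.add(listener);
        }

        void publish(String message) {
            // Deliver to every subscriber, including the publisher itself.
            for (Consumer<String> listener : listeners) {
                listener.accept(message);
            }
        }
    }

    /** Hypothetical cluster node holding a local model cache. */
    static class Node {
        private final Map<String, String> modelCache = new HashMap<>();
        private final Topic topic;

        Node(Topic topic) {
            this.topic = topic;
            // Any model change notification flushes the local cache.
            topic.subscribe(message -> modelCache.clear());
        }

        void cacheModel(String id, String definition) {
            modelCache.put(id, definition);
        }

        void deployModel(String id) {
            // Persisting the model to the shared store is omitted here;
            // only the notification step is shown.
            topic.publish("model-changed:" + id);
        }

        int cachedModels() {
            return modelCache.size();
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        Node nodeA = new Node(topic);
        Node nodeB = new Node(topic);
        nodeB.cacheModel("orders", "version 1");
        nodeA.deployModel("orders");
        System.out.println("Models still cached on node B: " + nodeB.cachedModels());
    }
}
```

After the deployment on node A, node B no longer holds its stale entry and will reload the model on next use, which is the behavior the Hazelcast-based setup aims for.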

Hazelcast configuration

Add the following entries to the server-side carnot.properties file:

Carnot.Engine.Hazelcast.JcaConnectionFactoryProvider = org.eclipse.stardust.engine.spring.integration.jca.SpringAppContextHazelcastJcaConnectionFactoryProvider
Infinity.Engine.Caching = true

Refer to the Stardust documentation for information on how to configure Hazelcast with Stardust.
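
A quick sanity check of the two entries can be sketched as follows. This assumes that carnot.properties follows standard java.util.Properties parsing rules, under which a class name broken across two lines (without a trailing backslash) is silently truncated. The CarnotPropertiesCheck class and its method names are hypothetical helpers, not part of Stardust.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class CarnotPropertiesCheck {

    static final String PROVIDER_KEY = "Carnot.Engine.Hazelcast.JcaConnectionFactoryProvider";
    static final String CACHING_KEY = "Infinity.Engine.Caching";

    /** Parses properties text using the standard java.util.Properties rules. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns true if both entries are present and the provider class name looks complete. */
    static boolean looksConfigured(Properties props) {
        String provider = props.getProperty(PROVIDER_KEY, "").trim();
        String caching = props.getProperty(CACHING_KEY, "").trim();
        // A value ending in "." indicates the class name was cut off by a line break.
        boolean providerComplete = !provider.isEmpty() && !provider.endsWith(".");
        return providerComplete && "true".equalsIgnoreCase(caching);
    }

    public static void main(String[] args) {
        String text = PROVIDER_KEY
                + " = org.eclipse.stardust.engine.spring.integration.jca."
                + "SpringAppContextHazelcastJcaConnectionFactoryProvider\n"
                + CACHING_KEY + " = true\n";
        System.out.println("Configured: " + looksConfigured(parse(text)));
    }
}
```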

IPartitionMonitor implementation

Once Hazelcast has been configured properly, we only need to add a custom IPartitionMonitor SPI implementation and plug it into our web application. This implementation is responsible for broadcasting model changes across the cluster and forcing the individual nodes to flush their caches, which maintains model coherence. The ModelWatcher class shown below provides an implementation of the IPartitionMonitor SPI.

package com.infinity.bpm.rt.integration.cache.hazelcast;
 
import java.util.HashMap;
import java.util.Map;
 
import org.eclipse.stardust.common.log.LogManager;
import org.eclipse.stardust.common.log.Logger;
import org.eclipse.stardust.engine.api.model.IModel;
import org.eclipse.stardust.engine.api.runtime.AdministrationService;
import org.eclipse.stardust.engine.api.runtime.DeploymentException;
import org.eclipse.stardust.engine.api.runtime.ServiceFactory;
import org.eclipse.stardust.engine.api.web.ServiceFactoryLocator;
import org.eclipse.stardust.engine.core.runtime.beans.IUser;
import org.eclipse.stardust.engine.core.runtime.beans.IUserRealm;
import org.eclipse.stardust.engine.core.runtime.beans.removethis.SecurityProperties;
import org.eclipse.stardust.engine.core.spi.monitoring.IPartitionMonitor;
 
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.MessageListener;
 
@SuppressWarnings("unchecked")
public class ModelWatcher implements IPartitionMonitor, MessageListener {
 
	private static final Logger logger = LogManager.getLogger(ModelWatcher.class);
 
	// Dummy payload; listeners react to the notification itself, not its content.
	private Integer globalState = 1;
 
	// Cluster-wide topic used to announce model changes.
	private ITopic modelTopic;
 
	public ModelWatcher() {
		// Subscribe this node to the shared "model" topic.
		this.modelTopic = Hazelcast.getTopic("model");
		this.modelTopic.addMessageListener(this);
 
		logger.info("Member added to model change listener.");
	}
 
	// Called on every subscribed node when a model change is published.
	@Override
	public void onMessage(Object msg) {
		logger.info("Received model change notification.");
		Map<String, String> properties = new HashMap<String, String>();
		properties.put(SecurityProperties.PARTITION, "default");
		// Credentials are hardcoded for brevity; use a dedicated administrator account in practice.
		ServiceFactory sf = ServiceFactoryLocator.get("motu", "motu", properties);
		try {
			AdministrationService aService = sf.getAdministrationService();
			aService.flushCaches();
		} finally {
			// Release the services held by this factory.
			sf.close();
		}
	}
 
	// Notify all subscribed nodes that a model was deleted.
	@Override
	public void modelDeleted(IModel arg0) throws DeploymentException {
		this.modelTopic.publish(this.globalState);
	}
 
	// Notify all subscribed nodes that a model was deployed.
	@Override
	public void modelDeployed(IModel arg0, boolean arg1)
			throws DeploymentException {
		this.modelTopic.publish(this.globalState);
	}
 
	// User and realm events do not affect model coherence, so they are ignored.
 
	@Override
	public void userCreated(IUser arg0) {
	}
 
	@Override
	public void userDisabled(IUser arg0) {
	}
 
	@Override
	public void userEnabled(IUser arg0) {
	}
 
	@Override
	public void userRealmCreated(IUserRealm arg0) {
	}
 
	@Override
	public void userRealmDropped(IUserRealm arg0) {
	}
 
}

The implementation is made available to the Stardust engine through Java's standard ServiceLoader mechanism. Create a file named after the fully qualified name of the SPI interface, "org.eclipse.stardust.engine.core.spi.monitoring.IPartitionMonitor", and add the fully qualified name of our class, "com.infinity.bpm.rt.integration.cache.hazelcast.ModelWatcher", to it. The contents of the file look as follows:

com.infinity.bpm.rt.integration.cache.hazelcast.ModelWatcher

We then place this file in a folder called META-INF/services under the classes directory of our web application on all nodes and restart the nodes. Upon restart, the nodes form a Hazelcast cluster, and a model change made on any node is broadcast over the "model" topic so that every node flushes its engine caches. Model management across the cluster nodes is thus automated.
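
The discovery step can be tried in isolation with a small, self-contained sketch. The Monitor interface and WatcherMonitor class are hypothetical stand-ins for IPartitionMonitor and ModelWatcher, and the temporary directory plays the role of the web application's classes directory. How the Stardust engine itself invokes the lookup is not shown in this article, so the sketch only illustrates the generic java.util.ServiceLoader behavior.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class ServiceLoaderSketch {

    /** Hypothetical stand-in for the IPartitionMonitor SPI interface. */
    public interface Monitor {
        String name();
    }

    /** Hypothetical provider; ServiceLoader needs a public no-argument constructor. */
    public static class WatcherMonitor implements Monitor {
        public WatcherMonitor() {
        }

        @Override
        public String name() {
            return "watcher";
        }
    }

    /** Writes a provider-configuration file and lets ServiceLoader discover the provider. */
    static List<String> discover() {
        try {
            // The temporary directory stands in for the "classes" directory.
            Path root = Files.createTempDirectory("spi-demo");
            Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
            // File name = name of the SPI interface; content = name of the implementation.
            Files.write(services.resolve(Monitor.class.getName()),
                    WatcherMonitor.class.getName().getBytes(StandardCharsets.UTF_8));

            URL[] urls = { root.toUri().toURL() };
            try (URLClassLoader loader =
                    new URLClassLoader(urls, ServiceLoaderSketch.class.getClassLoader())) {
                List<String> names = new ArrayList<>();
                for (Monitor monitor : ServiceLoader.load(Monitor.class, loader)) {
                    names.add(monitor.name());
                }
                return names;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Discovered providers: " + discover());
    }
}
```

If the file name or the class name inside it is misspelled, the lookup either finds no provider or fails when the provider is loaded, which is a common reason for an SPI implementation not being picked up.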