Posts Tagged ‘InfiniSpan’


Embedding Cassandra into your JavaEE AppServer

Recently I started running some tests with Apache Cassandra, and I was genuinely impressed by its ability to scale horizontally and therefore to handle increasing load. Among its main virtues are:

  • The ability to replace failed nodes without downtime
  • No single point of failure: every node is functionally identical

among a few other features.
Still, one thing bothers me: I see Cassandra and its API as foundation technology, much like what JGroups has become. JGroups is widely used nowadays, but rarely directly by application developers: you use it indirectly whenever you use a clustered JBoss, JBoss TreeCache or Infinispan.
One responsibility that still falls on the application developer is failover. When your application connects to a Cassandra node, it still needs to fetch the rest of the cluster's nodes through Cassandra's JMX API; otherwise, if that node fails, your application won't know how to reach the cluster, even though the cluster itself is still up and healthy. Another option (in fact a recommendation, even if you do retrieve the node list) is to keep a list of reliable servers to act as bootstrap connection servers. But remember Murphy's Law: every server on that list may go down, so you still need to retrieve the whole node list for failover.
In summary, we end up with something like the architecture depicted below:

Cassandra Regular usage thru JCA and Thrift


Note that this solution carries an inherent complexity: the JCA connector becomes responsible for keeping a list of failover nodes, something a Cassandra instance already does, so we end up violating the DRY principle.
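To make that duplicated responsibility concrete, here is a minimal, hypothetical sketch of the bookkeeping such a connector ends up doing. FailoverNodeList, refresh() and execute() are illustrative names, not part of Cassandra, Thrift or JCA; how the cluster's node list is actually obtained (e.g. over JMX) is left out and assumed to be passed into refresh().

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/**
 * Hypothetical connector-side failover list: bootstrap servers plus the
 * node list fetched from the cluster. This duplicates what Cassandra
 * nodes already track internally.
 */
public class FailoverNodeList {
    // insertion order is kept so bootstrap servers are tried first
    private final Set<String> nodes = new LinkedHashSet<>();

    public FailoverNodeList(List<String> bootstrapServers) {
        nodes.addAll(bootstrapServers);
    }

    /** Merge a node list retrieved from the cluster (retrieval not shown). */
    public synchronized void refresh(List<String> clusterNodes) {
        nodes.addAll(clusterNodes);
    }

    /** Try each known node in turn until one call succeeds. */
    public <T> T execute(Function<String, T> call) {
        List<String> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(nodes);
        }
        RuntimeException last = null;
        for (String node : snapshot) {
            try {
                return call.apply(node);
            } catch (RuntimeException e) {
                last = e; // node unreachable: fall through to the next one
            }
        }
        throw new IllegalStateException("no reachable node among " + snapshot, last);
    }
}
```

If every bootstrap server is down and refresh() was never called, execute() has nothing left to try, which is exactly the Murphy's Law scenario described above.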
But what alternatives do we have?

The StorageProxy API

It turns out that the CassandraDaemon class (the main class responsible for starting a Cassandra node) does only a few things, and we can embed them into our application, or rather into our JCA connector, since in JavaEE that is the only place where you should be spawning threads and opening files directly from the filesystem.
In fact, those few steps are properly described on the Cassandra Wiki.
If you then take what we could call the regular approach, spawning the embedded Cassandra and connecting to localhost so that the application can talk to the Cassandra server, you'll end up with unnecessary network communication that could hurt performance by increasing latency. It turns out that this can be avoided too, by using the (not so stable) StorageProxy API.
By taking all the steps described above, you'd end up with a much simpler architecture, like the one below:

Cassandra Embedded into JavaEE Server


With this architecture you are shielded from the complexity of failover: Cassandra handles it automatically for you. You could then ask: what if I need to scale the Cassandra layer independently? No problem! You can resort to a hybrid architecture like the one below:
Cassandra Embedded with Extra Nodes


To achieve this, you only need to give these extra nodes the addresses of some of the JavaEE servers. This way, the standalone nodes can communicate with the Cassandra daemons inside the AppServers and become part of the Cassandra cluster.

Memory mapped TTransport

My first attempt at this with Cassandra involved implementing a TTransport class that would send commands to the server worker thread over a byte buffer and receive responses the same way. I tried this first simply because I was completely unaware of the StorageProxy API. Later, though, I thought this approach could also solve the lack of stability of that API (as per the Apache Wiki page). But this turned out not to be an easy task.
I thought of having a CassandraMMap class that would act like the CassandraDaemon class but would differ in the TThreadPoolServer initialization, as below:

		// in-memory server transport replaces the TServerSocket used by CassandraDaemon
		this.serverTransp = new MMapServerTransport();
		TThreadPoolServer.Options options = new TThreadPoolServer.Options();
		options.minWorkerThreads = 64;
		this.serverEngine = new TThreadPoolServer(new TProcessorFactory(
				processor), serverTransp, inTransportFactory,
				outTransportFactory, tProtocolFactory, tProtocolFactory,
				options);

The same MMapServerTransport instance would be handed to the client through a getter method in order to open client connections, as follows:

		TTransport tr = mmap.getServerTransp().getClientTransportFactory();
		TProtocol proto = new TBinaryProtocol(tr);
		Cassandra.Client client = new Cassandra.Client(proto);

Requests made through getTransport would be queued on the server using the class below, and a server-side TTransport would be returned by acceptImpl:

public class MMapTransportTuple {
	private TByteArrayOutputStream server2Client;
	private TByteArrayOutputStream client2Server;
	private TTransport clientTransport;
	private TTransport serverTransport;

	public MMapTransportTuple(int size) {
		server2Client = new TByteArrayOutputStream(size);
		client2Server = new TByteArrayOutputStream(size);
		// the client reads what the server writes, and vice versa
		clientTransport = new MMapTransport(server2Client, client2Server);
		serverTransport = new MMapTransport(client2Server, server2Client);
	}

	// remaining code omitted for brevity
}

This class would be responsible for binding the memory buffers from client to server and vice versa.
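The core idea of MMapTransportTuple, two buffers wired crosswise so that each side reads what the other writes, can be sketched with the JDK alone. This is a hypothetical stand-in, not Thrift code: InMemoryTransportPair and Endpoint are illustrative names, and java.io piped streams are swapped in for TByteArrayOutputStream because a reader must be able to block until its peer has written something, which a plain byte array does not provide.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

/** Hypothetical stdlib-only analogue of MMapTransportTuple. */
public class InMemoryTransportPair {

    /** One side of the connection (the role MMapTransport plays above). */
    public static final class Endpoint {
        private final InputStream readFrom;
        private final OutputStream writeTo;

        Endpoint(InputStream readFrom, OutputStream writeTo) {
            this.readFrom = readFrom;
            this.writeTo = writeTo;
        }

        public void write(byte[] buf) throws IOException {
            writeTo.write(buf);
            writeTo.flush(); // wakes up a peer blocked in readAll()
        }

        /** Blocks until len bytes arrive from the peer, like TTransport.readAll. */
        public byte[] readAll(int len) throws IOException {
            byte[] buf = new byte[len];
            int got = 0;
            while (got < len) {
                int n = readFrom.read(buf, got, len - got);
                if (n < 0) {
                    throw new IOException("peer closed after " + got + " bytes");
                }
                got += n;
            }
            return buf;
        }
    }

    private final Endpoint client;
    private final Endpoint server;

    public InMemoryTransportPair(int size) throws IOException {
        PipedInputStream server2Client = new PipedInputStream(size);
        PipedInputStream client2Server = new PipedInputStream(size);
        // crosswise wiring: the client writes into the pipe the server reads, and vice versa
        client = new Endpoint(server2Client, new PipedOutputStream(client2Server));
        server = new Endpoint(client2Server, new PipedOutputStream(server2Client));
    }

    public Endpoint client() { return client; }

    public Endpoint server() { return server; }
}
```

Piped streams are meant to be used from two threads (here, the client thread and a server worker thread); a single thread that writes more than `size` bytes before reading would block forever.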
The last class involved in this implementation would be MMapTransport:

public class MMapTransport extends TTransport {

	private TByteArrayOutputStream readFrom;

	private TByteArrayOutputStream writeTo;

	public MMapTransport(TByteArrayOutputStream readFrom,
			TByteArrayOutputStream writeTo) {
		this.readFrom = readFrom;
		this.writeTo = writeTo;
	}

	// read() and write() would operate on the respective buffer, and the
	// buffers are swapped between the client and server TTransport instances;
	// isOpen(), open(), close(), read() and write() omitted for brevity
}

But this turned out to be harder than I first thought, and since time became a scarce resource, I'll stick with the StorageProxy API approach for now.


Replacing Persistence Caching with In Memory DataGrids

Anyone who has designed an enterprise system in Java with non-functional requirements demanding tight response times has faced the challenge of tuning the overall solution. If you are using Hibernate for persistence, your first idea is to turn on Hibernate caching, but unfortunately it only caches loads by entity id, and sometimes you need to look up entities by a secondary unique key. Another solution you might think of is turning on the query cache, but as this post points out, it is more harmful to scalability and latency than you can imagine. The Hibernate docs and the post referenced above suggest that using the @NaturalId annotation and querying with the Criteria API and a specially crafted Restriction built through the naturalId method would alleviate much of that problem. However, updates on related tables hugely undermine the improvements NaturalId queries may bring, as this post suggests:

[…] Even though the objects are essentially static, the query cache takes the safe route and says any change to a related table invalidates any and all queries that reference that table. This makes sense, from a generic cache standpoint. […]

Source: Hibernate: Cache Queries the Natural Id Way
Add to this the fact that Hibernate second-level caches are data caches, not object caches, so even in the best case, a cache hit, you'd still pay the cost of hydrating the object (as it is called in Hibernate terminology).

Object Caching

So, after giving up on all those previous attempts, you'll start considering the next obvious path: using, or implementing, an object cache. Did you notice that I said implementing? Yes, implementing. You'll probably have to take care of the loading process on cache misses yourself; in fact, you'll be using a pattern called Side Caching.
But this becomes tedious code to maintain and is very error-prone, not to mention that implementing asynchronous write-behind won't be an easy task.
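A minimal sketch of side caching (also known as cache-aside) shows where that burden sits: the application code, not the cache, goes to the store on a miss and keeps both sides in sync on writes. SideCache is a hypothetical name, and the two functions passed in are assumed stand-ins for real persistence calls (for example a JPA query by natural key and an EntityManager merge).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical cache-aside helper: every caller path must remember to use it. */
public class SideCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loadFromStore;  // assumed: e.g. a query by natural key
    private final BiConsumer<K, V> writeToStore; // assumed: e.g. a merge/update call

    public SideCache(Function<K, V> loadFromStore, BiConsumer<K, V> writeToStore) {
        this.loadFromStore = loadFromStore;
        this.writeToStore = writeToStore;
    }

    public V get(K key) {
        V value = cache.get(key);
        if (value == null) {
            // cache miss: the application itself queries the persistent store
            value = loadFromStore.apply(key);
            if (value != null) {
                cache.put(key, value);
            }
        }
        return value;
    }

    public void put(K key, V value) {
        // synchronous write-through; write-behind would add a queue, retries
        // and ordering concerns on top of this
        writeToStore.accept(key, value);
        cache.put(key, value);
    }
}
```

Note that two threads missing on the same key may both hit the store; handling that, plus invalidation and batching, is the kind of error-prone code the paragraph above refers to.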

In Memory DataGrids

Fortunately, newer solutions marketed as In Memory DataGrids (IMDGs for short) provide an almost automagic way of plugging in specially crafted classes responsible for querying the underlying persistent storage on cache misses, not to mention that they also provide improved partitioning and many other facilities usually not available in regular caches.
WebSphere eXtreme Scale Loaders and JBoss InfiniSpan CacheStores are examples of such specially crafted extensions for IMDGs. These APIs also allow updating the underlying storage (even asynchronously) with the changes applied to the in-memory cache. One thing still missing, to different extents, in both solutions is the ability to query the underlying store using a natural key. eXtreme Scale already has a framework for loading JPA entities, but it cannot query the underlying storage by anything other than the cache key, which in those cases defaults to the entity id (the eXtreme Scale Query API can query the cache by any eXtreme Scale entity attribute, but it only sees the data in memory). InfiniSpan, on the other hand, as of the 4.0.0 release, provides nothing similar to eXtreme Scale's JPALoader or JPAEntityLoader: it still sees the database (as in its JDBC-based CacheStore) as a means of offloading data and surviving reloads, but never as a means of querying data in an application-defined format (e.g. application entities). Instead, it stores data in its own format, which limits its ability to load from application storage on cache misses.
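For contrast with side caching, here is a stdlib-only sketch of what an IMDG loader/CacheStore buys you: the cache itself calls a plugged-in loader on a miss (read-through) and pushes writes to the store asynchronously (write-behind). This is not the eXtreme Scale or Infinispan API; ReadThroughCache and its constructor arguments are hypothetical, and a real grid adds partitioning, batching and failure handling on top.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical read-through / write-behind cache with a pluggable loader. */
public class ReadThroughCache<K, V> implements AutoCloseable {
    private final Map<K, V> memory = new ConcurrentHashMap<>();
    private final Function<K, V> loader;   // plays the role of a Loader/CacheStore load
    private final BiConsumer<K, V> store;  // plays the role of a Loader/CacheStore store
    // a single writer thread applies store updates in submission order
    private final ExecutorService writeBehind = Executors.newSingleThreadExecutor();

    public ReadThroughCache(Function<K, V> loader, BiConsumer<K, V> store) {
        this.loader = loader;
        this.store = store;
    }

    public V get(K key) {
        // the cache, not the caller, invokes the loader on a miss;
        // a null result (not found) is not cached
        return memory.computeIfAbsent(key, loader);
    }

    public void put(K key, V value) {
        memory.put(key, value);
        writeBehind.execute(() -> store.accept(key, value)); // asynchronous store update
    }

    @Override
    public void close() throws InterruptedException {
        writeBehind.shutdown(); // stop accepting writes, then drain the queued ones
        writeBehind.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

The application only ever calls get() and put(); the natural-key problem discussed next is about what the plugged-in loader can query, not about this plumbing.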

Querying IMDGs through Natural Keys

In summary, if you need to query through a natural key, in both cases you'll need to roll your own Loader/CacheStore. By querying through a natural key I mean something along the lines of what is described in Bill Burke's EJB3 book:

The first way to define a primary-key class (and, for that matter, composite keys) is to use the @IdClass annotation. Your bean class does not use this primary-key class internally, but it does use it to interact with the entity manager when finding a persisted object through its primary key.
In your bean class, you designate one or more properties that make up your primary key, using the @Id annotation. These properties must map exactly to properties in the @IdClass.

This way you can query the cache with something along the lines of:

MyObjectNaturalKey key = new MyObjectNaturalKey("keyField1", "keyField2");
MyObject o = (MyObject) cache.get(key);

And you'll be guaranteed that, if this object exists, whether already in memory or through the JPA EntityManager, it will be returned.
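For cache.get(key) to find an entry created from a different key instance, the natural key class must compare by value. A minimal sketch of MyObjectNaturalKey follows; the two String fields are assumptions mirroring the constructor call above, and Serializable is included on the assumption that keys typically have to travel between grid nodes. It is a plain cache key, not a complete JPA @IdClass (which would also need a no-arg constructor).

```java
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical natural key: equal field values mean the same cache entry. */
public final class MyObjectNaturalKey implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String keyField1;
    private final String keyField2;

    public MyObjectNaturalKey(String keyField1, String keyField2) {
        this.keyField1 = keyField1;
        this.keyField2 = keyField2;
    }

    public String getKeyField1() { return keyField1; }

    public String getKeyField2() { return keyField2; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MyObjectNaturalKey)) return false;
        MyObjectNaturalKey other = (MyObjectNaturalKey) o;
        return Objects.equals(keyField1, other.keyField1)
                && Objects.equals(keyField2, other.keyField2);
    }

    @Override
    public int hashCode() {
        // must be consistent with equals so hash-partitioned grids route equal keys alike
        return Objects.hash(keyField1, keyField2);
    }
}
```

A custom Loader/CacheStore would then translate these getters into a query against the entity's unique-key columns on a cache miss.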

Implementing an InfiniSpan CacheStore

Infinispan’s CacheStore interface has the following methods:

package org.infinispan.loaders;
public interface CacheStore extends CacheLoader {
   void store(InternalCacheEntry entry) throws CacheLoaderException;
   void fromStream(ObjectInput inputStream) throws CacheLoaderException;
   void toStream(ObjectOutput outputStream) throws CacheLoaderException;
   void clear() throws CacheLoaderException;
   boolean remove(Object key) throws CacheLoaderException;
   void removeAll(Set<Object> keys) throws CacheLoaderException;
   void purgeExpired() throws CacheLoaderException;
   void prepare(List<? extends Modification> modifications, GlobalTransaction tx, boolean isOnePhase) throws CacheLoaderException;
   void commit(GlobalTransaction tx) throws CacheLoaderException;
   void rollback(GlobalTransaction tx);
   public CacheStoreConfig getCacheStoreConfig();
}

If you need further explanation of what each method does, see the JavaDoc. The InfiniSpan team suggests looking at DummyInMemoryCacheStore to get a general idea of how a CacheStore should be implemented, but I still feel it lacks an explanation of how InternalCacheEntries should be instantiated; in fact, there is a factory class for that.
Finally, if you need further information, the JBoss InfiniSpan Wiki has an overview of the built-in CacheStore classes.

Implementing an eXtreme Scale Loader

There is plenty of information on how to write an eXtreme Scale Loader as well. The InfoCenter has two important pages: the first presents a general overview of how to write a loader, and the second presents an important eXtreme Scale concept: tuples. If you implement a Loader that stores its data as tuples, you no longer need the stored classes (in the case of custom application classes) to be in the same classloader as the container process.
The WebSphere eXtreme Scale Wiki has a page dedicated to explaining how to write a Loader, which presents the signature of the Loader interface:

public interface Loader {
    // sentinel that get() returns for keys not found in the backend (value elided in this listing)
    static final SpecialValue KEY_NOT_FOUND;
    List get(TxID txid, List keyList, boolean forUpdate) throws LoaderException;
    void batchUpdate(TxID txid, LogSequence sequence) throws LoaderException, OptimisticCollisionException;
    void preloadMap(Session session, BackingMap backingMap) throws LoaderException;
}
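As I understand the documented contract of Loader.get, it receives a batch of keys and must return one value per key, in the same order, using KEY_NOT_FOUND for misses. The stdlib-only sketch below illustrates just that contract; BatchLookup and its local KEY_NOT_FOUND are hypothetical stand-ins (not eXtreme Scale's SpecialValue), and the Map parameter stands in for whatever natural-key query a real loader would run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of a positional batch-get with a not-found sentinel. */
public class BatchLookup {
    // local stand-in for the library's KEY_NOT_FOUND sentinel
    public static final Object KEY_NOT_FOUND = new Object();

    /** Returns one entry per requested key, preserving the order of keyList. */
    public static List<Object> get(List<?> keyList, Map<?, ?> backend) {
        List<Object> result = new ArrayList<>(keyList.size());
        for (Object key : keyList) {
            Object value = backend.get(key); // a real loader would query the database here
            result.add(value != null ? value : KEY_NOT_FOUND);
        }
        return result;
    }
}
```

Keeping results positional lets the grid match each returned value to its key without an extra lookup, and the sentinel distinguishes "not in the store" from a stored null.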

Apart from the Wiki, there's a post on Billy Newport's DevWebSphere blog with the source code for what he calls the PureQuery Loader for eXtreme Scale, a loader capable of loading POJOs from SQL queries.
If you need a starting guide for eXtreme Scale, have a look at the ObjectGrid programming model guide and samples, which, although slightly old, is still a good introduction to WebSphere eXtreme Scale.
Lastly, I'd like to say that the Packt Publishing book on eXtreme Scale is also a good reference.

