Cluster job synchronization with Spring AOP and memcached

Once you start load balancing your application across a set of nodes, you either dedicate one node to your scheduled jobs or you find some way of synchronizing the job calls across the nodes.

Since I already have memcached up and running to keep my web applications session-free (state has moved from the HttpSession to memcached, and I run my load balancer without sticky sessions), I decided to use the existing memcached instance to synchronize my calls.

Since I am using the Spring Framework on my backend, it was quite easy to use AOP with a custom annotation to keep my job calls synchronized:

@Component
@Transactional
public class SimpleJob {
   @ClusterSynchronized(
       jobToken = "SimpleJob",
       jobTokenExpirationTimeInSeconds = 240)
   @Scheduled(fixedDelay = 30 * 60 * 1000)
   public void doSomething() {
      // do some job that should not be run
      // twice at the same time across your
      // cluster
   }
}

The AOP magic happens in a Spring component declared as an AOP aspect. I try to acquire the given token on the memcached instance. If the token is already there, I don’t execute the method. If I manage to acquire the lock, I execute the method and afterwards remove the token in a finally block, which unlocks the job. If something bad happens to my node (e.g. the machine explodes), memcached removes the token after the expiration time defined in the annotation.

@Component
@Aspect
public class MemcachedSynchronizationAspect {

   @Autowired
   private MemcachedJobSynchronization jobSync;

   @Around("execution(public * *(..)) && @annotation(syncParams)")
   public Object processClusterSynchronized(
      ProceedingJoinPoint pjp,
      ClusterSynchronized syncParams) throws Throwable {

      final String token = syncParams.jobToken();
      final int tokenExpirationTime = syncParams.jobTokenExpirationTimeInSeconds();

      if (this.jobSync.aquireLock(token, tokenExpirationTime)) {
         try {
            return pjp.proceed();
         } finally {
            this.jobSync.releaseLock(token);
         }
      }
      return null;
   }
}
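
To look at the skip-or-run logic in isolation, here is a minimal sketch without Spring or memcached. It is not the code from the application: the Job interface, the GuardedJobDemo class and the in-memory HELD_TOKENS set are made up for this illustration, and a JDK dynamic proxy stands in for the Spring aspect.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical job interface, used only for this illustration. */
interface Job {
    void doSomething();
}

class GuardedJobDemo {
    // Tokens currently held; an in-memory stand-in for the memcached instance.
    static final Set<String> HELD_TOKENS = ConcurrentHashMap.newKeySet();

    /** Wraps a job so that a call only runs if the token can be acquired. */
    static Job guard(Job target, String token) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (!HELD_TOKENS.add(token)) {
                return null; // token already taken: skip the call
            }
            try {
                return method.invoke(target, args);
            } finally {
                HELD_TOKENS.remove(token); // always unlock, even after an exception
            }
        };
        return (Job) Proxy.newProxyInstance(
            Job.class.getClassLoader(), new Class<?>[] {Job.class}, handler);
    }
}
```

Returning null for a skipped call is harmless for void job methods such as doSomething(). A method with a primitive return type cannot be handed null, so this pattern fits fire-and-forget jobs best.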

The synchronization through memcached is quite simple, as implemented in the MemcachedJobSynchronization class. I removed most of the Javadoc and all logging to keep the code short, but it should be fairly self-explanatory.

I use a UUID generated once for each running instance of my application as a unique node identifier. I store this node identifier under the job token with the given expiration time. Whenever I try to acquire a job lock, I first check that no token is stored in memcached yet. If there is none, I add a token and then check whether a get call returns my own node id. Only then is the job executed.

@Component
public class MemcachedJobSynchronization {

   @Autowired
   private MemcachedClient cache;

   private final String instanceNodeId = UUID.randomUUID().toString();

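   /**
    * Try to acquire the lock for the given job token.
    *
    * @param jobToken key under which the lock is stored in memcached
    * @param tokenExpirationTime expiration of the token in seconds
    * @return true if this node holds the lock afterwards
    */
   public final boolean aquireLock(String jobToken, int tokenExpirationTime) {
      // only try to add our node id if no token is stored yet
      if (Strings.isNullOrEmpty((String) this.cache.get(jobToken))) {
         this.cache.add(jobToken, tokenExpirationTime, this.instanceNodeId);
         // read the token back: the lock is ours only if it holds our node id
         return this.isOurLock(jobToken);
      }
      return false;
   }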

   public MemcachedClient getCache() {
      return this.cache;
   }

   public String instanceNodeId() {
      return this.instanceNodeId;
   }

   public final boolean isOurLock(String jobToken) {
      try {
         return Objects.equal(this.instanceNodeId, this.cache.get(jobToken));
      } catch (Exception e) {
         // lookup failed: treat the lock as not ours
         return false;
      }
   }

   public final void releaseLock(String jobToken) {
      if (this.isOurLock(jobToken)) {
         try {
            this.cache.delete(jobToken);
         } catch (Exception e) {
            // best effort: if the delete fails, the token expires on its own
         }
      }
   }
}
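
One observation on the lock semantics: memcached's add command only stores a value if the key does not exist yet, so the add call itself decides which node wins, and the get afterwards confirms it. The following sketch illustrates that add-if-absent idea with an in-memory map in place of memcached. InMemoryJobLock and its method names are hypothetical, putIfAbsent plays the role of add, and token expiration is not modeled.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for the memcached-based lock; for illustration only. */
class InMemoryJobLock {
    // Shared "cache"; in the post this role is played by memcached.
    private final ConcurrentMap<String, String> cache;
    // Unique id of this (simulated) node.
    private final String nodeId = UUID.randomUUID().toString();

    InMemoryJobLock(ConcurrentMap<String, String> cache) {
        this.cache = cache;
    }

    /** Atomic add-if-absent: true only for the node that stored its id. */
    boolean acquireLock(String jobToken) {
        return cache.putIfAbsent(jobToken, nodeId) == null;
    }

    /** Removes the token only if it still holds this node's id. */
    boolean releaseLock(String jobToken) {
        return cache.remove(jobToken, nodeId);
    }
}
```

Two InMemoryJobLock instances that share one map behave like two nodes: the first acquireLock call for a token succeeds, the second fails until the first node releases the token.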

5 Responses to Cluster job synchronization with Spring AOP and memcached

  1. Thorsten Maier says:

    I think you can simplify your pointcut expression to
    @Around("@annotation(syncParams)")

  2. Insomniac says:

    I am assuming the ClusterSynchronized is a custom class you wrote, but it’s not posted anywhere… Do you happen to have it handy, as I would love to implement this code.

  3. Papick G. Taboada says:

    ClusterSynchronized is a simple annotation:

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface ClusterSynchronized {
       /**
        * The unique job token identifier.
        *
        * @return the key under which the lock is stored
        */
       String jobToken();

       /**
        * This is the token expiration in seconds.
        *
        * @return the number of seconds after which the token expires
        */
       int jobTokenExpirationTimeInSeconds();
    }

  4. agnize says:

    Thanks for sharing this.

    You said you are using memcached. Assuming it runs on an external server, did you do any load testing to measure the added latency and the performance impact of the network round trips and I/O? If there is a large number of users and your API is protected, then each call has to make a network trip to memcached.
