Apache Storm Dependency Injection

by Eli Oxman  
6 min read  • 10 Mar 2015

This post assumes that you are familiar with DI and its benefits (if not, read Martin Fowler's original article).

The main advantage of DI, as I see it, is that it makes each class expose a clear contract of dependencies through its constructor - no hidden dependencies (as are usually caused by singletons). As a side bonus, it also makes the code much more testable, since it is much easier to replace real modules with mocks.

Here at Alooma, Apache Storm is a major piece of our solution; we use Storm to run complex topologies. To keep our topologies dynamic and changeable from the outside, we use Zookeeper: our external configuration system passes configuration to the topologies over Zookeeper, using the CuratorFramework. It is a recommended best practice to keep a single CuratorFramework instance (i.e., a singleton), but being huge fans of DI we wanted to use DI instead of implementing an old-fashioned singleton.

To use DI we need two central pieces: a place to bind our injectables upon startup, and a hook into the creation of our objects.

Let's recall how Storm builds and runs our topology. I will use Trident terminology - the same principles apply to plain Storm topologies.

On the submitting side we create our spouts, functions, and state factories using plain old constructors. These get serialized by Storm upon submitting the topology and deserialized on the receiving side, which then starts executing the topology.

DI doesn't naturally fit this scenario, because DI doesn't play well with serialization.

Let's assume we have the following Trident function:

public class SampleFunction extends BaseFunction {

  private final transient CuratorFramework curatorFramework;
  private final String pathToWatch;

  public SampleFunction(CuratorFramework curatorFramework, String pathToWatch) {
      ...
  }

  @Override
  public void prepare(Map conf, TridentOperationContext context) {
      ...
  }

  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
      ...
  }

  @Override
  public void cleanup() {
      ...
  }
}

This function watches a path in Zookeeper (through the CuratorFramework) and, based on the value at this path, performs some transformation on the tuple.

The CuratorFramework is not serializable (which is why the field is transient), so we must find a way to re-inject it on the receiving side upon deserialization. Additionally, our CuratorFramework is an object whose lifecycle we'd like the DI framework to manage (for example, it might be a singleton).

First we must find a hook into the creation of the object. This comes in the form of Java's readResolve() method. When implemented, the method is called by Java's deserialization framework. At that point all the object's non-transient fields are already restored, and we only need to resolve our transient fields, which we would like to receive from our DI framework. The main problem is that we have no access to the injector.
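As a minimal, framework-free illustration of this mechanism (class and field names here are hypothetical, with a plain Object standing in for the non-serializable CuratorFramework), the following shows Java's deserialization machinery calling readResolve() and letting us swap in a replacement instance with the transient dependency re-attached:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamException;
import java.io.Serializable;

// Hypothetical example: demonstrates that readResolve() runs during
// deserialization, after the non-transient fields have been restored.
class Watcher implements Serializable {
    private static final long serialVersionUID = 1L;

    // Stands in for a non-serializable dependency such as CuratorFramework.
    transient Object dependency;
    final String pathToWatch;

    Watcher(Object dependency, String pathToWatch) {
        this.dependency = dependency;
        this.pathToWatch = pathToWatch;
    }

    // Called by Java's deserialization framework; here we re-attach the
    // transient dependency (in the post, it comes from the DI provider).
    protected Object readResolve() throws ObjectStreamException {
        return new Watcher("re-injected", pathToWatch);
    }
}

public class ReadResolveDemo {
    // Serialize and immediately deserialize, as Storm does across workers.
    static Watcher roundTrip(Watcher w) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(w);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (Watcher) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Watcher restored = roundTrip(new Watcher(new Object(), "/config/path"));
        System.out.println(restored.pathToWatch); // non-transient field survived
        System.out.println(restored.dependency);  // re-attached by readResolve()
    }
}
```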

Our solution comes in the form of the Provider interface. We add a static Provider&lt;SampleFunction&gt; field to our SampleFunction class. This provider can supply a SampleFunction with all the injectable fields already injected, and thus gives us access to the DI-managed CuratorFramework.

So now our SampleFunction becomes as follows:

  public class SampleFunction extends BaseFunction {

      @Inject
      private static Provider<SampleFunction> provider;

      private final transient CuratorFramework curatorFramework;
      private final String pathToWatch;

      @Inject
      public SampleFunction(CuratorFramework curatorFramework, String pathToWatch) {
          ...
      }

      @Override
      public void prepare(Map conf, TridentOperationContext context) {
          ...
      }

      @Override
      public void execute(TridentTuple tuple, TridentCollector collector) {
          ...
      }

      @Override
      public void cleanup() {
          ...
      }

      protected Object readResolve() throws ObjectStreamException {
          return new SampleFunction(provider.get().curatorFramework, pathToWatch);
      }
  }

Now the final piece of the puzzle is finding a place to initialize our bindings and injector upon startup of our topology in Storm.

At first we used the TOPOLOGY_AUTO_TASK_HOOKS config for this. Using this configuration one can provide a class which extends BaseTaskHook. Overriding the prepare method of such a class allows you to execute code before the prepare of any other Storm bolt. Here we initialized the injector (using an old-fashioned singleton to avoid duplicate initialization).
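The initialize-once guard mentioned above can be sketched as a plain holder class. This is illustrative only - the names are our own, and a String stands in for the real Injector so the sketch stays framework-free:

```java
import java.util.Map;

// Illustrative sketch: a holder that initializes the injector exactly once
// per worker JVM, even if several hooks race to call init().
public final class InjectorHolder {
    private static Object injector; // in real code: com.google.inject.Injector

    private InjectorHolder() {}

    // In real code this would build modules from the topology conf and call
    // Guice.createInjector(...); here we fake it with a String.
    public static synchronized Object init(Map<String, ?> conf) {
        if (injector == null) {
            injector = "injector-with-" + conf.size() + "-settings";
        }
        return injector;
    }

    public static synchronized Object get() {
        if (injector == null) {
            throw new IllegalStateException("init() has not been called yet");
        }
        return injector;
    }
}
```

A second call to init() (for example, from another task hook on the same worker) returns the already-created instance rather than building a new one.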

Later on we realized that these hooks are not called before the creation of spouts, so we moved the initialization code to a class which implements IKryoDecorator (registered via the TOPOLOGY_KRYO_DECORATORS config). Though our code isn't really related to Kryo serialization, we found that the IKryoDecorator's decorate method is called before the creation of spouts.

These hooks still had one flaw: the IKryoDecorator doesn't receive the Map conf parameter, so our injection setup couldn't be based on configuration. The hook we are using now (and which has served us well thus far) is implementing the IKryoFactory interface. We override the getKryo method, which receives the Map conf as a parameter. This hook is also called both before bolt and before spout creation.

If anyone is familiar with a better startup hook in Storm, please feel free to suggest it in the comments.

One last point not to forget: along with the regular bindings, we must now call requestStaticInjection for each component into which we want to inject a provider.
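With Guice, for example, the binding side might look roughly like the sketch below. The module name and the commented-out binding are illustrative; the point is only the requestStaticInjection call, which tells Guice to populate the static @Inject Provider field when the injector is created:

```java
import com.google.inject.AbstractModule;

// Rough sketch of the binding module (names are illustrative).
public class TopologyModule extends AbstractModule {
    @Override
    protected void configure() {
        // Regular bindings go here, e.g. a singleton-scoped CuratorFramework:
        // bind(CuratorFramework.class)
        //     .toProvider(CuratorFrameworkProvider.class)
        //     .in(Singleton.class);

        // For each component that holds a static @Inject Provider<...> field,
        // ask Guice to inject that static field at injector-creation time:
        requestStaticInjection(SampleFunction.class);
    }
}
```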

Clearly this is not the cleanest possible solution (we have some static injections, as well as an old-fashioned singleton in use), but we find it satisfactory: it lets us keep our constructors sensible, exposing all the required dependencies, as intended when using DI.
