Friday, 4 July 2014

Update a running topology on Storm Cluster

Sometimes we want to update a running topology based on certain conditions or rules. As of now, Storm has no direct command or API to update a running topology in place, so there are two approaches.

First approach: Kill the topology from the command line using:

storm kill <topology-name>

Then re-run it. But what if we don't want to kill it manually, and want this to be handled automatically in code?

Second approach: Use NimbusClient to kill the topology programmatically and then start it again.

Map storm_conf = Utils.readStormConfig();
Client client = NimbusClient.getConfiguredClient(storm_conf).getClient();
// topologyNameExists scans client.getClusterInfo().get_topologies_iterator() for the name
if (topologyNameExists(storm_conf, topologyName)) {
    client.killTopology(topologyName);
}

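The code above kills the topology. However, a killed topology takes some time to disappear from the cluster's topology list: Nimbus first deactivates it and, by default, waits for the topology's message timeout (topology.message.timeout.secs, 30 seconds by default) before removing it. If you submit the same topology immediately, the submission fails with an exception saying that a topology with the same name already exists on the cluster, so you need to keep checking until the name is gone. Calling client.killTopologyWithOpts(topologyName, options) with a KillOptions whose wait_secs is set to 0 (options.set_wait_secs(0)) shortens this delay, although removal is still asynchronous.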
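The "keep checking" step is a plain polling loop with a timeout. Below is a minimal, Storm-independent sketch of such a helper; the class and method names (WaitUntil, waitUntil) are hypothetical. With Storm on the classpath you would pass a condition such as "topologyNameExists(storm_conf, topologyName) is false", wrapping that helper's checked exception, since BooleanSupplier cannot throw one. It assumes Java 8 for the lambda.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class WaitUntil {

    // Polls `condition` every `intervalMillis` until it returns true or
    // `timeoutMillis` has elapsed. Returns true if the condition was met,
    // false on timeout or if the waiting thread is interrupted.
    static boolean waitUntil(BooleanSupplier condition, long intervalMillis, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: the name is still listed
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Stand-in for "topology no longer listed": becomes true after about 300 ms
        long start = System.nanoTime();
        boolean gone = waitUntil(
                () -> System.nanoTime() - start > TimeUnit.MILLISECONDS.toNanos(300), 50, 5000);
        System.out.println("topology removed: " + gone);
    }
}
```

A timeout is worth having here: if Nimbus is slow or the kill failed, an unbounded loop would hang the restart forever, whereas a false return lets the caller log the problem and decide what to do.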
Here is the running example:
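import java.util.Iterator;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.Nimbus.Client;
import backtype.storm.generated.TopologySummary;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class TestTopology {

    public static void main(String[] args) throws Exception {
        String topologyName = "testTopology";
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("testspout", new TestSpout(), 1);
        // shuffle grouping is an assumption; use whatever grouping TestBolt needs
        builder.setBolt("testbolt", new TestBolt(), 1).shuffleGrouping("testspout");
        Config conf = new Config();

        Map storm_conf = Utils.readStormConfig();
        NimbusClient nimbus = NimbusClient.getConfiguredClient(storm_conf);
        Client client = nimbus.getClient();

        if (topologyNameExists(storm_conf, topologyName)) {
            // Kill the running topology; Nimbus removes it asynchronously
            client.killTopology(topologyName);
            // Poll once per second until the name disappears from the cluster
            while (topologyNameExists(storm_conf, topologyName)) {
                Thread.sleep(1000);
            }
        }
        nimbus.close();

        try {
            // Start (or restart) the topology under the same name
            StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
        } catch (AlreadyAliveException ae) {
            ae.printStackTrace();
        }
    }

    // Returns true if a topology with the given name is listed on the cluster
    private static boolean topologyNameExists(Map storm_conf, String topologyName) throws Exception {
        NimbusClient nimbus = NimbusClient.getConfiguredClient(storm_conf);
        try {
            Iterator<TopologySummary> topologies =
                    nimbus.getClient().getClusterInfo().get_topologies_iterator();
            while (topologies.hasNext()) {
                if (topologyName.equals(topologies.next().get_name())) {
                    return true;
                }
            }
            return false;
        } finally {
            nimbus.close();
        }
    }
}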

The code above starts the topology if it is not running; otherwise it kills it and restarts it. You can monitor this from the Storm UI.
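The start-or-restart decision can be checked without a live cluster by hiding the three Nimbus calls (exists, kill, submit) behind a small interface. The sketch below is one way to do that; the Cluster interface, FakeCluster and startOrRestart are hypothetical names, not part of Storm. The fake keeps a killed topology listed for a few polls to mimic Nimbus removing it asynchronously, and it rejects a duplicate name on submit, which is exactly the failure the polling loop avoids.

```java
import java.util.HashMap;
import java.util.Map;

public class RestartFlow {

    // Hypothetical abstraction over the Nimbus calls used in TestTopology
    interface Cluster {
        boolean exists(String name);
        void kill(String name);
        void submit(String name);
    }

    // Submits `name` if it is not running; otherwise kills it, polls until the
    // name is gone, and submits it again. Returns true if a kill happened.
    static boolean startOrRestart(Cluster cluster, String name, long pollMillis) {
        boolean restarted = false;
        if (cluster.exists(name)) {
            cluster.kill(name);
            restarted = true;
            while (cluster.exists(name)) {
                sleepQuietly(pollMillis);
            }
        }
        cluster.submit(name);
        return restarted;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // In-memory stand-in: a killed topology stays listed for 3 polls;
    // submitting a name that is still listed throws
    static class FakeCluster implements Cluster {
        private final Map<String, Integer> listed = new HashMap<>(); // -1 = alive, n = polls left
        int submits = 0;

        public boolean exists(String name) {
            Integer left = listed.get(name);
            if (left == null) return false;
            if (left == -1) return true;
            if (left == 0) { listed.remove(name); return false; }
            listed.put(name, left - 1);
            return true;
        }

        public void kill(String name) {
            if (listed.containsKey(name)) listed.put(name, 3);
        }

        public void submit(String name) {
            if (listed.containsKey(name)) {
                throw new IllegalStateException("duplicate topology name: " + name);
            }
            listed.put(name, -1);
            submits++;
        }
    }

    public static void main(String[] args) {
        FakeCluster cluster = new FakeCluster();
        System.out.println(startOrRestart(cluster, "testTopology", 10)); // fresh start
        System.out.println(startOrRestart(cluster, "testTopology", 10)); // kill + restart
    }
}
```

Running main prints false for the first call (nothing was running) and true for the second (the topology was killed, waited out, and resubmitted).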

Friday, 20 June 2014

Submitting a topology to Remote Storm Cluster

It is easy to write a topology and submit it to a Storm cluster from one of the cluster's own machines.
The problem arises when we need to submit a topology from a local machine to a remote Storm cluster.
What should we do in that case?

Here is an approach for submitting a topology to a remote cluster.

I have a local Windows machine and a Storm cluster (1 Nimbus Linux machine and 2 supervisor Linux machines).
Let's say the following are the machines in the cluster:

Nimbus Machine :
Supervisor Machine 1:
Supervisor Machine 2:

The Storm cluster should be up and running on the above machines.

Now, from the local machine, use NimbusClient to submit the jar to the cluster.

NimbusClient nimbus = new NimbusClient(storm_conf,"<nimbus machine ip>",<nimbus port>);
nimbus.getClient().submitTopology(topologyName,uploadedJarLocation,jsonConf, builder.createTopology());
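Besides the NimbusClient constructor arguments, the config map itself has to point at the remote machine: StormSubmitter.submitJar and NimbusClient.getConfiguredClient both locate Nimbus from the nimbus.host and nimbus.thrift.port entries of the map they are given (6627 is Storm's default Thrift port). The sketch below shows that override in plain Java so it runs without Storm on the classpath; RemoteNimbusConf and pointAtNimbus are hypothetical names, and 192.0.2.10 is a placeholder documentation address standing in for your Nimbus IP. In a real client, the base map would be the raw Map returned by Utils.readStormConfig().

```java
import java.util.HashMap;
import java.util.Map;

public class RemoteNimbusConf {

    // String values of Storm's Config.NIMBUS_HOST and Config.NIMBUS_THRIFT_PORT
    static final String NIMBUS_HOST = "nimbus.host";
    static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";

    // Returns a copy of `base` that points at the given Nimbus host and Thrift port,
    // leaving the original map unchanged
    static Map<String, Object> pointAtNimbus(Map<String, Object> base, String host, int port) {
        Map<String, Object> conf = new HashMap<>(base);
        conf.put(NIMBUS_HOST, host);
        conf.put(NIMBUS_THRIFT_PORT, port);
        return conf;
    }

    public static void main(String[] args) {
        // Stand-in for the config read from defaults.yaml / storm.yaml
        Map<String, Object> base = new HashMap<>();
        base.put(NIMBUS_HOST, "localhost");
        // Placeholder address; replace with the real Nimbus machine IP
        Map<String, Object> remote = pointAtNimbus(base, "192.0.2.10", 6627);
        System.out.println(remote.get(NIMBUS_HOST) + ":" + remote.get(NIMBUS_THRIFT_PORT));
    }
}
```

Copying rather than mutating keeps the local defaults intact, which is handy if the same program talks to more than one cluster.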

Here is a running example:

import java.util.Map;

import org.json.simple.JSONValue;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class RunningClusterTopology {
    public static void main(String[] args) throws Exception {
        String topologyName = "testTopology"; // any name not already running on the cluster
        TopologyBuilder builder = new TopologyBuilder();
        // add spouts and bolts to the builder here, e.g. builder.setSpout(...)

        // Read defaults/storm.yaml, then point the client config at the remote Nimbus
        Map storm_conf = Utils.readStormConfig();
        storm_conf.put(Config.NIMBUS_HOST, "<nimbus machine ip>");
        int nimbusPort = ((Number) storm_conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();

        String inputJar = "C:\\workspace\\TestStormRunner\\target\\TestStormRunner-0.0.1-SNAPSHOT-jar-with-dependencies.jar";
        NimbusClient nimbus = new NimbusClient(storm_conf, "<nimbus machine ip>", nimbusPort);

        // upload topology jar to Cluster using StormSubmitter
        String uploadedJarLocation = StormSubmitter.submitJar(storm_conf, inputJar);
        try {
            String jsonConf = JSONValue.toJSONString(storm_conf);
            nimbus.getClient().submitTopology(topologyName,
                    uploadedJarLocation, jsonConf, builder.createTopology());
        } catch (AlreadyAliveException ae) {
            ae.printStackTrace();
        } finally {
            nimbus.close();
        }
    }
}

This submits the topology to the Nimbus machine, which runs it on the 2 supervisor machines.
To test it, open the Storm UI in a browser: http://<nimbus-ip>:<port> (the UI port is set by ui.port and defaults to 8080).