Apache Storm - Working Example
We have gone through the core technical details of Apache Storm, and now it is time to code some simple scenarios.
Scenario – Mobile Call Log Analyzer
Mobile calls and their durations are given as input to Apache Storm. Storm processes the calls, groups them by caller and receiver, and counts the total number of calls for each caller-receiver pair.
Spout Creation
A spout is a component used for data generation. A spout typically implements the IRichSpout interface, which has the following important methods −
open − Provides the spout with an environment to execute. The executors will run this method to initialize the spout.
nextTuple − Emits the generated data through the collector.
close − This method is called when a spout is going to shut down.
declareOutputFields − Declares the output schema of the tuple.
ack − Acknowledges that a specific tuple has been processed.
fail − Specifies that a specific tuple has not been fully processed, so that it can be reprocessed.
open
The signature of the open method is as follows −
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf − Provides the Storm configuration for this spout.
context − Provides complete information about the spout's place within the topology, its task id, and its input and output information.
collector − Enables us to emit the tuple that will be processed by the bolts.
nextTuple
The signature of the nextTuple method is as follows −
nextTuple()
nextTuple() is called periodically from the same loop as the ack() and fail() methods. It must release control of the thread when there is no work to do, so that the other methods have a chance to be called. The first line of nextTuple should therefore check whether processing has finished. If so, it should sleep for at least one millisecond to reduce load on the processor before returning.
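The idle-handling rule above can be sketched in plain Java, without the Storm API. The class IdleAwareSource and its fields are hypothetical names used only for illustration; a real spout would emit through its SpoutOutputCollector instead of incrementing a counter.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a spout; it does not use the Storm API.
class IdleAwareSource {
    private final Queue<String> pending = new ArrayDeque<>();
    int emitted = 0;   // number of items handed out so far
    int idleCalls = 0; // number of calls that found no work

    void offer(String item) {
        pending.add(item);
    }

    // Emit one item if there is work; otherwise sleep briefly and return,
    // so the calling loop can move on to ack() and fail().
    void nextTuple() {
        String item = pending.poll();
        if (item == null) {
            idleCalls++;
            try {
                Thread.sleep(1); // at least one millisecond
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            return;
        }
        emitted++; // a real spout would call collector.emit(...) here
    }

    public static void main(String[] args) {
        IdleAwareSource source = new IdleAwareSource();
        source.offer("call-1");
        source.offer("call-2");
        for (int i = 0; i < 3; i++) {
            source.nextTuple();
        }
        System.out.println("emitted=" + source.emitted + ", idle=" + source.idleCalls);
    }
}
```

The third call finds the queue empty, so it sleeps and returns without emitting anything.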
close
The signature of the close method is as follows −
close()
declareOutputFields
The signature of the declareOutputFields method is as follows −
declareOutputFields(OutputFieldsDeclarer declarer)
declarer − It is used to declare output stream ids, output fields, etc.
This method is used to specify the output schema of the tuple.
ack
The signature of the ack method is as follows −
ack(Object msgId)
This method acknowledges that a specific tuple has been processed.
fail
The signature of the fail method is as follows −
fail(Object msgId)
This method informs the spout that a specific tuple has not been fully processed. The spout can then re-emit the tuple so that it is processed again.
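One way a spout could use ack and fail is to remember each emitted tuple under its message id, forget it on ack, and queue it for re-emission on fail. The sketch below shows that bookkeeping in plain Java. PendingTracker and its method names are hypothetical and not part of the Storm API, and this is only one possible pattern, not necessarily how any particular spout does it.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical bookkeeping for a reliable spout; not part of the Storm API.
class PendingTracker {
    private final Map<Object, String> inFlight = new HashMap<>();
    private final Queue<String> retryQueue = new ArrayDeque<>();

    // Remember a tuple under its message id when it is emitted.
    void emitted(Object msgId, String payload) {
        inFlight.put(msgId, payload);
    }

    // ack: the tuple was fully processed, so forget it.
    void ack(Object msgId) {
        inFlight.remove(msgId);
    }

    // fail: the tuple was not fully processed, so queue it for re-emission.
    void fail(Object msgId) {
        String payload = inFlight.remove(msgId);
        if (payload != null) {
            retryQueue.add(payload);
        }
    }

    int inFlightCount() {
        return inFlight.size();
    }

    int retryCount() {
        return retryQueue.size();
    }

    public static void main(String[] args) {
        PendingTracker tracker = new PendingTracker();
        tracker.emitted(1, "1234123401 - 1234123402");
        tracker.emitted(2, "1234123403 - 1234123404");
        tracker.ack(1);
        tracker.fail(2);
        System.out.println("in flight: " + tracker.inFlightCount()
            + ", to retry: " + tracker.retryCount());
    }
}
```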
FakeCallLogReaderSpout
In our scenario, we need to collect the call log details. Each call log entry contains the following information −
caller number
receiver number
duration
Since we don't have real-time call log information, we will generate fake call logs. The fake information will be created using the Random class. The complete program code is given below.
Coding − FakeCallLogReaderSpout.java
import java.util.*;

//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeCallLogReaderSpout which implements the IRichSpout interface
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;

   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;

   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            //Compare string contents with equals(), not ==, and pick again
            //until the receiver differs from the caller.
            while(fromMobileNumber.equals(toMobileNumber)) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }

            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
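The record-generation step of the spout can be tried on its own, without a Storm installation. The following is a minimal sketch of that step in plain Java. FakeCallLogs and nextCall are hypothetical names introduced here for illustration; the phone numbers and the 0 to 59 duration range are taken from the spout above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical helper, not part of Storm: generates fake call records.
class FakeCallLogs {
    static final List<String> NUMBERS = Arrays.asList(
        "1234123401", "1234123402", "1234123403", "1234123404");

    // Returns {caller, receiver, duration} with caller different from receiver.
    static Object[] nextCall(Random random) {
        String from = NUMBERS.get(random.nextInt(NUMBERS.size()));
        String to = NUMBERS.get(random.nextInt(NUMBERS.size()));
        // Compare string contents with equals(), not ==.
        while (from.equals(to)) {
            to = NUMBERS.get(random.nextInt(NUMBERS.size()));
        }
        Integer duration = random.nextInt(60); // 0..59
        return new Object[] { from, to, duration };
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println(Arrays.toString(nextCall(random)));
        }
    }
}
```

Passing the Random instance in as a parameter lets a caller supply a seeded generator when repeatable records are wanted.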
Bolt Creation
A bolt is a component that takes tuples as input, processes them, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, CallLogCreatorBolt and CallLogCounterBolt, are used to perform the operations.
IRichBolt interface has the following methods −
prepare − Provides the bolt with an environment to execute. The executors will run this method to initialize the bolt.
execute − Processes a single tuple of input.
cleanup − Called when a bolt is going to shut down.
declareOutputFields − Declares the output schema of the tuple.
prepare
The signature of the prepare method is as follows −
prepare(Map conf, TopologyContext context, OutputCollector collector)
conf − Provides Storm configuration for this bolt.
context − Provides complete information about the bolt's place within the topology, its task id, input and output information, etc.
collector − Enables us to emit the processed tuple.
execute
The signature of the execute method is as follows −
execute(Tuple tuple)
Here tuple is the input tuple to be processed.
The execute method processes a single tuple at a time. The tuple data can be accessed with the getValue method of the Tuple class. It is not necessary to process the input tuple immediately: multiple tuples can be processed and output as a single output tuple. The processed tuple is emitted using the OutputCollector class.
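The idea that several input tuples can be combined into one output tuple can be sketched in plain Java. DurationBatcher and its batch size are hypothetical, chosen only for illustration. A real bolt would buffer inside execute(Tuple) and emit through its OutputCollector rather than return a value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical aggregator illustrating "many inputs, one output"; not Storm API.
class DurationBatcher {
    private final int batchSize;
    private final List<Integer> buffer = new ArrayList<>();

    DurationBatcher(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffers one duration; returns the batch total when the batch is full, else null.
    Integer execute(int duration) {
        buffer.add(duration);
        if (buffer.size() < batchSize) {
            return null; // nothing to emit yet
        }
        int total = 0;
        for (int d : buffer) {
            total += d;
        }
        buffer.clear();
        return total; // a real bolt would call collector.emit(...) here
    }

    public static void main(String[] args) {
        DurationBatcher batcher = new DurationBatcher(3);
        int[] durations = { 10, 20, 30, 5 };
        for (int d : durations) {
            Integer out = batcher.execute(d);
            System.out.println("in=" + d + " out=" + out);
        }
    }
}
```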
cleanup
The signature of the cleanup method is as follows −
cleanup()
declareOutputFields
The signature of the declareOutputFields method is as follows −
declareOutputFields(OutputFieldsDeclarer declarer)
Here the parameter declarer is used to declare output stream ids, output fields, etc.
This method is used to specify the output schema of the tuple.
Call log Creator Bolt
The call log creator bolt receives the call log tuple, which has the caller number, receiver number, and call duration. This bolt simply creates a new value by combining the caller number and the receiver number. The format of the new value is "Caller number - Receiver number", and it is emitted as a new field named "call". The complete code is given below.
Coding − CallLogCreatorBolt.java
//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implements the IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
Call log Counter Bolt
The call log counter bolt receives the call and its duration as a tuple. This bolt initializes a dictionary (Map) object in the prepare method. In the execute method, it checks the tuple, creates a new entry in the dictionary for every new "call" value, and sets its value to 1. For an entry that already exists in the dictionary, it just increments the value. In simple terms, this bolt saves each call and its count in the dictionary object. Instead of saving the call and its count in the dictionary, we could also save them to a data source. The complete program code is as follows −
Coding − CallLogCounterBolt.java
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);

      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
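The key-building and counting steps of the two bolts can be tried outside Storm. The sketch below is plain Java. CallCounter, callKey, record and countOf are hypothetical names used for illustration. It uses Map.merge (Java 8 or later) as a compact alternative to the containsKey/put logic above, with the same result.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the counting logic; not part of the Storm API.
class CallCounter {
    private final Map<String, Integer> counterMap = new HashMap<>();

    // Builds the "caller - receiver" key used as the "call" field.
    static String callKey(String from, String to) {
        return from + " - " + to;
    }

    // First occurrence stores 1; later occurrences add 1 to the stored count.
    void record(String call) {
        counterMap.merge(call, 1, Integer::sum);
    }

    // Returns the count for a call, or 0 if it was never recorded.
    int countOf(String call) {
        return counterMap.getOrDefault(call, 0);
    }

    public static void main(String[] args) {
        CallCounter counter = new CallCounter();
        counter.record(callKey("1234123401", "1234123402"));
        counter.record(callKey("1234123401", "1234123402"));
        counter.record(callKey("1234123402", "1234123401"));
        System.out.println(counter.countOf("1234123401 - 1234123402"));
        System.out.println(counter.countOf("1234123402 - 1234123401"));
    }
}
```

Note that the key is directional: a call from A to B is counted separately from a call from B to A, which matches the sample output later in this chapter.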
Creating Topology
A Storm topology is basically a Thrift structure. The TopologyBuilder class provides simple and easy methods to create complex topologies. It has methods to set a spout (setSpout) and to set a bolt (setBolt), and finally a createTopology method to create the topology. Use the following code snippet to create a topology −
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
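The shuffleGrouping and fieldsGrouping methods set the stream grouping that connects each bolt to its input. shuffleGrouping distributes tuples randomly across the bolt's tasks, while fieldsGrouping sends all tuples with the same value of the named field (here "call") to the same task.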
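The difference between the two groupings can be illustrated with a small routing sketch in plain Java. GroupingSketch and its methods are hypothetical. The hash-modulo rule is a simplification assumed here for illustration, and Storm's internal implementation may differ. The property that matters is that a fields grouping maps equal field values to the same task, which is what lets the counter bolt keep a correct count per call.

```java
import java.util.Random;

// Hypothetical routing functions; Storm's internal implementation may differ.
class GroupingSketch {
    // Shuffle grouping: pick any target task at random.
    static int shuffleTask(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    // Fields grouping: derive the task from the grouping field's value,
    // so equal values always map to the same task.
    static int fieldsTask(String fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        String call = "1234123401 - 1234123402";
        Random random = new Random();
        for (int i = 0; i < 3; i++) {
            System.out.println("shuffle -> task " + shuffleTask(random, numTasks)
                + ", fields -> task " + fieldsTask(call, numTasks));
        }
    }
}
```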
Local Cluster
For development purposes, we can create a local cluster using a "LocalCluster" object and then submit the topology using the "submitTopology" method of the "LocalCluster" class. One of the arguments to "submitTopology" is an instance of the "Config" class, which is used to set configuration options before submitting the topology. These options are merged with the cluster configuration at run time and passed to all tasks: bolts receive them in the prepare method and spouts in the open method. Once the topology is submitted to the cluster, we wait 10 seconds for the cluster to process it and then shut down the cluster using the "shutdown" method of "LocalCluster". The complete program code is as follows −
Coding − LogAnalyserStorm.java
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm to submit the topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);

      //Build the topology
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);

      //Stop the topology
      cluster.shutdown();
   }
}
Building and Running the Application
The complete application has four Java source files. They are −
FakeCallLogReaderSpout.java
CallLogCreatorBolt.java
CallLogCounterBolt.java
LogAnalyserStorm.java
The application can be built using the following command −
javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java
The application can be run using the following command −
java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserStorm
Output
Once the application is started, it will output the complete details about the cluster startup process, spout and bolt processing, and finally, the cluster shutdown process. In "CallLogCounterBolt", we have printed the call and its count details. This information will be displayed on the console as follows −
1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93
Non-JVM languages
Storm topologies are defined as Thrift structures, which makes it easy to submit topologies from any language. Storm supports Ruby, Python, and many other languages. Let's take a look at the Python binding.
Python Binding
Python is a general-purpose, interpreted, interactive, object-oriented, high-level programming language. Storm supports implementing topology components in Python. The Python binding supports emitting, anchoring, acking, and logging operations.
As you know, bolts can be defined in any language. Bolts written in another language are executed as sub-processes, and Storm communicates with those sub-processes using JSON messages over stdin/stdout. First, take a sample bolt WordCount that uses the Python binding.
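//ShellBolt (backtype.storm.task.ShellBolt) provides the constructor that
//takes the command and script to run as a sub-process.
public static class WordCount extends ShellBolt implements IRichBolt {
   public WordCount() {
      super("python", "splitword.py");
   }

   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}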
Here the class WordCount implements the IRichBolt interface and runs the Python implementation named in the super constructor argument, "splitword.py". Now create the Python implementation named "splitword.py".
import storm

class WordCountBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])

WordCountBolt().run()
This is a sample Python implementation that splits a given sentence into words and emits each word as a tuple. Similarly, you can bind with other supported languages as well.