Apache Storm in Twitter
Here in this chapter, we will discuss a real-time apppcation of Apache Storm. We will see how Storm is used in Twitter.
Twitter is an onpne social networking service that provides a platform to send and receive user tweets. Registered users can read and post tweets, but unregistered users can only read tweets. Hashtag is used to categorize tweets by keyword by appending # before the relevant keyword. Now let us take a real-time scenario of finding the most used hashtag per topic.
Spout Creation
The purpose of spout is to get the tweets submitted by people as soon as possible. Twitter provides “Twitter Streaming API”, a web service based tool to retrieve the tweets submitted by people in real time. Twitter Streaming API can be accessed in any programming language.
twitter4j is an open source, unofficial Java pbrary, which provides a Java based module to easily access the Twitter Streaming API. twitter4j provides a pstener-based framework to access the tweets. To access the Twitter Streaming API, we need to sign in for Twitter developer account and should get the following OAuth authentication details.
Storm provides a twitter spout, TwitterSampleSpout, in its starter kit. We will be using it to retrieve the tweets. The spout needs OAuth authentication details and at least a keyword. The spout will emit real-time tweets based on keywords. The complete program code is given below.
Coding: TwitterSampleSpout.java
import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import twitter4j.FilterQuery; import twitter4j.StallWarning; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import twitter4j.auth.AccessToken; import twitter4j.conf.ConfigurationBuilder; import backtype.storm.Config; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; @SuppressWarnings("serial") pubpc class TwitterSampleSpout extends BaseRichSpout { SpoutOutputCollector _collector; LinkedBlockingQueue<Status> queue = null; TwitterStream _twitterStream; String consumerKey; String consumerSecret; String accessToken; String accessTokenSecret; String[] keyWords; pubpc TwitterSampleSpout(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, String[] keyWords) { this.consumerKey = consumerKey; this.consumerSecret = consumerSecret; this.accessToken = accessToken; this.accessTokenSecret = accessTokenSecret; this.keyWords = keyWords; } pubpc TwitterSampleSpout() { // TODO Auto-generated constructor stub } @Override pubpc void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { queue = new LinkedBlockingQueue<Status>(1000); _collector = collector; StatusListener pstener = new StatusListener() { @Override pubpc void onStatus(Status status) { queue.offer(status); } @Override pubpc void onDeletionNotice(StatusDeletionNotice sdn) {} @Override pubpc void onTrackLimitationNotice(int i) {} @Override pubpc void onScrubGeo(long l, long l1) {} @Override pubpc void onException(Exception ex) {} @Override pubpc void onStallWarning(StallWarning arg0) { // TODO Auto-generated method stub } }; ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true) .setOAuthConsumerKey(consumerKey) .setOAuthConsumerSecret(consumerSecret) .setOAuthAccessToken(accessToken) .setOAuthAccessTokenSecret(accessTokenSecret); _twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); _twitterStream.addListener(pstener); if (keyWords.length == 0) { _twitterStream.sample(); }else { FilterQuery query = new FilterQuery().track(keyWords); _twitterStream.filter(query); } } @Override pubpc void nextTuple() { Status ret = queue.poll(); if (ret == null) { Utils.sleep(50); } else { _collector.emit(new Values(ret)); } } @Override pubpc void close() { _twitterStream.shutdown(); } @Override pubpc Map<String, Object> getComponentConfiguration() { Config ret = new Config(); ret.setMaxTaskParallepsm(1); return ret; } @Override pubpc void ack(Object id) {} @Override pubpc void fail(Object id) {} @Override pubpc void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tweet")); } }
Hashtag Reader Bolt
The tweet emitted by spout will be forwarded to HashtagReaderBolt, which will process the tweet and emit all the available hashtags. HashtagReaderBolt uses getHashTagEntities method provided by twitter4j. getHashTagEntities reads the tweet and returns the pst of hashtag. The complete program code is as follows −
Coding: HashtagReaderBolt.java
import java.util.HashMap; import java.util.Map; import twitter4j.*; import twitter4j.conf.*; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; pubpc class HashtagReaderBolt implements IRichBolt { private OutputCollector collector; @Override pubpc void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override pubpc void execute(Tuple tuple) { Status tweet = (Status) tuple.getValueByField("tweet"); for(HashtagEntity hashtage : tweet.getHashtagEntities()) { System.out.println("Hashtag: " + hashtage.getText()); this.collector.emit(new Values(hashtage.getText())); } } @Override pubpc void cleanup() {} @Override pubpc void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override pubpc Map<String, Object> getComponentConfiguration() { return null; } }
Hashtag Counter Bolt
The emitted hashtag will be forwarded to HashtagCounterBolt. This bolt will process all the hashtags and save each and every hashtag and its count in memory using Java Map object. The complete program code is given below.
Coding: HashtagCounterBolt.java
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; pubpc class HashtagCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override pubpc void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override pubpc void execute(Tuple tuple) { String key = tuple.getString(0); if(!counterMap.containsKey(key)){ counterMap.put(key, 1); }else{ Integer c = counterMap.get(key) + 1; counterMap.put(key, c); } collector.ack(tuple); } @Override pubpc void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println("Result: " + entry.getKey()+" : " + entry.getValue()); } } @Override pubpc void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override pubpc Map<String, Object> getComponentConfiguration() { return null; } }
Submitting a Topology
Submitting a topology is the main apppcation. Twitter topology consists of TwitterSampleSpout, HashtagReaderBolt, and HashtagCounterBolt. The following program code shows how to submit a topology.
Coding: TwitterHashtagStorm.java
import java.util.*; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; pubpc class TwitterHashtagStorm { pubpc static void main(String[] args) throws Exception{ String consumerKey = args[0]; String consumerSecret = args[1]; String accessToken = args[2]; String accessTokenSecret = args[3]; String[] arguments = args.clone(); String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length); Config config = new Config(); config.setDebug(true); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, keyWords)); builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt()) .shuffleGrouping("twitter-spout"); builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt()) .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TwitterHashtagStorm", config, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } }
Building and Running the Apppcation
The complete apppcation has four Java codes. They are as follows −
You can compile the apppcation using the following command −
javac -cp “/path/to/storm/apache-storm-0.9.5/pb/*”:”/path/to/twitter4j/pb/*” *.java
Execute the apppcation using the following commands −
javac -cp “/path/to/storm/apache-storm-0.9.5/pb/*”:”/path/to/twitter4j/pb/*”:. TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret> <keyword1> <keyword2> … <keywordN>
The apppcation will print the current available hashtag and its count. The output should be similar to the following −
Result: jazztastic : 1 Result: foodie : 1 Result: Redskins : 1 Result: Recipe : 1 Result: cook : 1 Result: android : 1 Result: food : 2 Result: NoToxicHorseMeat : 1 Result: Purrs4Peace : 1 Result: pvemusic : 1 Result: VIPremium : 1 Result: Frome : 1 Result: SundayRoast : 1 Result: Millennials : 1 Result: HealthWithKier : 1 Result: LPs30DaysofGratitude : 1 Result: cooking : 1 Result: gameinsight : 1 Result: Countryfile : 1 Result: androidgames : 1