Apache Storm in Twitter
  • Date: 2024-11-03


In this chapter, we will discuss a real-time application of Apache Storm and see how Storm is used with Twitter.

Twitter

Twitter is an online social networking service that provides a platform to send and receive user tweets. Registered users can read and post tweets, whereas unregistered users can only read them. A hashtag categorizes a tweet by keyword: the user prepends # to the relevant keyword. Let us now take a real-time scenario: finding the most used hashtag per topic.

Spout Creation

The purpose of the spout is to get the tweets submitted by people as soon as possible. Twitter provides the “Twitter Streaming API”, a web-service-based tool for retrieving tweets in real time. The Twitter Streaming API can be accessed from any programming language.

twitter4j is an open-source, unofficial Java library that provides a Java-based module for accessing the Twitter Streaming API easily. twitter4j provides a listener-based framework for receiving tweets. To access the Twitter Streaming API, we need to sign up for a Twitter developer account and obtain the following OAuth authentication details.

    ConsumerKey

    ConsumerSecret

    AccessToken

    AccessTokenSecret

Storm provides a Twitter spout, TwitterSampleSpout, in its starter kit. We will use it to retrieve the tweets. The spout needs the OAuth authentication details and, typically, at least one keyword; it emits real-time tweets that match the keywords. If no keywords are given, the code below falls back to Twitter's sample stream. The complete program code is given below.

Coding: TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;
		
   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;
		
   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
         this.consumerKey = consumerKey;
         this.consumerSecret = consumerSecret;
         this.accessToken = accessToken;
         this.accessTokenSecret = accessTokenSecret;
         this.keyWords = keyWords;
   }
		
   public TwitterSampleSpout() {
      // TODO Auto-generated constructor stub
   }

   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
         queue = new LinkedBlockingQueue<Status>(1000);
         _collector = collector;
         StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
               queue.offer(status);
            }

            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}

            @Override
            public void onTrackLimitationNotice(int i) {}

            @Override
            public void onScrubGeo(long l, long l1) {}

            @Override
            public void onException(Exception ex) {}

            @Override
            public void onStallWarning(StallWarning arg0) {
               // TODO Auto-generated method stub
            }
         };

         ConfigurationBuilder cb = new ConfigurationBuilder();

         cb.setDebugEnabled(true)
            .setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);

         _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         _twitterStream.addListener(listener);
				
         // keyWords is null when the no-argument constructor was used
         if (keyWords == null || keyWords.length == 0) {
            _twitterStream.sample();
         } else {
            FilterQuery query = new FilterQuery().track(keyWords);
            _twitterStream.filter(query);
         }
   }
			
   @Override
   public void nextTuple() {
      Status ret = queue.poll();
				
      if (ret == null) {
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }
			
   @Override
   public void close() {
      _twitterStream.shutdown();
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }
			
   @Override
   public void ack(Object id) {}

   @Override
   public void fail(Object id) {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}
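
The hand-off inside the spout above, where the listener callback puts tweets into a bounded queue and nextTuple() takes them out without blocking, can be tried without Storm or twitter4j. The following is a minimal sketch of that pattern only; the class BoundedHandOff and its method names are hypothetical, and plain strings stand in for Status objects.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-alone model of the spout's buffering; no Storm or twitter4j types are used.
class BoundedHandOff {
    private final LinkedBlockingQueue<String> queue;

    BoundedHandOff(int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
    }

    // Plays the role of onStatus(): offer() never blocks and returns false when the queue is full.
    boolean onItem(String item) {
        return queue.offer(item);
    }

    // Plays the role of nextTuple(): poll() returns null immediately when nothing is buffered.
    String next() {
        return queue.poll();
    }

    public static void main(String[] args) {
        BoundedHandOff buffer = new BoundedHandOff(2);
        System.out.println(buffer.onItem("tweet-1")); // accepted
        System.out.println(buffer.onItem("tweet-2")); // accepted
        System.out.println(buffer.onItem("tweet-3")); // rejected: the queue is full
        System.out.println(buffer.next());
        System.out.println(buffer.next());
        System.out.println(buffer.next());            // null: the spout would sleep briefly here
    }
}
```

Because offer() rejects items when the queue is full, a consumer that falls behind loses tweets rather than stalling the producer thread. For a sampled tweet stream that trade-off is usually acceptable.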

Hashtag Reader Bolt

The tweet emitted by the spout is forwarded to HashtagReaderBolt, which processes the tweet and emits every hashtag it contains. HashtagReaderBolt uses the getHashtagEntities method provided by twitter4j, which returns the hashtags contained in the tweet. The complete program code is as follows:

Coding: HashtagReaderBolt.java

import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");
      // Emit one tuple per hashtag found in the tweet
      for(HashtagEntity hashtag : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtag.getText());
         this.collector.emit(new Values(hashtag.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}
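
The bolt above relies on twitter4j to deliver the hashtags of each tweet. To experiment with the same idea on plain text, without API credentials, a regular expression can serve as a rough stand-in. This is only a sketch: the class HashtagExtractor is hypothetical, and the pattern is a simplification that does not reproduce Twitter's actual hashtag rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; approximates hashtag extraction on raw tweet text.
class HashtagExtractor {
    // '#' followed by letters, digits, or underscores (a simplification, see the note above).
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    static List<String> extract(String text) {
        List<String> tags = new ArrayList<>();
        Matcher m = HASHTAG.matcher(text);
        while (m.find()) {
            // Keep the text without the leading '#', as HashtagEntity.getText() does.
            tags.add(m.group(1));
        }
        return tags;
    }

    public static void main(String[] args) {
        System.out.println(extract("Sunday lunch #food #SundayRoast with friends"));
        // prints [food, SundayRoast]
    }
}
```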

Hashtag Counter Bolt

The emitted hashtags are forwarded to HashtagCounterBolt. This bolt processes every hashtag and keeps each hashtag and its count in memory in a Java Map object. The complete program code is given below.

Coding: HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      if(!counterMap.containsKey(key)){
         counterMap.put(key, 1);
      }else{
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}
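
The counting step in execute() above can be checked on its own, outside a topology. The sketch below performs the same update with Map.merge (available since Java 8), which replaces the containsKey/put branches with a single call; the class HashtagCounts is a hypothetical name used only for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone version of the bolt's counting logic.
class HashtagCounts {
    static Map<String, Integer> count(Iterable<String> hashtags) {
        Map<String, Integer> counts = new HashMap<>();
        for (String tag : hashtags) {
            // Insert 1 for a new hashtag, otherwise add 1 to the stored count.
            counts.merge(tag, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(Arrays.asList("food", "cook", "food"));
        System.out.println(counts.get("food")); // prints 2
        System.out.println(counts.get("cook")); // prints 1
    }
}
```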

Submitting a Topology

The topology submission is the main application. The Twitter topology consists of TwitterSampleSpout, HashtagReaderBolt, and HashtagCounterBolt. The following program code shows how to submit the topology.

Coding: TwitterHashtagStorm.java

import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception{
      String consumerKey = args[0];
      String consumerSecret = args[1];
		
      String accessToken = args[2];
      String accessTokenSecret = args[3];
		
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
		
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}
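
The topology above connects the counter bolt with fieldsGrouping on the "hashtag" field. Storm guarantees that tuples with the same value for the grouping fields are delivered to the same task, which is what allows each counter task to keep its own in-memory map. The sketch below is a simplified model of that routing, assuming a hash-modulo scheme; FieldsGroupingModel and taskFor are hypothetical names, and Storm's internal implementation may differ.

```java
// Hypothetical, simplified model of fields grouping; not Storm's actual code.
class FieldsGroupingModel {
    // Maps a grouping-field value to a task index in [0, numTasks).
    // Equal values always produce the same index.
    static int taskFor(String fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int tasks = 4; // assumed number of counter tasks
        for (String tag : new String[] {"food", "cook", "food", "android"}) {
            System.out.println(tag + " -> task " + taskFor(tag, tasks));
        }
    }
}
```

Both occurrences of "food" land on the same task index, so a single map holds the complete count for that hashtag. With shuffleGrouping, by contrast, the occurrences could be spread over several tasks and each map would hold only a partial count.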

Building and Running the Application

The complete application consists of four Java source files. They are as follows:

    TwitterSampleSpout.java

    HashtagReaderBolt.java

    HashtagCounterBolt.java

    TwitterHashtagStorm.java

You can compile the application using the following command:

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*" *.java

Execute the application using the following command:

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*":. \
TwitterHashtagStorm <consumerkey> <consumersecret> <accesstoken> <accesstokensecret> \
<keyword1> <keyword2> … <keywordN>

Output

The application prints each hashtag collected during the run along with its count. The output should be similar to the following:

Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1
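
The listing above is unsorted, because it comes straight from a HashMap. To get from these counts to the most used hashtags, the entries can be sorted by count. The following is a minimal sketch under that assumption; the class TopHashtags and the method top are hypothetical and not part of the application above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that ranks a hashtag-count map.
class TopHashtags {
    static List<Map.Entry<String, Integer>> top(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        // Highest count first; ties are broken alphabetically so the order is deterministic.
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("food", 2);
        counts.put("cook", 1);
        counts.put("android", 1);
        System.out.println(top(counts, 2)); // prints [food=2, android=1]
    }
}
```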