Real Time Application(Twitter)
  • Date: 2024-09-17


Let us analyze a real-time application that fetches the latest Twitter feeds and their hashtags. Earlier, we saw the integration of Storm and Spark with Kafka. In both scenarios, we created a Kafka producer (using the CLI) to send messages to the Kafka ecosystem. The Storm and Spark integrations then read the messages with a Kafka consumer and inject them into the Storm and Spark ecosystems respectively. So, in practice, we need to create a Kafka producer that should −

    Read the Twitter feeds using the “Twitter Streaming API”,

    Process the feeds,

    Extract the HashTags and

    Send it to Kafka.
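
The "extract the hashtags" step above can be illustrated without any Twitter dependency. The following is a minimal sketch, assuming the tweet text is already available as a plain string; the class name `HashtagExtractor` and the simplified pattern are assumptions made for illustration. The producer listed later in this chapter does not parse text itself and instead relies on the hashtag entities supplied by twitter4j.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of twitter4j or Kafka.
public class HashtagExtractor {
    // Simplified rule: '#' followed by ASCII letters, digits or underscores.
    // Real hashtag rules are broader (for example, non-ASCII letters).
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    /** Returns the hashtag texts (without '#') in order of appearance. */
    public static List<String> extract(String tweetText) {
        List<String> tags = new ArrayList<>();
        Matcher m = HASHTAG.matcher(tweetText);
        while (m.find()) {
            tags.add(m.group(1));
        }
        return tags;
    }

    public static void main(String[] args) {
        // prints [burger, foodie]
        System.out.println(extract("Best #burger in town #foodie"));
    }
}
```

Each extracted string would then become the value of one Kafka record, which is the final step in the list.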

Once the hashtags are received by Kafka, the Storm / Spark integration receives the information and sends it to the Storm / Spark ecosystem.

Twitter Streaming API

The “Twitter Streaming API” can be accessed from any programming language. “twitter4j” is an open-source, unofficial Java library that provides a Java-based module for easy access to the “Twitter Streaming API”. It offers a listener-based framework for receiving tweets. To access the “Twitter Streaming API”, we need to sign up for a Twitter developer account and obtain the following OAuth authentication details.

    ConsumerKey

    ConsumerSecret

    AccessToken

    AccessTokenSecret
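
Since all four values are needed before the stream can be opened, it can help to check them up front. The sketch below is one possible way to do that, assuming the values are supplied through environment variables; the variable names and the class `TwitterCredentials` are hypothetical and chosen only for this example. The producer in this chapter reads the same values from command-line arguments instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only.
public class TwitterCredentials {
    // Assumed environment variable names; pick whatever suits your setup.
    static final String[] REQUIRED = {
        "TWITTER_CONSUMER_KEY", "TWITTER_CONSUMER_SECRET",
        "TWITTER_ACCESS_TOKEN", "TWITTER_ACCESS_TOKEN_SECRET"
    };

    /** Returns the names of required settings that are absent or blank. */
    public static List<String> missing(Map<String, String> env) {
        List<String> absent = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                absent.add(name);
            }
        }
        return absent;
    }

    public static void main(String[] args) {
        List<String> absent = missing(System.getenv());
        if (!absent.isEmpty()) {
            System.err.println("Missing Twitter credentials: " + absent);
            return;
        }
        System.out.println("All four OAuth values are present.");
    }
}
```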

Once the developer account is created, download the “twitter4j” jar files and place them on the Java classpath.

The complete Twitter Kafka producer code (KafkaTwitterProducer.java) is listed below −

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      // Bounded buffer between the Twitter listener thread and the Kafka send loop
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Status>(1000);
      
      // Five fixed arguments plus at least one search keyword are required
      if(args.length < 6){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key> "
            + "<twitter-consumer-secret> <twitter-access-token> "
            + "<twitter-access-token-secret> "
            + "<topic-name> <twitter-search-keywords>");
         return;
      }
      
      String consumerKey = args[0];
      String consumerSecret = args[1];
      String accessToken = args[2];
      String accessTokenSecret = args[3];
      String topicName = args[4];
      // Everything after the topic name is treated as a search keyword
      String[] keyWords = Arrays.copyOfRange(args, 5, args.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
      StatusListener listener = new StatusListener() {
        
         @Override
         public void onStatus(Status status) {      
            // Hand the tweet over to the main thread; offer() drops it if the queue is full
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName() 
            //    + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreenName());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }
         
         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
            // System.out.println("Got a status deletion notice id:" 
            //    + statusDeletionNotice.getStatusId());
         }
         
         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" + 
            //    numberOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId + 
            //    "upToStatusId:" + upToStatusId);
         }      
         
         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }
         
         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);
      
      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);
      
      //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;
      
      while(i < 10) {
         Status ret = queue.poll();
         
         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  topicName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}
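
The hand-off between the listener thread and the send loop is the core of the listing: `onStatus` offers each tweet to a bounded queue, and the main thread polls it until it has seen ten empty polls in total. The sketch below isolates that pattern using only the JDK, with strings standing in for tweets. The class and method names are hypothetical, and it uses `poll(timeout, unit)` in place of the listing's `poll()` followed by `Thread.sleep(100)`, which is a small variation rather than the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone illustration of the queue hand-off.
public class QueueDrainSketch {
    /**
     * Collects items until maxIdlePolls polls (counted in total, as in the
     * listing) have timed out without receiving anything.
     */
    public static List<String> drain(LinkedBlockingQueue<String> queue,
                                     int maxIdlePolls, long waitMillis)
            throws InterruptedException {
        List<String> received = new ArrayList<>();
        int idlePolls = 0;
        while (idlePolls < maxIdlePolls) {
            // Waits up to waitMillis for an item; returns null on timeout.
            String item = queue.poll(waitMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                idlePolls++;          // nothing arrived in this window
            } else {
                received.add(item);   // the producer would call send(...) here
            }
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
        queue.offer("food");          // offer() returns false instead of blocking when full
        queue.offer("foodie");
        System.out.println(drain(queue, 10, 100)); // prints [food, foodie]
    }
}
```

Because the idle counter is never reset, a slow stream ends the program after ten quiet intervals overall, even if tweets arrived in between.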

Compilation

Compile the application using the following command −

javac -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. KafkaTwitterProducer.java

Execution

Open two consoles. Run the compiled application in one console as shown below.

java -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. KafkaTwitterProducer \
<twitter-consumer-key> \
<twitter-consumer-secret> \
<twitter-access-token> \
<twitter-access-token-secret> \
my-first-topic food

Run any one of the Spark / Storm applications explained in the previous chapter in another window. The main point to note is that the topic used must be the same in both cases. Here, we have used “my-first-topic” as the topic name.

Output

The output of this application depends on the keywords and the current Twitter feed. A sample output is shown below (Storm integration).

. . .
food : 1
foodie : 2
burger : 1
. . .
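
The sample output pairs each hashtag with a running count. As a rough illustration of how such counts arise, the sketch below tallies a list of hashtags in plain Java. It is an assumption-laden stand-in: the class name and the input list are invented for this example, and in the actual setup the counting is done by the Storm or Spark application from the previous chapter, not by the producer.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-process stand-in for the downstream counting step.
public class HashtagCountSketch {
    /** Counts occurrences of each hashtag, keeping first-seen order. */
    public static Map<String, Integer> count(List<String> hashtags) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String tag : hashtags) {
            counts.merge(tag, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Invented input that mirrors the sample output above
        List<String> received = Arrays.asList("food", "foodie", "burger", "foodie");
        for (Map.Entry<String, Integer> e : count(received).entrySet()) {
            System.out.println(e.getKey() + " : " + e.getValue());
        }
    }
}
```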