Spark Streaming vs. Upsolver on Twitter Data

In this post we will provide a step-by-step implementation of an application that auto-completes Twitter hashtags based on the most popular hashtags seen in the last 7 days. For example, when a user types ‘trum’ into the auto-complete application, it will probably suggest Trump-related hashtags, since that’s a trending topic these days.

We will compare the implementation process in Spark Streaming and in Upsolver Streams.
Upsolver Streams is a big data streaming analytics platform as a service. It enables you to get your data ready for real-time insights, in minutes, without any additional data infrastructure.

Using the Twitter sample stream is a great way to compare streaming analytics solutions, since it is a rich input stream that is publicly available. We will compare both solutions at every step of the implementation: setup, stream data ingestion, hashtag prefix extraction, time window aggregation and real-time queries per hashtag prefix.
This use case is interesting because it tests ease of use on a complex input with hundreds of fields, requires processing over a large time window (7 days), and requires real-time query latency, since the suggested hashtags are displayed to the user.

Setup

For our Spark Streaming implementation, we first need to install and run Spark, and run Redis. We will use Redis to store and query the prefix-to-hashtag count aggregation.

For simplicity, we used Docker to run Redis and Spark:

docker run -d -p 6379:6379 --name=redis redis
docker run --rm -it --link redis -p 4040:4040 -p 8080:8080 -p 8081:8081 -h spark --name=spark p7hb/docker-spark:2.1.0

Then start the Spark shell with the Redis client and spark-streaming-twitter dependencies using the following command:

spark-shell --packages org.apache.bahir:spark-streaming-twitter_2.11:2.0.2,net.debasishg:redisclient_2.11:3.3

In Upsolver Streams, there is no need for setup, since it’s offered as a service.

Data Ingestion

Spark Streaming supports Twitter stream inputs through the Apache Bahir spark-streaming-twitter connector loaded above. The following code creates the input:

// Initialization
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._

System.setProperty("twitter4j.oauth.consumerKey", "CONSUMER_KEY")
System.setProperty("twitter4j.oauth.consumerSecret", "CONSUMER_SECRET")
System.setProperty("twitter4j.oauth.accessToken", "ACCESS_TOKEN")
System.setProperty("twitter4j.oauth.accessTokenSecret", "ACCESS_TOKEN_SECRET")

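// 1-second batches; sc is the SparkContext that spark-shell provides
val ssc = new StreamingContext(sc, Seconds(1))
// None: use Twitter4J's default authorization, i.e. the OAuth system properties set above
val twitterStream = TwitterUtils.createStream(ssc, None)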

Because the connector delivers each tweet as a typed twitter4j.Status object, there is no need to define the object schema.

However, for any proprietary input, you must define an object matching the input’s schema before you can define and run the Spark Streaming job. In some cases, this can be a real hassle, as you don’t necessarily know in advance exactly what your input data will look like.

Even the Twitter Developer Documentation notes that “consumers of Tweets should tolerate the addition of new fields and variance in ordering of fields with ease. Not all fields appear in all contexts.”
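When tweets arrive as raw JSON rather than typed twitter4j objects, the extraction code has to follow that advice itself. The sketch below is a minimal example that assumes the entities.hashtags[].text layout referenced later in this post; it pulls the hashtag texts out of one parsed tweet and returns an empty list, rather than throwing, whenever part of that path is missing. The function name hashtagsFromTweet is our own and hypothetical.

```javascript
// Hypothetical helper: extract hashtag texts from one parsed tweet (JSON object).
// Assumes the entities.hashtags[].text layout; every step tolerates missing
// fields or unexpected types instead of throwing.
function hashtagsFromTweet(tweet) {
  const hashtags = tweet && tweet.entities && tweet.entities.hashtags;
  if (!Array.isArray(hashtags)) {
    return []; // no entities, no hashtags field, or not an array
  }
  return hashtags
    .map((h) => (h && typeof h.text === "string" ? h.text : null))
    .filter((text) => text !== null);
}

// Usage: one tweet with two hashtags and one tweet without entities.
const withTags = JSON.parse(
  '{"text":"Go #Spark #Redis","entities":{"hashtags":[{"text":"Spark"},{"text":"Redis"}]}}'
);
const withoutEntities = JSON.parse('{"text":"no tags here"}');
console.log(hashtagsFromTweet(withTags)); // [ 'Spark', 'Redis' ]
console.log(hashtagsFromTweet(withoutEntities)); // []
```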

To extract the hashtags, we use the following code:

// Each tweet comes as a twitter4j.Status object, which we can use to
// extract hashtags. We use flatMap() since each status could have
// ZERO OR MORE hashtags.
val hashtags = twitterStream.flatMap(_.getHashtagEntities).map(_.getText)

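To print the hashtags to the console for validation, we use the following:

// Print every hashtag in each batch (on a cluster, this output goes to the executors' logs)
hashtags.foreachRDD(rdd => rdd.foreach(r => println(r)))
// A started StreamingContext cannot accept new DStream operations,
// so define all later processing steps before calling start()
ssc.start()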

In Upsolver Streams, data ingestion is mostly implicit.

The user logs into a cloud-based user interface and defines an endpoint (Kafka / Kinesis / HTTP / cloud storage) to send the tweets to. In this case, we chose an HTTP endpoint, and each tweet is sent as an HTTP request with a JSON body.
See the input definition below:

Once the tweets are sent to the HTTP endpoint, a few things happen:

  1. A data lake is automatically created on AWS S3 with compressed Apache Avro files ordered by time.
  2. A hierarchical schema, including data type detection, is automatically generated and managed. The schema is kept up to date as more data arrives. This replaces the schema object definition that Spark Streaming requires for proprietary inputs.
  3. Sample tweets, along with various statistics and breakdowns, are shown to the user.
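The idea behind step 2 can be illustrated with a toy example. The sketch below is not Upsolver's implementation; it simply walks sample JSON records, records every field path it meets (using the same path notation as entities.hashtags[].text), and collects the set of JavaScript types seen at each path, so fields that appear only in some records, or with varying types, are still captured. The function name inferSchema is hypothetical.

```javascript
// Toy schema inference: map each field path to the set of value types seen.
// Arrays are written as "[]" in the path, e.g. "entities.hashtags[].text".
function inferSchema(records) {
  const schema = new Map(); // path -> Set of type names

  function note(path, type) {
    if (!schema.has(path)) schema.set(path, new Set());
    schema.get(path).add(type);
  }

  function walk(value, path) {
    if (Array.isArray(value)) {
      note(path, "array");
      for (const item of value) walk(item, path + "[]");
    } else if (value !== null && typeof value === "object") {
      if (path !== "") note(path, "object");
      for (const [key, child] of Object.entries(value)) {
        walk(child, path === "" ? key : path + "." + key);
      }
    } else {
      note(path, value === null ? "null" : typeof value);
    }
  }

  for (const record of records) walk(record, "");
  // Convert the sets to sorted arrays for stable, readable output.
  const result = {};
  for (const [path, types] of schema) result[path] = [...types].sort();
  return result;
}

const samples = [
  { id: 1, entities: { hashtags: [{ text: "Spark" }] } },
  { id: "2", lang: "en", entities: { hashtags: [] } },
];
console.log(inferSchema(samples));
```

In this example, id is reported with both the number and string types, and lang appears even though only one record contains it, which is the kind of variance the Twitter documentation warns about.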

In the screenshot below you can see how incoming data is explored in the system:

Hashtag Prefix Extraction

To answer auto-complete queries in real time, we need to store, for every possible prefix, all hashtags beginning with that prefix along with their counts. The first step in generating this aggregation is extracting the prefixes of each hashtag.

In Spark Streaming, we would use the following code to extract the prefixes from each hashtag:

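// Extracting prefixes: the regex has ten nested optional groups, so the
// non-null subgroups are the hashtag's prefixes of length 1 up to min(length, 10)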
val prefixToHashtags = hashtags.flatMap(hashtag => """^((((((((((.)?.)?.)?.)?.)?.)?.)?.)?.)?.)?""".r.findAllMatchIn(hashtag)
    .flatMap(_.subgroups)
    .filter(_ != null)
    .map(prefix => prefix -> hashtag))

Note that, for simplicity, we compile the regex again for every hashtag. This is not efficient; a production job would compile it once, outside the closure.
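The same extraction can be written without a regular expression by taking the first 1 to 10 characters of each hashtag. The JavaScript sketch below mirrors what the Scala regex produces, emitting a (prefix, hashtag) pair per prefix length up to a hypothetical MAX_PREFIX_LENGTH of 10. The optional lowercasing is our own assumption and not part of the original code; without it, typing ‘trum’ would not match a hashtag spelled ‘Trump’.

```javascript
// Maximum prefix length, matching the ten nested groups in the Scala regex.
const MAX_PREFIX_LENGTH = 10;

// Returns [prefix, hashtag] pairs for prefixes of length 1..min(length, 10).
// lowercase = true is an assumption for case-insensitive auto-complete;
// the Scala code keeps the original case.
function prefixPairs(hashtag, lowercase = false) {
  const key = lowercase ? hashtag.toLowerCase() : hashtag;
  // Array.from splits by code point, like the regex "." in the JVM.
  const chars = Array.from(key);
  const limit = Math.min(chars.length, MAX_PREFIX_LENGTH);
  const pairs = [];
  for (let n = 1; n <= limit; n++) {
    pairs.push([chars.slice(0, n).join(""), hashtag]);
  }
  return pairs;
}

console.log(prefixPairs("Trump", true).map(([prefix]) => prefix));
// [ 't', 'tr', 'tru', 'trum', 'trump' ]
```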

In Upsolver Streams, the user would create a new calculated field from entities.hashtags[].text (the field in the Twitter stream JSON that holds the hashtag text). As in the Spark Streaming example, we use a regular expression to extract the first ten prefixes of each hashtag:


As seen below, a new array field named prefixes is generated:

Time Window Aggregation

Now that we have extracted the prefixes from each hashtag, we need to generate an aggregation that, for every prefix (the key), holds all the hashtags beginning with that prefix and the number of times each appeared (the values). In our example, we use a 7-day time window, which enables hashtag auto-completion based on the most popular hashtags of the last week.
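Conceptually, the aggregation is a map from each prefix to a second map from hashtag to count. The in-memory JavaScript sketch below illustrates that data model under simplifying assumptions: it ignores the time window entirely, and the class name PrefixIndex is hypothetical. Its two operations roughly correspond to the Redis sorted-set commands used later in this post: incrementing a hashtag's count under a prefix (ZINCRBY) and reading the highest-count hashtags for a prefix.

```javascript
// In-memory sketch of the prefix -> (hashtag -> count) aggregation.
// Ignores the time window; it only illustrates the data model.
class PrefixIndex {
  constructor() {
    this.counts = new Map(); // prefix -> Map(hashtag -> count)
  }

  // Similar in spirit to ZINCRBY prefix 1 hashtag on a sorted set per prefix.
  increment(prefix, hashtag, by = 1) {
    let tags = this.counts.get(prefix);
    if (!tags) {
      tags = new Map();
      this.counts.set(prefix, tags);
    }
    tags.set(hashtag, (tags.get(hashtag) || 0) + by);
  }

  // Top n [hashtag, count] pairs for a prefix, highest count first.
  // Ties are broken alphabetically so the output is deterministic.
  top(prefix, n = 10) {
    const tags = this.counts.get(prefix);
    if (!tags) return [];
    return [...tags.entries()]
      .sort((a, b) => b[1] - a[1] || (a[0] < b[0] ? -1 : a[0] > b[0] ? 1 : 0))
      .slice(0, n);
  }
}

const index = new PrefixIndex();
for (const tag of ["Trump", "Trump", "TrumpRally", "Travel"]) {
  for (let n = 1; n <= Math.min(tag.length, 10); n++) {
    index.increment(tag.slice(0, n), tag);
  }
}
console.log(index.top("Tru", 2)); // [ [ 'Trump', 2 ], [ 'TrumpRally', 1 ] ]
```

The alphabetical tie-break is only there to make the sketch deterministic; Redis has its own ordering rule for members with equal scores.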

As mentioned above, we use Redis in our Spark Streaming implementation. We need an external key-value store because Spark Streaming's window operations, such as reduceByKeyAndWindow, keep the data for the whole window inside the streaming job, which in practice makes them suitable only for short time windows. We chose Redis for the low-latency queries it enables.

The following code inserts the pairs into Redis. We use one Sorted Set per prefix, in which each member is a hashtag and its score is the number of times that hashtag appeared in tweets.

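import com.redis._

prefixToHashtags.foreachRDD(rdd => {
  // One client per batch, created on the driver (see the note below)
  val client = new RedisClient("redis", 6379)
  // collect() brings the whole batch to the driver, acceptable for a small demo
  rdd.collect().foreach { case (prefix, hashtag) =>
    // Increment the hashtag's score in the prefix's sorted set
    client.zincrby(prefix, 1, hashtag)
  }
})
ssc.start()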

Note that, for this example, we create a new Redis client for every batch (RDD) of the stream. For the full implementation, you’ll need to reuse connections, for example through a Redis connection pool per server.

The implementation above creates an aggregation over an unlimited time window. Implementing a 7-day time window is a bit more complicated. For a sliding window, we need to consider two factors: the window size (in our case 7 days) and the precision. For the implementation below, we chose a precision of one hour, which means that we save the hashtag counts for every prefix and calendar hour, under keys made of the prefix, a dot, and the number of hours since the Unix epoch:

import com.redis._

// Pair each hashtag with the tweet's creation time (java.util.Date)
val hashtags = twitterStream.flatMap(s => s.getHashtagEntities.map(h => s.getCreatedAt -> h.getText))
val prefixToHashtags = hashtags.flatMap({ case (date, hashtag) =>
  val millisecondsInHour = 1000 * 60 * 60
  """^((((((((((.)?.)?.)?.)?.)?.)?.)?.)?.)?.)?""".r.findAllMatchIn(hashtag)
    .flatMap(_.subgroups)
    .filter(_ != null)
    // Key format: prefix + "." + hours since the Unix epoch
    .map(prefix => prefix + "." + (date.getTime() / millisecondsInHour) -> hashtag)
})
prefixToHashtags.foreachRDD(rdd => {
  val client = new RedisClient("redis", 6379)
  rdd.collect().foreach({ case (prefix, hashtag) =>
    client.zincrby(prefix, 1, hashtag)
  })
})

Also, for the full implementation you will need to set a time-to-live (TTL) on the Redis keys, so that hourly buckets older than the window expire.
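The bucket key and a matching TTL could be computed as in the JavaScript sketch below. The key format (prefix, a dot, and the number of whole hours since the Unix epoch) follows the Scala code above. The TTL rule, keeping each bucket until the end of its hour plus one full 7-day window, is our own assumption of a reasonable policy, and the function names are hypothetical.

```javascript
const PRECISION_SECONDS = 60 * 60; // 1-hour buckets, as in the Scala code
const WINDOW_SECONDS = 7 * 24 * 60 * 60; // 7-day window

// Whole hours since the Unix epoch for a timestamp in milliseconds.
// Same value as date.getTime() / millisecondsInHour in Scala for dates after 1970.
function bucketOf(epochMillis) {
  return Math.floor(epochMillis / (PRECISION_SECONDS * 1000));
}

// Redis key for a prefix and a tweet time.
function bucketKey(prefix, epochMillis) {
  return prefix + "." + bucketOf(epochMillis);
}

// Assumed TTL policy: keep a bucket until its hour has ended and a full
// window has passed after that, so every query inside the window still sees it.
function bucketTtlSeconds(bucket, nowMillis) {
  const bucketEndSeconds = (bucket + 1) * PRECISION_SECONDS;
  const expireAtSeconds = bucketEndSeconds + WINDOW_SECONDS;
  return Math.max(1, expireAtSeconds - Math.floor(nowMillis / 1000));
}

const tweetTime = Date.UTC(2017, 0, 20, 17, 5); // 2017-01-20 17:05 UTC
const key = bucketKey("tru", tweetTime);
console.log(key, bucketTtlSeconds(bucketOf(tweetTime), tweetTime)); // tru.412481 608100
```

In the Scala job, such a value could be applied with the Redis EXPIRE command after each increment; doing that on every write or in a periodic cleanup is a design choice.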

In Upsolver Streams, you can create an “Index” for a time window of any size. The index is built and updated incrementally, offering an always up-to-date alternative (with millisecond precision) to the process described above. The index is also compressed, kept in memory, and can be accessed via an API in milliseconds.

The index definition in the Upsolver Streams UI is shown below:

Real-time Queries per Hashtag Prefix

For the actual auto-complete application, we need to query the aggregation we created.

For the naïve implementation with an unlimited time window, we query Redis for the top 10 hashtags for a prefix, highest count first, as follows:

ZREVRANGE [YOUR_PREFIX_HERE] 0 9 WITHSCORES

For the 7-day window, we divide the query time by the precision to find the current hourly bucket, and then read and merge that bucket together with the 168 buckets before it (window size / precision). We use Redis's Lua scripting feature so that the merge runs on the Redis server:

var Redis = require('ioredis');
var redis = new Redis();
 
var windowSize = 7 * 24 * 60 * 60; // 7 days
var precision = 60 * 60; // 1 hour
 
redis.defineCommand('selectWindow', {
  numberOfKeys: 1,
  lua: `  local result = {}
    local windowSizeInSeconds = tonumber(ARGV[1])
    local precisionInSeconds = tonumber(ARGV[2])
    local startDate = tonumber(ARGV[3])
    local endDate = startDate - windowSizeInSeconds;
    local prefix = KEYS[1];
    for time=startDate,endDate,-precisionInSeconds do
      local timeInPrecision = math.floor(time/precisionInSeconds)
      local windowResult = redis.call('ZRANGE', prefix .. "." .. timeInPrecision, 0, -1, 'WITHSCORES')
      for i=1,#windowResult,2 do
        local hashtag = windowResult[i]
        local rank = windowResult[i+1]
        result[hashtag] = (result[hashtag] or 0) + rank
      end
    end
    local arr = {};
    local i = 1
    for key, value in pairs(result) do
      arr[i] = { key, value }
      i = i + 1
    end
    local byValueDescending = function(a, b)
      return a[2] > b[2];
    end
    local slice = function(tbl, first, last, step)
      local sliced = {}
      for i = first or 1, last or #tbl, step or 1 do
        sliced[#sliced+1] = tbl[i]
      end
      return sliced
    end
    table.sort(arr, byValueDescending)
    return slice(arr, 1, 10);`
});

redis.selectWindow("YOUR_PREFIX_HERE",
  windowSize,
  precision,
  Math.floor(new Date().getTime()/1000),
  function(err, result) {
    console.log(result);
});
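To make the Lua script's logic easier to follow, and to test it without a Redis server, the sketch below expresses the same merge in plain JavaScript over an in-memory Map that stands in for Redis. The store layout (key "prefix.hourBucket" mapping to hashtag counts) is an assumption that mirrors the sorted sets written by the Scala job, and selectWindowInMemory is a hypothetical name. Like the script, it visits the current bucket and the 168 before it.

```javascript
// Stand-in for Redis: key "prefix.hourBucket" -> Map(hashtag -> count).
function selectWindowInMemory(store, prefix, windowSeconds, precisionSeconds, nowSeconds, limit = 10) {
  const current = Math.floor(nowSeconds / precisionSeconds);
  const steps = Math.floor(windowSeconds / precisionSeconds);
  const totals = new Map();
  // Same buckets the Lua loop visits: the current one and `steps` before it.
  for (let k = 0; k <= steps; k++) {
    const bucket = store.get(prefix + "." + (current - k));
    if (!bucket) continue;
    for (const [hashtag, count] of bucket) {
      totals.set(hashtag, (totals.get(hashtag) || 0) + count);
    }
  }
  // Highest total first, then keep the top `limit` entries.
  return [...totals.entries()]
    .sort((a, b) => b[1] - a[1])
    .slice(0, limit);
}

const HOUR = 60 * 60;
const WEEK = 7 * 24 * HOUR;
const now = 1000 * HOUR; // arbitrary "current" time, in seconds
const store = new Map([
  ["tru." + 1000, new Map([["Trump", 3]])],
  ["tru." + 999, new Map([["Trump", 2], ["TrumpRally", 4]])],
  ["tru." + (1000 - 169), new Map([["TrueDetective", 50]])], // outside the window
]);
console.log(selectWindowInMemory(store, "tru", WEEK, HOUR, now));
// [ [ 'Trump', 5 ], [ 'TrumpRally', 4 ] ]
```

Running the merge inside Redis, as the Lua script does, avoids shipping 169 sorted sets to the client for every keystroke; the in-memory version is only meant for reasoning about and testing the logic.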
In Upsolver Streams, the index we defined is stored compressed in memory. This allows a very large index to be kept in memory and queried with very low latency.
To query it via the API, you simply send an HTTP GET request with the index token provided by Upsolver and the prefix.

Below is an example of a test using the Upsolver query API:

Our indices scale with your data to billions of unique keys, while continuing to provide millisecond response times and unlimited throughput.

Summary

We walked step by step through implementing the same data flow in Upsolver Streams and, for comparison, in Spark Streaming.

As you can see, the Upsolver Streams implementation is simple and intuitive, and it streamlines the end-to-end implementation of the data flow. Building the end-to-end auto-complete application took less than 5 minutes on Upsolver’s platform.

For the Spark Streaming implementation, we had to use an external data store for the aggregation and the time-window implementation was quite complex.

Upsolver also provides very straightforward access to the incoming data to facilitate data inspection and data transformations. It is 100% managed, so no data infrastructure is required from you.

If you want to learn more about Upsolver, visit our website http://www.upsolver.com, or sign up for your free demo.
