
Time window events with Apache Spark Streaming  

If you’re working with Spark Streaming you might run into an interesting problem if you want to output an event based on a number of messages within a specific time period.

For example: I want to send a security alert if I see 10 DDOS attempts to an IP address in a 5 minute window.

reduceByKeyAndWindow

reduceByKeyAndWindow allows us to key the stream by the target IP address and reduce over a 5 minute window. If we wanted to collect the sourceIp and the timestamp of each message, it would look like this:

import org.apache.spark.streaming.{Minutes, StreamingContext}

val messageLimit = 10
val messageWindow = Minutes(5)
val scc = new StreamingContext(conf, Minutes(1))

// ... setup Kafka consumer via KafkaUtils
kafkaConsumer
    .flatMap(parseSecurityMessage)
    .filter(m => m.securityType == "DDOS")
    // Key by target IP; each message starts as a one-element Seq so windows can be concatenated
    .map(m => m.targetIp -> Seq((m.timestamp, m.sourceIp)))
    .reduceByKeyAndWindow({(x, y) => x ++ y}, messageWindow)
    .filter(g => g._2.length >= messageLimit)
    .foreachRDD(rdd => rdd.foreach(createAlertEvent))

scc.start()
scc.awaitTermination()

Problem

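The problem is that your event will fire many times. The windowed computation is stateless and is re-run every batch interval, so the same messages satisfy the filter once per batch for as long as they remain in the window (up to five times with a 1 minute batch and a 5 minute window).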
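To see the re-firing without a cluster, here is a minimal plain-Scala sketch with no Spark involved. It assumes a single target IP and treats each batch as one minute; `SlidingWindowRefire` and `alertingBatches` are hypothetical names made up for this illustration, and the map of arrivals stands in for the stream.

```scala
object SlidingWindowRefire {
  // arrivals: batch index -> number of DDOS messages seen for one target IP in that batch.
  // Returns the batch indexes at which a stateless "count >= limit" filter would pass.
  def alertingBatches(arrivals: Map[Int, Int],
                      windowBatches: Int,
                      limit: Int,
                      totalBatches: Int): Seq[Int] =
    (0 until totalBatches).filter { batch =>
      // The window covers this batch and the (windowBatches - 1) batches before it.
      val inWindow = (batch - windowBatches + 1 to batch)
        .map(b => arrivals.getOrElse(b, 0))
        .sum
      inWindow >= limit
    }
}
```

With ten messages arriving in the first minute, `SlidingWindowRefire.alertingBatches(Map(0 -> 10), 5, 10, 8)` returns batches 0 to 4, so a single burst would raise five alerts.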

The simplest solution would be to make the batch interval the same as your message window size but that causes more problems, namely:

  • Your job can’t perform any other triggers on the source data at a shorter interval
  • You won’t know about these alerts until some time after they happen (in this case 5 minutes)

External state would be terrible, and neither Spark counters nor globals are much use here.

Solution

We need to do two things:

  1. Stop the RDD re-running and instead use streaming state. We can do this by using the reduceByKeyAndWindow overload that allows us to specify the inverse function for removing data as it goes out of window.
  2. Introduce a small amount of in-RDD state that can be used to identify when the event is cleared and when it should fire again.
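The idea behind the inverse function in step 1 can be sketched in plain Scala, away from Spark. This is only an illustration of the technique, assuming one integer count per batch; `IncrementalWindow`, `recomputed` and `incremental` are hypothetical names and not part of any Spark API.

```scala
object IncrementalWindow {
  // Recompute from scratch: sum every batch currently inside the window.
  def recomputed(batches: Vector[Int], window: Int): Vector[Int] =
    batches.indices.toVector.map { i =>
      batches.slice(math.max(0, i - window + 1), i + 1).sum
    }

  // Incremental: take the previous result, add the new batch (reduce function)
  // and subtract the batch that just left the window (inverse function).
  def incremental(batches: Vector[Int], window: Int): Vector[Int] =
    batches.indices.foldLeft(Vector.empty[Int]) { (acc, i) =>
      val previous = acc.lastOption.getOrElse(0)
      val leaving = if (i - window >= 0) batches(i - window) else 0
      acc :+ (previous + batches(i) - leaving)
    }
}
```

Both functions give the same totals, but the incremental version only touches the batch entering and the batch leaving, and it carries a result forward from one batch to the next. That carried-forward value is the state we can hang the trigger on.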

Let us assume there is a class to handle part 2 named WindowEventTrigger that provides add and remove methods as well as a boolean triggerNow flag that identifies when the event should re-fire. Our RDD body would now look like this:

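kafkaConsumer
    .flatMap(parseSecurityMessage)
    .filter(m => m.securityType == "DDOS")
    // Wrap each message in its own single-event trigger; the primary constructor is private, so use new
    .map(m => m.targetIp -> new WindowEventTrigger((m.timestamp, m.sourceIp), messageLimit))
    // add merges incoming triggers; remove is the inverse applied to data leaving the window
    .reduceByKeyAndWindow(_ add _, _ remove _, messageWindow)
    .filter(_._2.triggerNow)
    .foreachRDD(rdd => rdd.foreach(createAlertEvent))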

How this works is actually quite simple. We map a WindowEventTrigger case class instance into the stream for each incoming message. It then:

  1. Tracks incoming messages and if it hits the level sets the flag and makes note of the event
  2. Tracks outgoing messages and resets when the event that caused the trigger goes out of window
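A short trace makes those two steps concrete. The sketch below uses `TriggerSketch`, a simplified and hypothetical stand-in specialised to Int event ids with a trigger level of 3. It is not the class from this post, just enough to follow the flag through a few adds and removes.

```scala
// Hypothetical stand-in: events in window, fire flag, the event that caused the last trigger, level.
final case class TriggerSketch(events: Seq[Int], triggerNow: Boolean,
                               lastTriggered: Option[Int], level: Int) {
  // Incoming: fire only when the level is reached and no earlier trigger is still in the window.
  def add(in: TriggerSketch): TriggerSketch = {
    val combined = events ++ in.events
    val fire = lastTriggered.isEmpty && combined.length >= level
    val marker = if (fire) combined.drop(level - 1).headOption else lastTriggered
    TriggerSketch(combined, fire, marker, level)
  }

  // Outgoing: drop departed events and forget the marker once the triggering event has left.
  def remove(out: TriggerSketch): TriggerSketch = {
    val remaining = events.filterNot(e => out.events.contains(e))
    val marker = lastTriggered.filterNot(e => out.events.contains(e))
    TriggerSketch(remaining, false, marker, level)
  }
}

object TriggerSketch {
  def single(id: Int, level: Int): TriggerSketch = TriggerSketch(Seq(id), false, None, level)
}

object TriggerTrace {
  private def one(id: Int) = TriggerSketch.single(id, 3)
  val afterThird  = one(1).add(one(2)).add(one(3))            // third event reaches the level: fires
  val afterFourth = afterThird.add(one(4))                     // already triggered: stays quiet
  val afterReset  = afterFourth.remove(one(1)).remove(one(3))  // triggering event 3 leaves: reset
  val afterFifth  = afterReset.add(one(5))                     // three events in window again: fires
}
```

In this trace the flag is set when event 3 arrives, stays off for event 4, and can only come back on after event 3 has left the window.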

By switching to the reduceByKeyAndWindow overload that takes an inverse function, Spark will need to persist state in case executors go down or it is necessary to shuffle data between them. Ensure your StreamingContext object has a checkpoint directory set to reliable storage like HDFS.
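Setting the checkpoint directory is a single call on the context. The snippet below is a sketch that assumes Spark Streaming is on the classpath; the app name and the HDFS path are placeholders rather than values from this job.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

// Placeholder app name, chosen only for this example.
val conf = new SparkConf().setAppName("ddos-alerts")
val scc = new StreamingContext(conf, Minutes(1))

// Placeholder path: point this at fault-tolerant storage your cluster can reach.
scc.checkpoint("hdfs:///checkpoints/ddos-alerts")
```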

WindowEventTrigger class

Here is the WindowEventTrigger class for your enjoyment.

// Tracks the events in the current window for one key and flags when an alert should fire
case class WindowEventTrigger[T] private(eventsInWindow: Seq[T], triggerNow: Boolean, private val lastTriggeredEvent: Option[T], private val triggerLevel: Int) {
  // Public entry point: a trigger holding a single event that has not fired yet
  def this(item: T, triggerLevel: Int) = this(Seq(item), false, None, triggerLevel)

  // Reduce function: merge incoming events and fire only if no earlier trigger is still in the window
  def add(incoming: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val combined = eventsInWindow ++ incoming.eventsInWindow
    val shouldTrigger = lastTriggeredEvent.isEmpty && combined.length >= triggerLevel
    // Remember the event that took the count to triggerLevel so remove can reset on it later
    val triggeredEvent = if (shouldTrigger) combined.drop(triggerLevel - 1).headOption else lastTriggeredEvent
    new WindowEventTrigger(combined, shouldTrigger, triggeredEvent, triggerLevel)
  }

  // Inverse reduce function: drop events leaving the window and reset once the triggering event has left
  def remove(outgoing: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val reduced = eventsInWindow.filterNot(y => outgoing.eventsInWindow.contains(y))
    val triggeredEvent = lastTriggeredEvent.filterNot(e => outgoing.eventsInWindow.contains(e))
    new WindowEventTrigger(reduced, false, triggeredEvent, triggerLevel)
  }
}

Happy streaming,

[)amien

One response  

  1. Have you had a chance to use the other windowed functions like Lag on a Dstream? We are experiencing performance issues even with really low data volumes and it doesn’t seem like the right solution for calculating the time duration from the current record to the previous for matching fields. At least not in 1.4.1.

    Scott F – December 18th, 2015
