Time window events with Apache Spark Streaming

If you’re working with Spark Streaming, you might run into an interesting problem if you want to output an event based on multiple messages within a specific time period.

For example, I want to send a security alert if I see 10 DDOS attempts to an IP address in a five-minute window.


groupByKeyAndWindow allows us to choose the IP address for the key and 5 minutes for the window. If we wanted to subsequently collect the sourceIp and the timestamp, it looks like this:

var messageLimit = 10
var messageWindow = Minutes(5)
val scc = new StreamingContext(conf, Minutes(1))

// ... setup Kafka consumer via SparkUtils
    .filter(m => m.securityType == 'DDOS')
    .map(m => m.targetIp -> Seq((m.timestamp, m.sourceIp)))
    .reduceByKeyAndWindow({(x, y) => x ++ y}, messageWindow)
    .filter(g => g._2.length >= messageLimit)
    .foreachRDD(m => m.foreach(createAlertEvent))



The problem is your event fires many times as the stateless RDD is re-run every batch period.

The simplest solution would be to make the batch interval the same as your message window size, but that causes more problems, namely:

  • Your job can’t perform any other triggers on the source data at a shorter interval
  • You won’t know about these alerts until some time after they happen (in this case 5 minutes)

External would be terrible, and neither Spark counters nor globals are much use here.


We need to do two things:

  1. Stop the RDD re-running and instead use the streaming state. We can do this by using the reduceByKeyAndWindow overload that allows us to specify the inverse function for removing data as it goes out of the window.
  2. Introduce a small amount of in-RDD state used to identify when the event is clear and when it should fire again.

Let us assume we have a class to handle part 2 named WindowEventTrigger that provides add and remove methods and a boolean triggerNow flag that identifies when the event should re-fire. Our RDD body would now look like this:

    .filter(m => m.securityType == 'DDOS')
    .map(m => m.targetIp -> WindowEventTrigger(Seq(m.timestamp, m.sourceIp), messageLimit))
    .reduceByKeyAndWindow(_ add _, _ remove _, messageWindow)
    .foreachRDD(m => m.foreach(createAlertEvent))

How this works is quite simple. We have a case class called WindowEventTrigger that we map into the stream for each incoming message. It then:

  1. Tracks incoming messages - if it hits the level, sets the flag, and notes the event
  2. Tracks outgoing messages - and resets when the event that caused the trigger leaves the window

By switching to the in-memory `groupByKeyAndWindow`, Spark needs to persist state in case executors go down or it is necessary to shuffle data between them. Ensure your SparkStreamingContext object has a checkpoint folder set to reliable storage like HDFS.

WindowEventTrigger class

Here is the WindowEventTrigger class for your utilisation.

case class WindowEventTrigger[T] private(eventsInWindow: Seq[T], triggerNow: Boolean, private val lastTriggeredEvent: Option[T], private val triggerLevel: Int) {
  def this(item: T, triggerLevel: Int) = this(Seq(item), false, None, triggerLevel)

  def add(incoming: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val combined = eventsInWindow ++ incoming.eventsInWindow
    val shouldTrigger = lastTriggeredEvent.isEmpty && combined.length >= triggerLevel
    val triggeredEvent = if (shouldTrigger) combined.seq.drop(triggerLevel - 1).headOption else lastTriggeredEvent
    new WindowEventTrigger(combined, shouldTrigger, triggeredEvent, triggerLevel)

  def remove(outgoing: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val reduced = eventsInWindow.filterNot(y => outgoing.eventsInWindow.contains(y))
    val triggeredEvent = if (lastTriggeredEvent.isDefined && outgoing.eventsInWindow.contains(lastTriggeredEvent.get)) None else lastTriggeredEvent
    new WindowEventTrigger(reduced, false, triggeredEvent, triggerLevel)

Happy streaming,


1 responses

  1. Avatar for Scott F

    Have you had a chance to use the other windowed functions like Lag on a Dstream? We are experiencing performance issues even with really low data volumes and it doesn't seem like the right solution for calculating the time duration from the current record to the previous for matching fields. At least not in 1.4.1.

    Scott F December 18, 2015