Time window events with Apache Spark Streaming
If you’re working with Spark Streaming, you might run into an interesting problem if you want to output an event based on multiple messages within a specific time period.
For example, I want to send a security alert if I see 10 DDOS attempts to an IP address in a five-minute window.
reduceByKeyAndWindow
reduceByKeyAndWindow allows us to choose the IP address for the key and 5 minutes for the window. If we want to subsequently collect the sourceIp and the timestamp, it looks like this:
val messageLimit = 10
val messageWindow = Minutes(5)

val ssc = new StreamingContext(conf, Minutes(1))
// ... set up the Kafka consumer via KafkaUtils

kafkaConsumer
  .flatMap(parseSecurityMessage)
  .filter(m => m.securityType == "DDOS")
  .map(m => m.targetIp -> Seq((m.timestamp, m.sourceIp)))
  .reduceByKeyAndWindow((x, y) => x ++ y, messageWindow)
  .filter(g => g._2.length >= messageLimit)
  .foreachRDD(rdd => rdd.foreach(createAlertEvent))

ssc.start()
ssc.awaitTermination()
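The message type and parseSecurityMessage are not shown in the post; here is a rough sketch of what they could look like, assuming a comma-separated wire format (the SecurityMessage name and parser are mine, its fields simply mirror what the pipeline above references):

import scala.util.Try

// Hypothetical message shape; field names match those used in the stream above.
case class SecurityMessage(timestamp: Long, securityType: String, sourceIp: String, targetIp: String)

// Returning Option lets flatMap silently drop records that fail to parse.
def parseSecurityMessage(raw: String): Option[SecurityMessage] =
  raw.split(',') match {
    case Array(ts, kind, src, dst) => Try(SecurityMessage(ts.trim.toLong, kind.trim, src.trim, dst.trim)).toOption
    case _ => None
  }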
Problem
The problem is that your event fires many times, because the stateless RDD is recomputed every batch interval.
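The arithmetic below is mine, not from the post, but it shows the scale of the repetition using the 1-minute batch interval and 5-minute window from the example above:

// Illustrative only: a qualifying burst stays inside the window for roughly
// window / batch consecutive batches, and the stateless filter fires each time.
val windowMinutes = 5
val batchMinutes = 1
val alertRepeats = windowMinutes / batchMinutes // one burst raises ~5 alerts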
The simplest solution would be to make the batch interval the same as your message window size, but that causes more problems, namely:
- Your job can’t perform any other triggers on the source data at a shorter interval
- You won’t know about these alerts until some time after they happen (in this case 5 minutes)
Storing the state externally would be terrible, and neither Spark accumulators nor globals are much use here.
Solution
We need to do two things:
- Stop the RDD re-running and instead use the streaming state. We can do this by using the reduceByKeyAndWindow overload that allows us to specify the inverse function for removing data as it goes out of the window.
- Introduce a small amount of in-RDD state used to identify when the event is clear and when it should fire again.
Let us assume we have a class to handle part 2 named WindowEventTrigger that provides add and remove methods and a boolean triggerNow flag that identifies when the event should re-fire. Our RDD body would now look like this:
kafkaConsumer
  .flatMap(parseSecurityMessage)
  .filter(m => m.securityType == "DDOS")
  .map(m => m.targetIp -> new WindowEventTrigger((m.timestamp, m.sourceIp), messageLimit))
  .reduceByKeyAndWindow(_ add _, _ remove _, messageWindow)
  .filter(_._2.triggerNow)
  .foreachRDD(rdd => rdd.foreach(createAlertEvent))
How this works is quite simple. We have a case class called WindowEventTrigger that we map into the stream for each incoming message. It then:
- Tracks incoming messages — if it hits the level, sets the flag, and notes the event
- Tracks outgoing messages — and resets when the event that caused the trigger leaves the window
Because we have switched to the stateful reduceByKeyAndWindow, Spark needs to persist that state in case executors go down or data has to be shuffled between them. Ensure your StreamingContext has a checkpoint folder set to reliable storage such as HDFS.
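A minimal sketch of that, reusing the ssc context from the earlier example (the HDFS path is illustrative, not from the original setup):

// Durable checkpoint location so the windowed state survives executor failure.
ssc.checkpoint("hdfs://namenode:8020/checkpoints/security-alerts")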
WindowEventTrigger class
Here is the WindowEventTrigger class for your utilisation.
case class WindowEventTrigger[T] private(eventsInWindow: Seq[T], triggerNow: Boolean, private val lastTriggeredEvent: Option[T], private val triggerLevel: Int) {
  def this(item: T, triggerLevel: Int) = this(Seq(item), false, None, triggerLevel)

  def add(incoming: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val combined = eventsInWindow ++ incoming.eventsInWindow
    // Fire only when we cross the level and have not already fired for this window.
    val shouldTrigger = lastTriggeredEvent.isEmpty && combined.length >= triggerLevel
    // Remember which event tipped us over the level so we can tell when it leaves the window.
    val triggeredEvent = if (shouldTrigger) combined.drop(triggerLevel - 1).headOption else lastTriggeredEvent
    new WindowEventTrigger(combined, shouldTrigger, triggeredEvent, triggerLevel)
  }

  def remove(outgoing: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val reduced = eventsInWindow.filterNot(y => outgoing.eventsInWindow.contains(y))
    // Once the event that caused the trigger falls out of the window, the trigger may fire again.
    val triggeredEvent = if (lastTriggeredEvent.isDefined && outgoing.eventsInWindow.contains(lastTriggeredEvent.get)) None else lastTriggeredEvent
    new WindowEventTrigger(reduced, false, triggeredEvent, triggerLevel)
  }
}
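To make the trigger semantics concrete, here is a small standalone sketch of the class in action; the event values and the limit of 3 are invented for illustration:

// Build one trigger per incoming event, as the map stage does in the stream above.
val limit = 3
val e1 = new WindowEventTrigger("attempt-1", limit)
val e2 = new WindowEventTrigger("attempt-2", limit)
val e3 = new WindowEventTrigger("attempt-3", limit)

val combined = e1 add e2 add e3
println(combined.triggerNow) // true: the third event crossed the limit

// When the event that caused the trigger leaves the window, the state resets
// and the trigger is free to fire again on a future burst.
val afterExpiry = combined remove e3
println(afterExpiry.triggerNow) // false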
Happy streaming,
[)amien
1 response to Time window events with Apache Spark Streaming
Have you had a chance to use the other windowed functions like Lag on a DStream? We are experiencing performance issues even with really low data volumes and it doesn’t seem like the right solution for calculating the time duration from the current record to the previous for matching fields. At least not in 1.4.1.