Time window events with Apache Spark Streaming  

If you’re working with Spark Streaming you might run into an interesting problem if you want to output an event based on a number of messages within a specific time period.

For example: I want to send a security alert if I see 10 DDOS attempts to an IP address in a 5 minute window.

groupByKeyAndWindow

groupByKeyAndWindow allows us to chose the IP address for the key and 5 minutes for the window. If we wanted to then collect the sourceIp and the timestamp it would look like this:

var messageLimit = 10
var messageWindow = Minutes(5)
val scc = new StreamingContext(conf, Minutes(1))

// ... setup Kafka consumer via SparkUtils
kafkaConsumer
    .flatMap(parseSecurityMessage)
    .filter(m => m.securityType == 'DDOS')
    .map(m => m.targetIp -> Seq((m.timestamp, m.sourceIp)))
    .reduceByKeyAndWindow({(x, y) => x ++ y}, messageWindow)
    .filter(g => g._2.length >= messageLimit)
    .foreachRDD(m => m.foreach(createAlertEvent))

scc.start()
scc.streamUntilTerminated()

Problem

The problem is your event will fire many times as the stateless RDD is re-run every batch period.

The simplest solution would be to make the batch interval the same as your message window size but that causes more problems, namely:

  • Your job can’t perform any other triggers on the source data at a shorter interval
  • You won’t know about these alerts until some time after they happen (in this case 5 minutes)

External would be terrible and neither Spark counters or globals are much use here.

Solution

We need to do two things:

  1. Stop the RDD re-running and instead use streaming state. We can do this by using the reduceByKeyAndWindow overload that allows us to specify the inverse function for removing data as it goes out of window.
  2. Introduce a small amount of in-RDD state that can be used to identify when the event is cleared and when it should fire again.

Let us assume there is a class to handle part 2 named WindowEventTrigger that provides add and remove methods as well as a boolean triggerNow flag that identifies when the event should re-fire. Our RDD body would now look like this:

kafkaConsumer
    .flatMap(parseSecurityMessage)
    .filter(m => m.securityType == 'DDOS')
    .map(m => m.targetIp -> WindowEventTrigger(Seq(m.timestamp, m.sourceIp), messageLimit))
    .reduceByKeyAndWindow(_ add _, _ remove _, messageWindow)
    .filter(_._2.triggerNow)
    .foreachRDD(m => m.foreach(createAlertEvent))

How this works is actually quite simple. We have a case class called WindowEventTrigger that we map into the stream for each incoming message, it then:

  1. Tracks incoming messages and if it hits the level sets the flag and makes note of the event
  2. Tracks outgoing messages and resets when the event that caused the trigger goes out of window

By switching to the in-memory groupByKeyAndWindow Spark will need to persist state in case executors go down or it is necessary to shuffle data between them. Ensure your SparkStreamingContext object has a checkpoint folder set to reliable storage like HDFS

WindowEventTrigger class

Here is the WindowEventTrigger class for your enjoyment.

case class WindowEventTrigger[T] private(eventsInWindow: Seq[T], triggerNow: Boolean, private val lastTriggeredEvent: Option[T], private val triggerLevel: Int) {
  def this(item: T, triggerLevel: Int) = this(Seq(item), false, None, triggerLevel)

  def add(incoming: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val combined = eventsInWindow ++ incoming.eventsInWindow
    val shouldTrigger = lastTriggeredEvent.isEmpty && combined.length >= triggerLevel
    val triggeredEvent = if (shouldTrigger) combined.seq.drop(triggerLevel - 1).headOption else lastTriggeredEvent
    new WindowEventTrigger(combined, shouldTrigger, triggeredEvent, triggerLevel)
  }

  def remove(outgoing: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val reduced = eventsInWindow.filterNot(y => outgoing.eventsInWindow.contains(y))
    val triggeredEvent = if (lastTriggeredEvent.isDefined && outgoing.eventsInWindow.contains(lastTriggeredEvent.get)) None else lastTriggeredEvent
    new WindowEventTrigger(reduced, false, triggeredEvent, triggerLevel)
  }
}

Happy streaming,

[)amien

Table per hierarchy in Azure Table Storage  

If you’re coming from an ORM background to Azure Table Storage you might be wondering how to map class hierarchies to tables.

Documentation on the topic is hard to find unless you know the magic class class name EntityResolver which you can find out by digging into the Azure Client for .NET source code.

Let’s say we have a basic blog style system (minimal fields shown):

public class Content {
  public string Id { get; set; }
  public string Title { get; set }
}

public class BlogPost : Content {
  public List<string> Topics { get; set; }
}

public class Page : Content {
  public String Slug { get; set; }
}

The trick is to create an instance of EntityResolver where T is your base class, e.g. Content. Strangely EntityResolver’s signature requires T implement new() so you can’t make your base class abstract.

Firstly we need to add to our base class some kind of identifier for the type – in ORM terms this is referred to as a discriminator. Then we’d override that in the subtypes to ensure new instances get the correct type set on insertion.

public class Content {
  public string Id { get; set; }
  public string Title { get; set }
}

public class BlogPost : Content {
  public List<string> Topics { get; set; }
}

public class Page : Content {
  public String Slug { get; set; }
}

Let’s say we want to store all of these in a table called ‘content’. We would typically write a small helper class to handle the cloud table and storage, e.g.

public class Content {
  public string Id { get; set; }
  public string Title { get; set }
  public virtual string ContentType { get; set; }
}

public class BlogPost : Content {
  public List<string> Topics { get; set; }
  public override string ContentType {
    get { return "blog"; }
    set { }
  }
}

public class Page : Content {
  public String Slug { get; set; }
  public override string ContentType {
    get { return "page"; }
    set { }
  }
}

With just that change you can actually start inserting rows into Azure Table Storage but querying them back will always result in Content types and saving those back will result in data loss!

We can however help the CloudTable client materialize the correct results by creating an EntityResolver:

EntityResolver<Content> contentResolver = (partitionKey, rowKey, timestamp, properties, etag) => {
    var contentType = properties["ContentType"].StringValue;
    switch (contentType) {
        case "blog": return new BlogPost();
        case "page": return new Page();
        default: throw new NotSupportedException(String.Format("Unknown ContentType '{0}'", contentType));
    }
}

Which is then passed into operations that materialize results. Note that some signatures don’t accept a resolver so find one that does even if it means suppling a default OperationContent. For example:

var query = table.CreateQuery<Content>().Where(c => c.PartitionKey == yearMonth);
var results = query.ExecuteQuery(query.AsTableQuery(), contentResolver, myRequestOptions, myOperationContext);

Given that these entity resolvers are essential to correctly materializing your results without data loss it’s worth wrapping the CloudTable client with the necessary setup/table-creation/entity resolver.

[)amien

Quality of SSL protection for US financial institutions  

Troy Hunt put together a list of top Australian banks and their SSL rating using the Qualys SSL Server Test that reveals the somewhat depressing state of SSL security of various banks down under.

This got me wondering how US financial institutions stack up and I thought I’d share:


[)amien

Make Home & End keys behave like Windows on Mac OS X  

I’ve been using Mac OS X daily since 2001 when I purchased my Titanium PowerBook and I still can’t get used the home and end key behavior.

If, like me, you want Home to send you to the start of the line and not to the top of the document then create a file called DefaultKeyBinding.dict in your ~/Library/KeyBindings folder (might need to create that folder too) with the following contents:

{
    "\UF729"  = moveToBeginningOfParagraph:; // home
    "\UF72B"  = moveToEndOfParagraph:; // end
    "$\UF729" = moveToBeginningOfParagraphAndModifySelection:; // shift-home
    "$\UF72B" = moveToEndOfParagraphAndModifySelection:; // shift-end
    "^\UF729" = moveToBeginningOfDocument:; // ctrl-home
    "^\UF72B" = moveToEndOfDocument:; // ctrl-end
    "^$\UF729" = moveToBeginningOfDocumentAndModifySelection:; // ctrl-shift-home
    "^$\UF72B" = moveToEndOfDocumentAndModifySelection:; // ctrl-shift-end
}

This remapping does the following in most Mac apps including Chrome (some apps do their own key handling):

  • Home and End will go to start and end of line
  • ShiftHome and ShiftEnd will select to start and end of line
  • CtrlHome and CtrlEnd will go to start and end of document
  • ShiftCtrlHome and ShiftCtrlEnd will select to start and end of document

Note that you will need to reboot after creating this file for it to take effect.

Also: If you have a PC keyboard with LED backlighting and would like the scroll-lock, num-lock or caps-lock LEDs on when using your Mac check out my free SetLEDS for Mac

[)amien