
Time window events with Apache Spark Streaming

If you’re working with Spark Streaming you might run into an interesting problem when you want to output an event based on the number of messages seen within a specific time period.

For example: I want to send a security alert if I see 10 DDoS attempts against an IP address in a 5-minute window.

reduceByKeyAndWindow

reduceByKeyAndWindow allows us to choose the IP address for the key and 5 minutes for the window. If we then want to collect the sourceIp and timestamp of each message it looks like this:

val messageLimit = 10
val messageWindow = Minutes(5)
val ssc = new StreamingContext(conf, Minutes(1))

// ... set up the Kafka stream via KafkaUtils
kafkaConsumer
    .flatMap(parseSecurityMessage)
    .filter(m => m.securityType == "DDOS")
    .map(m => m.targetIp -> Seq((m.timestamp, m.sourceIp)))
    .reduceByKeyAndWindow((x, y) => x ++ y, messageWindow)
    .filter(g => g._2.length >= messageLimit)
    .foreachRDD(m => m.foreach(createAlertEvent))

ssc.start()
ssc.awaitTermination()

Problem

The problem is that your event will fire many times, as the stateless RDD is recomputed every batch interval.

The simplest solution would be to make the batch interval the same as your message window size but that causes more problems, namely:

  • Your job can’t perform any other triggers on the source data at a shorter interval
  • You won’t know about these alerts until some time after they happen (in this case 5 minutes)

Keeping the state externally would be terrible, and neither Spark accumulators nor globals are much use here.

Solution

We need to do two things:

  1. Stop the RDD re-running and instead use streaming state. We can do this by using the reduceByKeyAndWindow overload that allows us to specify the inverse function for removing data as it goes out of window.
  2. Introduce a small amount of in-RDD state that can be used to identify when the event is cleared and when it should fire again.

Let us assume there is a class to handle part 2 named WindowEventTrigger that provides add and remove methods as well as a boolean triggerNow flag that identifies when the event should re-fire. Our RDD body would now look like this:

kafkaConsumer
    .flatMap(parseSecurityMessage)
    .filter(m => m.securityType == "DDOS")
    .map(m => m.targetIp -> new WindowEventTrigger((m.timestamp, m.sourceIp), messageLimit))
    .reduceByKeyAndWindow(_ add _, _ remove _, messageWindow)
    .filter(_._2.triggerNow)
    .foreachRDD(m => m.foreach(createAlertEvent))

How this works is actually quite simple. We have a case class called WindowEventTrigger that we map into the stream for each incoming message. It then:

  1. Tracks incoming messages and, once the count hits the trigger level, sets the flag and makes a note of the triggering event
  2. Tracks outgoing messages and resets once the event that caused the trigger goes out of window

By switching to the stateful reduceByKeyAndWindow overload, Spark will need to persist state in case executors go down or data needs to be shuffled between them. Ensure your StreamingContext has a checkpoint folder set to reliable storage such as HDFS.
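
For example, a one-line sketch (the HDFS path is just a placeholder):

ssc.checkpoint("hdfs://namenode:8020/spark/checkpoints")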

WindowEventTrigger class

Here is the WindowEventTrigger class for your enjoyment.

case class WindowEventTrigger[T] private(eventsInWindow: Seq[T], triggerNow: Boolean,
    private val lastTriggeredEvent: Option[T], private val triggerLevel: Int) {
  def this(item: T, triggerLevel: Int) = this(Seq(item), false, None, triggerLevel)

  // Merge two window states; fire the first time the combined count reaches the trigger level.
  def add(incoming: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val combined = eventsInWindow ++ incoming.eventsInWindow
    val shouldTrigger = lastTriggeredEvent.isEmpty && combined.length >= triggerLevel
    val triggeredEvent = if (shouldTrigger) combined.drop(triggerLevel - 1).headOption else lastTriggeredEvent
    new WindowEventTrigger(combined, shouldTrigger, triggeredEvent, triggerLevel)
  }

  // Drop events leaving the window; forget the triggering event once it leaves so the trigger can fire again.
  def remove(outgoing: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val reduced = eventsInWindow.filterNot(y => outgoing.eventsInWindow.contains(y))
    val triggeredEvent = if (lastTriggeredEvent.isDefined && outgoing.eventsInWindow.contains(lastTriggeredEvent.get)) None else lastTriggeredEvent
    new WindowEventTrigger(reduced, false, triggeredEvent, triggerLevel)
  }
}
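
As a quick sanity check, here is a hypothetical standalone example (no Spark required) that combines three single-event triggers with a trigger level of 3 and shows the flag firing exactly once:

val a = new WindowEventTrigger("event-1", 3)
val b = new WindowEventTrigger("event-2", 3)
val c = new WindowEventTrigger("event-3", 3)

val fired = a add b add c
println(fired.triggerNow)  // true - the third event reached the trigger level

val shrunk = fired remove a
println(shrunk.triggerNow) // false - won't re-fire until the triggering event leaves the window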

Happy streaming,

[)amien

Sequence averages in Scala

I’ve been learning Scala and decided to put together a C# to Scala cheat sheet. All was going pretty well until I got stuck on the equivalent of Average.

Enumerable.Average in .NET calculates the mean of your sequence by summing all the values and counting them in a single pass, then returning the sum divided by the count in a floating-point format (or decimal).

The problem

Given that Scala has nothing built in, there are more than a few suggestions online that boil down to:

val average = seq.sum / seq.length

This has a few problems:

  1. Visiting a sequence twice can be inefficient
  2. Sum can overflow as it is the same type as the sequence
  3. Applied to a sequence of integers without casting, it returns an integer average

A solution

Scala provides a useful higher-order function called foldLeft. Its job is to take an initial state and a function, then keep applying the function to the state with each value in turn. So a more efficient single-pass solution is:

val average = seq.foldLeft((0.0, 1)) ((acc, i) => ((acc._1 + (i - acc._1) / acc._2), acc._2 + 1))._1

How does this work?

What we do here is calculate the average as we go, folding each new value into the running average with the appropriate weight.

It achieves this by setting up a tuple to contain our initial state with (0.0, 1). This specifies our starting average of 0.0 and our starting position of 1.

The next part specifies the function that takes that state as acc (for accumulator) and the next value in the sequence as i and calculates our rolling average for each value and increases the position as it goes along.

Finally at the end of our call we specify ._1 which tells the compiler we want the first value from the tuple – the average – as we no longer care about the position.
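
If it helps, here is the fold traced by hand for Seq(1, 2, 3), one step per value:

// state (0.0, 1), i = 1: 0.0 + (1 - 0.0) / 1 = 1.0 -> (1.0, 2)
// state (1.0, 2), i = 2: 1.0 + (2 - 1.0) / 2 = 1.5 -> (1.5, 3)
// state (1.5, 3), i = 3: 1.5 + (3 - 1.5) / 3 = 2.0 -> (2.0, 4)
val average = Seq(1, 2, 3).foldLeft((0.0, 1))((acc, i) => (acc._1 + (i - acc._1) / acc._2, acc._2 + 1))._1
// average == 2.0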

If you wanted to make this function more reusable you could do this:

def average(s: Seq[Int]): Double = s.foldLeft((0.0, 1)) ((acc, i) => ((acc._1 + (i - acc._1) / acc._2), acc._2 + 1))._1

Be aware you might need multiple overloads for each numeric sequence type you want to be able to average, given the lack of a common numeric trait that provides the subtraction and division.
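
One way to sidestep this – sketched here as an alternative rather than taken from the overloads above – is the standard library’s Numeric typeclass: it has no division itself, but its toDouble lets the arithmetic happen in Double:

def average[T](s: Seq[T])(implicit num: Numeric[T]): Double =
  s.foldLeft((0.0, 1))((acc, i) => (acc._1 + (num.toDouble(i) - acc._1) / acc._2, acc._2 + 1))._1

average(Seq(1, 2, 3))       // 2.0
average(Seq(1.5, 2.5, 3.5)) // 2.5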

Precision and rounding

There is some slight variance in results between this approach and total / count due to rounding precision. If you want to preserve that behavior you can still sum and then divide at the end, still in a single pass, much like .NET does but with Scala’s foldLeft rather than a foreach.

def average(s: Seq[Int]): Double = {
  val t = s.foldLeft((0.0, 0))((acc, i) => (acc._1 + i, acc._2 + 1))
  t._1 / t._2
}

[)amien

8 Visual Studio debugging tips – debug like a boss

There are so many useful debugging features built into Visual Studio that aren’t well known. Here are a few of my favorites, including some recent finds in VS 2013.

1. Breakpoint inside a lambda

If you click the left gutter to set breakpoints you could easily be misled into thinking breakpoints happen at line level.

You can actually insert a breakpoint inside parts of the line such as inside a lambda in your LINQ expression. Just right-click the part of the code and choose Breakpoint > Insert breakpoint from the context menu.

2. Usable output window

Visual Studio output window filtering options

The output window is useful for debugging where breakpoints would be too invasive or would interrupt flow, but it’s pretty noisy.

Just right-click in the output window (make sure output is set to Debug) and turn off Module Load, Module Unload, Process Exit and Thread Exit to leave just the stuff you actually care about. Now Debug.WriteLine to your heart’s content.

You can also press Ctrl+S in the output window to save the contents.

3. Attach debugger to client and server (VS 2012)

It’s useful to have both server and client projects in a single solution so you only need one copy of Visual Studio running and don’t get lost alt-tabbing back and forth especially if they share common code such as a data model project.

One disadvantage is that the start-up project is the only one to get a debugger attached, so if you encounter an exception it will show in your client project, not your server project.

That’s easily solved now. Right-click on the solution, choose Properties, select Multiple startup projects, then set the Start action for the projects you need to attach to.

Visual Studio Solution properties dialog

4. Create a repro project template

If you’re responsible for an SDK or API, create a simple application that uses your stuff in a small, self-contained way. Then use File > Export Template… to save it.

Now you can create a new project from your template whenever you need it with a few clicks. Even better, make it available to users and testers so they can send you minimal repros.

5. Use the DebuggerDisplay attribute

By default the debugger will use ToString() for the watch and autos windows, which normally outputs the class name. Even if you override ToString() it’s probably not what somebody debugging wants to see at a glance.

Add DebuggerDisplay to your class with a simple expression that evaluates properties instead, e.g.:

using System.Diagnostics;

[DebuggerDisplay("Order {ID,nq}")]
class Order {
    private readonly string id;
    public string ID { get { return id; } }
}

The “nq” prevents double-quotes from being emitted. You can use methods here too, but don’t do anything with subtle side-effects, otherwise your observation of the subject will change its behavior and could cause weird issues.

6. Manage breakpoints

You’ve set up some interesting breakpoints and now you need to switch one off as it’s getting hit too much, but you’ll need it again in a minute. If you remove the breakpoint you’ll have to come back and find it again.

Enter the much-overlooked Breakpoints window (Ctrl+Alt+B). This shows all the breakpoints you have set but, crucially, lets you disable them without unsetting them by simply removing the check-mark. Check it again to re-enable it.

Visual Studio breakpoints window

This window also gives you quick access to:

  • Condition when a breakpoint should occur
  • Hit count to see how often it is hit and to only break based on that count
  • Label a breakpoint to allow toggling on and off in batches
  • When Hit to put a message in the output window instead of actually breaking

7. Break on or output the caller information (.NET 4.5/Windows 8 Store)

There isn’t a global variable for the current method of the caller and getting the current stack can be a very slow operation.

One quick and simple trick is to add an extra optional string parameter to the method with the CallerMemberName attribute. e.g.

void MyFunction(string someValue, [CallerMemberName] string caller = null) {
    ...
}

Because it is an optional parameter you don’t need to modify any callers, but you can now:

  1. Set a breakpoint condition inside MyFunction based on the caller variable
  2. Output the contents of caller to a log or output window

You can also use CallerLineNumber and CallerFilePath. Also remember that constructors, finalizers and operator overloads will display their underlying method names (.ctor, op_Equality, etc.).

8. See the value returned by a function (VS 2013, .NET 4.5.1/Windows 8.1 Store)

Sometimes you want to see what a function returned but you can’t easily because you didn’t store the value, as it was the input to another function.

This was added in VS 2013 but is incredibly easy to miss as you have to be in the right place at the right time. The right place is the Autos window and the right time is exactly the step that returns you to where the function was called from. You won’t see it before you call the function or while inside it. It’s there for a single step and looks like this:

Visual Studio Autos window

The arrow icon indicates it’s a return value, and it shows the name of the function alongside it.

Wrap up

I also can’t stress enough how useful logs are for troubleshooting once the software leaves your machine, but that’s a much bigger discussion than this one.

Am I missing some great debugging tips? Feel free to let me know below :)

PS: Michael Parshin has some great tips on debugging too.

[)amien

Designing a great API

Several years ago I worked on a payroll package, developing a core engine that required an API to let third parties write calculations, validations and security gates that would execute as part of its regular operation.

We were a small team and I had many conversations with another developer tasked with building a payroll using the API I would provide. Some methods here, classes there, the odd helper function – and soon I had an API and we had a mini payroll running.

Then he showed me the code he had written and that smug grin dropped off my face. It was awful.

Perhaps this other developer wasn’t as great as I’d thought? Looking at the code though made me realize he had done the best anyone could with a terrible API. I’d exposed parts of this core payroll engine with hooks when it needed a decision. Its job was to run the payroll – a very complex task that involved storage, translation, time periods, users and companies. That complexity and context had leaked out.

Unfortunately it’s not a unique story – many APIs are terrible to use. They’re concerned with their own terminology, limitations and quirks because they are exposed sections of an underlying system, developed by those responsible for that underlying system.

If you want others to have a good experience with your product you have to put yourself in their shoes. Whether it’s a UI or an API makes no difference.

You are not the user

That’s the real difference between writing the classes that form your regular implementation and those that make up your public API.

We had time to fix our payroll API. Instead of refining and polishing here and there, we took the 20 or so snippets developed for the mini payroll and pruned, cleaned and polished them until they looked beautiful. They scanned well and made sense to payroll developers unfamiliar with our package. When a third developer, familiar with payrolls but unfamiliar with our package, developed the necessary code for a fully-functional jurisdiction in record time with minimal assistance, we knew we had hit our goal.

Sure, implementing that new API was hard work. Instead of simple methods sticking out of the engine we had a facade over it, but it was justified: they were two different systems for two different types of user, each with distinct ideas about what the system was and how it was going to be used.

Code First

Many years later I found myself on a small team of 3 people tasked with putting a brand new API on top of Entity Framework for configuring models with code the .NET world would come to know as Code First. I was determined to use my experience and avoid another complex API surface littered with terminology and leaky abstractions. Parts of EF already suffered from that problem.

So for the first few weeks of that project we didn’t write any of the code that would in fact become Code First.

Instead we decided who our user was – in this case a C# developer who likes writing code, knows LINQ and some database concepts but doesn’t know Entity Framework, as people who did were already using Model First or Database First.

Then we wrote tiny sample apps and tried to find simpler and simpler ways to describe them in code. We’d often start on a whiteboard with a scenario and write out the complete mapping. We’d then try to find conventions that would remove the need for most of it, and then try to write succinct code to configure the rest. As the newest guy on the team I’d fight to keep EF terms away from the main API surface in order to reduce the barrier to entry and help drive adoption.

Finally we’d hit the computer and develop stub classes and methods to make the samples compile and let us try the IntelliSense. This isn’t always necessary, but if you want to develop a fluent API or provide lots of type safety, such as Code First’s relationship mapping, it’s highly recommended.

We’d then revisit the samples later to see if they read as easily as they were written, figure out what problems people were likely to run into, and decide whether we could solve them without too much noise. Sometimes this meant having more than one way to do things, such as chaining the fluent methods or allowing a bunch of properties to be set (solved with an extension method class providing the fluent API), and considering how users could manage larger models (solved by sub-classing EntityConfiguration – now EntityTypeConfiguration, sigh – and allowing redundant specification for things like relationships that span more than one class).

We finally ended up with succinct code like this, with IntelliSense guiding you along the way and preventing you from even being able to specify invalid combinations. HasMany prompts with the properties on Customer, and it won’t show you WithRequired unless it is valid. In the case of Required-to-Required it ensures that WithRequired specifies which end is principal and which is dependent. In short it guides you through the process and results in highly readable code.

Entity<Customer>().HasMany(c => c.Orders).WithRequired(o => o.Customer).WillCascadeOnDelete();

This process took a little longer, but given the amount of use the API would get, that time has been saved by users countless times over.

Code First went down incredibly well with both the target audience and existing EF users and inspired the simpler DbContext interface that became the recommended way of accessing EF.

I think it’s one of the nicer APIs to come out of Microsoft and .NET.

[)amien

PS. Martin Fowler has some great guidance in his book Domain Specific Languages.