Development articles

Random tips for PowerShell, Bash & AWS  

Now that I’m freelancing again, I find myself solving a variety of unusual issues, many of which I could find no online solutions for.

Given these no doubt plague other developers, let’s share!

Pass quoted args from BAT/CMD files to PowerShell

Grabbing args in a batch/command file is easy – just use %* – but have you ever tried passing them to PowerShell like:

powershell "Something" "%*"

Unfortunately, if one of your arguments has quotes around it (a filename with a space perhaps) then it becomes two separate arguments, e.g. "My File.txt" now becomes My and File.txt.

PowerShell will only preserve them if you use the -File option (-f for short) to run a .ps1 file, but that requires a relaxed policy via Set-ExecutionPolicy and so is a no-go for many people.

Given you can’t make PowerShell do the right thing with the args, the trick here is to not pass them as args at all! Stash them in an environment variable instead; the PowerShell process inherits it and can read it back through $env:

SET MYPSARGS=%*
...
powershell -ArgumentList "$env:MYPSARGS"

Get Bash script path as Windows path

While Cygwin ships with cygpath to convert /c/something to c:\something etc., MSYS Bash shells do not have this. However, you can get there another way:

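#!/bin/bash
# pushd/popd are Bash builtins, so run this with bash rather than plain sh
pushd "$(dirname "$0")" > /dev/null
if command -v cygpath > /dev/null; then
  # Cygwin: -a gives an absolute path, -w gives Windows format
  WINPWD="$(cygpath -a -w .)"
else
  # MSYS: pwd -W prints the working directory in Windows format
  WINPWD="$(pwd -W)"
fi
popd > /dev/null
echo "$WINPWD"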

This works by switching the working directory to the one the script is in, "$(dirname "$0")", and then capturing the output of the print-working-directory command using the -W option, which prints it in Windows format (or asking cygpath for the same thing where it is available). It then pops the working directory to make sure it goes back to where it was.

Note that pwd -W still uses forward slashes as the directory separator. A lot of stuff is okay with that but older apps and tools are not.

JSON encoding in API Gateway mapping templates

If you use Amazon’s AWS Lambda you’ll also find yourself touching API Gateway and, while most of it is great, the mapping templates are quite deficient in that they do not encode output by default despite specifying the MIME types.

All of Amazon’s example templates are exploitable via JSON injection. Just put a double-quote in a field and start writing your own JSON payload.

Amazon must fix this and encode by default, as other templating systems such as ASP.NET Razor have done. Until then some recommend the Amazon-provided $util.escapeJavaScript(). However, while it encodes " as \" it also produces illegal JSON by encoding ' as \'.

The mapping language is Apache Velocity Template Language (VTL) and, while it is not extendable, the fine print reveals that it internally uses Java strings and does not sandbox us. This lets us utilize Java’s replace functionality:

#set($i = $input.path('$'))
{
   "safeString": "$i.unsafeString.replaceAll("\""", "\\""")"
}
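To make the injection and the fix a little more concrete, here is a minimal sketch in plain Scala of what happens to the rendered JSON. It is not API Gateway code: the object name, the helper names and the attack string are all made up for illustration, and the only real call is Java's String.replaceAll, which is the same one the template leans on.

```scala
object JsonInjectionDemo {
  // Naive template: drops the value between quotes with no encoding at all.
  def unsafeTemplate(value: String): String =
    "{ \"safeString\": \"" + value + "\" }"

  // Same idea as the mapping template: every double quote becomes backslash + quote.
  // The replacement needs a doubled backslash because replaceAll treats \ specially.
  def escapeQuotes(value: String): String =
    value.replaceAll("\"", "\\\\\"")

  // Template with the quote escaping applied first.
  def safeTemplate(value: String): String =
    unsafeTemplate(escapeQuotes(value))

  // Hypothetical attacker input that closes the string and adds a field of its own.
  val attack: String = "x\", \"admin\": \"true"
}
```

Rendering JsonInjectionDemo.attack through unsafeTemplate yields a second "admin" field, while safeTemplate keeps the whole input inside the one string. Like the template, this sketch only deals with double quotes, so backslashes and control characters in the input are left as they are.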

Show active known IPs on local network

I’m surprised more people don’t know how useful arp -a is, especially if you pipe it into ping…

Bash

arp -a | grep -o '[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}' | xargs -L1 ping -c 1 -t 1 | sed -n -e 's/^.*bytes from //p'

PowerShell

(arp -a) -match "dynamic" | Foreach { ping -n 1 -w 1000 ($_ -split "\s+")[1] } | where { $_ -match "Reply from " } | % { $_.replace("Reply from ","") }

Wrapping up

I just want to mention that if you are doing anything on a command line, be it Bash, OS X, PowerShell or Command/Batch, then SS64 is a site worth visiting as they have great docs on many of these things!

[)amien

Time window events with Apache Spark Streaming  

If you’re working with Spark Streaming you might run into an interesting problem if you want to output an event based on a number of messages within a specific time period.

For example: I want to send a security alert if I see 10 DDOS attempts to an IP address in a 5 minute window.

groupByKeyAndWindow

groupByKeyAndWindow allows us to choose the IP address for the key and 5 minutes for the window. If we wanted to then collect the sourceIp and the timestamp, we could get the same grouping from reduceByKeyAndWindow by concatenating sequences, which would look like this:

val messageLimit = 10
val messageWindow = Minutes(5)
val scc = new StreamingContext(conf, Minutes(1))

// ... setup Kafka consumer via KafkaUtils
kafkaConsumer
    .flatMap(parseSecurityMessage)
    .filter(m => m.securityType == "DDOS")                    // keep only DDOS messages
    .map(m => m.targetIp -> Seq((m.timestamp, m.sourceIp)))   // key by target IP
    .reduceByKeyAndWindow({(x, y) => x ++ y}, messageWindow)  // merge everything seen per key in the window
    .filter(g => g._2.length >= messageLimit)
    .foreachRDD(m => m.foreach(createAlertEvent))

scc.start()
scc.awaitTermination()   // block until the stream is stopped

Problem

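The problem is your event will fire many times: the stateless windowed RDD is recomputed every batch period, so with a 1 minute batch and a 5 minute window the same messages can be counted in up to five consecutive batches.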
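To see the repeated firing concretely, here is a small simulation in plain Scala with no Spark involved. The object name and the per-batch counts are made up for illustration; it simply recounts a trailing five-batch window on every batch, the way the stateless version does.

```scala
object NaiveWindowDemo {
  // Messages arriving in each one-minute batch for a single target IP (made-up numbers).
  val perBatch: Seq[Int] = Seq(10, 0, 0, 0, 0, 0, 0)
  val windowBatches = 5   // 5 minute window / 1 minute batch interval
  val messageLimit = 10

  // Indexes (0-based) of the batches whose trailing window holds at least messageLimit messages.
  def firingBatches(counts: Seq[Int]): Seq[Int] =
    counts.indices.filter { i =>
      val window = counts.slice(math.max(0, i - windowBatches + 1), i + 1)
      window.sum >= messageLimit
    }
}
```

NaiveWindowDemo.firingBatches(NaiveWindowDemo.perBatch) returns batches 0 to 4, so a single burst of ten messages raises the alert five times before it finally slides out of the window.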

The simplest solution would be to make the batch interval the same as your message window size but that causes more problems, namely:

  • Your job can’t perform any other triggers on the source data at a shorter interval
  • You won’t know about these alerts until some time after they happen (in this case 5 minutes)

Tracking this in external state would be terrible, and neither Spark counters nor globals are much use here.

Solution

We need to do two things:

  1. Stop the RDD re-running and instead use streaming state. We can do this by using the reduceByKeyAndWindow overload that allows us to specify the inverse function for removing data as it goes out of window.
  2. Introduce a small amount of in-RDD state that can be used to identify when the event is cleared and when it should fire again.
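The inverse-function idea from step 1 can be seen without Spark. This is a minimal sketch in plain Scala, assuming made-up per-batch counts and a three-batch window: instead of re-reducing the whole window each time, the previous total is carried forward, the entering batch is added and the leaving batch is subtracted.

```scala
object InverseReduceDemo {
  // Per-batch message counts for one key (made-up numbers) and a 3-batch window.
  val perBatch: Seq[Int] = Seq(4, 1, 0, 6, 2)
  val windowBatches = 3

  // Recompute every window from scratch, as the stateless version does.
  def recomputed: Seq[Int] =
    perBatch.indices.map { i =>
      perBatch.slice(math.max(0, i - windowBatches + 1), i + 1).sum
    }

  // Carry the previous total forward: add the entering batch, subtract the leaving one.
  def incremental: Seq[Int] =
    perBatch.indices.scanLeft(0) { (total, i) =>
      val leaving = if (i >= windowBatches) perBatch(i - windowBatches) else 0
      total + perBatch(i) - leaving
    }.tail
}
```

Both methods produce the same window totals; the second only ever touches the batch entering and the batch leaving, which is what supplying an inverse function makes possible.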

Let us assume there is a class to handle part 2 named WindowEventTrigger that provides add and remove methods as well as a boolean triggerNow flag that identifies when the event should re-fire. Our RDD body would now look like this:

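kafkaConsumer
    .flatMap(parseSecurityMessage)
    .filter(m => m.securityType == "DDOS")
    // one trigger per message, carrying the (timestamp, sourceIp) pair
    .map(m => m.targetIp -> new WindowEventTrigger((m.timestamp, m.sourceIp), messageLimit))
    // add merges messages entering the window, remove is the inverse for those leaving it
    .reduceByKeyAndWindow(_ add _, _ remove _, messageWindow)
    .filter(_._2.triggerNow)
    .foreachRDD(m => m.foreach(createAlertEvent))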

How this works is actually quite simple. We have a case class called WindowEventTrigger that we map into the stream for each incoming message. It then:

  1. Tracks incoming messages and if it hits the level sets the flag and makes note of the event
  2. Tracks outgoing messages and resets when the event that caused the trigger goes out of window

By switching to the reduceByKeyAndWindow overload with an inverse function, Spark will need to persist state in case executors go down or it is necessary to shuffle data between them. Ensure your StreamingContext object has a checkpoint folder set to reliable storage like HDFS.

WindowEventTrigger class

Here is the WindowEventTrigger class for your enjoyment.

// Immutable per-key window state; the trigger bookkeeping stays private.
case class WindowEventTrigger[T] private(
    eventsInWindow: Seq[T],
    triggerNow: Boolean,
    private val lastTriggeredEvent: Option[T],
    private val triggerLevel: Int) {

  // Wrap a single incoming item; nothing has triggered yet.
  def this(item: T, triggerLevel: Int) = this(Seq(item), false, None, triggerLevel)

  // Merge items entering the window; fire only if no earlier trigger is still in the window.
  def add(incoming: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val combined = eventsInWindow ++ incoming.eventsInWindow
    val shouldTrigger = lastTriggeredEvent.isEmpty && combined.length >= triggerLevel
    // Remember the item that took us to the trigger level.
    val triggeredEvent = if (shouldTrigger) combined.drop(triggerLevel - 1).headOption else lastTriggeredEvent
    new WindowEventTrigger(combined, shouldTrigger, triggeredEvent, triggerLevel)
  }

  // Drop items leaving the window; re-arm once the item that caused the trigger has left.
  def remove(outgoing: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val reduced = eventsInWindow.filterNot(y => outgoing.eventsInWindow.contains(y))
    val triggeredEvent = if (lastTriggeredEvent.isDefined && outgoing.eventsInWindow.contains(lastTriggeredEvent.get)) None else lastTriggeredEvent
    new WindowEventTrigger(reduced, false, triggeredEvent, triggerLevel)
  }
}
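If you want to watch the flag flip without running a stream, here is a small sketch that replays adds and removes by hand for one key. The class is repeated in compact form only so the block compiles on its own; the demo object, the one helper, the string event ids and the trigger level of 3 are all made up for illustration, and the hand-written call order is a simplification of what Spark does for you.

```scala
object WindowEventTriggerDemo {
  // Compact copy of the class above so this sketch is self-contained.
  case class WindowEventTrigger[T] private(eventsInWindow: Seq[T], triggerNow: Boolean,
      private val lastTriggeredEvent: Option[T], private val triggerLevel: Int) {
    def this(item: T, triggerLevel: Int) = this(Seq(item), false, None, triggerLevel)

    def add(incoming: WindowEventTrigger[T]): WindowEventTrigger[T] = {
      val combined = eventsInWindow ++ incoming.eventsInWindow
      val shouldTrigger = lastTriggeredEvent.isEmpty && combined.length >= triggerLevel
      val triggeredEvent = if (shouldTrigger) combined.drop(triggerLevel - 1).headOption else lastTriggeredEvent
      new WindowEventTrigger(combined, shouldTrigger, triggeredEvent, triggerLevel)
    }

    def remove(outgoing: WindowEventTrigger[T]): WindowEventTrigger[T] = {
      val reduced = eventsInWindow.filterNot(y => outgoing.eventsInWindow.contains(y))
      val triggeredEvent = if (lastTriggeredEvent.exists(e => outgoing.eventsInWindow.contains(e))) None else lastTriggeredEvent
      new WindowEventTrigger(reduced, false, triggeredEvent, triggerLevel)
    }
  }

  // Hypothetical helper: one event with a trigger level of 3.
  private def one(id: String) = new WindowEventTrigger(id, 3)

  // Third event reaches the level, so the flag is set.
  val afterThree = one("a").add(one("b")).add(one("c"))
  // The triggering event "c" is still in the window, so a fourth event does not re-fire.
  val afterFour = afterThree.add(one("d"))
  // Once "c" has left the window the trigger is re-armed.
  val afterExpiry = afterFour.remove(one("a")).remove(one("b")).remove(one("c"))
  // Two more events bring the window back to three and the flag is set again.
  val refired = afterExpiry.add(one("e")).add(one("f"))
}
```

Reading triggerNow on each of the four values gives true, false, false and true in that order, which is the fire-once-per-burst behaviour the filter in the stream relies on.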

Happy streaming,

[)amien

Table per hierarchy in Azure Table Storage  

If you’re coming from an ORM background to Azure Table Storage you might be wondering how to map class hierarchies to tables.

Documentation on the topic is hard to find unless you know the magic name EntityResolver<T>, which you can discover by digging into the Azure Client for .NET source code.

Let’s say we have a basic blog style system (minimal fields shown):

public class Content {
  public string Id { get; set; }
  public string Title { get; set; }
}

public class BlogPost : Content {
  public List<string> Topics { get; set; }
}

public class Page : Content {
  public string Slug { get; set; }
}

The trick is to create an instance of EntityResolver<T> where T is your base class, e.g. Content. Strangely, EntityResolver<T>’s signature requires T implement new() so you can’t make your base class abstract.

Firstly, we need to add some kind of identifier for the type to our base class; in ORM terms this is referred to as a discriminator. Then we override that in the subtypes to ensure new instances get the correct type set on insertion.

public class Content {
  public string Id { get; set; }
  public string Title { get; set; }
  public virtual string ContentType { get; set; }
}

public class BlogPost : Content {
  public List<string> Topics { get; set; }
  public override string ContentType {
    get { return "blog"; }
    set { }
  }
}

public class Page : Content {
  public string Slug { get; set; }
  public override string ContentType {
    get { return "page"; }
    set { }
  }
}

Let’s say we want to store all of these in a table called ‘content’, typically via a small helper class that handles the cloud table and storage. With just the discriminator change you can actually start inserting rows into Azure Table Storage, but querying them back will always result in Content types and saving those back will result in data loss!

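We can, however, help the CloudTable client materialize the correct results by creating an EntityResolver<Content>:

EntityResolver<Content> contentResolver = (partitionKey, rowKey, timestamp, properties, etag) => {
    var contentType = properties["ContentType"].StringValue;
    Content content;
    switch (contentType) {
        case "blog": content = new BlogPost(); break;
        case "page": content = new Page(); break;
        default: throw new NotSupportedException(String.Format("Unknown ContentType '{0}'", contentType));
    }
    // Assumes Content derives from TableEntity: copy the keys and stored properties onto the new instance
    content.PartitionKey = partitionKey;
    content.RowKey = rowKey;
    content.Timestamp = timestamp;
    content.ETag = etag;
    content.ReadEntity(properties, null);
    return content;
};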

This is then passed into operations that materialize results. Note that some signatures don’t accept a resolver, so find one that does even if it means supplying a default OperationContext. For example:

var query = table.CreateQuery<Content>().Where(c => c.PartitionKey == yearMonth);
var results = table.ExecuteQuery(query.AsTableQuery(), contentResolver, myRequestOptions, myOperationContext);

Given that these entity resolvers are essential to correctly materializing your results without data loss, it’s worth wrapping the CloudTable client with the necessary setup, table creation and entity resolver.

[)amien

Sequence averages in Scala  

I’ve been learning Scala and decided to put together a C# to Scala cheat sheet. All is going pretty well but then I got stuck on the equivalent of Average.

Enumerable.Average in .NET calculates a mean average from your sequence by summing up all the values and counting them in a single pass then returning the sum divided by the count in a floating point format (or decimal).

The problem

Given that Scala has nothing built-in there are more than a few suggestions online that boil down to:

val average = seq.sum / seq.length

This has a few problems:

  1. Visiting a sequence twice can be inefficient
  2. Sum can overflow as it is the same type as the elements of the sequence
  3. Applied to a sequence of integers without casting, it returns an integer average
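The last two problems are easy to reproduce. A minimal sketch, where the object and method names are made up for illustration:

```scala
object NaiveAverageDemo {
  // The commonly suggested one-liner, fixed to Int so the pitfalls are visible.
  def naive(seq: Seq[Int]): Int = seq.sum / seq.length

  // Integer division: the true mean of 1 and 2 is 1.5, but the fraction is dropped.
  val truncated: Int = naive(Seq(1, 2))

  // Overflow: the Int sum wraps around before the division ever happens.
  val overflowed: Int = naive(Seq(Int.MaxValue, Int.MaxValue))
}
```

Here truncated comes out as 1 rather than 1.5, and overflowed comes out as -1 rather than Int.MaxValue, because the sum wraps to -2 before it is halved.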

A solution

Scala provides a useful higher-order function called foldLeft. Its job is to take an initial state and a function, then keep applying the function with each value to the state. So one more efficient solution to the problem is:

val average = seq.foldLeft((0.0, 1)) ((acc, i) => ((acc._1 + (i - acc._1) / acc._2), acc._2 + 1))._1

How does this work?

What we do here is calculate an average as we go: each new value moves the running average by the difference between the two divided by the number of values seen so far.

It achieves this by setting up a tuple to contain our initial state with (0.0, 1). This specifies our starting average of 0.0 and our starting position of 1.

The next part specifies the function that takes that state as acc (for accumulator) and the next value in the sequence as i and calculates our rolling average for each value and increases the position as it goes along.

Finally at the end of our call we specify ._1 which tells the compiler we want the first value from the tuple – the average – as we no longer care about the position.
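To watch the state evolve, foldLeft can be swapped for scanLeft, which applies the same function but keeps every intermediate state instead of only the last one. A minimal sketch, where the object and method names are made up for illustration:

```scala
object RunningAverageTrace {
  // Every intermediate (average, position) state, starting with the seed (0.0, 1).
  def states(seq: Seq[Int]): Seq[(Double, Int)] =
    seq.scanLeft((0.0, 1)) { (acc, i) =>
      (acc._1 + (i - acc._1) / acc._2, acc._2 + 1)
    }
}
```

For Seq(2, 4, 9) the states are (0.0, 1), (2.0, 2), (3.0, 3) and (5.0, 4): the average moves from 2.0 to 3.0 to 5.0, and the first element of the final tuple is the mean of the three values.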

If you wanted to make this function more reusable you could do this:

def average(s: Seq[Int]): Double = s.foldLeft((0.0, 1)) ((acc, i) => ((acc._1 + (i - acc._1) / acc._2), acc._2 + 1))._1

Be aware that Scala’s numeric types have no common supertype that allows for the subtraction and division, so you either need an overload for each numeric sequence type you want to be able to average or an implicit Numeric[T] to convert each value to a Double first.
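One way to avoid writing an overload per type is the standard library’s Numeric type class, which can convert any built-in numeric value to a Double. This is a sketch of that alternative rather than the approach used above, and the object name is made up for illustration:

```scala
object GenericAverage {
  // Works for Int, Long, Double, BigDecimal etc. by converting each value to Double first.
  def average[T](s: Iterable[T])(implicit num: Numeric[T]): Double =
    s.foldLeft((0.0, 1)) { (acc, i) =>
      (acc._1 + (num.toDouble(i) - acc._1) / acc._2, acc._2 + 1)
    }._1
}
```

GenericAverage.average(Seq(1, 2, 3, 4)) and GenericAverage.average(Seq(10L, 20L)) then both work from the one definition. The trade-off is that everything goes through Double, so very large Long or BigDecimal values can lose precision.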

Precision and rounding

There is some slight variance in results between this approach and total / count due to rounding precision. If you want to match the latter, you could always add and then divide at the end, still in a single pass much like .NET does but with Scala’s foldLeft rather than a foreach.

def average(s: Seq[Int]): Double = { val t = s.foldLeft((0.0, 0)) ((acc, i) => (acc._1 + i, acc._2 + 1)); t._1 / t._2 }

[)amien