
Development articles

Download files with progress in Electron via window.fetch  

Working on Atom lately, I need to be able to download files to disk. We have a couple of ways to do this today, but they do not show download progress, which leads to confusion and sometimes frustration on larger downloads such as updates or big packages.

There are many npm libraries out there, but they either don’t expose a progress indicator or they bypass Chrome (thus not using proxy settings, caching and the network inspector) by using Node directly.

I’m also not a fan of sprawling dependencies to achieve what can be done simply in a function or two.

Hello window.fetch

window.fetch is a replacement for XMLHttpRequest currently shipping in Chrome (and therefore Electron) as well as a WHATWG living standard. While there is some documentation around, most of it relies on grabbing the entire content as JSON, a blob or text. This is not advisable for streaming, where the files might be large and you want to not only minimize memory impact but also display a progress indicator to your users.

Thankfully the response body window.fetch gives you has a getReader() function that returns a ReadableStreamReader, although it reads in chunks (32KB on my machine) and isn’t compatible with Node’s streams, pipes and data events.

Download function

With a little work though we can wire these two things up to get a file downloader that has no extra dependencies outside of Electron, honors the Chrome cache, proxy and network inspector, and best of all is incredibly easy to use:

import fs from 'fs';

export default async function download(sourceUrl, targetFile, progressCallback, length) {
  const request = new Request(sourceUrl, {
    headers: new Headers({'Content-Type': 'application/octet-stream'})
  });

  const response = await fetch(request);
  if (!response.ok) throw Error(`Unable to download, server returned ${response.status} ${response.statusText}`);

  const body = response.body;
  if (body == null) throw Error('No response body');

  const finalLength = length || parseInt(response.headers.get('Content-Length') || '0', 10);
  const reader = body.getReader();
  const writer = fs.createWriteStream(targetFile);

  await streamWithProgress(finalLength, reader, writer, progressCallback);
  writer.end();
}

async function streamWithProgress(length, reader, writer, progressCallback) {
  let bytesDone = 0;

  while (true) {
    const result = await reader.read();
    if (result.done) {
      if (progressCallback != null) {
        progressCallback(length, 100);
      }
      return;
    }

    const chunk = result.value;
    writer.write(Buffer.from(chunk));
    if (progressCallback != null) {
      bytesDone += chunk.byteLength;
      const percent = length === 0 ? null : Math.floor(bytesDone / length * 100);
      progressCallback(bytesDone, percent);
    }
  }
}

A FlowType annotated version is also available.

Using it

Using it is simple: call it with a URL to download and a local file name to save it as, along with an optional callback that will receive download progress.

Downloader.download('https://download.damieng.com/fonts/original/EnvyCodeR-PR7.zip', 'envy-code-r.zip', 
   (bytes, percent) => console.log(`Downloaded ${bytes} (${percent})`));

Caveats

Some servers do not send the Content-Length header. You have two options if this applies to you:

  1. Don’t display a percentage, just the KB downloaded count (percent will be null in the callback)
  2. Bake in the file size if it’s a static URL – just pass it in as the final parameter to the download function, as shown below
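For the second option, here is a minimal sketch – the URL and byte count are made up for illustration:

// Size baked in because this (hypothetical) server omits Content-Length.
const KNOWN_SIZE = 1465773;

Downloader.download('https://example.com/static/archive.zip', 'archive.zip',
  (bytes, percent) => console.log(`Downloaded ${bytes} of ${KNOWN_SIZE} bytes (${percent}%)`),
  KNOWN_SIZE);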

Enjoy!

[)amien

Random tips for PowerShell, Bash & AWS  

Now that I’m freelance again I find myself solving a variety of unusual issues, many of which I could find no online solutions for.

Given these no doubt plague other developers, let’s share!

Pass quoted args from BAT/CMD files to PowerShell

Grabbing args from batch/command files is easy – just use %* – but have you ever tried passing them on to PowerShell like this:

powershell "Something" "%*"

Unfortunately, if one of your arguments has quotes around it (a filename with a space, perhaps) it becomes two separate arguments: "My File.txt" now becomes My and File.txt.

PowerShell will only preserve them if you use the -f option (to run a .PS1 file) but that requires a relaxed policy via Set-ExecutionPolicy and so is a no-go for many people.

Given you can’t make PowerShell do the right thing with the args, the trick here is to not pass them as args at all!

SET MYPSARGS=%*
...
powershell -ArgumentList "$env:MYPSARGS"
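On the PowerShell side the value can be read straight back out of the environment, quoting intact, because it never goes through command-line parsing again. A minimal sketch of the receiving end (only the MYPSARGS name comes from the batch file above; the script body is illustrative):

# Read the batch file's original arguments back from the environment variable.
$rawArgs = $env:MYPSARGS
Write-Host "Arguments received: $rawArgs"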

Get Bash script path as Windows path

While Cygwin ships with cygpath to convert /c/something to c:\Something etc., MSYS Bash shells do not have this. However, you can get the path another way there:

#!/bin/sh
pushd "$(dirname "$0")" > /dev/null
if command -v "cygpath" > /dev/null; then
  WINPWD="$(cygpath . -a -w)"
else
  WINPWD="$(pwd -W)"
fi
popd > /dev/null
echo "$WINPWD"

This works by switching the working directory to the one the script is in "$(dirname "$0")" and then capturing the print-working-directory command output using the -W option that grabs it in Windows format. It then pops the working directory to make sure it goes back to where it was.

Note that this uses forward slashes as a directory separator still – a lot of stuff is okay with that but older apps and tools are not.
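If a tool you are calling insists on backslashes, the forward slashes can be swapped with a parameter expansion. A small sketch building on the WINPWD variable above (the example path is illustrative):

# c:/projects/build becomes c:\projects\build
WINPWD_BACKSLASHED="${WINPWD//\//\\}"
echo "$WINPWD_BACKSLASHED"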

JSON encoding in API Gateway mapping templates

Using Amazon’s AWS Lambda you’ll also find yourself touching API Gateway, and while most of it is great the mapping templates are quite deficient in that they do not encode output by default despite specifying the MIME types.

All of Amazon’s example templates are exploitable via JSON injection. Just put a double-quote in a field and start writing your own JSON payload.

Amazon must fix this – encode by default like other templating systems have done, such as ASP.NET Razor. Until then some recommend the Amazon-provided $util.escapeJavaScript(), but while it encodes " as \" it also produces illegal JSON by encoding ' as \'.

The mapping language is Apache Velocity Template Language (VTL) and, while not extendable, the fine print reveals that it internally uses Java strings and does not sandbox us. This lets us utilize Java’s replaceAll functionality:

#set($i = $input.path('$'))
{
   "safeString": "$i.unsafeString.replaceAll("\""", "\\""")
}
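For illustration, with a made-up request body the double quotes stay escaped in the output instead of terminating the string and letting extra JSON be injected:

Request body (illustrative):  { "unsafeString": "say \"hi\"" }
Template output:              { "safeString": "say \"hi\"" }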

Show active known IPs on local network

I’m surprised more people don’t know how useful arp -a is, especially if you pipe it into ping…

Bash

arp -a | grep -o '[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}' | xargs -L1 ping -c 1 -t 1 | sed -n -e 's/^.*bytes from //p'

PowerShell

(arp -a) -match "dynamic" | Foreach { ping -n 1 -w 1000 ($_ -split "\s+")[1] } | where { $_ -match "Reply from " } | % { $_.replace("Reply from ","") }

Wrapping up

I just want to mention that if you are doing anything on a command line, be it Bash, OS X, PowerShell or Command/Batch, then SS64 is a site worth visiting as they have great docs on many of these things!

[)amien

Time window events with Apache Spark Streaming  

If you’re working with Spark Streaming you might run into an interesting problem if you want to output an event based on a number of messages within a specific time period.

For example: I want to send a security alert if I see 10 DDOS attempts to an IP address in a 5-minute window.

reduceByKeyAndWindow

reduceByKeyAndWindow allows us to choose the IP address for the key and 5 minutes for the window. If we want to then collect the sourceIp and the timestamp it would look like this:

val messageLimit = 10
val messageWindow = Minutes(5)
val scc = new StreamingContext(conf, Minutes(1))

// ... set up the Kafka direct stream via KafkaUtils
kafkaConsumer
    .flatMap(parseSecurityMessage)
    .filter(m => m.securityType == "DDOS")
    .map(m => m.targetIp -> Seq((m.timestamp, m.sourceIp)))
    .reduceByKeyAndWindow({ (x, y) => x ++ y }, messageWindow)
    .filter(g => g._2.length >= messageLimit)
    .foreachRDD(m => m.foreach(createAlertEvent))

scc.start()
scc.awaitTermination()

Problem

The problem is your event will fire many times as the stateless RDD is re-run every batch period.

The simplest solution would be to make the batch interval the same as your message window size but that causes more problems, namely:

  • Your job can’t perform any other triggers on the source data at a shorter interval
  • You won’t know about these alerts until some time after they happen (in this case 5 minutes)

Maintaining external state would be terrible for performance, and neither Spark counters nor globals are much use here.

Solution

We need to do two things:

  1. Stop the RDD re-running and instead use streaming state. We can do this by using the reduceByKeyAndWindow overload that allows us to specify the inverse function for removing data as it goes out of window.
  2. Introduce a small amount of in-RDD state that can be used to identify when the event is cleared and when it should fire again.

Let us assume there is a class to handle part 2 named WindowEventTrigger that provides add and remove methods as well as a boolean triggerNow flag that identifies when the event should re-fire. Our RDD body would now look like this:

kafkaConsumer
    .flatMap(parseSecurityMessage)
    .filter(m => m.securityType == "DDOS")
    .map(m => m.targetIp -> new WindowEventTrigger((m.timestamp, m.sourceIp), messageLimit))
    .reduceByKeyAndWindow(_ add _, _ remove _, messageWindow)
    .filter(_._2.triggerNow)
    .foreachRDD(m => m.foreach(createAlertEvent))

How this works is actually quite simple. We have a case class called WindowEventTrigger that we map into the stream for each incoming message. It then:

  1. Tracks incoming messages and if it hits the level sets the flag and makes note of the event
  2. Tracks outgoing messages and resets when the event that caused the trigger goes out of window

By switching to the stateful reduceByKeyAndWindow with an inverse function, Spark needs to persist state in case executors go down or data has to be shuffled between them. Ensure your StreamingContext has a checkpoint folder set to reliable storage such as HDFS.
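A minimal sketch of enabling that checkpointing on the context from the first snippet (the HDFS path is illustrative):

// Required when using reduceByKeyAndWindow with an inverse function so the
// window state survives failures; point it at reliable storage.
scc.checkpoint("hdfs://namenode:8020/checkpoints/ddos-alerts")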

WindowEventTrigger class

Here is the WindowEventTrigger class for your enjoyment.

case class WindowEventTrigger[T] private(eventsInWindow: Seq[T], triggerNow: Boolean, private val lastTriggeredEvent: Option[T], private val triggerLevel: Int) {
  def this(item: T, triggerLevel: Int) = this(Seq(item), false, None, triggerLevel)

  def add(incoming: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val combined = eventsInWindow ++ incoming.eventsInWindow
    val shouldTrigger = lastTriggeredEvent.isEmpty && combined.length >= triggerLevel
    val triggeredEvent = if (shouldTrigger) combined.seq.drop(triggerLevel - 1).headOption else lastTriggeredEvent
    new WindowEventTrigger(combined, shouldTrigger, triggeredEvent, triggerLevel)
  }

  def remove(outgoing: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val reduced = eventsInWindow.filterNot(y => outgoing.eventsInWindow.contains(y))
    val triggeredEvent = if (lastTriggeredEvent.isDefined && outgoing.eventsInWindow.contains(lastTriggeredEvent.get)) None else lastTriggeredEvent
    new WindowEventTrigger(reduced, false, triggeredEvent, triggerLevel)
  }
}
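To illustrate the behaviour outside a stream (the events and trigger level below are made up, not from the original post), combining ten single-event triggers fires the flag exactly once until the triggering event leaves the window:

// Ten single-event triggers for the same key with a trigger level of 10.
val events = (1 to 10).map(ts => new WindowEventTrigger((ts.toLong, "198.51.100.7"), 10))

val combined = events.reduce(_ add _)
println(combined.triggerNow)   // true - the tenth event reached the trigger level

// Further additions within the same window do not re-fire the alert...
val more = combined add new WindowEventTrigger((11L, "198.51.100.7"), 10)
println(more.triggerNow)       // false

// ...until the event that caused the trigger goes out of window again.
val slid = more remove combined
println(slid.triggerNow)       // false, and a later add can trigger once more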

Happy streaming,

[)amien

Table per hierarchy in Azure Table Storage  

If you’re coming from an ORM background to Azure Table Storage you might be wondering how to map class hierarchies to tables.

Documentation on the topic is hard to find unless you know the magic class name EntityResolver which you can discover by digging into the Azure Client for .NET source code.

Let’s say we have a basic blog style system (minimal fields shown):

public class Content {
  public string Id { get; set; }
  public string Title { get; set; }
}

public class BlogPost : Content {
  public List<string> Topics { get; set; }
}

public class Page : Content {
  public String Slug { get; set; }
}

The trick is to create an instance of EntityResolver<T> where T is your base class, e.g. Content. Strangely EntityResolver’s signature requires T implement new() so you can’t make your base class abstract.

Firstly we need to add to our base class some kind of identifier for the type – in ORM terms this is referred to as a discriminator. Then we’d override that in the subtypes to ensure new instances get the correct type set on insertion.

Let’s say we want to store all of these in a table called ‘content’. With the discriminator added and overridden in each subtype the classes look like this (it’s also worth writing a small helper class to handle the cloud table and storage – more on that at the end):

public class Content {
  public string Id { get; set; }
  public string Title { get; set; }
  public virtual string ContentType { get; set; }
}

public class BlogPost : Content {
  public List<string> Topics { get; set; }
  public override string ContentType {
    get { return "blog"; }
    set { }
  }
}

public class Page : Content {
  public String Slug { get; set; }
  public override string ContentType {
    get { return "page"; }
    set { }
  }
}

With just that change you can actually start inserting rows into Azure Table Storage, but querying them back will always result in Content types and saving those back will result in data loss!

We can however help the CloudTable client materialize the correct results by creating an EntityResolver:

EntityResolver<Content> contentResolver = (partitionKey, rowKey, timestamp, properties, etag) => {
    var contentType = properties["ContentType"].StringValue;
    switch (contentType) {
        case "blog": return new BlogPost();
        case "page": return new Page();
        default: throw new NotSupportedException(String.Format("Unknown ContentType '{0}'", contentType));
    }
};

Which is then passed into operations that materialize results. Note that some signatures don’t accept a resolver, so find one that does even if it means supplying a default OperationContext. For example:

var query = table.CreateQuery<Content>().Where(c => c.PartitionKey == yearMonth);
var results = table.ExecuteQuery(query.AsTableQuery(), contentResolver, myRequestOptions, myOperationContext);

Given that these entity resolvers are essential to correctly materializing your results without data loss, it’s worth wrapping the CloudTable client with the necessary setup, table-creation and entity resolver logic.
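A minimal sketch of what such a wrapper might look like, assuming Content derives from TableEntity and using the classic Microsoft.WindowsAzure.Storage SDK (the class name and methods here are illustrative, not from the original post):

using System.Collections.Generic;
using Microsoft.WindowsAzure.Storage.Table;

// Keeps the resolver next to the table so callers cannot forget to pass it.
public class ContentTable {
  private readonly CloudTable table;
  private readonly EntityResolver<Content> resolver;

  public ContentTable(CloudTableClient client, EntityResolver<Content> resolver) {
    this.table = client.GetTableReference("content");
    this.table.CreateIfNotExists();
    this.resolver = resolver;
  }

  public void Insert(Content content) {
    table.Execute(TableOperation.Insert(content));
  }

  public IEnumerable<Content> ByPartition(string partitionKey) {
    var query = new TableQuery().Where(
      TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey));
    return table.ExecuteQuery(query, resolver);
  }
}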

[)amien