Download files with progress in Electron via window.fetch

Working on Atom lately, I've needed to be able to download files to disk. We have ways to achieve this, but none of them show download progress, which leads to confusion and sometimes frustration on larger downloads such as updates or large packages.

There are many npm libraries out there, but they either don’t expose a progress indicator, or they bypass Chrome (thus not using proxy settings, caching and network inspector) by using Node directly.

I’m also not a fan of sprawling dependencies to achieve what can be done simply in a function or two.

Hello window.fetch

window.fetch is a replacement for XMLHttpRequest currently shipping in Chrome (and therefore Electron) as well as being a WHATWG living standard. There is some documentation around, but most of it grabs the entire content as JSON, a blob, or text, which is not advisable for streaming where the files might be large. You want not only to minimize the memory impact but also to display a progress indicator to your users.

Thankfully the response body window.fetch provides has a getReader() function that gives you a ReadableStreamReader, which reads in chunks (32KB on my machine) but isn't compatible with Node's streams, pipes, and data events.

Download function

With a little effort, we can wire these two things up to get a file downloader that has no extra dependencies outside of Electron, honours the Chrome cache, proxy and network inspector, and, best of all, is incredibly easy to use:

import fs from 'fs';

export default async function download(sourceUrl, targetFile, progressCallback, length) {
  const request = new Request(sourceUrl, {
    headers: new Headers({'Content-Type': 'application/octet-stream'})
  });

  const response = await fetch(request);
  if (!response.ok) {
    throw Error(`Unable to download, server returned ${response.status} ${response.statusText}`);
  }

  const body = response.body;
  if (body == null) {
    throw Error('No response body');
  }

  const finalLength = length || parseInt(response.headers.get('Content-Length') || '0', 10);
  const reader = body.getReader();
  const writer = fs.createWriteStream(targetFile);

  await streamWithProgress(finalLength, reader, writer, progressCallback);
  writer.end();
}

async function streamWithProgress(length, reader, writer, progressCallback) {
  let bytesDone = 0;

  while (true) {
    const result = await reader.read();
    if (result.done) {
      if (progressCallback != null) {
        progressCallback(length, 100);
      }
      return;
    }
    
    const chunk = result.value;
    if (chunk == null) {
      throw Error('Empty chunk received during download');
    } else {
      writer.write(Buffer.from(chunk));
      if (progressCallback != null) {
        bytesDone += chunk.byteLength;
        const percent = length === 0 ? null : Math.floor(bytesDone / length * 100);
        progressCallback(bytesDone, percent);
      }
    }
  }
}

A FlowType annotated version is also available.

Using it

Using it is simplicity itself. Call it with a URL to download, a local file name to save to, and an optional callback to receive download progress.

Downloader.download('https://download.damieng.com/fonts/original/EnvyCodeR-PR7.zip', 'envy-code-r.zip', (bytes, percent) => console.log(`Downloaded ${bytes} (${percent}%)`));

Caveats

Some servers do not send the Content-Length header. You have two options if this applies to you:

  1. Don’t display a percentage - just the KB downloaded count (the percentage is null in the callback)
  2. Bake in the file size if it's a static URL and pass it in as the final parameter to the download function (see the example below)
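
For option 2, the call simply gains the size as its final argument. A quick sketch, with a made-up byte count standing in for the real file size:

Downloader.download(
  'https://download.damieng.com/fonts/original/EnvyCodeR-PR7.zip',
  'envy-code-r.zip',
  (bytes, percent) => console.log(`Downloaded ${bytes} (${percent}%)`),
  1234567 // hypothetical size in bytes - bake in the real one
);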

Enjoy!

[)amien

Random tips for PowerShell, Bash & AWS

Now that I am freelancing again, I find myself solving unusual issues, many of which have no solutions online.

Given these no doubt plague other developers, let’s share!

Pass quoted args from BAT/CMD files to PowerShell

Grabbing args from a batch/command file is easy – use %* – but have you ever tried passing them on to PowerShell like:

powershell "Something" "%*"

Unfortunately, if one of your arguments has quotes around it (a filename containing a space perhaps), it becomes two separate arguments. e.g. "My File.txt" now becomes My and File.txt.

PowerShell will only preserve it if you use the -File option (to run a .PS1 file), and that requires a relaxed policy via Set-ExecutionPolicy, so it is a no-go for many people.

Given you can't make PowerShell do the right thing with the args, the trick here is to not pass them as args at all!

SET MYPSARGS=%*
...
powershell -Command "Something $env:MYPSARGS"
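
On the PowerShell side, read the arguments back out of the environment. A minimal sketch (the Something.ps1 name and variable are illustrative):

# Something.ps1 - the quoted arguments survive intact in the environment variable
$myArgs = $env:MYPSARGS
Write-Output "Received: $myArgs"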

Get Bash script path as Windows path

While Cygwin ships with cygpath to convert /c/something to c:\Something etc., MSYS Bash shells do not have this. You can get it another way there, however:

#!/bin/sh
pushd "$(dirname "$0")" > /dev/null
if command -v "cygpath" > /dev/null; then
  WINPWD="$(cygpath . -a -w)"
else
  WINPWD="$(pwd -W)"
fi
popd > /dev/null
echo "$WINPWD"

This solution works by switching the working directory to the one the script is in, "$(dirname "$0")", and then capturing the current directory in Windows format: via cygpath when it is available, or otherwise via the print-working-directory command's -W option. It then pops the working directory to make sure it goes back to where it was.

Note that the pwd -W fallback still uses forward slashes as the directory separator. Many tools and apps are okay with that, but some older ones are not.
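
If the script above were saved as, say, winpath.sh (a hypothetical name), other scripts could capture its output to hand Windows-style paths to native tools:

# winpath.sh is illustrative - capture the script's directory in Windows format
WIN_DIR=$(./winpath.sh)
echo "Script directory on the Windows side: $WIN_DIR"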

JSON encoding in API Gateway mapping templates

If you use Amazon's AWS Lambda, you'll also find yourself touching API Gateway. While most of it is great, the mapping templates are deficient in that they do not encode output by default, despite specifying the MIME types.

All of Amazon’s example templates are exploitable via JSON injection. Just put a double-quote in a field and start writing any JSON payload.

Amazon must fix this and encode by default, as other templating systems such as ASP.NET Razor have done. Until then, some recommend the Amazon-provided $util.escapeJavaScript(); however, while it encodes " as \", it also produces illegal JSON by encoding ' as \'.

The mapping language is Apache Velocity Template Language (VTL) and, while it isn't extendable, the fine print reveals that it internally uses Java strings without sandboxing them, which lets us use Java's replaceAll functionality:

#set($i = $input.path('$'))
{
   "safeString": "$i.unsafeString.replaceAll('"', '\\"')"
}
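
To illustrate with a hypothetical value: if unsafeString arrives as say "hi", raw interpolation would break the JSON, while the replaceAll version re-escapes the quote:

Raw value:           say "hi"
Without replaceAll:  { "safeString": "say "hi"" }    <- illegal JSON, injectable
With replaceAll:     { "safeString": "say \"hi\"" }  <- legal JSON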

Show active known IPs on the local network

I’m surprised more people don’t know how useful arp -a is, especially if you pipe it into ping…

Bash

arp -a | grep -o '[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}' | xargs -L1 ping -c 1 -t 1 | sed -n -e 's/^.*bytes from //p'

PowerShell

(arp -a) -match "dynamic" | Foreach { ping -n 1 -w 1000 ($_ -split "\s+")[1] } | where { $_ -match "Reply from " } | % { $_.replace("Reply from ","") }

Wrapping up

I just want to mention that if you are doing anything on a command line, be it Bash, OS X, PowerShell or Command/Batch, then SS64 is a site worth visiting, as they have great docs on many of these things!

[)amien

Time window events with Apache Spark Streaming

If you’re working with Spark Streaming, you might run into an interesting problem if you want to output an event based on multiple messages within a specific time period.

For example, I want to send a security alert if I see 10 DDOS attempts to an IP address in a five-minute window.

groupByKeyAndWindow

groupByKeyAndWindow allows us to choose the IP address for the key and five minutes for the window. If we also want to collect the sourceIp and the timestamp, we can get the same grouping effect with reduceByKeyAndWindow by concatenating the per-key sequences, which looks like this:

val messageLimit = 10
val messageWindow = Minutes(5)
val ssc = new StreamingContext(conf, Minutes(1))

// ... set up the Kafka consumer via KafkaUtils
kafkaConsumer
    .flatMap(parseSecurityMessage)
    .filter(m => m.securityType == "DDOS")
    .map(m => m.targetIp -> Seq((m.timestamp, m.sourceIp)))
    .reduceByKeyAndWindow({ (x, y) => x ++ y }, messageWindow)
    .filter(g => g._2.length >= messageLimit)
    .foreachRDD(m => m.foreach(createAlertEvent))

ssc.start()
ssc.awaitTermination()

Problem

The problem is your event fires many times as the stateless RDD is re-run every batch period.

The simplest solution would be to make the batch interval the same as your message window size, but that causes more problems, namely:

  • Your job can’t perform any other triggers on the source data at a shorter interval
  • You won’t know about these alerts until some time after they happen (in this case 5 minutes)

Keeping external state would be terrible, and neither Spark counters nor globals are much use here.

Solution

We need to do two things:

  1. Stop the RDD re-running and instead use the streaming state. We can do this by using the reduceByKeyAndWindow overload that allows us to specify the inverse function for removing data as it goes out of the window.
  2. Introduce a small amount of in-RDD state used to identify when the event is clear and when it should fire again.

Let us assume we have a class to handle part 2 named WindowEventTrigger that provides add and remove methods and a boolean triggerNow flag that identifies when the event should re-fire. Our RDD body would now look like this:

kafkaConsumer
    .flatMap(parseSecurityMessage)
    .filter(m => m.securityType == "DDOS")
    .map(m => m.targetIp -> new WindowEventTrigger((m.timestamp, m.sourceIp), messageLimit))
    .reduceByKeyAndWindow(_ add _, _ remove _, messageWindow)
    .filter(_._2.triggerNow)
    .foreachRDD(m => m.foreach(createAlertEvent))

How this works is quite simple. We have a case class called WindowEventTrigger that we map into the stream for each incoming message. It then:

  1. Tracks incoming messages - if it hits the level, sets the flag, and notes the event
  2. Tracks outgoing messages - and resets when the event that caused the trigger leaves the window
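
To make those semantics concrete, here is a rough sketch of the behaviour using the class below (the trigger level of 2 and the string events are purely illustrative):

val first = new WindowEventTrigger("event1", 2)
val second = new WindowEventTrigger("event2", 2)

val combined = first add second   // the combined count reaches the level of 2...
assert(combined.triggerNow)       // ...so the trigger fires, exactly once

val later = combined remove first // the event that fired it is still in the window
assert(!later.triggerNow)         // so it must not fire again until that leaves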

By switching to the stateful reduceByKeyAndWindow, Spark needs to persist state in case executors go down or data needs to be shuffled between them. Ensure your StreamingContext object has a checkpoint folder set to reliable storage such as HDFS.
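
Setting that up is a one-liner when creating the context; the HDFS path here is only an example:

// Persist window state somewhere durable so it survives executor failure
ssc.checkpoint("hdfs://namenode:8020/spark/checkpoints")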

WindowEventTrigger class

Here is the WindowEventTrigger class for your utilisation.

case class WindowEventTrigger[T] private (
    eventsInWindow: Seq[T],
    triggerNow: Boolean,
    private val lastTriggeredEvent: Option[T],
    private val triggerLevel: Int) {

  def this(item: T, triggerLevel: Int) = this(Seq(item), false, None, triggerLevel)

  // Combine two windows, firing the trigger the moment the combined count reaches the level
  def add(incoming: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val combined = eventsInWindow ++ incoming.eventsInWindow
    val shouldTrigger = lastTriggeredEvent.isEmpty && combined.length >= triggerLevel
    val triggeredEvent = if (shouldTrigger) combined.drop(triggerLevel - 1).headOption else lastTriggeredEvent
    new WindowEventTrigger(combined, shouldTrigger, triggeredEvent, triggerLevel)
  }

  // Drop events leaving the window, resetting once the event that fired the trigger has left
  def remove(outgoing: WindowEventTrigger[T]): WindowEventTrigger[T] = {
    val reduced = eventsInWindow.filterNot(y => outgoing.eventsInWindow.contains(y))
    val triggeredEvent = if (lastTriggeredEvent.isDefined && outgoing.eventsInWindow.contains(lastTriggeredEvent.get)) None else lastTriggeredEvent
    new WindowEventTrigger(reduced, false, triggeredEvent, triggerLevel)
  }
}

Happy streaming,

[)amien

Sequence averages in Scala

I've been learning Scala and decided to put together a C# to Scala cheat sheet. All was going pretty well until I got stuck on the equivalent of Average.

Enumerable.Average in .NET calculates the mean of a sequence by summing all the values and counting them in a single pass, then returning the sum divided by the count in a floating-point format (or decimal).

The problem

Given that Scala has nothing built in, there are more than a few suggestions online that boil down to:

val average = seq.sum / seq.length

This has a few problems:

  1. Visiting a sequence twice can be inefficient
  2. Sum can overflow as it is the same type as the sequence
  3. Applied to integers without casting, it returns a truncated integer average (demonstrated below)
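
Problem 3 is easy to demonstrate in the REPL:

val seq = Seq(1, 2)
seq.sum / seq.length // == 1, not 1.5 - integer division truncates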

A solution

Scala provides a useful higher-order function called foldLeft. Its job is to take an initial state and a function, then keep applying the function to the state with each value. So a more efficient single-pass solution to the problem is:

val average = seq.foldLeft((0.0, 1)) ((acc, i) => ((acc._1 + (i - acc._1) / acc._2), acc._2 + 1))._1

How does this work?

What we do here is calculate the average as we go, folding each new value into a running mean.

It achieves this by setting up a tuple to contain our initial state with (0.0, 1). This specifies our starting average of 0.0 and our starting position of 1.

The next part specifies the function that takes that state as acc (for accumulator) and the next value in the sequence as i, calculating the rolling average for each value and incrementing the position as it goes along.

Finally, at the end of our call, we specify ._1, which tells the compiler we want the first value from the tuple – the average – as we no longer care about the position.
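
Tracing it over a small sequence makes the rolling calculation clearer:

// Folding Seq(1.0, 2.0, 3.0):
// acc = (0.0, 1), i = 1.0  =>  (0.0 + (1.0 - 0.0) / 1, 2) = (1.0, 2)
// acc = (1.0, 2), i = 2.0  =>  (1.0 + (2.0 - 1.0) / 2, 3) = (1.5, 3)
// acc = (1.5, 3), i = 3.0  =>  (1.5 + (3.0 - 1.5) / 3, 4) = (2.0, 4)
// ._1 gives 2.0, the mean of 1, 2 and 3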

If you wanted to make this function more reusable you could do this:

def average(s: Seq[Int]): Double = s.foldLeft((0.0, 1)) ((acc, i) => ((acc._1 + (i - acc._1) / acc._2), acc._2 + 1))._1

Be aware you might need multiple overloads, one for each numeric sequence type you want to average, given the lack of a common numeric trait that allows for the subtraction and division (though see the sketch below for one way around that).
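
One way to sidestep the overloads is a sketch that goes through Scala's Numeric type class, converting each element to Double up front:

def average[T](s: Seq[T])(implicit num: Numeric[T]): Double =
  s.foldLeft((0.0, 1)) ((acc, i) => (acc._1 + (num.toDouble(i) - acc._1) / acc._2, acc._2 + 1))._1

// average(Seq(1, 2, 3)) == 2.0 and average(Seq(1.5, 2.5)) == 2.0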

Precision and rounding

There is some slight variance in results between this approach and total / count due to floating-point rounding. If you want to preserve that behaviour, you can still sum and then divide at the end in a single pass, much like .NET does, but with Scala's foldLeft rather than a foreach:

def average(s: Seq[Int]): Double = { val t = s.foldLeft((0.0, 0)) ((acc, i) => (acc._1 + i, acc._2 + 1)); t._1 / t._2 }

[)amien