Combining Sequential Streams with Node.js and Express

Asynchronously streaming bytes of data was perhaps the single most important feature of Node.js when it launched in 2009. Node provided a brilliant API that allowed developers to stream bytes through a pipeline of operations without ever blocking the main thread. Streams back everything from HTTP requests and responses to child processes and a lot more.

The primary benefit of streams is that they remove the need to buffer large amounts of data in memory and only operate on it once it is fully available. With streams you can efficiently operate on data in small chunks and push it through a pipeline, so your running process only ever holds a small fraction of the total data.

For example, let's say you wanted to build a basic proxy server in Node.js for large video files. When a request comes in for a particular file, you do not want the process to download the entire video into memory and then begin returning it to the client. Instead, it's much more efficient to download the file in small chunks and send those chunks to the client as soon as the Node.js process receives them. Once the chunks are written to the client, Node can evict them from memory, allowing us to handle many concurrent streams of data at once.
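The chunk-by-chunk behavior described above can be observed with a local file copy. This is a minimal sketch, assuming Node.js 15 or newer (for `stream/promises`); the `copyInChunks` helper and its `meter` transform are hypothetical names used only for illustration.

```javascript
const fs = require('fs')
const { Transform } = require('stream')
const { pipeline } = require('stream/promises')

// Hypothetical helper: copies src to dest through a stream pipeline and
// reports how many chunks passed through and how large the biggest one was.
async function copyInChunks(src, dest) {
  let chunks = 0
  let largest = 0

  // Pass-through transform that only observes each chunk
  const meter = new Transform({
    transform(chunk, _encoding, callback) {
      chunks += 1
      largest = Math.max(largest, chunk.length)
      callback(null, chunk)
    }
  })

  // pipeline wires the streams together and rejects if any of them fails
  await pipeline(fs.createReadStream(src), meter, fs.createWriteStream(dest))
  return { chunks, largest }
}
```

With the default settings of `fs.createReadStream`, no chunk should exceed 64 KiB, so the process never needs to hold the whole file regardless of its size.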

To implement this HTTP proxy server, let's start by building a basic async HTTP request function that returns a stream:

const https = require('https')

function fetch(src, overrides = {}) {
  return new Promise((resolve, reject) => {
    const url = new URL(src)
    // Defaults for a GET request; caller-supplied overrides win
    const options = {
      hostname: url.hostname,
      port: url.port || 443,
      path: url.pathname + url.search,
      method: 'GET',
      ...overrides
    }
    const req = https.request(options, (res) => {
      if (res.statusCode >= 300) {
        // Discard the unused body so the socket can be released
        res.resume()
        reject(new Error(res.statusMessage || 'Invalid file'))
        return
      }
      resolve({
        res,
        url: url.href,
        headers: res.headers
      })
    })
    // Surface connection-level failures (DNS, TLS, resets)
    req.on('error', reject)
    req.end()
  })
}

It might not be immediately obvious, but the resolved `res` is a Node.js Readable stream.

Next, let's create a small Express app that accepts all traffic and proxies it to a predefined origin:

const express = require('express')
const app = express()
const origin = 'https://example.com'

app.get('/*', async (req, res) => {
  try {
    const proxy = await fetch(`${origin}${req.path}`)
    res.writeHead(proxy.res.statusCode)
    proxy.res.pipe(res)
  } catch (err) {
    // The origin request failed or returned a status of 300 or higher
    res.sendStatus(502)
  }
})

app.listen(process.env.PORT)

For a code-complete proxy, we should also copy the origin's response headers onto our response.
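One way to carry the origin's headers over is sketched below. The helper names `forwardableHeaders` and `forwardResponse` are hypothetical, and dropping hop-by-hop headers (which describe a single connection rather than the resource) is an assumption about what a proxy would usually want, not something the route above does.

```javascript
// Headers that apply to one connection only and are normally not forwarded
const HOP_BY_HOP = new Set([
  'connection',
  'keep-alive',
  'proxy-authenticate',
  'proxy-authorization',
  'te',
  'trailer',
  'transfer-encoding',
  'upgrade'
])

// Hypothetical helper: returns a copy of the headers without hop-by-hop entries
function forwardableHeaders(headers) {
  return Object.fromEntries(
    Object.entries(headers).filter(([name]) => !HOP_BY_HOP.has(name.toLowerCase()))
  )
}

// Hypothetical helper: writes the upstream status and headers, then streams the body
function forwardResponse(upstream, res) {
  res.writeHead(upstream.statusCode, forwardableHeaders(upstream.headers))
  return upstream.pipe(res)
}
```

Inside the proxy route, this would be called as `forwardResponse(proxy.res, res)` in place of the separate `writeHead` and `pipe` calls.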

Here's a breakdown of what this proxy route is doing:

  1. Issue a request to the defined origin + request path
  2. Write the same status code to our response
  3. Pipe the response from the fetch request to the current response stream

At this point we can efficiently stream a single file from an origin, through our Node.js proxy, and back to the client. To make things more interesting, let's say we are a video provider that is required to insert ads into the middle of our video streams. To accommodate this, the video team saves our video files in chunks, split at the points where we want to insert ads. For example, let's say we have a video file that is 1 hour long and we want to insert an ad at 20 minutes and at 40 minutes into the video. Our video team would save 3 files:

  1. https://cdn.com/video_0_20.mp4
  2. https://cdn.com/video_20_40.mp4
  3. https://cdn.com/video_40_60.mp4

In addition to this, our advertising team has provided us with URLs for the video ads:

  1. https://cdn.com/ad_1.mp4
  2. https://cdn.com/ad_2.mp4

Our job is to combine these 5 files into a single video stream. Of course we could have our video team manually edit these files together and store them as one file, but let's use our knowledge of Node.js streams and combine these files on the fly.

First, let's gather our video files into an array in the exact order we want to stitch them together:

const urls = [
  'https://cdn.com/video_0_20.mp4',
  'https://cdn.com/ad_1.mp4',
  'https://cdn.com/video_20_40.mp4',
  'https://cdn.com/ad_2.mp4',
  'https://cdn.com/video_40_60.mp4'
]

Next, let's issue a request for each video file:

const reqs = await Promise.all(urls.map((url) => fetch(url)))
const streams = reqs.map(req => req.res)

Now we need to create a new function that can combine our array of streams into a single stream, maintaining the order of the bytes:

const { PassThrough } = require('stream')

function combineStreams(streams) {
  const stream = new PassThrough()
  // Copy in the background; surface any failure on the returned stream
  _combineStreams(streams, stream).catch((err) => stream.destroy(err))
  return stream
}

async function _combineStreams(sources, destination) {
  for (const stream of sources) {
    await new Promise((resolve, reject) => {
      // end: false keeps the destination open for the next source
      stream.pipe(destination, { end: false })
      stream.once('end', resolve)
      stream.once('error', reject)
    })
  }
  // Every source has been written, so end the combined stream
  destination.end()
}

Notice we need two functions to implement this correctly. The first function creates a new PassThrough stream, which acts as a single container for piping the other streams into. The second function is responsible for actually writing the bytes of the source streams into the destination stream. Aside from error handling, the most important thing to remember is that pipe automatically ends the destination stream when the source stream ends. Lucky for us, Node provides an option to disable that behavior, allowing us to continue writing bytes to the destination. Finally, once we have looped through all of the streams, we manually call end on the destination stream.
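As an alternative sketch, and not the approach used in this article, the same one-source-at-a-time ordering can be expressed with an async generator and `Readable.from`. The names `concatChunks` and `combineWithGenerator` are hypothetical; the sketch assumes every source yields Buffers or strings and a Node.js version where Readable streams are async iterable (12 or newer).

```javascript
const { Readable } = require('stream')

// Yields every chunk of the first source, then the second, and so on.
// The next source is not read until the previous one is exhausted.
async function* concatChunks(sources) {
  for (const source of sources) {
    for await (const chunk of source) {
      yield chunk
    }
  }
}

// Hypothetical helper: wraps the generator in a byte-mode Readable stream
function combineWithGenerator(sources) {
  // objectMode: false makes the result a byte stream that can be piped to a response
  return Readable.from(concatChunks(sources), { objectMode: false })
}
```

Here there is no explicit call to end the destination: the combined stream ends when the generator returns, and an error thrown while reading any source destroys the combined stream with that error.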

Our complete Express route can now be implemented like so:

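app.get('/video-ssai.mp4', async (req, res) => {
  try {
    const reqs = await Promise.all(urls.map((url) => fetch(url)))
    const streams = reqs.map((upstream) => upstream.res)
    const combined = combineStreams(streams)
    res.writeHead(200)
    combined.pipe(res)
  } catch (err) {
    // One of the origin requests failed before any bytes were sent
    res.sendStatus(502)
  }
})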

Wrapping Up

Combining streams in Node.js is an extremely powerful concept that can be used to accomplish sophisticated data workflows, server-side ad insertion being just one of many. One important feature missing from our Express server is support for Byte Range requests, in which the client requests a certain range of the file instead of the whole thing. This is a critical feature to support for any streaming media, as you do not want clients consuming more resources than necessary. If you're interested in how we built this functionality at Barstool, feel free to email me at [email protected] or apply to an open position here: https://www.barstoolsports.com/jobs