Runlet: Thought on Stream

2020/08/13
runlet, chip, pend, stream, job-system, observable, data, buffer, async
Runlet is a pattern that provides most Stream features with reduced code complexity and a minimal, clear API.

Stream in scripting languages

Having dealt with Stream data for a long time, but never actually taking the time to think about what Stream code provides, or what features to expect, it's time to pay the debt of thoughts.

The idea of a Stream is simple, but the actual implementations are often really complex (check the Node.js Stream, or the Web Streams API).

Nevertheless, ignore those for now; let's just start by locking down the basic feature set.

First, a look at simpler sort-of-stream patterns:

Then a list of common stream usage:

And last, a limit on the topic scope: here we only discuss the stream pattern in scripting languages. Unlike in most compiled languages, where a stream mostly means passing values in order, here the stream gets more complex, with features like buffer/backpressure, async process functions, and merge/split attached. In script land, the stream is more like a Job system or Message queue: most heavy work like file or socket IO is handled in a separate thread, often in a native language like C.

Think of a stream pipeline for a fictional Node.js "Web-based unzip service": the main JS thread just needs to handle client connections and pipeline setup, while the actual decompression is done in native worker threads. This pattern is clear and quite efficient for a scripting language.
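
As a rough sketch of that idea (the service itself is fictional, so the gunzip-only handling and port below are made up), Node's built-in stream pipeline plus zlib already work this way: the JS thread only wires things up, while the actual inflate work runs off the main thread in the libuv threadpool.

const http = require('http')
const zlib = require('zlib')
const { pipeline } = require('stream')

http.createServer((request, response) => {
  // main JS thread: only connection handling and pipeline setup
  pipeline(
    request, // gzip-compressed upload from the client
    zlib.createGunzip(), // the decompression itself runs outside the main JS thread
    response, // stream the decompressed data back
    (error) => { if (error) { console.error(error); response.destroy() } }
  )
}).listen(8080)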

Stream features

Most Stream implementations will provide features like:

But most Stream implementations are also quite bloated. To keep the features while reducing the actual code, let's start by morphing the process model.

First imagine data on a single long conveyor belt, like:

Producer    9 8 7 6 5 4 3 2 1    Consumer
          >>>>>>>>>>>>>>>>>>>>>

Then think of multiple conveyors working together, like many robotic arms moving data between boxes (like in the game Factorio):

Producer [9,8,7] [6,5,4,3] [2,1] Consumer
       |->     |->       |->   |->   Worker

In some sense, a full-featured stream in a scripting language is comparable to a job system in a compiled language, so similar to how we use shared memory, we could use a shared buffer for the whole pipeline, to free the Worker code from buffer management.

Producer 9>>>>>>>6>>>>>>>>>2>>>> Consumer
       |->     |->       |->   |->   Worker
        [    8,7     5,4,3     1]    SharedBuffer

Then try ditching the linear pipeline mindset, and accept that some absurd combos can be set up, like:

Producer   Producer   Producer   Producer
        [  9,      8,      7    ]    SharedBuffer
           |->     |->     |->       Worker Group
        [ 6,    5,    4,    3   ]    SharedBuffer
          |->   |->   |->   |->      Worker Group
        [ 2,1                   ]    SharedBuffer
  Consumer      Consumer      Consumer

So to sum up:

The shared buffer can provide benefits like:

To also simplify the stream code, basically we want to just provide:

The Runlet

Runlet is a Stream with less code and clearer execution order.

Before the follow-up discussion, some word mapping:

A Runlet will contain multiple Pools and Chips; a Chip will declare its prev/next Pend from any Pool, and a manual Chip-Pend attach step is required before flowing the values.

And some additional concepts:

A basic linear Runlet will be:

<Linear Runlet with IO Chip (active)>         | <Linear Runlet without IO Chip (passive)>
        [                       ]    Pool     |         [                       ]    Pool
             [     ] [     ]         Pend     |         [  ] [     ] [     ] [  ]    Pend
       Input->     |->     |-Output  ChipSet  |            |->     |->     |->       ChipSet
        []                     []    IO Pend  |
        [                       ]    IO Pool  |

A more detailed look at each piece:

Runlet pattern

// hint:
//   Special symbol used to signal Chip and Pool how the flow should change,
//     send along with value in pack
const END = Symbol('runlet:hint:end') // for Chip receive/return, without value, meaning the returning Chip will not process/output more values (NOTE: to support split flow, allow returning a Number as the value to pushPend extra ENDs)
const SKIP = Symbol('runlet:hint:skip') // for Chip receive/return, without value, meaning the Chip wants more input for an output (ratio N:1)
const REDO = Symbol('runlet:hint:redo') // for Chip return only, with value, meaning the Chip will output more for the same input (ratio 1:N) (NOTE: do not reuse the pack with REDO, and the pack will stay in runningChipMap during REDO)

// pack:
//   A simple array holding max 3 values, all may be `undefined`:
//   - `pack[ 0 ]`: the value, the data to send downstream
//   - `pack[ 1 ]`: hint, to signal flow change
//   - `pack[ 2 ]`: a promise, set while the Chip process is running
//   The pack passing between Pend & Chip is always visible:
//   - most packs should be in one of the Pends
//   - a running Chip will hold a single pack (tracked in runningChipMap)
const createPack = (value, hint) => [ value, hint, undefined /* promise */ ] // `pack[ 2 ]` holds the promise of the running Chip process

//   Only do bare minimum work, no extra check,
//     and this is a general purpose non-optimal implementation.
//   Do not provide an END callback, the user needs to get that from the Output Chip,
//     this is for supporting Runlets with multiple Output Chips, where multiple ENDs will be in flight.
//   The final outcome should be either END or error, and `error.isAbort = true` is recommended for abort.
//   No auto-close for Pool/Chip with external IO, these need manual init/clear outside the Runlet.
const createRunlet = ({
  poolMap = new Map(),
  chipMap = new Map(),
  onError = (error) => { throw error } // normal errors should be handled in the Chip, this is mostly for bug reporting, should just report & crash
}) => {
  let isPause = false
  let isValid = true // marker to cut value passing after runlet detach

  return {
    poolMap,
    chipMap,

    getIsValid: () => isValid,
    getIsPause: () => isPause,
    setIsPause: (value) => { isPause = value },
    pause: () => { isPause = true }, // set pause flag to stop value passing/processing, will not stop running process
    resume: () => { isPause = false }, // unset pause flag, may need trigger() or push some value in to restore the flow

    attach: () => {}, // call this at least once before starting the flow, and after Pool/Chip change
    detach: () => ({ poolMap, chipMap, endChipKeySet, runningChipMap }), // cut off all Chip data flow, clear Pool, return running chip process
    trigger: () => {}, // trigger all runnable Chips: give a SKIP to signal the Input Chip, or pass values to Chips from their prevPend

    // allow push/pull value to/from Pool Pend // NOTE: this is polling-based for sync peek/poke, for callback-based just add a Chip
    createPendInput: (poolKey, pendKey) => ({ pool, push: (pack) => pack, canPush: () => Boolean() }),
    createPendOutput: (poolKey, pendKey) => ({ pool, pull: () => (pack || undefined), canPull: () => Boolean() }),

    describe: () => [ 'runlet info' ] // for debug & monitor runlet status
  }
}

Pool pattern

// Pool:
//   The default Pool should use the key `default`,
//     and there's a special IO Pool with Pend for Input/Output Chip.
//   This pattern should support basic array implementation,
//     as well as advanced no-alloc buffer with custom Runlet code.
const createCountPool = ({ // TODO: for fast zero-copy buffer, should let Pool & Chip acquire Buffer from an optimized SharedBufferPool
  key = 'default', // String || Symbol
  sizeLimit: poolSizeLimit // Number
}) => {
  let poolSize = 0 // sum of all pend, size can be count or byte, sizePrivate is always counted
  const pendMap = new Map() // pendKey: { packQueue: [], size, sizePrivate, sizeLimit }
  return {
    key,
    pendKeyGroupMap: undefined, // new Map() // groupTag: pendKeyGroupSet // not used here, for marking group of Pend act together as one Pend, so the wakeKeySet is shared
    reset: () => {},
    getPoolSize: () => poolSize,
    configPend: (pendKey, sizePrivate = 0, sizeLimit = Infinity, ...extraConfig) => {}, // assign Pend exclusive size // NOTE: will reset the existing pend, and all Pends should be configured before use
    // below assume all related `configPend` is called
    isPendLimited: (pendKey) => Boolean(), // allow sizePrivate to bust sizeLimit (and must always allow at least 1 value)
    getPendSize: (pendKey) => pendMap.get(pendKey).size,
    pushPend: (pendKey, pack) => {},
    pullPend: (pendKey) => pack,
    describe: (stringList = [], getPoolExtraInfo, getPendExtraInfo) => stringList
  }
}

// PoolIO:
//   A special static Pool for I/O Chip to use, only output SKIP, and error on other operation
const KEY_POOL_IO = Symbol('runlet:pool:io') // for Input/Output Chip's prevPoolKey/nextPoolKey
const KEY_PEND_INPUT = Symbol('runlet:pend:input') // for Input Chip's prevPendKey
const KEY_PEND_OUTPUT = Symbol('runlet:pend:output') // for Output Chip's nextPendKey
const PoolIO = {
  key: KEY_POOL_IO,
  pendKeyGroupMap: undefined,
  reset: () => {},
  getPoolSize: () => 0,
  configPend: (pendKey, sizePrivate = 0) => {
    if (pendKey !== KEY_PEND_INPUT && pendKey !== KEY_PEND_OUTPUT) throw new Error(`invalid IO config pendKey: ${String(pendKey)}`)
    if (sizePrivate) throw new Error(`invalid IO config sizePrivate: ${sizePrivate} for pendKey: ${String(pendKey)}`)
  },
  isPendLimited: (pendKey) => pendKey !== KEY_PEND_OUTPUT, // only allow output
  getPendSize: (pendKey) => pendKey === KEY_PEND_INPUT ? 1 : 0,
  pushPend: (pendKey, pack) => {
    if (pendKey === KEY_PEND_OUTPUT && pack[ 1 ] === END) return
    throw new Error(`invalid IO push pendKey: ${String(pendKey)}, pack: ${String(pack)}`)
  },
  pullPend: (pendKey) => {
    if (pendKey === KEY_PEND_INPUT) return createPack(undefined, SKIP) // NOTE: must be new pack since downstream may keep reuse the pack
    else throw new Error(`invalid IO shift pendKey: ${String(pendKey)}`)
  },
  describe: (stringList = [], getPoolExtraInfo, getPendExtraInfo) => {
    stringList.push(`@${String(KEY_POOL_IO)}`)
    return stringList
  }
}

Chip pattern

// Chip:
//   Should be as simple as possible, but also do not divide the work too much,
//     since pack passing still has costs.
//   State is added to store side effects, so the process function can conceptually be a pure function.
//   For performance, the state is expected to be changed by direct mutation so less GC is involved,
//     but it may be reasonable to go fully immutable for some cases.

// ChipSyncBasic:
//   A sample pass-through Chip showing all supported config.
const ChipSyncBasic = {
  key: 'chip:sync-basic',
  prevPoolKey: KEY_POOL_IO, prevPendKey: KEY_PEND_INPUT, prevPendSizePrivate: 0, prevPendSizeLimit: Infinity, prevPendLogic: {}, // all after PendSize is optional
  nextPoolKey: KEY_POOL_IO, nextPendKey: KEY_PEND_OUTPUT, nextPendSizePrivate: 0, nextPendSizeLimit: Infinity, nextPendLogic: {}, // all after PendSize is optional
  state: {}, // optional
  sync: true, // will get faster loop (no added await)
  process: (pack, state, error) => error ? undefined : { pack, state }, // pass through
  describe: () => 'CHIP-SYNC-BASIC' // optional
}

// ENDRegulatorChip:
//   Needed after the merge Pend behind 2+ Input Chips, so only the last END passes,
//     or before the split Pend in front of 2+ Output Chips, so there are enough ENDs for each.
const createENDRegulatorChip = ({
  inputChipCount = 1, outputChipCount = 1, // at least one of these should be 2+, or just skip this Chip
  key = 'chip:end-regulator',
  ...extra // all the extra Pool/Pend config
}) => ({
  ...extra, key,
  state: { inputEND: inputChipCount, outputEND: outputChipCount },
  process: async (pack, state, error) => {
    if (error) return
    if (pack[ 1 ] === END) {
      state.inputEND--
      const pack = state.inputEND > 0
        ? createPack(undefined, SKIP)
        : createPack(state.outputEND >= 2 ? state.outputEND : undefined, END)
      return { pack, state }
    }
    return { pack, state } // pass through
  }
})

By design, only the Runlet needs to actively maintain the flow, so the Pool & Chip code can be passive and minimal, and easier for users to write custom ones. A config/declarative code style is used, since most of the time there's no need to alter the flow after the Runlet is built, and this can make planning more complex flow graphs easier. The execution order is optimised to allow more repeats of the same process, with the help of Pool buffering, hoping the CPU cache hit chance can be higher. The pattern aims to make the flow state as transparent as possible, so the user can predict whether the flow is running, ended, or errored, while keeping each part's job clear-cut and hard to mix up.

Note that by definition the Runlet is a directed graph, so the shape can be a tree or a loop. (TODO: is this level of complexity really needed?) And a Runlet can have more than one Pool.
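
To make the declarative setup more concrete, here is a rough usage sketch wiring the pattern pieces above into a tiny linear Runlet. Note this assumes the pattern functions above were actual implementations; the pend keys `pend:in`/`pend:out` and the doubling Chip are made up for illustration.

const pool = createCountPool({ sizeLimit: 8 })
const doubleChip = { // minimal non-IO Chip: pulls from 'pend:in', pushes to 'pend:out'
  key: 'chip:double',
  prevPoolKey: 'default', prevPendKey: 'pend:in',
  nextPoolKey: 'default', nextPendKey: 'pend:out',
  sync: true,
  process: (pack, state, error) => {
    if (error) return
    if (pack[ 1 ] !== undefined) return { pack, state } // pass hint packs (END/SKIP) through untouched
    return { pack: createPack(pack[ 0 ] * 2, undefined), state }
  }
}
const runlet = createRunlet({
  poolMap: new Map([ [ pool.key, pool ] ]),
  chipMap: new Map([ [ doubleChip.key, doubleChip ] ]),
  onError: (error) => console.error('unexpected runlet error:', error)
})
runlet.attach() // required at least once before flowing values, and after Pool/Chip change
const input = runlet.createPendInput('default', 'pend:in')
const output = runlet.createPendOutput('default', 'pend:out')
if (input.canPush()) input.push(createPack(21, undefined))
runlet.trigger() // kick the runnable Chips so the value flows through doubleChip
console.log(output.canPull() && output.pull()) // expect something like [ 42, undefined, undefined ]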

The size zones of a Chip's Pend:

[ reserved | under limit   | busted                    ]
  reserved: kept private for Pend, and always allow at least one value
           | under limit: shared & dynamic in Pool, can fill more if there's free size available
                           | busted: over the size limit, the Runlet data flow should pause
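
With the createCountPool pattern above, the reserved part maps to sizePrivate and the busted threshold to sizeLimit; the pend key below is made up for illustration.

const pool = createCountPool({ sizeLimit: 16 }) // whole-Pool budget shared by all Pends
pool.configPend('pend:merge', 2, 8) // reserve 2 slots for this Pend, and consider it busted past 8
pool.isPendLimited('pend:merge') // false while under limit, true once busted, so the flow into it should pause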

1:N/N:1 ratio support

With the hints SKIP and REDO, a Runlet can support value process ratios other than 1:1. Just return SKIP with no output if the input is not enough to generate a result yet, or return REDO with the first chunk of output, and the Chip will receive the same input again, allowing more output to be generated. With Chip.state as an optional data store, most work should be possible to implement.
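
For instance, following the ChipSyncBasic shape above, an N:1 batching Chip and a 1:N splitting Chip could look roughly like this (the keys, the END-flush handling, and the single-Input-Chip assumption are just one possible way to write them):

// N:1 batching Chip: collect values and emit them in groups of 3, using SKIP to ask for more input
const createBatch3Chip = (chipConfig) => ({
  ...chipConfig, // Pool/Pend keys from the user
  key: 'chip:batch-3',
  state: { batch: [] },
  sync: true,
  process: (pack, state, error) => {
    if (error) return
    if (pack[ 1 ] === END) {
      if (state.batch.length === 0) return { pack, state } // nothing left over, pass END through
      const batch = state.batch
      state.batch = []
      return { pack: createPack(batch, REDO), state } // flush the partial batch, END will be re-delivered
    }
    state.batch.push(pack[ 0 ])
    if (state.batch.length < 3) return { pack: createPack(undefined, SKIP), state } // not enough input yet, ask for more
    const batch = state.batch
    state.batch = []
    return { pack: createPack(batch, undefined), state } // one output for every 3 inputs
  }
})

// 1:N splitting Chip: the input value is assumed to be a non-empty array, output one element at a time using REDO
const createSplitChip = (chipConfig) => ({
  ...chipConfig,
  key: 'chip:split',
  state: { index: 0 },
  sync: true,
  process: (pack, state, error) => {
    if (error) return
    if (pack[ 1 ] === END) return { pack, state } // pass END through (assuming a single upstream Input Chip)
    const value = pack[ 0 ][ state.index++ ]
    if (state.index < pack[ 0 ].length) return { pack: createPack(value, REDO), state } // more to output, the same input will come back
    state.index = 0
    return { pack: createPack(value, undefined), state } // last element, move on to the next input
  }
})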

LogicalPool

Currently, flow unordered-merge or random-split can be supported naturally by pushing/pulling to/from the same Pend, but for flow duplicate/load-balance, which requires pushing to a specific Pend, custom Pend logic is needed.

Think of the duplicate flow: since the backpressure from both downstreams can choke the single upstream Chip, there may be deadlock, so doing this implicitly in the same Runlet is not a good idea.

Without turning to something hacky, like a createComplexRunlet or magicChip that has access to Pool/Pend, or adding a getNextPendKey function to Chip, we propose a solution with LogicalPool.

With LogicalPool, the Pend is divided into PendView & PendViewee, with a custom code slot left open. The PendView should hold no data, like an SQL Table View, and provide the logic to push/pull to/from the PendViewee. The PendViewee can be just like a normal Pend, holding data, and can have extra logic if needed. This should allow most use cases to preserve the backpressure, even between multiple Pends.
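
As a very rough illustration only (the actual PendView/PendViewee API is left open above, so the shape below is purely an assumption), a duplicating PendView could hold no data and delegate to two real Pends, preserving backpressure from both:

// hypothetical duplicate-flow PendView logic, not part of the pattern above
const duplicatePendViewLogic = {
  vieweePendKeyList: [ 'pend:copy-a', 'pend:copy-b' ], // the PendViewees holding the actual packs
  pushPendLogic: (pool, pack) => { // push a copy of the pack to each PendViewee
    for (const pendKey of [ 'pend:copy-a', 'pend:copy-b' ]) pool.pushPend(pendKey, createPack(pack[ 0 ], pack[ 1 ]))
  },
  isPendLimitedLogic: (pool) => [ 'pend:copy-a', 'pend:copy-b' ].some((pendKey) => pool.isPendLimited(pendKey)) // limited if either downstream is busted
}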

Or just introduce more Runlets, like a sub-Runlet or inter-Runlet, so each single Runlet can still be understandable, while exposing the backpressure problem for the user to choose a proper strategy.

Error

Assume the Runlet code is perfectly error-free (which may not be true, yet), then all errors should occur in a Pend or Chip. When the first error occurs, it will be packed and passed to all Chips in sync mode, then the Runlet will be set to detached mode. Most Chip processes can just ignore it and return, but the Output Chip should notify outer code, and Chips with external IO may need to start the release process. There's also an extra onError Runlet option, to receive the second and later errors generated after detach, but most code should just stop and be still.
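
For example, an Output Chip might surface the final outcome to outer code roughly like this (the promise-based wrapping and the SKIP-on-consume convention are assumptions layered on the pattern above, not part of it):

// hypothetical collecting Output Chip: resolves a promise on END, rejects on error
const createCollectOutputChip = (chipConfig) => {
  const valueList = []
  let resolve, reject
  const promise = new Promise((res, rej) => { resolve = res; reject = rej })
  return {
    ...chipConfig, // prevPoolKey/prevPendKey from the user
    key: 'chip:collect-output',
    nextPoolKey: KEY_POOL_IO, nextPendKey: KEY_PEND_OUTPUT,
    promise, // extra property for outer code, not read by the Runlet
    state: {},
    sync: true,
    process: (pack, state, error) => {
      if (error) { reject(error); return } // the first error is passed to every Chip, notify outer code here
      if (pack[ 1 ] === END) { resolve(valueList); return { pack, state } } // the IO Pend accepts the END pack
      valueList.push(pack[ 0 ])
      return { pack: createPack(undefined, SKIP), state } // value consumed, no output pack, just ask for the next one
    }
  }
}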

Random design thoughts

Most stream/step/iterator patterns will try to min-max different aspects:

Chip execution order choice:

Chip trigger strategy:

FAQ-ish answers:

TODO

Reference

Some existing implementation

About cache optimization

Source file in Repo

github:dr-js/dr-js#source/common/module/Runlet.md