Saving Money with Custom RxJS Operators

Let’s say we have a power meter like the one that Sense makes. Let’s also say that the API for this power meter is a little underwhelming. In this hypothetical situation, we can fetch the current home power usage, so we could certainly wire up an observable stream to this API and (assuming we’re using Angular) async pipe our way to flawless victory. Drop that beautiful app on a Raspberry Pi, attach one of those fancy OLED monitors, and frame it on the wall for everyone to marvel at as they go around the house turning switches on and off and saving negligible amounts of money.

Great. We built a Kill A Watt clone. But do you know what’s even more eye-grabbing than a number on a wall? Graphs! Okay, I agree, that was not as climactic as I had hoped. But that’s what I have for you today.

We begin with an observable that occasionally emits the current power usage in watts.

this.powerService.currentPowerUsage$

We map that to an array that holds all of the other values, right?

// BAD BAD BAD BAD
export class PowerMeterComponent {
  private history: number[] = [];
  graphData$ = this.powerService.currentPowerUsage$.pipe(
    map(powerEvent => {
      // NO NO NO NO NO STOP!
      this.history.push(powerEvent);
      return this.history;
    })
  );
  constructor(private powerService: PowerService) { }
}

While this will technically work, functions in an RxJS stream should always be pure functions. They should not read or modify state outside of the stream. Doing so leads to code that is difficult to test and complicated to debug. Instead, let’s find an operator that better fits our needs.
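To make the risk a little more concrete, here is a minimal sketch in plain TypeScript, no RxJS involved. The helper names `makeImpureAppender` and `pureAppend` are mine, made up for illustration. The impure version hands every caller the same array, so a result you received earlier quietly changes later on.

```typescript
// Hypothetical helpers for illustration; not part of the component above.

// Impure: closes over shared state, like the `history` field in the bad example.
function makeImpureAppender(): (value: number) => number[] {
  const history: number[] = [];
  return (value) => {
    history.push(value); // mutates state that lives outside the call
    return history;      // every call returns the same array instance
  };
}

// Pure: the result depends only on the arguments, and nothing is mutated.
function pureAppend(previous: readonly number[], value: number): number[] {
  return [...previous, value];
}

const impure = makeImpureAppender();
const firstImpure = impure(641);
impure(832);
// firstImpure now has two elements, even though we captured it after one reading.

const firstPure = pureAppend([], 641);
const secondPure = pureAppend(firstPure, 832);
// firstPure still has one element; secondPure is a separate array with two.
```

Anything holding on to an earlier emission (a change-detection check, a test assertion) can be caught out by the first behavior. The operator-based version that follows sidesteps it by building a new array each time.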

export class PowerMeterComponent {
  graphData$ = this.powerService.currentPowerUsage$.pipe(
    startWith([]),
    scan((accumulator, value) => {
      return [...accumulator, value];
    })
  );
  constructor(private powerService: PowerService) { }
}

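// Given that currentPowerUsage$ emits 641, 832, 983, this emits
// [], [ 641 ], [ 641, 832 ], and finally [ 641, 832, 983 ]

We startWith an empty array. Because this scan has no seed argument, it takes the first event (our empty array) as its initial accumulator and passes it along unchanged, so the graph starts out with an empty data set. After that, scan works like map, except that the callback receives both the current event and the accumulator, which is whatever the callback returned last time. (Passing the empty array as scan’s second argument, the seed, would give the same accumulation without that initial empty emission.) Let’s walk through it.

  • startWith emits an empty array
  • scan receives the empty array, keeps it as its initial accumulator, emits it as is, and takes a nap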
  • A value of 641 is emitted from the currentPowerUsage$ stream
  • The scan operator pairs the new value with the previous value (aka the accumulator, aka the empty array) and calls the callback function
  • We create a new array with the empty array (accumulator) and the new value and return it
  • graphData$ emits [641]
  • A value of 832 is emitted from the currentPowerUsage$ stream
  • The scan operator pairs the new value with the previous value ([641]) and calls the callback function
  • We create a new array with the single element in the accumulator array and the new value and return it
  • graphData$ emits [641, 832]
  • … one more time…
  • A value of 983 is emitted from the currentPowerUsage$ stream
  • The scan operator pairs the new value with the previous value ([641, 832]) and calls the callback function
  • We create a new array with the two elements in the accumulator array and the new value and return it
  • graphData$ emits [641, 832, 983]
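If you would like to replay those steps without wiring up an observable, here is a rough stand-in using a plain array. `replayScan` is a hypothetical helper of mine, not an RxJS API. It models only the emissions triggered by the three power readings, and it takes the starting accumulator as an explicit argument.

```typescript
// A plain-array stand-in for the accumulation that scan performs.
// It records what would be emitted after each source value.
// `replayScan` is a hypothetical helper, not part of RxJS.
function replayScan<T, A>(
  source: readonly T[],
  accumulate: (accumulator: A, value: T) => A,
  initial: A
): A[] {
  const emissions: A[] = [];
  let accumulator = initial;
  for (const value of source) {
    // Pair the new value with the previous result, as in the walkthrough.
    accumulator = accumulate(accumulator, value);
    emissions.push(accumulator);
  }
  return emissions;
}

const emissions = replayScan<number, number[]>(
  [641, 832, 983],
  (accumulator, value) => [...accumulator, value],
  []
);
// emissions is [[641], [641, 832], [641, 832, 983]]
```

Each entry in `emissions` is a fresh array, which is why earlier emissions stay intact as new readings arrive.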

We now have a series of data, but we’re missing an important component: time. That’s easily solved with a small change.

export class PowerMeterComponent {
  graphData$ = this.powerService.currentPowerUsage$.pipe(
    startWith([]),
    scan((accumulator, value) => {
      // getTime() returns milliseconds since the Unix epoch
      return [...accumulator, { time: new Date().getTime(), value }];
    })
  );
  constructor(private powerService: PowerService) { }
}

// Given that currentPowerUsage$ emits 641, 832, 983, this eventually emits
// something like:
// [
//     { time: 1560224496000, value: 641 },
//     { time: 1560226697000, value: 832 },
//     { time: 1560228898000, value: 983 },
// ]

Marvelous. We can feed that data into a graph and readily evaluate our power usage patterns for optimal efficiency.
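One wrinkle worth a mention: reading the clock inside the callback is itself a small side effect, which makes the output awkward to assert on in a test. Here is a minimal sketch of one way around that, in plain TypeScript with no RxJS. The names `TimedReading`, `makePointAppender`, and `now` are hypothetical, introduced only for this example. The clock is passed in as a parameter, so a test can supply fixed timestamps.

```typescript
// Hypothetical names throughout; a sketch under the assumptions above.
interface TimedReading {
  time: number;  // milliseconds since the Unix epoch
  value: number; // watts
}

// Builds a scan-style callback whose clock can be swapped out.
// By default it uses the real clock.
function makePointAppender(now: () => number = () => Date.now()) {
  return (accumulator: readonly TimedReading[], value: number): TimedReading[] =>
    [...accumulator, { time: now(), value }];
}

// A fake clock that advances one second per reading.
let fakeTime = 1560224496000;
const appendPoint = makePointAppender(() => (fakeTime += 1000));

// Array.prototype.reduce stands in for the stream here.
const series = [641, 832, 983].reduce<TimedReading[]>(appendPoint, []);
```

In the component, the same callback could presumably be handed to scan with the default clock, while tests use the fake one.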

This code will collect points indefinitely. Let’s put a limit on that.

scan((accumulator, value) => {
  const next = [...accumulator, { time: new Date().getTime(), value }];
  return next.slice(-50); // append first, then keep only the last 50 values
})
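A quick way to sanity-check the cap, sketched in plain TypeScript. `appendWithLimit` and `TimedReading` are hypothetical names for this example. This version trims after appending, so the result never holds more than `limit` entries.

```typescript
// Hypothetical names; a sketch for checking the windowing logic in isolation.
interface TimedReading {
  time: number;
  value: number;
}

// Append first, then trim, so the result never exceeds `limit` entries.
// Assumes limit >= 1: slice(-0) is the same as slice(0), which keeps everything.
function appendWithLimit(
  points: readonly TimedReading[],
  point: TimedReading,
  limit: number
): TimedReading[] {
  return [...points, point].slice(-limit);
}

// Feed in 120 readings numbered 0 through 119 with a limit of 50.
let window50: TimedReading[] = [];
for (let i = 0; i < 120; i++) {
  window50 = appendWithLimit(window50, { time: i, value: i }, 50);
}
// window50 now holds readings 70 through 119
```

The order matters: trimming before appending would leave one more entry than the limit after every reading.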

But wait! You were promised a custom operator. Well, that just requires a little refactoring.

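import { defer } from 'rxjs';
import { scan, startWith } from 'rxjs/operators';

export const timeSeries = (limit: number) => {
  return source => defer(() => {
    return source.pipe(
      startWith([]),
      scan((accumulator, value) => {
        // append first, then trim, so at most `limit` points survive
        const next = [...accumulator, { time: new Date().getTime(), value }];
        return next.slice(-limit);
      })
    );
  });
};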

export class PowerMeterComponent {
  // keep the 50 most recent points
  graphData$ = this.powerService.currentPowerUsage$.pipe(timeSeries(50));
  constructor(private powerService: PowerService) { }
}

At this point you’ve probably realized that refreshing the browser nukes your history. Well, that’s an adventure for another time.
