kichooo/object-stream-tools

object-stream-tools

This package brings goodies of functional programming (map, filter, reduce) to node streams.

Installation

npm install --save object-stream-tools

Usage

arrayToStream

Converts an existing array to a stream of objects. Useful if you want to inject or merge those objects into an existing stream.

const ost = require('object-stream-tools')
ost.arrayToStream([{foo: 'bar'}, {web: 'scale'}])
    .on('data', data => {
        console.log(data)
    })
    .pipe(somewhereWritable)        

Prints

[{foo: 'bar'}, {web: 'scale'}]
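
To make the idea concrete, here is a minimal sketch of the same technique using only Node's built-in stream module. It is not the package's implementation; arrayToStreamSketch and collect are hypothetical names introduced for illustration, and the sketch assumes that each array element is emitted as its own 'data' event.

```javascript
const { Readable } = require('stream')

// Hypothetical stand-in for ost.arrayToStream: wraps an array in an
// object-mode Readable that emits one element per 'data' event.
function arrayToStreamSketch(items) {
    return Readable.from(items)
}

// Hypothetical helper: gathers every emitted object into an array.
async function collect(stream) {
    const seen = []
    for await (const item of stream) seen.push(item)
    return seen
}

collect(arrayToStreamSketch([{foo: 'bar'}, {web: 'scale'}]))
    .then(seen => console.log(seen))
```

Readable.from creates an object-mode stream by default, which is why plain objects can flow through it without serialization.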

streamToSet

Very useful if you want to get the unique elements of a stream as a set of values.

const fs = require('fs')
const jsonStream = require('JSONStream')
const ost = require('object-stream-tools')

fs.createReadStream('../test/data.json')
    .pipe(jsonStream.parse('*'))
    // keep only the property whose unique values you need
    .pipe(ost.map(obj => obj.requiredProperty))
    .pipe(ost.streamToSet())
    .on('data', uniqueSet => {
        // here you get a sorted array of the unique elements
        const uniqueArray = Array.from(uniqueSet.values()).sort()
    })
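
The aggregation above can be pictured as a Transform that swallows every object and emits one Set when the input ends. The following is a rough sketch under that assumption, built on Node's stream module alone; streamToSetSketch is a hypothetical name and the package may work differently internally.

```javascript
const { Readable, Transform } = require('stream')

// Hypothetical stand-in for ost.streamToSet.
function streamToSetSketch() {
    const unique = new Set()
    return new Transform({
        objectMode: true,
        transform(chunk, _encoding, callback) {
            // Remember the value, but emit nothing yet.
            unique.add(chunk)
            callback()
        },
        flush(callback) {
            // The upstream has ended: emit the whole Set as a single object.
            this.push(unique)
            callback()
        }
    })
}

Readable.from(['b', 'a', 'b', 'c', 'a'])
    .pipe(streamToSetSketch())
    .on('data', uniqueSet => {
        console.log(Array.from(uniqueSet).sort()) // [ 'a', 'b', 'c' ]
    })
```

Keep in mind that a JavaScript Set compares objects by reference, so deduplication is only meaningful for primitive values such as strings and numbers. That is why the example above maps each object to a single property first.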

filter

If you just want to remove some objects from the stream, you probably want to use the filter function.

fs.createReadStream('../test/data.json')
    .pipe(jsonStream.parse('*'))
    .pipe(ost.filter(e => e.value > 6))
    // here you will get filtered objects
    .pipe(jsonStream.stringify())
    .pipe(process.stdout)
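
For readers curious how such a stage could be built, here is a small sketch of a filtering Transform using only Node's stream module. filterSketch is a hypothetical name; this illustrates the technique and is not taken from the package source.

```javascript
const { Readable, Transform } = require('stream')

// Hypothetical stand-in for ost.filter.
function filterSketch(predicate) {
    return new Transform({
        objectMode: true,
        transform(chunk, _encoding, callback) {
            // Forward the object only when the predicate holds; otherwise drop it.
            if (predicate(chunk)) callback(null, chunk)
            else callback()
        }
    })
}

Readable.from([{value: 3}, {value: 7}, {value: 9}])
    .pipe(filterSketch(e => e.value > 6))
    .on('data', e => console.log(e)) // logs the objects with value 7 and 9
```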

map-reduce

Map is useful when you want to modify existing objects in the stream.

Reduce is useful if you want to get a single object or value based on the whole stream, but you don't want to load the whole stream into memory.

Example: sum / average value of a huge stream

const fs = require('fs')
const jsonStream = require('JSONStream')
const ost = require('object-stream-tools')

fs.createReadStream('./test/data.json')
    .pipe(jsonStream.parse('*'))
    // pick the property you want to reduce over
    .pipe(ost.map(obj => obj.requiredProperty))
    // i is the index of the current element; leave it out for a plain sum
    .pipe(ost.reduce((acc, curr, i) => {
        return acc + curr + i
    }, 0))
    .on('data', reducedValue => {
        // here you will get the reduced value
    })

Here is an example with buffered/string input and output:

const fs = require('fs')
const jsonStream = require('JSONStream')
const ost = require('object-stream-tools')

fs.createReadStream('./test/data.json')
    .pipe(jsonStream.parse('*'))
    .pipe(ost.map(obj => obj.requiredProperty))
    .pipe(ost.reduce((acc, curr, i) => {
        return acc + curr + i
    }, 0))
    .on('data', reducedValue => {
        // here you will get the reduced value
    })
    // serialize the reduced value and write it to standard output
    .pipe(jsonStream.stringify())
    .pipe(process.stdout)

Please note that if you do not pass an initial value, the reduce function will start in (prev, curr, i) mode, like Array.prototype.reduce (see the MDN page Global_Objects/Array/Reduce).
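
The difference between the two modes can be illustrated with a small sketch modelled on Array.prototype.reduce and built on Node's stream module alone. reduceSketch is a hypothetical name, and the seeding behaviour shown here is an assumption based on the note above, not a copy of the package's code.

```javascript
const { Readable, Transform } = require('stream')

// Hypothetical stand-in for ost.reduce.
function reduceSketch(reducer, ...rest) {
    let hasAcc = rest.length > 0 // was an initial value passed?
    let acc = rest[0]
    let i = 0
    return new Transform({
        objectMode: true,
        transform(chunk, _encoding, callback) {
            if (hasAcc) {
                acc = reducer(acc, chunk, i)
            } else {
                // No initial value: the first element seeds the accumulator
                // and the reducer is first called for the second element.
                acc = chunk
                hasAcc = true
            }
            i += 1
            callback()
        },
        flush(callback) {
            // Emit the single reduced value once the input is exhausted.
            // In this sketch an empty stream without initial value emits nothing.
            if (hasAcc) this.push(acc)
            callback()
        }
    })
}

Readable.from([1, 2, 3])
    .pipe(reduceSketch((acc, curr) => acc + curr, 0))
    .on('data', total => console.log(total)) // 6
```

Only the accumulator is kept between elements, which is what lets a reduce stage process a stream far larger than available memory.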

Promises

Very handy when you want to aggregate streams using reduce or derived calls. Keep in mind that .promise() will not work if you use only the ost.map or ost.reduce features, as they do not aggregate.

fs.createReadStream('../test/data.json')
    .pipe(jsonStream.parse('*'))
    .pipe(ost.streamToArray())
    .promise()
    .then(data => {
        // here you will get your aggregated data - array of values.
    })
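
If you need similar behaviour for a stream that has no .promise() method, a plain Promise wrapper is one way to get it. The sketch below uses only Node's stream module; toPromiseSketch is a hypothetical helper and is not part of this package.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: resolves with every object the stream emitted,
// or rejects on the first 'error' event.
function toPromiseSketch(stream) {
    return new Promise((resolve, reject) => {
        const items = []
        stream
            .on('data', item => items.push(item))
            .on('end', () => resolve(items))
            .on('error', reject)
    })
}

toPromiseSketch(Readable.from([1, 2, 3]))
    .then(data => console.log(data)) // [ 1, 2, 3 ]
```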

find

Find is super handy if we want to quickly check whether a value or object exists in the stream. Think of it as grep on steroids.

fs.createReadStream('../test/data.json')
    .pipe(jsonStream.parse('*'))
    .pipe(ost.find(e => e.value > 6))
    .then(foundObj => {
        // here you will get the first object that matches the condition,
        // or undefined when none matches
    })
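
The same early-exit search can be sketched with async iteration over any readable stream, using only Node's built-ins. findSketch is a hypothetical name, and unlike ost.find it takes the stream as an argument rather than being piped into.

```javascript
const { Readable } = require('stream')

// Hypothetical stand-in for the find technique.
async function findSketch(stream, predicate) {
    for await (const item of stream) {
        // Returning early leaves the loop, which destroys the stream,
        // so the remaining input is not read.
        if (predicate(item)) return item
    }
    // The stream ended without a match.
    return undefined
}

findSketch(Readable.from([{value: 2}, {value: 8}, {value: 11}]), e => e.value > 6)
    .then(foundObj => console.log(foundObj)) // the object with value 8
```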

Please look at the tests for more use cases.

About

Work on your streams like you work with arrays.
