## Efficient Stream Processing

From: andrew cooke <andrew@...>

Date: Tue, 8 Dec 2015 08:45:00 -0300

I needed to process a stream of data efficiently: both quickly and
without using too much memory.  The processing included up-sampling
data (sample and hold) and down-sampling data (low pass filter and
decimation).  The language was C.

The "standard" approach for reducing memory use is to use streams.
This can be time-inefficient when values are passed one at a time,
so I used an interface that supports chunking (passing an array of
values that is smaller than the total stream, but large enough that
the overhead of passing it is insignificant compared to the time
spent processing it).

However, there is a problem here.  If each stream component requests
an array of size appropriate to service its own request, and we have
significant downsampling, then the requests "up the stream" can become
large.

For example, if we request a chunk 1MB in size from a 16x downsampler
component, that component will request a 16MB chunk from its parent.
And if that is a 16x downsampler too, the grandparent will be
providing 256MB.

So the final interface requests, but does not enforce, an array size.
Parent components return the number of values actually provided.  In
this way components can limit memory use.

In practice, this is best done by a separate component whose only job
is to limit requests.  This can be composed with other components as
needed.

So the final interface is:

struct source;

typedef int sfree(struct source **s, int status);

typedef int stake(struct source *s, double *data, int length, int *available);

typedef struct source {
    void *state;
    sfree *free;
    stake *take;
} source;

(from memory and untested - please forgive any typos).  Where:

* int return values are status (0 for OK)

* sfree allows nested sources to be freed on cleanup

* stake is the main interface, as described above

* state depends on the particular component

* available is the amount of data provided and is zero only when the
source is permanently exhausted

And, of course, each component has a constructor something like:

int mkfoo(source **s, ...);
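For concreteness, here is a minimal sketch of what the simplest
component (mkconst, listed below) might look like.  This is my
reconstruction from the description above, not the original code, and
the allocation and error handling are assumptions:

#include <stdlib.h>

static int const_take(struct source *s, double *data, int length, int *available) {
    double k = *(double *)s->state;
    for (int i = 0; i < length; i++) data[i] = k;
    *available = length;  /* a constant source is never exhausted */
    return 0;
}

static int const_free(struct source **s, int status) {
    if (s && *s) {
        free((*s)->state);
        free(*s);
        *s = NULL;
    }
    return status;  /* pass the caller's status through */
}

int mkconst(source **s, double k) {
    source *c = malloc(sizeof(*c));
    double *state = malloc(sizeof(*state));
    if (!c || !state) { free(c); free(state); return 1; }
    *state = k;
    c->state = state;
    c->free = const_free;
    c->take = const_take;
    *s = c;
    return 0;
}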

In addition, the following components and functions were particularly
useful:

int collect(source *s, double *data, int length, int *available);

int rcollect(...as above...);

These two functions repeatedly call the provided source,
accumulating data until the array is filled or the source is
exhausted.  So they "fix up" the chunking variability described
earlier.

The rcollect() function is similar to collect() but (repeatedly)
calls the parent with a length value selected at random between 1
and the required length.  This is useful in testing for cache bugs.
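Concretely, collect() might be implemented along these lines (again a
sketch from the description, not the original code):

int collect(source *s, double *data, int length, int *available) {
    int total = 0, n = 0, status = 0;
    while (!status && total < length) {
        status = s->take(s, data + total, length - total, &n);
        if (n == 0) break;  /* source permanently exhausted */
        total += n;
    }
    *available = total;
    return status;
}

rcollect() would differ only in requesting a random length between 1
and the remaining required length on each iteration.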

int mkchunk(source **s, source *parent, int n);

This implements the chunking limit described above: its take
method calls the parent repeatedly, limiting each request to at most
n values.
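That take method might look something like this (a sketch, with a
hypothetical chunk_state struct holding the parent and the limit):

typedef struct {
    source *parent;
    int n;  /* maximum length per upstream request */
} chunk_state;

static int chunk_take(struct source *s, double *data, int length, int *available) {
    chunk_state *c = s->state;
    int total = 0, n = 0, status = 0;
    while (!status && total < length) {
        int request = length - total;
        if (request > c->n) request = c->n;  /* cap the upstream request */
        status = c->parent->take(c->parent, data + total, request, &n);
        if (n == 0) break;  /* parent permanently exhausted */
        total += n;
    }
    *available = total;
    return status;
}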

int mkcat(source **s, source *a, source *b, ...);

int mkdrop(source **s, source *parent, int n);

int mksum(source **s, source *a, source *b, ...);

int mkprod(source **s, source *a, source *b, ...);

int mkconst(source **s, double k);

int mkcount(source **s);

These are all pretty obvious and help with testing etc.
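As a hypothetical usage example (the exact values produced by
mkcount are my assumption), a pipeline can be assembled and drained
like this:

source *counter = NULL, *limited = NULL;
double buffer[1024];
int available = 0, status = 0;

status = mkcount(&counter);  /* 0, 1, 2, ... (assumed) */
if (!status) status = mkchunk(&limited, counter, 64);  /* cap requests at 64 */
if (!status) status = collect(limited, buffer, 1024, &available);
if (limited) limited->free(&limited, status);  /* nested free releases counter too */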

It can also be useful to have a source that reads from a file, a
function that writes to a file, a function to compare source and
file, etc.
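For example, the take method of a file-backed source could be as
simple as the following (assuming the file holds raw doubles; again a
sketch rather than the original code):

#include <stdio.h>

static int file_take(struct source *s, double *data, int length, int *available) {
    FILE *f = s->state;
    *available = (int)fread(data, sizeof(double), length, f);
    return ferror(f) ? 1 : 0;  /* 0 covers a clean EOF (*available == 0) */
}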

The entire library plus testing was 3 days' work and performed
surprisingly well.

Andrew