Streams is a C++ library that provides lazy evaluation and functional-style transformations on data, to ease the use of C++ standard library containers and algorithms. Streams support many common functional operations such as map, filter, and reduce, as well as other useful operations such as set operations (union, intersection, difference), partial sum, and adjacent difference. Please visit the GitHub page to download and try it yourself.
Streams is developed by Jonah Scheinerman, and is always looking to improve. Please get in touch if you find bugs or have ideas on how to make Streams even better.
To use Streams, download the library from GitHub and #include "Stream.h". Streams require C++14 to compile. Streams consist only of header files, so you shouldn't have to modify your build process to use them.
Introduction
Streams are an abstraction over a set of data that has a specific order (be it a strict ordering or just some encounter order). Various operations can be applied to streams such that data passes through the stream pipeline in a lazy manner, only getting passed to the next operation when it is requested. There are three main kinds of stream operations: generators, intermediate operators, and terminal operators.
- Generators are methods that create streams, and are thus considered stream sources. Stream generators will not be evaluated at all until some terminal operation on a stream has been called.
- Intermediate operators take in streams and return streams, applying some additional operation to the end of the stream pipeline. Intermediate operations will not be evaluated until some terminal operation on the stream has been called. There are two kinds of intermediate operators: stateless and stateful. Stateless operators require O(1) memory, whereas stateful operators may accumulate up to O(n) memory where n is the stream length.
- Terminal operators close a stream and finally cause all of the stream operations to evaluate up the stream pipeline, until some final non-stream value gets returned by the terminal operation.
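The laziness described above can be illustrated without the library. The following is a minimal sketch in standard C++14, not the Streams implementation: Source, counter_from, keep_if, and take are hypothetical names chosen for illustration, standing in for a generator, an intermediate operator, and a terminal operator respectively.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// A pull-based source: each call produces the next element on demand.
using Source = std::function<int()>;

// "Generator": counts upward from start. Nothing runs until the source is called.
// pulls records how many elements were actually produced.
Source counter_from(int start, int& pulls) {
    return [start, &pulls]() mutable {
        ++pulls;
        return start++;
    };
}

// "Intermediate operator": wraps a source and skips elements that fail pred.
Source keep_if(Source upstream, std::function<bool(int)> pred) {
    return [upstream, pred]() mutable {
        for (;;) {
            int value = upstream();
            if (pred(value)) return value;
        }
    };
}

// "Terminal operator": pulls n elements through the pipeline and collects them.
std::vector<int> take(Source source, std::size_t n) {
    std::vector<int> out;
    for (std::size_t i = 0; i < n; ++i) out.push_back(source());
    return out;
}
```

In this sketch, building keep_if(counter_from(1, pulls), pred) leaves pulls at 0. Elements are only produced once take is called, which mirrors how a terminal operation drives the whole pipeline.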
A stream is a single source of data, and thus will uniquely own its data source. There is no way of copying streams without accruing a lot of state, and thus streams are not copiable. However, streams are movable. Moving a stream will result in the source stream being "vacant." One can check the vacancy of a stream using the occupied() method. Attempting to apply any stream operation to a vacant stream will result in a VacantStreamException being thrown. Additionally, all intermediate operations will create new streams which now own the original data source of the calling stream. Thus calling an intermediate operation on a stream will result in the original stream being vacant. See the example for occupied().
Streams can be iterated through in the standard C++ way using the begin() and end() methods. However, one should be careful when using these (as they don't work exactly like standard library iterators), and using these may not be as efficient. For example, even though it may be nicer to iterate through a stream via a range-for loop, it may be more efficient to use the for_each() method, as it does not have to incur the overhead cost of making stream iterators safe to use.
Specializations
Streams are specialized based on the element type of the stream to allow extended functionality based on that type. For example, for streams of arithmetic types (int, double, etc.) there is a specialization of sum() that has a default identity of 0. Specializations for most types are listed in their relevant sections. However, for class types, there are further-reaching specializations, which we discuss here. For any method that takes a function of the element type (e.g. map(), filter(), take_while(), to name a few), there is a specialization that allows passing a member function pointer as that function. For example, consider the following:
class Thing {
    int x;
public:
    int value() const { return x; }
};
Stream<Thing> things = /* ... */;
In this case, all of the following are equivalent, with the last one being the one that is provided by the stream specialization for class types:
Stream<int> values = things.map([](Thing&& thing) { return thing.value(); });
Stream<int> values = things.map(std::mem_fn(&Thing::value));
Stream<int> values = things.map(&Thing::value);
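To see why the lambda and std::mem_fn forms are interchangeable, the same equivalence can be checked with standard library facilities alone. This is a sketch using std::transform on a std::vector rather than the Streams map() method. The Thing class here is a hypothetical variant with a constructor added so that it can be instantiated, and the two helper function names are made up for illustration.

```cpp
#include <algorithm>
#include <functional>
#include <iterator>
#include <vector>

// Hypothetical element type with a public accessor, similar to Thing above.
class Thing {
public:
    explicit Thing(int x) : x_(x) {}
    int value() const { return x_; }
private:
    int x_;
};

// Maps each element with an explicit lambda.
std::vector<int> values_via_lambda(const std::vector<Thing>& things) {
    std::vector<int> out;
    std::transform(things.begin(), things.end(), std::back_inserter(out),
                   [](const Thing& thing) { return thing.value(); });
    return out;
}

// Maps each element with the member function pointer wrapped by std::mem_fn.
std::vector<int> values_via_mem_fn(const std::vector<Thing>& things) {
    std::vector<int> out;
    std::transform(things.begin(), things.end(), std::back_inserter(out),
                   std::mem_fn(&Thing::value));
    return out;
}
```

Both helpers return the same vector for any input, which is the behavior the class-type specialization presumably packages up so that the std::mem_fn wrapper can be omitted.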
Methods
These are methods on the stream class that do not directly affect the data in the stream, and thus are not considered stream operators.
iterator Stream<T>::begin();
Returns an iterator to the beginning of the stream.
Iterators for streams have all of the standard iterator operator overloads,
and act for the most part just like normal forward iterators. However,
there are some caveats to their use. First of all, when getting a starting iterator from begin(), you have not yet consumed anything in the stream. But as soon as you perform some operation (dereference, increment, test equality or inequality) on this iterator, the first element of the stream (if there is one) will be consumed and this iterator will resolve itself to its actual value. Example:
auto stream = MakeStream::counter(1)
.peek([](int x) { std::cout << "Peek = " << x << "\n"; });
auto iter = stream.begin();
int value = *iter;
std::cout << "Iter = " << value << "\n";
Produces the following output:
Peek = 1
Iter = 1
Second, be very careful when using the return value of the postfix increment
operator. The return value of the postfix increment of an iterator is an iterator
that is considered to be "consumed," meaning you can dereference it, but you
can't do anything else with it: you can't increment it or check its equality.
Why is this? The idea is that we don't want to split the stream or copy it. If you had two independent iterators that were both attempting to iterate through the same stream, very strange behavior could occur, because a stream isn't a true container; it's simply a wrapper around a next method. This isn't to say getting two iterators is impossible (simply call begin() twice). However, we want to safeguard against this type of behavior. Attempting to perform unauthorized actions on a consumed iterator results in a ConsumedIteratorException. Example:
auto stream = MakeStream::closed_range(1,10);
auto iter = stream.begin();
cout << *iter << endl;
auto temp_iter = iter++;
cout << *temp_iter << endl;
try {
++temp_iter;
} catch(ConsumedIteratorException& e) {
cout << e.what() << endl;
}
Produces the following output:
1
2
Cannot perform prefix increment on consumed stream iterator.
That being said, using stream iterators should be fine with most if not
all standard library algorithms. However, be cognizant of the fact that
you are paying a slight overhead cost for using an iterator, so if
speed is your concern, use a reduction method instead of something
that uses iterators. For example, even though this is prettier:
for(auto element : stream) {
/* do something */
}
This is more efficient:
stream.for_each([](auto element) {
/* do something */
});
iterator Stream<T>::end();
Returns an iterator to something one past the end of the stream. Since
the end of the stream is unknown, this is simply a sentinel iterator.
For a discussion of the intricacies of using iterators see the
begin() method.
bool Stream<T>::occupied();
Returns true if the stream object owns stream data. Streams are only
movable (not copiable), and moving a stream results in a "vacant"
stream (calling this method will return false). Attempting to
perform any stream operation on a vacant stream will result in a
VacantStreamException. Similarly, every
stream operation returns a new stream that owns the data of the stream(s)
it was called on. Thus attempting to do two operations on the same
stream will result in an exception being thrown.
Example:
Stream<int> stream1 = MakeStream::counter(1);
std::cout << std::boolalpha;
std::cout << "Stream 1 occupied: " << stream1.occupied() << std::endl;
Stream<int> stream2 = stream1.limit(10);
std::cout << "Stream 1 occupied: " << stream1.occupied() << std::endl;
std::cout << "Stream 2 occupied: " << stream2.occupied() << std::endl;
try {
stream1.filter([] (int x) { return x % 2 == 0; });
} catch(VacantStreamException& e) {
std::cout << e.what() << std::endl;
}
Produces the following output:
Stream 1 occupied: true
Stream 1 occupied: false
Stream 2 occupied: true
Cannot call Stream::filter on a vacant stream
void Stream<T>::print_pipeline(std::ostream& os);
Prints a representation of the stream pipeline, including all of the basic
transformations on the stream and all sources, followed by the number
of pipeline stages and the number of stream sources.
Example:
std::vector<std::string> v = /* ... */;
MakeStream::from(v)
.limit(100)
.filter([](std::string& s) {
return !s.empty();
})
.map([](std::string& s) {
return s[0];
})
.zip_with(MakeStream::counter(1) * 5)
.zip_with(MakeStream::repeat(10))
.print_pipeline(std::cout);
Produces the following output:
> Zip:
> Zip:
> Map:
> Filter:
> Slice[0, 100, 1]:
> [iterator stream]
> Map:
> [iterated stream]
> [repeated value stream]
Stream pipeline with 6 stages and 3 sources.
Stream Generators
The stream generator factory methods can all be found as static methods in the MakeStream class. These are not wrapped in the stream, so that they can perform type deduction for you.
Be careful when using factory methods that draw from referenced data sources (for example, MakeStream::from(const Container&)). These are perfectly safe to use if the usage of the stream is entirely contained within the current scope. However, if the stream outlives a container that it references, the stream is left reading destroyed data (see the unsafe example under from_move()). In this case, prefer the MakeStream::from_move() generator.
template<typename T>
Stream<T> MakeStream::empty();
Creates an empty stream of the given type. Calling
MakeStream::empty<T>() is equivalent
to default constructing a stream.
Example:
MakeStream::empty<int>().count(); // 0
template<typename T>
Stream<T> MakeStream::singleton(T&& value);
Creates a stream with a single given value.
Example:
MakeStream::singleton(5).to_vector(); // {5}
Stream<T> MakeStream::from(Iterator begin, Iterator end);
Stream<T> MakeStream::from(const Container& cont);
Stream<T> MakeStream::from(T* arr, size_t length);
Stream<T> MakeStream::from(std::initializer_list<T> init);
Creates a stream from some existing set of data: an iterator range, a container for which
std::begin() and std::end() are defined, a C-style array, or an initializer list. Warning! Beware using most of these methods if your Stream is going to be used outside of the current scope. The exception to this is the initializer_list generator, which will capture the list, and therefore is safe to use outside of the current scope. To safely capture a container, use the from_move() generator method.
Example:
std::vector<int> x = {1, 3, 4, 2};
MakeStream::from(x);
MakeStream::from(x.begin(), x.end());
int arr[4] = {1, 3, 4, 2};
MakeStream::from(arr, 4);
MakeStream::from({1, 3, 4, 2});
template<typename Container>
Stream<T> MakeStream::from_move(Container&& cont);
Creates a stream from a container of data, moving that container into
the stream, so that the stream may be used safely outside of the current scope.
Example:
#include "Stream.h"
#include <vector>
#include <iostream>
std::vector<int> make_vector() {
return {1, 2, 3};
}
Stream<int> make_stream_safe1() {
return MakeStream::from_move(make_vector());
}
Stream<int> make_stream_safe2() {
std::vector<int> vec = {4, 5, 6};
return MakeStream::from_move(vec);
}
Stream<int> make_stream_unsafe() {
std::vector<int> vec = {7, 8, 9};
return MakeStream::from(vec); // BAD!
}
int main(int argc, char const *argv[]) {
make_stream_safe1().print_to(std::cout) << std::endl;
make_stream_safe2().print_to(std::cout) << std::endl;
make_stream_unsafe().print_to(std::cout) << std::endl;
}
Produces the following output (on one run on my computer):
1 2 3
4 5 6
7 -536870912 -2026110050
Rule of thumb: If you're only going to be accessing data through the
stream, probably just use from_move().
template<typename T>
Stream<T> MakeStream::repeat(T&& value);
template<typename T>
Stream<T> MakeStream::repeat(T&& value, size_t times);
Creates a stream consisting of the same value repeated over and
over again. The first method creates an infinite stream of the repeated
value. The second method only repeats the value a fixed number of
times. Calling MakeStream::repeat(x, n) is
equivalent to calling MakeStream::repeat(x).limit(n).
Example:
MakeStream::repeat(1)
.partial_sum(); // Stream contains 1, 2, 3, 4, ...
Stream<T> MakeStream::cycle(Iterator begin, Iterator end);
Stream<T> MakeStream::cycle(Iterator begin, Iterator end, size_t times);
Stream<T> MakeStream::cycle(const Container& cont);
Stream<T> MakeStream::cycle(const Container& cont, size_t times);
Stream<T> MakeStream::cycle(std::initializer_list<T> init);
Stream<T> MakeStream::cycle(std::initializer_list<T> init, size_t times);
Creates a stream of a sequence of elements repeated over and over again.
The signatures without a times parameter
will loop over a range indefinitely, whereas the ones with the parameter
will only loop over the sequence that many times. Warning! Beware using most of these methods if your Stream is going to be used outside of the current scope. The exception to this is the initializer_list generator, which will capture the list, and therefore is safe to use outside of the current scope. To safely capture a container in a cycle, use the cycle_move() generator method.
Example:
std::vector<int> x{1, 3, 8};
MakeStream::cycle(x.begin(), x.end()); // Contains 1, 3, 8, 1, 3, 8, 1, ...
MakeStream::cycle(x, 2); // Contains 1, 3, 8, 1, 3, 8.
template<typename Container>
Stream<T> MakeStream::cycle_move(Container&& cont);
template<typename Container>
Stream<T> MakeStream::cycle_move(Container&& cont, size_t times);
Creates a stream of a sequence of elements repeated over and over again,
moving the container into the stream so that the stream may be used safely
outside of the current scope. The signature without a times parameter
will loop over the container indefinitely, whereas the one with the parameter
will only loop over the sequence that many times.
Example:
#include "Stream.h"
#include <vector>
#include <iostream>
std::vector<int> make_vector() {
return {1, 2, 3};
}
Stream<int> make_stream_safe1() {
return MakeStream::cycle_move(make_vector(), 2);
}
Stream<int> make_stream_safe2() {
std::vector<int> vec = {4, 5, 6};
return MakeStream::cycle_move(vec, 2);
}
Stream<int> make_stream_unsafe() {
std::vector<int> vec = {7, 8, 9};
return MakeStream::cycle(vec, 2); // BAD!
}
int main(int argc, char const *argv[]) {
make_stream_safe1().print_to(std::cout) << std::endl;
make_stream_safe2().print_to(std::cout) << std::endl;
make_stream_unsafe().print_to(std::cout) << std::endl;
}
Produces the following output (on one run on my computer):
1 2 3 1 2 3
4 5 6 4 5 6
0 -1879048192 0 0 -1879048192 0
template<typename Generator>
Stream<T> MakeStream::generate(Generator&& generator);
Creates a stream whose values are the return values of repeated calls to
the generator function with no arguments.
Example:
MakeStream::generate(rand); // Stream of random integers
template<typename T, typename Function>
Stream<T> MakeStream::iterate(T&& value, Function&& function);
Creates a stream that, given a value x and
a function f returns the stream produced by
x, f(x),
f(f(x)) and so on. In the below example
we produce a stream which investigates the
Collatz conjecture.
Example:
auto stream = MakeStream::iterate(1245, [](int x) {
if(x % 2 == 0) {
return x / 2;
} else {
return 3 * x + 1;
}
});
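The sequence x, f(x), f(f(x)), ... can be made concrete with a small eager sketch in plain C++. The helper names iterate_n and collatz_step are hypothetical and not part of the library, and the helper collects only the first n values because the real stream is unbounded.

```cpp
#include <cstddef>
#include <vector>

// Returns the first n values of the sequence value, f(value), f(f(value)), ...
template <typename T, typename Function>
std::vector<T> iterate_n(T value, Function f, std::size_t n) {
    std::vector<T> out;
    for (std::size_t i = 0; i < n; ++i) {
        out.push_back(value);
        value = f(value);
    }
    return out;
}

// One step of the Collatz map: halve even numbers, send odd x to 3x + 1.
int collatz_step(int x) {
    return (x % 2 == 0) ? x / 2 : 3 * x + 1;
}
```

For instance, iterate_n(6, collatz_step, 9) yields 6, 3, 10, 5, 16, 8, 4, 2, 1.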
template<typename T>
Stream<T> MakeStream::counter(T&& start);
template<typename T, typename U>
Stream<T> MakeStream::counter(T&& start, U&& increment);
template<typename T, typename U>
Stream<T> MakeStream::counter(T&& start, const U& increment);
Creates a stream of elements produced by incrementing a given element
indefinitely. Incrementing in the case of the first method is done
via a prefix increment operator. The last two methods increment by a
fixed value each time, by adding the increment to the current value
(on the right side of the operator).
Example:
MakeStream::counter(1); // 1, 2, 3, 4, ...
MakeStream::counter('A', 2); // A, C, E, G, ...
template<typename T>
Stream<T> MakeStream::range(T&& lower, T&& upper);
template<typename T, typename U>
Stream<T> MakeStream::range(T&& lower, T&& upper, U&& increment);
template<typename T, typename U>
Stream<T> MakeStream::range(T&& lower, T&& upper, const U& increment);
Creates a stream of elements that iterate through a range starting at
lower, up to but not including
upper. Testing against the upper bound
is done via the built in != operator.
The rules of incrementation for this method are identical to those of
counter().
Example:
MakeStream::range(0, 5); // 0, 1, 2, 3, 4
MakeStream::range(0, 8, 2); // 0, 2, 4, 6
template<typename T>
Stream<T> MakeStream::closed_range(T&& lower, T&& upper);
template<typename T, typename U>
Stream<T> MakeStream::closed_range(T&& lower, T&& upper, U&& increment);
template<typename T, typename U>
Stream<T> MakeStream::closed_range(T&& lower, T&& upper, const U& increment);
Creates a stream of elements that iterate through a range starting at
lower, up to and including
upper. Testing against the upper bound
is done via the built in <= operator.
The rules of incrementation for this method are identical to those of
counter().
Example:
MakeStream::closed_range(1, 5); // 1, 2, 3, 4, 5
MakeStream::closed_range(0, 8, 2); // 0, 2, 4, 6, 8
template<typename T = bool,
typename Engine = std::default_random_engine>
Stream<T> MakeStream::coin_flips();
template<typename T = bool,
typename Engine = std::default_random_engine,
typename Seed>
Stream<T> MakeStream::coin_flips(Seed&& seed);
Creates an infinite stream of random integers whose values are uniformly 0 or 1.
The user can specify the underlying random engine which defaults to
std::default_random_engine and the initial
seed of the random number generator. If not given, the seed will be
initialized to the current time. By default, this returns a stream of
bools, though the type can be specified
as any type which makes sense.
Example:
The following runs a test to see how many coin flips come up heads after 1000 flips.
int heads = MakeStream::coin_flips()
.limit(1000)
.filter()
.count();
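For comparison, the same experiment can be written directly against the standard <random> facilities. This is a sketch of what such a pipeline computes, not of how coin_flips() is implemented. The function name count_heads is hypothetical, and the seed is passed explicitly so that runs are repeatable.

```cpp
#include <cstddef>
#include <random>

// Counts heads among `flips` fair coin flips drawn from an engine seeded with `seed`.
std::size_t count_heads(std::size_t flips, unsigned seed) {
    std::default_random_engine engine(seed);
    std::bernoulli_distribution coin(0.5);  // true with probability 1/2
    std::size_t heads = 0;
    for (std::size_t i = 0; i < flips; ++i) {
        if (coin(engine)) ++heads;
    }
    return heads;
}
```

Because std::default_random_engine is implementation-defined, the exact count for a given seed can differ between standard libraries, but the same seed gives the same count on one platform.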
template<typename T,
typename Engine = std::default_random_engine>
Stream<T> MakeStream::uniform_random_ints(T lower, T upper);
template<typename T,
typename Engine = std::default_random_engine,
typename Seed>
Stream<T> MakeStream::uniform_random_ints(T lower, T upper, Seed&& seed);
Creates an infinite stream of random integers whose values are distributed uniformly
between the upper and lower bounds. The user can specify the underlying
random engine which defaults to std::default_random_engine
and the initial seed of the random number generator. If not given, the seed will be
initialized to the current time.
Example:
MakeStream::uniform_random_ints(0, 10);
template<typename T = double,
typename Engine = std::default_random_engine>
Stream<T> MakeStream::uniform_random_reals(T lower = 0.0, T upper = 1.0);
template<typename T,
typename Engine = std::default_random_engine,
typename Seed>
Stream<T> MakeStream::uniform_random_reals(T lower, T upper, Seed&& seed);
Creates an infinite stream of random real numbers whose values are
distributed uniformly between the upper and lower bounds. The first
signature defaults the lower and upper bounds to 0 and 1 respectively.
The user can specify the underlying random engine which defaults to
std::default_random_engine and the initial
seed of the random number generator. If not given, the seed will be
initialized to the current time.
Example:
MakeStream::uniform_random_reals(); // Doubles between 0 and 1
MakeStream::uniform_random_reals<float>(.3, .7); // Floats between .3 and .7
template<typename T = double,
typename Engine = std::default_random_engine>
Stream<T> MakeStream::normal_randoms(T mean = 0.0, T stddev = 1.0);
template<typename T = double,
typename Engine = std::default_random_engine,
typename Seed>
Stream<T> MakeStream::normal_randoms(T mean, T stddev, Seed&& seed);
Creates an infinite stream of random real numbers whose values are normally
distributed with a given mean and standard deviation. The first signature
defaults the parameters to the standard normal distribution, μ = 0
and σ = 1.
The user can specify the underlying random engine which defaults to
std::default_random_engine and the initial
seed of the random number generator. If not given, the seed will be
initialized to the current time.
Example:
MakeStream::normal_randoms(); // Doubles distributed normally
MakeStream::normal_randoms<float>(5.3, 1.8); // Floats distributed normally.
template<typename T,
template<typename> class Distribution,
typename Engine = std::default_random_engine,
typename Seed,
typename... GenArgs>
Stream<T> MakeStream::randoms(Seed&& seed, GenArgs&&... args);
template<typename T,
template<typename> class Distribution,
typename Engine = std::default_random_engine,
typename... GenArgs>
Stream<T> MakeStream::randoms(GenArgs&&... args);
Creates an infinite stream of random values as generically as possible.
In this function, T is the type of the resulting
stream, Distribution is an unqualified number
distribution (e.g. std::uniform_int_distribution),
Engine is the underlying random number generator
and the args... are the parameters passed into
the constructor of the distribution. If the seed is not provided,
the current time will be used.
If there is no factory method that supports your specific distribution, this
is the method for you.
Example:
MakeStream::randoms<double, std::poisson_distribution>(5.0);
// Stream of doubles coming from a poisson process with mean 5.
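The template-template parameter is what lets the caller name a distribution without its element type. The following sketch shows that mechanism in isolation with standard C++. The function sample and its parameter names are hypothetical, it returns a finite std::vector rather than a stream, and it assumes the extra arguments are forwarded to the distribution's constructor.

```cpp
#include <cstddef>
#include <random>
#include <utility>
#include <vector>

// Draws n values of type T from Distribution<T>, constructed from args,
// using an Engine seeded with seed.
template <typename T,
          template <typename> class Distribution,
          typename Engine = std::default_random_engine,
          typename... DistArgs>
std::vector<T> sample(std::size_t n, unsigned seed, DistArgs&&... args) {
    Engine engine(seed);
    Distribution<T> dist(std::forward<DistArgs>(args)...);
    std::vector<T> out;
    out.reserve(n);
    for (std::size_t i = 0; i < n; ++i) out.push_back(dist(engine));
    return out;
}
```

A call such as sample<int, std::uniform_int_distribution>(100, 7u, 1, 6) simulates 100 rolls of a six-sided die. Note that the standard only specifies each distribution for certain element types (integer types for std::uniform_int_distribution, for example), so T should be chosen to match.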
Stateless Intermediate Stream Operators
These operators insert another stage of a stream pipeline or combine two streams in some way forming a new stream, which contains the data of the old streams. Intermediate operators do not get evaluated until some terminal operation is called on the stream. These operators are all methods on the Stream class.
template<typename Predicate>
Stream<T> Stream<T>::filter(Predicate&& predicate);
Only allows elements of the stream that pass the given predicate.
Example:
MakeStream::counter(1)
.filter([](int x) {
return x % 2 == 0;
}); // Stream now contains only even positive integers
Specializations: Specializations of filter() exist for types that have an implicit conversion to bool. For these types, two specializations exist:
Stream<T> Stream<T>::filter();
Stream<T> Stream<T>::filter_not();
The first of these filters for elements whose bool
values are true. The second filters for
elements that evaluate to false.
template<typename Transform>
Stream<Result> Stream<T>::map(Transform&& transform);
Transforms each element of the stream using the given transformation
function. The resulting stream is produced by applying the
transformation to each element of the input stream.
Example:
MakeStream::counter(1)
.map([](int x) {
return x * x;
}); // Stream of perfect squares
template<typename Transform>
Stream<Result> Stream<T>::flat_map(Transform&& transform);
Applies a transformation to every element of the stream, that is
expected to return a stream as a result. The resulting stream is the
concatenation of these output streams.
Example:
MakeStream::counter(1)
.flat_map([](int x) {
return MakeStream::counter(1)
.limit(x);
}); // Stream contains 1, 1, 2, 1, 2, 3, 1, 2, 3, 4, ...
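The concatenation order is easiest to see on a finite input. Below is an eager sketch over std::vector, not the lazy library operator. The names flat_map_sketch and one_to are hypothetical, with one_to(x) playing the role of MakeStream::counter(1).limit(x).

```cpp
#include <functional>
#include <vector>

// Applies transform to each input element and concatenates the results in order.
std::vector<int> flat_map_sketch(
        const std::vector<int>& input,
        const std::function<std::vector<int>(int)>& transform) {
    std::vector<int> out;
    for (int x : input) {
        std::vector<int> piece = transform(x);
        out.insert(out.end(), piece.begin(), piece.end());
    }
    return out;
}

// Eager counterpart of counting from 1 and stopping after x elements: 1, 2, ..., x.
std::vector<int> one_to(int x) {
    std::vector<int> out;
    for (int i = 1; i <= x; ++i) out.push_back(i);
    return out;
}
```

Calling flat_map_sketch({1, 2, 3, 4}, one_to) gives 1, 1, 2, 1, 2, 3, 1, 2, 3, 4, matching the first ten elements listed in the example above.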
Stream<T> Stream<T>::limit(size_t bound);
Ensures that the stream contains a maximum number of elements.
Example:
MakeStream::counter(1)
.limit(5); // Stream contains 1, 2, 3, 4, 5
Note that stream.limit(n) is equivalent to
stream.slice(0, n, 1).
Stream<T> Stream<T>::skip(size_t amount);
Skips the first n elements of a stream, where n is the given amount.
Example:
MakeStream::counter(1)
.skip(10); // Stream contains 11, 12, 13, ...
Note that stream.skip(n) is equivalent to
stream.slice_to_end(n, 1).
Stream<T> Stream<T>::slice(size_t startIndex,
size_t endIndex,
size_t increment = 1);
Retrieves elements of the stream starting at startIndex,
up to, but not including, endIndex, iterating
by increment. By default, increment
is 1.
Example:
auto stream1 = MakeStream::counter(0).slice(5, 10); // 5 6 7 8 9
auto stream2 = MakeStream::counter(0).slice(1, 8, 2); // 1 3 5 7
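The index arithmetic behind a slice can be sketched on a std::vector. This is an eager illustration under the assumption that indices are zero-based, as the examples above suggest. The name slice_sketch is hypothetical.

```cpp
#include <cstddef>
#include <vector>

// Collects the elements at indices start, start + step, start + 2 * step, ...
// that are below end and within the input. step must be greater than zero.
std::vector<int> slice_sketch(const std::vector<int>& input, std::size_t start,
                              std::size_t end, std::size_t step = 1) {
    std::vector<int> out;
    for (std::size_t i = start; i < end && i < input.size(); i += step) {
        out.push_back(input[i]);
    }
    return out;
}
```

With the input 0, 1, ..., 9, slice_sketch(input, 5, 10) gives 5 6 7 8 9 and slice_sketch(input, 1, 8, 2) gives 1 3 5 7. A start of 0 with step 1 behaves like limit().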
Stream<T> Stream<T>::slice_to_end(size_t startIndex,
size_t increment);
A version of the slice operation which allows an unbounded slice.
Slices are taken starting at startIndex
and go by increment.
Example:
auto stream1 = MakeStream::counter(0).slice_to_end(0, 5); // 0 5 10 15 ...
auto stream2 = MakeStream::counter(0).slice_to_end(3, 2); // 3 5 7 9 ...
template<typename Predicate>
Stream<T> Stream<T>::take_while(Predicate&& predicate);
Takes elements from the stream until the given predicate becomes false.
Example:
MakeStream::counter(1)
.take_while([](int x) {
return x < 5;
}); // Stream contains 1, 2, 3, 4
Specializations: Specializations of take_while() exist for types that have an implicit conversion to bool. For these types, the following specializations exist:
Stream<T> Stream<T>::take_while();
Stream<T> Stream<T>::take_until();
The first takes elements of the stream while their bool
value is true, the second takes elements while
their bool value is false.
template<typename Predicate>
Stream<T> Stream<T>::drop_while(Predicate&& predicate);
Drops the first elements of the stream until the given predicate becomes
false, after which the stream is returned intact.
Example:
MakeStream::counter(1)
.drop_while([](int x) {
return x < 5;
}); // Stream contains 5, 6, 7, ...
Specializations: Specializations of drop_while() exist for types that have an implicit conversion to bool. For these types, the following specializations exist:
Stream<T> Stream<T>::drop_while();
Stream<T> Stream<T>::drop_until();
The first drops elements of the stream while their bool
value is true, the second drops values while
their bool value is false.
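take_while() and drop_while() split a sequence at the same point: the first element for which the predicate is false. The sketch below shows this on a std::vector using std::find_if_not. The function names are hypothetical and the code is eager, unlike the stream operators.

```cpp
#include <algorithm>
#include <vector>

// Example predicate used in the take_while() and drop_while() examples above.
bool below_five(int x) { return x < 5; }

// Leading run of elements that satisfy pred.
std::vector<int> take_while_sketch(const std::vector<int>& input, bool (*pred)(int)) {
    auto stop = std::find_if_not(input.begin(), input.end(), pred);
    return std::vector<int>(input.begin(), stop);
}

// Everything from the first element that fails pred onward.
std::vector<int> drop_while_sketch(const std::vector<int>& input, bool (*pred)(int)) {
    auto start = std::find_if_not(input.begin(), input.end(), pred);
    return std::vector<int>(start, input.end());
}
```

Note that only the leading run is affected: in this sketch, drop_while_sketch({1, 2, 7, 3}, below_five) gives 7, 3, keeping the trailing 3 even though it satisfies the predicate.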
template<typename Function>
Stream<T> Stream<T>::peek(Function&& function);
Allows a lazy peek into the stream. When a value passes through the
pipeline, and hits a peek, the function will be called on that value,
the function's return value is ignored and the original value is passed
on to the next stage of the pipeline. Like all stateless operations, this will not
be executed until some stateful or terminal operation is called on
the stream.
Example:
std::vector<int> result = MakeStream::counter(1)
.filter([](int x) { return x % 3 == 0; })
.limit(3)
.peek([](int x) {
std::cout << "Value = " << x << std::endl;
})
.to_vector();
Output:
Value = 3
Value = 6
Value = 9
template<typename Equal = std::equal_to<T>>
Stream<T> Stream<T>::adjacent_distinct(Equal&& equal = Equal());
Removes adjacent duplicates from the stream. By default, duplicates
are determined using the standard ==
operator. However, a different equality operation can be given.
Example:
MakeStream::from({1, 1, 3, 2, 2, 5, 5, 5, 5, 2, 3, 3})
.adjacent_distinct(); // Stream contains 1, 3, 2, 5, 2, 3;
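The standard library's std::unique performs the same kind of adjacent de-duplication on a container, which makes it a convenient reference for the expected result. This is an eager sketch with a hypothetical function name, not the stream operator itself.

```cpp
#include <algorithm>
#include <vector>

// Collapses each run of equal adjacent elements to a single element.
std::vector<int> adjacent_distinct_sketch(std::vector<int> values) {
    values.erase(std::unique(values.begin(), values.end()), values.end());
    return values;
}
```

Only neighbours are compared, so a value may reappear later in the result: 1, 2, 1 is left unchanged.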
template<typename Subtract = std::minus<void>>
Stream<DiffType> Stream<T>::adjacent_difference(Subtract&& subtract = Subtract());
Returns a stream of the pairwise differences of the elements.
By default, subtraction is performed using the standard -
operator. However, a different subtraction operation can be given. Type
of the resulting stream is the difference type returned by the subtraction
function.
Example:
MakeStream::counter(1)
.map([](int x) { return x * x; })
.adjacent_difference(); // Stream contains 3, 5, 7, 9, ...
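The example output can be reproduced with std::adjacent_difference from <numeric>, with one adjustment. The standard algorithm copies the first element through unchanged, whereas the example above lists only true differences (3, 5, 7, 9, with no leading 1), so this sketch drops that first entry. The function name is hypothetical.

```cpp
#include <numeric>
#include <vector>

// Differences between each element and its predecessor; the result has one
// element fewer than the input (empty for inputs shorter than two elements).
std::vector<int> adjacent_difference_sketch(const std::vector<int>& values) {
    if (values.size() < 2) return {};
    std::vector<int> diffs(values.size());
    std::adjacent_difference(values.begin(), values.end(), diffs.begin());
    diffs.erase(diffs.begin());  // discard the copied first element
    return diffs;
}
```

Applied to the squares 1, 4, 9, 16, 25, this gives the odd numbers 3, 5, 7, 9.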
template<typename Add = std::plus<T>>
Stream<T> Stream<T>::partial_sum(Add&& add = Add());
Computes the partial sum of the elements in the stream. By default,
addition is performed using the standard +
operator. However, a different addition operation can be given. This
addition operation must take elements of type T
and return an element of type T.
Example:
MakeStream::from({1, 5, 3, 7, -2, 6})
.partial_sum(); // Stream contains 1, 6, 9, 16, 14, 20
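For a finite input, the same running totals are what std::partial_sum from <numeric> produces, which gives an easy way to check expected values. The sketch below is eager and uses a hypothetical function name.

```cpp
#include <numeric>
#include <vector>

// Running totals: element i of the result is the sum of input elements 0..i.
std::vector<int> partial_sum_sketch(const std::vector<int>& values) {
    std::vector<int> sums(values.size());
    std::partial_sum(values.begin(), values.end(), sums.begin());
    return sums;
}
```

The input 1, 5, 3, 7, -2, 6 gives 1, 6, 9, 16, 14, 20.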
template<typename Iterator>
Stream<T> Stream<T>::concat(Iterator begin, Iterator end);
Stream<T> Stream<T>::concat(Stream<T>&& other);
Concatenates the given stream to the end of the current stream,
to be processed when the current stream reaches its end. The first
form of concat is a convenience for
concatenating with
MakeStream::from(begin, end).
Example:
std::vector<int> x = {4, 5, 6};
std::vector<int> y = {1, 2, 3};
MakeStream::from(x)
.concat(MakeStream::from(y)); // Stream contains 4, 5, 6, 1, 2, 3
Stream<T> Stream<T>::pad(T&& padding);
Concatenates an infinite stream of a repeated value to the
end of the current stream. Note that stream.pad(x)
is equivalent to stream.concat(MakeStream::repeat(x)).
Example:
MakeStream::from({1, 2, 3})
.pad(0); // Stream contains 1, 2, 3, 0, 0, 0, ...
template<typename Right, typename Function = Zipper>
Stream<Result> Stream<Left>::zip_with(Stream<Right>&& other,
Function&& zipper = Function());
Zips two streams together using the provided function, stopping when
either of the streams finishes. By default, the zipping function takes
the two elements of the streams and returns a tuple of the two elements.
This function is specialized so that if either stream is a stream of
tuples, the result of the zipping function is the concatenation of
the tuples (rather than nested tuples).
Example:
std::vector<std::string> input = {"Hello", "this", "is", "a", "stream"};
MakeStream::from(input)
.zip_with(MakeStream::counter(1))// Stream of std::tuple<std::string, int>
.for_each([](std::tuple<std::string, int>& tup) {
std::cout << tup << std::endl;
});
Produces the following output:
(Hello, 1)
(this, 2)
(is, 3)
(a, 4)
(stream, 5)
Note: As can be seen above, we've included an overload for
operator<< for inserting tuples
into I/O streams.
Zipping multiple times produces a concatenated tuple rather than nested tuples:
MakeStream::counter(0)
.zip_with(MakeStream::counter(10))
.zip_with(MakeStream::counter(20))
// Stream is now a stream of std::tuple<int, int, int> which contains:
// (0, 10, 20), (1, 11, 21), (2, 12, 22), ...
One particularly nice tuple operation we've included is a function
called splat which takes a function and
returns a function that takes a tuple and splats that tuple's elements
in as the arguments to the function. For example:
auto splatted = splat([](std::string x, int y) {
return x.length() + y;
});
splatted(std::make_tuple("Hello", 5)); // returns 10
This is useful, because it makes working with zipped streams particularly easy.
So instead of having to do the ugly and unreadable:
MakeStream::counter(1)
    .zip_with(MakeStream::counter(11))
    .map([](const std::tuple<int, int>& tup) {
        return std::get<0>(tup) * std::get<1>(tup);
    });
You can now have the following:
MakeStream::counter(1)
.zip_with(MakeStream::counter(11))
.map(splat([](int first, int second) {
return first * second;
}));
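To show how a function like splat can work, here is one possible C++14 implementation based on std::index_sequence. It is a sketch only and not necessarily how the library implements it. The names splat_sketch and apply_impl are hypothetical.

```cpp
#include <cstddef>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>

// Calls f with the elements of the tuple t expanded as separate arguments.
template <typename Function, typename Tuple, std::size_t... I>
decltype(auto) apply_impl(Function& f, Tuple&& t, std::index_sequence<I...>) {
    return f(std::get<I>(std::forward<Tuple>(t))...);
}

// Wraps f in a callable that accepts a single tuple and unpacks it into f.
template <typename Function>
auto splat_sketch(Function f) {
    return [f](auto&& tup) mutable -> decltype(auto) {
        using Tuple = std::decay_t<decltype(tup)>;
        return apply_impl(
            f, std::forward<decltype(tup)>(tup),
            std::make_index_sequence<std::tuple_size<Tuple>::value>{});
    };
}
```

With this sketch, wrapping a lambda that takes (std::string x, int y) and returns x.length() + y, then calling the wrapper with std::make_tuple(std::string("Hello"), 5), returns 10. In C++17 and later, std::apply provides the unpacking step directly.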
A different function can be used for zipping, as an excellent way of
performing element-wise operations to combine streams. All of the
binary operators have stream overloads that are effectively zips.
For example, consider the multiplication operator, which can be
implemented on two streams in the following way (it's not quite this,
but close):
template<typename L, typename R>
auto operator* (Stream<L>&& left, Stream<R>&& right) {
    // right is a named rvalue reference, so it must be moved into zip_with.
    return left.zip_with(std::move(right), [](const L& lvalue, const R& rvalue) {
        return lvalue * rvalue;
    });
}
Method signature:
template<typename Less = std::less<T>>
Stream<T> Stream<T>::merge_with(Stream<T>&& other,
Less less = Less());
Computes the merging of two streams (the merge step in mergesort).
By default, elements are compared using their built in
< operator. However, a different less
than operator can be provided.
Example:
Stream left = MakeStream::from({1, 2, 4, 4, 12, 13, 17});
Stream right = MakeStream::from({2, 5, 12, 17, 18});
left.merge_with(right); // 1, 2, 2, 4, 4, 5, 12, 12, 13, 17, 17, 18
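To make the merge semantics concrete, the sketch below performs the same merge step eagerly on two std::vector inputs with std::merge. It only illustrates the ordering of the result and is not how Streams evaluates the operation lazily; merge_sorted is a hypothetical name. Like std::merge, it assumes both inputs are already sorted with respect to the comparison used.

```cpp
#include <algorithm>
#include <functional>
#include <iterator>
#include <vector>

// Eagerly merges two sorted vectors; a stand-in for a lazy merge.
template<typename T, typename Less = std::less<T>>
std::vector<T> merge_sorted(const std::vector<T>& left,
                            const std::vector<T>& right,
                            Less less = Less()) {
    std::vector<T> result;
    result.reserve(left.size() + right.size());
    std::merge(left.begin(), left.end(), right.begin(), right.end(),
               std::back_inserter(result), less);
    return result;
}
```

Passing a different comparison, such as std::greater<int>(), merges inputs that are sorted in descending order.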
Method signature:
template<typename Less = std::less<T>>
Stream<T> Stream<T>::union_with(Stream<T>&& other,
Less less = Less());
Computes the set union of two streams.
By default, elements are compared using their built in
< operator. However, a different less
than operator can be provided.
Example:
Stream left = MakeStream::from({1, 2, 4, 12, 13, 17});
Stream right = MakeStream::from({2, 5, 12, 17, 18});
left.union_with(right); // 1, 2, 4, 5, 12, 13, 17, 18
Method signature:
template<typename Less = std::less<T>>
Stream<T> Stream<T>::intersection_with(Stream<T>&& other,
Less less = Less());
Computes the set intersection of two streams.
By default, elements are compared using their built in
< operator. However, a different less
than operator can be provided.
Example:
Stream left = MakeStream::from({1, 2, 4, 12, 13, 17});
Stream right = MakeStream::from({2, 5, 12, 17, 18});
left.intersection_with(right); // 2, 12, 17
Method signature:
template<typename Less = std::less<T>>
Stream<T> Stream<T>::difference_with(Stream<T>&& other,
Less less = Less());
Computes the set difference of two streams, with the base stream as
the left argument, and the argument stream as the right argument to
the difference.
By default, elements are compared using their built in
< operator. However, a different less
than operator can be provided.
Example:
Stream left = MakeStream::from({1, 2, 4, 12, 13, 17});
Stream right = MakeStream::from({2, 5, 12, 17, 18});
left.difference_with(right); // 1, 4, 13
Method signature:
template<typename Less = std::less<T>>
Stream<T> Stream<T>::symmetric_difference_with(Stream<T>&& other,
Less less = Less());
Computes the set symmetric difference of two streams.
By default, elements are compared using their built in
< operator. However, a different less
than operator can be provided.
Example:
Stream left = MakeStream::from({1, 2, 4, 12, 13, 17});
Stream right = MakeStream::from({2, 5, 12, 17, 18});
left.symmetric_difference_with(right); // 1, 4, 5, 13, 18
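The four set operations above correspond to the standard library algorithms of the same names. The following sketch applies those algorithms eagerly to sorted std::vector inputs so the expected results can be checked by hand. The helper names are hypothetical, and the sketch assumes both inputs are sorted in ascending order, as the standard algorithms require.

```cpp
#include <algorithm>
#include <iterator>
#include <vector>

using Ints = std::vector<int>;

// Elements present in either input.
Ints union_of(const Ints& a, const Ints& b) {
    Ints out;
    std::set_union(a.begin(), a.end(), b.begin(), b.end(),
                   std::back_inserter(out));
    return out;
}

// Elements present in both inputs.
Ints intersection_of(const Ints& a, const Ints& b) {
    Ints out;
    std::set_intersection(a.begin(), a.end(), b.begin(), b.end(),
                          std::back_inserter(out));
    return out;
}

// Elements of a that are not in b.
Ints difference_of(const Ints& a, const Ints& b) {
    Ints out;
    std::set_difference(a.begin(), a.end(), b.begin(), b.end(),
                        std::back_inserter(out));
    return out;
}

// Elements present in exactly one of the inputs.
Ints symmetric_difference_of(const Ints& a, const Ints& b) {
    Ints out;
    std::set_symmetric_difference(a.begin(), a.end(), b.begin(), b.end(),
                                  std::back_inserter(out));
    return out;
}
```

For the inputs {1, 2, 4, 12, 13, 17} and {2, 5, 12, 17, 18}, these return {1, 2, 4, 5, 12, 13, 17, 18}, {2, 12, 17}, {1, 4, 13}, and {1, 4, 5, 13, 18} respectively.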
Stream<std::pair<T, T>> Stream<T>::pairwise();
Returns a stream with adjacent elements grouped into pairs.
Example:
MakeStream::counter(1)
.pairwise() // Stream contains (1, 2), (3, 4), (5, 6), ...
Note that this is a convenience for stream.grouped<2>().
template<size_t N>
Stream<std::tuple<T...>> Stream<T>::grouped();
Returns a stream with adjacent elements grouped into groups of size
N, where N
is a compile time constant. The resulting stream is a stream of
N-tuples, all of whose elements are of
the original stream type, T.
Example:
MakeStream::counter(1)
.grouped<3>() // Stream contains (1, 2, 3), (4, 5, 6), ...
This is specialized for N = 2, so that
the resulting stream is a stream of pairs rather than tuples. Calling
stream.grouped<2>() is the same as calling
stream.pairwise().
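As an illustration of how a compile-time group size can produce N-tuples, here is a small sketch that groups a std::vector eagerly. The names grouped_sketch and make_group are hypothetical. The sketch drops trailing elements that do not fill a complete group; that is an assumption made for the example, not a statement about what Streams does.

```cpp
#include <cstddef>
#include <tuple>
#include <utility>
#include <vector>

// Builds one tuple from the N elements starting at index `start`.
template<typename T, std::size_t... I>
auto make_group(const std::vector<T>& values, std::size_t start,
                std::index_sequence<I...>) {
    return std::make_tuple(values[start + I]...);
}

// Groups adjacent elements into N-tuples. Leftover elements that cannot
// fill a whole group are dropped (an assumption of this sketch).
template<std::size_t N, typename T>
auto grouped_sketch(const std::vector<T>& values) {
    static_assert(N > 0, "group size must be positive");
    using Group = decltype(make_group(values, 0, std::make_index_sequence<N>{}));
    std::vector<Group> groups;
    for (std::size_t start = 0; start + N <= values.size(); start += N) {
        groups.push_back(make_group(values, start, std::make_index_sequence<N>{}));
    }
    return groups;
}
```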
Method signatures (template definitions excluded for brevity):
Stream<X> operator$ (Stream<L>&& left, R&& right);
Stream<X> operator$ (L&& left, Stream<R>&& right);
Stream<X> operator$ (Stream<L>&& left, Stream<R>&& right);
The first two of these signatures provide mapping operations, applying the
$ operator to every element of the stream with
the given value on the left or the right. For example:
auto stream1 = MakeStream::counter(1.0) / 3.0; // 1/3, 2/3, 1, 4/3, ....
auto stream2 = 5 * MakeStream::counter(0); // 0, 5, 10, 15, ...
The third signature provides a zipping operation of the elements in
both streams, using the $ operator as
the zipping function. For example, one can compute the dot product
of two streams as follows:
Stream<double> v1 = /* ... */, v2 = /* ... */;
double dot_product = (v1 * v2).sum();
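The dot product above is an element-wise multiplication followed by a sum. The sketch below spells out those two steps on std::vector inputs using only the standard library. The function names are hypothetical, and truncating to the shorter input is an assumption of the sketch about how inputs of unequal length would be handled.

```cpp
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <vector>

// Element-wise product, truncated to the shorter input (an assumption).
std::vector<double> elementwise_product(const std::vector<double>& left,
                                        const std::vector<double>& right) {
    const std::size_t length = std::min(left.size(), right.size());
    std::vector<double> result(length);
    for (std::size_t i = 0; i < length; ++i) {
        result[i] = left[i] * right[i];
    }
    return result;
}

// Dot product as "zip with multiplication, then sum".
double dot_product(const std::vector<double>& left,
                   const std::vector<double>& right) {
    const std::vector<double> products = elementwise_product(left, right);
    return std::accumulate(products.begin(), products.end(), 0.0);
}
```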
Unsupported overloadable binary operators:
- Assignment operators, =, +=, *=, etc.
- Subscript operator, [] (In consideration)
- Member pointer operator, ->*
- Comma operator, ,
Method signature:
Stream<X> operator$ (Stream<T>&& stream);
This provides a mapping operation, applying the $
operator to every element of the stream. For example, to dereference a stream
of pointers, we simply call *stream.
Unsupported overloadable unary operators:
- Postfix/prefix increment/decrement operators, ++, --
- Reference operator, &
- Structure dereference operator, ->
- Conversion operator, (type)
- Allocation operators, new, new[]
- Deallocation operators, delete, delete[]
- Function application, (args...) (Coming soon!)
Stateful Intermediate Stream Operators
These operators insert another stage into a stream pipeline, or combine two streams in some way to form a new stream that contains the data of the old streams. To do so, they accumulate a non-constant amount of space to store their state.
Stream<T> Stream<T>::state_point();
Forces the stream to collect its state before continuing with operations. This
is a good way to allow a complete peek into the data at a certain point in the
stream pipeline.
Example:
MakeStream::counter(1)
.peek([](int x) {
std::cout << "x = " << x << std::endl;
})
.limit(3)
.state_point()
.map([](int x) { return 2 * x; })
.for_each([](int x) {
std::cout << "2x = " << x << std::endl;
});
Produces the following output:
x = 1
x = 2
x = 3
2x = 2
2x = 4
2x = 6
Without the call to state_point(),
the following would have been the output:
x = 1
2x = 2
x = 2
2x = 4
x = 3
2x = 6
Note that calling stream.state_point() is
functionally equivalent to calling
MakeStream::from(stream.to_deque()).
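The difference between the two orderings can be reproduced without Streams at all. The sketch below records the same log lines using plain loops: one version pushes each value through both stages before reading the next, and the other drains the first stage into a std::deque before the second stage starts. Both function names are hypothetical, and the code only models the evaluation order, not the library's internals.

```cpp
#include <deque>
#include <string>
#include <vector>

// Per-element evaluation: each value passes through both stages
// before the next value is read.
std::vector<std::string> run_interleaved(int count) {
    std::vector<std::string> log;
    for (int x = 1; x <= count; ++x) {
        log.push_back("x = " + std::to_string(x));
        log.push_back("2x = " + std::to_string(2 * x));
    }
    return log;
}

// Buffered evaluation: the first stage is drained into a deque
// (playing the role of the state point) before the second stage runs.
std::vector<std::string> run_with_buffer(int count) {
    std::vector<std::string> log;
    std::deque<int> buffer;
    for (int x = 1; x <= count; ++x) {
        log.push_back("x = " + std::to_string(x));
        buffer.push_back(x);
    }
    for (int x : buffer) {
        log.push_back("2x = " + std::to_string(2 * x));
    }
    return log;
}
```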
template<typename Less = std::less<T>>
Stream<T> Stream<T>::sort(Less&& less = Less());
Sorts the elements of the stream using the given less-than comparison. By default,
uses the built-in < operator.
Example:
MakeStream::from({3, 1, 5, 3, 2, 8})
.sort(); // Stream contains 1, 2, 3, 3, 5, 8
template<typename Less = std::less<T>>
Stream<T> Stream<T>::distinct(Less&& less = Less());
Removes duplicate elements from the stream and returns the results
in sorted order.
Example:
MakeStream::from({3, 1, 3, 5, 2, 5, 8, 8})
.distinct(); // Stream contains 1, 2, 3, 5, 8
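Both sort() and distinct() need to see every element before they can emit anything, which is why they are stateful. A rough standard-library equivalent of distinct() on a std::vector is sketched below; sorted_distinct is a hypothetical name, and treating two elements as duplicates when neither orders before the other is an assumption about how a less-than comparison would be used to detect duplicates.

```cpp
#include <algorithm>
#include <functional>
#include <vector>

// Sorts the values, then removes adjacent duplicates.
template<typename T, typename Less = std::less<T>>
std::vector<T> sorted_distinct(std::vector<T> values, Less less = Less()) {
    std::sort(values.begin(), values.end(), less);
    // Two elements are duplicates when neither orders before the other.
    auto equivalent = [&less](const T& a, const T& b) {
        return !less(a, b) && !less(b, a);
    };
    values.erase(std::unique(values.begin(), values.end(), equivalent),
                 values.end());
    return values;
}
```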
Terminal Stream Operators
size_t Stream<T>::count();
Returns the number of elements in the stream.
T Stream<T>::sum(const T& identity);
T Stream<T>::sum();
Sums the elements of the stream, returning the total. If no additive
identity is provided and the stream is empty, throws an
EmptyStreamException. If an identity is
provided and the stream is empty, the identity is returned.
Specializations:
Specializations of sum() exist for types that are arithmetic (given by std::is_arithmetic<T>). In this case, the no-argument version of sum() assumes that the additive identity is 0.
T Stream<T>::product(const T& identity);
T Stream<T>::product();
Multiplies the elements of the stream, returning the product. If no multiplicative
identity is provided and the stream is empty, throws an
EmptyStreamException. If an identity is
provided and the stream is empty, the identity is returned.
Specializations:
Specializations of product() exist for types that are arithmetic (given by std::is_arithmetic<T>). In this case, the no-argument version of product() assumes that the multiplicative identity is 1.
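The role of the identity can be seen in a small standard-library sketch. With an identity, an empty input has a well-defined answer; without one, there is nothing sensible to return. The function names below are hypothetical, and std::runtime_error is used only as a stand-in for EmptyStreamException.

```cpp
#include <functional>
#include <numeric>
#include <stdexcept>
#include <vector>

// With an identity: an empty input simply yields the identity.
template<typename T>
T sum_with_identity(const std::vector<T>& values, const T& identity) {
    return std::accumulate(values.begin(), values.end(), identity);
}

template<typename T>
T product_with_identity(const std::vector<T>& values, const T& identity) {
    return std::accumulate(values.begin(), values.end(), identity,
                           std::multiplies<T>());
}

// Without an identity: the first element seeds the total, so an empty
// input is an error (std::runtime_error stands in for the library's
// exception type here).
template<typename T>
T sum_without_identity(const std::vector<T>& values) {
    if (values.empty()) {
        throw std::runtime_error("cannot sum an empty sequence without an identity");
    }
    return std::accumulate(values.begin() + 1, values.end(), values.front());
}
```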
template<typename Less = std::less<T>>
T Stream<T>::min(Less&& less = Less());
Returns the smallest element of the stream as given by the provided
less than operator (by default, the built in <
operator). If the stream is empty, throws an
EmptyStreamException.
template<typename Less = std::less<T>>
T Stream<T>::max(Less&& less = Less());
Returns the largest element of the stream as given by the provided
less than operator (by default, the built in <
operator). If the stream is empty, throws an
EmptyStreamException.
template<typename Less = std::less<T>>
std::pair<T, T> Stream<T>::minmax(Less&& less = Less());
Returns the smallest and largest elements of the stream as given by the provided
less than operator (by default, the built in <
operator). The first field of the resulting pair is the minimum element,
the second field is the maximum element. If the stream is empty, throws an
EmptyStreamException.
T Stream<T>::first();
Returns the first element of the stream. If the stream is empty, throws an
EmptyStreamException.
T Stream<T>::last();
Returns the last element of the stream. If the stream is empty, throws an
EmptyStreamException.
T Stream<T>::nth(size_t index);
Returns the nth element of the stream, indexed starting at 0. If the
stream does not contain that many elements, throws an
EmptyStreamException.
Calling stream.nth(n) is functionally
equivalent to calling stream.skip(n).first().
U Stream<T>::reduce(const U& identity, Accumulator&& accum);
U Stream<T>::reduce(IdentityFn&& identity, Accumulator&& accum);
T Stream<T>::reduce(Accumulator&& accum);
Performs a reduction (or a fold) of the elements in the stream. There are three
forms of the reduce operation, given by the signatures above.
The first reduce operation takes some identity element of type U and an accumulator function that should take two arguments, the first of type U and the second of type T, and return an element of type U. If the stream is empty, the identity value will be returned. The functionality is equivalent to (in pseudocode):
U result = identity;
for(T element : stream) {
result = accum(result, element);
}
return result;
This type of reduction can compute a sum with an identity.
For example, the sum of a stream of integers could be computed by:
int sum = stream.reduce(0, [](int left, int right) {
return left + right;
});
The last two reduce operations don't handle cases where the stream is
empty. In other words, there is no identity element. In these cases,
if the stream is empty, an EmptyStreamException
is thrown.
The second reduce operation takes an identity function, which converts an element of the stream into an element of the result type, and an accumulator, which should behave just as before. The functionality is equivalent to (in pseudocode):
U result = identity(stream[0]);
for(T element : stream[1 ...]) {
result = accum(result, element);
}
return result;
This is the type of reduction that is used to compute
minmax(), which can be implemented as
follows:
auto to_pair = [](auto x) {
return std::make_pair(x, x);
};
auto next_minmax = [](const auto& prev_minmax, auto& value) {
if(value < prev_minmax.first) {
return std::make_pair(value, prev_minmax.second);
} else if (value > prev_minmax.second) {
return std::make_pair(prev_minmax.first, value);
} else {
return prev_minmax;
}
};
auto minmax = stream.reduce(to_pair, next_minmax);
The final reduce operation is a convenience for the second reduce wherein
the identity function is just a function that returns its argument and
the accumulator takes both arguments of type T
and returns a value of type T. The functionality
is equivalent to (in pseudocode):
T result = stream[0];
for(T element : stream[1 ...]) {
result = accum(result, element);
}
return result;
This reduce operation can be used to compute a sum without identity:
stream.reduce(std::plus<void>());
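The three forms can be written out as ordinary functions over a std::vector, which may help when deciding which one fits a given reduction. This is a sketch of the semantics described in the pseudocode, not the library's code: the function names are hypothetical, and std::runtime_error stands in for EmptyStreamException.

```cpp
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <vector>

// Form 1: explicit identity value; an empty input returns the identity.
template<typename T, typename U, typename Accumulator>
U reduce_with_identity(const std::vector<T>& values, const U& identity,
                       Accumulator accum) {
    U result = identity;
    for (const T& element : values) {
        result = accum(result, element);
    }
    return result;
}

// Form 2: an identity function turns the first element into the initial
// result; an empty input is an error.
template<typename T, typename IdentityFn, typename Accumulator>
auto reduce_with_seed(const std::vector<T>& values, IdentityFn identity,
                      Accumulator accum) {
    if (values.empty()) {
        throw std::runtime_error("reduce on empty input");
    }
    auto result = identity(values.front());
    for (std::size_t i = 1; i < values.size(); ++i) {
        result = accum(result, values[i]);
    }
    return result;
}

// Form 3: the result type equals the element type, so the identity
// function just returns its argument.
template<typename T, typename Accumulator>
T reduce_plain(const std::vector<T>& values, Accumulator accum) {
    return reduce_with_seed(values, [](const T& x) { return x; }, accum);
}
```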
template<typename U>
U Stream<T>::reduce_by(const Reducer<T, U>& reducer);
Reduces the stream using a Reducer object.
This is really a convenience for packaging up reductions to pass into
reduce(IdentityFn&&, Accumulator&&). The
Reducer class has the following definition:
template<typename In, typename Out>
struct Reducer {
virtual Out initial(In& in) const = 0;
virtual Out accumulate(Out& out, In& in) const = 0;
};
Provided subclasses reside in the Reducers
namespace. One such implementation is the SummaryStats
reducer:
auto stats = MakeStream::normal_randoms()
.limit(1000)
.reduce_by(Reducers::SummaryStats());
std::cout << stats << std::endl;
Would produce output that looks like:
N=1000, u=0.00986662, s=0.980492, min=-3.7676, max=3.52161
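To show what writing a reducer against this interface might look like, the sketch below defines a local copy of the Reducer interface, a hypothetical CountAndTotal reducer, and a hypothetical reduce_by_sketch function that drives it over a std::vector. None of these names come from the library (only Reducer's two member signatures are taken from the definition above), and std::runtime_error stands in for EmptyStreamException.

```cpp
#include <cstddef>
#include <stdexcept>
#include <utility>
#include <vector>

// Local copy of the interface shown above so the sketch compiles on its
// own; the virtual destructor is an addition of this sketch.
template<typename In, typename Out>
struct Reducer {
    virtual ~Reducer() = default;
    virtual Out initial(In& in) const = 0;
    virtual Out accumulate(Out& out, In& in) const = 0;
};

// Hypothetical reducer: tracks (count, total) so a mean can be derived.
struct CountAndTotal : Reducer<double, std::pair<std::size_t, double>> {
    using Out = std::pair<std::size_t, double>;
    Out initial(double& in) const override { return Out(1, in); }
    Out accumulate(Out& out, double& in) const override {
        return Out(out.first + 1, out.second + in);
    }
};

// Hypothetical stand-in for reduce_by: seeds the result from the first
// element, then folds in the rest.
template<typename In, typename Out>
Out reduce_by_sketch(std::vector<In>& values, const Reducer<In, Out>& reducer) {
    if (values.empty()) {
        throw std::runtime_error("reduce_by on empty input");
    }
    Out result = reducer.initial(values.front());
    for (std::size_t i = 1; i < values.size(); ++i) {
        result = reducer.accumulate(result, values[i]);
    }
    return result;
}
```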
T Stream<T>::random_element();
Returns a random element drawn uniformly from the elements of the stream.
If the stream is empty, throws an
EmptyStreamException.
If the stream is infinite, this will never terminate.
std::vector<T> Stream<T>::random_sample(size_t size);
Returns a random sample of elements of the given size drawn randomly
from the stream. If the stream has fewer elements than the requested size,
the sample will consist of all of the elements of the stream. If the
stream is infinite, this will never terminate.
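A standard way to draw a uniform sample in a single pass, without knowing the length in advance, is reservoir sampling. Whether Streams uses this technique is not stated here, so the sketch below is only one plausible approach; reservoir_sample is a hypothetical name, and the random generator is passed in explicitly so that results can be reproduced.

```cpp
#include <cstddef>
#include <random>
#include <vector>

// Reservoir sampling (Algorithm R): keeps the first `size` elements, then
// replaces a random slot with decreasing probability as more are seen.
template<typename T, typename Generator>
std::vector<T> reservoir_sample(const std::vector<T>& source,
                                std::size_t size, Generator& gen) {
    std::vector<T> sample;
    sample.reserve(size);
    std::size_t seen = 0;  // number of elements examined so far
    for (const T& element : source) {
        if (sample.size() < size) {
            sample.push_back(element);
        } else {
            // Pick an index in [0, seen]; keep the element only if the
            // index falls inside the reservoir.
            std::uniform_int_distribution<std::size_t> pick(0, seen);
            const std::size_t slot = pick(gen);
            if (slot < size) {
                sample[slot] = element;
            }
        }
        ++seen;
    }
    return sample;
}
```

As in the description above, an input with fewer elements than the requested size comes back whole.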
template<typename Predicate>
bool Stream<T>::any(Predicate&& predicate);
Returns true if some element of the stream matches the given predicate.
If the stream is empty, the result is vacuously false. If the stream is
infinite, this operation will shortcut in the case that an element that
matches the predicate is found.
Specializations:
A specialization of any() exists for types that have an implicit conversion to bool. For these types, the following specialization exists:
bool Stream<T>::any();
This returns true if any element of the stream has a conversion to a
bool with value true.
template<typename Predicate>
bool Stream<T>::all(Predicate&& predicate);
Returns true if all of the elements of the stream match the given predicate.
If the stream is empty, the result is vacuously true. If the stream is
infinite, this operation will shortcut in the case that an element that
doesn't match the predicate is found.
Specializations:
Specializations of all() exist for types that have an implicit conversion to bool. For these types, the following specializations exist:
bool Stream<T>::all();
bool Stream<T>::not_all();
The first returns true if all of the elements of the stream have a conversion to a
bool with value true. The second is a convenience for
the logical not of the first.
template<typename Predicate>
bool Stream<T>::none(Predicate&& predicate);
Returns true if none of the elements of the stream match the given predicate.
If the stream is empty, the result is vacuously true. If the stream is
infinite, this operation will shortcut in the case that an element that
matches the predicate is found.
Specializations:
A specialization of none() exists for types that have an implicit conversion to bool. For these types, the following specialization exists:
bool Stream<T>::none();
This returns true if none of the elements of the stream have a conversion to a
bool with value true.
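The vacuous results for empty input match the behavior of the standard library's std::any_of, std::all_of, and std::none_of, which makes them easy to check. The sketch below evaluates all three on a std::vector; the Quantifiers struct and the evaluate and is_even functions are hypothetical names used only for this example.

```cpp
#include <algorithm>
#include <vector>

bool is_even(int x) { return x % 2 == 0; }

// Results of the three quantifiers for one input.
struct Quantifiers {
    bool any;
    bool all;
    bool none;
};

Quantifiers evaluate(const std::vector<int>& values) {
    return Quantifiers{
        std::any_of(values.begin(), values.end(), is_even),
        std::all_of(values.begin(), values.end(), is_even),
        std::none_of(values.begin(), values.end(), is_even)};
}
```

For an empty vector this yields any = false, all = true, and none = true.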
template<typename Function>
void Stream<T>::for_each(Function&& function);
Calls the given function on each element of the stream.
std::vector<T> Stream<T>::to_vector();
std::list<T> Stream<T>::to_list();
std::deque<T> Stream<T>::to_deque();
std::set<T> Stream<T>::to_set();
std::multiset<T> Stream<T>::to_multiset();
Creates a container of the given type, inserts the elements of the
stream into that container, and returns the container.
template<typename OutputIterator>
OutputIterator Stream<T>::copy_to(OutputIterator iterator);
Copies the elements of the stream into the given iterator, returning
the iterator one past the end of the sequence, much like
std::copy.
template<typename OutputIterator>
OutputIterator Stream<T>::move_to(OutputIterator iterator);
Moves the elements of the stream into the given iterator, returning
the iterator one past the end of the sequence, much like
std::move.
std::ostream& Stream<T>::print_to(std::ostream& os,
const char* delimiter = " ");
Prints the elements of the stream to the given
ostream, with a delimiter after every element. The return value
is simply the given ostream, thus allowing chaining of stream insertions.
Calling stream.print_to(os, delimiter) is a convenience method for:
stream.copy_to(std::ostream_iterator<T>(os, delimiter));
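The "delimiter after every element" behavior comes from std::ostream_iterator, which writes the delimiter after each value, including the last one. The sketch below shows the same effect for a std::vector; print_values and to_text are hypothetical helpers, not part of Streams.

```cpp
#include <algorithm>
#include <iterator>
#include <ostream>
#include <sstream>
#include <string>
#include <vector>

// Writes every value followed by the delimiter and returns the ostream
// so that further insertions can be chained.
template<typename T>
std::ostream& print_values(const std::vector<T>& values, std::ostream& os,
                           const char* delimiter = " ") {
    std::copy(values.begin(), values.end(),
              std::ostream_iterator<T>(os, delimiter));
    return os;
}

// Convenience wrapper that captures the printed text in a string.
template<typename T>
std::string to_text(const std::vector<T>& values, const char* delimiter = " ") {
    std::ostringstream out;
    print_values(values, out, delimiter);
    return out.str();
}
```

Note the trailing delimiter: printing 1, 2, 3 with ", " produces "1, 2, 3, ".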
Stream Exceptions
class StreamException {
public:
explicit StreamException(const std::string& msg);
explicit StreamException(const char* msg);
std::string what() const;
};
The root class of all stream based exceptions.
class EmptyStreamException : public StreamException {
public:
explicit EmptyStreamException(const std::string& method);
};
The exception that gets thrown when attempting to call some terminal
stream operation that has no identity element for that operation on
a stream that is empty.
Example:
try {
MakeStream::empty().min();
} catch(EmptyStreamException& e) {
std::cout << e.what() << std::endl;
}
Produces the following output:
No terminal result for operation Stream::min
class VacantStreamException : public StreamException {
public:
explicit VacantStreamException(const std::string& method);
};
The exception that gets thrown when attempting to call any stream operation
on some stream that no longer holds data. A stream is considered vacant when
it has been moved into some other stream, or some stream operation has been
called on the stream. To check vacancy, use the
occupied() method.
Example:
auto stream1 = MakeStream::range(0, 5);
auto stream2 = std::move(stream1); // Copying disallowed
try {
stream1.skip(2);
} catch(VacantStreamException& e) {
std::cout << e.what() << std::endl;
}
stream2.skip(2); // Didn't save it so stream2's data is lost!
try {
stream2.first();
} catch(VacantStreamException& e) {
std::cout << e.what() << std::endl;
}
Produces the following output:
Cannot call Stream::skip on a vacant stream
Cannot call Stream::first on a vacant stream
class ConsumedIteratorException : public StreamException {
public:
explicit ConsumedIteratorException(const std::string& op);
};
The exception that gets thrown when attempting to call some non dereferencing
operation on a stream iterator after that iterator is declared to be
in a "consumed" state. For a discussion of this, see the comments for
the begin() method.
Example:
auto stream = MakeStream::closed_range(1,10);
auto iter = stream.begin();
cout << *iter << endl;
auto temp_iter = iter++;
cout << *temp_iter << endl;
try {
++temp_iter;
} catch(ConsumedIteratorException& e) {
cout << e.what() << endl;
}
Produces the following output:
1
2
Cannot perform prefix increment on consumed stream iterator.
class StopStream : public StreamException {
public:
explicit StopStream();
};
An exception to be thrown by the user in any intermediary or generating
(but not terminating) stream operation, if they want the stream to stop
iterating at that point. This is most often helpful with the
generate()
method.
Example:
std::ifstream fin = /* ... */;
auto stream = MakeStream::generate([&fin]() {
std::string line;
if(!std::getline(fin, line))
throw StopStream(); // Stop iterating when we reach EOF
return line;
});
// Silly version of take while
stream.peek([](auto& value) {
if(!pred(value))
throw StopStream();
});
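The idea of using an exception as a stop signal can be sketched without the library. Below, a hypothetical drain function keeps pulling values from a generator until the generator throws a hypothetical StopSignal type, which plays the role that StopStream plays above. This models the control flow only and is not how Streams is implemented.

```cpp
#include <vector>

// Hypothetical signal type standing in for StopStream.
struct StopSignal {};

// Pulls values from the generator until it throws StopSignal, then
// returns everything collected so far.
template<typename T, typename Generator>
std::vector<T> drain(Generator generator) {
    std::vector<T> values;
    try {
        for (;;) {
            values.push_back(generator());
        }
    } catch (const StopSignal&) {
        // The generator asked to stop; this is the normal exit path.
    }
    return values;
}
```

Any other exception thrown by the generator still propagates to the caller, so only the dedicated signal type ends the iteration quietly.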
Stream Recipes
These will go here eventually.
C++ Streams are developed by Jonah Scheinerman. Please contact me if you have questions or concerns.
C++ Streams are distributed under the MIT open source license.
Copyright © 2014 by Jonah Scheinerman