Streams
Streams are the most common type of live collection in Hydro; they can be used to model streaming data collections or a feed of API requests. A Stream represents a sequence of elements, with new elements being asynchronously appended to the end of the sequence. Streams can be transformed using APIs like map and filter, based on Rust iterators. The full API documentation for Streams is available in the Hydro API reference.
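Because the Stream transformations are based on Rust iterators, the equivalent iterator chain may help build intuition. This is a plain-Rust analogy over a finished Vec using a hypothetical transform helper, not Hydro code. A Hydro Stream applies the same kind of closures to elements as they are appended.

```rust
// Plain-Rust analogy only: these are std iterator adapters, not Hydro's
// Stream API. `transform` is a hypothetical helper for illustration.
fn transform(input: Vec<i32>) -> Vec<i32> {
    input
        .into_iter()
        .map(|x| x + 1)         // add one to every element
        .filter(|x| x % 2 == 0) // keep only the even results
        .collect()
}
```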
Streams have several type parameters:
- T: the type of elements in the stream
- L: the location the stream is on (see Locations)
- B: indicates whether the stream is bounded or unbounded
- Order: indicates whether the elements in the stream have a deterministic order or not. This type parameter is optional; by default the order is deterministic.
Creating a Stream
The simplest way to create a stream is to use Location::source_iter, which creates a stream from any Rust type that can be converted into an Iterator (via IntoIterator). For example, we can create a stream of integers on a process and transform it:
let process = flow.process::<()>();
let numbers: Stream<_, Process<_>, Unbounded> = process
.source_iter(q!(vec![1, 2, 3]))
.map(q!(|x| x + 1));
// 2, 3, 4
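The IntoIterator requirement means source_iter is not limited to Vec. The following is a rough plain-Rust sketch. The from_source helper is hypothetical and not part of Hydro. A function bounded on IntoIterator accepts a Vec, an array, or a range alike, and can then apply the same map as above:

```rust
// Hypothetical helper, not part of Hydro: like source_iter, it accepts any
// value implementing IntoIterator, then adds one to each element.
fn from_source<I>(source: I) -> Vec<i32>
where
    I: IntoIterator<Item = i32>,
{
    source.into_iter().map(|x| x + 1).collect()
}
```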
Streams can also be sent over the network to participate in distributed programs. Under the hood, sending a stream sets up an RPC handler at the target location that will receive the stream elements. For example, we can send a stream of integers from one process to another with bincode serialization:
let p1 = flow.process::<()>();
let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
let p2 = flow.process::<()>();
let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
// 1, 2, 3
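Conceptually, a point-to-point send behaves like a FIFO channel between two locations. The sketch below is an analogy only. It uses an in-memory std::sync::mpsc channel between two threads instead of Hydro's network transport, and it performs no bincode serialization. The send_point_to_point helper is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

// Plain-Rust analogy, not Hydro: one "process" (a thread) sends its elements
// over a channel to a receiver. With a single sender, the channel delivers
// messages in the order they were sent.
fn send_point_to_point(items: Vec<i32>) -> Vec<i32> {
    let (tx, rx) = mpsc::channel();
    let sender = thread::spawn(move || {
        for item in items {
            tx.send(item).expect("receiver is still alive");
        }
        // `tx` is dropped when this closure returns, which closes the channel.
    });
    // Block and collect until the channel is closed.
    let received: Vec<i32> = rx.iter().collect();
    sender.join().expect("sender thread panicked");
    received
}
```

With a single sender, the receiver sees 1, 2, 3 in the same order they were sent, matching the comment in the Hydro example above.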
Stream Ordering and Determinism
When sending a stream over the network, there are certain situations in which the order of messages will not be deterministic for the receiver. For example, when sending streams from a cluster to a process, delays will cause messages from different cluster members to be interleaved in a non-deterministic order.
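The interleaving can be reproduced on a single machine with several threads sending into one channel. This is a plain-Rust analogy, not Hydro, and gather_from_members is hypothetical. Each thread stands in for a cluster member and tags its messages with its id.

How messages from different threads interleave depends on the OS scheduler. So only properties that hold on every run are worth checking:
- Every message arrives.
- Each member's own messages stay in the order that member sent them.

```rust
use std::sync::mpsc;
use std::thread;

// Plain-Rust analogy, not Hydro: several "cluster members" (threads) send to a
// single receiver. Each message is (member id, sequence number).
fn gather_from_members(members: u32, per_member: u32) -> Vec<(u32, u32)> {
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();
    for id in 0..members {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            for seq in 0..per_member {
                tx.send((id, seq)).expect("receiver is still alive");
            }
        }));
    }
    // Drop the original sender so the channel closes once all members finish.
    drop(tx);
    // Arrival order across members is up to the scheduler.
    let received: Vec<(u32, u32)> = rx.iter().collect();
    for handle in handles {
        handle.join().expect("member thread panicked");
    }
    received
}
```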
To track this behavior, streams have an Order type parameter that indicates whether the elements in the stream will have a deterministic order (TotalOrder) or not (NoOrder). When the type parameter is omitted, it defaults to TotalOrder for brevity.
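In Rust, this kind of optional parameter is typically expressed as a defaulted type parameter on a zero-sized marker type. The sketch below assumes that approach and is not Hydro's actual definition. SketchStream is hypothetical, and the marker structs only borrow Hydro's names.

```rust
use std::any::TypeId;
use std::marker::PhantomData;

// Hypothetical marker types named after Hydro's, for illustration only.
pub struct TotalOrder;
pub struct NoOrder;

// Hypothetical stand-in with a defaulted Order parameter; the location and
// boundedness parameters are left out to keep the sketch small.
pub struct SketchStream<T, Order = TotalOrder> {
    pub items: Vec<T>,
    _order: PhantomData<Order>,
}

// Omitting Order yields exactly the same type as writing TotalOrder.
pub fn default_is_total_order() -> bool {
    TypeId::of::<SketchStream<i32>>() == TypeId::of::<SketchStream<i32, TotalOrder>>()
}

// NoOrder produces a different type, so the two cannot be mixed up silently.
pub fn no_order_is_distinct() -> bool {
    TypeId::of::<SketchStream<i32, NoOrder>>() != TypeId::of::<SketchStream<i32>>()
}
```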
If we send a stream from a cluster to a process, the return type will be a stream with NoOrder:
let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, Unbounded, TotalOrder> =
workers.source_iter(q!(vec![1, 2, 3]));
let process: Process<()> = flow.process::<()>();
let on_p2: Stream<_, Process<_>, Unbounded, NoOrder> =
numbers.send_bincode(&process);
The ordering of a stream determines which APIs are available on it. For example, map and filter are available on all streams, but last is only available on streams with TotalOrder. This ensures that even when the network introduces non-determinism, the program will not compile if it tries to use an API that requires a deterministic order.
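One way to enforce such a restriction at compile time is to place order-sensitive methods in an impl block that exists only for TotalOrder. The following is a hypothetical sketch of that technique. OrderedVec and its methods are illustrative and are not Hydro's types. For simplicity, the sketch stores elements in a Vec.

```rust
use std::marker::PhantomData;

// Hypothetical marker types named after Hydro's, for illustration only.
pub struct TotalOrder;
pub struct NoOrder;

// Hypothetical stand-in for a stream (not Hydro's implementation); the Order
// parameter exists only at the type level.
pub struct OrderedVec<T, Order> {
    items: Vec<T>,
    _order: PhantomData<Order>,
}

impl<T, Order> OrderedVec<T, Order> {
    pub fn new(items: Vec<T>) -> Self {
        OrderedVec { items, _order: PhantomData }
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }

    // Defined for every Order, like map and filter on a Stream.
    pub fn map<U>(self, f: impl FnMut(T) -> U) -> OrderedVec<U, Order> {
        OrderedVec::new(self.items.into_iter().map(f).collect())
    }
}

// Defined only for TotalOrder, like last: calling it on an
// OrderedVec<_, NoOrder> is rejected at compile time.
impl<T> OrderedVec<T, TotalOrder> {
    pub fn last(self) -> Option<T> {
        self.items.into_iter().last()
    }
}
```

A call to last on an OrderedVec<i32, NoOrder> fails to compile, because no such method exists for that type.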
A particularly common API that faces this restriction is fold (and reduce). These APIs require the stream to have a deterministic order, since the result may depend on the order of elements. For example, the following code will not compile because fold is not available on NoOrder streams (note that the error is a bit misleading due to the Rust compiler attempting to apply Iterator methods):
let workers: Cluster<()> = flow.cluster::<()>();
let process: Process<()> = flow.process::<()>();
let all_words: Stream<_, Process<_>, Unbounded, NoOrder> = workers
.source_iter(q!(vec!["hello", "world"]))
.map(q!(|x| x.to_string()))
.send_bincode_anonymous(&process);
let words_concat = all_words
.fold(q!(|| "".to_string()), q!(|acc, x| *acc += &x));
// ^^^^ error: `hydro_lang::Stream<String, hydro_lang::Process<'_>, hydro_lang::Unbounded, NoOrder>` is not an iterator
We use send_bincode_anonymous here to drop the cluster IDs, which are included in send_bincode. See Clusters for more details.
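To see why order matters for fold, consider the two orders in which the words from the workers could reach the process. The following is a plain-Rust illustration, not Hydro, and concat_in_arrival_order is a hypothetical helper. The same concatenating fold produces a different result for each order, and that is the non-determinism the type system rules out.

```rust
// Plain-Rust illustration, not Hydro: the same left fold (string
// concatenation) applied to one possible arrival order of the elements.
fn concat_in_arrival_order(words: &[&str]) -> String {
    words.iter().fold(String::new(), |mut acc, word| {
        acc.push_str(word); // append this element to the accumulator
        acc
    })
}
```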
Running an aggregation (fold, reduce) converts a Stream into a Singleton. The Singleton type is still "live" in the sense of a Live Collection, so updates to the Stream input cause updates to the Singleton output. See Singletons and Optionals for more information.
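A running fold can be approximated in plain Rust with Iterator::scan, which emits the accumulator after every input element. This is only an analogy for how a Singleton tracks its input. The running_concat helper is hypothetical, and it runs over a finished slice rather than a live Stream.

```rust
// Plain-Rust analogy, not Hydro: after each element "arrives", the current
// aggregate value is emitted, like successive values of a Singleton.
fn running_concat(words: &[&str]) -> Vec<String> {
    words
        .iter()
        .scan(String::new(), |acc, word| {
            acc.push_str(word);   // update the aggregate with this element
            Some(acc.clone())     // emit a snapshot of the current value
        })
        .collect()
}
```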
To perform an aggregation with an unordered stream, you must use fold_commutative, which requires the provided closure to be commutative (and therefore immune to non-deterministic ordering):
let words_count = all_words
.fold_commutative(q!(|| 0), q!(|acc, x| *acc += 1));
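Counting is commutative, so the final count is the same no matter which order the words arrive in. The sketch below assumes a hypothetical fold_in_place helper whose init and update closures have the same shape as the ones passed to fold_commutative above: the accumulator is mutated in place. It is plain Rust, not Hydro's implementation.

```rust
// Hypothetical helper, not Hydro: `init` builds the accumulator and `update`
// mutates it in place for each element, in iteration order.
fn fold_in_place<T, A>(
    items: impl IntoIterator<Item = T>,
    init: impl Fn() -> A,
    mut update: impl FnMut(&mut A, T),
) -> A {
    let mut acc = init();
    for x in items {
        update(&mut acc, x);
    }
    acc
}
```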
Developers are responsible for the commutativity of the closure they pass into *_commutative methods. In the future, commutativity checks will be automatically provided by the compiler (via tools like Kani).
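Until such automated checks exist, a cheap safeguard is a unit test that applies the closure to pairs of sample inputs in both orders. The helper below is hypothetical and is not a Hydro or Kani API. It only checks pairs starting from the initial accumulator, so passing it is evidence of commutativity rather than proof.

```rust
// Hypothetical test helper, not a Hydro or Kani API: for every pair (x, y) of
// samples, checks that applying x then y leaves the same accumulator as y then x.
fn commutes_on_samples<A, T>(
    init: impl Fn() -> A,
    update: impl Fn(&mut A, T),
    samples: &[T],
) -> bool
where
    A: PartialEq,
    T: Clone,
{
    for x in samples {
        for y in samples {
            let mut xy = init();
            update(&mut xy, x.clone());
            update(&mut xy, y.clone());
            let mut yx = init();
            update(&mut yx, y.clone());
            update(&mut yx, x.clone());
            if xy != yx {
                return false;
            }
        }
    }
    true
}
```

With these samples, the counting closure from the fold_commutative example passes the check, while string concatenation fails it.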
Bounded and Unbounded Streams
The Hydro documentation is currently under active development! This is a placeholder for future content.