gen_stage v0.5.0 Experimental.Flow
Computational flows with stages.
Flow allows developers to express computations
on collections, similar to the Enum and Stream modules,
although computations will be executed in parallel using
multiple GenStages.
Flow was also designed to work with both bounded (finite) and unbounded (infinite) data, allowing the data to be partitioned into arbitrary windows which are materialized at different triggers.
Note: this module is currently namespaced under
Experimental.Flow. You will need to alias Experimental.Flow
before writing the examples below.
As an example, let’s implement the classical word counting
algorithm using flow. The word counting program will receive
one file and count how many times each word appears in the
document. Using the Enum module it could be implemented
as follows:
File.stream!("path/to/some/file")
|> Enum.flat_map(&String.split(&1, " "))
|> Enum.reduce(%{}, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
Unfortunately the implementation above is not quite efficient
as Enum.flat_map/2 will build a list with all the words in
the document before reducing it. If the document is, for example,
2GB, we will load 2GB of data into memory.
We can improve the solution above by using the Stream module:
File.stream!("path/to/some/file")
|> Stream.flat_map(&String.split(&1, " "))
|> Enum.reduce(%{}, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
Now, instead of loading the whole data set into memory, we only keep the current line in memory while we process it. While this allows us to process the whole data set efficiently, it does not leverage concurrency. Flow solves that:
alias Experimental.Flow
File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
To convert from stream to flow, we have made two changes:
- We have replaced the calls to Stream with Flow
- We called partition/1 so words are properly partitioned between stages
The example above will now use all available cores and keep
an ongoing flow of data instead of traversing it line by
line. Once all data is computed, it is sent to the
process which invoked Enum.to_list/1.
While we gain concurrency by using flow, many of the benefits of using flow come from partitioning the data. We will discuss the need for data partitioning next.
Partitioning
To understand the need to partition the data, let’s change the example above and remove the partition call:
alias Experimental.Flow
File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
The example above will execute the flat_map and reduce
operations in parallel inside multiple stages. When running
on a machine with two cores:
 [file stream]  # Flow.from_enumerable/1 (producer)
    |    |
  [M1]  [M2]    # Flow.flat_map/2 + Flow.reduce/3 (consumer)
Now imagine that the M1 and M2 stages above receive the
following lines:
M1 - "roses are red"
M2 - "violets are blue"
flat_map/2 will break them into:
M1 - ["roses", "are", "red"]
M2 - ["violets", "are", "blue"]
Then reduce/3 will make each stage have the following state:
M1 - %{"roses" => 1, "are" => 1, "red" => 1}
M2 - %{"violets" => 1, "are" => 1, "blue" => 1}
Which is converted to the list (in no particular ordering):
[{"roses", 1},
{"are", 1},
{"red", 1},
{"violets", 1},
{"are", 1},
{"blue", 1}]
Although both stages have performed word counting, words like “are” appear in both stages. This means we would need to perform yet another pass on the data, merging the duplicated words across stages.
Partitioning solves this by introducing a new set of stages and
making sure the same word is always mapped to the same stage
with the help of a hash function. Let’s introduce the call to
partition/1 back:
alias Experimental.Flow
File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
Now we will have the following topology:
 [file stream]  # Flow.from_enumerable/1 (producer)
    |    |
  [M1]  [M2]    # Flow.flat_map/2 (producer-consumer)
    |\  /|
    | \/ |
    |/ \ |
  [R1]  [R2]    # Flow.reduce/3 (consumer)
If the M1 and M2 stages receive the same lines and break
them into words as before:
M1 - ["roses", "are", "red"]
M2 - ["violets", "are", "blue"]
Now any given word will be consistently routed to R1 or R2
regardless of its origin. The default hashing function may route
them as follows:
R1 - ["roses", "are", "red", "are"]
R2 - ["violets", "blue"]
Resulting in the reduced state of:
R1 - %{"roses" => 1, "are" => 2, "red" => 1}
R2 - %{"violets" => 1, "blue" => 1}
Which is converted to the list (in no particular ordering):
[{"roses", 1},
{"are", 2},
{"red", 1},
{"violets", 1},
{"blue", 1}]
This way, each stage has a distinct subset of the data, so we know we don’t need to merge the data later on: each word is guaranteed to appear in only one stage.
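The routing described above can be sketched without Flow at all. The snippet below is only an illustration, not Flow's implementation: it assumes two reducer stages, calls :erlang.phash2/2 directly to pick a partition for each word, and the PartitionSketch module name is hypothetical.

```elixir
defmodule PartitionSketch do
  # Hypothetical helper (not part of Flow): routes every word to one of
  # `stages` partitions by hashing it, then counts words inside each partition.
  def count(lines, stages) do
    lines
    |> Enum.flat_map(&String.split(&1, " "))
    |> Enum.group_by(&:erlang.phash2(&1, stages))
    |> Map.new(fn {partition, words} ->
      counts = Enum.reduce(words, %{}, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end)
      {partition, counts}
    end)
  end
end

partitions = PartitionSketch.count(["roses are red", "violets are blue"], 2)

# The same word always hashes to the same partition, so the per-partition maps
# share no keys and can be merged without resolving conflicts.
merged = partitions |> Map.values() |> Enum.reduce(%{}, &Map.merge/2)
IO.inspect(merged["are"])
```

Which partition a given word lands in depends on the hash, so only the merged result is stable across runs of different inputs.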
Partitioning the data is a very useful technique. For example, if we want to count the number of unique elements in a dataset, we could perform the count in each partition and later sum the results, as partitioning guarantees the data in each partition won’t overlap. A unique element would never be counted twice.
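As a rough sketch of that idea using only the standard library (the number of partitions is an assumption chosen for illustration, and hashing is done by hand rather than by Flow):

```elixir
letters = String.graphemes("the quick brown fox")
stages = 4 # assumed number of partitions

# Hash each grapheme to a partition, count the unique graphemes in each
# partition, then sum the partial counts.
partial_counts =
  letters
  |> Enum.group_by(&:erlang.phash2(&1, stages))
  |> Enum.map(fn {_partition, items} -> items |> Enum.uniq() |> length() end)

total = Enum.sum(partial_counts)

# No grapheme appears in two partitions, so the sum matches the global count.
true = total == length(Enum.uniq(letters))
```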
The topology above alongside partitioning is very common in the MapReduce programming model which we will briefly discuss next.
MapReduce
The MapReduce programming model forces us to break our computations into two stages: map and reduce. The map stage is often quite easy to parallelize because events are processed individually and in isolation. The reduce stage needs to group the data either partially or completely.
In the example above, the stages executing flat_map/2 are the
mapper stages. Because the flat_map/2 function works line by line,
we can have two, four, eight or more mapper processes that will
break line by line into words without any need for coordination.
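To illustrate why mappers need no coordination, here is a small sketch that splits each line in its own task. It uses Task.async_stream/3 from the standard library (assuming Elixir 1.4 or later) rather than GenStage, so it shows the idea only, not how Flow schedules its stages.

```elixir
lines = ["roses are red", "violets are blue"]

# Each line is split in its own task; no task needs to know about the others.
words =
  lines
  |> Task.async_stream(&String.split(&1, " "), max_concurrency: System.schedulers_online())
  |> Enum.flat_map(fn {:ok, line_words} -> line_words end)

IO.inspect(words)
```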
However, the reducing stage is a bit more complicated. Reducer stages typically aggregate some result based on their inputs, such as how many times a word has appeared. This implies reducer computations need to traverse the whole data set and, in order to do so in parallel, we partition the data into distinct datasets.
The goal of the reduce/3 operation is to accumulate a value
which then becomes the partition state. Any operation that
happens after reduce/3 works on the whole state and is only
executed after all the data for a partition is collected.
While this approach works great for bounded (finite) data, it is quite limited for unbounded (infinite) data. After all, if the reduce operation needs to traverse the whole partition to complete, how can we do so if the data never finishes?
To answer this question, we need to talk about data completion, triggers and windows.
Data completion, windows and triggers
When working with an unbounded stream of data, there is no such thing as data completion. Therefore when can we consider a reduce function to be “completed”?
To handle such cases, Flow provides windows and triggers. Windows allow us to split the data based on the event time while triggers tell us when to write the results we have computed so far. By introducing windows, we no longer think of events as being partitioned across stages. Instead each event belongs to a window and the window is partitioned across the stages.
By default all events belong to the same window, called global
window, which is partitioned across stages. However different
windowing strategies may be used by building a Flow.Window
and passing it to the Flow.partition/3 function.
Once a window is specified, we can build triggers that tell us when to checkpoint the data, allowing us to report our progress while the data streams through the system, regardless of whether the data is bounded (finite) or unbounded (infinite).
Windows and triggers effectively control how the reduce/3 function
works. reduce/3 is invoked per window while a trigger configures
when reduce/3 halts so we can checkpoint the data before resuming
the computation with an old or new accumulator. See Flow.Window
for a complete introduction to windows and triggers.
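The difference between resuming with an old or a new accumulator can be sketched with plain Enum functions. This is a simplified model and does not use the Flow.Window API: the "trigger" is assumed to fire every two events, and Enum.chunk_every/2 requires Elixir 1.5 or later.

```elixir
events = ~w(a b a c b a)

# Counts the words of one batch on top of the given accumulator.
count = fn batch, acc ->
  Enum.reduce(batch, acc, fn word, counts -> Map.update(counts, word, 1, &(&1 + 1)) end)
end

# Checkpoint every 2 events, keeping the accumulator between checkpoints.
kept = events |> Enum.chunk_every(2) |> Enum.scan(%{}, count)

# Same checkpoints, but starting from a fresh accumulator after each one.
reset = events |> Enum.chunk_every(2) |> Enum.map(&count.(&1, %{}))

IO.inspect(List.last(kept))
IO.inspect(List.last(reset))
```

With the kept accumulator every checkpoint reports running totals; with the reset accumulator each checkpoint only describes the events seen since the previous one.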
Long-running flows
In the examples so far we have started a flow dynamically
and consumed it using Enum.to_list/1. Unfortunately calling
a function from Enum will cause the whole computed dataset
to be sent to a single process.
In many situations, this is either too expensive or completely undesired. For example, in data-processing pipelines, it is common to constantly receive data from external sources. This data is either written to disk or to another storage after being processed, without a need to be sent to a single process.
Flow allows computations to be started as a group of processes
which may run indefinitely. This can be done by starting
the flow as part of a supervision tree using Flow.start_link/2.
Flow.into_stages/3 can also be used to start the flow as a
linked process which will send the events to the given consumers.
Performance discussions
In this section we will discuss points related to performance with flows.
Know your code
There are many optimizations we could perform in the flow above that are not necessarily related to flows themselves. Let’s rewrite the flow above using some of them:
alias Experimental.Flow
# The parent process which will own the table
parent = self()
# Let's compile common patterns for performance
empty_space = :binary.compile_pattern(" ") # BINARY
File.stream!("path/to/some/file", read_ahead: 100_000) # READ_AHEAD
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, empty_space)) # BINARY
|> Flow.partition()
|> Flow.reduce(fn -> :ets.new(:words, []) end, fn word, ets -> # ETS
  :ets.update_counter(ets, word, {2, 1}, {word, 0})
  ets
end)
|> Flow.map_state(fn ets -> # ETS
  :ets.give_away(ets, parent, [])
  [ets]
end)
|> Enum.to_list()
We have performed three optimizations:
- BINARY - the first optimization is to compile the pattern we use to split the string on
- READ_AHEAD - the second optimization is to use the :read_ahead option for file streams, allowing us to do fewer IO operations by reading large chunks of data at once
- ETS - the third stores the data in an ETS table and uses its counter operations. For counters and large datasets this provides a great performance benefit as it generates less garbage. At the end, we call map_state/2 to transfer the ETS table to the parent process and wrap the table in a list so we can access it on Enum.to_list/1. Such a step is not strictly required. For example, one could write the table to disk with :ets.tab2file/2 at the end of the computation
Configuration (demand and the number of stages)
Both new/2 and partition/3 allow a set of options to configure
how flows work. In particular, we recommend developers experiment with
the :min_demand and :max_demand options, which control the amount
of data sent between stages. The difference between max_demand and
min_demand works as the batch size when the producer is full. If the
producer has fewer events than the batch size, its current events are
sent.
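As a worked example of that arithmetic, the sketch below models a full producer handing out its buffered events. The demand values and the buffer size are assumptions picked for illustration, and chunking a range is only a simplified stand-in for what the stages actually exchange.

```elixir
# Assumed values for illustration only.
max_demand = 1000
min_demand = 500
batch_size = max_demand - min_demand

# A producer holding 1200 buffered events hands them out in batches of at most
# `batch_size`; the last, smaller batch is sent as it is.
batches = 1..1200 |> Enum.chunk_every(batch_size) |> Enum.map(&length/1)

IO.inspect(batches)
```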
If stages perform IO operations, we also recommend increasing
the number of stages. The default value is System.schedulers_online/0,
which is a good default if the stages are CPU bound. However, if stages
are waiting on external resources or other processes, increasing the
number of stages may be helpful.
Avoid single sources
In the examples so far we have used a single file as our data source. In practice this should be avoided, as the source could end up being the bottleneck of our whole computation.
In the file stream case above, instead of having one single large file, it is preferable to break the file into smaller ones:
streams = for file <- File.ls!("dir/with/files") do
  File.stream!("dir/with/files/#{file}", read_ahead: 100_000)
end
streams
|> Flow.from_enumerables()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
Instead of calling from_enumerable/1, we now called
from_enumerables/1 which expects a list of enumerables to
be used as source. Notice every stream also uses the :read_ahead
option which tells Elixir to buffer file data in memory to
avoid multiple IO lookups.
If the number of enumerables is equal to or more than the number of cores, flow will automatically fuse the enumerables with the mapper logic. For example, if three file streams are given as enumerables to a machine with two cores, we will have the following topology:
[F1][F2][F3]  # file stream
[M1][M2][M3]  # Flow.flat_map/2 (producer)
  |\ /\ /|
  | /\/\ |
  |//  \\|
  [R1][R2]    # Flow.reduce/3 (consumer)
Types
t :: %Experimental.Flow{operations: [operation], options: keyword, producers: producers, window: Experimental.Flow.Window.t}
Functions
Specs
Joins two bounded (finite) flows.
It expects the left and right flow, the left_key and
right_key to calculate the key for both flows and the join
function which is invoked whenever there is a match.
A join creates a new partitioned flow that subscribes to the
two flows given as arguments. The newly created partitions
will accumulate the data received from both flows until there
is no more data. Therefore, this function is useful for merging
finite flows. If used for merging infinite flows, you will
eventually run out of memory due to the accumulated data. See
window_join/8 for applying a window to a join, allowing the
join data to be reset per window.
The join has 4 modes:
- :inner - data will only be emitted when there is a match between the keys in left and right side
- :left_outer - similar to :inner plus all items given in the left that did not have a match will be emitted at the end with nil for the right value
- :right_outer - similar to :inner plus all items given in the right that did not have a match will be emitted at the end with nil for the left value
- :full_outer - similar to :inner plus all items given in the left and right that did not have a match will be emitted at the end with nil for the right and left value respectively
The joined partitions can be configured via options with the
same values as shown on new/1.
Examples
iex> posts = [%{id: 1, title: "hello"}, %{id: 2, title: "world"}]
iex> comments = [{1, "excellent"}, {1, "outstanding"},
...> {2, "great follow up"}, {3, "unknown"}]
iex> flow = Flow.bounded_join(:inner,
...> Flow.from_enumerable(posts),
...> Flow.from_enumerable(comments),
...> & &1.id, # left key
...> & elem(&1, 0), # right key
...> fn post, {_post_id, comment} -> Map.put(post, :comment, comment) end)
iex> Enum.sort(flow)
[%{id: 1, title: "hello", comment: "excellent"},
%{id: 2, title: "world", comment: "great follow up"},
%{id: 1, title: "hello", comment: "outstanding"}]
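The four join modes can also be sketched on plain lists. The JoinSketch module below is hypothetical and works entirely in memory; unlike bounded_join/7 it returns {left, right} pairs instead of calling a join function, and it says nothing about how Flow partitions the work.

```elixir
defmodule JoinSketch do
  # Hypothetical helper, not part of Flow: joins two in-memory lists by key.
  def join(mode, left, right, left_key, right_key) do
    matched = for l <- left, r <- right, left_key.(l) == right_key.(r), do: {l, r}

    # Items without a partner on the other side are paired with nil.
    left_only =
      for l <- left, not Enum.any?(right, &(right_key.(&1) == left_key.(l))), do: {l, nil}

    right_only =
      for r <- right, not Enum.any?(left, &(left_key.(&1) == right_key.(r))), do: {nil, r}

    case mode do
      :inner -> matched
      :left_outer -> matched ++ left_only
      :right_outer -> matched ++ right_only
      :full_outer -> matched ++ left_only ++ right_only
    end
  end
end

posts = [%{id: 1, title: "hello"}, %{id: 2, title: "world"}]
comments = [{1, "excellent"}, {1, "outstanding"}, {2, "great follow up"}, {3, "unknown"}]

inner = JoinSketch.join(:inner, posts, comments, & &1.id, &elem(&1, 0))
right_outer = JoinSketch.join(:right_outer, posts, comments, & &1.id, &elem(&1, 0))
IO.inspect(length(inner))
IO.inspect(List.last(right_outer))
```

With this data every post has at least one comment, so only the right-sided modes add an unmatched entry: the comment for the missing post 3, paired with nil.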
Applies the given function to each input without modifying it.
Examples
iex> parent = self()
iex> [1, 2, 3] |> Flow.from_enumerable() |> Flow.each(&send(parent, &1)) |> Enum.sort()
[1, 2, 3]
iex> receive do
...> 1 -> :ok
...> end
:ok
Specs
each_state(t, (term -> term) | (term, term -> term) | (term, term, {Experimental.Flow.Window.type, Experimental.Flow.Window.id, Experimental.Flow.Window.trigger} -> term)) :: t
Applies the given function over the stage state without changing its value.
It is similar to map_state/2 except that the value returned by mapper
is ignored.
iex> parent = self()
iex> flow = Flow.from_enumerable(["the quick brown fox"]) |> Flow.flat_map(fn word ->
...> String.graphemes(word)
...> end)
iex> flow = flow |> Flow.partition(stages: 2) |> Flow.reduce(fn -> %{} end, &Map.put(&2, &1, true))
iex> flow = flow |> Flow.each_state(fn map -> send(parent, map_size(map)) end)
iex> Flow.run(flow)
iex> receive do
...> 6 -> :ok
...> end
:ok
iex> receive do
...> 10 -> :ok
...> end
:ok
Controls which values should be emitted from now on.
It can either be :events (the default) or the current
stage state as :state. This step must be called after
the reduce operation and it will guarantee the state is
a list that can be sent downstream.
Most commonly, each partition will emit the events it has
processed to the next stages. However, sometimes we want
to emit counters or other data structures as a result of
our computations. In such cases, the :emit option can be
set to :state, to return the :state from reduce/3
or map_state/2 or even the processed collection as a whole.
Applies the given function filtering each input in parallel.
Examples
iex> flow = [1, 2, 3] |> Flow.from_enumerable() |> Flow.filter(& rem(&1, 2) == 0)
iex> Enum.sort(flow) # Call sort as we have no order guarantee
[2]
Applies the given function filtering and mapping each input in parallel.
Examples
iex> flow = [1, 2, 3] |> Flow.from_enumerable() |> Flow.filter_map(& rem(&1, 2) == 0, & &1 * 2)
iex> Enum.sort(flow) # Call sort as we have no order guarantee
[4]
Specs
flat_map(t, (term -> Enumerable.t)) :: t
Applies the given function mapping each input in parallel and flattening the result, but only one level deep.
Examples
iex> flow = [1, 2, 3] |> Flow.from_enumerable() |> Flow.flat_map(fn(x) -> [x, x * 2] end)
iex> Enum.sort(flow) # Call sort as we have no order guarantee
[1, 2, 2, 3, 4, 6]
Specs
from_enumerable(Enumerable.t) :: t
Starts a flow with the given enumerable as producer.
Calling this function is equivalent to:
Flow.new |> Flow.from_enumerables([enumerable])
See GenStage.from_enumerable/2 for information and
limitations on enumerable-based stages.
Examples
"some/file"
|> File.stream!(read_ahead: 100_000)
|> Flow.from_enumerable()
Specs
from_enumerable(t, Enumerable.t) :: t
Sets the given enumerable as a producer in the given flow.
See GenStage.from_enumerable/2 for information and
limitations on enumerable-based stages.
Examples
file = File.stream!("some/file", read_ahead: 100_000)
Flow.from_enumerable(Flow.new, file)
Specs
from_enumerables([Enumerable.t]) :: t
Starts a flow with the list of enumerables as producers.
Calling this function is equivalent to:
Flow.new |> Flow.from_enumerables(enumerables)
See GenStage.from_enumerable/2 for information and
limitations on enumerable-based stages.
Examples
files = [File.stream!("some/file1", read_ahead: 100_000),
File.stream!("some/file2", read_ahead: 100_000),
File.stream!("some/file3", read_ahead: 100_000)]
Flow.from_enumerables(files)
Specs
from_enumerables(t, [Enumerable.t]) :: t
Sets the given enumerables as producers in the given flow.
See GenStage.from_enumerable/2 for information and
limitations on enumerable-based stages.
Examples
files = [File.stream!("some/file1", read_ahead: 100_000),
File.stream!("some/file2", read_ahead: 100_000),
File.stream!("some/file3", read_ahead: 100_000)]
Flow.from_enumerables(Flow.new, files)
Specs
from_stage(Experimental.GenStage.stage) :: t
Starts a flow with the given stage as producer.
Calling this function is equivalent to:
Flow.new |> Flow.from_stages([stage])
Examples
Flow.from_stage(MyStage)
Specs
from_stage(t, Experimental.GenStage.stage) :: t
Sets the given stage as a producer in the given flow.
Examples
Flow.from_stage(Flow.new, MyStage)
Specs
from_stages([Experimental.GenStage.stage]) :: t
Starts a flow with the list of stages as producers.
Calling this function is equivalent to:
Flow.new |> Flow.from_stages(stages)
Examples
stages = [pid1, pid2, pid3]
Flow.from_stages(stages)
Specs
from_stages(t, [Experimental.GenStage.stage]) :: t
Sets the given stages as producers in the given flow.
Examples
stages = [pid1, pid2, pid3]
Flow.from_stages(Flow.new, stages)
Specs
into_stages(t, consumers, keyword) :: GenServer.on_start when consumers: [Experimental.GenStage.stage | {Experimental.GenStage.stage, keyword}]
Starts and runs the flow as a separate process which
will be a producer to the given consumers.
It expects a list of consumers to subscribe to. Each element
represents the consumer or a tuple with the consumer and the
subscription options as defined in GenStage.sync_subscribe/2.
Receives the same options as start_link/2.
Applies the given function mapping each input in parallel.
Examples
iex> flow = [1, 2, 3] |> Flow.from_enumerable() |> Flow.map(& &1 * 2)
iex> Enum.sort(flow) # Call sort as we have no order guarantee
[2, 4, 6]
iex> flow = Flow.from_enumerables([[1, 2, 3], 1..3]) |> Flow.map(& &1 * 2)
iex> Enum.sort(flow)
[2, 2, 4, 4, 6, 6]
Specs
map_state(t, (term -> term) | (term, term -> term) | (term, term, {Experimental.Flow.Window.type, Experimental.Flow.Window.id, Experimental.Flow.Window.trigger} -> term)) :: t
Applies the given function over the window state.
This function must be called after reduce/3 as it maps over
the state accumulated by reduce/3. map_state/2 is invoked
per window on every stage whenever there is a trigger: this
gives us an understanding of the window data while leveraging
the parallelism between stages.
The mapper function may have arity 1, 2 or 3:
- when one - the state is given as argument
- when two - the state and the stage indexes are given as arguments. The index is a tuple with the current stage index as first element and the total number of stages for this partition as second
- when three - the state, the stage indexes and a tuple with window-trigger parameters are given as arguments. The tuple contains the window type, the window identifier and the trigger name. By default, the window is :global, which implies the :global identifier with a default trigger of :done, emitted when there is no more data to process.
The value returned by this function is passed forward to the upcoming flow functions.
Examples
We can use map_state/2 to transform the collection after
processing. For example, if we want to count the number of
unique letters in a sentence, we can partition the data,
then reduce over the unique entries and finally return the
size of each stage, summing it all:
iex> flow = Flow.from_enumerable(["the quick brown fox"]) |> Flow.flat_map(fn word ->
...> String.graphemes(word)
...> end)
iex> flow = Flow.partition(flow)
iex> flow = Flow.reduce(flow, fn -> %{} end, &Map.put(&2, &1, true))
iex> flow |> Flow.map_state(fn map -> map_size(map) end) |> Flow.emit(:state) |> Enum.sum()
16
Merges the given flows in a new partition.
Calling this function is equivalent to:
Flow.merge(flows, Flow.Window.global, [])
See merge/3.
Examples
Flow.merge([flow1, flow2])
Specs
merge([t], Experimental.Flow.Window.t | keyword) :: t
Merges the given flows in a new partition with the given window or options.
See merge/3.
Examples
Flow.merge([flow1, flow2], Flow.Window.global)
Flow.merge([flow1, flow2], stages: 4)
Specs
merge([t], Experimental.Flow.Window.t, keyword) :: t
Merges the given flows into a new partition with the given window and options.
Every time this function is called, a new partition
is created. It is typically recommended to invoke it
before a reducing function, such as reduce/3, so data
belonging to the same partition can be kept together.
The window parameter is a Flow.Window struct which
controls how the reducing function behaves, see
Flow.Window for more information.
It accepts the same options and hash shortcuts as
partition/3. See partition/3 for more information.
Specs
new :: t
Creates a new flow.
Calling this function is equivalent to:
Flow.new(Flow.Window.global, [])
See new/2.
Examples
Flow.new
Specs
new(Experimental.Flow.Window.t | keyword) :: t
Creates a new flow with the given window or options.
See new/2.
Examples
Flow.new(Flow.Window.global)
Flow.new(stages: 4)
Specs
new(Experimental.Flow.Window.t, keyword) :: t
Starts a new flow with the given window and options.
Options
These options configure the stages connected to producers before partitioning.
- :stages - the number of stages
- :buffer_keep - how the buffer should behave, see GenStage.init/1
- :buffer_size - how many events to buffer, see GenStage.init/1
All remaining options are sent during subscription, allowing developers
to customize :min_demand, :max_demand and others.
Creates a new partition for the given flow.
Calling this function is equivalent to:
Flow.partition(flow, Flow.Window.global, [])
See partition/3.
Examples
flow |> Flow.partition()
Specs
partition(t, Experimental.Flow.Window.t | keyword) :: t
Creates a new partition for the given flow with the given window or options.
See partition/3.
Examples
flow |> Flow.partition(Flow.Window.global)
flow |> Flow.partition(stages: 4)
Specs
partition(t, Experimental.Flow.Window.t, keyword) :: t
Partitions the flow using the given window and options.
Every time this function is called, a new partition
is created. It is typically recommended to invoke it
before a reducing function, such as reduce/3, so data
belonging to the same partition can be kept together.
The window parameter is a Flow.Window struct which
controls how the reducing function behaves, see
Flow.Window for more information.
Options
- :stages - the number of partitions (reducer stages)
- :hash - the hash to use when partitioning. It is a function that receives two arguments: the event to partition on and the maximum number of partitions. However, to facilitate customization, :hash also allows common values, such as {:elem, 0}, to specify the hash should be calculated on the first element of a tuple. See more information on the “Hash shortcuts” section below. The default hashing function is :erlang.phash2/2.
- :dispatcher - by default, partition/3 uses GenStage.PartitionDispatcher with the given hash function but any other dispatcher can be given
Hash shortcuts
The following shortcuts can be given to the :hash option:
- {:elem, pos} - apply the hash function to the element at position pos in the given tuple
- {:key, key} - apply the hash function to the key of a given map
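Conceptually, the shortcuts narrow down what gets hashed. The sketch below imitates them with anonymous functions built on :erlang.phash2/2; it reads {:key, key} as hashing the value stored under that key, which is an interpretation, and both the number of partitions and the :user key are assumptions for illustration.

```elixir
stages = 4 # assumed number of partitions

# Roughly what {:elem, 0} asks for: hash only the first tuple element.
by_elem = fn event -> :erlang.phash2(elem(event, 0), stages) end

# Roughly what {:key, :user} asks for: hash only the value under the :user key.
by_key = fn event -> :erlang.phash2(Map.fetch!(event, :user), stages) end

# Events that agree on the hashed part land in the same partition,
# whatever the rest of the event contains.
true = by_elem.({"are", 1}) == by_elem.({"are", 2})
true = by_key.(%{user: "ann", clicks: 1}) == by_key.(%{user: "ann", clicks: 7})
```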
Reduces the given values with the given accumulator.
acc is a function that receives no arguments and returns
the actual accumulator. The acc function is invoked per window
whenever a new window starts. If a trigger is emitted and it is
configured to reset the accumulator, the acc function will
be invoked once again.
Reducing will accumulate data until a trigger is emitted or until a window completes. When that happens, the returned accumulator will be the new state of the stage and all functions after reduce will be invoked.
Examples
iex> flow = Flow.from_enumerable(["the quick brown fox"]) |> Flow.flat_map(fn word ->
...> String.graphemes(word)
...> end)
iex> flow = flow |> Flow.partition |> Flow.reduce(fn -> %{} end, fn grapheme, map ->
...> Map.update(map, grapheme, 1, & &1 + 1)
...> end)
iex> Enum.sort(flow)
[{" ", 3}, {"b", 1}, {"c", 1}, {"e", 1}, {"f", 1},
{"h", 1}, {"i", 1}, {"k", 1}, {"n", 1}, {"o", 2},
{"q", 1}, {"r", 1}, {"t", 1}, {"u", 1}, {"w", 1},
{"x", 1}]
Applies the given function rejecting each input in parallel.
Examples
iex> flow = [1, 2, 3] |> Flow.from_enumerable() |> Flow.reject(& rem(&1, 2) == 0)
iex> Enum.sort(flow) # Call sort as we have no order guarantee
[1, 3]
Specs
run(t) :: :ok
Runs a given flow.
This runs the given flow as a stream for its side-effects. No items are sent from the flow to the current process.
Examples
iex> parent = self()
iex> [1, 2, 3] |> Flow.from_enumerable() |> Flow.each(&send(parent, &1)) |> Flow.run()
:ok
iex> receive do
...> 1 -> :ok
...> end
:ok
Specs
start_link(t, keyword) :: GenServer.on_start
Starts and runs the flow as a separate process.
See into_stages/3 in case you want the flow to
work as a producer for another series of stages.
Options
- :dispatcher - the dispatcher responsible for handling demands. Defaults to GenStage.DemandDispatcher. May be either an atom or a tuple with the dispatcher and the dispatcher options
- :demand - configures the demand on the flow producers to :forward or :accumulate. The default is :forward. See GenStage.demand/2 for more information.
Only emit unique events.
Calling this function is equivalent to:
Flow.uniq_by(flow, & &1)
See uniq_by/2 for more information.
Only emit events that are unique according to the by function.
In order to verify if an item is unique or not, uniq_by/2
must store the value computed by by/1 into a set. This means
that, when working with unbounded data, it is recommended to
wrap uniq_by/2 in a window otherwise the data set will grow
forever, eventually using all memory available.
Also keep in mind that uniq_by/2 is applied per partition.
Therefore, if the data is not uniquely divided per partition,
it won’t be able to calculate the unique items properly.
Examples
To get started, let’s create a flow that emits only the first odd and even number for a range:
iex> flow = Flow.from_enumerable(1..100)
iex> flow = Flow.partition(flow, stages: 1)
iex> flow |> Flow.uniq_by(&rem(&1, 2)) |> Enum.sort()
[1, 2]
Since we have used only one stage when partitioning, we
correctly calculate [1, 2] for the given partition. Let’s see
what happens when we increase the number of stages in the partition:
iex> flow = Flow.from_enumerable(1..100)
iex> flow = Flow.partition(flow, stages: 4)
iex> flow |> Flow.uniq_by(&rem(&1, 2)) |> Enum.sort()
[1, 2, 3, 4, 10, 16, 23, 39]
Now we got 8 numbers, one odd and one even per partition. If we want to compute the unique items per partition, we must properly hash the events into two distinct partitions, one for odd numbers and another for even numbers:
iex> flow = Flow.from_enumerable(1..100)
iex> flow = Flow.partition(flow, stages: 2, hash: fn event, 2 -> {event, rem(event, 2)} end)
iex> flow |> Flow.uniq_by(&rem(&1, 2)) |> Enum.sort()
[1, 2]
Specs
window_join(:inner | :left_outer | :right_outer | :outer, t, t, Experimental.Flow.Window.t, (... -> any), (... -> any), (... -> any), keyword) :: t
Joins two flows with the given window.
It is similar to bounded_join/7 with the addition that a window
can be given. The window function applies to elements of both
left and right side in isolation (and not the joined value). A
trigger will cause the join state to be cleared.
Examples
As an example, let’s expand the example given in bounded_join/7
and apply a window to it. The example in bounded_join/7 returned
3 results but in this example, because we will split the posts
and comments in two different windows, we will get only two results
as the later comment for post_id=1 won’t have a matching post in
its window:
iex> posts = [%{id: 1, title: "hello", timestamp: 0}, %{id: 2, title: "world", timestamp: 1000}]
iex> comments = [{1, "excellent", 0}, {1, "outstanding", 1000},
...> {2, "great follow up", 1000}, {3, "unknown", 1000}]
iex> window = Flow.Window.fixed(1, :seconds, fn
...> {_, _, timestamp} -> timestamp
...> %{timestamp: timestamp} -> timestamp
...> end)
iex> flow = Flow.window_join(:inner,
...> Flow.from_enumerable(posts),
...> Flow.from_enumerable(comments),
...> window,
...> & &1.id, # left key
...> & elem(&1, 0), # right key
...> fn post, {_post_id, comment, _ts} -> Map.put(post, :comment, comment) end,
...> stages: 1, max_demand: 1)
iex> Enum.sort(flow)
[%{id: 1, title: "hello", comment: "excellent", timestamp: 0},
%{id: 2, title: "world", comment: "great follow up", timestamp: 1000}]
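The effect of the window on the join can be reproduced on plain lists. This is a sketch under the assumption that timestamps are in milliseconds, so a one-second fixed window is identified by integer division; it does not use Flow.Window and ignores triggers entirely.

```elixir
posts = [%{id: 1, title: "hello", timestamp: 0}, %{id: 2, title: "world", timestamp: 1000}]

comments = [
  {1, "excellent", 0},
  {1, "outstanding", 1000},
  {2, "great follow up", 1000},
  {3, "unknown", 1000}
]

# Assumption: timestamps are milliseconds, so each window spans 1000 units.
window_of = fn timestamp -> div(timestamp, 1000) end

# A post and a comment are joined only if the keys match AND both fall in the
# same window.
joined =
  for post <- posts,
      {post_id, comment, timestamp} <- comments,
      post.id == post_id,
      window_of.(post.timestamp) == window_of.(timestamp) do
    Map.put(post, :comment, comment)
  end

IO.inspect(Enum.map(joined, & &1.comment))
```

The "outstanding" comment matches post 1 by key but sits in the next window, which is why it is dropped.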