A Tree of Pipes
Today we have a guest post by Till Heinzel. Till is a physicist-turned-software engineer with a focus on code quality and a passion for C++, particularly metaprogramming. You can find Till on LinkedIn or on his shiny new blog.
Pipes are pretty neat, don’t you think? They are a great metaphor for what they try to achieve, syntactically simpler than STL algorithms, composable, and avoid several of the issues of ranges. I can definitely see myself using them in my own C++ code in the future.
One thing that pipes still lack, however, is reusability. In a recent series of blog posts, Jonathan presents an approach to allow for the creation of reusable composites of pipes. Here, I would like to share my own approach to implementing pipes, and how it allows reusability in a different way.
Note: Throughout the post, variables that are capital letters (A, B, C, D, …) represent collections. I think it’s easiest to just think of them as std::vector<int> or something similarly well behaved.
Another note: The code here uses C++17 and is optimized for presentation, so it omits some boilerplate, constructors etc, and is profligate with unnecessary copies. In the actual code I have done my best to avoid such issues. You can check it out on github.
Pipes-expressions are trees
The central idea for this implementation is that the expressions we create when using pipes make up a tree structure. Let’s look at an example that illustrates this:
A >>= demux(pipes::filter(...) >>= pipes::push_back(B), pipes::transform(...) >>= pipes::push_back(C));
Going from left to right, we have:
- A range A, over which we loop and send every bit on to
- demux, a pipe that sends its input on to
  - filter, which checks some predicate and sends the valid data on to
    - push_back(B), which calls B.push_back with its inputs
  - transform, which applies some function to its inputs and sends the transformed data on to
    - push_back(C), which calls C.push_back with its inputs
This flow is visualized on the graph to the right, which also clearly shows the tree structure, and the different kinds of nodes we have:
- The “funnel” node, which contains a reference to a range A and a single child. It has no parent, making it the root node of the tree.
- demux, which has a parent and one or more children.
- filter and transform, which both have a parent and a single child.
- end nodes, which transfer data out of this pipeline into some collections B and C, and have a parent, but no children. They are thus leaf nodes.
Note: I am not going to go into detail with the root, as it is not required for the reusability of pipes.
Implementing and parsing the tree
To evaluate the expression, each node, except for the root, needs a way to receive data, something to do with said data, and somewhere to send the, possibly modified, data. We can express this in code as:
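#include <tuple>   // std::tuple, std::apply
#include <utility> // std::forward

template<class Op, class... Tails>
class Node {
    Op op;
    std::tuple<Tails...> tails; // 0 or more Tails: 0 for end nodes, 1 for transform and filter, any number for demux

public:
    template<class T>
    void send(T&& t) {
        // Capture this so the lambda can reach op; take the tails by reference to avoid copying them
        auto f = [this, &t](auto&... ts) { op.send(std::forward<T>(t), ts...); };
        std::apply(f, tails);
    }
};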
Here, Op is the thing that knows what to do with the data, and which differentiates pipes. E.g. the transform, demux and push_back Ops look like:
template<class F>
class Transform {
    F transformation;

public:
    template<class T, class Tail>
    void send(T&& t, Tail& tail) {
        tail.send(transformation(std::forward<T>(t)));
    }
};

class Demux {
public:
    template<class T, class... Tails>
    void send(const T& t, Tails&... tails) {
        // T must be copyable for demux to work
        (tails.send(t), ...); // fold expressions are neat
    }
};

template<class Pushable>
class PushBack {
    Pushable& pushable;

public:
    template<class T>
    void send(T&& t) {
        pushable.push_back(std::forward<T>(t));
    }
};
We could have implemented this using CRTP as well, but this composition-approach separates the precise implementation of individual pipes from the storage mechanism, and makes it easy to implement reusability. If we add the required typedefs, operator* and operator++, we can also allow Node to be used as an output iterator, but that is, again, not necessary to get reusability.
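To see the pieces work together, here is a minimal, self-contained sketch that wires up a small tree by hand, without any operator>>=. It uses struct instead of class so that aggregate initialization works without constructors, and makeNode and runDemo are hypothetical helpers introduced only for this illustration; the actual library may look quite different.

```cpp
#include <cassert>
#include <tuple>
#include <utility>
#include <vector>

// Generic node: an operation plus its children ("tails").
template<class Op, class... Tails>
struct Node {
    Op op;
    std::tuple<Tails...> tails;

    template<class T>
    void send(T&& t) {
        // Unpack the tails and hand them to the operation together with the value.
        std::apply([&](auto&... ts) { op.send(std::forward<T>(t), ts...); }, tails);
    }
};

template<class F>
struct Transform {
    F transformation;
    template<class T, class Tail>
    void send(T&& t, Tail& tail) { tail.send(transformation(std::forward<T>(t))); }
};

struct Demux {
    template<class T, class... Tails>
    void send(const T& t, Tails&... tails) { (tails.send(t), ...); }
};

template<class Pushable>
struct PushBack {
    Pushable& pushable;
    template<class T>
    void send(T&& t) { pushable.push_back(std::forward<T>(t)); }
};

// Hypothetical helper (not from the post): deduces the Node type from its parts.
template<class Op, class... Tails>
auto makeNode(Op op, Tails... tails) {
    return Node<Op, Tails...>{std::move(op), std::tuple<Tails...>{std::move(tails)...}};
}

// Sends every element of input through
//   demux(transform(x * 2) >>= push_back(B), push_back(C))
// wired up by hand, and returns the two output collections {B, C}.
std::pair<std::vector<int>, std::vector<int>> runDemo(const std::vector<int>& input) {
    std::vector<int> B, C;
    auto timesTwo = [](int x) { return x * 2; };
    auto tree = makeNode(Demux{},
                         makeNode(Transform<decltype(timesTwo)>{timesTwo},
                                  makeNode(PushBack<std::vector<int>>{B})),
                         makeNode(PushBack<std::vector<int>>{C}));
    for (int x : input) tree.send(x);
    assert(B.size() == C.size());  // every value reaches both branches
    return {B, C};
}
```

Calling runDemo({1, 2, 3}) fills B with the doubled values {2, 4, 6} and C with the untouched values {1, 2, 3}.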
Creating Pipe Nodes
One issue that is not shown here is the creation of pipes. What should the transform(…), filter(…) and demux(…) functions return? Practically, it would make sense if they were Nodes, so we could have operator>>= operate only on Nodes, but Nodes need to know the tail of the pipe (or tailpipe, if you will), which is not known for transform and filter before operator>>= is called. We can resolve this, and have an excellent starting point for reusability, if we add a placeholder for unconnected tails:
struct OpenConnectionPlaceholder{};
and have e.g. the transform function return a node with an open connection:
template<class F>
auto transform(F f) {
    return Node<Transform<F>, OpenConnectionPlaceholder>(...);
}
The connection is then “closed” by operator>>= by creating a new Node, which moves the operation and replaces the open tail with the RHS.
template<class Lhs, class Rhs, isNode<Lhs> = true, isNode<Rhs> = true>
auto operator>>=(Lhs lhs, Rhs rhs) {
    return Node(lhs.op, rhs);
}
where isNode is a SFINAE check as in “How to Make SFINAE Pretty and Robust”.
The problem does not arise for multi-child pipes such as demux, because it takes its children as parameters at construction. We focus on the single-child pipes for now and extend the concepts to multi-child pipes later. There is also no problem for endpipes, because they do not have any children at all. So we are now all set to create and connect pipe-nodes.
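The following sketch puts the placeholder, the creation functions and the closing operator>>= together for the single-child case. It is only an illustration under a few assumptions: IsNodeImpl is one possible way to write the isNode check (the post does not show it), struct is used so that aggregate initialization works without constructors, and demoClose is a hypothetical helper.

```cpp
#include <cassert>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

namespace sketch {

struct OpenConnectionPlaceholder {};

template<class Op, class... Tails>
struct Node {
    Op op;
    std::tuple<Tails...> tails;

    template<class T>
    void send(T&& t) {
        std::apply([&](auto&... ts) { op.send(std::forward<T>(t), ts...); }, tails);
    }
};

// One possible shape for the isNode check (an assumption, not the post's code).
template<class T> struct IsNodeImpl : std::false_type {};
template<class Op, class... Tails> struct IsNodeImpl<Node<Op, Tails...>> : std::true_type {};
template<class T> using isNode = std::enable_if_t<IsNodeImpl<T>::value, bool>;

template<class F>
struct Transform {
    F transformation;
    template<class T, class Tail>
    void send(T&& t, Tail& tail) { tail.send(transformation(std::forward<T>(t))); }
};

template<class Pushable>
struct PushBack {
    Pushable& pushable;
    template<class T>
    void send(T&& t) { pushable.push_back(std::forward<T>(t)); }
};

// transform(...) cannot know its tail yet, so it starts out with a placeholder.
template<class F>
auto transform(F f) {
    return Node<Transform<F>, OpenConnectionPlaceholder>{
        Transform<F>{std::move(f)}, std::tuple<OpenConnectionPlaceholder>{}};
}

// End pipes have no tail at all, so they are complete from the start.
template<class Pushable>
auto push_back(Pushable& p) {
    return Node<PushBack<Pushable>>{PushBack<Pushable>{p}, std::tuple<>{}};
}

// Closing the connection: build a new Node that keeps the operation and adopts rhs as its tail.
template<class Lhs, class Rhs, isNode<Lhs> = true, isNode<Rhs> = true>
auto operator>>=(Lhs lhs, Rhs rhs) {
    using Op = decltype(lhs.op);
    return Node<Op, Rhs>{std::move(lhs.op), std::tuple<Rhs>{std::move(rhs)}};
}

// Adds 1 to every input element through transform(...) >>= push_back(out).
inline std::vector<int> demoClose(const std::vector<int>& input) {
    std::vector<int> out;
    auto open = transform([](int x) { return x + 1; });
    static_assert(std::is_same_v<decltype(open.tails), std::tuple<OpenConnectionPlaceholder>>);
    auto closed = open >>= push_back(out);  // the placeholder is replaced by the push_back node
    for (int x : input) closed.send(x);
    assert(out.size() == input.size());
    return out;
}

}  // namespace sketch
```

With this in place, sketch::demoClose({1, 2, 3}) returns {2, 3, 4}. Note that open itself is never usable for sending data, since its tail is only a placeholder without a send function.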
Open vs. Closed pipes
With OpenConnectionPlaceholder, we need to distinguish between Nodes that have an open connection somewhere, and those that don’t. We want different behaviours depending on whether a Node is “open” (has any OpenConnectionPlaceholder) or “closed”:
- Closed Nodes can be used as output iterators. Open nodes can’t, as data would just be pushed into nothing. That is what dev_null lets you do explicitly.
- Closed Nodes can be used as the RHS for operator>>= where the LHS is a root node. Open ones can’t, for the same reason.
- Open Nodes are allowed on the LHS of operator>>=, closed ones aren’t.
I am not going to go into too much detail here, but I ended up implementing this as two different kinds of node, Node for open nodes and Output for closed nodes. When the last connection of a Node is closed, it is turned into an Output.
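As an illustration of the open/closed distinction, here is a small compile-time sketch that decides whether a node type has an open connection anywhere in its subtree. It is a different technique from the Node/Output split described above, shown only because it fits in a few lines; IsOpen, isOpen and the empty tag types are hypothetical names for this example.

```cpp
#include <type_traits>

struct OpenConnectionPlaceholder {};

// Only the shape of the tree matters here, so Node carries no data in this sketch.
template<class Op, class... Tails>
struct Node {};

// Empty tags standing in for the real operations.
struct Filter {};
struct Transform {};
struct Demux {};
struct PushBack {};

// A type is "open" if it is the placeholder itself ...
template<class T>
struct IsOpen : std::false_type {};

template<>
struct IsOpen<OpenConnectionPlaceholder> : std::true_type {};

// ... or a Node with at least one open tail anywhere in its subtree.
// The fold over an empty pack yields false, so end nodes are closed.
template<class Op, class... Tails>
struct IsOpen<Node<Op, Tails...>> : std::bool_constant<(IsOpen<Tails>::value || ...)> {};

template<class T>
constexpr bool isOpen = IsOpen<T>::value;

using End      = Node<PushBack>;
using OpenPipe = Node<Filter, Node<Transform, OpenConnectionPlaceholder>>;
using Closed   = Node<Filter, Node<Transform, End>>;
using HalfOpen = Node<Demux, End, OpenConnectionPlaceholder>;

static_assert(!isOpen<End>);
static_assert(isOpen<OpenPipe>);
static_assert(!isOpen<Closed>);
static_assert(isOpen<HalfOpen>);
```

A check like this could, for example, guard the output-iterator interface or the root-node overload of operator>>= with a static_assert, so that misuse of an open node produces a readable error.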
Single child reusability
We almost have reusability of the kind
auto pipe = filter(...) >>= transform(...);
A >>= pipe >>= push_back(B);
but not quite. pipe >>= push_back(B) would replace the tail of the filter with push_back(B), instead of the tail of the transform.
We can remedy this by recursively looking for an OpenConnectionPlaceholder in the operator:
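template<class Lhs, class Rhs>
auto operator>>=(Lhs lhs, Rhs rhs) {
    if constexpr (hasOpenConnection<Lhs>) {
        // The open connection is the tail of this node: plug rhs in directly
        return Node(lhs.op, rhs);
    } else {
        // Otherwise recurse into the single child and rebuild this node around the result
        return Node(lhs.op, std::get<0>(lhs.tails) >>= rhs);
    }
}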
Now the operator rebuilds the tree by finding the open connection and recursively adding the new node this results in.
Note: In reality, this becomes more messy because operator>>= needs to account for quite a few more situations, and also give good error messages when misused.
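For readers who want to try the recursive operator, here is a compact, self-contained sketch for single-child nodes. It makes a few simplifying assumptions: the operator deduces the tail type directly instead of using a hasOpenConnection trait, struct is used so that no constructors are needed, and makeNode and demoReuse are hypothetical helpers for this example.

```cpp
#include <cassert>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

namespace sketch {

struct OpenConnectionPlaceholder {};

template<class Op, class... Tails>
struct Node {
    Op op;
    std::tuple<Tails...> tails;

    template<class T>
    void send(T&& t) {
        std::apply([&](auto&... ts) { op.send(std::forward<T>(t), ts...); }, tails);
    }
};

template<class Op, class... Tails>
auto makeNode(Op op, Tails... tails) {
    return Node<Op, Tails...>{std::move(op), std::tuple<Tails...>{std::move(tails)...}};
}

template<class P>
struct Filter {
    P predicate;
    template<class T, class Tail>
    void send(T&& t, Tail& tail) { if (predicate(t)) tail.send(std::forward<T>(t)); }
};

template<class F>
struct Transform {
    F transformation;
    template<class T, class Tail>
    void send(T&& t, Tail& tail) { tail.send(transformation(std::forward<T>(t))); }
};

template<class Pushable>
struct PushBack {
    Pushable& pushable;
    template<class T>
    void send(T&& t) { pushable.push_back(std::forward<T>(t)); }
};

template<class P> auto filter(P p)    { return makeNode(Filter<P>{std::move(p)}, OpenConnectionPlaceholder{}); }
template<class F> auto transform(F f) { return makeNode(Transform<F>{std::move(f)}, OpenConnectionPlaceholder{}); }
template<class Pushable> auto push_back(Pushable& p) { return makeNode(PushBack<Pushable>{p}); }

// Single-child nodes only: descend until the placeholder is found,
// then rebuild the chain of nodes on the way back up.
template<class Op, class Tail, class Rhs>
auto operator>>=(Node<Op, Tail> lhs, Rhs rhs) {
    if constexpr (std::is_same_v<Tail, OpenConnectionPlaceholder>) {
        return makeNode(std::move(lhs.op), std::move(rhs));
    } else {
        return makeNode(std::move(lhs.op),
                        (std::get<0>(std::move(lhs.tails)) >>= std::move(rhs)));
    }
}

// Builds the reusable pipe once and attaches it to two different outputs.
inline std::pair<std::vector<int>, std::vector<int>> demoReuse(const std::vector<int>& input) {
    std::vector<int> B, C;
    auto pipe = filter([](int x) { return x % 2 == 0; }) >>= transform([](int x) { return x * 10; });
    auto intoB = pipe >>= push_back(B);  // push_back(B) lands behind transform, not behind filter
    auto intoC = pipe >>= push_back(C);  // pipe itself is unchanged and can be reused
    for (int x : input) { intoB.send(x); intoC.send(x); }
    assert(B == C);
    return {B, C};
}

}  // namespace sketch
```

For the input {1, 2, 3, 4, 5, 6}, both collections end up as {20, 40, 60}: only the even numbers pass the filter, and each is multiplied by ten before being pushed back.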
Multi-child reusability
The OpenConnectionPlaceholder was not required to create multi-child pipes like demux, but we can use it to make those reusable as well. This requires that we add it to the pipes-api in some way. I chose to add it as a simple global constant in the pipes-namespace:
namespace pipes {
    constexpr auto _ = OpenConnectionPlaceholder{};
}
I think _ is a neat name for a placeholder in expressions, but something more verbose is also a possibility.
This allows creating pipes as
auto pipe = demux(push_back(B), _, _);
But how can we use them afterwards? To me, it seems as if the least surprising possibility is with operator() on the node:
auto pipe = demux(push_back(B), _, _);
auto fullpipe = pipe(push_back(C), push_back(D));
This requires the operator to be implemented in such a way that it takes a number of nodes and plugs them in place of the open connections. The idea is essentially the same as for the simple reusability using operator>>=: we find an open connection, and create a new node that replaces that connection with the node we passed into the operator. However, now that we are talking about nodes with any number of children, we also need to find the correct OpenConnectionPlaceholder to replace.
The Connect algorithm: first steps
In the simple example above, the algorithm seems pretty clear: iterate over the children of the pipe and, if a child is an OpenConnectionPlaceholder, replace it with the next of the parameters.
In the actual implementation, the parameters and children are stored as tuples, and we need to use some metaprogramming to implement the algorithm. For the sake of developing the algorithm, let’s pretend they are stored in vectors instead, as that is easier to read. So the first algorithm could look something like this:
for (auto p : parameters) {
    for (auto& child : children) {
        if (isOpenConnectionPlaceholder(child)) {
            replace(child, p);
            break;
        }
    }
}
Connecting with nested open connections
This works for the simple situation, but it gets more complicated when we consider children with open connections:
auto pipe = demux(_, demux(_, _), _);
In this case, we definitely need to fill the nested open connections in, but in what order? Based on the interface, I believe the most sensible ordering is
auto pipe = demux(1, demux(2, 3), 4);
so that
pipe(A, B, C, D);
is the same as
auto pipe = demux(A, demux(B,C), D);
We can achieve this by recursively checking the children of the node:
for (auto p : parameters) {
    for (auto& child : children) {
        if (isOpenConnectionPlaceholder(child)) {
            replace(child, p);
            break;
        }
        if (hasOpenConnections(child)) {
            child(p); // recursion step
            break;
        }
    }
}
This is essentially the same approach that we used for operator>>= on single-child nodes, and is a depth-first algorithm: we go along one branch of the tree until we find an open connection, and replace it. It assumes that we can call operator() with a single parameter, even if there is more than one open connection, but there is no reason not to allow that anyway, so we are good.
Connection final iteration: passing open connections as parameters
So far, all that we have passed as parameters have been closed nodes. Let’s see if the algorithm holds up if we can also pass parameters with open connections:
auto pipe = demux(_, _);
auto pipe2 = pipe(demux(_, _), B);
We would expect this to result in
auto pipe = demux(demux(_, _), B);
Let’s see if that is what we would get. With the depth first algorithm above, we will first plug the new demux into the first open slot of the pipe, so we get:
auto pipe = demux(demux(_, _), _);
However, when we try to plug in B, the algorithm treats this whole new tree as a branch to descend into depth-first, and we end up with:
auto pipe = demux(demux(B, _), _);
which is not correct! We will need a different approach.
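To make the failure easy to reproduce, here is a runtime sketch of the depth-first version, using vectors instead of tuples as suggested earlier. The Tree type, the hole, leaf, demux and show helpers and the "_" naming convention are assumptions made for this illustration only; the real implementation works on types at compile time.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

// Runtime stand-in for the compile-time tree. "_" marks an open connection;
// any other name without children is a closed end node.
struct Tree {
    std::string name;
    std::vector<Tree> children;
};

inline Tree hole() { return Tree{"_", {}}; }
inline Tree leaf(std::string name) { return Tree{std::move(name), {}}; }
inline Tree demux(std::vector<Tree> children) { return Tree{"demux", std::move(children)}; }

inline bool isOpen(const Tree& t) { return t.name == "_"; }

inline bool hasOpenConnections(const Tree& t) {
    for (const Tree& c : t.children)
        if (isOpen(c) || hasOpenConnections(c)) return true;
    return false;
}

// Depth-first: plug p into the first open connection found. Returns false if none is left.
inline bool plug(Tree& node, const Tree& p) {
    for (Tree& child : node.children) {
        if (isOpen(child)) { child = p; return true; }
        if (hasOpenConnections(child)) return plug(child, p);  // recursion step
    }
    return false;
}

inline Tree connectDepthFirst(Tree pipe, const std::vector<Tree>& parameters) {
    for (const Tree& p : parameters) plug(pipe, p);
    return pipe;
}

// Renders a tree in the notation used in the post, e.g. "demux(A, demux(B, C), D)".
inline std::string show(const Tree& t) {
    if (t.children.empty()) return t.name;
    std::string out = t.name + "(";
    for (std::size_t i = 0; i < t.children.size(); ++i) {
        if (i != 0) out += ", ";
        out += show(t.children[i]);
    }
    return out + ")";
}

// Usage: reproduces the two cases discussed above.
inline void demoDepthFirst() {
    Tree nested = connectDepthFirst(demux({hole(), demux({hole(), hole()}), hole()}),
                                    {leaf("A"), leaf("B"), leaf("C"), leaf("D")});
    assert(show(nested) == "demux(A, demux(B, C), D)");  // closed parameters: the ordering we wanted

    Tree wrong = connectDepthFirst(demux({hole(), hole()}),
                                   {demux({hole(), hole()}), leaf("B")});
    assert(show(wrong) == "demux(demux(B, _), _)");      // B ends up inside the new demux
}

}  // namespace sketch
```

The second case shows the root of the problem: once a parameter with open connections has been plugged in, the algorithm can no longer tell its holes apart from the holes that were in the pipe to begin with.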
I tried different methods, and the algorithm I ended up with works like this:
for (auto& child : children) {
    auto n = openConnectionCount(child);
    auto paramsToPass = takeFirst(min(n, size(params)), params);
    child(paramsToPass);
    if (params.empty()) break;
}
For each child we
1. figure out how many open connections exist in its subtree.
2. take up to that many nodes from the parameters, removing them from the original list.
3. recursively call the operator() in that subtree with the parameters we took.
4. Once we have placed all parameters, we stop.
The algorithm is essentially still depth-first, but it has an aspect of being breadth-first, as we split the parameters on each level based on the open connections that each child has. Not only does this work with all the above cases, it also is simpler than the previous versions of the algorithm.
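The four steps can be tried out with a small runtime sketch, again pretending that children and parameters live in vectors. As before, Tree, hole, leaf, demux, show, plugAll and connect are hypothetical names for this illustration, and an iterator range stands in for "removing parameters from the list"; the actual implementation does the same work on tuples at compile time.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

// "_" marks an open connection; any other name without children is a closed end node.
struct Tree {
    std::string name;
    std::vector<Tree> children;
};

inline Tree hole() { return Tree{"_", {}}; }
inline Tree leaf(std::string name) { return Tree{std::move(name), {}}; }
inline Tree demux(std::vector<Tree> children) { return Tree{"demux", std::move(children)}; }
inline bool isOpen(const Tree& t) { return t.name == "_"; }

inline std::size_t openConnectionCount(const Tree& t) {
    if (isOpen(t)) return 1;
    std::size_t n = 0;
    for (const Tree& c : t.children) n += openConnectionCount(c);
    return n;
}

using ParamIt = std::vector<Tree>::const_iterator;

// Fills the open connections below node with the parameters in [first, last).
inline void plugAll(Tree& node, ParamIt first, ParamIt last) {
    for (Tree& child : node.children) {
        if (first == last) break;                                      // 4: all parameters placed
        const auto n = static_cast<std::ptrdiff_t>(openConnectionCount(child));  // 1: counted before plugging
        const std::ptrdiff_t take = std::min(n, last - first);         // 2: this child's share
        if (isOpen(child)) child = *first;                             // a bare placeholder takes one parameter
        else plugAll(child, first, first + take);                      // 3: recurse into the subtree
        first += take;
    }
}

inline Tree connect(Tree pipe, const std::vector<Tree>& parameters) {
    assert(parameters.size() <= openConnectionCount(pipe));  // cannot place more than there are holes
    plugAll(pipe, parameters.begin(), parameters.end());
    return pipe;
}

inline std::string show(const Tree& t) {
    if (t.children.empty()) return t.name;
    std::string out = t.name + "(";
    for (std::size_t i = 0; i < t.children.size(); ++i) {
        if (i != 0) out += ", ";
        out += show(t.children[i]);
    }
    return out + ")";
}

// Usage: the cases from the text.
inline void demoConnect() {
    Tree a = connect(demux({hole(), hole()}), {demux({hole(), hole()}), leaf("B")});
    assert(show(a) == "demux(demux(_, _), B)");    // the new demux keeps its own open connections

    Tree b = connect(demux({hole(), demux({hole(), hole()}), hole()}),
                     {leaf("A"), leaf("B"), leaf("C"), leaf("D")});
    assert(show(b) == "demux(A, demux(B, C), D)");

    Tree c = connect(demux({hole(), demux({hole(), hole()}), hole()}), {leaf("A"), leaf("B")});
    assert(show(c) == "demux(A, demux(B, _), _)");  // fewer parameters than open connections is fine
}

}  // namespace sketch
```

The important detail is that the open connections of each child are counted before anything is plugged into it, so holes that arrive with a parameter are never mistaken for holes of the original pipe.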
Conclusion
We have seen how one can construct pipes as a tree structure, and how creating reusable pieces of pipe corresponds to holes in that structure, which we can fill later. We have also established algorithms for plugging these holes to create complete pipelines using operator>>= for simple pipes like transform and filter, and seen how we can extend that functionality to multi-child pipes such as demux through operator().
The ideas presented here are somewhat high-level, and the actual implementation contains a lot of nitty-gritty details necessary to make it all fit together. You are welcome to have a look at it on GitHub. It is way less readable than Jonathan’s implementation, so maybe I will clean it up at some point. Any comments or questions are welcome.
Finally, thanks to Jonathan for running a blog with as many useful ideas as fluentcpp, and allowing me to contribute to it.