# Simplifying reactive operators

I've argued that there is reason to believe ReactiveX (Rx) is backwards. At first I promoted InteractiveX (Ix), which inverts the control, swapping callbacks for await. If you follow the road of trying to simplify Ix and its iterator acrobatics, I believe you'll arrive where I did: reimplementing Reagent. Originally built to shim all of JVM/JS's reactive interfaces into Kotlin's concurrency framework, Reagent ends up suiting async-await languages better than both Rx and Ix. For more detail, see "Reagent is theoretically the best…", but in short, Reagent approaches an exact equivalence to natural code: an isomorphism to what you would write in that situation without the library. In particular, Reagent provides expressive and natural features to implement all aspects of an asynchronous loop that chomps away at a data source.

Building off of Reagent because of its minimalism, I began work over the last few months on a linter for race conditions in reactive code. This linter tracks the flow of values through operators and state to see if any ordering assumptions (the few that people can make in Rx) are violated anywhere, for example by two values from the same source being pushed to common state by two threads. In the process, however, I ran into some pain points in Reagent too:

1. Thread creation could happen anywhere, and
2. The Operator type was overcomplicated: the value source is nested two levels deep, as the argument to the argument of the Supplier type.

They are minor for Reagent, but important for analysis. Reagent can be redesigned to mitigate these (and these changes can be called tweaks really). Here's what I'm proposing:

1. Keep the operators synchronous. In async-await languages, that means no operators are labelled async, and all Tasks or Promises (or whatever the unit of work is called in the given language) are handled by lifting a select few sync functions to async, akin to monadic programming. That way, the operator stays synchronous and unconcerned with the values passing through the async fabric that it's handling.
2. Make data sinks the first-class citizen rather than data sources. Critically, this means Operators are mappings of sinks to sinks, rather than sources to sources. The latter is how Rx, Ix and Reagent all work:
• Rx: Operators take an Observable (be it explicitly as an argument, or implicitly by being an instance method) and map values and timings from it into another Observable. An Observable may have an onNext within it that is called from the plumbing of the pipeline, or it may be a Subject, but canonically, semantically, and most often, an Observable is used as a pure value source.
• Ix: Operators take asynchronous iterators and transform them into new asynchronous iterators. These iterators are passed around the application, spewing values wherever they're iterated. Again, value sources get the attention.
• Reagent: The core Reagent operators and Observables are almost identical to Rx and are also source-driven. (For these purposes, the two really only differ in the Subscriber signature: where Rx's is T -> (), Reagent's is async T -> Task<Bool>.)

Mapping sinks to sinks tightens the equivalence to raw async loops. In the following implementation of buffer, it's easy to see where the calls from the raw implementation have gone in the abstracted version:

    buf = new List<T>();
    all(
        async {
            // GATES
            foreach(_ await in gate) {
                f(new List<T>(buf));
                buf = new List<T>();
            }
        },
        async {
            // VALUES
            foreach(v await in src) {
                buf.Add(v);
            }
        }
    );

    // BECOMES

    Sink<T> buffer(Source<_> gates, Sink<List<T>> c) {
        buf = new List<T>(); // state moved to operator-local
        gates(() => {
            List<T> next = new List<T>(buf); // snapshot before resetting
            buf = new List<T>();
            c(next); // <- f will go here...
        });
        return v => buf.Add(v); // from the "VALUES" body
    }

    Source<void> s_gates = async sink => {
        foreach(_ await in gate)
            sink();
    };
    Sink<T> s = buffer(s_gates, f); // <- ... f went there
    foreach(v await in src) {
        s(v);
    }



But let's not be vague about the benefits of these features. If we consider how code built on Sink -> Sink operators will look, we find that the approach has its niche in either monolithic applications or pure data sinks (e.g. database write layers).

1. Keeping the operators synchronous: firstly, this is doable without any changes to the framework at all. A user making custom operators can follow this design principle. Rx has already proven this to make sense — in particular, Rx implementations in single-threaded event-loop languages (I wonder which one that could be...). While they can schedule events for the future, they can't block on them through the imperative language features.

That said, although not out of necessity, there is a good reason to do this in async-await languages too. Where the event-loop languages have an obvious barrier where control crosses into the event loop, async-await has none, which is partly what makes it so enjoyable (aside from coloring all your functions). However, the discipline of keeping operators synchronous lets us re-form that barrier and structure the operators as a synchronous core in the middle of an asynchronous shell.

Conceptually, this form is very powerful. First, . Second, while we preserve the abstraction of sources and sinks throughout the application, we can also quickly identify the outer async shell, which is where we coordinate all the timing and handle errors.
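As a rough sketch of that shape in Python, here is one way a synchronous operator can sit inside an async shell. The names `lift`, `map_sink`, `fetch`, and `shell` are hypothetical and chosen for illustration; they are not part of Reagent or Rx. The operator only forwards opaque awaitables, the lifted function is the only thing that touches the eventual values, and the shell is the only place that awaits.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

A = TypeVar("A")
B = TypeVar("B")


def lift(f: Callable[[A], B]) -> Callable[[Awaitable[A]], Awaitable[B]]:
    """Lift a sync function so it applies to the eventual result of an awaitable."""
    async def lifted(pending: Awaitable[A]) -> B:
        return f(await pending)
    return lifted


def map_sink(f: Callable[[A], B], downstream: Callable[[B], None]) -> Callable[[A], None]:
    """Synchronous operator: never awaits, just forwards f(value) downstream."""
    return lambda value: downstream(f(value))


async def fetch(n: int) -> int:
    """Stand-in for real asynchronous work (hypothetical)."""
    await asyncio.sleep(0)
    return n * 10


async def shell() -> List[int]:
    pending: List[Awaitable[int]] = []
    # The operator sees opaque awaitables; only the lifted function sees values.
    sink = map_sink(lift(lambda x: x + 1), pending.append)
    for n in (1, 2, 3):
        sink(fetch(n))  # push un-awaited work through the synchronous core
    return list(await asyncio.gather(*pending))  # only the shell awaits


results = asyncio.run(shell())
```

Here `results` ends up as `[11, 21, 31]`. Nothing in `map_sink` is marked async, so the only awaits to audit are in `lift` and `shell`.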

This is more restrictive than freeform operator programming of course, so we naturally ask what functionality we lose. The losses are not insubstantial, but they fall in places where replacements exist:

1. There is a big loss in error handling flexibility. None of the operators can handle errors because the errors are locked inside the Tasks. There is an argument to be made that in this restricted environment, operators should be so simple and abstract that none should handle errors either. If we compare an operator graph to the equivalent async loop, the try-catch blocks that wrap await statements in the loop correspond to the base sources of the graph anyway, because that's where the operator call graphs are rooted and where everything is awaited.

2. We also lose operators in the bufferTime and debounce family, because both require scheduled callbacks that come directly from the scheduler rather than from the operator above. In general, operators that depend on scheduled things from the ambient scope will violate this condition. The general solution is to refactor those dependencies into "reactive" sources and feed them in through the arguments. The top-level scope will then await them, rather than the operator itself.
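Both replacements can be sketched together in Python. This is an illustration under assumptions, not Reagent's API: `ticks`, `values`, `pump`, and `shell` are hypothetical names, and the failure is simulated. A bufferTime-like behaviour is rebuilt from a plain synchronous `buffer` by passing the timer in as a gate source, while the try/except lives in the shell loop that awaits the base source.

```python
import asyncio
from typing import AsyncIterator, Callable, List


def buffer(gates: Callable[[Callable[[], None]], None],
           downstream: Callable[[List[int]], None]) -> Callable[[int], None]:
    """Synchronous buffer: flushes whenever the gate source calls back."""
    buf: List[int] = []

    def flush() -> None:
        nonlocal buf
        batch, buf = buf, []
        downstream(batch)

    gates(flush)
    return lambda v: buf.append(v)


async def ticks(period: float, count: int) -> AsyncIterator[None]:
    """The timer, refactored into a source that the shell awaits."""
    for _ in range(count):
        await asyncio.sleep(period)
        yield None


async def values() -> AsyncIterator[int]:
    yield 1
    yield 2
    raise ConnectionError("source failed")  # simulated failure at the base source


async def pump(source, sink, on_error) -> None:
    """Shell-side loop: the only place that awaits or catches."""
    try:
        async for item in source:
            sink(item)
    except Exception as exc:
        on_error(exc)


async def shell():
    batches: List[List[int]] = []
    errors: List[Exception] = []
    gate_callbacks: List[Callable[[], None]] = []
    sink = buffer(gate_callbacks.append, batches.append)
    await asyncio.gather(
        pump(ticks(0.01, 1), lambda _tick: gate_callbacks[0](), errors.append),
        pump(values(), sink, errors.append),
    )
    return batches, errors


batches, errors = asyncio.run(shell())
```

The values loop pushes 1 and 2 and then fails, the shell records the error, and the single tick later flushes `[1, 2]` downstream. The operator itself never sees the timer or the exception.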

2. Making data sinks the first-class citizen: the noteworthy thing is that the usual source-first convention is not theoretically preferable; it seems to stem instead from a materialistic preference to work with values from sources that we control rather than fulfilling contracts to sinks. However, just looking at the structure of an async loop shows that thinking in terms of sinks has its benefits.

The most obvious advantage comes from the type signature. In Rx and Reagent implementations, sinks are functions that convert a value type to side effects and may call other sinks themselves. Sources, therefore, are things that accept sinks and push values through them. Compare this to non-reactive programming, where sources are almost always just the return position of some function. A mapping is then just a function: FlatOperator<Ta, Tb> := Ta -> Tb, so of course we would work with sources.

The difference is that a flat return value is not evented: it just happens once and exists. Reactive sources hence need to nest the value type twice, whereas reactive sinks are just first-order functions and are hence the flattest form:

    SourceOperator<Ta, Tb> := Source<Ta> -> Source<Tb>
                           == ((Ta -> *) -> *) -> ((Tb -> *) -> *)
    SinkOperator<Ta, Tb>   := Sink<Tb> -> Sink<Ta>
                           == (Tb -> *) -> (Ta -> *)
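To make the difference in nesting concrete, here is a small Python sketch of the same `map` written both ways. The aliases `Sink` and `Source` and the functions `map_source`, `map_sink`, and `numbers` are illustrative names of mine, with `None` standing in for `*`.

```python
from typing import Callable, List, TypeVar

A = TypeVar("A")
B = TypeVar("B")

Sink = Callable[[A], None]                      # Ta -> *
Source = Callable[[Callable[[A], None]], None]  # (Ta -> *) -> *


def map_source(f: Callable[[A], B], src: Source[A]) -> Source[B]:
    # Source operator: accept the downstream sink, then subscribe upstream.
    return lambda sink: src(lambda a: sink(f(a)))


def map_sink(f: Callable[[A], B], sink: Sink[B]) -> Sink[A]:
    # Sink operator: a single first-order function around the downstream sink.
    return lambda a: sink(f(a))


def numbers(sink: Sink[int]) -> None:
    """A trivial synchronous source used for the comparison."""
    for n in (1, 2, 3):
        sink(n)


via_source: List[int] = []
map_source(lambda n: n * 2, numbers)(via_source.append)

via_sink: List[int] = []
numbers(map_sink(lambda n: n * 2, via_sink.append))
```

Both lists end up as `[2, 4, 6]`, but `map_source` needs two nested lambdas where `map_sink` needs one.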


Flipping the operator on its head has interesting consequences for programming with this type. They are not just the negative consequences that such an unnatural flow would suggest; instead, the flip reveals symmetries of reactive code.

merge becomes trivial for example: the sink is just passed into the sources to be merged. However, share becomes hard: instead of pulling values out of whatever scope the supplier ends up in, we have to manage the subscriptions ourselves. This is mostly a matter of Ix vs. the world, as Rx and Reagent have to do this anyways when subscribers are subscribed to the Observable.
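A short Python sketch of both halves of that trade, using hypothetical helper names (`merge`, `share`, `src_a`, `src_b`): merging is a loop that hands one sink to every source, while sharing means keeping the subscriber list by hand.

```python
from typing import Callable, List, Tuple


def merge(sources: List[Callable[[Callable[[str], None]], None]],
          sink: Callable[[str], None]) -> None:
    """merge: pass the same sink into every source."""
    for src in sources:
        src(sink)


def share() -> Tuple[Callable[[int], None],
                     Callable[[Callable[[int], None]], Callable[[], None]]]:
    """share: one upstream sink fanning out to a hand-managed subscriber list."""
    subscribers: List[Callable[[int], None]] = []

    def sink(value: int) -> None:
        for s in list(subscribers):  # copy, so unsubscribing mid-push is safe
            s(value)

    def subscribe(s: Callable[[int], None]) -> Callable[[], None]:
        subscribers.append(s)
        return lambda: subscribers.remove(s)  # unsubscribe handle

    return sink, subscribe


def src_a(sink: Callable[[str], None]) -> None:
    sink("a1")
    sink("a2")


def src_b(sink: Callable[[str], None]) -> None:
    sink("b1")


merged: List[str] = []
merge([src_a, src_b], merged.append)

shared_sink, subscribe = share()
first: List[int] = []
second: List[int] = []
unsubscribe_first = subscribe(first.append)
subscribe(second.append)
shared_sink(1)
unsubscribe_first()
shared_sink(2)
```

After this runs, `merged` is `["a1", "a2", "b1"]`, `first` is `[1]`, and `second` is `[1, 2]`.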

Above all else, program flow changes. Here, the leaves of the flow graph (i.e. the subscribers) must exist first, then the graph is constructed from there. From an abstraction standpoint, it's interesting that developers would tend not to want this. Many APIs are massaged data sources to plug consumers into, not consumers to plug data into. Of course, we can define a reciprocal definition of a "base consumer": a Subject-like leaf that caps off the operator graph and allows for deferred subscription.
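One possible shape for such a base consumer, sketched in Python. The class name `BaseConsumer`, its `attach` method, and the choice to hold values until a consumer is attached are all assumptions of this sketch, not something Reagent defines.

```python
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class BaseConsumer(Generic[T]):
    """Subject-like leaf: exists before the real consumer does."""

    def __init__(self) -> None:
        self._pending: List[T] = []
        self._target: Optional[Callable[[T], None]] = None

    def sink(self, value: T) -> None:
        # Assumption: values that arrive before attachment are held, not dropped.
        if self._target is None:
            self._pending.append(value)
        else:
            self._target(value)

    def attach(self, target: Callable[[T], None]) -> None:
        """Deferred subscription: replay held values, then forward live ones."""
        self._target = target
        for value in self._pending:
            target(value)
        self._pending.clear()


leaf: BaseConsumer[int] = BaseConsumer()
graph = lambda n: leaf.sink(n * 10)  # operator graph built on top of the leaf

graph(1)
graph(2)

seen: List[int] = []
leaf.attach(seen.append)  # the real consumer shows up later
graph(3)
```

Here `seen` ends up as `[10, 20, 30]`: the graph was constructed and fed before the consumer existed, which restores the "plug a consumer in later" workflow from the sink side.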