Pipes refactor and Cofunctors #1444
louthy announced in Announcements
LanguageExt Pipes Background
Part of the v5 refresh was to migrate the Pipes functionality to be a proper monad-transformer (in v4 it's a transformer too, but it can only lift Eff<RT, A>, rather than the more general K<M, A> where M : Monad<M>). I completed the generalisation work a while back, but it had some problems:

For any users of pipes it was going to be a big upheaval
Obviously, v5 is a big change, but where possible I want the migrations to be quite mechanical, and this one wasn't going to be. That doesn't mean I shouldn't 'go for it', but I'm trying to make sure that every bit of pain a user has to go through to move from v4 to v5 is strongly justified and will lead to a better experience once migrated.

It was inconsistently named
The core type, Proxy, and the derived types (Producer, Consumer, Pipe, etc.) don't follow the monad-transformer naming convention of having a T suffix. Really, if they're going to be generalised for any monad then they should be called ProducerT, ConsumerT, PipeT, ...

Pipes is hard to use
This is not a new problem with v5. I made Pipes into a 1-for-1 clone of the Haskell Pipes library. Even in Haskell they can be quite hard to use as you chase the alignment of the generic type parameters. The desire for pipes to support producers, pipes, clients, servers, and more seems (in hindsight) to have been too greedy.

Hard to retrofit
The generalisation process wasn't working well in some areas. The Producer.merge function was blocking, and fixing it with the original code was challenging, to say the least.

LanguageExt Pipes Refresh
So, I decided to take a step back. Instead of trying to make an exact clone of the Haskell version, I thought I'd build it from scratch in a way that's more 'csharpy', consistent, and simpler. In particular, I looked at the techniques I used to refactor the IO monad (to support recursion, asynchrony, etc.) and brought them into a new Pipes implementation.

I also decided to drop support for Client, Server, Request, Response, and all of the other stuff that I suspect nobody used because they were too hard. That means:
- No more Proxy<A1, A, B1, B, M, R> interface. This was only needed to support all flavours of client, server, producer, consumer, etc.
- PipeT<IN, OUT, M, R> is the core type.
- ProducerT<OUT, M, R> is simply a pipe with the input set to Unit: PipeT<Unit, OUT, M, R>
- ConsumerT<IN, M, R> is simply a pipe with the output set to Void: PipeT<IN, Void, M, R>
- EffectT<M, R> is simply a pipe with the input set to Unit and the output set to Void. This enclosed effect is the result of fusing producers, pipes, and consumers together: PipeT<Unit, Void, M, R>

Those four types (ProducerT, PipeT, ConsumerT, and EffectT) are the new simplified and fully generalised version of pipes.

Now that the generalised implementation follows the naming convention of having a T suffix for transformers, we can use the original names (Producer, Pipe, Consumer, and Effect) to provide a more specialised version that only works with Eff<RT, A>, like the original pipes. So:

- Producer<RT, OUT, R> is (internally) a ProducerT<OUT, Eff<RT>, R>
- Pipe<RT, IN, OUT, R> is (internally) a PipeT<IN, OUT, Eff<RT>, R>
- Consumer<RT, IN, R> is (internally) a ConsumerT<IN, Eff<RT>, R>
- Effect<RT, R> is (internally) an EffectT<Eff<RT>, R>

The good thing about this refactor is that there really is only one implementation of the pipes functionality, and it all sits in PipesT.DSL.cs. This focused DSL is much easier to manage than before. It was implemented in a similar way previously, but it's now much easier for a C# dev to consume. I have put a real effort into making the interfaces, modules, preludes, etc. consistent for all types.

Pipes concurrency
Concurrency wasn't front-and-centre in the original implementation. In some senses it was 'bolted on'. You got concurrency from the lifted Eff type and from the Producer.merge function, but that was it.

Now pipes has first-class support for concurrency:
- IEnumerable and IAsyncEnumerable with ProducerT.yieldAll, Producer.yieldAll, PipeT.yieldAll, and Pipe.yieldAll.
- PipeT.liftT, PipeT.liftM, Pipe.liftT, Pipe.liftM, ProducerT.liftT, ProducerT.liftM, Producer.liftT, Producer.liftM, ConsumerT.liftT, ConsumerT.liftM, Consumer.liftT, Consumer.liftM, EffectT.liftT, EffectT.liftM, Effect.liftT, and Effect.liftM!

Mailbox, Inbox, and Outbox

Inspired by the original Pipes.Concurrency library, I implemented Mailbox, Inbox, and Outbox. It's not a clone of the original, just inspired by it. A Mailbox consists of an Inbox and an Outbox. The inbox receives values posted to it. The outbox yields values posted to the inbox upon request.

Backing the Mailbox is a System.Threading.Channels.Channel. You can create a Mailbox like so:

A mailbox is simply a record with an Inbox and Outbox:

You can Post to the Mailbox and you can Read from the Mailbox. But, even more critically, you can call:

- mailbox.ToConsumer<M>() to get a consumer of values being posted into the Inbox
- mailbox.ToProducer<M>() to get a producer of the values yielded by the Outbox

A good example of why this is useful is the new Producer.merge function:

The merge function gets a collection of producers. What we want is for those to run concurrently so we can receive the values as they happen. Then we want to produce a single merged stream of values.

This creates the merged stream Mailbox:

In forkEffects we process each producer p and pipe its values to mailbox.ToConsumerT:

So, we get a ConsumerT for the merged stream's Mailbox. It consumes every value from p, fusing into an EffectT. We then Run() that EffectT, which gives us the underlying M monad:

We do this for every ProducerT, which means the merged-values Mailbox gets every value yielded from upstream.

Finally, we ForkIO each EffectT so that it can run in parallel.

Back to the merge function: we then access the other side of the mailbox by asking for the Outbox producer, using ToProducerT:

This will then yield all of the merged values downstream (whilst there are values to yield). Once complete, we tidy up the forks:
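The merge flow described above can be approximated without LanguageExt, which may help when reasoning about it. This is a minimal sketch under stated assumptions, not the library's code:

- Plain IAsyncEnumerable<T> sources stand in for producers.
- An unbounded System.Threading.Channels.Channel<T> stands in for the merged-stream Mailbox.
- Task.Run stands in for ForkIO.
- MergeSketch, Merge, and Count are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class MergeSketch
{
    // Hypothetical stand-in for Producer.merge: run every source concurrently and
    // yield one stream containing all of their values, in arrival order.
    public static async IAsyncEnumerable<A> Merge<A>(params IAsyncEnumerable<A>[] sources)
    {
        // Plays the role of the merged-stream Mailbox
        var channel = Channel.CreateUnbounded<A>();
        using var cts = new CancellationTokenSource();
        var token = cts.Token;

        // Plays the role of forkEffects: one task per source, each piping its
        // values into the channel's writer (the "inbox" side)
        var forks = new Task[sources.Length];
        for (var i = 0; i < sources.Length; i++)
        {
            var source = sources[i];
            forks[i] = Task.Run(async () =>
            {
                await foreach (var value in source.WithCancellation(token))
                    await channel.Writer.WriteAsync(value, token);
            });
        }

        // Once every fork has finished, close the channel so the reader below ends;
        // if a fork failed, its exception is surfaced to the reader
        _ = Task.WhenAll(forks).ContinueWith(
            t => channel.Writer.TryComplete(t.Exception),
            TaskScheduler.Default);

        try
        {
            // The "outbox" side: yield merged values downstream as they arrive
            await foreach (var value in channel.Reader.ReadAllAsync())
                yield return value;
        }
        finally
        {
            // Tidy up the forks (matters if the consumer stops early)
            cts.Cancel();
        }
    }

    // Hypothetical test source: yields start, start + 1, ... with a small delay
    public static async IAsyncEnumerable<int> Count(int start, int count, int delayMs)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Delay(delayMs);
            yield return start + i;
        }
    }
}
```

Each source's own order is preserved, because each fork writes sequentially. Values from different sources interleave in whatever order they arrive, which is the point of merging concurrently.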
Cofunctor

Mailbox is pretty powerful in its own right and doesn't need pipes to function. This is a quick example of a loop that reads every value posted to a Mailbox and writes it to the console:

Mailbox<A, B> has two type parameters: A represents the values coming in and B represents the values being yielded. Values of type A are posted to Mailbox.Inbox and values of type B are yielded from Mailbox.Outbox.

If you call mailbox.Map<C>((B b) => ...) on Mailbox, then you could imagine Mailbox being represented like this:

Subsequent calls to Map<D>, and the like, would continue to transform the value being yielded from the Mailbox.Outbox:

But what if we wanted to transform the values being posted into the Mailbox.Inbox (the A value)? Map doesn't work here. It transforms an existing value, so we'd have to Map the A to something else, and to do that we'd need an A value in the first place.

So, there's no way we can change the values coming in? Well, there is, but not with Functor and Map. We need contravariant functors, colloquially known as 'co-functors'.

When it comes to category-theoretic concepts, 'co' can usually be read to mean 'reverse the arrows' or, in other words, 'find the dual of'. So a co-functor is a functor with the arrows reversed.
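Before reversing any arrows, it may help to see the covariant side described above: reading every value from a mailbox, and mapping what its Outbox yields. The sketch below is hypothetical. SketchMailbox<A, B> is backed by an unbounded Channel<A>, and B is produced by applying a projection to each A as it leaves. It is not LanguageExt's Mailbox; it only illustrates why Map can act on the outgoing side but not the incoming one.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical mailbox: A values are posted in, B values are yielded out.
// Not the LanguageExt Mailbox; just enough to show Map acting on the outbox side.
public sealed class SketchMailbox<A, B>
{
    private readonly Channel<A> channel;
    private readonly Func<A, B> project;

    internal SketchMailbox(Channel<A> channel, Func<A, B> project)
    {
        this.channel = channel;
        this.project = project;
    }

    // Inbox side: post a value (always succeeds on an open unbounded channel)
    public bool Post(A value) => channel.Writer.TryWrite(value);

    // Signal that no more values will be posted
    public void Complete() => channel.Writer.Complete();

    // Map only changes what leaves the outbox: the channel still carries A values,
    // and the new function runs after the existing projection
    public SketchMailbox<A, C> Map<C>(Func<B, C> f) =>
        new SketchMailbox<A, C>(channel, a => f(project(a)));

    // Outbox side: yield each posted value, transformed, until the mailbox completes
    public async IAsyncEnumerable<B> ReadAll()
    {
        await foreach (var a in channel.Reader.ReadAllAsync())
            yield return project(a);
    }
}

public static class SketchMailbox
{
    // Hypothetical counterpart to Mailbox.spawn
    public static SketchMailbox<A, A> Spawn<A>() =>
        new SketchMailbox<A, A>(Channel.CreateUnbounded<A>(), a => a);

    // The console loop: read every value posted to the mailbox and print it
    public static async Task PrintAll<A, B>(SketchMailbox<A, B> mailbox)
    {
        await foreach (var value in mailbox.ReadAll())
            Console.WriteLine(value);
    }
}
```

Every view created by Map shares the same channel. Reading from more than one view would therefore split the values between the readers rather than copy them to each.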
A Functor looks like this:

It maps an A -> B. This can be seen as mapping the values that F<A> yields, after they've been yielded.

Let's reverse the arrows:
Now it takes an F<B> and a function from A -> B, and returns an F<A>. This may seem batshit crazy. How can we get a value of A out of an F<B> to pass to the f function?

We can't. And we won't be doing that.
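To make the two shapes concrete, here is a minimal sketch using C# 11 static abstract interface members. The names are hypothetical and chosen for illustration: IKind<F, A>, IFunctor<F>, ICofunctor<F>, Source, and Sink. They loosely follow the K<M, A> style mentioned earlier, but they are not LanguageExt's traits.

- Source<A> is a stream that yields values, so Map transforms them after they are yielded.
- Sink<A> receives values, so Contramap transforms them before they arrive.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical higher-kinded encoding: IKind<F, A> means "F applied to A"
public interface IKind<F, A> { }

// Functor: given A -> B, turn an F<A> into an F<B>
public interface IFunctor<F> where F : IFunctor<F>
{
    static abstract IKind<F, B> Map<A, B>(Func<A, B> f, IKind<F, A> fa);
}

// Cofunctor (contravariant functor): given A -> B, turn an F<B> into an F<A>
public interface ICofunctor<F> where F : ICofunctor<F>
{
    static abstract IKind<F, A> Contramap<A, B>(Func<A, B> f, IKind<F, B> fb);
}

// A stream that yields values
public sealed record Source<A>(IEnumerable<A> Items) : IKind<Source, A>;

public sealed class Source : IFunctor<Source>
{
    // Transform values after they have been yielded
    public static IKind<Source, B> Map<A, B>(Func<A, B> f, IKind<Source, A> fa) =>
        new Source<B>(((Source<A>)fa).Items.Select(f));
}

// A sink that receives values
public sealed record Sink<A>(Action<A> Post) : IKind<Sink, A>;

public sealed class Sink : ICofunctor<Sink>
{
    // Transform values before they arrive: no A is ever extracted from the Sink<B>
    public static IKind<Sink, A> Contramap<A, B>(Func<A, B> f, IKind<Sink, B> fb)
    {
        var sink = (Sink<B>)fb;
        return new Sink<A>(a => sink.Post(f(a)));
    }
}
```

Contramap never needs an A from the Sink<B>. It only builds a new sink that applies f on the way in, which is why the reversed arrow is less crazy than it first looks.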
F in this case is not a type that yields values, but a type that receives values. It's a sink rather than a stream. So, f is being used to transform values coming into the F<B> (before arrival), not to transform values being yielded (after leaving). Mailbox.Inbox is a Cofunctor, so you can call Contramap on the Mailbox to transform values before they are posted into the Inbox.

Custom Mailboxes
Because Mailbox is simply a record that takes an Inbox and an Outbox, you can build your own without using Mailbox.spawn.

Inbox is currently created from a System.Threading.Channels.ChannelWriter:

This simply writes to the Channel when a value is posted.

And Outbox is created from a System.Threading.Channels.ChannelReader:

This simply reads a value from the Channel when one is available.

You could extend Inbox<A> and Outbox<A> to work with any sink or source type you like. Channel works pretty well and has good control over buffer size and back-pressure, but there are other options too.

Divisible and Decidable contravariant functors

Another powerful aspect is that Inbox<A> is Divisible and Decidable.

A Divisible contravariant functor is the contravariant analogue of Applicative.

Continuing the intuition that contravariant functors consume input, a Divisible contravariant functor also has the ability to be composed "beside" another contravariant functor.

If you 'follow the arrows' here, then you'll see that fb and fc get composed using the f function. It takes an A value, turns it into a pair (B, C), and then passes B on to fb and C on to fc.
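Custom inboxes and Divide can be illustrated together with a small sketch. SketchInbox<A> is a hypothetical sink built from a System.Threading.Channels.ChannelWriter<A>, in the spirit of the custom Inbox described above. It is not LanguageExt's Inbox. Its Contramap and Divide are illustrative implementations of the contravariant and Divisible operations, not the library's signatures.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sink of A values; not the LanguageExt Inbox
public sealed class SketchInbox<A>
{
    private readonly Func<A, ValueTask> post;

    public SketchInbox(Func<A, ValueTask> post) => this.post = post;

    // Custom inbox: posting a value writes it to the channel
    public static SketchInbox<A> FromWriter(ChannelWriter<A> writer) =>
        new SketchInbox<A>(value => writer.WriteAsync(value));

    public ValueTask Post(A value) => post(value);

    // Contramap: an inbox of A plus a function Z -> A gives an inbox of Z;
    // each incoming Z is converted before it reaches this inbox
    public SketchInbox<Z> Contramap<Z>(Func<Z, A> f) =>
        new SketchInbox<Z>(z => post(f(z)));
}

public static class SketchInbox
{
    // Divide: split each A into a pair (B, C) and post the halves to fb and fc
    public static SketchInbox<A> Divide<A, B, C>(
        Func<A, (B, C)> f, SketchInbox<B> fb, SketchInbox<C> fc) =>
        new SketchInbox<A>(async a =>
        {
            var (b, c) = f(a);
            await fb.Post(b);
            await fc.Post(c);
        });
}
```

Suppose an inbox is built with Divide from a function that splits "Ada,36" on the comma, plus one channel-backed inbox per field. Posting "Ada,36" to it would write "Ada" to one channel and 36 to the other. That is the 'split, then route to two sinks' behaviour of Divide.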
So, Divide allows a single F<A> to represent the splitting of values and their routing into two new sinks (F<B> and F<C>). With Inbox, this allows an Inbox<A> to be a sink of A values that then routes them to other (hidden) Inbox structures.

Decidable contravariant functors are very similar to Divisible contravariant functors. The difference is in what the f function returns. Divisible generates a tuple (B, C) and routes the incoming values to two other contravariant functors at the same time. Decidable instead returns an Either<B, C>, which means each value is routed to only one contravariant functor.

Again, Inbox<A> is a Decidable contravariant functor, so you can call Route to direct the values downstream.

Visually, if B is returned by f:

If C is returned by f:

This discussion was created from the release Pipes refactor.