Pipelining with Go and Generics


K. Alex Mills

Aug 11, 2022, 1:55:14 PM
to golang-nuts
Hello Gophers,

I recently had an opportunity to try out Go generics on a small pipelines package, along with some of my coworkers.

The overall goal of this package is to provide helpers for separating concurrency from the core logic of the computation. It was designed for I/O-bound computations, so it's likely inappropriate for managing short-lived goroutines. It takes a functional programming approach, providing helpers with names familiar from other APIs, like Map, FlatMap, OptionMap, etc. One feature I am particularly happy with is that concurrency concerns like worker pool size and channel buffers are configurable with minimal disruption to the rest of the code.

Take a look at the library and its accompanying blog post. I'm open to any of your thoughts, suggestions, and issue reports.

Sincerely,

K. Alex Mills

Robert Engels

Aug 11, 2022, 3:37:05 PM
to K. Alex Mills, golang-nuts
I’d say it certainly highlights a problem with Go’s error model. Exceptions would fit nicely here - instead it seems you needed to ignore all error handling - because chaining is impossible with error returns. 

A streams api with panic/recover is needed. 

--
You received this message because you are subscribed to the Google Groups "golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/golang-nuts/CALJzkY_zASs-YOukv6ciSO45b93jz39DmjAWA915kfBuwimkgQ%40mail.gmail.com.

K. Alex Mills

Aug 11, 2022, 4:25:29 PM
to Robert Engels, golang-nuts
Thanks so much for taking a look, and good catch! Error handling is definitely a missing piece in the library at the moment. I'm not sure panic/recover is the direction I'd take it, but I do think it could be made to work.

When using this library, the way we're handling errors is by hanging the entire pipeline system off a struct which provides access to an error chan. Every pipeline stage shares the same context, which a separate goroutine cancels if any error is seen on the error channel.

I've considered including error handling based either around this idea or an errgroup, but haven't had time to write it up. Might do so soon.
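
For readers following along, the error-channel arrangement described above might look roughly like the sketch below. It is not the library's code: ErrorSink, NewErrorSink, Report, Wait, emit and parse are all hypothetical names chosen for illustration. Every stage shares one context, and a watcher goroutine cancels it as soon as the first error arrives.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// ErrorSink is a hypothetical helper: stages report errors to it, and a
// watcher goroutine cancels the shared context on the first one.
type ErrorSink struct {
	errs   chan error
	cancel context.CancelFunc
	done   chan struct{}
	first  error // written by the watcher, read only after done is closed
}

// NewErrorSink returns the context every stage should share, plus the sink.
func NewErrorSink(parent context.Context) (context.Context, *ErrorSink) {
	ctx, cancel := context.WithCancel(parent)
	s := &ErrorSink{errs: make(chan error), cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(s.done)
		select {
		case err := <-s.errs:
			s.first = err
			cancel() // stop every stage that watches ctx
		case <-ctx.Done():
		}
	}()
	return ctx, s
}

// Report hands err to the watcher, or gives up if the pipeline is already stopping.
func (s *ErrorSink) Report(ctx context.Context, err error) {
	select {
	case s.errs <- err:
	case <-ctx.Done():
	}
}

// Wait stops the watcher and returns the first reported error, if any.
func (s *ErrorSink) Wait() error {
	s.cancel()
	<-s.done
	return s.first
}

// emit is a source stage that stops early when ctx is cancelled.
func emit(ctx context.Context, items ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, it := range items {
			select {
			case out <- it:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// parse converts strings to ints and reports the first failure to the sink.
func parse(ctx context.Context, sink *ErrorSink, in <-chan string) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for s := range in {
			n, err := strconv.Atoi(s)
			if err != nil {
				sink.Report(ctx, fmt.Errorf("parse %q: %w", s, err))
				return
			}
			select {
			case out <- n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// run wires the stages together and sums whatever made it through.
func run(items ...string) (int, error) {
	ctx, sink := NewErrorSink(context.Background())
	sum := 0
	for n := range parse(ctx, sink, emit(ctx, items...)) {
		sum += n
	}
	return sum, sink.Wait()
}

func main() {
	fmt.Println(run("1", "2", "3"))
	fmt.Println(run("1", "x", "3"))
}
```

Only the first error is kept in this sketch; a real pipeline might prefer to collect all of them or to use an errgroup, as the post suggests.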

Jan Mercl

Aug 11, 2022, 4:27:31 PM
to Robert Engels, K. Alex Mills, golang-nuts


On Thu, Aug 11, 2022, 21:36 Robert Engels <ren...@ix.netcom.com> wrote:
> I’d say it certainly highlights a problem with Go’s error model. Exceptions would fit nicely here - instead it seems you needed to ignore all error handling - because chaining is impossible with error returns.

It's okay if someone prefers the way Java does things, but the best thing about Go is IMHO that it is not Java. And I still hope it stays away from becoming Java as long as possible.



Robert Engels

Aug 11, 2022, 4:37:58 PM
to Jan Mercl, K. Alex Mills, golang-nuts
I don’t think that is relevant. It is very difficult to do chaining with Go’s error model. You can pass a shared context to every node and store the error in the context and protect against concurrent access. It’s doable but not easy. 

Map/reduce and most functional patterns are easily represented using chains. 

But like I said, I would use panic/recover in the framework to make it easier. 



Robert Engels

Aug 11, 2022, 4:47:04 PM
to Jan Mercl, K. Alex Mills, golang-nuts
To finish, you would pass the same context to the execution group to support parallel streams, etc. 

It is not a drastic change over Go’s error model, but every node needs to handle the context cancellation/error properly. Again, not hard - certainly doable. I think in some ways it would be easier than the current iterator discussion, because the options are more limited in how you could do it (without additional language changes).
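
One way to picture the shared-context idea is the minimal sketch below. All names in it (streamContext, Fail, Failure, fromSlice, mapNode, sumOfSquares) are made up for illustration and are not taken from any existing package. The first error is stored under a mutex, and recording it cancels the context that every node watches.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// streamContext is a hypothetical type pairing a cancellable context with
// the first error any node reported, guarded against concurrent access.
type streamContext struct {
	ctx    context.Context
	cancel context.CancelFunc

	mu  sync.Mutex
	err error
}

func newStreamContext(parent context.Context) *streamContext {
	ctx, cancel := context.WithCancel(parent)
	return &streamContext{ctx: ctx, cancel: cancel}
}

// Done is closed once any node has failed or the stream was cancelled.
func (sc *streamContext) Done() <-chan struct{} { return sc.ctx.Done() }

// Fail records err if it is the first one and cancels the whole stream.
func (sc *streamContext) Fail(err error) {
	sc.mu.Lock()
	if sc.err == nil {
		sc.err = err
	}
	sc.mu.Unlock()
	sc.cancel()
}

// Failure returns the first recorded error, or nil.
func (sc *streamContext) Failure() error {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	return sc.err
}

// fromSlice is a source node that stops when the stream is cancelled.
func fromSlice[T any](sc *streamContext, items []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, it := range items {
			select {
			case out <- it:
			case <-sc.Done():
				return
			}
		}
	}()
	return out
}

// mapNode applies f to each value and fails the stream on the first error.
func mapNode[S, T any](sc *streamContext, in <-chan S, f func(S) (T, error)) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for {
			select {
			case <-sc.Done():
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				r, err := f(v)
				if err != nil {
					sc.Fail(err)
					return
				}
				select {
				case out <- r:
				case <-sc.Done():
					return
				}
			}
		}
	}()
	return out
}

var errNegative = errors.New("negative input")

// sumOfSquares runs a two-node stream and returns the stored error, if any.
func sumOfSquares(items []int) (int, error) {
	sc := newStreamContext(context.Background())
	defer sc.cancel()
	squares := mapNode(sc, fromSlice(sc, items), func(n int) (int, error) {
		if n < 0 {
			return 0, errNegative
		}
		return n * n, nil
	})
	total := 0
	for v := range squares {
		total += v
	}
	return total, sc.Failure()
}

func main() {
	fmt.Println(sumOfSquares([]int{1, 2, 3}))
	fmt.Println(sumOfSquares([]int{1, -2, 3}))
}
```

The cost the post alludes to is visible here: every node has to select on Done around each send and receive, or a failed stream leaves goroutines blocked.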


burak serdar

Aug 11, 2022, 4:48:12 PM
to Robert Engels, Jan Mercl, K. Alex Mills, golang-nuts
On Thu, Aug 11, 2022 at 2:37 PM Robert Engels <ren...@ix.netcom.com> wrote:
> I don’t think that is relevant. It is very difficult to do chaining with Go’s error model. You can pass a shared context to every node and store the error in the context and protect against concurrent access. It’s doable but not easy.
>
> Map/reduce and most functional patterns are easily represented using chains.
>
> But like I said, I would use panic/recover in the framework to make it easier.

I recently worked on a concurrent data pipeline to process a stream of data packages. Error handling was done with an error channel. Each stage of the pipeline had to prepare an error message capturing the current context, cause of the error, etc., and then write to the error channel before processing the next entry. 
 

Jan Mercl

Aug 11, 2022, 4:59:30 PM
to Robert Engels, K. Alex Mills, golang-nuts


On Thu, Aug 11, 2022, 22:37 Robert Engels <ren...@ix.netcom.com> wrote:
> I don’t think that is relevant. It is very difficult to do chaining with Go’s error model.

I don't think "Go's error model" is a thing. The language specification does not mention it, and I'm not aware of any official Go documentation that should be considered "Go's error model".

Go has the predefined error interface with a single method. That's all. The rest is at most a convention. No need to make a ceremony of it.

Wrt "it's very difficult...": of course it's difficult to use idioms from other languages in Go when Go intentionally does not support those idioms. So the claim is sort of a tautology, isn't it?

Once again, having different preferences is okay. Generalizing personal preferences as Go's fault is IMO not okay.

Maybe you can file a proposal at the Go issue tracker to add all/some/any Java/etc. idioms to Go and let's see how it works.

Robert Engels

Aug 11, 2022, 5:18:50 PM
to Jan Mercl, K. Alex Mills, golang-nuts
Every modern language with generics or similar has a streams package so there’s no need to call out Java. 

Like I said, you can chain by passing and checking the errors within the stream context.


K. Alex Mills

Aug 11, 2022, 5:57:08 PM
to Robert Engels, Jan Mercl, golang-nuts
See the example added in this PR for one way to do error handling with this library without involving panic/recover.

One possible drawback to this approach is that it doesn't provide a way to return an error directly from a `func` which has been passed to a pipeline stage. I considered that, but it would seem to involve yet another pair of variants for each proto stage: e.g. Map, MapCtx, MapCtxErr, and MapErr, with the following signatures:

func Map      [S, T any](ctx context.Context, in <-chan S, f func(S) T,                           opts ...Option) <-chan T
func MapCtx   [S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) T,          opts ...Option) <-chan T
func MapErr   [S, T any](ctx context.Context, in <-chan S, f func(S) (T, error),                  opts ...Option) <-chan T
func MapCtxErr[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) (T, error), opts ...Option) <-chan T

...this proliferation of variants of essentially "the same" function is making me reconsider some of my life choices. Asking each stage to capture and interact with the ErrorSink yields a smaller API surface area. That said, the Err variants would allow the library to do things like store the ErrorSink in the context and use it if it's present.

I'm not sure that's worth the complexity, and it feels very much like "magic", where it seems Go "should" be explicit, but I can be convinced otherwise.
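
To make the trade-off concrete, here is a rough sketch of what a MapErr variant that looks for an ErrorSink in the context could look like. Only the MapErr signature comes from the post above. The Option implementation, WithWorkers, WithBuffer, WithErrorSink, the function-typed ErrorSink, fromSlice and demo are assumptions made for this example, and the choice to skip failed items instead of stopping is just one possible policy.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
)

// ErrorSink is modelled here as a plain callback (an assumption).
type ErrorSink func(error)

type sinkKey struct{}

// WithErrorSink stores the sink in the context so stages can find it.
func WithErrorSink(ctx context.Context, sink ErrorSink) context.Context {
	return context.WithValue(ctx, sinkKey{}, sink)
}

// config and Option are a guess at how functional options might be shaped.
type config struct{ workers, buffer int }

type Option func(*config)

func WithWorkers(n int) Option { return func(c *config) { c.workers = n } }
func WithBuffer(n int) Option  { return func(c *config) { c.buffer = n } }

// MapErr runs f in a pool of workers. Failed items are skipped and their
// errors go to the sink found in ctx, if there is one.
func MapErr[S, T any](ctx context.Context, in <-chan S, f func(S) (T, error), opts ...Option) <-chan T {
	cfg := config{workers: 1}
	for _, o := range opts {
		o(&cfg)
	}
	if cfg.workers < 1 {
		cfg.workers = 1
	}
	sink, _ := ctx.Value(sinkKey{}).(ErrorSink)
	out := make(chan T, cfg.buffer)

	var wg sync.WaitGroup
	for i := 0; i < cfg.workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				r, err := f(v)
				if err != nil {
					if sink != nil {
						sink(err)
					}
					continue
				}
				select {
				case out <- r:
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	// Close the output only after every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fromSlice returns a closed, pre-filled channel holding items.
func fromSlice[T any](items []T) <-chan T {
	out := make(chan T, len(items))
	for _, it := range items {
		out <- it
	}
	close(out)
	return out
}

// demo parses four strings with three workers and counts the failures.
func demo() (sum, failures int) {
	var mu sync.Mutex
	ctx := WithErrorSink(context.Background(), func(error) {
		mu.Lock()
		failures++
		mu.Unlock()
	})
	in := fromSlice([]string{"1", "2", "x", "4"})
	for n := range MapErr(ctx, in, strconv.Atoi, WithWorkers(3), WithBuffer(2)) {
		sum += n
	}
	return sum, failures
}

func main() {
	fmt.Println(demo())
}
```

The lookup through ctx.Value is the "magic" in question: nothing in the call site of MapErr reveals where errors end up.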

All that said, if you really want to panic and you don't mind the pipeline shutting down whenever you do, I don't think there's anything stopping you from doing so with this library. Just use recover at the top of the func that spins up the pipeline and use context.WithCancel to shut the rest of the pipeline down in that case. Let me know if that's unclear and I'll do my best to provide an example.

Sincerely,

K. Alex Mills

K. Alex Mills

Aug 11, 2022, 6:19:30 PM
to Robert Engels, Jan Mercl, golang-nuts
Ah, my apologies, I was wrong. Panicking in the midst of a func passed to one of these pipeline stages most certainly will not shut down the entire pipeline and cannot be recovered like I had suggested. Sorry about that.

Sincerely,

K. Alex Mills
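
The reason behind this correction is that recover only stops a panic when it is called from a deferred function on the goroutine that is panicking; a recover at the top of the func that starts the pipeline never sees panics raised in worker goroutines, and an unrecovered panic in any goroutine terminates the program. A minimal sketch of recovering inside the worker instead is shown below. The names safely and run are hypothetical and not part of the library.

```go
package main

import "fmt"

// safely wraps f so that a panic inside it becomes an error. The deferred
// recover runs on whichever goroutine calls the wrapped function, which is
// the only place the panic can be caught.
func safely[S, T any](f func(S) T) func(S) (T, error) {
	return func(v S) (out T, err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("stage panicked: %v", r)
			}
		}()
		return f(v), nil
	}
}

// run applies a panicking function in a worker goroutine and separates
// successful results from recovered panics.
func run(items []int) (results []int, failures []error) {
	out := make(chan int)
	errs := make(chan error, len(items)) // buffered so the worker never blocks on it
	f := safely(func(n int) int { return 100 / n }) // panics when n == 0

	go func() {
		// Deferred calls run last-in first-out: out is closed before errs.
		defer close(errs)
		defer close(out)
		for _, n := range items {
			r, err := f(n)
			if err != nil {
				errs <- err
				continue
			}
			out <- r
		}
	}()

	for r := range out {
		results = append(results, r)
	}
	for err := range errs {
		failures = append(failures, err)
	}
	return results, failures
}

func main() {
	results, failures := run([]int{1, 0, 4})
	fmt.Println(results, len(failures))
}
```

Whether a recovered panic should skip the item, as here, or cancel the whole pipeline is a separate design decision.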

robert engels

Aug 11, 2022, 9:51:57 PM
to K. Alex Mills, Jan Mercl, golang-nuts
I don’t think the chaining of the stream operations can work.

AFAICT you cannot have optional type parameters, and you cannot add type parameters to methods, so a method like Map() on a stream instance can’t be written - because the stream is of type T, and the Map() produces a stream of type R.

Pseudocode:

type Stream[T any] interface {
    ...
    Map[R any](func (T) R) Stream[R] // syntax not supported
    ...
}

What am I missing?

K. Alex Mills

Aug 11, 2022, 10:38:44 PM
to robert engels, Jan Mercl, golang-nuts
I don't think you're missing anything. AFAICT, you're right: a generic Stream[T] interface with generic methods hanging off a struct cannot be supported by the current generics implementation because of the restriction on generic methods. I tried to do similar things by implementing interfaces for common monads when the generics beta was released and could not make it work.

But it's not clear to me that you need to have an explicit Stream[T] type in order to have a generic streams package. As an alternative, the pipelines package I've introduced here takes the position that "Stream[T]" in Go is spelled "chan T". ;-)

What it does instead is provide generic top-level functions which can start and manage individual stages of the pipeline, leaving the particulars of stitching the inputs and outputs of each stage to the caller. This approach might not meet your definition of "chaining", but it gets the job done.
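
A stripped-down illustration of the top-level-function style follows. It is only a sketch: Emit, Map and Collect are simplified stand-ins written for this example, without the context and options parameters that the signatures earlier in the thread carry. Because Map is a function rather than a method, it can introduce the second type parameter that a method on Stream[T] cannot.

```go
package main

import (
	"fmt"
	"strconv"
)

// Emit sends items on a new channel and closes it when done.
func Emit[T any](items ...T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, it := range items {
			out <- it
		}
	}()
	return out
}

// Map is a top-level generic function, so it may change the element type
// from S to T between its input and output channels.
func Map[S, T any](in <-chan S, f func(S) T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range in {
			out <- f(v)
		}
	}()
	return out
}

// Collect drains a channel into a slice.
func Collect[T any](in <-chan T) []T {
	var all []T
	for v := range in {
		all = append(all, v)
	}
	return all
}

func main() {
	// The caller stitches stages together by passing channels along.
	squares := Map(Emit(1, 2, 3), func(n int) int { return n * n })
	labels := Map(squares, strconv.Itoa)
	fmt.Println(Collect(labels)) // prints [1 4 9]
}
```

The stages read inside-out or through intermediate variables instead of left-to-right, which is the practical difference from method chaining.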

Jan Mercl

Aug 12, 2022, 5:22:55 AM
to Robert Engels, K. Alex Mills, golang-nuts
On Thu, Aug 11, 2022 at 11:18 PM Robert Engels <ren...@ix.netcom.com> wrote:

> Every modern language with generics or similar has a streams package so there’s no need to call out Java.

For some reason unknown to me you seem to consider Go not being a
modern language as it does not have a "streams" package. I don't care,
but it's worth noting.

Back to the pipelining, error "models" and exceptions. Some languages
I consider not modern are limited to single values returned from a
function. That makes pipelining a problem if errors can get involved.
Exceptions are more or less a poor solution to overcome that
limitation. Go does not have this limitation so it does not need the
poor solution to it. Yet you call it a problem of the Go error
"model". It makes little sense to me.

Robert Engels

Aug 12, 2022, 8:26:02 AM
to Jan Mercl, K. Alex Mills, golang-nuts
Maybe there is a language barrier. My statement implied Go was a modern language yet it did not have a streams package - not that it was not a modern language.

You seem to be looking for a fight and I’m not sure why. Why don’t you focus on solutions - it’s better for everyone.
