Struggling to compose pipes-postgresql and pipes-tar


Oliver Charles

Oct 16, 2013, 1:23:28 PM
to haskel...@googlegroups.com
Hi folks,

It's been a while since I struggled to implement my own libraries, but don't worry - I'm back! I have finally rewritten pipes-tar to use its own custom iterator, which means it now provides the following functions:

parseTarEntries :: (Functor m, Monad m) => Pipes.Producer BS.ByteString m () -> TarArchive m

iterTarArchive :: Monad m => (forall a. TarHeader -> Pipes.Consumer BS.ByteString m a)

Thus users can parse a ByteString producer into a TarArchive, and then iterate over all entries in the archive with iterTarArchive. All well so far.

My intended use case is to stream PostgreSQL table dumps out of a tar archive, and into a PostgreSQL database. This means that I need to write a Consumer for iterTarArchive:

streamTable :: Pg.Connection -> TarHeader -> Pipes.Consumer BS.ByteString m a

And then I can stitch it all together:

iterTarArchive streamTable (parseTarEntries ...)

The problem is mostly in writing that Consumer. Streaming a table takes four steps:

  1. Begin the COPY command
  2. Stream all the rows into the table
  3. Call putCopyEnd (or putCopyError if something went wrong)
  4. Repeat for the next table

The problem I'm having is getting step 3 to happen promptly. My initial attempt looked like this:

toTable
    :: (MonadIO m, Pipes.MonadSafe m, Pipes.Base m ~ IO)
    => Pg.Connection
    -> Format
    -> String
    -> Pipes.Consumer ByteString m a
toTable c fmt tblName = do
  putCopyEnd <- Pipes.register (void $ Pg.putCopyEnd c)

  Pipes.liftBase $ Pg.copy_ c $ fromString $ concat
    [ "COPY ", tblName
    , " FROM STDIN WITH (FORMAT " , show fmt, ")"
    ]

  Pipes.for Pipes.cat (liftIO . Pg.putCopyData c)
    `onException` (\e -> do Pipes.release putCopyEnd
                            Pipes.liftBase $ Pg.putCopyError c $
                              Text.encodeUtf8 . Text.pack $ show e)

 where

  action `onException` handler =
    action `catchAll` \e -> handler e >> throwM e

I register a finalizer, but remove it if an exception happens (preferring instead to close with putCopyError). However, by doing this, the finalizer happens only when all entries in the archive have been streamed - which is too late! What I want is something like runSafeP, but that is only usable on Effects, and I have a Consumer. If I had a way to use the ReleaseKey in putCopyEnd, I think I could use bracket - my acquire action would do nothing, and my release action would run the putCopyEnd finalizer and delete it from the finalizer map. If the onException handler has deleted this already, then nothing would happen.
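
For readers following along, the COPY lifecycle described above can be sketched in plain IO, outside of pipes and pipes-safe. This is only a minimal sketch under stated assumptions: `CopyOps`, `copyTable`, `recordingOps` and `recorded` are hypothetical names invented for illustration, and the record of actions stands in for the real `copy_`, `putCopyData`, `putCopyEnd` and `putCopyError` calls so that the control flow can run without a database. It shows the behaviour being asked for (end or abort exactly once, as soon as one table is finished), not how to obtain it inside a Consumer.

```haskell
import Control.Exception
  (ErrorCall (..), SomeException, displayException, mask, throwIO, try)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Hypothetical stand-ins for the COPY operations (not the real
-- postgresql-simple API), so the control flow can run without a database.
data CopyOps = CopyOps
  { beginCopy :: IO ()            -- step 1: issue COPY ... FROM STDIN
  , putRow    :: String -> IO ()  -- step 2: send one chunk of data
  , endCopy   :: IO ()            -- step 3, success: like putCopyEnd
  , abortCopy :: String -> IO ()  -- step 3, failure: like putCopyError
  }

-- | Stream one table. Exactly one of 'endCopy' or 'abortCopy' runs, and it
-- runs as soon as this table is done, not when the whole archive is done.
copyTable :: CopyOps -> [String] -> IO ()
copyTable ops rows = mask $ \restore -> do
  beginCopy ops
  result <- try (restore (mapM_ (putRow ops) rows))
  case result of
    Right () -> endCopy ops
    Left e   -> do
      abortCopy ops (displayException (e :: SomeException))
      throwIO e

-- | A fake backend that records every call and rejects the row "bad".
recordingOps :: IORef [String] -> CopyOps
recordingOps calls = CopyOps
  { beginCopy = note "begin"
  , putRow    = \row ->
      if row == "bad"
        then throwIO (ErrorCall "bad row")
        else note ("row " ++ row)
  , endCopy   = note "end"
  , abortCopy = \_ -> note "abort"
  }
  where
    note s = modifyIORef' calls (++ [s])

-- | Copy the given rows and return the recorded calls, swallowing any error.
recorded :: [String] -> IO [String]
recorded rows = do
  calls <- newIORef []
  _ <- try (copyTable (recordingOps calls) rows) :: IO (Either SomeException ())
  readIORef calls
```

Loading this in GHCi and evaluating `recorded ["a", "b"]` and `recorded ["a", "bad", "c"]` shows the two paths: the first ends with the "end" call, the second with "abort" and no "end".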

Thoughts?
- Ollie

Gabriel Gonzalez

Oct 19, 2013, 12:17:28 AM
to haskel...@googlegroups.com, Oliver Charles
Your `iterTarArchive` type was cut off, so for the benefit of others I'm pasting the full type here:

iterTarArchive
  :: Monad m
  => (forall a. TarHeader -> Pipes.Consumer BS.ByteString m a)
  -> TarArchive m -> m ()

This is a mistake in my implementation of `onException`.  It should release the finalizer promptly in the event of an exception and not delay it until the end of the `SafeT` block.

The origin of this mistake is that I initially had this for `onException`:

 m1 `onException` io = do
     key <- register (io >> return ())
     r   <- m1 `C.onException` liftBase io
     release key
     return r

However, that was a double-free bug because the `C.onException` did not release the finalizer.  In the following commit I "fixed" it by removing `C.onException` completely, but that accidentally caused the problem you observed, late finalization:

https://github.com/Gabriel439/Haskell-Pipes-Safe-Library/commit/a8746f4800745a3bc24b1b42f4a4c298d19bab03

The correct fix is to start from the old version and have the `C.onException` handler release the ReleaseKey, which prevents the double free while still preserving prompt finalization.
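
The three behaviours discussed above (finalizer run twice, finalizer run late, finalizer run promptly and once) can be reproduced with a toy model in plain IO. This is a sketch under assumptions, not the pipes-safe implementation: `Registry`, `runBlock` and the three `onException...` variants are hypothetical names, `runBlock` only imitates the idea of a `SafeT` block that runs pending finalizers when it ends, and `release` is modelled as forgetting a finalizer without running it.

```haskell
import Control.Exception (ErrorCall (..), throwIO, try)
import qualified Control.Exception as C
import Data.IORef (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- | Toy registry: the next free key plus the finalizers still pending.
type Registry = IORef (Int, [(Int, IO ())])

register :: Registry -> IO () -> IO Int
register reg fin =
  atomicModifyIORef' reg $ \(n, fins) -> ((n + 1, (n, fin) : fins), n)

-- | Forget a finalizer without running it; releasing twice is harmless.
release :: Registry -> Int -> IO ()
release reg key =
  modifyIORef' reg $ \(n, fins) -> (n, filter ((/= key) . fst) fins)

-- | Run a block, then run whatever finalizers are still pending.
runBlock :: (Registry -> IO a) -> IO a
runBlock body = do
  reg <- newIORef (0, [])
  body reg `C.finally` (readIORef reg >>= mapM_ snd . snd)

-- | Handler runs the finalizer but leaves it registered: it runs twice.
onExceptionTwice :: Registry -> IO a -> IO () -> IO a
onExceptionTwice reg m io = do
  key <- register reg io
  r <- m `C.onException` io
  release reg key
  return r

-- | No handler at all: the finalizer waits until the block ends.
onExceptionLate :: Registry -> IO a -> IO () -> IO a
onExceptionLate reg m io = do
  key <- register reg io
  r <- m
  release reg key
  return r

-- | Handler releases the key and runs the finalizer: prompt, exactly once.
onExceptionPrompt :: Registry -> IO a -> IO () -> IO a
onExceptionPrompt reg m io = do
  key <- register reg io
  r <- m `C.onException` (release reg key >> io)
  release reg key
  return r

-- | Record when the finalizer runs relative to the rest of the block.
eventsFor :: (Registry -> IO () -> IO () -> IO ()) -> IO [String]
eventsFor variant = do
  logRef <- newIORef []
  let say s = modifyIORef' logRef (++ [s])
  runBlock $ \reg -> do
    _ <- try (variant reg (throwIO (ErrorCall "boom")) (say "finalizer"))
           :: IO (Either ErrorCall ())
    say "rest of block"
  readIORef logRef
```

In this model, `eventsFor onExceptionTwice` records the finalizer both before and after "rest of block", `eventsFor onExceptionLate` records it only after, and `eventsFor onExceptionPrompt` records it only before.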
--
You received this message because you are subscribed to the Google Groups "Haskell Pipes" group.
To unsubscribe from this group and stop receiving emails from it, send an email to haskell-pipe...@googlegroups.com.
To post to this group, send email to haskel...@googlegroups.com.

Pierre R

Oct 19, 2013, 5:47:13 AM
to haskel...@googlegroups.com
Hi Oliver, hi all,

Out of curiosity, why not "Pipes.Tar.iterArchive" or "Pipes.Tar.iter" instead of "Pipes.Tar.iterTarArchive" ?

This was intriguing enough. I've actually tried for a minute to see if there was some sort of consensus in the Haskell community about this ...

As an example, Google Go is one language that improves readability by enforcing strong naming conventions on namespace.

I am actually quite interested in the general issue here. Haskell is quite an elegant language but I have surely not reached the point where I can say it is a particularly easy one to read ;-)

Cheers,


Gabriel Gonzalez

Oct 19, 2013, 11:40:20 AM
to haskell-pipes, Oliver Charles

Let me clarify that I will implement my suggested fix.  I just need to make sure there are no problems with it first.


Oliver Charles

Oct 19, 2013, 12:41:26 PM
to Gabriel Gonzalez, haskel...@googlegroups.com
On 10/19/2013 05:17 AM, Gabriel Gonzalez wrote:
> Your `iterTarArchive` type was cut off, so for the benefit of others I'm pasting the full type here:

Whoops, so it did!


> This is a mistake in my implementation of `onException`.  It should release the finalizer promptly in the event of an exception and not delay it until the end of the `SafeT` block.
>
> The origin of this mistake is that I initially had this for `onException`:
>
>  m1 `onException` io = do
>      key <- register (io >> return ())
>      r   <- m1 `C.onException` liftBase io
>      release key
>      return r
>
> However, that was a double-free bug because the `C.onException` did not release the finalizer.  In the following commit I "fixed" it by removing `C.onException` completely, but that accidentally caused the problem you observed, late finalization:
>
> https://github.com/Gabriel439/Haskell-Pipes-Safe-Library/commit/a8746f4800745a3bc24b1b42f4a4c298d19bab03
>
> The correct fix is to start from the old version and have the `C.onException` handler release the ReleaseKey, which prevents the double free while still preserving prompt finalization.

Ok, this solves the exceptional case, but I'm still unclear on how it solves the successful case. For that I want to:

  1. Begin the COPY command
  2. Stream all the rows with putCopyData
  3. When there are no more rows (that is, when upstream terminates), run putCopyEnd

As far as I can tell, this still cannot be done inside a Consumer with pipes-safe, because there is no way to 'wrap' the Consumer with a runSafeP. Right now, I have hacked pipes-tar to use SafeT as the base monad, so that I can runSafeP for every entry in the tar archive. This might be the final solution, but it makes pipes-tar a little less polymorphic, which means people who don't care about safety have to hoist.

I'm also not sure if it remains possible for people to use SafeT around the entire tar archive, if that is their intention - but I think it would be by layering my per-entry SafeT over another SafeT.

And don't rush to fix anything - it's the weekend :)

- Ollie

Gabriel Gonzalez

Oct 19, 2013, 1:08:33 PM
to Oliver Charles, haskel...@googlegroups.com
Yeah, you're right.  I don't think there is a way to implement this using your current `iterTarArchive`.  However, what's wrong with just using `iterT` on `TarArchive` directly?  That is much more flexible and allows one to use `runSafeP` and it still prevents misuse.

Also, side note: What is the purpose behind the `TarEntryProducer` constructor of the `TarEntry` type?

Oliver Charles

Oct 19, 2013, 2:17:20 PM
to Gabriel Gonzalez, haskel...@googlegroups.com
On 10/19/2013 06:08 PM, Gabriel Gonzalez wrote:
> Yeah, you're right.  I don't think there is a way to implement this using your current `iterTarArchive`.  However, what's wrong with just using `iterT` on `TarArchive` directly?  That is much more flexible and allows one to use `runSafeP` and it still prevents misuse.

iterT allows the user to call back into the parser multiple times, which would corrupt the parser state. If the functor contains a Producer that must be run to get the next action, it should be run exactly once - no more, no less. Writing my own iterator meant I could get that guarantee.
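
The concern above can be illustrated with a toy model, assuming nothing about the real pipes-tar types. In this sketch every name (`Archive`, `parseArchive`, `iterLike`, `iterOnce`, `wellBehaved`, `greedy`) is hypothetical: the "parser" is just an `IORef` holding the entry names still to be read, `iterLike` hands the continuation to the handler in the way an `iterT`-style fold does, and `iterOnce` is a dedicated iterator that never exposes the continuation.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef)

-- | Toy archive: a header plus an action that advances the shared parser
-- state to reach the next entry.
data Archive = Done | Entry String (IO Archive)

-- | Pop one entry name from the shared input.
parseArchive :: IORef [String] -> IO Archive
parseArchive input = do
  remaining <- readIORef input
  case remaining of
    []          -> pure Done
    name : rest -> do
      writeIORef input rest
      pure (Entry name (parseArchive input))

-- | iterT-like fold: the handler receives the continuation and is free to
-- run it zero, one, or many times.
iterLike :: (String -> IO [String] -> IO [String]) -> Archive -> IO [String]
iterLike _ Done = pure []
iterLike handler (Entry name next) = handler name (next >>= iterLike handler)

-- | Dedicated iterator: the continuation is run exactly once, by us.
iterOnce :: (String -> IO ()) -> Archive -> IO ()
iterOnce _ Done = pure ()
iterOnce visit (Entry name next) = visit name >> next >>= iterOnce visit

wellBehaved, greedy :: String -> IO [String] -> IO [String]
wellBehaved name rest = (name :) <$> rest
greedy name rest = rest >> ((name :) <$> rest)  -- runs the parser twice

runHandler :: (String -> IO [String] -> IO [String]) -> IO [String]
runHandler handler = do
  input <- newIORef ["a", "b", "c"]
  parseArchive input >>= iterLike handler

visitOnce :: IO [String]
visitOnce = do
  input <- newIORef ["a", "b", "c"]
  seen <- newIORef []
  parseArchive input >>= iterOnce (\name -> modifyIORef' seen (++ [name]))
  readIORef seen
```

In this model `runHandler wellBehaved` and `visitOnce` both see all three entries, while `runHandler greedy` returns only the first one, because the first run of each continuation has already consumed the shared input by the time the second run starts.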


> Also, side note: What is the purpose behind the `TarEntryProducer` constructor of the `TarEntry` type?

That is an indication that the contents of an entry must be streamed to determine the number of bytes to put in the tar header.

- ocharles

Gabriel Gonzalez

Oct 19, 2013, 2:46:44 PM
to Oliver Charles, haskel...@googlegroups.com
On 10/19/2013 11:17 AM, Oliver Charles wrote:
> On 10/19/2013 06:08 PM, Gabriel Gonzalez wrote:
>> Yeah, you're right.  I don't think there is a way to implement this using your current `iterTarArchive`.  However, what's wrong with just using `iterT` on `TarArchive` directly?  That is much more flexible and allows one to use `runSafeP` and it still prevents misuse.
>
> iterT allows the user to call back into the parser multiple times, which would corrupt the parser state. If the functor contains a Producer that must be run to get the next action, it should be run exactly once - no more, no less. Writing my own iterator meant I could get that guarantee.

I think trying to enforce this stronger property is much more trouble than it's worth.

For example, consider this code:

    import Pipes
    import qualified Pipes.Prelude as P

    p = P.stdinLn >-> P.take 10

Nothing prevents me from reading more than 10 lines by just restarting `p`.  Getting this right requires linear types (which Haskell does not have).

Taking your idea to its logical conclusion would basically require replacing every `Producer` in the `pipes` ecosystem with something like this:

    stdinLn2 :: (String -> IO ()) -> IO ()

This `stdinLn2` example has the same problems as your `iterTarArchive`.  For example, there is no way to limit `stdinLn2` to only print the first 10 lines of input, just like you can't limit `iterTarArchive` to only process 10 tar entries.
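
The difference between the callback style and a composable producer can be sketched without pipes at all. The following is a toy illustration under assumptions: `Stream`, `takeS`, `drain`, `callbackLines` and `pullLines` are hypothetical names, `Stream` is a deliberately tiny stand-in for a `Producer`, and an `IORef` counter stands in for reads from stdin so the effect of each style is observable.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Callback style, shaped like `stdinLn2`: the source drives the loop and
-- the handler cannot tell it to stop.
callbackLines :: IORef Int -> [String] -> (String -> IO ()) -> IO ()
callbackLines reads input handler =
  mapM_ (\line -> modifyIORef' reads (+ 1) >> handler line) input

-- | Pull style: a tiny Producer-like stream that the consumer steps.
data Stream a = End | Yield a (IO (Stream a))

pullLines :: IORef Int -> [String] -> IO (Stream String)
pullLines _ [] = pure End
pullLines reads (line : rest) = do
  modifyIORef' reads (+ 1)
  pure (Yield line (pullLines reads rest))

-- | Keep at most n elements, without touching the source for element n + 1.
takeS :: Int -> Stream a -> Stream a
takeS n (Yield x next)
  | n > 1  = Yield x (takeS (n - 1) <$> next)
  | n == 1 = Yield x (pure End)
takeS _ _ = End

-- | Run a stream to completion, applying an action to every element.
drain :: (a -> IO ()) -> Stream a -> IO ()
drain _ End = pure ()
drain f (Yield x next) = f x >> next >>= drain f

-- | Number of source reads performed by each style on 20 lines of input.
demo :: IO (Int, Int)
demo = do
  let input = map show [1 .. 20 :: Int]
  cbReads <- newIORef 0
  callbackLines cbReads input (\_ -> pure ())
  pullReads <- newIORef 0
  pullLines pullReads input >>= drain (\_ -> pure ()) . takeS 10
  (,) <$> readIORef cbReads <*> readIORef pullReads
```

Evaluating `demo` gives `(20, 10)`: the callback source reads all 20 lines whatever the handler does, whereas the pull-based stream composed with `takeS 10` stops after 10 reads.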

This whole `pipes-safe` problem is just the tip of the iceberg for the issues you are going to encounter, the kind of issues I was trying to solve when I came up with `pipes`.  For example, using `iterTarArchive` there is no way to `hoist lift` the `Producer` if its base monad does not match the `Consumer`'s.

So my point is that if you try to statically prevent multiple traversals you end up losing the cool features that make using `pipes` worthwhile.

Oliver Charles

Oct 19, 2013, 3:50:54 PM
to Gabriel Gonzalez, haskel...@googlegroups.com
On 10/19/2013 07:46 PM, Gabriel Gonzalez wrote:
> On 10/19/2013 11:17 AM, Oliver Charles wrote:
>> On 10/19/2013 06:08 PM, Gabriel Gonzalez wrote:
>>> Yeah, you're right.  I don't think there is a way to implement this using your current `iterTarArchive`.  However, what's wrong with just using `iterT` on `TarArchive` directly?  That is much more flexible and allows one to use `runSafeP` and it still prevents misuse.
>>
>> iterT allows the user to call back into the parser multiple times, which would corrupt the parser state. If the functor contains a Producer that must be run to get the next action, it should be run exactly once - no more, no less. Writing my own iterator meant I could get that guarantee.
>
> I think trying to enforce this stronger property is much more trouble than it's worth.
>
> For example, consider this code:
>
>     import Pipes
>     import qualified Pipes.Prelude as P
>
>     p = P.stdinLn >-> P.take 10
>
> Nothing prevents me from reading more than 10 lines by just restarting `p`.  Getting this right requires linear types (which Haskell does not have).

Yes, I want linear types on an almost daily basis it seems ;)


> Taking your idea to its logical conclusion would basically require replacing every `Producer` in the `pipes` ecosystem with something like this:
>
>     stdinLn2 :: (String -> IO ()) -> IO ()
>
> This `stdinLn2` example has the same problems as your `iterTarArchive`.  For example, there is no way to limit `stdinLn2` to only print the first 10 lines of input, just like you can't limit `iterTarArchive` to only process 10 tar entries.
>
> This whole `pipes-safe` problem is just the tip of the iceberg for the issues you are going to encounter, the kind of issues I was trying to solve when I came up with `pipes`.  For example, using `iterTarArchive` there is no way to `hoist lift` the `Producer` if its base monad does not match the `Consumer`'s.

Alright, damaging composition is a really convincing argument. I'll have to live with documentation to solve this problem then, as much as that makes me grumble.

Thanks for providing a sufficient argument :)
- ocharles

Gabriel Gonzalez

Oct 19, 2013, 3:53:10 PM
to Oliver Charles, haskel...@googlegroups.com
I, too, desire linear types on a very regular basis.  It would make resource management sooooo much easier.

Gabriel Gonzalez

Oct 23, 2013, 7:27:04 PM
to haskel...@googlegroups.com, Pierre R
On 10/19/2013 2:47 AM, Pierre R wrote:
> Hi Oliver, hi all,
>
> Out of curiosity, why not "Pipes.Tar.iterArchive" or "Pipes.Tar.iter"
> instead of "Pipes.Tar.iterTarArchive" ?
>
> This was intriguing enough. I've actually tried for a minute to see if
> there was some sort of consensus in the Haskell community about this ...
>
> As an example, Google Go is one language that improves readability by
> enforcing strong naming conventions on namespace.

I really wish Haskell had something like `gofmt`.

>
> I am actually quite interested in the general issue here. Haskell is
> quite an elegant language but I have surely not reached the point where
> I can say it is a particularly easy one to read ;-)

Part of what makes it hard to read for me is all the optional syntax
sugar. It's not because I don't like the syntax sugar, but rather
because everybody has their own subset of syntax sugar that they
use. For example, I use `do` notation religiously, almost to a fault,
while other people like to mix `do` notation with `(>>=)`/`(>>)`.
Similarly, some people use pattern matching whereas others use `case`
expressions.

Also, really point-free code is difficult for me to read as well.

>
> Cheers,