runResourceT type conversion in streaming library

74 views
Skip to first unread message

Sal

unread,
Jun 6, 2016, 5:29:07 PM6/6/16
to Haskell Pipes
I have run into trouble when trying to use streaming package for AWS S3 requests. The trouble seems to be that the API in question (`AWS.pureAws`) returns a type `ResourceT IO a` while the streaming package provides `ResourceT m a`. I couldn't locate any documentation on how to do the lift to convert between two types. My code is below - I get the error `No instance for (SP.MonadResource IO) arising from a use of ‘Q.readFile’`.



{-# LANGUAGE OverloadedStrings #-}

import qualified Aws
import qualified Aws.Core as Aws
import qualified Aws.S3 as S3
import qualified Data.ByteString.Lazy as LBS
import Control.Monad.IO.Class
import System.IO
import Control.Monad.Trans.Resource (runResourceT,liftResourceT)
import Control.Concurrent.Async (async,waitCatch)
import Control.Exception (displayException)
import qualified Data.ByteString.Streaming.HTTP as SP
import qualified Data.ByteString.Streaming as Q


main
:: IO ()
main
= do
 
{- Set up AWS credentials and S3 configuration using the IA endpoint. -}
 
Just creds <- Aws.loadCredentialsFromEnv
  let cfg
= Aws.Configuration Aws.Timestamp creds (Aws.defaultLog Aws.Error)
  let s3cfg
= S3.s3 Aws.HTTP S3.s3EndpointUsClassic False

 
{- Set up a ResourceT region with an available HTTP manager. -}
  httpmgr
<- SP.newManager SP.tlsManagerSettings
  let file
="out"
  inhandle
<- openFile file ReadMode
  lenb
<- System.IO.withFile file ReadMode hFileSize
  let inbytes
= Q.readFile file
  runResourceT $
do
     
Aws.pureAws cfg s3cfg httpmgr $
     
(S3.putObject "put-your-test-env-here" ("testbucket/test") (SP.streamN (fromIntegral lenb) inbytes))
 
return ()



This seems to be the problem:

*Main> :t SP.streamN
SP
.streamN :: GHC.Int.Int64 -> Q.ByteString IO () -> SP.RequestBody
*Main> :t inbytes
inbytes
:: SP.MonadResource m => Q.ByteString m ()


Will appreciate pointers on how to fix this. I also can't figure out how `monadResource` and `resourceT` are related (couldn't find any good documentation about their relationship), which seems to be essential to solving the above puzzle.

Michael Thompson

unread,
Jun 7, 2016, 10:56:20 AM6/7/16
to Haskell Pipes
`Q.readFile` is just a convenience - it opens and closes the handle when you apply `runResourceT`. In this case it's no convenience really, since you already have the handle open to check the file size. Does this seem reasonable? ...

      {-# LANGUAGE OverloadedStrings #-}

      import qualified Aws 
      import qualified Aws.Core as Aws 
      import qualified Aws.S3 as S3 
      import qualified Data.ByteString.Streaming.HTTP as SP 
      import qualified Data.ByteString.Streaming as Q 

      import Streaming -- for classes and methods lift, liftIO, hoist, runResourceT 
      import System.IO 
      import Control.Exception

      main :: IO () 
      main = do 
        {- Set up AWS credentials and S3 configuration using the IA endpoint. -} 
        Just creds <- Aws.loadCredentialsFromEnv 
        let cfg = Aws.Configuration Aws.Timestamp creds (Aws.defaultLog Aws.Error) 
        let s3cfg = S3.s3 Aws.HTTP S3.s3EndpointUsClassic False 
        httpmgr <- SP.newManager SP.tlsManagerSettings 
        let file ="out"
        withFile file ReadMode $ \inhandle -> 
          runResourceT $ do  
            lenb <- liftIO (hFileSize inhandle)
            Aws.pureAws cfg s3cfg httpmgr $
              (S3.putObject "put-your-test-env-here" ("testbucket/test")
              (SP.streamN (fromIntegral lenb) (Q.fromHandle inhandle)))
        return ()



Michael Thompson

unread,
Jun 7, 2016, 11:01:50 AM6/7/16
to Haskell Pipes
I meant to emphasize that the above code also type checks if you write

     import qualified Pipes.HTTP as SP 
     import qualified Pipes.ByteString as Q 

instead of 

Sal

unread,
Jun 7, 2016, 11:38:44 AM6/7/16
to Haskell Pipes
Michael, thanks for pointers. I am just curious why my code didn't type-check before. What exactly prevented it from type-checking? Like "monadResource" constraint on Q.readFile? Which forces it to run within `resourceT` monad (I haven't found any good article about relationship between monadResource and resourceT yet - so, still figuring this out)?

BTW, I like the design of streaming library very much. It made it much easier for me to wrap a parser I wrote, and reason about it. Vs io-streams which I used it in the past. This is how the main part of code looks like, with streaming library for that parser - very efficient too in performance tests:

(|>) = (&) -- pipe alias for readability - borrowed from Elm

-- Test function

main
:: IO ()
main
= do
 
-- oneRec pretty-prints one JSON output which we pass to putStrLn so newlines are interpreted
  Q
.getContents |> A.parsed oneRec |> void |> S.mapM_ BSC.putStrLn
 
return ()

Michael Thompson

unread,
Jun 7, 2016, 5:39:38 PM6/7/16
to Haskell Pipes
`streamN` just replicates the`streamN` in pipes-http, which presupposes that the underlying monad for the byte stream is IO. But `Q.writeFile file` gives you a stream with the underlying monad `ResourceT IO`.

It should be possible to write a `streamN` that takes a `ByteString (ResourceT IO) ()`, though. See the duplication of functions in `http-conduit`

    - IO :                      http://hackage.haskell.org/package/http-conduit-2.1.10.1/docs/Network-HTTP-Client-Conduit.html#v:requestBodySource  

The first takes a `Source IO ByteString`, the second a `Source (ResourceT IO) ByteString`. `streamN` here, replicating pipes-http, is like the first of these.

It's always possible I'm missing something in the `ResourceT` + `Conduit` wilderness of mirrors, but we just need an equivalent of `to` from Pipes.HTTP that is more like something like `srcToPopper` inside Network.HTTP.Conduit.  

    streamN' :: Int64 -> Q.ByteString (ResourceT IO) () -> SP.RequestBody
    streamN' n str =  SP.RequestBodyStream n (popperize str)

    -- cp the helper functions `Pipes.HTTP.to` and `Network.HTTP.srcToPopper`
    popperize ::  Q.ByteString (ResourceT IO) () -> (IO ByteString -> IO a) -> IO a
    popperize str0 with_reader = runResourceT $ do
      ref <- liftIO (newIORef str0)
      is  <- getInternalState
      let reader :: IO ByteString
          reader = do
              str <- readIORef ref
              e   <- runInternalState (Q.nextChunk str) is  -- Q.nextChunk str is in ResourceT IO but we 
              case e of                                     -- here purport to run it safely in IO
                Left r -> do 
                  writeIORef ref (return r)
                  return mempty     -- the apparatus will take this null chunk as eof, so ...
                Right (chunk, rsrc') -> do
                    writeIORef ref rsrc'
                    if B.null chunk  -- ... here we have to make sure not to return one.
                      then reader
                      else return chunk
      liftIO (with_reader reader) 



I haven't tested this with anything real, but it works with this trivial `NeedsPopper`

    -- lengthy :: SP.NeedsPopper Int
    lengthy :: IO ByteString -> IO Int
    lengthy act = loop 0 
      where
        loop n = do 
          chunk <- act
          let len = B.length chunk
          if len > 0 
            then loop (n+len)
            else return n
 
    -- >>> popperize (Q.readFile "/usr/share/dict/words") lengthy
    -- 2493109

    -- >>> runResourceT $ Q.length  (Q.readFile "/usr/share/dict/words")
    -- 2493109 :> ()

    -- >>> :! wc -c /usr/share/dict/words
    --  2493109 /usr/share/dict/words


 It would be interesting to know if it can survive the aws machinery.

Michael Thompson

unread,
Jun 7, 2016, 5:44:06 PM6/7/16
to Haskell Pipes
Reply all
Reply to author
Forward
0 new messages