Keeping memory bounded when converting from streaming to lazy bytestring


Sal

Jun 13, 2016, 11:12:17 PM
to Haskell Pipes

Hello,


This question is about the streaming library. What is a good way to pass a lazy bytestring from the result of a streaming function while keeping memory bounded? I would like to pass a lazy bytestring to a network websocket. However, if I use `toLazy_ . fromChunks` to convert the bytestring stream to a lazy bytestring, it seems to take a lot of memory: about 70MB residency for a 100MB HTTP download.


First, some imports:


import qualified Data.ByteString.Streaming as SBS
import qualified Data.ByteString.Lazy as LBS
import           Data.Conduit (($$+-))
import qualified Data.Conduit.List as CL (mapM_)
import qualified Streaming.Prelude as S
import Streaming as S
import qualified Data.ByteString.Streaming.HTTP as SP




-- first kind of HTTP download, from AWS: we pass the HTTP response to the SP.responseBody function

ghci> :t (($$+- CL.mapM_ S.yield) . hoist lift) (SP.responseBody undefined)
(($$+- CL.mapM_ S.yield) . hoist lift) (SP.responseBody undefined)
  :: Monad m => Stream (Of a) m ()



The code below works fine and takes ~4MB of memory for a 100MB download. It just simulates an in-place sink for the lazy bytestring, as a smoke check that memory can be bounded. Here `rsp` is the HTTP response:

S.foldM_ (\_ a -> liftIO $ LBS.appendFile "out" (LBS.fromStrict a)) (liftIO $ LBS.writeFile "out" LBS.empty) id $
  (($$+- CL.mapM_ S.yield) . hoist lift) (SP.responseBody rsp)
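`Streaming.Prelude.foldM_` has the shape `(x -> a -> m x) -> m x -> (x -> b) -> Stream (Of a) m r -> m b`. It takes a monadic step, a monadic initial accumulator, and a *pure* finalizer. The finalizer is pure, so passing `id` gives back `m ()` directly, whereas `return . id` would wrap a second, unused monadic layer. To make that shape concrete without depending on streaming, here is a minimal list-based stand-in. `foldListM_` and `countBytes` are hypothetical names, not library functions, and a lazily produced list plays the role of the stream:

```haskell
import qualified Data.ByteString.Char8 as BC

-- Hypothetical list-based stand-in with the same argument order as
-- Streaming.Prelude.foldM_: step, monadic initial accumulator, pure finalizer.
foldListM_ :: Monad m => (x -> a -> m x) -> m x -> (x -> b) -> [a] -> m b
foldListM_ step begin done xs = begin >>= go xs
  where
    -- At the end of the input, apply the pure finalizer to the accumulator.
    go [] acc       = return (done acc)
    -- Otherwise run one monadic step and continue with the new accumulator.
    go (y : ys) acc = step acc y >>= go ys

-- A sink whose only state is a running byte count: the fold itself keeps
-- one Int, forced at each step, rather than any of the chunks it has seen.
countBytes :: [BC.ByteString] -> IO Int
countBytes = foldListM_ (\n c -> return $! n + BC.length c) (return 0) id
```

In the original snippet, the step plays the same role as `countBytes`'s lambda, except that its "state" is the file on disk, so nothing but the current chunk needs to stay in memory.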


Now let us just take the lazy bytestring, which we could then pass to the websocket as an outgoing message (`writeFile` here simulates the write to the network socket). This one takes ~70MB of memory for a 100MB download:

liftIO . (LBS.writeFile "out") =<< (SBS.toLazy_ . SBS.fromChunks $ (($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody rsp))

So memory usage goes up substantially when returning a lazy bytestring. If I am not mistaken, `LBS.writeFile` doesn't keep the whole bytestring in memory, unlike `LBS.readFile`. What would be a good way to return a lazy bytestring for writing to the network socket while keeping memory usage low?

Michael Thompson

Jun 14, 2016, 9:01:13 AM
to Haskell Pipes
Right, `toLazy` (and thus `toLazy_`, which just drops the final return value) returns a strictly evaluated lazy bytestring. It is like `toLazyM` in `Pipes.ByteString`, `Pipes.Prelude.toListM`, and conduit things like `sinkList` and `sinkLazy`[^1]. They all just build a "difference list" of the chunks or elements and then, when the stream is done, compose them and apply the result to `LBS.empty` to make a lazy bytestring / lazy text / list. So you can't inspect the beginning until the whole stream has been materialized in memory. (At least they don't have to write a new bytestring; they just recycle the references to the chunks.) The object of the game is to keep all that from happening, of course, though sometimes it is necessary. I'm not sure I'm following the round-tripping through conduit in the snippets. Are you working with a conduit source somewhere?
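The difference-list behaviour described above can be modeled without the streaming library. In this sketch, a hypothetical pull-based `Source` (an `IO (Maybe ByteString)`) stands in for `Stream (Of ByteString) m r`. `toLazyDL`, `countingSource` and `demoChunks` are illustrative names, not library functions. The sketch shows that `toLazyDL` cannot return anything until it has pulled every chunk plus the end-of-stream marker. So every chunk is live in memory at the moment the lazy `ByteString` appears, even though `LBS.fromChunks` reuses the strict chunks rather than copying them:

```haskell
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as LBS
import Data.IORef

-- Hypothetical pull-based source: each call returns the next strict chunk,
-- or Nothing once the stream is exhausted.
type Source = IO (Maybe BS.ByteString)

-- Accumulate chunks in a difference list and build the lazy ByteString only
-- after the last pull, mirroring how the thread describes toLazy_.
toLazyDL :: Source -> IO LBS.ByteString
toLazyDL next = go id
  where
    go acc = do
      mc <- next
      case mc of
        Nothing -> pure (LBS.fromChunks (acc []))  -- apply the difference list
        Just c  -> go (acc . (c :))                 -- remember the chunk, keep pulling

-- Build a Source from a list and count how many times it has been pulled.
countingSource :: [BS.ByteString] -> IO (Source, IORef Int)
countingSource xs = do
  chunks <- newIORef xs
  pulls  <- newIORef 0
  let next = do
        modifyIORef' pulls (+ 1)
        ys <- readIORef chunks
        case ys of
          []         -> pure Nothing
          (y : rest) -> writeIORef chunks rest >> pure (Just y)
  pure (next, pulls)

-- Example input for the demonstration.
demoChunks :: [BS.ByteString]
demoChunks = map BC.pack ["hello, ", "world"]
```

For two chunks, the pull counter reads 3 once `toLazyDL` returns (two chunks plus the end-of-stream check). That is the "you can't inspect the beginning" point in miniature.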

Michael Thompson

Jun 14, 2016, 9:06:01 AM
to Haskell Pipes

Sal

Jun 14, 2016, 9:22:43 AM
to Haskell Pipes
Hi Michael,

Yep, I am working with conduit because of the AWS API. The object response has type `Response (ResumableSource (ResourceT IO) ByteString)`.

I came to the same conclusion you describe: inspecting the object forces the whole thing into memory.

So I rewrote it to `mapM_` each chunk into a sink instead. Now I am trying to figure out how to carry this solution over to the websocket. Since you asked about the conduit source, here is the code from a StackOverflow question I just posted, now refactored to use constant memory:

{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}

import qualified Aws
import qualified Aws.S3 as S3
import           Control.Monad.Trans.Resource (runResourceT)
import           Data.Conduit (($$+-))
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as SP
import qualified Data.ByteString.Lazy as LBS
import Streaming as S
import qualified Streaming.Prelude as S
import Control.Concurrent.Async (async,waitCatch)
import qualified Data.Text as T

data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration, _aws_s3cfg :: S3.S3Configuration a, _aws_httpmgr :: SP.Manager }

getObject :: AwsConfig Aws.NormalQuery -> T.Text -> T.Text ->  IO Int
getObject cfg bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    {- Create a request object with S3.getObject and run the request with pureAws. -}
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    {- Stream the response body to a file, one strict chunk at a time -}
    liftIO $ LBS.writeFile "testaws" LBS.empty -- this will be replaced by content-length of the bytes 
    let obj = (($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody rsp)
    S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj
    return $ lookup "content-length" (S3.omUserMetadata mdata))
  case req of
    Left _ -> return 2 -- perhaps, we could use this to send an error message over websocket 
    Right _ -> return 0 
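One way to carry the constant-memory version over to a websocket is to make the per-chunk action a parameter instead of hard-wiring `LBS.appendFile`. This is a sketch, not something proposed in the thread. Assumptions: the body stream is modeled as a hypothetical pull-based `Source` (an `IO (Maybe ByteString)`), and `forwardChunks`, `listSource` and `exampleChunks` are made-up names. With the websockets package, one would presumably pass something like `sendBinaryData conn` as the `send` argument:

```haskell
{-# LANGUAGE BangPatterns #-}
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import Data.IORef

-- Hypothetical pull-based source standing in for the S3 body stream.
type Source = IO (Maybe BS.ByteString)

-- Forward each chunk as soon as it is pulled, via a caller-supplied `send`
-- (for a websocket, possibly something like `sendBinaryData conn`).
-- Only the current chunk and a strict byte counter are held at once.
-- Returns the total number of bytes forwarded, e.g. to compare against
-- the content-length.
forwardChunks :: (BS.ByteString -> IO ()) -> Source -> IO Int
forwardChunks send next = go 0
  where
    go !total = do
      mc <- next
      case mc of
        Nothing -> pure total
        Just c  -> send c >> go (total + BS.length c)

-- Build a Source from a list (for demonstration only).
listSource :: [BS.ByteString] -> IO Source
listSource xs = do
  ref <- newIORef xs
  pure $ do
    ys <- readIORef ref
    case ys of
      []         -> pure Nothing
      (y : rest) -> writeIORef ref rest >> pure (Just y)

-- Example input for the demonstration.
exampleChunks :: [BS.ByteString]
exampleChunks = map BC.pack ["chunk-1 ", "chunk-2"]
```

The trade-off is that each chunk travels as its own websocket message. The receiving side then has to reassemble them, for instance using a content-length sent up front, but nothing larger than one chunk ever has to be in memory on the sending side.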