Hello,
This question is about the streaming library. What is a good way to get a lazy ByteString from the result of a streaming function while keeping memory usage bounded? I would like to pass a lazy ByteString to a network websocket, but if I use `toLazy_ . fromChunks` to convert the ByteString stream into a lazy ByteString, it seems to take a lot of memory: about 70 MB residency for a 100 MB HTTP download.
First, some imports:
import qualified Data.ByteString.Streaming as SBS
import qualified Data.ByteString.Lazy as LBS
import Data.Conduit (($$+-))
import qualified Data.Conduit.List as CL (mapM_)
import Streaming.Prelude as S
import Streaming as S
import qualified Data.ByteString.Streaming.HTTP as SP
-- type of the stream built from an AWS HTTP download; SP.responseBody extracts the body from the HTTP response
ghci> :t (($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody undefined)
(($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody undefined)
:: Monad m => Stream (Of a) m ()
The code below works fine and takes about 4 MB of memory for a 100 MB download. It simulates an in-place sink for the bytes, as a smoke test to make sure memory can be bounded; `rsp` here is the HTTP response:
S.foldM_ (\_ a -> liftIO $ LBS.appendFile "out" (LBS.fromStrict a)) (liftIO $ LBS.writeFile "out" LBS.empty) return $ (($$+- CL.mapM_ S.yield) . hoist lift) (SP.responseBody rsp)
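For readers less familiar with `S.foldM_`: it takes a step function, an initial action, and a final extraction function (here effectively `return`), followed by the stream. As a minimal sketch of that shape, assuming a plain list of strict chunks stands in for the HTTP stream, and with `foldChunksM_` and `sinkToFile` being hypothetical names not from any library:

```haskell
import qualified Data.ByteString.Char8 as BS
import Control.Monad (foldM)

-- Hypothetical list-based analogue of Streaming.Prelude.foldM_:
-- step function, initial action and final extraction, in that order.
foldChunksM_ :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> [a] -> m b
foldChunksM_ step begin done xs = begin >>= \x0 -> foldM step x0 xs >>= done

-- Truncate the file, then append each strict chunk in turn.
-- The sink itself keeps no reference to chunks it has already written.
sinkToFile :: FilePath -> [BS.ByteString] -> IO ()
sinkToFile path =
  foldChunksM_ (\_ chunk -> BS.appendFile path chunk)
               (BS.writeFile path BS.empty)
               return

main :: IO ()
main = do
  sinkToFile "out" (map BS.pack ["hello ", "streaming ", "world"])
  BS.readFile "out" >>= BS.putStrLn
```

Appending strict chunks with `BS.appendFile` avoids the `LBS.fromStrict` conversion; reopening the file per chunk is kept only to mirror the snippet above.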
Now let us instead take the lazy ByteString, which we could then pass to the websocket as an outgoing message (`writeFile` here simulates the write to the network socket). This version takes about 70 MB of memory for a 100 MB download:
liftIO . (LBS.writeFile "out") =<< (SBS.toLazy_ . SBS.fromChunks $ (($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody rsp))
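A plausible reason for the difference, sketched under a simplified model: the `ChunkStream` type below is hypothetical and is not the streaming library's real `Stream` or `ByteString` type. Collecting an effectful stream into a lazy ByteString in a strict monad such as `IO` has to run every effect, and therefore hold every chunk, before the result is returned, which is presumably what happens inside `toLazy_`. Consuming chunk by chunk instead interleaves producing and sending. The event log makes the ordering visible:

```haskell
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy.Char8 as LBS
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical, simplified model of an effectful chunk stream
-- (illustration only; not the streaming library's real types).
data ChunkStream m = Done | Chunk BS.ByteString (m (ChunkStream m))

-- Gather every chunk into one lazy ByteString. In IO, each `next` action
-- must run before `collect` returns, so all chunks are resident at once.
collect :: Monad m => ChunkStream m -> m LBS.ByteString
collect = go []
  where
    go acc Done           = return (LBS.fromChunks (reverse acc))
    go acc (Chunk c next) = next >>= go (c : acc)

-- Hand each chunk to `sink` as soon as it is produced; earlier chunks
-- are no longer referenced and can be garbage-collected.
consume :: Monad m => (BS.ByteString -> m ()) -> ChunkStream m -> m ()
consume _    Done           = return ()
consume sink (Chunk c next) = sink c >> next >>= consume sink

-- A stream of n numbered chunks that records when each chunk is produced.
counting :: IORef [String] -> Int -> IO (ChunkStream IO)
counting logRef n = go 1
  where
    go i
      | i > n     = return Done
      | otherwise = do
          modifyIORef' logRef (("produce " ++ show i) :)
          return (Chunk (BS.pack (show i)) (go (i + 1)))

-- Event order when collecting first, then "sending" the lazy ByteString.
collectDemo :: IO [String]
collectDemo = do
  logRef <- newIORef []
  lbs <- counting logRef 3 >>= collect
  modifyIORef' logRef (("send " ++ LBS.unpack lbs) :)
  reverse <$> readIORef logRef

-- Event order when sending each chunk as it arrives.
consumeDemo :: IO [String]
consumeDemo = do
  logRef <- newIORef []
  counting logRef 3 >>= consume (\c -> modifyIORef' logRef (("send " ++ BS.unpack c) :))
  reverse <$> readIORef logRef

main :: IO ()
main = do
  collectDemo >>= print
  -- ["produce 1","produce 2","produce 3","send 123"]
  consumeDemo >>= print
  -- ["produce 1","send 1","produce 2","send 2","produce 3","send 3"]
```

In this model, the `S.foldM_` version behaves like `consume`, while `toLazy_ . fromChunks` behaves like `collect`.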
So memory usage goes up substantially when we return a lazy ByteString. If I am not mistaken, `LBS.writeFile`, unlike `LBS.readFile`, doesn't keep the whole ByteString in memory. What would be a good way to return a lazy ByteString for writing to a network socket while keeping memory usage low?
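On the `LBS.writeFile` point: when the chunks of a lazy ByteString are produced on demand by pure code, `LBS.writeFile` can write each chunk and let it be collected before forcing the next one. A small sketch, where the sizes are arbitrary and `mkChunk`, `payload` and `writePayload` are hypothetical helpers:

```haskell
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import System.IO (IOMode (ReadMode), hFileSize, withFile)

chunkSize :: Int
chunkSize = 64 * 1024

-- Hypothetical generator: each chunk is a distinct strict ByteString,
-- built only when LBS.writeFile forces that part of the lazy ByteString.
mkChunk :: Int -> BS.ByteString
mkChunk i = BS.replicate chunkSize (fromIntegral (i `mod` 256))

-- A function rather than a top-level value, so no CAF keeps the head alive.
payload :: Int -> LBS.ByteString
payload n = LBS.fromChunks (map mkChunk [1 .. n])

-- Write n chunks lazily, then report the resulting file size in bytes.
writePayload :: FilePath -> Int -> IO Integer
writePayload path n = do
  LBS.writeFile path (payload n)
  withFile path ReadMode hFileSize

main :: IO ()
main = writePayload "big.out" 256 >>= print  -- 16777216 (256 chunks of 64 KiB)
```

Compiling with `-rtsopts` and running with `+RTS -s` should show a maximum residency well below the 16 MiB written. The difference from the question's case is that here the chunks come from pure code rather than from effects in the stream's monad.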
For context, here is the full S3 download function:
{-# LANGUAGE OverloadedStrings, ScopedTypeVariables #-}
import qualified Aws
import qualified Aws.S3 as S3
import Data.Conduit (($$+-))
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as SP
import qualified Data.ByteString.Lazy as LBS
import Streaming as S
import Streaming.Prelude as S hiding (show,print)
import Control.Concurrent.Async (async, waitCatch)
import Control.Monad.Trans.Resource (runResourceT)
import Data.Text as T (Text)
data AwsConfig a = AwsConfig
  { _aws_cfg     :: Aws.Configuration
  , _aws_s3cfg   :: S3.S3Configuration a
  , _aws_httpmgr :: SP.Manager
  }
getObject :: AwsConfig Aws.NormalQuery -> T.Text -> T.Text -> IO Int
getObject cfg bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    {- Create a request object with S3.getObject and run the request with pureAws. -}
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <-
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    {- Stream the response body chunk by chunk to a file (stand-in for the websocket send) -}
    liftIO $ LBS.writeFile "testaws" LBS.empty -- this will be replaced by content-length of the bytes
    let obj = (($$+- CL.mapM_ S.yield) . hoist lift) (SP.responseBody rsp)
    S.mapM_ (liftIO . LBS.appendFile "testaws" . LBS.fromStrict) obj
    return $ lookup "content-length" (S3.omUserMetadata mdata))
  case req of
    Left _ -> return 2 -- perhaps we could use this to send an error message over the websocket