-- | Drive a zstd streaming 'Result' state machine over a streaming
-- 'ByteString', emitting every chunk the operation produces.
--
-- The first argument is the remaining input; the second is the current state
-- of the operation. Input chunks are fed to 'Consume' continuations, and an
-- empty chunk is fed once the input is exhausted to signal end of input.
--
-- Calls 'error' when the operation reports 'Error', and when the operation is
-- 'Done' while input chunks are still pending.
stream :: MonadIO m => ByteString m r -> Result -> ByteString m r
-- Run the pending effect in the underlying monad before inspecting the state.
stream (Go m) res = lift m >>= flip stream res
stream _ (Error who what) = error (who ++ ": " ++ what)
stream bs (Produce bytes res') = Chunk bytes (liftIO res' >>= stream bs)
stream (Chunk c cs) res@(Consume f)
  -- An empty chunk would be read by the continuation as end of input, so it
  -- is skipped rather than fed while more input may follow.
  | c == mempty = stream cs res
  | otherwise = liftIO (f c) >>= stream cs
-- Input exhausted: feed the empty chunk that signals end of input.
stream (Empty r) (Consume f) = liftIO (f mempty) >>= stream (Empty r)
stream (Empty r) (Done o) = Chunk o (Empty r)
stream _ state =
  error $ "unpossible! bytes of input left in stream state " ++ show state
import Streaming.Prelude as S
import Data.IORef
import Streaming as S
import qualified Codec.Compression.Zstd.Streaming as Z
import qualified Data.ByteString as BS (ByteString,empty)
-- | Run a zstd streaming operation over a stream of strict 'BS.ByteString'
-- chunks, yielding each chunk of output the operation produces.
--
-- The first argument starts the operation (e.g. 'Z.decompress' or
-- @'Z.compress' level@); the second is the input stream.
--
-- Calls 'error' when the operation reports 'Z.Error', and when it reports
-- 'Z.Done' while the input stream still holds non-empty chunks.
streamZstd :: MonadIO m => IO Z.Result -> Stream (Of BS.ByteString) m () -> Stream (Of BS.ByteString) m ()
streamZstd pop inp = loop inp pop
  where
    -- @rest@ is the unconsumed input, @step@ the action yielding the next state.
    loop rest step = do
      result <- liftIO step
      case result of
        Z.Error who what -> error (who ++ ": " ++ what)
        Z.Done out -> do
          next <- lift (nextChunk rest)
          case next of
            Nothing -> S.yield out
            Just _ -> error "Compress/Decompress ended while input stream still had bytes"
        Z.Produce out step' -> S.yield out >> loop rest step'
        Z.Consume feed -> do
          next <- lift (nextChunk rest)
          case next of
            -- Input exhausted: feed an empty ByteString to signal the end and
            -- continue with an empty stream; the operation is then expected
            -- to reach Done.
            Nothing -> loop (return ()) (feed BS.empty)
            Just (chunk, rest') -> loop rest' (feed chunk)

    -- Pop the next non-empty chunk. Empty chunks are skipped because feeding
    -- one to a Consume continuation would signal end of input prematurely.
    nextChunk s = do
      next <- S.uncons s
      case next of
        Just (chunk, s') | chunk == BS.empty -> nextChunk s'
        _ -> return next
-- | Decompress a stream of zstd-compressed 'BS.ByteString' chunks.
--
-- Starts the operation with 'Z.decompress' and drives it with 'streamZstd'.
decompress :: (MonadIO m,Monad m) => Stream (Of BS.ByteString) m () -> Stream (Of BS.ByteString) m ()
decompress compressed = streamZstd Z.decompress compressed
-- | Compress a stream of 'BS.ByteString' chunks with zstd.
--
-- The 'Int' argument is passed unchanged to 'Z.compress' as the compression
-- level; the resulting operation is driven with 'streamZstd'.
compress :: (MonadIO m,Monad m) => Int -> Stream (Of BS.ByteString) m () -> Stream (Of BS.ByteString) m ()
compress level plain = streamZstd (Z.compress level) plain