Adapting zstandard streaming compression in Streaming

Sal

Mar 10, 2018, 5:56:10 PM
to Haskell Pipes
Hello,

I am trying to adapt the streaming API of `zstandard` to the `Streaming` library. A version already exists that works on `ByteString m r`. I can't figure out how to implement a function like the one below, and would appreciate help:

stream :: MonadIO m => S.Stream (S.Of B.ByteString) m r -> Result -> S.Stream (S.Of B.ByteString) m r

Here is the original code from `streaming-zstd`:

stream :: MonadIO m => ByteString m r -> Result -> ByteString m r
stream (Go m) res                = lift m >>= flip stream res
stream bs (Error who what)       = error (who ++ ": " ++ what)
stream bs (Produce bytes res')   = Chunk bytes (liftIO res' >>= stream bs)
stream (Chunk c cs) (Consume f)  = liftIO (f c) >>= stream cs
stream (Empty r) (Consume f)     = liftIO (f mempty) >>= stream (Empty r)
stream (Empty r) (Done o)        = Chunk o (Empty r)
stream input state               = error $ "unpossible! bytes of input left in stream state "
                                           ++ show state





Sal

Mar 14, 2018, 4:49:06 PM
to Haskell Pipes
I figured out how to do this. I'm adding the code here for others who might try to solve this problem in the future. Suggestions for improvement are welcome.

import Streaming.Prelude as S
import Data.IORef
import Streaming as S
import qualified Codec.Compression.Zstd.Streaming as Z
import qualified Data.ByteString as BS (ByteString,empty)


-- Compression streamer - uses Zstd compression
streamZstd
  :: (MonadIO m,Monad m)
  => IO Z.Result
  -> Stream (Of BS.ByteString) m ()
  -> Stream (Of BS.ByteString) m ()
streamZstd pop inp = loop inp pop
  where
    loop bytes res = do
      bs <- liftIO res
      case bs of
        Z.Error who what -> error (who ++ ": " ++ what)
        Z.Done bs -> (lift . S.uncons $ bytes) >>= (maybe (S.yield bs) (\_ -> error "Compress/Decompress ended while input stream still had bytes"))
        Z.Produce bs npop -> S.yield bs >> loop bytes npop
        -- if we run out of input stream, call loop with empty stream, and compress function with empty ByteString
        -- to signal end - we should then be in Done state in next call to loop
        Z.Consume f -> (lift . S.uncons $ bytes) >>= (maybe (loop (return ()) (f BS.empty)) (\(bs,nbs) -> loop nbs (f bs)))


decompress :: (MonadIO m,Monad m) => Stream (Of BS.ByteString) m () -> Stream (Of BS.ByteString) m ()
decompress = streamZstd Z.decompress


compress :: (MonadIO m,Monad m) => Int -> Stream (Of BS.ByteString) m () -> Stream (Of BS.ByteString) m ()
compress level = streamZstd (Z.compress level)
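
One possible refinement, offered only as a sketch: the code above fixes the stream's return type to `()`, while the signature asked for in the first message was polymorphic in `r`. Using `S.next` instead of `S.uncons` should allow that, since `S.next` hands back the final `r` when the input runs out. The names `streamZstdR`, `nextChunk` and `roundTrip` are hypothetical, not part of `zstd` or `streaming`. The sketch also skips empty input chunks, on the assumption (taken from the comment in the code above) that feeding an empty `ByteString` to `Consume` signals end of input.

```haskell
import           Control.Monad.IO.Class (MonadIO, liftIO)
import           Control.Monad.Trans.Class (lift)
import qualified Codec.Compression.Zstd.Streaming as Z
import qualified Data.ByteString as BS
import           Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Hypothetical helper: like S.next, but drops empty chunks so they are
-- never mistaken for the end-of-input signal.
nextChunk
  :: Monad m
  => Stream (Of BS.ByteString) m r
  -> m (Either r (BS.ByteString, Stream (Of BS.ByteString) m r))
nextChunk s = do
  e <- S.next s
  case e of
    Right (c, rest) | BS.null c -> nextChunk rest
    _                           -> return e

-- Hypothetical variant of streamZstd that preserves the return value r.
streamZstdR
  :: MonadIO m
  => IO Z.Result
  -> Stream (Of BS.ByteString) m r
  -> Stream (Of BS.ByteString) m r
streamZstdR start input0 = loop input0 start
  where
    loop input step = do
      res <- liftIO step
      case res of
        Z.Error who what   -> error (who ++ ": " ++ what)
        Z.Produce out next -> S.yield out >> loop input next
        Z.Consume feed     -> do
          e <- lift (nextChunk input)
          case e of
            Right (chunk, rest) -> loop rest (feed chunk)
            -- Input exhausted: feed an empty chunk to signal the end and
            -- carry the final r along in a stream that only returns it.
            Left r              -> loop (return r) (feed BS.empty)
        Z.Done out         -> do
          e <- lift (nextChunk input)
          case e of
            Left r  -> S.yield out >> return r
            Right _ -> error "zstd finished while input stream still had bytes"

-- Usage sketch: compress at level 3, decompress again, collect the result.
roundTrip :: [BS.ByteString] -> IO BS.ByteString
roundTrip chunks =
  BS.concat <$>
    S.toList_ (streamZstdR Z.decompress
                 (streamZstdR (Z.compress 3) (S.each chunks)))
```

With this shape, `compress` and `decompress` could be defined exactly as above but with `r` in place of `()`. I have not benchmarked it against the `uncons` version.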