folding over groups?


Erik Rantapaa

Aug 14, 2015, 7:49:07 PM
To: Haskell Pipes
I'm trying to figure out how to process a large number of files (ByteStrings) in groups of, say 20, with the results of each group going to a different output file.

For instance, I have:

    theFiles :: Producer ByteString m r
    processFile :: Handle -> ByteString -> IO ()

and for the first 20 ByteStrings I want the handle passed to processFile to be opened to "output-1", and for the next 20 it should be opened to "output-2", etc.

I'm sure I could write it in a very mundane fashion using Pipes.Prelude.foldM, but I suspect there is a better way.
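For comparison, the "mundane" `Pipes.Prelude.foldM` route could look roughly like this minimal sketch. It assumes `processFile` is passed in as an argument, and `groupedToFiles` and `sampleDocs` are hypothetical names. The fold state is an element counter plus the currently open handle; because handles are opened and closed by hand, an exception in the middle of a group can leak a handle.

```haskell
import qualified Data.ByteString.Char8 as B
import Pipes (Producer, each)
import qualified Pipes.Prelude as P
import System.IO (Handle, IOMode (WriteMode), hClose, openFile)

-- Hypothetical helper: hand every element to `process`, switching to a fresh
-- output file ("output-1", "output-2", ...) every `n` elements (assumes n > 0).
-- Handles are closed by hand, so this is not exception-safe.
groupedToFiles
  :: Int
  -> (Handle -> B.ByteString -> IO ())
  -> Producer B.ByteString IO ()
  -> IO ()
groupedToFiles n process = P.foldM step begin done
  where
    -- fold state: number of elements seen so far, and the open handle (if any)
    begin = return (0 :: Int, Nothing)

    step (i, mh) bs = do
      h <- if i `mod` n == 0
             then do
               mapM_ hClose mh  -- close the previous group's file, if any
               openFile ("output-" ++ show (i `div` n + 1)) WriteMode
             else maybe (ioError (userError "no open handle")) return mh
      process h bs
      return (i + 1, Just h)

    -- close the last group's file once the producer is exhausted
    done (_, mh) = mapM_ hClose mh

-- Hypothetical example input: seven one-line documents
sampleDocs :: Producer B.ByteString IO ()
sampleDocs = each [B.pack (show k ++ "\n") | k <- [1 .. 7 :: Int]]
```

With the names from the question this would be `groupedToFiles 20 processFile theFiles`, provided `theFiles` runs in `IO` and returns `()` (which `P.foldM` requires).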

Thanks!

Erik Rantapaa

Aug 14, 2015, 11:40:47 PM
To: Haskell Pipes


On Friday, August 14, 2015 at 6:49:07 PM UTC-5, Erik Rantapaa wrote:
> I'm trying to figure out how to process a large number of files (ByteStrings) in groups of, say 20, with the results of each group going to a different output file.

 
Ok, I've come up with something I can live with which uses Pipes.Parse.splitAt:

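import Control.Monad (forM_)
import Control.Monad.Trans.State.Strict (StateT, evalStateT)
import Lens.Family.State.Strict (zoom)
import Pipes
import qualified Pipes.Parse as PP
import System.IO (IOMode (WriteMode), withFile)

loop :: Int -> StateT (Producer (String, String) IO ()) IO ()
loop n = do
  g <- zoom (PP.splitAt 3) PP.drawAll   -- collect the next group of 3 into a list
  case g of
    [] -> return ()
    _  -> do let out = "output-" ++ show n
             liftIO $ withFile out WriteMode $ \h -> do
                        putStrLn $ "got a group, writing to " ++ out
                        forM_ g $ \(fname,content) -> putStrLn $ " - processing " ++ fname
             loop (n+1)

runExample :: IO ()
runExample = do
    let input = each [ ([c], "content for file " ++ [c]) | c <- "abcdefghij" ]
    evalStateT (loop 1) input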

What I don't like about it is that it (apparently) has to construct the entire list `g` in memory before processing its elements.
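One way to avoid materializing `g` while staying with `Pipes.Parse` is to run a fold inside `zoom` instead of `drawAll`. This is a minimal sketch, assuming `pipes-parse`'s `foldAll` and `lens-family-core`'s `zoom`; `groupSums` and `exampleSums` are hypothetical names, and a running sum stands in for the real per-group work.

```haskell
import Control.Monad.Trans.State.Strict (StateT, evalStateT)
import Data.Functor.Identity (runIdentity)
import Lens.Family.State.Strict (zoom)
import Pipes (Producer, each)
import qualified Pipes.Parse as PP

-- Reduce each group of `n` elements as it streams past, instead of first
-- collecting the group into a list with drawAll.
groupSums :: Monad m => Int -> StateT (Producer Int m ()) m [Int]
groupSums n = do
  end <- PP.isEndOfInput
  if end
    then return []
    else do
      -- zoom restricts the parser to the next n elements; foldAll consumes
      -- them one at a time with a left fold (here a running sum)
      s  <- zoom (PP.splitAt n) (PP.foldAll (+) 0 id)
      ss <- groupSums n
      return (s : ss)

exampleSums :: [Int]
exampleSums = runIdentity (evalStateT (groupSums 3) (each [1 .. 7]))
```

Only the per-group results are kept; `exampleSums` evaluates to `[6,15,7]`.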

I also came up with this idea which (I think) addresses that concern:

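import Control.Monad.Trans.State.Strict (StateT, evalStateT)
import qualified Control.Monad.Trans.State.Strict as S
import Lens.Family ((^.))
import Pipes
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as P

loop :: StateT (Producer (String, String) IO ()) IO ()
loop = do
  end <- PP.isEndOfInput
  if end
    then return ()
    else do s <- S.get
            let x = s ^. (PP.splitAt 3)   -- next group; it returns the rest of the stream
            liftIO $ putStrLn "starting a new group"
            y <- lift $ runEffect (x >-> P.print)   -- stream the group, keep the rest
            S.put y
            loop

runLoop :: IO ()
runLoop = do
  let input = each [ ("a", "asds"), ("b", "qwe"), ("c", "asdas"), ("d", "qwkjqkwe") ]
  evalStateT loop input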

Am I getting warmer? colder?

Alexey Raga

Aug 15, 2015, 6:44:00 AM
To: Haskell Pipes
Have you tried the `pipes-group` library? It has the ability to group and to fold over groups.
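As a small illustration of what that library offers (a sketch assuming `pipes-group` and `lens-family-core`; `runLengths` and `runLengthList` are hypothetical names), `groups` cuts a stream into runs of equal adjacent elements and `folds` reduces each run without building a list:

```haskell
import Data.Functor.Identity (Identity)
import Lens.Family (view)
import Pipes (Producer, each)
import Pipes.Group (folds, groups)
import qualified Pipes.Prelude as P

-- `groups` splits the stream into runs of equal adjacent elements;
-- `folds` reduces each run (here: counts its length) as it streams.
runLengths :: Producer Int Identity ()
runLengths = folds (\count _ -> count + 1) 0 id (view groups (each "aaabbc"))

runLengthList :: [Int]
runLengthList = P.toList runLengths
```

Here `runLengthList` is `[3,2,1]`. Swapping `groups` for `chunksOf n` gives fixed-size groups instead.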

Michael Thompson

Aug 15, 2015, 1:04:23 PM
To: Haskell Pipes
Here's a little noodling. The desired operation is something like

     numberedChunkLoop 20 action2b documents

I don't think this is a particularly idiomatic implementation, but the numbering
device is pleasantly simple and avoids using StateT and so on.  Also, the 
file handling part is completely dumb, but only one handle is open 
at a time.


      {-# LANGUAGE DeriveFunctor, OverloadedStrings, RankNTypes #-}
      import qualified Pipes.Prelude as P
      import Pipes.Group 
      import Pipes.Parse as PP
      import Pipes
      import qualified Pipes.ByteString as PB
      import qualified Control.Foldl as L
      import Lens.Simple -- or Control.Lens or microlens etc.
      import Control.Monad.Trans.Free
      import Control.Monad.Trans.State.Strict
      import qualified Data.ByteString.Char8 as B
      import qualified System.IO as IO

      seven :: Monad m => Producer Int m ()
      seven = each [1..7::Int] 


      documents :: Monad m => Producer B.ByteString m ()
      documents = seven >-> P.map (B.pack . show) 

      seven_three :: Monad m => FreeT (Producer Int m) m ()
      seven_three =  view (chunksOf 3) seven

      seven_three_sums :: Monad m => Producer Int m ()
      seven_three_sums = L.purely folds L.sum seven_three


      -- probably reducible to standard combinators
      chunkLoop
        :: Monad m
        => Int
        -> (forall x . Producer a' m x -> m x)
        -> Producer a' m r -> m r
      chunkLoop n action p = loop (p ^. chunksOf n) where
        loop free = do 
          e <- runFreeT free
          case e of 
            Pure r -> return r
            Free ff -> do
              free2 <- action ff
              loop free2
        
      action0 p = do 
          (b, p') <- L.purely P.fold' L.sum p
          liftIO $ print b
          return p'
          
      -- >  chunkLoop 2 action0 seven
      -- 3
      -- 7
      -- 11
      -- 7
      -- [*Main]
      -- >  chunkLoop 5 action0 seven
      -- 15
      -- 13

      --  contained in standard combinators
      chunkLoop'
        :: Monad m
        => Int
        -> (forall x . Producer a m x -> Producer b m x)
        -> Producer a m r -> Producer b m r
      chunkLoop' n action = concats . maps action . view (chunksOf n) 

      action1 p = do
        x <- p
        liftIO $ putStrLn "I'm just a string marking end of group"
        return x
      -- > runEffect $ chunkLoop' 3 action1  seven >-> P.print
      -- 1
      -- 2
      -- 3
      -- I'm just a string marking end of group
      -- 4
      -- 5
      -- 6
      -- I'm just a string marking end of group
      -- 7
      -- I'm just a string marking end of group


      -- see implementation of numberFrom below
      type ChunkSize = Int
      numberedChunkLoop
        :: Monad m
        => ChunkSize
        -> (forall x . Int -> Producer a' m x -> m x)
        -> Producer a' m r
        -> m r
      numberedChunkLoop chunk_size action p = loop numbered_chunky
       where
         numbered_chunky = numberFrom 1 (view (chunksOf chunk_size) p)
         loop free = do
           e <- runFreeT free
           case e of 
             Pure r -> return r
             Free (Number m ff) -> do
               free2 <- action m ff
               loop free2

      action2a  :: MonadIO m => Int -> Producer B.ByteString m a -> m a
      action2a n p = do 
        liftIO $ putStrLn $ "group " ++ show n
        fmap snd $ L.impurely P.foldM' printFold p
        where
          printFold = L.FoldM (\_ x-> liftIO $ putStr "    " >> print x) (return ()) return
    
      -- > numberedChunkLoop 3 action2a documents
      -- group 1
      --     "1"
      --     "2"
      --     "3"
      -- group 2
      --     "4"
      --     "5"
      --     "6"
      -- group 3
      --     "7"

      action2b :: MonadIO m => Int -> Producer B.ByteString m a -> m a
      action2b n p = do
       h <- liftIO $ IO.openFile ("xyz" ++ show n ++ ".txt") IO.WriteMode
       rest <- runEffect $ p >-> PB.toHandle h
       liftIO $ B.hPut h "\n"
       liftIO $ IO.hClose h
       return rest
 
       -- > numberedChunkLoop 3 action2b documents
       -- > :! ls | grep xyz
       -- xyz1.txt
       -- xyz2.txt
       -- xyz3.txt
       -- > :! cat xyz1.txt
       -- 123
       -- > :! cat xyz2.txt
       -- 456
       -- > :! cat xyz3.txt
       -- 7



      -- for the definition of 'numberedChunkLoop'
      -- one could use `Compose ((,) Int) f`
      data Numbered f r = Number !Int (f r)  deriving (Show, Eq, Ord, Functor)
      -- GHC derives: instance Functor f => Functor (Numbered f)

      numberFrom :: (Functor f, Monad m) => Int -> FreeT f m r -> FreeT (Numbered f) m r
      numberFrom = loop where
        loop n f = FreeT $ do
          p <- runFreeT f
          case p of
            Pure r -> return (Pure r)
            Free gg -> return $ Free $ Number n (fmap (loop (n+1)) gg)


Michael Thompson

Aug 15, 2015, 1:24:22 PM
To: Haskell Pipes
This is a little better, I think; I'm not too good at handle handling:

     action2c :: Int -> Producer B.ByteString IO a -> IO a
     action2c n p = IO.withBinaryFile ("xyz" ++ show n ++ ".txt") IO.WriteMode $ \h ->
         do rest <- runEffect $ p >-> PB.toHandle h
            B.hPut h "\n"   -- no hClose needed: withBinaryFile closes h, even on exceptions
            return rest

     -- > numberedChunkLoop 5 action2c documents
     -- > :! ls | grep xyz
     -- xyz1.txt
     -- xyz2.txt
     -- > :! cat xyz1.txt
     -- 12345
     -- > :! cat xyz2.txt
     -- 67

Gabriel Gonzalez

Aug 16, 2015, 10:49:49 AM
To: haskel...@googlegroups.com, eran...@gmail.com
This is what the `pipes-group` library was designed to handle: you can split a stream into groups without having to fully materialize each group.  If you are new to `pipes-group`, you should check out the tutorial here:

http://hackage.haskell.org/package/pipes-group-1.0.2/docs/Pipes-Group-Tutorial.html

... but I'll summarize the parts relevant to your problem.

First, you need a way to split `theFiles` into groups of 20 elements.  The relevant utility from `pipes-group` is the `chunksOf` lens, which you would use like this:

    view (chunksOf 20) theFiles :: FreeT (Producer ByteString IO) IO ()

That creates a "linked list" of `Producer`s and each `Producer` contains 20 `ByteString`s except for the last one which contains up to 20 elements.
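To see that shape concretely, the following sketch (assuming `pipes-group`, `foldl` and `lens-family-core`; `chunkSizes` and `chunkSizeList` are hypothetical names) cuts a 45-element stream into chunks of 20 and counts each chunk with `L.purely folds L.length`:

```haskell
import qualified Control.Foldl as L
import Data.Functor.Identity (Identity)
import Lens.Family (view)
import Pipes (Producer, each)
import Pipes.Group (chunksOf, folds)
import qualified Pipes.Prelude as P

-- Size of each chunk when 45 elements are cut into groups of 20;
-- every chunk is full except the last one.
chunkSizes :: Producer Int Identity ()
chunkSizes = L.purely folds L.length (view (chunksOf 20) (each [1 .. 45 :: Int]))

chunkSizeList :: [Int]
chunkSizeList = P.toList chunkSizes
```

`chunkSizeList` evaluates to `[20,20,5]`.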

Then you need a way to reduce each `Producer` to write out to a single file.  The most direct way to do this is to just explicitly recurse over the linked list of `Producer`s to extract one `Producer` at a time.  You can "pattern match" on the head of the "linked list" by using the `runFreeT` function:

    import Control.Monad.Trans.Free (FreeF (..), FreeT, runFreeT)
    import Data.ByteString (ByteString)
    import Pipes
    import System.IO (IOMode (WriteMode), withFile)

    loop :: Int -> FreeT (Producer ByteString IO) IO () -> IO ()
    loop n f = do
        x <- runFreeT f
        case x of
            -- No more `Producer`s left, we're done
            -- Think of this case as analogous to `Nil`
            Pure () -> return ()

            -- Found a `Producer`, process it
            -- Think of this case as analogous to `Cons`
            -- The `Producer`'s return value is the rest of the "linked list"
            Free p -> do
                -- p :: Producer ByteString IO (FreeT (Producer ByteString IO) IO ())
                -- `for` only builds an Effect; `runEffect` actually runs it
                f' <- withFile ("output-" ++ show n) WriteMode (\handle ->
                    runEffect (for p (\bytestring -> liftIO (processFile handle bytestring))) )

                -- f' :: FreeT (Producer ByteString IO) IO ()
                loop (n + 1) f'

... and once you have that then the final solution is:

    loop 1 (view (chunksOf 20) theFiles)

There are some higher-order combinators that you can find in `Control.Monad.Trans.Free` (from the `free` package) for folding a `FreeT` data structure like `iter` and `iterT` and there are also the `folds`/`foldsM` utilities from `pipes-group` as well.  However, I think in this particular case the simplest approach is just explicit manual recursion.
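As a sketch of the `iterT` route mentioned above (assuming the `free`, `pipes-group`, `lens-family-core` and `transformers` packages; `renderGroup` and `groupContents` are hypothetical names), each group is folded and its result logged in a `Writer`, so the outcome is easy to inspect. In the real problem the algebra would write to a file instead; note that it only sees one group at a time, so numbering the output files needs extra plumbing, which is one reason explicit recursion is convenient here.

```haskell
import Control.Monad.Trans.Free (iterT)
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Lens.Family (view)
import Pipes (Producer, each)
import Pipes.Group (chunksOf)
import qualified Pipes.Prelude as P

-- Algebra for iterT: fold one group (here by concatenating its elements),
-- record the result, then run the action that processes the remaining groups.
renderGroup :: Producer String (Writer [String]) (Writer [String] ()) -> Writer [String] ()
renderGroup p = do
  (contents, rest) <- P.fold' (++) "" id p
  tell [contents]
  rest

groupContents :: [String]
groupContents =
  execWriter (iterT renderGroup (view (chunksOf 3) (each (map (: []) "abcdefg"))))
```

`groupContents` evaluates to `["abc","def","g"]`.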

Michael Thompson

Jan 1, 2016, 10:52:15 PM
To: Haskell Pipes, eran...@gmail.com
It just occurred to me, for what it's worth, that this problem is easily solved using `chunksOf` from `Streaming.Pipes`.

Then you can use one of the functor-general zipping functions from the `Streaming` module, e.g.:

   zips     :: (Monad m, Functor f, Functor g) => Stream f m r -> Stream g m r -> Stream (Compose f g) m r
   zipsWith :: (Monad m, Functor h) => (forall x y. f x -> g y -> h (x, y)) -> Stream f m r -> Stream g m r -> Stream h m r

Of course, these zips could just as well be implemented for `FreeT` in `Control.Monad.Trans.Free`. Then you just write something that, in Data.List style, would
look like, say, `mapM_ action $ zip filepaths $ chunksOf n producer`.
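Ignoring streaming entirely, that Data.List-style plan can be sketched with plain lists. `chunksOfList` and `outputPlan` are hypothetical helpers (`Data.List` itself has no `chunksOf`), and the file names mirror the `"1.txt"`, `"2.txt"`, ... naming from this post:

```haskell
-- Hypothetical list-only helper; assumes n > 0 (n <= 0 would loop forever).
chunksOfList :: Int -> [a] -> [[a]]
chunksOfList _ [] = []
chunksOfList n xs = let (g, rest) = splitAt n xs in g : chunksOfList n rest

-- Pair each chunk with a numbered output file; zip stops at the shorter
-- list, so the infinite supply of file names is harmless.
outputPlan :: Int -> [a] -> [(FilePath, [a])]
outputPlan n = zip filepaths . chunksOfList n
  where
    filepaths = [show k ++ ".txt" | k <- [(1 :: Int) ..]]
```

For example, `outputPlan 2 ["hello", "world", "goodbye"]` is `[("1.txt",["hello","world"]),("2.txt",["goodbye"])]`; `mapM_ action` over that list is the shape the streaming version reproduces without holding any chunk in memory.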

 
    module Main (main) where

    import Pipes
    import qualified Pipes.Prelude as P
    import Streaming 
    import qualified Streaming.Prelude as S
    import qualified Streaming.Pipes as SP
    import qualified System.IO as IO 

    main = run $ maps action $ zips filepaths $ SP.chunksOf 2 $ text_producer 3
      where

      text_producer :: Int -> Producer String IO ()
      text_producer n = P.stdinLn >-> P.take n

      filepaths :: Stream (Of FilePath) IO ()
      filepaths =  S.each $ map (\n -> show n ++ ".txt") [1..] -- ["1.txt","2.txt"..]

      action :: Compose (Of FilePath) (Producer String IO) r -> IO r
      action (Compose (file :> p)) = IO.withFile file IO.WriteMode $ \h ->
               runEffect $ p >-> P.toHandle h

    -- >>> main
    -- hello<Enter>
    -- world<Enter>
    -- goodbye<Enter>
    -- >>> :! cat 1.txt
    -- hello
    -- world
    -- >>> :! cat 2.txt
    -- goodbye
