ETL: remove the CachingEnumerable from JoinOperation


jalchr

Jul 8, 2011, 9:47:44 AM
to rhino-t...@googlegroups.com
I'm trying to get rid of an OutOfMemoryException. I noticed that about 1.5 million rows are being cached in the CachingEnumerable object ... I wonder why this doesn't stream, why it's cached. So I changed the following lines:

   
    protected IEnumerable<Row> GetRightEnumerable()
    {
        //IEnumerable<Row> rightEnumerable = new CachingEnumerable<Row>(
        //    new EventRaisingEnumerator(right, right.Execute(null))
        //    );
        IEnumerable<Row> rightEnumerable = new EventRaisingEnumerator(right, right.Execute(null));


and it works perfectly with much less memory ... I only had to increase the input operation timeout value 

All is good except with MultiThreadedBranchingOperation ... after some processing it gets stuck on:

    lock (sync)
        while (input.ConsumersLeft > 0)
            Monitor.Wait(sync);  // <--------------------------- here


Questions:
Why do you cache for the join?
Why does it get stuck there?

Thanks


Simone Busoli

Jul 9, 2011, 7:25:54 PM
to rhino-t...@googlegroups.com

The operation result is shoved into a caching enumerable because it's iterated more than once. Please note that if you remove it, you're effectively enumerating the source more than once, which may not be what you expect.
I can't tell why it gets stuck on the multi-threaded branching operation; have you tried the single-threaded one? Do you have a repro for the issue?
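
For anyone wondering what the caching buys you, here is a minimal sketch of the idea (SimpleCachingEnumerable is a made-up name for illustration, not the actual Rhino ETL class): the wrapped source is pulled at most once, items are stored in a list, and later enumerations replay the list. Memory grows with the row count, which is exactly the OOM trade-off jalchr is hitting.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Simplified sketch of a caching enumerable: the wrapped source is
// enumerated at most once; later iterations replay the cached items.
// NOT the real Rhino ETL implementation - just an illustration.
public class SimpleCachingEnumerable<T> : IEnumerable<T>
{
    private readonly IEnumerator<T> source;
    private readonly List<T> cache = new List<T>();
    private bool exhausted;

    public SimpleCachingEnumerable(IEnumerable<T> source)
    {
        this.source = source.GetEnumerator();
    }

    public IEnumerator<T> GetEnumerator()
    {
        // Replay whatever has been cached so far.
        foreach (var item in cache)
            yield return item;

        // Then pull any remaining items from the source, caching as we go.
        // Note: this naive version is only correct for sequential
        // (non-interleaved) enumerations.
        while (!exhausted)
        {
            if (source.MoveNext())
            {
                cache.Add(source.Current);
                yield return source.Current;
            }
            else
            {
                exhausted = true;
            }
        }
    }

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
}
```

The second enumeration never touches the source again, which is why removing the cache changes behaviour: without it, every extra iteration hits the source operation again.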

> --
> You received this message because you are subscribed to the Google Groups "Rhino Tools Dev" group.
> To view this discussion on the web visit https://groups.google.com/d/msg/rhino-tools-dev/-/rOzjgybGSYkJ.
> To post to this group, send email to rhino-t...@googlegroups.com.
> To unsubscribe from this group, send email to rhino-tools-d...@googlegroups.com.
> For more options, visit this group at http://groups.google.com/group/rhino-tools-dev?hl=en.
>

Miles Waller

Jul 10, 2011, 10:58:19 AM
to rhino-t...@googlegroups.com
Hi,

FWIW, I once hit a problem where the caching enumerable interacted with the branching operation in such a way that the pipeline thought it had finished when it hadn't.  IIRC, this was because the rows had been read into the caching enumerables, but not necessarily processed by all the subsequent operations in all the branches - it appeared to terminate when the first branch finished.

The problem went away when I switched to the single-threaded pipeline executer, which is what I had meant to use in the first place, so I didn't investigate any further!

@jalchr: if you have memory problems, use the single-threaded/simple pipeline executer when running your ETL to avoid all the rows being read into memory.  This will avoid having to change the code in any of the operations.

Miles

jalchr

Jul 11, 2011, 6:06:16 AM
to rhino-t...@googlegroups.com
@SimoneB,
I don't care if it iterates more than once as long as it doesn't kill the memory ... resources are more critical than time in this case.
Question: how does the single-threaded pipeline play with the multi-threaded branch? Something seems inconsistent here.


@Miles, I'm using the single-threaded pipeline executer, and the JoinOperation still uses the CachingEnumerable, which doesn't respect how the pipeline works:

There should be a check in the JoinOperation:

    if (Pipeline.UseCache)
        // then use the CachingEnumerable
    else
        // don't use it ...


jalchr

Jul 11, 2011, 7:04:44 AM
to rhino-t...@googlegroups.com
One more thing:
> I can't tell why it gets stuck on the multi-threaded branching operation; have you tried the single-threaded one?
The BranchOperation suffers from the same problem ... it caches the rows, then branches to the child operations.

Miles Waller

Jul 11, 2011, 7:45:43 AM
to rhino-t...@googlegroups.com
Hi,

> @Miles, I'm using the single-threaded pipeline executer, and the JoinOperation still uses the CachingEnumerable, which doesn't respect how the pipeline works:

I think you've misunderstood how the JoinOperation works internally: a row from the left can match multiple rows on the right (just like a SQL join), which, as SimoneB says, requires the enumerable to be cached so it can be enumerated multiple times.  Otherwise you're really talking about some sort of "merge" operation - and you have to write that yourself :-) - or about executing some sort of lookup query once per row, which is pretty expensive, and you have to write that yourself too!

One thing - you could try swapping which query is left and which is right, so that the query with the fewest rows is the cached one.

Miles
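
To make Miles' point concrete, here is a toy nested-loop join (a sketch of the general technique, not the actual JoinOperation internals): the right side is walked once per left row, so it must be replayable. If it is a one-shot stream, every pass after the first silently yields nothing and matches are lost, rather than an error being raised.

```csharp
using System;
using System.Collections.Generic;

static class JoinSketch
{
    // Toy nested-loop join: for each left row, scan the entire right side.
    // The right enumerable is enumerated once per left row, which is why
    // it must be replayable (e.g. cached).
    public static IEnumerable<Tuple<int, int>> NestedLoopJoin(
        IEnumerable<int> left,
        IEnumerable<int> right,
        Func<int, int, bool> match)
    {
        foreach (var l in left)
            foreach (var r in right)  // re-enumerates right for every l
                if (match(l, r))
                    yield return Tuple.Create(l, r);
    }
}
```

With a replayable right side of {1, 2} and left {1, 2}, an equality join finds both matches; feed it a one-shot iterator instead and only the first left row finds anything, which is the "may not be what you expect" Simone warned about.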

 


jalchr

Jul 11, 2011, 8:44:58 AM
to rhino-t...@googlegroups.com
Miles,
If you want to use the single-threaded pipeline to consume less memory, then you don't want any CachingEnumerable in the way holding resources ... think of millions of records which can't stay in memory. In that case you don't care if the source is queried more than once, as long as it doesn't OOM.

So the join operation should adapt to the current pipeline:

    protected IEnumerable<Row> GetRightEnumerable()
    {
        if (Pipeline.UseCache)
        {
            // then use the CachingEnumerable
            IEnumerable<Row> rightEnumerable = new CachingEnumerable<Row>(
                new EventRaisingEnumerator(right, right.Execute(null))
                );
        }
        else
        {
            // don't use it ...
        }

Simone Busoli

Jul 11, 2011, 7:15:32 PM
to rhino-t...@googlegroups.com
On Mon, Jul 11, 2011 at 12:06, jalchr <jal...@gmail.com> wrote:
> @SimoneB,
> I don't care if it iterates more than once as long as it doesn't kill the memory ... resources are more critical than time in this case.
> Question: how does the single-threaded pipeline play with the multi-threaded branch? Something seems inconsistent here.

Take a look at the SingleThreadedNonCachedPipelineExecuter class - it may help, but currently I think the join and branching operations all perform some caching.
The pipeline executer takes care of executing the main pipeline. Branches imply a split in the pipeline, and currently branching operations take care of executing their inner operations.
This means that if you use a single-threaded pipeline with a multi-threaded branching operation, you get a single-threaded main pipeline and multi-threaded branching. There are several tests which cover all the permutations.
 


> @Miles, I'm using the single-threaded pipeline executer, and the JoinOperation still uses the CachingEnumerable, which doesn't respect how the pipeline works:

The right rows are cached because they are needed for generating lookups by key and then for reporting orphan rows. How would you do that without caching them? That said, there is probably room for improvement.
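
For what it's worth, one shape that "room for improvement" could take (a sketch of the idea only - not a patch to JoinOperation, and HashJoinSketch is a made-up name): stream the right side exactly once into a lookup keyed by the join key. Memory is then bounded by the right input alone, each left row probes the lookup instead of re-scanning a cached sequence, and the keys that were never matched give you the orphan right rows afterwards.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class HashJoinSketch
{
    // Build the lookup from a single pass over the right side, then probe
    // it once per left row. Keys that never match identify orphan rows.
    public static List<Tuple<string, string>> HashJoin(
        IEnumerable<KeyValuePair<string, string>> left,   // (join key, value)
        IEnumerable<KeyValuePair<string, string>> right,  // (join key, value)
        out List<string> orphanRightRows)
    {
        var byKey = right.ToLookup(r => r.Key, r => r.Value);  // one pass over right
        var matchedKeys = new HashSet<string>();
        var results = new List<Tuple<string, string>>();

        foreach (var l in left)
            foreach (var rValue in byKey[l.Key])  // probe; no re-enumeration of the source
            {
                matchedKeys.Add(l.Key);
                results.Add(Tuple.Create(l.Value, rValue));
            }

        // Right rows whose key never matched any left row are orphans.
        orphanRightRows = byKey
            .Where(g => !matchedKeys.Contains(g.Key))
            .SelectMany(g => g)
            .ToList();

        return results;
    }
}
```

This still holds the right side in memory, but only once and indexed by key; it doesn't help when the right side itself is the 1.5-million-row input, which is where Miles' suggestion of swapping left and right comes in.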
 


Simone Busoli

Jul 11, 2011, 7:19:08 PM
to rhino-t...@googlegroups.com
The caching of input is performed in several places for good reasons, and branching is one of them. Your requirement is certainly valid, but that's not how it currently works.


Simone Busoli

Jul 11, 2011, 7:23:26 PM
to rhino-t...@googlegroups.com
The problem is that the join and branching operations are special cases which are not directly tied to the pipeline executer; they execute their inner operations themselves, in whatever way they believe is best. Perhaps the concept of an executer should be extended so that it is consistent across the entire pipeline, but I'm not sure it would work in every case.
