Deadlock hazards and joins

Heikki Linnakangas

Jan 23, 2017, 10:00:46 AM
to Greenplum Developers, Shreedhar A Hardikar
Hi!

In PR 1583 [1], Shreedhar and I pondered the mechanism that we have in
the planner/executor, to avoid so-called "deadlock hazards". If I
understood correctly, we keep track of whether there is a Motion node on
both sides of a join, and if so, we force one side of the join to be
fully materialized, before fetching the first tuple from the other side
of the join. This effectively serializes the Motion nodes, so that only
one of them is active at any time.

There are comments explaining how we avoid that situation, and force the
"prefetching" of one side of a join, and there's even a debugging GUC,
gp_enable_motion_deadlock_sanity, to perform additional checks on plans,
and warn if a plan is nevertheless created that poses this "deadlock
hazard".

However, I could not find an explanation anywhere of what the underlying
deadlock problem is! Anyone remember?

I've been trying to think about this, but I don't see any fundamental
reason why a join with a Motion node on both sides would deadlock. For
the giggles, I removed all the code related to deadlock hazards [2]. And
it seems to pass all the regression tests, except for one issue:

In the segspace test, with gp_cte_sharing=on, this query gets stuck:

> regression=# explain update foo set j=m.cc1 from (
> with ctesisc as
> (select * from testsisc order by i2)
> select t1.i1 as cc1, t1.i2 as cc2
> from ctesisc as t1, ctesisc as t2
> where t1.i1 = t2.i2 ) as m;
> QUERY PLAN
> ----------------------------------------------------------------------------------------------------------------------------
> Update (slice0; segments: 3) (rows=2 width=18)
> -> Explicit Redistribute Motion 3:3 (slice3; segments: 3) (cost=2.70..3.91 rows=2 width=18)
> -> Nested Loop (cost=2.70..3.91 rows=2 width=18)
> -> Seq Scan on foo (cost=0.00..1.01 rows=1 width=14)
> -> Materialize (cost=2.70..2.80 rows=4 width=4)
> -> Broadcast Motion 3:3 (slice2; segments: 3) (cost=2.26..2.69 rows=4 width=4)
> -> Subquery Scan m (cost=2.26..2.56 rows=2 width=4)
> -> Hash Join (cost=2.26..2.52 rows=2 width=8)
> Hash Cond: share0_ref2.i2 = share0_ref1.i1
> -> Redistribute Motion 3:3 (slice1; segments: 3) (cost=1.02..1.25 rows=1 width=4)
> Hash Key: share0_ref2.i2
> -> Shared Scan (share slice:id 1:0) (cost=1.02..1.23 rows=1 width=8)
> -> Hash (cost=1.23..1.23 rows=1 width=8)
> -> Shared Scan (share slice:id 2:0) (cost=1.02..1.23 rows=1 width=8)
> -> Sort (cost=1.02..1.02 rows=1 width=8)
> Sort Key: public.testsisc.i2
> -> Seq Scan on testsisc (cost=0.00..1.01 rows=1 width=8)
> Settings: gp_cte_sharing=on
> Optimizer status: legacy query optimizer
> (19 rows)

It seems that the outer side of the join, with the first Shared Scan
node, returns 0 rows. Because of that, the Hash Join node never bothers
to execute the inner side - the join returns 0 rows regardless of the
inner side, if there are no rows from the outer side. So, the second
Shared Scan node is never executed. And that SharedInputScan code
doesn't seem to handle that too well. When shutting down the executor,
the "producer" side of the Shared Scan waits until the "consumer" side
has read all the data, but the consumer never even started, so the
producer waits forever.
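
The shutdown hang described above can be modelled in a few lines of Python. This is only a toy sketch, not the SharedInputScan code: the function name producer_shutdown and the use of a threading.Event are assumptions made for illustration, and the timeout exists only so that the sketch terminates (the behaviour described above is a wait with no timeout).

```python
import threading


def producer_shutdown(consumer_done: threading.Event, timeout: float) -> bool:
    """Wait for the consumer to report that it has read everything.

    Returns True if the consumer signalled before the timeout expired.
    (Hypothetical helper; the timeout stands in for "waits forever".)
    """
    return consumer_done.wait(timeout)


# Case 1: the consumer runs, reads the data, and signals completion.
done = threading.Event()
consumer = threading.Thread(target=done.set)
consumer.start()
consumer.join()
normal = producer_shutdown(done, 0.1)

# Case 2: the consumer is never executed (for example, it sits below a join
# side or a One-Time Filter that is never evaluated), so nobody ever signals.
never_done = threading.Event()
stuck = producer_shutdown(never_done, 0.1)

print("consumer ran:", normal)
print("consumer never started:", stuck)
```

In the second case the wait can only end through the timeout, which is the situation the producer is in at executor shutdown.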

So, there is that "deadlock hazard". However, I don't see why that's
restricted to joins. Seems like the same could happen in any plan, with
a "cross-slice" Shared Scan node.

And indeed, I was able to construct a case that doesn't involve any
joins, and locks up indefinitely with an unmodified git checkout:

create table testsisc (i1 integer, i2 integer);
insert into testsisc values (1, 1);
set gp_cte_sharing=on;
with ctesisc as
(select * from testsisc order by i2)
select i1 from ctesisc where 1=2
union all
select count(*) from ctesisc t;

The plan looks like this:

> regression=# explain with ctesisc as
> (select * from testsisc order by i2)
> select i1 from ctesisc where 1=2
> union all
> select count(*) from ctesisc t;
> QUERY PLAN
> --------------------------------------------------------------------------------------------------------------
> Gather Motion 3:1 (slice3; segments: 3) (cost=1.02..2.55 rows=2 width=6)
> -> Append (cost=1.02..2.55 rows=1 width=6)
> -> Subquery Scan "*SELECT* 1" (cost=1.02..1.24 rows=1 width=4)
> -> Result (cost=1.02..1.23 rows=1 width=4)
> One-Time Filter: false
> -> Shared Scan (share slice:id 3:0) (cost=1.02..1.23 rows=1 width=8)
> -> Sort (cost=1.02..1.02 rows=1 width=8)
> Sort Key: public.testsisc.i2
> -> Seq Scan on testsisc (cost=0.00..1.01 rows=1 width=8)
> -> Redistribute Motion 1:3 (slice2; segments: 1) (cost=1.30..1.32 rows=1 width=8)
> Hash Key: (count((count(*))))
> -> Aggregate (cost=1.30..1.31 rows=1 width=8)
> -> Gather Motion 3:1 (slice1; segments: 3) (cost=1.23..1.28 rows=1 width=8)
> -> Aggregate (cost=1.23..1.24 rows=1 width=8)
> -> Subquery Scan t (cost=1.02..1.23 rows=1 width=0)
> -> Shared Scan (share slice:id 1:0) (cost=1.02..1.23 rows=1 width=8)
> Settings: gp_cte_sharing=on
> Optimizer status: legacy query optimizer
> (18 rows)

The trick here is similar to what the join did in the original query:
the Shared Scan in the first branch of the Append is never executed,
because of the One-Time Filter in the Result node.


So my conclusions are:

1. There is a bug with cross-slice Shared Scans in general that needs to
be fixed.

2. There is no particular danger with Motion nodes below joins.

Am I missing something?

[1] https://github.com/greenplum-db/gpdb/pull/1583
[2] https://github.com/hlinnaka/gpdb/tree/motion-hazard

- Heikki

Gang

Jan 24, 2017, 4:18:28 AM
to Greenplum Developers, shar...@pivotal.io
Hi Heikki,

I don't know the history of the deadlock hazard either. The issue was first mentioned in Jira mpp-898. I changed the SQL script in its comments a little, and I am able to reproduce a deadlock using your branch.

drop table tjk__page_lu;
drop table tjk__tmp_hit;
create table tjk__page_lu (id int, a text);
create table tjk__tmp_hit (id bigint, a text, cleansed smallint);
insert into tjk__page_lu select 1,'a' from generate_series(1,100000);
insert into tjk__page_lu select 2,'aa' from generate_series(1,100000);
insert into tjk__page_lu select 3,'a//a' from generate_series(1,100000);
insert into tjk__page_lu select 4,'a//a.dd' from generate_series(1,100000);
insert into tjk__page_lu select 5,'a//a.dd[dad' from generate_series(1,100000);
insert into tjk__page_lu select 6,'a//a./dd[dad' from generate_series(1,100000);
insert into tjk__tmp_hit select 1,'a//a./dd[dad',22 from generate_series(1,100000);
insert into tjk__tmp_hit select 2,'a/a.//dd[dad',22 from generate_series(1,100000);
insert into tjk__tmp_hit select 3,'a/a.//dd[dad',22 from generate_series(1,100000);
insert into tjk__tmp_hit select 4,'a/a.//ddad',22 from generate_series(1,100000);
analyze tjk__page_lu;
analyze tjk__tmp_hit;
 select replace(trim(substring(tjk__tmp_hit.a from '[ ].[ a-z]')),'//','/') from tjk__tmp_hit left outer join tjk__page_lu on tjk__page_lu.a = replace(trim(substring(tjk__tmp_hit.a from '[ ].[ a-z]')),'//','/') where tjk__tmp_hit.cleansed is not null;

It seems to be caused by nodeHashjoin.c:144, which fetches a tuple from the outer node before creating the hash table. But I think that code was added to fix a problem introduced by the fix for the deadlock hazard, so I am not sure what the right behavior is.

Heikki Linnakangas

Jan 24, 2017, 7:53:41 AM
to Gang, Greenplum Developers, shar...@pivotal.io
On 01/24/2017 11:18 AM, Gang wrote:
> Hi Heikki,
>
> I don't know the history of deadlock hazard neither. The issues is
> mentioned in jira mpp-898 at the first place, i changed a little bit on the
> sql script in the comments and i am able to reproduce a deadlock using your
> branch.
>
> [repro]

A-ha, great, thanks for the repro!

> Seems like it is caused by the line nodeHashjoin.c:144, it fetch a tuple
> from the outer node before create the hash table, but i think this code is
> to fix a problem introduced when fix the deadlock hazard problem. So i am
> not sure how it is.

So, the query plan looks like this:

> regression=# explain select replace(trim(substring(tjk__tmp_hit.a from '[ ].[ a-z]')),'//','/')
> from tjk__tmp_hit left outer join tjk__page_lu on tjk__page_lu.a =
> replace(trim(substring(tjk__tmp_hit.a from '[ ].[ a-z]')),'//','/');
> QUERY PLAN
> ----------------------------------------------------------------------------------------------------------------------------
> Gather Motion 3:1 (slice3; segments: 3)
> -> Hash Left Join
> Hash Cond: replace(btrim("substring"(tjk__tmp_hit.a, '[ ].[ a-z]'::text)), '//'::text, '/'::text) = tjk__page_lu.a
> -> Redistribute Motion 3:3 (slice1; segments: 3)
> Hash Key: replace(btrim("substring"(tjk__tmp_hit.a, '[ ].[ a-z]'::text)), '//'::text, '/'::text)
> -> Seq Scan on tjk__tmp_hit
> -> Hash
> -> Redistribute Motion 3:3 (slice2; segments: 3)
> Hash Key: tjk__page_lu.a
> -> Seq Scan on tjk__page_lu
> Optimizer status: legacy query optimizer
> (11 rows)

Things that are crucial for the deadlock:

* With the prefetch-stuff removed, a HashJoin works in three steps:

1. Fetch 1 row from outer side, to check if there are any tuples there
2. Fetch all tuples from inner side, to build the hash table
3. Fetch all remaining tuples from outer side, and perform hash lookup
for each tuple.

* The Send side of a motion has limited buffer capacity. After the send
buffer fills up, it blocks until the receiver acknowledges a tuple as
received.

When the deadlock occurs:

Process A: One of the backends performing the Hash join has received the
first tuple from the outer side (it is in step 2). It is waiting for
another tuple from the inner side. Let's call this process A

Process B: Other backends doing the Hash Join in other segments are
still waiting for the first tuple from the outer side. (they are in step
1). Let's call these processes together as B

Process C: All the backends performing the Seq Scan on tjk__tmp_hit have
filled their send queues with tuples destined for process A. There are
more tuples destined for the other processes in the table, but the send
queue is full, so it's waiting for process A to acknowledge some of them
before sending anything to the other processes

Process D: The processes performing the Seq Scan on tjk__page_lu have
filled their send queues with tuples destined for processes B. They will
not send anything to process A, until processes B have acknowledged some
of the tuples.


So:

Process A is waiting for tuples from D
Process B is waiting for tuples from C
Process C is waiting for process A to acknowledge
Process D is waiting for process B to acknowledge.

That forms a cycle, and so we have a deadlock.
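
The four "is waiting for" statements above form a small wait-for graph. As an illustrative sketch only (the find_cycle helper is hypothetical and has nothing to do with how the executor or gp_enable_motion_deadlock_sanity works), following the edges from any process leads back to it:

```python
def find_cycle(waits_for, start):
    """Follow 'X waits for Y' edges from start.

    Returns the list of processes forming a cycle, or None if the chain
    ends at a process that is not waiting for anything.
    """
    path = [start]
    seen = {start}
    node = start
    while node in waits_for:
        node = waits_for[node]
        if node in seen:
            # Close the loop at the first repeated process.
            return path[path.index(node):] + [node]
        path.append(node)
        seen.add(node)
    return None


# Edges taken from the summary above.
waits_for = {
    "A": "D",  # A waits for inner tuples from D
    "B": "C",  # B waits for its first outer tuple from C
    "C": "A",  # C waits for A to acknowledge
    "D": "B",  # D waits for B to acknowledge
}

cycle = find_cycle(waits_for, "A")
print(" -> ".join(cycle))
```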


The prefetching stuff breaks the cycle, by turning the HashJoin from a
three step process to a two-step process:

1. Fetch all tuples from inner side, building hash table
2. Fetch all tuples from outer side, perform lookup for each tuple.
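
To make the difference between the three-step and the two-step order concrete, here is a toy simulation. It is a sketch under simplifying assumptions, not the interconnect: each Motion sender is a single loop with a fixed number of unacknowledged tuples (capacity), consuming a tuple counts as the ACK, and the class and function names (Sender, run) are invented for this example.

```python
from collections import deque


class Sender:
    """Send side of a Motion: one thread, bounded unacknowledged tuples."""

    def __init__(self, destinations, capacity, n_receivers):
        self.pending = deque(destinations)  # destination segment per tuple
        self.capacity = capacity
        self.in_flight = 0                  # sent but not yet acknowledged
        self.queues = [deque() for _ in range(n_receivers)]

    def pump(self):
        """Send in order until the buffer is full. True if anything was sent."""
        sent = False
        while self.pending and self.in_flight < self.capacity:
            self.queues[self.pending.popleft()].append("tuple")
            self.in_flight += 1
            sent = True
        return sent

    def receive(self, seg):
        """Consume (and thereby acknowledge) one tuple, if one is queued."""
        if self.queues[seg]:
            self.queues[seg].popleft()
            self.in_flight -= 1
            return True
        return False

    def finished_for(self, seg):
        return not self.pending and not self.queues[seg]


def run(prefetch_inner, outer_dests, inner_dests, capacity=2, n=2):
    outer = Sender(outer_dests, capacity, n)
    inner = Sender(inner_dests, capacity, n)
    # Each step is (stream, tuple limit); None means "until end of stream".
    if prefetch_inner:
        plan = [(inner, None), (outer, None)]
    else:
        plan = [(outer, 1), (inner, None), (outer, None)]
    step = [0] * n
    taken = [0] * n
    while True:
        sent_outer = outer.pump()
        sent_inner = inner.pump()
        progressed = sent_outer or sent_inner
        for seg in range(n):
            if step[seg] == len(plan):
                continue
            stream, limit = plan[step[seg]]
            if stream.receive(seg):
                taken[seg] += 1
                progressed = True
                if limit is not None and taken[seg] == limit:
                    step[seg], taken[seg] = step[seg] + 1, 0
            elif stream.finished_for(seg):
                step[seg], taken[seg] = step[seg] + 1, 0
                progressed = True
        if all(s == len(plan) for s in step):
            return "completed"
        if not progressed:
            return "deadlock"


# Outer rows go mostly to segment 0 first, inner rows mostly to segment 1.
outer_dests = [0, 0, 0, 0, 1, 1]
inner_dests = [1, 1, 1, 1, 0, 0]
three_step = run(False, outer_dests, inner_dests)
two_step = run(True, outer_dests, inner_dests)
print("three-step:", three_step)
print("two-step:", two_step)
```

With this skewed data, segment 0 plays the role of process A and segment 1 the role of B: in the three-step order both senders fill their buffers with tuples nobody is ready to consume, while in the two-step order every join process drains the inner side before touching the outer side.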


I understand now, how the deadlock occurs, and how the prefetch stuff
prevents it. Let's add comments and/or a README somewhere to explain
this, now that we have this fresh in mind! Any volunteers?

- Heikki

Shreedhar A Hardikar

Jan 26, 2017, 7:48:02 PM
to Heikki Linnakangas, Gang, Greenplum Developers
Heikki, thanks a bunch for giving a detailed explanation of what causes the deadlock. After reading the explanation, I had some questions about how we actually get into a state that causes the deadlock. I spent some time analyzing the repro that Gang sent out, and wanted to share some of my conclusions/findings.

The query in the repro has the following plan :

regression=# explain select replace(trim(substring(tjk__tmp_hit.a from '[ ].[ a-z]')),'//','/')
from tjk__tmp_hit left outer join tjk__page_lu on tjk__page_lu.a =
replace(trim(substring(tjk__tmp_hit.a from '[ ].[ a-z]')),'//','/');
                                                         QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice3; segments: 3)
   ->  Hash Left Join
         Hash Cond: replace(btrim("substring"(tjk__tmp_hit.a, '[ ].[ a-z]'::text)), '//'::text, '/'::text) = tjk__page_lu.a
         ->  Redistribute Motion 3:3  (slice1; segments: 3)
               Hash Key: replace(btrim("substring"(tjk__tmp_hit.a, '[ ].[ a-z]'::text)), '//'::text, '/'::text)
               ->  Seq Scan on tjk__tmp_hit
         ->  Hash
               ->  Redistribute Motion 3:3  (slice2; segments: 3)
                     Hash Key: tjk__page_lu.a
                     ->  Seq Scan on tjk__page_lu
 Optimizer status: legacy query optimizer
(11 rows)

Here, tjk__tmp_hit is the outer table and tjk__page_lu is the inner table. After renaming the tables slightly, I wrote a tool that dumps and analyzes the stack traces to get this:

$ ./status.sh
26889 : seg0  : HJ Waiting on OUTER tuple. [B]
26890 : seg1  : HJ is building inner Hash Table & is waiting on more Inner tuples. [A]
26891 : seg2  : HJ Waiting on OUTER tuple. [B]
26892 : seg0  : Sending tuple waiting for ACK (outer_table) [C]
26893 : seg1  : Sending tuple waiting for ACK (outer_table) [C]
26895 : seg0  : Sending tuple waiting for ACK (inner_table) [D]
26896 : seg1  : Sending tuple waiting for ACK (inner_table) [D]



To recap Heikki’s terminology :
A - The process that already retrieved one outer tuple, and is now building the inner hash table - waiting on inner tuples.
B - The process(es) that are still waiting for an outer tuple.
C - The process(es) corresponding to the outer_table, sending tuples to A, with their send buffers full.
D - The process(es) corresponding to the inner_table, sending tuples to B, with their send buffers full.


The reason I was confused was because of some wrong assumptions I had about the Motion node. So, after spending some time looking at the code in more detail, here's what I understand:
  1. The Send side of the Motion node in each slice runs a single thread. For example, there is only one thread in process 26892, which retrieves tuples from the child SeqScan, then determines the destination segment to send each tuple to, and finally adds it to the send buffer for that destination.
  2. The send buffers collect a number of tuples at a time (based on the gp_max_packet_size GUC) before sending them on the interconnect. Motion also allocates up to gp_interconnect_snd_queue_depth such buffers per destination slice. Based on gp_interconnect_fc_method, these are either shared between all destination slices or individually allocated. The specifics here don’t matter much for the deadlock; what matters is that there is a hard limit on how much we buffer (kind of obvious, but I wanted to point it out anyway).
  3. When that hard limit is hit and Motion cannot get another buffer spot, it blocks waiting for an ACK from any of the destination processes. It does not matter if we allocated buffers per destination slice: once any of the buffers fills up we block, waiting for an ACK, since there is only one thread doing the sending.

One last key component in these deadlocks is the data itself. Having 2 motions underneath a Join doesn’t imply a deadlock if, for example:
  1. The outer table (C) fits entirely in the buffer space.
  2. The outer table (C) has rows that it can deliver to all the Join slices (A and B), such that they can move on to building the hash table.
  3. The inner table (D) only has rows to send to (A), in which case the inner hash table is built, and (A) can start retrieving outer tuple rows, clearing the way for other HJ processes (B).

To confirm this, I instrumented Motion to print out the segment it sends each tuple to for the example query. I see that the processes 26892 and 26893 (C) ONLY sent tuples to seg1 HJ (A) before blocking, which means (B) processes are starved waiting for their outer tuple. Process 26895 (D) only sent tuples to (B) before blocking, starving (A) of inner tuples. Thus C is waiting on A, which is waiting on D, which is waiting on B, which is waiting on C.

Understanding that, here is a slightly simpler repro of the deadlock (we probably should add this to ICG or some other such suite):

create table l (i int, j int) distributed by (i);
create table r (i int, j int) distributed by (j);

insert into l select i, 2 from generate_series(1, 100000); 
insert into r select i, 1 from generate_series(1, 100000);

-- Add gp_dist_random() to ensure Redistribute Motions under the join.
select count(*) from gp_dist_random('l') left join gp_dist_random('r') on l.j + 1 = r.j;


As for the fix for this issue, Heikki covers the HJ case, so I thought I’d talk more about MJ and NLJ. In both cases, if both children of the Join have Motions, we set join.prefetch_inner = true and add a Materialize on the inner side with cdb_strict = true. So, when the join tries to retrieve an inner tuple, we completely materialize the inner table, breaking the deadlock loop. See the create_nestloop_plan() and create_mergejoin_plan() functions. Also see the plans generated for the query above with MJ and NLJ:

                                                         QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=225475.94..225475.95 rows=1 width=8)
   ->  Gather Motion 3:1  (slice3; segments: 3)  (cost=225475.87..225475.92 rows=1 width=8)
         ->  Aggregate  (cost=225475.87..225475.88 rows=1 width=8)
               ->  Merge Right Join  (cost=22938.69..200252.87 rows=3363066 width=0)
                     Merge Cond: r.j = (l.j + 1)
                     ->  Sort  (cost=11467.54..11718.61 rows=33477 width=4)
                           Sort Key: r.j
                           ->  Redistribute Motion 3:3  (slice1; segments: 3)  (cost=0.00..3123.90 rows=33477 width=4)
                                 Hash Key: r.j
                                 ->  Seq Scan on r  (cost=0.00..1115.30 rows=33477 width=4)
                     ->  Sort  (cost=11471.15..11722.30 rows=33487 width=4)
                           Sort Key: (l.j + 1)
                           ->  Result  (cost=0.00..3124.80 rows=33487 width=4)
                                 ->  Redistribute Motion 3:3  (slice2; segments: 3)  (cost=0.00..3124.80 rows=33487 width=4)
                                       Hash Key: l.j + 1
                                       ->  Seq Scan on l  (cost=0.00..1115.60 rows=33487 width=4)
 Settings:  enable_hashjoin=off; enable_mergejoin=on

 Optimizer status: legacy query optimizer
(18 rows)

shardikar=# explain select count(*) from l left join r on l.j + 1 = r.j;
                                                      QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=252261517.19..252261517.20 rows=1 width=8)
   ->  Gather Motion 3:1  (slice3; segments: 3)  (cost=252261517.13..252261517.18 rows=1 width=8)
         ->  Aggregate  (cost=252261517.13..252261517.14 rows=1 width=8)
               ->  Nested Loop Left Join  (cost=3224.33..252236294.13 rows=3363066 width=0)
                     Join Filter: (l.j + 1) = r.j
                     ->  Redistribute Motion 3:3  (slice1; segments: 3)  (cost=0.00..3124.80 rows=33487 width=4)
                           Hash Key: l.j + 1
                           ->  Seq Scan on l  (cost=0.00..1115.60 rows=33487 width=4)
                     ->  Materialize  (cost=3224.33..4228.63 rows=33477 width=4)
                           ->  Redistribute Motion 3:3  (slice2; segments: 3)  (cost=0.00..3123.90 rows=33477 width=4)
                                 Hash Key: r.j
                                 ->  Seq Scan on r  (cost=0.00..1115.30 rows=33477 width=4)
 Settings:  enable_hashjoin=off; enable_mergejoin=off

 Optimizer status: legacy query optimizer
(14 rows)
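
The effect of a strict Materialize on the inner side can be sketched with Python generators. This is an illustration of the ordering only; the names motion and strict_materialize are made up for the example and do not correspond to executor functions.

```python
def motion(name, rows, log):
    """Stand-in for a Motion receiver: logs every row as it is pulled."""
    for row in rows:
        log.append(f"{name} sends {row}")
        yield row


def strict_materialize(child):
    """Drain the child completely before returning the first row."""
    buffered = list(child)
    yield from buffered


log = []
inner = strict_materialize(motion("inner", [1, 2, 3], log))
outer = motion("outer", [2, 3, 4], log)

# Prefetch: the join asks for the first inner row before touching the outer
# side, which forces the whole inner Motion to be drained.
first_inner = next(inner)
after_first = list(log)

inner_rows = [first_inner, *inner]
matches = [(o, i) for o in outer for i in inner_rows if o == i]

print(after_first)
print(matches)
```

By the time the first inner row is returned, every inner row has already been received, so the outer Motion only becomes active once the inner one is finished.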


Finally, I have added the explanation as a comment in the deadlock detection function. See PR: https://github.com/greenplum-db/gpdb/pull/1642.
--
Thanks,
Shreedhar

Heikki Linnakangas

Jan 27, 2017, 2:49:52 AM
to Shreedhar A Hardikar, Gang, Greenplum Developers
Great, thanks Shreedhar! PR 1642 helps a lot.

I wonder if this is only an issue with joins. Are there no other
executor nodes that might fetch from two different children? I guess
not. An Append node will only fetch from one child at a time, and in order.

PostgreSQL 9.1 adds the Merge Append node, which I think will have this
problem. When we get there, we will have to remember to handle this.

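
The difference in fetch order between the two node types can be illustrated in Python, using itertools.chain as a stand-in for Append and heapq.merge as a stand-in for Merge Append. Both stand-ins are assumptions chosen for illustration; they only mimic the order in which children are pulled, not the executor nodes themselves.

```python
import heapq
from itertools import chain


def child(name, rows, log):
    """Stand-in for a child plan node: logs its name on every fetch."""
    for row in rows:
        log.append(name)
        yield row


# Append-like: the first child is exhausted before the second is touched.
append_log = []
append_out = list(chain(child("left", [1, 4], append_log),
                        child("right", [2, 3], append_log)))

# Merge-Append-like: needs a row from every child before it can return the
# smallest one, so both children are active at the same time.
merge_log = []
merge_out = list(heapq.merge(child("left", [1, 4], merge_log),
                             child("right", [2, 3], merge_log)))

print("append fetch order:", append_log)
print("merge fetch order: ", merge_log)
```

If both children were Motions, the merge-style order is the one that keeps two Motions active at once.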
- Heikki