Solution & Prototype Review: gp2gp federation using postgres_fdw

482 views
Skip to first unread message

Ming Li

unread,
Aug 23, 2018, 4:52:48 AM8/23/18
to Greenplum Developers

Hi all,


Nowadays most of enterprise customers use multiply different database products to support their business. It’s very common to have several Greenplum clusters and hope to  query across the Greenplum clusters or across other databases from greenplum. This comes GP2GP federation:

  1. Reduce data redundancy across clusters so customers can save money

  2. Efficiently query across Greenplum clusters to meet business needs (clusters maybe in different locations, with different minor versions)

  3. Virtually increase Greenplum cluster size to more nodes


Basically, GP2GP is to implement foreign tables, which are segments to segments transferred and optimized query execution, access remote query data of other Greenplum clusters. The target Greenplum version is 6.X at first.


Below two options are both based on postgres_fdw, however we need to make it supporting MPP.

postgres_fdw is used to access data stored in external PostgreSQL servers https://www.postgresql.org/docs/9.5/static/postgres-fdw.html


Solution Option 1:

(Thanks Adam Lee for summarizing this option)

The basic idea is to transfer foreign query results from foreign segments back to local cluster segments directly.

1, queries are sent to foreign segments via utility mode.

2, foreign segments share the same DTX (distributed transaction context) to have same MVCC visibility. The DTX bonds to a transaction we started on foreign master, no need to worry about vacuum.


Pros:

1, simple, straightforward.

2, compatible even with a different version cluster, DTX is both serialized and deserialized on the foreign cluster, other interfaces are libpq.

3, predicate pushdown is possible.


Cons:

1, utility mode doesn’t handle resource group.

2, future `UPDATE` might have deadlock issues (still investigating on this and the 2PC issue).

3, pg_hba.conf needs to be updated to allow segments direct connections.

4, ONLY support predicate push down.


The demo supporting single node clusters: https://github.com/adam8157/gpdb/tree/gp2gp

Solution Option 2:


Interconnect motions across clusters. The reason not only includes making as less code changes as possible (which may introduce bugs, and also need much more efforts), but also avoiding extra redistribution motion and also balancing all segments' workloads automatically.


Below is the main workflow of this option, left side is the federation cluster, and right side is the foreign cluster:


The basic idea is to transfer foreign query result from foreign segments to local cluster segments directly, and don't need to change foreign query which is generated from postgres_fdw.


Three cases:

  • In most cases, the only operation on foreign master is just gather (i.e. gather motion) the foreign query results, and direct to the requester. In this way, we can redirect from foreign segments to local segments.

  • The more complex case is more operations need to be executed on foreign master, if possible, we can change local query plan to compensate operation on foreign master firstly.

  • The last case which cannot be parallel processed: some operator only can be executed on foreign master (e.g. some UDFs are defined only on foreign cluster, and only executed on master mode), these cases cannot be parallel processed even in single greenplum cluster.


Now we focus on the first case (below discussion only refer to this case); the second case is based on the first case implementation, and also it needs more investigation (e.g. which operator on foreign master can be supported? And how to compensate it on local segment?); the last case, we just skip it as original postgres_fdw.


Points after Prototype:

  • FDW can be put into any slice, and one slice may contain several different fdw scan nodes.

  • FDW remote query needs to be sent only once, now only on seg0, other segments only do motion receive.

  • At first we had thought to execute foreign remote query only on master (on entrydb),  in this way, we need to support an new gang type to run on all segments with entry db, moreover it may cause local query plan changed because the flow type cross this new gang with other types of gang.

  • Dummy motions need to be added as child of FDW. In simple query like joining local table with foreign table, the parent node of FDW is also Redistribute Motion. I firstly had thought to reuse this node instead of add a new child node, but afterwards I realized that we cannot guarantee that in all scenarios FDW node has a motion typed direct parent node.

  • The side effect of this solution is: because all result data will be get from motion directly, and master query plan executor will drive the motion to fetch tuple directly, the remote statement "FETCH ... FROM CURSOR' called from postgresIterateForeignScan() --> create_cursor() will have no effect, it can not control the result row number any more, it just return 0 rows.

  • For simplicity, foreign table will be simply keep same distribution policy as it original table in foreign cluster(otherwise we need to add many paths for foreign scan in planner, and need to set cost for all these new added paths):  

    • If hash function is different, we need to change foreign table distribution policy to distributed randomly.

    • If the segment numbers are the same in two clusters, gp2gp motion is N:N mapping explicit motion.

    • If the segment numbers are different, gp2gp motion is a redistribution motion.


Presumptions:

  • These two greenplum clusters should have same value of GUC: gp_interconnect_type, which could be udpifc or tcp, we should make them the same before running GP2GP federation queries.

  • Segments on two greenplum clusters can be communicated with each others.

  • This solution using motions to transfer data directly, so if the motion interaction protocol changed, or the transferred data format(e.g. MemTuple) changed, we only support 2 clusters have the same.


Prototype Details of Option 2:


This prototype is not only to guarantee the option 2 solution works, but also to provide more details for discussion: whether the code changes for this solution is acceptable or not?  And is there any other better solution or implementation?


The code of this prototype is:

https://github.com/liming01/gpdb-postgres-merge/commits/fdw_iteration_9


Main Steps:

  • Step 0: Cherry pick postgres_fdw code from PostgreSQL, make in run successfully. And make foreign scan run on all segments instead of on master.

  • Step 1: Add fdw dummy motion nodes in the plan tree. This dummy motion only work as motion receiver, and for simplicity now it adds new slice without gang.

  • Step 2: Pass local segments' listener info (e.g. ports) to foreign server, GUCs gp_fdw_plan_rewrite, gp_fdw_motion_recv_port1 gp_fdw_motion_recv_port2 gp_fdw_motion_recv_port3, all other param can be passed in same way(using GUClike param_string="param1,param2,..."). Is there any better way?

  • Step 3: Remote query "DECLARE CURSOR" will hung until setup gp2gp motion finished, while interconnect setup need receiver listen()/accept() firstly, then socket client can connect. So new thread is added for sending this remote query, and in the main thread call SetupInterconnect4FdwMotion() to listen() for socket server.

  • Step 4: Remote query using the value of GUC gp_fdw_motion_recv_portX, create more setupOutgoingConnection() and setup gp2gp motions.

  • Step 5: Remote server redirect data at doSendTuple() to specific connection of the same segid of local cluster.

  • Step 6: Local fdw motion receiver sets result TupleDesc the same as fdw scan, and returns the results to upper plan nodes.


TODOs:

  • Thinking TCP is much easier for bug tracing with tools like netstat, only TCP is tried in the demo. So please run "gpconfig -c Gp_interconnect_type -v tcp; gpstop -u" firstly. UDP type needs to be supported in future.

  • Now temporally only support same segment number of federation cluster and foreign cluster.

  • Only on same host of federation cluster and foreign cluster, because I don't pass hostname as GUC to foreign server.

  • Now just skip client verification at readRegisterMessage(), we can postpone verification until fetched foreign server motion sender info (rcListenerPort & srcPid) after "DECLARE CURSOR FOR" remote query returns. One problem here is how to sync these info from seg0 to other segments in local cluster?

  • At ic_tcp.c:1164, hard coded local slice id for this foreign scan to 1(at my test case). Need to get from outer function.

  • I didn't check remote query operation is only gather motion, only redirect at the direct child slice of master node at present.

  • Need error and cancel handling in thread code.

  • Need to disable gp2gp motion if remote query plan has more operators than gather motion, which only can be known after "DECLARE CURSOR FOR" remote query returned.

  • One known bug about foreign column Datum is pointer. e.g.  column type is TEXT or CHAR(128).


Test Samples of this Prototype:


Step 1: Change interconnect type to TCP

gpconfig -c Gp_interconnect_type -v tcp; gpstop -u


Step 2: Prepare tables on foreign cluster

create database fdb;
\c fdb;
CREATE TABLE t1 AS SELECT id, id+1 as c1, id*2 as c2 FROM generate_series(1,5) id;
CREATE TABLE t2 AS SELECT id, id+1 as c1, id*2 as c2 FROM generate_series(3,15) id;


Step 3: Create foreign tables on local cluster

create database testdb;
\c testdb;

CREATE EXTENSION postgres_fdw;
CREATE SERVER foreign_server
       FOREIGN DATA WRAPPER postgres_fdw
       OPTIONS (host 'localhost', port '15432', dbname 'fdb');
CREATE USER MAPPING FOR gpadmin
       SERVER foreign_server
       OPTIONS (user 'gpadmin', password 'password');
CREATE FOREIGN TABLE ft1 (id integer, c1 integer, c2 integer)
SERVER foreign_server
OPTIONS (schema_name 'public', table_name 't1');
CREATE FOREIGN TABLE ft2 (id integer, c1 integer, c2 integer)
SERVER foreign_server
OPTIONS (schema_name 'public', table_name 't2');

CREATE TABLE lt1 AS SELECT id, id+1 as c1, id*2 as c2 FROM generate_series(1,5) id;
CREATE TABLE lt2 AS SELECT id, id+1 as c1, id*2 as c2 FROM generate_series(3,15) id;


Step 4: SQL for testing

testdb=# EXPLAIN select * from lt1;
                                   QUERY PLAN
-----------------------------------------------------------------------------------
Gather Motion 3:1  (slice1; segments: 3)  (cost=0.00..879.00 rows=77900 width=12)
  -> Seq Scan on lt1  (cost=0.00..879.00 rows=25967 width=12)
Optimizer: legacy query optimizer
(3 rows)

testdb=# EXPLAIN select * from ft1;
                                         QUERY PLAN
----------------------------------------------------------------------------------------------
Gather Motion 3:1  (slice2; segments: 3)  (cost=100.00..383.06 rows=9102 width=12)
  -> Foreign Scan on ft1  (cost=100.00..383.06 rows=3034 width=12)
        -> Redistribute Motion 3:3  (slice1; segments: 3)  (cost=0.00..0.00 rows=0 width=0)
Optimizer: legacy query optimizer
(4 rows)

testdb=# select * from ft1;
id | c1 | c2
----+----+----
 3 |  4 |  6
 4 |  5 |  8
 5 |  6 | 10
 1 |  2 |  2
 2 |  3 |  4
(5 rows)

testdb=# explain select * from lt1, lt2 where lt1.id=lt2.id;
                                       QUERY PLAN
-------------------------------------------------------------------------------------------
Gather Motion 3:1  (slice1; segments: 3)  (cost=1852.75..822161.85 rows=6068410 width=24)
  -> Hash Join  (cost=1852.75..822161.85 rows=2022804 width=24)
        Hash Cond: lt1.id = lt2.id
        -> Seq Scan on lt1  (cost=0.00..879.00 rows=25967 width=12)
        -> Hash  (cost=879.00..879.00 rows=25967 width=12)
              -> Seq Scan on lt2  (cost=0.00..879.00 rows=25967 width=12)
Optimizer: legacy query optimizer
(7 rows)

testdb=# explain select * from lt1, ft2 where lt1.id=ft2.id;
                                                  QUERY PLAN
----------------------------------------------------------------------------------------------------------------
Gather Motion 3:1  (slice3; segments: 3)  (cost=565.10..8826.68 rows=709046 width=24)
  -> Hash Join  (cost=565.10..8826.68 rows=236349 width=24)
        Hash Cond: lt1.id = ft2.id
        -> Seq Scan on lt1  (cost=0.00..879.00 rows=25967 width=12)
        -> Hash  (cost=565.10..565.10 rows=3034 width=12)
              -> Redistribute Motion 3:3  (slice2; segments: 3)  (cost=100.00..565.10 rows=3034 width=12)
                    Hash Key: ft2.id
                    -> Foreign Scan on ft2  (cost=100.00..383.06 rows=3034 width=12)
                          -> Redistribute Motion 3:3  (slice1; segments: 3)  (cost=0.00..0.00 rows=0 width=0)
Optimizer: legacy query optimizer
(10 rows)

testdb=# select * from lt1, ft2 where lt1.id=ft2.id;
id | c1 | c2 | id | c1 | c2
----+----+----+----+----+----
 3 |  4 |  6 |  3 |  4 |  6
 4 |  5 |  8 |  4 |  5 |  8
 5 |  6 | 10 |  5 |  6 | 10
(3 rows)



Any discussions/suggestions are welcomed, thanks a lot in advance.


P.S. Thanks for all Beijing colleagues who contributed to the solution and the ideas to overcome obstacles of this prototype, especially to Gang Xiong and Pengzhou Tang.



Heikki Linnakangas

unread,
Aug 28, 2018, 3:50:04 AM8/28/18
to Ming Li, Greenplum Developers
Phew, there's a lot to chew here! This is going to be along response,
please bear with me :-)

About System A and System B
---------------------------

1. The communication between system A and system B should use the libpq
protocol. The interconnect protocol is a non-starter. Firstly, it's
designed specifically for a fast, dedicated, inter-node network, I
wouldn't trust it over a general-purpose network. It's not encrypted,
the bandwidth control is pretty ad hoc, etc. And it won't work across
different versions of GPDB. It might not even be compatible across
different minor versions.

2. I'd like to generalize the use case a little bit:

In this proposal, we've been talking about having two GPDB clusters.
However, I remember that in the past, we've also talked about
'gptransfer' and 'gpload', which have similar problems. If you want to
load data into a cluster as fast as possible, you also want to bypass
the master and feed the data directly to segments. In these cases,
"System A" is not a GPDB cluster, but 'gpload' or 'gptransfer'. Or it
could be some other data loading tool or even user application, that
wants to avoid the master node as a bottleneck.

Even if you don't actually care about gptransfer or gpload right now, I
think that mindset, that the client can be anything, will help clarify
the design for the GP2GP use case, too.

The interface between System A and System B therefore needs to be as
simple as possible. That is the key architectural decision here. So
let's drill down into that first:


System A <-> System B interface
-------------------------------

I'm envisioning that the System A <-> B interface works much like normal
querying. Master A connects to master B using libpq, authenticates
normally, and issues a query the usual way. However, after sending the
query, instead of getting the results back over the same connection,
Master B returns a list of "endpoints", i.e. IP address+ports, of where
A can get the results. System A now needs to connect to those endpoints,
using libpq, to get the result.


Here's how I envision the exchange to go:

Master A connects to Master B normally using libpq:

Master A -> B: Send query, e.g. "SELECT * FROM foo"
Master B -> A: Please connect to the following endpoints to get the
result: 10.0.0.1:40001, 10.0.0.2:40002, 10.0.0.3:40003. Use the
following query token to get the result: 0x95f433093c2e7218.

Master A spawns three worker processes (possibly in different segments,
but e.g. with gpload, it could be just three local processes or
threads). Each worker process connects to one of the endpoints.

Worker A -> Endpoint B: Send special query "RETRIEVE RESULT
0x95f433093c2e7218"
Endpoint B -> Worker A: Ok, here's the result set


Once all the query results have been read, the Master B finally sends a
ReadyForQuery response through the master connection, to indicate that
the query has finished.


I'm not sure how exactly to shoehorn those communications into the libpq
protocol. It should be done in a way that doesn't require changes to
libpq in System A, so that it can be used with unchanged PostgreSQL
libpq, or JDBC driver, etc.


System B
--------

So as far as the A <-> B interface goes, this is pretty much what you're
doing in your Solution Option 1. But note the following differences or
clarifications:

* The "worker process" libpq connections from A to endpoints in B are
not utility mode connections. Or they might be implemented that way, but
that doesn't affect the interface, and System A doesn't know anything
about that.

In Solution Option 1, you noted that this "ONLY supports predicate push
down". But that's not a limitation of the interface. Behind the scenes,
System B can still use an interconnect and as many extra helper process
as part of the gang as needed. System A doesn't need know about any of that.

* The worker processes in B will presumably be like segment processes in
a normal distributed query. The master process launches them to form a
gang, sends the Plan tree to the worker processes, and the worker
process executes the given plan. However, the worker process will also
need to accept the connection directly from System A, and deliver the
result through that connection. That sounds like a very invasive change:
the process needs to have two libpq connections open at the same time,
one from the master, and another directly from the client. I think
that's what you were trying to avoid by using utility-mode connections,
but then the backend can't be part of the interconnect, you can't easily
send the plan from the master to the backend, etc. Can we find a better
design for that, where the backend process can be part of a gang as
usual, but still send the result through an additional libpq connection
to the client?


System A
--------

If System A is gpload or gptransfer or something else entirely, we don't
need to go into the details of how it would be implemented right now.
But let's drill a bit deeper into the GP <-> GP case, where you use a FDW.

* The query plan in System A will contain a Foreign Table Scan. Since we
want the segments in A to access segments in B directly, the Foreign
Table Scan should be executed in A's segments, not only the master. But
the query needs to be dispatched from Master A to Master B first, before
the segment processes can connect and fetch the result.

So we need the Foreign Table Scan in the Master to do some actions
first. And the Foreign Table Scan in Master needs to deliver some
information (the endpoints and query token) to the Foreign Table Scans
in the segments, before they can proceed. The FDW and dispatcher APIs
probably need some hacking to implement that.

* As described so far, System A doesn't know anything about the data
distribution in System B. It just launches a bunch of processes, and
each of them connects to a random segment in B. In many cases, you'd
wish to exploit the data distribution, though. For example, if you're
copying a table from B to A, you'd want each segment in A to connect to
the corresponding segment in B, with the matching distribution. We can
add some information about the data distribution to the A <-> B
interface. System A can make use of it, if it wishes, but otherwise it
can ignore it.


Summary
-------

The A <-> B interface is the most important architectural decision here.
It should be simple, and it should use libpq. There's a lot of open
design questions in what happens inside System A and B, but if we agree
on the interface, the details within A and B can be handled independently.


- Heikki


On 23/08/18 11:52, Ming Li wrote:
> *
>
> Hi all,
>
>
> Nowadays most of enterprise customers use multiply different database
> products to support their business. It’s very common to have several
> Greenplum clusters and hope to  query across the Greenplum clusters or
> across other databases from greenplum. This comes GP2GP federation:
>
> 1.
>
> Reduce data redundancy across clusters so customers can save money
>
> 2.
>
> Efficiently query across Greenplum clusters to meet business needs
> (clusters maybe in different locations, with different minor versions)
>
> 3.
>
> Virtually increase Greenplum cluster size to more nodes
>
>
> Basically, GP2GP is to implement foreign tables, which are segments to
> segments transferred and optimized query execution, access remote query
> data of other Greenplum clusters. The target Greenplum version is 6.X at
> first.
>
>
> Below two options are both based on postgres_fdw, however we need to
> make it supporting MPP.
>
> postgres_fdw is used to access data stored in external PostgreSQL
> servers https://www.postgresql.org/docs/9.5/static/postgres-fdw.html
> <https://www.postgresql.org/docs/9.5/static/postgres-fdw.html>
>
>
> Solution Option 1:
>
> <https://www.draw.io/#G1Grbz9JRhWuLZb2rAghk2OTzzIUoNc5oJ>
> The basic idea is to transfer foreign query result from foreign segments
> to local cluster segments directly, and don't need to change foreign
> query which is generated from postgres_fdw.
>
>
> Three cases:
>
> *
>
> In most cases, the only operation on foreign master is just gather
> (i.e. gather motion) the foreign query results, and direct to the
> requester. In this way, we can redirect from foreign segments to
> local segments.
>
> *
>
> The more complex case is more operations need to be executed on
> foreign master, if possible, we can change local query plan to
> compensate operation on foreign master firstly.
>
> *
>
> The last case which cannot be parallel processed: some operator only
> can be executed on foreign master (e.g. some UDFs are defined only
> on foreign cluster, and only executed on master mode), these cases
> cannot beparallel processed even in single greenplum cluster.
>
>
> Now we focus on the first case (below discussion only refer to this
> case); the second case is based on the first case implementation, and
> also it needs more investigation (e.g. which operator on foreign master
> can be supported? And how to compensate it on local segment?); the last
> case, we just skip it as original postgres_fdw.
>
>
> Points after Prototype:
>
> *
>
> FDW can be put into any slice, and one slice may contain several
> different fdw scan nodes.
>
> *
>
> FDW remote query needs to be sent only once, now only on seg0, other
> segments only do motion receive.
>
> *
>
> At first we had thought to execute foreign remote query only on
> master (on entrydb),  in this way, we need to support an new gang
> type to run on all segments with entry db, moreover it may cause
> local query plan changed because the flow type cross this new gang
> with other types of gang.
>
> *
>
> Dummy motions need to be added as child of FDW. In simple query like
> joining local table with foreign table, the parent node of FDW is
> also Redistribute Motion. I firstly had thought to reuse this node
> instead of add a new child node, but afterwards I realized that we
> cannot guarantee that in all scenarios FDW node has a motion typed
> direct parent node.
>
> *
>
> The side effect of this solution is: because all result data will be
> get from motion directly, and master query plan executor will drive
> the motion to fetch tuple directly, the remote statement "FETCH ...
> FROM CURSOR' called from postgresIterateForeignScan() -->
> create_cursor() will have no effect, it can not control the result
> row number any more, it just return 0 rows.
>
> *
>
> For simplicity, foreign table will be simply keep same distribution
> policy as it original table in foreign cluster(otherwise we need to
> add many paths for foreign scan in planner, and need to set cost for
> all these new added paths):
>
> o
>
> If hash function is different, we need to change foreign table
> distribution policy to distributed randomly.
>
> o
>
> If the segment numbers are the same in two clusters, gp2gp
> motion is N:N mapping explicit motion.
>
> o
>
> If the segment numbers are different, gp2gp motion is a
> redistribution motion.
>
>
> Presumptions:
>
> *
>
> These two greenplum clusters should have same value of GUC:
> gp_interconnect_type, which could be udpifc or tcp, we should make
> them the same before running GP2GP federation queries.
>
> *
>
> Segments on two greenplum clusters can be communicated with each others.
>
> *
>
> This solution using motions to transfer data directly, so if the
> motion interaction protocol changed, or the transferred data
> format(e.g. MemTuple) changed, we only support 2 clusters have the same.
>
>
> Prototype Details of Option 2:
>
>
> This prototype is not only to guarantee the option 2 solution works, but
> also to provide more details for discussion: whether the code changes
> for this solution is acceptable or not?  And is there any other better
> solution or implementation?
>
>
> The code of this prototype is:
>
> https://github.com/liming01/gpdb-postgres-merge/commits/fdw_iteration_9
> <https://github.com/liming01/gpdb-postgres-merge/commits/fdw_iteration_9>
>
>
> Main Steps:
>
> *
>
> Step 0: Cherry pick postgres_fdw code from PostgreSQL, make in run
> successfully. And make foreign scan run on all segments instead of
> on master.
>
> *
>
> Step 1: Add fdw dummy motion nodes in the plan tree. This dummy
> motion only work as motion receiver, and for simplicity now it adds
> new slice without gang.
>
> *
>
> Step 2:Pass local segments' listener info (e.g. ports) to foreign
> server, GUCs gp_fdw_plan_rewrite, gp_fdw_motion_recv_port1
> gp_fdw_motion_recv_port2 gp_fdw_motion_recv_port3, all other param
> can be passed in same way(using GUClike
> param_string="param1,param2,..."). Is there any better way?
>
> *
>
> Step 3:Remote query "DECLARE CURSOR" will hung until setup gp2gp
> motion finished, while interconnect setup need receiver
> listen()/accept() firstly, then socket client can connect. So new
> thread is addedfor sending this remote query, and in the main thread
> call SetupInterconnect4FdwMotion() to listen() for socket server.
>
> *
>
> Step 4:Remote query using the value of GUC gp_fdw_motion_recv_portX,
> create more setupOutgoingConnection() and setup gp2gp motions.
>
> *
>
> Step 5:Remote server redirect data at doSendTuple() to specific
> connection of the same segid of local cluster.
>
> *
>
> Step 6:Local fdw motion receiver sets result TupleDesc the same as
> fdw scan, and returns the results to upper plan nodes.
>
>
> TODOs:
>
> *
>
> Thinking TCP is much easier for bug tracing with tools like netstat,
> only TCP is tried in the demo. So please run "gpconfig -c
> Gp_interconnect_type -v tcp; gpstop -u" firstly. UDP type needs to
> be supported in future.
>
> *
>
> Now temporally only support same segment number of federation
> cluster and foreign cluster.
>
> *
>
> Only on same host of federation cluster and foreign cluster, because
> I don't pass hostname as GUC to foreign server.
>
> *
>
> Now just skip client verification at readRegisterMessage(), we can
> postpone verification until fetched foreign server motion sender
> info (rcListenerPort & srcPid) after "DECLARE CURSOR FOR" remote
> query returns.One problem hereis how to sync these info from seg0 to
> other segments in local cluster?
>
> *
>
> At ic_tcp.c:1164, hard coded local slice id for this foreign scan to
> 1(at my test case). Need to get from outer function.
>
> *
>
> I didn't check remote query operation is only gather motion, only
> redirect at the direct child slice of master node at present.
>
> *
>
> Need error and cancel handlingin thread code.
>
> *
>
> Need to disable gp2gp motion if remote query plan has more operators
> than gather motion, which only can be known after "DECLARE CURSOR
> FOR" remote query returned.
>
> *
>
> One known bug about foreign column Datum is pointer. e.g.  column
> type is TEXT or CHAR(128).
>
>
> Test Samples of this Prototype:
>
>
> Step 1: Change interconnect type to TCP
>
> gpconfig-c Gp_interconnect_type -v tcp; gpstop-u
>
>
> Step 2: Prepare tables on foreign cluster
>
> createdatabasefdb;
> \c fdb;
> CREATETABLEt1 ASSELECTid, id+1asc1, id*2asc2 FROMgenerate_series(1,5) id;
> CREATETABLEt2 ASSELECTid, id+1asc1, id*2asc2 FROMgenerate_series(3,15) id;
>
>
> Step 3: Create foreign tables on local cluster
>
> createdatabasetestdb;
> \c testdb;
>
> CREATEEXTENSION postgres_fdw;
> CREATESERVERforeign_server
>        FOREIGN DATAWRAPPER postgres_fdw
>        OPTIONS (host 'localhost', port '15432', dbname 'fdb');
> CREATEUSERMAPPINGFORgpadmin
> SERVERforeign_server
>        OPTIONS (user'gpadmin', password'password');
> CREATEFOREIGN TABLEft1 (idinteger, c1 integer, c2 integer)
> SERVERforeign_server
> OPTIONS (schema_name 'public', table_name 't1');
> CREATEFOREIGN TABLEft2 (idinteger, c1 integer, c2 integer)
> SERVERforeign_server
> OPTIONS (schema_name 'public', table_name 't2');
>
> CREATETABLElt1 ASSELECTid, id+1asc1, id*2asc2 FROMgenerate_series(1,5) id;
> CREATETABLElt2 ASSELECTid, id+1asc1, id*2asc2 FROMgenerate_series(3,15) id;
> <http://lt1.id>=lt2.id <http://lt2.id>;
> QUERYPLAN
> -------------------------------------------------------------------------------------------
> GatherMotion3:1 (slice1; segments: 3)
>  (cost=1852.75..822161.85rows=6068410width=24)
>   -> HashJoin (cost=1852.75..822161.85rows=2022804width=24)
> HashCond: lt1.id <http://lt1.id> = lt2.id <http://lt2.id>
>         -> SeqScanon lt1  (cost=0.00..879.00rows=25967width=12)
>         -> Hash (cost=879.00..879.00rows=25967width=12)
>               -> SeqScanon lt2  (cost=0.00..879.00rows=25967width=12)
> Optimizer: legacy query optimizer
> (7rows)
>
> testdb=# explain select * from lt1, ft2 wherelt1.id
> <http://lt1.id>=ft2.id <http://ft2.id>;
> QUERYPLAN
> ----------------------------------------------------------------------------------------------------------------
> GatherMotion3:1 (slice3; segments: 3)
>  (cost=565.10..8826.68rows=709046width=24)
>   -> HashJoin (cost=565.10..8826.68rows=236349width=24)
> HashCond: lt1.id <http://lt1.id> = ft2.id <http://ft2.id>
>         -> SeqScanon lt1  (cost=0.00..879.00rows=25967width=12)
>         -> Hash (cost=565.10..565.10rows=3034width=12)
>               -> RedistributeMotion3:3 (slice2; segments: 3)
>  (cost=100.00..565.10rows=3034width=12)
> HashKey: ft2.id <http://ft2.id>
>                     -> ForeignScanon ft2
>  (cost=100.00..383.06rows=3034width=12)
>                           -> RedistributeMotion3:3 (slice1; segments:
> 3)  (cost=0.00..0.00rows=0width=0)
> Optimizer: legacy query optimizer
> (10rows)
>
> testdb=# select * from lt1, ft2 wherelt1.id <http://lt1.id>=ft2.id
> <http://ft2.id>;
> id | c1 | c2 | id | c1 | c2
> ----+----+----+----+----+----
> 3| 4| 6| 3| 4| 6
> 4| 5| 8| 4| 5| 8
> 5| 6| 10| 5| 6| 10
> (3rows)
>
>
>
> Any discussions/suggestions are welcomed, thanks a lot in advance.
>
>
> P.S. Thanks for all Beijing colleagues who contributed to the solution
> and the ideas to overcome obstacles of this prototype, especially to
> Gang Xiong and Pengzhou Tang.
>
> *
>


--

- Heikki

Daniel Gustafsson

unread,
Aug 28, 2018, 4:51:49 AM8/28/18
to Heikki Linnakangas, Ming Li, Greenplum Developers
> On 28 Aug 2018, at 09:49, Heikki Linnakangas <hlinna...@pivotal.io> wrote:

> The A <-> B interface is the most important architectural decision here. It should be simple, and it should use libpq.

+1, completely agree. Further, we should make it easy to set up with encrypted
channels as we must assume that we are travelling across a hostile network.

cheers ./daniel

Hubert Zhang

unread,
Aug 28, 2018, 9:20:57 AM8/28/18
to Heikki Linnakangas, Ming Li, Greenplum Developers
Heikki wrote:
So as far as the A <-> B interface goes, this is pretty much what you're doing in your Solution Option 1. But note the following differences or clarifications:

* The "worker process" libpq connections from A to endpoints in B are not utility mode connections. Or they might be implemented that way, but that doesn't affect the interface, and System A doesn't know anything about that.
Endpoint in B is still a postgres process on segment, right? GPDB Segment currently only support utility mode connection: psql: FATAL:  System was started in master-only utility mode - only utility mode connections are allowed. I think that's why Option1 considering utility mode. I guess utility mode limitation is introduced by security reason. 


In Solution Option 1, you noted that this "ONLY supports predicate push down". But that's not a limitation of the interface. Behind the scenes, System B can still use an interconnect and as many extra helper process as part of the gang as needed. System A doesn't need know about any of that.

How does System B use interconnect? Support query (select * from a,b,c) where table b and c are located on System B. We could suppose b and c are randomly distributed, the join need to use interconnect to shuffle data. Could you explain more on how does Worker1 in System A get the join result of b and c on endpoint1?(e.g. how does special query "RETRIEVE RESULT 0x95f433093c2e7218" works?

--Hubert


--
Thanks

Hubert Zhang

Heikki Linnakangas

unread,
Aug 29, 2018, 2:29:56 AM8/29/18
to Hubert Zhang, Ming Li, Greenplum Developers
On 28/08/18 16:20, Hubert Zhang wrote:
> Heikki wrote:
>
>> So as far as the A <-> B interface goes, this is pretty much what you're
>> doing in your Solution Option 1. But note the following differences or
>> clarifications:
>>
>> * The "worker process" libpq connections from A to endpoints in B are not
>> utility mode connections. Or they might be implemented that way, but that
>> doesn't affect the interface, and System A doesn't know anything about that.
>
> Endpoint in B is still a postgres process on segment, right?

Yeah.

> GPDB Segment
> currently only support utility mode connection: psql: FATAL: System was
> started in master-only utility mode - only utility mode connections are
> allowed. I think that's why Option1 considering utility mode. I guess utility
> mode limitation is introduced by security reason.

Sure, that's how it works today. That will need to be changed as part of
this project.

>> In Solution Option 1, you noted that this "ONLY supports predicate push
>> down". But that's not a limitation of the interface. Behind the scenes,
>> System B can still use an interconnect and as many extra helper process as
>> part of the gang as needed. System A doesn't need know about any of that.
>
> How does System B use interconnect? Support query (select * from a,b,c)
> where table b and c are located on System B. We could suppose b and c are
> randomly distributed, the join need to use interconnect to shuffle data.
> Could you explain more on how does Worker1 in System A get the join result
> of b and c on endpoint1?(e.g. how does special query "RETRIEVE RESULT
> 0x95f433093c2e7218" works?)

The interconnect in System B works the same as in regular queries. The
master process dispatch the query plan to the segment backends, and the
backends start up the interconnect connections and Motion nodes as usual.

Attached is a diagram that shows all the connections between processes
in System B.

In a normal query, each segment worker contains a Motion node that sends
the result back to the dispatcher (or to another QE process). But not
always: in an "INSERT ... SELECT" command for example, the resulting
rows are inserted to the table, instead. Similarly in "CREATE TABLE AS
SELECT ..." commands. And in "COPY (SELECT ...) TO ON SEGMENT", the
resulting rows are written to a file or external process (although COPY
processing is a bit different).

In this case, each segment sends the query result to the "endpoint libpq
connection", instead [*]. The endpoint connection is the extra libpq
connection that a System A worker process has established. So this is
very similar to e.g. a "INSERT ... SELECT" command, but instead of
writing the resulting rows to a table, they are sent to the extra libpq
connection.

The special "RETRIEVE RESULT" command is used in establishing the
endpoint connection. The token identifies the query result that the
client wishes to retrieve. (Or we could use a one-time username&password
combination, or a new connection option, instead.)

Did that help?


[*] I'm not wedded to the name "endpoint connection". Perhaps "result
set retrieving connection" or "partial result connection" would be better?

- Heikki
gp2gp-connections.png

Hubert Zhang

unread,
Aug 29, 2018, 8:47:46 AM8/29/18
to Heikki Linnakangas, Ming Li, Greenplum Developers
Thanks Heikki.
I got your points. So write gang on System B will hold two libpq connections, One for QD(to receive fdw query plan), the other for QE in System A.

--
Thanks

Hubert Zhang

Ming Li

unread,
Aug 30, 2018, 7:03:34 AM8/30/18
to Heikki Linnakangas, Greenplum Developers, Hubert Zhang
Hi Heikki,

Thanks for your great comments.

You show us the global picture how to make gpdb not only as cluster A to federate all other types of data sources, but also to make gpdb as cluster B to easily be accessed by other tools which can fetch query result in parallel. It brought up the blueprint of making gpdb be easily used/integrated by other parallel tools/softwares, however still we have many technical points need more discussion:

1) Which node to send remote query in cluster A: 
  • If master send remote query:
    • If running at init plan node phrase, maybe the logic changes a lot (original in specific slice running), and error report may have problem (I just guess, didn't try it). 
    • If running in the master process of the gang with the slice during plan node execution, then we need to support new gang type: N+1 gangs (i.e. all segments + master entry db), and also need to change query plan dispatcher, query slice execution on master entry db, maybe flow type also need to be changed. Another problem is how to make all segments waiting for master to run firstly? And how to pass the fetched query token to segments during query execution instead of query dispatching?
  • If seg0 send remote query:
    • How to pass remote query token to other segments on cluster A during plan node execution, there is no connection between segments.
2) 2 steps for remote query ( select query + retrieve result ) in cluster B:
  • How to differentiate it from the normal query? Should we introduce new SQL for this kinds of special query?
  • "Fetch n" statement can not work any more, how to let outer tools know about it? How this functionality can be implemented (thinking that outer tools may need it)? How to coordinated fetch number with segment retrieve tuple number?
  • Now the query token should be fetched after the remote query "DECLARE CURSOR" returned, so we should return it firstly to libpq client, even if some error occurs in the next steps, should report error at  "RETRIEVING DATA" or at  "FETCH" in this case? Moreover if the out tool only send some requests of the "RETRIEVING DATA", i.e. now other segments are still waiting for outer tool to connect, but some segments already feed the result data to libpq client. Should we set a timeout and let other libpq connections waiting for this connection to connect? These need a lot of code changes
3)  2 libpq connections for one single postgres process in cluster B( 1 for QD of cluster B, the other for QE of cluster A ): 
  • Now libpq code share a lot of global variables, hard to support 2 connections in one single process, 2 connection need 2 runtime, how to make these 2 runtime running in isolated env?
  • Mixed with the control message and data message processing, how to cancel one and notify 2 libpq connections during the other are date receiving?
  • One option is to create one dedicated postgres process for retrieving query data libpq request, this process can use pipe to get the query data from the specific QE process. However if we want to support like the M:N gp2gp redistributed motion, we should create M postgres processes on cluster B segment, so the burden is too heavy for too many extra processes created just for these data retrieving connections (maybe process/socket/FD limitation of OS).
4) About M:N redistribution gp2gp motion between cluster A and cluster B:
  • During implement the gpcopy project, because now COPY ... ON SEGMENT don't support data redistribution, we need to think a lot of ways to meet the demand of data load balance on all segments, write many code to process all cases about: M>N, M<N, M=N. It made us headache, there is no simple way to 

    avoid extra redistribution motion and also balancing all segments' workloads automatically.

  • If we want to support it, it make the problem 3) more worse.

5) About libpq security in cluster B:
  • Now the backend postgres process don't support SSL and authentication. 
  • Now the pg_hba on segments only grant the master node to access segment instance (also only in utility mode supported), if we want to loose the setting of pg_hba on segment ( need to tell user to change this config before enable gp2db feature) and the utility mode restriction, it will introduce a great security vulnerability. Note that the gp2gp motion only make query results vulnerable, but the libpq connection make all operations including query execution vulnerable ( including execute  "COPY ... ON PROGRAM '...' " to write file to OS).

All these problems need to be solved before this solution works.


Idea for Discussion:

I have an idea which need more discussion. I think your solution contains 2 parts: 
  1. gp2gp motion need to be replaced by libpq, thinking that the motion protocol maybe changed (confirmed with 

    Pengzhou Tang, it did not changed from gpdb v4 to current version) or the transferred tuple data format (e.g. MemTuple) maybe changed in 2 gpdb clusters.

  2. make gpdb works great in cluster A and cluster B scenario.
However as for the gp2gp project original scope, it only concern about cluster A scenario, we can integrated with any implementation of cluster B. Also we don't think more query types as you pointed out (e.g. "CREATE TABLE AS SELECT ..." or  "COPY (SELECT ...) TO ON SEGMENT") because postgres_fdw cannot support them at present

So can we just make gp2gp project focus on the part 1 at first?  And the part 2 can be implemented in the next project (need PM helps), with one specific outer tools to serviced as requester at the same project?

There is one possible solution which maybe a little simpler for the part 1:
  • Hack the libpq connection to use tcp connection which is already created before at the gp2gp motion setup phrase in the prototype.
  • Refactor the libpq code, so that we easily call the code in libpq about below functionalities:  change binary tuple to the format that libpq support, send result to libpq client with specific message types, maybe encryption/authentication also needed?
  • Replace the implementation of gp2gp motion in the prototype by calling these changed code in libpq.
In this way, if libpq version updated, we just simply update the libpq client version to new version, (thinking that new version libpq client always support old version gpdb server), then the gp2gp can easily support different version gpdb versions.

Appreciate your notice, and waiting for your reply. Many thanks.

Ming.

Heikki Linnakangas

unread,
Aug 30, 2018, 8:13:55 AM8/30/18
to Ming Li, Greenplum Developers, Hubert Zhang
On 30/08/18 14:03, Ming Li wrote:
> Hi Heikki,
>
> Thanks for your great comments.
>
> You show us the global picture how to make gpdb not only as cluster A to
> federate all other types of data sources, but also to make gpdb as
> cluster B to easily be accessed by other tools which can fetch query
> result in parallel. It brought up the blueprint of making gpdb be easily
> used/integrated by other parallel tools/softwares, however still we have
> many technical points need more discussion:

Yeah, no matter how you do it, there's a lot of little details and a lot
of programming involved...

> 1) Which node to send remote query in cluster A:
>
> * If master send remote query:
> o If running at init plan node phrase, maybe the logic changes a
> lot (original in specific slice running), and error report may
> have problem (I just guess, didn't try it).
> o If running in the master process of the gang with the slice
> during plan node execution, then we need to support new gang
> type: N+1 gangs (i.e. all segments + master entry db), and also
> need to change query plan dispatcher, query slice execution on
> master entry db, maybe flow type also need to be changed.
> Another problem is how to make all segments waiting for master
> to run firstly? And how to pass the fetched query token to
> segments during query execution instead of query dispatching?
> * If seg0 send remote query:
> o How to pass remote query token to other segments on cluster A
> during plan node execution, there is no connection between segments.

Yeah. You'll face these questions with any of the proposals that we've
discussed. It seems simplest to me to send the query in the init plan
phase. Fortunately, that won't affect the A <-> B protocol, System B
doesn't need to know about that.

> 2) 2 steps for remote query ( select query + retrieve result ) in cluster B:
>
> * How to differentiate it from the normal query? Should we introduce
> new SQL for this kinds of special query?

Yeah, something like that. Or maybe a GUC that you set before sending
the query. "SET gp_multi_process_fetch = on" or something.

> * "Fetch n" statement can not work any more, how to let outer tools
> know about it? How this functionality can be implemented (thinking
> that outer tools may need it)? How to coordinated fetch number with
> segment retrieve tuple number?

In order to use this new interface, the application needs to be heavily
modified anyway.

The application may read as many rows as it wishes from each connection.
However, if there are e.g. Redistribute Motions in the plan, one backend
might not get any more rows, until the client reads all the in-flight
rows from another connection. That seems OK, but it needs to be
documented. If you bother using an interface like this, that is
specifically designed for high throughput, presumably you want to read
all the data, and as fast as possible.

If you actually have a "LIMIT 10" in the query, you'll get a plan with a
Gather Motion node, to enforce the limit, and then another Motion to
disperse the rows again. Similar to a CREATE TABLE AS:

postgres=# explain (costs off) create table bar as select * from foo
LIMIT 10 distributed randomly;
QUERY PLAN
------------------------------------------------------
Redistribute Motion 1:3 (slice2; segments: 1)
-> Limit
-> Gather Motion 3:1 (slice1; segments: 3)
-> Limit
-> Seq Scan on foo
Optimizer: legacy query optimizer
(6 rows)

It doesn't make much sense to use this new interface for such queries,
as just reading the result through the master connection would probably
perform just as well.

> * Now the query token should be fetched after the remote query
> "DECLARE CURSOR" returned, so we should return it firstly to libpq
> client, even if some error occurs in the next steps, should report
> error at "RETRIEVING DATA" or at "FETCH" in this case? Moreover if
> the out tool only send some requests of the "RETRIEVING DATA", i.e.
> now other segments are still waiting for outer tool to connect, but
> some segments already feed the result data to libpq client. Should
> we set a timeout and let other libpq connections waiting for this
> connection to connect? These need a lot of code changes.

It's probably easiest to do it similarly to how it currently works: If
an error occurs in a segment, it's returned to the dispatcher, which
forwards it to the client.

> 3)  2 libpq connections for one single postgres process in cluster B( 1
> for QD of cluster B, the other for QE of cluster A ):
>
> * Now libpq code share a lot of global variables, hard to support 2
> connections in one single process, 2 connection need 2 runtime, how
> to make these 2 runtime running in isolated env?
> * Mixed with the control message and data message processing, how to
> cancel one and notify 2 libpq connections during the other are date
> receiving?
> * *One option* is to create one dedicated postgres process for
> retrieving query data libpq request, this process can use pipe to
> get the query data from the specific QE process. However if we want
> to support like the M:N gp2gp redistributed motion, we should create
> M postgres processes on cluster B segment, so the burden is too
> heavy for too many extra processes created just for these data
> retrieving connections (maybe process/socket/FD limitation of OS).

I don't think you'll hit OS limitations. If you have one extra backend
in each segment, to handle the extra connection from system A, you'll
have at most twice as many processes and connections as you normally
would. That doesn't sound too bad.

Yeah, a separate backend process to handle these new connections seems
like a good solution. It adds some overhead, because then you need to
transfer all the rows from the segment worker process to the separate
backend process. But it solves a lot of problems.

> 4) About M:N redistribution gp2gp motion between cluster A and cluster B:
>
> * During implement the gpcopy project, because now COPY ... ON SEGMENT
> don't support data redistribution, we need to think a lot of ways to
> meet the demand of data load balance on all segments, write many
> code to process all cases about: M>N, M<N, M=N. It made us headache,
> there is no simple way to
>
> avoid extra redistribution motion and also balancing all segments'
> workloads automatically.
>
> *
>
> If we want to support it, it make the problem 3) more worse.
>
> 5) About libpq security in cluster B:
>
> * Now the backend postgres process don't support SSL and authentication.

We have all the code for SSL and authentication, we just need to enable
it in segments, too.

> * Now the pg_hba on segments only grant the master node to access
> segment instance (also only in utility mode supported), if we want
> to loose the setting of pg_hba on segment ( need to tell user to
> change this config before enable gp2db feature) and the utility mode
> restriction, it will introduce a great security vulnerability. Note
> that the gp2gp motion only make query results vulnerable, but the
> libpq connection make all operations including query execution
> vulnerable ( including execute  "COPY ... ON PROGRAM '...' " to
> write file to OS).

I don't see a problem here. You can lock down the libpq connections, so
that they only accept the special "RETRIEVE RESULT" command, and nothing
else.

Authentication needs some thought, though. Do you require the client to
use the same credentials it used with the master? Or perhaps assign a
new temporary username & password for each result, that works for that
query and nothing else? But those are all details that can be worked
out. (For contrast, with the interconnect, there would be no
authentication whatsoever.)

> All these problems need to be solved before this solution works.

Yeah. The important thing at this stage is to define the A <-> B
interface. All of those problems need to be solved somehow, but they
don't affect the overall protocol. If we make bad choices, we can change
the details without breaking the protocol, at least not too badly.

> *_Idea for Discussion:_*
> *_
> _*
> I have an idea which need more discussion. I think your solution
> contains 2 parts:
>
> 1. gp2gp motion need to be replaced by libpq, thinking that the motion
> protocol maybe changed (confirmed with *
>
> Pengzhou Tang, it did not changed from gpdb v4 to current version)
> or the transferred tuple data format (e.g. MemTuple) maybe changed
> in 2 gpdb clusters.
>
> *
> 2. make gpdb works great in cluster A and cluster B scenario.
>
> However as for the gp2gp project original scope, it only concern about
> cluster A scenario, we can integrated with any implementation of cluster
> B. Also we don't think more query types as you pointed out (e.g. "CREATE
> TABLE ASSELECT ..." or  "COPY (SELECT ...) TO ON SEGMENT") because
> postgres_fdw cannot support them at present.
>
> So can we just make gp2gp project focus on the part 1 at first?  And the
> part 2 can be implemented in the next project (need PM helps), with one
> specific outer tools to serviced as requester at the same project?
>
> There is one possible solution which maybe a little simpler for the part 1:
>
> * Hack the libpq connection to use tcp connection which is already
> created before at the gp2gp motion setup phrase in the prototype.
> * Refactor the libpq code, so that we easily call the code in libpq
> about below functionalities:  change binary tuple to the format that
> libpq support, send result to libpq client with specific message
> types, maybe encryption/authentication also needed?
> * Replace the implementation of gp2gp motion in the prototype by
> calling these changed code in libpq.
>
> In this way, if libpq version updated, we just simply update the libpq
> client version to new version, (thinking that new version libpq client
> always support old version gpdb server), then the gp2gp can easily
> support different version gpdb versions.

Sorry, I didn't quite understand this proposal.

I am strongly against using the interconnect between two different GPDB
cluster, though, if that's what you were suggesting. Again, getting the
A <-> B interface right is the important decision here. The details
within both systems can be changed later.

A possible roadmap would be to implement a stand-alone multi-process
client application first, that uses the new interface. It could, for
example, run a SQL query, and write the results to N files. With that,
you could hone the details of the protocol, and the implementation
within System B to support it, ignoring System A problems for now. Once
that's working, figure out how to make it work with postgres_fdw as the
client.

- Heikki

Yandong Yao

unread,
Aug 31, 2018, 3:44:49 AM8/31/18
to Heikki Linnakangas, Ming Li, Greenplum Developers, Hubert Zhang
If we want to deliver something workable in 6.0, how about option A?  

('Utility mode' in option A is misleading, it should be a libpq connection with credentials/tokens from master.)

We could do more innovation work around this area post 6.0.
--
Best Regards,
Yandong

Heikki Linnakangas

unread,
Aug 31, 2018, 3:48:31 AM8/31/18
to Yandong Yao, Ming Li, Greenplum Developers, Hubert Zhang
On 31/08/18 10:44, Yandong Yao wrote:
> If we want to deliver something workable in 6.0, how about option A?
>
> ('Utility mode' in option A is misleading, it should be a libpq
> connection with credentials/tokens from master.)
>
> We could do more innovation work around this area post 6.0.

I'm not entirely sure what you mean by "libpq connection with
credentials/tokens from master". But the important thing is the A <-> B
interface. As long as you get that right, we can indeed iterate on the
details within both systems after 6.0.

- Heikki

Yandong Yao

unread,
Aug 31, 2018, 6:45:16 AM8/31/18
to Heikki Linnakangas, Ming Li, Greenplum Developers, Hubert Zhang
Here is the flow:

cluster A's master send a pg_export_snapshot() like query to cluster B's master over libpq, and get 1) DTX info as libpq Result instead of store exported snapshot on master; 2) any other private info needed for potential authentication.

cluster A's master dispatch query to A's segment with whatever info retrieved from cluster B's master.

cluster A's segment connect to B's segment over libpq connection, using libpq authentication mechanism, plus DTX and other private info. The effects are cluster B's segment treat the libpq connection from cluster A's segment the same as from its own master.

It is harder for join pushdown, while easier to get it up and running for customer who needs more efficient communication than dblink.
--
Best Regards,
Yandong

Adam Lee

unread,
Sep 6, 2018, 5:46:08 AM9/6/18
to Heikki Linnakangas, Yandong Yao, Ming Li, Greenplum Developers, Hubert Zhang

About the A <-> B interface, how about this:


  1. Client sends the main query to QD via libpq, which will finish at the end (step 7)

    1. `CREATE ENDPOINTS AS SELECT … OPTIONS …`

      1. Options could be how many endpoints the client wants to connect, how does the client like data distributed...

    2. Or Set a special GUC then `SELECT …`


  1. QD generates a token, dispatches the query


  1. QE processes execute the query, but not return the results to QD


  1. QD returns the token and the involved postmasters’ info (host, port …), the involved postmaster might be only QD, in the aggregate case like `SELECT count(*) ...`


  1. Client connects to these postmasters with a special option, endpoints on RETRIEVE mode are forked for client, client queries `RETRIEVE token` or `RETRIEVE n FROM token`

    1. Endpoints should be connected via libpq with auth and encryption


  1. QEs send the results to corresponding endpoints, then to client

    1. Use a pipe to connect QE and endpoint, maybe


  1. All data finished transfer, then main query finished, endpoints exit


The client could also be a Greenplum cluster, as the incoming GP2GP.


Ming Li

unread,
Sep 12, 2018, 5:00:11 AM9/12/18
to Heikki Linnakangas, Adam Lee, Yandong Yao, Greenplum Developers, Hubert Zhang
Hi Heikki,

As for the IPC mechanism between QE backend and RETRIEVE mode backend, we have some choices:
  • Using shared memory to transfer data. However shared memory is critical resource which is not proper for transferring data between only 2 processes.
  • Using named pipe, which can be accessible by other process from OS file API. This way has such similar problem to the problem we met in customer environment about gpfdist works with pipe to external table: other tools e.g. virus detector will scan all files, which will read some contents of the pipe, so that the data stream is intercepted, and the pipe reader will report error. We tried to limit the reader number to 1, but there is no way to do it, finally we just offer unsatisfied solution: using flock() to limit reader which would call flock() before read data stream. Do you have any better way about this problem?
  • Using un-named pipe, which need to be created in parent-child processes, so maybe we an firstly create the un-named pipe in postmaster process, however the cons of this way is: the code spreads at many places, which make the code hard to read and maintained.
What's your preference? Many thanks for your helps!

Heikki Linnakangas

unread,
Sep 12, 2018, 5:34:31 AM9/12/18
to Ming Li, Adam Lee, Yandong Yao, Greenplum Developers, Hubert Zhang
On 12/09/18 11:59, Ming Li wrote:
> Hi Heikki,
>
> As for the IPC mechanism between QE backend and RETRIEVE mode backend,
> we have some choices:
>
> * Using shared memory to transfer data. However shared memory is
> critical resource which is not proper for transferring data between
> only 2 processes.
> * Using named pipe, which can be accessible by other process from OS
> file API. This way has such similar problem to theproblem we met in
> customer environment about gpfdist works with pipe to external
> table: other tools e.g. virus detector will scan all files, which
> will read some contents of the pipe, so that the data stream is
> intercepted, and the pipe reader will report error. We tried to
> limit the reader number to 1, but there is no way to do it, finally
> we just offer unsatisfied solution: using flock() to limit reader
> which would call flock() before read data stream. Do you have any
> better way about this problem?
> * Using un-named pipe, which need to be created in parent-child
> processes, so maybe we an firstly create the un-named pipe in
> postmaster process, however the cons of this way is: the code
> spreads at many places, which make the code hard to read and maintained.
>
> What's your preference? Many thanks for your helps!
I think I'd prefer using shared memory. In PostgreSQL (9.6 and above),
that's how parallel workers and the parent communicate with each other.
See the shared memory tuple queue facility, in
src/backend/executor/tqueue.c and execParallel.c

- Heikki

Ming Li

unread,
Sep 12, 2018, 5:47:30 AM9/12/18
to Heikki Linnakangas, Adam Lee, Yandong Yao, Greenplum Developers, Hubert Zhang
Great. Many thanks! ~_~

Max Yang

unread,
Sep 12, 2018, 9:41:53 PM9/12/18
to Heikki Linnakangas, Ming Li, Adam Lee, Yandong Yao, Greenplum Developers, Hubert Zhang
Since the code is in PostgreSQL (9.6 and above),  but it is in near future. Is it fine if we back port this dependency and be careful to make merge life easy when back porting?
--
Best Regards,
Max

Heikki Linnakangas

unread,
Sep 13, 2018, 1:42:44 PM9/13/18
to Max Yang, Ming Li, Adam Lee, Yandong Yao, Greenplum Developers, Hubert Zhang
On 13/09/18 04:41, Max Yang wrote:
> Since the code is in PostgreSQL (9.6 and above),  but it is in near
> future. Is it fine if we back port this dependency and be careful to
> make merge life easy when back porting?

Yeah, makes sense. It was developed over a couple of releases, IIRC
dynamic shared memory was added in 9.5, and the shmem queues depend on
that. So you might need to backport a bunch of things to get it working.
But still seems better than developing something from scratch.

- Heikki

Robert Eckhardt

unread,
Sep 13, 2018, 1:45:51 PM9/13/18
to Heikki Linnakangas, Max Yang, Ming Li, Adam Lee, Yandong Yao, Greenplum Developers, Hubert Zhang
Should we wait for 9.3 and at least get the full background worker
infra in first?

-- Rob

>
> - Heikki
>
>

Heikki Linnakangas

unread,
Sep 13, 2018, 3:09:38 PM9/13/18
to Robert Eckhardt, Max Yang, Ming Li, Adam Lee, Yandong Yao, Greenplum Developers, Hubert Zhang
I don't think we have any use for background workers here.

- Heikki

Adam Lee

unread,
Jan 7, 2019, 5:25:11 AM1/7/19
to Greenplum Developers
Hi,

Our team is working on a feature called Greenplum to Greenplum (gp2gp),
which is an extension that provides the ability to run queries across
multiple Greenplum clusters.

We have a basic implementation now, to get more design feed back and
track the status, a new wiki page is updated here:
https://github.com/greenplum-db/gpdb/wiki/Greenplum-to-Greenplum

How do you think? Comments would help a lot, thanks.

--
Adam Lee

chris

unread,
Jan 8, 2019, 10:45:38 PM1/8/19
to Adam Lee, Greenplum Developers
Hi Adam,

I walked through all the gp2gp feature, I think it is a talent feature. Firstly thanks for yours’ hard work.

I have a scene : In some companies or organizations(especially governments), the two gp cluster may be only connected by master instance according to some security reason. How to handle this is difficult because the segments instance was only accessed in their own cluster. Do you have plan for it now?

Sincerely,
Thanks,

--
You received this message because you are subscribed to the Google Groups "Greenplum Developers" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gpdb-dev+u...@greenplum.org.

Adam Lee

unread,
Jan 9, 2019, 1:22:56 AM1/9/19
to chris, Greenplum Developers
On Wed, Jan 09, 2019 at 11:45:30AM +0800, chris wrote:
> Hi Adam,
>
> I walked through all the gp2gp feature, I think it is a talent feature. Firstly
> thanks for yours’ hard work.
>
> I have a scene : In some companies or organizations(especially governments),
> the two gp cluster may be only connected by master instance according to some
> security reason. How to handle this is difficult because the segments instance
> was only accessed in their own cluster. Do you have plan for it now?

They could simply fall back to postgres_fdw.

We may provide an option, let the gp2gp fall back to postgres_fdw via
`alter table` instead of recreating the table of another server and
wrapper.

--
Adam Lee

Ming Li

unread,
Jan 9, 2019, 4:51:45 AM1/9/19
to chris, Adam Lee, Greenplum Developers
Hi Chris,

I have already thought about it.

If the segments in local gpdb cluster can not directly connect to any segment node of the foreign gpdb cluster because network setting, it means only the master node in foreign gpdb cluster can be accessed, we can not do any optimization.

DONE:
The foreign table has a option named mpp_execute, it can be set to:
 - 'all segments': 
  • Fetch remote query result in parallel retrieved mode. 
  • It need segments in local gpdb cluster can connect to segments in foreign gpdb cluster.
 - 'any':  
  • Fetch remote query result only from the master node of foreign gpdb server, but the execution node in local gpdb cluster can be master or any segments (which node will be chosen depends on the query planner) 
  • It need segments in local gpdb cluster can connect to segments in foreign gpdb cluster.
 - 'master': 
  • Fetch remote query result only from the master node of foreign gpdb server, but the execution node in local gpdb cluster is already the master node.
  • It need segments in local gpdb cluster can connect to segments in foreign gpdb cluster.
TODO:
Foreign server also need to support the option 'mpp_execute', and if foreign table don't have this option, it will get the value of the foreign server option 'mpp_execute'. So in most cases, if the network setting changed,  we just need to change the value of the foreign server option 'mpp_execute', and do not need to alter all foreign tables.

Robert Eckhardt

unread,
Jan 11, 2019, 2:43:03 AM1/11/19
to Ming Li, chris, Adam Lee, Greenplum Developers

Generally speaking I'd be careful about referring to this as a Federated query engine. That is exactly what it is but the use of those words causes customers to have a set of expectations that I'm not sure we initially want to meet. 

I'm going to extract a few chunks from the wiki to comment on them here. 

> The extension would be based on postgres_fdw, we haven't decided yet whether fork or expand, the syntax is not finalized either

I'm pretty opposed to both of these. Fundamentally if we fork we don't get any updates from the FDW code in Postgres and this has been a pretty actively developed part of the code. 

It would be nice if we could wrap something around the API. That wrapper could contain the logic to do the m-->n mapping of different sized clusters as well as controlling which process actively talks to the foreign server. 

It seems like what is missing is any discussion of differently sized clusters, i.e. 40 nodes querying 44 nodes. I don't think that should be part of the MVP per se but having somePoC quality code to convince yourself how the abstraction would work is probably needed so we don't lose the problem while focusing on a specific solution. 

-- Rob

Heikki Linnakangas

unread,
Jan 11, 2019, 3:16:33 AM1/11/19
to Adam Lee, Greenplum Developers
On 07/01/2019 12:25, Adam Lee wrote:
> Hi,
>
> Our team is working on a feature called Greenplum to Greenplum (gp2gp),
> which is an extension that provides the ability to run queries across
> multiple Greenplum clusters.
>
> We have a basic implementation now, to get more design feed back and
> track the status, a new wiki page is updated here:
> https://github.com/greenplum-db/gpdb/wiki/Greenplum-to-Greenplum

At a quick glance, that follows what we discussed on the gpdb-dev thread
earlier. The high-level architecture and workflow seems fine.

I'm not wedded on all the new syntax, tying this to the DECLARE CURSOR
command seems like a bad idea, and I don't understand why we need a new
RETRIEVE command when it seems to do exactly the same as FETCH (if we
didn't use DECLARE CURSOR, though, then we'd probably need a separate
command). Cursors are really complicated beasts, especially in GPDB, so
I'm afraid if we treat these like cursors, we'll make the cursor code
even more complicated, and we'll making the new code for this also more
complicated, just to make them behave like cursors.

There's no consideration of security and authentication in the wiki yet.

Once we get to actually implement this thing, I'm sure there will be a
lot of little issues to resolve. For example, when you do the DECLARE
PARALLEL CURSOR call in the BeginForeignScan() call in the QD, you'll
need some way to pass the information about the end points to the QEs.
InitPlan() currently happens before dispatching to the segments, which
is good, but there is no place to put plan-node-specific information
that would be included in the dispatched command, currently.

I'd like to hear more on how the inter-process communication in the
"Remote Greenplum" side is going to work. From the pictures, I gather
that you'll have two kinds of QEs running, "Retrieve" QEs, and "normal"
QEs, and they will communicate somehow. Will that be over the
interconnect, or directly via shared memory, or? What will the EXPLAIN
output look like for this? Will there be new kind of Motion node at the
top, or?

- Heikki

Adam Lee

unread,
Jan 11, 2019, 4:17:54 AM1/11/19
to Robert Eckhardt, Ming Li, chris, Greenplum Developers
On Fri, Jan 11, 2019 at 03:42:49PM +0800, Robert Eckhardt wrote:
>
> Generally speaking I'd be careful about referring to this as a Federated query
> engine. That is exactly what it is but the use of those words causes customers
> to have a set of expectations that I'm not sure we initially want to meet. 

Good point.

> It seems like what is missing is any discussion of differently sized clusters,
> i.e. 40 nodes querying 44 nodes. I don't think that should be part of the MVP
> per se but having somePoC quality code to convince yourself how the abstraction
> would work is probably needed so we don't lose the problem while focusing on a
> specific solution. 

44 nodes querying 40 nodes is simple, just skip 4 nodes, sometimes a 44
nodes cluster could have results only on several endpoints with where
clause, the POC works.

40 nodes querying 44 nodes is totally different, we are spiking flexible
gang, having 44 QEs on the 40 nodes cluster, basic select works, complex
query doesn't yet.

Thanks for your reply, we will rethink it.

--
Adam Lee

Adam Lee

unread,
Jan 11, 2019, 4:38:51 AM1/11/19
to Heikki Linnakangas, Greenplum Developers
On Fri, Jan 11, 2019 at 10:16:28AM +0200, Heikki Linnakangas wrote:
> ...
> At a quick glance, that follows what we discussed on the gpdb-dev thread
> earlier. The high-level architecture and workflow seems fine.
>
> I'm not wedded on all the new syntax, tying this to the DECLARE CURSOR
> command seems like a bad idea, and I don't understand why we need a new
> RETRIEVE command when it seems to do exactly the same as FETCH (if we didn't
> use DECLARE CURSOR, though, then we'd probably need a separate command).
> Cursors are really complicated beasts, especially in GPDB, so I'm afraid if
> we treat these like cursors, we'll make the cursor code even more
> complicated, and we'll making the new code for this also more complicated,
> just to make them behave like cursors.

The reason of RETRIEVE is to have a specific one for PARALLEL CURSOR,
which we want to live the whole working window via EXECUTE PARALLEL
CURSOR to monitor and handle error.

> There's no consideration of security and authentication in the wiki yet.

Now we allow retrieve role with any random password, it could only run
RETRIEVE, with correct token. Probably OK?

> Once we get to actually implement this thing, I'm sure there will be a lot
> of little issues to resolve. For example, when you do the DECLARE PARALLEL
> CURSOR call in the BeginForeignScan() call in the QD, you'll need some way
> to pass the information about the end points to the QEs. InitPlan()
> currently happens before dispatching to the segments, which is good, but
> there is no place to put plan-node-specific information that would be
> included in the dispatched command, currently.

Actually `foreign_scan->fdw_private` does exactly what we want, and the
developing branch works.

> I'd like to hear more on how the inter-process communication in the "Remote
> Greenplum" side is going to work. From the pictures, I gather that you'll
> have two kinds of QEs running, "Retrieve" QEs, and "normal" QEs, and they
> will communicate somehow. Will that be over the interconnect, or directly
> via shared memory, or? What will the EXPLAIN output look like for this? Will
> there be new kind of Motion node at the top, or?

Currently it's named FIFO. We spiked the shared memory, shm_mq, however
it's combined a little too much with parallel scan.

Thanks for the reply, will update the wiki and rethink the syntax.

--
Adam Lee

Adam Lee

unread,
Jan 11, 2019, 4:43:18 AM1/11/19
to Heikki Linnakangas, Greenplum Developers
On Fri, Jan 11, 2019 at 05:38:44PM +0800, Adam Lee wrote:
> ...
> > There's no consideration of security and authentication in the wiki yet.
>
> Now we allow retrieve role with any random password, it could only run
> RETRIEVE, with correct token. Probably OK?

Oops, the valid password would be any valid token in system.

--
Adam Lee

Divya Bhargov

unread,
Jan 12, 2019, 9:58:49 AM1/12/19
to Adam Lee, Heikki Linnakangas, Greenplum Developers
Hi Adam,

Is there a plan to whitelist the operations when connecting to a different greenplum cluster to "SELECT" operation ? If postgres_fdw would support UPDATE and DELETE operations, would we want gp2gp to ever be able to update or delete data from the remote cluster ? And if so, is there any risk to think about ?

regards,
Divya

chris

unread,
Jan 22, 2019, 9:20:35 PM1/22/19
to Ming Li, Adam Lee, Greenplum Developers
Hi Ming,

That sounds great~~

Thanks for your kind reply,

Adam Lee

unread,
Feb 1, 2019, 8:00:48 AM2/1/19
to Heikki Linnakangas, Greenplum Developers
On Fri, Jan 11, 2019 at 10:16:28AM +0200, Heikki Linnakangas wrote:
> At a quick glance, that follows what we discussed on the gpdb-dev thread
> earlier. The high-level architecture and workflow seems fine.
>
> I'm not wedded on all the new syntax, tying this to the DECLARE CURSOR
> command seems like a bad idea, and I don't understand why we need a new
> RETRIEVE command when it seems to do exactly the same as FETCH (if we didn't
> use DECLARE CURSOR, though, then we'd probably need a separate command).
> Cursors are really complicated beasts, especially in GPDB, so I'm afraid if
> we treat these like cursors, we'll make the cursor code even more
> complicated, and we'll making the new code for this also more complicated,
> just to make them behave like cursors.

Hi, Heikki

Based on above we have few options.

1) Use CURSOR and get benefit of code reuse

We actually don't change the cursor code a lot, other than the syntax
codes, only choose the MULTY_QUERY strategy, set a flag, and dispatch
the token info. portalcmds.c only has one chunk of code added.

2) Create a new command for this in the portalcmds.c with dedicated
codes

3) Some other approach?

Could you please confirm you prefer the second option?

--
Adam Lee
Reply all
Reply to author
Forward
0 new messages