Hi all,
Nowadays most enterprise customers use multiple different database products to support their business. It's very common to have several Greenplum clusters and to want to query across them, or across other databases, from Greenplum. This is where GP2GP federation comes in:
Reduce data redundancy across clusters so customers can save money
Efficiently query across Greenplum clusters to meet business needs (clusters may be in different locations, with different minor versions)
Virtually increase Greenplum cluster size to more nodes
Basically, GP2GP implements foreign tables with segment-to-segment data transfer and optimized query execution to access remote query data from other Greenplum clusters. The initial target Greenplum version is 6.X.
The two options below are both based on postgres_fdw; however, we need to make it support MPP.
postgres_fdw is used to access data stored in external PostgreSQL servers https://www.postgresql.org/docs/9.5/static/postgres-fdw.html
(Thanks Adam Lee for summarizing this option)
The basic idea is to transfer foreign query results from foreign segments back to local cluster segments directly.
1. Queries are sent to foreign segments via utility mode.
2. Foreign segments share the same DTX (distributed transaction context) so they have the same MVCC visibility. The DTX is bound to a transaction we start on the foreign master, so there is no need to worry about vacuum.
Pros:
1. Simple and straightforward.
2. Compatible even with a cluster of a different version: the DTX is serialized and deserialized on the foreign cluster, and the other interfaces are plain libpq.
3. Predicate pushdown is possible.
Cons:
1. Utility mode doesn't handle resource groups.
2. A future `UPDATE` might have deadlock issues (still investigating this and the 2PC issue).
3. pg_hba.conf needs to be updated to allow direct connections from segments.
4. ONLY predicate pushdown is supported.
A demo supporting single-node clusters: https://github.com/adam8157/gpdb/tree/gp2gp
Option 2 uses interconnect motions across clusters. The reasons for this option include not only keeping code changes as small as possible (large changes may introduce bugs and need much more effort), but also avoiding an extra redistribution motion and balancing all segments' workloads automatically.
Below is the main workflow of this option; the left side is the federation cluster, and the right side is the foreign cluster:
The basic idea is to transfer foreign query results from foreign segments to local cluster segments directly, without changing the foreign query generated by postgres_fdw.
Three cases:
In most cases, the only operation on the foreign master is to gather the foreign query results (i.e. a Gather Motion) and direct them to the requester. In this case, we can redirect the data from foreign segments to local segments.
A more complex case is when additional operations need to be executed on the foreign master; if possible, we can change the local query plan to compensate for the operation on the foreign master.
The last case cannot be processed in parallel: some operators can only be executed on the foreign master (e.g. some UDFs are defined only on the foreign cluster and can only run on the master). Such cases cannot be parallelized even within a single Greenplum cluster.
For now we focus on the first case (the discussion below refers only to it). The second case builds on the first-case implementation and needs more investigation (e.g. which operators on the foreign master can be supported, and how to compensate for them on local segments). For the last case, we simply fall back to the original postgres_fdw behavior.
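The three cases can be sketched as a tiny classifier over the operators that must run on the foreign master above the Gather Motion. This is a Python illustration with made-up operator names, not Greenplum planner code:

```python
# Illustrative sketch (not Greenplum planner code): classify a foreign query
# by the operators that must run on the foreign master above the Gather Motion.

COMPENSATABLE_OPS = {"Limit", "Sort"}   # assumption: ops we could re-apply locally

def classify_foreign_plan(ops_above_gather):
    """ops_above_gather: operator names above the Gather Motion on the foreign master."""
    if not ops_above_gather:
        return "case1-redirect"         # only a Gather Motion: redirect segment data
    if all(op in COMPENSATABLE_OPS for op in ops_above_gather):
        return "case2-compensate"       # re-apply these ops in the local plan
    return "case3-fallback"             # e.g. master-only UDFs: plain postgres_fdw

print(classify_foreign_plan([]))                  # case1-redirect
print(classify_foreign_plan(["Limit"]))           # case2-compensate
print(classify_foreign_plan(["MasterOnlyUDF"]))   # case3-fallback
```

The real decision of course depends on the foreign plan shape, which (as noted in the TODOs) is only known after the remote query is declared.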
Findings from the prototype:
An FDW scan can be placed in any slice, and one slice may contain several different FDW scan nodes.
The FDW remote query needs to be sent only once; currently it is sent only from seg0, and the other segments only do motion receives.
At first we considered executing the foreign remote query only on the master (in the entry DB), but that would require a new gang type running on all segments plus the entry DB; moreover, it could change the local query plan because of how the flow crosses between this new gang type and the other gang types.
Dummy motions need to be added as children of the FDW scan node. In a simple query such as joining a local table with a foreign table, the parent of the FDW scan node is also a Redistribute Motion. I first thought of reusing that node instead of adding a new child node, but then realized we cannot guarantee that the FDW node has a Motion as its direct parent in all scenarios.
A side effect of this solution: because all result data is fetched directly from the motion, and the master's executor drives the motion to fetch tuples, the remote "FETCH ... FROM cursor" statement issued by postgresIterateForeignScan() --> create_cursor() has no effect; it can no longer control the number of result rows and simply returns 0 rows.
For simplicity, a foreign table keeps the same distribution policy as the original table in the foreign cluster (otherwise we would need to add many foreign-scan paths in the planner and set costs for all of them):
If the hash functions differ, we change the foreign table's distribution policy to distributed randomly.
If the two clusters have the same number of segments, the gp2gp motion is an N:N explicit motion.
If the segment numbers differ, the gp2gp motion is a redistribution motion.
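A minimal sketch of these rules, assuming a helper `gp2gp_motion()` that does not exist in the prototype; it only encodes the decision table above:

```python
# Illustrative decision rules for the gp2gp motion type (an assumption-laden
# sketch, not prototype code). "same_hash" means both clusters hash the
# distribution keys identically.

def gp2gp_motion(local_segs, foreign_segs, same_hash):
    if not same_hash:
        # distribution keys no longer line up: treat the foreign table as
        # DISTRIBUTED RANDOMLY and redistribute on arrival
        return "redistribute (policy forced to randomly distributed)"
    if local_segs == foreign_segs:
        return "explicit N:N motion"    # foreign segment i sends straight to local segment i
    return "redistribute motion"        # rehash rows onto the local segment count

print(gp2gp_motion(4, 4, True))         # explicit N:N motion
print(gp2gp_motion(4, 8, True))         # redistribute motion
```

The N:N case is the cheap one: no rehashing is needed, so each foreign segment streams directly to its counterpart.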
Presumptions:
The two Greenplum clusters must use the same value of the GUC gp_interconnect_type (udpifc or tcp); we must make them match before running GP2GP federation queries.
Segments in the two clusters must be able to communicate with each other.
This solution uses motions to transfer data directly, so if the motion interaction protocol or the transferred data format (e.g. MemTuple) changes, we can only support pairs of clusters where these are the same.
This prototype is intended not only to show that the Option 2 solution works, but also to provide more details for discussion: are the code changes for this solution acceptable, and is there a better solution or implementation?
The code of this prototype is:
https://github.com/liming01/gpdb-postgres-merge/commits/fdw_iteration_9
Main Steps:
Step 0: Cherry-pick the postgres_fdw code from PostgreSQL and make it run successfully, then make the foreign scan run on all segments instead of only on the master.
Step 1: Add FDW dummy motion nodes to the plan tree. The dummy motion works only as a motion receiver, and for simplicity it currently adds a new slice without a gang.
Step 2: Pass the local segments' listener info (e.g. ports) to the foreign server via the GUCs gp_fdw_plan_rewrite, gp_fdw_motion_recv_port1, gp_fdw_motion_recv_port2, gp_fdw_motion_recv_port3; all other parameters can be passed the same way (using a GUC-like param_string="param1,param2,..."). Is there a better way?
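Step 2 could look like the following sketch. The GUC names come from the prototype, but `pack_gucs`/`unpack_gucs` and the encoding are assumptions for illustration only:

```python
# Illustrative sketch of Step 2: pack the local segments' listener ports into
# per-segment GUC-like settings and unpack them on the foreign side.
# The prototype uses gp_fdw_motion_recv_port1..3; the helpers are invented.

def pack_gucs(ports):
    # one gp_fdw_motion_recv_portN setting per local segment
    return {f"gp_fdw_motion_recv_port{i + 1}": str(p) for i, p in enumerate(ports)}

def unpack_gucs(gucs):
    values = [v for k, v in sorted(gucs.items())
              if k.startswith("gp_fdw_motion_recv_port")]
    return [int(v) for v in values]

gucs = pack_gucs([41001, 41002, 41003])
print(unpack_gucs(gucs))                # [41001, 41002, 41003]
```

Anything that survives a SET command round-trip would work here, which is why the prototype's comma-separated param_string is equally viable.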
Step 3: The remote "DECLARE CURSOR" query hangs until the gp2gp motion setup finishes, while the interconnect setup needs the receiver to listen()/accept() before the socket client can connect. So a new thread is added to send this remote query, and the main thread calls SetupInterconnect4FdwMotion() to listen() as the socket server.
Step 4: The remote server uses the values of the gp_fdw_motion_recv_portX GUCs, creates additional setupOutgoingConnection() calls, and sets up the gp2gp motions.
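Steps 3 and 4 hinge on ordering: the receiver must be listening before the foreign side connects back. The sketch below mimics that with plain local sockets, a worker thread standing in for the thread that issues "DECLARE CURSOR", and the main thread standing in for SetupInterconnect4FdwMotion():

```python
# Illustrative sketch of the Step 3/4 handshake ordering using plain local
# sockets (not the Greenplum interconnect or libpq).
import socket
import threading

listener = socket.socket()
listener.bind(("127.0.0.1", 0))          # main thread: like SetupInterconnect4FdwMotion()
listener.listen(1)
port = listener.getsockname()[1]

def send_remote_query():
    # stands in for the worker thread issuing "DECLARE CURSOR"; on the
    # foreign side this triggers setupOutgoingConnection() back to us
    s = socket.socket()
    s.connect(("127.0.0.1", port))
    s.sendall(b"tuple-stream")
    s.close()

t = threading.Thread(target=send_remote_query)
t.start()
conn, _ = listener.accept()              # receiver is already listening, so no deadlock
data = b""
while True:
    chunk = conn.recv(64)
    if not chunk:
        break
    data += chunk
t.join()
conn.close()
listener.close()
print(data)                              # b'tuple-stream'
```

If the "remote query" were sent from the main thread instead, it would block before listen() ever ran, which is exactly the hang Step 3 works around.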
Step 5: The remote server redirects data in doSendTuple() to the connection for the matching segment ID in the local cluster.
Step 6: The local FDW motion receiver sets its result TupleDesc to match the FDW scan and returns the results to the upper plan nodes.
TODOs:
Since TCP is much easier for bug tracing with tools like netstat, only TCP is tried in the demo, so please run "gpconfig -c Gp_interconnect_type -v tcp; gpstop -u" first. UDP needs to be supported in the future.
Currently only the case where the federation cluster and the foreign cluster have the same number of segments is supported.
Only works when the federation cluster and the foreign cluster are on the same host, because I don't pass the hostname as a GUC to the foreign server.
Client verification is currently skipped in readRegisterMessage(); we can postpone verification until the foreign server's motion sender info (rcListenerPort & srcPid) is fetched after the "DECLARE CURSOR FOR" remote query returns. One open problem: how to sync this info from seg0 to the other segments in the local cluster?
At ic_tcp.c:1164, the local slice ID for this foreign scan is hard-coded to 1 (for my test case). It needs to be obtained from the caller.
I don't yet check that the remote query's only operation is a Gather Motion; at present I only redirect at the direct child slice of the master node.
Error and cancel handling is needed in the thread code.
The gp2gp motion needs to be disabled if the remote query plan contains more operators than a Gather Motion, which is only known after the "DECLARE CURSOR FOR" remote query returns.
One known bug concerns foreign column Datums that are pointers, e.g. columns of type TEXT or CHAR(128).
Test Samples of this Prototype:
Step 1: Change interconnect type to TCP
gpconfig -c Gp_interconnect_type -v tcp; gpstop -u
Step 2: Prepare tables on foreign cluster
create database fdb;
Step 3: Create foreign tables on local cluster
create database testdb;
Step 4: SQL for testing
testdb=# EXPLAIN select * from lt1;
Any discussions/suggestions are welcome; thanks a lot in advance.
P.S. Thanks to all the Beijing colleagues who contributed to the solution and to the ideas for overcoming the obstacles in this prototype, especially Gang Xiong and Pengzhou Tang.
So as far as the A <-> B interface goes, this is pretty much what you're doing in your Solution Option 1. But note the following differences or clarifications:
* The "worker process" libpq connections from A to endpoints in B are not utility mode connections. Or they might be implemented that way, but that doesn't affect the interface, and System A doesn't know anything about that.
In Solution Option 1, you noted that this "ONLY supports predicate push down". But that's not a limitation of the interface. Behind the scenes, System B can still use an interconnect and as many extra helper processes as part of the gang as needed. System A doesn't need to know about any of that.
avoid extra redistribution motion and also balance all segments' workloads automatically.
If we want to support it, it makes problem 3) even worse.
(As Pengzhou Tang noted, the motion interaction protocol has not changed from GPDB v4 to the current version), but the transferred tuple data format (e.g. MemTuple) may differ between two GPDB clusters.
About the A <-> B interface, how about this:
1. Client sends the main query to the QD via libpq; this connection finishes at the end (step 7)
   `CREATE ENDPOINTS AS SELECT … OPTIONS …`
   Options could include how many endpoints the client wants to connect to, how the client would like the data distributed...
   Or: set a special GUC, then `SELECT …`
2. QD generates a token and dispatches the query
3. QE processes execute the query, but do not return the results to the QD
4. QD returns the token and the involved postmasters' info (host, port …); the involved postmaster might be only the QD, e.g. in an aggregate case like `SELECT count(*) ...`
5. Client connects to these postmasters with a special option; endpoints in RETRIEVE mode are forked for the client, and the client runs `RETRIEVE token` or `RETRIEVE n FROM token`
   Endpoints should be connected via libpq with auth and encryption
6. QEs send the results to the corresponding endpoints, then on to the client
   (A pipe could connect a QE and its endpoint, maybe)
7. Once all data has been transferred, the main query finishes and the endpoints exit
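The flow above can be simulated in-memory; `MiniCluster`, `create_endpoints`, and `retrieve` are invented names for the sketch, not the proposed command set:

```python
# In-memory toy simulation of the token/endpoint flow (all names are
# illustrative assumptions, not the real interface).
import secrets

class MiniCluster:
    def __init__(self):
        self.endpoints = {}                        # token -> {segment: pending rows}

    def create_endpoints(self, seg_results):
        token = secrets.token_hex(4)               # step 2: QD generates a token
        self.endpoints[token] = dict(seg_results)  # step 3: QEs hold the results
        return token, list(seg_results)            # step 4: token + involved postmasters

    def retrieve(self, segment, token):            # step 5: client RETRIEVEs per endpoint
        return self.endpoints[token].pop(segment)

cluster = MiniCluster()
token, segs = cluster.create_endpoints({"seg0": [1, 2], "seg1": [3]})
rows = [r for seg in segs for r in cluster.retrieve(seg, token)]
print(sorted(rows))                                # [1, 2, 3]
```

The appealing property is that result data never funnels through the QD: each endpoint serves its own segment's rows, so a Greenplum client cluster could pull them in parallel.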
The client could also be a Greenplum cluster, which is the upcoming GP2GP scenario.
--
You received this message because you are subscribed to the Google Groups "Greenplum Developers" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gpdb-dev+u...@greenplum.org.