CREATE OR REPLACE FUNCTION destage(IN staging_table TEXT) RETURNS void AS
$$
DECLARE
    r RECORD;
BEGIN
    -- Loop over the staging table and upsert each row individually.
    -- %I quotes the identifier, %L quotes the text values as literals.
    FOR r IN EXECUTE format('SELECT * FROM %I', staging_table)
    LOOP
        EXECUTE format(
            'INSERT INTO profile_segment (profile_id, segment_id, utc_timestamp)
             VALUES (%L, %L, %L) ON CONFLICT DO NOTHING',
            r.profile_id, r.segment_id, r.utc_timestamp);
    END LOOP;
END;
$$
LANGUAGE plpgsql;
However, it is painfully slow: I've been loading 600 million rows for the past two hours with no sign of it finishing. What is the roadmap for supporting the more natural, and hopefully significantly more performant, INSERT and UPDATE mechanisms?
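For context, what I mean by "more natural" is the set-based form the row-by-row loop above would collapse into on a single-node PostgreSQL table (a sketch only; this is the kind of statement that doesn't yet work across distributed tables, which is what I'm asking about):

```sql
-- Set-based equivalent of destage() on plain PostgreSQL.
-- Table and column names match the schema below.
INSERT INTO profile_segment (profile_id, segment_id, utc_timestamp)
SELECT profile_id, segment_id, utc_timestamp
FROM staging_table
ON CONFLICT (profile_id) DO NOTHING;
```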
CREATE TABLE profile_segment (
profile_id text primary key,
segment_id text,
utc_timestamp text
);
SELECT master_create_distributed_table('profile_segment','profile_id','hash');
SELECT master_create_worker_shards('profile_segment',16,2);
CREATE TABLE staging_table ( LIKE profile_segment );
SELECT master_create_distributed_table('staging_table','profile_id','hash');
SELECT master_create_worker_shards('staging_table',16,2);
The shard placements of staging_table will be co-located with those of the profile_segment table. Therefore I can put a trigger on each shard placement of the staging table that upserts into the corresponding profile_segment shard.
CREATE OR REPLACE FUNCTION upsert_row() RETURNS trigger AS $function$
BEGIN
    -- TG_ARGV[0] is the name of the co-located profile_segment shard.
    EXECUTE format($$
        INSERT INTO %I VALUES (%L, %L, %L) ON CONFLICT DO NOTHING$$,
        TG_ARGV[0],
        NEW.profile_id,
        NEW.segment_id,
        NEW.utc_timestamp);
    -- Returning NULL cancels the insert into the staging shard itself.
    RETURN NULL;
END;
$function$ LANGUAGE plpgsql;
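For a single pair of co-located shards, this boils down to a statement like the following (the `_102008` shard-ID suffixes are made-up placeholders; the command further down generates the real names):

```sql
-- Attach the upsert trigger to one staging shard, passing the name of
-- the co-located profile_segment shard as the trigger argument.
-- The _102008 suffixes are hypothetical shard IDs for illustration.
CREATE TRIGGER ins_staging_table_102008
BEFORE INSERT ON staging_table_102008
FOR EACH ROW EXECUTE PROCEDURE upsert_row('profile_segment_102008');
```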
Finally, I can run the following monstrous command to find the co-located shard placements and put a trigger on the staging_table shards:
psql -c "
  SELECT p1.nodename, p1.nodeport,
         s1.logicalrelid::regclass || '_' || s1.shardid,
         s2.logicalrelid::regclass || '_' || s2.shardid
  FROM pg_dist_shard s1
  JOIN pg_dist_shard s2 ON (s1.shardminvalue = s2.shardminvalue)
  JOIN pg_dist_shard_placement p1 ON (s1.shardid = p1.shardid)
  WHERE s1.logicalrelid = 'profile_segment'::regclass
    AND s2.logicalrelid = 'staging_table'::regclass;" -tA -F" " | \
xargs -n 4 sh -c "psql -h \$0 -p \$1 -c \"CREATE TRIGGER ins_\$3 BEFORE INSERT ON \$3 FOR EACH ROW EXECUTE PROCEDURE upsert_row('\$2')\""
After that, any INSERT or COPY I do on the staging_table will do an UPSERT into the profile_segment table.
Ignoring the semantics of the fields:
postgres=# INSERT INTO staging_table VALUES ('hello','world','book');
INSERT 0 0
postgres=# INSERT INTO staging_table VALUES ('hello','world','book');
INSERT 0 0
postgres=# SELECT * FROM staging_table ;
profile_id | segment_id | utc_timestamp
------------+------------+---------------
(0 rows)
postgres=# SELECT * FROM profile_segment;
profile_id | segment_id | utc_timestamp
------------+------------+---------------
hello | world | book
(1 row)
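COPY behaves the same way, since the trigger fires once per copied row; something like the following would also land in profile_segment (the file name is a made-up example):

```sql
-- In psql: stream a CSV through the staging table; the per-row trigger
-- redirects each row into profile_segment. 'profiles.csv' is hypothetical.
\copy staging_table FROM 'profiles.csv' WITH (FORMAT csv)
```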
Note that UPSERT is inherently much slower than INSERT or COPY since every upsert requires an index look-up and row-level locks. I got around 15k/s on my desktop.
Hope this helps,
Marco
Better INSERT/SELECT support is definitely high on our roadmap, but I haven't really thought through what it means for UPSERT.
It's worth noting that shard placements are just tables, so you can do whatever you want with them. The example above adds a trigger to the shard placements of a staging table that upserts into the main table.
We tried to capture your use case in issue #509. Could you look at that issue and check whether it's accurate? I also had a clarifying question: do the staging tables need to be distributed for performance reasons?
The reason I have the staging table distributed is to allow parallel inserts. If we could do the same with a non-distributed table, that would be fine.