UPSERT from staging tables

229 views
Skip to first unread message

Patrick

unread,
May 4, 2016, 4:40:06 PM5/4/16
to citus-users
I am loading data into staging tables using COPY.  When those are loaded, I need to insert or update the data into my main table, then delete the data from the staging tables.  All staging tables and the main table are distributed using hash partitioning.

My first attempt was simple SQL:

UPDATE profile_segment main
SET utc_timestamp = staging.utc_timestamp
FROM profile_segment_staging_000 staging
WHERE staging.segment_id = main.segment_id
AND staging.profile_id = main.profile_id
AND staging.utc_timestamp > main.utc_timestamp;

INSERT INTO profile_segment (profile_id,segment_id,utc_timestamp)
SELECT profile_id, segment_id, utc_timestamp
FROM profile_segment_staging_000
ON CONFLICT DO NOTHING;

(I was planning on combining these once I had each working.)

The first failed with this:
ERROR:  cannot perform distributed planning for the given modification
DETAIL:  Joins are not supported in distributed modifications.

The second failed with this:
ERROR:  cannot perform distributed planning for the given modifications
DETAIL:  Subqueries are not supported in distributed modifications.

That's a big limitation.  I then tried writing a function to accomplish the same thing:

CREATE OR REPLACE FUNCTION destage(IN staging_table TEXT) RETURNS void AS
$$
DECLARE r RECORD;
BEGIN
  FOR r IN EXECUTE 'SELECT * FROM ' || staging_table
  LOOP
    INSERT INTO profile_segment (profile_id,segment_id,utc_timestamp)
    VALUES (r.profile_id, r.segment_id, r.utc_timestamp)
    ON CONFLICT DO NOTHING;
  END LOOP;
END;
$$
LANGUAGE plpgsql;

This ran for several minutes and failed with:

ERROR:  cannot plan sharded modification containing values which are not constants or constant expressions

I'm stuck here.  This is a simple use case that shouldn't be so hard.

Patrick

Samay Sharma

unread,
May 4, 2016, 8:04:26 PM5/4/16
to citus-users
Hey Patrick,

The reason you are getting this error is because the INSERT command has variables from the SELECT query which aren't constants (or constant expressions).

I think wrapping the INSERT command into an EXECUTE statement should solve the issue.

Does something like this resolve the error?

CREATE OR REPLACE FUNCTION destage(IN staging_table TEXT) RETURNS void AS                    

$$                                                                                           

DECLARE r RECORD;                                                                            

BEGIN                                                                                        

    FOR r IN EXECUTE 'SELECT * FROM ' || staging_table                                        

        LOOP                                                                                  

            execute format('INSERT INTO profile_segment (profile_id,segment_id,utc_timestamp)        

                VALUES (%s, %s, %s) on conflict do nothing', r.profile_id, r.segment_id, r.utc_timestamp);

        END LOOP;                                                                             

    END;                                                                                      

$$                                                                                           

LANGUAGE plpgsql;


Regards,
Samay

Patrick

unread,
May 5, 2016, 11:40:03 AM5/5/16
to citus-users
Samay,

With some judicious quote escaping your suggestion works:

CREATE OR REPLACE FUNCTION destage(IN staging_table TEXT) RETURNS void AS
$$
DECLARE r RECORD;
BEGIN
  FOR r IN EXECUTE 'SELECT * FROM ' || staging_table
  LOOP
    EXECUTE
      format('INSERT INTO profile_segment (profile_id,segment_id,utc_timestamp)
              VALUES (''%s'',''%s'',%s)
              ON CONFLICT DO NOTHING',
              r.profile_id,r.segment_id,r.utc_timestamp);
  END LOOP;
END;
$$
LANGUAGE plpgsql;

However, it is painfully slow.  I've been loading 600 million rows for the past two hours with no signs of it stopping.

What is the roadmap for supporting the more natural, and hopefully significantly more performant, insert and update mechanisms?

Thanks,

Patrick

Marco Slot

unread,
May 6, 2016, 8:24:44 AM5/6/16
to citus-users
On Thursday, May 5, 2016 at 5:40:03 PM UTC+2, Patrick wrote:
However, it is painfully slow.  I've been loading 600 million rows for the past two hours with no signs of it stopping.

What is the roadmap for supporting the more natural, and hopefully significantly more performant, insert and update mechanisms?

Better INSERT/SELECT support is definitely high on our roadmap, but I haven't really thought through what it means for UPSERT.

It's worth noting that shard placements are just tables, so you could do whatever you want with them. Below is an example of adding a trigger to shard placements of a staging table that upserts into the main table.

Not knowing the details, I used the following table schema:

CREATE TABLE profile_segment (

    profile_id text primary key,

    segment_id text,

    utc_timestamp text

);

SELECT master_create_distributed_table('profile_segment','profile_id','hash');

SELECT master_create_worker_shards('profile_segment',16,2);


CREATE TABLE staging_table ( LIKE profile_segment );

SELECT master_create_distributed_table('staging_table','profile_id','hash');

SELECT master_create_worker_shards('staging_table',16,2);


The shard placements of the staging_table will be co-located with the profile_segment table. Therefore I can put a trigger on each shard placement of the staging table that upserts into the profile_segment table.


CREATE OR REPLACE FUNCTION upsert_row() RETURNS trigger AS $function$

BEGIN

    EXECUTE format($$

                                INSERT INTO %I VALUES(%L,%L,%L) ON CONFLICT DO NOTHING$$,

                                TG_ARGV[0],

                                NEW.profile_id,

                                NEW.segment_id,

                                NEW.utc_timestamp);

    RETURN NULL;

END;

$function$ LANGUAGE plpgsql;


Finally, I can run the following monstrous command to find the co-located shard placements and put a trigger on the staging_table shards:


psql -c "SELECT p1.nodename, p1.nodeport, s1.logicalrelid::regclass||'_'||s1.shardid, s2.logicalrelid::regclass||'_'||s2.shardid FROM pg_dist_shard s1 JOIN pg_dist_shard s2 ON (s1.shardminvalue = s2.shardminvalue ) JOIN pg_dist_shard_placement p1 ON (s1.shardid = p1.shardid) WHERE s1.logicalrelid = 'profile_segment'::regclass AND s2.logicalrelid = 'staging_table'::regclass;" -tA -F" " | \

xargs -n 4 sh -c "psql -h \$0 -p \$1 -c \"CREATE TRIGGER ins_\$3 BEFORE INSERT ON \$3 FOR EACH ROW EXECUTE PROCEDURE upsert_row('\$2')\""


After that, any INSERT or COPY I do on the staging_table will do an UPSERT into the profile_segment table.


Ignoring the semantics of the fields:


postgres=# INSERT INTO staging_table VALUES ('hello','world','book');

INSERT 0 0 

postgres=# INSERT INTO staging_table VALUES ('hello','world','book');

INSERT 0 0 

postgres=# SELECT * FROM staging_table ;

 profile_id | segment_id | utc_timestamp

------------+------------+---------------

(0 rows)   


postgres=# SELECT * FROM profile_segment;

 profile_id | segment_id | utc_timestamp

------------+------------+---------------

 hello      | world      | book

(1 row)


Note that UPSERT is inherently much slower than INSERT or COPY since every upsert requires an index look-up and row-level locks. I got around 15k/s on my desktop.


Hope this helps,

Marco

Patrick

unread,
May 6, 2016, 10:37:21 AM5/6/16
to citus-users
Marco,

Thanks for the reply.  Comments inline.


Better INSERT/SELECT support is definitely high on our roadmap, but I haven't really thought through what it means for UPSERT.

Do you have a timeline for when standard INSERT and UPDATE commands will work as expected?
 
It's worth noting that shard placements are just tables, so you could do whatever you want with them. Below is an example of adding a trigger to shard placements of a staging table that upserts into the main table.

That's an interesting approach.  It looks like it will work in the short term, but I can see it becoming a maintenance nightmare.

If we had distributed INSERT and UPDATE, that would address most of our use cases and make any necessary workaround significantly easier.
 
Note that UPSERT is inherently much slower than INSERT or COPY since every upsert requires an index look-up and row-level locks. I got around 15k/s on my desktop.

The way I have UPSERT implemented uses the ON CONFLICT clause of the INSERT statement, so I'm not sure how it's significantly slower than INSERT alone, at least when the INSERT succeeds.

In any case, as I noted above, I could work around lack of support for UPSERT if INSERT and UPDATE worked transparently.  Any information you can provide about when that will be the case would be helpful.

Thanks again for the response.

Regards,

Patrick

oz...@citusdata.com

unread,
May 7, 2016, 2:59:19 PM5/7/16
to citus-users
Hi Patrick,

We have a few issues open in our Github repository on INSERT (SELECT ...); and I also opened a new one with the information in this thread.

* Issue #508 relates to the user running INSERT INTO (SELECT ...) to roll-up data from a distributed table into another one. The primary use case here is distributed materialized views for the analytical use-case. We may prioritize this issue for our v5.2 release.
* Issue #212 relates to the user running INSERT INTO (SELECT ...) to stage data from a local table into a distributed one. The primary use case here is fast/easy ingest of data into the cluster.
* We tried to capture your use-case in issue #509. Could you look at this issue to see if it's accurate? I also had a clarification question on this. Do the staging tables need to be distributed for performance reasons?

Best,
Ozgun

Patrick

unread,
May 10, 2016, 10:29:50 AM5/10/16
to citus-users, oz...@citusdata.com
On Saturday, May 7, 2016 at 2:59:19 PM UTC-4, oz...@citusdata.com wrote:
* We tried to capture your use-case in issue #509. Could you look at this issue to see if it's accurate? I also had a clarification question on this. Do the staging tables need to be distributed for performance reasons?

Ozgun,

Thanks for the response and creating the tickets.  #509 looks good.

The reason I have the staging table distributed is to allow parallel inserts.  If we could do the same with a non-distributed table, that would be fine.

Thanks again,

Patrick

Patrick

unread,
May 10, 2016, 11:42:11 AM5/10/16
to citus-users
Correcting/clarifying my previous email:

The reason I have the staging table distributed is to allow parallel inserts.  If we could do the same with a non-distributed table, that would be fine.

The staging table is distributed to be able to handle more volume.  There are multiple staging tables in order to handle the load in parallel.
 

oz...@citusdata.com

unread,
Aug 9, 2016, 6:54:26 PM8/9/16
to citus-users
Hi Patrick,

I wanted to drop a quick update in this email thread.

We talked about this feature as part of our v5.3 planning and prioritized it: https://github.com/citusdata/citus/issues/508

I added comments to #508/#509 to capture our understanding of your workflow. If you have any input or feedback for us, please feel free to share them over Github.

Best,
Ozgun
Reply all
Reply to author
Forward
0 new messages