Teradata source plugin


Daniel SA

Jul 14, 2023, 7:15:23 AM
to CDAP User
Hi there!

I have a pipeline with a Teradata source plugin v1.7.0 running on CDAP v6.7.1 on a Kubernetes cluster.

I have already tweaked several params (Spark instances, number of partitions for source and sink, workers, memory, and also "fetch size" in the Teradata source plugin). Execution times could be brought down, but I wonder if there is still room for improvement: according to the metrics, fetching data takes about 70% of the execution time, versus 6% for transformations and 24% for inserting records into BQ.

Could "Number of Splits to Generate" affect performance? I'm not sure whether Teradata imposes some kind of limit on fetching data...

Any ideas would be welcome.

Thanks and regards, Daniel

Albert Shau

Jul 14, 2023, 12:30:38 PM
to cdap...@googlegroups.com
Hi Daniel,

The number of splits will be the primary thing to adjust for performance, as it will affect the level of parallelism used (see https://cdap.atlassian.net/wiki/spaces/DOCS/pages/818544644/Parallel+Processing for more detail). Keep in mind that increasing that number will also increase the load on your teradata instance, so there is a balancing act.

Regards,
Albert

--
You received this message because you are subscribed to the Google Groups "CDAP User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cdap-user+...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cdap-user/53906255-71ea-4f30-b55a-45ff3f887392n%40googlegroups.com.

Daniel SA

Jul 21, 2023, 5:56:03 AM
to CDAP User
Hi Albert,

thanks for your reply. I have already increased the number of splits/partitions in the Teradata plugin, and fetching data is noticeably faster with small subsets. But I'm struggling to get the pipeline to work with the full table. The whole table in BQ is about 15 GB, and I'm working on a dev cluster with reasonable resources (about 170 cores and 800 GB). In the params I have Teradata splits/partitions/Spark instances = 16, with 1 core and 4 GB under resources.

When using the full table I'm facing a variety of errors: out of spool space, SQLException in nextKeyValue, and almost every time the counts displayed in the UI for the Teradata plugin go over 18 million rows and the pipeline ends up crashing with heap space issues.

Do you have an idea of what could be happening or what could help? Might skew in the Teradata split field have a negative effect?
Might compression in Teradata negatively affect fetching the data?
Do you know where I could find some info about how the bounding query is translated into the $CONDITIONS?

Thanks again and regards, Daniel

Albert Shau

Jul 24, 2023, 2:02:25 PM
to cdap...@googlegroups.com
Hi Daniel,

In terms of data skew and splits, internally the plugin will execute the bounding query to determine the min/max values for the split field. It then replaces the $CONDITIONS part with something like [split field] >= [lower bound] and [split field] < [upper bound], where the lower and upper bounds for each partition/split are determined by the number of splits and the min/max values from the bounding query. So it will end up executing queries like select * from table where id >= 0 and id < 1000, select * from table where id >= 1000 and id < 2000, etc. If one partition is larger than the others, it will become the bottleneck for the pipeline, but it shouldn't cause out-of-memory issues on its own.
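The substitution described above can be sketched roughly as follows. This is only an illustration, not the plugin's or the MapReduce library's actual code: the helper names split_conditions and expand_import_query are hypothetical, and making the last split inclusive of the max value is an assumption so that no row is dropped.

```python
def split_conditions(split_field, lo, hi, num_splits):
    """Return one WHERE fragment per split, together covering [lo, hi].

    Hypothetical helper: an even numeric division of the min/max range.
    """
    if num_splits < 1:
        raise ValueError("num_splits must be >= 1")
    span = hi - lo
    conditions = []
    for i in range(num_splits):
        start = lo + span * i // num_splits
        end = lo + span * (i + 1) // num_splits
        # Assumption: only the last split includes its upper bound (the max).
        upper_op = "<=" if i == num_splits - 1 else "<"
        conditions.append(
            f"{split_field} >= {start} AND {split_field} {upper_op} {end}"
        )
    return conditions


def expand_import_query(import_query, conditions):
    """Substitute each split condition for the $CONDITIONS placeholder."""
    return [import_query.replace("$CONDITIONS", c) for c in conditions]


# lo and hi stand in for the result of the bounding query (min/max of id).
queries = expand_import_query(
    "SELECT * FROM table WHERE $CONDITIONS",
    split_conditions("id", 0, 4000, 4),
)
for q in queries:
    print(q)
```

Each generated query reads a disjoint slice, so the union of all slices should be exactly the table, provided the import query really applies the condition.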

If a partition fails, it will get retried, which is why the counts go higher than the expected number of rows in the table. The pipeline itself does not hold data from the source in memory; it gets streamed through. But the Teradata driver itself could be batching things in memory, causing the issues. This is normally controlled with the fetch size configuration, so I would try reducing it to see if it fixes the issue.
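To illustrate why retries inflate the displayed counts, here is a toy model. It assumes the counter also includes rows emitted by failed attempts; the function records_counted and the failure offsets are made up for the example.

```python
def records_counted(split_sizes, failures):
    """Total rows a counter would report if failed attempts are counted too.

    failures maps a split index to the row offsets at which earlier
    attempts of that split died (hypothetical input for this sketch).
    """
    counted = 0
    for i, size in enumerate(split_sizes):
        for failed_at in failures.get(i, []):
            counted += failed_at  # rows emitted before the attempt failed
        counted += size  # the final, successful attempt reads the whole split
    return counted


split_sizes = [1000, 1000, 1000, 1000]  # 4000 rows in the table
no_failures = records_counted(split_sizes, {})
two_retries = records_counted(split_sizes, {2: [600, 900]})
print(no_failures, two_retries)
```

In this model the table still holds 4000 rows, yet the counter ends at 5500 once split 2 has failed twice before succeeding.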

Regards,
Albert

Richard Nemeth

Jul 24, 2023, 2:49:32 PM
to cdap...@googlegroups.com
Are there stats/indexes on the Teradata table? Are the stats on it fresh? It sounds like it is probably doing a full table scan for every split.

Try to do your splits on a (non-string) field that has statistics on it in Teradata AND is evenly distributed.
If you split on a field that is not evenly distributed, you will get a lot of skew and poor performance.

By evenly distributed I mean something like a field ITEM_NO that goes from 1 to 1000000 incrementally: when you do 10 splits on it, each split will have 100,000 records.
But if it goes from 1-10, then jumps to 250000-500000, then from 750000 to 1000000 (not evenly distributed), you will have skewed partitions and will not get the best performance.
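The two ITEM_NO cases can be checked with a small sketch. It assumes splits are equal-width ranges between the min and max value; rows_per_split is a hypothetical helper, not part of any plugin.

```python
def rows_per_split(values, num_splits):
    """Count how many values land in each equal-width split of [min, max]."""
    lo, hi = min(values), max(values)
    width = hi - lo + 1
    counts = [0] * num_splits
    for v in values:
        # Integer arithmetic keeps the bucket boundaries exact.
        idx = min((v - lo) * num_splits // width, num_splits - 1)
        counts[idx] += 1
    return counts


# Evenly distributed: ITEM_NO goes from 1 to 1000000 with no gaps.
even = rows_per_split(range(1, 1_000_001), 10)

# Skewed: 1-10, then 250000-500000, then 750000-1000000.
skewed_values = (
    list(range(1, 11))
    + list(range(250_000, 500_001))
    + list(range(750_000, 1_000_001))
)
skewed = rows_per_split(skewed_values, 10)

print(even)
print(skewed)
```

In the skewed case several splits are empty or nearly empty, so fewer splits do real work and the busiest ones set the run time.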




RICH 





Daniel SA

Jul 25, 2023, 7:37:27 AM
to CDAP User
Hi guys,

thanks for your replies. The table has a non-unique primary index and it's also date-partitioned using a field that is actually a timestamp. The DDL looks similar to:
CREATE SET TABLE database.table,
FALLBACK ,
NO BEFORE JOURNAL,
NO AFTER JOURNAL,
CHECKSUM = DEFAULT,
DEFAULT MERGEBLOCKRATIO
(
id_NUPI_integer INTEGER NOT NULL,
dttm_timestamp TIMESTAMP(6) NOT NULL )
PRIMARY INDEX ( id_NUPI_integer )
PARTITION BY RANGE_N(CAST((dttm_timestamp ) AS DATE AT TIME ZONE 'Europe Central') BETWEEN DATE '2013-01-01' AND DATE '2029-12-31' EACH INTERVAL '1' DAY ,
NO RANGE, UNKNOWN);
I'm aware that the partitioning expression doesn't exactly match the type of the partitioning field (date vs. timestamp). In case this might be causing issues, do you know what the recommended workaround would be?

@Albert, I will try lowering the fetch size, although the present/default value doesn't seem that big for this table: 470 million rows in total and about 235 GB, so 1000 rows should be around 0.512 MB... If the counts displayed in the CDAP UI aren't something you can rely on, how could I check the real progress while retrieving the data?
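The size estimate above works out as follows. The sketch assumes decimal units (1 GB = 10^9 bytes), a fetch size of 1000 rows as the message implies, and rows spread evenly over the 16 splits mentioned earlier in the thread; real row sizes will vary.

```python
total_rows = 470_000_000
total_bytes = 235 * 10**9  # about 235 GB, decimal units assumed

bytes_per_row = total_bytes / total_rows

fetch_size = 1000  # assumed fetch size, in rows
batch_mb = bytes_per_row * fetch_size / 10**6

num_splits = 16  # value used earlier in the thread
rows_per_split = total_rows // num_splits  # assumes a perfectly even split
gb_per_split = rows_per_split * bytes_per_row / 10**9

print(f"{bytes_per_row:.0f} bytes per row")
print(f"{batch_mb:.3f} MB per fetch of {fetch_size} rows")
print(f"{rows_per_split} rows, about {gb_per_split:.1f} GB per split")
```

So a single fetch batch is small (about 0.5 MB), while each split as a whole still carries several GB of data.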

@Rich, when you mention trying to make splits with two fields, do you mean recreating the table with a new index/partition, or retrieving the data with two split fields? I've already used both fields in separate runs, and both cases ended with an IllegalArgumentException: Size exceeds Integer.MAX_VALUE or a heap space error. But when using both fields at the same time (split-by field name = id_NUPI_integer,dttm_timestamp), the pipeline crashes. I guess the two values from the bounding query won't fit the two fields. Is there any way to use more than one field as the split field with a number of splits greater than 1?

Thanks again and regards, Daniel
 

Richard Nemeth

Jul 25, 2023, 8:17:40 AM
to cdap...@googlegroups.com
From my experience, we only split on one field when possible, but we make sure it is on a field that is indexed and that the table has its statistics up to date. If you are doing it on id_NUPI_integer, I would think that should work with no problems.

When we have used 2 fields, I think we had to combine them into one, like concat(ITEM,LOC).

THANKS,

RICH 




Daniel SA

Jul 25, 2023, 8:30:32 AM
to CDAP User
OK, thanks Rich!
I have limited knowledge of Teradata and limited access to the database.
How do you check whether the statistics are up to date and, if they are not, how do you update them?
Regards, Daniel

Daniel SA

Jul 27, 2023, 2:55:14 AM
to CDAP User

Hi there!

The only effect of reducing the fetch size is that retrieving data takes longer (or at least that's what the counts show in the UI). It doesn't solve the issue; it simply postpones the first error. A bunch of errors like these are present in the logs, and the pipeline ends up crashing after a while:

ERROR [Executor task launch worker for task 12:o.a.s.e.Executor@91] Exception in task 12.0 in stage 0.0 (TID 12) java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

ERROR [shuffle-server-5-2:o.a.s.n.s.TransportRequestHandler@127] - Error opening block StreamChunkId{streamId=2062307135000, chunkIndex=0} for request from /x.x.x.x:58724

There is a repartitioner after the Teradata plugin and a second one just before inserting into BQ (both with shuffle = False). The counts increase up to the Wrangler node, but they don't move forward downstream (or at least that isn't displayed in the UI). Wrangler simply lower-cases all the column labels; there is no real transformation of the data.

@Rich, statistics have been updated, but I can't see any significant performance increase.

@Albert, you said that the data is streamed through, but in the Wrangler I see a jump in counts that doesn't move forward. The Wrangler recipe only lower-cases the column labels. Do you know if there might be shuffling, and where? And is there a way to deactivate it?

Regards, Daniel

Captura de pantalla 2023-07-27 a las 8.53.57.png

Daniel SA

Jul 27, 2023, 4:26:10 AM
to CDAP User

Hello again,

Yesterday I left a pipeline running without the repartitioner (just in case it was making some noise), with spark.executor.instances = 16, equal to the number of splits in the Teradata plugin. It took about 10 hours to run, but at least it finished "successfully".

The counts on that run are much higher (238 million) than the total count of the source table (18.34 million). I wondered whether it could be some kind of issue with the counts in CDAP, but those 238M records were actually inserted into the BigQuery table; please check the attached pics.

Is there any open bug on the Teradata plugin v1.7.0?

I'm not sure whether the issue is related to spark.executor.instances, the size of the source table, or a number of splits greater than 1, but the Teradata source plugin seems to be duplicating records.

I'm going to execute some more runs to see which param correlates with the number of duplicates.

Regards, Daniel

counts_bq.png
counts_row_data_sql_assistant.png
counts_size_bq.png
spark.executor.instances.png
pipe_with_dupplicates.png

Daniel SA

Jul 28, 2023, 4:49:35 AM
to CDAP User
Hi there,
just another run with spark.executor.instances = 2, and the records are exactly duplicated. So the number of executor instances seems to multiply the records.
Should some other param be adjusted, or is there a bug in the Teradata plugin?
Thanks in advance for your support and regards, Daniel
cdap_pipe.png
row_counts_bq.png

Albert Shau

Aug 30, 2023, 2:43:24 PM
to CDAP User
Hi Daniel,

Did you ever figure out what was happening here? The number of spark executors should not cause this sort of behavior, were you also changing the number of splits along with the number of executors? Increasing spark executors should not increase the data volume. With 1 executor and 2 splits, it just means the executor will process split1 then split2. With 2 executors and 2 splits, it means each executor will process one split.
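A toy model of that point: the number of executors changes who processes which split, not how many rows are read. Round-robin assignment is a simplification made for this sketch (Spark schedules tasks dynamically), and the function run is hypothetical.

```python
from collections import deque


def run(split_row_counts, num_executors):
    """Assign splits to executors and return (assignment, total rows read)."""
    pending = deque(enumerate(split_row_counts))
    per_executor = [[] for _ in range(num_executors)]
    executor = 0
    while pending:
        split_id, _rows = pending.popleft()
        per_executor[executor].append(split_id)
        executor = (executor + 1) % num_executors  # simplified scheduling
    # Every split is read exactly once, whatever the executor count.
    total_rows = sum(split_row_counts)
    return per_executor, total_rows


one_executor = run([500, 500], num_executors=1)
two_executors = run([500, 500], num_executors=2)
print(one_executor)
print(two_executors)
```

With one executor, both splits run one after the other on the same executor; with two, each executor takes one split. The total stays at 1000 rows in both cases.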

The type of behavior you were seeing could be explained if the number of splits was also being adjusted, and if there was something wrong with the import query such that each individual split is reading the entire table instead of just a portion of it.
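The hypothesis above can be reproduced in miniature. This sketch uses an in-memory SQLite table as a stand-in for Teradata, and the "broken" import query is a made-up example of a condition that does not restrict the result; it is not a diagnosis of the actual pipeline.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(100)])


def rows_read(import_query, conditions):
    """Run the import query once per split and count all rows returned."""
    total = 0
    for cond in conditions:
        sql = import_query.replace("$CONDITIONS", cond)
        total += len(conn.execute(sql).fetchall())
    return total


# Two splits that together cover ids 0..99.
conditions = ["id >= 0 AND id < 50", "id >= 50 AND id <= 99"]

# Each split reads only its own slice.
good = rows_read("SELECT * FROM t WHERE $CONDITIONS", conditions)

# Hypothetical faulty query: the OR makes the split condition ineffective,
# so every split reads the whole table.
bad = rows_read("SELECT * FROM t WHERE 1 = 1 OR ($CONDITIONS)", conditions)

print(good, bad)
```

With a faulty query the row count scales with the number of splits, which would match exact duplication at 2 splits.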

Regards,
Albert

Sanz Antolin, Daniel

Aug 31, 2023, 2:53:24 AM
to cdap...@googlegroups.com

Hi Albert,

 

Still no clue what could be happening...

I was modifying executor instances and splits so that they matched in number. But the param that multiplies the records is actually the number of splits in the Teradata plugin. So in my earlier email I pointed at spark.executor.instances, but I really meant the number of splits in the Teradata plugin.

Please find different variations of both import and bounding queries. Duplication/multiplication of records happens with all of them as soon as the number of splits is greater than 1:

I haven't found in the database-plugins code how the record boundaries are assigned to each split, since that is actually delegated to the MapReduce libraries. And in CDAP 6.7.1, under "engine config", MapReduce is marked as deprecated. Do you know if that might be causing the issue, so that there is no proper parallelization and each executor reads and propagates the whole table?


 

Regards, Daniel

 

 


Richard Nemeth

Aug 31, 2023, 7:49:01 AM
to cdap...@googlegroups.com
I thought I remembered reading that if you use a string as the split-by field you could get inaccurate results.

Sanz Antolin, Daniel

Aug 31, 2023, 8:10:29 AM
to cdap...@googlegroups.com

Hi Richard,

 

Do you remember where you read that?

I can't say that the problem is confined to clustered tables with a string primary key, because the pipeline also generates duplicates with date/datetime-partitioned tables.

 

Regards, Daniel

 

 

Albert Shau

Aug 31, 2023, 2:13:45 PM
to cdap...@googlegroups.com
Hi Daniel,

As Richard mentioned, it looks like there can be issues if the split column is a string instead of numeric (https://cdap.atlassian.net/browse/PLUGIN-257). How the underlying MapReduce library works is it just tries to divide up that min and max value from the bounding query into even "chunks", replacing the $CONDITIONS part of the import query with 'splitfield > chunk start and splitfield <= chunk end'. For numeric columns this is straightforward but it may not be doing proper things for other types.

Regards,
Albert

Sanz Antolin, Daniel

Sep 1, 2023, 2:38:29 AM
to cdap...@googlegroups.com

Hi guys,

 

thanks for that info! The ticket you mentioned seems to be 7 years old. Do you know if the issue is still unresolved?

I think it would be worth mentioning in the Teradata plugin documentation that if the split field is of type string, things can go wild...

As I understand it, if I don't want to give up transfer speed, the only workaround would be to create an additional numeric field that can be used for making the splits, right? Which, on the other hand, won't be that practical for thousands of tables...

 

Regards, Daniel

 

 
