Arrow Conversion Error on Pandas UDF

730 views
Skip to first unread message

Nematix Corporation

unread,
Jul 3, 2020, 9:34:52 AM7/3/20
to Google Cloud Dataproc Discussions
I have a weird issue with the following snippet of program that running correctly at DataBrick with the same version of Spark of DataProc.
When running at DataProc it give me the following error and it cannot be reproduce at DataBrick.

Python : 3.7.6
Spark : 2.4.5
Dataproc Image : 1.5 Ubuntu
PyArrow : 0.15.1

Code snippet:

positionSchema = StructType([
 
StructField("timestamp", TimestampType(), True),
 
StructField("messageId", IntegerType(), True),
 
StructField("repeat", IntegerType(), True),
 
StructField("mmsi", StringType(), True),
 
StructField("latitude", DoubleType(), True),
 
StructField("longitude", DoubleType(), True),
 
StructField("navStatus", IntegerType(), True),
 
StructField("rot", IntegerType(), True),
 
StructField("sog", IntegerType(), True),
 
StructField("posAccuracy", IntegerType(), True),
 
StructField("cog", IntegerType(), True),
 
StructField("trueHeading", IntegerType(), True),
 
StructField("specialManeuvreIndicator", IntegerType(), True),
 
StructField("raim", IntegerType(), True)
])

staticSchema
= StructType([
 
StructField("timestamp", TimestampType(), True),
 
StructField("messageId", IntegerType(), True),
 
StructField("repeat", IntegerType(), True),
 
StructField("mmsi", StringType(), True),
 
StructField("callSign", StringType(), True),
 
StructField("name", StringType(), True),
 
StructField("shipType", IntegerType(), True),
 
StructField("dimensionBow", IntegerType(), True),
 
StructField("dimensionStern", IntegerType(), True),
 
StructField("dimensionPort", IntegerType(), True),
 
StructField("dimensionStarboard", IntegerType(), True),
 
StructField("positionDeviceType", IntegerType(), True),
 
StructField("eta", TimestampType(), True),
 
StructField("destination", StringType(), True),
 
StructField("maxStaticDraught", IntegerType(), True)
])

trajectorySchema
= StructType([
 
StructField("id", StringType(), True),
 
StructField("mmsi", StringType(), True),
 
StructField("start_timestamp", TimestampType(), True),
 
StructField("end_timestamp", TimestampType(), True),
 
StructField("ship_name", StringType(), True),
 
StructField("call_sign", StringType(), True),
 
StructField("destination", StringType(), True),
 
StructField("ship_type", IntegerType(), True),
 
StructField("geometry", ArrayType(ArrayType(DoubleType())), True),
 
StructField("geometry_index", ArrayType(StringType()), True),
 
StructField("wkt", StringType(), True),
 
StructField("length", DoubleType(), True)
])

@pandas_udf(trajectorySchema, PandasUDFType.GROUPED_MAP)
def process_trajectory(pdf):
 
# create geodataframe
  position_df
= GeoDataFrame(pdf, crs={'init' :'epsg:4326'}, geometry=gpd.points_from_xy(pdf.longitude, pdf.latitude))
 
 
# create spatial index for temporal analysis
  position_df
['t'] = pd.to_datetime(position_df['timestamp'], format='%d/%m/%Y %H:%M:%S')
  position_df
= position_df.set_index('t')
 
 
# get trajectories
  trajectory_col
= mpd.TrajectoryCollection(position_df, 'mmsi', min_length=300)
 
 
# static & trajectories
  _trajetories
= []
 
for trajectory in trajectory_col.trajectories:
    geometry_series
= trajectory.df['geometry'].apply(np.asarray).values.tolist()
    geometry_series_index
= [t.strftime("%Y-%m-%d %H:%M:%SZ") for t in trajectory.df['geometry'].index.tolist()]
   
   
#create trip hash
    gseed
= [nd.wkt for nd in trajectory.df['geometry'].values[0:5]]
    seed
= trajectory.df['mmsi'].iloc[0] + "".join(gseed).replace(" ", "")
    seed
= str(int(hashlib.sha1(seed.encode('utf-8')).hexdigest(), 16) % (10 ** 8))
   
    _trajetories
.append(dict(
      id
=seed, \
      mmsi
=trajectory.df['mmsi'].iloc[0], \
      start_timestamp
=trajectory.get_start_time(), \
      end_timestamp
=trajectory.get_end_time(), \
      ship_name
=trajectory.df['name'].iloc[0], \
      call_sign
=trajectory.df['callSign'].iloc[0], \
      destination
=trajectory.df['destination'].iloc[0], \
      ship_type
=trajectory.df['shipType'].iloc[0], \
      geometry
=geometry_series, \
      geometry_index
=geometry_series_index, \
      wkt
=trajectory.to_linestringm_wkt(), \
      length
=trajectory.get_length()
   
))

  trajetories
= pd.DataFrame(_trajetories, columns=["id", "mmsi", "start_timestamp", "end_timestamp", "ship_name", "call_sign", "destination", "ship_type", "geometry", "geometry_index", "wkt", "length"])

 
return trajetories


_positions
= (
  spark
.readStream \
   
.option("latestFirst", True) \
   
.schema(positionSchema) \
   
.parquet("/ais/messages/positions") \
   
.withWatermark("timestamp", "1 hours")
)

df
= _positions \
 
.join(
    _statics
,
    expr
("""static_mmsi = mmsi AND static_timestamp >= timestamp AND static_timestamp <= timestamp + interval 30 minutes""")
 
).drop("static_mmsi") \
 
.groupBy(
    window
(_positions.timestamp, "1 hours", "30 minutes")
 
).apply(process_trajectory).dropDuplicates(["id"])

Error:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 73 in stage 12.0 failed 4 times, most recent failure: Lost task 73.3 in stage 12.0 (TID 11316, mdm-analytic-w-0.asia-southeast1-b.c.mdm-analytic-278918.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process
()
 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer
.dump_stream(func(split_index, iterator), outfile)
 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 290, in dump_stream
   
for series in iterator:
 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 311, in load_stream
   
yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 311, in <listcomp>
   
yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 278, in arrow_to_pandas
    s
= _check_series_convert_date(s, from_arrow_type(arrow_column.type))
 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1672, in from_arrow_type
   
raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=Asia/Kuala_Lumpur], end: timestamp[us, tz=Asia/Kuala_Lumpur]>


Nematix Corporation

unread,
Jul 4, 2020, 9:13:54 PM7/4/20
to Google Cloud Dataproc Discussions
I did downgrade it to 0.13.0 but it seems doesn't work. I try to simplify my program, the same error happen. I think the issue is in the PySpark and even though it was the same version with Databrick, but it handle differently the conversion of Arrow and Pandas

Nematix Corporation

unread,
Jul 8, 2020, 3:42:07 AM7/8/20
to Google Cloud Dataproc Discussions
This is limited feature for Spark 2.4 below line, fix have been introduce in Spark 3.0.0 DataBrick I assume using a special build to reflect based on version of PyArrow that they use in the distribution.

You can inspect following file in source for further clarification.

/python/pyspark/sql/types.py
Reply all
Reply to author
Forward
0 new messages