# Schema of the AIS position reports streamed from /ais/messages/positions.
# Each entry is (column name, Spark type); every column is nullable.
positionSchema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("timestamp", TimestampType()),
        ("messageId", IntegerType()),
        ("repeat", IntegerType()),
        ("mmsi", StringType()),
        ("latitude", DoubleType()),
        ("longitude", DoubleType()),
        ("navStatus", IntegerType()),
        ("rot", IntegerType()),
        ("sog", IntegerType()),
        ("posAccuracy", IntegerType()),
        ("cog", IntegerType()),
        ("trueHeading", IntegerType()),
        ("specialManeuvreIndicator", IntegerType()),
        ("raim", IntegerType()),
    )
])
# Schema of static AIS messages: call sign, name, ship type, hull dimensions,
# ETA, destination and draught. Not referenced in the code visible here.
# Presumably used to read the `_statics` stream; confirm.
# Each entry is (column name, Spark type); every column is nullable.
staticSchema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("timestamp", TimestampType()),
        ("messageId", IntegerType()),
        ("repeat", IntegerType()),
        ("mmsi", StringType()),
        ("callSign", StringType()),
        ("name", StringType()),
        ("shipType", IntegerType()),
        ("dimensionBow", IntegerType()),
        ("dimensionStern", IntegerType()),
        ("dimensionPort", IntegerType()),
        ("dimensionStarboard", IntegerType()),
        ("positionDeviceType", IntegerType()),
        ("eta", TimestampType()),
        ("destination", StringType()),
        ("maxStaticDraught", IntegerType()),
    )
])
# Output schema of the `process_trajectory` grouped-map UDF: one row per
# trajectory. `geometry` holds one [x, y] pair per point and `geometry_index`
# holds the matching formatted timestamps.
# Each entry is (column name, Spark type); every column is nullable.
trajectorySchema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("id", StringType()),
        ("mmsi", StringType()),
        ("start_timestamp", TimestampType()),
        ("end_timestamp", TimestampType()),
        ("ship_name", StringType()),
        ("call_sign", StringType()),
        ("destination", StringType()),
        ("ship_type", IntegerType()),
        ("geometry", ArrayType(ArrayType(DoubleType()))),
        ("geometry_index", ArrayType(StringType())),
        ("wkt", StringType()),
        ("length", DoubleType()),
    )
])
# NOTE(review): PandasUDFType.GROUPED_MAP is deprecated since Spark 3.0 in
# favour of groupBy(...).applyInPandas(func, schema). Migrating requires
# changing the `.apply(process_trajectory)` caller at the same time.
@pandas_udf(trajectorySchema, PandasUDFType.GROUPED_MAP)
def process_trajectory(pdf):
    """Turn one group of joined AIS rows into per-vessel trajectories.

    This is a grouped-map pandas UDF: Spark calls it once per group and
    expects a DataFrame shaped like ``trajectorySchema`` (one row per
    trajectory, possibly none).

    Args:
        pdf: pandas DataFrame holding the group's rows. It must contain the
            columns read below: ``timestamp``, ``mmsi``, ``latitude``,
            ``longitude``, ``name``, ``callSign``, ``destination`` and
            ``shipType``.

    Returns:
        pandas DataFrame whose columns follow ``trajectorySchema``.
    """
    # Point geometries from (longitude, latitude). "EPSG:4326" replaces the
    # deprecated {'init': 'epsg:4326'} form, which pyproj >= 2 warns about.
    position_df = GeoDataFrame(
        pdf,
        crs="EPSG:4326",
        geometry=gpd.points_from_xy(pdf.longitude, pdf.latitude),
    )

    # movingpandas needs a temporal (DatetimeIndex) index. `timestamp` is
    # TimestampType upstream, so it already arrives as datetime64. pandas
    # ignores `format=` for such input, so no format string is passed.
    position_df["t"] = pd.to_datetime(position_df["timestamp"])
    position_df = position_df.set_index("t")

    # Split rows into one trajectory per vessel (mmsi). Trajectories shorter
    # than min_length are discarded by movingpandas.
    trajectory_col = mpd.TrajectoryCollection(position_df, "mmsi", min_length=300)

    trajectories = []
    for trajectory in trajectory_col.trajectories:
        tdf = trajectory.df
        points = tdf["geometry"]
        mmsi = tdf["mmsi"].iloc[0]

        # One [x, y] == [longitude, latitude] pair per point. Coordinates are
        # read explicitly because np.asarray(point) relied on shapely's array
        # interface, which was removed in shapely 2.0.
        coordinates = [[pt.x, pt.y] for pt in points]

        # Timestamp of each point, aligned with `coordinates`.
        # NOTE(review): the literal "Z" marks these as UTC, but Spark hands
        # pandas UDFs naive timestamps in spark.sql.session.timeZone. Confirm
        # the session time zone is UTC.
        coordinate_times = [t.strftime("%Y-%m-%d %H:%M:%SZ") for t in tdf.index]

        # Deterministic trip id: SHA-1 of the mmsi plus the WKT of the first
        # five points (spaces stripped), reduced to at most 8 decimal digits.
        # NOTE(review): with only 10**8 possible ids, collisions become likely
        # at around 10**4 trajectories. The result is later deduplicated on
        # `id`, so a collision would silently drop a trajectory. Widening the
        # id would change existing ids, so it is left as-is here.
        seed = mmsi + "".join(pt.wkt for pt in points.iloc[:5]).replace(" ", "")
        trip_id = str(int(hashlib.sha1(seed.encode("utf-8")).hexdigest(), 16) % (10 ** 8))

        trajectories.append({
            "id": trip_id,
            "mmsi": mmsi,
            "start_timestamp": trajectory.get_start_time(),
            "end_timestamp": trajectory.get_end_time(),
            # Static attributes are taken from the trajectory's first row.
            "ship_name": tdf["name"].iloc[0],
            "call_sign": tdf["callSign"].iloc[0],
            "destination": tdf["destination"].iloc[0],
            "ship_type": tdf["shipType"].iloc[0],
            "geometry": coordinates,
            "geometry_index": coordinate_times,
            "wkt": trajectory.to_linestringm_wkt(),
            "length": trajectory.get_length(),
        })

    # Passing explicit columns keeps the output schema-shaped even when no
    # trajectory survived (empty frame, same columns).
    return pd.DataFrame(
        trajectories,
        columns=[
            "id", "mmsi", "start_timestamp", "end_timestamp", "ship_name",
            "call_sign", "destination", "ship_type", "geometry",
            "geometry_index", "wkt", "length",
        ],
    )
# Streaming source of AIS position reports: parquet files under
# /ais/messages/positions, read with `positionSchema`. The 1-hour watermark
# on `timestamp` lets downstream stateful operators discard rows that fall
# too far behind the latest event time seen.
# NOTE(review): latestFirst=True processes the newest files first. Rows from
# an older backlog read afterwards may then already be behind the watermark
# and get dropped. Confirm that is intended.
_positions = (
    spark.readStream
    .schema(positionSchema)
    .option("latestFirst", True)
    .parquet("/ais/messages/positions")
    .withWatermark("timestamp", "1 hours")
)
# Trajectory stream. The pipeline runs in four steps:
#   1. Join each position report to static messages for the same vessel whose
#      static_timestamp lies in [timestamp, timestamp + 30 minutes].
#   2. Bucket the joined rows into 1-hour windows that slide every 30 minutes.
#   3. Turn each window into trajectories with `process_trajectory`.
#   4. Deduplicate the result on the derived trajectory id.
# Every row lands in two overlapping windows. The dropDuplicates on `id` is
# presumably there to collapse trajectories emitted by both windows.
# NOTE(review): `_statics` is defined outside this view. It is presumably a
# stream using staticSchema with mmsi/timestamp renamed to
# static_mmsi/static_timestamp; confirm.
# NOTE(review): dropDuplicates on a stream without a watermarked key keeps
# every id seen in state indefinitely. Confirm the state growth is acceptable.
df = (
    _positions
    .join(
        _statics,
        expr("""static_mmsi = mmsi AND static_timestamp >= timestamp AND static_timestamp <= timestamp + interval 30 minutes"""),
    )
    .drop("static_mmsi")
    .groupBy(window(_positions.timestamp, "1 hours", "30 minutes"))
    .apply(process_trajectory)
    .dropDuplicates(["id"])
)