Issue reading from existing document using spark-connector - Type Conversion problem?


Luke Walker

Jul 10, 2017, 1:12:10 AM
to mongodb-user
Hi,

I've downloaded a series of tweets in JSON format, then loaded them into a MongoDB collection.

However, when I try to pull them into Spark via pyspark, I hit a conversion problem:

com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast DOCUMENT into a NullType (value: { "coordinates" : [151.25, -33.94], "type" : "Point" })

I don't want to filter out these rows, as the rest of the metadata is useful to me, but I also don't want to have to remap all 900 fields in the schema just to tackle 4 problematic fields. Is there an easy way to tell the type conversion code to simply make an assumption, via a setting or something I can force?

How the tweets are written into MongoDB

# Setup MongoDB connection
# (tweepy `api`, `query`, `count`, `result_type`, `pages` and `limit_handled`
#  are defined earlier in the script)
import sys
import time

import tweepy
from pymongo import MongoClient

client = MongoClient("mongodb://127.0.0.1")
db = client.twitter_experiment

tweetcount = 0

try:
    page_count = 1
    for tweets in limit_handled(tweepy.Cursor(api.search, q=query, count=count, result_type=result_type).pages()):
        print('+', end='')
        for tweet in tweets:
            # Store the raw tweet JSON as a document in the "tweets" collection
            result = db.tweets.insert_one(tweet._json)
            tweetcount += 1
        page_count += 1
        if page_count >= pages:
            break
        time.sleep(2.5)

    print('')
    print('Finished at page', page_count)
    print('Retrieved', tweetcount, 'tweets')

except tweepy.TweepError as e:
    print("Unknown TweepError:", e)
    print("Was at {} page".format(page_count))
except:
    print("Unexpected error:", sys.exc_info()[0])

Pulling from MongoDB into Spark

from pyspark.sql import SparkSession

my_spark = SparkSession.builder.appName("TwitterExperiment") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/twitter_experiment.tweets") \
    .getOrCreate()

# Read the tweets collection via the MongoDB Spark connector
df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

print("===dropDuplicates pass===")
print("before dropDuplicates:", df.count())
df = df.dropDuplicates()
print("after dropDuplicates:", df.count())

Traceback

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-8432489702392663026.py", line 337, in <module>
    exec(code)
  File "<stdin>", line 6, in <module>
  File "/usr/spark-2.1.0/python/pyspark/sql/dataframe.py", line 380, in count
    return int(self._jdf.count())
  File "/usr/local/lib/python3.4/dist-packages/py4j-0.10.4-py3.4.egg/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/spark-2.1.0/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.4/dist-packages/py4j-0.10.4-py3.4.egg/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o725.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 1 times, most recent failure: Lost task 0.0 in stage 50.0 (TID 70, localhost, executor driver): com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast DOCUMENT into a NullType (value: { "coordinates" : [151.25, -33.94], "type" : "Point" })
	at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:80)
	at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:38)
	at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:36)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at com.mongodb.spark.sql.MapFunctions$.documentToRow(MapFunctions.scala:36)
	at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
	at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Wan Bachtiar

Aug 7, 2017, 3:26:30 AM
to mongodb-user

I don’t want to filter out these rows, as the rest of the metadata is useful to me, but I also don’t want to have to remap all 900 fields in the schema just to tackle 4 problematic fields. Is there an easy way to tell the type conversion code to simply make an assumption, via a setting or something I can force?

Hi Luke,

It’s been a while since you posted this question. Have you found an answer to your issue?

I assume that the schema is inferred by Spark and not explicitly specified (i.e. you are not mapping all 900 fields yourself).
Based on the exception message you posted, the value seems to be coming from a geo field or similar.
Spark samples documents to infer the schema, and in this case it has most likely sampled only documents where the geo field was null. As a result it inferred the geo field's type as NullType, and when it later encountered a document with a non-null value of type Document, it treated that as a conflict.
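
If the sampling is indeed the root cause, one thing you could try, purely as a sketch on my part, is to raise the number of documents the connector samples for schema inference so that a non-null geo value is more likely to be seen. I am assuming here that your connector version supports a sampleSize read option; please double-check the option name against the connector documentation.

# Sketch only: ask the connector to sample more documents for schema inference.
# Assumption: the read option is named "sampleSize" (verify for your connector version).
df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("sampleSize", 100000) \
    .load()
df.printSchema()  # check whether "geo" is still inferred as null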

A workaround that avoids defining your own mapping of 900 fields is to let Spark infer the schema and then modify only the problematic types.
For example:


>>> df.printSchema()
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- a: null (nullable = true)
 |-- b: string (nullable = true)

# Example of changing the NullType column "a" to StringType
# (in your case the column would be "geo")
>>> modified_df = df.withColumn("a", df["a"].cast("string"))
>>> modified_df.printSchema()
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)

See also pyspark.sql.types
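
If more than a handful of columns come back as NullType, you can apply the same cast programmatically instead of naming each column by hand. This is a minimal sketch building on the example above; it only patches top-level fields, so any nested NullType fields would still need a rebuilt StructType.

from pyspark.sql.types import NullType

# Find every top-level column that Spark inferred as NullType ...
null_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, NullType)]

# ... and cast each one to string, as in the example above
modified_df = df
for name in null_cols:
    modified_df = modified_df.withColumn(name, modified_df[name].cast("string"))

modified_df.printSchema()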

Regards,
Wan.

Luke Walker

Aug 10, 2017, 11:05:48 PM
to mongodb-user
Hi Wan,

Yes, it had been a while, but your answer has helped me move forward with this project.

Greatly appreciated :)

regards,
Luke