com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast DOCUMENT into a NullType (value: { "coordinates" : [151.25, -33.94], "type" : "Point" })
import sys
import time

import tweepy
from pymongo import MongoClient

# Set up the MongoDB connection
client = MongoClient("mongodb://127.0.0.1")
db = client.twitter_experiment

# api, query, count, result_type and pages are defined earlier in the notebook;
# limit_handled() is the usual tweepy helper that sleeps through rate limits.
page_count = 1
tweetcount = 0
try:
    for tweets in limit_handled(tweepy.Cursor(api.search, q=query, count=count,
                                              result_type=result_type).pages()):
        print('+', end='')
        for tweet in tweets:
            result = db.tweets.insert_one(tweet._json)
            tweetcount += 1
        page_count += 1
        if page_count >= pages:
            break
        time.sleep(2.5)
    print('')
    print('Finished at page', page_count)
    print('Retrieved', tweetcount, 'tweets')
except tweepy.TweepError as e:
    print("Unknown TweepError:", e)
    print("Was at page {}".format(page_count))
except Exception:
    print("Unexpected error:", sys.exc_info()[0])
from pyspark.sql import SparkSession

my_spark = SparkSession.builder.appName("TwitterExperiment") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/twitter_experiment.tweets") \
    .getOrCreate()
# Read the collection back through the MongoDB Spark connector
df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
print("===dropDuplicates pass===")
print("before dropDuplicates:", df.count())
df = df.dropDuplicates()
print("after dropDuplicates:", df.count())
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-8432489702392663026.py", line 337, in <module>
exec(code)
File "<stdin>", line 6, in <module>
File "/usr/spark-2.1.0/python/pyspark/sql/dataframe.py", line 380, in count
return int(self._jdf.count())
File "/usr/local/lib/python3.4/dist-packages/py4j-0.10.4-py3.4.egg/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/spark-2.1.0/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/local/lib/python3.4/dist-packages/py4j-0.10.4-py3.4.egg/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o725.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 1 times, most recent failure: Lost task 0.0 in stage 50.0 (TID 70, localhost, executor driver): com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast DOCUMENT into a NullType (value: { "coordinates" : [151.25, -33.94], "type" : "Point" })
at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:80)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:38)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:36)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at com.mongodb.spark.sql.MapFunctions$.documentToRow(MapFunctions.scala:36)
at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I don't want to filter out these rows, as the rest of the metadata is useful to me, but I also don't want to remap all 900 fields of the schema just to tackle 4 problematic fields. Is there an easy way to tell the type conversion code to simply make an assumption, via a setting or something I can force?
Hi Luke,
It's been a while since you posted this question; have you found the answer to your issue?
I assume the schema was inferred by Spark rather than explicitly specified, i.e. you did not map the 900 fields yourself.
Based on the exception message you posted, the value { "coordinates" : [151.25, -33.94], "type" : "Point" } seems to be coming from a geo field or similar.
Spark samples documents to infer the schema, and in this case it likely sampled only documents whose geo field was null, resulting in the geo field being inferred as NullType. When it then encountered a document with a non-null value of type Document, it saw that as a conflict.
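Before touching the schema, it may be worth checking the connector's schema-inference sample. The snippet below is a sketch, not something from the original post: it assumes your connector version supports the sampleSize read option (it defaults to 1000 documents). A larger sample makes it more likely that inference sees a non-null geo value:

# Sketch: raise the number of documents sampled for schema inference.
# "sampleSize" is a MongoDB Spark Connector read option (default 1000);
# confirm it is available in your connector version.
df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("sampleSize", 100000) \
    .load()
df.printSchema()  # geo should now come back as a struct rather than null

This only helps if documents with a non-null geo actually land in the sample, so the cast-based workaround below is the deterministic fix.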
A workaround that avoids defining your own mapping of 900 fields is to let Spark infer the schema and then modify only the selected types.
For example:
>>> df.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- a: null (nullable = true)
|-- b: string (nullable = true)
# Example of casting the NullType column to StringType
>>> modified_df = df.withColumn("a", df["a"].cast("string"))
>>> modified_df.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- a: string (nullable = true)
|-- b: string (nullable = true)
See also pyspark.sql.types.
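If several of the ~900 inferred fields come back as NullType (four problematic fields were mentioned above), they can all be cast in one pass rather than listed by hand. This is a sketch, not part of the reply above, and it only walks top-level fields:

from pyspark.sql.types import NullType

# Collect every top-level column that schema inference typed as NullType.
null_cols = [f.name for f in df.schema.fields
             if isinstance(f.dataType, NullType)]

modified_df = df
for name in null_cols:
    # StringType is a safe catch-all; use a more specific type if you know it.
    modified_df = modified_df.withColumn(name, modified_df[name].cast("string"))

modified_df.printSchema()

Nested NullType fields inside structs would need the same treatment applied recursively.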
Regards,
Wan.