We've recently started using equality filters in mapper jobs configured with DatastoreInputReader (Python), and noticed an issue with workload distribution across shards.
Presently equality filters are ignored when choosing key range split points using the __scatter__ property. The resulting split points are appropriate for the entire population of entities of a given kind, but not necessarily for the sub-population matching the equality filter(s). This can result in a very uneven distribution of work among shards.
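To illustrate the imbalance (the kind, property, and shard count below are made up, and the real reader code differs in detail): the split points come from an unfiltered __scatter__ sample over the whole kind, while each shard then only processes the entities matching the filter within its assigned key range.

from google.appengine.api import datastore

SHARD_COUNT = 8  # hypothetical shard count

# Split points are currently sampled over *all* entities of the kind ...
scatter_query = datastore.Query(kind='UserEvent', keys_only=True)
scatter_query.Order('__scatter__')
samples = scatter_query.Get(SHARD_COUNT * 32)  # 32 = oversampling factor

# ... but each shard then iterates, roughly speaking, only the entities
# matching the equality filter(s) within its key range:
shard_query = datastore.Query(kind='UserEvent',
                              filters={'status =': 'pending'},
                              keys_only=True)
# If the matching entities are concentrated in a few key ranges, most shards
# finish almost immediately while one or two do nearly all the work.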
The change I would like to propose is to apply any (equality) filters to the query that samples keys to choose the split points. The downside is that every filter combination used this way requires an additional composite index: the index must include all of the filtered properties plus the __scatter__ property.
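For example, a job filtering on a single property would need a composite index along these lines (kind and property names here are hypothetical; as far as I can tell the __scatter__ entry has to be added to index.yaml by hand, since it never appears in auto-generated indexes):

indexes:
- kind: UserEvent
  properties:
  - name: status        # the equality-filtered property
  - name: __scatter__   # sampled to choose split points

The patch below implements the proposed change against input_readers.py: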
diff --git a/python/src/mapreduce/input_readers.py b/python/src/mapreduce/input_readers.py
index 2632eb1..f81a46d 100755
--- a/python/src/mapreduce/input_readers.py
+++ b/python/src/mapreduce/input_readers.py
@@ -518,7 +518,7 @@ class AbstractDatastoreInputReader(InputReader):
       ranges = cls._split_ns_by_scatter(
           shard_count,
           namespace,
-          query_spec.entity_kind,
+          query_spec,
           app)
       # The nth split of each ns will be assigned to the nth shard.
       # Shuffle so that None are not all by the end.
@@ -545,7 +545,7 @@ class AbstractDatastoreInputReader(InputReader):
   def _split_ns_by_scatter(cls,
                            shard_count,
                            namespace,
-                           raw_entity_kind,
+                           query_spec,
                            app):
     """Split a namespace by scatter index into key_range.KeyRange.

@@ -554,7 +554,7 @@ class AbstractDatastoreInputReader(InputReader):
     Args:
       shard_count: number of shards.
       namespace: namespace name to split. str.
-      raw_entity_kind: low level datastore API entity kind.
+      query_spec: query specification, including entity kind and filters.
       app: app id in str.

     Returns:
@@ -566,14 +566,22 @@ class AbstractDatastoreInputReader(InputReader):
       # With one shard we don't need to calculate any split points at all.
       return [key_range.KeyRange(namespace=namespace, _app=app)]

-    ds_query = datastore.Query(kind=raw_entity_kind,
+    filters = {}
+    if query_spec.filters is not None:
+      filters = dict(('{} {}'.format(prop, op), value) for prop, op, value in query_spec.filters)
+    ds_query = datastore.Query(kind=query_spec.entity_kind,
                                namespace=namespace,
                                _app=app,
+                               filters=filters,
                                keys_only=True)
     ds_query.Order("__scatter__")
+
     oversampling_factor = 32
     random_keys = ds_query.Get(shard_count * oversampling_factor)
+    logging.debug('__scatter__ achieved oversampling factor: {}'.format(
+        1.0 * len(random_keys) / shard_count))
+

     if not random_keys:
       # There are no entities with scatter property. We have no idea
       # how to split.
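For reference, this is roughly how such a job is configured on our side (entity kind, property, handler, and shard count are made up for the example; the filters parameter is the list of (property, operator, value) tuples that ends up in query_spec.filters):

from mapreduce import control

control.start_map(
    name='process_pending_events',
    handler_spec='main.process_event',
    reader_spec='mapreduce.input_readers.DatastoreInputReader',
    mapper_parameters={
        'entity_kind': 'models.UserEvent',
        # Equality filter; today the split points are still sampled over
        # the whole kind, regardless of this filter.
        'filters': [('status', '=', 'pending')],
    },
    shard_count=8)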
Here is the shard distribution for the same job, before and after the change (the counts don't match because I aborted the "before" job without waiting for its single working shard to complete):
Let me know if this seems reasonable, and whether Google would be willing to consider this (or a similar) change.