objectify 4->5 migration


Ivan Golubev

unread,
Jun 12, 2015, 11:57:55 AM6/12/15
to objectify...@googlegroups.com
Hello guys,
to follow the "incredibly important migration notes"
I created a cron job to migrate data stored in Google Storage.

I wonder: how can I filter the entities that are still stored in the old format from the ones already migrated?
Something of this kind:
Query<MyEntity> query = ObjectifyService.ofy().load().type(MyEntity.class).limit(QUERY_CHUNK_SIZE);

query = query.filter("createdDate <=", createdDate);

Jeff Schnitzer

unread,
Jun 12, 2015, 12:27:43 PM6/12/15
to objectify...@googlegroups.com
I don't understand exactly what you are doing or why you are trying to use a cron job.

After you configure your code, you need to iterate through your entities and load/save them. Typically this is a map/reduce kind of problem, and the exact way to do it depends on the number of entities involved.

First of all, you need a deferred task that rewrites one entity by key. Then you need a way of iterating all the keys of your entities, enqueuing one task for each key. Note that you should enqueue a batch at a time for performance, but that's trivial with the Guava collections partition() method.
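For illustration, a minimal sketch of such a per-key rewrite task (assuming Objectify 5, the App Engine DeferredTask interface, and a hypothetical Car entity; none of this is verbatim from the app in question) might look like:

import com.google.appengine.api.taskqueue.DeferredTask;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.VoidWork;
import static com.googlecode.objectify.ObjectifyService.ofy;

// Sketch: a deferred task that rewrites one entity identified by its key.
public class RewriteEntityTask implements DeferredTask {
    private final Key<Car> key;   // Car is a hypothetical entity class

    public RewriteEntityTask(Key<Car> key) {
        this.key = key;
    }

    @Override
    public void run() {
        // Loading reads the old format; saving writes it back in the new format.
        ofy().transact(new VoidWork() {
            public void vrun() {
                Car car = ofy().load().key(key).now();
                if (car != null) {
                    ofy().save().entity(car).now();
                }
            }
        });
    }
}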

If you have relatively few entities (say, a few tens of thousands), you can make an endpoint that just iterates the entire set with a keys-only query and hit it directly. It should complete in 60s.

If you have significantly more (up to a few million), make your endpoint spawn a task that does the keys-only query. Now you have 10 minutes.

If you have really large quantities of entities, you need a proper map/reduce strategy. Look at the Google map/reduce library and see how it uses the __scatter__ property.

Jeff


Ivan Golubev

unread,
Jun 15, 2015, 1:27:47 PM6/15/15
to objectify...@googlegroups.com, je...@infohazard.org
Hello Jeff,

my application has 19 entity types that need migration because of the @Embed annotation they use.
158,703,547 objects take up 319.17 GB of data. The server is actively used: it has ~100k users and 250-300 instances.
So I'd like to be careful about transactions and batch processing.

These are the 3 options I see:
1. create a task queue, a job and 19 cursors to iterate through each entity collection,
saving each entity separately:
List<Key<Car>> chunk = ofy().load().type(Car.class)
                        .startAt(Cursor.fromWebSafeString(cursorStr))
                        .limit(1000).keys().list();

for (final Key<Car> k : chunk) {
    ofy().transact(new Work<VoidWork>() {
        public VoidWork run() {
            ofy().save().entity(
                ofy().load().key(k).now()
            ).now();
            return null;
        }
    });
}

if (chunk.isEmpty())
    logger.warn("Car entity objectify migration complete.");
else
    queue.add(url("/migrateObjetifyServlet")
        .param("cursor", cursor.toWebSafeString()));
// 18 other cursor iterations

This solution will take a while to migrate (days?).

2. map reduce
2.1 scatter the keys to process entities concurrently
2.2 mapper processes one entity and returns nothing useful:
mapper code:
ofy().transact(new Work<VoidWork>() {
    public VoidWork run() {
        ofy().save().entity(
            ofy().load().key(k).now()
        ).now();
        return null;
    }
});

The question is what the pricing would be for this solution; I have found scary stories about thousands of dollars spent on adding a single field with the map-reduce approach.

3. Use deferred tasks and again save each entity in a transaction.

What do you think would be the optimal way to re-save 300 GB of data to migrate Objectify 4 -> 5?

Ivan Golubev

unread,
Jun 15, 2015, 1:36:45 PM6/15/15
to objectify...@googlegroups.com
Sorry Jeff, 
never mind the price aspect - I gather it depends on the processing rate and the number of workers specified.

Ivan Golubev

unread,
Jun 15, 2015, 1:44:30 PM6/15/15
to objectify...@googlegroups.com
And for the 2. map-reduce solution, I see DataStoreKeyInputReader, which seems to be just what the doctor ordered.

Ivan Golubev

unread,
Jun 15, 2015, 2:17:13 PM6/15/15
to objectify...@googlegroups.com
Am I right that map-reducing the task should look something like this:
// A map function processes input values one at a time and generates zero output values.
MapOnlyMapper<Key<Car>, Void> mapper = new MapOnlyMapper<Key<Car>, Void>() {
    public void map(final Key<Car> k) {
        ofy().transact(new Work<VoidWork>() {
            public VoidWork run() {
                Car car = ofy().load().key(k).now();
                ofy().save().entity(car).now();
                return null;
            }
        });
    }
};

Query<Car> query = ofy().load().type(Car.class);

MapReduceSpecification.of(
    "Objectify 4 to 5 migration",
    new DatastoreKeyInputReader(query),
    mapper,
    NoReducer.<Long, String, String>create(),
    NoOutput.<String, String>create(1)
);

Nicholas Okunew

unread,
Jun 15, 2015, 4:56:13 PM6/15/15
to objectify...@googlegroups.com
I'm not totally sure, but I expect that the MapReduce code is all low-level - i.e. the query it expects is a datastore query, not an ofy query, and the key is a low-level key, not an ofy key. Still, this potentially seems like a better way to go about it than your first code example.

I think in that example (where you're queueing your own tasks) you need to be a little careful about error states. The tasks will typically retry, but with some kind of backoff. If you hit some bad data (something that won't load or save properly) because of an earlier migration, it can stall the whole thing. And if you've already migrated 150m objects, you really don't want to start again.

Potentially you could read all the keys and queue each one, then process them in batches using pull queues, one queue per entity type. This doesn't really handle the fact that the system is in use while the migration happens, but hopefully any new entities would be written in the new format anyway.

You could also consider adding a field to each of your entities to help identify that they've been migrated (like an ofy version field or something). This won't help you query for those that haven't been migrated yet, though.
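A hypothetical sketch of such a marker field (assuming Objectify's @Index and @OnSave annotations; the Car entity is made up):

// Hypothetical marker field; entities saved before the field existed store no value
// for it, which is why a query on it cannot find the not-yet-migrated entities.
@Entity
public class Car {
    @Id Long id;

    @Index
    private int schemaVersion;

    @OnSave
    void stampSchemaVersion() {
        schemaVersion = 5;   // written by code that already saves in the Objectify 5 format
    }
}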

My advice would be to make this a process you can safely re-run without reprocessing entities that are migrated so you can purge queues etc.


Jeff Schnitzer

unread,
Jun 15, 2015, 7:05:15 PM6/15/15
to objectify...@googlegroups.com

This will be shorter than I would like since I am typing on a phone...

There is pretty much one good way to reprocess this quantity of data.

1) get your system into such a state that all new entity writes go to the new format. I assume you have already done this.

2) fill a task queue with one task for every entity key. That task just loads and saves the entity.

Any other approach, especially approaches involving cursors, will cause you massive amounts of pain. Your queries will time out and fail, and you will find yourself in a retry loop with an iteration cycle measured in days.

#2 is a little tricky. Ideally you would just issue a keys-only query and enqueue a task for each result, but that won't work for 200M entities. Instead you must break it down.

Use the mechanism of the map/reduce library (but I would not bother with the actual library for this). There is a magic property called __scatter__ attached to some percentage of entities. A keys-only query on this property gives you a random sampling of keys. If you query for 1000 keys and sort them, you can determine effective partition values - you will want maybe 100 partitions.

Enqueue a task for each partition. That task queries for keys bounded by the partition and enqueues the final read/write task for each key.

This may sound complicated but it is really just a few lines of code. Too much for me to type on a phone though.
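For the record, a rough sketch of those few lines (assuming Objectify 5, a hypothetical Car entity, and a made-up enqueuePartitionTask helper; Ivan posts a complete working version later in this thread):

// 1) Sample candidate split points via the reserved __scatter__ property (keys-only query).
List<Key<Car>> sample = ofy().load().type(Car.class)
        .order("__scatter__").limit(1000).keys().list();
Collections.sort(sample);

// 2) Thin the sample to roughly 100 partition boundaries and enqueue one task per range;
//    the first and last ranges are open-ended.
Key<Car> start = null;
for (int i = 0; i < sample.size(); i += 10) {
    Key<Car> end = sample.get(i);
    enqueuePartitionTask(start, end);   // hypothetical helper that defers a task
    start = end;
}
enqueuePartitionTask(start, null);

// 3) Each partition task runs a keys-only query bounded by __key__ and enqueues the
//    per-key load/save task for every result, batched for performance.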

You can fill the reducer queue in a few minutes. Processing all the entities is just a question of waiting for the queues to drain. At 500/s you should be done in about 5 days.

Cost wise, if this doesn't update any indexes, you'll pretty much pay one read plus one write.

Suerte,
Jeff

Ivan Golubev

unread,
Jun 16, 2015, 10:26:22 AM6/16/15
to objectify...@googlegroups.com, je...@infohazard.org
Thank you for your replies Jeff and Nick.

As I see it:
1. I retrieve a random sample of keys with the __scatter__ property 
    (which according to the docs is added to 1 out of every 512 entities) this way:
List<Key<Item>> partition_points = 
                 ofy().load().type(Item.class).order(SCATTER_RESERVED_PROPERTY).limit(5000).keys().list();
I suppose a list of 5000 keys should fit into memory.

2. Then sort the keys, create ranges, and submit tasks to process the ranges.
3. Then process the ranges: request the keys in each range and submit a task for each key in the range.
List<Key<Item>> keys_in_range = 
              ofy().load().type(Item.class).offset(range_start).limit(100).keys().list();
4. Process individual entities: load/save entity by key in a transaction.

Regarding the price: I haven't found any indexes, which means that
at a pace of 500/s (43.2 million/day) it should take 3.5 days to process (read and write) 150 million objects.
With the 50k free daily limit and $0.06/100k for write or read operations,
it should cost $25.89 * 2 (read+write) per day * 3.5 days = $181.23.

To Jeff:
The only thing that bothers me is whether the sampling of partition points is correct.
Will the ranges created from the first 2 queries cover all the entities?
I mean, what if the last partition point isn't fetched and the final range of keys is never processed?

Jeff Schnitzer

unread,
Jun 16, 2015, 1:27:34 PM6/16/15
to objectify...@googlegroups.com

You are close. Don't use offset to extract the partition. Use greater-than and less-than filters on __key__.
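For example (a sketch of the bounded keys-only query using Objectify's filterKey; rangeStart and rangeEnd stand in for the partition boundary keys):

List<Key<Item>> keysInRange = ofy().load().type(Item.class)
        .filterKey(">=", rangeStart)
        .filterKey("<", rangeEnd)
        .keys().list();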


Jeff Schnitzer

unread,
Jun 16, 2015, 3:35:48 PM6/16/15
to objectify...@googlegroups.com

Oh, one more thing, which you asked about - the queries for the partition ends need to be open-ended.

I won't be in front of a real keyboard till Thursday, but if that doesn't make sense I can elaborate.

Ivan Golubev

unread,
Jun 17, 2015, 5:45:30 AM6/17/15
to objectify...@googlegroups.com, je...@infohazard.org
Do you mean that the first query should use the condition < first partition point,
and the last query should use the condition >= last partition point?

Jeff Schnitzer

unread,
Jun 17, 2015, 10:55:19 AM6/17/15
to objectify...@googlegroups.com

Exactly


Ivan Golubev

unread,
Jun 24, 2015, 1:10:22 PM6/24/15
to objectify...@googlegroups.com, je...@infohazard.org
Hi Jeff,

I have finally tried the approach you proposed.
The exact code (for posterity) is this:

private List<Class> classesToMigrate = ImmutableList.<Class>builder().add(       
User.class,
        ...
).build();

public void submitObjectifyMigrationClassTasks() {
/* submit tasks for each instance type with delays */
for (Class c: classesToMigrate) {
taskService.queueTask(OBJECTIFY_MIGRATION_TASK_TYPE, c.getCanonicalName(), objectifyMigrateQueue);
log.warning(String.format("Objectify migration task for class %s has been submitted.", c.getName()));
}
}

private <C> void processEntityClass(Class<C> clazz) {
try {
/* fetch entity keys using the "scatter" property, keys queries come at 1/7th the price of a full fetch */
List<Key<C>> partitionPoints = ofy().load().type(clazz).order(SCATTER_RESERVED_PROPERTY).limit(5000).keys().list();

if ( !partitionPoints.isEmpty() ) {
processKeysWithPartitionPoints(clazz, partitionPoints);
} else {
/* if no partition points found */
processKeysWithoutPartitionPoints(clazz);
}
} catch (Exception ex) {
log.log(Level.SEVERE, String.format("Objectify migration: something went wrong for class %s.", clazz.getName()), ex);
}
}

private <C> void processKeysWithoutPartitionPoints(Class<C> clazz) throws Exception {
if (ofy().load().type(clazz).count() == 0) {
log.warning(String.format("Nothing to migrate for class %s !", clazz.getName()));

} else {
/* no scatter points found - fetch all the keys for this class */
if (ofy().load().type(clazz).count() > 5000) {
throw new Exception(
String.format("Assertion error: objectify failed to mark %s entities with __scatter__ property.", clazz.getName()));
}
List<Key<C>> keys = ofy().load().type(clazz).limit(5000).keys().list();
submitIndividualKeyTasks(keys);
}
}

private <C> void processKeysWithPartitionPoints(Class<C> clazz, List<Key<C>> partitionPoints) {
log.warning("Partition points count: " + partitionPoints.size());
/* sort keys */
Collections.sort(partitionPoints);
/* build ranges (partition) */
LinkedList<Range<Key<C>>> ranges = toTuples(partitionPoints);
/* add starting and ending ranges */
ranges.addFirst(
new Range<>(
null, partitionPoints.get(0)
)
);
ranges.addLast(
new Range<>(
partitionPoints.get(partitionPoints.size() - 1), null
)
);
log.warning("Ranges count: " + ranges.size());

int totalKeysToProcess = 0;
/* fetch keys in each range and submit individual tasks */
for (Range<Key<C>> range : ranges) {
List<Key<C>> keys = getKeysInRange(range, clazz);
submitIndividualKeyTasks(keys);
totalKeysToProcess += keys.size();
}

log.warning(
String.format("Total keys to process (%d) equals entity count: %b",
totalKeysToProcess,
totalKeysToProcess == ofy().load().type(clazz).count()
)
);

log.warning(String.format("Objectify migration: all individual tasks for class %s have been submitted.", clazz.getName()));
}

private <T> List<Key<T>> getKeysInRange(Range<Key<T>> tuple, Class<T> clazz) {
List<Key<T>> keys;
if (tuple.start == null) {
keys = ofy().load().type(clazz).filterKey("<", tuple.end).keys().list();
}
else if (tuple.end == null) {
keys = ofy().load().type(clazz).filterKey(">=", tuple.start).keys().list();
}
else {
keys = ofy().load().type(clazz).filterKey(">=", tuple.start).filterKey("<", tuple.end).keys().list();
}
int size = (keys != null) ? keys.size() : 0;
log.warning(String.format("Keys count in range %s: %s.", tuple.toString(), size));
return keys;
}


private <T> void submitIndividualKeyTasks(List<Key<T>> keys) {
for (Key<T> k: keys)
taskService.queueTask(OBJECTIFY_MIGRATION_INDIVIDUAL_ITEM_TASK_TYPE, gson.toJson(k), objectifyMigrateItemQueue);
}

private <T> void processEntityItem(final Key<T> itemKey) {
ofy().transact(

new Work<VoidWork>() {
public VoidWork run() {
                    try {
T t = ofy().load().key(itemKey).now();
ofy().save().entity(t).now();
} catch (Exception e) {
log.warning("Objectify migration: failed to save individual item: " + itemKey);
}
return null;
}
}
);
}
private <T> LinkedList<Range<Key<T>>> toTuples(List<Key<T>> list) { ... }
private class Range<A> { A start, end; ... }


What definitely works:
the map-reduce part: for each entity type, key ranges are built and tasks for individual items are submitted.

The only issue is that I get these errors:
java.lang.NullPointerException
    at com.googlecode.objectify.impl.Round$1.nowUncached(Round.java:73)
    at com.googlecode.objectify.util.ResultCache.now(ResultCache.java:30)
    at com.googlecode.objectify.impl.TransactionImpl.commitAsync(TransactionImpl.java:61)
    at com.googlecode.objectify.impl.TransactionImpl.commit(TransactionImpl.java:45)
    at com.googlecode.objectify.impl.TransactorNo.transactOnce(TransactorNo.java:119)
    at com.googlecode.objectify.impl.TransactorNo.transactNew(TransactorNo.java:95)
    at com.googlecode.objectify.impl.TransactorNo.transact(TransactorNo.java:85)
    at com.googlecode.objectify.impl.ObjectifyImpl.transact(ObjectifyImpl.java:177)
    at com.ea.scrabble.task.objectifymigrate.ObjectifyMigrateServiceImpl.processEntityItem(ObjectifyMigrateServiceImpl.java:194)

So I wonder - maybe the actual save should be done in a different manner (e.g. by bypassing the cache and using some lower-level calls)?


Jeff Schnitzer

unread,
Jun 24, 2015, 1:49:59 PM6/24/15
to objectify...@googlegroups.com
That exception is baffling. The 'cache' being referenced is just the cache of the value in the Result<?> - it is unrelated to memcache. Can you post the full stacktrace, including the whole GAE stack? 

This should be a simple standalone request which loads and saves an entity. The only thing I can imagine is that there's some bizarre edge case involving @OnLoad methods and @Load annotations firing in a peculiar order. Is there something particularly odd that happens when that class is loaded?

Other than that, there are two things I notice about your code:

 1) It does not appear to be enqueueing separate mapper tasks for each partition. If it just executes each range in serial there's not much point in the partitioning.

 2) You'll get a _lot_ more throughput if you submit multiple tasks at once. Guava's partition() method helps a lot. Here's a class I use:

@Data
public static class QueueHelper {
private final Queue queue;

public void add(DeferredTask payload) {
this.add(null, payload);
}

public void add(Transaction txn, DeferredTask payload) {
if (log.isDebugEnabled())
log.debug("Queue '" + queue.getQueueName() + "' adding " + payload);

queue.add(txn, TaskOptions.Builder.withPayload(payload));
}

/** Allows any number of tasks; automatically partitions as necessary */
public void add(Iterable<? extends DeferredTask> payloads) {
Iterable<TaskOptions> opts = Iterables.transform(payloads, new Function<DeferredTask, TaskOptions>() {
@Override
public TaskOptions apply(DeferredTask task) {
return TaskOptions.Builder.withPayload(task);
}
});

Iterable<List<TaskOptions>> partitioned = Iterables.partition(opts, QueueConstants.maxTasksPerAdd());

for (List<TaskOptions> piece: partitioned)
queue.add(null, piece);
}
}



Jeff Schnitzer

unread,
Jun 24, 2015, 1:53:02 PM6/24/15
to objectify...@googlegroups.com
It might be more helpful if I post the whole class:

package com.example;

import com.google.appengine.api.datastore.Transaction;
import com.google.appengine.api.taskqueue.DeferredTask;
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueConstants;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.List;

/**
* Convenience methods for adding to the appengine queues.
*/
@Slf4j
public class Queues
{
/** Just a slightly more convenient interface for our purposes */
   @Data
public static class QueueHelper {
private final Queue queue;

public void add(DeferredTask payload) {
this.add(null, payload);
}

public void add(Transaction txn, DeferredTask payload) {
if (log.isDebugEnabled())
log.debug("Queue '" + queue.getQueueName() + "' adding " + payload);

queue.add(txn, TaskOptions.Builder.withPayload(payload));
}

/** Allows any number of tasks; automatically partitions as necessary */
public void add(Iterable<? extends DeferredTask> payloads) {
Iterable<TaskOptions> opts = Iterables.transform(payloads, new Function<DeferredTask, TaskOptions>() {
@Override
public TaskOptions apply(DeferredTask task) {
return TaskOptions.Builder.withPayload(task);
}
});

Iterable<List<TaskOptions>> partitioned = Iterables.partition(opts, QueueConstants.maxTasksPerAdd());

for (List<TaskOptions> piece: partitioned)
queue.add(null, piece);
}
}

   /** The default queue ('default' is a java keyword, oops) */
public static QueueHelper deflt() {
return new QueueHelper(QueueFactory.getDefaultQueue());
}

/** */
public static QueueHelper email() {
return new QueueHelper(QueueFactory.getQueue("email"));
}
}

Ivan Golubev

unread,
Jun 25, 2015, 10:24:27 AM6/25/15
to objectify...@googlegroups.com, je...@infohazard.org
The exact exception is:
java.lang.NullPointerException at com.googlecode.objectify.impl.Round$1.nowUncached(Round.java:73) at com.googlecode.objectify.util.ResultCache.now(ResultCache.java:30) at com.googlecode.objectify.impl.TransactionImpl.commitAsync(TransactionImpl.java:61) at com.googlecode.objectify.impl.TransactionImpl.commit(TransactionImpl.java:45) at com.googlecode.objectify.impl.TransactorNo.transactOnce(TransactorNo.java:119) at com.googlecode.objectify.impl.TransactorNo.transactNew(TransactorNo.java:95) at com.googlecode.objectify.impl.TransactorNo.transact(TransactorNo.java:85) at com.googlecode.objectify.impl.ObjectifyImpl.transact(ObjectifyImpl.java:177) at com.ea.scrabble.task.objectifymigrate.ObjectifyMigrateServiceImpl.processEntityItem(ObjectifyMigrateServiceImpl.java:220) at com.ea.scrabble.task.objectifymigrate.ObjectifyMigrateServiceImpl.access$500(ObjectifyMigrateServiceImpl.java:43) at com.ea.scrabble.task.objectifymigrate.ObjectifyMigrateServiceImpl$4.executeTask(ObjectifyMigrateServiceImpl.java:95) at com.ea.scrabble.tasks.QueuedTaskServlet.doPost(QueuedTaskServlet.java:44) at javax.servlet.http.HttpServlet.service(HttpServlet.java:637) at javax.servlet.http.HttpServlet.service(HttpServlet.java:717) at com.google.inject.servlet.ServletDefinition.doService(ServletDefinition.java:263) at com.google.inject.servlet.ServletDefinition.service(ServletDefinition.java:178) at com.google.inject.servlet.ManagedServletPipeline.service(ManagedServletPipeline.java:91) at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:62) at com.googlecode.objectify.cache.AsyncCacheFilter.doFilter(AsyncCacheFilter.java:59) at com.googlecode.objectify.ObjectifyFilter.doFilter(ObjectifyFilter.java:49) at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163) at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58) at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:168) at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58) at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:168) at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58) at com.googlecode.objectify.cache.AsyncCacheFilter.doFilter(AsyncCacheFilter.java:59) at com.googlecode.objectify.ObjectifyFilter.doFilter(ObjectifyFilter.java:49) at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163) at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58) at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:168) at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58) at org.apache.shiro.web.servlet.AbstractShiroFilter.executeChain(AbstractShiroFilter.java:449) at org.apache.shiro.web.servlet.AbstractShiroFilter$1.call(AbstractShiroFilter.java:365) at org.apache.shiro.subject.support.SubjectCallable.doCall(SubjectCallable.java:90) at org.apache.shiro.subject.support.SubjectCallable.call(SubjectCallable.java:83) at org.apache.shiro.subject.support.DelegatingSubject.execute(DelegatingSubject.java:383) at org.apache.shiro.web.servlet.AbstractShiroFilter.doFilterInternal(AbstractShiroFilter.java:362) at org.apache.shiro.web.servlet.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:125) at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163) at 
com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58) at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:168) at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58) at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:168) at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58) at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118) at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1157) at com.google.apphosting.utils.servlet.ParseBlobUploadFilter.doFilter(ParseBlobUploadFilter.java:125) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1157) at com.google.apphosting.runtime.jetty.SaveSessionFilter.doFilter(SaveSessionFilter.java:35) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1157) at com.google.apphosting.utils.servlet.JdbcMySqlConnectionCleanupFilter.doFilter(JdbcMySqlConnectionCleanupFilter.java:60) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1157) at com.google.apphosting.utils.servlet.TransactionCleanupFilter.doFilter(TransactionCleanupFilter.java:43) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1157) at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:388) at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216) at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182) at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:765) at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:418) at com.google.apphosting.runtime.jetty.AppVersionHandlerMap.handle(AppVersionHandlerMap.java:257) at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152) at org.mortbay.jetty.Server.handle(Server.java:326) at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542) at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:923) at com.google.apphosting.runtime.jetty.RpcRequestParser.parseAvailable(RpcRequestParser.java:76) at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404) at com.google.apphosting.runtime.jetty.JettyServletEngineAdapter.serviceRequest(JettyServletEngineAdapter.java:146) at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.run(JavaRuntime.java:482) at com.google.tracing.TraceContext$TraceContextRunnable.runInContext(TraceContext.java:437) at com.google.tracing.TraceContext$TraceContextRunnable$1.run(TraceContext.java:444) at com.google.tracing.CurrentContext.runInContext(CurrentContext.java:230) at com.google.tracing.TraceContext$AbstractTraceContextCallback.runInInheritedContextNoUnref(TraceContext.java:308) at com.google.tracing.TraceContext$AbstractTraceContextCallback.runInInheritedContext(TraceContext.java:300) at com.google.tracing.TraceContext$TraceContextRunnable.run(TraceContext.java:441) at com.google.apphosting.runtime.ThreadGroupPool$PoolEntry.run(ThreadGroupPool.java:234) at java.lang.Thread.run(Thread.java:745)

Jeff Schnitzer

unread,
Jun 25, 2015, 11:37:47 AM6/25/15
to objectify...@googlegroups.com
One oddity is that I see the ObjectifyFilter appearing twice in that stack. I don't think this is causing the exception but it can't be good.

Is this a normal frontend instance or a managed vm?

The only other thing that could possibly shed some light on this is the class definition of the entity being rewritten. If you want to send that to me privately, I'll take a closer look.

This is pretty baffling because there's not much going on in this request. Just load/save one entity. I've never seen anything like it, and I'm drawing a blank on what could even potentially cause that NPE.

Jeff


Ivan Golubev

unread,
Jun 26, 2015, 12:25:47 PM6/26/15
to objectify...@googlegroups.com, je...@infohazard.org
The code I am posting contains pretty standard POJO classes, nothing secret:
// ChatRoomEntity.java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.googlecode.objectify.annotation.Cache;
import com.googlecode.objectify.annotation.Entity;
import com.googlecode.objectify.annotation.Id;
import com.googlecode.objectify.annotation.Unindex;

@Entity(name = "ChatRoom")
@Unindex
@Cache
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ChatRoomEntity {
@Id
private String chatRoomId;
private List<ChatEntry> chatEntries = new ArrayList<ChatEntry>();
}

// ChatEntry.java
import java.io.Serializable;
import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.googlecode.objectify.annotation.Embed;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Embed
public class ChatEntry implements Serializable {
private int chatEntryId;
private String playerId;
private String message;
private Date discussiondate;
}


Jeff Schnitzer

unread,
Jun 26, 2015, 2:51:22 PM6/26/15
to objectify...@googlegroups.com
I am totally baffled. I've checked out the 4.1.3 code and can't reproduce the issue. I can't even picture a code path that could produce it. Have you modified Objectify at all, or overridden any deep behavior?

The only thing I can think of is to turn on Objectify's logging in logging.properties:

com.googlecode.objectify.level = FINEST

...and paste the log from your task that processes one item. 

Jeff


Michael Walmsley

unread,
Sep 14, 2018, 7:28:29 PM9/14/18
to objectify-appengine
I know this is very old... but it took me a while to wrap my head around all of this.

Is this code correct?

@Log
@SuppressWarnings("serial")
public class UpdateServlet extends HttpServlet {

  final Map<String, Class<? extends Updatable>> classesToUpdate = ImmutableMap
      .<String, Class<? extends Updatable>>builder()
      .put("user", User.class)
      .put("images", ImageData.class)
      .build();

  final public static String TYPE_PARAM = "t";
  final public static String SCATTER_RESERVED_PROPERTY = "__scatter__";
  final public static int MAX_QUERY_COUNT = 5000;

  public static void submitUpdateClassTask(String className) throws IOException {
    Queue queue = QueueFactory.getDefaultQueue();
    TaskOptions options = withUrl("/tasks/update").param(TYPE_PARAM, className);
    queue.add(options);
  }

  /*
   * Update command must be run by admin user (non-Javadoc)
   *
   * @see
   * javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
   * javax.servlet.http.HttpServletResponse)
   */
  public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    try {
      // Ensure the update command is only run by admin users
      User.getAdminUser(req);

      String className = req.getParameter(TYPE_PARAM);
      submitUpdateClassTask(className);

      String updateMessage = "Submitted update task for " + className;
      log.info(updateMessage);
      resp.getWriter().write(updateMessage);
    } catch (UnauthorizedException e) {
      resp.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
      e.printStackTrace();
    }
  }

  // This method runs to update a class
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    String className = req.getParameter(TYPE_PARAM);

    Class<? extends Updatable> type = classesToUpdate.get(className);
    processEntityClass(type);
  }

  private <C extends Updatable> void processEntityClass(Class<C> type) {
    try {
      /*
       * fetch entity keys using the "scatter" property, keys queries come at 1/7th
       * the price of a full fetch
       */
      List<Key<C>> partitionPoints = ofy().load().type(type).order(SCATTER_RESERVED_PROPERTY)
          .limit(MAX_QUERY_COUNT).keys().list();

      log.warning("Partition points count: " + partitionPoints.size());

      /* sort keys */
      Collections.sort(partitionPoints);

      // Add a task for each partition point
      List<DeferredTask> tasks = new ArrayList<DeferredTask>();
      Key<C> start = null;

      for (Key<C> end : partitionPoints) {
        tasks.add(new UpdateObjects<C>(type, start, end));
        start = end;
      }

      tasks.add(new UpdateObjects<C>(type, start, null));

      // Queue the tasks in batches
      Queues.deflt().add(tasks);

      log.warning(
          String.format("Objectify update: all individual tasks for class %s have been submitted.", type.getName()));

    } catch (Exception ex) {
      log.log(Level.SEVERE, String.format("Objectify migration: something went wrong for class %s.", type.getName()),
          ex);
    }
  }

  public static class UpdateObjects<C extends Updatable> implements DeferredTask {
    private Key<C> start;
    private Key<C> end;
    private Class<C> type;

    public UpdateObjects(Class<C> type, Key<C> start, Key<C> end) {
      this.start = start;
      this.end = end;
      this.type = type;
    }

    @Override
    public void run() {
      Query<C> query = ofy().load().type(type);

      if (start != null) {
        query = query.filterKey(">=", this.start);
      }

      if (end != null) {
        query = query.filterKey("<", this.end);
      }

      List<Key<C>> keys = query.keys().list();
      List<DeferredTask> tasks = new ArrayList<DeferredTask>();

      for (Key<C> key : keys) {
        tasks.add(new UpdateItem<C>(key));
      }

      Queues.deflt().add(tasks);
    }
  }

  public static class UpdateItem<C extends Updatable> implements DeferredTask {
    private Key<C> key;

    public UpdateItem(Key<C> key) {
      this.key = key;
    }

    @Override
    public void run() {
      ofy().transact(new Work<VoidWork>() {
        public VoidWork run() {
          try {
            C t = ofy().load().key(key).now();
            t.updateData();
            ofy().save().entity(t).now();
          } catch (Exception e) {
            log.warning("Objectify migration: failed to save individual item: " + key);
          }
          return null;
        }
      });
    }
  }
}


Jeff Schnitzer

unread,
Sep 20, 2018, 11:27:54 AM9/20/18
to objectify-appengine
It's a lot of code to review without a compiler, but it looks right to me.

I would put the mapper (UpdateObjects) on a different queue from the reducer (UpdateItem). You can overload a queue and that will cause retries; if you retry the mapper you'll end up with a lot of redundant UpdateItems. When doing big batch jobs I'll set the concurrency of the mapper to 1 and leave the reducer queue paused until the mapper is empty, just so I get a sanity check on the size of the reducer job.
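A sketch of what that two-queue setup might look like in queue.xml (queue names and rates here are made up, purely illustrative):

<queue-entries>
  <queue>
    <name>update-mapper</name>
    <rate>5/s</rate>
    <max-concurrent-requests>1</max-concurrent-requests>
  </queue>
  <queue>
    <name>update-reducer</name>
    <rate>100/s</rate>
  </queue>
</queue-entries>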

The only other thing I'll say is that you can really clean that up with Java8 streams & StreamEx:

StreamEx.of(ofy().load().type(type).order(SCATTER_RESERVED_PROPERTY).limit(MAX_QUERY_COUNT).keys().list())
    .sorted()
    .prepend(null)
    .append(null)
    .pairMap((left, right) -> new UpdateObjects<>(type, left, right))
    .collect(Collectors.toList());

I also used the Guava Range<?> class instead of passing left/right to my mapper class, but that's certainly not necessary.

Jeff