Query<MyEntity> query = ObjectifyService.ofy().load().type(MyEntity.class).limit(QUERY_CHUNK_SIZE);
query = query.filter("createdDate <=", createdDate);
--
You received this message because you are subscribed to the Google Groups "objectify-appengine" group.
To unsubscribe from this group and stop receiving emails from it, send an email to objectify-appen...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
List<Key<Car>> chunk = ofy().load().type(Car.class).startAt(Cursor.fromWebSafeString(cursorStr)).limit(1000).keys().list();
for (final Key<Car> k : chunk) {
    // Load and re-save each entity in its own transaction
    ofy().transact(new VoidWork() {
        @Override
        public void vrun() {
            Car car = ofy().load().key(k).now();
            ofy().save().entity(car).now();
        }
    });
}
if (chunk.isEmpty())
    logger.warn("Car entity objectify migration complete.");
else
    queue.add(url("/migrateObjetifyServlet").param("cursor", cursor.toWebSafeString()));
// 18 other cursor iterations
ofy().transact(new VoidWork() {
    @Override
    public void vrun() {
        Car car = ofy().load().key(k).now();
        ofy().save().entity(car).now();
    }
});
// A map-only mapper processes input values one at a time; this one emits no output values.
MapOnlyMapper<Key<Car>, Void> mapper = new MapOnlyMapper<Key<Car>, Void>() {
    @Override
    public void map(final Key<Car> k) {
        ofy().transact(new VoidWork() {
            @Override
            public void vrun() {
                Car car = ofy().load().key(k).now();
                ofy().save().entity(car).now();
            }
        });
    }
};
Query<Car> query = ofy().load().type(Car.class);
MapReduceSpecification.of(
"Objectify 4 to 5 migration",
new DatastoreKeyInputReader(query),
mapper,
NoReducer.<Long, String, String>create(),
NoOutput.<String, String>create((int)1L)
);
This will be shorter than I would like since I am typing on a phone...
There is pretty much one good way to reprocess this quantity of data.
1) Get your system into a state where all new entity writes go to the new format. I assume you have already done this.
2) Fill a task queue with one task for every entity key. That task just loads and saves the entity.
Any other approach, especially one involving cursors, will cause you massive amounts of pain. Your queries will time out and fail, and you will find yourself in a retry loop with an iteration cycle measured in days.
#2 is a little tricky. Ideally you would just issue a keys-only query and encode a task for each one, but that won't work for 200M entities. Instead you must break it down.
Use the mechanism of the map/reduce library (but I would not bother with the actual library for this). There is a magic property attached to some % of entities called __scatter__. A keys-only query on this property gives you a random sampling of keys. If you query for 1000 keys and sort them, you can determine effective partition values - you will want maybe 100 partitions.
Enqueue a task for each partition. That task queries for keys bounded by the partition and enqueues the final read/write task for each key.
This may sound complicated but it is really just a few lines of code. Too much for me to type on a phone though.
You can fill the reducer queue in a few minutes. Processing all the entities is just a question of waiting for the queues to drain. At 500/s you should be done in about 5 days.
Cost-wise, if this doesn't update any indexes, you'll pretty much pay one read plus one write per entity.
Suerte,
Jeff
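The partitioning step described above can be sketched in plain Java. This is only an illustration under stated assumptions: the class name `ScatterPartitions` and the method `splitPoints` are hypothetical, and the `sample` list stands in for the result of a keys-only query ordered by `__scatter__`. Any `Comparable` type is used in place of real datastore keys so the sketch runs without App Engine.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ScatterPartitions {

    /**
     * Picks up to (partitions - 1) split points from a random sample of keys.
     * The sample is a stand-in for keys returned by a __scatter__ query.
     */
    public static <K extends Comparable<K>> List<K> splitPoints(List<K> sample, int partitions) {
        List<K> sorted = new ArrayList<>(sample);
        Collections.sort(sorted);
        List<K> points = new ArrayList<>();
        if (partitions < 2 || sorted.isEmpty()) {
            return points; // a single partition needs no split points
        }
        for (int i = 1; i < partitions; i++) {
            // Evenly spaced positions in the sorted sample
            int index = (int) ((long) i * sorted.size() / partitions);
            K candidate = sorted.get(index);
            // Skip repeated candidates so split points stay strictly increasing
            if (points.isEmpty() || points.get(points.size() - 1).compareTo(candidate) < 0) {
                points.add(candidate);
            }
        }
        return points;
    }

    public static void main(String[] args) {
        // 1000 distinct fake keys in scrambled order
        List<String> sample = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            sample.add(String.format("key-%04d", (i * 7919) % 10000));
        }
        List<String> points = splitPoints(sample, 100);
        System.out.println(points.size() + " split points, first = " + points.get(0));
    }
}
```

Each pair of adjacent split points would then bound one partition task; the region before the first point and after the last point still has to be covered by its own tasks.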
List<Key<Item>> partition_points = ofy().load().type(Item.class).order(SCATTER_RESERVED_PROPERTY).limit(5000).keys().list();
List<Key<Item>> keys_in_range = ofy().load().type(Item.class).offset(range_start).limit(100).keys().list();
You are close. Don't use offset to extract the partition. Use greater-than and less-than filters on __key__.
Oh, one more thing, which you asked about: the queries for the partition ends need to be open-ended.
I won't be in front of a real keyboard until Thursday, but if that doesn't make sense I can elaborate.
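A minimal sketch of what "open-ended" means here, assuming half-open ranges where a null bound leaves that side unbounded. The names `KeyRanges`, `Range`, `toRanges` and `matches` are hypothetical and not part of Objectify; integers stand in for keys. The point it illustrates is that the first range has no lower bound and the last has no upper bound, so every key lands in exactly one range.

```java
import java.util.ArrayList;
import java.util.List;

public class KeyRanges {

    /** A half-open range [start, end); a null bound means "unbounded on that side". */
    public static final class Range<K extends Comparable<K>> {
        final K start;
        final K end;

        Range(K start, K end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(K key) {
            // Mirrors a ">=" key filter, skipped entirely when start is null
            boolean afterStart = (start == null) || key.compareTo(start) >= 0;
            // Mirrors a "<" key filter, skipped entirely when end is null
            boolean beforeEnd = (end == null) || key.compareTo(end) < 0;
            return afterStart && beforeEnd;
        }
    }

    /** Turns sorted split points into ranges that cover the whole key space. */
    public static <K extends Comparable<K>> List<Range<K>> toRanges(List<K> sortedPoints) {
        List<Range<K>> ranges = new ArrayList<>();
        K start = null; // the first range has no lower bound
        for (K end : sortedPoints) {
            ranges.add(new Range<>(start, end));
            start = end;
        }
        ranges.add(new Range<>(start, null)); // the last range has no upper bound
        return ranges;
    }

    /** Counts how many ranges contain the given key. */
    public static <K extends Comparable<K>> int matches(List<Range<K>> ranges, K key) {
        int count = 0;
        for (Range<K> range : ranges) {
            if (range.contains(key)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<Integer> points = new ArrayList<>();
        points.add(10);
        points.add(20);
        List<Range<Integer>> ranges = toRanges(points);
        System.out.println("ranges: " + ranges.size());
        System.out.println("ranges containing 5: " + matches(ranges, 5));
        System.out.println("ranges containing 20: " + matches(ranges, 20));
    }
}
```

Using ">=" on the lower bound and "<" on the upper bound keeps a split point from being processed by two neighbouring partitions.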
Exactly
private List<Class> classesToMigrate = ImmutableList.<Class>builder().add(
        User.class,
        ...
).build();

public void submitObjectifyMigrationClassTasks() {
    /* submit tasks for each instance type with delays */
    for (Class c : classesToMigrate) {
        taskService.queueTask(OBJECTIFY_MIGRATION_TASK_TYPE, c.getCanonicalName(), objectifyMigrateQueue);
        log.warning(String.format("Objectify migration task for class %s has been submitted.", c.getName()));
    }
}

private <C> void processEntityClass(Class<C> clazz) {
    try {
        /* fetch entity keys using the "scatter" property, keys queries come at 1/7th the price of a full fetch */
        List<Key<C>> partitionPoints = ofy().load().type(clazz).order(SCATTER_RESERVED_PROPERTY).limit(5000).keys().list();
        if (!partitionPoints.isEmpty()) {
            processKeysWithPartitionPoints(clazz, partitionPoints);
        } else {
            /* if not partition points found */
            processKeysWithoutPartitionPoints(clazz);
        }
    } catch (Exception ex) {
        log.log(Level.SEVERE, String.format("Objectify migration: something went wrong for class %s.", clazz.getName()), ex);
    }
}

private <C> void processKeysWithoutPartitionPoints(Class<C> clazz) throws Exception {
    if (ofy().load().type(clazz).count() == 0) {
        log.warning(String.format("Nothing to migrate for class %s !", clazz.getName()));
    } else {
        /* no scatter points found - fetch all the keys for this class */
        if (ofy().load().type(clazz).count() > 5000) {
            throw new Exception(
                    String.format("Assertion error: objectify failed to mark %s entities with __scatter__ property.", clazz.getName()));
        }
        List<Key<C>> keys = ofy().load().type(clazz).limit(5000).keys().list();
        submitIndividualKeyTasks(keys);
    }
}

private <C> void processKeysWithPartitionPoints(Class<C> clazz, List<Key<C>> partitionPoints) {
    log.warning("Partition points count: " + partitionPoints.size());
    /* sort keys */
    Collections.sort(partitionPoints);
    /* build ranges (partition) */
    LinkedList<Range<Key<C>>> ranges = toTuples(partitionPoints);
    /* add starting and ending ranges */
    ranges.addFirst(
            new Range<>(
                    null, partitionPoints.get(0)
            )
    );
    ranges.addLast(
            new Range<>(
                    partitionPoints.get(partitionPoints.size() - 1), null
            )
    );
    log.warning("Ranges count: " + ranges.size());
    int totalKeysToProcess = 0;
    /* fetch keys in each range and submit individual tasks */
    for (Range<Key<C>> range : ranges) {
        List<Key<C>> keys = getKeysInRange(range, clazz);
        submitIndividualKeyTasks(keys);
        totalKeysToProcess += keys.size();
    }
    log.warning(
            String.format("Total keys to process (%d) equals entity count: %b",
                    totalKeysToProcess,
                    totalKeysToProcess == ofy().load().type(clazz).count()
            )
    );
    log.warning(String.format("Objectify migration: all individual tasks for class %s have been submitted.", clazz.getName()));
}

private <T> List<Key<T>> getKeysInRange(Range<Key<T>> tuple, Class<T> clazz) {
    List<Key<T>> keys;
    if (tuple.start == null) {
        keys = ofy().load().type(clazz).filterKey("<", tuple.end).keys().list();
    } else if (tuple.end == null) {
        keys = ofy().load().type(clazz).filterKey(">=", tuple.start).keys().list();
    } else {
        keys = ofy().load().type(clazz).filterKey(">=", tuple.start).filterKey("<", tuple.end).keys().list();
    }
    int size = (keys != null) ? keys.size() : 0;
    log.warning(String.format("Keys count in range %s: %s.", tuple.toString(), size));
    return keys;
}

private <T> void submitIndividualKeyTasks(List<Key<T>> keys) {
    for (Key<T> k : keys)
        taskService.queueTask(OBJECTIFY_MIGRATION_INDIVIDUAL_ITEM_TASK_TYPE, gson.toJson(k), objectifyMigrateItemQueue);
}

private <T> void processEntityItem(final Key<T> itemKey) {
    ofy().transact(
            new Work<VoidWork>() {
                public VoidWork run() {
                    try {
                        T t = ofy().load().key(itemKey).now();
                        ofy().save().entity(t).now();
                    } catch (Exception e) {
                        log.warning("Objectify migration: failed to save individual item: " + itemKey);
                    }
                    return null;
                }
            }
    );
}

private <T> LinkedList<Range<Key<T>>> toTuples(List<Key<T>> list) { ... }

private class Range<A> { A start, end; ... }
@Data
public static class QueueHelper {
    private final Queue queue;

    public void add(DeferredTask payload) {
        this.add(null, payload);
    }

    public void add(Transaction txn, DeferredTask payload) {
        if (log.isDebugEnabled())
            log.debug("Queue '" + queue.getQueueName() + "' adding " + payload);
        queue.add(txn, TaskOptions.Builder.withPayload(payload));
    }

    /** Allows any number of tasks; automatically partitions as necessary */
    public void add(Iterable<? extends DeferredTask> payloads) {
        Iterable<TaskOptions> opts = Iterables.transform(payloads, new Function<DeferredTask, TaskOptions>() {
            @Override
            public TaskOptions apply(DeferredTask task) {
                return TaskOptions.Builder.withPayload(task);
            }
        });
        Iterable<List<TaskOptions>> partitioned = Iterables.partition(opts, QueueConstants.maxTasksPerAdd());
        for (List<TaskOptions> piece : partitioned)
            queue.add(null, piece);
    }
}
package com.example;

import com.google.appengine.api.datastore.Transaction;
import com.google.appengine.api.taskqueue.DeferredTask;
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueConstants;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * Convenience methods for adding to the appengine queues.
 */
@Slf4j
public class Queues
{
    /** Just a slightly more convenient interface for our purposes */
    @Data
    public static class QueueHelper {
        private final Queue queue;

        public void add(DeferredTask payload) {
            this.add(null, payload);
        }

        public void add(Transaction txn, DeferredTask payload) {
            if (log.isDebugEnabled())
                log.debug("Queue '" + queue.getQueueName() + "' adding " + payload);
            queue.add(txn, TaskOptions.Builder.withPayload(payload));
        }

        /** Allows any number of tasks; automatically partitions as necessary */
        public void add(Iterable<? extends DeferredTask> payloads) {
            Iterable<TaskOptions> opts = Iterables.transform(payloads, new Function<DeferredTask, TaskOptions>() {
                @Override
                public TaskOptions apply(DeferredTask task) {
                    return TaskOptions.Builder.withPayload(task);
                }
            });
            Iterable<List<TaskOptions>> partitioned = Iterables.partition(opts, QueueConstants.maxTasksPerAdd());
            for (List<TaskOptions> piece : partitioned)
                queue.add(null, piece);
        }
    }

    /** The default queue ('default' is a java keyword, oops) */
    public static QueueHelper deflt() {
        return new QueueHelper(QueueFactory.getDefaultQueue());
    }

    /** */
    public static QueueHelper email() {
        return new QueueHelper(QueueFactory.getQueue("email"));
    }
}
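For readers without Guava at hand, the batching that `Iterables.partition` performs in the helper above can be approximated in plain Java. This is a sketch, not the helper's actual implementation: `BatchSketch` and `batches` are hypothetical names, the strings stand in for task payloads, and the batch size of 100 in `main` is only an assumed stand-in for whatever `QueueConstants.maxTasksPerAdd()` returns.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSketch {

    /** Splits items into consecutive batches of at most maxPerBatch elements. */
    public static <T> List<List<T>> batches(List<T> items, int maxPerBatch) {
        if (maxPerBatch <= 0) {
            throw new IllegalArgumentException("maxPerBatch must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int from = 0; from < items.size(); from += maxPerBatch) {
            int to = Math.min(from + maxPerBatch, items.size());
            // Copy the slice so each batch is independent of the source list
            result.add(new ArrayList<>(items.subList(from, to)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tasks = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            tasks.add("task-" + i);
        }
        // One bulk add per batch instead of one call per task
        for (List<String> batch : batches(tasks, 100)) {
            System.out.println("would enqueue " + batch.size() + " tasks in one call");
        }
    }
}
```

Adding tasks in bulk this way cuts the number of queue calls roughly by the batch size, which matters when a partition task fans out into thousands of per-entity tasks.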
One oddity is that I see the ObjectifyFilter appearing twice in that stack. I don't think this is causing the exception but it can't be good.
Is this a normal frontend instance or a managed vm?
The only other thing that could possibly shed some light on this is the class definition of the entity being rewritten. If you want to send that to me privately, I'll take a closer look.
This is pretty baffling because there's not much going on in this request. Just load/save one entity. I've never seen anything like it, and I'm drawing a blank on what could even potentially cause that NPE.
Jeff
On Thu, Jun 25, 2015 at 7:24 AM, Ivan Golubev <igol...@ea.com> wrote:
The exact exception is:
java.lang.NullPointerException
    at com.googlecode.objectify.impl.Round$1.nowUncached(Round.java:73)
    at com.googlecode.objectify.util.ResultCache.now(ResultCache.java:30)
    at com.googlecode.objectify.impl.TransactionImpl.commitAsync(TransactionImpl.java:61)
    at com.googlecode.objectify.impl.TransactionImpl.commit(TransactionImpl.java:45)
    at com.googlecode.objectify.impl.TransactorNo.transactOnce(TransactorNo.java:119)
    at com.googlecode.objectify.impl.TransactorNo.transactNew(TransactorNo.java:95)
    at com.googlecode.objectify.impl.TransactorNo.transact(TransactorNo.java:85)
    at com.googlecode.objectify.impl.ObjectifyImpl.transact(ObjectifyImpl.java:177)
    at com.ea.scrabble.task.objectifymigrate.ObjectifyMigrateServiceImpl.processEntityItem(ObjectifyMigrateServiceImpl.java:220)
    at com.ea.scrabble.task.objectifymigrate.ObjectifyMigrateServiceImpl.access$500(ObjectifyMigrateServiceImpl.java:43)
    at com.ea.scrabble.task.objectifymigrate.ObjectifyMigrateServiceImpl$4.executeTask(ObjectifyMigrateServiceImpl.java:95)
    at com.ea.scrabble.tasks.QueuedTaskServlet.doPost(QueuedTaskServlet.java:44)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:637)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:717)
    at com.google.inject.servlet.ServletDefinition.doService(ServletDefinition.java:263)
    at com.google.inject.servlet.ServletDefinition.service(ServletDefinition.java:178)
    at com.google.inject.servlet.ManagedServletPipeline.service(ManagedServletPipeline.java:91)
    at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:62)
    at com.googlecode.objectify.cache.AsyncCacheFilter.doFilter(AsyncCacheFilter.java:59)
    at com.googlecode.objectify.ObjectifyFilter.doFilter(ObjectifyFilter.java:49)
    at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
    at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
    at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:168)
    at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
    at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:168)
    at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
    at com.googlecode.objectify.cache.AsyncCacheFilter.doFilter(AsyncCacheFilter.java:59)
    at com.googlecode.objectify.ObjectifyFilter.doFilter(ObjectifyFilter.java:49)
    at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
    at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
    at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:168)
    at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
    at org.apache.shiro.web.servlet.AbstractShiroFilter.executeChain(AbstractShiroFilter.java:449)
    at org.apache.shiro.web.servlet.AbstractShiroFilter$1.call(AbstractShiroFilter.java:365)
    at org.apache.shiro.subject.support.SubjectCallable.doCall(SubjectCallable.java:90)
    at org.apache.shiro.subject.support.SubjectCallable.call(SubjectCallable.java:83)
    at org.apache.shiro.subject.support.DelegatingSubject.execute(DelegatingSubject.java:383)
    at org.apache.shiro.web.servlet.AbstractShiroFilter.doFilterInternal(AbstractShiroFilter.java:362)
    at org.apache.shiro.web.servlet.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:125)
    at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
    at com.google.inject.servlet.FilterChainInvocation.doFilter(Filte
...
@Log
@SuppressWarnings("serial")
public class UpdateServlet extends HttpServlet {
    final Map<String, Class<? extends Updatable>> classesToUpdate = ImmutableMap
            .<String, Class<? extends Updatable>>builder().put("user", User.class).put("images", ImageData.class).build();
    final public static String TYPE_PARAM = "t";
    final public static String SCATTER_RESERVED_PROPERTY = "__scatter__";
    final public static int MAX_QUERY_COUNT = 5000;

    public static void submitUpdateClassTask(String className) throws IOException {
        Queue queue = QueueFactory.getDefaultQueue();
        TaskOptions options = withUrl("/tasks/update").param(TYPE_PARAM, className);
        queue.add(options);
    }

    /*
     * Update command must be run by admin user (non-Javadoc)
     *
     * @see
     * javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
     * javax.servlet.http.HttpServletResponse)
     */
    public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
        try {
            // Ensure the update command is only run by admin users
            User.getAdminUser(req);
            String className = req.getParameter(TYPE_PARAM);
            submitUpdateClassTask(className);
            String updateMessage = "Submitted update task for " + className;
            log.info(updateMessage);
            resp.getWriter().write(updateMessage);
        } catch (UnauthorizedException e) {
            resp.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
            e.printStackTrace();
        }
    }

    // This method runs to update a class
    public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
        String className = req.getParameter(TYPE_PARAM);
        Class<? extends Updatable> type = classesToUpdate.get(className);
        processEntityClass(type);
    }

    private <C extends Updatable> void processEntityClass(Class<C> type) {
        try {
            /*
             * fetch entity keys using the "scatter" property, keys queries come at 1/7th
             * the price of a full fetch
             */
            List<Key<C>> partitionPoints = ofy().load().type(type).order(SCATTER_RESERVED_PROPERTY)
                    .limit(MAX_QUERY_COUNT).keys().list();
            log.warning("Partition points count: " + partitionPoints.size());
            /* sort keys */
            Collections.sort(partitionPoints);
            // Add one task per range between consecutive partition points
            List<DeferredTask> tasks = new ArrayList<DeferredTask>();
            Key<C> start = null;
            for (Key<C> end : partitionPoints) {
                tasks.add(new UpdateObjects<C>(type, start, end));
                start = end;
            }
            // The last range is open-ended
            tasks.add(new UpdateObjects<C>(type, start, null));
            // Queue the tasks in batches
            Queues.deflt().add(tasks);
            log.warning(
                    String.format("Objectify update: all individual tasks for class %s have been submitted.", type.getName()));
        } catch (Exception ex) {
            log.log(Level.SEVERE, String.format("Objectify migration: something went wrong for class %s.", type.getName()),
                    ex);
        }
    }

    public static class UpdateObjects<C extends Updatable> implements DeferredTask {
        private Key<C> start;
        private Key<C> end;
        private Class<C> type;

        public UpdateObjects(Class<C> type, Key<C> start, Key<C> end) {
            this.start = start;
            this.end = end;
            this.type = type;
        }

        @Override
        public void run() {
            Query<C> query = ofy().load().type(type);
            if (start != null) {
                query = query.filterKey(">=", this.start);
            }
            if (end != null) {
                query = query.filterKey("<", this.end);
            }
            List<Key<C>> keys = query.keys().list();
            List<DeferredTask> tasks = new ArrayList<DeferredTask>();
            for (Key<C> key : keys) {
                tasks.add(new UpdateItem<C>(key));
            }
            Queues.deflt().add(tasks);
        }
    }

    public static class UpdateItem<C extends Updatable> implements DeferredTask {
        private Key<C> key;

        public UpdateItem(Key<C> key) {
            this.key = key;
        }

        @Override
        public void run() {
            ofy().transact(
                    new Work<VoidWork>() {
                        public VoidWork run() {
                            try {
                                C t = ofy().load().key(key).now();
                                t.updateData();
                                ofy().save().entity(t).now();
                            } catch (Exception e) {
                                log.warning("Objectify migration: failed to save individual item: " + key);
                            }
                            return null;
                        }
                    });
        }
    }
}