[oppia] push by s...@google.com - Preliminary commit for jobs functionality with tests. Not yet ready for check-in. on 2014-05-07 05:41 GMT

0 views
Skip to first unread message

op...@googlecode.com

unread,
May 7, 2014, 1:42:06 AM
to oppia-de...@googlegroups.com
Revision: 9e6be1406452
Author: Sean Lip <s...@google.com>
Date: Wed May 7 05:41:48 2014 UTC
Log: Preliminary commit for jobs functionality with tests. Not yet
ready for check-in.

http://code.google.com/p/oppia/source/detail?r=9e6be1406452

Modified:
/app.yaml
/core/jobs.py
/core/jobs_test.py
/core/storage/base_model/gae_models.py
/core/storage/job/gae_models.py
/core/tests/gae_suite.py
/core/tests/test_utils.py
/utils.py

=======================================
--- /app.yaml Mon May 5 20:21:45 2014 UTC
+++ /app.yaml Wed May 7 05:41:48 2014 UTC
@@ -10,6 +10,7 @@

builtins:
- appstats: on
+- deferred: on
- remote_api: on

inbound_services:
=======================================
--- /core/jobs.py Thu Apr 3 07:24:39 2014 UTC
+++ /core/jobs.py Wed May 7 05:41:48 2014 UTC
@@ -18,6 +18,9 @@

__author__ = 'Sean Lip'

+import logging
+import time
+import traceback

from core.platform import models
(job_models,) = models.Registry.import_models([models.NAMES.job])
@@ -29,41 +32,77 @@
STATUS_CODE_STARTED = job_models.STATUS_CODE_STARTED
STATUS_CODE_COMPLETED = job_models.STATUS_CODE_COMPLETED
STATUS_CODE_FAILED = job_models.STATUS_CODE_FAILED
+STATUS_CODE_CANCELED = job_models.STATUS_CODE_CANCELED

VALID_STATUS_CODE_TRANSITIONS = {
STATUS_CODE_NEW: [STATUS_CODE_QUEUED],
- STATUS_CODE_QUEUED: [STATUS_CODE_QUEUED, STATUS_CODE_STARTED],
+ STATUS_CODE_QUEUED: [STATUS_CODE_STARTED, STATUS_CODE_CANCELED],
STATUS_CODE_STARTED: [
- STATUS_CODE_QUEUED, STATUS_CODE_COMPLETED, STATUS_CODE_FAILED
- ],
- STATUS_CODE_COMPLETED: [STATUS_CODE_QUEUED],
- STATUS_CODE_FAILED: [STATUS_CODE_QUEUED],
+ STATUS_CODE_COMPLETED, STATUS_CODE_FAILED, STATUS_CODE_CANCELED],
+ STATUS_CODE_COMPLETED: [],
+ STATUS_CODE_FAILED: [],
+ STATUS_CODE_CANCELED: [],
}
+
+
+class PermanentTaskFailure(Exception):
+ pass


class BaseJob(object):
- """A class that represents a durable job at runtime.
+ """Base class for durable jobs.

- Note that only one instance of a job should actually be running at a
- particular time.
+ Each subclass of this class defines a different type of durable job.
+ Instances of each subclass each define a single job.
"""
# IMPORTANT! This should be set to True for classes which actually define
# jobs (as opposed to abstract base superclasses).
IS_VALID_JOB_CLASS = False

- _job_id = None
- execution_time_sec = None
- status_code = None
- output = None
+ def __init__(self, job_id):
+ self._job_id = job_id
+ self.execution_time_sec = None
+ self.status_code = None
+ self.output = None

- def __init__(self):
- if not self.IS_VALID_JOB_CLASS:
+ def _post_failure_hook(self):
+ """Run after a job has failed. Can be overwritten by subclasses."""
+ pass
+
+ def _run_job(self):
+ logging.info('Job started')
+ time_started = time.time()
+
+ result = ''
+ try:
+ self.mark_started()
+ result = self.run()
+ self.mark_completed(time.time() - time_started, result)
+ logging.info('Job completed')
+ except Exception as e:
+ logging.error(traceback.format_exc())
+ logging.error('Job failed')
+ self.mark_failed(time.time() - time_started, unicode(e))
+ self._failure_hook()
+ raise PermanentTaskFailure(
+ '%s\n%s' % (unicode(e), traceback.format_exc()))
+
+ def enqueue(self):
+ self.mark_queued()
+ self._run_job()
+
+ @classmethod
+ def create_new(cls):
+ if not cls.IS_VALID_JOB_CLASS:
raise Exception(
'Tried to directly initialize a job using the abstract base '
- 'class %s, which is not allowed.' % self.__class__.__name__)
+ 'class %s, which is not allowed.' % cls.__name__)
+
+ job_id = job_models.JobModel.get_new_id(cls.__name__)
+ job_model = job_models.JobModel(id=job_id)
+ job_model.put()

- self._job_id = 'job-%s' % self.__class__.__name__
- self._reload_from_datastore()
+ return cls(job_id)

def _reload_from_datastore(self):
"""Loads the last known state of this job from the datastore."""
@@ -131,7 +170,32 @@
self.output = output
return self._update_status()

+ def mark_canceled(self, execution_time_sec, message):
+ self.execution_time_sec = execution_time_sec
+ self.status_code = STATUS_CODE_CANCELED
+ self.output = message
+ return self._update_status()
+
@property
def is_active(self):
self._reload_from_datastore()
return self.status_code in [STATUS_CODE_QUEUED, STATUS_CODE_STARTED]
+
+ @property
+ def has_finished(self):
+ self._reload_from_datastore()
+ return self.status_code in [STATUS_CODE_COMPLETED, STATUS_CODE_FAILED]
+
+ def cancel(self, user_id):
+ message = 'Canceled by %s' % (user_id or 'system')
+ # TODO(sll): Redo this.
+ execution_time_sec = time.time() - 0
+ # Do job-specific cancellation work outside the transaction.
+ self._cancel_queued_work()
+ # Update the job record.
+ transaction_services.run_in_transaction(
+ self.mark_canceled, execution_time_sec, message)
+
+ def _cancel_queued_work(self, job_id, message):
+ """Override in subclasses to cancel work outside the transaction."""
+ pass
=======================================
--- /core/jobs_test.py Thu Apr 3 07:24:39 2014 UTC
+++ /core/jobs_test.py Wed May 7 05:41:48 2014 UTC
@@ -18,10 +18,12 @@

__author__ = 'Sean Lip'

-
from core import jobs
import test_utils

+from google.appengine.ext import deferred
+from google.appengine.ext import ndb
+

class JobModelUnitTests(test_utils.GenericTestBase):
"""Tests for the tracking of job status in the datastore."""
@@ -32,10 +34,10 @@
class AnotherDummyJob(jobs.BaseJob):
IS_VALID_JOB_CLASS = True

- def test_create_new_job(self):
+ def test_create_new(self):
"""Test the creation of a new job."""
- new_job = self.DummyJob()
- self.assertEqual(new_job._job_id, 'job-DummyJob')
+ new_job = self.DummyJob.create_new()
+ self.assertTrue(new_job._job_id.startswith('DummyJob'))
self.assertIsNone(new_job.execution_time_sec)
self.assertIsNone(new_job.status_code)
self.assertIsNone(new_job.output)
@@ -43,7 +45,7 @@

def test_enqueue_new_job(self):
"""Test the enqueueing of a new job."""
- new_job = self.DummyJob()
+ new_job = self.DummyJob.create_new()
new_job.mark_queued()
self.assertTrue(new_job.is_active)
self.assertEqual(new_job.execution_time_sec, 0)
@@ -51,7 +53,7 @@
self.assertIsNone(new_job.output)

def test_job_completion(self):
- new_job = self.DummyJob()
+ new_job = self.DummyJob.create_new()
new_job.mark_queued()

new_job.mark_started()
@@ -63,7 +65,7 @@
self.assertEqual(new_job.output, 'output')

def test_job_failure(self):
- new_job = self.DummyJob()
+ new_job = self.DummyJob.create_new()
new_job.mark_queued()

new_job.mark_started()
@@ -76,49 +78,22 @@
self.assertEqual(new_job.output, 'output')
self.assertFalse(new_job.is_active)

- def test_job_restart(self):
- new_job = self.DummyJob()
- new_job.mark_queued()
- new_job.mark_started()
-
- new_job.mark_queued()
- self.assertEqual(new_job.execution_time_sec, 0)
- self.assertEqual(new_job.status_code, jobs.STATUS_CODE_QUEUED)
- self.assertIsNone(new_job.output)
-
- new_job.mark_started()
- new_job.mark_failed(20, 'output')
- self.assertEqual(new_job.status_code, jobs.STATUS_CODE_FAILED)
-
- new_job.mark_queued()
- self.assertEqual(new_job.execution_time_sec, 0)
- self.assertEqual(new_job.status_code, jobs.STATUS_CODE_QUEUED)
- self.assertIsNone(new_job.output)
-
def test_status_code_transitions(self):
"""Test that invalid status code transitions are caught."""
-
- new_job = self.DummyJob()
- with self.assertRaisesRegexp(Exception, 'Job was not created'):
- new_job.mark_started()
- with self.assertRaisesRegexp(Exception, 'Job was not created'):
- new_job.mark_completed(20, 'output')
- with self.assertRaisesRegexp(Exception, 'Job was not created'):
- new_job.mark_failed(20, 'output')
-
+ new_job = self.DummyJob.create_new()
new_job.mark_queued()
+ new_job.mark_started()
+ new_job.mark_completed(20, 'output')
+ with self.assertRaisesRegexp(Exception, 'Invalid status code change'):
+ new_job.mark_queued()
with self.assertRaisesRegexp(Exception, 'Invalid status code change'):
new_job.mark_completed(20, 'output')
with self.assertRaisesRegexp(Exception, 'Invalid status code change'):
new_job.mark_failed(20, 'output')

- new_job.mark_started()
- new_job.mark_completed(20, 'output')
- new_job.mark_queued()
-
def test_different_jobs_are_independent(self):
- new_job = self.DummyJob()
- another_job = self.AnotherDummyJob()
+ new_job = self.DummyJob.create_new()
+ another_job = self.AnotherDummyJob.create_new()

new_job.mark_queued()
new_job.mark_started()
@@ -128,21 +103,92 @@
self.assertEqual(new_job.status_code, jobs.STATUS_CODE_FAILED)
self.assertEqual(another_job.status_code, jobs.STATUS_CODE_QUEUED)

- def test_one_job_runs_per_class(self):
- # This tests that different instances of a job class refer to the same
- # backend job.
- new_job = self.DummyJob()
- new_job.mark_queued()
- new_job.mark_started()
-
- same_job = self.DummyJob()
- self.assertEqual(same_job.status_code, jobs.STATUS_CODE_STARTED)
-
def test_can_only_instantiate_valid_jobs(self):
- new_job = self.DummyJob()
+ new_job = self.DummyJob.create_new()
new_job.mark_queued()
new_job.mark_started()

with self.assertRaisesRegexp(
Exception, 'Tried to directly initialize'):
- jobs.BaseJob()
+ jobs.BaseJob.create_new()
+
+
+TEST_INPUT_DATA = [(1, 2), (3, 4), (1, 5)]
+SUM_MODEL_ID = 'all_data_id'
+
+
+class NumbersModel(ndb.Model):
+ number = ndb.IntegerProperty()
+
+
+class SumModel(ndb.Model):
+ total = ndb.IntegerProperty(default=0)
+
+
+class TestJob(jobs.BaseJob):
+ """Base class for test jobs."""
+ IS_VALID_JOB_CLASS = True
+
+ def enqueue(self):
+ """Overwrites the superclass to use the deferred task queue stub."""
+ self.mark_queued()
+ deferred.defer(self._run_job)
+
+
+class TestAdditionJob(TestJob):
+ """Test job that sums all NumbersModel data.
+
+ The result is stored in a SumModel entity with id SUM_MODEL_ID.
+ """
+ def run(self):
+ total = sum([
+ numbers_model.number for numbers_model in NumbersModel.query()])
+ SumModel(id=SUM_MODEL_ID, total=total).put()
+
+
+class JobLifecycleUnitTests(test_utils.GenericTestBase):
+ """Tests a real job."""
+
+ def _get_stored_total(self):
+ sum_model = SumModel.get_by_id(SUM_MODEL_ID)
+ return sum_model.total if sum_model else 0
+
+ def _populate_data(self):
+ """Populate the datastore with four NumbersModel instances."""
+ NumbersModel(number=1).put()
+ NumbersModel(number=2).put()
+ NumbersModel(number=1).put()
+ NumbersModel(number=2).put()
+
+ def test_addition_job_workflow(self):
+ self._populate_data()
+ self.assertEqual(self._get_stored_total(), 0)
+
+ TestAdditionJob.create_new().enqueue()
+ self.assertEqual(self.count_jobs_in_taskqueue(), 1)
+ self.process_and_flush_pending_tasks()
+ self.assertEqual(self._get_stored_total(), 6)
+
+ NumbersModel(number=3).put()
+
+ TestAdditionJob.create_new().enqueue()
+ self.assertEqual(self.count_jobs_in_taskqueue(), 1)
+ self.process_and_flush_pending_tasks()
+ self.assertEqual(self._get_stored_total(), 9)
+
+ def test_two_jobs(self):
+ self._populate_data()
+ TestAdditionJob.create_new().enqueue()
+
+ NumbersModel(number=3).put()
+ TestAdditionJob.create_new().enqueue()
+
+ self.assertEqual(self.count_jobs_in_taskqueue(), 2)
+ self.process_and_flush_pending_tasks()
+ self.assertEqual(self._get_stored_total(), 9)
+
+ def test_error_case(self):
+ job = TestAdditionJob.create_new()
+ job.enqueue()
+ with self.assertRaisesRegexp(Exception, 'Invalid status code change'):
+ job.enqueue()
=======================================
--- /core/storage/base_model/gae_models.py Mon Mar 31 07:16:42 2014 UTC
+++ /core/storage/base_model/gae_models.py Wed May 7 05:41:48 2014 UTC
@@ -28,9 +28,9 @@
"""Base model for all persistent object storage classes."""

# When this entity was first created.
- created_on = ndb.DateTimeProperty(auto_now_add=True)
+ created_on = ndb.DateTimeProperty(auto_now_add=True, indexed=True)
# When this entity was last updated.
- last_updated = ndb.DateTimeProperty(auto_now=True)
+ last_updated = ndb.DateTimeProperty(auto_now=True, indexed=True)
# Whether the current version of the file is deleted.
deleted = ndb.BooleanProperty(indexed=True, default=False)

=======================================
--- /core/storage/job/gae_models.py Sun Mar 30 09:35:43 2014 UTC
+++ /core/storage/job/gae_models.py Wed May 7 05:41:48 2014 UTC
@@ -18,8 +18,11 @@

__author__ = 'Sean Lip'

+import random
+
from core.platform import models
(base_models,) = models.Registry.import_models([models.NAMES.base_model])
+import utils

from google.appengine.ext import ndb

@@ -30,24 +33,29 @@
STATUS_CODE_STARTED = 2
STATUS_CODE_COMPLETED = 3
STATUS_CODE_FAILED = 4
+STATUS_CODE_CANCELED = 5


class JobModel(base_models.BaseModel):
- """Class representing a datastore entity for a long-running job.
+ """Class representing a datastore entity for a long-running job."""

- The id of a job is, by default, its class name. Note that this means that,
- for each type of job, only one instance may run at a particular time.
- """
+ @classmethod
+ def get_new_id(cls, entity_name):
+ """Overwrites superclass method."""
+ job_name = entity_name
+ current_time_str = str(int(utils.get_current_time_in_millisecs()))
+ random_int = random.randint(0, 1000)
+ return '%s-%s-%s' % (job_name, current_time_str, random_int)

# The execution time of the job, in seconds.
- execution_time_sec = ndb.IntegerProperty(indexed=False)
+ execution_time_sec = ndb.FloatProperty(indexed=False)
# The current status code for the job.
status_code = ndb.IntegerProperty(
- indexed=False,
+ indexed=True,
default=STATUS_CODE_NEW,
choices=[
STATUS_CODE_NEW, STATUS_CODE_QUEUED, STATUS_CODE_STARTED,
- STATUS_CODE_COMPLETED, STATUS_CODE_FAILED
+ STATUS_CODE_COMPLETED, STATUS_CODE_FAILED, STATUS_CODE_CANCELED
])
# The output of the job.
output = ndb.TextProperty(indexed=False)
=======================================
--- /core/tests/gae_suite.py Sat Apr 26 02:45:00 2014 UTC
+++ /core/tests/gae_suite.py Wed May 7 05:41:48 2014 UTC
@@ -65,7 +65,7 @@

import feconf

-EXPECTED_TEST_COUNT = 268
+EXPECTED_TEST_COUNT = 269


_PARSER = argparse.ArgumentParser()
=======================================
--- /core/tests/test_utils.py Mon Mar 10 07:58:21 2014 UTC
+++ /core/tests/test_utils.py Wed May 7 05:41:48 2014 UTC
@@ -254,7 +254,8 @@
self.testbed.init_memcache_stub()
self.testbed.init_datastore_v3_stub(consistency_policy=policy)
self.testbed.init_taskqueue_stub()
- self.taskq = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)
+ self.taskqueue_stub = self.testbed.get_stub(
+ testbed.TASKQUEUE_SERVICE_NAME)

# Set up the app to be tested.
self.testapp = webtest.TestApp(main.app)
@@ -263,6 +264,20 @@
os.environ['USER_IS_ADMIN'] = '0'
self.testbed.deactivate()

+ def count_jobs_in_taskqueue(self):
+ return len(self.taskqueue_stub.get_filtered_tasks())
+
+ def process_and_flush_pending_tasks(self):
+ from google.appengine.ext import deferred
+
+ tasks = self.taskqueue_stub.get_filtered_tasks()
+ self.taskqueue_stub.FlushQueue('default')
+ while tasks:
+ for task in tasks:
+ deferred.run(task.payload)
+ tasks = self.taskqueue_stub.get_filtered_tasks()
+ self.taskqueue_stub.FlushQueue('default')
+

if feconf.PLATFORM == 'gae':
GenericTestBase = AppEngineTestBase
=======================================
--- /utils.py Mon Apr 21 08:42:13 2014 UTC
+++ /utils.py Wed May 7 05:41:48 2014 UTC
@@ -17,6 +17,7 @@
__author__ = 's...@google.com (Sean Lip)'

import base64
+import datetime
import hashlib
import json
import os
@@ -323,3 +324,12 @@
datetime_obj: An object of type datetime.datetime.
"""
return time.mktime(datetime_obj.timetuple()) * 1000
+
+
+def get_current_time_in_millisecs():
+ """Returns time in milliseconds since the Epoch.
+
+ Args:
+ datetime_obj: An object of type datetime.datetime.
+ """
+ return get_time_in_millisecs(datetime.datetime.utcnow())
Reply all
Reply to author
Forward
0 new messages