Diff for "Foundations/JobSystem"

Differences between revisions 18 and 19
Revision 18 as of 2010-08-02 16:50:48
Size: 10865
Editor: abentley
Comment:
Revision 19 as of 2010-08-02 17:08:39
Size: 9403
Editor: abentley
Comment:
Line 54: Line 54:

Now we move on to implementing the behaviour of our specific type, DO_FOO.
Line 81: Line 79:
This doesn't actually depend on the behaviour of the job, and in fact
the variables are applicable to any FooJob, so getOopsVars on FooJobDerived
could implement that. That is in fact what we want, but we can't test
FooJobDerived directly as we can't create instances of it, therefore we
test it via DoFooJob.

Firstly then, add a method to FooJobDerived.

Firstly then, add a method to DoFooJob.
Line 101: Line 94:
This won't let the test pass though, as we still need DoFooJob.
Line 126: Line 117:
class DoFooJob(FooJobDerived): class DoFooJob():
Line 136: Line 127:
        return super(DoFooJob, cls).create(foo)
Line 145: Line 136:
of each type for each foo then you need a bit more code. Add to DoFooJobTests: of each type for each foo then you need a bit more code. In this case, it
makes sense to rename `create` since it no longer guarantees it will create
something.

Add to DoFooJobTests:
Line 156: Line 151:
    def test_create_only_creates_one(self):     def test_acquire_only_acquires_one(self):
Line 160: Line 155:
        job = DoFooJob.create(foo)         job = DoFooJob.acquire(foo)
Line 165: Line 160:
        new_job = DoFooJob.create(foo)         new_job = DoFooJob.acquire(foo)
Line 174: Line 169:
Line 178: Line 174:
    def create(cls, foo):     def acquire(cls, foo):
Line 185: Line 181:
            FooJob.job_type == cls.class_job_type,
Line 193: Line 188:
            return super(DoFooJob, cls).create(foo)             return DoFooJob()
Line 282: Line 277:
== Attaching metadata ==

We defined a JSON metadata field in the schema, but we haven't made
any use of it yet.

To do that first change FooJobDerived.create to be

{{{
    @classmethod
    def create(cls, foo, metadata=None):
        """See `IFooJob`."""
        if metadata is None:
            metadata = {}
        job = FooJob(foo, cls.class_job_type, metadata)
        return cls(job)
}}}

Thereby allowing us to create FooJobs with metadata.

In the subclasses, e.g. DoFooJob, you can have the create
method pass a json-compatible object to the up-called create
classmethod. This can either be passed directly from an
argument, or more likely be a dict or similar formed from
specific parameters, e.g.

{{{
    @classmethod
    def create(cls, foo, bar):
        ...
        ...
        return super(DoFooJob, cls).create(foo, {'bar': bar})
}}}

The run method can then access self.metadata when running the
job.

Introduction

The Job System allows jobs to be run asynchronously from web requests.

When a request needs to trigger work that would take too long to complete within a single web request, the job system can be used to queue a job for later processing.

It is currently used for things such as:

  • Generating merge proposal diffs
  • Calculating bug heat

Architecture

Each job that will go through the system has a specific type, which stores the arguments specific to that job type. For instance the merge proposal diff generation links to the merge proposal for which the diff should be generated.

Each type of job also shares some common fields that are used by the job system to ensure that the job is processed. These include the status of the job, timestamps at which it was queued, completed, etc.

Each job is backed by the database, providing durability and error tolerance.

Each job type also has an associated script which processes jobs of that type as needed, usually run from cron. The script selects any outstanding jobs and runs them. In some cases, one script runs several types of jobs.
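The lifecycle described above can be sketched with a toy in-memory model. This is only an illustration: ToyJob, ToyJobStatus and run_pending are hypothetical stand-ins rather than the real lp.services.job classes, the status names are assumed, and a plain list stands in for the database.

```python
import datetime
import enum


class ToyJobStatus(enum.Enum):
    """Hypothetical statuses; the real JobStatus enum may differ."""
    WAITING = 'waiting'
    RUNNING = 'running'
    COMPLETED = 'completed'
    FAILED = 'failed'


class ToyJob:
    """Common bookkeeping shared by every job type in this sketch."""

    def __init__(self, work):
        self.work = work  # callable holding the type-specific behaviour
        self.status = ToyJobStatus.WAITING
        self.date_created = datetime.datetime.now()
        self.date_finished = None

    def run(self):
        self.status = ToyJobStatus.RUNNING
        try:
            self.work()
        except Exception:
            self.status = ToyJobStatus.FAILED
        else:
            self.status = ToyJobStatus.COMPLETED
        self.date_finished = datetime.datetime.now()


def run_pending(queue):
    """What a cron-driven script does: run every outstanding job."""
    pending = [job for job in queue if job.status is ToyJobStatus.WAITING]
    for job in pending:
        job.run()
    return len(pending)


results = []
queue = [ToyJob(lambda: results.append('diff generated')),
         ToyJob(lambda: 1 / 0)]
ran = run_pending(queue)
```

A second call to run_pending(queue) finds nothing to do, because both jobs have left the waiting state. A failing job is recorded as failed rather than stopping the run.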

Code

The code for this lives in lp.services.job.

You can see implementations of job types in

  • lib/lp/bugs/interfaces/bugjob.py, lib/lp/bugs/model/bugjob.py, lib/lp/bugs/model/bugheat.py and cronscripts/calculate-bug-heat.py
  • lib/lp/code/interfaces/branchjob.py, lib/lp/code/model/branchjob.py, lib/lp/code/interfaces/branchmergeproposal.py, lib/lp/code/model/branchmergeproposal.py and cronscripts/merge-proposal-jobs.py

The basic interfaces for jobs, which each job type builds upon, are in lp.services.job.interfaces.job:

  • JobStatus is an enum of the statuses a job can be in: waiting, running, completed, etc.
  • IJob is the interface implemented by each job type. It holds the attributes common to all types, as well as standard methods that the job system can use to run the jobs and manipulate them in other ways.
  • IRunnableJob is a job that can be run right now, and has a run() method that will do the work.
  • IJobSource is the interface for an object which can provide jobs that are ready to run right now.
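How the runnable-job and job-source roles fit together can be sketched with plain abstract base classes. The real interfaces are zope.interface declarations; the classes below are hypothetical stand-ins, and only run() and iterReady() are taken from this page.

```python
import abc


class SketchRunnableJob(abc.ABC):
    """Stand-in for IRunnableJob: a job that can be run right now."""

    @abc.abstractmethod
    def run(self):
        """Do the work."""


class SketchJobSource(abc.ABC):
    """Stand-in for IJobSource: provides jobs that are ready to run."""

    @abc.abstractmethod
    def iterReady(self):
        """Iterate over the jobs that are ready to run."""


class EchoJob(SketchRunnableJob):
    """Hypothetical job type that records a message when run."""

    def __init__(self, message, log):
        self.message = message
        self.log = log

    def run(self):
        self.log.append(self.message)


class ListJobSource(SketchJobSource):
    """Hypothetical source backed by a list instead of the database."""

    def __init__(self, jobs):
        self._jobs = list(jobs)

    def iterReady(self):
        return iter(self._jobs)


log = []
source = ListJobSource([EchoJob('first', log), EchoJob('second', log)])
for job in source.iterReady():
    job.run()
```

The loop at the end only relies on iterReady() and run(), so it does not need to know which job type it is running.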

Implementing a job type

At minimum, you need an object with a .job attribute, where the object implements IRunnableJob, and the .job attribute implements IJob.

You may choose to implement this using single-table polymorphism.
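Single-table polymorphism here means that related job types share one table, with a column recording which type each row belongs to. A minimal sketch using sqlite3 follows. The foo_job table, its columns, and the class names are hypothetical, not Launchpad's actual schema or Storm models; only the class_job_type attribute name is borrowed from the example later on this page.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE foo_job ('
    ' id INTEGER PRIMARY KEY, foo INTEGER NOT NULL,'
    ' job_type TEXT NOT NULL)')


class FooJobBase:
    """Wraps one foo_job row; subclasses set class_job_type."""

    class_job_type = None

    def __init__(self, row_id, foo):
        self.id = row_id
        self.foo = foo

    @classmethod
    def create(cls, foo):
        # Every job type writes to the same table, tagged with its type.
        cursor = conn.execute(
            'INSERT INTO foo_job (foo, job_type) VALUES (?, ?)',
            (foo, cls.class_job_type))
        return cls(cursor.lastrowid, foo)

    @classmethod
    def iterAll(cls):
        # Each class only sees the rows carrying its own job type.
        rows = conn.execute(
            'SELECT id, foo FROM foo_job WHERE job_type = ? ORDER BY id',
            (cls.class_job_type,)).fetchall()
        for row_id, foo in rows:
            yield cls(row_id, foo)


class DoFooSketchJob(FooJobBase):
    class_job_type = 'DO_FOO'


class UndoFooSketchJob(FooJobBase):
    class_job_type = 'UNDO_FOO'


DoFooSketchJob.create(foo=1)
UndoFooSketchJob.create(foo=1)
DoFooSketchJob.create(foo=2)

do_foo_targets = [job.foo for job in DoFooSketchJob.iterAll()]
total_rows = conn.execute('SELECT COUNT(*) FROM foo_job').fetchone()[0]
```

Adding another job type for the same target then only needs a new subclass with its own class_job_type, not a new table.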

Implementing the behaviour of the specific job type

For this example we will put the tests in lib/lp/component/tests/test_dofoo.py

First we will test something static.

from canonical.testing import DatabaseFunctionalLayer

from lp.component.model.dofoojob import DoFooJob
from lp.testing import TestCaseWithFactory


class DoFooJobTests(TestCaseWithFactory):
    """Test case for DoFooJob."""

    layer = DatabaseFunctionalLayer

    def test_getOopsVars(self):
        foo = self.factory.makeFoo()
        job = DoFooJob.create(foo)
        vars = job.getOopsVars()
        self.assertIn(('foo_id', foo.id), vars)
        self.assertIn(('foo_job_id', job.context.id), vars)
        self.assertIn(('foo_job_type', job.context.job_type.title), vars)

First, add a method to DoFooJob.

    def getOopsVars(self):
        """See `IRunnableJob`."""
        vars = BaseRunnableJob.getOopsVars(self)
        vars.extend([
            ('foo_id', self.context.foo.id),
            ('foo_job_id', self.context.id),
            ('foo_job_type', self.context.job_type.title),
            ])
        return vars

In ./lib/lp/component/interfaces/foojob.py put:

from lp.services.job.interfaces.job import IJobSource, IRunnableJob


class IDoFooJob(IRunnableJob):
    """A Job to do Foo."""


class IDoFooJobSource(IJobSource):
    """An interface for acquiring DoFooJobs."""

In ./lib/lp/component/model/dofoojob.py put:

from zope.interface import classProvides, implements

from lp.component.interfaces.foojob import (
    FooJobType, IDoFooJob, IDoFooJobSource)
from lp.component.model.foojob import FooJobDerived


class DoFooJob():

    implements(IDoFooJob)

    class_job_type = FooJobType.DO_FOO
    classProvides(IDoFooJobSource)

    @classmethod
    def create(cls, foo):
        """See `IDoFooJobSource`."""

Now your test should pass.

Unique jobs per-target

Some job types can be unique for the target, such as calculating bug heat, or generating a merge proposal diff. If you want to have only a single job of each type for each foo then you need a bit more code. In this case, it makes sense to rename create since it no longer guarantees it will create something.

Add to DoFooJobTests:

    def _getJobs(self):
        """Return the pending DoFooJobs as a list."""
        return list(DoFooJob.iterReady())
        
    def _getJobCount(self):
        """Return the number of DoFooJobs in the queue."""
        return len(self._getJobs())
        
    def test_acquire_only_acquires_one(self):
        foo = self.factory.makeFoo()
        # If there's already a DoFooJob for a foo,
        # DoFooJob.acquire() won't create a new one.
        job = DoFooJob.acquire(foo)
    
        # There will now be one job in the queue.
        self.assertEqual(1, self._getJobCount())

        new_job = DoFooJob.acquire(foo)
        
        # The two jobs will in fact be the same job.
        self.assertEqual(job, new_job)
    
        # And the queue will still have a length of 1.
        self.assertEqual(1, self._getJobCount())

To make this test pass we need to add a DoFooJob.acquire that looks like:

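from zope.component import getUtility

from canonical.launchpad.webapp.interfaces import (
    DEFAULT_FLAVOR, IStoreSelector, MAIN_STORE)
from lp.services.job.model.job import Job


    @classmethod
    def acquire(cls, foo):
        """See `IDoFooJobSource`."""
        # If there's already a ready job for the foo, return it rather
        # than creating a new one.
        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
        job_for_foo = store.find(
            FooJob,
            FooJob.foo == foo,
            FooJob.job == Job.id,
            Job.id.is_in(Job.ready_jobs)
            ).any()

        if job_for_foo is not None:
            return cls(job_for_foo)
        else:
            return DoFooJob()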
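The find-or-create idea behind acquire can be illustrated without a database. In this sketch a class-level dict stands in for the store query; SketchJob, _pending and complete are hypothetical names used only for illustration.

```python
class SketchJob:
    """Hypothetical job keyed by its target."""

    _pending = {}  # target -> job that has not finished yet

    def __init__(self, target):
        self.target = target

    @classmethod
    def acquire(cls, target):
        """Return the pending job for target, creating one if needed."""
        job = cls._pending.get(target)
        if job is None:
            job = cls(target)
            cls._pending[target] = job
        return job

    def complete(self):
        """Mark the job as finished so a later acquire makes a new one."""
        del self._pending[self.target]


first = SketchJob.acquire('foo-1')
second = SketchJob.acquire('foo-1')
other = SketchJob.acquire('foo-2')
```

Here first and second are the same object, while other is a separate job for a different target. Once first.complete() has been called, acquiring 'foo-1' again creates a fresh job, which mirrors restricting the query above to ready jobs.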

Doing the actual work

Now let's look at implementing the actual work. Add a new method to the test case:

    def test_run(self):                                                       
        """Test that DoFooJob.run() actually frozzles the foo."""     
        foo = self.factory.makeFoo()
        job = DoFooJob.create(foo)                                  
         
        job.run()

        # Put checks here that the job actually ran
        self.assertEqual("frozzled", foo.some_attribute)

To implement this we have to add DoFooJob.run:

    def run(self):
        """See `IRunnableJob`."""
        # Put here the code that acts on self.foo to do the work

The job processing script

Here's a test that the cronscript does what it should:

import transaction
from canonical.launchpad.scripts.tests import run_script


    def test_cronscript_succeeds(self):
        # The do-foo cronscript will run all pending
        # DoFooJobs.
        foo = self.factory.makeFoo()
        DoFooJob.create(foo)
        transaction.commit()

        # 'cronscripts/do-foo.py' is a placeholder; use the path of the
        # cronscript that runs DoFooJobs.
        retcode, stdout, stderr = run_script(
            'cronscripts/do-foo.py', [],
            expect_returncode=0)
        self.assertEqual('', stdout)
        self.assertIn(
            'INFO    Ran 1 DoFooJob jobs.\n', stderr)

The content of that cronscript should be something like:

#!/usr/bin/python -S
#
# Copyright 2010 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

# pylint: disable-msg=W0403

"""Do Foo."""

__metaclass__ = type

import _pythonpath
        
from canonical.launchpad.webapp import errorlog

from lp.services.job.runner import JobCronScript
from lp.component.interfaces.foojob import IDoFooJobSource
        
        
class RunDoFoo(JobCronScript):
    """Run DoFooJob jobs."""
        
    config_name = 'do_foo'
    source_interface = IDoFooJobSource
            
    def main(self):
        errorlog.globalErrorUtility.configure(self.config_name)
        return super(RunDoFoo, self).main()
        
        
if __name__ == '__main__':
    script = RunDoFoo()
    script.lock_and_run()

Notes

If creating your Foo object adds one of the jobs that you are testing for, see lp.bugs.tests.test_bugheat.CalculateBugHeatJobTestCase.setUp() for how to avoid this messing up your tests.

Foundations/JobSystem (last edited 2012-03-04 08:33:39 by lifeless)