Foundations/JobSystem


Introduction

The Job System allows work to be performed after a web request has finished, with generic code handling logging, scheduling and so forth. It is typically used when a web request needs work done that cannot sensibly be done within the request itself.

It is currently used for things such as:

Architecture

Each job that goes through the system has a specific type, which stores the arguments specific to that job type. For instance, a merge proposal diff generation job links to the merge proposal whose diff should be generated.

Every job type also shares some common fields that the job system uses to make sure the job gets processed. These include the status of the job and the timestamps at which it was queued, completed, and so on.

Each job is backed by the database, providing durability and error tolerance.

Each job type also has an associated script which processes jobs of that type as needed, usually run from cron. The script selects any outstanding jobs and runs them. In some cases, one script runs several types of jobs.
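As a rough model of the architecture just described, the sketch below keeps each job's type-specific arguments next to the common fields (status and timestamps) and has a script-like function pick out the outstanding jobs of one type and run them. It is plain, self-contained Python, not Launchpad code: `JobStatus`, `Job` and `run_ready_jobs` are hypothetical stand-ins for the database tables and the cron-run script.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Callable, List, Optional


class JobStatus(Enum):
    # Hypothetical status values, for illustration only.
    WAITING = "waiting"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Job:
    """One queued job: type-specific args plus the common fields."""
    job_type: str
    args: dict
    status: JobStatus = JobStatus.WAITING
    date_created: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc))
    date_started: Optional[datetime] = None
    date_finished: Optional[datetime] = None


def run_ready_jobs(jobs: List[Job], job_type: str,
                   handler: Callable[[dict], None]) -> int:
    """Run every waiting job of one type, as a cron-run script would.

    Returns the number of jobs that completed successfully.
    """
    completed = 0
    for job in jobs:
        # Skip other job types and jobs that are not outstanding.
        if job.job_type != job_type or job.status != JobStatus.WAITING:
            continue
        job.status = JobStatus.RUNNING
        job.date_started = datetime.now(timezone.utc)
        try:
            # The type-specific work is the only part a job type supplies.
            handler(job.args)
        except Exception:
            # Generic bookkeeping records the failure; a real runner
            # would also log it and file an error report.
            job.status = JobStatus.FAILED
        else:
            job.status = JobStatus.COMPLETED
            completed += 1
        job.date_finished = datetime.now(timezone.utc)
    return completed
```

Keeping the status transitions in one generic function is the point of the design: each job type only supplies the work itself, while the bookkeeping stays shared.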

Code

The code for this lives in lp.services.job.

You can see implementations of job types in

lp.services.job.interfaces.job defines the basic interfaces for jobs, which each job type builds upon.

Implementing a job type

At minimum, you need an object that implements IRunnableJob and has a .job attribute that implements IJob.

You may choose to implement this using single-table polymorphism.
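With single-table polymorphism, every job type in a family shares one table (for example a FooJob table with a job_type column), and a thin per-type wrapper class supplies the behaviour. The database-free sketch below only illustrates that shape: `FooJobRow`, `GenericJob`, `FooJobDerived`, `UndoFooJob` and `wrap` are hypothetical stand-ins, and the wrapper keeps its row as `context` (as the examples on this page do) while exposing the generic job fields through a `.job` property.

```python
from dataclasses import dataclass, field
from enum import Enum


class FooJobType(Enum):
    # Hypothetical job types that share one FooJob table.
    DO_FOO = 1
    UNDO_FOO = 2


@dataclass
class GenericJob:
    """Stand-in for the shared Job row (status, timestamps, ...)."""
    status: str = "waiting"


@dataclass
class FooJobRow:
    """Stand-in for one row of the single FooJob table."""
    foo_id: int
    job_type: FooJobType
    metadata: dict = field(default_factory=dict)
    job: GenericJob = field(default_factory=GenericJob)


class FooJobDerived:
    """Base wrapper: per-type behaviour around a shared FooJob row."""
    class_job_type: FooJobType

    def __init__(self, context: FooJobRow):
        # Refuse to wrap a row that belongs to a different job type.
        if context.job_type is not self.class_job_type:
            raise ValueError("row has job_type %s" % context.job_type)
        self.context = context

    @property
    def job(self) -> GenericJob:
        # The generic job-system fields live on the shared Job row.
        return self.context.job

    def run(self):
        raise NotImplementedError


class DoFooJob(FooJobDerived):
    class_job_type = FooJobType.DO_FOO

    def run(self):
        return "did foo %d" % self.context.foo_id


class UndoFooJob(FooJobDerived):
    class_job_type = FooJobType.UNDO_FOO

    def run(self):
        return "undid foo %d" % self.context.foo_id


# Map each job_type value to the wrapper class that implements it.
WRAPPERS = {cls.class_job_type: cls for cls in (DoFooJob, UndoFooJob)}


def wrap(row: FooJobRow) -> FooJobDerived:
    """Pick the wrapper class for a row from its job_type column."""
    return WRAPPERS[row.job_type](row)
```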

Implementing the behaviour of the specific job type

For this example we will put the tests in lib/lp/component/tests/test_dofoo.py.

First, we will test something static: the OOPS variables the job reports.

from canonical.testing import DatabaseFunctionalLayer

from lp.component.model.dofoojob import DoFooJob
from lp.testing import TestCaseWithFactory


class DoFooJobTests(TestCaseWithFactory):
    """Test case for DoFooJob."""
    
    layer = DatabaseFunctionalLayer

    def test_getOopsVars(self):
        foo = self.factory.makeFoo()
        job = DoFooJob.create(foo)
        vars = job.getOopsVars()
        self.assertIn(('foo_id', foo.id), vars)
        self.assertIn(('foo_job_id', job.context.id), vars)
        self.assertIn(('foo_job_type', job.context.job_type.title), vars)

To make this test pass, add a getOopsVars method to DoFooJob:

    def getOopsVars(self):
        """See `IRunnableJob`."""
        # Start from the variables every runnable job reports, then add
        # the ones that identify this particular foo job.
        vars = BaseRunnableJob.getOopsVars(self)
        vars.extend([
            ('foo_id', self.context.foo.id),
            ('foo_job_id', self.context.id),
            ('foo_job_type', self.context.job_type.title),
            ])
        return vars

In ./lib/lp/component/interfaces/foojob.py put:

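from lp.services.job.interfaces.job import IRunnableJob

# FooJobType (the enum of FooJob types) and IFooJobSource (the base
# job-source interface for them) are assumed to be defined earlier in
# this module, alongside the other FooJob interfaces.


class IDoFooJob(IRunnableJob):
    """A Job to do Foo."""


class IDoFooJobSource(IFooJobSource):
    """An interface for acquiring DoFooJobs."""

    def create(foo):
        """Create a new DoFooJob for `foo`."""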

In ./lib/lp/component/model/dofoojob.py put:

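from zope.interface import classProvides, implements

from lp.component.interfaces.foojob import (
    FooJobType, IDoFooJob, IDoFooJobSource)
from lp.component.model.foojob import FooJob, FooJobDerived


class DoFooJob(FooJobDerived):
    """Do Foo to a single foo."""

    implements(IDoFooJob)

    class_job_type = FooJobType.DO_FOO
    classProvides(IDoFooJobSource)

    @classmethod
    def create(cls, foo):
        """See `IDoFooJobSource`."""
        # Make the database-backed FooJob row for this foo and wrap it.
        # The FooJob(foo, job_type, metadata) signature is an assumption;
        # use whatever your FooJob model's constructor takes.
        foo_job = FooJob(foo, cls.class_job_type, {})
        return cls(foo_job)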

Now your test should pass.

Unique jobs per-target

Some job types should be unique per target, such as calculating bug heat or generating a merge proposal diff. If you want only a single pending job of each type for each foo, you need a bit more code. In this case, it makes sense to give the method a different name, such as acquire, since it no longer guarantees that it will create something.

Add to DoFooJobTests:

    def _getJobs(self):
        """Return the pending DoFooJobs as a list."""
        return list(DoFooJob.iterReady())

    def _getJobCount(self):
        """Return the number of DoFooJobs in the queue."""
        return len(self._getJobs())

    def test_acquire_only_acquires_one(self):
        foo = self.factory.makeFoo()
        job = DoFooJob.acquire(foo)

        # There will now be one job in the queue.
        self.assertEqual(1, self._getJobCount())

        # If there's already a pending DoFooJob for a foo,
        # DoFooJob.acquire() won't create a new one.
        new_job = DoFooJob.acquire(foo)

        # The two wrappers will in fact point at the same job.
        self.assertEqual(job.context, new_job.context)

        # And the queue will still have a length of 1.
        self.assertEqual(1, self._getJobCount())

To make this test pass, add an acquire method to DoFooJob that returns the pending job for the foo if there is one, and otherwise falls back to create:

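    # This needs, at the top of dofoojob.py:
    #   from zope.component import getUtility
    #   from canonical.launchpad.webapp.interfaces import (
    #       DEFAULT_FLAVOR, IStoreSelector, MAIN_STORE)
    #   from lp.services.job.model.job import Job
    @classmethod
    def acquire(cls, foo):
        """See `IDoFooJobSource`."""
        # If there's already a pending job for the foo, don't create a
        # new one.
        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
        job_for_foo = store.find(
            FooJob,
            FooJob.foo == foo,
            FooJob.job == Job.id,
            Job.id.is_in(Job.ready_jobs)
            ).any()

        if job_for_foo is not None:
            return cls(job_for_foo)
        else:
            return cls.create(foo)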
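Stripped of Storm and the database, the rule acquire implements is get-or-create restricted to jobs that are still pending. The sketch below shows just that rule, with a hypothetical in-memory `JobQueue` standing in for the store query: once the existing job stops being ready (here, by flipping its `ready` flag), the next acquire creates a fresh one. It deliberately ignores concurrency; two processes racing through the check could both create a job, so a real implementation needs a database-level guard if that matters.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PendingJob:
    foo_id: int
    # True while the job is waiting to run (cf. Job.ready_jobs).
    ready: bool = True


class JobQueue:
    """Hypothetical in-memory stand-in for the FooJob/Job tables."""

    def __init__(self):
        self.jobs: List[PendingJob] = []

    def create(self, foo_id: int) -> PendingJob:
        # Always makes a new job, like DoFooJob.create.
        job = PendingJob(foo_id)
        self.jobs.append(job)
        return job

    def acquire(self, foo_id: int) -> PendingJob:
        # Reuse a ready job for this foo if there is one...
        for job in self.jobs:
            if job.foo_id == foo_id and job.ready:
                return job
        # ...otherwise fall back to creating a new one.
        return self.create(foo_id)

    def ready_count(self) -> int:
        return sum(1 for job in self.jobs if job.ready)
```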

Doing the actual work

Now let's look at implementing the actual work. Add a new method to the test case:

    def test_run(self):
        """Test that DoFooJob.run() actually frozzles the foo."""
        foo = self.factory.makeFoo()
        job = DoFooJob.create(foo)

        job.run()

        # Check that the job actually did its work; some_attribute is a
        # placeholder for whatever your job changes.
        self.assertEqual("frozzled", foo.some_attribute)

To make this pass, implement DoFooJob.run:

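    def run(self):
        """See `IRunnableJob`."""
        # Put the code that does the work here. The foo this job is for
        # is self.context.foo; this placeholder just makes the change
        # that test_run checks for.
        self.context.foo.some_attribute = "frozzled"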

The job processing script

You'll need a cron script that picks pending jobs off your queue and runs them. There are two ways to do this. One is to execute the generic job runner, run_jobs.py, and fill in the details in your lazr configuration. The other is to write a custom cron script that uses the same script class internally.

The easy way: run_jobs.py

Set up a configuration section named after the script. It should specify the database user to run the jobs under, and the interface of the job source utility that finds pending jobs:

[do_foo]
# Database role to run do-foo jobs under.
# datatype: string
dbuser: dofoo

# Utility for finding pending DoFoo jobs.
source_interface: lp.component.interfaces.foojob.IDoFooJobSource
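To make it concrete what the runner needs from this section, here is a sketch that reads it with Python's standard configparser. It is an illustration only: Launchpad reads the section through its lazr configuration schema, not this code, and `read_job_section` and `JobRunnerSettings` are hypothetical names. The runner's remaining step (importing the module, fetching the interface and looking up the utility that provides it) is left out because it needs the Launchpad tree.

```python
import configparser
from typing import NamedTuple


class JobRunnerSettings(NamedTuple):
    dbuser: str
    module: str
    interface_name: str


def read_job_section(text: str, section: str) -> JobRunnerSettings:
    """Pull the two settings a job runner needs out of one section."""
    parser = configparser.ConfigParser()
    # Full-line '#' comments and "key: value" pairs are both accepted.
    parser.read_string(text)
    dbuser = parser.get(section, "dbuser")
    # Split the dotted path into the module to import and the name in it.
    dotted = parser.get(section, "source_interface")
    module, _, name = dotted.rpartition(".")
    return JobRunnerSettings(dbuser, module, name)
```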

To run pending jobs of this type, just execute run_jobs with the name of this configuration section as its argument:

cronscripts/run_jobs.py do_foo

You could test this by running the script that way, but there's not much point: if you trust run_jobs to do its job, you can save yourself the database commit and the script start-up time by instantiating the underlying script object directly in your test instead.

    def test_job_runner_runs_job(self):
        foo = self.factory.makeFoo()
        job = DoFooJob.create(foo)

        # Tell JobCronScript to take its configuration from the
        # config section named on the command line.
        script = JobCronScript(test_args=["do_foo"], commandline_config=True)

        # Suppress test noise.
        script.logger = DevNullLogger()

        # Run pending jobs.
        script.main()

        # This particular job completes successfully.
        self.assertEqual(JobStatus.COMPLETED, job.context.job.status)

Note: You may be tempted to override methods on the job, e.g. using FakeMethod, and then simply check that job.run.call_count == 1. That may not work! Many of our job types are really wrappers around database-backed base types such as DistributionJob. The job runner will find the same database-backed job object, but wrap its own in-memory instance of your job type around it. You then have two wrappers for the same database object (the one you created and the one the job runner created), and the one that gets run is not the one you patched.
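The pitfall is easy to reproduce without a database. In this sketch, `Row`, `TABLE`, `DoFooJob` and `job_runner` are hypothetical stand-ins for a database-backed job, its table, a wrapper job type and the job runner, and unittest.mock.Mock plays the part of FakeMethod. The runner looks the row up itself and wraps it in a new instance, so the method patched on the test's own wrapper is never called, even though the job did run.

```python
from unittest import mock


class Row:
    """Stand-in for one database-backed job row."""

    def __init__(self, row_id):
        self.id = row_id
        self.done = False


class DoFooJob:
    """Thin in-memory wrapper around a Row, like a derived job type."""

    def __init__(self, context):
        self.context = context

    def run(self):
        self.context.done = True


# Stand-in for the job table: row id -> Row.
TABLE = {}


def job_runner(row_id):
    # Like the real runner, look the row up and wrap it afresh.
    DoFooJob(TABLE[row_id]).run()


def demonstrate():
    TABLE[1] = Row(1)
    my_wrapper = DoFooJob(TABLE[1])
    # Patch run() on *this* wrapper instance only (FakeMethod-style).
    my_wrapper.run = mock.Mock()
    job_runner(1)
    # Returns (calls seen by the patched method, whether the row ran).
    return my_wrapper.run.call_count, TABLE[1].done
```

demonstrate() returns (0, True): the patched method saw no calls, yet the row was processed. This is why the runner test above checks job.context.job.status, which lives on the shared database object, rather than counting calls on a patched method.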

Alternative: Do It Yourself

Would you rather write your own cron script? Here's a test that the script does what it should:

import transaction
from canonical.launchpad.scripts.tests import run_script


    def test_cronscript_succeeds(self):
        # The do-foo cronscript will run all pending
        # DoFooJobs.
        foo = self.factory.makeFoo()
        DoFooJob.create(foo)
        # Commit so that the separate script process can see the job.
        transaction.commit()

        # 'cronscripts/do-foo.py' is an example name; use the path of
        # your own cron script.
        retcode, stdout, stderr = run_script(
            'cronscripts/do-foo.py', [],
            expect_returncode=0)
        self.assertEqual('', stdout)
        self.assertIn(
            'INFO    Ran 1 DoFooJob jobs.\n', stderr)

The cronscript itself uses the same script class that underpins run_jobs.py, but configures it directly instead of from the command line. It should look something like this:

#
# Copyright 2010 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

# pylint: disable-msg=W0403

"""Do Foo."""

__metaclass__ = type

import _pythonpath
        
from canonical.launchpad.webapp import errorlog

from lp.services.job.runner import JobCronScript
from lp.component.interfaces.foojob import IDoFooJobSource
        
        
class RunDoFoo(JobCronScript):
    """Run DoFooJob jobs."""

    # Point JobCronScript to the config section that tells it what
    # to do.
    config_name = 'do_foo'

    # Tell JobCronScript the jobs utility that finds pending jobs.
    source_interface = IDoFooJobSource
            
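    def main(self):
        # Set up OOPS reporting from this script's config section, then
        # let JobCronScript find and run the pending jobs.
        errorlog.globalErrorUtility.configure(self.config_name)
        return super(RunDoFoo, self).main()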
        
        
if __name__ == '__main__':
    script = RunDoFoo()
    script.lock_and_run()

Foundations/JobSystem (last edited 2012-03-04 08:33:39 by lifeless)