Foundations/JobSystem

Revision 7 as of 2010-06-22 19:41:10

Introduction

The Job System allows jobs to be run asynchronously from web requests.

When a request needs to trigger work that would take too long to complete within a single web request, the job system can be used to queue a job for later processing.

It is currently used for things such as:

Architecture

Each job that will go through the system has a specific type, which stores the arguments specific to that job type. For instance, a merge proposal diff generation job links to the merge proposal for which the diff should be generated.

Each type of job also shares some common fields that are used by the job system to ensure that the job is processed. These include the status of the job and the timestamps at which it was queued, completed, and so on.

Each job is backed by the database, providing durability and error tolerance.

Each job type also has an associated script which processes jobs of that type as needed, usually run from cron. The script selects any outstanding jobs and runs them.
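The architecture described above can be sketched in a few lines. This is only an illustration using SQLite from the Python standard library, not the Launchpad implementation: the table names, the status values and the functions make_store, queue_diff_job and run_outstanding are all hypothetical.

```python
import sqlite3
import time

# Hypothetical status values; the real system defines its own.
WAITING, RUNNING, COMPLETED, FAILED = 0, 1, 2, 3


def make_store():
    """Create an in-memory database with generic and type-specific tables."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE job (
            id INTEGER PRIMARY KEY,
            status INTEGER NOT NULL,
            date_created REAL NOT NULL,
            date_finished REAL
        );
        CREATE TABLE diffjob (
            id INTEGER PRIMARY KEY,
            job INTEGER NOT NULL UNIQUE REFERENCES job(id),
            merge_proposal INTEGER NOT NULL
        );
    """)
    return conn


def queue_diff_job(conn, merge_proposal_id):
    """Called from the web request: record the work and return at once."""
    cur = conn.execute(
        "INSERT INTO job (status, date_created) VALUES (?, ?)",
        (WAITING, time.time()))
    conn.execute(
        "INSERT INTO diffjob (job, merge_proposal) VALUES (?, ?)",
        (cur.lastrowid, merge_proposal_id))
    conn.commit()
    return cur.lastrowid


def run_outstanding(conn, process):
    """Called from the cron script: run every waiting job of this type."""
    rows = conn.execute(
        "SELECT job.id, diffjob.merge_proposal FROM job "
        "JOIN diffjob ON diffjob.job = job.id "
        "WHERE job.status = ? ORDER BY job.id", (WAITING,)).fetchall()
    for job_id, merge_proposal_id in rows:
        try:
            process(merge_proposal_id)
            status = COMPLETED
        except Exception:
            # A failure is recorded rather than lost, and does not stop
            # the remaining jobs from running.
            status = FAILED
        conn.execute(
            "UPDATE job SET status = ?, date_finished = ? WHERE id = ?",
            (status, time.time(), job_id))
        conn.commit()
    return len(rows)
```

The generic table holds the fields every job shares, the type-specific table holds the arguments, and the runner only ever looks at rows that are still waiting, so a job survives a crash between being queued and being run.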

Code

The code for this lives in lp.services.job.

You can see implementations of job types in

In lp.services.job.interfaces.job are the basic interfaces for jobs, which each job type will build upon.

Implementing a job type

If this is the first job for your app then you probably want to start by implementing a job interface that is common to all jobs of a certain class. For instance, lp.bugs.interfaces.bugjob deals with jobs that reference a bug.

We are going to create an interface for jobs dealing with IFoos; adjust as necessary.

Create some tests. In lib/lp/component/tests/test_foojob.py put:

import unittest

from canonical.testing import LaunchpadZopelessLayer

from lp.component.interfaces.foojob import FooJobType
from lp.component.model.foojob import FooJob, FooJobDerived
from lp.testing import TestCaseWithFactory


class FooJobTestCase(TestCaseWithFactory):
    """Test case for basic FooJob gubbins."""

    layer = LaunchpadZopelessLayer

    def test_instantiate(self):
        # FooJob.__init__() instantiates a FooJob instance.
        foo = self.factory.makeFoo()

        metadata = ('some', 'arbitrary', 'metadata')
        foo_job = FooJob(
            foo, FooJobType.DO_FOO, metadata)

        self.assertEqual(foo, foo_job.foo)
        self.assertEqual(FooJobType.DO_FOO, foo_job.job_type)

        # When we actually access the FooJob's metadata it gets
        # deserialized from JSON, so the representation returned by
        # foo_job.metadata will be different from what we originally
        # passed in.
        metadata_expected = [u'some', u'arbitrary', u'metadata']
        self.assertEqual(metadata_expected, foo_job.metadata)

    
class FooJobDerivedTestCase(TestCaseWithFactory):
    """Test case for the FooJobDerived class."""

    layer = LaunchpadZopelessLayer
        
    def test_create_explodes(self):
        # FooJobDerived.create() will blow up because it needs to be
        # subclassed to work properly.
        foo = self.factory.makeFoo()
        self.assertRaises(
            AttributeError, FooJobDerived.create, foo)
        
        
def test_suite():
    return unittest.TestLoader().loadTestsFromName(__name__)
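The comment in test_instantiate about metadata changing shape can be checked in isolation. The sketch below uses the standard library json module in place of simplejson, and runs on Python 3, so the strings carry no u'' prefix; the variable names are only illustrative.

```python
import json

metadata = ('some', 'arbitrary', 'metadata')
stored = json.dumps(metadata)   # what would be written to the json_data column
loaded = json.loads(stored)     # what a metadata property would return

# JSON has no tuple type, so the tuple comes back as a list.
assert loaded == ['some', 'arbitrary', 'metadata']
assert loaded != metadata
```

This is why the test compares foo_job.metadata against a list rather than against the tuple that was passed to the constructor.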

Create ./lib/lp/component/interfaces/foojob.py, and in it put:

from zope.interface import Attribute, Interface
from zope.schema import Int, Object

from canonical.launchpad import _

from lp.services.job.interfaces.job import IJob, IJobSource
from lp.component.interfaces.foo import IFoo


class IFooJob(Interface):
    """A Job related to a Foo."""

    id = Int(
        title=_('DB ID'), required=True, readonly=True,
        description=_("The tracking number for this job."))

    foo = Object(
        title=_('The Foo this job is about.'), schema=IFoo,
        required=True)

    job = Object(
        title=_('The common Job attributes'), schema=IJob, required=True)

    metadata = Attribute('A dict of data about the job.')

    def destroySelf():
        """Destroy this object."""


class IFooJobSource(IJobSource):
    """An interface for acquiring IFooJobs."""

    def create(foo):
        """Create a new IFooJob for a foo."""

Create lib/lp/component/model/foojob.py. In that file put:

import simplejson

from sqlobject import SQLObjectNotFound
from storm.base import Storm
from storm.locals import Int, Reference, Unicode

from zope.component import getUtility
from zope.interface import classProvides, implements

from canonical.launchpad.webapp.interfaces import (
    DEFAULT_FLAVOR, IStoreSelector, MAIN_STORE)

from lazr.delegates import delegates

from lp.component.interfaces.foojob import IFooJob, IFooJobSource
from lp.component.model.foo import Foo
from lp.services.job.model.job import Job
from lp.services.job.runner import BaseRunnableJob


class FooJob(Storm):
    """Base class for jobs related to Foos."""

    implements(IFooJob)

    __storm_table__ = 'FooJob'

    id = Int(primary=True)

    job_id = Int(name='job')
    job = Reference(job_id, Job.id)

    foo_id = Int(name='foo')
    foo = Reference(foo_id, Foo.id)

    _json_data = Unicode('json_data')

    @property
    def metadata(self):
        # Deserialize the stored JSON on each access.
        return simplejson.loads(self._json_data)

    @classmethod
    def get(cls, key):
        """Return the instance of this class whose key is supplied."""
        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
        instance = store.get(cls, key)
        if instance is None:
            raise SQLObjectNotFound(
                'No occurrence of %s has key %s' % (cls.__name__, key))
        return instance


class FooJobDerived(BaseRunnableJob):
    """Intermediate class for deriving from FooJob."""
    delegates(IFooJob)
    classProvides(IFooJobSource)

    def __init__(self, job):
        self.context = job
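FooJobDerived stores the database-backed FooJob as self.context and relies on delegates(IFooJob) to expose that object's attributes on the wrapper. A rough plain-Python sketch of the same idea follows. It is not how lazr.delegates is implemented: this version forwards every unknown attribute rather than only those declared on an interface, and the class names StoredJob and DerivedJob are hypothetical.

```python
class StoredJob:
    """Stand-in for the database-backed FooJob row (hypothetical)."""

    def __init__(self, id, foo, metadata):
        self.id = id
        self.foo = foo
        self.metadata = metadata


class DerivedJob:
    """Wraps a StoredJob and forwards unknown attributes to it."""

    def __init__(self, job):
        self.context = job

    def __getattr__(self, name):
        # Only called when normal lookup fails, so attributes defined on
        # the wrapper itself take precedence over those of the context.
        if name == 'context':
            raise AttributeError(name)
        return getattr(self.context, name)
```

With this in place DerivedJob(StoredJob(1, 'a-foo', {})).foo returns 'a-foo', which mirrors how code holding a FooJobDerived can read foo, job and metadata without going through context explicitly.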

You'll notice that we haven't quite done enough to run the tests yet; for that we need to create a job type enum.

This will have a value for each of the job types that you will run for a Foo. Start by implementing a single one. To do this, edit ./lib/lp/component/interfaces/foojob.py and add something like:

from lazr.enum import DBEnumeratedType, DBItem


class FooJobType(DBEnumeratedType):

    DO_FOO = DBItem(0, """
        Do Foo.

        This job frozzles a Foo.
        """)

Here DO_FOO should be replaced by something descriptive. You will also need to change the name in lib/lp/component/tests/test_foojob.py to match.

Then you need to modify ./lib/lp/component/model/foojob.py to add:

from canonical.database.enumcol import EnumCol

from lp.component.interfaces.foojob import FooJobType

and then add to the FooJob class:

    job_type = EnumCol(enum=FooJobType, notNull=True)

    def __init__(self, foo, job_type, metadata):
        """Constructor.

        :param foo: the Foo this job relates to.
        :param job_type: the FooJobType of this job.
        :param metadata: The type-specific variables, as a JSON-compatible
            dict.
        """
        super(FooJob, self).__init__()
        json_data = simplejson.dumps(metadata)
        self.job = Job()
        self.foo = foo
        self.job_type = job_type
        # XXX AaronBentley 2009-01-29 bug=322819: This should be a bytestring,
        # but the DB representation is unicode.
        self._json_data = json_data.decode('utf-8')

You should now be in a position to run the tests that you wrote originally and have one of them pass.

To make the other test pass we need to add some code to FooJobDerived:

    @classmethod
    def create(cls, foo):
        """See `IFooJobSource`."""
        # class_job_type must be defined by a subclass; on FooJobDerived
        # itself this lookup raises AttributeError.
        job = FooJob(foo, cls.class_job_type, {})
        return cls(job)

    @classmethod
    def get(cls, job_id):
        """Get a job by id.

        :return: the FooJob with the specified id, as the current
                 FooJobDerived subclass.
        :raises: SQLObjectNotFound if there is no job with the specified id,
                 or its job_type does not match the desired subclass.
        """
        job = FooJob.get(job_id)
        if job.job_type != cls.class_job_type:
            raise SQLObjectNotFound(
                'No object found with id %d and type %s' % (job_id,
                cls.class_job_type.title))
        return cls(job)

    @classmethod
    def iterReady(cls):
        """Iterate through all ready FooJobs."""
        # This needs `from storm.expr import And` at the top of the module,
        # and MASTER_FLAVOR added to the existing import from
        # canonical.launchpad.webapp.interfaces.
        store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR)
        jobs = store.find(
            FooJob,
            And(FooJob.job_type == cls.class_job_type,
                FooJob.job == Job.id,
                Job.id.is_in(Job.ready_jobs),
                FooJob.foo == Foo.id))
        return (cls(job) for job in jobs)

That should get the second test passing.
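The reason test_create_explodes expects an AttributeError is that create() looks up cls.class_job_type, which only concrete subclasses define. The following stand-alone sketch shows the pattern without any Launchpad dependencies; StoredJob, JobDerived and DoFoo are hypothetical stand-ins for FooJob, FooJobDerived and a concrete job class, and the string 'DO_FOO' stands in for FooJobType.DO_FOO.

```python
class StoredJob:
    """Stand-in for the database-backed FooJob (hypothetical)."""

    def __init__(self, foo, job_type):
        self.foo = foo
        self.job_type = job_type


class JobDerived:
    """Intermediate class; deliberately has no class_job_type."""

    def __init__(self, job):
        self.context = job

    @classmethod
    def create(cls, foo):
        # Looking up cls.class_job_type raises AttributeError unless a
        # subclass has defined it.
        return cls(StoredJob(foo, cls.class_job_type))


class DoFoo(JobDerived):
    """Concrete job type: supplies the missing class attribute."""

    class_job_type = 'DO_FOO'
```

Calling JobDerived.create('a-foo') raises AttributeError, while DoFoo.create('a-foo') returns a DoFoo whose context records the 'DO_FOO' type. The same attribute is what get() and iterReady() use to restrict results to the calling subclass.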

Next we have to define the DB table for the new Job class. A patch something like this would work:

-- Copyright 2009 Canonical Ltd.  This software is licensed under the
-- GNU Affero General Public License version 3 (see the file LICENSE).

SET client_min_messages=ERROR;

-- The schema patch required for adding foo jobs.

-- The `FooJob` table captures the data required for a foo job.

CREATE TABLE FooJob (
    id serial PRIMARY KEY,
    -- FK to the `Job` record with the "generic" data about this foo
    -- job.
    job integer NOT NULL CONSTRAINT foojob__job__fk REFERENCES job,
    -- FK to the associated `Foo` record.
    foo integer NOT NULL CONSTRAINT foojob__foo__fk REFERENCES foo,
    -- The particular type of foo job
    job_type integer NOT NULL,
    -- JSON data for use by the job
    json_data text
);

ALTER TABLE FooJob ADD CONSTRAINT foojob__job__key UNIQUE (job);
CREATE INDEX foojob__foo__job_type__idx ON FooJob(foo, job_type);

INSERT INTO LaunchpadDatabaseRevision VALUES (2207, 99, 0);

With comments.sql entries like:

-- FooJob

COMMENT ON TABLE FooJob IS 'Contains references to jobs to be run against Foos.';
COMMENT ON COLUMN FooJob.foo IS 'The foo on which the job is to be run.';
COMMENT ON COLUMN FooJob.job_type IS 'The type of job (enumeration value). Allows us to query the database for a given subset of FooJobs.';
COMMENT ON COLUMN FooJob.json_data IS 'A JSON struct containing data for the job.';

And an entry in the [launchpad_main] section of database/schema/security.cfg with

public.foojob                       = SELECT, INSERT, UPDATE, DELETE
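The foojob__job__key constraint in the patch above means that a generic Job row can back at most one FooJob. The sketch below demonstrates that behaviour with SQLite from the Python standard library rather than PostgreSQL, so the column types are simplified and the helper try_insert is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE job (id INTEGER PRIMARY KEY);
    CREATE TABLE foo (id INTEGER PRIMARY KEY);
    CREATE TABLE foojob (
        id INTEGER PRIMARY KEY,
        job INTEGER NOT NULL UNIQUE REFERENCES job(id),
        foo INTEGER NOT NULL REFERENCES foo(id),
        job_type INTEGER NOT NULL,
        json_data TEXT
    );
    INSERT INTO job (id) VALUES (1);
    INSERT INTO job (id) VALUES (2);
    INSERT INTO foo (id) VALUES (1);
    INSERT INTO foojob (job, foo, job_type, json_data) VALUES (1, 1, 0, '{}');
""")


def try_insert(job_id, foo_id):
    """Return True if the row is accepted, False if a constraint rejects it."""
    try:
        conn.execute(
            "INSERT INTO foojob (job, foo, job_type) VALUES (?, ?, 0)",
            (job_id, foo_id))
        return True
    except sqlite3.IntegrityError:
        return False
```

Here try_insert(1, 1) is rejected because job 1 already backs a FooJob, whereas try_insert(2, 1) succeeds: one Foo may have many jobs, but each Job row belongs to a single FooJob.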

Now we move on to implementing the behaviour of our specific type, DO_FOO.

For this example we will put the tests in lib/lp/component/tests/test_dofoo.py

First we will test something static.

from canonical.testing import LaunchpadZopelessLayer

from lp.component.model.dofoojob import DoFooJob
from lp.testing import TestCaseWithFactory


class DoFooJobTests(TestCaseWithFactory):
    """Test case for DoFooJob."""
    
    layer = LaunchpadZopelessLayer

    def test_getOopsVars(self):
        foo = self.factory.makeFoo()
        job = DoFooJob.create(foo)
        vars = job.getOopsVars()
        self.assertIn(('foo_id', foo.id), vars)
        self.assertIn(('foo_job_id', job.context.id), vars)
        self.assertIn(('foo_job_type', job.context.job_type.title), vars)

This doesn't actually depend on the behaviour of the job; in fact the variables apply to any FooJob, so getOopsVars can be implemented on FooJobDerived. That is what we want, but we can't test FooJobDerived directly because we can't create instances of it, so we test it via DoFooJob.

First, add a method to FooJobDerived:

    def getOopsVars(self):
        """See `IRunnableJob`."""
        vars = BaseRunnableJob.getOopsVars(self)
        vars.extend([
            ('foo_id', self.context.foo.id),
            ('foo_job_id', self.context.id),
            ('foo_job_type', self.context.job_type.title),
            ])
        return vars

This won't let the test pass though, as we still need DoFooJob.

In ./lib/lp/component/interfaces/foojob.py put:

from lp.services.job.interfaces.job import IRunnableJob

class IDoFooJob(IRunnableJob):
    """A Job to do Foo."""


class IDoFooJobSource(IFooJobSource):
    """An interface for acquiring DoFooJobs."""

In ./lib/lp/component/model/dofoojob.py put: