Diff for "Foundations/JobSystem"


Differences between revisions 1 and 2
Revision 1 as of 2010-06-22 16:55:09
Size: 3930
Editor: james-w
Comment:
Revision 2 as of 2010-06-22 17:09:08
Size: 6838
Editor: james-w
Comment:
Deletions are marked like this. Additions are marked like this.
Line 36: Line 36:
  * lib/lp/bugs/interfaces/bugjob.py, lib/lp/bugs/models/bugjob.py and cronscripts/calculate-bug-heat.py
  * lib/lp/code/interfaces/branchmergeproposal.py, lib/lp/code/models/branchmergeproposal.py and cronscripts/merge-proposal-jobs.py
  * lib/lp/bugs/interfaces/bugjob.py, lib/lp/bugs/model/bugjob.py and cronscripts/calculate-bug-heat.py
  * lib/lp/code/interfaces/branchmergeproposal.py, lib/lp/code/model/branchmergeproposal.py and cronscripts/merge-proposal-jobs.py
Line 92: Line 92:
  * Create lib/lp/component/model/foojob.py
  * In that file put:
{{{
import simplejson
                                                                              
from sqlobject import SQLObjectNotFound
from storm.base import Storm
from storm.locals import Int, Reference, Unicode
                                                                              
from zope.component import getUtility
from zope.interface import classProvides, implements
                                                                              
from canonical.launchpad.webapp.interfaces import (
    DEFAULT_FLAVOR, IStoreSelector, MAIN_STORE)
                                                                              
from lazr.delegates import delegates
 
from lp.component.interfaces.foojob import IFooJob, IFooJobSource
from lp.component.model.foo import Foo
from lp.services.job.model.job import Job
from lp.services.job.runner import BaseRunnableJob


class FooJob(Storm):
    """Base class for jobs related to Foos."""

    implements(IFooJob)

    __storm_table__ = 'FooJob'
                                                                              
    id = Int(primary=True)

    job_id = Int(name='job')
    job = Reference(job_id, Job.id)
    
    foo_id = Int(name='foo')
    foo = Reference(foo_id, Foo.id)
                                                                              
    _json_data = Unicode('json_data')
        
    @property
    def metadata(self):
        return simplejson.loads(self._json_data)

    @classmethod
    def get(cls, key):
        """Return the instance of this class whose key is supplied."""
        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
        instance = store.get(cls, key)
        if instance is None:
            raise SQLObjectNotFound(
                'No occurrence of %s has key %s' % (cls.__name__, key))
        return instance


class FooJobDerived(BaseRunnableJob):
    """Intermediate class for deriving from FooJob."""
    delegates(IFooJob)
    classProvides(IFooJobSource)

    def __init__(self, job):
        self.context = job
}}}

Introduction

The Job System allows jobs to be run asynchronously from web requests.

When a request needs to trigger work that would take too long to complete within a single web request, the job system can queue that work as a job for later processing.

It is currently used for things such as:

  • Generating merge proposal diffs
  • Calculating bug heat

Architecture

Each job that goes through the system has a specific type, which stores the arguments specific to that job type. For instance, the merge proposal diff generation job links to the merge proposal for which the diff should be generated.

Each type of job also shares some common fields that the job system uses to ensure that the job is processed. These include the job's status and timestamps recording when it was queued, completed, etc.

Each job is backed by the database, providing durability and error tolerance.

Each job type also has an associated script which processes jobs of that type as needed, usually run from cron. The script selects any outstanding jobs and runs them.
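
The flow described above can be sketched with nothing but the standard library. This is a minimal illustration, not Launchpad's implementation: the table layout, the column names, the status strings, and the helper functions queue_diff_job and run_outstanding_diff_jobs are all assumptions made for the example. It shows a shared Job table holding the common fields, a per-type table holding the type-specific argument, and a cron-style function that selects outstanding jobs and runs them.

```python
import sqlite3
from datetime import datetime, timezone

# Hypothetical schema: one shared Job table for the common fields,
# plus one table per job type for the type-specific arguments.
SCHEMA = """
CREATE TABLE Job (
    id INTEGER PRIMARY KEY,
    status TEXT NOT NULL DEFAULT 'WAITING',
    date_created TEXT NOT NULL,
    date_finished TEXT
);
CREATE TABLE DiffJob (
    id INTEGER PRIMARY KEY,
    job INTEGER NOT NULL REFERENCES Job(id),
    merge_proposal INTEGER NOT NULL
);
"""


def now():
    """Return the current UTC time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


def queue_diff_job(conn, merge_proposal_id):
    """Called during the web request: record the work and return at once."""
    cur = conn.execute(
        "INSERT INTO Job (date_created) VALUES (?)", (now(),))
    job_id = cur.lastrowid
    conn.execute(
        "INSERT INTO DiffJob (job, merge_proposal) VALUES (?, ?)",
        (job_id, merge_proposal_id))
    conn.commit()
    return job_id


def run_outstanding_diff_jobs(conn, generate_diff):
    """Called from the cron script: run every job that is still waiting."""
    rows = conn.execute(
        "SELECT Job.id, DiffJob.merge_proposal FROM Job "
        "JOIN DiffJob ON DiffJob.job = Job.id "
        "WHERE Job.status = 'WAITING' ORDER BY Job.id").fetchall()
    for job_id, merge_proposal_id in rows:
        conn.execute(
            "UPDATE Job SET status = 'RUNNING' WHERE id = ?", (job_id,))
        conn.commit()
        try:
            generate_diff(merge_proposal_id)
            status = 'COMPLETED'
        except Exception:
            # A failure is recorded on the job instead of being lost.
            status = 'FAILED'
        conn.execute(
            "UPDATE Job SET status = ?, date_finished = ? WHERE id = ?",
            (status, now(), job_id))
        conn.commit()
    return len(rows)


def job_status(conn, job_id):
    """Return the stored status string for one job."""
    return conn.execute(
        "SELECT status FROM Job WHERE id = ?", (job_id,)).fetchone()[0]
```

Because every state change is committed to the database, a job that was queued but never run is still there the next time the script starts, which is the durability the section refers to.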

Code

The code for this lives in lp.services.job.

You can see implementations of job types in:

  • lib/lp/bugs/interfaces/bugjob.py, lib/lp/bugs/model/bugjob.py and cronscripts/calculate-bug-heat.py
  • lib/lp/code/interfaces/branchmergeproposal.py, lib/lp/code/model/branchmergeproposal.py and cronscripts/merge-proposal-jobs.py

In lp.services.job.interfaces.job are the basic interfaces for jobs, which each job type will build upon.

  • JobStatus is an enum of the statuses a job can be in: waiting, running, completed, etc.

  • IJob is the interface implemented by each job type. It holds the attributes common to all types, as well as standard methods that the job system can use to run the jobs and manipulate them in other ways.
  • IRunnableJob
  • IJobSource is the interface for an object which can provide jobs that are ready to run right now.

Implementing a job type

If this is the first job for your app, then you probably want to start by implementing a job interface that is common to jobs of a certain class. For instance, lp.bugs.interfaces.bugjob deals with jobs that reference a bug.

We are going to create an interface for jobs dealing with IFoos; adjust as necessary.

  • Create ./lib/lp/component/interfaces/foojob.py
  • In that file put:

from zope.interface import Attribute, Interface
from zope.schema import Int, Object                                           
                                                                              
from canonical.launchpad import _                                             

from lp.services.job.interfaces.job import IJob, IJobSource                   
from lp.component.interfaces.foo import IFoo                              
    
        
class IFooJob(Interface):
    """A Job related to a Foo."""                                        
    
    id = Int(
        title=_('DB ID'), required=True, readonly=True,                       
        description=_("The tracking number for this job."))                   
    
    foo = Object(
        title=_('The Foo this job is about.'), schema=IFoo,           
        required=True)
                                                                              
    job = Object(
        title=_('The common Job attributes'), schema=IJob, required=True)     
                                                                              
    metadata = Attribute('A dict of data about the job.')  


class IFooJobSource(IJobSource):                                          
    """An interface for acquiring IFooJobs."""                            
        
    def create(foo):
        """Create a new IFooJob for a foo."""

  • Create lib/lp/component/model/foojob.py
  • In that file put:

import simplejson                                                             
                                                                              
from sqlobject import SQLObjectNotFound                                       
from storm.base import Storm                                                  
from storm.locals import Int, Reference, Unicode                              
                                                                              
from zope.component import getUtility                                         
from zope.interface import classProvides, implements                          
                                                                              
from canonical.launchpad.webapp.interfaces import (                           
    DEFAULT_FLAVOR, IStoreSelector, MAIN_STORE)                               
                                                                              
from lazr.delegates import delegates                                          
 
from lp.component.interfaces.foojob import IFooJob, IFooJobSource     
from lp.component.model.foo import Foo                                                                                    
from lp.services.job.model.job import Job                                     
from lp.services.job.runner import BaseRunnableJob                            


class FooJob(Storm):
    """Base class for jobs related to Foos."""                            

    implements(IFooJob)                                                   

    __storm_table__ = 'FooJob'
                                                                              
    id = Int(primary=True)                                                    

    job_id = Int(name='job')
    job = Reference(job_id, Job.id)                                           
    
    foo_id = Int(name='foo')
    foo = Reference(foo_id, Foo.id)
                                                                              
    _json_data = Unicode('json_data')                                         
        
    @property
    def metadata(self):
        return simplejson.loads(self._json_data)

    @classmethod
    def get(cls, key):
        """Return the instance of this class whose key is supplied."""
        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
        instance = store.get(cls, key)
        if instance is None:
            raise SQLObjectNotFound(
                'No occurrence of %s has key %s' % (cls.__name__, key))
        return instance


class FooJobDerived(BaseRunnableJob):
    """Intermediate class for deriving from FooJob."""
    delegates(IFooJob)
    classProvides(IFooJobSource)

    def __init__(self, job):
        self.context = job
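
Two ideas in the model above may be easier to see without Storm and Zope in the way: the metadata property decodes a JSON string stored in a single column, and FooJobDerived wraps a FooJob and relies on delegates(IFooJob) to forward the interface's attributes to self.context. The following standalone sketch imitates both with the standard library. The classes FooJobRecord and FooJobWrapper and the create signature are hypothetical stand-ins, json is used in place of simplejson, and __getattr__ forwarding is only an approximation of what lazr.delegates does (it forwards every missing attribute, not just those named by an interface).

```python
import json


class FooJobRecord:
    """Stand-in for the Storm-backed FooJob row (hypothetical)."""

    def __init__(self, foo, metadata):
        self.foo = foo
        # The dict is stored as text, as the json_data column would be.
        self._json_data = json.dumps(metadata)

    @property
    def metadata(self):
        # Decoded on every access, so callers always get a fresh dict.
        return json.loads(self._json_data)


class FooJobWrapper:
    """Stand-in for FooJobDerived: forwards lookups to the wrapped job."""

    def __init__(self, job):
        self.context = job

    def __getattr__(self, name):
        # __getattr__ runs only when normal lookup fails, so attributes
        # defined on the wrapper itself take precedence.
        if name == 'context':
            # Avoid infinite recursion if context was never set.
            raise AttributeError(name)
        return getattr(self.context, name)

    @classmethod
    def create(cls, foo, **metadata):
        """Assumed shape of a job source's create(): build and wrap a job."""
        return cls(FooJobRecord(foo, metadata))


job = FooJobWrapper.create('some-foo', retries=3)
print(job.foo)       # forwarded to the wrapped record
print(job.metadata)  # decoded from the stored JSON text
```

A concrete job type would then subclass the wrapper and add its own run() method, reading whatever arguments it needs from metadata.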

Foundations/JobSystem (last edited 2012-03-04 08:33:39 by lifeless)