120 lines
2.7 KiB
PHP
120 lines
2.7 KiB
PHP
<?php
|
|
|
|
/**
 * Base class for a unit of work that is stored in the "job" table and later
 * picked up and executed by a worker.
 *
 * A concrete job extends this class and implements run(). queue() serializes
 * the object into a new "job" row; consumeNextJob() claims the oldest
 * runnable row, rebuilds the object from it and hands it back to the caller.
 * The remaining methods update the bookkeeping columns of the job's row
 * (status, started, finished, failed, queued).
 *
 * All persistence goes through the project's DB singleton (DB::getInstance()).
 */
abstract class Job {
    // NOTE: both properties are part of the serialized payload written by
    // queue(). Renaming them would make already-queued rows unreadable.

    // Row id in the "job" table; 0 until the job has been queued or consumed.
    private $_jobid = 0;

    // Free-form label recording where the job came from.
    private $_source = "unknown";

    /**
     * @param string $source Label stored in the row's "source" column when
     *                       the job is queued.
     */
    public function __construct($source = "unknown") {
        $this->_source = $source;
    }

    /**
     * Implement this function to actually do the job.
     */
    abstract public function run();

    /**
     * Place the current job onto the queue ready for processing.
     *
     * The whole object is serialized into the row's "data" column, so every
     * property of the concrete job must be serializable.
     *
     * @return mixed The value returned by DB::insert(), which is also
     *               remembered as this job's id.
     */
    public function queue() {
        $class = get_called_class();
        $data = serialize($this);

        $this->_jobid = DB::getInstance()->insert("job", [
            "class" => $class,
            "data" => $data,
            "queued" => time(),
            "source" => $this->_source,
            "status" => "Queued",
        ]);

        return $this->_jobid;
    }

    /**
     * @return string Name of the concrete class this method was called on.
     */
    public function getJobClass() {
        return get_called_class();
    }

    /**
     * @return mixed The id of this job's row, or 0 if it has none yet.
     */
    public function jobID() {
        return $this->_jobid;
    }

    /**
     * Attach this object to an existing "job" row.
     *
     * @param mixed $id Row id in the "job" table.
     */
    public function setJobID($id) {
        $this->_jobid = $id;
    }

    /**
     * Set the current status message.
     *
     * @param string $txt Text to store in the row's "status" column.
     */
    public function status($txt) {
        $this->_updateRecord([
            "status" => $txt,
        ]);
    }

    /**
     * Mark the job as completed successfully by stamping "finished" with the
     * current time.
     */
    public function finish() {
        $this->_updateRecord([
            "finished" => time(),
        ]);
    }

    /**
     * Mark the job as failed by stamping "failed" with the current time.
     */
    public function fail() {
        $this->_updateRecord([
            "failed" => time(),
        ]);
    }

    /**
     * Restart the job: clear the failed/finished/started stamps so that
     * consumeNextJob() (which selects rows with started=0) can claim it again.
     */
    public function retry() {
        $this->_updateRecord([
            "failed" => 0,
            "finished" => 0,
            "started" => 0,
            "status" => "Retrying",
        ]);
    }

    /**
     * Restart the job and defer it to a later time.
     *
     * @param int $when Unix timestamp written to "queued"; consumeNextJob()
     *                  only selects rows whose "queued" value lies in the past.
     */
    public function defer($when) {
        $this->_updateRecord([
            "failed" => 0,
            "finished" => 0,
            "started" => 0,
            "queued" => $when,
            "status" => "Deferred until " . gmdate("Y-m-d\TH:i:s\Z", $when),
        ]);
    }

    /**
     * Look for the next available job that optionally has the requested
     * class. Mark it as started, deserialize it, and return the job runner
     * object.
     *
     * A claimed row whose payload does not deserialize into an object with a
     * setJobID() method is marked as failed and skipped, so a single corrupt
     * row can neither crash the worker nor stay "started" forever.
     *
     * @param string|null $class Only consider rows of this class; any value
     *                           loosely equal to null (null, "") means
     *                           "any class".
     * @return object|false The deserialized job with its id set, or false if
     *                      no job is available.
     */
    public static function consumeNextJob($class = null) {
        $db = DB::getInstance();

        while (($row = self::_claimNextRecord($db, $class))) {
            // NOTE(review): unserialize() instantiates whatever the payload
            // names, so the "job" table must only ever be writable by
            // trusted code.
            $job = unserialize($row->data);

            if (is_object($job) && method_exists($job, "setJobID")) {
                $job->setJobID($row->id);
                return $job;
            }

            // The row is already marked as started; record why it never ran
            // and move on to the next candidate.
            $db->update("job", $row->id, [
                "failed" => time(),
                "status" => "Failed: job data could not be unserialized",
            ]);
        }

        return false;
    }

    /**
     * Apply a set of column changes to this job's row.
     *
     * @param array $fields Column name => new value.
     */
    private function _updateRecord(array $fields) {
        DB::getInstance()->update("job", $this->_jobid, $fields);
    }

    /**
     * Select the oldest runnable row and stamp it as started, all under a
     * write lock on the "job" table.
     *
     * @param object      $db    The DB instance to use.
     * @param string|null $class Optional class filter (see consumeNextJob()).
     * @return object|false The claimed row, or false if none is runnable.
     */
    private static function _claimNextRecord($db, $class) {
        $db->query("lock table job write");

        // Whatever happens between here and the return, the table lock must
        // be released again, including when a query throws.
        try {
            // Loose comparison on purpose: "" is treated like null.
            if ($class == null) {
                $q = $db->query("select * from job where started=0 and queued < unix_timestamp(now()) order by queued limit 1");
            } else {
                $q = $db->query("select * from job where started=0 and queued < unix_timestamp(now()) and class=:class order by queued limit 1", ["class" => $class]);
            }

            $r = $db->nextRecord($q);
            if (!$r) {
                return false;
            }

            $db->update("job", $r->id, [
                "started" => time(),
            ]);

            return $r;
        } finally {
            $db->query("unlock tables");
        }
    }
}
|