_source = $source; } // Implement this function to actually do the job abstract public function run(); // Place the current job onto the queue ready for processing. public function queue() { $class = get_called_class(); $data = serialize($this); $db = DB::getInstance(); $this->_jobid = $db->insert("job", [ "class" => $class, "data" => $data, "queued" => time(), "source" => $this->_source, "status" => "Queued", ]); return $this->_jobid; } public function getJobClass() { $class = get_called_class(); return $class; } public function jobID() { return $this->_jobid; } public function setJobID($id) { $this->_jobid = $id; } // Set the current status message public function status($txt) { $db = DB::getInstance(); $db->update("job", $this->_jobid, [ "status" => $txt ]); } // Mark the job as completed successfully public function finish() { $db = DB::getInstance(); $db->update("job", $this->_jobid, [ "finished" => time(), ]); } // Mark the job as failed miserably public function fail() { $db = DB::getInstance(); $db->update("job", $this->_jobid, [ "failed" => time(), ]); } // Restart the job. public function retry() { $db = DB::getInstance(); $db->update("job", $this->_jobid, [ "failed" => 0, "finished" => 0, "started" => 0, "status" => "Retrying" ]); } // Restart the job and defer it to a later time. public function defer($when) { $db = DB::getInstance(); $db->update("job", $this->_jobid, [ "failed" => 0, "finished" => 0, "started" => 0, "queued" => $when, "status" => "Deferred until " . gmdate("Y-m-d\TH:i:s\Z", $when) ]); } // Look for the next available job that optionally has // the requested class. Mark it as started, deserialize // it, and return the job runner object. // Returns false if no job available. public static function consumeNextJob($class = null) { $db = DB::getInstance(); $db->query("lock table job write"); if ($class == null) { $q = $db->query("select * from job where started=0 and queued < unix_timestamp(now()) order by queued limit 1"); } else { $q = $db->query("select * from job where started=0 and queued < unix_timestamp(now()) and class=:class order by queued limit 1", ["class" => $class]); } $r = $db->nextRecord($q); if (!$r) { $db->query("unlock tables"); return false; } $db->update("job", $r->id, [ "started" => time() ]); $db->query("unlock tables"); $ob = unserialize($r->data); $ob->setJobID($r->id); return $ob; } }