getJobId(); // 有些消息在到达消费者时,可能已经不再需要执行了 $taskBeDone = $this->taskBeDone($job, $payload); if (!$taskBeDone) { // 删除任务 $job->delete(); return; } // 执行结果 $result = 0; // 执行任务 $isJobDone = $this->executeTask($payload); if ($isJobDone) { // 删除任务 $job->delete(); // 执行结果 $result = 1; } else { // 检查任务执行次数 if ($job->attempts() > self::$attempts) { // 删除任务 $job->delete(); // 重新发布任务 // $job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行 } } // 从商品上下架表中删除定时信息 (new ProductSku)->removeProductTimer($payload['product_id']); // 写入task表,留存执行记录 $record = [ 'task_id' => $jobId, 'category' => 'product', 'tid' => $payload['product_id'], 'state' => $payload['state'], 'publish_time' => strtotime($payload['solt']), 'payload' => jsonEncode($payload), 'result' => $result, // 1成功,0失败 ]; Task::add($record); } /** * @Title: taskBeDone * @Description: 前置操作,删除不需要执行的任务 * @param Job $job * @param array $payload * @return boolean */ private function taskBeDone(Job $job, array $payload): bool { try { // 如果任务不存在,则取消队列 $task = (new ProductTimer())->where('task_id', '=', $job->getJobId())->find(); if (!$task) { // 写入task表,留存执行记录 $record = [ 'task_id' => $job->getJobId(), 'category' => 'product', 'tid' => $payload['product_id'], 'state' => $payload['state'], 'publish_time' => strtotime($payload['solt']), 'payload' => jsonEncode($payload), 'result' => 2, // 1成功,0失败,2取消 ]; Task::add($record); return false; } } catch (DataNotFoundException | ModelNotFoundException | DbException $e) { return true; } return true; } /** * @Title: execute * @Description: 业务处理 * @param array $payload * @return bool */ public function executeTask(array $payload): bool { // 商品详情 $model = ProductModel::detail($payload['product_id']); if (!$model->setStatus($payload['state'])) { return false; } return true; } /** * 发布失败 * @param $data */ public function failed($data) { // todo ...任务达到最大重试次数后,失败了 } }