Product.php 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. <?php
  2. namespace app\job\queue;
  3. use app\common\model\product\ProductTimer;
  4. use app\common\model\shop\Task;
  5. use app\shop\model\product\Product as ProductModel;
  6. use app\shop\model\product\ProductSku;
  7. use think\db\exception\DataNotFoundException;
  8. use think\db\exception\DbException;
  9. use think\db\exception\ModelNotFoundException;
  10. use think\queue\Job;
  11. class Product
  12. {
  13. private static $attempts = 3; // 失败队列重试最大次数
  14. /**
  15. * @Title: fire
  16. * @Description: fire方法是消息队列默认调用的方法
  17. * @param Job $job
  18. * @param array $payload
  19. */
  20. public function fire(Job $job, array $payload)
  21. {
  22. $jobId = $job->getJobId();
  23. // 有些消息在到达消费者时,可能已经不再需要执行了
  24. $taskBeDone = $this->taskBeDone($job, $payload);
  25. if (!$taskBeDone) {
  26. // 删除任务
  27. $job->delete();
  28. return;
  29. }
  30. // 执行结果
  31. $result = 0;
  32. // 执行任务
  33. $isJobDone = $this->executeTask($payload);
  34. if ($isJobDone) {
  35. // 删除任务
  36. $job->delete();
  37. // 执行结果
  38. $result = 1;
  39. } else {
  40. // 检查任务执行次数
  41. if ($job->attempts() > self::$attempts) {
  42. // 删除任务
  43. $job->delete();
  44. // 重新发布任务
  45. // $job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
  46. }
  47. }
  48. // 从商品上下架表中删除定时信息
  49. (new ProductSku)->removeProductTimer($payload['product_id']);
  50. // 写入task表,留存执行记录
  51. $record = [
  52. 'task_id' => $jobId,
  53. 'category' => 'product',
  54. 'tid' => $payload['product_id'],
  55. 'state' => $payload['state'],
  56. 'publish_time' => strtotime($payload['solt']),
  57. 'payload' => jsonEncode($payload),
  58. 'result' => $result, // 1成功,0失败
  59. ];
  60. Task::add($record);
  61. }
  62. /**
  63. * @Title: taskBeDone
  64. * @Description: 前置操作,删除不需要执行的任务
  65. * @param Job $job
  66. * @param array $payload
  67. * @return boolean
  68. */
  69. private function taskBeDone(Job $job, array $payload): bool
  70. {
  71. try {
  72. // 如果任务不存在,则取消队列
  73. $task = (new ProductTimer())->where('task_id', '=', $job->getJobId())->find();
  74. if (!$task) {
  75. // 写入task表,留存执行记录
  76. $record = [
  77. 'task_id' => $job->getJobId(),
  78. 'category' => 'product',
  79. 'tid' => $payload['product_id'],
  80. 'state' => $payload['state'],
  81. 'publish_time' => strtotime($payload['solt']),
  82. 'payload' => jsonEncode($payload),
  83. 'result' => 2, // 1成功,0失败,2取消
  84. ];
  85. Task::add($record);
  86. return false;
  87. }
  88. } catch (DataNotFoundException | ModelNotFoundException | DbException $e) {
  89. return true;
  90. }
  91. return true;
  92. }
  93. /**
  94. * @Title: execute
  95. * @Description: 业务处理
  96. * @param array $payload
  97. * @return bool
  98. */
  99. public function executeTask(array $payload): bool
  100. {
  101. // 商品详情
  102. $model = ProductModel::detail($payload['product_id']);
  103. if (!$model->setStatus($payload['state'])) {
  104. return false;
  105. }
  106. return true;
  107. }
  108. /**
  109. * 发布失败
  110. * @param $data
  111. */
  112. public function failed($data)
  113. {
  114. // todo ...任务达到最大重试次数后,失败了
  115. }
  116. }