感谢大家在前文 请教下各位 Gunicorn...提供的帮助,前段时间有些忙,没有一一回复大家。在这里感谢各位了,中秋快乐。
听取了大家的意见,由于 celery 对于我来说过于复杂,所以使用 rq 来做。
叨扰一下新的问题:
- 长耗时任务分为两个步骤
- 步骤 1 执行完,才执行步骤 2
- 步骤 2 内部需要拆分为多个 worker 并行执行。
目前遇到的问题是
- step_two 里面的 worker 变成串行了。
- 要怎么汇总 step_two 的运行结果
- 如何 close 访问 /task 后创建的任务
以下代码使用 gunicorn 默认模式运行
from datetime import datetime from redis import Redis import time import rq_dashboard from flask import Flask, request from rq import Queue, Worker from rq.command import send_stop_job_command redis = Redis(host='127.0.0.1', port=6379) queue = Queue(cOnnection=redis, name='yixianghuitong') app = Flask(__name__) app.config['RQ_DASHBOARD_REDIS_URL'] = 'redis://localhost:6379' app.config.from_object(rq_dashboard.default_settings) app.register_blueprint(rq_dashboard.blueprint, url_prefix="/rq") @app.route('/task') def add_task(): user = request.args.get('user') if user: print('执行步骤 1') d = datetime.now().strftime('%Y%m%d%H%M%S') name = f"{user}_{d}" queue.enqueue( step_one, args=(name,) ) worker = Worker([queue], cOnnection=redis, name=name) worker.work(burst=True) print('执行步骤 2') step_two(name) # TODO: 汇总上面的结果返回给网页端 return f"Task {name} complete" return 'No value for n' def step_one(name): delay = 5 print(f"Running {name}, Simulating {delay} second delay") time.sleep(delay) f = open(f"step_one_{name}.txt", 'a') d = datetime.now().strftime('%Y%m%d-%H%M%S') f.write(d) print(f"Task {name} complete") return name def step_two(name): for i in range(2): worker_name = f"{name}_{i}" queue.enqueue(step_two_fn, worker_name) worker = Worker([queue], cOnnection=redis, name=name) worker.work(burst=True) def step_two_fn(name): delay = 5 print(f"Running {name}, Simulating {delay} second delay") time.sleep(delay) f = open(f"step_two_fn_{name}.txt", 'a') d = datetime.now().strftime('%Y%m%d-%H%M%S') f.write(d) print(f"Task {name} complete") return name @app.route('/close') def close(): # list_jobs = queue.get_jobs() # print(list_jobs) id = request.args.get('id') # job = queue.fetch_job(id) send_stop_job_command(redis, id) # for j in list_jobs: # print(11, j.get_id()) # j.cancel() return 'len(list_jobs)' 