这是一个被我修改过的是生产 /消费者模型,由于需要消费者返回数据,所以不得已用了两个 Queue,第二个用于把结果写回去,最后在 main 函数里取出返回,下面的代码在我的 python3.7 和 python3.8 环境都可以正常运行,先上代码,请大家过目。
import asyncio import pandas as pd import random import re async def crawler(u): """ 模拟爬虫返回结果 :param u: fake url :return: """ i = int(re.search(r"\d+", u).group(0)) await asyncio.sleep(random.random()) return {'url': u, 'result': i} async def worker(qin, qout, w): """ 消费者异步函数 :param qin: Queue1,用于生产者写入和消费者读出 :param qout: Queue2,用于让消费者回写结果 :param w: worker id :return: """ while True: if qin.empty(): break u = await qin.get() print(f"Worker-{w} crawling {u}") resp = await crawler(u) await qout.put(resp) qout.task_done() async def generate_url(url, qin): """ 生产者异步函数 :param url: :param qin: Queue1,写出生产者产出的(这里为传入的) url :return: """ await qin.put(url) print(f"Queue size = {qin.qsize()}") # qin.task_done() async def main(qmax=20): q_in = asyncio.Queue(qmax) q_out = asyncio.Queue() urls = [f"url{i}" for i in range(100)] producers = [asyncio.create_task(generate_url(u, q_in)) for u in urls] # producers = [await q_in.put(u) for u in urls] cOnsumers= [asyncio.create_task(worker(q_in, q_out, i)) for i in range(1, qmax // 2 + 1)] await asyncio.gather(*consumers) await asyncio.gather(*producers) # await q_in.join() await q_out.join() for c in consumers: c.cancel() return [await q_out.get() for _ in range(q_out.qsize())] if __name__ == "__main__": result = asyncio.run(main(30)) df = pd.DataFrame(result).set_index('url') 目前有三个问题,
- 被注释掉的 qin.join,qin.task_done 是需要?还是让第二个 queue 阻塞就好了?
- main 函数中的 producers 是我写的异步列表推导式,想用于替代 generate_url 函数,但是不能正常运行
- 是否还有更优雅的实现方式?
