async def run_task(self, tasks, task_handler): cOnsumer= asyncio.ensure_future(self.consume(self.task_queue)) await self.produce(tasks, task_handler, self.task_queue) await self.task_queue.join() consumer.cancel() async def produce(self, tasks, task_handler, queue): n = len(tasks) for i, task in enumerate(tasks): item = (i + 1, n, task, task_handler) await queue.put(item) async def consume(self, queue): async with ClientSession() as session: while 1: item = await queue.get() index = item[0] count = item[1] handler = item[3] rs = await handler(item[2], i=index, n=count, session=session) print('consuming {}/{} {}...'.format(index, count, rs)) queue.task_done() # 只要加了这句程序就挂起了 await self.produce(rs, self.detail_handler, self.result_queue) loop = asyncio.get_event_loop() loop.run_until_complete(self.run_task(url_list, self.page_handler)) loop.close()
打印
consuming 1/5 ... consuming 2/5 ...
await self.produce(rs, self.detail_handler, self.result_queue)