Python 在 3 中我在版本 5 中新增了 asyncio,但在版本 3 中使用後似乎並不容易使用7、asyncio 有了很大的改進,用起來還不錯,asyncio 引入了 async await 語法,有點類似於 nodejs。
最簡單的例子:
import asyncioasync def main():print('hello ..') await asyncio.sleep(1) print('...world!')# python 3.7+asyncio.run(main())例如,PHP 的 Swoole 擴充套件也是基於協程的,但使用起來不如 Python 方便,而且 Swoole 不提供非同步 await 語法。 Asyncio 提供了 Create Task 來建立協程任務,可以與 Swoole 的 go 函式進行比較
async def main():task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at ") await task1 await task2 print(f"finished at ")asyncio 還提供了許多幫助程式函式,例如 asyncioGather 用於同時執行多個協程任務,相當於之前使用 Swoole 實現的 groupwait。 對應的 swoole 通道也有乙個版本的 asyncioqueue。還有多個佇列,例如 priorityqueue 和 lifoqueue,因此 Asyncio 也完全適合 CSP 程式設計模型。
使用 asyncio 建立 TCP 服務:
import asyncioasync def handle echo(reader, writer): while true: 確定 EOF 終止符以關閉鏈結 if readerat_eof():break data = await reader.readline() message = data.decode() addr = writer.get_extra_info('peername') print(f"received from ") print(f"send: ") writer.write(data) await writer.drain() print("close the connection") writer.close()async def tcp_server_task():server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888) addr = server.sockets[0].getsockname() print(f'serving on ') async with server: await server.serve_forever()async def main():task = asyncio.create_task(tcp_server_task())await taskasyncio.run(main())現在我們假設我們需要建立乙個集中的TCP日誌採集服務,並將TCP埠接收到的日誌儲存在PostgreSQL或者MySQL中,日誌請求的數量會比較大,所以我們需要有良好的效能,使用以下簡單的實現方式:
import asynciofrom psycopg2.pool import **connectionpoolfrom datetime import datetimeclass asynctask: def __init__(self): self.task_queue = asyncio.queue() self.db_pool = **connectionpool( 2, 20, dbname='echoes', host='192.168.2.10', user='twn39', password='tangweinan') async def handle echo(self, reader, writer): while true: 確定 EOF 終止符以關閉鏈結 if readerat_eof():break data = await reader.readline() message = data.decode() self.task_queue.put_nowait(message.strip())writer.write("ok".encode())await writer.drain() print("close the connection") writer.close() async def tcp_server_task(self): server = await asyncio.start_server(self.handle_echo, '127.0.0.1', 8888) addr = server.sockets[0].getsockname() print(f'serving on ') async with server: await server.serve_forever() async def consume_task(self, name: str): while true: data = await self.task_queue.get() conn = self.db_pool.getconn() print(conn) cur = conn.cursor() cur.execute("insert into logs (level, message, created_at) values (%s, %s, %s)", (200, 'log test', datetime.now() conn.commit() self.db_pool.putconn(conn) print(f'work: consume data: ') self.task_queue.task_done()if __name__ == '__main__': async def main():async_task = asynctask() task = asyncio.create_task(async_task.tcp_server_task())task1 = asyncio.create_task(async_task.consume_task('worker-1')) task2 = asyncio.create_task(async_task.consume_task('worker-2')) await asyncio.gather(task, task1, task2, return_exceptions=true) asyncio.run(main())程式中引入了 Queue,它可以非同步寫入資料庫以提高高峰請求期間的穩定性,我們建立了三個主要任務,乙個是監聽 TCP 埠,獲取日誌資料,將日誌放入佇列中,另外兩個是消耗佇列,當佇列中有資料時寫入資料庫, 並在沒有資料時等待資料。
* 有兩個無限迴圈(實際上是三個),程式執行時,相當於並行,但實際上它是單執行緒的,協程可以中斷,當多個協程執行時,其實函式之間切換,但切換時間很短,所以反映在並行上。 多執行緒程式設計是由作業系統實現的,因此在它們之間切換的成本相對較高,而協程則由使用者決定何時切換,也稱為使用者端線程。