python websocket 服务端demo
2025年3月31日...大约 1 分钟
python websocket 服务端demo
# -*- coding: UTF-8 -*-
#!/usr/bin/env python3
"""
Echo server using the asyncio API.
pip install websockets==15.0.1
"""
import asyncio
import sys
import json
from websockets.asyncio.server import serve
from websockets.exceptions import ConnectionClosed
websocketList = []
async def echo(websocket):
print(f"客户端连接成功,客户端信息如下:")
# json 化请求头信息
headerDict = {}
headerDict.setdefault("path",websocket.request.path)
remote_addr = f"{websocket.remote_address[0]}:{websocket.remote_address[1]}"
headerDict.setdefault("remote_addr",remote_addr)
for header,value in websocket.request.headers.raw_items():
headerDict.setdefault(header,value)
headers = json.dumps(headerDict,indent=4)
print(headers)
# 发送欢迎信息
await websocket.send(f"Welcome Access My WebSocket Server! 当前有 {len(websocketList)}个客户端,你是第 {len(websocketList)+1} 个")
# 添加到列表
websocketList.append(websocket)
# 执行监听用户输入信息
try :
async for message in websocket:
if message.lower() == "exit":
await websocket.close()
print("客户端连接已关闭")
elif message.lower() == "header":
await websocket.send(headers)
else:
await websocket.send(message)
except ConnectionClosed:
print("客户端主动断开")
# 断开连接后,删除列表中的记录
print("客户端主动断开")
websocketList.remove(websocket)
# 主进程入口
async def main():
try:
port = int(sys.argv[1]) if len(sys.argv) == 2 else 8765
async with serve(echo, "0.0.0.0", port) as server:
print(f"WebSocket 服务器运行在 {port} 端口!")
await server.serve_forever()
except ValueError:
print("端口号必须是整数")
except Exception as e:
print(f"服务器启动失败: {e}")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n手动停止服务器")
贡献者
lichangyangccm@163.com