WebSocket 编程主要涉及创建服务器和客户端两个部分。以下是一些基本的示例代码,分别展示了如何使用 `websocket-client` 库和 `websockets` 库进行 WebSocket 编程。
使用 `websocket-client` 库
安装 `websocket-client` 库
```bash
pip install websocket-client
```
创建 WebSocket 客户端
```python
import websocket
创建 WebSocket 连接
ws = websocket.WebSocket()
ws.connect("ws://echo.websocket.org")
发送消息
ws.send("Hello, WebSocket!")
接收消息
result = ws.recv()
print("收到服务器响应:", result)
关闭连接
ws.close()
```
异步回调示例
```python
import websocket
import asyncio
def on_message(ws, message):
print(f"收到消息: {message}")
def on_error(ws, error):
print(f"发生错误: {error}")
def on_close(ws, close_status_code, close_msg):
print("连接关闭")
def on_open(ws):
print("连接建立")
ws.send("Hello Server!")
创建 WebSocket 应用
websocket.enableTrace(True)
ws = websocket.WebSocketApp("ws://echo.websocket.org",
on_message=on_message,
on_error=on_error,
on_close=on_close,
on_open=on_open)
启动应用
ws.run_forever()
```
使用 `websockets` 库
安装 `websockets` 库
```bash
pip install websockets
```
创建 WebSocket 服务器
```python
import asyncio
import websockets
async def echo(websocket, path):
async for message in websocket:
print(f"Received: {message}")
await websocket.send(message)
start_server = websockets.serve(echo, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
```
创建 WebSocket 客户端
```python
import asyncio
import websockets
async def hello():
async with websockets.connect("ws://localhost:8765") as websocket:
await websocket.send("Hello, Server!")
response = await websocket.recv()
print(f"收到服务器响应: {response}")
asyncio.run(hello())
```
进阶技巧
异步编程:使用 `async/await` 语法可以高效处理大量并发连接。
消息处理:可以定义回调函数来处理不同类型的消息。
连接管理:可以维护连接集合,以便进行广播或其他操作。
这些示例代码展示了如何使用 `websocket-client` 和 `websockets` 库进行基本的 WebSocket 编程。根据具体需求,可以进一步扩展和优化这些示例。