🧪 Skills
WebSocket Reconnect
WebSocket connection management with exponential backoff + jitter retry, heartbeat detection, and circuit breaker pattern. Use when you need reliable WebSock...
v1.0.0
Description
name: websocket-reconnect description: WebSocket connection management with exponential backoff + jitter retry, heartbeat detection, and circuit breaker pattern. Use when you need reliable WebSocket connections that automatically recover from network failures. license: MIT metadata: {"author":"custom","version":"1.0.0","openclaw":{"emoji":"🔌","requires":{"bins":["node"]}}}
WebSocket Reconnect Skill
可靠的 WebSocket 连接管理,包含指数退避 + 抖动重连算法、心跳检测和断路器模式。
特性
- 指数退避重连: 智能重试策略,延迟时间指数增长
- 随机抖动 (Jitter): 防止群震问题 (thundering herd)
- 心跳检测: 自动检测僵死连接并触发重连
- 断路器模式: 三次状态 (CLOSED, OPEN, HALF-OPEN) 防止级联故障
- 最大重试上限: 避免无限重试消耗资源
- 事件驱动: 完整的事件系统用于状态监控
安装依赖
npm install ws
快速开始
基础用法
const { WebSocketReconnect } = require('./scripts/websocket-reconnect.js');
const ws = new WebSocketReconnect({
url: 'wss://api.example.com/socket',
maxRetries: 10,
baseDelay: 1000,
maxDelay: 30000
});
// 监听事件
ws.on('open', () => {
console.log('✅ Connected');
});
ws.on('message', (event) => {
console.log('📨 Message:', event.data);
});
ws.on('close', (event) => {
console.log('❌ Closed:', event.code, event.reason);
});
ws.on('error', (error) => {
console.error('⚠️ Error:', error.message);
});
ws.on('retry', (info) => {
console.log(`🔄 Retry ${info.attempt}/${info.maxRetries} in ${info.delay}ms`);
});
// 连接
ws.connect();
// 发送消息
ws.send(JSON.stringify({ type: 'subscribe', channel: 'updates' }));
// 关闭连接
// ws.close();
完整配置示例
const ws = new WebSocketReconnect({
// 连接配置
url: 'wss://api.example.com/socket',
protocols: ['graphql-ws'],
websocketOptions: {
headers: {
'Authorization': 'Bearer token123'
}
},
// 重连配置
maxRetries: 10, // 最大重试次数
baseDelay: 1000, // 基础延迟 (ms)
maxDelay: 30000, // 最大延迟 (ms)
multiplier: 2, // 退避倍数
jitter: 0.1, // 抖动系数 (0-1)
// 心跳配置
heartbeatInterval: 30000, // 心跳间隔 (ms)
heartbeatTimeout: 5000, // 心跳超时 (ms)
heartbeatMessage: JSON.stringify({ type: 'ping' }),
// 断路器配置
circuitBreaker: {
failureThreshold: 5, // 失败阈值
resetTimeout: 60000, // 重置超时 (ms)
halfOpenMaxRequests: 3 // 半开状态最大请求数
}
});
重连策略
指数退避计算
重连延迟按指数增长,避免频繁重试:
延迟 = baseDelay × (multiplier ^ 尝试次数)
示例 (baseDelay=1000ms, multiplier=2):
- 第 1 次重试:1000ms (1 秒)
- 第 2 次重试:2000ms (2 秒)
- 第 3 次重试:4000ms (4 秒)
- 第 4 次重试:8000ms (8 秒)
- ...
- 达到 maxDelay 后不再增长
抖动 (Jitter)
添加随机抖动防止多个客户端同时重试:
// jitter = 0.1 时,延迟在 ±10% 范围内随机
实际延迟 = 计算延迟 × (0.9 ~ 1.1)
心跳检测机制
工作原理
- 连接成功后,每隔
heartbeatInterval发送 ping 消息 - 等待服务器在
heartbeatTimeout内回复 pong - 如果超时,认为连接已僵死,自动关闭并触发重连
服务器端要求
服务器需要响应 ping 消息:
// 服务器示例 (Node.js + ws)
wss.on('connection', (ws) => {
ws.on('message', (data) => {
const message = JSON.parse(data);
if (message.type === 'ping') {
ws.send(JSON.stringify({ type: 'pong' }));
}
// ... 处理其他消息
});
});
自定义心跳消息
如果服务器使用不同的心跳协议:
const ws = new WebSocketReconnect({
url: 'wss://api.example.com',
heartbeatInterval: 30000,
heartbeatMessage: 'PING', // 字符串
// 或者
heartbeatMessage: { cmd: 'heartbeat' } // 对象 (会自动 JSON.stringify)
});
// 监听 pong 响应
ws.on('message', (event) => {
const data = JSON.parse(event.data);
if (data.type === 'pong') {
console.log('Heartbeat OK');
}
});
断路器模式
三种状态
CLOSED (正常)
- 请求正常通过
- 跟踪失败次数
- 失败达到阈值时打开
OPEN (熔断)
- 请求立即失败,不尝试连接
- 防止对故障服务的过载
- 经过
resetTimeout后自动转为 HALF-OPEN
HALF-OPEN (测试)
- 允许有限数量的请求通过
- 成功则转为 CLOSED
- 失败则转回 OPEN
状态监控
ws.on('circuitChange', (state) => {
console.log(`Circuit breaker state: ${state}`);
if (state === 'OPEN') {
console.log('⚠️ Service temporarily unavailable');
}
});
// 手动查询状态
const circuitState = ws.getCircuitState(); // 'CLOSED', 'OPEN', or 'HALF-OPEN'
手动控制断路器
// 手动打开断路器
ws.openCircuit();
// 手动关闭断路器 (重置)
ws.closeCircuit();
事件参考
| 事件 | 参数 | 描述 |
|---|---|---|
open |
无 | 连接成功建立 |
message |
event |
收到消息 (原生 MessageEvent) |
close |
{code, reason} |
连接关闭 |
error |
Error |
发生错误 |
retry |
{attempt, delay, maxRetries} |
即将重试 |
stateChange |
state |
连接状态变化 |
circuitChange |
state |
断路器状态变化 |
状态监控
获取统计信息
const stats = ws.getStats();
console.log(stats);
// {
// state: 'OPEN',
// circuitState: 'CLOSED',
// retryCount: 2,
// maxRetries: 10,
// circuitFailures: 0,
// failureThreshold: 5,
// lastActivity: 1709532000000,
// uptime: 15000
// }
连接状态
CLOSED: 未连接CONNECTING: 正在连接OPEN: 已连接CLOSING: 正在关闭BLOCKED: 被断路器阻止FAILED: 超过最大重试次数
完整示例
实时聊天客户端
const { WebSocketReconnect } = require('./scripts/websocket-reconnect.js');
class ChatClient {
constructor(userId) {
this.userId = userId;
this.ws = new WebSocketReconnect({
url: `wss://chat.example.com/socket?user=${userId}`,
maxRetries: 10,
baseDelay: 1000,
maxDelay: 30000,
heartbeatInterval: 30000,
heartbeatTimeout: 5000,
circuitBreaker: {
failureThreshold: 5,
resetTimeout: 60000
}
});
this.setupEventHandlers();
}
setupEventHandlers() {
this.ws.on('open', () => {
console.log('✅ Connected to chat server');
// 发送上线通知
this.ws.send(JSON.stringify({
type: 'online',
userId: this.userId
}));
});
this.ws.on('message', (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
});
this.ws.on('close', () => {
console.log('❌ Disconnected from chat server');
});
this.ws.on('error', (error) => {
console.error('⚠️ Chat error:', error.message);
});
this.ws.on('retry', (info) => {
console.log(`🔄 Reconnecting... Attempt ${info.attempt}/${info.maxRetries}`);
});
this.ws.on('circuitChange', (state) => {
if (state === 'OPEN') {
console.log('⚠️ Chat service temporarily unavailable');
}
});
}
handleMessage(message) {
switch (message.type) {
case 'pong':
// 心跳响应,忽略
break;
case 'chat':
console.log(`[${message.from}]: ${message.content}`);
break;
case 'system':
console.log(`[System]: ${message.content}`);
break;
}
}
sendMessage(content) {
if (this.ws.state === 'OPEN') {
this.ws.send(JSON.stringify({
type: 'chat',
content: content
}));
} else {
console.log('⚠️ Cannot send message: not connected');
}
}
connect() {
return this.ws.connect();
}
disconnect() {
this.ws.close();
}
}
// 使用示例
const chat = new ChatClient('user123');
chat.connect();
chat.sendMessage('Hello, world!');
GraphQL 订阅客户端
const { WebSocketReconnect } = require('./scripts/websocket-reconnect.js');
class GraphQLSubscriptionClient {
constructor(endpoint, options = {}) {
this.ws = new WebSocketReconnect({
url: endpoint,
protocols: ['graphql-ws'],
maxRetries: options.maxRetries || 10,
heartbeatInterval: options.heartbeatInterval || 30000,
heartbeatMessage: JSON.stringify({ type: 'connection_init' }),
...options
});
this.subscriptions = new Map();
this.subscriptionId = 0;
this.setupEventHandlers();
}
setupEventHandlers() {
this.ws.on('open', () => {
console.log('✅ GraphQL WebSocket connected');
// 重新订阅所有订阅
this.resubscribeAll();
});
this.ws.on('message', (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
});
}
handleMessage(message) {
switch (message.type) {
case 'connection_ack':
console.log('GraphQL connection acknowledged');
break;
case 'next':
const subscription = this.subscriptions.get(message.id);
if (subscription && subscription.onData) {
subscription.onData(message.payload);
}
break;
case 'complete':
const sub = this.subscriptions.get(message.id);
if (sub && sub.onComplete) {
sub.onComplete();
}
this.subscriptions.delete(message.id);
break;
case 'error':
const s = this.subscriptions.get(message.id);
if (s && s.onError) {
s.onError(message.payload);
}
break;
}
}
subscribe(query, variables, callbacks) {
const id = String(++this.subscriptionId);
this.subscriptions.set(id, {
query,
variables,
onData: callbacks.onData,
onError: callbacks.onError,
onComplete: callbacks.onComplete
});
if (this.ws.state === 'OPEN') {
this.ws.send(JSON.stringify({
id,
type: 'subscribe',
payload: { query, variables }
}));
}
return () => this.unsubscribe(id);
}
unsubscribe(id) {
this.subscriptions.delete(id);
if (this.ws.state === 'OPEN') {
this.ws.send(JSON.stringify({
id,
type: 'complete'
}));
}
}
resubscribeAll() {
for (const [id, sub] of this.subscriptions) {
this.ws.send(JSON.stringify({
id,
type: 'subscribe',
payload: { query: sub.query, variables: sub.variables }
}));
}
}
connect() {
return this.ws.connect();
}
disconnect() {
this.ws.close();
}
}
配置选项
| 选项 | 类型 | 默认值 | 描述 |
|---|---|---|---|
url |
string | 必填 | WebSocket 服务器 URL |
protocols |
array | [] |
子协议列表 |
websocketOptions |
object | {} |
WebSocket 构造函数选项 (Node.js) |
maxRetries |
number | 10 |
最大重试次数 |
baseDelay |
number | 1000 |
基础延迟 (毫秒) |
maxDelay |
number | 30000 |
最大延迟 (毫秒) |
multiplier |
number | 2 |
退避倍数 |
jitter |
number | 0.1 |
抖动系数 (0-1) |
heartbeatInterval |
number | 30000 |
心跳间隔 (毫秒) |
heartbeatTimeout |
number | 5000 |
心跳超时 (毫秒) |
heartbeatMessage |
string | {"type":"ping"} |
心跳消息 |
circuitBreaker.failureThreshold |
number | 5 |
打开断路器的失败次数 |
circuitBreaker.resetTimeout |
number | 60000 |
断路器重置超时 (毫秒) |
circuitBreaker.halfOpenMaxRequests |
number | 3 |
半开状态最大请求数 |
最佳实践
1. 合理设置重试次数
// 关键业务:更多重试
maxRetries: 20, baseDelay: 2000
// 非关键业务:较少重试
maxRetries: 5, baseDelay: 1000
2. 心跳间隔选择
// 高频交易:短间隔
heartbeatInterval: 10000, heartbeatTimeout: 3000
// 普通应用:标准间隔
heartbeatInterval: 30000, heartbeatTimeout: 5000
// 低带宽场景:长间隔
heartbeatInterval: 60000, heartbeatTimeout: 10000
3. 断路器阈值
// 敏感服务:低阈值快速熔断
circuitBreaker: { failureThreshold: 3, resetTimeout: 30000 }
// 稳定服务:高阈值避免误熔断
circuitBreaker: { failureThreshold: 10, resetTimeout: 120000 }
4. 优雅关闭
// 应用退出前
ws.close(1001, 'Client shutting down');
// 而不是直接
process.exit();
故障排查
问题:频繁重连
原因: 服务器不稳定或网络问题
解决:
- 增加
baseDelay和maxDelay - 降低
maxRetries - 检查服务器日志
问题:心跳超时
原因: 服务器未响应 ping 或网络延迟高
解决:
- 增加
heartbeatTimeout - 确认服务器正确响应 ping
- 检查网络质量
问题:断路器持续 OPEN
原因: 服务持续故障
解决:
- 检查服务健康状态
- 增加
failureThreshold - 减少
resetTimeout更快尝试恢复
性能影响
内存占用
每个实例约 1-2MB (包括事件处理器和定时器)
CPU 占用
- 空闲连接:几乎为零 (仅心跳定时器)
- 重连期间:低 (指数退避减少重试频率)
网络流量
- 心跳:每 30 秒约 100 字节
- 重连:根据网络状况动态调整
浏览器兼容性
支持所有现代浏览器 (Chrome, Firefox, Safari, Edge) 和 Node.js 14+
许可证
MIT
Reviews (0)
Sign in to write a review.
No reviews yet. Be the first to review!
Comments (0)
No comments yet. Be the first to share your thoughts!