websocket
概念
websocket是一种网络通信协议。
http通信只能由客户端发起,做不到服务器主动向客户端推送消息。
如果服务器有连续的状态变化,客户端要知道这个变化非常麻烦,只能通过“轮询”,也就是每隔一段时间,发出一个询问,了解服务器有没有新的消息。
轮询的效率非常低,非常浪费资源,因为必须不停连接,或者http连接始终打开。
因此,就产生了websocket。
特点
1、服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,真正的双向平等对话,属于服务器推送技术的一种。
2、建立在TCP协议之上
3、与HTTP协议有良好的兼容性
4、数据格式比较轻量,性能开销小,通信高效
5、没有同源限制,客户端可以与任意服务器通信
6、协议标识符是ws,加密的是wss,服务器网址就是url
ws://example.com:80/some/path
建立连接过程
1、websocket连接由浏览器发起,请求协议是一个标准的HTTP请求
GET ws://localhost:3000/ws/chat HTTP/1.1
Host: localhost
Upgrade: websocket
Connection: Upgrade
Origin: http://localhost:3000
Sec-WebSocket-Key: client-random-string
Sec-WebSocket-Version: 13
请求头Upgrade:websocket和Connection:Upgrade表示连接将要被转换为WebSocket连接
Sec-WebSocket-Key是用于标识这个连接
Sec-WebSocket-Version指定了WebSocket的协议版本
2、 随后,服务器如果接受请求,就会返回响应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: server-random-string
响应代码101表示本次连接的HTTP协议即将被修改,更改后的协议就是Upgrade:websocket指定的websocket协议
3、websocket连接建立成功,浏览器和服务器就可以随时主动发送消息给对方。
客户端API
构造函数
WebSocket对象作为一个构造函数,用于新建WebSocket实例。
var ws=new WebSocket('ws://localhost:8080');
webSocket.readyState
readyState
属性返回实例对象的当前状态:
CONNECTING:值为0,表示正在连接;
OPEN:值为1,表示连接成功,可以通信了;
CLOSING:值为2,表示连接正在关闭;
CLOSED:值为3,表示连接已经关闭,或者打开连接失败
webSocket.onopen
用于指定连接成功后的回调函数
ws.onopen=function(){
ws.send('xxx');
}
如果要指定多个,可以使用addEventListener方法
ws.addEventListener('open',function(event){
ws.send('xxx');
})
webSocket.onclose
指定连接关闭后的回调函数
webSocket.onmessage
指定收到服务器数据后的回调函数
服务器数据可能是文本,也可能是二进制数据(blob对象或Arraybuffer对象)
ws.onmessage=function(event){
if(typeof event.data===String){
console.log("xxx");
}
if(event.data instanceof ArrayBuffer){
var buffer=event.data;
console.log("Xxx");
}
}
也可以使用binaryType属性,显式指定收到的二进制数据类型
// 收到的是 blob 数据
ws.binaryType = "blob";
ws.onmessage = function(e) {
console.log(e.data.size);
};
// 收到的是 ArrayBuffer 数据
ws.binaryType = "arraybuffer";
ws.onmessage = function(e) {
console.log(e.data.byteLength);
};
webSocket.send()
用于向服务器发送数据
// 发送文本
ws.send('message');
// 发送blob对象
var file=document.querySelector('input[type="file"]').files[0];
ws.send(file);
// 发送ArrayBuffer对象
var img=canvas_context.getImageData(0,0,400,320);
var binary=new Uint8Array(img.data,data.length);
for(var i=0;i<img.data.length;i++){
binary[i]=img.data[i];
}
ws.send(binary.buffer);
webSocket.bufferedAmount
表示还有多少字节的二进制数据没有发送出去,可以用来判断发送是否结束
var data = new ArrayBuffer(10000000);
socket.send(data);
if (socket.bufferedAmount === 0) {
// 发送完毕
} else {
// 发送还没结束
}
webSocket.onerror
用于指定报错的回调函数
保证websocket的可靠性和稳定性
心跳机制
心跳机制通过定期发送Ping/Pong帧来检测连接的活跃状态,防止由于长时间无数据传输而导致连接被中间设备(如路由器、防火墙)关闭
// 发送心跳Ping消息
const pingInterval = setInterval(() => {
if (socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify({ type: 'ping' }));
}
}, 30000); // 每30秒发送一次Ping消息
重连机制
在连接断开时自动重连,以保证连接的持续性。可以在客户端实现重连机制。
socket.onclose = () => {
console.log('Connection closed, attempting to reconnect...');
if (!reconnectInterval) {
reconnectInterval = setInterval(connect, 5000); // 每5秒尝试重连一次
}
};
断线重连
在服务器端也要对客户端的重连进行适当处理,确保连接状态的一致性。
const interval = setInterval(() => {
wss.clients.forEach((client) => {
if (!client.isAlive) {
return client.terminate();
}
client.isAlive = false;
client.ping(() => {});
});
}, 30000); // 每30秒进行一次心跳检查
错误处理和日志记录
在客户端和服务器端都要对错误进行适当处理,并记录日志以便于问题的追踪和排查。
// 客户端
socket.onerror = (error) => {
console.error('WebSocket error:', error);
// 根据需要进行进一步的错误处理
};
// 服务端
ws.on('error', (error) => {
console.error('WebSocket error:', error);
// 根据需要进行进一步的错误处理
ws.close();
});
优雅关闭连接
在需要关闭连接时,通过发送关闭帧(Opcode 0x8)来优雅地关闭连接。
// 客户端
function closeConnection() {
if (socket.readyState === WebSocket.OPEN) {
socket.close(1000, 'Closing connection'); // 1000表示正常关闭
}
}
// 服务端
ws.on('close', (code, reason) => {
console.log(`Connection closed: ${code} - ${reason}`);
// 处理连接关闭逻辑
});
封装
阿里开源hook写法
参考https://github.com/alibaba/hooks/blob/master/packages/hooks/src/useWebSocket/index.ts
import { useEffect, useRef, useState } from 'react';
import useLatest from '../useLatest';
import useMemoizedFn from '../useMemoizedFn';
import useUnmount from '../useUnmount';
export enum ReadyState {
Connecting = 0,
Open = 1,
Closing = 2,
Closed = 3,
}
export interface Options {
reconnectLimit?: number;
reconnectInterval?: number;
manual?: boolean;
onOpen?: (event: WebSocketEventMap['open'], instance: WebSocket) => void;
onClose?: (event: WebSocketEventMap['close'], instance: WebSocket) => void;
onMessage?: (message: WebSocketEventMap['message'], instance: WebSocket) => void;
onError?: (event: WebSocketEventMap['error'], instance: WebSocket) => void;
protocols?: string | string[];
}
export interface Result {
latestMessage?: WebSocketEventMap['message'];
sendMessage: WebSocket['send'];
disconnect: () => void;
connect: () => void;
readyState: ReadyState;
webSocketIns?: WebSocket;
}
export default function useWebSocket(socketUrl: string, options: Options = {}): Result {
const {
reconnectLimit = 3,
reconnectInterval = 3 * 1000,
manual = false,
onOpen,
onClose,
onMessage,
onError,
protocols,
} = options;
const onOpenRef = useLatest(onOpen);
const onCloseRef = useLatest(onClose);
const onMessageRef = useLatest(onMessage);
const onErrorRef = useLatest(onError);
const reconnectTimesRef = useRef(0);
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout>>();
const websocketRef = useRef<WebSocket>();
const [latestMessage, setLatestMessage] = useState<WebSocketEventMap['message']>();
const [readyState, setReadyState] = useState<ReadyState>(ReadyState.Closed);
const reconnect = () => {
if (
reconnectTimesRef.current < reconnectLimit &&
websocketRef.current?.readyState !== ReadyState.Open
) {
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current);
}
reconnectTimerRef.current = setTimeout(() => {
// eslint-disable-next-line @typescript-eslint/no-use-before-define
connectWs();
reconnectTimesRef.current++;
}, reconnectInterval);
}
};
const connectWs = () => {
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current);
}
if (websocketRef.current) {
websocketRef.current.close();
}
const ws = new WebSocket(socketUrl, protocols);
setReadyState(ReadyState.Connecting);
ws.onerror = (event) => {
if (websocketRef.current !== ws) {
return;
}
reconnect();
onErrorRef.current?.(event, ws);
setReadyState(ws.readyState || ReadyState.Closed);
};
ws.onopen = (event) => {
if (websocketRef.current !== ws) {
return;
}
onOpenRef.current?.(event, ws);
reconnectTimesRef.current = 0;
setReadyState(ws.readyState || ReadyState.Open);
};
ws.onmessage = (message: WebSocketEventMap['message']) => {
if (websocketRef.current !== ws) {
return;
}
onMessageRef.current?.(message, ws);
setLatestMessage(message);
};
ws.onclose = (event) => {
onCloseRef.current?.(event, ws);
// closed by server
if (websocketRef.current === ws) {
reconnect();
}
// closed by disconnect or closed by server
if (!websocketRef.current || websocketRef.current === ws) {
setReadyState(ws.readyState || ReadyState.Closed);
}
};
websocketRef.current = ws;
};
const sendMessage: WebSocket['send'] = (message) => {
if (readyState === ReadyState.Open) {
websocketRef.current?.send(message);
} else {
throw new Error('WebSocket disconnected');
}
};
const connect = () => {
reconnectTimesRef.current = 0;
connectWs();
};
const disconnect = () => {
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current);
}
reconnectTimesRef.current = reconnectLimit;
websocketRef.current?.close();
websocketRef.current = undefined;
};
useEffect(() => {
if (!manual && socketUrl) {
connect();
}
}, [socketUrl, manual]);
useUnmount(() => {
disconnect();
});
return {
latestMessage,
sendMessage: useMemoizedFn(sendMessage),
connect: useMemoizedFn(connect),
disconnect: useMemoizedFn(disconnect),
readyState,
webSocketIns: websocketRef.current,
};
}
其他写法
https://juejin.cn/post/7231481633671757861
// socket.js
import modal from '@/plugins/modal'
const baseURL = import.meta.env.VITE_APP_BASE_WS;
const EventTypes = ['open', 'close', 'message', 'error', 'reconnect'];
const DEFAULT_CHECK_TIME = 55 * 1000; // 心跳检测的默认时间
const DEFAULT_CHECK_COUNT = 3; // 心跳检测默认失败重连次数
const DEFAULT_CHECK_DATA = { Type: 1, Parameters: ['alive'] }; // 心跳检测的默认参数 - 跟后端协商的
const CLOSE_ABNORMAL = 1006; // WebSocket非正常关闭code码
class EventMap {
deps = new Map();
depend(eventType, callback) {
this.deps.set(eventType, callback);
}
notify(eventType, event) {
if (this.deps.has(eventType)) {
this.deps.get(eventType)(event);
}
}
}
class Socket extends WebSocket {
heartCheckData = DEFAULT_CHECK_DATA;
heartCheckTimeout = DEFAULT_CHECK_TIME;
heartCheckInterval = null;
heartCheckCount = DEFAULT_CHECK_COUNT
constructor(options, dep, reconnectCount = 0) {
let _baseURL = baseURL
const { url, protocols, query = {}, greet = null, customBase = null } = options;
const _queryParams = Object.keys(query).reduce((str, key) => {
if (typeof query[key] !== 'object' && typeof query[key] !== 'function') {
return str += str.length > 0 ? `&${key}=${query[key]}` : `${key}=${query[key]}`;
} else {
return str;
}
}, '');
if (customBase) {
_baseURL = customBase
}
super(`${_baseURL}${url}?${_queryParams}`, protocols);
this._currentOptions = options;
this._dep = dep;
this._reconnectCount = reconnectCount;
greet && Object.assign(this, {
heartCheckData: greet
})
this.initSocket();
}
// 初始化WebSocket
initSocket() {
// 监听webSocket的事件
this.onopen = function (e) {
this._dep.notify('open', e);
this.heartCheckStart();
}
this.onclose = function (e) {
this._dep.notify('close', e);
// 如果WebSocket是非正常关闭 则进行重连
if (e.code === CLOSE_ABNORMAL) {
if (this._reconnectCount < this.heartCheckCount) {
this._reconnectCount++;
const _socket = new Socket(this._currentOptions, this._dep, this._reconnectCount);
this._dep.notify('reconnect', _socket);
} else {
return modal.msgError('WebSocket重连失败, 请联系技术客服!');
}
}
}
this.onerror = function (e) {
this._dep.notify('error', e);
}
this.onmessage = function (e) {
// 如果后端返回的是二进制数据
if (e.data instanceof Blob) {
const reader = new FileReader()
reader.readAsArrayBuffer(e.data)
reader.onload = (ev) => {
if (ev.target.readyState === FileReader.DONE) {
this._dep.notify('message', ev.target?.result);
}
}
} else {
// 处理普通数据
try {
const _parseData = JSON.parse(e.data);
this._dep.notify('message', _parseData);
} catch (error) {
console.log(error)
}
}
}
}
// 订阅事件
subscribe(eventType, callback) {
if (typeof callback !== 'function') throw new Error('The second param is must be a function');
if (!EventTypes.includes(eventType)) throw new Error('The first param is not supported');
this._dep.depend(eventType, callback);
}
// 发送消息
sendMessage(data, options = {}) {
const { transformJSON = true } = options;
let result = data;
if (transformJSON) {
result = JSON.stringify(data);
}
this.send(result);
}
// 关闭WebSocket
closeSocket(code, reason) {
this.close(code, reason);
}
// 开始心跳检测
heartCheckStart() {
this.heartCheckInterval = setInterval(() => {
if (this.readyState === this.OPEN) {
let transformJSON = typeof this.heartCheckData === 'object'
this.sendMessage(this.heartCheckData, { transformJSON });
} else {
this.clearHeartCheck();
}
}, this.heartCheckTimeout)
}
// 清除心跳检测
clearHeartCheck() {
clearInterval(this.heartCheckInterval);
}
// 重置心跳检测
resetHeartCheck() {
clearInterval(this.heartCheckInterval);
this.heartCheckStart();
}
}
// 默认的配置项
const defaultOptions = {
url: '',
protocols: '',
query: {},
}
export const useSocket = (options = defaultOptions) => {
if (!window.WebSocket) return modal.msgWarning('您的浏览器不支持WebSocket, 请更换浏览器!');
const dep = new EventMap();
const reconnectCount = 0;
return new Socket(options, dep, reconnectCount);
}
首先暴露了一个 useSocket 函数,该函数接收一个 options 配置项参数,支持的参数有:
url:要连接的 WebSocket URL;
protocols:一个协议字符串或者一个包含协议字符串的数组;
query:可以通过 URL 传递给后端的查询参数;
greet:心跳检测的打招呼信息;
customBase:自定义的 baseURL ,否则默认使用环境变量中定义的 env.VITE_APP_BASE_WS。
在调用该函数后,首先会判断当前用户的浏览器是否支持 WebSocket,如果不支持给予用户提示。
然后实例化了一个 EventMap 类的实例对象 dep,可以把它当作是一个依赖收集桶,当用户订阅了某个 WebSocket 事件时,将收集这个事件对应的回调作为依赖,在事件触发时,再通知该依赖,然后调用该事件对应的回调函数。
接下来定义了一个初始的重连次数记录值 reconnectCount 为 0,每当这个 WebSocket 重连时,该值会自增。
之后实例化了自己封装的 Socket 类,并传入了上面的三个参数。
在 Socket 类的构造函数 constructor 中,先取出配置项,把 query 内的参数拼接在 URL 上,然后使用 super 调用父类的构造函数进行建立 WebSocket 连接。
之后缓存了当前 Socket 实例化时的参数,再调用 initSocket() 方法去进行 WebSocket 事件的监听:
onopen:触发 dep 内 open 对应的回调函数并且打开心跳检测;
onclose:触发 dep 内 close 对应的回调函数并且对关闭的 code 码进行判断,如果是非正常关闭连接,将会进行重连,如果重连次数达到阈值,则通知给用户;
onerror:触发 dep 内 error 对应的回调函数;
onmessage:接收到服务端返回的数据,可以先根据自身业务做一些预处理,比如我就根据不同的数据类型进行了数据解析的预处理,之后再触发 dep 内 message 对应的回调函数并传入处理过后的数据。
也暴露了一些成员方法以供实例对象使用:
subscribe:订阅 WebSocket 事件,传入事件类型并须是 EventTypes 内的类型之一,第二个参数则是回调函数;
sendMessage:同样的,在给服务端发送数据之前也可以根据自身业务做一些预处理,比如将需要转成 JSON 的数据,在这里统一转换后再发送给服务端;
closeSocket:关闭 WebSocket 连接;
heartCheckStart:开始心跳检测,会创建一个定时器,在一定时间之后(默认是55s)给服务端发送信息确认连接是否正常;
clearHeartCheck:清除心跳检测定时器(如果当前 WebSocket 连接已经关闭,则自动清除);
resetHeartCheck:重置心跳检测定时器。
使用
// xx.jsx or xx.vue
import { useSocket } from './socket.js'
const socket = ref(null) // WebSocket实例
const initWebSocket = () => {
const options = {
url: '/<your url>',
query: {
// something params
},
}
socket.value = useSocket(options)
socket.value.subscribe('open', () => {
console.log('WebSocket连接成功!')
const greet = 'hello'
// 发送打招呼消息
socket.value.sendMessage(greet)
})
socket.value.subscribe('close', reason => {
console.log('WebSocket连接关闭!', reason)
})
socket.value.subscribe('message', result => {
console.log('WebSocket接收到消息:', result)
})
socket.value.subscribe('error', err => {
console.log('WebSocket捕获错误:', err)
})
socket.value.subscribe('reconnect', _socket => {
console.log('WebSocket断开重连:', _socket)
socket.value = _socket
})
}
initWebSocket()
debug心跳检测是否有效
// 测试心跳检测重连 手动模拟断开的情况
if (this._reconnectCount > 0) return;
const tempTimer = setInterval(() => {
this.close();
if (this._reconnectCount < 3) {
console.log('重连');
this._reconnectCount++;
const _socket = new Socket(this._currentOptions, this._dep, this._reconnectCount);
this._dep.notify('reconnect', _socket);
} else {
return clearInterval(tempTimer);
}
}, 3 * 1000)