添加异步消息队列

This commit is contained in:
Sheyiyuan 2024-11-30 10:25:12 +08:00
parent 02bb43bb45
commit bb3f84bb70
2 changed files with 21 additions and 8 deletions

View File

@ -12,20 +12,20 @@ import (
var gProtocolAddr string
// WebSocketHandler 接收WebSocket连接处的消息并处理
func WebSocketHandler(protocolAddr string) (*websocket.Conn, error) {
func WebSocketHandler(protocolAddr string) error {
// 保存全局变量
gProtocolAddr = protocolAddr
// 解析连接URL
u, err := url.Parse(protocolAddr)
if err != nil {
log.Println("[ERROR] Parse URL error:", err)
return nil, err
return err
}
conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
log.Println("[ERROR] Dial error:", err)
return nil, err
return err
}
defer func(conn *websocket.Conn) {
err := conn.Close()
@ -36,17 +36,30 @@ func WebSocketHandler(protocolAddr string) (*websocket.Conn, error) {
log.Println("[INFO] New connection established.")
// 定义通道,缓存消息和消息类型,防止消息处理阻塞
messageChan := make(chan []byte, 32)
messageTypeChan := make(chan int, 32)
for {
// 接收消息并放入通道
messageType, message, err := conn.ReadMessage()
if err != nil {
log.Println("[ERROR] Read error:", err)
break
return err
}
messageChan <- message
messageTypeChan <- messageType
// 将接收到的消息交给另一个函数处理
processMessage(messageType, message)
// 启动一个新的goroutine来处理消息
go func() {
defer func() {
// 处理完成后从通道中移除消息
<-messageChan
<-messageTypeChan
}()
processMessage(messageType, message)
}()
}
return conn, nil
}
// processMessage 处理接收到的消息

View File

@ -280,7 +280,7 @@ func startProtocol() {
//链接协议
// 启动 WebSocket 处理程序
log.Println("[INFO] 正在启动WebSocket链接程序...")
_, err = protocol.WebSocketHandler(protocolAddr)
err = protocol.WebSocketHandler(protocolAddr)
if err != nil {
// 如果发生错误,记录错误并退出程序
log.Fatal(err)