feat(unstable): 断开连接时储存事件并重发

This commit is contained in:
CrescentLeaf
2025-11-30 01:45:42 +08:00
parent 4b5f0bcdd6
commit d57b023769
5 changed files with 52 additions and 28 deletions

View File

@@ -7,6 +7,7 @@ import EventCallbackFunction from "../typedef/EventCallbackFunction.ts"
import BaseApi from "./BaseApi.ts" import BaseApi from "./BaseApi.ts"
import DataWrongError from "./DataWrongError.ts" import DataWrongError from "./DataWrongError.ts"
import chalk from "chalk" import chalk from "chalk"
import EventStorer from "./EventStorer.ts";
function stringifyNotIncludeArrayBuffer(value: any) { function stringifyNotIncludeArrayBuffer(value: any) {
return JSON.stringify(value, (_k, v) => { return JSON.stringify(value, (_k, v) => {
@@ -46,7 +47,7 @@ export default class ApiManager {
} }
static clients: { [key: string]: { [key: string]: SocketIo.Socket<SocketIo.DefaultEventsMap, SocketIo.DefaultEventsMap, SocketIo.DefaultEventsMap, any> } } = {} static clients: { [key: string]: { [key: string]: SocketIo.Socket<SocketIo.DefaultEventsMap, SocketIo.DefaultEventsMap, SocketIo.DefaultEventsMap, any> } } = {}
static checkUserIsOnline(userId: string) { static checkUserIsOnline(userId: string) {
return this.getUserClientSockets(userId) != null return this.getUserClientSockets(userId) != null && Object.keys(this.getUserClientSockets(userId)).length > 0
} }
/** /**
* 獲取用戶所有的客戶端表 (格式遵循 設備ID_當前Session) * 獲取用戶所有的客戶端表 (格式遵循 設備ID_當前Session)
@@ -79,13 +80,15 @@ export default class ApiManager {
else { else {
console.log(chalk.green('[断]') + ` ${ip} disconnected`) console.log(chalk.green('[断]') + ` ${ip} disconnected`)
delete this.clients[clientInfo.userId][deviceId + '_' + sessionId] delete this.clients[clientInfo.userId][deviceId + '_' + sessionId]
if (Object.keys(this.clients[clientInfo.userId]).length == 0)
EventStorer.getInstanceForUser(clientInfo.userId).clearEvents()
} }
}) })
console.log(chalk.yellow('[连]') + ` ${ip} connected`) console.log(chalk.yellow('[连]') + ` ${ip} connected`)
socket.on("The_White_Silk", async (name: string, args: { [key: string]: unknown }, callback_: (ret: ApiCallbackMessage) => void) => { socket.on("The_White_Silk", async (name: string, args: { [key: string]: unknown }, callback_: (ret: ApiCallbackMessage) => void) => {
function callback(ret: ApiCallbackMessage) { function callback(ret: ApiCallbackMessage) {
console.log(chalk.blue('[]') + ` ${ip} <- ${ret.code == 200 ? chalk.green(ret.msg) : chalk.red(ret.msg)} [${ret.code}]${ret.data ? (' <extras: ' + stringifyNotIncludeArrayBuffer(ret.data) + '>') : ''}`) console.log(chalk.blue('[]') + ` ${ip} <- ${ret.code == 200 ? chalk.green(ret.msg) : chalk.red(ret.msg)} [${ret.code}]${ret.data ? (' <extras: ' + stringifyNotIncludeArrayBuffer(ret.data) + '>') : ''}`)
return callback_(ret) return callback_(ret)
} }
async function checkIsPromiseAndAwait(value: Promise<unknown> | unknown) { async function checkIsPromiseAndAwait(value: Promise<unknown> | unknown) {

View File

@@ -5,6 +5,8 @@ import User from "../data/User.ts"
import Token from "./Token.ts" import Token from "./Token.ts"
import TokenManager from './TokenManager.ts' import TokenManager from './TokenManager.ts'
import * as SocketIo from "socket.io" import * as SocketIo from "socket.io"
import EventStorer from "./EventStorer.ts"
import chalk from "chalk"
export default abstract class BaseApi { export default abstract class BaseApi {
abstract getName(): string abstract getName(): string
@@ -31,18 +33,23 @@ export default abstract class BaseApi {
if (!name.startsWith(this.getName() + ".")) throw Error("注冊的事件應該與接口集合命名空間相匹配: " + name) if (!name.startsWith(this.getName() + ".")) throw Error("注冊的事件應該與接口集合命名空間相匹配: " + name)
ApiManager.addEventListener(name, func) ApiManager.addEventListener(name, func)
} }
emitToClient(client: SocketIo.Socket, name: ClientEvent, args: { [key: string]: unknown }) { emitToClient(client: SocketIo.Socket, name: ClientEvent, args: { [key: string]: unknown }, user: User | string, deviceSession: string) {
client.emit("The_White_Silk", name, args) console.log(chalk.magenta('[发]') + ` ${client.handshake.address} <- ${chalk.yellow(name)} ${' <extras: ' + JSON.stringify(args) + '>'}`)
client.timeout(5000).emit("The_White_Silk", name, args, (err: Error) => {
if (err) EventStorer.getInstanceForUser(user).addEvent(name, args, deviceSession)
})
} }
boardcastToUsers(users: string[], name: ClientEvent, args: { [key: string]: unknown }) { boardcastToUsers(users: string[], name: ClientEvent, args: { [key: string]: unknown }) {
for (const user of users) { for (const user of users) {
if (ApiManager.checkUserIsOnline(user)) { if (ApiManager.checkUserIsOnline(user)) {
const sockets = ApiManager.getUserClientSockets(user) const sockets = ApiManager.getUserClientSockets(user)
for (const socket of Object.keys(sockets)) for (const deviceSession of Object.keys(sockets))
this.emitToClient(sockets[socket], name, args) if (sockets[deviceSession].connected)
} else { this.emitToClient(sockets[deviceSession], name, args, user, deviceSession)
// TODO: EventStore else
} EventStorer.getInstanceForUser(user).addEvent(name, args, deviceSession)
} else
EventStorer.getInstanceForUser(user).clearEvents()
} }
} }
} }

View File

@@ -1,4 +1,7 @@
import { ClientEvent } from "./ApiDeclare.ts"
export default class EventBean { export default class EventBean {
declare event_name: string declare event_name: ClientEvent
declare data: unknown declare data: { [key: string]: unknown }
declare device_session: string
} }

View File

@@ -2,8 +2,9 @@ import { DatabaseSync } from "node:sqlite"
import path from 'node:path' import path from 'node:path'
import config from "../config.ts" import config from "../config.ts"
import User from "../data/User.ts"; import User from "../data/User.ts"
import EventBean from "./EventBean.ts"; import EventBean from "./EventBean.ts"
import { ClientEvent } from "./ApiDeclare.ts"
export default class EventStorer { export default class EventStorer {
static database: DatabaseSync = this.init() static database: DatabaseSync = this.init()
@@ -13,41 +14,44 @@ export default class EventStorer {
return db return db
} }
static getInstanceForUser(user: User) { static getInstanceForUser(user: User | string) {
return new EventStorer(user) return new EventStorer(user)
} }
declare user: User declare user_id: string
constructor(user: User) { constructor(user: User | string) {
this.user = user this.user_id = user instanceof User ? user.bean.id : user
EventStorer.database.exec(` EventStorer.database.exec(`
CREATE TABLE IF NOT EXISTS ${this.getTableName()} ( CREATE TABLE IF NOT EXISTS ${this.getTableName()} (
/* 序号 */ count INTEGER PRIMARY KEY AUTOINCREMENT, /* 序号 */ count INTEGER PRIMARY KEY AUTOINCREMENT,
/* 事件 */ event_name TEXT NOT NULL, /* 事件 */ event_name TEXT NOT NULL,
/* 数据 */ data TEXT NOT NULL, /* 数据 */ data TEXT NOT NULL,
/* 会话 */ device_session TEXT
); );
`) `)
} }
protected getTableName() { protected getTableName() {
return `events_${this.user.bean.id}` return `events_${this.user_id.replaceAll('-', '_')}`
} }
addEvent(eventName: string, data: unknown) { addEvent(eventName: ClientEvent, data: object, device_session: string) {
EventStorer.database.prepare(`INSERT INTO ${this.getTableName()} ( EventStorer.database.prepare(`INSERT INTO ${this.getTableName()} (
event_name, event_name,
data data,
) VALUES (?, ?);`).run( device_session
) VALUES (?, ?, ?);`).run(
eventName, eventName,
JSON.stringify(data) JSON.stringify(data),
device_session || null
) )
} }
getEvents() { getEvents(device_session?: string) {
return EventStorer.database.prepare(`SELECT * FROM ${this.getTableName()};`).all().map((v: any) => ({ return EventStorer.database.prepare(`SELECT * FROM ${this.getTableName()}${device_session ? ` WHERE device_session = ?` : ''};`).all(...(device_session ? [ device_session ] : [])).map((v: any) => ({
...v, ...v,
data: JSON.parse(v.data) data: JSON.parse(v.data)
})) as unknown as EventBean[] })) as unknown as EventBean[]
} }
clearEvents() { clearEvents(device_session?: string) {
EventStorer.database.prepare(`DELETE FROM ${this.getTableName()};`).run() EventStorer.database.prepare(`DELETE FROM ${this.getTableName()}${device_session ? ` WHERE device_session = ?` : ''};`).run(...(device_session ? [ device_session ] : []))
} }
} }

View File

@@ -6,6 +6,7 @@ import ChatPrivate from "../data/ChatPrivate.ts"
import Chat from "../data/Chat.ts" import Chat from "../data/Chat.ts"
import chalk from "chalk" import chalk from "chalk"
import ApiManager from "./ApiManager.ts" import ApiManager from "./ApiManager.ts"
import EventStorer from "./EventStorer.ts";
export default class UserApi extends BaseApi { export default class UserApi extends BaseApi {
override getName(): string { override getName(): string {
@@ -37,10 +38,16 @@ export default class UserApi extends BaseApi {
clientInfo.userId = access_token.author clientInfo.userId = access_token.author
console.log(chalk.green('[验]') + ` ${access_token.author} authed on Client ${deviceId} (ip = ${ip})`) console.log(chalk.green('[验]') + ` ${access_token.author} authed on Client ${deviceId} (ip = ${ip})`)
const deviceSession = deviceId + '_' + sessionId
if (ApiManager.clients[clientInfo.userId] == null) ApiManager.clients[clientInfo.userId] = { if (ApiManager.clients[clientInfo.userId] == null) ApiManager.clients[clientInfo.userId] = {
[deviceId + '_' + sessionId]: socket [deviceSession]: socket
} }
else ApiManager.clients[clientInfo.userId][deviceId + '_' + sessionId] = socket else ApiManager.clients[clientInfo.userId][deviceSession] = socket
// 事件恢复
console.log(EventStorer.getInstanceForUser(access_token.author).getEvents(deviceSession))
for (const event of EventStorer.getInstanceForUser(access_token.author).getEvents(deviceSession))
this.emitToClient(socket, event.event_name, event.data, access_token.author, deviceSession)
return { return {
msg: "成功", msg: "成功",