From 36e0c17ad71f8ced571d9cda32f26ac18e518147 Mon Sep 17 00:00:00 2001 From: tone <3341154833@qq.com> Date: Sat, 15 Nov 2025 13:03:09 +0800 Subject: [PATCH] refactor: move call handlers to rpcSession --- src/core/RPCConnection.ts | 186 ++------------------------------------ src/core/RPCSession.ts | 178 +++++++++++++++++++++++++++++++++++- 2 files changed, 183 insertions(+), 181 deletions(-) diff --git a/src/core/RPCConnection.ts b/src/core/RPCConnection.ts index c7c0b83..92ce920 100644 --- a/src/core/RPCConnection.ts +++ b/src/core/RPCConnection.ts @@ -1,10 +1,6 @@ import { EventEmitter } from "@/utils/EventEmitter"; import { SocketConnection } from "./SocketConnection"; import { RPCPacket } from "./RPCPacket"; -import { makeCallPacket, makeCallResponsePacket, parseCallPacket, parseCallResponsePacket } from "./RPCCommon"; -import { RPCProvider } from "./RPCProvider"; -import { RPCError, RPCErrorCode } from "./RPCError"; -import { RPCSession } from "./RPCSession"; interface RPCConnectionEvents { call: RPCPacket; @@ -15,34 +11,15 @@ interface RPCConnectionEvents { closed: void; } -class CallResponseEmitter extends EventEmitter<{ - [id: string]: RPCPacket; -}> { - emitAll(packet: RPCPacket) { - this.events.forEach(subscribers => { - subscribers.forEach(fn => fn(packet)); - }) - } -} - export class RPCConnection extends EventEmitter { closed: boolean = false; - private callResponseEmitter = new CallResponseEmitter(); - private rpcSession!: RPCSession; - constructor(public socket: SocketConnection) { super(); socket.on('closed', () => { - this.emit('closed'); - this.callResponseEmitter.emitAll(makeCallResponsePacket({ - status: 'error', - requestPacketId: 'connection error', - errorCode: RPCErrorCode.CONNECTION_DISCONNECTED, - })); - this.callResponseEmitter.removeAllListeners(); this.closed = true; + this.emit('closed'); }); socket.on('msg', (msg) => { @@ -70,162 +47,17 @@ export class RPCConnection extends EventEmitter { this.emit('unknownPacket', packet); }); - - /** route by packet.id */ - this.on('callResponse', (packet) => { - this.callResponseEmitter.emit(packet.id, packet); - }) - } - - public setRPCSession(session: RPCSession) { - this.rpcSession = session; - } - - /** @throws */ - public async callRequest(options: { - fnPath: string; - args: any[]; - timeout: number; - }): Promise { - if (this.closed) { - throw new RPCError({ - errorCode: RPCErrorCode.CONNECTION_DISCONNECTED, - }); - } - - const { fnPath, args } = options; - const packet = makeCallPacket({ - fnPath, - args - }); - - let resolve: (data: any) => void; - let reject: (data: any) => void; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - - const cancelTimeoutTimer = (() => { - const t = setTimeout(() => { - reject(new RPCError({ - errorCode: RPCErrorCode.TIMEOUT_ERROR, - })) - }, options.timeout); - - return () => clearTimeout(t); - })(); - - const handleCallResponsePacket = (packet: RPCPacket) => { - const result = parseCallResponsePacket(packet); - if (result === null) { - return reject(new RPCError({ - errorCode: RPCErrorCode.UNKNOWN_ERROR, - }));; - } - - const { success, error } = result; - if (success) { - return resolve(success.data); - } - - if (error) { - return reject(new RPCError({ - errorCode: error.errorCode, - reason: error.reason - })); - } - - return reject(new RPCError({ - errorCode: RPCErrorCode.UNKNOWN_ERROR, - }));; - } - this.callResponseEmitter.on(packet.id, handleCallResponsePacket); - - /** send call request */ - this.socket.send(packet); - - return promise.finally(() => { - this.callResponseEmitter.removeAllListeners(packet.id); - cancelTimeoutTimer(); - }); - } - - public onCallRequest(getProvider: () => RPCProvider | undefined) { - this.on('call', async (packet) => { - const request = parseCallPacket(packet); - if (request === null) { - return this.socket.send(makeCallResponsePacket({ - status: 'error', - requestPacket: packet, - errorCode: RPCErrorCode.CALL_PROTOCOL_ERROR, - })).catch(() => { }) - } - - // call the function - const provider = getProvider(); - if (!provider) { - return this.socket.send(makeCallResponsePacket({ - status: 'error', - requestPacket: packet, - errorCode: RPCErrorCode.PROVIDER_NOT_AVAILABLE, - })) - } - - const { fnPath, args } = request; - const fn = this.getProviderFunction(provider, fnPath); - if (!fn) { - return this.socket.send(makeCallResponsePacket({ - status: 'error', - requestPacket: packet, - errorCode: RPCErrorCode.METHOD_NOT_FOUND, - })) - } - - try { - const result = await fn(...args); - this.socket.send(makeCallResponsePacket({ - status: 'success', - requestPacket: packet, - data: result, - })) - } catch (error) { - this.socket.send(makeCallResponsePacket({ - status: 'error', - requestPacket: packet, - errorCode: RPCErrorCode.SERVER_ERROR, - ...(error instanceof RPCError ? { - errorCode: error.errorCode, - reason: error.reason, - } : {}) - })) - } - }) - } - - private getProviderFunction(provider: RPCProvider, fnPath: string) { - const paths = fnPath.split(':'); - let fnThis: any = provider; - let fn: any = provider; - try { - while (paths.length) { - const path = paths.shift()!; - fn = fn[path]; - if (paths.length !== 0) { - fnThis = fn; - } - } - if (typeof fn === 'function') { - return fn.bind(fnThis); - } - - throw new Error(); - } catch (error) { - return null; - } } public async close() { return this.socket.close(); } + + public async send(data: RPCPacket) { + if (this.closed) { + return; + } + + return this.socket.send(data); + } } \ No newline at end of file diff --git a/src/core/RPCSession.ts b/src/core/RPCSession.ts index 61a6034..c231a93 100644 --- a/src/core/RPCSession.ts +++ b/src/core/RPCSession.ts @@ -4,17 +4,67 @@ import { RPCHandler } from "./RPCHandler"; import { RPCProvider } from "./RPCProvider"; import { RPCClient } from "./RPCClient"; import { RPCServer } from "./RPCServer"; +import { RPCError, RPCErrorCode } from "./RPCError"; +import { makeCallPacket, makeCallResponsePacket, parseCallPacket, parseCallResponsePacket } from "./RPCCommon"; +import { RPCPacket } from "./RPCPacket"; +import { EventEmitter } from "@/utils/EventEmitter"; +function getProviderFunction(provider: RPCProvider, fnPath: string) { + const paths = fnPath.split(':'); + let fnThis: any = provider; + let fn: any = provider; + try { + while (paths.length) { + const path = paths.shift()!; + fn = fn[path]; + if (paths.length !== 0) { + fnThis = fn; + } + } + if (typeof fn === 'function') { + return fn.bind(fnThis); + } + + throw new Error(); + } catch (error) { + return null; + } +} + +class CallResponseEmitter extends EventEmitter<{ + [id: string]: RPCPacket; +}> { + emitAll(packet: RPCPacket) { + this.events.forEach(subscribers => { + subscribers.forEach(fn => fn(packet)); + }) + } +} export class RPCSession { + public callResponseEmitter = new CallResponseEmitter(); + constructor( public readonly connection: RPCConnection, public readonly rpcHandler: RPCHandler, - public readonly provider: RPCClient | RPCServer, + public readonly rpcProvider: RPCClient | RPCServer, ) { - connection.setRPCSession(this); - connection.onCallRequest(rpcHandler.getProvider.bind(rpcHandler)); + /** route by packet.id */ + this.connection.on('callResponse', (packet) => { + this.callResponseEmitter.emit(packet.id, packet); + }); + + this.connection.on('closed', () => { + this.callResponseEmitter.emitAll(makeCallResponsePacket({ + status: 'error', + requestPacketId: 'connection error', + errorCode: RPCErrorCode.CONNECTION_DISCONNECTED, + })); + this.callResponseEmitter.removeAllListeners(); + }); + + this.connection.on('call', this.onCallRequest.bind(this)); } getAPI(): ToDeepPromise { @@ -27,7 +77,7 @@ export class RPCSession { return createProxy(newPath); }, apply: (target, thisArg, args) => { - return this.connection.callRequest({ + return this.callRequest({ fnPath: path.join(':'), args: args, /** @todo accept from caller */ @@ -41,4 +91,124 @@ export class RPCSession { return createProxy() as unknown as ToDeepPromise; } + + /** @throws */ + public async callRequest(options: { + fnPath: string; + args: any[]; + timeout: number; + }): Promise { + if (this.connection.closed) { + throw new RPCError({ + errorCode: RPCErrorCode.CONNECTION_DISCONNECTED, + }); + } + + const { fnPath, args } = options; + const packet = makeCallPacket({ + fnPath, + args + }); + + let resolve: (data: any) => void; + let reject: (data: any) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + + const cancelTimeoutTimer = (() => { + const t = setTimeout(() => { + reject(new RPCError({ + errorCode: RPCErrorCode.TIMEOUT_ERROR, + })) + }, options.timeout); + + return () => clearTimeout(t); + })(); + + const handleCallResponsePacket = (packet: RPCPacket) => { + const result = parseCallResponsePacket(packet); + if (result === null) { + return reject(new RPCError({ + errorCode: RPCErrorCode.UNKNOWN_ERROR, + }));; + } + + const { success, error } = result; + if (success) { + return resolve(success.data); + } + + if (error) { + return reject(new RPCError({ + errorCode: error.errorCode, + reason: error.reason + })); + } + + return reject(new RPCError({ + errorCode: RPCErrorCode.UNKNOWN_ERROR, + }));; + } + this.callResponseEmitter.once(packet.id, handleCallResponsePacket); + + /** send call request */ + this.connection.send(packet); + + return promise.finally(() => { + this.callResponseEmitter.removeAllListeners(packet.id); + cancelTimeoutTimer(); + }); + } + + private async onCallRequest(packet: RPCPacket) { + const request = parseCallPacket(packet); + if (request === null) { + return this.connection.send(makeCallResponsePacket({ + status: 'error', + requestPacket: packet, + errorCode: RPCErrorCode.CALL_PROTOCOL_ERROR, + })).catch(() => { }) + } + + // call the function + const provider = this.rpcHandler.getProvider(); + if (!provider) { + return this.connection.send(makeCallResponsePacket({ + status: 'error', + requestPacket: packet, + errorCode: RPCErrorCode.PROVIDER_NOT_AVAILABLE, + })) + } + + const { fnPath, args } = request; + const fn = getProviderFunction(provider, fnPath); + if (!fn) { + return this.connection.send(makeCallResponsePacket({ + status: 'error', + requestPacket: packet, + errorCode: RPCErrorCode.METHOD_NOT_FOUND, + })) + } + + try { + const result = await fn(...args); + this.connection.send(makeCallResponsePacket({ + status: 'success', + requestPacket: packet, + data: result, + })) + } catch (error) { + this.connection.send(makeCallResponsePacket({ + status: 'error', + requestPacket: packet, + errorCode: RPCErrorCode.SERVER_ERROR, + ...(error instanceof RPCError ? { + errorCode: error.errorCode, + reason: error.reason, + } : {}) + })) + } + } } \ No newline at end of file