11import { Redis , RedisOptions } from 'ioredis'
2- import { isEqual } from 'lodash'
32import { BufferMemory , BufferMemoryInput } from 'langchain/memory'
4- import { RedisChatMessageHistory , RedisChatMessageHistoryInput } from '@langchain/community/stores/message/ioredis'
53import { mapStoredMessageToChatMessage , BaseMessage , AIMessage , HumanMessage } from '@langchain/core/messages'
64import { INode , INodeData , INodeParams , ICommonObject , MessageType , IMessage , MemoryMethods , FlowiseMemory } from '../../../src/Interface'
75import {
@@ -12,42 +10,6 @@ import {
1210 mapChatMessageToBaseMessage
1311} from '../../../src/utils'
1412
15- let redisClientSingleton : Redis
16- let redisClientOption : RedisOptions
17- let redisClientUrl : string
18-
19- const getRedisClientbyOption = ( option : RedisOptions ) => {
20- if ( ! redisClientSingleton ) {
21- // if client doesn't exists
22- redisClientSingleton = new Redis ( option )
23- redisClientOption = option
24- return redisClientSingleton
25- } else if ( redisClientSingleton && ! isEqual ( option , redisClientOption ) ) {
26- // if client exists but option changed
27- redisClientSingleton . quit ( )
28- redisClientSingleton = new Redis ( option )
29- redisClientOption = option
30- return redisClientSingleton
31- }
32- return redisClientSingleton
33- }
34-
35- const getRedisClientbyUrl = ( url : string ) => {
36- if ( ! redisClientSingleton ) {
37- // if client doesn't exists
38- redisClientSingleton = new Redis ( url )
39- redisClientUrl = url
40- return redisClientSingleton
41- } else if ( redisClientSingleton && url !== redisClientUrl ) {
42- // if client exists but option changed
43- redisClientSingleton . quit ( )
44- redisClientSingleton = new Redis ( url )
45- redisClientUrl = url
46- return redisClientSingleton
47- }
48- return redisClientSingleton
49- }
50-
5113class RedisBackedChatMemory_Memory implements INode {
5214 label : string
5315 name : string
@@ -114,11 +76,11 @@ class RedisBackedChatMemory_Memory implements INode {
11476 }
11577
11678 async init ( nodeData : INodeData , _ : string , options : ICommonObject ) : Promise < any > {
117- return await initalizeRedis ( nodeData , options )
79+ return await initializeRedis ( nodeData , options )
11880 }
11981}
12082
121- const initalizeRedis = async ( nodeData : INodeData , options : ICommonObject ) : Promise < BufferMemory > => {
83+ const initializeRedis = async ( nodeData : INodeData , options : ICommonObject ) : Promise < BufferMemory > => {
12284 const sessionTTL = nodeData . inputs ?. sessionTTL as number
12385 const memoryKey = nodeData . inputs ?. memoryKey as string
12486 const sessionId = nodeData . inputs ?. sessionId as string
@@ -127,120 +89,102 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom
12789 const credentialData = await getCredentialData ( nodeData . credential ?? '' , options )
12890 const redisUrl = getCredentialParam ( 'redisUrl' , credentialData , nodeData )
12991
130- let client : Redis
131-
132- if ( ! redisUrl || redisUrl === '' ) {
133- const username = getCredentialParam ( 'redisCacheUser' , credentialData , nodeData )
134- const password = getCredentialParam ( 'redisCachePwd' , credentialData , nodeData )
135- const portStr = getCredentialParam ( 'redisCachePort' , credentialData , nodeData )
136- const host = getCredentialParam ( 'redisCacheHost' , credentialData , nodeData )
137- const sslEnabled = getCredentialParam ( 'redisCacheSslEnabled' , credentialData , nodeData )
138-
139- const tlsOptions = sslEnabled === true ? { tls : { rejectUnauthorized : false } } : { }
140-
141- client = getRedisClientbyOption ( {
142- port : portStr ? parseInt ( portStr ) : 6379 ,
143- host,
144- username,
145- password,
146- ...tlsOptions
147- } )
148- } else {
149- client = getRedisClientbyUrl ( redisUrl )
150- }
151-
152- let obj : RedisChatMessageHistoryInput = {
153- sessionId,
154- client
155- }
156-
157- if ( sessionTTL ) {
158- obj = {
159- ...obj ,
160- sessionTTL
161- }
162- }
163-
164- const redisChatMessageHistory = new RedisChatMessageHistory ( obj )
92+ const redisOptions = redisUrl
93+ ? redisUrl
94+ : ( {
95+ port : parseInt ( getCredentialParam ( 'redisCachePort' , credentialData , nodeData ) || '6379' ) ,
96+ host : getCredentialParam ( 'redisCacheHost' , credentialData , nodeData ) ,
97+ username : getCredentialParam ( 'redisCacheUser' , credentialData , nodeData ) ,
98+ password : getCredentialParam ( 'redisCachePwd' , credentialData , nodeData ) ,
99+ tls : getCredentialParam ( 'redisCacheSslEnabled' , credentialData , nodeData ) ? { rejectUnauthorized : false } : undefined
100+ } as RedisOptions )
165101
166102 const memory = new BufferMemoryExtended ( {
167103 memoryKey : memoryKey ?? 'chat_history' ,
168- chatHistory : redisChatMessageHistory ,
169104 sessionId,
170105 windowSize,
171- redisClient : client ,
172- sessionTTL
106+ sessionTTL ,
107+ redisOptions
173108 } )
174109
175110 return memory
176111}
177112
178113interface BufferMemoryExtendedInput {
179- redisClient : Redis
180114 sessionId : string
181115 windowSize ?: number
182116 sessionTTL ?: number
117+ redisOptions : RedisOptions | string
183118}
184119
185120class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods {
186121 sessionId = ''
187- redisClient : Redis
188122 windowSize ?: number
189123 sessionTTL ?: number
124+ redisOptions : RedisOptions | string
190125
191126 constructor ( fields : BufferMemoryInput & BufferMemoryExtendedInput ) {
192127 super ( fields )
193128 this . sessionId = fields . sessionId
194- this . redisClient = fields . redisClient
195129 this . windowSize = fields . windowSize
196130 this . sessionTTL = fields . sessionTTL
131+ this . redisOptions = fields . redisOptions
132+ }
133+
134+ private async withRedisClient < T > ( fn : ( client : Redis ) => Promise < T > ) : Promise < T > {
135+ const client = typeof this . redisOptions === 'string' ? new Redis ( this . redisOptions ) : new Redis ( this . redisOptions )
136+ try {
137+ return await fn ( client )
138+ } finally {
139+ await client . quit ( )
140+ }
197141 }
198142
199143 async getChatMessages (
200144 overrideSessionId = '' ,
201145 returnBaseMessages = false ,
202146 prependMessages ?: IMessage [ ]
203147 ) : Promise < IMessage [ ] | BaseMessage [ ] > {
204- if ( ! this . redisClient ) return [ ]
205-
206- const id = overrideSessionId ? overrideSessionId : this . sessionId
207- const rawStoredMessages = await this . redisClient . lrange ( id , this . windowSize ? this . windowSize * - 1 : 0 , - 1 )
208- const orderedMessages = rawStoredMessages . reverse ( ) . map ( ( message ) => JSON . parse ( message ) )
209- const baseMessages = orderedMessages . map ( mapStoredMessageToChatMessage )
210- if ( prependMessages ?. length ) {
211- baseMessages . unshift ( ... ( await mapChatMessageToBaseMessage ( prependMessages ) ) )
212- }
213- return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage ( baseMessages )
148+ return this . withRedisClient ( async ( client ) => {
149+ const id = overrideSessionId ? overrideSessionId : this . sessionId
150+ const rawStoredMessages = await client . lrange ( id , this . windowSize ? this . windowSize * - 1 : 0 , - 1 )
151+ const orderedMessages = rawStoredMessages . reverse ( ) . map ( ( message ) => JSON . parse ( message ) )
152+ const baseMessages = orderedMessages . map ( mapStoredMessageToChatMessage )
153+ if ( prependMessages ?. length ) {
154+ baseMessages . unshift ( ... ( await mapChatMessageToBaseMessage ( prependMessages ) ) )
155+ }
156+ return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage ( baseMessages )
157+ } )
214158 }
215159
216160 async addChatMessages ( msgArray : { text : string ; type : MessageType } [ ] , overrideSessionId = '' ) : Promise < void > {
217- if ( ! this . redisClient ) return
218-
219- const id = overrideSessionId ? overrideSessionId : this . sessionId
220- const input = msgArray . find ( ( msg ) => msg . type === 'userMessage' )
221- const output = msgArray . find ( ( msg ) => msg . type === 'apiMessage' )
222-
223- if ( input ) {
224- const newInputMessage = new HumanMessage ( input . text )
225- const messageToAdd = [ newInputMessage ] . map ( ( msg ) => msg . toDict ( ) )
226- await this . redisClient . lpush ( id , JSON . stringify ( messageToAdd [ 0 ] ) )
227- if ( this . sessionTTL ) await this . redisClient . expire ( id , this . sessionTTL )
228- }
161+ await this . withRedisClient ( async ( client ) => {
162+ const id = overrideSessionId ? overrideSessionId : this . sessionId
163+ const input = msgArray . find ( ( msg ) => msg . type === 'userMessage' )
164+ const output = msgArray . find ( ( msg ) => msg . type === 'apiMessage' )
165+
166+ if ( input ) {
167+ const newInputMessage = new HumanMessage ( input . text )
168+ const messageToAdd = [ newInputMessage ] . map ( ( msg ) => msg . toDict ( ) )
169+ await client . lpush ( id , JSON . stringify ( messageToAdd [ 0 ] ) )
170+ if ( this . sessionTTL ) await client . expire ( id , this . sessionTTL )
171+ }
229172
230- if ( output ) {
231- const newOutputMessage = new AIMessage ( output . text )
232- const messageToAdd = [ newOutputMessage ] . map ( ( msg ) => msg . toDict ( ) )
233- await this . redisClient . lpush ( id , JSON . stringify ( messageToAdd [ 0 ] ) )
234- if ( this . sessionTTL ) await this . redisClient . expire ( id , this . sessionTTL )
235- }
173+ if ( output ) {
174+ const newOutputMessage = new AIMessage ( output . text )
175+ const messageToAdd = [ newOutputMessage ] . map ( ( msg ) => msg . toDict ( ) )
176+ await client . lpush ( id , JSON . stringify ( messageToAdd [ 0 ] ) )
177+ if ( this . sessionTTL ) await client . expire ( id , this . sessionTTL )
178+ }
179+ } )
236180 }
237181
238182 async clearChatMessages ( overrideSessionId = '' ) : Promise < void > {
239- if ( ! this . redisClient ) return
240-
241- const id = overrideSessionId ? overrideSessionId : this . sessionId
242- await this . redisClient . del ( id )
243- await this . clear ( )
183+ await this . withRedisClient ( async ( client ) => {
184+ const id = overrideSessionId ? overrideSessionId : this . sessionId
185+ await client . del ( id )
186+ await this . clear ( )
187+ } )
244188 }
245189}
246190
0 commit comments