Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 52 additions & 33 deletions internal/conversation_msg/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
}
startTime = m.SendTime
} else {
c.messagePullMinSeqMap.Delete(conversationID)
// Clear both maps when the user enters the conversation
c.messagePullForwardEndSeqMap.Delete(conversationID)
c.messagePullReverseEndSeqMap.Delete(conversationID)
}
log.ZDebug(ctx, "Assembly conversation parameters", "cost time", time.Since(t), "conversationID",
conversationID, "startTime:", startTime, "count:", req.Count, "startTime", startTime)
Expand All @@ -75,41 +77,48 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
log.ZDebug(ctx, "pull message", "pull cost time", time.Since(t))
t = time.Now()

var thisMinSeq int64
thisMinSeq, messageList = c.LocalChatLog2MsgStruct(ctx, list)
var thisEndSeq int64
thisEndSeq, messageList = c.LocalChatLog2MsgStruct(list, isReverse)
log.ZDebug(ctx, "message convert and unmarshal", "unmarshal cost time", time.Since(t))
t = time.Now()
if !isReverse {
sort.Sort(messageList)
if thisEndSeq != 0 {
c.messagePullForwardEndSeqMap.Store(conversationID, thisEndSeq)
}
} else {
if thisEndSeq != 0 {
c.messagePullReverseEndSeqMap.Store(conversationID, thisEndSeq)
}
}
log.ZDebug(ctx, "sort", "sort cost time", time.Since(t))
messageListCallback.MessageList = messageList
if thisMinSeq != 0 {
c.messagePullMinSeqMap.Store(conversationID, thisMinSeq)
}
return &messageListCallback, nil

return &messageListCallback, nil
}

func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversationID string,
count int, startTime int64, isReverse bool, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {

var list []*model_struct.LocalChatLog
var list, validMessages []*model_struct.LocalChatLog

// If all retrieved messages are either deleted or filtered out, continue fetching messages from an earlier point.
shouldFetchMoreMessages := func(messages []*model_struct.LocalChatLog) bool {
// Get the number of invalid messages in this batch to recursive fetching from earlier points.
shouldFetchMoreMessagesNum := func(messages []*model_struct.LocalChatLog) int {
if len(messages) == 0 {
return false
return count
}

allDeleted := true
// Represents the number of valid messages in the batch
validateMessageNum := 0
for _, msg := range messages {
if msg.Status < constant.MsgStatusHasDeleted {
allDeleted = false
break
validateMessageNum++
validMessages = append(validMessages, msg)
} else {
log.ZDebug(ctx, "this message has been deleted or exception message", "msg", msg)
}
}
return allDeleted
return count - validateMessageNum
}
getNewStartTime := func(messages []*model_struct.LocalChatLog) int64 {
if len(messages) == 0 {
Expand All @@ -128,39 +137,49 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
return nil, err
}
t = time.Now()
maxSeq := c.validateAndFillInternalGaps(ctx, conversationID, isReverse,
thisStartSeq := c.validateAndFillInternalGaps(ctx, conversationID, isReverse,
count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t))
t = time.Now()
c.validateAndFillInterBlockGaps(ctx, maxSeq, conversationID,
c.validateAndFillInterBlockGaps(ctx, thisStartSeq, conversationID,
isReverse, count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t))
t = time.Now()
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse,
count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "end continuity check", "cost time", time.Since(t))
// If all retrieved messages are either deleted or filtered out,
//continue fetching recursively until either valid messages are found or all messages have been fetched.
if shouldFetchMoreMessages(list) && !messageListCallback.IsEnd {
return c.fetchMessagesWithGapCheck(ctx, conversationID, count, getNewStartTime(list), isReverse, messageListCallback)
// If the number of valid messages retrieved is less than the count,
// continue fetching recursively until the valid messages are sufficient or all messages have been fetched.
missingCount := shouldFetchMoreMessagesNum(list)
if missingCount > 0 && !messageListCallback.IsEnd {
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID", conversationID)
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, getNewStartTime(list), isReverse, messageListCallback)
if err != nil {
return nil, err
}
log.ZDebug(ctx, "fetch more messages", "missingMessages", missingMessages)
return append(validMessages, missingMessages...), nil
}

return list, nil
return validMessages, nil
}

func (c *Conversation) LocalChatLog2MsgStruct(ctx context.Context, list []*model_struct.LocalChatLog) (int64, []*sdk_struct.MsgStruct) {
func (c *Conversation) LocalChatLog2MsgStruct(list []*model_struct.LocalChatLog, isReverse bool) (int64, []*sdk_struct.MsgStruct) {
messageList := make([]*sdk_struct.MsgStruct, 0, len(list))
var thisMinSeq int64
var thisEndSeq int64
for _, v := range list {
if v.Seq != 0 && thisMinSeq == 0 {
thisMinSeq = v.Seq
if v.Seq != 0 && thisEndSeq == 0 {
thisEndSeq = v.Seq
}
if v.Seq < thisMinSeq && v.Seq != 0 {
thisMinSeq = v.Seq
}
if v.Status >= constant.MsgStatusHasDeleted {
log.ZDebug(ctx, "this message has been deleted or exception message", "msg", v)
continue
if isReverse {
if v.Seq > thisEndSeq && thisEndSeq != 0 {
thisEndSeq = v.Seq
}

} else {
if v.Seq < thisEndSeq && v.Seq != 0 {
thisEndSeq = v.Seq
}
}
temp := LocalChatLogToMsgStruct(v)

Expand All @@ -169,7 +188,7 @@ func (c *Conversation) LocalChatLog2MsgStruct(ctx context.Context, list []*model
}
messageList = append(messageList, temp)
}
return thisMinSeq, messageList
return thisEndSeq, messageList
}

func (c *Conversation) typingStatusUpdate(ctx context.Context, recvID, msgTip string) error {
Expand Down
136 changes: 69 additions & 67 deletions internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,29 @@ var SearchContentType = []int{constant.Text, constant.AtText, constant.File}

type Conversation struct {
*interaction.LongConnMgr
conversationSyncer *syncer.Syncer[*model_struct.LocalConversation, pbConversation.GetOwnerConversationResp, string]
db db_interface.DataBase
ConversationListener func() open_im_sdk_callback.OnConversationListener
msgListener func() open_im_sdk_callback.OnAdvancedMsgListener
msgKvListener func() open_im_sdk_callback.OnMessageKvInfoListener
businessListener func() open_im_sdk_callback.OnCustomBusinessListener
recvCH chan common.Cmd2Value
loginUserID string
platformID int32
DataDir string
relation *relation.Relation
group *group.Group
user *user.User
file *file.File
cache *cache.Cache[string, *model_struct.LocalConversation]
maxSeqRecorder MaxSeqRecorder
messagePullMinSeqMap *cache.Cache[string, int64]
IsExternalExtensions bool
msgOffset int
progress int
conversationSyncMutex sync.Mutex
streamMsgMutex sync.Mutex
conversationSyncer *syncer.Syncer[*model_struct.LocalConversation, pbConversation.GetOwnerConversationResp, string]
db db_interface.DataBase
ConversationListener func() open_im_sdk_callback.OnConversationListener
msgListener func() open_im_sdk_callback.OnAdvancedMsgListener
msgKvListener func() open_im_sdk_callback.OnMessageKvInfoListener
businessListener func() open_im_sdk_callback.OnCustomBusinessListener
recvCH chan common.Cmd2Value
loginUserID string
platformID int32
DataDir string
relation *relation.Relation
group *group.Group
user *user.User
file *file.File
cache *cache.Cache[string, *model_struct.LocalConversation]
maxSeqRecorder MaxSeqRecorder
messagePullForwardEndSeqMap *cache.Cache[string, int64]
messagePullReverseEndSeqMap *cache.Cache[string, int64]
IsExternalExtensions bool
msgOffset int
progress int
conversationSyncMutex sync.Mutex
streamMsgMutex sync.Mutex

startTime time.Time

Expand All @@ -96,20 +97,21 @@ func NewConversation(ctx context.Context, longConnMgr *interaction.LongConnMgr,
file *file.File) *Conversation {
info := ccontext.Info(ctx)
n := &Conversation{db: db,
LongConnMgr: longConnMgr,
recvCH: ch,
loginUserID: info.UserID(),
platformID: info.PlatformID(),
DataDir: info.DataDir(),
relation: relation,
group: group,
user: user,
file: file,
IsExternalExtensions: info.IsExternalExtensions(),
maxSeqRecorder: NewMaxSeqRecorder(),
messagePullMinSeqMap: cache.NewCache[string, int64](),
msgOffset: 0,
progress: 0,
LongConnMgr: longConnMgr,
recvCH: ch,
loginUserID: info.UserID(),
platformID: info.PlatformID(),
DataDir: info.DataDir(),
relation: relation,
group: group,
user: user,
file: file,
IsExternalExtensions: info.IsExternalExtensions(),
maxSeqRecorder: NewMaxSeqRecorder(),
messagePullForwardEndSeqMap: cache.NewCache[string, int64](),
messagePullReverseEndSeqMap: cache.NewCache[string, int64](),
msgOffset: 0,
progress: 0,
}
n.typing = newTyping(n)
n.initSyncer()
Expand Down Expand Up @@ -922,37 +924,37 @@ func (c *Conversation) FetchSurroundingMessages(ctx context.Context, conversatio
if len(res) == 0 {
return []*sdk_struct.MsgStruct{}, nil
}
_, msgList := c.LocalChatLog2MsgStruct(ctx, []*model_struct.LocalChatLog{res[0]})
if len(msgList) == 0 {
return []*sdk_struct.MsgStruct{}, nil
}
msg := msgList[0]
//_, msgList := c.LocalChatLog2MsgStruct []*model_struct.LocalChatLog{res[0]})
//if len(msgList) == 0 {
// return []*sdk_struct.MsgStruct{}, nil
//}
//msg := msgList[0]
result := make([]*sdk_struct.MsgStruct, 0, before+after+1)
if before > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
ConversationID: conversationID,
Count: int(before),
StartClientMsgID: msg.ClientMsgID,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
result = append(result, msg)
if after > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
ConversationID: conversationID,
Count: int(after),
StartClientMsgID: msg.ClientMsgID,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
sort.Sort(sdk_struct.NewMsgList(result))
//if before > 0 {
// req := sdk.GetAdvancedHistoryMessageListParams{
// ConversationID: conversationID,
// Count: int(before),
// StartClientMsgID: msg.ClientMsgID,
// }
// val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
// if err != nil {
// return nil, err
// }
// result = append(result, val.MessageList...)
//}
//result = append(result, msg)
//if after > 0 {
// req := sdk.GetAdvancedHistoryMessageListParams{
// ConversationID: conversationID,
// Count: int(after),
// StartClientMsgID: msg.ClientMsgID,
// }
// val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
// if err != nil {
// return nil, err
// }
// result = append(result, val.MessageList...)
//}
//sort.Sort(sdk_struct.NewMsgList(result))
return result, nil
}
Loading