@@ -628,55 +628,83 @@ def _rrf_merge(
628628 k : int = _RRF_K ,
629629 ) -> List [Document ]:
630630 """
631- Reciprocal Rank Fusion: score(d) = Σ_i 1 / (k + rank_i (d))
631+ Reciprocal Rank Fusion: score(d) = Σ_i 1 / (k + best_rank_i (d))
632632
633633 Args:
634634 ranked_lists: 多路 (source_name, ranked_docs) — docs 按相关度降序
635635 top_k: 最终返回个数
636636 k: RRF 平滑常数,默认 60(Cormack et al. 2009)
637637
638638 去重 key:node_id 优先,page_content[:200] hash 兜底。
639- 合并后 metadata 写入 rrf_score / rrf_sources / final_score。
640- """
641- rrf_scores : Dict [str , float ] = {}
642- doc_index : Dict [str , Document ] = {}
643- sources : Dict [str , List [str ]] = {}
644- ranks_by_source : Dict [str , Dict [str , int ]] = {}
645639
646- for source_name , ranked_docs in ranked_lists :
640+ 同 source 内同 doc_id 多次命中(如一道菜的多个 chunk 共享 recipe.nodeId):
641+ - 算分只取该 source 内最佳 rank(最小 rank)一次,避免重复加分
642+ - 命中 chunk 数另存到 rrf_chunk_hits,供后续分析
643+
644+ canonical doc(最终展示给 LLM 的 page_content):
645+ 选全局最小 rank 那个 chunk;rank 相同时按 ranked_lists 顺序优先。
646+
647+ 返回的 Document 是新对象,不会 mutate 输入 list 里的 Document。
648+ """
649+ # doc_id -> source_name -> 该 source 内最小 rank(用于算分)
650+ best_rank_per_source : Dict [str , Dict [str , int ]] = {}
651+ # doc_id -> source_name -> 该 source 内命中 chunk 次数(信息存档)
652+ chunk_hits_per_source : Dict [str , Dict [str , int ]] = {}
653+ # doc_id -> (global_best_rank, source_priority, doc) — 选 canonical doc
654+ best_doc_info : Dict [str , Tuple [int , int , Document ]] = {}
655+
656+ for source_priority , (source_name , ranked_docs ) in enumerate (ranked_lists ):
647657 for rank , doc in enumerate (ranked_docs , start = 1 ):
648658 node_id = doc .metadata .get ("node_id" )
649659 doc_id = (
650660 str (node_id ) if node_id is not None
651661 else f"hash::{ hash (doc .page_content [:200 ])} "
652662 )
653663
654- contribution = 1.0 / (k + rank )
655- rrf_scores [doc_id ] = rrf_scores .get (doc_id , 0.0 ) + contribution
664+ if doc_id not in best_rank_per_source :
665+ best_rank_per_source [doc_id ] = {}
666+ chunk_hits_per_source [doc_id ] = {}
667+
668+ curr_best = best_rank_per_source [doc_id ].get (source_name )
669+ # 如果是第一次出现或者当前rank比记录的更小,则更新
670+ if curr_best is None or rank < curr_best :
671+ best_rank_per_source [doc_id ][source_name ] = rank
672+
673+ chunk_hits_per_source [doc_id ][source_name ] = (
674+ chunk_hits_per_source [doc_id ].get (source_name , 0 ) + 1
675+ )
656676
657- # 第一次见到这个 doc 时记录为 canonical(通常是 rank 较高的那路)
658- if doc_id not in doc_index :
659- doc_index [doc_id ] = doc
660- sources [doc_id ] = []
661- ranks_by_source [doc_id ] = {}
677+ new_key = (rank , source_priority )
678+ if (
679+ doc_id not in best_doc_info
680+ or new_key < (best_doc_info [doc_id ][0 ], best_doc_info [doc_id ][1 ])
681+ ):
682+ best_doc_info [doc_id ] = (rank , source_priority , doc )
662683
663- if source_name not in sources [doc_id ]:
664- sources [doc_id ].append (source_name )
665- ranks_by_source [doc_id ][source_name ] = rank
684+ # 每个 source 只用 best rank 算一次贡献
685+ rrf_scores : Dict [str , float ] = {
686+ doc_id : sum (1.0 / (k + r ) for r in source_ranks .values ())
687+ for doc_id , source_ranks in best_rank_per_source .items ()
688+ }
666689
667- # 按 RRF score 降序
668690 sorted_ids = sorted (
669691 rrf_scores .keys (), key = lambda d : rrf_scores [d ], reverse = True
670692 )
671693
672694 merged : List [Document ] = []
673695 for doc_id in sorted_ids [:top_k ]:
674- doc = doc_index [doc_id ]
675- doc .metadata ["rrf_score" ] = rrf_scores [doc_id ]
676- doc .metadata ["rrf_sources" ] = list (sources [doc_id ])
677- doc .metadata ["rrf_ranks" ] = dict (ranks_by_source [doc_id ])
678- doc .metadata ["final_score" ] = rrf_scores [doc_id ]
679- merged .append (doc )
696+ _ , _ , source_doc = best_doc_info [doc_id ]
697+ # 浅 copy metadata,避免 mutate 上游 Document
698+ new_metadata = dict (source_doc .metadata )
699+ new_metadata ["rrf_score" ] = rrf_scores [doc_id ]
700+ new_metadata ["rrf_sources" ] = list (best_rank_per_source [doc_id ].keys ())
701+ new_metadata ["rrf_ranks" ] = dict (best_rank_per_source [doc_id ])
702+ new_metadata ["rrf_chunk_hits" ] = dict (chunk_hits_per_source [doc_id ])
703+ new_metadata ["final_score" ] = rrf_scores [doc_id ]
704+ merged .append (Document (
705+ page_content = source_doc .page_content ,
706+ metadata = new_metadata ,
707+ ))
680708
681709 return merged
682710
0 commit comments