from typing import Dict, List

import numpy as np


class RetrievalEvaluator:
    def __init__(self):
        self.metrics = {}

    def evaluate(self, queries: List[str], ground_truth: List[List[str]],
                 retrieved: List[List[str]]) -> Dict:
        """Evaluate retrieval quality across standard ranking metrics."""
        metrics = {
            "mrr": self.calculate_mrr(ground_truth, retrieved),
            "map": self.calculate_map(ground_truth, retrieved),
            "ndcg": self.calculate_ndcg(ground_truth, retrieved),
            "recall@k": {},
            "precision@k": {},
        }
        for k in [1, 3, 5, 10]:
            metrics["recall@k"][k] = self.calculate_recall_at_k(
                ground_truth, retrieved, k
            )
            metrics["precision@k"][k] = self.calculate_precision_at_k(
                ground_truth, retrieved, k
            )
        return metrics

    def calculate_mrr(self, ground_truth: List[List[str]],
                      retrieved: List[List[str]]) -> float:
        """Mean Reciprocal Rank: average of 1/rank of the first relevant document."""
        mrr = 0.0
        for gt, ret in zip(ground_truth, retrieved):
            for i, doc in enumerate(ret):
                if doc in gt:
                    mrr += 1 / (i + 1)
                    break
        return mrr / len(ground_truth)

    def calculate_ndcg(self, ground_truth: List[List[str]],
                       retrieved: List[List[str]], k: int = 10) -> float:
        """Normalized Discounted Cumulative Gain with binary relevance."""
        def dcg(relevances, k):
            # Discounted cumulative gain over the top-k relevance scores.
            relevances = np.array(relevances)[:k]
            if relevances.size:
                return np.sum(relevances / np.log2(np.arange(2, relevances.size + 2)))
            return 0.0

        ndcg_scores = []
        for gt, ret in zip(ground_truth, retrieved):
            relevances = [1 if doc in gt else 0 for doc in ret[:k]]
            # Ideal ranking: all relevant documents first, padded with zeros to k.
            ideal_relevances = [1] * min(len(gt), k) + [0] * (k - min(len(gt), k))
            dcg_score = dcg(relevances, k)
            idcg_score = dcg(ideal_relevances, k)
            if idcg_score > 0:
                ndcg_scores.append(dcg_score / idcg_score)
            else:
                ndcg_scores.append(0.0)
        return float(np.mean(ndcg_scores))
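The `evaluate` method also calls `calculate_map`, `calculate_recall_at_k`, and `calculate_precision_at_k`, which are not shown in the listing above. The sketch below is one way those methods could be filled in so the class runs end to end, using the same binary-relevance convention as `calculate_mrr` and `calculate_ndcg`. Only the method names and signatures come from the calls above; the bodies are assumptions, not the original implementation. They are indented so they can be pasted into the `RetrievalEvaluator` class.

    # Assumed implementations of the helpers referenced by evaluate();
    # names and signatures follow the calls above, bodies are a sketch.
    def calculate_recall_at_k(self, ground_truth: List[List[str]],
                              retrieved: List[List[str]], k: int) -> float:
        """Recall@k: share of relevant documents that appear in the top-k results."""
        scores = []
        for gt, ret in zip(ground_truth, retrieved):
            if not gt:
                continue
            hits = len(set(ret[:k]) & set(gt))
            scores.append(hits / len(gt))
        return float(np.mean(scores)) if scores else 0.0

    def calculate_precision_at_k(self, ground_truth: List[List[str]],
                                 retrieved: List[List[str]], k: int) -> float:
        """Precision@k: share of the top-k results that are relevant."""
        scores = []
        for gt, ret in zip(ground_truth, retrieved):
            hits = len(set(ret[:k]) & set(gt))
            scores.append(hits / k)
        return float(np.mean(scores)) if scores else 0.0

    def calculate_map(self, ground_truth: List[List[str]],
                      retrieved: List[List[str]]) -> float:
        """Mean Average Precision over all queries (binary relevance)."""
        ap_scores = []
        for gt, ret in zip(ground_truth, retrieved):
            if not gt:
                continue
            hits = 0
            precision_sum = 0.0
            for i, doc in enumerate(ret):
                if doc in gt:
                    hits += 1
                    precision_sum += hits / (i + 1)  # precision at this rank
            ap_scores.append(precision_sum / len(gt))
        return float(np.mean(ap_scores)) if ap_scores else 0.0

With those methods in place, a minimal call looks like the following; the query strings and document IDs are made up for illustration.

evaluator = RetrievalEvaluator()
results = evaluator.evaluate(
    queries=["q1", "q2"],
    ground_truth=[["doc1", "doc3"], ["doc2"]],
    retrieved=[["doc3", "doc5", "doc1"], ["doc4", "doc2"]],
)
print(results["mrr"], results["recall@k"][3])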