Source code for fanoutqa.eval.scorer

import asyncio
import warnings
from typing import Dict, Iterable, Optional, Tuple

import rouge_score
import rouge_score.scoring
from bleurt.score import BleurtScorer
from rouge_score.rouge_scorer import RougeScorer

from fanoutqa.eval.llm import OPENAI_API_KEY, get_llm_factuality
from fanoutqa.eval.models import (
    AccuracyScore,
    Answer,
    EvaluationScore,
    EvaluationSingleScore,
    RougeScore,
    RougeScorePart,
)
from fanoutqa.eval.string import answer_in_text
from fanoutqa.eval.utils import str_answer
from fanoutqa.models import DevQuestion
from fanoutqa.utils import batched

ROUGE_TYPES = ("rouge1", "rouge2", "rougeL")


class Scorer:
    def __init__(
        self, questions: list[DevQuestion], answers: list[Answer], only_score_answered=False, llm_cache_key: str = None
    ):
        """
        :param questions: The questions and reference answers, as loaded by the dataset
        :param answers: The generated answers to score
        :param only_score_answered: Whether to only score questions that have an answer (True), or consider unanswered
            questions to have 0 score (False, default).
        :param llm_cache_key: If this is provided, cache the LLM-as-judge generations with this key. We recommend
            setting this to a human-readable key for each system under test.
        """
        self.questions = questions
        self.questions_by_id = {q.id: q for q in self.questions}
        self.answers = answers
        self.answers_by_id = {r["id"]: r for r in self.answers}

        # number of trials to eval
        self.only_score_answered = only_score_answered
        if self.only_score_answered:
            self.eval_len = len(self.answers)
        else:
            self.eval_len = len(self.questions)

        self.llm_cache_key = llm_cache_key

        # ext evallers
        self.rouge = RougeScorer(ROUGE_TYPES, use_stemmer=True)
        self.bleurt = BleurtScorer("BLEURT-20")

    async def score(self):
        acc, acc_raw = self.score_accuracy()
        rouge, rouge_raw = self.score_rouge()
        bleurt_, bleurt_raw = self.score_bleurt()
        # require FANOUTQA_OPENAI_API_KEY to be set to do GPT judge to prevent footguns
        if not OPENAI_API_KEY:
            warnings.warn(
                "No OpenAI API key found! To run GPT-as-judge scoring, set the `FANOUTQA_OPENAI_API_KEY` env var to"
                " your OpenAI API key."
            )
            gptscore = 0
            gpt_raw = {}
        else:
            gptscore, gpt_raw = await self.score_gpt()

        # collect raw aggs
        raw_scores = []
        for q, a in self.get_qa_pairs():
            raw_scores.append(
                EvaluationSingleScore(
                    question_id=q.id,
                    acc=acc_raw[q.id],
                    rouge=rouge_raw[q.id],
                    bleurt=bleurt_raw[q.id],
                    gpt=gpt_raw.get(q.id),
                )
            )
        return EvaluationScore(acc=acc, rouge=rouge, bleurt=bleurt_, gpt=gptscore, raw=raw_scores)

    def get_qa_pairs(self) -> Iterable[tuple[DevQuestion, Optional[Answer]]]:
        """Yield pairs of questions and answers to score.
        The answer may be None if there is no answer for a given question and ``only_score_answered`` is False.
        """
        if self.only_score_answered:
            for a in self.answers:
                q = self.questions_by_id.get(a["id"])
                yield q, a
        else:
            for q in self.questions:
                a = self.answers_by_id.get(q.id)
                yield q, a

    # scorers
    def score_accuracy(self) -> Tuple[AccuracyScore, Dict[str, float]]:
        """Get the loose and strict accuracy scores for the loaded qs and as."""
        raw_scores = {}  # qid -> score
        accs = []
        n_perfect = 0
        for q, a in self.get_qa_pairs():
            if a is None:
                accs.append(0)
                raw_scores[q.id] = 0
                continue
            result = answer_in_text(q.answer, a["answer"])
            accs.append(result.score)
            raw_scores[q.id] = result.score
            if result.found:
                n_perfect += 1

        assert len(accs) == self.eval_len
        assert len(raw_scores) == self.eval_len
        avg_acc = sum(accs) / self.eval_len
        pct_perfect = n_perfect / self.eval_len
        return AccuracyScore(loose=avg_acc, strict=pct_perfect), raw_scores

    def score_rouge(self) -> Tuple[RougeScore, Dict[str, RougeScore]]:
        """Get the ROUGE-1, ROUGE-2, and ROUGE-L scores (P/R/F1) for the loaded qs and as."""
        raw_scores = {}  # qid -> RougeScore
        scores = {t: [] for t in ROUGE_TYPES}  # rouge_type -> list[Score]
        for q, a in self.get_qa_pairs():
            if a is None:
                for score in scores.values():
                    score.append(rouge_score.scoring.Score(0, 0, 0))
                raw_scores[q.id] = RougeScore(
                    **{k: RougeScorePart(precision=0, recall=0, fscore=0) for k in ROUGE_TYPES}
                )
                continue
            results = self.rouge.score(str_answer(q.answer), str_answer(a["answer"]))
            for k, v in results.items():
                scores[k].append(v)
            raw_scores[q.id] = RougeScore(
                **{
                    k: RougeScorePart(precision=v.precision, recall=v.recall, fscore=v.fmeasure)
                    for k, v in results.items()
                }
            )

        assert all(len(v) == self.eval_len for v in scores.values())
        assert len(raw_scores) == self.eval_len
        out = {}
        for k, v in scores.items():
            avg_precision = sum(s.precision for s in v) / self.eval_len
            avg_recall = sum(s.recall for s in v) / self.eval_len
            avg_fscore = sum(s.fmeasure for s in v) / self.eval_len
            out[k] = RougeScorePart(precision=avg_precision, recall=avg_recall, fscore=avg_fscore)
        return RougeScore(**out), raw_scores

    def score_bleurt(self) -> Tuple[float, Dict[str, float]]:
        """Get the BLEURT score for the loaded qs and as."""
        references = []
        candidates = []
        idx_to_id = {}
        for idx, (q, a) in enumerate(self.get_qa_pairs()):
            idx_to_id[idx] = q.id
            if a is None:
                candidates.append("")
            else:
                candidates.append(str_answer(a["answer"]))
            references.append(str_answer(q.answer))

        scores = self.bleurt.score(references=references, candidates=candidates)
        assert len(scores) == self.eval_len
        avg_score = sum(scores) / self.eval_len
        raw_scores = {idx_to_id[idx]: score for idx, score in enumerate(scores)}
        assert len(raw_scores) == self.eval_len
        return avg_score, raw_scores

    async def score_gpt(self) -> Tuple[float, Dict[str, int]]:
        """Use GPT-4 as a judge to grade the loaded qs and as."""
        accs = []
        raw_scores = {}

        for batch in batched(self.get_qa_pairs(), 20):
            # eval 20 qs at a time
            coros = []
            ids = []
            for q, a in batch:
                if a is None:
                    accs.append(0)
                    raw_scores[q.id] = 0
                    continue
                # sometimes we have fun neural text degeneration, just cut it off
                ans = a["answer"]
                if len(a["answer"]) > 4000:
                    warnings.warn(f"The answer to question ID {a['id']} is too long, trimming it to 4000 characters.")
                    ans = ans[:4000]
                coro = get_llm_factuality(q, ans, cache_key=self.llm_cache_key)
                coros.append(coro)
                ids.append(q.id)

            # and score their answers
            # B, C, E = full score, anything else = 0
            answers = await asyncio.gather(*coros)
            for qid, result in zip(ids, answers):
                mc = result.strip()[-1].lower()
                if mc in "bce":
                    accs.append(1)
                    raw_scores[qid] = 1
                else:
                    accs.append(0)
                    raw_scores[qid] = 0

        assert len(accs) == self.eval_len
        assert len(raw_scores) == self.eval_len
        avg_acc = sum(accs) / self.eval_len
        return avg_acc, raw_scores


[docs] def evaluate(questions: list[DevQuestion], answers: list[Answer], **kwargs) -> EvaluationScore: """ Evaluate all FOQA metrics across the given questions and generated answers. :param questions: The questions and reference answers, as loaded by the dataset. :param answers: The generated answers to score. These should be dictionaries like ``{"id": "...", "answer": "..."}`` :param only_score_answered: Whether to only score questions that have an answer (True), or consider unanswered questions to have 0 score (False, default). This is useful for evaluating only a subset of the dataset. :param llm_cache_key: If this is provided, cache the LLM-as-judge generations with this key. We recommend setting this to a human-readable key for each system under test. """ scorer = Scorer(questions, answers, **kwargs) return asyncio.run(scorer.score())