from lm_polygraph.stat_calculators.reasoning_keywords_probs import (
    ReasoningKeywordsProbs,
)


def load_stat_calculator(config, builder):
    """
    Build the default ReasoningKeywordsProbs stat calculator.

    Parameters:
        config: object exposing `max_retries`, `max_length_cot` and `temperature`
            attributes; they are forwarded to the calculator's constructor.
        builder: not used by this builder.
    Returns:
        ReasoningKeywordsProbs: the configured calculator instance.
    """
    calculator = ReasoningKeywordsProbs(
        max_retries=config.max_retries,
        max_length_cot=config.max_length_cot,
        temperature=config.temperature,
    )
    return calculator
import math
from typing import Dict, List, Optional, Tuple

import numpy as np

from .estimator import Estimator

# Uncertainty reported when an input has no usable reasoning statistics.
_DEFAULT_UNCERTAINTY = 0.5


def aggregate_probas_mean(
    keyword_token_probability: Dict[str, Dict[str, List[float]]],
    contribution_scores: Optional[Dict[str, Dict[str, int]]] = None,
) -> Tuple[Dict[str, List[float]], Dict[str, List[float]]]:
    """
    Aggregates token probabilities of every keyword occurrence into one value per occurrence.

    Parameters:
        keyword_token_probability (Dict[str, Dict[str, List[float]]]): token probs for keywords
            (example {
                "step1": {
                    "keyword1": [0.7, 0.8],
                    "keyword2": [0.9, 0.6, 0.5],
                },
                "step2": {
                    "keyword1": [0.5, 0.8],
                    "keyword3": [0.5, 0.9, 0.9],
                },
                ...
            }
            ),
        contribution_scores (Optional[Dict[str, Dict[str, int]]]): contribution scores for keywords,
            indexed as `contribution_scores[step][keyword]`. When None, every occurrence gets
            a contribution of 1 (uniform weighting).
    Returns:
        Tuple[Dict[str, List[float]], Dict[str, List[float]]]: agg. keyword probs, agg. keyword contributions.
            Both dictionaries have the same keys and list lengths: one entry per step in which
            the keyword has a non-empty probability list.
            (example {
                "keyword1": [(0.7 + 0.8) / 2, (0.5 + 0.8) / 2],
                "keyword2": [(0.9 + 0.6 + 0.5) / 3],
                "keyword3": [(0.5 + 0.9 + 0.9) / 3],
                ...
            }
            ),
    Raises:
        KeyError: if `contribution_scores` is given but lacks a step/keyword that has probabilities.
    """
    keyword_probs: Dict[str, List[float]] = {}
    keyword_contributions: Dict[str, List[float]] = {}
    for step, step_keywords in keyword_token_probability.items():
        for keyword, token_probs in step_keywords.items():
            if len(token_probs) == 0:
                # Keywords without any located token carry no information.
                continue
            # NOTE: the reference implementation used min(token_probs) here, which does not
            # match the "probas mean" aggregation strategy; the mean is used instead.
            occurrence_prob = float(np.mean(token_probs))
            if contribution_scores is None:
                contribution = 1
            else:
                contribution = contribution_scores[step][keyword]
            keyword_probs.setdefault(keyword, []).append(occurrence_prob)
            keyword_contributions.setdefault(keyword, []).append(contribution)
    return keyword_probs, keyword_contributions


def weighted_sum(values: List[float]) -> float:
    """
    Computes a softmin weighted sum of the input values.

    Each value `v` is weighted by `exp(-v)` normalised over all values, so smaller
    values dominate the result.

    Parameters:
        values (List[float]): values to be summed; must be non-empty.
    Returns:
        float: a softmin weighted sum (the value itself for a single-element list).
    Raises:
        ZeroDivisionError: if `values` is empty.
    """
    if len(values) == 1:
        return values[0]
    weights = [math.exp(-value) for value in values]
    weights_total = sum(weights)
    return sum(weight / weights_total * value for weight, value in zip(weights, values))


class ProbasMeanWithCoT(Estimator):
    """
    Enhances Probas-Mean aggregated probabilities strategy with reasoning steps.
    Only usable for instruct-finetuned models with chat template support.
    Adapted from the original implementation in the paper https://arxiv.org/pdf/2502.17214
    """

    def __init__(
        self,
        name_postfix="",
    ):
        """
        Parameters:
            name_postfix (str): suffix appended to the estimator name returned by `__str__`.
        """
        self.postfix = name_postfix
        super().__init__(
            [
                "input_texts",
                "greedy_texts",
                "reasoning_answer",
                "reasoning_keywords_probabilities",
                "reasoning_keywords_contributions",
            ],
            "sequence",
        )

    def __str__(self):
        return f"ProbasMeanWithCoT{self.postfix}"

    def __call__(self, stats: Dict[str, np.ndarray]) -> np.ndarray:
        """
        Estimates the uncertainty of each input from its reasoning keyword statistics.

        Parameters:
            stats (Dict[str, np.ndarray]): input statistics, which for multiple samples includes:
                * 'input_texts': the input prompts (only their count is used),
                * 'reasoning_answer': final answer extracted from the reasoning output,
                * 'reasoning_keywords_probabilities': per-step keyword token probabilities,
                * 'reasoning_keywords_contributions': per-step keyword contribution scores.
        Returns:
            np.ndarray: one uncertainty value per input (1 - confidence). Inputs without an
                answer or without usable keyword statistics get the neutral value 0.5.
        """
        ues = []
        for i in range(len(stats["input_texts"])):
            ues.append(
                self._estimate_single(
                    stats["reasoning_answer"][i],
                    stats["reasoning_keywords_probabilities"][i],
                    stats["reasoning_keywords_contributions"][i],
                )
            )
        return np.array(ues)

    @staticmethod
    def _estimate_single(reasoning_answer, keyword_token_probability, contribution_scores) -> float:
        """Computes the CoT-UQ uncertainty of one sample, or 0.5 when statistics are missing."""
        if reasoning_answer == "":
            return _DEFAULT_UNCERTAINTY
        # Both None and {} mean that the stat calculator found nothing usable.
        if not keyword_token_probability or not contribution_scores:
            return _DEFAULT_UNCERTAINTY

        probabilities, contribution_dict = aggregate_probas_mean(
            keyword_token_probability, contribution_scores
        )
        if not probabilities:
            # Every keyword had an empty probability list: nothing to average over.
            return _DEFAULT_UNCERTAINTY

        # softmin weighted sum of keywords probs
        probabilities = {key: weighted_sum(value) for key, value in probabilities.items()}
        # average of keywords contributions
        contributions = {key: sum(value) / len(value) for key, value in contribution_dict.items()}

        # CoT-UQ: contribution-weighted mean of the keyword probabilities
        total_weight = sum(contributions.values())
        if total_weight == 0:
            # No keyword has any weight: fall back to the plain mean.
            confidence = sum(probabilities.values()) / len(probabilities)
        else:
            total_sum = sum(probabilities[key] * contributions[key] for key in probabilities)
            confidence = total_sum / total_weight
        return 1 - confidence
import logging
import re
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

import numpy as np
import torch

from lm_polygraph.utils.model import WhiteboxModel

from .stat_calculator import StatCalculator

log = logging.getLogger(__name__)

# Placeholders substituted into the prompt templates below. They must be non-empty:
# str.replace("", x) inserts x between every character of the template.
_QUESTION_PLACEHOLDER = "{question}"
_RESPONSE_PLACEHOLDER = "{response}"

# Marker the CoT prompt asks the model to put in front of its answer. The same spelling is
# matched by the regex in `parse_response_to_dict`, so the token-level search must use it too.
_FINAL_ANSWER_MARKER = "Final Answer:"

# Statistics that are set to None when no attempt succeeds for a question.
_PER_ATTEMPT_STATS = (
    "reasoning_answer_tokens_probs",
    "reasoning_keywords",
    "reasoning_keywords_probabilities",
    "reasoning_keywords_contributions",
    "reasoning_keywords_token_ids",
    "reasoning_answer_token_ids",
)

# Characters removed by `clean_words` before comparing words.
_CLEAN_WORDS_TABLE = str.maketrans("", "", ' ."\n_Ġ')


cot_instruction = """
Please reason the following question step by step. Label each reasoning step as "Step i:", where "i" is the step number.
You need to ensure that each step builds on the previous one and contributes meaningfully toward reaching the final answer.
Once you finish all steps, put your final answer on a separate line after the reasoning steps, starting with "Final Answer:" (do not label it as a step).

Question: {question}
Response: Let's think step by step.
"""

# NOTE(review): `step_exacts_2_list` only accepts keywords written as "keyword (/score/)".
# The literal "(//)" in the prompt below looks like it lost its inner placeholder, and the
# text after "For example:" is empty - confirm against the reference prompt.
keywords_extraction_instruction = '''
You will be provided with a question and a multi-step response containing reasoning steps.
For each long reasoning step labeled "Step i:", extract the keywords, only the relevant tokens for that specific reasoning step.
You also need to evaluate the importance of each keyword to the final answer. Please evaluate the importance score following with the keyword by (//) on a scale of 1 to 10, where 1 is the least critical and 10 is the most critical.
If you find more than one keyword in a specific step, separate them with “;”.
If a specific step does not contribute meaningfully to deriving the final answer (e.g., repeating information already provided in the question, introducing irrelevant assumptions or speculations), return "Step i: NO ANSWER" for that step. For example:

Question: {question}

Multi-Step Response: {response}

Keywords for Each Reasoning Step:
'''


def is_effectively_empty(obj):
    """
    Tells whether `obj` carries no information.

    None, numeric zero, the empty string, and lists/dicts whose items/values are all
    effectively empty (including empty lists/dicts) count as empty.

    Parameters:
        obj: arbitrary, possibly nested, object.
    Returns:
        bool: True if `obj` is effectively empty.
    """
    if obj is None:
        return True
    if isinstance(obj, (int, float)) and obj == 0:
        return True
    if obj == "":
        return True
    if isinstance(obj, list):
        return all(is_effectively_empty(item) for item in obj)
    if isinstance(obj, dict):
        # all() over no values is True, so an empty dict is empty as well.
        return all(is_effectively_empty(value) for value in obj.values())
    return False


def parse_response_to_dict(response: str) -> Tuple[Optional[str], Dict[str, str], Optional[str]]:
    """
    Parse model reasoning output into the final answer, the reasoning steps and the trimmed response.

    Parameters:
        response (str): reasoning output.
    Returns:
        Tuple[Optional[str], Dict[str, str], Optional[str]]:
            - final answer (str or None),
            - dictionary of steps (e.g., {"Step 1": "Step 1: ..."}); the last step runs
              up to the end of the final-answer line,
            - response cut right after the final-answer line and stripped (str or None).
        `(None, {}, None)` is returned when no "Final Answer:" line is found.
    """
    answer_match = re.search(r"Final Answer:\s*(.+?)\s*(?=(\n|$))", response, re.DOTALL)
    if not answer_match:
        return None, {}, None

    final_answer = answer_match.group(1).strip()
    reasoning = response[: answer_match.end()].strip()

    steps: Dict[str, str] = {}
    step_matches = list(re.finditer(r"(Step \d+):", reasoning))
    for idx, step_match in enumerate(step_matches):
        start = step_match.start()
        end = step_matches[idx + 1].start() if idx + 1 < len(step_matches) else len(reasoning)
        # Offsets come from `reasoning`, so slice `reasoning` (not the unstripped `response`,
        # whose leading whitespace would shift every segment).
        steps[step_match.group(1)] = reasoning[start:end].strip()

    return final_answer, steps, reasoning


def _space_markers(tokenizer) -> Tuple[str, ...]:
    """Returns the token strings treated as a space: "▁", "Ġ" and the tokenizer's own tokens for " "."""
    own_tokens = [token for token in tokenizer.tokenize(" ") if token]
    return ("▁", "Ġ", *own_tokens)


def _strip_space_marker(token: str, markers: Tuple[str, ...]) -> str:
    """Removes one leading space marker from `token`, if it starts with any of `markers`."""
    for marker in markers:
        if token.startswith(marker):
            return token[len(marker):]
    return token


def _end_of_first_match(tokens, pattern) -> Optional[int]:
    """Returns the index right after the first occurrence of `pattern` in `tokens`, or None."""
    width = len(pattern)
    for start in range(len(tokens) - width + 1):
        if tokens[start : start + width] == pattern:
            return start + width
    return None


def match_final_answer_token_ids(tokenizer, original_tokens, response_tokens, original_token_ids):
    """
    Locates the tokens of the final answer inside the generated sequence.

    The answer marker is searched as a token subsequence in both token lists, so the result
    depends on the marker being tokenized identically in isolation and in context.

    Parameters:
        tokenizer: tokenizer providing `tokenize`.
        original_tokens (List[str]): tokens of the generated ids.
        response_tokens (List[str]): tokens of the re-tokenized, trimmed response text.
        original_token_ids (torch.Tensor): generated token ids, aligned with `original_tokens`.
    Returns:
        Tuple[Optional[int], Optional[np.ndarray]]: index of the first answer token in
        `original_tokens` and the answer token ids, or `(None, None)` if the marker is not
        found or at most one token follows it in `response_tokens`.
    """
    marker_tokens = tokenizer.tokenize(_FINAL_ANSWER_MARKER)

    end_index = _end_of_first_match(response_tokens, marker_tokens)
    # `>=` (rather than `==`) also covers a marker that ends the token list.
    if end_index is None or end_index + 1 >= len(response_tokens):
        return None, None

    end_index_original = _end_of_first_match(original_tokens, marker_tokens)
    if end_index_original is None:
        return None, None

    # Skip a standalone space token between the marker and the answer.
    if response_tokens[end_index] in _space_markers(tokenizer):
        end_index += 1
        end_index_original += 1

    n_answer_tokens = len(response_tokens) - end_index
    final_answer_token_ids = original_token_ids[end_index_original : end_index_original + n_answer_tokens]

    return end_index_original, final_answer_token_ids.data.cpu().numpy()


def predict(prompt, model, tokenizer, max_length_cot, temperature):
    """
    Generates a continuation of `prompt` and returns it as text.

    Parameters:
        prompt (str): input prompt.
        model: object providing `generate`; its device is read from `model.model.device`
            or `model.device`, with "cuda" as the fallback.
        tokenizer: tokenizer used for encoding and decoding.
        max_length_cot (int): maximum number of new tokens.
        temperature (float): generation temperature.
    Returns:
        str: decoded new tokens, with the last generated token dropped.
    """
    device = getattr(getattr(model, "model", model), "device", "cuda")
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    generate_ids = model.generate(
        **inputs,
        max_new_tokens=max_length_cot,
        temperature=temperature,
        pad_token_id=tokenizer.eos_token_id,
    )
    generate_ids = generate_ids[0][len(inputs["input_ids"][0]) : -1]
    return tokenizer.decode(generate_ids)


def step_exacts_2_list(response):
    """
    Parses the keyword-extraction output into per-step keywords and contribution scores.

    Only lines of the form "Step N: kw1 (/s1/); kw2 (/s2/)" are kept; other lines are skipped.

    Parameters:
        response (str): output of the keyword-extraction prompt.
    Returns:
        Tuple[str, List[List[str]], List[List[int]]] or False: the kept lines joined by
        newlines, keywords per kept line and contributions per kept line. False is returned
        when a score is not an integer, lies outside 1..10, or no line is valid.
    """
    keywords_by_step = []
    contributions_by_step = []
    valid_lines = []

    for line in response.splitlines():
        if not line.strip():
            continue
        match = re.search(r"Step \d+: (.+)", line)
        if not match:
            continue

        entries = match.group(1).split("; ")
        # Every entry must carry its own "(/score/)" annotation, otherwise skip the line.
        if any("(/" not in entry or "/)" not in entry for entry in entries):
            continue

        try:
            keywords = [entry.split("(/")[0].strip() for entry in entries]
            contributions = [int(entry.split("(/")[1].split("/)")[0].strip()) for entry in entries]
        except ValueError:
            return False  # a score could not be converted to int

        # The prompt defines the scale as 1 to 10.
        if any(score < 1 or score > 10 for score in contributions):
            return False

        keywords_by_step.append(keywords)
        contributions_by_step.append(contributions)
        valid_lines.append(line)

    if not valid_lines:
        return False

    return "\n".join(valid_lines), keywords_by_step, contributions_by_step


def find_subsequence_position(sub_sequence, long_sequence):
    """
    Finds the first position of `sub_sequence` inside `long_sequence`.

    Parameters:
        sub_sequence (List[int]): token ids to look for.
        long_sequence (torch.Tensor): 1-D tensor of token ids to search in.
    Returns:
        int: start index of the first match, or -1 if there is none.
    """
    len_long = long_sequence.size(0)
    len_sub = len(sub_sequence)

    sub_sequence_tensor = torch.tensor(sub_sequence, device=long_sequence.device)

    for i in range(len_long - len_sub + 1):
        if torch.equal(long_sequence[i : i + len_sub], sub_sequence_tensor):
            return i
    return -1


def clean_words(word):
    """Normalizes `word` for comparison: drops spaces, '.', '"', newlines, '_' and 'Ġ', then lowercases."""
    # TODO forward space token
    return word.translate(_CLEAN_WORDS_TABLE).lower()


def find_token_indices(tokens, word):
    """
    Finds the span of consecutive tokens whose concatenation equals `word` after `clean_words`.

    Parameters:
        tokens (List[str]): token strings.
        word (str): word or phrase to locate.
    Returns:
        Tuple[int, int]: inclusive (start, end) token indices, or (-1, -1) if not found.
    """
    target = clean_words(word)
    word_len = len(word.replace(" ", ""))

    for start_index in range(len(tokens)):
        combined_text = ""
        for end_index in range(start_index, len(tokens)):
            if len(combined_text) >= word_len:
                break
            combined_text += tokens[end_index]
            if clean_words(combined_text) == target:
                return start_index, end_index
    return -1, -1


def is_word_in_sentence(sentence, word):
    """Tells whether `word` occurs in `sentence` as a literal, case-insensitive substring."""
    return re.search(re.escape(word), sentence, re.IGNORECASE) is not None


class ReasoningKeywordsProbs(StatCalculator):
    """
    For Whitebox model (lm_polygraph.WhiteboxModel), at input texts batch calculates:
    * model output for reasoning enhanced input,
    * model answer for reasoning enhanced input,
    * token probabilities for `reasoning_answer`,
    * keywords from `reasoning_output`,
    * probabilities for `reasoning_keywords`,
    * contributions for `reasoning_keywords`,
    * step-wise token indices for `reasoning_keywords`,
    * token indices for `reasoning_answer`.
    """

    @staticmethod
    def meta_info() -> Tuple[List[str], List[str]]:
        """
        Returns the statistics and dependencies for the calculator.
        """
        return [
            "reasoning_output",
            "reasoning_answer",
            "reasoning_answer_tokens_probs",
            "reasoning_keywords",
            "reasoning_keywords_probabilities",
            "reasoning_keywords_contributions",
            "reasoning_keywords_token_ids",
            "reasoning_answer_token_ids",
        ], ["input_texts"]

    def __init__(self, max_retries=5, max_length_cot=128, temperature=1):
        """
        Parameters:
            max_retries (int): maximum number of generation attempts per question.
            max_length_cot (int): maximum number of new tokens for both the reasoning
                and the keyword-extraction generations.
            temperature (float): generation temperature.
        """
        super().__init__()
        self.max_retries = max_retries
        self.max_length_cot = max_length_cot
        self.temperature = temperature

    def __call__(
        self,
        dependencies: Dict[str, np.array],
        texts: List[str],
        model: WhiteboxModel,
        max_new_tokens: int = 100,
    ) -> Dict[str, np.ndarray]:
        """
        Calculates the statistics of reasoning enhanced process.

        Each question is attempted up to `max_retries` times; the first attempt that yields
        a parsable answer and keyword probabilities is recorded. If none succeeds, the last
        attempt's output and answer are recorded and all other statistics are None.

        Parameters:
            dependencies (Dict[str, np.ndarray]): input statistics, can be empty (not used).
            texts (List[str]): Input texts batch used for model generation.
            model (Model): Model used for generation.
            max_new_tokens (int): not used; generation length is bounded by `max_length_cot`.
        Returns:
            Dict[str, np.ndarray]: dictionary with the following items (one entry per input text):
                - 'reasoning_output' (List[str]): model output for reasoning enhanced input,
                - 'reasoning_answer' (List[str]): model answer for reasoning enhanced input,
                - 'reasoning_answer_tokens_probs' (List[Dict[str, List[float]]]): token probabilities for `reasoning_answer`,
                - 'reasoning_keywords' (List[str]): keywords from `reasoning_output`,
                - 'reasoning_keywords_probabilities' (List[Dict[str, Dict[str, List[float]]]]): probabilities for `reasoning_keywords`,
                - 'reasoning_keywords_contributions' (List[Dict[str, Dict[str, int]]]): contributions for `reasoning_keywords`,
                - 'reasoning_keywords_token_ids' (List[Dict[str, Dict[str, List[int]]]]): step-wise token indices for `reasoning_keywords`,
                - 'reasoning_answer_token_ids' (List[Dict[str, List[int]]]): token indices for `reasoning_answer`.
        """
        result_dict = defaultdict(list)
        for question in texts:
            cot_prompt = cot_instruction.replace(_QUESTION_PLACEHOLDER, question)
            inputs = model.tokenizer(cot_prompt, return_tensors="pt")
            inputs = {key: value.to(model.model.device) for key, value in inputs.items()}

            # Initialised so that max_retries == 0 still records a (None) row.
            llm_answer, response, attempt_stats = None, None, None
            for attempt in range(1, self.max_retries + 1):
                llm_answer, response, attempt_stats = self._attempt(question, inputs, model, attempt)
                if attempt_stats is not None:
                    break

            if attempt_stats is None:
                log.debug("Question %r has no meaningful answer & explanations, record and skip", question)
                attempt_stats = dict.fromkeys(_PER_ATTEMPT_STATS)

            result_dict["reasoning_output"].append(response)
            result_dict["reasoning_answer"].append(llm_answer)
            for stat_name in _PER_ATTEMPT_STATS:
                result_dict[stat_name].append(attempt_stats[stat_name])

        return result_dict

    def _attempt(self, question, inputs, model, attempt):
        """
        Runs one reasoning generation and extracts answer and keyword statistics from it.

        Returns:
            Tuple[Optional[str], Optional[str], Optional[dict]]: parsed answer, trimmed
            response, and the per-attempt statistics (None if this attempt must be retried).
        """
        outputs = model.generate(
            **inputs,
            max_new_tokens=self.max_length_cot,
            temperature=self.temperature,
            pad_token_id=model.tokenizer.eos_token_id,
            return_dict_in_generate=True,
            output_scores=True,
        )

        # generated token ids for the question enhanced with CoT (last generated token dropped).
        generated_ids = outputs.sequences[0][len(inputs["input_ids"][0]) : -1]
        # generated text for the question enhanced with CoT
        to_parse = model.tokenizer.decode(generated_ids, skip_special_tokens=True)
        llm_answer, steps_dict, response = parse_response_to_dict(to_parse)

        def failed(reason):
            log.debug("%s, current try is %d", reason, attempt)
            return llm_answer, response, None

        # NOTE(review): one token was already dropped above, so this bound may never be
        # reached when generation is capped by max_length_cot - confirm the intended threshold.
        if generated_ids.size(0) >= self.max_length_cot:
            return failed("New reasoning tokens are too much")
        if generated_ids.size(0) == 0:
            return failed("New reasoning tokens are null")
        if llm_answer is None or llm_answer in ["", " "]:
            return failed("New reasoning tokens are none")

        # tokens of the trimmed response text (ends with the final-answer line)
        response_tokens = model.tokenizer.tokenize(response)
        # full reasoning tokens
        original_tokens = model.tokenizer.convert_ids_to_tokens(generated_ids)

        # One probability vector per generated position; indexed lazily instead of
        # materialising a dict over the whole vocabulary for every position.
        probabilities = [torch.softmax(score, dim=1)[0] for score in outputs.scores]

        answer_start, answer_token_ids = match_final_answer_token_ids(
            model.tokenizer,
            original_tokens,
            response_tokens,
            generated_ids,
        )
        if answer_start is None:
            return failed("Cannot locate the final answer")

        answer_probs = []
        for offset, token_id in enumerate(answer_token_ids):
            prob = probabilities[answer_start + offset][int(token_id)].item()
            if prob <= 0:
                return failed("Cannot locate the final answer token probability")
            answer_probs.append(prob)

        keywords_prompt = keywords_extraction_instruction.replace(
            _QUESTION_PLACEHOLDER, question
        ).replace(_RESPONSE_PLACEHOLDER, response)
        keywords_output = predict(
            keywords_prompt, model, model.tokenizer, self.max_length_cot, self.temperature
        )

        if "NO ANSWER" in keywords_output:
            return failed("Extracted keywords have NO ANSWER")
        parsed_keywords_output = step_exacts_2_list(keywords_output)
        if not parsed_keywords_output:
            return failed("Extracted keywords have no contribution scores")
        extracted_keywords, keywords_list, contributions_list = parsed_keywords_output
        if len(keywords_list) == 0:
            return failed("Cannot extract effective keywords")
        if len(steps_dict) > len(keywords_list):
            return failed("Len of keywords list doesn't match the len of step dict")

        space_markers = _space_markers(model.tokenizer)
        keywords_probabilities = {}
        keywords_contributions = {}
        keywords_token_ids = {}
        for step_idx, (step_name, step_text) in enumerate(steps_dict.items()):
            keywords = keywords_list[step_idx]
            contributions = contributions_list[step_idx]
            if len(keywords) == 1 and keywords[0] == "NO ANSWER":
                continue

            step_tokens = model.tokenizer.tokenize(step_text)
            processed_step_tokens = [
                _strip_space_marker(token, space_markers) for token in step_tokens
            ]
            step_token_ids = model.tokenizer.convert_tokens_to_ids(step_tokens)
            # The step is located by its inner tokens (first and last two dropped);
            # the match position is then shifted back by the dropped first token.
            found_at = find_subsequence_position(step_token_ids[1:-2], generated_ids)
            if found_at < 0:
                log.debug("%s cannot be aligned with the generated tokens", step_name)
                continue
            start_position = found_at - 1

            step_probabilities = {}
            step_contributions = {}
            step_keyword_token_ids = {}
            for keyword_idx, keyword in enumerate(keywords):
                if not is_word_in_sentence(step_text, keyword):
                    log.debug("%s-Keyword-%d does not appear in the step text", step_name, keyword_idx)
                    continue
                keyword_start, keyword_end = find_token_indices(processed_step_tokens, keyword)
                first_token = start_position + keyword_start
                if keyword_start < 0 or first_token < 0:
                    # Keyword could not be mapped onto tokens; a negative index would
                    # silently read probabilities from the end of the sequence.
                    continue
                keyword_token_ids = generated_ids[first_token : start_position + keyword_end + 1]
                keyword_token_ids = keyword_token_ids.data.cpu().numpy()

                step_probabilities[keyword] = [
                    probabilities[first_token + offset][int(token_id)].item()
                    for offset, token_id in enumerate(keyword_token_ids)
                ]
                step_contributions[keyword] = int(contributions[keyword_idx])
                step_keyword_token_ids[keyword] = keyword_token_ids.tolist()

            keywords_probabilities[step_name] = step_probabilities
            keywords_contributions[step_name] = step_contributions
            keywords_token_ids[step_name] = step_keyword_token_ids

        if is_effectively_empty(keywords_probabilities):
            return failed("Token probabilities from all steps are empty")

        return llm_answer, response, {
            "reasoning_answer_tokens_probs": {llm_answer: answer_probs},
            "reasoning_keywords": extracted_keywords,
            "reasoning_keywords_probabilities": keywords_probabilities,
            "reasoning_keywords_contributions": keywords_contributions,
            "reasoning_keywords_token_ids": keywords_token_ids,
            "reasoning_answer_token_ids": {llm_answer: answer_token_ids.tolist()},
        }