From 966b83023ee26d013bd320dd1454446df7ee3f12 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sat, 16 May 2026 14:11:52 +0300 Subject: [PATCH 01/11] init: manifold implementation --- Makefile | 4 +- src/_schemas.py | 12 + .../func_manifold_fetch}/Makefile | 28 +- src/orchestration/func_manifold_fetch/main.py | 30 + .../func_manifold_fetch}/requirements.txt | 8 +- .../func_manifold_update}/Makefile | 26 +- .../func_manifold_update/main.py | 54 ++ .../func_manifold_update}/requirements.txt | 8 +- src/questions/manifold/.gcloudignore | 3 - src/questions/manifold/fetch/main.py | 145 ---- .../manifold/update_questions/main.py | 295 -------- src/sources/manifold.py | 388 ++++++++++- src/tests/conftest.py | 64 ++ src/tests/test_manifold.py | 649 ++++++++++++++++++ 14 files changed, 1235 insertions(+), 479 deletions(-) rename src/{questions/manifold/fetch => orchestration/func_manifold_fetch}/Makefile (55%) create mode 100644 src/orchestration/func_manifold_fetch/main.py rename src/{questions/manifold/update_questions => orchestration/func_manifold_fetch}/requirements.txt (88%) rename src/{questions/manifold/update_questions => orchestration/func_manifold_update}/Makefile (57%) create mode 100644 src/orchestration/func_manifold_update/main.py rename src/{questions/manifold/fetch => orchestration/func_manifold_update}/requirements.txt (88%) delete mode 100644 src/questions/manifold/.gcloudignore delete mode 100644 src/questions/manifold/fetch/main.py delete mode 100644 src/questions/manifold/update_questions/main.py create mode 100644 src/tests/test_manifold.py diff --git a/Makefile b/Makefile index 98b109c3..c59b8c0c 100644 --- a/Makefile +++ b/Makefile @@ -127,10 +127,10 @@ baselines: llm-baselines naive-and-dummy-forecasters manifold: manifold-fetch manifold-update-questions manifold-fetch: - $(MAKE) -C src/questions/manifold/fetch || echo "* $@" >> $(MAKE_FAILURE_LOG) + $(MAKE) -C src/orchestration/func_manifold_fetch || echo "* $@" >> $(MAKE_FAILURE_LOG) manifold-update-questions: - $(MAKE) -C src/questions/manifold/update_questions || echo "* $@" >> $(MAKE_FAILURE_LOG) + $(MAKE) -C src/orchestration/func_manifold_update || echo "* $@" >> $(MAKE_FAILURE_LOG) metaculus: metaculus-fetch metaculus-update-questions diff --git a/src/_schemas.py b/src/_schemas.py index 3c670723..d38c0516 100644 --- a/src/_schemas.py +++ b/src/_schemas.py @@ -92,6 +92,18 @@ class InferFetchFrame(QuestionFrame): nullify_question: Series[bool] +class ManifoldFetchFrame(pa.DataFrameModel): + """Output of ManifoldSource.fetch(). Just market IDs from search-markets endpoint.""" + + id: Series[str] + + class Config: + """Schema configuration.""" + + strict = False + coerce = True + + class AcledResolutionFrame(pa.DataFrameModel): """ACLED-specific: aggregated events by country and date. diff --git a/src/questions/manifold/fetch/Makefile b/src/orchestration/func_manifold_fetch/Makefile similarity index 55% rename from src/questions/manifold/fetch/Makefile rename to src/orchestration/func_manifold_fetch/Makefile index 625b4b51..09a7aa84 100644 --- a/src/questions/manifold/fetch/Makefile +++ b/src/orchestration/func_manifold_fetch/Makefile @@ -9,27 +9,33 @@ UPLOAD_DIR = upload .gcloudignore: cp -r $(ROOT_DIR)src/helpers/.gcloudignore . -Procfile: - cp -r $(ROOT_DIR)src/helpers/Procfile . +Dockerfile: $(ROOT_DIR)src/helpers/Dockerfile.template + sed \ + -e 's/REGION/$(CLOUD_DEPLOY_REGION)/g' \ + -e 's/STACK/google-22-full/g' \ + -e 's/PYTHON_VERSION/python312/g' \ + $< > Dockerfile -deploy : main.py .gcloudignore requirements.txt Procfile +deploy : .gcloudignore requirements.txt Dockerfile mkdir -p $(UPLOAD_DIR) cp -r $(ROOT_DIR)utils $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ + cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/helpers + cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/sources mkdir -p $(UPLOAD_DIR)/orchestration cp $(ROOT_DIR)src/orchestration/__init__.py $(UPLOAD_DIR)/orchestration/ - cp $(ROOT_DIR)src/orchestration/_io.py $(UPLOAD_DIR)/orchestration/ - cp $^ $(UPLOAD_DIR)/ + cp $(ROOT_DIR)src/orchestration/_source_io.py $(UPLOAD_DIR)/orchestration/ + cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ + cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ + cp main.py $(UPLOAD_DIR)/main.py + cp requirements.txt $(UPLOAD_DIR)/requirements.txt + cp Dockerfile $(UPLOAD_DIR)/ gcloud run jobs deploy \ func-data-manifold-fetch \ --project $(CLOUD_PROJECT) \ --region $(CLOUD_DEPLOY_REGION) \ --tasks 1 \ --parallelism 1 \ - --task-timeout 30s \ + --task-timeout 60s \ --memory 512Mi \ --max-retries 0 \ --service-account $(QUESTION_BANK_BUCKET_SERVICE_ACCOUNT) \ @@ -37,4 +43,4 @@ deploy : main.py .gcloudignore requirements.txt Procfile --source $(UPLOAD_DIR) clean : - rm -rf $(UPLOAD_DIR) .gcloudignore Procfile + rm -rf $(UPLOAD_DIR) .gcloudignore Dockerfile diff --git a/src/orchestration/func_manifold_fetch/main.py b/src/orchestration/func_manifold_fetch/main.py new file mode 100644 index 00000000..9c9c3331 --- /dev/null +++ b/src/orchestration/func_manifold_fetch/main.py @@ -0,0 +1,30 @@ +"""Manifold fetch entry point.""" + +from __future__ import annotations + +import logging +from typing import Any + +from helpers import decorator +from orchestration import _source_io +from sources.manifold import ManifoldSource + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +SOURCE = "manifold" + + +@decorator.log_runtime +def driver(_: Any) -> None: + """Fetch Manifold market IDs and upload to question bank.""" + source = ManifoldSource() + + dff = source.fetch() + + _source_io.write_fetch_output(SOURCE, dff) + logger.info("Done.") + + +if __name__ == "__main__": + driver(None) diff --git a/src/questions/manifold/update_questions/requirements.txt b/src/orchestration/func_manifold_fetch/requirements.txt similarity index 88% rename from src/questions/manifold/update_questions/requirements.txt rename to src/orchestration/func_manifold_fetch/requirements.txt index 88cd3eee..1d2a4d8b 100644 --- a/src/questions/manifold/update_questions/requirements.txt +++ b/src/orchestration/func_manifold_fetch/requirements.txt @@ -1,8 +1,10 @@ google-cloud-storage google-cloud-secret-manager pandas>=2.2.2,<3.0 -pyarrow -backoff -certifi pandera termcolor +requests +certifi +backoff +numpy +pyarrow diff --git a/src/questions/manifold/update_questions/Makefile b/src/orchestration/func_manifold_update/Makefile similarity index 57% rename from src/questions/manifold/update_questions/Makefile rename to src/orchestration/func_manifold_update/Makefile index 5382e5ae..5e4d8f18 100644 --- a/src/questions/manifold/update_questions/Makefile +++ b/src/orchestration/func_manifold_update/Makefile @@ -9,20 +9,26 @@ UPLOAD_DIR = upload .gcloudignore: cp -r $(ROOT_DIR)src/helpers/.gcloudignore . -Procfile: - cp -r $(ROOT_DIR)src/helpers/Procfile . +Dockerfile: $(ROOT_DIR)src/helpers/Dockerfile.template + sed \ + -e 's/REGION/$(CLOUD_DEPLOY_REGION)/g' \ + -e 's/STACK/google-22-full/g' \ + -e 's/PYTHON_VERSION/python312/g' \ + $< > Dockerfile -deploy : main.py .gcloudignore requirements.txt Procfile +deploy : .gcloudignore requirements.txt Dockerfile mkdir -p $(UPLOAD_DIR) cp -r $(ROOT_DIR)utils $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ + cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/helpers + cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/sources mkdir -p $(UPLOAD_DIR)/orchestration cp $(ROOT_DIR)src/orchestration/__init__.py $(UPLOAD_DIR)/orchestration/ - cp $(ROOT_DIR)src/orchestration/_io.py $(UPLOAD_DIR)/orchestration/ - cp $^ $(UPLOAD_DIR)/ + cp $(ROOT_DIR)src/orchestration/_source_io.py $(UPLOAD_DIR)/orchestration/ + cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ + cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ + cp main.py $(UPLOAD_DIR)/main.py + cp requirements.txt $(UPLOAD_DIR)/requirements.txt + cp Dockerfile $(UPLOAD_DIR)/ gcloud run jobs deploy \ func-data-manifold-update-questions \ --project $(CLOUD_PROJECT) \ @@ -37,4 +43,4 @@ deploy : main.py .gcloudignore requirements.txt Procfile --source $(UPLOAD_DIR) clean : - rm -rf $(UPLOAD_DIR) .gcloudignore Procfile + rm -rf $(UPLOAD_DIR) .gcloudignore Dockerfile diff --git a/src/orchestration/func_manifold_update/main.py b/src/orchestration/func_manifold_update/main.py new file mode 100644 index 00000000..c3642815 --- /dev/null +++ b/src/orchestration/func_manifold_update/main.py @@ -0,0 +1,54 @@ +"""Manifold update entry point.""" + +from __future__ import annotations + +import logging +from typing import Any + +from helpers import data_utils, decorator, env +from orchestration import _source_io +from sources.manifold import ManifoldSource +from utils import gcp + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +SOURCE = "manifold" + + +@decorator.log_runtime +def driver(_: Any) -> None: + """Update Manifold questions and resolution files.""" + source = ManifoldSource() + + dfq, dff = data_utils.get_data_from_cloud_storage( + SOURCE, return_question_data=True, return_fetch_data=True + ) + + logger.info("Loading existing resolution files...") + existing_resolution_files = _source_io.load_existing_resolution_files( + SOURCE, ids=dff["id"].astype(str).tolist() + ) + logger.info(f"Loaded {len(existing_resolution_files)} resolution files") + + files_in_storage = gcp.storage.list_with_prefix( + bucket_name=env.QUESTION_BANK_BUCKET, prefix=SOURCE + ) + + result = source.update( + dfq, + dff, + existing_resolution_files=existing_resolution_files, + files_in_storage=files_in_storage, + ) + + logger.info("Uploading to GCP...") + data_utils.upload_questions(result.dfq, SOURCE) + if result.resolution_files: + _source_io.upload_resolution_files(SOURCE, result.resolution_files) + + logger.info("Done.") + + +if __name__ == "__main__": + driver(None) diff --git a/src/questions/manifold/fetch/requirements.txt b/src/orchestration/func_manifold_update/requirements.txt similarity index 88% rename from src/questions/manifold/fetch/requirements.txt rename to src/orchestration/func_manifold_update/requirements.txt index 88cd3eee..1d2a4d8b 100644 --- a/src/questions/manifold/fetch/requirements.txt +++ b/src/orchestration/func_manifold_update/requirements.txt @@ -1,8 +1,10 @@ google-cloud-storage google-cloud-secret-manager pandas>=2.2.2,<3.0 -pyarrow -backoff -certifi pandera termcolor +requests +certifi +backoff +numpy +pyarrow diff --git a/src/questions/manifold/.gcloudignore b/src/questions/manifold/.gcloudignore deleted file mode 100644 index a5273b34..00000000 --- a/src/questions/manifold/.gcloudignore +++ /dev/null @@ -1,3 +0,0 @@ -*~ -.DS_Store -__pycache__ diff --git a/src/questions/manifold/fetch/main.py b/src/questions/manifold/fetch/main.py deleted file mode 100644 index 3bb8e752..00000000 --- a/src/questions/manifold/fetch/main.py +++ /dev/null @@ -1,145 +0,0 @@ -"""Fetch data from Manifold API.""" - -import json -import logging -import os -import sys -from datetime import timedelta - -import backoff -import certifi -import requests - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../..")) -from helpers import data_utils, dates, decorator, env # noqa: E402 - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../../..")) -from utils import gcp # noqa: E402 - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -filenames = data_utils.generate_filenames(source="manifold") - -MANIFOLD_TOPIC_SLUGS = [ - "ai", - "biotech", - "business", - "celebrities", - "chess", - "china", - "climate", - "culture-default", - "economics-default", - "entertainment", - "europe", - "finance", - "gaming", - "geopolitics", - "health", - "india", - "mathematics", - "middle-east", - "movies", - "music-f213cbf1eab5", - "politics-default", - "programming", - "russia", - "science-default", - "space", - "sports-default", - "stocks", - "technical-ai-timelines", - "technology-default", - "uk-politics", - "ukraine", - "us-politics", - "wars", - "world-default", -] - -TODAY = dates.get_date_today() - -MAX_RESOLUTION_DATE_IN_DAYS = 365 * 2 - -MAX_RESOLUTION_DATE = TODAY + timedelta(days=MAX_RESOLUTION_DATE_IN_DAYS) - -MIN_BETTER_COUNT = 17 - -MIN_LIQUIDITY = 120 - - -@backoff.on_exception( - backoff.expo, - requests.exceptions.RequestException, - max_time=500, - on_backoff=data_utils.print_error_info_handler, -) -def _call_endpoint(ids, additional_params=None): - """Get the top 100 markets from Manifold Markets.""" - endpoint = "https://api.manifold.markets/v0/search-markets" - params = { - "sort": "most-popular", - "contractType": "BINARY", - "filter": "open", - "limit": 100, - } - if additional_params: - params.update(additional_params) - logger.info(f"Calling {endpoint} with additional params {additional_params}") - - response = requests.get(endpoint, params=params, verify=certifi.where()) - if not response.ok: - logger.error( - f"Request to endpoint failed for {endpoint}: {response.status_code} Error. " - f"{response.text}" - ) - response.raise_for_status() - - def resolves_by(close_time_epoch_ms) -> bool: - close_sec = min(close_time_epoch_ms / 1000, dates.MAX_EPOCH_SEC) - close_date = dates.convert_epoch_time_in_sec_to_datetime(close_sec).date() - return close_date <= MAX_RESOLUTION_DATE - - selected_markets = { - market["id"] - for market in response.json() - if market["uniqueBettorCount"] >= MIN_BETTER_COUNT - and market["totalLiquidity"] >= MIN_LIQUIDITY - and resolves_by(market["closeTime"]) - } - ids.update(selected_markets) - return ids - - -def _get_data(): - """Get pertinent Manifold questions and data.""" - logger.info("Calling Manifold search-markets endpoint") - ids = _call_endpoint(set()) - for topic in MANIFOLD_TOPIC_SLUGS: - ids = _call_endpoint(ids, {"topicSlug": topic}) - return sorted(ids) - - -@decorator.log_runtime -def driver(_): - """Fetch Manifold data and update question file in GCP Cloud Storage.""" - # Get the latest Manifold data - ids = _get_data() - - # Save - with open(filenames["local_fetch"], "w") as f: - for id_str in ids: - f.write(json.dumps({"id": id_str}) + "\n") - - # Upload - gcp.storage.upload( - bucket_name=env.QUESTION_BANK_BUCKET, - local_filename=filenames["local_fetch"], - ) - - logger.info("Done.") - - -if __name__ == "__main__": - driver(None) diff --git a/src/questions/manifold/update_questions/main.py b/src/questions/manifold/update_questions/main.py deleted file mode 100644 index 1c67bfa5..00000000 --- a/src/questions/manifold/update_questions/main.py +++ /dev/null @@ -1,295 +0,0 @@ -"""Generate questions from Manifold API.""" - -import logging -import os -import sys -from datetime import timedelta - -import backoff -import certifi -import numpy as np -import pandas as pd -import requests - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../..")) -from helpers import constants, data_utils, dates, decorator, env # noqa: E402 - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../../..")) # noqa: E402 -from utils import gcp # noqa: E402 - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -source = "manifold" -filenames = data_utils.generate_filenames(source=source) - - -@backoff.on_exception( - backoff.expo, - requests.exceptions.RequestException, - max_time=200, - max_tries=10, - factor=2, - base=2, - on_backoff=data_utils.print_error_info_handler, -) -def _get_market(market_id): - """Get the market description and close time for the specified market.""" - logger.info(f"Calling market endpoint for {market_id}") - endpoint = f"https://api.manifold.markets/v0/market/{market_id}" - response = requests.get(endpoint, verify=certifi.where()) - if not response.ok: - logger.error(f"Request to market endpoint failed for {market_id}.") - response.raise_for_status() - return response.json() - - -@backoff.on_exception( - backoff.expo, - requests.exceptions.RequestException, - max_time=200, - max_tries=10, - factor=2, - base=2, - on_backoff=data_utils.print_error_info_handler, -) -def _get_market_forecasts(market_id): - """Get the market description and close time for the specified market.""" - logger.info(f"Calling bets endpoint for {market_id}") - endpoint = "https://api.manifold.markets/v0/bets" - max_bets_to_return = 1000 - params = { - "contractId": market_id, - "limit": max_bets_to_return, - } - - all_bets = [] - n_requests = 0 - while True: - n_requests += 1 - if n_requests % 100 == 0: - logger.info(f"Request number {n_requests} for {market_id}.") - response = requests.get(endpoint, params=params, verify=certifi.where()) - if not response.ok: - logger.error(f"Request to bets endpoint failed for {market_id}.") - response.raise_for_status() - if len(response.json()) == 0: - break - new_bets = [m for m in response.json()] - - all_bets += new_bets - if ( - all_bets[-1]["createdTime"] < constants.BENCHMARK_START_DATE_EPOCHTIME_MS - or len(new_bets) < max_bets_to_return - ): - break - params["before"] = all_bets[-1]["id"] - return all_bets - - -def _update_questions_and_resolved_values(dfq, dff): - """Update the dataframes that hold the questions and the resolution values. - - For Manifold, store resolution values by market id to decrease calls to endpoint. First check - the file to see if an entry exists for today. If so, skip. Otherwise, recreate the file. When - done, return dfq. - - dfq: Manifold questions in the question bank - dff: Today's fetched markets - """ - # Use yesterday because we run at midnight UTC so have complete info for yesterday. - YESTERDAY = dates.get_date_today() - timedelta(days=1) - - def _get_resolved_market_value(market): - """Get the market value based on the resolution. - - A market that has resolved should return the resolved value. The possible values for - market["resolution"] and the associated return values are: - * YES -> 1 - * NO -> 0 - * MKT -> market probability - * CANCEL (i.e. N/A) -> NaN - """ - return {"YES": 1, "NO": 0, "MKT": market["resolutionProbability"]}.get( - market["resolution"], np.nan - ) - - def _create_resolution_file(dfq, index, market): - - basename = f"{market['id']}.jsonl" - remote_filename = f"{source}/{basename}" - local_filename = "/tmp/tmp.jsonl" - if os.path.exists(local_filename): - os.remove(local_filename) - gcp.storage.download_no_error_message_on_404( - bucket_name=env.QUESTION_BANK_BUCKET, - filename=remote_filename, - local_filename=local_filename, - ) - - if os.path.exists(local_filename): - df = pd.read_json( - local_filename, - lines=True, - dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE, - convert_dates=False, - ) - else: - df = pd.DataFrame( - { - col: pd.Series( - dtype=( - constants.RESOLUTION_FILE_COLUMN_DTYPE[col] - if col in constants.RESOLUTION_FILE_COLUMN_DTYPE - else "object" - ) - ) - for col in constants.RESOLUTION_FILE_COLUMNS - } - ) - - if not df.empty and pd.to_datetime(df["date"].iloc[-1]).date() >= YESTERDAY: - # Check last datetime to see if we've already gotten the resolution value for today - # If we have, return to avoid unnecessary API calls - return df["value"].iloc[-1] - - # Get the last market value for the day and make this the value for the day - forecasts = _get_market_forecasts(market["id"]) - df = pd.DataFrame( - [ - { - "datetime": dates.convert_epoch_time_in_ms_to_iso(forecast["createdTime"]), - "value": forecast["probAfter"], - } - for forecast in forecasts - if forecast.get("isFilled") - ] - ) - if df.empty: - return None - - df["datetime"] = pd.to_datetime(df["datetime"]) - df = df.sort_values(by="datetime") - df["date"] = df["datetime"].dt.date - df = df[df["date"] <= YESTERDAY] - if df.empty: - # empty if this market only has forecasts from today - return None - - df = df.groupby(by="date").last().reset_index() - df = df[["date", "value"]] - - date_range = pd.date_range(start=df["date"].min(), end=YESTERDAY, freq="D") - if market["isResolved"]: - # If the market has been resolved, add the market value and resolution date - resolved_date = pd.Timestamp(dfq.at[index, "market_info_resolution_datetime"]).date() - df = df[df["date"] < resolved_date] - df.loc[len(df)] = { - "date": resolved_date, - "value": _get_resolved_market_value(market), - } - date_range = pd.date_range(start=df["date"].min(), end=resolved_date, freq="D") - - df_dates = pd.DataFrame(date_range, columns=["date"]) - df_dates["date"] = df_dates["date"].dt.date - df = pd.merge(left=df_dates, right=df, on="date", how="left") - - if market["isResolved"]: - # The last date was set to the resolution value. This could be NaN, so don't forward - # fill it, because the question has actually been nullified. - df.iloc[:-1] = df.iloc[:-1].ffill() - else: - df = df.ffill() - - df["id"] = market["id"] - df = df[["id", "date", "value"]].astype(dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE) - - # Save and Upload - df.to_json(local_filename, orient="records", lines=True, date_format="iso") - gcp.storage.upload( - bucket_name=env.QUESTION_BANK_BUCKET, - local_filename=local_filename, - filename=remote_filename, - ) - - # Return the last market value for the series - return df["value"].iloc[-1] - - def _assign_market_values_to_df(df, index, market): - url = market["url"] - df.at[index, "question"] = market["question"] - df.at[index, "background"] = market["textDescription"] - df.at[index, "market_info_resolution_criteria"] = "N/A" - df.at[index, "market_info_open_datetime"] = dates.convert_epoch_time_in_ms_to_iso( - market["createdTime"] - ) - df.at[index, "market_info_close_datetime"] = dates.convert_epoch_time_in_ms_to_iso( - market["closeTime"] - ) - df.at[index, "url"] = url - if market["isResolved"]: - df.at[index, "resolved"] = True - df.at[index, "market_info_resolution_datetime"] = dates.convert_epoch_time_in_ms_to_iso( - market["resolutionTime"] - ) - df.at[index, "forecast_horizons"] = "N/A" - return df - - # Find rows in dff not in dfq: These are the new markets to add to dfq - col_to_append = dff[~dff["id"].isin(dfq["id"])]["id"] - - # Set all non-id columns to `None` for the new markets - df_ids_to_append = pd.DataFrame(col_to_append).assign( - **{col: None for col in dfq.columns if col != "id"} - ) - df_ids_to_append["resolved"] = False - df_ids_to_append["freeze_datetime_value_explanation"] = "The market value." - df_ids_to_append["market_info_resolution_datetime"] = "N/A" - dfq = pd.concat([dfq, df_ids_to_append], ignore_index=True) - - # Update all unresolved questions in dfq. Update resolved, resolution_datetime, and background. - # Recreate all rows of resolution files for unresolved questions - dfq["resolved"] = dfq["resolved"].astype(bool) - for index, row in dfq[~dfq["resolved"]].iterrows(): - market = _get_market(row["id"]) - dfq = _assign_market_values_to_df(dfq, index, market) - dfq.at[index, "freeze_datetime_value"] = _create_resolution_file(dfq, index, market) - - # Save and upload - # Upload dfq before checking resolved questions in case we hit rate limit - data_utils.upload_questions(dfq, source) - - for index, row in dfq[dfq["resolved"]].iterrows(): - # Regenerate resolution files in case they've been deleted - resolved_files = gcp.storage.list_with_prefix( - bucket_name=env.QUESTION_BANK_BUCKET, prefix=source - ) - filename = f"{row['id']}.jsonl" - if filename not in resolved_files: - market = _get_market(row["id"]) - _create_resolution_file(dfq, index, market) - - -@decorator.log_runtime -def driver(_): - """Pull in fetched data and update questions and resolved values in question bank.""" - # Download pertinent files from Cloud Storage - dff = data_utils.download_and_read( - filename=filenames["jsonl_fetch"], - local_filename=filenames["local_fetch"], - df_tmp=pd.DataFrame(columns=["id"]), - dtype={"id": str}, - ) - dfq = data_utils.get_data_from_cloud_storage( - source=source, - return_question_data=True, - ) - - # Update the existing questions and resolution values - _update_questions_and_resolved_values(dfq, dff) - - logger.info("Done.") - - -if __name__ == "__main__": - driver(None) diff --git a/src/sources/manifold.py b/src/sources/manifold.py index 693dc8fe..6b52b175 100644 --- a/src/sources/manifold.py +++ b/src/sources/manifold.py @@ -2,20 +2,394 @@ from __future__ import annotations -from typing import ClassVar +import logging +from datetime import timedelta +from typing import Any, ClassVar + +import backoff +import certifi +import numpy as np +import pandas as pd +import pandera.pandas as pa +import requests +from pandera.typing import DataFrame + +from _fb_types import UpdateResult +from _schemas import ManifoldFetchFrame, QuestionFrame, ResolutionFrame +from helpers import constants, data_utils, dates from ._market import MarketSource +logger = logging.getLogger(__name__) + +_MANIFOLD_API_BASE = "https://api.manifold.markets/v0" + +_TOPIC_SLUGS = [ + "ai", + "biotech", + "business", + "celebrities", + "chess", + "china", + "climate", + "culture-default", + "economics-default", + "entertainment", + "europe", + "finance", + "gaming", + "geopolitics", + "health", + "india", + "mathematics", + "middle-east", + "movies", + "music-f213cbf1eab5", + "politics-default", + "programming", + "russia", + "science-default", + "space", + "sports-default", + "stocks", + "technical-ai-timelines", + "technology-default", + "uk-politics", + "ukraine", + "us-politics", + "wars", + "world-default", +] + +_MAX_RESOLUTION_DATE_IN_DAYS = 365 * 2 +_MIN_BETTOR_COUNT = 17 +_MIN_LIQUIDITY = 120 + class ManifoldSource(MarketSource): """Manifold prediction market source.""" name: ClassVar[str] = "manifold" - def fetch(self, **kwargs): - """Fetch Manifold data from external API.""" - raise NotImplementedError + # ------------------------------------------------------------------ + # Public: fetch + # ------------------------------------------------------------------ + + @pa.check_types + def fetch(self, **kwargs: Any) -> DataFrame[ManifoldFetchFrame]: + """Fetch market IDs from Manifold search-markets endpoint. + + Calls search-markets (1 global + N topic slugs), filters by + min bettors, min liquidity, and max resolution date. + """ + ids = self._search_markets() + logger.info(f"Discovered {len(ids)} candidate market IDs from search.") + return pd.DataFrame({"id": sorted(ids)}) + + # ------------------------------------------------------------------ + # Public: update + # ------------------------------------------------------------------ + + @pa.check_types + def update( + self, + dfq: DataFrame[QuestionFrame], + dff: DataFrame[ManifoldFetchFrame], + *, + existing_resolution_files: dict[str, DataFrame[ResolutionFrame]] | None = None, + files_in_storage: list[str] | None = None, + ) -> UpdateResult: + """Process fetched IDs into updated questions and resolution files. + + For each new ID in dff, appends to dfq. Then for each unresolved question, + fetches market details and builds resolution files. Finally regenerates + missing resolution files for resolved questions. + + Args: + dfq (DataFrame[QuestionFrame]): Existing questions. + dff (DataFrame[ManifoldFetchFrame]): Freshly fetched market IDs. + existing_resolution_files (dict | None): Per-question existing resolution data. + files_in_storage (list[str] | None): Existing resolution file paths in storage. + """ + existing_resolution_files = existing_resolution_files or {} + files_in_storage = files_in_storage or [] + resolution_files: dict[str, pd.DataFrame] = {} + + # --- Append new IDs from dff to dfq --- + new_ids = dff[~dff["id"].isin(dfq["id"])]["id"] + if not new_ids.empty: + df_new = pd.DataFrame({"id": new_ids}).assign( + **{col: None for col in dfq.columns if col != "id"} + ) + df_new["resolved"] = False + df_new["freeze_datetime_value_explanation"] = "The market value." + df_new["market_info_resolution_datetime"] = "N/A" + dfq = pd.concat([dfq, df_new], ignore_index=True) + + # --- Update all unresolved questions --- + dfq["resolved"] = dfq["resolved"].astype(bool) + for index, row in dfq[~dfq["resolved"]].iterrows(): + market = self._get_market(row["id"]) + if market is None: + continue + + # Assign market details to dfq row + dfq.at[index, "question"] = market["question"] + dfq.at[index, "background"] = market.get("textDescription", "") + dfq.at[index, "market_info_resolution_criteria"] = "N/A" + dfq.at[index, "market_info_open_datetime"] = dates.convert_epoch_time_in_ms_to_iso( + market["createdTime"] + ) + dfq.at[index, "market_info_close_datetime"] = dates.convert_epoch_time_in_ms_to_iso( + market["closeTime"] + ) + dfq.at[index, "url"] = market.get("url", "") + if market["isResolved"]: + dfq.at[index, "resolved"] = True + dfq.at[index, "market_info_resolution_datetime"] = ( + dates.convert_epoch_time_in_ms_to_iso(market["resolutionTime"]) + ) + dfq.at[index, "forecast_horizons"] = "N/A" + + # Build resolution file + existing_df = existing_resolution_files.get(row["id"]) + df_res = self._build_resolution_file( + market=market, + market_info_resolution_datetime=dfq.at[index, "market_info_resolution_datetime"], + existing_df=existing_df, + ) + if df_res is not None: + resolution_files[row["id"]] = df_res + dfq.at[index, "freeze_datetime_value"] = df_res["value"].iloc[-1] + + # --- Regenerate missing resolution files for resolved questions --- + for _index, row in dfq[dfq["resolved"]].iterrows(): + filename = f"{self.name}/{row['id']}.jsonl" + if filename not in files_in_storage and row["id"] not in resolution_files: + market = self._get_market(row["id"]) + if market is None: + continue + df_res = self._build_resolution_file( + market=market, + market_info_resolution_datetime=row["market_info_resolution_datetime"], + existing_df=None, + ) + if df_res is not None: + resolution_files[row["id"]] = df_res + + return UpdateResult( + dfq=dfq, + resolution_files=resolution_files, + ) + + # ------------------------------------------------------------------ + # Private: search-markets API + # ------------------------------------------------------------------ + + @backoff.on_exception( + backoff.expo, + requests.exceptions.RequestException, + max_time=500, + on_backoff=data_utils.print_error_info_handler, + ) + def _call_search_endpoint(self, additional_params: dict | None = None) -> set[str]: + """Call search-markets and return qualifying market IDs.""" + endpoint = f"{_MANIFOLD_API_BASE}/search-markets" + params: dict[str, Any] = { + "sort": "most-popular", + "contractType": "BINARY", + "filter": "open", + "limit": 100, + } + if additional_params: + params.update(additional_params) + logger.info(f"Calling {endpoint} with additional params {additional_params}") + + response = requests.get(endpoint, params=params, verify=certifi.where()) + if not response.ok: + logger.error( + f"Request to endpoint failed for {endpoint}: {response.status_code} Error. " + f"{response.text}" + ) + response.raise_for_status() + + today = dates.get_date_today() + max_resolution_date = today + timedelta(days=_MAX_RESOLUTION_DATE_IN_DAYS) + + def resolves_by(close_time_epoch_ms: int) -> bool: + close_sec = min(close_time_epoch_ms / 1000, dates.MAX_EPOCH_SEC) + close_date = dates.convert_epoch_time_in_sec_to_datetime(close_sec).date() + return close_date <= max_resolution_date + + return { + market["id"] + for market in response.json() + if market["uniqueBettorCount"] >= _MIN_BETTOR_COUNT + and market["totalLiquidity"] >= _MIN_LIQUIDITY + and resolves_by(market["closeTime"]) + } + + def _search_markets(self) -> set[str]: + """Discover market IDs across all topic slugs.""" + logger.info("Calling Manifold search-markets endpoint") + ids = self._call_search_endpoint() + for topic in _TOPIC_SLUGS: + ids |= self._call_search_endpoint({"topicSlug": topic}) + return ids + + # ------------------------------------------------------------------ + # Private: market detail API + # ------------------------------------------------------------------ + + @backoff.on_exception( + backoff.expo, + requests.exceptions.RequestException, + max_time=200, + max_tries=10, + factor=2, + base=2, + on_backoff=data_utils.print_error_info_handler, + ) + def _get_market(self, market_id: str) -> dict | None: + """Fetch full market details from /market/{id}.""" + logger.info(f"Calling market endpoint for {market_id}") + endpoint = f"{_MANIFOLD_API_BASE}/market/{market_id}" + response = requests.get(endpoint, verify=certifi.where()) + if not response.ok: + logger.error(f"Request to market endpoint failed for {market_id}.") + response.raise_for_status() + return response.json() + + # ------------------------------------------------------------------ + # Private: bets API + # ------------------------------------------------------------------ + + @backoff.on_exception( + backoff.expo, + requests.exceptions.RequestException, + max_time=200, + max_tries=10, + factor=2, + base=2, + on_backoff=data_utils.print_error_info_handler, + ) + def _get_market_bets(self, market_id: str) -> list[dict]: + """Fetch all bets for a market with pagination.""" + logger.info(f"Calling bets endpoint for {market_id}") + endpoint = f"{_MANIFOLD_API_BASE}/bets" + max_bets_to_return = 1000 + params: dict[str, Any] = { + "contractId": market_id, + "limit": max_bets_to_return, + } + + all_bets: list[dict] = [] + n_requests = 0 + while True: + n_requests += 1 + if n_requests % 100 == 0: + logger.info(f"Request number {n_requests} for {market_id}.") + response = requests.get(endpoint, params=params, verify=certifi.where()) + if not response.ok: + logger.error(f"Request to bets endpoint failed for {market_id}.") + response.raise_for_status() + new_bets = response.json() + if not new_bets: + break + + all_bets += new_bets + if ( + all_bets[-1]["createdTime"] < constants.BENCHMARK_START_DATE_EPOCHTIME_MS + or len(new_bets) < max_bets_to_return + ): + break + params["before"] = all_bets[-1]["id"] + return all_bets + + # ------------------------------------------------------------------ + # Private: resolution file building + # ------------------------------------------------------------------ + + def _build_resolution_file( + self, + market: dict, + market_info_resolution_datetime: str, + existing_df: DataFrame[ResolutionFrame] | None = None, + ) -> DataFrame[ResolutionFrame] | None: + """Build or update a resolution file for a single market.""" + yesterday = dates.get_date_today() - timedelta(days=1) + market_id = market["id"] + + # --- Already up-to-date check --- + if ( + existing_df is not None + and not existing_df.empty + and pd.to_datetime(existing_df["date"].iloc[-1]).date() >= yesterday + ): + return existing_df + + # --- Fetch bets and build daily series --- + forecasts = self._get_market_bets(market_id) + df = pd.DataFrame( + [ + { + "datetime": dates.convert_epoch_time_in_ms_to_iso(forecast["createdTime"]), + "value": forecast["probAfter"], + } + for forecast in forecasts + if forecast.get("isFilled") + ] + ) + if df.empty: + return None + + df["datetime"] = pd.to_datetime(df["datetime"]) + df = df.sort_values(by="datetime") + df["date"] = df["datetime"].dt.date + df = df[df["date"] <= yesterday] + if df.empty: + return None + + df = df.groupby(by="date").last().reset_index() + df = df[["date", "value"]] + + # --- Forward-fill missing dates --- + date_range = pd.date_range(start=df["date"].min(), end=yesterday, freq="D") + if market["isResolved"]: + resolved_date = pd.Timestamp(market_info_resolution_datetime).date() + df = df[df["date"] < resolved_date] + df.loc[len(df)] = { + "date": resolved_date, + "value": self._get_resolved_market_value(market), + } + date_range = pd.date_range(start=df["date"].min(), end=resolved_date, freq="D") + + df_dates = pd.DataFrame(date_range, columns=["date"]) + df_dates["date"] = df_dates["date"].dt.date + df = pd.merge(left=df_dates, right=df, on="date", how="left") + + if market["isResolved"]: + # Don't forward-fill last row (could be NaN for CANCEL) + df.iloc[:-1] = df.iloc[:-1].ffill() + else: + df = df.ffill() + + df["id"] = market_id + return self._finalize_resolution_df(df) + + @staticmethod + def _finalize_resolution_df(df: pd.DataFrame) -> DataFrame[ResolutionFrame]: + """Filter to benchmark period and validate as ResolutionFrame.""" + df["date"] = pd.to_datetime(df["date"]) + df = df[df["date"].dt.date >= constants.BENCHMARK_START_DATE_DATETIME_DATE] + df = df[["id", "date", "value"]].astype(dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE) + return ResolutionFrame.validate(df) - def update(self, dfq, dff, **kwargs): - """Process fetched Manifold data into questions and resolution files.""" - raise NotImplementedError + @staticmethod + def _get_resolved_market_value(market: dict) -> float: + """Map resolution outcome to numeric value. + YES -> 1, NO -> 0, MKT -> market probability, CANCEL -> NaN + """ + return {"YES": 1, "NO": 0, "MKT": market["resolutionProbability"]}.get( + market["resolution"], np.nan + ) diff --git a/src/tests/conftest.py b/src/tests/conftest.py index 779e5b1e..2995891c 100644 --- a/src/tests/conftest.py +++ b/src/tests/conftest.py @@ -10,6 +10,7 @@ from sources.acled import AcledSource from sources.fred import FredSource from sources.infer import InferSource +from sources.manifold import ManifoldSource from sources.metaculus import MetaculusSource # --------------------------------------------------------------------------- @@ -77,6 +78,12 @@ def infer_source(): return src +@pytest.fixture() +def manifold_source(): + """Return a ManifoldSource instance.""" + return ManifoldSource() + + # --------------------------------------------------------------------------- # DataFrame factories # --------------------------------------------------------------------------- @@ -252,3 +259,60 @@ def make_infer_fetch_df(rows): if col not in df.columns: df[col] = default return df + + +# --------------------------------------------------------------------------- +# Manifold-specific factories +# --------------------------------------------------------------------------- + + +def make_manifold_api_market(**overrides): + """Build a realistic Manifold market dict as returned by /market/{id}.""" + base = { + "id": "mkt_001", + "question": "Will X happen by 2026?", + "textDescription": "Background text.", + "createdTime": 1704067200000, # 2024-01-01 epoch ms + "closeTime": 1735689600000, # 2025-01-01 epoch ms + "isResolved": False, + "resolution": None, + "resolutionTime": None, + "resolutionProbability": None, + "url": "https://manifold.markets/user/test-market", + "uniqueBettorCount": 20, + "totalLiquidity": 200, + } + base.update(overrides) + return base + + +def make_manifold_search_result(**overrides): + """Build a search result item from /search-markets (subset of market fields).""" + base = { + "id": "mkt_001", + "uniqueBettorCount": 20, + "totalLiquidity": 200, + "closeTime": 1735689600000, # 2025-01-01 epoch ms + } + base.update(overrides) + return base + + +def make_manifold_bet(**overrides): + """Build a single bet dict as returned by /bets endpoint.""" + base = { + "id": "bet_001", + "contractId": "mkt_001", + "createdTime": 1717200000000, # ~2024-06-01 epoch ms + "probAfter": 0.6, + "probBefore": 0.5, + "isFilled": True, + "amount": 10, + } + base.update(overrides) + return base + + +def make_manifold_fetch_df(rows): + """Build a DataFrame matching ManifoldFetchFrame schema (just id column).""" + return pd.DataFrame(rows) diff --git a/src/tests/test_manifold.py b/src/tests/test_manifold.py new file mode 100644 index 00000000..184b042f --- /dev/null +++ b/src/tests/test_manifold.py @@ -0,0 +1,649 @@ +"""Tests for ManifoldSource fetch/update logic.""" + +from datetime import date +from unittest.mock import Mock, patch + +import numpy as np +import pandas as pd + +from _schemas import ManifoldFetchFrame, QuestionFrame, ResolutionFrame +from sources.manifold import ManifoldSource + +from .conftest import ( + make_manifold_api_market, + make_manifold_bet, + make_manifold_fetch_df, + make_manifold_search_result, + make_question_df, + make_resolution_df, +) + +# --------------------------------------------------------------------------- +# _get_resolved_market_value (pure, no mocking) +# --------------------------------------------------------------------------- + + +class TestGetResolvedMarketValue: + """Tests for ManifoldSource._get_resolved_market_value static method.""" + + def test_yes_resolution(self): + """YES resolution returns 1.0.""" + market = make_manifold_api_market(resolution="YES") + assert ManifoldSource._get_resolved_market_value(market) == 1.0 + + def test_no_resolution(self): + """NO resolution returns 0.0.""" + market = make_manifold_api_market(resolution="NO") + assert ManifoldSource._get_resolved_market_value(market) == 0.0 + + def test_mkt_resolution(self): + """MKT resolution returns resolutionProbability.""" + market = make_manifold_api_market(resolution="MKT", resolutionProbability=0.73) + assert ManifoldSource._get_resolved_market_value(market) == 0.73 + + def test_cancel_resolution(self): + """CANCEL resolution returns NaN.""" + market = make_manifold_api_market(resolution="CANCEL") + assert np.isnan(ManifoldSource._get_resolved_market_value(market)) + + def test_unknown_resolution(self): + """Unknown resolution string returns NaN.""" + market = make_manifold_api_market(resolution="FOOBAR") + assert np.isnan(ManifoldSource._get_resolved_market_value(market)) + + +# --------------------------------------------------------------------------- +# _finalize_resolution_df (pure, no mocking) +# --------------------------------------------------------------------------- + + +class TestFinalizeResolutionDf: + """Tests for ManifoldSource._finalize_resolution_df static method.""" + + def test_filters_before_benchmark_start(self): + """Rows before BENCHMARK_START_DATE are dropped.""" + df = pd.DataFrame( + { + "id": ["A", "A", "A"], + "date": pd.to_datetime(["2020-01-01", "2024-06-01", "2024-07-01"]), + "value": [0.1, 0.2, 0.3], + } + ) + result = ManifoldSource._finalize_resolution_df(df) + assert len(result) == 2 + assert result["value"].tolist() == [0.2, 0.3] + + def test_validates_schema(self): + """Output is a valid ResolutionFrame.""" + df = pd.DataFrame( + { + "id": ["A"], + "date": pd.to_datetime(["2024-06-01"]), + "value": [0.5], + } + ) + result = ManifoldSource._finalize_resolution_df(df) + ResolutionFrame.validate(result) + + def test_only_keeps_id_date_value(self): + """Extra columns are stripped.""" + df = pd.DataFrame( + { + "id": ["A"], + "date": pd.to_datetime(["2024-06-01"]), + "value": [0.5], + "extra": ["junk"], + } + ) + result = ManifoldSource._finalize_resolution_df(df) + assert list(result.columns) == ["id", "date", "value"] + + +# --------------------------------------------------------------------------- +# _build_resolution_file (mock _get_market_bets) +# --------------------------------------------------------------------------- + + +class TestBuildResolutionFile: + """Tests for ManifoldSource._build_resolution_file.""" + + @patch.object(ManifoldSource, "_get_market_bets") + def test_already_up_to_date(self, mock_bets, manifold_source, freeze_today): + """Skips API call if existing data covers through yesterday.""" + freeze_today(date(2026, 1, 15)) + existing = make_resolution_df( + [ + {"id": "mkt_001", "date": "2024-06-01", "value": 0.5}, + {"id": "mkt_001", "date": "2026-01-14", "value": 0.6}, + ] + ) + market = make_manifold_api_market() + result = manifold_source._build_resolution_file( + market=market, market_info_resolution_datetime="N/A", existing_df=existing + ) + + assert result.equals(existing) + mock_bets.assert_not_called() + + @patch.object(ManifoldSource, "_get_market_bets") + def test_basic_unresolved_market(self, mock_bets, manifold_source, freeze_today): + """Builds valid time series from filled bets for an unresolved market.""" + freeze_today(date(2026, 1, 15)) + mock_bets.return_value = [ + make_manifold_bet(id="b1", createdTime=1768046400000, probAfter=0.4), # 2026-01-10 + make_manifold_bet(id="b2", createdTime=1768226400000, probAfter=0.6), # 2026-01-12 + ] + market = make_manifold_api_market() + result = manifold_source._build_resolution_file( + market=market, market_info_resolution_datetime="N/A", existing_df=None + ) + + assert result is not None + assert not result.empty + assert (result["id"] == "mkt_001").all() + ResolutionFrame.validate(result) + # Should have forward-filled dates: 10, 11, 12, 13, 14 + assert len(result) >= 5 + + @patch.object(ManifoldSource, "_get_market_bets") + def test_empty_bets_returns_none(self, mock_bets, manifold_source, freeze_today): + """No bets returns None.""" + freeze_today(date(2026, 1, 15)) + mock_bets.return_value = [] + market = make_manifold_api_market() + result = manifold_source._build_resolution_file( + market=market, market_info_resolution_datetime="N/A", existing_df=None + ) + assert result is None + + @patch.object(ManifoldSource, "_get_market_bets") + def test_no_filled_bets_returns_none(self, mock_bets, manifold_source, freeze_today): + """All bets with isFilled=False returns None.""" + freeze_today(date(2026, 1, 15)) + mock_bets.return_value = [ + make_manifold_bet(isFilled=False, createdTime=1768046400000), + make_manifold_bet(isFilled=False, createdTime=1768226400000), + ] + market = make_manifold_api_market() + result = manifold_source._build_resolution_file( + market=market, market_info_resolution_datetime="N/A", existing_df=None + ) + assert result is None + + @patch.object(ManifoldSource, "_get_market_bets") + def test_forward_fills_gaps(self, mock_bets, manifold_source, freeze_today): + """Missing dates between bets are forward-filled.""" + freeze_today(date(2026, 1, 15)) + mock_bets.return_value = [ + make_manifold_bet(id="b1", createdTime=1768046400000, probAfter=0.3), # 2026-01-10 + make_manifold_bet(id="b2", createdTime=1768384800000, probAfter=0.8), # 2026-01-14 + ] + market = make_manifold_api_market() + result = manifold_source._build_resolution_file( + market=market, market_info_resolution_datetime="N/A", existing_df=None + ) + + dates_in_df = pd.to_datetime(result["date"]).dt.date.tolist() + # 11th, 12th, 13th should be forward-filled from the 10th's value + assert date(2026, 1, 11) in dates_in_df + assert date(2026, 1, 12) in dates_in_df + assert date(2026, 1, 13) in dates_in_df + + @patch.object(ManifoldSource, "_get_market_bets") + def test_resolved_truncates_at_resolution(self, mock_bets, manifold_source, freeze_today): + """Resolved market: data truncated at resolution date, final row has resolved value.""" + freeze_today(date(2026, 1, 15)) + mock_bets.return_value = [ + make_manifold_bet(id="b1", createdTime=1768046400000, probAfter=0.4), # 2026-01-10 + make_manifold_bet(id="b2", createdTime=1768226400000, probAfter=0.6), # 2026-01-12 + make_manifold_bet(id="b3", createdTime=1768384800000, probAfter=0.9), # 2026-01-14 + ] + market = make_manifold_api_market( + isResolved=True, + resolution="YES", + resolutionTime=1768310400000, # 2026-01-13T12:00:00Z + ) + result = manifold_source._build_resolution_file( + market=market, + market_info_resolution_datetime="2026-01-13T12:00:00+00:00", + existing_df=None, + ) + + assert result is not None + # Last row should be the resolution date with resolved value + last_date = pd.to_datetime(result["date"].iloc[-1]).date() + assert last_date == date(2026, 1, 13) + assert float(result["value"].iloc[-1]) == 1.0 + # No rows after resolution date + all_dates = pd.to_datetime(result["date"]).dt.date + assert all(d <= date(2026, 1, 13) for d in all_dates) + + @patch.object(ManifoldSource, "_get_market_bets") + def test_resolved_cancel_nan_last_row(self, mock_bets, manifold_source, freeze_today): + """CANCEL resolution: last row is NaN, not forward-filled.""" + freeze_today(date(2026, 1, 15)) + mock_bets.return_value = [ + make_manifold_bet(id="b1", createdTime=1768046400000, probAfter=0.4), # 2026-01-10 + make_manifold_bet(id="b2", createdTime=1768226400000, probAfter=0.6), # 2026-01-12 + ] + market = make_manifold_api_market( + isResolved=True, + resolution="CANCEL", + resolutionTime=1768310400000, # 2026-01-13T12:00:00Z + ) + result = manifold_source._build_resolution_file( + market=market, + market_info_resolution_datetime="2026-01-13T12:00:00+00:00", + existing_df=None, + ) + + assert result is not None + # Last row (resolution date) should be NaN for CANCEL + assert np.isnan(float(result["value"].iloc[-1])) + + @patch.object(ManifoldSource, "_get_market_bets") + def test_filters_future_bets(self, mock_bets, manifold_source, freeze_today): + """Bets with date > yesterday are excluded.""" + freeze_today(date(2026, 1, 15)) + mock_bets.return_value = [ + make_manifold_bet(id="b1", createdTime=1768384800000, probAfter=0.5), # 2026-01-14 + make_manifold_bet(id="b2", createdTime=1768464000000, probAfter=0.9), # 2026-01-15 + ] + market = make_manifold_api_market() + result = manifold_source._build_resolution_file( + market=market, market_info_resolution_datetime="N/A", existing_df=None + ) + + assert result is not None + all_dates = pd.to_datetime(result["date"]).dt.date + assert all(d <= date(2026, 1, 14) for d in all_dates) + + +# --------------------------------------------------------------------------- +# _call_search_endpoint (mock requests.get) +# --------------------------------------------------------------------------- + + +class TestCallSearchEndpoint: + """Tests for ManifoldSource._call_search_endpoint.""" + + def _mock_response(self, markets): + resp = Mock() + resp.ok = True + resp.json.return_value = markets + resp.raise_for_status = Mock() + return resp + + @patch("sources.manifold.requests.get") + def test_basic_returns_qualifying_ids(self, mock_get, manifold_source, freeze_today): + """Returns IDs for markets meeting all criteria.""" + freeze_today(date(2026, 1, 15)) + mock_get.return_value = self._mock_response( + [ + make_manifold_search_result(id="a"), + make_manifold_search_result(id="b"), + ] + ) + ids = manifold_source._call_search_endpoint() + assert ids == {"a", "b"} + + @patch("sources.manifold.requests.get") + def test_filters_low_bettors(self, mock_get, manifold_source, freeze_today): + """Markets with < 17 bettors are excluded.""" + freeze_today(date(2026, 1, 15)) + mock_get.return_value = self._mock_response( + [ + make_manifold_search_result(id="low", uniqueBettorCount=16), + make_manifold_search_result(id="ok", uniqueBettorCount=17), + ] + ) + ids = manifold_source._call_search_endpoint() + assert ids == {"ok"} + + @patch("sources.manifold.requests.get") + def test_filters_low_liquidity(self, mock_get, manifold_source, freeze_today): + """Markets with < 120 liquidity are excluded.""" + freeze_today(date(2026, 1, 15)) + mock_get.return_value = self._mock_response( + [ + make_manifold_search_result(id="low", totalLiquidity=119), + make_manifold_search_result(id="ok", totalLiquidity=120), + ] + ) + ids = manifold_source._call_search_endpoint() + assert ids == {"ok"} + + @patch("sources.manifold.requests.get") + def test_filters_late_resolution(self, mock_get, manifold_source, freeze_today): + """Markets closing > 730 days from today are excluded.""" + freeze_today(date(2026, 1, 15)) + mock_get.return_value = self._mock_response( + [ + # 2029-01-01 is way past 730 days from 2026-01-15 + make_manifold_search_result(id="late", closeTime=1861920000000), + # 2025-06-01 is well within range (already past, even) + make_manifold_search_result(id="ok", closeTime=1748736000000), + ] + ) + ids = manifold_source._call_search_endpoint() + assert ids == {"ok"} + + @patch("sources.manifold.requests.get") + def test_additional_params_passed(self, mock_get, manifold_source, freeze_today): + """additional_params are merged into API request params.""" + freeze_today(date(2026, 1, 15)) + mock_get.return_value = self._mock_response([]) + manifold_source._call_search_endpoint({"topicSlug": "ai"}) + + mock_get.assert_called_once() + called_params = mock_get.call_args[1].get("params") or mock_get.call_args[0][1] + # params could be passed as keyword arg + if isinstance(called_params, dict): + assert called_params["topicSlug"] == "ai" + else: + # Check kwargs + assert mock_get.call_args.kwargs["params"]["topicSlug"] == "ai" + + +# --------------------------------------------------------------------------- +# fetch() (mock _search_markets) +# --------------------------------------------------------------------------- + + +class TestFetch: + """Tests for ManifoldSource.fetch.""" + + @patch.object(ManifoldSource, "_search_markets") + def test_basic_fetch(self, mock_search, manifold_source): + """Returns sorted ManifoldFetchFrame with correct IDs.""" + mock_search.return_value = {"id_b", "id_a", "id_c"} + dff = manifold_source.fetch() + + assert len(dff) == 3 + assert dff["id"].tolist() == ["id_a", "id_b", "id_c"] + ManifoldFetchFrame.validate(dff) + + @patch.object(ManifoldSource, "_search_markets") + def test_empty_results(self, mock_search, manifold_source): + """Empty search returns empty valid frame.""" + mock_search.return_value = set() + dff = manifold_source.fetch() + + assert len(dff) == 0 + ManifoldFetchFrame.validate(dff) + + +# --------------------------------------------------------------------------- +# update() (mock _get_market + _build_resolution_file) +# --------------------------------------------------------------------------- + + +class TestUpdate: + """Tests for ManifoldSource.update.""" + + @patch.object(ManifoldSource, "_build_resolution_file") + @patch.object(ManifoldSource, "_get_market") + def test_new_id_appended(self, mock_market, mock_build, manifold_source): + """IDs in dff not in dfq get appended with defaults.""" + mock_market.return_value = make_manifold_api_market(id="new_001") + mock_build.return_value = make_resolution_df( + [{"id": "new_001", "date": "2024-06-01", "value": 0.5}] + ) + dfq = make_question_df([{"id": "existing_001"}]) + dff = make_manifold_fetch_df([{"id": "new_001"}]) + + result = manifold_source.update(dfq, dff) + + assert "new_001" in result.dfq["id"].values + assert len(result.dfq) == 2 + new_row = result.dfq[result.dfq["id"] == "new_001"].iloc[0] + assert new_row["freeze_datetime_value_explanation"] == "The market value." + + @patch.object(ManifoldSource, "_build_resolution_file") + @patch.object(ManifoldSource, "_get_market") + def test_existing_unresolved_updated(self, mock_market, mock_build, manifold_source): + """Unresolved question fields are updated from market details.""" + mock_market.return_value = make_manifold_api_market( + id="mkt_001", + question="Updated question text", + textDescription="New background", + url="https://manifold.markets/updated", + ) + mock_build.return_value = make_resolution_df( + [{"id": "mkt_001", "date": "2024-06-01", "value": 0.65}] + ) + dfq = make_question_df([{"id": "mkt_001", "resolved": False}]) + dff = make_manifold_fetch_df([{"id": "mkt_001"}]) + + result = manifold_source.update(dfq, dff) + + row = result.dfq[result.dfq["id"] == "mkt_001"].iloc[0] + assert row["question"] == "Updated question text" + assert row["background"] == "New background" + assert row["url"] == "https://manifold.markets/updated" + + @patch.object(ManifoldSource, "_build_resolution_file") + @patch.object(ManifoldSource, "_get_market") + def test_market_becomes_resolved(self, mock_market, mock_build, manifold_source): + """Market with isResolved=True marks dfq row as resolved.""" + mock_market.return_value = make_manifold_api_market( + id="mkt_001", + isResolved=True, + resolution="YES", + resolutionTime=1768310400000, # 2026-01-13 + ) + mock_build.return_value = make_resolution_df( + [{"id": "mkt_001", "date": "2024-06-01", "value": 1.0}] + ) + dfq = make_question_df([{"id": "mkt_001", "resolved": False}]) + dff = make_manifold_fetch_df([{"id": "mkt_001"}]) + + result = manifold_source.update(dfq, dff) + + row = result.dfq[result.dfq["id"] == "mkt_001"].iloc[0] + assert bool(row["resolved"]) is True + assert row["market_info_resolution_datetime"] != "N/A" + + @patch.object(ManifoldSource, "_build_resolution_file") + @patch.object(ManifoldSource, "_get_market") + def test_resolution_file_stored(self, mock_market, mock_build, manifold_source): + """Resolution file from _build_resolution_file is in result.""" + res_df = make_resolution_df([{"id": "mkt_001", "date": "2024-06-01", "value": 0.5}]) + mock_market.return_value = make_manifold_api_market(id="mkt_001") + mock_build.return_value = res_df + dfq = make_question_df([{"id": "mkt_001", "resolved": False}]) + dff = make_manifold_fetch_df([{"id": "mkt_001"}]) + + result = manifold_source.update(dfq, dff) + + assert "mkt_001" in result.resolution_files + + @patch.object(ManifoldSource, "_build_resolution_file") + @patch.object(ManifoldSource, "_get_market") + def test_freeze_datetime_value_set(self, mock_market, mock_build, manifold_source): + """freeze_datetime_value is set to last value of resolution df.""" + mock_market.return_value = make_manifold_api_market(id="mkt_001") + mock_build.return_value = make_resolution_df( + [ + {"id": "mkt_001", "date": "2024-06-01", "value": 0.3}, + {"id": "mkt_001", "date": "2024-06-02", "value": 0.75}, + ] + ) + dfq = make_question_df([{"id": "mkt_001", "resolved": False}]) + dff = make_manifold_fetch_df([{"id": "mkt_001"}]) + + result = manifold_source.update(dfq, dff) + + row = result.dfq[result.dfq["id"] == "mkt_001"].iloc[0] + assert str(row["freeze_datetime_value"]) == "0.75" + + @patch.object(ManifoldSource, "_build_resolution_file") + @patch.object(ManifoldSource, "_get_market") + def test_get_market_returns_none_skipped(self, mock_market, mock_build, manifold_source): + """_get_market returning None skips the row.""" + mock_market.return_value = None + dfq = make_question_df([{"id": "mkt_001", "resolved": False}]) + dff = make_manifold_fetch_df([{"id": "mkt_001"}]) + + result = manifold_source.update(dfq, dff) + + mock_build.assert_not_called() + assert "mkt_001" not in (result.resolution_files or {}) + + @patch.object(ManifoldSource, "_build_resolution_file") + @patch.object(ManifoldSource, "_get_market") + def test_build_resolution_returns_none(self, mock_market, mock_build, manifold_source): + """_build_resolution_file returning None: no resolution file stored.""" + mock_market.return_value = make_manifold_api_market(id="mkt_001") + mock_build.return_value = None + dfq = make_question_df([{"id": "mkt_001", "resolved": False}]) + dff = make_manifold_fetch_df([{"id": "mkt_001"}]) + + result = manifold_source.update(dfq, dff) + + assert "mkt_001" not in (result.resolution_files or {}) + + @patch.object(ManifoldSource, "_build_resolution_file") + @patch.object(ManifoldSource, "_get_market") + def test_regenerates_missing_resolved_files(self, mock_market, mock_build, manifold_source): + """Resolved questions missing from storage get resolution files regenerated.""" + mock_market.return_value = make_manifold_api_market( + id="mkt_001", isResolved=True, resolution="YES" + ) + mock_build.return_value = make_resolution_df( + [{"id": "mkt_001", "date": "2024-06-01", "value": 1.0}] + ) + dfq = make_question_df( + [ + { + "id": "mkt_001", + "resolved": True, + "market_info_resolution_datetime": "2024-07-01T00:00:00+00:00", + } + ] + ) + dff = make_manifold_fetch_df([{"id": "mkt_001"}]) + + result = manifold_source.update(dfq, dff, files_in_storage=[]) + + assert "mkt_001" in result.resolution_files + + @patch.object(ManifoldSource, "_build_resolution_file") + @patch.object(ManifoldSource, "_get_market") + def test_skips_resolved_already_in_storage(self, mock_market, mock_build, manifold_source): + """Resolved questions with files in storage are not re-fetched.""" + dfq = make_question_df( + [ + { + "id": "mkt_001", + "resolved": True, + "market_info_resolution_datetime": "2024-07-01T00:00:00+00:00", + } + ] + ) + dff = make_manifold_fetch_df([{"id": "mkt_001"}]) + + result = manifold_source.update(dfq, dff, files_in_storage=["manifold/mkt_001.jsonl"]) + + # _get_market should not be called for the resolved question + mock_market.assert_not_called() + assert "mkt_001" not in (result.resolution_files or {}) + + @patch.object(ManifoldSource, "_build_resolution_file") + @patch.object(ManifoldSource, "_get_market") + def test_output_schema_valid(self, mock_market, mock_build, manifold_source): + """Output dfq passes QuestionFrame validation.""" + mock_market.return_value = make_manifold_api_market(id="new_001") + mock_build.return_value = make_resolution_df( + [{"id": "new_001", "date": "2024-06-01", "value": 0.5}] + ) + dfq = make_question_df([{"id": "existing_001"}]) + dff = make_manifold_fetch_df([{"id": "new_001"}]) + + result = manifold_source.update(dfq, dff) + QuestionFrame.validate(result.dfq) + + +# --------------------------------------------------------------------------- +# _get_market_bets (mock requests.get) +# --------------------------------------------------------------------------- + + +class TestGetMarketBets: + """Tests for ManifoldSource._get_market_bets.""" + + def _mock_response(self, bets): + resp = Mock() + resp.ok = True + resp.json.return_value = bets + resp.raise_for_status = Mock() + return resp + + @patch("sources.manifold.requests.get") + def test_single_page(self, mock_get, manifold_source): + """Returns all bets from a single page (< limit).""" + bets = [make_manifold_bet(id=f"b{i}", createdTime=1768046400000) for i in range(5)] + mock_get.return_value = self._mock_response(bets) + + result = manifold_source._get_market_bets("mkt_001") + + assert len(result) == 5 + assert mock_get.call_count == 1 + + @patch("sources.manifold.requests.get") + def test_pagination(self, mock_get, manifold_source): + """Multiple pages fetched until empty page.""" + page1 = [make_manifold_bet(id=f"b{i}", createdTime=1768046400000) for i in range(1000)] + page2 = [ + make_manifold_bet(id=f"b{i}", createdTime=1768046400000) for i in range(1000, 1500) + ] + mock_get.side_effect = [ + self._mock_response(page1), + self._mock_response(page2), + ] + + result = manifold_source._get_market_bets("mkt_001") + + assert len(result) == 1500 + + @patch("sources.manifold.requests.get") + def test_stops_at_benchmark_start(self, mock_get, manifold_source): + """Stops paginating when last bet's createdTime < BENCHMARK_START_DATE_EPOCHTIME_MS.""" + # Use a createdTime before benchmark start (2024-05-01) + old_epoch_ms = 1704067200000 # 2024-01-01 (before benchmark) + bets = [make_manifold_bet(id=f"b{i}", createdTime=old_epoch_ms) for i in range(1000)] + mock_get.return_value = self._mock_response(bets) + + result = manifold_source._get_market_bets("mkt_001") + + assert len(result) == 1000 + # Only one call — stops because last bet is before benchmark start + assert mock_get.call_count == 1 + + @patch("sources.manifold.requests.get") + def test_pagination_sets_before_param(self, mock_get, manifold_source): + """Second request includes before= from first page.""" + page1 = [make_manifold_bet(id=f"b{i}", createdTime=1768046400000) for i in range(1000)] + page2 = [make_manifold_bet(id="b1000", createdTime=1768046400000)] + mock_get.side_effect = [ + self._mock_response(page1), + self._mock_response(page2), + ] + + manifold_source._get_market_bets("mkt_001") + + # Second call should have before=last bet's id from page1 + second_call_params = ( + mock_get.call_args_list[1][1].get("params") or mock_get.call_args_list[1][0][1] + ) + assert second_call_params.get("before") == "b999" + + @patch("sources.manifold.requests.get") + def test_empty_first_page(self, mock_get, manifold_source): + """No bets returns empty list.""" + mock_get.return_value = self._mock_response([]) + + result = manifold_source._get_market_bets("mkt_001") + + assert result == [] + assert mock_get.call_count == 1 From d531233b3665a6f1e20c1c8110cc5624b814b88f Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sat, 16 May 2026 14:13:34 +0300 Subject: [PATCH 02/11] fix: ensure manifold's fetch timeout is consistent with its `max_time` backoff --- src/orchestration/func_manifold_fetch/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/orchestration/func_manifold_fetch/Makefile b/src/orchestration/func_manifold_fetch/Makefile index 09a7aa84..0b123cb9 100644 --- a/src/orchestration/func_manifold_fetch/Makefile +++ b/src/orchestration/func_manifold_fetch/Makefile @@ -35,7 +35,7 @@ deploy : .gcloudignore requirements.txt Dockerfile --region $(CLOUD_DEPLOY_REGION) \ --tasks 1 \ --parallelism 1 \ - --task-timeout 60s \ + --task-timeout 560s \ --memory 512Mi \ --max-retries 0 \ --service-account $(QUESTION_BANK_BUCKET_SERVICE_ACCOUNT) \ From d3747335485deb5e0845ff1e7746e422308204cb Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sat, 16 May 2026 14:28:05 +0300 Subject: [PATCH 03/11] format --- src/sources/manifold.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sources/manifold.py b/src/sources/manifold.py index 6b52b175..e27677c3 100644 --- a/src/sources/manifold.py +++ b/src/sources/manifold.py @@ -388,6 +388,7 @@ def _finalize_resolution_df(df: pd.DataFrame) -> DataFrame[ResolutionFrame]: @staticmethod def _get_resolved_market_value(market: dict) -> float: """Map resolution outcome to numeric value. + YES -> 1, NO -> 0, MKT -> market probability, CANCEL -> NaN """ return {"YES": 1, "NO": 0, "MKT": market["resolutionProbability"]}.get( From 659721af7ccfd58e2f36192877a3294fa504c1d0 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sat, 16 May 2026 21:34:04 +0300 Subject: [PATCH 04/11] fix: load all existing resolution files, not just those in dff; matches old code --- src/orchestration/func_manifold_update/main.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/orchestration/func_manifold_update/main.py b/src/orchestration/func_manifold_update/main.py index c3642815..a152a0dc 100644 --- a/src/orchestration/func_manifold_update/main.py +++ b/src/orchestration/func_manifold_update/main.py @@ -26,9 +26,10 @@ def driver(_: Any) -> None: ) logger.info("Loading existing resolution files...") - existing_resolution_files = _source_io.load_existing_resolution_files( - SOURCE, ids=dff["id"].astype(str).tolist() - ) + # No ids= filter: load ALL existing resolution files so that we + # matching the old non-refactored behaviour. + # TODO: we can drop pre-benchmark start date history/ids. + existing_resolution_files = _source_io.load_existing_resolution_files(SOURCE) logger.info(f"Loaded {len(existing_resolution_files)} resolution files") files_in_storage = gcp.storage.list_with_prefix( From f8e585c22667f31eb9b2ceab99d61cf7a3a0bab0 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sat, 16 May 2026 21:34:30 +0300 Subject: [PATCH 05/11] fix: skip writing resolution files that have not changed (saves a ton of IO) --- src/sources/manifold.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/sources/manifold.py b/src/sources/manifold.py index e27677c3..171770ae 100644 --- a/src/sources/manifold.py +++ b/src/sources/manifold.py @@ -161,6 +161,13 @@ def update( if df_res is not None: resolution_files[row["id"]] = df_res dfq.at[index, "freeze_datetime_value"] = df_res["value"].iloc[-1] + # if rebuilt, then write; else - skip + if df_res is not existing_df: + logger.info(f"Rebuilt, will write - id={row['id']}") + resolution_files[row["id"]] = df_res + else: + logger.info(f"Skipped writing to resolution files, not changed -id={row['id']}") + # --- Regenerate missing resolution files for resolved questions --- for _index, row in dfq[dfq["resolved"]].iterrows(): From 37b355139193164394f93d7e57a3785fdec4d890 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sat, 16 May 2026 21:34:48 +0300 Subject: [PATCH 06/11] format --- src/sources/manifold.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sources/manifold.py b/src/sources/manifold.py index 171770ae..3f66588d 100644 --- a/src/sources/manifold.py +++ b/src/sources/manifold.py @@ -168,7 +168,6 @@ def update( else: logger.info(f"Skipped writing to resolution files, not changed -id={row['id']}") - # --- Regenerate missing resolution files for resolved questions --- for _index, row in dfq[dfq["resolved"]].iterrows(): filename = f"{self.name}/{row['id']}.jsonl" From 13f88b2b31a85d75b2d97f8601473c2ab2e279ff Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Fri, 22 May 2026 18:36:18 +0300 Subject: [PATCH 07/11] fixes --- src/orchestration/func_manifold_fetch/requirements.txt | 1 - src/orchestration/func_manifold_update/requirements.txt | 1 - src/sources/manifold.py | 9 ++++----- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/orchestration/func_manifold_fetch/requirements.txt b/src/orchestration/func_manifold_fetch/requirements.txt index 1d2a4d8b..37337e71 100644 --- a/src/orchestration/func_manifold_fetch/requirements.txt +++ b/src/orchestration/func_manifold_fetch/requirements.txt @@ -7,4 +7,3 @@ requests certifi backoff numpy -pyarrow diff --git a/src/orchestration/func_manifold_update/requirements.txt b/src/orchestration/func_manifold_update/requirements.txt index 1d2a4d8b..37337e71 100644 --- a/src/orchestration/func_manifold_update/requirements.txt +++ b/src/orchestration/func_manifold_update/requirements.txt @@ -7,4 +7,3 @@ requests certifi backoff numpy -pyarrow diff --git a/src/sources/manifold.py b/src/sources/manifold.py index 3f66588d..7cda3d38 100644 --- a/src/sources/manifold.py +++ b/src/sources/manifold.py @@ -135,7 +135,7 @@ def update( # Assign market details to dfq row dfq.at[index, "question"] = market["question"] - dfq.at[index, "background"] = market.get("textDescription", "") + dfq.at[index, "background"] = market["textDescription"] dfq.at[index, "market_info_resolution_criteria"] = "N/A" dfq.at[index, "market_info_open_datetime"] = dates.convert_epoch_time_in_ms_to_iso( market["createdTime"] @@ -143,7 +143,7 @@ def update( dfq.at[index, "market_info_close_datetime"] = dates.convert_epoch_time_in_ms_to_iso( market["closeTime"] ) - dfq.at[index, "url"] = market.get("url", "") + dfq.at[index, "url"] = market["url"] if market["isResolved"]: dfq.at[index, "resolved"] = True dfq.at[index, "market_info_resolution_datetime"] = ( @@ -159,7 +159,6 @@ def update( existing_df=existing_df, ) if df_res is not None: - resolution_files[row["id"]] = df_res dfq.at[index, "freeze_datetime_value"] = df_res["value"].iloc[-1] # if rebuilt, then write; else - skip if df_res is not existing_df: @@ -256,7 +255,7 @@ def _search_markets(self) -> set[str]: base=2, on_backoff=data_utils.print_error_info_handler, ) - def _get_market(self, market_id: str) -> dict | None: + def _get_market(self, market_id: str) -> dict: """Fetch full market details from /market/{id}.""" logger.info(f"Calling market endpoint for {market_id}") endpoint = f"{_MANIFOLD_API_BASE}/market/{market_id}" @@ -330,7 +329,7 @@ def _build_resolution_file( if ( existing_df is not None and not existing_df.empty - and pd.to_datetime(existing_df["date"].iloc[-1]).date() >= yesterday + and pd.to_datetime(existing_df["date"].max()).date() >= yesterday ): return existing_df From 419ff0b0eb1374ecf83f46d2535b38746798944b Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Mon, 25 May 2026 14:03:39 +0300 Subject: [PATCH 08/11] fix: remove stale None guard and associated test --- src/sources/manifold.py | 4 ---- src/tests/test_manifold.py | 13 ------------- 2 files changed, 17 deletions(-) diff --git a/src/sources/manifold.py b/src/sources/manifold.py index 7cda3d38..c4910bd9 100644 --- a/src/sources/manifold.py +++ b/src/sources/manifold.py @@ -130,8 +130,6 @@ def update( dfq["resolved"] = dfq["resolved"].astype(bool) for index, row in dfq[~dfq["resolved"]].iterrows(): market = self._get_market(row["id"]) - if market is None: - continue # Assign market details to dfq row dfq.at[index, "question"] = market["question"] @@ -172,8 +170,6 @@ def update( filename = f"{self.name}/{row['id']}.jsonl" if filename not in files_in_storage and row["id"] not in resolution_files: market = self._get_market(row["id"]) - if market is None: - continue df_res = self._build_resolution_file( market=market, market_info_resolution_datetime=row["market_info_resolution_datetime"], diff --git a/src/tests/test_manifold.py b/src/tests/test_manifold.py index 184b042f..c947d34f 100644 --- a/src/tests/test_manifold.py +++ b/src/tests/test_manifold.py @@ -477,19 +477,6 @@ def test_freeze_datetime_value_set(self, mock_market, mock_build, manifold_sourc row = result.dfq[result.dfq["id"] == "mkt_001"].iloc[0] assert str(row["freeze_datetime_value"]) == "0.75" - @patch.object(ManifoldSource, "_build_resolution_file") - @patch.object(ManifoldSource, "_get_market") - def test_get_market_returns_none_skipped(self, mock_market, mock_build, manifold_source): - """_get_market returning None skips the row.""" - mock_market.return_value = None - dfq = make_question_df([{"id": "mkt_001", "resolved": False}]) - dff = make_manifold_fetch_df([{"id": "mkt_001"}]) - - result = manifold_source.update(dfq, dff) - - mock_build.assert_not_called() - assert "mkt_001" not in (result.resolution_files or {}) - @patch.object(ManifoldSource, "_build_resolution_file") @patch.object(ManifoldSource, "_get_market") def test_build_resolution_returns_none(self, mock_market, mock_build, manifold_source): From 198d65d94df2a77a58e98a36ff28bd334bc41caa Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Mon, 25 May 2026 14:33:25 +0300 Subject: [PATCH 09/11] fix: condition up to date check for res file based on resolution/date --- src/sources/manifold.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/sources/manifold.py b/src/sources/manifold.py index c4910bd9..003a3265 100644 --- a/src/sources/manifold.py +++ b/src/sources/manifold.py @@ -322,12 +322,19 @@ def _build_resolution_file( market_id = market["id"] # --- Already up-to-date check --- - if ( - existing_df is not None - and not existing_df.empty - and pd.to_datetime(existing_df["date"].max()).date() >= yesterday - ): - return existing_df + # If resolved: must extend through the resolution date (so the resolution row is present). + # If unresolved: must extend through yesterday. + # Without the isResolved branch, a same-day rerun after a market resolves would silently + # keep the stale file (last_date == yesterday < resolution_date == today). + if existing_df is not None and not existing_df.empty: + last_date = pd.to_datetime(existing_df["date"].max()).date() + cutoff = ( + pd.Timestamp(market_info_resolution_datetime).date() + if market["isResolved"] + else yesterday + ) + if last_date >= cutoff: + return existing_df # --- Fetch bets and build daily series --- forecasts = self._get_market_bets(market_id) From bb153929b736bb288d25022c2f7efbd3f5a94042 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Mon, 25 May 2026 14:33:37 +0300 Subject: [PATCH 10/11] fix: logger info for additional params --- src/sources/manifold.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/manifold.py b/src/sources/manifold.py index 003a3265..5f2dd359 100644 --- a/src/sources/manifold.py +++ b/src/sources/manifold.py @@ -204,7 +204,7 @@ def _call_search_endpoint(self, additional_params: dict | None = None) -> set[st } if additional_params: params.update(additional_params) - logger.info(f"Calling {endpoint} with additional params {additional_params}") + logger.info(f"Calling {endpoint} with additional params: {additional_params}") response = requests.get(endpoint, params=params, verify=certifi.where()) if not response.ok: From b698b422ce7b1d6f214a511377a9a9384b58e4ae Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Mon, 25 May 2026 14:57:52 +0300 Subject: [PATCH 11/11] fix: set `max_resolution_date` once on the `fetch()` surface --- src/sources/manifold.py | 40 ++++++++++++++++++++++++++++---------- src/tests/test_manifold.py | 13 ++++++++----- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/src/sources/manifold.py b/src/sources/manifold.py index 5f2dd359..9adbdf01 100644 --- a/src/sources/manifold.py +++ b/src/sources/manifold.py @@ -3,7 +3,7 @@ from __future__ import annotations import logging -from datetime import timedelta +from datetime import date, timedelta from typing import Any, ClassVar import backoff @@ -76,13 +76,28 @@ class ManifoldSource(MarketSource): # ------------------------------------------------------------------ @pa.check_types - def fetch(self, **kwargs: Any) -> DataFrame[ManifoldFetchFrame]: + def fetch( + self, + *, + max_resolution_date: date | None = None, + **kwargs: Any, + ) -> DataFrame[ManifoldFetchFrame]: """Fetch market IDs from Manifold search-markets endpoint. Calls search-markets (1 global + N topic slugs), filters by min bettors, min liquidity, and max resolution date. + + Args: + max_resolution_date (date | None): Cutoff for market close date. + Defaults to today + ``_MAX_RESOLUTION_DATE_IN_DAYS``. Computed + once here and threaded through so the inner endpoint calls + share the same cutoff instead of each recomputing "today". """ - ids = self._search_markets() + if max_resolution_date is None: + max_resolution_date = dates.get_date_today() + timedelta( + days=_MAX_RESOLUTION_DATE_IN_DAYS + ) + ids = self._search_markets(max_resolution_date=max_resolution_date) logger.info(f"Discovered {len(ids)} candidate market IDs from search.") return pd.DataFrame({"id": sorted(ids)}) @@ -193,7 +208,12 @@ def update( max_time=500, on_backoff=data_utils.print_error_info_handler, ) - def _call_search_endpoint(self, additional_params: dict | None = None) -> set[str]: + def _call_search_endpoint( + self, + *, + max_resolution_date: date, + additional_params: dict | None = None, + ) -> set[str]: """Call search-markets and return qualifying market IDs.""" endpoint = f"{_MANIFOLD_API_BASE}/search-markets" params: dict[str, Any] = { @@ -214,9 +234,6 @@ def _call_search_endpoint(self, additional_params: dict | None = None) -> set[st ) response.raise_for_status() - today = dates.get_date_today() - max_resolution_date = today + timedelta(days=_MAX_RESOLUTION_DATE_IN_DAYS) - def resolves_by(close_time_epoch_ms: int) -> bool: close_sec = min(close_time_epoch_ms / 1000, dates.MAX_EPOCH_SEC) close_date = dates.convert_epoch_time_in_sec_to_datetime(close_sec).date() @@ -230,12 +247,15 @@ def resolves_by(close_time_epoch_ms: int) -> bool: and resolves_by(market["closeTime"]) } - def _search_markets(self) -> set[str]: + def _search_markets(self, *, max_resolution_date: date) -> set[str]: """Discover market IDs across all topic slugs.""" logger.info("Calling Manifold search-markets endpoint") - ids = self._call_search_endpoint() + ids = self._call_search_endpoint(max_resolution_date=max_resolution_date) for topic in _TOPIC_SLUGS: - ids |= self._call_search_endpoint({"topicSlug": topic}) + ids |= self._call_search_endpoint( + max_resolution_date=max_resolution_date, + additional_params={"topicSlug": topic}, + ) return ids # ------------------------------------------------------------------ diff --git a/src/tests/test_manifold.py b/src/tests/test_manifold.py index c947d34f..58332a2e 100644 --- a/src/tests/test_manifold.py +++ b/src/tests/test_manifold.py @@ -284,7 +284,7 @@ def test_basic_returns_qualifying_ids(self, mock_get, manifold_source, freeze_to make_manifold_search_result(id="b"), ] ) - ids = manifold_source._call_search_endpoint() + ids = manifold_source._call_search_endpoint(max_resolution_date=date(2028, 1, 14)) assert ids == {"a", "b"} @patch("sources.manifold.requests.get") @@ -297,7 +297,7 @@ def test_filters_low_bettors(self, mock_get, manifold_source, freeze_today): make_manifold_search_result(id="ok", uniqueBettorCount=17), ] ) - ids = manifold_source._call_search_endpoint() + ids = manifold_source._call_search_endpoint(max_resolution_date=date(2028, 1, 14)) assert ids == {"ok"} @patch("sources.manifold.requests.get") @@ -310,7 +310,7 @@ def test_filters_low_liquidity(self, mock_get, manifold_source, freeze_today): make_manifold_search_result(id="ok", totalLiquidity=120), ] ) - ids = manifold_source._call_search_endpoint() + ids = manifold_source._call_search_endpoint(max_resolution_date=date(2028, 1, 14)) assert ids == {"ok"} @patch("sources.manifold.requests.get") @@ -325,7 +325,7 @@ def test_filters_late_resolution(self, mock_get, manifold_source, freeze_today): make_manifold_search_result(id="ok", closeTime=1748736000000), ] ) - ids = manifold_source._call_search_endpoint() + ids = manifold_source._call_search_endpoint(max_resolution_date=date(2028, 1, 14)) assert ids == {"ok"} @patch("sources.manifold.requests.get") @@ -333,7 +333,10 @@ def test_additional_params_passed(self, mock_get, manifold_source, freeze_today) """additional_params are merged into API request params.""" freeze_today(date(2026, 1, 15)) mock_get.return_value = self._mock_response([]) - manifold_source._call_search_endpoint({"topicSlug": "ai"}) + manifold_source._call_search_endpoint( + max_resolution_date=date(2028, 1, 14), + additional_params={"topicSlug": "ai"}, + ) mock_get.assert_called_once() called_params = mock_get.call_args[1].get("params") or mock_get.call_args[0][1]