Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ metaculus-update-questions:
infer: infer-fetch infer-update-questions

infer-fetch:
$(MAKE) -C src/questions/infer/fetch || echo "* $@" >> $(MAKE_FAILURE_LOG)
$(MAKE) -C src/orchestration/func_infer_fetch || echo "* $@" >> $(MAKE_FAILURE_LOG)

infer-update-questions:
$(MAKE) -C src/questions/infer/update_questions || echo "* $@" >> $(MAKE_FAILURE_LOG)
$(MAKE) -C src/orchestration/func_infer_update || echo "* $@" >> $(MAKE_FAILURE_LOG)

acled: acled-fetch acled-update-questions

Expand Down
23 changes: 23 additions & 0 deletions src/_fb_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,26 @@ class SourceQuestionBank:


QuestionBank = dict[str, SourceQuestionBank]


@dataclass
class UpdateResult:
"""Return value of a source's update() method.

Validates contents on construction: dfq must be a valid QuestionFrame,
each resolution file must be a valid ResolutionFrame.
"""

dfq: pd.DataFrame
resolution_files: dict[str, pd.DataFrame] | None = None
hash_mapping: dict[str, dict] | None = None

def __post_init__(self):
"""Validate schema constraints."""
from _schemas import QuestionFrame, ResolutionFrame

self.dfq = QuestionFrame.validate(self.dfq)
if self.resolution_files:
self.resolution_files = {
qid: ResolutionFrame.validate(df) for qid, df in self.resolution_files.items()
}
8 changes: 8 additions & 0 deletions src/_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ class ResolveReadyFrame(ExplodedQuestionSetFrame):
market_value_on_due_date: Series[float] = pa.Field(nullable=True)


class InferFetchFrame(QuestionFrame):
"""Output of InferSource.fetch(). QuestionFrame plus transient fields for update()."""

fetch_datetime: Series[str]
probability: Series[object] = pa.Field(nullable=True)
nullify_question: Series[bool]


class AcledResolutionFrame(pa.DataFrameModel):
"""ACLED-specific: aggregated events by country and date.

Expand Down
20 changes: 7 additions & 13 deletions src/helpers/acled.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@
import numpy as np
import pandas as pd

from sources._metadata import SOURCE_METADATA

from . import data_utils

SOURCE_INTRO = SOURCE_METADATA["acled"]["source_intro"]
RESOLUTION_CRITERIA = SOURCE_METADATA["acled"]["resolution_criteria"]

source = "acled"

# Lazy import to avoid circular imports at module level
Expand All @@ -17,9 +22,9 @@
def _get_source():
global _source
if _source is None:
from sources import SOURCES
from sources.acled import AcledSource

_source = SOURCES[source]
_source = AcledSource()
return _source


Expand Down Expand Up @@ -84,17 +89,6 @@ def upload_hash_mapping():
https://acleddata.com/knowledge-base/codebook/#acled-events
"""

SOURCE_INTRO = (
"The Armed Conflict Location & Event Data Project (ACLED) collects real-time data on the "
"locations, dates, actors, fatalities, and types of all reported political violence and "
"protest events around the world. You're going to predict how questions based on this data "
"will resolve."
)

RESOLUTION_CRITERIA = (
"Resolves to the value calculated from the ACLED dataset once the data is published."
)


def read_dff(local_question_bank_dir=None) -> pd.DataFrame:
"""
Expand Down
14 changes: 5 additions & 9 deletions src/helpers/dbnomics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
"""DBnomics-specific variables."""

from sources._metadata import SOURCE_METADATA

SOURCE_INTRO = SOURCE_METADATA["dbnomics"]["source_intro"]
RESOLUTION_CRITERIA = SOURCE_METADATA["dbnomics"]["resolution_criteria"]

FETCH_COLUMN_DTYPE = {
"id": str,
"period": str,
Expand All @@ -9,15 +14,6 @@
}
FETCH_COLUMNS = list(FETCH_COLUMN_DTYPE.keys())

SOURCE_INTRO = (
"DBnomics collects data on topics such as population and living conditions, "
"environment and energy, agriculture, finance, trade and others from publicly available "
"resources, for example national and international statistical institutions, researchers and "
"private companies. You're going to predict how questions based on this data will resolve."
)

RESOLUTION_CRITERIA = "Resolves to the value found at {url} once the data is published."

METEOFRANCE_STATIONS = [
{"id": "07005", "station": "Abbeville"},
{"id": "07015", "station": "Lille Airport"},
Expand Down
13 changes: 5 additions & 8 deletions src/helpers/fred.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@

sys.path.append(os.path.join(os.path.dirname(__file__), ".."))

from sources.fred import NULLIFIED_IDS # noqa: F401, E402
from sources._metadata import SOURCE_METADATA # noqa: E402

SOURCE_INTRO = (
"The Federal Reserve Economic Data database (FRED) provides economic data from national, "
"international, public, and private sources.You're going to predict how questions based on "
"this data will resolve."
)

RESOLUTION_CRITERIA = "Resolves to the value found at {url} once the data is published."
_META = SOURCE_METADATA["fred"]
SOURCE_INTRO = _META["source_intro"]
RESOLUTION_CRITERIA = _META["resolution_criteria"]
NULLIFIED_IDS = [nq.id for nq in _META["nullified_questions"]]

# flake8: noqa: B950

Expand Down
11 changes: 4 additions & 7 deletions src/helpers/infer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
"""Infer-specific variables."""
"""Infer-specific variables. Delegates to sources._metadata."""

SOURCE_INTRO = (
"We would like you to predict the outcome of a prediction market. A prediction market, in this "
"context, is the aggregate of predictions submitted by users on the website INFER Public. "
"You're going to predict the probability that the market will resolve as 'Yes'."
)
from sources._metadata import SOURCE_METADATA

RESOLUTION_CRITERIA = "Resolves to the outcome of the question found at {url}."
SOURCE_INTRO = SOURCE_METADATA["infer"]["source_intro"]
RESOLUTION_CRITERIA = SOURCE_METADATA["infer"]["resolution_criteria"]
11 changes: 4 additions & 7 deletions src/helpers/manifold.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
"""Manifold-specific variables."""
"""Manifold-specific variables. Delegates to sources._metadata."""

SOURCE_INTRO = (
"We would like you to predict the outcome of a prediction market. A prediction market, in this "
"context, is the aggregate of predictions submitted by users on the website Manifold. "
"You're going to predict the probability that the market will resolve as 'Yes'."
)
from sources._metadata import SOURCE_METADATA

RESOLUTION_CRITERIA = "Resolves to the outcome of the question found at {url}."
SOURCE_INTRO = SOURCE_METADATA["manifold"]["source_intro"]
RESOLUTION_CRITERIA = SOURCE_METADATA["manifold"]["resolution_criteria"]
14 changes: 5 additions & 9 deletions src/helpers/metaculus.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
"""Metaculus-specific variables."""

from sources._metadata import SOURCE_METADATA

SOURCE_INTRO = SOURCE_METADATA["metaculus"]["source_intro"]
RESOLUTION_CRITERIA = SOURCE_METADATA["metaculus"]["resolution_criteria"]

CATEGORIES = [
"artificial-intelligence",
"computing-and-math",
Expand All @@ -16,12 +21,3 @@
"sports-entertainment",
"technology",
]

SOURCE_INTRO = (
"We would like you to predict the outcome of a prediction market. A prediction market, in this "
"context, is the aggregate of predictions submitted by users on the website Metaculus. "
"You're going to predict the probability that the market will resolve as 'Yes'."
)


RESOLUTION_CRITERIA = "Resolves to the outcome of the question found at {url}."
13 changes: 5 additions & 8 deletions src/helpers/polymarket.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@

sys.path.append(os.path.join(os.path.dirname(__file__), ".."))

from sources.polymarket import NULLIFIED_QUESTION_IDS # noqa: F401, E402
from sources._metadata import SOURCE_METADATA # noqa: E402

SOURCE_INTRO = (
"We would like you to predict the outcome of a prediction market. A prediction market, in this "
"context, is the aggregate of predictions submitted by users on the website Polymarket. "
"You're going to predict the probability that the market will resolve as 'Yes'."
)

RESOLUTION_CRITERIA = "Resolves to the outcome of the question found at {url}."
_META = SOURCE_METADATA["polymarket"]
SOURCE_INTRO = _META["source_intro"]
RESOLUTION_CRITERIA = _META["resolution_criteria"]
NULLIFIED_QUESTION_IDS = {nq.id for nq in _META["nullified_questions"]}
16 changes: 6 additions & 10 deletions src/helpers/wikipedia.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

sys.path.append(os.path.join(os.path.dirname(__file__), ".."))

from sources._metadata import SOURCE_METADATA # noqa: E402
from sources.wikipedia import _IDS_TO_NULLIFY as IDS_TO_NULLIFY # noqa: F401, E402
from sources.wikipedia import ( # noqa: F401, E402
_TRANSFORM_ID_MAPPING as transform_id_mapping,
Expand All @@ -20,6 +21,9 @@

from . import constants # noqa: E402

SOURCE_INTRO = SOURCE_METADATA["wikipedia"]["source_intro"]
RESOLUTION_CRITERIA = SOURCE_METADATA["wikipedia"]["resolution_criteria"]

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

Expand All @@ -43,20 +47,12 @@
def _get_source():
global _source
if _source is None:
from sources import SOURCES
from sources.wikipedia import WikipediaSource

_source = SOURCES[source]
_source = WikipediaSource()
return _source


SOURCE_INTRO = (
"Wikipedia is an online encyclopedia created and edited by volunteers. You're going to predict "
"how questions based on data sourced from Wikipedia will resolve."
)

RESOLUTION_CRITERIA = "Resolves to the value calculated from {url} on the resolution date."


def transform_id(wid):
"""Transform old id to new id."""
return _get_source()._transform_id(wid)
Expand Down
15 changes: 4 additions & 11 deletions src/helpers/yfinance.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
"""Yfinance-specific variables."""
"""Yfinance-specific variables. Delegates to sources._metadata."""

SOURCE_INTRO = (
"Yahoo Finance provides financial data on stocks, bonds, and currencies and also offers news, "
"commentary and tools for personal financial management. You're going to predict how questions "
"based on this data will resolve."
)
from sources._metadata import SOURCE_METADATA

RESOLUTION_CRITERIA = (
"Resolves to the market close price at {url} for the resolution date. If the resolution date "
"coincides with a day the market is closed (weekend, holiday, etc.) the previous market close "
"price is used."
)
SOURCE_INTRO = SOURCE_METADATA["yfinance"]["source_intro"]
RESOLUTION_CRITERIA = SOURCE_METADATA["yfinance"]["resolution_criteria"]
2 changes: 1 addition & 1 deletion src/orchestration/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
"""Orchestration layer for resolve pipeline."""
"""Orchestration layer."""
108 changes: 108 additions & 0 deletions src/orchestration/_source_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""Shared IO helpers for source fetch/update orchestration."""

from __future__ import annotations

import json
import logging
import os
from typing import Iterable

import pandas as pd

from helpers import constants, data_utils, env
from utils import gcp

logger = logging.getLogger(__name__)


def write_fetch_output(source: str, dff: pd.DataFrame) -> None:
"""Write fetch DataFrame to <source>_fetch.jsonl and upload.

Args:
source (str): Source name (e.g. "infer").
dff (pd.DataFrame): Fetched data to write.
"""
filenames = data_utils.generate_filenames(source)
local = filenames["local_fetch"]
with open(local, "w", encoding="utf-8") as f:
for record in dff.to_dict(orient="records"):
f.write(json.dumps(record, ensure_ascii=False) + "\n")
logger.info(f"Uploading {filenames['jsonl_fetch']} to GCP...")
gcp.storage.upload(
bucket_name=env.QUESTION_BANK_BUCKET,
local_filename=local,
)


def load_existing_resolution_files(
source: str,
ids: Iterable[str] | None = None,
) -> dict[str, pd.DataFrame]:
"""Download <source>/<id>.jsonl resolution files.

If ids is given, download only those. If ids is None, list the bucket and
download every .jsonl under <source>/ — use sparingly, scales with backlog.

Args:
source (str): Source name (e.g. "infer").
ids (Iterable[str] | None): Specific question IDs to load. If None,
load every resolution file present in the bucket for this source.

Returns:
dict mapping question_id to its resolution DataFrame.
"""
if ids is None:
paths = gcp.storage.list_with_prefix(
bucket_name=env.QUESTION_BANK_BUCKET, prefix=f"{source}/"
)
question_ids = [
os.path.basename(p).removesuffix(".jsonl") for p in paths if p.endswith(".jsonl")
]
else:
question_ids = [str(qid) for qid in ids]

result: dict[str, pd.DataFrame] = {}
for question_id in question_ids:
basename = f"{question_id}.jsonl"
remote_path = f"{source}/{basename}"
local_filename = f"/tmp/{source}_{basename}"

gcp.storage.download_no_error_message_on_404(
bucket_name=env.QUESTION_BANK_BUCKET,
filename=remote_path,
local_filename=local_filename,
)
if os.path.exists(local_filename):
Comment thread
nikbpetrov marked this conversation as resolved.
df = pd.read_json(
local_filename,
lines=True,
dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE,
convert_dates=False,
)
if not df.empty:
result[question_id] = df
logger.info(f"Loaded {len(result)} existing resolution files for {source}.")
return result


def upload_resolution_files(source: str, resolution_files: dict[str, pd.DataFrame]) -> None:
"""Upload per-question resolution files to <source>/<id>.jsonl.

Args:
source (str): Source name (e.g. "infer").
resolution_files (dict): Mapping of question_id to resolution DataFrame.
"""
for question_id, df in resolution_files.items():
basename = f"{question_id}.jsonl"
remote_filename = f"{source}/{basename}"
local_filename = f"/tmp/{basename}"

df[["id", "date", "value"]].to_json(
local_filename, orient="records", lines=True, date_format="iso"
)
gcp.storage.upload(
bucket_name=env.QUESTION_BANK_BUCKET,
local_filename=local_filename,
filename=remote_filename,
)
logger.info(f"Uploaded {len(resolution_files)} resolution files for {source}.")
Loading
Loading