Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "cosmap"
version = "0.3.2"
description = ""
authors = [{ name = "Patrick Wells", email = "patrick@astropatty.com" }]
requires-python = ">=3.11"
requires-python = ">=3.11,<3.14"
readme = "README.md"
classifiers = [
"Programming Language :: Python :: 3",
Expand Down
1 change: 0 additions & 1 deletion src/cosmap/analysis/known_analyses.json

This file was deleted.

19 changes: 17 additions & 2 deletions src/cosmap/config/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

from pathlib import Path
from types import ModuleType
from typing import Optional

from pydantic import BaseModel, Field, validator
from pydantic import BaseModel, Field, model_validator, validator

from cosmap.config.models import sky

Expand Down Expand Up @@ -92,8 +93,22 @@ class CosmapDatasetParameters(BaseModel):
heinlein, which is optimized for large survey datasets.
"""

dataset_name: str
dataset_name: Optional[str] = None
dataset_path: Optional[Path] = None
dataset_wrapper: str = "heinlein"
dataset_columns: Optional[list[str]] = None

@model_validator(mode="after")
def validate_wrapper(self):
    """
    Check that the field required by the selected wrapper has been set.

    The "heinlein" wrapper needs ``dataset_name`` and the "opencosmo"
    wrapper needs ``dataset_path``. Any other wrapper value passes through
    this validator unchecked.

    Returns:
        The validated model instance itself.

    Raises:
        ValueError: If the field required by the chosen wrapper is None.
    """
    wrapper = self.dataset_wrapper
    if wrapper == "heinlein":
        if self.dataset_name is None:
            raise ValueError(
                "When using the heinlein wrapper, a dataset name must be set"
            )
    elif wrapper == "opencosmo":
        if self.dataset_path is None:
            raise ValueError(
                "When using the opencosmo wrapper, a dataset path must be set"
            )
    return self


class CosmapOutputParameters(BaseModel):
Expand Down
1 change: 0 additions & 1 deletion src/cosmap/config/models/sky.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def sky_coord_validator(v: dict, *args, **kwargs) -> SkC:


def sky_coord_serializer(value):
print(value)
return {
"coordinate": [value.ra.value, value.dec.value],
"units": [value.ra.unit.to_string(), value.dec.unit.to_string()],
Expand Down
67 changes: 67 additions & 0 deletions src/cosmap/dataset/opencosmo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from pathlib import Path
from typing import Optional

import astropy.units as u
import opencosmo as oc
from astropy.coordinates import SkyCoord
from dask.distributed.diagnostics.plugin import WorkerPlugin


class opencosmoPlugin(WorkerPlugin):
    """
    Dask worker plugin that opens a set of opencosmo files on each worker
    and exposes them as ``worker.dataset`` wrapped in an OpenCosmoProxy.
    """

    def __init__(
        self,
        name: Optional[str],
        path: Path,
        dataset_columns: Optional[list[str]],
        **kwargs,
    ):
        """
        Resolve the files to open and remember the column selection.

        ``name`` and any extra keyword arguments are accepted but not used
        by this plugin.
        """
        self.__columns = dataset_columns
        self.__files = identify_opencosmo_files(path)

    def setup(self, worker):
        """Open the files, optionally restrict columns, attach to the worker."""
        opened = oc.open(self.__files)
        columns = self.__columns
        # Only narrow the dataset when an explicit column list was provided.
        selected = opened if columns is None else opened.select(columns)
        worker.dataset = OpenCosmoProxy(selected)

    def teardown(self, worker):
        """Detach the dataset from the worker, if one is attached."""
        try:
            del worker.dataset
        except AttributeError:
            # Nothing was attached (or it is already gone); nothing to do.
            pass


class OpenCosmoProxy:
    """
    Thin wrapper around an opencosmo dataset that serves per-sample cone
    queries through ``get_data_from_samples``.
    """

    def __init__(self, dataset):
        """Store the opencosmo dataset that queries will be run against."""
        self.__dataset = dataset

    def get_data_from_samples(
        self, coordinates: SkyCoord, dtypes, sample_type, sample_dimensions: u.Quantity
    ):
        """
        Yield ``(region, data)`` pairs, one per coordinate.

        A padded bounding box around all coordinates is searched first and
        its data loaded, then a cone of size ``sample_dimensions`` is built
        around each coordinate and the box result is bounded to that cone.

        Parameters:
            coordinates: Sample centers; iterated one coordinate at a time.
            dtypes: Accepted but not used by this implementation.
            sample_type: Must be "cone"; anything else fails the assertion.
            sample_dimensions: Size passed to ``oc.make_cone`` and used
                (doubled) as the padding of the bounding box.

        Yields:
            Tuples of the cone region and a dict ``{"catalog": data}``.

        Note:
            This is a generator, so the assertion and the box search only
            run once iteration starts.
        """
        assert sample_type == "cone", "only cone samples are supported"

        # this is just for caching optimization. Does not have to be perfect
        padding = 2 * sample_dimensions
        min_ra = coordinates.ra.min() - padding
        # The upper bounds must grow outward; the RA upper bound previously
        # subtracted the padding, shrinking the box on that side.
        max_ra = coordinates.ra.max() + padding
        min_dec = coordinates.dec.min() - padding
        max_dec = coordinates.dec.max() + padding

        dataset = self.__dataset.box_search((min_ra, min_dec), (max_ra, max_dec))
        _ = dataset.get_data()  # load into cache
        for coordinate in coordinates:
            region = oc.make_cone(coordinate, sample_dimensions)
            yield region, {"catalog": dataset.bound(region).get_data()}


def identify_opencosmo_files(path: Path):
    """
    Resolve ``path`` to the list of opencosmo ``.hdf5`` files it refers to.

    Parameters:
        path: Either a single ``.hdf5`` file or a directory containing
            ``.hdf5`` files (searched non-recursively). Strings are
            accepted and converted to ``Path``.

    Returns:
        A list of ``Path`` objects: the file itself, or the directory's
        ``*.hdf5`` entries in sorted order.

    Raises:
        FileNotFoundError: If the path does not exist, is a file without
            the ``.hdf5`` suffix, or is a directory with no ``.hdf5`` files.
    """
    path = Path(path)

    if path.is_file() and path.suffix == ".hdf5":
        return [path]

    if path.is_dir():
        # Sort so the result does not depend on filesystem listing order.
        files = sorted(path.glob("*.hdf5"))
        # An empty match is treated as "not found" rather than handing an
        # empty file list to the caller.
        if files:
            return files

    raise FileNotFoundError(f"Unable to identify opencosmo files at path {path}")
12 changes: 9 additions & 3 deletions src/cosmap/dataset/plugins.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from pathlib import Path

from dask.distributed.diagnostics.plugin import WorkerPlugin
from heinlein import load_dataset
from pydantic import BaseModel

from cosmap.dataset.opencosmo import opencosmoPlugin

"""
At present, datasets are attached to Dask workers as plugins. Ideally, a dataset
would operate a server process that would be queried by the workers. In practice though,
Expand All @@ -25,19 +29,21 @@ def teardown(self, worker):
del worker.dataset


known_wrappers = {"heinlein": heinleinPlugin}
known_wrappers = {"heinlein": heinleinPlugin, "opencosmo": opencosmoPlugin}


def get_dataset(dataset_parameters: BaseModel):
    """
    Build the dataset plugin described by a dataset-parameters model.

    The model's fields are dumped to a dict and forwarded as keyword
    arguments to ``_get_dataset``.
    """
    # model_dump() is the Pydantic v2 replacement for the deprecated .dict().
    return _get_dataset(**dataset_parameters.model_dump())


def _get_dataset(dataset_wrapper: str, dataset_name: str, *args, **kwargs):
def _get_dataset(
dataset_wrapper: str, dataset_name: str, dataset_path: Path, *args, **kwargs
):
"""
Get a dataset from a given wrapper. In the future, we will
support custom wrappers.
"""
if dataset_wrapper not in known_wrappers:
raise ValueError(f"Unknown wrapper {dataset_wrapper}")
wrapper = known_wrappers[dataset_wrapper]
return wrapper(dataset_name)
return wrapper(dataset_name, dataset_path, **kwargs)
Loading