Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions src/detectmatelibrary/common/_core_op/_fit_logic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@

from enum import Enum


class TrainState(Enum):
    """Manual override switch for a component's training phase."""

    DEFAULT = 0         # defer to the data_use_training sample budget
    STOP_TRAINING = 1   # never train, regardless of the budget
    KEEP_TRAINING = 2   # always train, regardless of the budget

    def describe(self) -> str:
        """Return a human-readable description of this state."""
        # Keyed by member rather than indexed by ``self.value`` so the
        # text cannot silently pair with the wrong state if the numeric
        # values are ever reordered or made non-contiguous.
        descriptions = {
            TrainState.DEFAULT: "Follow default training behavior.",
            TrainState.STOP_TRAINING: "Force stop training.",
            TrainState.KEEP_TRAINING:
                "Keep training regardless of default behavior.",
        }
        return descriptions[self]


class ConfigState(Enum):
    """Manual override switch for a component's configuration phase."""

    DEFAULT = 0          # defer to the data_use_configure sample budget
    STOP_CONFIGURE = 1   # never configure, regardless of the budget
    KEEP_CONFIGURE = 2   # always configure, regardless of the budget

    def describe(self) -> str:
        """Return a human-readable description of this state."""
        # Keyed by member rather than indexed by ``self.value`` so the
        # text cannot silently pair with the wrong state if the numeric
        # values are ever reordered or made non-contiguous.
        descriptions = {
            ConfigState.DEFAULT: "Follow default configuration behavior.",
            ConfigState.STOP_CONFIGURE: "Force stop configuration.",
            ConfigState.KEEP_CONFIGURE:
                "Keep configuring regardless of default behavior.",
        }
        return descriptions[self]


def do_training(
    data_use_training: int | None, index: int, train_state: TrainState
) -> bool:
    """Decide whether the current sample should be spent on training.

    A forced state always wins; otherwise train while fewer than
    ``data_use_training`` samples have been consumed. ``None`` disables
    the default training budget entirely.
    """
    if train_state is TrainState.STOP_TRAINING:
        return False
    if train_state is TrainState.KEEP_TRAINING:
        return True
    if data_use_training is None:
        return False
    return index < data_use_training


def do_configure(
    data_use_configure: int | None, index: int, configure_state: ConfigState
) -> bool:
    """Decide whether the current sample should be spent on configuration.

    A forced state always wins; otherwise configure while fewer than
    ``data_use_configure`` samples have been consumed. ``None`` disables
    the default configuration budget entirely.
    """
    if configure_state is ConfigState.STOP_CONFIGURE:
        return False
    if configure_state is ConfigState.KEEP_CONFIGURE:
        return True
    if data_use_configure is None:
        return False
    return index < data_use_configure


class FitLogicState(Enum):
    # Outcome of one FitLogic.run() step: which phase consumes the
    # incoming data sample.
    DO_CONFIG = 0  # sample was routed to configuration
    DO_TRAIN = 1   # sample was routed to training
    NOTHING = 2    # sample used by neither phase


class FitLogic:
    """Routes incoming data samples to configuration, then training.

    Tracks how many samples each phase has consumed against optional
    per-phase budgets, and reports (once) when configuration ends.
    """

    def __init__(
        self, data_use_configure: int | None, data_use_training: int | None
    ) -> None:

        # Manual overrides; DEFAULT defers to the sample budgets below.
        self.train_state = TrainState.DEFAULT
        self.configure_state = ConfigState.DEFAULT

        # Samples consumed so far by each phase.
        self.data_used_train = 0
        self.data_used_configure = 0

        # _configuration_done latches when the configure phase ends;
        # config_finished records that finish_config() already reported it.
        self._configuration_done = False
        self.config_finished = False

        # Per-phase sample budgets (None disables the default behavior).
        self.data_use_configure = data_use_configure
        self.data_use_training = data_use_training

    def finish_config(self) -> bool:
        """Report the end of the configuration phase exactly once."""
        should_report = self._configuration_done and not self.config_finished
        if should_report:
            self.config_finished = True
        return should_report

    def run(self) -> FitLogicState:
        """Decide what the next data sample is used for and count it."""
        wants_config = do_configure(
            data_use_configure=self.data_use_configure,
            index=self.data_used_configure,
            configure_state=self.configure_state
        )
        if wants_config:
            self.data_used_configure += 1
            return FitLogicState.DO_CONFIG

        # Configuration phase is over; latch completion if it ever ran.
        if not self._configuration_done and self.data_used_configure > 0:
            self._configuration_done = True

        wants_train = do_training(
            data_use_training=self.data_use_training,
            index=self.data_used_train,
            train_state=self.train_state
        )
        if wants_train:
            self.data_used_train += 1
            return FitLogicState.DO_TRAIN

        return FitLogicState.NOTHING
28 changes: 28 additions & 0 deletions src/detectmatelibrary/common/_core_op/_schema_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from detectmatelibrary.schemas import BaseSchema


from typing import Tuple


class SchemaPipeline:
    """Normalizes component I/O between schema objects and raw bytes."""

    @staticmethod
    def preprocess(
        input_: BaseSchema, data: BaseSchema | bytes
    ) -> Tuple[bool, BaseSchema]:
        """Return ``(was_bytes, schema_copy)`` for an incoming payload.

        Byte payloads are deserialized into ``input_`` first; schema
        payloads are copied so the caller's object is never mutated.
        """
        came_as_bytes = isinstance(data, bytes)
        if came_as_bytes:
            input_.deserialize(data)
            working = input_.copy()
        else:
            working = data.copy()

        return came_as_bytes, working

    @staticmethod
    def postprocess(
        data: BaseSchema, is_byte: bool
    ) -> BaseSchema | bytes:
        """Mirror of preprocess: serialize back to bytes if input was bytes."""
        if is_byte:
            return data.serialize()
        return data
152 changes: 42 additions & 110 deletions src/detectmatelibrary/common/core.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from detectmatelibrary.common._core_op._fit_logic import FitLogicState
from detectmatelibrary.common._core_op._schema_pipeline import SchemaPipeline
from detectmatelibrary.common._core_op._fit_logic import FitLogic

from detectmatelibrary.utils.data_buffer import DataBuffer, ArgsBuffer, BufferMode
from detectmatelibrary.utils.id_generator import SimpleIDGenerator

Expand All @@ -7,113 +11,27 @@

from tools.logging import logger, setup_logging

from typing import Any, Dict, Tuple, List
from enum import Enum
from typing import Any, Dict, List


setup_logging()


class SchemaPipeline:
    """Normalizes component I/O between schema objects and raw bytes."""
    @staticmethod
    def preprocess(
        input_: BaseSchema, data: BaseSchema | bytes
    ) -> Tuple[bool, BaseSchema]:
        """Return (is_byte, schema_copy) for an incoming payload.

        Byte payloads are deserialized into ``input_`` first; schema
        payloads are copied so the caller's object is never mutated.
        """
        is_byte = False
        if isinstance(data, bytes):
            is_byte = True
            input_.deserialize(data)
            data = input_.copy()
        else:
            data = data.copy()

        return is_byte, data

    @staticmethod
    def postprocess(
        data: BaseSchema, is_byte: bool
    ) -> BaseSchema | bytes:
        """Mirror of preprocess: serialize back to bytes if input was bytes."""
        return data if not is_byte else data.serialize()


class TrainState(Enum):
    """Manual override switch for a component's training phase."""
    DEFAULT = 0         # defer to the data_use_training sample budget
    STOP_TRAINING = 1   # never train, regardless of the budget
    KEEP_TRAINING = 2   # always train, regardless of the budget

    def describe(self) -> str:
        """Return a human-readable description of this state."""
        # NOTE(review): list position must stay aligned with the enum
        # values above, since this indexes by ``self.value``.
        descriptions = [
            "Follow default training behavior.",
            "Force stop training.",
            "Keep training regardless of default behavior."
        ]

        return descriptions[self.value]


class ConfigState(Enum):
    """Manual override switch for a component's configuration phase."""
    DEFAULT = 0          # defer to the data_use_configure sample budget
    STOP_CONFIGURE = 1   # never configure, regardless of the budget
    KEEP_CONFIGURE = 2   # always configure, regardless of the budget

    def describe(self) -> str:
        """Return a human-readable description of this state."""
        # NOTE(review): list position must stay aligned with the enum
        # values above, since this indexes by ``self.value``.
        descriptions = [
            "Follow default configuration behavior.",
            "Force stop configuration.",
            "Keep configuring regardless of default behavior."
        ]

        return descriptions[self.value]


class CoreConfig(BasicConfig):
    """Configuration shared by all core components."""
    # First id handed out by the component's id generator.
    start_id: int = 10
    # Samples to spend on training; None means no default training budget.
    data_use_training: int | None = None
    # Samples to spend on configuration; None means no default budget.
    data_use_configure: int | None = None


def do_training(config: CoreConfig, index: int, train_state: TrainState) -> bool:
    """Decide whether the current sample should be spent on training.

    A forced state always wins; otherwise train while fewer than
    ``config.data_use_training`` samples have been consumed (None
    disables the default training budget).
    """
    if train_state == TrainState.STOP_TRAINING:
        return False
    elif train_state == TrainState.KEEP_TRAINING:
        return True

    return config.data_use_training is not None and config.data_use_training > index


def do_configure(config: CoreConfig, index: int, configure_state: ConfigState) -> bool:
    """Decide whether the current sample should be spent on configuration.

    A forced state always wins; otherwise configure while fewer than
    ``config.data_use_configure`` samples have been consumed (None
    disables the default configuration budget).
    """
    if configure_state == ConfigState.STOP_CONFIGURE:
        return False
    elif configure_state == ConfigState.KEEP_CONFIGURE:
        return True

    return config.data_use_configure is not None and config.data_use_configure > index


class CoreComponent:
"""Base class for all components in the system."""
class Component:
"""Empty methods."""
def __init__(
self,
name: str,
type_: str = "Core",
config: CoreConfig = CoreConfig(),
args_buffer: ArgsBuffer = ArgsBuffer(BufferMode.NO_BUF),
input_schema: type[BaseSchema] = BaseSchema,
output_schema: type[BaseSchema] = BaseSchema
) -> None:

self.name, self.type_, self.config = name, type_, config
self.input_schema, self.output_schema = input_schema, output_schema

self.data_buffer = DataBuffer(args_buffer)
self.id_generator = SimpleIDGenerator(self.config.start_id)
self.data_used_train = 0
self.train_state: TrainState = TrainState.DEFAULT
self.data_used_configure = 0
self.configure_state: ConfigState = ConfigState.DEFAULT
self._configuration_done = False

def __repr__(self) -> str:
return f"<{self.type_}> {self.name}: {self.config}"
Expand All @@ -136,32 +54,52 @@ def configure(
def set_configuration(self) -> None:
pass

def get_config(self) -> Dict[str, Any]:
return self.config.get_config()

def update_config(self, new_config: Dict[str, Any]) -> None:
self.config.update_config(new_config)


class CoreComponent(Component):
"""Base class for all components in the system."""
def __init__(
self,
name: str,
type_: str = "Core",
config: CoreConfig = CoreConfig(),
args_buffer: ArgsBuffer = ArgsBuffer(BufferMode.NO_BUF),
input_schema: type[BaseSchema] = BaseSchema,
output_schema: type[BaseSchema] = BaseSchema
) -> None:
super().__init__(name=name, type_=type_, config=config)
self.input_schema, self.output_schema = input_schema, output_schema

self.data_buffer = DataBuffer(args_buffer)
self.id_generator = SimpleIDGenerator(self.config.start_id)
self.fitlogic = FitLogic(
data_use_configure=self.config.data_use_configure,
data_use_training=self.config.data_use_training,
)

def process(self, data: BaseSchema | bytes) -> BaseSchema | bytes | None:
is_byte, data = SchemaPipeline.preprocess(self.input_schema(), data)
logger.debug(f"<<{self.name}>> received:\n{data}")

if (data_buffered := self.data_buffer.add(data)) is None: # type: ignore
return None

if do_configure(
config=self.config,
index=self.data_used_configure,
configure_state=self.configure_state
):
self.data_used_configure += 1
if (fit_state := self.fitlogic.run()) == FitLogicState.DO_CONFIG:
logger.info(f"<<{self.name}>> use data for configuration")
self.configure(input_=data_buffered)
return None
else:
if self.data_used_configure > 0 and not self._configuration_done:
self._configuration_done = True
logger.info(f"<<{self.name}>> finalizing configuration")
self.set_configuration()
elif self.fitlogic.finish_config():
logger.info(f"<<{self.name}>> finalizing configuration")
self.set_configuration()

if do_training(config=self.config, index=self.data_used_train, train_state=self.train_state):
self.data_used_train += 1
logger.info(f"<<{self.name}>> use data for training")
self.train(input_=data_buffered)
if fit_state == FitLogicState.DO_TRAIN:
logger.info(f"<<{self.name}>> use data for training")
self.train(input_=data_buffered)

output_ = self.output_schema()
logger.info(f"<<{self.name}>> processing data")
Expand All @@ -172,9 +110,3 @@ def process(self, data: BaseSchema | bytes) -> BaseSchema | bytes | None:

logger.debug(f"<<{self.name}>> processed:\n{output_}")
return SchemaPipeline.postprocess(output_, is_byte=is_byte)

def get_config(self) -> Dict[str, Any]:
return self.config.get_config()

def update_config(self, new_config: Dict[str, Any]) -> None:
self.config.update_config(new_config)
Loading
Loading