From c98cf094362f366a3397a7e146ee2d943ed34114 Mon Sep 17 00:00:00 2001 From: George Zelenfroind Date: Tue, 6 May 2025 04:42:14 -0700 Subject: [PATCH 1/5] test_chunk_manifest Signed-off-by: George Zelenfroind --- sdp/processors/base_processor.py | 55 ++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/sdp/processors/base_processor.py b/sdp/processors/base_processor.py index 6fc22ee8..fe1eb4c3 100644 --- a/sdp/processors/base_processor.py +++ b/sdp/processors/base_processor.py @@ -166,28 +166,43 @@ def _process_with_dask(self, metrics): logger.info(f"Using Dask client with dashboard at: {client.dashboard_link}") # Delegate manifest reading to read_manifest() which returns a Dask bag. - bag = self.read_manifest() - - if not isinstance(bag, db.Bag): - bag = db.from_sequence(bag) - total_entries = bag.count().compute() - - if total_entries == 0: - logger.info("No entries found in the manifest input. Proceeding to create an empty output manifest.") - results = [] - else: - processed_bag = bag.map(lambda entry: self.process_dataset_entry(entry)).flatten() - results = processed_bag.compute() with open(self.output_manifest_file, "wt", encoding="utf8") as fout: - for entry in results: - metrics.append(entry.metrics) - if entry.data is not None: - json.dump(entry.data, fout, ensure_ascii=False) - fout.write("\n") - self.number_of_entries += 1 - self.total_duration += entry.data.get("duration", 0) - logger.info(f"Processed {total_entries} entries using Dask.") + for manifest_chunk in self._chunk_manifest(): + chunk_bag = db.from_sequence(manifest_chunk) + processed_chunk = chunk_bag.map(self.process_dataset_entry).flatten().compute() + + # Write results from this chunk to the output file + for entry in processed_chunk: + metrics.append(entry.metrics) + if entry.data is not None: + json.dump(entry.data, fout, ensure_ascii=False) + fout.write("\n") + self.number_of_entries += 1 + self.total_duration += entry.data.get("duration", 0) + + 
# bag = self.read_manifest() + + # if not isinstance(bag, db.Bag): + # bag = db.from_sequence(bag) + # total_entries = bag.count().compute() + + # if total_entries == 0: + # logger.info("No entries found in the manifest input. Proceeding to create an empty output manifest.") + # results = [] + # else: + # processed_bag = bag.map(lambda entry: self.process_dataset_entry(entry)).flatten() + # results = processed_bag.compute() + + # with open(self.output_manifest_file, "wt", encoding="utf8") as fout: + # for entry in results: + # metrics.append(entry.metrics) + # if entry.data is not None: + # json.dump(entry.data, fout, ensure_ascii=False) + # fout.write("\n") + # self.number_of_entries += 1 + # self.total_duration += entry.data.get("duration", 0) + # logger.info(f"Processed {total_entries} entries using Dask.") def _process_with_multiprocessing(self, metrics): with open(self.output_manifest_file, "wt", encoding="utf8") as fout: From cc028cc56007dd8785a4237c8756d0ddca92375f Mon Sep 17 00:00:00 2001 From: George Zelenfroind Date: Wed, 7 May 2025 05:49:29 -0700 Subject: [PATCH 2/5] fix toloka warnings Signed-off-by: George Zelenfroind --- sdp/processors/toloka/accept_if.py | 6 +++++- sdp/processors/toloka/create_pool.py | 7 ++++++- sdp/processors/toloka/create_task_set.py | 8 +++++++- sdp/processors/toloka/download_responses.py | 7 ++++++- sdp/processors/toloka/reject_if.py | 5 ++++- 5 files changed, 28 insertions(+), 5 deletions(-) diff --git a/sdp/processors/toloka/accept_if.py b/sdp/processors/toloka/accept_if.py index 3b0b7452..a5ce40c1 100644 --- a/sdp/processors/toloka/accept_if.py +++ b/sdp/processors/toloka/accept_if.py @@ -25,9 +25,9 @@ import toloka.client.project.template_builder TOLOKA_AVAILABLE = True except ImportError: - logger.warning("Toloka is currently not supported. 
AcceptIf processor functionality will be limited.") TOLOKA_AVAILABLE = False toloka = None + pass from tqdm import tqdm @@ -82,6 +82,7 @@ def __init__( self.pool_id = pool_id if self.config_file: self.load_config() + self.toloka_available = TOLOKA_AVAILABLE def load_config(self): """ @@ -107,6 +108,9 @@ def prepare(self): This method loads necessary configurations and initializes the Toloka client to interact with Toloka's API. """ + if self.toloka_available != True: + logger.warning("Toloka is currently not supported. AcceptIf processor functionality will be limited.") + if not self.API_KEY or not self.platform or not self.pool_id: try: with open(self.input_data_file, 'r') as file: diff --git a/sdp/processors/toloka/create_pool.py b/sdp/processors/toloka/create_pool.py index 88da4960..28ddfe6f 100644 --- a/sdp/processors/toloka/create_pool.py +++ b/sdp/processors/toloka/create_pool.py @@ -24,9 +24,9 @@ import toloka.client.project.template_builder TOLOKA_AVAILABLE = True except ImportError: - logger.warning("Toloka is currently not supported. CreatePool processor functionality will be limited.") TOLOKA_AVAILABLE = False toloka = None + pass class CreateTolokaPool(BaseParallelProcessor): @@ -67,6 +67,7 @@ def __init__( # Project ID will be read from the input manifest file in process_dataset_entry self.project_id = None self.lang = lang + self.toloka_available = TOLOKA_AVAILABLE def process_dataset_entry(self, data_entry): """ @@ -85,6 +86,10 @@ def process_dataset_entry(self, data_entry): list A list containing a DataEntry object with the new pool ID if successful, or an empty list if failed. """ + + if self.toloka_available != True: + logger.warning("Toloka is currently not supported. 
CreatePool processor functionality will be limited.") + # Get project_id from the data entry project_id = data_entry.get("project_id") if not project_id: diff --git a/sdp/processors/toloka/create_task_set.py b/sdp/processors/toloka/create_task_set.py index 692a1e2f..dd18a2ec 100644 --- a/sdp/processors/toloka/create_task_set.py +++ b/sdp/processors/toloka/create_task_set.py @@ -25,9 +25,9 @@ import toloka.client.project.template_builder TOLOKA_AVAILABLE = True except ImportError: - logger.warning("Toloka is currently not supported. CreateTaskSet processor functionality will be limited.") TOLOKA_AVAILABLE = False toloka = None + pass @@ -57,6 +57,10 @@ def __init__( self.input_pool_file = input_pool_file self.limit = limit self.pool_id = None + self.toloka_available = TOLOKA_AVAILABLE + + + # Get API key and platform from environment variables self.API_KEY = os.getenv('TOLOKA_API_KEY') @@ -76,6 +80,8 @@ def prepare(self): This method sets up the necessary components for task creation, including loading the pool configuration and initializing the Toloka client. """ + if self.toloka_available != True: + logger.warning("Toloka is currently not supported. CreateTaskSet processor functionality will be limited.") self.load_pool_config() self.toloka_client = toloka.client.TolokaClient(self.API_KEY, self.platform) diff --git a/sdp/processors/toloka/download_responses.py b/sdp/processors/toloka/download_responses.py index 9e6c08a3..6e07e5b9 100644 --- a/sdp/processors/toloka/download_responses.py +++ b/sdp/processors/toloka/download_responses.py @@ -22,9 +22,9 @@ import toloka.client TOLOKA_AVAILABLE = True except ImportError: - logger.warning("Toloka is currently not supported. 
DownloadResponses processor functionality will be limited.") TOLOKA_AVAILABLE = False toloka = None + pass @@ -92,6 +92,7 @@ def __init__( self.pool_id = pool_id if self.config_file: self.load_config() + self.toloka_available = TOLOKA_AVAILABLE def load_config(self): """ @@ -100,6 +101,7 @@ def load_config(self): This method attempts to read configuration details such as API key, platform, and pool ID from a JSON file. If the file is missing or improperly formatted, an appropriate error is logged. """ + try: with open(self.config_file, 'r') as file: config = json.load(file) @@ -117,6 +119,9 @@ def prepare(self): This method loads necessary configurations and initializes the Toloka client to interact with Toloka's API. """ + if self.toloka_available != True: + logger.warning("Toloka is currently not supported. DownloadResponses processor functionality will be limited.") + if not self.API_KEY or not self.platform or not self.pool_id: try: with open(self.input_data_file, 'r') as file: diff --git a/sdp/processors/toloka/reject_if.py b/sdp/processors/toloka/reject_if.py index 7da755a4..d1d689e0 100644 --- a/sdp/processors/toloka/reject_if.py +++ b/sdp/processors/toloka/reject_if.py @@ -23,9 +23,9 @@ import toloka.client.project.template_builder TOLOKA_AVAILABLE = True except ImportError: - logger.warning("Toloka is currently not supported. RejectIf processor functionality will be limited.") TOLOKA_AVAILABLE = False toloka = None + pass from docx import Document from tqdm import tqdm @@ -83,6 +83,7 @@ def __init__( self.API_KEY = API_KEY or os.getenv('TOLOKA_API_KEY') self.platform = platform or os.getenv('TOLOKA_PLATFORM') self.pool_id = pool_id + self.toloka_available = TOLOKA_AVAILABLE if self.config_file: self.load_config() @@ -110,6 +111,8 @@ def prepare(self): This method loads necessary configurations and initializes the Toloka client to interact with Toloka's API. """ + if self.toloka_available != True: + logger.warning("Toloka is currently not supported. 
RejectIf processor functionality will be limited.") if not self.API_KEY or not self.platform or not self.pool_id: try: with open(self.input_data_file, 'r') as file: From 5fd400f61d93f5994afeb77b1eba8e0e00b0ed17 Mon Sep 17 00:00:00 2001 From: George Zelenfroind Date: Wed, 7 May 2025 06:08:48 -0700 Subject: [PATCH 3/5] Revert "test_chunk_manifest" This reverts commit 6d7055c901c396f9a1436f85b36e9acf3e446a29. Signed-off-by: George Zelenfroind --- sdp/processors/base_processor.py | 55 ++++++++++++-------------------- 1 file changed, 20 insertions(+), 35 deletions(-) diff --git a/sdp/processors/base_processor.py b/sdp/processors/base_processor.py index fe1eb4c3..6fc22ee8 100644 --- a/sdp/processors/base_processor.py +++ b/sdp/processors/base_processor.py @@ -166,43 +166,28 @@ def _process_with_dask(self, metrics): logger.info(f"Using Dask client with dashboard at: {client.dashboard_link}") # Delegate manifest reading to read_manifest() which returns a Dask bag. + bag = self.read_manifest() + + if not isinstance(bag, db.Bag): + bag = db.from_sequence(bag) + total_entries = bag.count().compute() + + if total_entries == 0: + logger.info("No entries found in the manifest input. 
Proceeding to create an empty output manifest.") + results = [] + else: + processed_bag = bag.map(lambda entry: self.process_dataset_entry(entry)).flatten() + results = processed_bag.compute() with open(self.output_manifest_file, "wt", encoding="utf8") as fout: - for manifest_chunk in self._chunk_manifest(): - chunk_bag = db.from_sequence(manifest_chunk) - processed_chunk = chunk_bag.map(self.process_dataset_entry).flatten().compute() - - # Write results from this chunk to the output file - for entry in processed_chunk: - metrics.append(entry.metrics) - if entry.data is not None: - json.dump(entry.data, fout, ensure_ascii=False) - fout.write("\n") - self.number_of_entries += 1 - self.total_duration += entry.data.get("duration", 0) - - # bag = self.read_manifest() - - # if not isinstance(bag, db.Bag): - # bag = db.from_sequence(bag) - # total_entries = bag.count().compute() - - # if total_entries == 0: - # logger.info("No entries found in the manifest input. Proceeding to create an empty output manifest.") - # results = [] - # else: - # processed_bag = bag.map(lambda entry: self.process_dataset_entry(entry)).flatten() - # results = processed_bag.compute() - - # with open(self.output_manifest_file, "wt", encoding="utf8") as fout: - # for entry in results: - # metrics.append(entry.metrics) - # if entry.data is not None: - # json.dump(entry.data, fout, ensure_ascii=False) - # fout.write("\n") - # self.number_of_entries += 1 - # self.total_duration += entry.data.get("duration", 0) - # logger.info(f"Processed {total_entries} entries using Dask.") + for entry in results: + metrics.append(entry.metrics) + if entry.data is not None: + json.dump(entry.data, fout, ensure_ascii=False) + fout.write("\n") + self.number_of_entries += 1 + self.total_duration += entry.data.get("duration", 0) + logger.info(f"Processed {total_entries} entries using Dask.") def _process_with_multiprocessing(self, metrics): with open(self.output_manifest_file, "wt", encoding="utf8") as fout: From 
6097d54b08853384ae996e3401ce8707d97a1b20 Mon Sep 17 00:00:00 2001 From: George Zelenfroind Date: Wed, 7 May 2025 06:13:01 -0700 Subject: [PATCH 4/5] add create project Signed-off-by: George Zelenfroind --- sdp/processors/toloka/create_project.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdp/processors/toloka/create_project.py b/sdp/processors/toloka/create_project.py index a229214e..d4d9bf0f 100644 --- a/sdp/processors/toloka/create_project.py +++ b/sdp/processors/toloka/create_project.py @@ -23,9 +23,9 @@ import toloka.client.project.template_builder TOLOKA_AVAILABLE = True except ImportError: - logger.warning("Toloka is currently not supported. CreateTolokaProject processor functionality will be limited.") TOLOKA_AVAILABLE = False toloka = None + pass class CreateTolokaProject(BaseParallelProcessor): @@ -63,6 +63,7 @@ def __init__( self.project_name = project_name self.project_description = project_description self.project_instructions = project_instructions + self.toloka_availabe = TOLOKA_AVAILABLE def process(self): """ @@ -75,6 +76,8 @@ def process(self): After creating the project, it saves the project details (including the project ID) to a specified file. """ logger.info("Processing Toloka project creation...") + if self.toloka_availabe != True: + logger.warning("Toloka is currently not supported. 
CreateTolokaProject processor functionality will be limited.") toloka_client = toloka.client.TolokaClient(self.API_KEY, self.platform) From 94f2f1749c81028a59a23f929daa4f31f2c99fc8 Mon Sep 17 00:00:00 2001 From: George Zelenfroind Date: Thu, 8 May 2025 03:39:00 -0700 Subject: [PATCH 5/5] remove leftovers from previous version Signed-off-by: George Zelenfroind --- sdp/processors/toloka/accept_if.py | 2 +- sdp/processors/toloka/create_pool.py | 2 +- sdp/processors/toloka/create_project.py | 2 +- sdp/processors/toloka/create_task_set.py | 2 +- sdp/processors/toloka/download_responses.py | 2 +- sdp/processors/toloka/reject_if.py | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdp/processors/toloka/accept_if.py b/sdp/processors/toloka/accept_if.py index a5ce40c1..8472f601 100644 --- a/sdp/processors/toloka/accept_if.py +++ b/sdp/processors/toloka/accept_if.py @@ -27,7 +27,7 @@ except ImportError: TOLOKA_AVAILABLE = False toloka = None - pass + from tqdm import tqdm diff --git a/sdp/processors/toloka/create_pool.py b/sdp/processors/toloka/create_pool.py index 28ddfe6f..9948cef4 100644 --- a/sdp/processors/toloka/create_pool.py +++ b/sdp/processors/toloka/create_pool.py @@ -26,7 +26,7 @@ except ImportError: TOLOKA_AVAILABLE = False toloka = None - pass + class CreateTolokaPool(BaseParallelProcessor): diff --git a/sdp/processors/toloka/create_project.py b/sdp/processors/toloka/create_project.py index d4d9bf0f..bf8ece19 100644 --- a/sdp/processors/toloka/create_project.py +++ b/sdp/processors/toloka/create_project.py @@ -25,7 +25,7 @@ except ImportError: TOLOKA_AVAILABLE = False toloka = None - pass + class CreateTolokaProject(BaseParallelProcessor): diff --git a/sdp/processors/toloka/create_task_set.py b/sdp/processors/toloka/create_task_set.py index dd18a2ec..3957091f 100644 --- a/sdp/processors/toloka/create_task_set.py +++ b/sdp/processors/toloka/create_task_set.py @@ -27,7 +27,7 @@ except ImportError: TOLOKA_AVAILABLE = False toloka = None - pass + 
diff --git a/sdp/processors/toloka/download_responses.py b/sdp/processors/toloka/download_responses.py index 6e07e5b9..aa2563cf 100644 --- a/sdp/processors/toloka/download_responses.py +++ b/sdp/processors/toloka/download_responses.py @@ -24,7 +24,7 @@ except ImportError: TOLOKA_AVAILABLE = False toloka = None - pass + diff --git a/sdp/processors/toloka/reject_if.py b/sdp/processors/toloka/reject_if.py index d1d689e0..182c3e86 100644 --- a/sdp/processors/toloka/reject_if.py +++ b/sdp/processors/toloka/reject_if.py @@ -25,7 +25,7 @@ except ImportError: TOLOKA_AVAILABLE = False toloka = None - pass + from docx import Document from tqdm import tqdm