Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions nemo_curator/stages/deduplication/io_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Callable
from collections.abc import Callable, Iterator
from contextlib import ExitStack, contextmanager

# TODO: Should this be a safe import?
import cudf
import numpy as np
import ray

from nemo_curator.stages.deduplication.id_generator import CURATOR_DEDUP_ID_STR, IdGenerator
from nemo_curator.utils.client_utils import is_remote_url
from nemo_curator.utils.file_utils import get_fs


Expand All @@ -35,11 +37,31 @@ def __init__(
def read_jsonl(
self, filepath: str | list[str], columns: list[str] | None = None, assign_id: bool = False, **kwargs
) -> "cudf.DataFrame":
df = cudf.read_json(filepath, lines=True, **kwargs)
with self._open_remote_files(filepath, kwargs) as (read_filepath, read_kwargs):
df = cudf.read_json(read_filepath, lines=True, **read_kwargs)
if columns is not None:
df = df[columns]
return self.assign_id(filepath, df) if assign_id and self.id_generator else df

@contextmanager
def _open_remote_files(
self, filepath: str | list[str], kwargs: dict
) -> Iterator[tuple[str | list[str] | object | list[object], dict]]:
paths = [filepath] if isinstance(filepath, str) else filepath
if not paths or not all(isinstance(path, str) and is_remote_url(path) for path in paths):
yield filepath, kwargs
return

read_kwargs = kwargs.copy()
storage_options = read_kwargs.pop("storage_options", {})

with ExitStack() as stack:
buffers = []
for path in paths:
fs = get_fs(path, storage_options=storage_options)
buffers.append(stack.enter_context(fs.open(path, mode="rb")))
Comment on lines +60 to +62

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Per-file filesystem instantiation

A new filesystem object is created for every path in the loop via get_fs. For a list of N files that all share the same protocol and storage_options, this produces N separate client instances — each may negotiate its own connection or credential chain. A single fs instance could be created once (keyed on protocol) and reused across all files from the same backend.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

yield (buffers[0] if isinstance(filepath, str) else buffers), read_kwargs

def read_parquet(self, filepath: str | list[str], assign_id: bool = False, **kwargs) -> "cudf.DataFrame":
read_kwargs = kwargs.copy()
read_kwargs["allow_mismatched_pq_schemas"] = True
Expand Down