diff --git a/upath/core.py b/upath/core.py index d572a4c9..d4c08a46 100644 --- a/upath/core.py +++ b/upath/core.py @@ -41,6 +41,7 @@ from upath.registry import get_upath_class from upath.types import UNSET_DEFAULT from upath.types import JoinablePathLike +from upath.types import OnNameCollisionFunc from upath.types import PathInfo from upath.types import ReadablePath from upath.types import ReadablePathLike @@ -1305,9 +1306,48 @@ def _copy_from( self, source: ReadablePath, follow_symlinks: bool = True, + on_name_collision: OnNameCollisionFunc | None = None, **kwargs: Any, ) -> None: - return super()._copy_from(source, follow_symlinks) + """ + UPath custom:: Recursively copy the given path to this path. + """ + # fixme: it would be best if this would be upstreamed + from pathlib_abc import vfsopen + from pathlib_abc import vfspath + from pathlib_abc._os import copyfileobj + from pathlib_abc._os import ensure_different_files + + stack: list[tuple[ReadablePath, WritablePath]] = [(source, self)] + while stack: + src, dst = stack.pop() + info = src.info + if not follow_symlinks and info.is_symlink(): + dst.symlink_to(vfspath(src.readlink()), src.info.is_dir()) + elif on_name_collision and info.is_file() and info.is_dir(): + dst_file, dst_dir = on_name_collision(src, dst) + if dst_file is not None: + ensure_different_files(src, dst_file) + with vfsopen(src, "rb") as source_f: + with vfsopen(dst_file, "wb") as target_f: + copyfileobj(source_f, target_f) + if dst_dir is not None: + children = src.iterdir() + dst_dir.mkdir() + # feed through dict.fromkeys to remove duplicates + for child in dict.fromkeys(children): + stack.append((child, dst_dir.joinpath(child.name))) + elif info.is_dir(): + children = src.iterdir() + dst.mkdir() + # feed through dict.fromkeys to remove duplicates + for child in dict.fromkeys(children): + stack.append((child, dst.joinpath(child.name))) + else: + ensure_different_files(src, dst) + with vfsopen(src, "rb") as source_f: + with vfsopen(dst, 
"wb") as target_f: + copyfileobj(source_f, target_f) # --- WritablePath attributes ------------------------------------- diff --git a/upath/implementations/cloud.py b/upath/implementations/cloud.py index 7814a456..0f76b4c1 100644 --- a/upath/implementations/cloud.py +++ b/upath/implementations/cloud.py @@ -5,15 +5,19 @@ from collections.abc import Sequence from typing import TYPE_CHECKING from typing import Any +from typing import overload from upath import UnsupportedOperation from upath._chain import DEFAULT_CHAIN_PARSER from upath._flavour import upath_strip_protocol from upath.core import UPath from upath.types import JoinablePathLike +from upath.types import SupportsPathLike +from upath.types import WritablePath if TYPE_CHECKING: from typing import Literal + from typing import TypeVar if sys.version_info >= (3, 11): from typing import Self @@ -28,6 +32,8 @@ from upath.types.storage_options import HfStorageOptions from upath.types.storage_options import S3StorageOptions + _WT = TypeVar("_WT", bound="WritablePath") + __all__ = [ "CloudPath", "GCSPath", @@ -147,6 +153,30 @@ def __init__( if not self.drive and len(self.parts) > 1: raise ValueError("non key-like path provided (bucket/container missing)") + @overload + def copy(self, target: _WT, **kwargs: Any) -> _WT: ... + + @overload + def copy(self, target: SupportsPathLike | str, **kwargs: Any) -> Self: ... + + def copy(self, target: _WT | SupportsPathLike | str, **kwargs: Any) -> _WT | UPath: + """ + Recursively copy this file or directory tree to the given destination. 
+ """ + # to allow _copy_from to check if a path isfile AND isdir + # we need to disable s3fs's dircache mechanism because it + # currently implements an XOR relation between the two for objects + # ref: fsspec/s3fs#999 + sopts = dict(self.storage_options) + sopts["use_listings_cache"] = False + new_self = type(self)( + self.path, + protocol=self.protocol, # type: ignore + **sopts, + ) + assert type(self) is type(new_self) + return super(__class__, new_self).copy(target, **kwargs) # subclass-safe + class AzurePath(CloudPath): __slots__ = () diff --git a/upath/implementations/local.py b/upath/implementations/local.py index 3bf3ec76..62f9d472 100644 --- a/upath/implementations/local.py +++ b/upath/implementations/local.py @@ -364,6 +364,11 @@ def rmdir(self, recursive: bool = UNSET_DEFAULT) -> None: else: shutil.rmtree(self) + # we need to override pathlib.Path._copy_from to support it as a + # WritablePath._copy_from target with support for on_name_collision + # Issue: https://github.com/barneygale/pathlib-abc/issues/48 + _copy_from = UPath._copy_from + if sys.version_info < (3, 14): # noqa: C901 @overload @@ -720,17 +725,6 @@ def chmod( ) return super().chmod(mode) - if not hasattr(pathlib.Path, "_copy_from"): - - def _copy_from( - self, - source: ReadablePath | LocalPath, - follow_symlinks: bool = True, - preserve_metadata: bool = False, - ) -> None: - _copy_from: Any = WritablePath._copy_from.__get__(self) - _copy_from(source, follow_symlinks=follow_symlinks) - UPath.register(LocalPath) diff --git a/upath/tests/conftest.py b/upath/tests/conftest.py index c9333c6f..4fdeae62 100644 --- a/upath/tests/conftest.py +++ b/upath/tests/conftest.py @@ -238,9 +238,19 @@ def docker_gcs(): pytest.skip("docker not installed") container = "gcsfs_test" - cmd = ( - "docker run -d -p 4443:4443 --name gcsfs_test fsouza/fake-gcs-server:latest -scheme " # noqa: E501 - "http -public-host http://localhost:4443 -external-url http://localhost:4443" # noqa: E501 + cmd = " ".join( + [ + "docker", + "run", 
"-d", + "-p 4443:4443", + "--name gcsfs_test", + "fsouza/fake-gcs-server:latest", + "-scheme http", + "-public-host http://localhost:4443", + "-external-url http://localhost:4443", + "-backend memory", + ] ) stop_docker(container) subprocess.check_output(shlex.split(cmd)) @@ -385,8 +395,8 @@ def docker_azurite(azurite_credentials): image = "mcr.microsoft.com/azure-storage/azurite" container_name = "azure_test" cmd = ( - f"docker run --rm -d -p {AZURITE_PORT}:10000 --name {container_name} {image}" # noqa: E501 - " azurite-blob --loose --blobHost 0.0.0.0" # noqa: E501 + f"docker run --rm -d -p {AZURITE_PORT}:10000 --name {container_name} {image}:latest" # noqa: E501 + " azurite-blob --loose --blobHost 0.0.0.0 --skipApiVersionCheck" # noqa: E501 ) url = f"http://localhost:{AZURITE_PORT}" diff --git a/upath/tests/implementations/test_azure.py b/upath/tests/implementations/test_azure.py index eb82e930..ee8ed625 100644 --- a/upath/tests/implementations/test_azure.py +++ b/upath/tests/implementations/test_azure.py @@ -1,3 +1,6 @@ +import warnings + +import fsspec import pytest from upath import UPath @@ -7,6 +10,7 @@ from ..utils import OverrideMeta from ..utils import extends_base from ..utils import overrides_base +from ..utils import posixify from ..utils import skip_on_windows @@ -61,3 +65,56 @@ def test_broken_mkdir(self): (path / "file").write_text("foo") assert path.exists() + + +@skip_on_windows +@pytest.mark.xfail(reason="adlfs returns isdir false") +def test_copy__object_key_collides_with_dir_prefix(azurite_credentials, tmp_path): + account_name, connection_string = azurite_credentials + storage_options = { + "account_name": account_name, + "connection_string": connection_string, + } + + az = fsspec.filesystem("az", **storage_options, use_listings_cache=False) + container = "copy-into-collision-container" + az.mkdir(container) + # store more objects with same prefix + az.pipe_file(f"{container}/src/common_prefix/file1.txt", b"1") + 
az.pipe_file(f"{container}/src/common_prefix/file2.txt", b"2") + # object under common prefix as key + az.pipe_file(f"{container}/src/common_prefix", b"hello world") + az.invalidate_cache() + + # make sure the sources have a collision + assert az.isfile(f"{container}/src/common_prefix") + assert az.isdir(f"{container}/src/common_prefix") + assert az.isfile(f"{container}/src/common_prefix/file1.txt") + assert az.isfile(f"{container}/src/common_prefix/file2.txt") + # prepare source and destination + src = UPath(f"az://{container}/src", **storage_options) + dst = UPath(tmp_path) + + def on_collision_rename_file(src, dst): + warnings.warn( + f"{src!s} collides with prefix. Renaming target file object to {dst!s}", + UserWarning, + stacklevel=3, + ) + return ( + dst.with_suffix(dst.suffix + ".COLLISION"), + dst, + ) + + # perform copy + src.copy_into(dst, on_name_collision=on_collision_rename_file) + + # check results + dst_files = sorted(posixify(x.relative_to(tmp_path)) for x in dst.glob("**/*")) + assert dst_files == [ + "src", + "src/common_prefix", + "src/common_prefix.COLLISION", + "src/common_prefix/file1.txt", + "src/common_prefix/file2.txt", + ] diff --git a/upath/tests/implementations/test_gcs.py b/upath/tests/implementations/test_gcs.py index 41d16c63..c7f13a82 100644 --- a/upath/tests/implementations/test_gcs.py +++ b/upath/tests/implementations/test_gcs.py @@ -1,3 +1,5 @@ +import warnings + import fsspec import pytest @@ -8,6 +10,7 @@ from ..utils import OverrideMeta from ..utils import extends_base from ..utils import overrides_base +from ..utils import posixify from ..utils import skip_on_windows @@ -49,3 +52,56 @@ def test_mkdir_in_empty_bucket(docker_gcs): endpoint_url=docker_gcs, token="anon", ).parent.mkdir(parents=True, exist_ok=True) + + +@skip_on_windows +@pytest.mark.xfail(reason="gcsfs returns isdir false") +def test_copy__object_key_collides_with_dir_prefix(docker_gcs, tmp_path): + gcs = fsspec.filesystem( + "gcs", + endpoint_url=docker_gcs, + 
token="anon", + use_listings_cache=False, + ) + bucket = "copy_into_collision_bucket" + gcs.mkdir(bucket) + # gcs.mkdir(bucket + "/src" + "/common_prefix/") + # object under common prefix as key + gcs.pipe_file(f"{bucket}/src/common_prefix", b"hello world") + # store more objects with same prefix + gcs.pipe_file(f"{bucket}/src/common_prefix/file1.txt", b"1") + gcs.pipe_file(f"{bucket}/src/common_prefix/file2.txt", b"2") + gcs.invalidate_cache() + + # make sure the sources have a collision + assert gcs.isfile(f"{bucket}/src/common_prefix") + assert gcs.isdir(f"{bucket}/src/common_prefix") # BROKEN in gcsfs + assert gcs.isfile(f"{bucket}/src/common_prefix/file1.txt") + assert gcs.isfile(f"{bucket}/src/common_prefix/file2.txt") + # prepare source and destination + src = UPath(f"gs://{bucket}/src", endpoint_url=docker_gcs, token="anon") + dst = UPath(tmp_path) + + def on_collision_rename_file(src, dst): + warnings.warn( + f"{src!s} collides with prefix. Renaming target file object to {dst!s}", + UserWarning, + stacklevel=3, + ) + return ( + dst.with_suffix(dst.suffix + ".COLLISION"), + dst, + ) + + # perform copy + src.copy_into(dst, on_name_collision=on_collision_rename_file) + + # check results + dst_files = sorted(posixify(x.relative_to(tmp_path)) for x in dst.glob("**/*")) + assert dst_files == [ + "src", + "src/common_prefix", + "src/common_prefix.COLLISION", + "src/common_prefix/file1.txt", + "src/common_prefix/file2.txt", + ] diff --git a/upath/tests/implementations/test_s3.py b/upath/tests/implementations/test_s3.py index d4027fdc..a7fcc5ad 100644 --- a/upath/tests/implementations/test_s3.py +++ b/upath/tests/implementations/test_s3.py @@ -1,6 +1,7 @@ """see upath/tests/conftest.py for fixtures""" import sys +import warnings import fsspec import pytest @@ -12,6 +13,7 @@ from ..utils import OverrideMeta from ..utils import extends_base from ..utils import overrides_base +from ..utils import posixify def silence_botocore_datetime_deprecation(cls): @@ -165,3 
+167,49 @@ def test_pathlib_consistent_join(): b1 = UPath("s3://mybucket/withkey").joinpath("subfolder/myfile.txt") assert b0 == b1 assert "s3://mybucket/withkey/subfolder/myfile.txt" == str(b0) == str(b1) + + +def test_copy__object_key_collides_with_dir_prefix(s3_server, tmp_path): + anon, s3so = s3_server + + s3 = fsspec.filesystem("s3", anon=anon, **{**s3so, "use_listings_cache": False}) + bucket = "copy_into_collision_bucket" + s3.mkdir(bucket + "/src" + "/common_prefix/") + # object under common prefix as key + s3.pipe_file(f"{bucket}/src/common_prefix", b"hello world") + # store more objects with same prefix + s3.pipe_file(f"{bucket}/src/common_prefix/file1.txt", b"1") + s3.pipe_file(f"{bucket}/src/common_prefix/file2.txt", b"2") + + # make sure the sources have a collision + assert s3.isdir(f"{bucket}/src/common_prefix") + assert s3.isfile(f"{bucket}/src/common_prefix") + assert s3.isfile(f"{bucket}/src/common_prefix/file1.txt") + assert s3.isfile(f"{bucket}/src/common_prefix/file2.txt") + # prepare source and destination + src = UPath(f"s3://{bucket}/src", anon=anon, **s3so) + dst = UPath(tmp_path) + + def on_collision_rename_file(src, dst): + warnings.warn( + f"{src!s} collides with prefix. 
Renaming target file object to {dst!s}", + UserWarning, + stacklevel=3, + ) + return ( + dst.with_suffix(dst.suffix + ".COLLISION"), + dst, + ) + + # perform copy + src.copy_into(dst, on_name_collision=on_collision_rename_file) + + # check results + dst_files = sorted(posixify(x.relative_to(tmp_path)) for x in dst.glob("**/*")) + assert dst_files == [ + "src", + "src/common_prefix", + "src/common_prefix.COLLISION", + "src/common_prefix/file1.txt", + "src/common_prefix/file2.txt", + ] diff --git a/upath/types/__init__.py b/upath/types/__init__.py index f0c6c965..0bb07878 100644 --- a/upath/types/__init__.py +++ b/upath/types/__init__.py @@ -2,9 +2,11 @@ import enum import sys +from collections.abc import Callable from os import PathLike from typing import TYPE_CHECKING from typing import Any +from typing import Optional from typing import Protocol from typing import Union from typing import runtime_checkable @@ -35,6 +37,7 @@ "PathParser", "UPathParser", "UNSET_DEFAULT", + "OnNameCollisionFunc", ] @@ -124,3 +127,8 @@ def isabs(self, path: JoinablePathLike) -> bool: ... def splitdrive(self, path: JoinablePathLike) -> tuple[str, str]: ... def splitroot(self, path: JoinablePathLike) -> tuple[str, str, str]: ... + + +OnNameCollisionFunc: TypeAlias = Callable[ + [ReadablePath, WritablePath], tuple[Optional[WritablePath], Optional[WritablePath]] +]