42 changes: 41 additions & 1 deletion upath/core.py
@@ -41,6 +41,7 @@
from upath.registry import get_upath_class
from upath.types import UNSET_DEFAULT
from upath.types import JoinablePathLike
from upath.types import OnNameCollisionFunc
from upath.types import PathInfo
from upath.types import ReadablePath
from upath.types import ReadablePathLike
@@ -1305,9 +1306,48 @@ def _copy_from(
self,
source: ReadablePath,
follow_symlinks: bool = True,
on_name_collision: OnNameCollisionFunc | None = None,
**kwargs: Any,
) -> None:
return super()._copy_from(source, follow_symlinks)
"""
UPath custom:: Recursively copy the given path to this path.
"""
        # fixme: it would be best if this were upstreamed
from pathlib_abc import vfsopen
from pathlib_abc import vfspath
from pathlib_abc._os import copyfileobj
from pathlib_abc._os import ensure_different_files

stack: list[tuple[ReadablePath, WritablePath]] = [(source, self)]
while stack:
src, dst = stack.pop()
info = src.info
if not follow_symlinks and info.is_symlink():
dst.symlink_to(vfspath(src.readlink()), src.info.is_dir())
elif on_name_collision and info.is_file() and info.is_dir():
dst_file, dst_dir = on_name_collision(src, dst)
if dst_file is not None:
ensure_different_files(src, dst_file)
with vfsopen(src, "rb") as source_f:
with vfsopen(dst_file, "wb") as target_f:
copyfileobj(source_f, target_f)
if dst_dir is not None:
children = src.iterdir()
dst_dir.mkdir()
# feed through dict.fromkeys to remove duplicates
for child in dict.fromkeys(children):
stack.append((child, dst_dir.joinpath(child.name)))
elif info.is_dir():
children = src.iterdir()
dst.mkdir()
# feed through dict.fromkeys to remove duplicates
for child in dict.fromkeys(children):
stack.append((child, dst.joinpath(child.name)))
else:
ensure_different_files(src, dst)
with vfsopen(src, "rb") as source_f:
with vfsopen(dst, "wb") as target_f:
copyfileobj(source_f, target_f)

# --- WritablePath attributes -------------------------------------

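The exact definition of OnNameCollisionFunc is not part of this diff; judging from how the _copy_from loop consumes it, a minimal sketch (names inferred from usage, not authoritative) could look like:

from collections.abc import Callable
from typing import Optional

from upath.types import ReadablePath, WritablePath

# Invoked when a source object reports being both a file and a directory
# (an object key that collides with a common prefix). Returns a pair:
#   - destination for the file contents, or None to skip the file
#   - destination for the directory children, or None to skip the subtree
OnNameCollisionFunc = Callable[
    [ReadablePath, WritablePath],
    tuple[Optional[WritablePath], Optional[WritablePath]],
]

def keep_both(src: ReadablePath, dst: WritablePath):
    # write the file under a renamed target, recurse into the prefix as-is
    return dst.with_suffix(dst.suffix + ".COLLISION"), dst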
30 changes: 30 additions & 0 deletions upath/implementations/cloud.py
@@ -5,15 +5,19 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING
from typing import Any
from typing import overload

from upath import UnsupportedOperation
from upath._chain import DEFAULT_CHAIN_PARSER
from upath._flavour import upath_strip_protocol
from upath.core import UPath
from upath.types import JoinablePathLike
from upath.types import SupportsPathLike
from upath.types import WritablePath

if TYPE_CHECKING:
from typing import Literal
from typing import TypeVar

if sys.version_info >= (3, 11):
from typing import Self
@@ -28,6 +32,8 @@
from upath.types.storage_options import HfStorageOptions
from upath.types.storage_options import S3StorageOptions

_WT = TypeVar("_WT", bound="WritablePath")

__all__ = [
"CloudPath",
"GCSPath",
@@ -147,6 +153,30 @@ def __init__(
if not self.drive and len(self.parts) > 1:
raise ValueError("non key-like path provided (bucket/container missing)")

@overload
def copy(self, target: _WT, **kwargs: Any) -> _WT: ...

@overload
def copy(self, target: SupportsPathLike | str, **kwargs: Any) -> Self: ...

def copy(self, target: _WT | SupportsPathLike | str, **kwargs: Any) -> _WT | UPath:
"""
Recursively copy this file or directory tree to the given destination.
"""
        # to allow _copy_from to check whether a path is a file AND a dir,
        # we need to disable s3fs's dircache mechanism because it
        # currently implements an XOR relation between the two for objects
        # ref: fsspec/s3fs#999
sopts = dict(self.storage_options)
sopts["use_listings_cache"] = False
new_self = type(self)(
self.path,
protocol=self.protocol, # type: ignore
**sopts,
)
assert type(self) is type(new_self)
return super(type(new_self), new_self).copy(target, **kwargs)


class AzurePath(CloudPath):
__slots__ = ()
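The copy() override above rebuilds the path with use_listings_cache=False and forwards **kwargs down to _copy_from, so a collision handler can be passed straight through copy() or copy_into(). A minimal usage sketch (bucket name and paths are hypothetical):

from upath import UPath

src = UPath("s3://my-bucket/src")  # hypothetical source containing a key/prefix collision
dst = UPath("/tmp/out")            # plain local destination

def rename_colliding_file(src_path, dst_path):
    # keep the file contents under a renamed target, recurse into the prefix
    return dst_path.with_suffix(dst_path.suffix + ".COLLISION"), dst_path

src.copy(dst, on_name_collision=rename_colliding_file)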
16 changes: 5 additions & 11 deletions upath/implementations/local.py
@@ -364,6 +364,11 @@ def rmdir(self, recursive: bool = UNSET_DEFAULT) -> None:
else:
shutil.rmtree(self)

# we need to override pathlib.Path._copy_from to support it as a
# WritablePath._copy_from target with support for on_name_collision
# Issue: https://github.com/barneygale/pathlib-abc/issues/48
_copy_from = UPath._copy_from

if sys.version_info < (3, 14): # noqa: C901

@overload
@@ -720,17 +725,6 @@ def chmod(
)
return super().chmod(mode)

if not hasattr(pathlib.Path, "_copy_from"):

def _copy_from(
self,
source: ReadablePath | LocalPath,
follow_symlinks: bool = True,
preserve_metadata: bool = False,
) -> None:
_copy_from: Any = WritablePath._copy_from.__get__(self)
_copy_from(source, follow_symlinks=follow_symlinks)


UPath.register(LocalPath)

20 changes: 15 additions & 5 deletions upath/tests/conftest.py
@@ -238,9 +238,19 @@ def docker_gcs():
pytest.skip("docker not installed")

container = "gcsfs_test"
cmd = (
"docker run -d -p 4443:4443 --name gcsfs_test fsouza/fake-gcs-server:latest -scheme " # noqa: E501
"http -public-host http://localhost:4443 -external-url http://localhost:4443" # noqa: E501
cmd = " ".join(
[
"docker",
"run",
"-d",
"-p 4443:4443",
"--name gcsfs_test",
"fsouza/fake-gcs-server:latest",
"-scheme http",
"-public-host http://localhost:4443",
"-external-url http://localhost:4443",
"-backend memory",
]
)
stop_docker(container)
subprocess.check_output(shlex.split(cmd))
@@ -385,8 +395,8 @@ def docker_azurite(azurite_credentials):
image = "mcr.microsoft.com/azure-storage/azurite"
container_name = "azure_test"
cmd = (
f"docker run --rm -d -p {AZURITE_PORT}:10000 --name {container_name} {image}" # noqa: E501
" azurite-blob --loose --blobHost 0.0.0.0" # noqa: E501
f"docker run --rm -d -p {AZURITE_PORT}:10000 --name {container_name} {image}:latest" # noqa: E501
" azurite-blob --loose --blobHost 0.0.0.0 --skipApiVersionCheck" # noqa: E501
)
url = f"http://localhost:{AZURITE_PORT}"

57 changes: 57 additions & 0 deletions upath/tests/implementations/test_azure.py
@@ -1,3 +1,6 @@
import warnings

import fsspec
import pytest

from upath import UPath
@@ -7,6 +10,7 @@
from ..utils import OverrideMeta
from ..utils import extends_base
from ..utils import overrides_base
from ..utils import posixify
from ..utils import skip_on_windows


@@ -61,3 +65,56 @@ def test_broken_mkdir(self):

(path / "file").write_text("foo")
assert path.exists()


@skip_on_windows
@pytest.mark.xfail(reason="adlfs returns isdir false")
def test_copy__object_key_collides_with_dir_prefix(azurite_credentials, tmp_path):
account_name, connection_string = azurite_credentials
storage_options = {
"account_name": account_name,
"connection_string": connection_string,
}

az = fsspec.filesystem("az", **storage_options, use_listings_cache=False)
container = "copy-into-collision-container"
az.mkdir(container)
# store more objects with same prefix
az.pipe_file(f"{container}/src/common_prefix/file1.txt", b"1")
az.pipe_file(f"{container}/src/common_prefix/file2.txt", b"2")
# object under common prefix as key
az.pipe_file(f"{container}/src/common_prefix", b"hello world")
az.invalidate_cache()

# make sure the sources have a collision
assert az.isfile(f"{container}/src/common_prefix")
assert az.isdir(f"{container}/src/common_prefix")
assert az.isfile(f"{container}/src/common_prefix/file1.txt")
assert az.isfile(f"{container}/src/common_prefix/file2.txt")
# prepare source and destination
src = UPath(f"az://{container}/src", **storage_options)
dst = UPath(tmp_path)

def on_collision_rename_file(src, dst):
warnings.warn(
f"{src!s} collides with prefix. Renaming target file object to {dst!s}",
UserWarning,
stacklevel=3,
)
return (
dst.with_suffix(dst.suffix + ".COLLISION"),
dst,
)

# perform copy
src.copy_into(dst, on_name_collision=on_collision_rename_file)

# check results
dst_files = sorted(posixify(x.relative_to(tmp_path)) for x in dst.glob("**/*"))
assert dst_files == [
"src",
"src/common_prefix",
"src/common_prefix.COLLISION",
"src/common_prefix/file1.txt",
"src/common_prefix/file2.txt",
]
56 changes: 56 additions & 0 deletions upath/tests/implementations/test_gcs.py
@@ -1,3 +1,5 @@
import warnings

import fsspec
import pytest

@@ -8,6 +10,7 @@
from ..utils import OverrideMeta
from ..utils import extends_base
from ..utils import overrides_base
from ..utils import posixify
from ..utils import skip_on_windows


@@ -49,3 +52,56 @@ def test_mkdir_in_empty_bucket(docker_gcs):
endpoint_url=docker_gcs,
token="anon",
).parent.mkdir(parents=True, exist_ok=True)


@skip_on_windows
@pytest.mark.xfail(reason="gcsfs returns isdir false")
def test_copy__object_key_collides_with_dir_prefix(docker_gcs, tmp_path):
gcs = fsspec.filesystem(
"gcs",
endpoint_url=docker_gcs,
token="anon",
use_listings_cache=False,
)
bucket = "copy_into_collision_bucket"
gcs.mkdir(bucket)
# gcs.mkdir(bucket + "/src" + "/common_prefix/")
# object under common prefix as key
gcs.pipe_file(f"{bucket}/src/common_prefix", b"hello world")
# store more objects with same prefix
gcs.pipe_file(f"{bucket}/src/common_prefix/file1.txt", b"1")
gcs.pipe_file(f"{bucket}/src/common_prefix/file2.txt", b"2")
gcs.invalidate_cache()

# make sure the sources have a collision
assert gcs.isfile(f"{bucket}/src/common_prefix")
assert gcs.isdir(f"{bucket}/src/common_prefix") # BROKEN in gcsfs
assert gcs.isfile(f"{bucket}/src/common_prefix/file1.txt")
assert gcs.isfile(f"{bucket}/src/common_prefix/file2.txt")
# prepare source and destination
src = UPath(f"gs://{bucket}/src", endpoint_url=docker_gcs, token="anon")
dst = UPath(tmp_path)

def on_collision_rename_file(src, dst):
warnings.warn(
f"{src!s} collides with prefix. Renaming target file object to {dst!s}",
UserWarning,
stacklevel=3,
)
return (
dst.with_suffix(dst.suffix + ".COLLISION"),
dst,
)

# perform copy
src.copy_into(dst, on_name_collision=on_collision_rename_file)

# check results
dst_files = sorted(posixify(x.relative_to(tmp_path)) for x in dst.glob("**/*"))
assert dst_files == [
"src",
"src/common_prefix",
"src/common_prefix.COLLISION",
"src/common_prefix/file1.txt",
"src/common_prefix/file2.txt",
]
48 changes: 48 additions & 0 deletions upath/tests/implementations/test_s3.py
@@ -1,6 +1,7 @@
"""see upath/tests/conftest.py for fixtures"""

import sys
import warnings

import fsspec
import pytest
@@ -12,6 +13,7 @@
from ..utils import OverrideMeta
from ..utils import extends_base
from ..utils import overrides_base
from ..utils import posixify


def silence_botocore_datetime_deprecation(cls):
@@ -165,3 +167,49 @@ def test_pathlib_consistent_join():
b1 = UPath("s3://mybucket/withkey").joinpath("subfolder/myfile.txt")
assert b0 == b1
assert "s3://mybucket/withkey/subfolder/myfile.txt" == str(b0) == str(b1)


def test_copy__object_key_collides_with_dir_prefix(s3_server, tmp_path):
anon, s3so = s3_server

s3 = fsspec.filesystem("s3", anon=anon, **{**s3so, "use_listings_cache": False})
bucket = "copy_into_collision_bucket"
s3.mkdir(bucket + "/src" + "/common_prefix/")
# object under common prefix as key
s3.pipe_file(f"{bucket}/src/common_prefix", b"hello world")
# store more objects with same prefix
s3.pipe_file(f"{bucket}/src/common_prefix/file1.txt", b"1")
s3.pipe_file(f"{bucket}/src/common_prefix/file2.txt", b"2")

# make sure the sources have a collision
assert s3.isdir(f"{bucket}/src/common_prefix")
assert s3.isfile(f"{bucket}/src/common_prefix")
assert s3.isfile(f"{bucket}/src/common_prefix/file1.txt")
assert s3.isfile(f"{bucket}/src/common_prefix/file2.txt")
# prepare source and destination
src = UPath(f"s3://{bucket}/src", anon=anon, **s3so)
dst = UPath(tmp_path)

def on_collision_rename_file(src, dst):
warnings.warn(
f"{src!s} collides with prefix. Renaming target file object to {dst!s}",
UserWarning,
stacklevel=3,
)
return (
dst.with_suffix(dst.suffix + ".COLLISION"),
dst,
)

# perform copy
src.copy_into(dst, on_name_collision=on_collision_rename_file)

# check results
dst_files = sorted(posixify(x.relative_to(tmp_path)) for x in dst.glob("**/*"))
assert dst_files == [
"src",
"src/common_prefix",
"src/common_prefix.COLLISION",
"src/common_prefix/file1.txt",
"src/common_prefix/file2.txt",
]