diff --git a/frontend/src/components/storage/__tests__/storage-inspector.test.ts b/frontend/src/components/storage/__tests__/storage-inspector.test.ts new file mode 100644 index 00000000000..f7754db22bd --- /dev/null +++ b/frontend/src/components/storage/__tests__/storage-inspector.test.ts @@ -0,0 +1,148 @@ +/* Copyright 2026 Marimo. All rights reserved. */ +import { describe, expect, it } from "vitest"; +import type { StorageEntry, StoragePathKey } from "@/core/storage/types"; +import { storagePathKey } from "@/core/storage/types"; +import { exportedForTesting } from "../storage-inspector"; + +const { filterEntries, remoteSearchPrefix } = exportedForTesting; + +function makeEntry( + overrides: Partial & { path: string }, +): StorageEntry { + return { + kind: overrides.kind ?? "file", + lastModified: overrides.lastModified ?? null, + metadata: overrides.metadata ?? {}, + path: overrides.path, + size: overrides.size ?? 0, + }; +} + +describe("storage inspector search", () => { + describe("remoteSearchPrefix", () => { + it("returns the parent directory of the query so backends can list it", () => { + // Object stores match prefixes on path segments, so we list the parent + // directory and filter the rest client-side. + expect(remoteSearchPrefix("folder/x")).toBe("folder/"); + expect(remoteSearchPrefix("nested/folder/x")).toBe("nested/folder/"); + }); + + it("preserves trailing slashes when the user explicitly typed one", () => { + expect(remoteSearchPrefix("folder/x/")).toBe("folder/x/"); + expect(remoteSearchPrefix("nested/folder/x/")).toBe("nested/folder/x/"); + expect(remoteSearchPrefix("/")).toBe("/"); + }); + + it("returns an empty string for queries without a directory component", () => { + // Without a slash there is nothing useful to send to the backend; rely on + // local filtering instead. + expect(remoteSearchPrefix("x")).toBe(""); + expect(remoteSearchPrefix("report")).toBe(""); + }); + + it("trims leading/trailing whitespace before computing the prefix", () => { + expect(remoteSearchPrefix(" nested/folder/x ")).toBe("nested/folder/"); + expect(remoteSearchPrefix(" ")).toBe(""); + }); + }); + + describe("filterEntries", () => { + it("matches loaded entries by basename, extension, and full path", () => { + const entries = [ + makeEntry({ path: "data/report.csv" }), + makeEntry({ path: "logs/readme.md" }), + ]; + + expect( + filterEntries({ + entries, + namespace: "store", + searchValue: ".csv", + entriesByPath: new Map(), + }), + ).toEqual([entries[0]]); + expect( + filterEntries({ + entries, + namespace: "store", + searchValue: "data/report", + entriesByPath: new Map(), + }), + ).toEqual([entries[0]]); + expect( + filterEntries({ + entries, + namespace: "store", + searchValue: "readme", + entriesByPath: new Map(), + }), + ).toEqual([entries[1]]); + }); + + it("returns the original entries when search is cleared", () => { + const entries = [ + makeEntry({ path: "data/report.csv" }), + makeEntry({ path: "logs/readme.md" }), + ]; + + expect( + filterEntries({ + entries, + namespace: "store", + searchValue: "", + entriesByPath: new Map(), + }), + ).toBe(entries); + }); + + it("keeps directories whose loaded descendants match a path query", () => { + const directory = makeEntry({ path: "data", kind: "directory" }); + const child = makeEntry({ path: "data/report.csv" }); + const entriesByPath = new Map([ + [storagePathKey("store", "data/"), [child]], + ]); + + expect( + filterEntries({ + entries: [directory], + namespace: "store", + searchValue: "data/report", + entriesByPath, + }), + ).toEqual([directory]); + }); + + it("matches partial path queries against loaded sibling files (folder/x -> folder/xsomething)", () => { + // Regression test for the obstore path-segment prefix bug: typing + // "folder/x" should surface "folder/xsomething" once the folder has + // been listed locally. + const folder = makeEntry({ path: "folder/", kind: "directory" }); + const matching = makeEntry({ path: "folder/xsomething" }); + const matchingAlt = makeEntry({ path: "folder/xanother" }); + const other = makeEntry({ path: "folder/something" }); + const entriesByPath = new Map([ + [storagePathKey("store", "folder/"), [other, matchingAlt, matching]], + ]); + + expect( + filterEntries({ + entries: [folder], + namespace: "store", + searchValue: "folder/x", + entriesByPath, + }), + ).toEqual([folder]); + + // Same query, but applied directly to the loaded folder contents (as + // happens for the "remote results" fallback) should drop non-matches. + expect( + filterEntries({ + entries: [other, matchingAlt, matching], + namespace: "store", + searchValue: "folder/x", + entriesByPath, + }), + ).toEqual([matchingAlt, matching]); + }); + }); +}); diff --git a/frontend/src/components/storage/storage-inspector.tsx b/frontend/src/components/storage/storage-inspector.tsx index b7e2c2b3c9e..0dd6ad05a18 100644 --- a/frontend/src/components/storage/storage-inspector.tsx +++ b/frontend/src/components/storage/storage-inspector.tsx @@ -7,6 +7,7 @@ import { FolderIcon, HardDriveIcon, HelpCircleIcon, + InfoIcon, LoaderCircle, PlusIcon, ViewIcon, @@ -43,10 +44,12 @@ import { useStorage, useStorageActions, useStorageEntries, + useStoragePageFetcher, } from "@/core/storage/state"; import type { StorageEntry, StorageNamespace, + StoragePageMetadata, StoragePathKey, } from "@/core/storage/types"; import { storagePathKey } from "@/core/storage/types"; @@ -94,28 +97,46 @@ function displayName(path: string): string { return parts[parts.length - 1] || trimmed; } +function directoryPrefix(path: string): string { + return path.endsWith("/") ? path : `${path}/`; +} + /** * Recursively check whether an entry (or any of its loaded descendants) * matches the search query. */ -function entryMatchesSearch( - entry: StorageEntry, - namespace: string, - searchValue: string, - entriesByPath: ReadonlyMap, -): boolean { - const query = searchValue.toLowerCase(); +function entryMatchesSearch({ + entry, + namespace, + searchValue, + entriesByPath, +}: { + entry: StorageEntry; + namespace: string; + searchValue: string; + entriesByPath: ReadonlyMap; +}): boolean { + const query = searchValue.trim().toLowerCase(); + const path = entry.path.toLowerCase(); + const name = displayName(entry.path).toLowerCase(); - if (displayName(entry.path).toLowerCase().includes(query)) { + if (name.includes(query) || path.includes(query)) { return true; } // For directories, check loaded children recursively if (entry.kind === "directory") { - const children = entriesByPath.get(storagePathKey(namespace, entry.path)); + const children = entriesByPath.get( + storagePathKey(namespace, directoryPrefix(entry.path)), + ); if (children) { return children.some((child) => - entryMatchesSearch(child, namespace, searchValue, entriesByPath), + entryMatchesSearch({ + entry: child, + namespace, + searchValue, + entriesByPath, + }), ); } } @@ -127,20 +148,156 @@ function entryMatchesSearch( * Filter entries to those matching the search (or having loaded descendants * that match). Returns all entries when there is no active search. */ -function filterEntries( - entries: StorageEntry[], - namespace: string, - searchValue: string, - entriesByPath: ReadonlyMap, -): StorageEntry[] { +function filterEntries({ + entries, + namespace, + searchValue, + entriesByPath, +}: { + entries: StorageEntry[]; + namespace: string; + searchValue: string; + entriesByPath: ReadonlyMap; +}): StorageEntry[] { if (!searchValue.trim()) { return entries; } return entries.filter((entry) => - entryMatchesSearch(entry, namespace, searchValue, entriesByPath), + entryMatchesSearch({ entry, namespace, searchValue, entriesByPath }), + ); +} + +const MAX_REMOTE_SEARCH_PAGES = 5; + +type RemoteSearchState = + | { query: string; status: "idle" } + | { query: string; status: "searching" } + | { query: string; status: "found" } + | { query: string; status: "exhausted" } + | { query: string; status: "capped" } + | { query: string; status: "error"; error: Error }; + +type RemoteSearchByNamespace = Record; + +function idleRemoteSearch(query: string): RemoteSearchState { + return { query, status: "idle" }; +} + +function canRetryRemoteSearch(remoteSearch: RemoteSearchState): boolean { + return ( + remoteSearch.status === "idle" || + remoteSearch.status === "error" || + remoteSearch.status === "capped" ); } +function canSearchMoreRemoteEntries({ + hasSearch, + hasLoadedMatches, + isPending, + remoteSearch, + searchKey, + entriesByPath, + pageMetadataByPath, +}: { + hasSearch: boolean; + hasLoadedMatches: boolean; + isPending: boolean; + remoteSearch: RemoteSearchState; + searchKey: StoragePathKey; + entriesByPath: ReadonlyMap; + pageMetadataByPath: ReadonlyMap; +}): boolean { + if (!hasSearch || hasLoadedMatches || isPending) { + return false; + } + if (!canRetryRemoteSearch(remoteSearch)) { + return false; + } + + return ( + entriesByPath.get(searchKey) === undefined || + pageMetadataByPath.get(searchKey)?.nextPageToken != null + ); +} + +/** + * Returns the directory prefix to query the backend with for a given search. + * + * Object stores like obstore evaluate prefixes on a path-segment basis + * (`folder/x` would only match `folder/x/...`, never `folder/xsomething`), so + * for substring searches we list the parent directory and filter on the + * client. Returns `""` when the search has no directory component. + */ +function remoteSearchPrefix(searchValue: string): string { + const trimmed = searchValue.trim(); + const lastSlash = trimmed.lastIndexOf("/"); + return lastSlash === -1 ? "" : trimmed.slice(0, lastSlash + 1); +} + +/** + * Shallow check (no recursion into loaded children) used inside the + * remote-search pagination loop to decide whether a fetched page has + * any candidates worth surfacing to the user. + */ +function entryMatchesQueryShallow( + entry: StorageEntry, + searchValue: string, +): boolean { + const query = searchValue.trim().toLowerCase(); + if (!query) { + return true; + } + return ( + entry.path.toLowerCase().includes(query) || + displayName(entry.path).toLowerCase().includes(query) + ); +} + +const LoadMoreStorageEntries: React.FC<{ + depth: number; + isLoading: boolean; + error?: Error; + onLoadMore: () => void; +}> = ({ depth, isLoading, error, onLoadMore }) => { + return ( +
+ + {error && ( + + Failed to load: {error.message} + + )} +
+ ); +}; + +const MayHaveMoreStorageEntries: React.FC<{ depth: number }> = ({ depth }) => { + return ( +
+ More files may exist in this folder. + + + +
+ ); +}; + /** * Lazily loaded children of a directory entry. * Caches fetched entries in the Jotai store so re-expanding doesn't re-fetch. @@ -171,6 +328,11 @@ const StorageEntryChildren: React.FC<{ entries: children, isPending, error, + hasMore, + mayHaveMore, + loadMore, + isLoadingMore, + loadMoreError, } = useStorageEntries(namespace, prefix); if (isPending) { @@ -204,12 +366,12 @@ const StorageEntryChildren: React.FC<{ ); } - const filtered = filterEntries( - children, + const filtered = filterEntries({ + entries: children, namespace, searchValue, entriesByPath, - ); + }); return ( <> @@ -227,6 +389,15 @@ const StorageEntryChildren: React.FC<{ onOpenFile={onOpenFile} /> ))} + {hasMore && ( + + )} + {mayHaveMore && } ); }; @@ -269,9 +440,14 @@ const StorageEntryRow: React.FC<{ isDir && hasSearch && !!entriesByPath - .get(storagePathKey(namespace, entry.path)) + .get(storagePathKey(namespace, directoryPrefix(entry.path))) ?.some((child) => - entryMatchesSearch(child, namespace, searchValue, entriesByPath), + entryMatchesSearch({ + entry: child, + namespace, + searchValue, + entriesByPath, + }), ); // Folder is shown expanded by manual toggle OR by search auto-expand @@ -418,7 +594,7 @@ const StorageEntryRow: React.FC<{ protocol={protocol} rootPath={rootPath} backendType={backendType} - prefix={entry.path} + prefix={directoryPrefix(entry.path)} depth={depth + 1} locale={locale} searchValue={selfMatches ? "" : searchValue} // When a parent directory matches the search, we don't need to filter the children. @@ -433,10 +609,19 @@ const StorageNamespaceSection: React.FC<{ namespace: StorageNamespace; locale: string; searchValue: string; + remoteSearch: RemoteSearchState; + onContinueRemoteSearch: () => void; onOpenFile: (info: OpenFileInfo) => void; -}> = ({ namespace, locale, searchValue, onOpenFile }) => { +}> = ({ + namespace, + locale, + searchValue, + remoteSearch, + onContinueRemoteSearch, + onOpenFile, +}) => { const [isExpanded, setIsExpanded] = useState(true); - const { entriesByPath } = useStorage(); + const { entriesByPath, pageMetadataByPath } = useStorage(); const { clearNamespaceCache } = useStorageActions(); const namespaceName = namespace.name ?? namespace.displayName; @@ -444,6 +629,11 @@ const StorageNamespaceSection: React.FC<{ entries: fetchedEntries, isPending, error, + hasMore, + mayHaveMore, + loadMore, + isLoadingMore, + loadMoreError, refetch, } = useStorageEntries(namespaceName); @@ -458,12 +648,108 @@ const StorageNamespaceSection: React.FC<{ // While loading, fall back to initial entries from the namespace notification const entries = isPending ? namespace.storageEntries : fetchedEntries; - const filtered = filterEntries( + const filtered = filterEntries({ entries, - namespaceName, + namespace: namespaceName, searchValue, entriesByPath, - ); + }); + const searchPrefix = remoteSearchPrefix(searchValue); + const searchKey = storagePathKey(namespaceName, searchPrefix); + const remoteEntries = + searchPrefix === "" ? [] : (entriesByPath.get(searchKey) ?? []); + // The fetched page is the whole parent directory; we still need to filter + // it by the full search query before showing entries to the user. + const filteredRemoteEntries = + remoteEntries.length > 0 + ? filterEntries({ + entries: remoteEntries, + namespace: namespaceName, + searchValue, + entriesByPath, + }) + : remoteEntries; + const hasSearch = !!searchValue.trim(); + const hasLoadedMatches = + filtered.length > 0 || filteredRemoteEntries.length > 0; + const canSearchMore = canSearchMoreRemoteEntries({ + hasSearch, + hasLoadedMatches, + isPending, + remoteSearch, + searchKey, + entriesByPath, + pageMetadataByPath, + }); + + const showRemoteResults = hasSearch && filtered.length === 0; + const statusRow = (() => { + if (isPending && entries.length === 0) { + return ( + + + Loading... + + ); + } + if (remoteSearch.status === "searching") { + return ( + + + Searching more entries... + + ); + } + if (remoteSearch.status === "error") { + return ( + + Search failed: {remoteSearch.error.message} + + ); + } + if (remoteSearch.status === "capped") { + return ( + + Searched more entries. + + (or press Enter) + + ); + } + if (remoteSearch.status === "exhausted") { + return "No matches"; + } + if (!hasSearch && !isPending && entries.length === 0 && !error) { + return "No entries"; + } + if (canSearchMore) { + return ( + + No loaded matches. + + (or press Enter) + + ); + } + if (hasSearch && !hasLoadedMatches && entries.length > 0) { + return "No matches"; + } + return null; + })(); return ( <> @@ -492,15 +778,6 @@ const StorageNamespaceSection: React.FC<{ {isExpanded && ( <> - {isPending && entries.length === 0 && ( -
- - Loading... -
- )} {error && entries.length === 0 && ( )} - {!isPending && entries.length === 0 && !error && ( -
- No entries -
- )} - {searchValue && filtered.length === 0 && entries.length > 0 && ( + {!error && statusRow && (
- No matches + {statusRow}
)} {filtered.map((entry) => ( @@ -539,6 +808,32 @@ const StorageNamespaceSection: React.FC<{ onOpenFile={onOpenFile} /> ))} + {showRemoteResults && + filteredRemoteEntries.map((entry) => ( + + ))} + {hasMore && !canSearchMore && ( + + )} + {mayHaveMore && !canSearchMore && ( + + )} )} @@ -546,11 +841,164 @@ const StorageNamespaceSection: React.FC<{ }; export const StorageInspector: React.FC = () => { - const { namespaces } = useStorage(); + const { namespaces, entriesByPath, pageMetadataByPath } = useStorage(); const { locale } = useLocale(); const [searchValue, setSearchValue] = useState(""); + const [remoteSearchByNamespace, setRemoteSearchByNamespace] = + useState({}); const [openFile, setOpenFile] = useState(null); + const fetchStoragePage = useStoragePageFetcher(); const hasSearch = !!searchValue.trim(); + const currentQuery = searchValue.trim(); + + const remoteSearchForNamespace = useCallback( + (namespaceName: string): RemoteSearchState => { + const remoteSearch = remoteSearchByNamespace[namespaceName]; + if (remoteSearch?.query === currentQuery) { + return remoteSearch; + } + return idleRemoteSearch(currentQuery); + }, + [currentQuery, remoteSearchByNamespace], + ); + + const setRemoteSearch = useCallback( + (namespaceName: string, remoteSearch: RemoteSearchState) => { + setRemoteSearchByNamespace((state) => ({ + ...state, + [namespaceName]: remoteSearch, + })); + }, + [], + ); + + const canContinueRemoteSearch = useCallback( + (namespace: StorageNamespace): boolean => { + if (!currentQuery) { + return false; + } + + const namespaceName = namespace.name ?? namespace.displayName; + const searchPrefix = remoteSearchPrefix(currentQuery); + // No directory component in the query - the user is doing a fuzzy + // search and the backend can't help; rely on local filtering instead. + if (searchPrefix === "") { + return false; + } + + const remoteSearch = remoteSearchForNamespace(namespaceName); + if (!canRetryRemoteSearch(remoteSearch)) { + return false; + } + + // Already surfacing matches from loaded entries? + const rootEntries = + entriesByPath.get(storagePathKey(namespaceName, "")) ?? + namespace.storageEntries; + const rootMatches = filterEntries({ + entries: rootEntries, + namespace: namespaceName, + searchValue: currentQuery, + entriesByPath, + }); + if (rootMatches.length > 0) { + return false; + } + + const searchKey = storagePathKey(namespaceName, searchPrefix); + const prefixEntries = entriesByPath.get(searchKey) ?? []; + const prefixMatches = filterEntries({ + entries: prefixEntries, + namespace: namespaceName, + searchValue: currentQuery, + entriesByPath, + }); + if (prefixMatches.length > 0) { + return false; + } + + const canFetchPrefix = + entriesByPath.get(searchKey) === undefined || + pageMetadataByPath.get(searchKey)?.nextPageToken != null; + return canFetchPrefix; + }, + [currentQuery, entriesByPath, pageMetadataByPath, remoteSearchForNamespace], + ); + + const continueRemoteSearch = useCallback( + async (namespace: StorageNamespace) => { + if (!canContinueRemoteSearch(namespace)) { + return; + } + + const query = currentQuery; + const namespaceName = namespace.name ?? namespace.displayName; + const prefix = remoteSearchPrefix(query); + const key = storagePathKey(namespaceName, prefix); + const cachedEntries = entriesByPath.get(key); + let nextPageToken = pageMetadataByPath.get(key)?.nextPageToken ?? null; + let hasFetchedAny = cachedEntries !== undefined; + + setRemoteSearch(namespaceName, { query, status: "searching" }); + try { + for (let page = 0; page < MAX_REMOTE_SEARCH_PAGES; page++) { + // First iteration with a stale cache hit needs no fetch; just check + // the cached page before paginating. + const shouldFetch = !hasFetchedAny || nextPageToken !== null; + let newEntries: StorageEntry[] = []; + if (shouldFetch) { + const result = await fetchStoragePage({ + namespace: namespaceName, + prefix, + pageToken: nextPageToken, + append: hasFetchedAny, + }); + newEntries = result.entries; + nextPageToken = result.next_page_token ?? null; + hasFetchedAny = true; + } + + const entriesToCheck = shouldFetch + ? newEntries + : (cachedEntries ?? []); + const hasMatches = entriesToCheck.some((entry) => + entryMatchesQueryShallow(entry, query), + ); + if (hasMatches) { + setRemoteSearch(namespaceName, { query, status: "found" }); + return; + } + + if (nextPageToken === null) { + setRemoteSearch(namespaceName, { query, status: "exhausted" }); + return; + } + } + setRemoteSearch(namespaceName, { query, status: "capped" }); + } catch (error) { + setRemoteSearch(namespaceName, { + query, + status: "error", + error: error instanceof Error ? error : new Error(String(error)), + }); + } + }, + [ + canContinueRemoteSearch, + currentQuery, + entriesByPath, + fetchStoragePage, + pageMetadataByPath, + setRemoteSearch, + ], + ); + + const continueRemoteSearches = useCallback(() => { + const searchableNamespaces = namespaces.filter(canContinueRemoteSearch); + for (const namespace of searchableNamespaces) { + void continueRemoteSearch(namespace); + } + }, [canContinueRemoteSearch, continueRemoteSearch, namespaces]); if (namespaces.length === 0) { return ( @@ -609,6 +1057,12 @@ export const StorageInspector: React.FC = () => { className="h-6 m-1" value={searchValue} onValueChange={setSearchValue} + onKeyDown={(event) => { + if (event.key === "Enter" && hasSearch) { + event.preventDefault(); + continueRemoteSearches(); + } + }} rootClassName="flex-1 border-b-0" /> {hasSearch && ( @@ -622,7 +1076,7 @@ export const StorageInspector: React.FC = () => { )} @@ -638,17 +1092,27 @@ export const StorageInspector: React.FC = () => { - {namespaces.map((ns) => ( - - ))} + {namespaces.map((ns) => { + const namespaceName = ns.name ?? ns.displayName; + return ( + void continueRemoteSearch(ns)} + onOpenFile={setOpenFile} + /> + ); + })} ); }; + +export const exportedForTesting = { + filterEntries, + remoteSearchPrefix, +}; diff --git a/frontend/src/core/storage/__tests__/state.test.ts b/frontend/src/core/storage/__tests__/state.test.ts index 997e3167896..aa613bd64ab 100644 --- a/frontend/src/core/storage/__tests__/state.test.ts +++ b/frontend/src/core/storage/__tests__/state.test.ts @@ -48,6 +48,7 @@ describe("storage state", () => { expect(state).toEqual({ namespaces: [], entriesByPath: new Map(), + pageMetadataByPath: new Map(), }); }); }); @@ -134,6 +135,65 @@ describe("storage state", () => { expect(state.entriesByPath.get("my_s3::data/")).toEqual(entries); }); + it("should store next page tokens keyed by namespace and prefix", () => { + const entries = [makeEntry({ path: "a.txt" })]; + + actions.setEntries({ + namespace: "my_s3", + prefix: "data/", + entries, + nextPageToken: "150", + }); + + expect(state.entriesByPath.get("my_s3::data/")).toEqual(entries); + expect(state.pageMetadataByPath.get("my_s3::data/")?.nextPageToken).toBe( + "150", + ); + }); + + it("should store may-have-more state keyed by namespace and prefix", () => { + const entries = [makeEntry({ path: "a.txt" })]; + + actions.setEntries({ + namespace: "my_s3", + prefix: "data/", + entries, + mayHaveMore: true, + }); + + expect(state.entriesByPath.get("my_s3::data/")).toEqual(entries); + expect(state.pageMetadataByPath.get("my_s3::data/")?.mayHaveMore).toBe( + true, + ); + }); + + it("should append entries when requested", () => { + const firstPage = [makeEntry({ path: "a.txt" })]; + const secondPage = [makeEntry({ path: "b.txt" })]; + + actions.setEntries({ + namespace: "my_s3", + prefix: "data/", + entries: firstPage, + nextPageToken: "150", + }); + actions.setEntries({ + namespace: "my_s3", + prefix: "data/", + entries: secondPage, + nextPageToken: null, + append: true, + }); + + expect(state.entriesByPath.get("my_s3::data/")).toEqual([ + ...firstPage, + ...secondPage, + ]); + expect( + state.pageMetadataByPath.get("my_s3::data/")?.nextPageToken, + ).toBeNull(); + }); + it("should use empty string for null prefix", () => { const entries = [makeEntry({ path: "root.txt" })]; @@ -251,6 +311,11 @@ describe("storage state", () => { expect(state.entriesByPath.get("my_s3::")).toBeUndefined(); expect(state.entriesByPath.get("my_s3::data/")).toBeUndefined(); expect(state.entriesByPath.get("my_s3::data/nested/")).toBeUndefined(); + expect(state.pageMetadataByPath.get("my_s3::")).toBeUndefined(); + expect(state.pageMetadataByPath.get("my_s3::data/")).toBeUndefined(); + expect( + state.pageMetadataByPath.get("my_s3::data/nested/"), + ).toBeUndefined(); }); it("should not affect entries from other namespaces", () => { diff --git a/frontend/src/core/storage/__tests__/types.test.ts b/frontend/src/core/storage/__tests__/types.test.ts index 61fe2b5e6c2..d37d60038f1 100644 --- a/frontend/src/core/storage/__tests__/types.test.ts +++ b/frontend/src/core/storage/__tests__/types.test.ts @@ -24,60 +24,94 @@ describe("storageNamespacePrefix", () => { describe("storageUrl", () => { it("should combine protocol, rootPath, and entryPath", () => { - expect(storageUrl("s3", "my-bucket", "data/file.csv")).toEqual( - new URL("s3://my-bucket/data/file.csv"), - ); + expect( + storageUrl({ + protocol: "s3", + rootPath: "my-bucket", + entryPath: "data/file.csv", + }), + ).toEqual(new URL("s3://my-bucket/data/file.csv")); }); it("should handle empty rootPath", () => { - expect(storageUrl("s3", "", "marimo-artifacts/file.csv")).toEqual( - new URL("s3://marimo-artifacts/file.csv"), - ); + expect( + storageUrl({ + protocol: "s3", + rootPath: "", + entryPath: "marimo-artifacts/file.csv", + }), + ).toEqual(new URL("s3://marimo-artifacts/file.csv")); }); it("should handle rootPath with trailing slash", () => { - expect(storageUrl("s3", "my-bucket/", "data/file.csv")).toEqual( - new URL("s3://my-bucket/data/file.csv"), - ); + expect( + storageUrl({ + protocol: "s3", + rootPath: "my-bucket/", + entryPath: "data/file.csv", + }), + ).toEqual(new URL("s3://my-bucket/data/file.csv")); }); it("should handle entryPath with leading slash", () => { - expect(storageUrl("s3", "my-bucket", "/data/file.csv")).toEqual( - new URL("s3://my-bucket/data/file.csv"), - ); + expect( + storageUrl({ + protocol: "s3", + rootPath: "my-bucket", + entryPath: "/data/file.csv", + }), + ).toEqual(new URL("s3://my-bucket/data/file.csv")); }); it("should collapse multiple slashes between rootPath and entryPath", () => { - expect(storageUrl("s3", "my-bucket/", "/data/file.csv")).toEqual( - new URL("s3://my-bucket/data/file.csv"), - ); + expect( + storageUrl({ + protocol: "s3", + rootPath: "my-bucket/", + entryPath: "/data/file.csv", + }), + ).toEqual(new URL("s3://my-bucket/data/file.csv")); }); it("should handle directory paths with trailing slash", () => { - expect(storageUrl("gcs", "my-bucket", "data/")).toEqual( - new URL("gcs://my-bucket/data/"), - ); + expect( + storageUrl({ + protocol: "gcs", + rootPath: "my-bucket", + entryPath: "data/", + }), + ).toEqual(new URL("gcs://my-bucket/data/")); }); it("should handle empty entryPath", () => { - expect(storageUrl("s3", "my-bucket", "")).toEqual( - new URL("s3://my-bucket"), - ); + expect( + storageUrl({ protocol: "s3", rootPath: "my-bucket", entryPath: "" }), + ).toEqual(new URL("s3://my-bucket")); }); it("should handle both rootPath and entryPath empty", () => { - expect(storageUrl("s3", "", "")).toEqual(new URL("s3://")); + expect(storageUrl({ protocol: "s3", rootPath: "", entryPath: "" })).toEqual( + new URL("s3://"), + ); }); it("should work with file protocol", () => { - expect(storageUrl("file", "/home/user", "docs/readme.md")).toEqual( - new URL("file:///home/user/docs/readme.md"), - ); + expect( + storageUrl({ + protocol: "file", + rootPath: "/home/user", + entryPath: "docs/readme.md", + }), + ).toEqual(new URL("file:///home/user/docs/readme.md")); }); it("should work with nested rootPath", () => { - expect(storageUrl("s3", "bucket/prefix", "sub/file.txt")).toEqual( - new URL("s3://bucket/prefix/sub/file.txt"), - ); + expect( + storageUrl({ + protocol: "s3", + rootPath: "bucket/prefix", + entryPath: "sub/file.txt", + }), + ).toEqual(new URL("s3://bucket/prefix/sub/file.txt")); }); }); diff --git a/frontend/src/core/storage/__tests__/useStorageEntries.test.tsx b/frontend/src/core/storage/__tests__/useStorageEntries.test.tsx index 8cd37b3434e..97ed5be3d6f 100644 --- a/frontend/src/core/storage/__tests__/useStorageEntries.test.tsx +++ b/frontend/src/core/storage/__tests__/useStorageEntries.test.tsx @@ -1,11 +1,15 @@ /* Copyright 2026 Marimo. All rights reserved. */ -import { renderHook, waitFor } from "@testing-library/react"; +import { act, renderHook, waitFor } from "@testing-library/react"; import { createStore, Provider } from "jotai"; import type { ReactNode } from "react"; import * as React from "react"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { storageAtom, useStorageEntries } from "../state"; +import { + storageAtom, + useStorageEntries, + useStoragePageFetcher, +} from "../state"; import type { StorageEntry, StorageState } from "../types"; const mockRequest = vi.fn(); @@ -175,13 +179,155 @@ describe("useStorageEntries", () => { it("should store fetched entries in the atom", async () => { const entries = [makeEntry({ path: "new.txt" })]; - mockRequest.mockResolvedValue({ entries }); + mockRequest.mockResolvedValue({ + entries, + next_page_token: "150", + may_have_more: true, + }); renderHook(() => useStorageEntries("ns", "sub/"), { wrapper }); await waitFor(() => { const state = store.get(storageAtom); expect(state.entriesByPath.get("ns::sub/")).toEqual(entries); + expect(state.pageMetadataByPath.get("ns::sub/")?.nextPageToken).toBe( + "150", + ); + expect(state.pageMetadataByPath.get("ns::sub/")?.mayHaveMore).toBe(true); + }); + }); + + it("should load more entries when a next page token exists", async () => { + const firstPage = [makeEntry({ path: "a.txt" })]; + const secondPage = [makeEntry({ path: "b.txt" })]; + mockRequest + .mockResolvedValueOnce({ + entries: firstPage, + next_page_token: "150", + may_have_more: true, + }) + .mockResolvedValueOnce({ + entries: secondPage, + next_page_token: null, + may_have_more: true, + }); + + const { result } = renderHook(() => useStorageEntries("ns", "sub/"), { + wrapper, + }); + + await waitFor(() => { + expect(result.current.entries).toEqual(firstPage); + }); + expect(result.current.hasMore).toBe(true); + expect(result.current.mayHaveMore).toBe(false); + + await act(async () => { + await result.current.loadMore(); + }); + + expect(result.current.entries).toEqual([...firstPage, ...secondPage]); + expect(result.current.hasMore).toBe(false); + expect(result.current.mayHaveMore).toBe(true); + expect(mockRequest).toHaveBeenLastCalledWith({ + namespace: "ns", + prefix: "sub/", + limit: 150, + pageToken: "150", + }); + }); + + it("should ignore duplicate load more calls while a page is loading", async () => { + const firstPage = [makeEntry({ path: "a.txt" })]; + const secondPage = [makeEntry({ path: "b.txt" })]; + let resolveLoadMore!: (value: { + entries: StorageEntry[]; + next_page_token: string | null; + may_have_more: boolean; + }) => void; + const loadMorePromise = new Promise<{ + entries: StorageEntry[]; + next_page_token: string | null; + may_have_more: boolean; + }>((resolve) => { + resolveLoadMore = resolve; + }); + mockRequest + .mockResolvedValueOnce({ + entries: firstPage, + next_page_token: "150", + may_have_more: true, + }) + .mockReturnValueOnce(loadMorePromise); + + const { result } = renderHook(() => useStorageEntries("ns", "sub/"), { + wrapper, + }); + + await waitFor(() => { + expect(result.current.entries).toEqual(firstPage); + }); + + await act(async () => { + const firstLoad = result.current.loadMore(); + const secondLoad = result.current.loadMore(); + + expect(mockRequest).toHaveBeenCalledTimes(2); + resolveLoadMore({ + entries: secondPage, + next_page_token: null, + may_have_more: false, + }); + await Promise.all([firstLoad, secondLoad]); + }); + + expect(result.current.entries).toEqual([...firstPage, ...secondPage]); + expect(mockRequest).toHaveBeenCalledTimes(2); + }); + + it("should fetch arbitrary storage pages", async () => { + const firstPage = [makeEntry({ path: "folder/a.txt" })]; + const secondPage = [makeEntry({ path: "folder/b.txt" })]; + mockRequest + .mockResolvedValueOnce({ + entries: firstPage, + next_page_token: "150", + may_have_more: false, + }) + .mockResolvedValueOnce({ + entries: secondPage, + next_page_token: null, + may_have_more: false, + }); + + const { result } = renderHook(() => useStoragePageFetcher(), { + wrapper, + }); + + await act(async () => { + await result.current({ + namespace: "ns", + prefix: "folder/", + }); + }); + await act(async () => { + await result.current({ + namespace: "ns", + prefix: "folder/", + pageToken: "150", + append: true, + }); + }); + + expect(store.get(storageAtom).entriesByPath.get("ns::folder/")).toEqual([ + ...firstPage, + ...secondPage, + ]); + expect(mockRequest).toHaveBeenLastCalledWith({ + namespace: "ns", + prefix: "folder/", + limit: 150, + pageToken: "150", }); }); }); diff --git a/frontend/src/core/storage/state.ts b/frontend/src/core/storage/state.ts index 1a49431b3db..7718c4780b1 100644 --- a/frontend/src/core/storage/state.ts +++ b/frontend/src/core/storage/state.ts @@ -1,11 +1,15 @@ /* Copyright 2026 Marimo. All rights reserved. */ import { atom, useAtomValue } from "jotai"; +import { useCallback, useRef, useState } from "react"; import { useAsyncData } from "@/hooks/useAsyncData"; import { createReducerAndAtoms } from "@/utils/createReducer"; import type { NotificationMessageData } from "../kernel/messages"; import type { VariableName } from "../variables/types"; -import { ListStorageEntries } from "./request-registry"; +import { + ListStorageEntries, + type StorageEntriesResult, +} from "./request-registry"; import type { StorageEntry, StorageState } from "./types"; import { DEFAULT_FETCH_LIMIT, @@ -18,6 +22,7 @@ function initialState(): StorageState { return { namespaces: [], entriesByPath: new Map(), + pageMetadataByPath: new Map(), }; } @@ -48,23 +53,50 @@ const { namespace: string; prefix: string | null | undefined; entries: StorageEntry[]; + nextPageToken?: string | null; + mayHaveMore?: boolean; + append?: boolean; }, ) => { const key = storagePathKey(opts.namespace, opts.prefix); const entriesByPath = new Map(state.entriesByPath); - entriesByPath.set(key, opts.entries); - return { ...state, entriesByPath }; + const entries = + opts.append && entriesByPath.has(key) + ? [...(entriesByPath.get(key) ?? []), ...opts.entries] + : opts.entries; + entriesByPath.set(key, entries); + + const pageMetadataByPath = new Map(state.pageMetadataByPath); + pageMetadataByPath.set(key, { + nextPageToken: opts.nextPageToken ?? null, + mayHaveMore: opts.mayHaveMore ?? false, + }); + return { + ...state, + entriesByPath, + pageMetadataByPath, + }; }, clearNamespaceCache: (state, namespace: string) => { const entriesByPath = new Map(state.entriesByPath); + const pageMetadataByPath = new Map(state.pageMetadataByPath); const prefix = storageNamespacePrefix(namespace); for (const key of entriesByPath.keys()) { if (key.startsWith(prefix)) { entriesByPath.delete(key); } } - return { ...state, entriesByPath }; + for (const key of pageMetadataByPath.keys()) { + if (key.startsWith(prefix)) { + pageMetadataByPath.delete(key); + } + } + return { + ...state, + entriesByPath, + pageMetadataByPath, + }; }, filterFromVariables: (state, variableNames: VariableName[]) => { @@ -93,38 +125,122 @@ export function useStorageActions() { export { storageAtom }; +async function fetchStorageEntriesPage({ + namespace, + prefix, + pageToken, + append, + setEntries, +}: { + namespace: string; + prefix: string | null | undefined; + pageToken?: string | null; + append?: boolean; + setEntries: ReturnType["setEntries"]; +}): Promise { + const result = await ListStorageEntries.request({ + namespace, + prefix: prefix ?? ROOT_PATH, + limit: DEFAULT_FETCH_LIMIT, + ...(pageToken ? { pageToken } : {}), + }); + if (result.error) { + throw new Error(result.error); + } + setEntries({ + namespace, + prefix, + entries: result.entries, + nextPageToken: result.next_page_token, + mayHaveMore: result.may_have_more, + append, + }); + return result; +} + /** * Hook that fetches and caches storage entries for a given namespace/prefix. * Entries are fetched on first access and cached in the store for subsequent renders. */ export function useStorageEntries(namespace: string, prefix?: string) { - const { entriesByPath } = useStorage(); + const { entriesByPath, pageMetadataByPath } = useStorage(); const { setEntries } = useStorageActions(); - const cached = entriesByPath.get(storagePathKey(namespace, prefix)); + const key = storagePathKey(namespace, prefix); + const cached = entriesByPath.get(key); + const metadata = pageMetadataByPath.get(key); + const nextPageToken = metadata?.nextPageToken ?? null; + const mayHaveMore = metadata?.mayHaveMore ?? false; + const isLoadingMoreRef = useRef(false); + const [isLoadingMore, setIsLoadingMore] = useState(false); + const [loadMoreError, setLoadMoreError] = useState(); const { isPending, error, refetch } = useAsyncData(async () => { if (cached) { return; } - const result = await ListStorageEntries.request({ + await fetchStorageEntriesPage({ namespace, - prefix: prefix ?? ROOT_PATH, - limit: DEFAULT_FETCH_LIMIT, + prefix, + setEntries, }); - if (result.error) { - throw new Error(result.error); - } - setEntries({ namespace, prefix, entries: result.entries }); }, [namespace, prefix, cached === undefined]); + const loadMore = useCallback(async () => { + if (!nextPageToken || isLoadingMoreRef.current) { + return; + } + + isLoadingMoreRef.current = true; + setIsLoadingMore(true); + setLoadMoreError(undefined); + try { + await fetchStorageEntriesPage({ + namespace, + prefix, + pageToken: nextPageToken, + append: true, + setEntries, + }); + } catch (error) { + setLoadMoreError( + error instanceof Error ? error : new Error(String(error)), + ); + } finally { + isLoadingMoreRef.current = false; + setIsLoadingMore(false); + } + }, [namespace, prefix, nextPageToken, setEntries]); + return { entries: cached ?? [], isPending: isPending && !cached, error: cached ? undefined : error, + hasMore: nextPageToken !== null, + mayHaveMore: nextPageToken === null && mayHaveMore, + loadMore, + isLoadingMore, + loadMoreError, refetch, }; } +export function useStoragePageFetcher() { + const { setEntries } = useStorageActions(); + return useCallback( + (opts: { + namespace: string; + prefix: string | null | undefined; + pageToken?: string | null; + append?: boolean; + }) => + fetchStorageEntriesPage({ + ...opts, + setEntries, + }), + [setEntries], + ); +} + export const exportedForTesting = { reducer, createActions, diff --git a/frontend/src/core/storage/types.ts b/frontend/src/core/storage/types.ts index f8514c947a3..b893be2c0d0 100644 --- a/frontend/src/core/storage/types.ts +++ b/frontend/src/core/storage/types.ts @@ -30,13 +30,24 @@ export interface StorageState { namespaces: StorageNamespace[]; /** Lazy-loaded entries keyed by "namespace::prefix" */ entriesByPath: ReadonlyMap; + /** Pagination metadata keyed by "namespace::prefix" */ + pageMetadataByPath: ReadonlyMap; } -export function storageUrl( - protocol: string, - rootPath: string, - entryPath: string, -): URL { +export interface StoragePageMetadata { + nextPageToken: string | null; + mayHaveMore: boolean; +} + +export function storageUrl({ + protocol, + rootPath, + entryPath, +}: { + protocol: string; + rootPath: string; + entryPath: string; +}): URL { const parts = [rootPath, entryPath].filter(Boolean); const path = parts.join("/").replaceAll(/\/+/g, "/"); return new URL(`${protocol}://${path}`); diff --git a/marimo/_data/_external_storage/models.py b/marimo/_data/_external_storage/models.py index 9d3257fccc3..13435c9ff9c 100644 --- a/marimo/_data/_external_storage/models.py +++ b/marimo/_data/_external_storage/models.py @@ -60,6 +60,15 @@ class StorageNamespace(msgspec.Struct, rename="camel"): DEFAULT_FETCH_LIMIT = 100 +@dataclass +class StorageListResult: + """Result of listing entries from a storage backend.""" + + entries: list[StorageEntry] + next_page_token: str | None = None + may_have_more: bool = False + + @dataclass class DownloadResult: """Result of downloading a file from external storage. @@ -90,9 +99,11 @@ def list_entries( prefix: str | None, *, limit: int = DEFAULT_FETCH_LIMIT, - ) -> list[StorageEntry]: + page_token: str | None = None, + ) -> StorageListResult: """ - List the entries at the given prefix. If no prefix is provided, list the root entries. + List one page of entries at the given prefix. If no prefix is provided, + list the root entries. """ @abc.abstractmethod diff --git a/marimo/_data/_external_storage/storage.py b/marimo/_data/_external_storage/storage.py index 65199dbc6df..37268337e2c 100644 --- a/marimo/_data/_external_storage/storage.py +++ b/marimo/_data/_external_storage/storage.py @@ -15,16 +15,19 @@ BackendType, StorageBackend, StorageEntry, + StorageListResult, ) from marimo._dependencies.dependencies import DependencyManager from marimo._utils.assert_never import log_never from marimo._utils.dicts import remove_none_values if TYPE_CHECKING: + from collections.abc import Sequence + from fsspec import ( # type: ignore[import-untyped] AbstractFileSystem, # noqa: F401 ) - from obstore import ObjectMeta + from obstore import ListResult, ObjectMeta from obstore.store import ( AzureConfig, AzureStore, @@ -44,7 +47,20 @@ def list_entries( prefix: str | None, *, limit: int = DEFAULT_FETCH_LIMIT, - ) -> list[StorageEntry]: + page_token: str | None = None, + ) -> StorageListResult: + offset = _parse_page_offset(page_token) + storage_entries, may_have_more = self._list_storage_entries(prefix) + return _paginate_entries( + storage_entries, + offset=offset, + limit=limit, + may_have_more=may_have_more, + ) + + def _list_storage_entries( + self, prefix: str | None + ) -> tuple[list[StorageEntry], bool]: result = self.store.list_with_delimiter(prefix=prefix) storage_entries: list[StorageEntry] = [] @@ -71,15 +87,18 @@ def list_entries( continue storage_entries.append(self._create_storage_entry(entry)) - if len(storage_entries) > limit: - LOGGER.debug( - "Fetched %s entries, but limiting to %s", - len(storage_entries), - limit, - ) - storage_entries = storage_entries[:limit] + return storage_entries, self._result_may_have_more(result) - return storage_entries + def _result_may_have_more( + self, + result: ListResult[Sequence[ObjectMeta]], + ) -> bool: + """Return whether an obstore delimiter listing may be provider-truncated.""" + # Object stores commonly cap delimiter listings at 1000 returned entries. + # https://docs.rs/object_store/latest/object_store/struct.ListResult.html + LIMIT_ENTRIES = 1000 + entry_count = len(result["common_prefixes"]) + len(result["objects"]) + return entry_count >= LIMIT_ENTRIES async def get_entry(self, path: str) -> StorageEntry: entry = await self.store.head_async(path) @@ -247,7 +266,13 @@ def list_entries( prefix: str | None, *, limit: int = DEFAULT_FETCH_LIMIT, - ) -> list[StorageEntry]: + page_token: str | None = None, + ) -> StorageListResult: + offset = _parse_page_offset(page_token) + entries = self._list_storage_entries(prefix) + return _paginate_entries(entries, offset=offset, limit=limit) + + def _list_storage_entries(self, prefix: str | None) -> list[StorageEntry]: # If no prefix provided, we use empty string to list root entries # Else, an error is raised if prefix is None: @@ -263,15 +288,6 @@ def list_entries( self._invalidate_listing_cache_for_prefix(prefix) files = self._list_files(prefix) - total_files = len(files) - if total_files > limit: - LOGGER.debug( - "Fetched %s files, but limiting to %s", - total_files, - limit, - ) - files = files[:limit] - storage_entries = [] for file in files: if isinstance(file, dict): @@ -511,3 +527,44 @@ def detect_protocol_from_url(url: str) -> CLOUD_STORAGE_TYPES | None: def normalize_protocol(protocol: str) -> KNOWN_STORAGE_TYPES | None: """Normalize a protocol string (e.g. 's3a', 'gs', 'abfs') to a known storage type.""" return _PROTOCOL_MAP.get(protocol.strip().lower()) + + +def _parse_page_offset(page_token: str | None) -> int: + if page_token is None: + return 0 + try: + offset = int(page_token) + except ValueError as exc: + raise ValueError(f"Invalid storage page token: {page_token}") from exc + if offset < 0: + raise ValueError(f"Invalid storage page token: {page_token}") + return offset + + +def _paginate_entries( + entries: list[StorageEntry], + *, + offset: int, + limit: int, + may_have_more: bool = False, +) -> StorageListResult: + if limit < 1: + raise ValueError("Storage list limit must be positive") + + total_entries = len(entries) + if total_entries > limit: + LOGGER.debug( + "Fetched %s entries, returning page offset %s with limit %s", + total_entries, + offset, + limit, + ) + + end = offset + limit + has_next_page = end < total_entries + next_page_token = str(end) if has_next_page else None + return StorageListResult( + entries=entries[offset:end], + next_page_token=next_page_token, + may_have_more=may_have_more, + ) diff --git a/marimo/_messaging/notification.py b/marimo/_messaging/notification.py index 15fd418a9ed..fb8d2969429 100644 --- a/marimo/_messaging/notification.py +++ b/marimo/_messaging/notification.py @@ -655,6 +655,9 @@ class StorageEntriesNotification(Notification, tag="storage-entries"): namespace: Variable name of the storage backend. prefix: The prefix that was listed (set by list_entries). query: The search query that was used (set by search). + next_page_token: Token for fetching the next page of entries. + may_have_more: Whether the backend may have more entries it cannot + currently page through. error: Error message if the operation failed. """ @@ -664,6 +667,8 @@ class StorageEntriesNotification(Notification, tag="storage-entries"): namespace: str prefix: str | None = None query: str | None = None + next_page_token: str | None = None + may_have_more: bool = False error: str | None = None diff --git a/marimo/_runtime/callbacks/external_storage.py b/marimo/_runtime/callbacks/external_storage.py index d76347bc139..c971821ade5 100644 --- a/marimo/_runtime/callbacks/external_storage.py +++ b/marimo/_runtime/callbacks/external_storage.py @@ -6,7 +6,10 @@ from typing import TYPE_CHECKING, Any from marimo import _loggers -from marimo._data._external_storage.models import StorageBackend, StorageEntry +from marimo._data._external_storage.models import ( + StorageBackend, + StorageListResult, +) from marimo._messaging.notification import ( StorageDownloadReadyNotification, StorageEntriesNotification, @@ -92,19 +95,23 @@ async def list_entries(self, request: StorageListEntriesCommand) -> None: return # list_entries is synchronous, so we wrap it in asyncio.to_thread - def list_entries() -> list[StorageEntry]: + def list_entries() -> StorageListResult: return backend.list_entries( - prefix=request.prefix, limit=request.limit + prefix=request.prefix, + limit=request.limit, + page_token=request.page_token, ) try: - entries = await asyncio.to_thread(list_entries) + result = await asyncio.to_thread(list_entries) broadcast_notification( StorageEntriesNotification( request_id=request.request_id, - entries=entries, + entries=result.entries, namespace=request.namespace, prefix=request.prefix, + next_page_token=result.next_page_token, + may_have_more=result.may_have_more, ), ) except Exception as e: diff --git a/marimo/_runtime/commands.py b/marimo/_runtime/commands.py index 32d6397a7b7..0b860176c91 100644 --- a/marimo/_runtime/commands.py +++ b/marimo/_runtime/commands.py @@ -757,12 +757,14 @@ class StorageListEntriesCommand(Command): namespace: Variable name identifying the storage backend. limit: Max entries to return. prefix: Path prefix to list (None = root). + page_token: Token for the next page of entries. """ request_id: RequestId namespace: str limit: int prefix: str | None = None + page_token: str | None = None class StorageDownloadCommand(Command): diff --git a/marimo/_server/models/models.py b/marimo/_server/models/models.py index c21a652adac..f97c191188c 100644 --- a/marimo/_server/models/models.py +++ b/marimo/_server/models/models.py @@ -155,6 +155,7 @@ def as_command(self) -> StorageListEntriesCommand: namespace=self.namespace, limit=self.limit, prefix=self.prefix, + page_token=self.page_token, ) diff --git a/packages/openapi/api.yaml b/packages/openapi/api.yaml index 91ca46def5c..7f918f1bd20 100644 --- a/packages/openapi/api.yaml +++ b/packages/openapi/api.yaml @@ -4867,7 +4867,10 @@ components: \ entries returned by the operation.\n namespace: Variable name of\ \ the storage backend.\n prefix: The prefix that was listed (set by\ \ list_entries).\n query: The search query that was used (set by search).\n\ - \ error: Error message if the operation failed." + \ next_page_token: Token for fetching the next page of entries.\n \ + \ may_have_more: Whether the backend may have more entries it cannot\n\ + \ currently page through.\n error: Error message if the\ + \ operation failed." properties: entries: items: @@ -4878,8 +4881,16 @@ components: - type: string - type: 'null' default: null + may_have_more: + default: false + type: boolean namespace: type: string + next_page_token: + anyOf: + - type: string + - type: 'null' + default: null op: enum: - storage-entries @@ -4944,12 +4955,18 @@ components: \ and virtual directories at one level.\n\n Attributes:\n request_id:\ \ Unique identifier for this request.\n namespace: Variable name identifying\ \ the storage backend.\n limit: Max entries to return.\n prefix:\ - \ Path prefix to list (None = root)." + \ Path prefix to list (None = root).\n page_token: Token for the next\ + \ page of entries." properties: limit: type: integer namespace: type: string + pageToken: + anyOf: + - type: string + - type: 'null' + default: null prefix: anyOf: - type: string @@ -4973,6 +4990,11 @@ components: type: integer namespace: type: string + pageToken: + anyOf: + - type: string + - type: 'null' + default: null prefix: anyOf: - type: string diff --git a/packages/openapi/src/api.ts b/packages/openapi/src/api.ts index 3cf49d21074..acbfdc88529 100644 --- a/packages/openapi/src/api.ts +++ b/packages/openapi/src/api.ts @@ -6334,13 +6334,20 @@ export interface components { * namespace: Variable name of the storage backend. * prefix: The prefix that was listed (set by list_entries). * query: The search query that was used (set by search). + * next_page_token: Token for fetching the next page of entries. + * may_have_more: Whether the backend may have more entries it cannot + * currently page through. * error: Error message if the operation failed. */ StorageEntriesNotification: { entries: components["schemas"]["StorageEntry"][]; /** @default null */ error?: string | null; + /** @default false */ + may_have_more?: boolean; namespace: string; + /** @default null */ + next_page_token?: string | null; /** @enum {unknown} */ op: "storage-entries"; /** @default null */ @@ -6384,11 +6391,14 @@ export interface components { * namespace: Variable name identifying the storage backend. * limit: Max entries to return. * prefix: Path prefix to list (None = root). + * page_token: Token for the next page of entries. */ StorageListEntriesCommand: { limit: number; namespace: string; /** @default null */ + pageToken?: string | null; + /** @default null */ prefix?: string | null; requestId: string; /** @enum {unknown} */ @@ -6399,6 +6409,8 @@ export interface components { limit: number; namespace: string; /** @default null */ + pageToken?: string | null; + /** @default null */ prefix?: string | null; requestId: components["schemas"]["RequestId"]; }; diff --git a/tests/_data/_external_storage/test_storage_models.py b/tests/_data/_external_storage/test_storage_models.py index e1e0efc4ac2..48f2ad9c958 100644 --- a/tests/_data/_external_storage/test_storage_models.py +++ b/tests/_data/_external_storage/test_storage_models.py @@ -10,7 +10,11 @@ from dirty_equals import IsDatetime, IsPositiveFloat from inline_snapshot import snapshot -from marimo._data._external_storage.models import DownloadResult, StorageEntry +from marimo._data._external_storage.models import ( + DownloadResult, + StorageEntry, + StorageListResult, +) from marimo._data._external_storage.storage import ( FsspecFilesystem, Obstore, @@ -58,7 +62,7 @@ def test_list_entries(self) -> None: mock_store.list_with_delimiter.assert_called_once_with( prefix="some/prefix", ) - assert result == snapshot( + assert result.entries == snapshot( [ StorageEntry( path="subdir/", @@ -113,7 +117,7 @@ def test_list_entries_skips_zero_byte_folder_marker(self) -> None: backend = self._make_backend(mock_store) result = backend.list_entries(prefix="folder") - assert result == [ + assert result.entries == [ StorageEntry( path="folder/order_details.csv", kind="object", @@ -133,7 +137,91 @@ def test_list_entries_empty(self) -> None: backend = self._make_backend(mock_store) result = backend.list_entries(prefix=None) - assert result == [] + assert result.entries == [] + + def test_list_entries_returns_next_page_token(self) -> None: + now = datetime.now(tz=timezone.utc) + mock_store = MagicMock() + mock_store.list_with_delimiter.return_value = { + "common_prefixes": ["a/", "b/"], + "objects": [ + { + "path": "c.txt", + "size": 1, + "last_modified": now, + "e_tag": None, + "version": None, + }, + ], + } + + backend = self._make_backend(mock_store) + + assert backend.list_entries(prefix=None, limit=2) == snapshot( + StorageListResult( + entries=[ + StorageEntry( + path="a/", + kind="directory", + size=0, + last_modified=None, + metadata={}, + mime_type=None, + ), + StorageEntry( + path="b/", + kind="directory", + size=0, + last_modified=None, + metadata={}, + mime_type=None, + ), + ], + next_page_token="2", + ) + ) + + assert backend.list_entries( + prefix=None, limit=2, page_token="2" + ) == snapshot( + StorageListResult( + entries=[ + StorageEntry( + path="c.txt", + kind="object", + size=1, + last_modified=now.timestamp(), + metadata={}, + mime_type="text/plain", + ), + ], + next_page_token=None, + ) + ) + + def test_list_entries_warns_on_provider_boundary(self) -> None: + now = datetime.now(tz=timezone.utc) + mock_store = MagicMock() + mock_store.list_with_delimiter.return_value = { + "common_prefixes": [], + "objects": [ + { + "path": f"file{i}.txt", + "size": i, + "last_modified": now, + "e_tag": None, + "version": None, + } + for i in range(1000) + ], + } + + backend = self._make_backend(mock_store) + result = backend.list_entries(prefix=None, limit=100, page_token="900") + + assert result.next_page_token is None + assert result.may_have_more is True + assert len(result.entries) == 100 def test_create_storage_entry_missing_fields(self) -> None: mock_store = MagicMock() @@ -586,7 +674,7 @@ def test_list_entries(self) -> None: result = backend.list_entries(prefix="some/path") mock_store.ls.assert_called_once_with(path="some/path", detail=True) - assert result == snapshot( + assert result.entries == snapshot( [ StorageEntry( path="file1.txt", @@ -646,7 +734,7 @@ def test_list_entries_retries_when_self_entry_detected(self) -> None: assert mock_store.ls.call_count == 2 assert "" not in mock_store.dircache assert "folder" not in mock_store.dircache - assert result == snapshot( + assert result.entries == snapshot( [ StorageEntry( path="folder/file.txt", @@ -690,7 +778,7 @@ def test_list_entries_returns_multi_file_list_without_self_scan( result = backend.list_entries(prefix="folder") mock_store.ls.assert_called_once_with(path="folder", detail=True) - assert result == snapshot( + assert result.entries == snapshot( [ StorageEntry( path="folder", @@ -735,7 +823,7 @@ def test_list_entries_respects_limit(self) -> None: backend = self._make_backend(mock_store) result = backend.list_entries(prefix="", limit=3) - assert result == snapshot( + assert result.entries == snapshot( [ StorageEntry( path="file0.txt", @@ -764,6 +852,63 @@ def test_list_entries_respects_limit(self) -> None: ] ) + def test_list_entries_returns_next_page_token(self) -> None: + mock_store = MagicMock() + files = [ + { + "name": f"file{i}.txt", + "size": i, + "type": "file", + "mtime": None, + } + for i in range(3) + ] + mock_store.ls.return_value = files + + backend = self._make_backend(mock_store) + + assert backend.list_entries(prefix="", limit=2) == snapshot( + StorageListResult( + entries=[ + StorageEntry( + path="file0.txt", + kind="file", + size=0, + last_modified=None, + metadata={}, + mime_type="text/plain", + ), + StorageEntry( + path="file1.txt", + kind="file", + size=1, + last_modified=None, + metadata={}, + mime_type="text/plain", + ), + ], + next_page_token="2", + ) + ) + + assert backend.list_entries( + prefix="", limit=2, page_token="2" + ) == snapshot( + StorageListResult( + entries=[ + StorageEntry( + path="file2.txt", + kind="file", + size=2, + last_modified=None, + metadata={}, + mime_type="text/plain", + ), + ], + next_page_token=None, + ) + ) + def test_list_entries_raises_on_non_list(self) -> None: mock_store = MagicMock() mock_store.ls.return_value = "not_a_list" @@ -783,7 +928,7 @@ def test_list_entries_skips_non_dict_entries(self) -> None: backend = self._make_backend(mock_store) result = backend.list_entries(prefix="") - assert result == snapshot( + assert result.entries == snapshot( [ StorageEntry( path="good.txt", @@ -1104,7 +1249,7 @@ async def test_list_and_download_with_memory_fs(self) -> None: backend = FsspecFilesystem(fs, VariableName("mem_fs")) entries = backend.list_entries(prefix="/test") - assert entries == snapshot( + assert entries.entries == snapshot( [ StorageEntry( path="/test/hello.txt", @@ -1228,7 +1373,7 @@ async def test_list_entries_with_memory_store(self) -> None: backend = Obstore(store, VariableName("mem_store")) entries = backend.list_entries(prefix="test/") - assert entries == snapshot( + assert entries.entries == snapshot( [ StorageEntry( path="test/file1.txt", diff --git a/tests/_runtime/test_runtime_external_storage.py b/tests/_runtime/test_runtime_external_storage.py index 7268072cfe1..416942d625e 100644 --- a/tests/_runtime/test_runtime_external_storage.py +++ b/tests/_runtime/test_runtime_external_storage.py @@ -8,7 +8,10 @@ import pytest from dirty_equals import IsPositiveFloat, IsStr -from marimo._data._external_storage.models import StorageEntry +from marimo._data._external_storage.models import ( + StorageEntry, + StorageListResult, +) from marimo._data._external_storage.storage import Obstore from marimo._dependencies.dependencies import DependencyManager from marimo._messaging.notification import ( @@ -290,6 +293,40 @@ async def test_list_entries_with_limit( ], namespace=STORAGE_VAR, prefix=None, + next_page_token="2", + ) + ] + + request = StorageListEntriesCommand( + request_id=RequestId("req-11-page-2"), + namespace=STORAGE_VAR, + limit=2, + prefix=None, + page_token="2", + ) + await k.handle_message(request) + + results = [ + op + for op in stream.operations + if isinstance(op, StorageEntriesNotification) + and op.request_id == RequestId("req-11-page-2") + ] + assert results == [ + StorageEntriesNotification( + request_id=RequestId("req-11-page-2"), + entries=[ + StorageEntry( + path="c.txt", + kind="object", + size=1, + last_modified=IsPositiveFloat(), # pyright: ignore[reportArgumentType] + metadata={"e_tag": IsStr()}, + mime_type="text/plain", + ), + ], + namespace=STORAGE_VAR, + prefix=None, ) ] @@ -413,11 +450,14 @@ async def test_list_entries_runs_in_background_thread( original_list = real_backend.list_entries def spy_list_entries( - prefix: str | None, *, limit: int = 100 - ) -> list[StorageEntry]: + prefix: str | None, + *, + limit: int = 100, + page_token: str | None = None, + ) -> StorageListResult: nonlocal call_thread_name call_thread_name = threading.current_thread().name - return original_list(prefix, limit=limit) + return original_list(prefix, limit=limit, page_token=page_token) real_backend.list_entries = spy_list_entries # type: ignore[method-assign]