diff --git a/common/data_source/webdav_connector.py b/common/data_source/webdav_connector.py index 6ea6558ad5..8cdd295794 100644 --- a/common/data_source/webdav_connector.py +++ b/common/data_source/webdav_connector.py @@ -3,6 +3,7 @@ import logging import os from datetime import datetime, timezone from typing import Any, Optional +from urllib.parse import urlsplit from webdav4.client import Client as WebDAVClient @@ -60,6 +61,49 @@ class WebDAVConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): file_ext = get_file_ext(file_name) return is_accepted_file_ext(file_ext, self._build_extension_type()) + @staticmethod + def _coerce_size_bytes(size_bytes: Any) -> int | None: + if isinstance(size_bytes, bool): + return None + if isinstance(size_bytes, int): + return size_bytes if size_bytes >= 0 else None + if isinstance(size_bytes, str): + size_text = size_bytes.strip() + if not size_text or len(size_text) > 20 or not size_text.isdecimal(): + return None + parsed_size = int(size_text) + return parsed_size if parsed_size >= 0 else None + return None + + @classmethod + def _get_size_bytes(cls, file_info: dict[str, Any]) -> int | None: + # webdav4's Client.ls(detail=True) reports the size under "content_length" + # (see webdav4.multistatus.Response.as_dict); other servers/libraries or + # webdav4's fsspec wrapper may instead use "size" or the raw + # "getcontentlength" property. Try each so the size guard isn't silently + # skipped — otherwise file_info.get("size") is always None and every file + # trips the missing-metadata warning. + for key in ("size", "content_length", "getcontentlength"): + if key not in file_info: + continue + size_bytes = cls._coerce_size_bytes(file_info[key]) + if size_bytes is not None: + return size_bytes + return None + + @staticmethod + def _get_log_file_identifier(file_info: dict[str, Any], fallback_path: str) -> str: + raw_identifier = str(file_info.get("name") or file_info.get("href") or fallback_path) + try: + parsed_identifier = urlsplit(raw_identifier) + identifier_path = parsed_identifier.path if parsed_identifier.scheme else raw_identifier + except ValueError: + identifier_path = fallback_path if "://" not in fallback_path else "" + identifier_path = identifier_path.split("?", 1)[0].split("#", 1)[0] + fallback_identifier = "" if "://" in fallback_path else os.path.basename(fallback_path.rstrip("/")) + identifier = os.path.basename(identifier_path.rstrip("/")) or fallback_identifier or "" + return identifier.encode("unicode_escape").decode("ascii") + def set_allow_images(self, allow_images: bool) -> None: """Set whether to process images""" logging.info(f"Setting allow_images to {allow_images}.") @@ -228,14 +272,23 @@ class WebDAVConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): logging.debug(f"Skipping file {file_path} due to unsupported extension.") continue - size_bytes = file_info.get('size', 0) + size_bytes = self._get_size_bytes(file_info) + if self.size_threshold is not None and size_bytes is None: + file_identifier = self._get_log_file_identifier(file_info, file_path) + logging.warning( + f"{file_identifier}: size metadata missing from WebDAV server response, " + f"skipping to avoid processing potentially large files." + ) + continue if ( self.size_threshold is not None - and isinstance(size_bytes, int) + and size_bytes is not None and size_bytes > self.size_threshold ): + file_identifier = self._get_log_file_identifier(file_info, file_path) logging.warning( - f"{file_name} exceeds size threshold of {self.size_threshold}. Skipping." + f"{file_identifier} exceeds size threshold of {self.size_threshold} " + f"(size_bytes={size_bytes}). Skipping." ) continue @@ -289,7 +342,7 @@ class WebDAVConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): semantic_identifier=semantic_id, extension=get_file_ext(file_name), doc_updated_at=modified, - size_bytes=size_bytes if size_bytes else 0 + size_bytes=size_bytes if size_bytes is not None else 0 ) ) @@ -367,12 +420,24 @@ class WebDAVConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): file_name = os.path.basename(file_path) if not self._is_supported_file(file_name): continue - size_bytes = file_info.get("size", 0) + size_bytes = self._get_size_bytes(file_info) + if self.size_threshold is not None and size_bytes is None: + file_identifier = self._get_log_file_identifier(file_info, file_path) + logging.warning( + f"{file_identifier}: size metadata missing from WebDAV server response, " + f"skipping to avoid processing potentially large files." + ) + continue if ( self.size_threshold is not None - and isinstance(size_bytes, int) + and size_bytes is not None and size_bytes > self.size_threshold ): + file_identifier = self._get_log_file_identifier(file_info, file_path) + logging.warning( + f"{file_identifier} exceeds size threshold of {self.size_threshold} " + f"(size_bytes={size_bytes}). Skipping." + ) continue batch.append( SlimDocument(id=f"webdav:{self.base_url}:{file_path}") diff --git a/test/unit_test/data_source/test_webdav_connector_unit.py b/test/unit_test/data_source/test_webdav_connector_unit.py new file mode 100644 index 0000000000..c7651d058c --- /dev/null +++ b/test/unit_test/data_source/test_webdav_connector_unit.py @@ -0,0 +1,307 @@ +# +# Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import importlib.util +import logging +import sys +from dataclasses import dataclass +from datetime import datetime, timezone +from enum import IntFlag, auto +from pathlib import Path +from types import ModuleType + +import pytest + + +@dataclass +class _Document: + id: str + blob: bytes + source: str + semantic_identifier: str + extension: str + doc_updated_at: datetime + size_bytes: int + + +@dataclass +class _SlimDocument: + id: str + + +class _DocumentSource: + WEBDAV = "webdav" + + +class _OnyxExtensionType(IntFlag): + Plain = auto() + Document = auto() + Multimedia = auto() + + +class _LoadConnector: + pass + + +class _PollConnector: + pass + + +class _SlimConnectorWithPermSync: + pass + + +def _install_dependency_stubs(): + config_module = ModuleType("common.data_source.config") + config_module.DocumentSource = _DocumentSource + config_module.INDEX_BATCH_SIZE = 10 + config_module.BLOB_STORAGE_SIZE_THRESHOLD = 20 + + exceptions_module = ModuleType("common.data_source.exceptions") + for name in ( + "ConnectorMissingCredentialError", + "ConnectorValidationError", + "CredentialExpiredError", + "InsufficientPermissionsError", + ): + setattr(exceptions_module, name, type(name, (Exception,), {})) + + interfaces_module = ModuleType("common.data_source.interfaces") + interfaces_module.LoadConnector = _LoadConnector + interfaces_module.PollConnector = _PollConnector + interfaces_module.SlimConnectorWithPermSync = _SlimConnectorWithPermSync + interfaces_module.OnyxExtensionType = _OnyxExtensionType + interfaces_module.GenerateDocumentsOutput = object + interfaces_module.GenerateSlimDocumentOutput = object + interfaces_module.SecondsSinceUnixEpoch = int + + models_module = ModuleType("common.data_source.models") + models_module.Document = _Document + models_module.SlimDocument = _SlimDocument + models_module.GenerateDocumentsOutput = object + models_module.GenerateSlimDocumentOutput = object + models_module.SecondsSinceUnixEpoch = int + + utils_module = ModuleType("common.data_source.utils") + utils_module.get_file_ext = lambda file_name: Path(file_name).suffix.lstrip(".").lower() + utils_module.is_accepted_file_ext = lambda file_ext, extension_type: bool(file_ext) + + webdav4_module = ModuleType("webdav4") + webdav4_client_module = ModuleType("webdav4.client") + webdav4_client_module.Client = object + + sys.modules["common.data_source.config"] = config_module + sys.modules["common.data_source.exceptions"] = exceptions_module + sys.modules["common.data_source.interfaces"] = interfaces_module + sys.modules["common.data_source.models"] = models_module + sys.modules["common.data_source.utils"] = utils_module + sys.modules["webdav4"] = webdav4_module + sys.modules["webdav4.client"] = webdav4_client_module + + +def _load_webdav_connector_module(): + """Load webdav_connector.py in isolation (avoid the package __init__).""" + repo_root = Path(__file__).resolve().parents[3] + package_name = "common.data_source" + saved_modules = {name: module for name, module in sys.modules.items() if name == package_name or name.startswith(f"{package_name}.")} + saved_webdav_modules = {name: sys.modules[name] for name in ("webdav4", "webdav4.client") if name in sys.modules} + package_stub = ModuleType(package_name) + package_stub.__path__ = [str(repo_root / "common" / "data_source")] + sys.modules[package_name] = package_stub + _install_dependency_stubs() + + try: + spec = importlib.util.spec_from_file_location( + "_webdav_connector_under_test", + repo_root / "common" / "data_source" / "webdav_connector.py", + ) + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(module) + return module + finally: + for name in list(sys.modules): + if name == package_name or name.startswith(f"{package_name}."): + if name in saved_modules: + sys.modules[name] = saved_modules[name] + else: + sys.modules.pop(name, None) + for name in ("webdav4", "webdav4.client"): + if name in saved_webdav_modules: + sys.modules[name] = saved_webdav_modules[name] + else: + sys.modules.pop(name, None) + + +webdav_connector = _load_webdav_connector_module() +WebDAVConnector = webdav_connector.WebDAVConnector + + +class _FakeClient: + def __init__(self): + self.downloaded_paths = [] + + def download_fileobj(self, path, buffer): + self.downloaded_paths.append(path) + buffer.write(b"hello webdav") + + +def _stub_files(files): + def _list_files_recursive(*args, **kwargs): + return files + + return _list_files_recursive + + +@pytest.mark.p2 +def test_get_size_bytes_accepts_integer_strings(): + assert WebDAVConnector._get_size_bytes({"size": 128}) == 128 + assert WebDAVConnector._get_size_bytes({"size": "128"}) == 128 + assert WebDAVConnector._get_size_bytes({"size": " 128 "}) == 128 + assert WebDAVConnector._get_size_bytes({"size": 0}) == 0 + + +@pytest.mark.p2 +def test_get_size_bytes_ignores_unknown_values(): + assert WebDAVConnector._get_size_bytes({}) is None + assert WebDAVConnector._get_size_bytes({"size": ""}) is None + assert WebDAVConnector._get_size_bytes({"size": "unknown"}) is None + assert WebDAVConnector._get_size_bytes({"size": "-1"}) is None + assert WebDAVConnector._get_size_bytes({"size": -1}) is None + assert WebDAVConnector._get_size_bytes({"size": True}) is None + assert WebDAVConnector._get_size_bytes({"size": "1" * 21}) is None + + +@pytest.mark.p2 +def test_get_size_bytes_reads_webdav4_content_length(): + # webdav4's Client.ls(detail=True) exposes size as "content_length", not "size". + assert WebDAVConnector._get_size_bytes({"content_length": 128}) == 128 + assert WebDAVConnector._get_size_bytes({"content_length": "128"}) == 128 + assert WebDAVConnector._get_size_bytes({"content_length": 0}) == 0 + assert WebDAVConnector._get_size_bytes({"getcontentlength": "128"}) == 128 + + +@pytest.mark.p2 +def test_get_size_bytes_falls_back_across_keys(): + # When the preferred key is absent/invalid, fall back to the next valid key. + assert WebDAVConnector._get_size_bytes({"content_length": "256"}) == 256 + assert WebDAVConnector._get_size_bytes({"size": None, "content_length": 256}) == 256 + assert WebDAVConnector._get_size_bytes({"size": "bad", "content_length": 256}) == 256 + assert WebDAVConnector._get_size_bytes({"content_length": None, "getcontentlength": "256"}) == 256 + + +@pytest.mark.p1 +def test_yield_webdav_documents_skips_numeric_string_sizes_over_threshold(caplog): + connector = WebDAVConnector("https://webdav.example", batch_size=10) + connector.client = _FakeClient() + connector.size_threshold = 20 + modified = datetime(2026, 1, 1, tzinfo=timezone.utc) + caplog.set_level(logging.WARNING) + connector._list_files_recursive = _stub_files( + [ + ("/large.txt", {"size": "21", "modified": modified}), + ("/small.txt", {"size": "20", "modified": modified}), + ] + ) + + batches = list( + connector._yield_webdav_documents( + datetime(2025, 1, 1, tzinfo=timezone.utc), + datetime(2026, 2, 1, tzinfo=timezone.utc), + ) + ) + + assert [doc.id for batch in batches for doc in batch] == [ + "webdav:https://webdav.example:/small.txt", + ] + assert connector.client.downloaded_paths == ["/small.txt"] + assert batches[0][0].size_bytes == 20 + assert "large.txt exceeds size threshold of 20 (size_bytes=21). Skipping." in caplog.text + + +@pytest.mark.p1 +def test_yield_webdav_documents_skips_missing_size_metadata(caplog): + connector = WebDAVConnector("https://webdav.example", batch_size=10) + connector.client = _FakeClient() + connector.size_threshold = 20 + modified = datetime(2026, 1, 1, tzinfo=timezone.utc) + caplog.set_level(logging.WARNING) + connector._list_files_recursive = _stub_files( + [ + ("/missing.txt", {"modified": modified}), + ("/small.txt", {"size": "20", "modified": modified}), + ] + ) + + batches = list( + connector._yield_webdav_documents( + datetime(2025, 1, 1, tzinfo=timezone.utc), + datetime(2026, 2, 1, tzinfo=timezone.utc), + ) + ) + + assert [doc.id for batch in batches for doc in batch] == [ + "webdav:https://webdav.example:/small.txt", + ] + assert connector.client.downloaded_paths == ["/small.txt"] + assert ( + "missing.txt: size metadata missing from WebDAV server response, " + "skipping to avoid processing potentially large files." + ) in caplog.text + + +@pytest.mark.p1 +def test_retrieve_all_slim_docs_skips_numeric_string_sizes_over_threshold(caplog): + connector = WebDAVConnector("https://webdav.example", batch_size=10) + connector.client = object() + connector.size_threshold = 20 + caplog.set_level(logging.WARNING) + connector._list_files_recursive = _stub_files( + [ + ("/large.txt", {"size": "21"}), + ("/small.txt", {"size": "20"}), + ] + ) + + batches = list(connector.retrieve_all_slim_docs_perm_sync()) + + assert [doc.id for batch in batches for doc in batch] == [ + "webdav:https://webdav.example:/small.txt", + ] + assert "large.txt exceeds size threshold of 20 (size_bytes=21). Skipping." in caplog.text + + +@pytest.mark.p1 +def test_retrieve_all_slim_docs_skips_missing_size_metadata(caplog): + connector = WebDAVConnector("https://webdav.example", batch_size=10) + connector.client = object() + connector.size_threshold = 20 + caplog.set_level(logging.WARNING) + connector._list_files_recursive = _stub_files( + [ + ("/missing.txt", {}), + ("/small.txt", {"size": "20"}), + ] + ) + + batches = list(connector.retrieve_all_slim_docs_perm_sync()) + + assert [doc.id for batch in batches for doc in batch] == [ + "webdav:https://webdav.example:/small.txt", + ] + assert ( + "missing.txt: size metadata missing from WebDAV server response, " + "skipping to avoid processing potentially large files." + ) in caplog.text