diff --git a/common/constants.py b/common/constants.py index dd24b4ead..7956276f8 100644 --- a/common/constants.py +++ b/common/constants.py @@ -118,6 +118,7 @@ class FileSource(StrEnum): SHAREPOINT = "sharepoint" SLACK = "slack" TEAMS = "teams" + WEBDAV = "webdav" class PipelineTaskType(StrEnum): diff --git a/common/data_source/__init__.py b/common/data_source/__init__.py index 611c3c61a..1678bf9c3 100644 --- a/common/data_source/__init__.py +++ b/common/data_source/__init__.py @@ -14,6 +14,7 @@ from .google_drive.connector import GoogleDriveConnector from .jira.connector import JiraConnector from .sharepoint_connector import SharePointConnector from .teams_connector import TeamsConnector +from .webdav_connector import WebDAVConnector from .config import BlobType, DocumentSource from .models import Document, TextSection, ImageSection, BasicExpertInfo from .exceptions import ( @@ -36,6 +37,7 @@ __all__ = [ "JiraConnector", "SharePointConnector", "TeamsConnector", + "WebDAVConnector", "BlobType", "DocumentSource", "Document", diff --git a/common/data_source/config.py b/common/data_source/config.py index 6cd497527..f1b671fa4 100644 --- a/common/data_source/config.py +++ b/common/data_source/config.py @@ -42,13 +42,14 @@ class DocumentSource(str, Enum): R2 = "r2" GOOGLE_CLOUD_STORAGE = "google_cloud_storage" OCI_STORAGE = "oci_storage" + S3_COMPATIBLE = "s3_compatible" SLACK = "slack" CONFLUENCE = "confluence" JIRA = "jira" GOOGLE_DRIVE = "google_drive" GMAIL = "gmail" DISCORD = "discord" - S3_COMPATIBLE = "s3_compatible" + WEBDAV = "webdav" class FileOrigin(str, Enum): diff --git a/common/data_source/webdav_connector.py b/common/data_source/webdav_connector.py new file mode 100644 index 000000000..74fe7e9df --- /dev/null +++ b/common/data_source/webdav_connector.py @@ -0,0 +1,372 @@ +"""WebDAV connector""" +import logging +import os +from datetime import datetime, timezone +from typing import Any, Optional, Generator +from urllib.parse import urljoin, quote + +from 
class WebDAVConnector(LoadConnector, PollConnector):
    """Connector that syncs files from a WebDAV server.

    The connector lists ``remote_path`` recursively, filters files by their
    last-modified time and yields the downloaded files as batches of
    ``Document`` objects.  ``load_credentials`` must be called before any
    listing or download method.
    """

    # Format of RFC 1123 HTTP dates, e.g. "Mon, 01 Jan 2024 10:00:00 GMT".
    _HTTP_DATE_FORMAT = '%a, %d %b %Y %H:%M:%S %Z'

    def __init__(
        self,
        base_url: str,
        remote_path: str = "/",
        batch_size: int = INDEX_BATCH_SIZE,
    ) -> None:
        """Initialize WebDAV connector

        Args:
            base_url: Base URL of the WebDAV server (e.g., "https://webdav.example.com")
            remote_path: Remote path to sync from (default: "/")
            batch_size: Number of documents per batch
        """
        self.base_url = base_url.rstrip("/")
        # Normalise to exactly one leading slash and no trailing slash.
        # Stripping both ends first means "", "/", "//" and None all become
        # "/" instead of collapsing to an empty string.
        self.remote_path = "/" + (remote_path or "").strip("/")
        self.batch_size = batch_size
        self.client: Optional[WebDAVClient] = None
        self._allow_images: bool | None = None
        self.size_threshold: int | None = BLOB_STORAGE_SIZE_THRESHOLD

    def set_allow_images(self, allow_images: bool) -> None:
        """Set whether to process images"""
        logging.info(f"Setting allow_images to {allow_images}.")
        self._allow_images = allow_images

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Load credentials and initialize WebDAV client

        Args:
            credentials: Dictionary containing 'username' and 'password'

        Returns:
            None

        Raises:
            ConnectorMissingCredentialError: If required credentials are
                missing, or if the connection test against ``remote_path``
                fails for any reason.
        """
        logging.debug(f"Loading credentials for WebDAV server {self.base_url}")

        username = credentials.get("username")
        password = credentials.get("password")

        if not username or not password:
            raise ConnectorMissingCredentialError(
                "WebDAV requires 'username' and 'password' credentials"
            )

        try:
            self.client = WebDAVClient(
                base_url=self.base_url,
                auth=(username, password)
            )
            # Test connection; the result is ignored, only errors matter here.
            self.client.exists(self.remote_path)
        except Exception as e:
            logging.error(f"Failed to connect to WebDAV server: {e}")
            # The exception type is kept for existing callers; chaining
            # preserves the underlying cause for debugging.
            raise ConnectorMissingCredentialError(
                f"Failed to authenticate with WebDAV server: {e}"
            ) from e

        return None

    @staticmethod
    def _parse_modified_time(raw: Any, item_path: str) -> datetime | None:
        """Convert a listing's 'modified' value to a timezone-aware datetime.

        Args:
            raw: A ``datetime``, an HTTP-date or ISO-8601 string, or anything
                else (treated as unknown).
            item_path: Path of the item, used only in the warning message.

        Returns:
            An aware datetime (naive values are assumed to be UTC), or None
            when the value is missing or cannot be parsed.
        """
        if isinstance(raw, datetime):
            parsed = raw
        elif isinstance(raw, str) and raw:
            try:
                parsed = datetime.strptime(raw, WebDAVConnector._HTTP_DATE_FORMAT)
            except ValueError:
                try:
                    parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
                except ValueError:
                    logging.warning(f"Could not parse modified time for {item_path}: {raw}")
                    return None
        else:
            return None

        # Naive datetimes cannot be compared with the aware start/end bounds.
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @staticmethod
    def _get_size_bytes(file_info: dict) -> int:
        """Return the file size in bytes from a listing entry, or 0 if unknown.

        NOTE(review): webdav4's ``Client.ls(detail=True)`` appears to report
        the size as 'content_length' ('size' is the fsspec-style key), so both
        are checked - confirm against the pinned webdav4 version.
        """
        for key in ("content_length", "size"):
            value = file_info.get(key)
            if isinstance(value, int) and value >= 0:
                return value
            if isinstance(value, str) and value.isdigit():
                return int(value)
        return 0

    def _list_files_recursive(
        self,
        path: str,
        start: datetime,
        end: datetime,
    ) -> list[tuple[str, dict]]:
        """Recursively list all files in the given path

        Files whose modification time is unknown are always included, so they
        are never silently dropped from a sync.  Listing errors are logged and
        the affected directory is skipped (best effort).

        Args:
            path: Path to list files from
            start: Start datetime for filtering (exclusive)
            end: End datetime for filtering (inclusive)

        Returns:
            List of tuples containing (file_path, file_info)

        Raises:
            ConnectorMissingCredentialError: If the client is not initialized
        """
        if self.client is None:
            raise ConnectorMissingCredentialError("WebDAV client not initialized")

        files: list[tuple[str, dict]] = []

        try:
            logging.debug(f"Listing directory: {path}")
            for item in self.client.ls(path, detail=True):
                item_path = item['name']

                # Skip the directory's own entry regardless of how the server
                # (or client) spells leading/trailing slashes.
                if item_path.strip('/') == path.strip('/'):
                    continue

                logging.debug(f"Found item: {item_path}, type: {item.get('type')}")

                if item.get('type') == 'directory':
                    try:
                        files.extend(self._list_files_recursive(item_path, start, end))
                    except Exception as e:
                        logging.error(f"Error recursing into directory {item_path}: {e}")
                    continue

                try:
                    modified = self._parse_modified_time(item.get('modified'), item_path)
                    include = modified is None or start < modified <= end
                    logging.debug(f"File {item_path}: modified={modified}, start={start}, end={end}, include={include}")
                    if include:
                        files.append((item_path, item))
                    else:
                        logging.debug(f"File {item_path} filtered out by time range")
                except Exception as e:
                    logging.error(f"Error processing file {item_path}: {e}")

        except Exception as e:
            logging.error(f"Error listing directory {path}: {e}")

        return files

    def _yield_webdav_documents(
        self,
        start: datetime,
        end: datetime,
    ) -> GenerateDocumentsOutput:
        """Generate documents from WebDAV server

        Args:
            start: Start datetime for filtering
            end: End datetime for filtering

        Yields:
            Batches of documents (at most ``batch_size`` per batch)

        Raises:
            ConnectorMissingCredentialError: If the client is not initialized
        """
        # Local import: the module's top-level imports do not include io.
        from io import BytesIO

        if self.client is None:
            raise ConnectorMissingCredentialError("WebDAV client not initialized")

        logging.info(f"Searching for files in {self.remote_path} between {start} and {end}")
        files = self._list_files_recursive(self.remote_path, start, end)
        logging.info(f"Found {len(files)} files matching time criteria")

        batch: list[Document] = []
        for file_path, file_info in files:
            file_name = os.path.basename(file_path)

            size_bytes = self._get_size_bytes(file_info)
            if self.size_threshold is not None and size_bytes > self.size_threshold:
                logging.warning(
                    f"{file_name} exceeds size threshold of {self.size_threshold}. Skipping."
                )
                continue

            try:
                logging.debug(f"Downloading file: {file_path}")
                buffer = BytesIO()
                self.client.download_fileobj(file_path, buffer)
                blob = buffer.getvalue()

                if not blob:
                    logging.warning(f"Downloaded content is empty for {file_path}")
                    continue

                # Unknown modification time: stamp the document with "now".
                modified = (
                    self._parse_modified_time(file_info.get('modified'), file_path)
                    or datetime.now(timezone.utc)
                )

                batch.append(
                    Document(
                        id=f"webdav:{self.base_url}:{file_path}",
                        blob=blob,
                        source=DocumentSource.WEBDAV,
                        semantic_identifier=file_name,
                        extension=get_file_ext(file_name),
                        doc_updated_at=modified,
                        # Fall back to the downloaded length when the listing
                        # did not report a size.
                        size_bytes=size_bytes or len(blob)
                    )
                )

                if len(batch) >= self.batch_size:
                    yield batch
                    batch = []

            except Exception as e:
                # Best effort: one failing file must not abort the whole sync.
                logging.exception(f"Error downloading file {file_path}: {e}")

        if batch:
            yield batch

    def load_from_state(self) -> GenerateDocumentsOutput:
        """Load all documents from WebDAV server

        Yields:
            Batches of documents
        """
        logging.debug(f"Loading documents from WebDAV server {self.base_url}")
        return self._yield_webdav_documents(
            start=datetime(1970, 1, 1, tzinfo=timezone.utc),
            end=datetime.now(timezone.utc),
        )

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> GenerateDocumentsOutput:
        """Poll WebDAV server for updated documents

        Args:
            start: Start timestamp (seconds since Unix epoch)
            end: End timestamp (seconds since Unix epoch)

        Yields:
            Batches of documents

        Raises:
            ConnectorMissingCredentialError: If the client is not initialized
                (raised when the generator is first iterated)
        """
        if self.client is None:
            raise ConnectorMissingCredentialError("WebDAV client not initialized")

        start_datetime = datetime.fromtimestamp(start, tz=timezone.utc)
        end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)

        yield from self._yield_webdav_documents(start_datetime, end_datetime)

    def validate_connector_settings(self) -> None:
        """Validate WebDAV connector settings

        Raises:
            ConnectorMissingCredentialError: If credentials are not loaded
            CredentialExpiredError: If the server rejects the credentials
            InsufficientPermissionsError: If access to the path is forbidden
            ConnectorValidationError: If settings are invalid or the remote
                path does not exist
        """
        if self.client is None:
            raise ConnectorMissingCredentialError(
                "WebDAV credentials not loaded."
            )

        if not self.base_url:
            raise ConnectorValidationError(
                "No base URL was provided in connector settings."
            )

        missing_path_message = (
            f"Remote path '{self.remote_path}' does not exist on WebDAV server."
        )

        # Only the client call sits inside the try, so the "does not exist"
        # error raised below is never re-classified by the handler.
        try:
            path_exists = self.client.exists(self.remote_path)
        except Exception as e:
            # presumably webdav4 HTTP errors expose `status_code`; the text
            # match is kept as a fallback - verify against the client library.
            status = getattr(e, "status_code", None)
            error_message = str(e)
            lowered = error_message.lower()

            if status == 401 or "401" in error_message or "unauthorized" in lowered:
                raise CredentialExpiredError(
                    "WebDAV credentials appear invalid or expired."
                ) from e

            if status == 403 or "403" in error_message or "forbidden" in lowered:
                raise InsufficientPermissionsError(
                    f"Insufficient permissions to access path '{self.remote_path}' on WebDAV server."
                ) from e

            if status == 404 or "404" in error_message or "not found" in lowered:
                raise ConnectorValidationError(missing_path_message) from e

            raise ConnectorValidationError(
                f"Unexpected WebDAV client error: {e}"
            ) from e

        if not path_exists:
            raise ConnectorValidationError(missing_path_message)
class WebDAV(SyncBase):
    """Sync task that pulls documents from a WebDAV server."""

    SOURCE_NAME: str = FileSource.WEBDAV

    async def _generate(self, task: dict):
        """Build the WebDAV connector and return its document batch generator.

        A full load is used when the task requests a reindex or has no
        previous poll start; otherwise only the window from
        ``poll_range_start`` until now is polled.

        Args:
            task: Sync task dict; reads 'reindex' and 'poll_range_start'.

        Returns:
            Generator yielding batches of documents.
        """
        connector = WebDAVConnector(
            base_url=self.conf["base_url"],
            remote_path=self.conf.get("remote_path", "/")
        )
        self.connector = connector
        connector.load_credentials(self.conf["credentials"])

        logging.info(f"Task info: reindex={task['reindex']}, poll_range_start={task['poll_range_start']}")

        needs_full_sync = task["reindex"]=="1" or not task["poll_range_start"]
        if needs_full_sync:
            logging.info("Using load_from_state (full sync)")
            batches = connector.load_from_state()
            begin_info = "totally"
        else:
            poll_start = task["poll_range_start"]
            start_ts = poll_start.timestamp()
            end_ts = datetime.now(timezone.utc).timestamp()
            logging.info(f"Polling WebDAV from {task['poll_range_start']} (ts: {start_ts}) to now (ts: {end_ts})")
            batches = connector.poll_source(start_ts, end_ts)
            begin_info = "from {}".format(task["poll_range_start"])

        summary = "Connect to WebDAV: {}(path: {}) {}".format(
            self.conf["base_url"],
            self.conf.get("remote_path", "/"),
            begin_info
        )
        logging.info(summary)
        return batches
name = "webdriver-manager" }, { name = "werkzeug" }, { name = "wikipedia" }, @@ -5603,6 +5604,7 @@ requires-dist = [ { name = "vertexai", specifier = "==1.70.0" }, { name = "volcengine", specifier = "==1.0.194" }, { name = "voyageai", specifier = "==0.2.3" }, + { name = "webdav4", specifier = ">=0.10.0,<0.11.0" }, { name = "webdriver-manager", specifier = "==4.0.1" }, { name = "werkzeug", specifier = "==3.0.6" }, { name = "wikipedia", specifier = "==1.4.0" }, @@ -7131,6 +7133,19 @@ wheels = [ { url = "https://pypi.tuna.tsinghua.edu.cn/packages/fd/84/fd2ba7aafacbad3c4201d395674fc6348826569da3c0937e75505ead3528/wcwidth-0.2.13-py2.py3-none-any.whl", hash = "sha256:3da69048e4540d84af32131829ff948f1e022c1c6bdb8d6102117aac784f6859", size = 34166, upload-time = "2024-01-06T02:10:55.763Z" }, ] +[[package]] +name = "webdav4" +version = "0.10.0" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "httpx" }, + { name = "python-dateutil" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/08/3d/d604f9d5195689e578f124f196a5d7e80f3106c8404f5c19b2181691de19/webdav4-0.10.0.tar.gz", hash = "sha256:387da6f0ee384e77149dddd9bcfd434afa155882f6c440a529a7cb458624407f", size = 229195, upload-time = "2024-07-13T19:42:42.593Z" } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/60/02/1b77232297fa52f7bedcf70f3ebe3817e9295f302389fb57dd0e6c077329/webdav4-0.10.0-py3-none-any.whl", hash = "sha256:8f915d72483e572089a3af0a2ad20c7e12d04eee9b9134eb718dbfa37af221d8", size = 36350, upload-time = "2024-07-13T19:42:41.087Z" }, +] + [[package]] name = "webdriver-manager" version = "4.0.1" diff --git a/web/src/assets/svg/data-source/webdav.svg b/web/src/assets/svg/data-source/webdav.svg new file mode 100644 index 000000000..a970d38fe --- /dev/null +++ b/web/src/assets/svg/data-source/webdav.svg @@ -0,0 +1,15 @@ + + + + + + + + + + \ No newline at end of file diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index 
3014fbd4f..d9e07824a 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -729,6 +729,10 @@ Example: https://fsn1.your-objectstorage.com`, 'Sync pages and databases from Notion for knowledge retrieval.', google_driveDescription: 'Connect your Google Drive via OAuth and sync specific folders or drives.', + webdavDescription: + 'Connect to WebDAV servers (including Nextcloud, ownCloud) to sync files.', + webdavRemotePathTip: + 'Optional: Specify a folder path on the WebDAV server (e.g., /Documents). Leave empty to sync from root.', google_driveTokenTip: 'Upload the OAuth token JSON generated from the OAuth helper or Google Cloud Console. You may also upload a client_secret JSON from an "installed" or "web" application. If this is your first sync, a browser window will open to complete the OAuth consent. If the JSON already contains a refresh token, it will be reused automatically.', google_drivePrimaryAdminTip: diff --git a/web/src/pages/user-setting/data-source/contant.tsx b/web/src/pages/user-setting/data-source/contant.tsx index 65788464c..992e7ae39 100644 --- a/web/src/pages/user-setting/data-source/contant.tsx +++ b/web/src/pages/user-setting/data-source/contant.tsx @@ -11,6 +11,9 @@ export enum DataSourceKey { GOOGLE_DRIVE = 'google_drive', // GMAIL = 'gmail', JIRA = 'jira', + WEBDAV = 'webdav', + // GMAIL = 'gmail', + // JIRA = 'jira', // SHAREPOINT = 'sharepoint', // SLACK = 'slack', // TEAMS = 'teams', @@ -47,6 +50,11 @@ export const DataSourceInfo = { description: t(`setting.${DataSourceKey.JIRA}Description`), icon: , }, + [DataSourceKey.WEBDAV]: { + name: 'WebDAV', + description: t(`setting.${DataSourceKey.WEBDAV}Description`), + icon: , + }, }; export const DataSourceFormBaseFields = [ @@ -387,6 +395,35 @@ export const DataSourceFormFields = { tooltip: t('setting.jiraPasswordTip'), }, ], + [DataSourceKey.WEBDAV]: [ + { + label: 'WebDAV Server URL', + name: 'config.base_url', + type: FormFieldType.Text, + required: true, + placeholder: 
'https://webdav.example.com', + }, + { + label: 'Username', + name: 'config.credentials.username', + type: FormFieldType.Text, + required: true, + }, + { + label: 'Password', + name: 'config.credentials.password', + type: FormFieldType.Password, + required: true, + }, + { + label: 'Remote Path', + name: 'config.remote_path', + type: FormFieldType.Text, + required: false, + placeholder: '/', + tooltip: t('setting.webdavRemotePathTip'), + }, + ], // [DataSourceKey.GOOGLE_DRIVE]: [ // { // label: 'Primary Admin Email', @@ -572,4 +609,16 @@ export const DataSourceFormDefaultValues = { }, }, }, + [DataSourceKey.WEBDAV]: { + name: '', + source: DataSourceKey.WEBDAV, + config: { + base_url: '', + remote_path: '/', + credentials: { + username: '', + password: '', + }, + }, + }, }; diff --git a/web/src/pages/user-setting/data-source/index.tsx b/web/src/pages/user-setting/data-source/index.tsx index 9cb58672a..b6ec14a88 100644 --- a/web/src/pages/user-setting/data-source/index.tsx +++ b/web/src/pages/user-setting/data-source/index.tsx @@ -50,6 +50,12 @@ const dataSourceTemplates = [ description: DataSourceInfo[DataSourceKey.JIRA].description, icon: DataSourceInfo[DataSourceKey.JIRA].icon, }, + { + id: DataSourceKey.WEBDAV, + name: DataSourceInfo[DataSourceKey.WEBDAV].name, + description: DataSourceInfo[DataSourceKey.WEBDAV].description, + icon: DataSourceInfo[DataSourceKey.WEBDAV].icon, + }, ]; const DataSource = () => { const { t } = useTranslation();