Feat: add webdav data source to data sync service

This commit is contained in:
Jonah879 2025-11-13 14:15:54 +00:00
parent d3d2ccc76c
commit a7c7b55043
11 changed files with 510 additions and 2 deletions

View file

@ -118,6 +118,7 @@ class FileSource(StrEnum):
SHAREPOINT = "sharepoint"
SLACK = "slack"
TEAMS = "teams"
WEBDAV = "webdav"
class PipelineTaskType(StrEnum):

View file

@ -14,6 +14,7 @@ from .google_drive.connector import GoogleDriveConnector
from .jira.connector import JiraConnector
from .sharepoint_connector import SharePointConnector
from .teams_connector import TeamsConnector
from .webdav_connector import WebDAVConnector
from .config import BlobType, DocumentSource
from .models import Document, TextSection, ImageSection, BasicExpertInfo
from .exceptions import (
@ -36,6 +37,7 @@ __all__ = [
"JiraConnector",
"SharePointConnector",
"TeamsConnector",
"WebDAVConnector",
"BlobType",
"DocumentSource",
"Document",

View file

@ -42,13 +42,14 @@ class DocumentSource(str, Enum):
R2 = "r2"
GOOGLE_CLOUD_STORAGE = "google_cloud_storage"
OCI_STORAGE = "oci_storage"
S3_COMPATIBLE = "s3_compatible"
SLACK = "slack"
CONFLUENCE = "confluence"
JIRA = "jira"
GOOGLE_DRIVE = "google_drive"
GMAIL = "gmail"
DISCORD = "discord"
S3_COMPATIBLE = "s3_compatible"
WEBDAV = "webdav"
class FileOrigin(str, Enum):

View file

@ -0,0 +1,372 @@
"""WebDAV connector"""
import logging
import os
from datetime import datetime, timezone
from typing import Any, Optional, Generator
from urllib.parse import urljoin, quote
from webdav4.client import Client as WebDAVClient
from common.data_source.utils import (
extract_size_bytes,
get_file_ext,
)
from common.data_source.config import DocumentSource, INDEX_BATCH_SIZE, BLOB_STORAGE_SIZE_THRESHOLD
from common.data_source.exceptions import (
ConnectorMissingCredentialError,
ConnectorValidationError,
CredentialExpiredError,
InsufficientPermissionsError
)
from common.data_source.interfaces import LoadConnector, PollConnector
from common.data_source.models import Document, SecondsSinceUnixEpoch, GenerateDocumentsOutput
class WebDAVConnector(LoadConnector, PollConnector):
"""WebDAV connector for syncing files from WebDAV servers"""
def __init__(
self,
base_url: str,
remote_path: str = "/",
batch_size: int = INDEX_BATCH_SIZE,
) -> None:
"""Initialize WebDAV connector
Args:
base_url: Base URL of the WebDAV server (e.g., "https://webdav.example.com")
remote_path: Remote path to sync from (default: "/")
batch_size: Number of documents per batch
"""
self.base_url = base_url.rstrip("/")
if not remote_path:
remote_path = "/"
if not remote_path.startswith("/"):
remote_path = f"/{remote_path}"
if remote_path.endswith("/") and remote_path != "/":
remote_path = remote_path.rstrip("/")
self.remote_path = remote_path
self.batch_size = batch_size
self.client: Optional[WebDAVClient] = None
self._allow_images: bool | None = None
self.size_threshold: int | None = BLOB_STORAGE_SIZE_THRESHOLD
def set_allow_images(self, allow_images: bool) -> None:
"""Set whether to process images"""
logging.info(f"Setting allow_images to {allow_images}.")
self._allow_images = allow_images
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
"""Load credentials and initialize WebDAV client
Args:
credentials: Dictionary containing 'username' and 'password'
Returns:
None
Raises:
ConnectorMissingCredentialError: If required credentials are missing
"""
logging.debug(f"Loading credentials for WebDAV server {self.base_url}")
username = credentials.get("username")
password = credentials.get("password")
if not username or not password:
raise ConnectorMissingCredentialError(
"WebDAV requires 'username' and 'password' credentials"
)
try:
# Initialize WebDAV client
self.client = WebDAVClient(
base_url=self.base_url,
auth=(username, password)
)
# Test connection
self.client.exists(self.remote_path)
except Exception as e:
logging.error(f"Failed to connect to WebDAV server: {e}")
raise ConnectorMissingCredentialError(
f"Failed to authenticate with WebDAV server: {e}"
)
return None
def _list_files_recursive(
self,
path: str,
start: datetime,
end: datetime,
) -> list[tuple[str, dict]]:
"""Recursively list all files in the given path
Args:
path: Path to list files from
start: Start datetime for filtering
end: End datetime for filtering
Returns:
List of tuples containing (file_path, file_info)
"""
if self.client is None:
raise ConnectorMissingCredentialError("WebDAV client not initialized")
files = []
try:
logging.debug(f"Listing directory: {path}")
for item in self.client.ls(path, detail=True):
item_path = item['name']
if item_path == path or item_path == path + '/':
continue
logging.debug(f"Found item: {item_path}, type: {item.get('type')}")
if item.get('type') == 'directory':
try:
files.extend(self._list_files_recursive(item_path, start, end))
except Exception as e:
logging.error(f"Error recursing into directory {item_path}: {e}")
continue
else:
try:
modified_time = item.get('modified')
if modified_time:
if isinstance(modified_time, datetime):
modified = modified_time
if modified.tzinfo is None:
modified = modified.replace(tzinfo=timezone.utc)
elif isinstance(modified_time, str):
try:
modified = datetime.strptime(modified_time, '%a, %d %b %Y %H:%M:%S %Z')
modified = modified.replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
try:
modified = datetime.fromisoformat(modified_time.replace('Z', '+00:00'))
except (ValueError, TypeError):
logging.warning(f"Could not parse modified time for {item_path}: {modified_time}")
modified = datetime.now(timezone.utc)
else:
modified = datetime.now(timezone.utc)
else:
modified = datetime.now(timezone.utc)
logging.debug(f"File {item_path}: modified={modified}, start={start}, end={end}, include={start < modified <= end}")
if start < modified <= end:
files.append((item_path, item))
else:
logging.debug(f"File {item_path} filtered out by time range")
except Exception as e:
logging.error(f"Error processing file {item_path}: {e}")
continue
except Exception as e:
logging.error(f"Error listing directory {path}: {e}")
return files
def _yield_webdav_documents(
self,
start: datetime,
end: datetime,
) -> GenerateDocumentsOutput:
"""Generate documents from WebDAV server
Args:
start: Start datetime for filtering
end: End datetime for filtering
Yields:
Batches of documents
"""
if self.client is None:
raise ConnectorMissingCredentialError("WebDAV client not initialized")
logging.info(f"Searching for files in {self.remote_path} between {start} and {end}")
files = self._list_files_recursive(self.remote_path, start, end)
logging.info(f"Found {len(files)} files matching time criteria")
batch: list[Document] = []
for file_path, file_info in files:
file_name = os.path.basename(file_path)
size_bytes = file_info.get('size', 0)
if (
self.size_threshold is not None
and isinstance(size_bytes, int)
and size_bytes > self.size_threshold
):
logging.warning(
f"{file_name} exceeds size threshold of {self.size_threshold}. Skipping."
)
continue
try:
logging.debug(f"Downloading file: {file_path}")
from io import BytesIO
buffer = BytesIO()
self.client.download_fileobj(file_path, buffer)
blob = buffer.getvalue()
if blob is None or len(blob) == 0:
logging.warning(f"Downloaded content is empty for {file_path}")
continue
modified_time = file_info.get('modified')
if modified_time:
if isinstance(modified_time, datetime):
modified = modified_time
if modified.tzinfo is None:
modified = modified.replace(tzinfo=timezone.utc)
elif isinstance(modified_time, str):
try:
modified = datetime.strptime(modified_time, '%a, %d %b %Y %H:%M:%S %Z')
modified = modified.replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
try:
modified = datetime.fromisoformat(modified_time.replace('Z', '+00:00'))
except (ValueError, TypeError):
logging.warning(f"Could not parse modified time for {file_path}: {modified_time}")
modified = datetime.now(timezone.utc)
else:
modified = datetime.now(timezone.utc)
else:
modified = datetime.now(timezone.utc)
batch.append(
Document(
id=f"webdav:{self.base_url}:{file_path}",
blob=blob,
source=DocumentSource.WEBDAV,
semantic_identifier=file_name,
extension=get_file_ext(file_name),
doc_updated_at=modified,
size_bytes=size_bytes if size_bytes else 0
)
)
if len(batch) == self.batch_size:
yield batch
batch = []
except Exception as e:
logging.exception(f"Error downloading file {file_path}: {e}")
if batch:
yield batch
def load_from_state(self) -> GenerateDocumentsOutput:
"""Load all documents from WebDAV server
Yields:
Batches of documents
"""
logging.debug(f"Loading documents from WebDAV server {self.base_url}")
return self._yield_webdav_documents(
start=datetime(1970, 1, 1, tzinfo=timezone.utc),
end=datetime.now(timezone.utc),
)
def poll_source(
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
) -> GenerateDocumentsOutput:
"""Poll WebDAV server for updated documents
Args:
start: Start timestamp (seconds since Unix epoch)
end: End timestamp (seconds since Unix epoch)
Yields:
Batches of documents
"""
if self.client is None:
raise ConnectorMissingCredentialError("WebDAV client not initialized")
start_datetime = datetime.fromtimestamp(start, tz=timezone.utc)
end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)
for batch in self._yield_webdav_documents(start_datetime, end_datetime):
yield batch
def validate_connector_settings(self) -> None:
"""Validate WebDAV connector settings
Raises:
ConnectorMissingCredentialError: If credentials are not loaded
ConnectorValidationError: If settings are invalid
"""
if self.client is None:
raise ConnectorMissingCredentialError(
"WebDAV credentials not loaded."
)
if not self.base_url:
raise ConnectorValidationError(
"No base URL was provided in connector settings."
)
try:
if not self.client.exists(self.remote_path):
raise ConnectorValidationError(
f"Remote path '{self.remote_path}' does not exist on WebDAV server."
)
except Exception as e:
error_message = str(e)
if "401" in error_message or "unauthorized" in error_message.lower():
raise CredentialExpiredError(
"WebDAV credentials appear invalid or expired."
)
if "403" in error_message or "forbidden" in error_message.lower():
raise InsufficientPermissionsError(
f"Insufficient permissions to access path '{self.remote_path}' on WebDAV server."
)
if "404" in error_message or "not found" in error_message.lower():
raise ConnectorValidationError(
f"Remote path '{self.remote_path}' does not exist on WebDAV server."
)
raise ConnectorValidationError(
f"Unexpected WebDAV client error: {e}"
)
if __name__ == "__main__":
credentials_dict = {
"username": os.environ.get("WEBDAV_USERNAME"),
"password": os.environ.get("WEBDAV_PASSWORD"),
}
connector = WebDAVConnector(
base_url=os.environ.get("WEBDAV_URL") or "https://webdav.example.com",
remote_path=os.environ.get("WEBDAV_PATH") or "/",
)
try:
connector.load_credentials(credentials_dict)
connector.validate_connector_settings()
document_batch_generator = connector.load_from_state()
for document_batch in document_batch_generator:
print("First batch of documents:")
for doc in document_batch:
print(f"Document ID: {doc.id}")
print(f"Semantic Identifier: {doc.semantic_identifier}")
print(f"Source: {doc.source}")
print(f"Updated At: {doc.doc_updated_at}")
print("---")
break
except ConnectorMissingCredentialError as e:
print(f"Error: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")

View file

@ -116,6 +116,7 @@ dependencies = [
"google-genai>=1.41.0,<2.0.0",
"volcengine==1.0.194",
"voyageai==0.2.3",
"webdav4>=0.10.0,<0.11.0",
"webdriver-manager==4.0.1",
"werkzeug==3.0.6",
"wikipedia==1.4.0",

View file

@ -35,6 +35,16 @@ import trio
from api.db.services.connector_service import ConnectorService, SyncLogsService
from api.db.services.knowledgebase_service import KnowledgebaseService
from common.log_utils import init_root_logger
from common.config_utils import show_configs
from common.data_source import BlobStorageConnector, NotionConnector, DiscordConnector, GoogleDriveConnector, WebDAVConnector
import logging
import os
from datetime import datetime, timezone
import signal
import trio
import faulthandler
from common.constants import FileSource, TaskStatus
from common import settings
from common.config_utils import show_configs
from common.constants import FileSource, TaskStatus
@ -418,6 +428,37 @@ class Teams(SyncBase):
pass
class WebDAV(SyncBase):
SOURCE_NAME: str = FileSource.WEBDAV
async def _generate(self, task: dict):
self.connector = WebDAVConnector(
base_url=self.conf["base_url"],
remote_path=self.conf.get("remote_path", "/")
)
self.connector.load_credentials(self.conf["credentials"])
logging.info(f"Task info: reindex={task['reindex']}, poll_range_start={task['poll_range_start']}")
if task["reindex"]=="1" or not task["poll_range_start"]:
logging.info("Using load_from_state (full sync)")
document_batch_generator = self.connector.load_from_state()
begin_info = "totally"
else:
start_ts = task["poll_range_start"].timestamp()
end_ts = datetime.now(timezone.utc).timestamp()
logging.info(f"Polling WebDAV from {task['poll_range_start']} (ts: {start_ts}) to now (ts: {end_ts})")
document_batch_generator = self.connector.poll_source(start_ts, end_ts)
begin_info = "from {}".format(task["poll_range_start"])
logging.info("Connect to WebDAV: {}(path: {}) {}".format(
self.conf["base_url"],
self.conf.get("remote_path", "/"),
begin_info
))
return document_batch_generator
func_factory = {
FileSource.S3: S3,
FileSource.NOTION: Notion,
@ -429,6 +470,7 @@ func_factory = {
FileSource.SHAREPOINT: SharePoint,
FileSource.SLACK: Slack,
FileSource.TEAMS: Teams,
FileSource.WEBDAV: WebDAV
}

17
uv.lock generated
View file

@ -1,5 +1,5 @@
version = 1
revision = 3
revision = 2
requires-python = ">=3.10, <3.13"
resolution-markers = [
"python_full_version >= '3.12' and sys_platform == 'darwin'",
@ -5443,6 +5443,7 @@ dependencies = [
{ name = "vertexai" },
{ name = "volcengine" },
{ name = "voyageai" },
{ name = "webdav4" },
{ name = "webdriver-manager" },
{ name = "werkzeug" },
{ name = "wikipedia" },
@ -5603,6 +5604,7 @@ requires-dist = [
{ name = "vertexai", specifier = "==1.70.0" },
{ name = "volcengine", specifier = "==1.0.194" },
{ name = "voyageai", specifier = "==0.2.3" },
{ name = "webdav4", specifier = ">=0.10.0,<0.11.0" },
{ name = "webdriver-manager", specifier = "==4.0.1" },
{ name = "werkzeug", specifier = "==3.0.6" },
{ name = "wikipedia", specifier = "==1.4.0" },
@ -7131,6 +7133,19 @@ wheels = [
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/fd/84/fd2ba7aafacbad3c4201d395674fc6348826569da3c0937e75505ead3528/wcwidth-0.2.13-py2.py3-none-any.whl", hash = "sha256:3da69048e4540d84af32131829ff948f1e022c1c6bdb8d6102117aac784f6859", size = 34166, upload-time = "2024-01-06T02:10:55.763Z" },
]
[[package]]
name = "webdav4"
version = "0.10.0"
source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" }
dependencies = [
{ name = "httpx" },
{ name = "python-dateutil" },
]
sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/08/3d/d604f9d5195689e578f124f196a5d7e80f3106c8404f5c19b2181691de19/webdav4-0.10.0.tar.gz", hash = "sha256:387da6f0ee384e77149dddd9bcfd434afa155882f6c440a529a7cb458624407f", size = 229195, upload-time = "2024-07-13T19:42:42.593Z" }
wheels = [
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/60/02/1b77232297fa52f7bedcf70f3ebe3817e9295f302389fb57dd0e6c077329/webdav4-0.10.0-py3-none-any.whl", hash = "sha256:8f915d72483e572089a3af0a2ad20c7e12d04eee9b9134eb718dbfa37af221d8", size = 36350, upload-time = "2024-07-13T19:42:41.087Z" },
]
[[package]]
name = "webdriver-manager"
version = "4.0.1"

View file

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Uploaded to: SVG Repo, www.svgrepo.com, Generator: SVG Repo Mixer Tools -->
<svg xmlns="http://www.w3.org/2000/svg"
aria-label="NextCloud" role="img"
viewBox="0 0 512 512">
<rect
width="512" height="512"
rx="15%"
fill="#0082c9"/>
<g stroke="#ffffff" stroke-width="33" fill="none">
<circle r="40" cy="256" cx="120"/>
<circle r="71" cy="256" cx="256"/>
<circle r="40" cy="256" cx="392"/>
</g>
</svg>

After

Width:  |  Height:  |  Size: 449 B

View file

@ -729,6 +729,10 @@ Example: https://fsn1.your-objectstorage.com`,
'Sync pages and databases from Notion for knowledge retrieval.',
google_driveDescription:
'Connect your Google Drive via OAuth and sync specific folders or drives.',
webdavDescription:
'Connect to WebDAV servers (including Nextcloud, ownCloud) to sync files.',
webdavRemotePathTip:
'Optional: Specify a folder path on the WebDAV server (e.g., /Documents). Leave empty to sync from root.',
google_driveTokenTip:
'Upload the OAuth token JSON generated from the OAuth helper or Google Cloud Console. You may also upload a client_secret JSON from an "installed" or "web" application. If this is your first sync, a browser window will open to complete the OAuth consent. If the JSON already contains a refresh token, it will be reused automatically.',
google_drivePrimaryAdminTip:

View file

@ -11,6 +11,9 @@ export enum DataSourceKey {
GOOGLE_DRIVE = 'google_drive',
// GMAIL = 'gmail',
JIRA = 'jira',
WEBDAV = 'webdav',
// GMAIL = 'gmail',
// JIRA = 'jira',
// SHAREPOINT = 'sharepoint',
// SLACK = 'slack',
// TEAMS = 'teams',
@ -47,6 +50,11 @@ export const DataSourceInfo = {
description: t(`setting.${DataSourceKey.JIRA}Description`),
icon: <SvgIcon name={'data-source/jira'} width={38} />,
},
[DataSourceKey.WEBDAV]: {
name: 'WebDAV',
description: t(`setting.${DataSourceKey.WEBDAV}Description`),
icon: <SvgIcon name={'data-source/webdav'} width={38} />,
},
};
export const DataSourceFormBaseFields = [
@ -387,6 +395,35 @@ export const DataSourceFormFields = {
tooltip: t('setting.jiraPasswordTip'),
},
],
[DataSourceKey.WEBDAV]: [
{
label: 'WebDAV Server URL',
name: 'config.base_url',
type: FormFieldType.Text,
required: true,
placeholder: 'https://webdav.example.com',
},
{
label: 'Username',
name: 'config.credentials.username',
type: FormFieldType.Text,
required: true,
},
{
label: 'Password',
name: 'config.credentials.password',
type: FormFieldType.Password,
required: true,
},
{
label: 'Remote Path',
name: 'config.remote_path',
type: FormFieldType.Text,
required: false,
placeholder: '/',
tooltip: t('setting.webdavRemotePathTip'),
},
],
// [DataSourceKey.GOOGLE_DRIVE]: [
// {
// label: 'Primary Admin Email',
@ -572,4 +609,16 @@ export const DataSourceFormDefaultValues = {
},
},
},
[DataSourceKey.WEBDAV]: {
name: '',
source: DataSourceKey.WEBDAV,
config: {
base_url: '',
remote_path: '/',
credentials: {
username: '',
password: '',
},
},
},
};

View file

@ -50,6 +50,12 @@ const dataSourceTemplates = [
description: DataSourceInfo[DataSourceKey.JIRA].description,
icon: DataSourceInfo[DataSourceKey.JIRA].icon,
},
{
id: DataSourceKey.WEBDAV,
name: DataSourceInfo[DataSourceKey.WEBDAV].name,
description: DataSourceInfo[DataSourceKey.WEBDAV].description,
icon: DataSourceInfo[DataSourceKey.WEBDAV].icon,
},
];
const DataSource = () => {
const { t } = useTranslation();