feat: move processed files to __enqueued__ directory after processing with filename conflicts handling
This commit is contained in:
parent
dc7a6e1c5b
commit
5d00c4c7a8
1 changed files with 57 additions and 0 deletions
|
|
@ -746,6 +746,39 @@ class DocumentManager:
|
||||||
return any(filename.lower().endswith(ext) for ext in self.supported_extensions)
|
return any(filename.lower().endswith(ext) for ext in self.supported_extensions)
|
||||||
|
|
||||||
|
|
||||||
|
def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str:
|
||||||
|
"""Generate a unique filename in the target directory by adding numeric suffixes if needed
|
||||||
|
|
||||||
|
Args:
|
||||||
|
target_dir: Target directory path
|
||||||
|
original_name: Original filename
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: Unique filename (may have numeric suffix added)
|
||||||
|
"""
|
||||||
|
from pathlib import Path
|
||||||
|
import time
|
||||||
|
|
||||||
|
original_path = Path(original_name)
|
||||||
|
base_name = original_path.stem
|
||||||
|
extension = original_path.suffix
|
||||||
|
|
||||||
|
# Try original name first
|
||||||
|
if not (target_dir / original_name).exists():
|
||||||
|
return original_name
|
||||||
|
|
||||||
|
# Try with numeric suffixes 001-999
|
||||||
|
for i in range(1, 1000):
|
||||||
|
suffix = f"{i:03d}"
|
||||||
|
new_name = f"{base_name}_{suffix}{extension}"
|
||||||
|
if not (target_dir / new_name).exists():
|
||||||
|
return new_name
|
||||||
|
|
||||||
|
# Fallback with timestamp if all 999 slots are taken
|
||||||
|
timestamp = int(time.time())
|
||||||
|
return f"{base_name}_{timestamp}{extension}"
|
||||||
|
|
||||||
|
|
||||||
async def pipeline_enqueue_file(
|
async def pipeline_enqueue_file(
|
||||||
rag: LightRAG, file_path: Path, track_id: str = None
|
rag: LightRAG, file_path: Path, track_id: str = None
|
||||||
) -> tuple[bool, str]:
|
) -> tuple[bool, str]:
|
||||||
|
|
@ -939,6 +972,30 @@ async def pipeline_enqueue_file(
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(f"Successfully fetched and enqueued file: {file_path.name}")
|
logger.info(f"Successfully fetched and enqueued file: {file_path.name}")
|
||||||
|
|
||||||
|
# Move file to __enqueued__ directory after enqueuing
|
||||||
|
try:
|
||||||
|
enqueued_dir = file_path.parent / "__enqueued__"
|
||||||
|
enqueued_dir.mkdir(exist_ok=True)
|
||||||
|
|
||||||
|
# Generate unique filename to avoid conflicts
|
||||||
|
unique_filename = get_unique_filename_in_enqueued(
|
||||||
|
enqueued_dir, file_path.name
|
||||||
|
)
|
||||||
|
target_path = enqueued_dir / unique_filename
|
||||||
|
|
||||||
|
# Move the file
|
||||||
|
file_path.rename(target_path)
|
||||||
|
logger.info(
|
||||||
|
f"Moved file to enqueued directory: {file_path.name} -> {unique_filename}"
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as move_error:
|
||||||
|
logger.error(
|
||||||
|
f"Failed to move file {file_path.name} to __enqueued__ directory: {move_error}"
|
||||||
|
)
|
||||||
|
# Don't affect the main function's success status
|
||||||
|
|
||||||
return True, track_id
|
return True, track_id
|
||||||
else:
|
else:
|
||||||
logger.error(f"No content could be extracted from file: {file_path.name}")
|
logger.error(f"No content could be extracted from file: {file_path.name}")
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue