phact 2025-07-10 22:36:45 -04:00
commit 6882fe59d2
9 changed files with 2222 additions and 0 deletions

12
.gitignore vendored Normal file

@@ -0,0 +1,12 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
.idea/

1
.python-version Normal file

@@ -0,0 +1 @@
3.13

40
Dockerfile Normal file

@@ -0,0 +1,40 @@
FROM opensearchproject/opensearch:3.0.0
USER root
RUN dnf install -y less procps-ng findutils sysstat perf sudo
# Grant the opensearch user sudo privileges
# 'wheel' is the sudo group in Amazon Linux
RUN usermod -aG wheel opensearch
# Change the sudoers file to allow passwordless sudo
RUN echo "opensearch ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers
# FIXME handle the machine arch better, somehow
ARG ASYNC_PROFILER_URL=https://github.com/async-profiler/async-profiler/releases/download/v4.0/async-profiler-4.0-linux-x64.tar.gz
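# A possible arch-aware variant (sketch, untested; BuildKit sets TARGETARCH
# to amd64/arm64, while async-profiler names its x86-64 build "x64"):
#   ARG TARGETARCH
#   RUN ARCH=$([ "$TARGETARCH" = "amd64" ] && echo x64 || echo arm64) && \
#       curl -sL "https://github.com/async-profiler/async-profiler/releases/download/v4.0/async-profiler-4.0-linux-${ARCH}.tar.gz" | ...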
RUN mkdir /opt/async-profiler
RUN curl -s -L $ASYNC_PROFILER_URL | tar zxvf - --strip-components=1 -C /opt/async-profiler
RUN chown -R opensearch:opensearch /opt/async-profiler
RUN echo "#!/bin/bash" > /usr/share/opensearch/profile.sh
RUN echo "export PATH=\$PATH:/opt/async-profiler/bin" >> /usr/share/opensearch/profile.sh
RUN echo "echo 1 | sudo tee /proc/sys/kernel/perf_event_paranoid >/dev/null" >> /usr/share/opensearch/profile.sh
RUN echo "echo 0 | sudo tee /proc/sys/kernel/kptr_restrict >/dev/null" >> /usr/share/opensearch/profile.sh
RUN echo "asprof \$@" >> /usr/share/opensearch/profile.sh
RUN chmod 777 /usr/share/opensearch/profile.sh
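# Example wrapper invocation (illustrative; run inside the container):
#   /usr/share/opensearch/profile.sh -d 30 -f /tmp/flame.html $(pgrep -f org.opensearch.bootstrap.OpenSearch)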
USER opensearch
RUN opensearch-plugin remove opensearch-neural-search
RUN opensearch-plugin remove opensearch-knn
# FIXME installing the prom exporter plugin ahead of time isn't compatible with the operator, for now
# RUN opensearch-plugin install https://github.com/Virtimo/prometheus-exporter-plugin-for-opensearch/releases/download/v2.18.0/prometheus-exporter-2.18.0.0.zip
RUN echo y | opensearch-plugin install https://repo1.maven.org/maven2/org/opensearch/plugin/opensearch-jvector-plugin/3.0.0.3/opensearch-jvector-plugin-3.0.0.3.zip
RUN echo y | opensearch-plugin install repository-gcs
RUN echo y | opensearch-plugin install repository-azure
RUN echo y | opensearch-plugin install repository-s3

0
README.md Normal file

25
docker-compose.yml Normal file

@@ -0,0 +1,25 @@
services:
  opensearch:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: os
    environment:
      - discovery.type=single-node
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=OSisgendb1!
    ports:
      - "9200:9200"
      - "9600:9600"
  dashboards:
    image: opensearchproject/opensearch-dashboards:3.0.0
    container_name: osdash
    depends_on:
      - opensearch
    environment:
      OPENSEARCH_HOSTS: '["https://opensearch:9200"]'
      OPENSEARCH_USERNAME: "admin"
      OPENSEARCH_PASSWORD: "OSisgendb1!"
    ports:
      - "5601:5601"

BIN
documents/2506.08231v1.pdf Normal file

Binary file not shown.

28
pyproject.toml Normal file

@@ -0,0 +1,28 @@
[project]
name = "gendb"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "aiofiles>=24.1.0",
    "docling>=2.41.0",
    "opensearch-py[async]>=3.0.0",
    "python-multipart>=0.0.20",
    "starlette>=0.47.1",
    "torch>=2.7.1",
    "uvicorn>=0.35.0",
]

[tool.uv.sources]
torch = [
    { index = "pytorch-cu128" },
]
torchvision = [
    { index = "pytorch-cu128" },
]

[[tool.uv.index]]
name = "pytorch-cu128"
url = "https://download.pytorch.org/whl/cu128"
explicit = true
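# Dependency management is assumed to go through uv (uv.lock is committed):
#   uv sync            # install locked dependencies
#   uv run src/app.py  # start the ingest server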

172
src/app.py Normal file

@@ -0,0 +1,172 @@
# app.py
import os

# Set before importing docling so it takes effect at import time
os.environ['USE_CPU_ONLY'] = 'true'

import hashlib
import tempfile
import asyncio

import aiofiles
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from opensearchpy import AsyncOpenSearch
from opensearchpy._async.http_aiohttp import AIOHttpConnection
from docling.document_converter import DocumentConverter

# Initialize Docling converter
converter = DocumentConverter()  # basic converter; tweak via PipelineOptions if you need OCR, etc.

# Initialize Async OpenSearch (adjust hosts/auth as needed)
es = AsyncOpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    connection_class=AIOHttpConnection,
    scheme="https",
    use_ssl=True,
    verify_certs=False,
    ssl_assert_fingerprint=None,
    http_auth=("admin", "OSisgendb1!"),
    http_compress=True,
)

INDEX_NAME = "documents"
index_body = {
    "settings": {"number_of_shards": 1, "number_of_replicas": 1},
    "mappings": {
        "properties": {
            "origin": {
                "properties": {
                    "binary_hash": {"type": "keyword"}
                }
            }
        }
    },
}


async def init_index():
    if not await es.indices.exists(index=INDEX_NAME):
        await es.indices.create(index=INDEX_NAME, body=index_body)
        print(f"Created index '{INDEX_NAME}'")
    else:
        print(f"Index '{INDEX_NAME}' already exists, skipping creation.")


# The index is created on app startup (see on_startup below)

# ——————————————
# CORE PROCESSING LOGIC
# ——————————————
async def process_file_on_disk(path: str):
    """
    1. Compute a SHA-256 hash by streaming the file in chunks.
    2. If OpenSearch already has a doc with that ID, skip.
    3. Otherwise, run Docling's convert(path), export the result to JSON,
       and index it into OpenSearch.
    """
    # 1) compute hash
    sha256 = hashlib.sha256()
    async with aiofiles.open(path, "rb") as f:
        while True:
            chunk = await f.read(1 << 20)  # 1 MiB
            if not chunk:
                break
            sha256.update(chunk)
    file_hash = sha256.hexdigest()

    # 2) check in OpenSearch
    exists = await es.exists(index=INDEX_NAME, id=file_hash)
    if exists:
        return {"path": path, "status": "unchanged", "id": file_hash}

    # 3) parse + index
    # NOTE: converter.convert() is CPU-bound and blocks the event loop
    result = converter.convert(path)
    doc_dict = result.document.export_to_dict()
    await es.index(index=INDEX_NAME, id=file_hash, body=doc_dict)
    return {"path": path, "status": "indexed", "id": file_hash}


async def upload(request: Request):
    """
    POST /upload
    Form-data with a `file` field. Streams to disk + processes it.
    """
    form = await request.form()
    upload_file = form["file"]  # starlette.datastructures.UploadFile

    # stream into a temp file while hashing
    sha256 = hashlib.sha256()
    tmp = tempfile.NamedTemporaryFile(delete=False)
    try:
        while True:
            chunk = await upload_file.read(1 << 20)
            if not chunk:
                break
            sha256.update(chunk)
            tmp.write(chunk)
        tmp.flush()
        file_hash = sha256.hexdigest()
        # if you prefer the Datastax pattern for naming IDs, see:
        # https://github.com/datastax/astra-assistants-api/blob/main/impl/utils.py#L229

        # check + index
        exists = await es.exists(index=INDEX_NAME, id=file_hash)
        if exists:
            return JSONResponse({"status": "unchanged", "id": file_hash})
        result = converter.convert(tmp.name)
        doc_dict = result.document.export_to_dict()
        await es.index(index=INDEX_NAME, id=file_hash, body=doc_dict)
        return JSONResponse({"status": "indexed", "id": file_hash})
    finally:
        tmp.close()
        os.remove(tmp.name)


async def upload_path(request: Request):
    """
    POST /upload_path
    JSON body: { "path": "/absolute/path/to/dir" }
    Recursively processes every file found there in parallel.
    """
    payload = await request.json()
    base_dir = payload.get("path")
    if not base_dir or not os.path.isdir(base_dir):
        return JSONResponse({"error": "Invalid path"}, status_code=400)

    tasks = []
    for root, _, files in os.walk(base_dir):
        for fn in files:
            full = os.path.join(root, fn)
            tasks.append(process_file_on_disk(full))
    results = await asyncio.gather(*tasks)
    return JSONResponse({"results": results})


app = Starlette(
    debug=True,
    routes=[
        Route("/upload", upload, methods=["POST"]),
        Route("/upload_path", upload_path, methods=["POST"]),
    ],
    # Run init_index on the server's own event loop; calling asyncio.run()
    # before uvicorn.run() would bind the client's aiohttp session to a loop
    # that is already closed by the time requests arrive.
    on_startup=[init_index],
)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "app:app",  # "module:variable"
        host="0.0.0.0",
        port=8000,
        reload=True,  # dev only
    )
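# Example requests (sketch; assumes the server is running on localhost:8000
# and uses the repo's sample PDF as input):
#   curl -F "file=@documents/2506.08231v1.pdf" http://localhost:8000/upload
#   curl -X POST -H "Content-Type: application/json" \
#        -d '{"path": "/absolute/path/to/dir"}' http://localhost:8000/upload_path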

1944
uv.lock generated Normal file

File diff suppressed because it is too large.