From e841b09d631f2426067a2e45f25721e8d9ca9285 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Fri, 14 Nov 2025 20:39:54 +0800 Subject: [PATCH 1/6] Remove unused code and fix performance issue (#11284) ### What problem does this PR solve? 1. remove redundant code 2. fix minor performance issue ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [x] Refactoring Signed-off-by: Jin Hai --- agent/canvas.py | 2 -- agent/component/base.py | 11 +++++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/agent/canvas.py b/agent/canvas.py index f262cd597..e18cb8d26 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -298,8 +298,6 @@ class Canvas(Graph): for kk, vv in kwargs["webhook_payload"].items(): self.components[k]["obj"].set_output(kk, vv) - self.components[k]["obj"].reset(True) - for k in kwargs.keys(): if k in ["query", "user_id", "files"] and kwargs[k]: if k == "files": diff --git a/agent/component/base.py b/agent/component/base.py index 31ad46820..0864ccb9e 100644 --- a/agent/component/base.py +++ b/agent/component/base.py @@ -463,12 +463,15 @@ class ComponentBase(ABC): return self._param.outputs.get("_ERROR", {}).get("value") def reset(self, only_output=False): - for k in self._param.outputs.keys(): - self._param.outputs[k]["value"] = None + outputs: dict = self._param.outputs # for better performance + for k in outputs.keys(): + outputs[k]["value"] = None if only_output: return - for k in self._param.inputs.keys(): - self._param.inputs[k]["value"] = None + + inputs: dict = self._param.inputs # for better performance + for k in inputs.keys(): + inputs[k]["value"] = None self._param.debug_inputs = {} def get_input(self, key: str=None) -> Union[Any, dict[str, Any]]: From 61cf430dbb8507ee7d53cc5fe35a23ee8e271e55 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Sun, 16 Nov 2025 19:29:20 +0800 Subject: [PATCH 2/6] Minor tweaks (#11271) ### What problem does this PR solve? As title.
### Type of change - [x] Refactoring --------- Signed-off-by: Jin Hai --- api/db/db_models.py | 5 +++-- api/db/services/connector_service.py | 6 ++--- api/db/services/dialog_service.py | 8 ++++--- api/db/services/document_service.py | 28 ++++++++++++------------ api/db/services/knowledgebase_service.py | 1 + api/utils/email_templates.py | 16 ++++++++++++++ api/utils/json_encode.py | 16 ++++++++++++++ 7 files changed, 58 insertions(+), 22 deletions(-) diff --git a/api/db/db_models.py b/api/db/db_models.py index 68bf37ce4..2b4c4a0ef 100644 --- a/api/db/db_models.py +++ b/api/db/db_models.py @@ -305,6 +305,7 @@ class RetryingPooledMySQLDatabase(PooledMySQLDatabase): time.sleep(self.retry_delay * (2 ** attempt)) else: raise + return None class RetryingPooledPostgresqlDatabase(PooledPostgresqlDatabase): @@ -772,7 +773,7 @@ class Document(DataBaseModel): thumbnail = TextField(null=True, help_text="thumbnail base64 string") kb_id = CharField(max_length=256, null=False, index=True) parser_id = CharField(max_length=32, null=False, help_text="default parser ID", index=True) - pipeline_id = CharField(max_length=32, null=True, help_text="pipleline ID", index=True) + pipeline_id = CharField(max_length=32, null=True, help_text="pipeline ID", index=True) parser_config = JSONField(null=False, default={"pages": [[1, 1000000]]}) source_type = CharField(max_length=128, null=False, default="local", help_text="where dose this document come from", index=True) type = CharField(max_length=32, null=False, help_text="file extension", index=True) @@ -876,7 +877,7 @@ class Dialog(DataBaseModel): class Conversation(DataBaseModel): id = CharField(max_length=32, primary_key=True) dialog_id = CharField(max_length=32, null=False, index=True) - name = CharField(max_length=255, null=True, help_text="converastion name", index=True) + name = CharField(max_length=255, null=True, help_text="conversation name", index=True) message = JSONField(null=True) reference = JSONField(null=True, default=[]) user_id = CharField(max_length=255, null=True, help_text="user_id", index=True) diff --git a/api/db/services/connector_service.py b/api/db/services/connector_service.py index 3e65c87da..2f29c3324 100644 --- a/api/db/services/connector_service.py +++ b/api/db/services/connector_service.py @@ -70,7 +70,7 @@ class ConnectorService(CommonService): def rebuild(cls, kb_id:str, connector_id: str, tenant_id:str): e, conn = cls.get_by_id(connector_id) if not e: - return + return None SyncLogsService.filter_delete([SyncLogs.connector_id==connector_id, SyncLogs.kb_id==kb_id]) docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}", kb_id=kb_id) err = FileService.delete_docs([d.id for d in docs], tenant_id) @@ -125,11 +125,11 @@ class SyncLogsService(CommonService): ) query = query.distinct().order_by(cls.model.update_time.desc()) - totbal = query.count() + total = query.count() if page_number: query = query.paginate(page_number, items_per_page) - return list(query.dicts()), totbal + return list(query.dicts()), total @classmethod def start(cls, id, connector_id): diff --git a/api/db/services/dialog_service.py b/api/db/services/dialog_service.py index f54ebf709..d2f3b9bc1 100644 --- a/api/db/services/dialog_service.py +++ b/api/db/services/dialog_service.py @@ -342,7 +342,7 @@ def chat(dialog, messages, stream=True, **kwargs): if not dialog.kb_ids and not dialog.prompt_config.get("tavily_api_key"): for ans in chat_solo(dialog, messages, stream): yield ans - return + return None chat_start_ts = timer() @@ -386,7 +386,7 @@ def 
chat(dialog, messages, stream=True, **kwargs): ans = use_sql(questions[-1], field_map, dialog.tenant_id, chat_mdl, prompt_config.get("quote", True), dialog.kb_ids) if ans: yield ans - return + return None for p in prompt_config["parameters"]: if p["key"] == "knowledge": @@ -617,6 +617,8 @@ def chat(dialog, messages, stream=True, **kwargs): res["audio_binary"] = tts(tts_mdl, answer) yield res + return None + def use_sql(question, field_map, tenant_id, chat_mdl, quota=True, kb_ids=None): sys_prompt = """ @@ -745,7 +747,7 @@ Please write the SQL, only SQL, without any other explanations or text. def tts(tts_mdl, text): if not tts_mdl or not text: - return + return None bin = b"" for chunk in tts_mdl.tts(text): bin += chunk diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index 530133164..0abf1b1f3 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -113,7 +113,7 @@ class DocumentService(CommonService): def check_doc_health(cls, tenant_id: str, filename): import os MAX_FILE_NUM_PER_USER = int(os.environ.get("MAX_FILE_NUM_PER_USER", 0)) - if MAX_FILE_NUM_PER_USER > 0 and DocumentService.get_doc_count(tenant_id) >= MAX_FILE_NUM_PER_USER: + if 0 < MAX_FILE_NUM_PER_USER <= DocumentService.get_doc_count(tenant_id): raise RuntimeError("Exceed the maximum file number of a free user!") if len(filename.encode("utf-8")) > FILE_NAME_LEN_LIMIT: raise RuntimeError("Exceed the maximum length of file name!") @@ -464,7 +464,7 @@ class DocumentService(CommonService): cls.model.id == doc_id, Knowledgebase.status == StatusEnum.VALID.value) docs = docs.dicts() if not docs: - return + return None return docs[0]["tenant_id"] @classmethod @@ -473,7 +473,7 @@ class DocumentService(CommonService): docs = cls.model.select(cls.model.kb_id).where(cls.model.id == doc_id) docs = docs.dicts() if not docs: - return + return None return docs[0]["kb_id"] @classmethod @@ -486,7 +486,7 @@ class DocumentService(CommonService): cls.model.name == name, Knowledgebase.status == StatusEnum.VALID.value) docs = docs.dicts() if not docs: - return + return None return docs[0]["tenant_id"] @classmethod @@ -533,7 +533,7 @@ class DocumentService(CommonService): cls.model.id == doc_id, Knowledgebase.status == StatusEnum.VALID.value) docs = docs.dicts() if not docs: - return + return None return docs[0]["embd_id"] @classmethod @@ -569,7 +569,7 @@ class DocumentService(CommonService): .where(cls.model.name == doc_name) doc_id = doc_id.dicts() if not doc_id: - return + return None return doc_id[0]["id"] @classmethod @@ -715,7 +715,7 @@ class DocumentService(CommonService): prg = 1 status = TaskStatus.DONE.value - # only for special task and parsed docs and unfinised + # only for special task and parsed docs and unfinished freeze_progress = special_task_running and doc_progress >= 1 and not finished msg = "\n".join(sorted(msg)) info = { @@ -974,13 +974,13 @@ def doc_upload_and_parse(conversation_id, file_objs, user_id): def embedding(doc_id, cnts, batch_size=16): nonlocal embd_mdl, chunk_counts, token_counts - vects = [] + vectors = [] for i in range(0, len(cnts), batch_size): vts, c = embd_mdl.encode(cnts[i: i + batch_size]) - vects.extend(vts.tolist()) + vectors.extend(vts.tolist()) chunk_counts[doc_id] += len(cnts[i:i + batch_size]) token_counts[doc_id] += c - return vects + return vectors idxnm = search.index_name(kb.tenant_id) try_create_idx = True @@ -1011,15 +1011,15 @@ def doc_upload_and_parse(conversation_id, file_objs, user_id): except Exception: 
logging.exception("Mind map generation error") - vects = embedding(doc_id, [c["content_with_weight"] for c in cks]) - assert len(cks) == len(vects) + vectors = embedding(doc_id, [c["content_with_weight"] for c in cks]) + assert len(cks) == len(vectors) for i, d in enumerate(cks): - v = vects[i] + v = vectors[i] d["q_%d_vec" % len(v)] = v for b in range(0, len(cks), es_bulk_size): if try_create_idx: if not settings.docStoreConn.indexExist(idxnm, kb_id): - settings.docStoreConn.createIdx(idxnm, kb_id, len(vects[0])) + settings.docStoreConn.createIdx(idxnm, kb_id, len(vectors[0])) try_create_idx = False settings.docStoreConn.insert(cks[b:b + es_bulk_size], idxnm, kb_id) diff --git a/api/db/services/knowledgebase_service.py b/api/db/services/knowledgebase_service.py index 03179da49..ca30ca074 100644 --- a/api/db/services/knowledgebase_service.py +++ b/api/db/services/knowledgebase_service.py @@ -424,6 +424,7 @@ class KnowledgebaseService(CommonService): # Default parser_config (align with kb_app.create) — do not accept external overrides payload["parser_config"] = get_parser_config(parser_id, kwargs.get("parser_config")) + return payload diff --git a/api/utils/email_templates.py b/api/utils/email_templates.py index 10473908a..34201ee38 100644 --- a/api/utils/email_templates.py +++ b/api/utils/email_templates.py @@ -1,3 +1,19 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + """ Reusable HTML email templates and registry. """ diff --git a/api/utils/json_encode.py b/api/utils/json_encode.py index b21addd4f..fa5ea973a 100644 --- a/api/utils/json_encode.py +++ b/api/utils/json_encode.py @@ -1,3 +1,19 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + import datetime import json from enum import Enum, IntEnum From 13e212c8561dda5a8f2cc31df64eea5354abbc4f Mon Sep 17 00:00:00 2001 From: Yongteng Lei Date: Mon, 17 Nov 2025 09:38:04 +0800 Subject: [PATCH 3/6] Feat: add Jira connector (#11285) ### What problem does this PR solve? Add Jira connector. 
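For reference, a minimal usage sketch of the new connector (the URL, project key, and credentials below are placeholders; the flow simply mirrors the `test_jira`/CLI helper added in this patch):

```python
# Hypothetical example: exercising the checkpointed Jira connector end to end.
from common.data_source.jira.connector import JiraConnector, iterate_jira_documents

connector = JiraConnector(
    jira_base_url="https://example.atlassian.net",  # placeholder Jira Cloud URL
    project_key="ENG",                              # or pass jql_query=... instead
    include_comments=True,
)
connector.load_credentials({
    "jira_user_email": "user@example.com",  # placeholder credentials
    "jira_api_token": "<api-token>",
})
connector.validate_connector_settings()

# Stream issues updated in the given window as markdown Document blobs.
for doc in iterate_jira_documents(connector, start=0.0, end=9999999999.0):
    print(doc.semantic_identifier, doc.size_bytes)
```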
### Type of change - [x] New Feature (non-breaking change which adds functionality) --- common/data_source/__init__.py | 2 +- common/data_source/config.py | 17 + common/data_source/confluence_connector.py | 1 + common/data_source/jira/__init__.py | 0 common/data_source/jira/connector.py | 973 ++++++++++++++++++ common/data_source/jira/utils.py | 149 +++ common/data_source/jira_connector.py | 112 -- common/data_source/utils.py | 40 +- common/log_utils.py | 2 +- rag/svr/sync_data_source.py | 208 ++-- web/src/assets/svg/data-source/jira.svg | 16 + web/src/locales/en.ts | 27 + web/src/locales/zh.ts | 17 + .../user-setting/data-source/contant.tsx | 130 ++- .../pages/user-setting/data-source/index.tsx | 6 + 15 files changed, 1521 insertions(+), 179 deletions(-) create mode 100644 common/data_source/jira/__init__.py create mode 100644 common/data_source/jira/connector.py create mode 100644 common/data_source/jira/utils.py delete mode 100644 common/data_source/jira_connector.py create mode 100644 web/src/assets/svg/data-source/jira.svg diff --git a/common/data_source/__init__.py b/common/data_source/__init__.py index 0802a5285..611c3c61a 100644 --- a/common/data_source/__init__.py +++ b/common/data_source/__init__.py @@ -11,7 +11,7 @@ from .confluence_connector import ConfluenceConnector from .discord_connector import DiscordConnector from .dropbox_connector import DropboxConnector from .google_drive.connector import GoogleDriveConnector -from .jira_connector import JiraConnector +from .jira.connector import JiraConnector from .sharepoint_connector import SharePointConnector from .teams_connector import TeamsConnector from .config import BlobType, DocumentSource diff --git a/common/data_source/config.py b/common/data_source/config.py index 02684dbac..e4040f85e 100644 --- a/common/data_source/config.py +++ b/common/data_source/config.py @@ -13,6 +13,7 @@ def get_current_tz_offset() -> int: return round(time_diff.total_seconds() / 3600) +ONE_MINUTE = 60 ONE_HOUR = 3600 ONE_DAY = ONE_HOUR * 24 @@ -42,6 +43,7 @@ class DocumentSource(str, Enum): OCI_STORAGE = "oci_storage" SLACK = "slack" CONFLUENCE = "confluence" + JIRA = "jira" GOOGLE_DRIVE = "google_drive" GMAIL = "gmail" DISCORD = "discord" @@ -178,6 +180,21 @@ GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD = int( os.environ.get("GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD", 10 * 1024 * 1024) ) +JIRA_CONNECTOR_LABELS_TO_SKIP = [ + ignored_tag + for ignored_tag in os.environ.get("JIRA_CONNECTOR_LABELS_TO_SKIP", "").split(",") + if ignored_tag +] +JIRA_CONNECTOR_MAX_TICKET_SIZE = int( + os.environ.get("JIRA_CONNECTOR_MAX_TICKET_SIZE", 100 * 1024) +) +JIRA_SYNC_TIME_BUFFER_SECONDS = int( + os.environ.get("JIRA_SYNC_TIME_BUFFER_SECONDS", ONE_MINUTE) +) +JIRA_TIMEZONE_OFFSET = float( + os.environ.get("JIRA_TIMEZONE_OFFSET", get_current_tz_offset()) +) + OAUTH_SLACK_CLIENT_ID = os.environ.get("OAUTH_SLACK_CLIENT_ID", "") OAUTH_SLACK_CLIENT_SECRET = os.environ.get("OAUTH_SLACK_CLIENT_SECRET", "") OAUTH_CONFLUENCE_CLOUD_CLIENT_ID = os.environ.get( diff --git a/common/data_source/confluence_connector.py b/common/data_source/confluence_connector.py index aed16ad2b..821f79862 100644 --- a/common/data_source/confluence_connector.py +++ b/common/data_source/confluence_connector.py @@ -1788,6 +1788,7 @@ class ConfluenceConnector( cql_url = self.confluence_client.build_cql_url( page_query, expand=",".join(_PAGE_EXPANSION_FIELDS) ) + logging.info(f"[Confluence Connector] Building CQL URL {cql_url}") return update_param_in_path(cql_url,
"limit", str(limit)) @override diff --git a/common/data_source/jira/__init__.py b/common/data_source/jira/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/common/data_source/jira/connector.py b/common/data_source/jira/connector.py new file mode 100644 index 000000000..4635d72f3 --- /dev/null +++ b/common/data_source/jira/connector.py @@ -0,0 +1,973 @@ +"""Checkpointed Jira connector that emits markdown blobs for each issue.""" + +from __future__ import annotations + +import argparse +import copy +import logging +import os +import re +from collections.abc import Callable, Generator, Iterable, Iterator, Sequence +from datetime import datetime, timedelta, timezone +from typing import Any +from zoneinfo import ZoneInfo, ZoneInfoNotFoundError + +from jira import JIRA +from jira.resources import Issue +from pydantic import Field + +from common.data_source.config import ( + INDEX_BATCH_SIZE, + JIRA_CONNECTOR_LABELS_TO_SKIP, + JIRA_CONNECTOR_MAX_TICKET_SIZE, + JIRA_TIMEZONE_OFFSET, + ONE_HOUR, + DocumentSource, +) +from common.data_source.exceptions import ( + ConnectorMissingCredentialError, + ConnectorValidationError, + InsufficientPermissionsError, + UnexpectedValidationError, +) +from common.data_source.interfaces import ( + CheckpointedConnectorWithPermSync, + CheckpointOutputWrapper, + SecondsSinceUnixEpoch, + SlimConnectorWithPermSync, +) +from common.data_source.jira.utils import ( + JIRA_CLOUD_API_VERSION, + JIRA_SERVER_API_VERSION, + build_issue_url, + extract_body_text, + extract_named_value, + extract_user, + format_attachments, + format_comments, + parse_jira_datetime, + should_skip_issue, +) +from common.data_source.models import ( + ConnectorCheckpoint, + ConnectorFailure, + Document, + DocumentFailure, + SlimDocument, +) +from common.data_source.utils import is_atlassian_cloud_url, is_atlassian_date_error, scoped_url + +logger = logging.getLogger(__name__) + +_DEFAULT_FIELDS = "summary,description,updated,created,status,priority,assignee,reporter,labels,issuetype,project,comment,attachment" +_SLIM_FIELDS = "key,project" +_MAX_RESULTS_FETCH_IDS = 5000 +_JIRA_SLIM_PAGE_SIZE = 500 +_JIRA_FULL_PAGE_SIZE = 50 +_DEFAULT_ATTACHMENT_SIZE_LIMIT = 10 * 1024 * 1024 # 10MB + + +class JiraCheckpoint(ConnectorCheckpoint): + """Checkpoint that tracks which slice of the current JQL result set was emitted.""" + + start_at: int = 0 + cursor: str | None = None + ids_done: bool = False + all_issue_ids: list[list[str]] = Field(default_factory=list) + + +_TZ_OFFSET_PATTERN = re.compile(r"([+-])(\d{2})(:?)(\d{2})$") + + +class JiraConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync): + """Retrieve Jira issues and emit them as markdown documents.""" + + def __init__( + self, + jira_base_url: str, + project_key: str | None = None, + jql_query: str | None = None, + batch_size: int = INDEX_BATCH_SIZE, + include_comments: bool = True, + include_attachments: bool = False, + labels_to_skip: Sequence[str] | None = None, + comment_email_blacklist: Sequence[str] | None = None, + scoped_token: bool = False, + attachment_size_limit: int | None = None, + timezone_offset: float | None = None, + ) -> None: + if not jira_base_url: + raise ConnectorValidationError("Jira base URL must be provided.") + + self.jira_base_url = jira_base_url.rstrip("/") + self.project_key = project_key + self.jql_query = jql_query + self.batch_size = batch_size + self.include_comments = include_comments + self.include_attachments = include_attachments + configured_labels = labels_to_skip or 
JIRA_CONNECTOR_LABELS_TO_SKIP + self.labels_to_skip = {label.lower() for label in configured_labels} + self.comment_email_blacklist = {email.lower() for email in comment_email_blacklist or []} + self.scoped_token = scoped_token + self.jira_client: JIRA | None = None + + self.max_ticket_size = JIRA_CONNECTOR_MAX_TICKET_SIZE + self.attachment_size_limit = attachment_size_limit if attachment_size_limit and attachment_size_limit > 0 else _DEFAULT_ATTACHMENT_SIZE_LIMIT + self._fields_param = _DEFAULT_FIELDS + self._slim_fields = _SLIM_FIELDS + + tz_offset_value = float(timezone_offset) if timezone_offset is not None else float(JIRA_TIMEZONE_OFFSET) + self.timezone_offset = tz_offset_value + self.timezone = timezone(offset=timedelta(hours=tz_offset_value)) + self._timezone_overridden = timezone_offset is not None + + # ------------------------------------------------------------------------- + # Connector lifecycle helpers + # ------------------------------------------------------------------------- + + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: + """Instantiate the Jira client using either an API token or username/password.""" + jira_url_for_client = self.jira_base_url + if self.scoped_token: + if is_atlassian_cloud_url(self.jira_base_url): + try: + jira_url_for_client = scoped_url(self.jira_base_url, "jira") + except ValueError as exc: + raise ConnectorValidationError(str(exc)) from exc + else: + logger.warning(f"[Jira] Scoped token requested but Jira base URL {self.jira_base_url} does not appear to be an Atlassian Cloud domain; scoped token ignored.") + + user_email = credentials.get("jira_user_email") or credentials.get("username") + api_token = credentials.get("jira_api_token") or credentials.get("token") or credentials.get("api_token") + password = credentials.get("jira_password") or credentials.get("password") + rest_api_version = credentials.get("rest_api_version") + + if not rest_api_version: + rest_api_version = JIRA_CLOUD_API_VERSION if api_token else JIRA_SERVER_API_VERSION + options: dict[str, Any] = {"rest_api_version": rest_api_version} + + try: + if user_email and api_token: + self.jira_client = JIRA( + server=jira_url_for_client, + basic_auth=(user_email, api_token), + options=options, + ) + elif api_token: + self.jira_client = JIRA( + server=jira_url_for_client, + token_auth=api_token, + options=options, + ) + elif user_email and password: + self.jira_client = JIRA( + server=jira_url_for_client, + basic_auth=(user_email, password), + options=options, + ) + else: + raise ConnectorMissingCredentialError("Jira credentials must include either an API token or username/password.") + except Exception as exc: # pragma: no cover - jira lib raises many types + raise ConnectorMissingCredentialError(f"Jira: {exc}") from exc + self._sync_timezone_from_server() + return None + + def validate_connector_settings(self) -> None: + """Validate connectivity by fetching basic Jira info.""" + if not self.jira_client: + raise ConnectorMissingCredentialError("Jira") + + try: + if self.jql_query: + dummy_checkpoint = self.build_dummy_checkpoint() + checkpoint_callback = self._make_checkpoint_callback(dummy_checkpoint) + iterator = self._perform_jql_search( + jql=self.jql_query, + start=0, + max_results=1, + fields="key", + all_issue_ids=dummy_checkpoint.all_issue_ids, + checkpoint_callback=checkpoint_callback, + next_page_token=dummy_checkpoint.cursor, + ids_done=dummy_checkpoint.ids_done, + ) + next(iter(iterator), None) + elif self.project_key: + 
self.jira_client.project(self.project_key) + else: + self.jira_client.projects() + except Exception as exc: # pragma: no cover - dependent on Jira responses + self._handle_validation_error(exc) + + # ------------------------------------------------------------------------- + # Checkpointed connector implementation + # ------------------------------------------------------------------------- + + def load_from_checkpoint( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: JiraCheckpoint, + ) -> Generator[Document | ConnectorFailure, None, JiraCheckpoint]: + """Load Jira issues, emitting a Document per issue.""" + try: + return (yield from self._load_with_retry(start, end, checkpoint)) + except Exception as exc: + logger.exception(f"[Jira] Jira query ultimately failed: {exc}") + yield ConnectorFailure( + failure_message=f"Failed to query Jira: {exc}", + exception=exc, + ) + return JiraCheckpoint(has_more=False, start_at=checkpoint.start_at) + + def load_from_checkpoint_with_perm_sync( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: JiraCheckpoint, + ) -> Generator[Document | ConnectorFailure, None, JiraCheckpoint]: + """Permissions are not synced separately, so reuse the standard loader.""" + return (yield from self.load_from_checkpoint(start=start, end=end, checkpoint=checkpoint)) + + def _load_with_retry( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: JiraCheckpoint, + ) -> Generator[Document | ConnectorFailure, None, JiraCheckpoint]: + if not self.jira_client: + raise ConnectorMissingCredentialError("Jira") + + attempt_start = start + retried_with_buffer = False + attempt = 0 + + while True: + attempt += 1 + jql = self._build_jql(attempt_start, end) + logger.info(f"[Jira] Executing Jira JQL attempt {attempt} (start={attempt_start}, end={end}, buffered_retry={retried_with_buffer}): {jql}") + try: + return (yield from self._load_from_checkpoint_internal(jql, checkpoint, start_filter=start)) + except Exception as exc: + if attempt_start is not None and not retried_with_buffer and is_atlassian_date_error(exc): + attempt_start = attempt_start - ONE_HOUR + retried_with_buffer = True + logger.info(f"[Jira] Atlassian date error detected; retrying with start={attempt_start}.") + continue + raise + + def _handle_validation_error(self, exc: Exception) -> None: + status_code = getattr(exc, "status_code", None) + if status_code == 401: + raise InsufficientPermissionsError("Jira credential appears to be invalid or expired (HTTP 401).") from exc + if status_code == 403: + raise InsufficientPermissionsError("Jira token does not have permission to access the requested resources (HTTP 403).") from exc + if status_code == 404: + raise ConnectorValidationError("Jira resource not found (HTTP 404).") from exc + if status_code == 429: + raise ConnectorValidationError("Jira rate limit exceeded during validation (HTTP 429).") from exc + + message = getattr(exc, "text", str(exc)) + if not message: + raise UnexpectedValidationError("Unexpected Jira validation error.") from exc + + raise ConnectorValidationError(f"Jira validation failed: {message}") from exc + + def _load_from_checkpoint_internal( + self, + jql: str, + checkpoint: JiraCheckpoint, + start_filter: SecondsSinceUnixEpoch | None = None, + ) -> Generator[Document | ConnectorFailure, None, JiraCheckpoint]: + assert self.jira_client, "load_credentials must be called before loading issues." 
+ + page_size = self._full_page_size() + new_checkpoint = copy.deepcopy(checkpoint) + starting_offset = new_checkpoint.start_at or 0 + current_offset = starting_offset + checkpoint_callback = self._make_checkpoint_callback(new_checkpoint) + + issue_iter = self._perform_jql_search( + jql=jql, + start=current_offset, + max_results=page_size, + fields=self._fields_param, + all_issue_ids=new_checkpoint.all_issue_ids, + checkpoint_callback=checkpoint_callback, + next_page_token=new_checkpoint.cursor, + ids_done=new_checkpoint.ids_done, + ) + + start_cutoff = float(start_filter) if start_filter is not None else None + + for issue in issue_iter: + current_offset += 1 + issue_key = getattr(issue, "key", "unknown") + if should_skip_issue(issue, self.labels_to_skip): + continue + + issue_updated = parse_jira_datetime(issue.raw.get("fields", {}).get("updated")) + if start_cutoff is not None and issue_updated is not None and issue_updated.timestamp() <= start_cutoff: + # Jira JQL only supports minute precision, so we discard already-processed + # issues here based on the original second-level cutoff. + continue + + try: + document = self._issue_to_document(issue) + except Exception as exc: # pragma: no cover - defensive + logger.exception(f"[Jira] Failed to convert Jira issue {issue_key}: {exc}") + yield ConnectorFailure( + failure_message=f"Failed to convert Jira issue {issue_key}: {exc}", + failed_document=DocumentFailure( + document_id=issue_key, + document_link=build_issue_url(self.jira_base_url, issue_key), + ), + exception=exc, + ) + continue + + if document is not None: + yield document + if self.include_attachments: + for attachment_document in self._attachment_documents(issue): + if attachment_document is not None: + yield attachment_document + + self._update_checkpoint_for_next_run( + checkpoint=new_checkpoint, + current_offset=current_offset, + starting_offset=starting_offset, + page_size=page_size, + ) + new_checkpoint.start_at = current_offset + return new_checkpoint + + def build_dummy_checkpoint(self) -> JiraCheckpoint: + """Create an empty checkpoint used to kick off ingestion.""" + return JiraCheckpoint(has_more=True, start_at=0) + + def validate_checkpoint_json(self, checkpoint_json: str) -> JiraCheckpoint: + """Validate a serialized checkpoint.""" + return JiraCheckpoint.model_validate_json(checkpoint_json) + + # ------------------------------------------------------------------------- + # Slim connector implementation + # ------------------------------------------------------------------------- + + def retrieve_all_slim_docs_perm_sync( + self, + start: SecondsSinceUnixEpoch | None = None, + end: SecondsSinceUnixEpoch | None = None, + callback: Any = None, # noqa: ARG002 - maintained for interface compatibility + ) -> Generator[list[SlimDocument], None, None]: + """Return lightweight references to Jira issues (used for permission syncing).""" + if not self.jira_client: + raise ConnectorMissingCredentialError("Jira") + + start_ts = start if start is not None else 0 + end_ts = end if end is not None else datetime.now(timezone.utc).timestamp() + jql = self._build_jql(start_ts, end_ts) + + checkpoint = self.build_dummy_checkpoint() + checkpoint_callback = self._make_checkpoint_callback(checkpoint) + prev_offset = 0 + current_offset = 0 + slim_batch: list[SlimDocument] = [] + + while checkpoint.has_more: + for issue in self._perform_jql_search( + jql=jql, + start=current_offset, + max_results=_JIRA_SLIM_PAGE_SIZE, + fields=self._slim_fields, + all_issue_ids=checkpoint.all_issue_ids, + 
checkpoint_callback=checkpoint_callback, + next_page_token=checkpoint.cursor, + ids_done=checkpoint.ids_done, + ): + current_offset += 1 + if should_skip_issue(issue, self.labels_to_skip): + continue + + doc_id = build_issue_url(self.jira_base_url, issue.key) + slim_batch.append(SlimDocument(id=doc_id)) + + if len(slim_batch) >= _JIRA_SLIM_PAGE_SIZE: + yield slim_batch + slim_batch = [] + + self._update_checkpoint_for_next_run( + checkpoint=checkpoint, + current_offset=current_offset, + starting_offset=prev_offset, + page_size=_JIRA_SLIM_PAGE_SIZE, + ) + prev_offset = current_offset + + if slim_batch: + yield slim_batch + + # ------------------------------------------------------------------------- + # Internal helpers + # ------------------------------------------------------------------------- + + def _build_jql(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> str: + clauses: list[str] = [] + if self.jql_query: + clauses.append(f"({self.jql_query})") + elif self.project_key: + clauses.append(f'project = "{self.project_key}"') + else: + raise ConnectorValidationError("Either project_key or jql_query must be provided for Jira connector.") + + if self.labels_to_skip: + labels = ", ".join(f'"{label}"' for label in self.labels_to_skip) + clauses.append(f"labels NOT IN ({labels})") + + if start is not None: + clauses.append(f'updated >= "{self._format_jql_time(start)}"') + if end is not None: + clauses.append(f'updated <= "{self._format_jql_time(end)}"') + + if not clauses: + raise ConnectorValidationError("Unable to build Jira JQL query.") + + jql = " AND ".join(clauses) + if "order by" not in jql.lower(): + jql = f"{jql} ORDER BY updated ASC" + return jql + + def _format_jql_time(self, timestamp: SecondsSinceUnixEpoch) -> str: + dt_utc = datetime.fromtimestamp(float(timestamp), tz=timezone.utc) + dt_local = dt_utc.astimezone(self.timezone) + # Jira only accepts minute-precision timestamps in JQL, so we format accordingly + # and rely on a post-query second-level filter to avoid duplicates. 
+ return dt_local.strftime("%Y-%m-%d %H:%M") + + def _issue_to_document(self, issue: Issue) -> Document | None: + fields = issue.raw.get("fields", {}) + summary = fields.get("summary") or "" + description_text = extract_body_text(fields.get("description")) + comments_text = ( + format_comments( + fields.get("comment"), + blacklist=self.comment_email_blacklist, + ) + if self.include_comments + else "" + ) + attachments_text = format_attachments(fields.get("attachment")) + + reporter_name, reporter_email = extract_user(fields.get("reporter")) + assignee_name, assignee_email = extract_user(fields.get("assignee")) + status = extract_named_value(fields.get("status")) + priority = extract_named_value(fields.get("priority")) + issue_type = extract_named_value(fields.get("issuetype")) + project = fields.get("project") or {} + + issue_url = build_issue_url(self.jira_base_url, issue.key) + + metadata_lines = [ + f"key: {issue.key}", + f"url: {issue_url}", + f"summary: {summary}", + f"status: {status or 'Unknown'}", + f"priority: {priority or 'Unspecified'}", + f"issue_type: {issue_type or 'Unknown'}", + f"project: {project.get('name') or ''}", + f"project_key: {project.get('key') or self.project_key or ''}", + ] + + if reporter_name: + metadata_lines.append(f"reporter: {reporter_name}") + if reporter_email: + metadata_lines.append(f"reporter_email: {reporter_email}") + if assignee_name: + metadata_lines.append(f"assignee: {assignee_name}") + if assignee_email: + metadata_lines.append(f"assignee_email: {assignee_email}") + if fields.get("labels"): + metadata_lines.append(f"labels: {', '.join(fields.get('labels'))}") + + created_dt = parse_jira_datetime(fields.get("created")) + updated_dt = parse_jira_datetime(fields.get("updated")) or created_dt or datetime.now(timezone.utc) + metadata_lines.append(f"created: {created_dt.isoformat() if created_dt else ''}") + metadata_lines.append(f"updated: {updated_dt.isoformat() if updated_dt else ''}") + + sections: list[str] = [ + "---", + "\n".join(filter(None, metadata_lines)), + "---", + "", + "## Description", + description_text or "No description provided.", + ] + + if comments_text: + sections.extend(["", "## Comments", comments_text]) + if attachments_text: + sections.extend(["", "## Attachments", attachments_text]) + + blob_text = "\n".join(sections).strip() + "\n" + blob = blob_text.encode("utf-8") + + if len(blob) > self.max_ticket_size: + logger.info(f"[Jira] Skipping {issue.key} because it exceeds the maximum size of {self.max_ticket_size} bytes.") + return None + + semantic_identifier = f"{issue.key}: {summary}" if summary else issue.key + + return Document( + id=issue_url, + source=DocumentSource.JIRA, + semantic_identifier=semantic_identifier, + extension=".md", + blob=blob, + doc_updated_at=updated_dt, + size_bytes=len(blob), + ) + + def _attachment_documents(self, issue: Issue) -> Iterable[Document]: + attachments = issue.raw.get("fields", {}).get("attachment") or [] + for attachment in attachments: + try: + document = self._attachment_to_document(issue, attachment) + if document is not None: + yield document + except Exception as exc: # pragma: no cover - defensive + failed_id = attachment.get("id") or attachment.get("filename") + issue_key = getattr(issue, "key", "unknown") + logger.warning(f"[Jira] Failed to process attachment {failed_id} for issue {issue_key}: {exc}") + + def _attachment_to_document(self, issue: Issue, attachment: dict[str, Any]) -> Document | None: + if not self.include_attachments: + return None + + filename = 
attachment.get("filename") + content_url = attachment.get("content") + if not filename or not content_url: + return None + + try: + attachment_size = int(attachment.get("size", 0)) + except (TypeError, ValueError): + attachment_size = 0 + if attachment_size and attachment_size > self.attachment_size_limit: + logger.info(f"[Jira] Skipping attachment {filename} on {issue.key} because reported size exceeds limit ({self.attachment_size_limit} bytes).") + return None + + blob = self._download_attachment(content_url) + if blob is None: + return None + + if len(blob) > self.attachment_size_limit: + logger.info(f"[Jira] Skipping attachment {filename} on {issue.key} because it exceeds the size limit ({self.attachment_size_limit} bytes).") + return None + + attachment_time = parse_jira_datetime(attachment.get("created")) or parse_jira_datetime(attachment.get("updated")) + updated_dt = attachment_time or parse_jira_datetime(issue.raw.get("fields", {}).get("updated")) or datetime.now(timezone.utc) + + extension = os.path.splitext(filename)[1] or "" + document_id = f"{issue.key}::attachment::{attachment.get('id') or filename}" + semantic_identifier = f"{issue.key} attachment: {filename}" + + return Document( + id=document_id, + source=DocumentSource.JIRA, + semantic_identifier=semantic_identifier, + extension=extension, + blob=blob, + doc_updated_at=updated_dt, + size_bytes=len(blob), + ) + + def _download_attachment(self, url: str) -> bytes | None: + if not self.jira_client: + raise ConnectorMissingCredentialError("Jira") + response = self.jira_client._session.get(url) + response.raise_for_status() + return response.content + + def _sync_timezone_from_server(self) -> None: + if self._timezone_overridden or not self.jira_client: + return + try: + server_info = self.jira_client.server_info() + except Exception as exc: # pragma: no cover - defensive + logger.info(f"[Jira] Unable to determine timezone from server info; continuing with offset {self.timezone_offset}. 
Error: {exc}") + return + + detected_offset = self._extract_timezone_offset(server_info) + if detected_offset is None or detected_offset == self.timezone_offset: + return + + self.timezone_offset = detected_offset + self.timezone = timezone(offset=timedelta(hours=detected_offset)) + logger.info(f"[Jira] Timezone offset adjusted to {detected_offset} hours using Jira server info.") + + def _extract_timezone_offset(self, server_info: dict[str, Any]) -> float | None: + server_time_raw = server_info.get("serverTime") + if isinstance(server_time_raw, str): + offset = self._parse_offset_from_datetime_string(server_time_raw) + if offset is not None: + return offset + + tz_name = server_info.get("timeZone") + if isinstance(tz_name, str): + offset = self._offset_from_zone_name(tz_name) + if offset is not None: + return offset + return None + + @staticmethod + def _parse_offset_from_datetime_string(value: str) -> float | None: + normalized = JiraConnector._normalize_datetime_string(value) + try: + dt = datetime.fromisoformat(normalized) + except ValueError: + return None + + if dt.tzinfo is None: + return 0.0 + + offset = dt.tzinfo.utcoffset(dt) + if offset is None: + return None + return offset.total_seconds() / 3600.0 + + @staticmethod + def _normalize_datetime_string(value: str) -> str: + trimmed = (value or "").strip() + if trimmed.endswith("Z"): + return f"{trimmed[:-1]}+00:00" + + match = _TZ_OFFSET_PATTERN.search(trimmed) + if match and match.group(3) != ":": + sign, hours, _, minutes = match.groups() + trimmed = f"{trimmed[: match.start()]}{sign}{hours}:{minutes}" + return trimmed + + @staticmethod + def _offset_from_zone_name(name: str) -> float | None: + try: + tz = ZoneInfo(name) + except (ZoneInfoNotFoundError, ValueError): + return None + reference = datetime.now(tz) + offset = reference.utcoffset() + if offset is None: + return None + return offset.total_seconds() / 3600.0 + + def _is_cloud_client(self) -> bool: + if not self.jira_client: + return False + rest_version = str(self.jira_client._options.get("rest_api_version", "")).strip() + return rest_version == str(JIRA_CLOUD_API_VERSION) + + def _full_page_size(self) -> int: + return max(1, min(self.batch_size, _JIRA_FULL_PAGE_SIZE)) + + def _perform_jql_search( + self, + *, + jql: str, + start: int, + max_results: int, + fields: str | None = None, + all_issue_ids: list[list[str]] | None = None, + checkpoint_callback: Callable[[Iterable[list[str]], str | None], None] | None = None, + next_page_token: str | None = None, + ids_done: bool = False, + ) -> Iterable[Issue]: + assert self.jira_client, "Jira client not initialized." + is_cloud = self._is_cloud_client() + if is_cloud: + if all_issue_ids is None: + raise ValueError("all_issue_ids is required for Jira Cloud searches.") + yield from self._perform_jql_search_v3( + jql=jql, + max_results=max_results, + fields=fields, + all_issue_ids=all_issue_ids, + checkpoint_callback=checkpoint_callback, + next_page_token=next_page_token, + ids_done=ids_done, + ) + else: + yield from self._perform_jql_search_v2( + jql=jql, + start=start, + max_results=max_results, + fields=fields, + ) + + def _perform_jql_search_v3( + self, + *, + jql: str, + max_results: int, + all_issue_ids: list[list[str]], + fields: str | None = None, + checkpoint_callback: Callable[[Iterable[list[str]], str | None], None] | None = None, + next_page_token: str | None = None, + ids_done: bool = False, + ) -> Iterable[Issue]: + assert self.jira_client, "Jira client not initialized." 
+ + if not ids_done: + new_ids, page_token = self._enhanced_search_ids(jql, next_page_token) + if checkpoint_callback is not None and new_ids: + checkpoint_callback( + self._chunk_issue_ids(new_ids, max_results), + page_token, + ) + elif checkpoint_callback is not None: + checkpoint_callback([], page_token) + + if all_issue_ids: + issue_ids = all_issue_ids.pop() + if issue_ids: + yield from self._bulk_fetch_issues(issue_ids, fields) + + def _perform_jql_search_v2( + self, + *, + jql: str, + start: int, + max_results: int, + fields: str | None = None, + ) -> Iterable[Issue]: + assert self.jira_client, "Jira client not initialized." + + issues = self.jira_client.search_issues( + jql_str=jql, + startAt=start, + maxResults=max_results, + fields=fields or self._fields_param, + expand="renderedFields", + ) + for issue in issues: + yield issue + + def _enhanced_search_ids( + self, + jql: str, + next_page_token: str | None, + ) -> tuple[list[str], str | None]: + assert self.jira_client, "Jira client not initialized." + enhanced_search_path = self.jira_client._get_url("search/jql") + params: dict[str, str | int | None] = { + "jql": jql, + "maxResults": _MAX_RESULTS_FETCH_IDS, + "nextPageToken": next_page_token, + "fields": "id", + } + response = self.jira_client._session.get(enhanced_search_path, params=params) + response.raise_for_status() + data = response.json() + return [str(issue["id"]) for issue in data.get("issues", [])], data.get("nextPageToken") + + def _bulk_fetch_issues( + self, + issue_ids: list[str], + fields: str | None, + ) -> Iterable[Issue]: + assert self.jira_client, "Jira client not initialized." + if not issue_ids: + return [] + + bulk_fetch_path = self.jira_client._get_url("issue/bulkfetch") + payload: dict[str, Any] = {"issueIdsOrKeys": issue_ids} + payload["fields"] = fields.split(",") if fields else ["*all"] + + response = self.jira_client._session.post(bulk_fetch_path, json=payload) + response.raise_for_status() + data = response.json() + return [Issue(self.jira_client._options, self.jira_client._session, raw=issue) for issue in data.get("issues", [])] + + @staticmethod + def _chunk_issue_ids(issue_ids: list[str], chunk_size: int) -> Iterable[list[str]]: + if chunk_size <= 0: + chunk_size = _JIRA_FULL_PAGE_SIZE + + for idx in range(0, len(issue_ids), chunk_size): + yield issue_ids[idx : idx + chunk_size] + + def _make_checkpoint_callback(self, checkpoint: JiraCheckpoint) -> Callable[[Iterable[list[str]], str | None], None]: + def checkpoint_callback( + issue_ids: Iterable[list[str]] | list[list[str]], + page_token: str | None, + ) -> None: + for id_batch in issue_ids: + checkpoint.all_issue_ids.append(list(id_batch)) + checkpoint.cursor = page_token + checkpoint.ids_done = page_token is None + + return checkpoint_callback + + def _update_checkpoint_for_next_run( + self, + *, + checkpoint: JiraCheckpoint, + current_offset: int, + starting_offset: int, + page_size: int, + ) -> None: + if self._is_cloud_client(): + checkpoint.has_more = bool(checkpoint.all_issue_ids) or not checkpoint.ids_done + else: + checkpoint.has_more = current_offset - starting_offset == page_size + checkpoint.cursor = None + checkpoint.ids_done = True + checkpoint.all_issue_ids = [] + + +def iterate_jira_documents( + connector: "JiraConnector", + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + iteration_limit: int = 100_000, +) -> Iterator[Document]: + """Yield documents without materializing the entire result set.""" + + checkpoint = connector.build_dummy_checkpoint() + iterations = 0 + + 
while checkpoint.has_more: + wrapper = CheckpointOutputWrapper[JiraCheckpoint]() + generator = wrapper(connector.load_from_checkpoint(start=start, end=end, checkpoint=checkpoint)) + + for document, failure, next_checkpoint in generator: + if failure is not None: + failure_message = getattr(failure, "failure_message", str(failure)) + raise RuntimeError(f"Failed to load Jira documents: {failure_message}") + if document is not None: + yield document + if next_checkpoint is not None: + checkpoint = next_checkpoint + + iterations += 1 + if iterations > iteration_limit: + raise RuntimeError("Too many iterations while loading Jira documents.") + + +def test_jira( + *, + base_url: str, + project_key: str | None = None, + jql_query: str | None = None, + credentials: dict[str, Any], + batch_size: int = INDEX_BATCH_SIZE, + start_ts: float | None = None, + end_ts: float | None = None, + connector_options: dict[str, Any] | None = None, +) -> list[Document]: + """Programmatic entry point that mirrors the CLI workflow.""" + + connector_kwargs = connector_options.copy() if connector_options else {} + connector = JiraConnector( + jira_base_url=base_url, + project_key=project_key, + jql_query=jql_query, + batch_size=batch_size, + **connector_kwargs, + ) + connector.load_credentials(credentials) + connector.validate_connector_settings() + + now_ts = datetime.now(timezone.utc).timestamp() + start = start_ts if start_ts is not None else 0.0 + end = end_ts if end_ts is not None else now_ts + + documents = list(iterate_jira_documents(connector, start=start, end=end)) + logger.info(f"[Jira] Fetched {len(documents)} Jira documents.") + for doc in documents[:5]: + logger.info(f"[Jira] Document {doc.semantic_identifier} ({doc.id}) size={doc.size_bytes} bytes") + return documents + + +def _build_arg_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Fetch Jira issues and print summary statistics.") + parser.add_argument("--base-url", dest="base_url", default=os.environ.get("JIRA_BASE_URL")) + parser.add_argument("--project", dest="project_key", default=os.environ.get("JIRA_PROJECT_KEY")) + parser.add_argument("--jql", dest="jql_query", default=os.environ.get("JIRA_JQL")) + parser.add_argument("--email", dest="user_email", default=os.environ.get("JIRA_USER_EMAIL")) + parser.add_argument("--token", dest="api_token", default=os.environ.get("JIRA_API_TOKEN")) + parser.add_argument("--password", dest="password", default=os.environ.get("JIRA_PASSWORD")) + parser.add_argument("--batch-size", dest="batch_size", type=int, default=int(os.environ.get("JIRA_BATCH_SIZE", INDEX_BATCH_SIZE))) + parser.add_argument("--include_comments", dest="include_comments", type=bool, default=True) + parser.add_argument("--include_attachments", dest="include_attachments", type=bool, default=True) + parser.add_argument("--attachment_size_limit", dest="attachment_size_limit", type=float, default=_DEFAULT_ATTACHMENT_SIZE_LIMIT) + parser.add_argument("--start-ts", dest="start_ts", type=float, default=None, help="Epoch seconds inclusive lower bound for updated issues.") + parser.add_argument("--end-ts", dest="end_ts", type=float, default=9999999999, help="Epoch seconds inclusive upper bound for updated issues.") + return parser + + +def main(config: dict[str, Any] | None = None) -> None: + if config is None: + args = _build_arg_parser().parse_args() + config = { + "base_url": args.base_url, + "project_key": args.project_key, + "jql_query": args.jql_query, + "batch_size": args.batch_size, + "start_ts": args.start_ts, 
+ "end_ts": args.end_ts, + "include_comments": args.include_comments, + "include_attachments": args.include_attachments, + "attachment_size_limit": args.attachment_size_limit, + "credentials": { + "jira_user_email": args.user_email, + "jira_api_token": args.api_token, + "jira_password": args.password, + }, + } + + base_url = config.get("base_url") + credentials = config.get("credentials", {}) + + print(f"[Jira] {config=}", flush=True) + print(f"[Jira] {credentials=}", flush=True) + + if not base_url: + raise RuntimeError("Jira base URL must be provided via config or CLI arguments.") + if not (credentials.get("jira_api_token") or (credentials.get("jira_user_email") and credentials.get("jira_password"))): + raise RuntimeError("Provide either an API token or both email/password for Jira authentication.") + + connector_options = { + key: value + for key, value in ( + ("include_comments", config.get("include_comments")), + ("include_attachments", config.get("include_attachments")), + ("attachment_size_limit", config.get("attachment_size_limit")), + ("labels_to_skip", config.get("labels_to_skip")), + ("comment_email_blacklist", config.get("comment_email_blacklist")), + ("scoped_token", config.get("scoped_token")), + ("timezone_offset", config.get("timezone_offset")), + ) + if value is not None + } + + documents = test_jira( + base_url=base_url, + project_key=config.get("project_key"), + jql_query=config.get("jql_query"), + credentials=credentials, + batch_size=config.get("batch_size", INDEX_BATCH_SIZE), + start_ts=config.get("start_ts"), + end_ts=config.get("end_ts"), + connector_options=connector_options, + ) + + preview_count = min(len(documents), 5) + for idx in range(preview_count): + doc = documents[idx] + print(f"[Jira] [Sample {idx + 1}] {doc.semantic_identifier} | id={doc.id} | size={doc.size_bytes} bytes") + + print(f"[Jira] Jira connector test completed. 
Documents fetched: {len(documents)}") + + +if __name__ == "__main__": # pragma: no cover - manual execution path + logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(name)s %(message)s") + main() diff --git a/common/data_source/jira/utils.py b/common/data_source/jira/utils.py new file mode 100644 index 000000000..62219d36d --- /dev/null +++ b/common/data_source/jira/utils.py @@ -0,0 +1,149 @@ +"""Helper utilities for the Jira connector.""" + +from __future__ import annotations + +import os +from collections.abc import Collection +from datetime import datetime, timezone +from typing import Any, Iterable + +from jira.resources import Issue + +from common.data_source.utils import datetime_from_string + +JIRA_SERVER_API_VERSION = os.environ.get("JIRA_SERVER_API_VERSION", "2") +JIRA_CLOUD_API_VERSION = os.environ.get("JIRA_CLOUD_API_VERSION", "3") + + +def build_issue_url(base_url: str, issue_key: str) -> str: + """Return the canonical UI URL for a Jira issue.""" + return f"{base_url.rstrip('/')}/browse/{issue_key}" + + +def parse_jira_datetime(value: Any) -> datetime | None: + """Best-effort parse of Jira datetime values to aware UTC datetimes.""" + if value is None: + return None + if isinstance(value, datetime): + return value.astimezone(timezone.utc) if value.tzinfo else value.replace(tzinfo=timezone.utc) + if isinstance(value, str): + return datetime_from_string(value) + return None + + +def extract_named_value(value: Any) -> str | None: + """Extract a readable string out of Jira's typed objects.""" + if value is None: + return None + if isinstance(value, str): + return value + if isinstance(value, dict): + return value.get("name") or value.get("value") + return getattr(value, "name", None) + + +def extract_user(value: Any) -> tuple[str | None, str | None]: + """Return display name + email tuple for a Jira user blob.""" + if value is None: + return None, None + if isinstance(value, dict): + return value.get("displayName"), value.get("emailAddress") + + display = getattr(value, "displayName", None) + email = getattr(value, "emailAddress", None) + return display, email + + +def extract_text_from_adf(adf: Any) -> str: + """Flatten Atlassian Document Format (ADF) structures to text.""" + texts: list[str] = [] + + def _walk(node: Any) -> None: + if node is None: + return + if isinstance(node, dict): + node_type = node.get("type") + if node_type == "text": + texts.append(node.get("text", "")) + for child in node.get("content", []): + _walk(child) + elif isinstance(node, list): + for child in node: + _walk(child) + + _walk(adf) + return "\n".join(part for part in texts if part) + + +def extract_body_text(value: Any) -> str: + """Normalize Jira description/comments (raw/adf/str) into plain text.""" + if value is None: + return "" + if isinstance(value, str): + return value.strip() + if isinstance(value, dict): + return extract_text_from_adf(value).strip() + return str(value).strip() + + +def format_comments( + comment_block: Any, + *, + blacklist: Collection[str], +) -> str: + """Convert Jira comments into a markdown-ish bullet list.""" + if not isinstance(comment_block, dict): + return "" + + comments = comment_block.get("comments") or [] + lines: list[str] = [] + normalized_blacklist = {email.lower() for email in blacklist if email} + + for comment in comments: + author = comment.get("author") or {} + author_email = (author.get("emailAddress") or "").lower() + if author_email and author_email in normalized_blacklist: + continue + + author_name = 
author.get("displayName") or author.get("name") or author_email or "Unknown" + created = parse_jira_datetime(comment.get("created")) + created_str = created.isoformat() if created else "Unknown time" + body = extract_body_text(comment.get("body")) + if not body: + continue + + lines.append(f"- {author_name} ({created_str}):\n{body}") + + return "\n\n".join(lines) + + +def format_attachments(attachments: Any) -> str: + """List Jira attachments as bullet points.""" + if not isinstance(attachments, list): + return "" + + attachment_lines: list[str] = [] + for attachment in attachments: + filename = attachment.get("filename") + if not filename: + continue + size = attachment.get("size") + size_text = f" ({size} bytes)" if isinstance(size, int) else "" + content_url = attachment.get("content") or "" + url_suffix = f" -> {content_url}" if content_url else "" + attachment_lines.append(f"- {filename}{size_text}{url_suffix}") + + return "\n".join(attachment_lines) + + +def should_skip_issue(issue: Issue, labels_to_skip: set[str]) -> bool: + """Return True if the issue contains any label from the skip list.""" + if not labels_to_skip: + return False + + fields = getattr(issue, "raw", {}).get("fields", {}) + labels: Iterable[str] = fields.get("labels") or [] + for label in labels: + if (label or "").lower() in labels_to_skip: + return True + return False diff --git a/common/data_source/jira_connector.py b/common/data_source/jira_connector.py deleted file mode 100644 index 4d6f1160e..000000000 --- a/common/data_source/jira_connector.py +++ /dev/null @@ -1,112 +0,0 @@ -"""Jira connector""" - -from typing import Any - -from jira import JIRA - -from common.data_source.config import INDEX_BATCH_SIZE -from common.data_source.exceptions import ( - ConnectorValidationError, - InsufficientPermissionsError, - UnexpectedValidationError, ConnectorMissingCredentialError -) -from common.data_source.interfaces import ( - CheckpointedConnectorWithPermSync, - SecondsSinceUnixEpoch, - SlimConnectorWithPermSync -) -from common.data_source.models import ( - ConnectorCheckpoint -) - - -class JiraConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync): - """Jira connector for accessing Jira issues and projects""" - - def __init__(self, batch_size: int = INDEX_BATCH_SIZE) -> None: - self.batch_size = batch_size - self.jira_client: JIRA | None = None - - def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: - """Load Jira credentials""" - try: - url = credentials.get("url") - username = credentials.get("username") - password = credentials.get("password") - token = credentials.get("token") - - if not url: - raise ConnectorMissingCredentialError("Jira URL is required") - - if token: - # API token authentication - self.jira_client = JIRA(server=url, token_auth=token) - elif username and password: - # Basic authentication - self.jira_client = JIRA(server=url, basic_auth=(username, password)) - else: - raise ConnectorMissingCredentialError("Jira credentials are incomplete") - - return None - except Exception as e: - raise ConnectorMissingCredentialError(f"Jira: {e}") - - def validate_connector_settings(self) -> None: - """Validate Jira connector settings""" - if not self.jira_client: - raise ConnectorMissingCredentialError("Jira") - - try: - # Test connection by getting server info - self.jira_client.server_info() - except Exception as e: - if "401" in str(e) or "403" in str(e): - raise InsufficientPermissionsError("Invalid credentials or insufficient permissions") - elif "404" in 
str(e): - raise ConnectorValidationError("Jira instance not found") - else: - raise UnexpectedValidationError(f"Jira validation error: {e}") - - def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> Any: - """Poll Jira for recent issues""" - # Simplified implementation - in production this would handle actual polling - return [] - - def load_from_checkpoint( - self, - start: SecondsSinceUnixEpoch, - end: SecondsSinceUnixEpoch, - checkpoint: ConnectorCheckpoint, - ) -> Any: - """Load documents from checkpoint""" - # Simplified implementation - return [] - - def load_from_checkpoint_with_perm_sync( - self, - start: SecondsSinceUnixEpoch, - end: SecondsSinceUnixEpoch, - checkpoint: ConnectorCheckpoint, - ) -> Any: - """Load documents from checkpoint with permission sync""" - # Simplified implementation - return [] - - def build_dummy_checkpoint(self) -> ConnectorCheckpoint: - """Build dummy checkpoint""" - return ConnectorCheckpoint() - - def validate_checkpoint_json(self, checkpoint_json: str) -> ConnectorCheckpoint: - """Validate checkpoint JSON""" - # Simplified implementation - return ConnectorCheckpoint() - - def retrieve_all_slim_docs_perm_sync( - self, - start: SecondsSinceUnixEpoch | None = None, - end: SecondsSinceUnixEpoch | None = None, - callback: Any = None, - ) -> Any: - """Retrieve all simplified documents with permission sync""" - # Simplified implementation - return [] \ No newline at end of file diff --git a/common/data_source/utils.py b/common/data_source/utils.py index 7c2cdf898..b42c3833b 100644 --- a/common/data_source/utils.py +++ b/common/data_source/utils.py @@ -48,17 +48,35 @@ from common.data_source.exceptions import RateLimitTriedTooManyTimesError from common.data_source.interfaces import CT, CheckpointedConnector, CheckpointOutputWrapper, ConfluenceUser, LoadFunction, OnyxExtensionType, SecondsSinceUnixEpoch, TokenResponse from common.data_source.models import BasicExpertInfo, Document +_TZ_SUFFIX_PATTERN = re.compile(r"([+-])([\d:]+)$") + def datetime_from_string(datetime_string: str) -> datetime: datetime_string = datetime_string.strip() + match_jira_format = _TZ_SUFFIX_PATTERN.search(datetime_string) + if match_jira_format: + sign, tz_field = match_jira_format.groups() + digits = tz_field.replace(":", "") + + if digits.isdigit() and 1 <= len(digits) <= 4: + if len(digits) >= 3: + hours = digits[:-2].rjust(2, "0") + minutes = digits[-2:] + else: + hours = digits.rjust(2, "0") + minutes = "00" + + normalized = f"{sign}{hours}:{minutes}" + datetime_string = f"{datetime_string[: match_jira_format.start()]}{normalized}" + # Handle the case where the datetime string ends with 'Z' (Zulu time) - if datetime_string.endswith('Z'): - datetime_string = datetime_string[:-1] + '+00:00' + if datetime_string.endswith("Z"): + datetime_string = datetime_string[:-1] + "+00:00" # Handle timezone format "+0000" -> "+00:00" - if datetime_string.endswith('+0000'): - datetime_string = datetime_string[:-5] + '+00:00' + if datetime_string.endswith("+0000"): + datetime_string = datetime_string[:-5] + "+00:00" datetime_object = datetime.fromisoformat(datetime_string) @@ -480,7 +498,7 @@ def get_file_ext(file_name: str) -> str: def is_accepted_file_ext(file_ext: str, extension_type: OnyxExtensionType) -> bool: - image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'} + image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"} text_extensions = {".txt", ".md", ".mdx", ".conf", ".log", ".json", ".csv", ".tsv", ".xml", 
".yml", ".yaml", ".sql"} document_extensions = {".pdf", ".docx", ".pptx", ".xlsx", ".eml", ".epub", ".html"} @@ -902,6 +920,18 @@ def load_all_docs_from_checkpoint_connector( ) +_ATLASSIAN_CLOUD_DOMAINS = (".atlassian.net", ".jira.com", ".jira-dev.com") + + +def is_atlassian_cloud_url(url: str) -> bool: + try: + host = urlparse(url).hostname or "" + except ValueError: + return False + host = host.lower() + return any(host.endswith(domain) for domain in _ATLASSIAN_CLOUD_DOMAINS) + + def get_cloudId(base_url: str) -> str: tenant_info_url = urljoin(base_url, "/_edge/tenant_info") response = requests.get(tenant_info_url, timeout=10) diff --git a/common/log_utils.py b/common/log_utils.py index e2110ebeb..abbcd286b 100644 --- a/common/log_utils.py +++ b/common/log_utils.py @@ -80,4 +80,4 @@ def log_exception(e, *args): raise Exception(a.text) else: logging.error(str(a)) - raise e \ No newline at end of file + raise e diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 3dc9a7a3c..6925eb5f7 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -20,33 +20,40 @@ import copy +import faulthandler +import logging +import os +import signal import sys import threading import time import traceback +from datetime import datetime, timezone from typing import Any +import trio + from api.db.services.connector_service import ConnectorService, SyncLogsService from api.db.services.knowledgebase_service import KnowledgebaseService -from common.log_utils import init_root_logger -from common.config_utils import show_configs -from common.data_source import BlobStorageConnector, NotionConnector, DiscordConnector, GoogleDriveConnector -import logging -import os -from datetime import datetime, timezone -import signal -import trio -import faulthandler -from common.constants import FileSource, TaskStatus from common import settings -from common.versions import get_ragflow_version +from common.config_utils import show_configs +from common.constants import FileSource, TaskStatus +from common.data_source import ( + BlobStorageConnector, + DiscordConnector, + GoogleDriveConnector, + JiraConnector, + NotionConnector, +) +from common.data_source.config import INDEX_BATCH_SIZE from common.data_source.confluence_connector import ConfluenceConnector from common.data_source.interfaces import CheckpointOutputWrapper from common.data_source.utils import load_all_docs_from_checkpoint_connector -from common.data_source.config import INDEX_BATCH_SIZE +from common.log_utils import init_root_logger from common.signal_utils import start_tracemalloc_and_snapshot, stop_tracemalloc +from common.versions import get_ragflow_version -MAX_CONCURRENT_TASKS = int(os.environ.get('MAX_CONCURRENT_TASKS', "5")) +MAX_CONCURRENT_TASKS = int(os.environ.get("MAX_CONCURRENT_TASKS", "5")) task_limiter = trio.Semaphore(MAX_CONCURRENT_TASKS) @@ -72,31 +79,32 @@ class SyncBase: min_update = min([doc.doc_updated_at for doc in document_batch]) max_update = max([doc.doc_updated_at for doc in document_batch]) next_update = max([next_update, max_update]) - docs = [{ - "id": doc.id, - "connector_id": task["connector_id"], - "source": self.SOURCE_NAME, - "semantic_identifier": doc.semantic_identifier, - "extension": doc.extension, - "size_bytes": doc.size_bytes, - "doc_updated_at": doc.doc_updated_at, - "blob": doc.blob - } for doc in document_batch] + docs = [ + { + "id": doc.id, + "connector_id": task["connector_id"], + "source": self.SOURCE_NAME, + "semantic_identifier": doc.semantic_identifier, + "extension": 
doc.extension, + "size_bytes": doc.size_bytes, + "doc_updated_at": doc.doc_updated_at, + "blob": doc.blob, + } + for doc in document_batch + ] e, kb = KnowledgebaseService.get_by_id(task["kb_id"]) err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}", task["auto_parse"]) SyncLogsService.increase_docs(task["id"], min_update, max_update, len(docs), "\n".join(err), len(err)) doc_num += len(docs) - logging.info("{} docs synchronized till {}".format(doc_num, next_update)) + prefix = "[Jira] " if self.SOURCE_NAME == FileSource.JIRA else "" + logging.info(f"{prefix}{doc_num} docs synchronized till {next_update}") SyncLogsService.done(task["id"], task["connector_id"]) task["poll_range_start"] = next_update except Exception as ex: - msg = '\n'.join([ - ''.join(traceback.format_exception_only(None, ex)).strip(), - ''.join(traceback.format_exception(None, ex, ex.__traceback__)).strip() - ]) + msg = "\n".join(["".join(traceback.format_exception_only(None, ex)).strip(), "".join(traceback.format_exception(None, ex, ex.__traceback__)).strip()]) SyncLogsService.update_by_id(task["id"], {"status": TaskStatus.FAIL, "full_exception_trace": msg, "error_msg": str(ex)}) SyncLogsService.schedule(task["connector_id"], task["kb_id"], task["poll_range_start"]) @@ -109,21 +117,16 @@ class S3(SyncBase): SOURCE_NAME: str = FileSource.S3 async def _generate(self, task: dict): - self.connector = BlobStorageConnector( - bucket_type=self.conf.get("bucket_type", "s3"), - bucket_name=self.conf["bucket_name"], - prefix=self.conf.get("prefix", "") - ) + self.connector = BlobStorageConnector(bucket_type=self.conf.get("bucket_type", "s3"), bucket_name=self.conf["bucket_name"], prefix=self.conf.get("prefix", "")) self.connector.load_credentials(self.conf["credentials"]) - document_batch_generator = self.connector.load_from_state() if task["reindex"]=="1" or not task["poll_range_start"] \ - else self.connector.poll_source(task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp()) + document_batch_generator = ( + self.connector.load_from_state() + if task["reindex"] == "1" or not task["poll_range_start"] + else self.connector.poll_source(task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp()) + ) - begin_info = "totally" if task["reindex"]=="1" or not task["poll_range_start"] else "from {}".format(task["poll_range_start"]) - logging.info("Connect to {}: {}(prefix/{}) {}".format(self.conf.get("bucket_type", "s3"), - self.conf["bucket_name"], - self.conf.get("prefix", ""), - begin_info - )) + begin_info = "totally" if task["reindex"] == "1" or not task["poll_range_start"] else "from {}".format(task["poll_range_start"]) + logging.info("Connect to {}: {}(prefix/{}) {}".format(self.conf.get("bucket_type", "s3"), self.conf["bucket_name"], self.conf.get("prefix", ""), begin_info)) return document_batch_generator @@ -131,8 +134,8 @@ class Confluence(SyncBase): SOURCE_NAME: str = FileSource.CONFLUENCE async def _generate(self, task: dict): - from common.data_source.interfaces import StaticCredentialsProvider from common.data_source.config import DocumentSource + from common.data_source.interfaces import StaticCredentialsProvider self.connector = ConfluenceConnector( wiki_base=self.conf["wiki_base"], @@ -141,11 +144,7 @@ class Confluence(SyncBase): # page_id=self.conf.get("page_id", ""), ) - credentials_provider = StaticCredentialsProvider( - tenant_id=task["tenant_id"], - connector_name=DocumentSource.CONFLUENCE, - 
credential_json=self.conf["credentials"] - ) + credentials_provider = StaticCredentialsProvider(tenant_id=task["tenant_id"], connector_name=DocumentSource.CONFLUENCE, credential_json=self.conf["credentials"]) self.connector.set_credentials_provider(credentials_provider) # Determine the time range for synchronization based on reindex or poll_range_start @@ -174,10 +173,13 @@ class Notion(SyncBase): async def _generate(self, task: dict): self.connector = NotionConnector(root_page_id=self.conf["root_page_id"]) self.connector.load_credentials(self.conf["credentials"]) - document_generator = self.connector.load_from_state() if task["reindex"]=="1" or not task["poll_range_start"] \ - else self.connector.poll_source(task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp()) + document_generator = ( + self.connector.load_from_state() + if task["reindex"] == "1" or not task["poll_range_start"] + else self.connector.poll_source(task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp()) + ) - begin_info = "totally" if task["reindex"]=="1" or not task["poll_range_start"] else "from {}".format(task["poll_range_start"]) + begin_info = "totally" if task["reindex"] == "1" or not task["poll_range_start"] else "from {}".format(task["poll_range_start"]) logging.info("Connect to Notion: root({}) {}".format(self.conf["root_page_id"], begin_info)) return document_generator @@ -194,13 +196,16 @@ class Discord(SyncBase): server_ids=server_ids.split(",") if server_ids else [], channel_names=channel_names.split(",") if channel_names else [], start_date=datetime(1970, 1, 1, tzinfo=timezone.utc).strftime("%Y-%m-%d"), - batch_size=self.conf.get("batch_size", 1024) + batch_size=self.conf.get("batch_size", 1024), ) self.connector.load_credentials(self.conf["credentials"]) - document_generator = self.connector.load_from_state() if task["reindex"]=="1" or not task["poll_range_start"] \ - else self.connector.poll_source(task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp()) + document_generator = ( + self.connector.load_from_state() + if task["reindex"] == "1" or not task["poll_range_start"] + else self.connector.poll_source(task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp()) + ) - begin_info = "totally" if task["reindex"]=="1" or not task["poll_range_start"] else "from {}".format(task["poll_range_start"]) + begin_info = "totally" if task["reindex"] == "1" or not task["poll_range_start"] else "from {}".format(task["poll_range_start"]) logging.info("Connect to Discord: servers({}), channel({}) {}".format(server_ids, channel_names, begin_info)) return document_generator @@ -285,7 +290,7 @@ class GoogleDrive(SyncBase): admin_email = self.connector.primary_admin_email except RuntimeError: admin_email = "unknown" - logging.info("Connect to Google Drive as %s %s", admin_email, begin_info) + logging.info(f"Connect to Google Drive as {admin_email} {begin_info}") return document_batches() def _persist_rotated_credentials(self, connector_id: str, credentials: dict[str, Any]) -> None: @@ -303,7 +308,93 @@ class Jira(SyncBase): SOURCE_NAME: str = FileSource.JIRA async def _generate(self, task: dict): - pass + connector_kwargs = { + "jira_base_url": self.conf["base_url"], + "project_key": self.conf.get("project_key"), + "jql_query": self.conf.get("jql_query"), + "batch_size": self.conf.get("batch_size", INDEX_BATCH_SIZE), + "include_comments": self.conf.get("include_comments", True), + "include_attachments": self.conf.get("include_attachments", 
False), + "labels_to_skip": self._normalize_list(self.conf.get("labels_to_skip")), + "comment_email_blacklist": self._normalize_list(self.conf.get("comment_email_blacklist")), + "scoped_token": self.conf.get("scoped_token", False), + "attachment_size_limit": self.conf.get("attachment_size_limit"), + "timezone_offset": self.conf.get("timezone_offset"), + } + + self.connector = JiraConnector(**connector_kwargs) + + credentials = self.conf.get("credentials") + if not credentials: + raise ValueError("Jira connector is missing credentials.") + + self.connector.load_credentials(credentials) + self.connector.validate_connector_settings() + + if task["reindex"] == "1" or not task["poll_range_start"]: + start_time = 0.0 + begin_info = "totally" + else: + start_time = task["poll_range_start"].timestamp() + begin_info = f"from {task['poll_range_start']}" + + end_time = datetime.now(timezone.utc).timestamp() + + raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE + try: + batch_size = int(raw_batch_size) + except (TypeError, ValueError): + batch_size = INDEX_BATCH_SIZE + if batch_size <= 0: + batch_size = INDEX_BATCH_SIZE + + def document_batches(): + checkpoint = self.connector.build_dummy_checkpoint() + pending_docs = [] + iterations = 0 + iteration_limit = 100_000 + + while checkpoint.has_more: + wrapper = CheckpointOutputWrapper() + generator = wrapper( + self.connector.load_from_checkpoint( + start_time, + end_time, + checkpoint, + ) + ) + for document, failure, next_checkpoint in generator: + if failure is not None: + logging.warning( + f"[Jira] Jira connector failure: {getattr(failure, 'failure_message', failure)}" + ) + continue + if document is not None: + pending_docs.append(document) + if len(pending_docs) >= batch_size: + yield pending_docs + pending_docs = [] + if next_checkpoint is not None: + checkpoint = next_checkpoint + + iterations += 1 + if iterations > iteration_limit: + logging.error(f"[Jira] Task {task.get('id')} exceeded iteration limit ({iteration_limit}).") + raise RuntimeError("Too many iterations while loading Jira documents.") + + if pending_docs: + yield pending_docs + + logging.info(f"[Jira] Connect to Jira {connector_kwargs['jira_base_url']} {begin_info}") + return document_batches() + + @staticmethod + def _normalize_list(values: Any) -> list[str] | None: + if values is None: + return None + if isinstance(values, str): + values = [item.strip() for item in values.split(",")] + return [str(value).strip() for value in values if value is not None and str(value).strip()] class SharePoint(SyncBase): @@ -337,9 +428,10 @@ func_factory = { FileSource.JIRA: Jira, FileSource.SHAREPOINT: SharePoint, FileSource.SLACK: Slack, - FileSource.TEAMS: Teams + FileSource.TEAMS: Teams, } + async def dispatch_tasks(): async with trio.open_nursery() as nursery: while True: @@ -385,7 +477,7 @@ async def main(): __/ | |___/ """) - logging.info(f'RAGFlow version: {get_ragflow_version()}') + logging.info(f"RAGFlow version: {get_ragflow_version()}") show_configs() settings.init_settings() if sys.platform != "win32": diff --git a/web/src/assets/svg/data-source/jira.svg b/web/src/assets/svg/data-source/jira.svg new file mode 100644 index 000000000..8f9cd8b97 --- /dev/null +++ b/web/src/assets/svg/data-source/jira.svg @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index e2035a378..350a64db8 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -732,6 +732,33 @@ Example: 
 general/v2/`,
     'Comma-separated emails whose “My Drive” contents should be indexed (include the primary admin).',
   google_driveSharedFoldersTip:
     'Comma-separated Google Drive folder links to crawl.',
+  jiraDescription:
+    'Connect your Jira workspace to sync issues, comments, and attachments.',
+  jiraBaseUrlTip:
+    'Base URL of your Jira site (e.g., https://your-domain.atlassian.net).',
+  jiraProjectKeyTip:
+    'Optional: limit syncing to a single project key (e.g., ENG).',
+  jiraJqlTip:
+    'Optional JQL filter. Leave blank to rely on project/time filters.',
+  jiraBatchSizeTip:
+    'Maximum number of issues requested from Jira per batch.',
+  jiraCommentsTip:
+    'Include Jira comments in the generated markdown document.',
+  jiraAttachmentsTip:
+    'Download attachments as separate documents during sync.',
+  jiraAttachmentSizeTip:
+    'Attachments larger than this number of bytes will be skipped.',
+  jiraLabelsTip:
+    'Labels that should be skipped while indexing (comma separated).',
+  jiraBlacklistTip:
+    'Comments whose author email matches these entries will be ignored.',
+  jiraScopedTokenTip:
+    'Enable this when using scoped Atlassian tokens (api.atlassian.com).',
+  jiraEmailTip: 'Email associated with the Jira account/API token.',
+  jiraTokenTip:
+    'API token generated from https://id.atlassian.com/manage-profile/security/api-tokens.',
+  jiraPasswordTip:
+    'Optional password for Jira Server/Data Center environments.',
   availableSourcesDescription: 'Select a data source to add',
   availableSources: 'Available sources',
   datasourceDescription: 'Manage your data source and connections',
diff --git a/web/src/locales/zh.ts b/web/src/locales/zh.ts
index 301719117..b6d25dc1f 100644
--- a/web/src/locales/zh.ts
+++ b/web/src/locales/zh.ts
@@ -716,6 +716,23 @@ General:实体和关系提取提示来自 GitHub - microsoft/graphrag:基于
     '需要索引其 “我的云端硬盘” 的邮箱,多个邮箱用逗号分隔(建议包含管理员)。',
   google_driveSharedFoldersTip:
     '需要同步的 Google Drive 文件夹链接,多个链接用逗号分隔。',
+  jiraDescription: '接入 Jira 工作区,持续同步Issues、评论与附件。',
+  jiraBaseUrlTip:
+    'Jira 的 Base URL,例如:https://your-domain.atlassian.net。',
+  jiraProjectKeyTip: '可选:仅同步指定的项目(如 RAG)。',
+  jiraJqlTip: '可选:自定义 JQL 过滤条件,留空则使用项目 / 时间范围。',
+  jiraBatchSizeTip: '每次向 Jira 请求的 Issue 数量上限。',
+  jiraCommentsTip: '同步评论。',
+  jiraAttachmentsTip: '开启后会将附件下载为独立文档。',
+  jiraAttachmentSizeTip: '超过该字节阈值的附件会被跳过。',
+  jiraLabelsTip: '需要跳过的标签(逗号分隔)。',
+  jiraBlacklistTip: '这些邮箱作者的评论会被忽略。',
+  jiraScopedTokenTip:
+    '仅当凭证为 Atlassian scoped token(api.atlassian.com)时生效。',
+  jiraEmailTip: '与 API Token 对应的 Jira 账户邮箱。',
+  jiraTokenTip:
+    '在 https://id.atlassian.com/manage-profile/security/api-tokens 生成的 API Token。 (Cloud only)',
+  jiraPasswordTip: '可选:仅 Jira Server/Data Center 环境需要的密码字段。',
   availableSourcesDescription: '选择要添加的数据源',
   availableSources: '可用数据源',
   datasourceDescription: '管理您的数据源和连接',
diff --git a/web/src/pages/user-setting/data-source/contant.tsx b/web/src/pages/user-setting/data-source/contant.tsx
index 7acf3036d..3c8c55826 100644
--- a/web/src/pages/user-setting/data-source/contant.tsx
+++ b/web/src/pages/user-setting/data-source/contant.tsx
@@ -9,8 +9,8 @@ export enum DataSourceKey {
   NOTION = 'notion',
   DISCORD = 'discord',
   GOOGLE_DRIVE = 'google_drive',
-  // GMAIL = 'gmail',
-  // JIRA = 'jira',
+  // GMAIL = 'gmail',
+  JIRA = 'jira',
   // SHAREPOINT = 'sharepoint',
   // SLACK = 'slack',
   // TEAMS = 'teams',
@@ -42,6 +42,11 @@ export const DataSourceInfo = {
     description: t(`setting.${DataSourceKey.GOOGLE_DRIVE}Description`),
     icon: ,
   },
+  [DataSourceKey.JIRA]: {
+    name: 'Jira',
+    description: t(`setting.${DataSourceKey.JIRA}Description`),
+    icon: ,
+  },
 };
 
 export const DataSourceFormBaseFields = [
@@ -270,6 +275,106 @@ export const DataSourceFormFields = {
       defaultValue: 'uploaded',
     },
   ],
+  [DataSourceKey.JIRA]: [
+    {
+      label: 'Jira Base URL',
+      name: 'config.base_url',
+      type: FormFieldType.Text,
+      required: true,
+      placeholder: 'https://your-domain.atlassian.net',
+      tooltip: t('setting.jiraBaseUrlTip'),
+    },
+    {
+      label: 'Project Key',
+      name: 'config.project_key',
+      type: FormFieldType.Text,
+      required: false,
+      placeholder: 'RAGFlow',
+      tooltip: t('setting.jiraProjectKeyTip'),
+    },
+    {
+      label: 'Custom JQL',
+      name: 'config.jql_query',
+      type: FormFieldType.Textarea,
+      required: false,
+      placeholder: 'project = RAG AND updated >= -7d',
+      tooltip: t('setting.jiraJqlTip'),
+    },
+    {
+      label: 'Batch Size',
+      name: 'config.batch_size',
+      type: FormFieldType.Number,
+      required: false,
+      tooltip: t('setting.jiraBatchSizeTip'),
+    },
+    {
+      label: 'Include Comments',
+      name: 'config.include_comments',
+      type: FormFieldType.Checkbox,
+      required: false,
+      defaultValue: true,
+      tooltip: t('setting.jiraCommentsTip'),
+    },
+    {
+      label: 'Include Attachments',
+      name: 'config.include_attachments',
+      type: FormFieldType.Checkbox,
+      required: false,
+      defaultValue: false,
+      tooltip: t('setting.jiraAttachmentsTip'),
+    },
+    {
+      label: 'Attachment Size Limit (bytes)',
+      name: 'config.attachment_size_limit',
+      type: FormFieldType.Number,
+      required: false,
+      defaultValue: 10 * 1024 * 1024,
+      tooltip: t('setting.jiraAttachmentSizeTip'),
+    },
+    {
+      label: 'Labels to Skip',
+      name: 'config.labels_to_skip',
+      type: FormFieldType.Tag,
+      required: false,
+      tooltip: t('setting.jiraLabelsTip'),
+    },
+    {
+      label: 'Comment Email Blacklist',
+      name: 'config.comment_email_blacklist',
+      type: FormFieldType.Tag,
+      required: false,
+      tooltip: t('setting.jiraBlacklistTip'),
+    },
+    {
+      label: 'Use Scoped Token (Cloud only)',
+      name: 'config.scoped_token',
+      type: FormFieldType.Checkbox,
+      required: false,
+      tooltip: t('setting.jiraScopedTokenTip'),
+    },
+    {
+      label: 'Jira User Email (Cloud) or User Name (Server)',
+      name: 'config.credentials.jira_user_email',
+      type: FormFieldType.Text,
+      required: true,
+      placeholder: 'you@example.com',
+      tooltip: t('setting.jiraEmailTip'),
+    },
+    {
+      label: 'Jira API Token (Cloud only)',
+      name: 'config.credentials.jira_api_token',
+      type: FormFieldType.Password,
+      required: false,
+      tooltip: t('setting.jiraTokenTip'),
+    },
+    {
+      label: 'Jira Password (Server only)',
+      name: 'config.credentials.jira_password',
+      type: FormFieldType.Password,
+      required: false,
+      tooltip: t('setting.jiraPasswordTip'),
+    },
+  ],
   // [DataSourceKey.GOOGLE_DRIVE]: [
   //   {
   //     label: 'Primary Admin Email',
@@ -433,4 +538,25 @@ export const DataSourceFormDefaultValues = {
       },
     },
   },
+  [DataSourceKey.JIRA]: {
+    name: '',
+    source: DataSourceKey.JIRA,
+    config: {
+      base_url: '',
+      project_key: '',
+      jql_query: '',
+      batch_size: 2,
+      include_comments: true,
+      include_attachments: false,
+      attachment_size_limit: 10 * 1024 * 1024,
+      labels_to_skip: [],
+      comment_email_blacklist: [],
+      scoped_token: false,
+      credentials: {
+        jira_user_email: '',
+        jira_api_token: '',
+        jira_password: '',
+      },
+    },
+  },
 };
diff --git a/web/src/pages/user-setting/data-source/index.tsx b/web/src/pages/user-setting/data-source/index.tsx
index 80ceea1d7..9cb58672a 100644
--- a/web/src/pages/user-setting/data-source/index.tsx
+++ b/web/src/pages/user-setting/data-source/index.tsx
@@ -44,6 +44,12 @@ const dataSourceTemplates = [
     description:
DataSourceInfo[DataSourceKey.NOTION].description, icon: DataSourceInfo[DataSourceKey.NOTION].icon, }, + { + id: DataSourceKey.JIRA, + name: DataSourceInfo[DataSourceKey.JIRA].name, + description: DataSourceInfo[DataSourceKey.JIRA].description, + icon: DataSourceInfo[DataSourceKey.JIRA].icon, + }, ]; const DataSource = () => { const { t } = useTranslation(); From e7e89d3ecbf9638865b15f951549875534b62538 Mon Sep 17 00:00:00 2001 From: Billy Bao Date: Mon, 17 Nov 2025 11:16:34 +0800 Subject: [PATCH 4/6] Doc: style fix (#11295) ### What problem does this PR solve? Style fix based on #11283 ### Type of change - [x] Documentation Update --- deepdoc/parser/mineru_parser.py | 2 +- docs/guides/accessing_admin_ui.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/deepdoc/parser/mineru_parser.py b/deepdoc/parser/mineru_parser.py index bb663de0d..6d3b292d0 100644 --- a/deepdoc/parser/mineru_parser.py +++ b/deepdoc/parser/mineru_parser.py @@ -434,7 +434,7 @@ class MinerUParser(RAGFlowPdfParser): if not section.strip(): section = "FAILED TO PARSE TABLE" case MinerUContentType.IMAGE: - section = "".join(output.get(["image_caption"],[])) + "\n" + "".join(output.get(["image_footnote"],[])) + section = "".join(output.get("image_caption", [])) + "\n" + "".join(output.get("image_footnote", [])) case MinerUContentType.EQUATION: section = output["text"] case MinerUContentType.CODE: diff --git a/docs/guides/accessing_admin_ui.md b/docs/guides/accessing_admin_ui.md index 23521244b..181cff5ac 100644 --- a/docs/guides/accessing_admin_ui.md +++ b/docs/guides/accessing_admin_ui.md @@ -15,7 +15,7 @@ To access the RAGFlow admin UI, append `/admin` to the web UI's address, e.g. `h ### Default Credentials | Username | Password | |----------|----------| -| admin@ragflow.io | admin | +| `admin@ragflow.io` | `admin` | ## Admin UI Overview From 9cef3a26250667fa6761dbe9893a194de0c28aef Mon Sep 17 00:00:00 2001 From: chanx <1243304602@qq.com> Date: Mon, 17 Nov 2025 11:16:55 +0800 Subject: [PATCH 5/6] Fix: Fixed the issue of not being able to select the time zone in the user center. (#11298) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit … user center. ### What problem does this PR solve? Fix: Fixed the issue of not being able to select the time zone in the user center. 
### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- web/src/components/ui/modal/modal.tsx | 4 +++ web/src/pages/user-setting/profile/index.tsx | 30 ++++++-------------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/web/src/components/ui/modal/modal.tsx b/web/src/components/ui/modal/modal.tsx index acae6c147..af516b1e6 100644 --- a/web/src/components/ui/modal/modal.tsx +++ b/web/src/components/ui/modal/modal.tsx @@ -86,6 +86,9 @@ const Modal: ModalType = ({ onOk?.(); }, [onOk, onOpenChange]); const handleChange = (open: boolean) => { + if (!open && !maskClosable) { + return; + } onOpenChange?.(open); console.log('open', open, onOpenChange); if (open && !disabled) { @@ -185,6 +188,7 @@ const Modal: ModalType = ({ diff --git a/web/src/pages/user-setting/profile/index.tsx b/web/src/pages/user-setting/profile/index.tsx index dceb2cdf3..5c2741cf6 100644 --- a/web/src/pages/user-setting/profile/index.tsx +++ b/web/src/pages/user-setting/profile/index.tsx @@ -13,13 +13,7 @@ import { } from '@/components/ui/form'; import { Input } from '@/components/ui/input'; import { Modal } from '@/components/ui/modal/modal'; -import { - Select, - SelectContent, - SelectItem, - SelectTrigger, - SelectValue, -} from '@/components/ui/select'; +import { RAGFlowSelect } from '@/components/ui/select'; import { useTranslate } from '@/hooks/common-hooks'; import { TimezoneList } from '@/pages/user-setting/constants'; import { zodResolver } from '@hookform/resolvers/zod'; @@ -230,6 +224,7 @@ const ProfilePage: FC = () => { title={modalTitle[editType]} open={isEditing} showfooter={false} + maskClosable={false} titleClassName="text-base" onOpenChange={(open) => { if (!open) { @@ -281,23 +276,14 @@ const ProfilePage: FC = () => { {t('timezone')} - + />
From 6b64641042342fafe9da6026c6391bc8280fd4f7 Mon Sep 17 00:00:00 2001 From: Scott Davidson <49713135+sd109@users.noreply.github.com> Date: Mon, 17 Nov 2025 06:21:27 +0000 Subject: [PATCH 6/6] Fix: default model base url extraction logic (#11263) ### What problem does this PR solve? Fixes an issue where default models which used the same factory but different base URLs would all be initialised with the default chat model's base URL and would ignore e.g. the embedding model's base URL config. For example, with the following service config, the embedding and reranker models would end up using the base URL for the default chat model (i.e. `llm1.example.com`): ```yaml ragflow: service_conf: user_default_llm: factory: OpenAI-API-Compatible api_key: not-used default_models: chat_model: name: llm1 base_url: https://llm1.example.com/v1 embedding_model: name: llm2 base_url: https://llm2.example.com/v1 rerank_model: name: llm3 base_url: https://llm3.example.com/v1/rerank llm_factories: factory_llm_infos: - name: OpenAI-API-Compatible logo: "" tags: "LLM,TEXT EMBEDDING,SPEECH2TEXT,MODERATION" status: "1" llm: - llm_name: llm1 base_url: 'https://llm1.example.com/v1' api_key: not-used tags: "LLM,CHAT,IMAGE2TEXT" max_tokens: 100000 model_type: chat is_tools: false - llm_name: llm2 base_url: https://llm2.example.com/v1 api_key: not-used tags: "TEXT EMBEDDING" max_tokens: 10000 model_type: embedding - llm_name: llm3 base_url: https://llm3.example.com/v1/rerank api_key: not-used tags: "RERANK,1k" max_tokens: 10000 model_type: rerank ``` ### Type of change - [X] Bug Fix (non-breaking change which fixes an issue) --- api/db/services/llm_service.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/api/db/services/llm_service.py b/api/db/services/llm_service.py index 6ccbf5a94..4d4ccaa57 100644 --- a/api/db/services/llm_service.py +++ b/api/db/services/llm_service.py @@ -19,6 +19,7 @@ import re from common.token_utils import num_tokens_from_string from functools import partial from typing import Generator +from common.constants import LLMType from api.db.db_models import LLM from api.db.services.common_service import CommonService from api.db.services.tenant_llm_service import LLM4Tenant, TenantLLMService @@ -32,6 +33,14 @@ def get_init_tenant_llm(user_id): from common import settings tenant_llm = [] + model_configs = { + LLMType.CHAT: settings.CHAT_CFG, + LLMType.EMBEDDING: settings.EMBEDDING_CFG, + LLMType.SPEECH2TEXT: settings.ASR_CFG, + LLMType.IMAGE2TEXT: settings.IMAGE2TEXT_CFG, + LLMType.RERANK: settings.RERANK_CFG, + } + seen = set() factory_configs = [] for factory_config in [ @@ -54,8 +63,8 @@ def get_init_tenant_llm(user_id): "llm_factory": factory_config["factory"], "llm_name": llm.llm_name, "model_type": llm.model_type, - "api_key": factory_config["api_key"], - "api_base": factory_config["base_url"], + "api_key": model_configs.get(llm.model_type, {}).get("api_key", factory_config["api_key"]), + "api_base": model_configs.get(llm.model_type, {}).get("base_url", factory_config["base_url"]), "max_tokens": llm.max_tokens if llm.max_tokens else 8192, } ) @@ -80,8 +89,8 @@ class LLMBundle(LLM4Tenant): def encode(self, texts: list): if self.langfuse: - generation = self.langfuse.start_generation(trace_context=self.trace_context, name="encode", model=self.llm_name, input={"texts": texts}) - + generation = self.langfuse.start_generation(trace_context=self.trace_context, name="encode", model=self.llm_name, input={"texts": texts}) + safe_texts = [] for text in texts: 
token_size = num_tokens_from_string(text) @@ -90,7 +99,7 @@ class LLMBundle(LLM4Tenant): safe_texts.append(text[:target_len]) else: safe_texts.append(text) - + embeddings, used_tokens = self.mdl.encode(safe_texts) llm_name = getattr(self, "llm_name", None)
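
For reference, a minimal sketch of the resolution order this patch establishes when initialising a tenant's default models: per-model-type settings win, and the factory-level value is only the fallback. The helper name `resolve_api_base` is illustrative (not part of the codebase), and the URLs reuse the example service config above.

```python
# Illustrative sketch only: per-model-type defaults take precedence,
# mirroring model_configs.get(llm.model_type, {}).get("base_url", factory_config["base_url"]).
from typing import Any


def resolve_api_base(model_type: str, model_configs: dict[str, dict[str, Any]], factory_config: dict[str, Any]) -> str:
    """Return the base URL for a model type, falling back to the factory default."""
    return model_configs.get(model_type, {}).get("base_url", factory_config["base_url"])


factory_config = {"factory": "OpenAI-API-Compatible", "base_url": "https://llm1.example.com/v1"}
model_configs = {
    "chat": {"base_url": "https://llm1.example.com/v1"},
    "embedding": {"base_url": "https://llm2.example.com/v1"},
    "rerank": {"base_url": "https://llm3.example.com/v1/rerank"},
}

# The embedding model keeps its own base URL instead of inheriting the chat model's.
assert resolve_api_base("embedding", model_configs, factory_config) == "https://llm2.example.com/v1"
# Model types without a dedicated entry still fall back to the factory default.
assert resolve_api_base("speech2text", model_configs, factory_config) == "https://llm1.example.com/v1"
```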