Compare commits

main...cognify-sp (6 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 382b7ec146 | |
| | 84a23756f5 | |
| | eb631a23ad | |
| | 13d1133680 | |
| | 757d745b5d | |
| | abfcbc69d6 | |
4 changed files with 35 additions and 24 deletions
```diff
@@ -269,13 +269,13 @@ async def get_default_tasks(  # TODO: Find out a better way to do this (Boris's
             graph_model=graph_model,
             config=config,
             custom_prompt=custom_prompt,
-            task_config={"batch_size": 10},
+            task_config={"batch_size": 20},
         ),  # Generate knowledge graphs from the document chunks.
         Task(
             summarize_text,
-            task_config={"batch_size": 10},
+            task_config={"batch_size": 20},
         ),
-        Task(add_data_points, task_config={"batch_size": 10}),
+        Task(add_data_points, task_config={"batch_size": 20}),
     ]
 
     return default_tasks
@@ -311,7 +311,7 @@ async def get_temporal_tasks(
             max_chunk_size=chunk_size or get_max_chunk_tokens(),
             chunker=chunker,
         ),
-        Task(extract_events_and_timestamps, task_config={"chunk_size": 10}),
+        Task(extract_events_and_timestamps, task_config={"batch_size": 10}),
         Task(extract_knowledge_graph_from_events),
         Task(add_data_points, task_config={"batch_size": 10}),
     ]
```
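For orientation: the `task_config={"batch_size": N}` entries control how many items each pipeline step receives per call, so raising them from 10 to 20 halves the number of calls per task. The `get_temporal_tasks` fix matters because a mistyped key (`chunk_size` instead of `batch_size`) is presumably ignored by the pipeline, leaving that task on its default batch size. Below is a self-contained sketch of that behaviour, not cognee's actual `Task` implementation; all names in it are illustrative.

```python
from typing import Any, Callable


class Task:
    """Minimal stand-in for a pipeline task: a callable plus its batching config."""

    def __init__(self, executable: Callable, task_config: dict[str, Any] | None = None):
        self.executable = executable
        # Only "batch_size" is read; any other key (e.g. a mistyped "chunk_size") is ignored.
        self.batch_size = (task_config or {}).get("batch_size", 1)

    def run(self, items: list[Any]) -> list[Any]:
        results = []
        for i in range(0, len(items), self.batch_size):
            results.extend(self.executable(items[i : i + self.batch_size]))
        return results


def summarize_batch(batch: list[str]) -> list[str]:
    return [f"summary({item})" for item in batch]


# batch_size=20 means 3 calls for 50 chunks instead of 5 with the previous batch_size=10.
task = Task(summarize_batch, task_config={"batch_size": 20})
print(task.run([f"chunk-{i}" for i in range(50)]))
```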
```diff
@@ -24,9 +24,8 @@ class EmbeddingConfig(BaseSettings):
     model_config = SettingsConfigDict(env_file=".env", extra="allow")
 
     def model_post_init(self, __context) -> None:
-        # If embedding batch size is not defined use 2048 as default for OpenAI and 100 for all other embedding models
         if not self.embedding_batch_size and self.embedding_provider.lower() == "openai":
-            self.embedding_batch_size = 2048
+            self.embedding_batch_size = 1024
         elif not self.embedding_batch_size:
             self.embedding_batch_size = 100
```
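This hunk lowers the fallback batch size for OpenAI embeddings from 2048 to 1024. The mechanism is pydantic-settings: `model_post_init` runs after the settings object is built, so a value left unset in the environment can be given a provider-dependent default. A minimal sketch, reduced to the two fields visible in the hunk; the field defaults themselves are assumptions, not taken from this diff.

```python
from pydantic_settings import BaseSettings, SettingsConfigDict


class EmbeddingConfig(BaseSettings):
    """Reduced sketch of the settings class touched above (illustrative defaults)."""

    embedding_provider: str = "openai"
    embedding_batch_size: int | None = None

    model_config = SettingsConfigDict(env_file=".env", extra="allow")

    def model_post_init(self, __context) -> None:
        # Pick a provider-dependent default only when no value was configured.
        if not self.embedding_batch_size and self.embedding_provider.lower() == "openai":
            self.embedding_batch_size = 1024
        elif not self.embedding_batch_size:
            self.embedding_batch_size = 100


print(EmbeddingConfig().embedding_batch_size)  # 1024 unless overridden via .env / env vars
```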
```diff
@@ -1,6 +1,6 @@
-from cognee.shared.logging_utils import get_logger
+import asyncio
 
-from cognee.infrastructure.databases.exceptions import EmbeddingException
+from cognee.shared.logging_utils import get_logger
 from cognee.infrastructure.databases.vector import get_vector_engine
 from cognee.infrastructure.engine import DataPoint
 
@@ -33,18 +33,23 @@ async def index_data_points(data_points: list[DataPoint]):
                 indexed_data_point.metadata["index_fields"] = [field_name]
                 index_points[index_name].append(indexed_data_point)
 
-    for index_name_and_field, indexable_points in index_points.items():
-        first_occurence = index_name_and_field.index("_")
-        index_name = index_name_and_field[:first_occurence]
-        field_name = index_name_and_field[first_occurence + 1 :]
-        try:
-            # In case the amount of indexable points is too large we need to send them in batches
-            batch_size = vector_engine.embedding_engine.get_batch_size()
-            for i in range(0, len(indexable_points), batch_size):
-                batch = indexable_points[i : i + batch_size]
-                await vector_engine.index_data_points(index_name, field_name, batch)
-        except EmbeddingException as e:
-            logger.warning(f"Failed to index data points for {index_name}.{field_name}: {e}")
+    tasks: list[asyncio.Task] = []
+    batch_size = vector_engine.embedding_engine.get_batch_size()
+
+    for index_name_and_field, points in index_points.items():
+        first = index_name_and_field.index("_")
+        index_name = index_name_and_field[:first]
+        field_name = index_name_and_field[first + 1 :]
+
+        # Create embedding requests per batch to run in parallel later
+        for i in range(0, len(points), batch_size):
+            batch = points[i : i + batch_size]
+            tasks.append(
+                asyncio.create_task(vector_engine.index_data_points(index_name, field_name, batch))
+            )
+
+    # Run all embedding requests in parallel
+    await asyncio.gather(*tasks)
 
     return data_points
```
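In `index_data_points`, each batch was previously awaited sequentially inside a `try`/`except EmbeddingException` that downgraded failures to a warning; the new version schedules every batch with `asyncio.create_task` and awaits them together with `asyncio.gather`, so batches run concurrently and any embedding failure now propagates out of the function. The following is a self-contained, stdlib-only sketch of that pattern; `index_batch`, `index_all`, and `BATCH_SIZE` are stand-ins for the real engine calls, not cognee APIs.

```python
import asyncio
from typing import Any

BATCH_SIZE = 3  # stand-in for vector_engine.embedding_engine.get_batch_size()


async def index_batch(index_name: str, field_name: str, batch: list[Any]) -> None:
    """Stand-in for vector_engine.index_data_points(): pretend to embed and store a batch."""
    await asyncio.sleep(0.1)  # simulated embedding / network latency
    print(f"indexed {len(batch)} points into {index_name}.{field_name}")


async def index_all(index_points: dict[str, list[Any]]) -> None:
    tasks: list[asyncio.Task] = []

    for index_name_and_field, points in index_points.items():
        # Key format "<index>_<field>", split on the first underscore (as in the diff).
        first = index_name_and_field.index("_")
        index_name, field_name = index_name_and_field[:first], index_name_and_field[first + 1 :]

        # Schedule one request per batch instead of awaiting each one sequentially.
        for i in range(0, len(points), BATCH_SIZE):
            batch = points[i : i + BATCH_SIZE]
            tasks.append(asyncio.create_task(index_batch(index_name, field_name, batch)))

    # All batches run concurrently; the first exception propagates from gather.
    await asyncio.gather(*tasks)


asyncio.run(index_all({"Entity_name": list(range(7)), "Document_text": list(range(4))}))
```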
```diff
@@ -1,3 +1,5 @@
+import asyncio
+
 from cognee.modules.engine.utils.generate_edge_id import generate_edge_id
 from cognee.shared.logging_utils import get_logger
 from collections import Counter
@@ -76,15 +78,20 @@ async def index_graph_edges(
             indexed_data_point.metadata["index_fields"] = [field_name]
             index_points[index_name].append(indexed_data_point)
 
+    # Get maximum batch size for embedding model
+    batch_size = vector_engine.embedding_engine.get_batch_size()
+    tasks: list[asyncio.Task] = []
+
     for index_name, indexable_points in index_points.items():
         index_name, field_name = index_name.split(".")
 
-        # Get maximum batch size for embedding model
-        batch_size = vector_engine.embedding_engine.get_batch_size()
-        # We save the data in batches of {batch_size} to not put a lot of pressure on the database
+        # Create embedding tasks to run in parallel later
         for start in range(0, len(indexable_points), batch_size):
             batch = indexable_points[start : start + batch_size]
 
-            await vector_engine.index_data_points(index_name, field_name, batch)
+            tasks.append(vector_engine.index_data_points(index_name, field_name, batch))
 
+    # Start all embedding tasks and wait for completion
+    await asyncio.gather(*tasks)
+
     return None
```
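`index_graph_edges` gets the same parallelization, with one difference: it appends the bare coroutines returned by `vector_engine.index_data_points(...)` rather than wrapping them in `asyncio.create_task`, even though the list is annotated `list[asyncio.Task]`. `asyncio.gather` accepts plain awaitables and schedules them itself, so nothing starts running until the `gather` line is reached. A small stdlib-only illustration of the two variants; `embed` and `main` are illustrative names.

```python
import asyncio


async def embed(batch_id: int) -> int:
    await asyncio.sleep(0.05)  # simulated embedding call
    return batch_id


async def main() -> None:
    # Variant used in index_graph_edges: collect coroutines, gather schedules them itself.
    coroutines = [embed(i) for i in range(3)]
    print(await asyncio.gather(*coroutines))

    # Variant used in index_data_points: create_task schedules each call immediately,
    # so work can already be in flight before gather is reached.
    tasks = [asyncio.create_task(embed(i)) for i in range(3)]
    print(await asyncio.gather(*tasks))


asyncio.run(main())
```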