fix: example ruff errors

This commit is contained in:
Boris Arzentar 2025-03-11 16:44:00 +01:00
parent 40c0015f0d
commit 2e4aab9a9a

View file

@ -21,11 +21,13 @@ BASE_URL = "https://pokeapi.co/api/v2/"
os.environ["BUCKET_URL"] = "./.data_storage"
os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "true"
# Data Models
class Abilities(DataPoint):
name: str = "Abilities"
metadata: dict = {"index_fields": ["name"]}
class PokemonAbility(DataPoint):
name: str
ability__name: str
@ -39,11 +41,13 @@ class PokemonAbility(DataPoint):
is_type: Abilities
metadata: dict = {"index_fields": ["ability__name"]}
class Pokemons(DataPoint):
name: str = "Pokemons"
have: Abilities
metadata: dict = {"index_fields": ["name"]}
class Pokemon(DataPoint):
name: str
base_experience: int
@ -66,6 +70,7 @@ class Pokemon(DataPoint):
abilities: List[PokemonAbility]
metadata: dict = {"index_fields": ["name"]}
# Data Collection Functions
@dlt.resource(write_disposition="replace")
def pokemon_list(limit: int = 50):
@ -73,6 +78,7 @@ def pokemon_list(limit: int = 50):
response.raise_for_status()
yield response.json()["results"]
@dlt.transformer(data_from=pokemon_list)
def pokemon_details(pokemons):
"""Fetches detailed info for each Pokémon"""
@ -81,6 +87,7 @@ def pokemon_details(pokemons):
response.raise_for_status()
yield response.json()
# Data Loading Functions
def load_abilities_data(jsonl_abilities):
abilities_root = Abilities()
@ -97,6 +104,7 @@ def load_abilities_data(jsonl_abilities):
return abilities_root, pokemon_abilities
def load_pokemon_data(jsonl_pokemons, pokemon_abilities, pokemon_root):
pokemons = []
@ -105,7 +113,8 @@ def load_pokemon_data(jsonl_pokemons, pokemon_abilities, pokemon_root):
for line in f:
pokemon_data = json.loads(line)
abilities = [
ability for ability in pokemon_abilities
ability
for ability in pokemon_abilities
if ability["_dlt_parent_id"] == pokemon_data["_dlt_id"]
]
pokemon_data["external_id"] = pokemon_data["id"]
@ -116,12 +125,17 @@ def load_pokemon_data(jsonl_pokemons, pokemon_abilities, pokemon_root):
return pokemons
# Main Application Logic
async def setup_and_process_data():
"""Setup configuration and process Pokemon data"""
# Setup configuration
data_directory_path = str(pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".data_storage")).resolve())
cognee_directory_path = str(pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".cognee_system")).resolve())
data_directory_path = str(
pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".data_storage")).resolve()
)
cognee_directory_path = str(
pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".cognee_system")).resolve()
)
cognee.config.data_root_directory(data_directory_path)
cognee.config.system_root_directory(cognee_directory_path)
@ -153,6 +167,7 @@ async def setup_and_process_data():
return pokemons
async def pokemon_cognify(pokemons):
"""Process Pokemon data with Cognee and perform search"""
# Setup and run Cognee tasks
@ -165,7 +180,7 @@ async def pokemon_cognify(pokemons):
tasks=tasks,
data=pokemons,
dataset_id=uuid5(NAMESPACE_OID, "Pokemon"),
pipeline_name='pokemon_pipeline',
pipeline_name="pokemon_pipeline",
)
async for result in results:
@ -174,17 +189,18 @@ async def pokemon_cognify(pokemons):
# Perform search
search_results = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION,
query_text="pokemons?"
query_type=SearchType.GRAPH_COMPLETION, query_text="pokemons?"
)
print("Search results:")
for result_text in search_results:
print(result_text)
async def main():
pokemons = await setup_and_process_data()
await pokemon_cognify(pokemons)
if __name__ == "__main__":
asyncio.run(main())