diff --git a/examples/python/pokemon_datapoints_example.py b/examples/python/pokemon_datapoints_example.py index 058492e63..83179cf9f 100644 --- a/examples/python/pokemon_datapoints_example.py +++ b/examples/python/pokemon_datapoints_example.py @@ -21,11 +21,13 @@ BASE_URL = "https://pokeapi.co/api/v2/" os.environ["BUCKET_URL"] = "./.data_storage" os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "true" + # Data Models class Abilities(DataPoint): name: str = "Abilities" metadata: dict = {"index_fields": ["name"]} + class PokemonAbility(DataPoint): name: str ability__name: str @@ -39,11 +41,13 @@ class PokemonAbility(DataPoint): is_type: Abilities metadata: dict = {"index_fields": ["ability__name"]} + class Pokemons(DataPoint): name: str = "Pokemons" have: Abilities metadata: dict = {"index_fields": ["name"]} + class Pokemon(DataPoint): name: str base_experience: int @@ -66,6 +70,7 @@ class Pokemon(DataPoint): abilities: List[PokemonAbility] metadata: dict = {"index_fields": ["name"]} + # Data Collection Functions @dlt.resource(write_disposition="replace") def pokemon_list(limit: int = 50): @@ -73,6 +78,7 @@ def pokemon_list(limit: int = 50): response.raise_for_status() yield response.json()["results"] + @dlt.transformer(data_from=pokemon_list) def pokemon_details(pokemons): """Fetches detailed info for each Pokémon""" @@ -81,6 +87,7 @@ def pokemon_details(pokemons): response.raise_for_status() yield response.json() + # Data Loading Functions def load_abilities_data(jsonl_abilities): abilities_root = Abilities() @@ -97,6 +104,7 @@ def load_abilities_data(jsonl_abilities): return abilities_root, pokemon_abilities + def load_pokemon_data(jsonl_pokemons, pokemon_abilities, pokemon_root): pokemons = [] @@ -105,7 +113,8 @@ def load_pokemon_data(jsonl_pokemons, pokemon_abilities, pokemon_root): for line in f: pokemon_data = json.loads(line) abilities = [ - ability for ability in pokemon_abilities + ability + for ability in pokemon_abilities if ability["_dlt_parent_id"] == pokemon_data["_dlt_id"] ] pokemon_data["external_id"] = pokemon_data["id"] @@ -116,12 +125,17 @@ def load_pokemon_data(jsonl_pokemons, pokemon_abilities, pokemon_root): return pokemons + # Main Application Logic async def setup_and_process_data(): """Setup configuration and process Pokemon data""" # Setup configuration - data_directory_path = str(pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".data_storage")).resolve()) - cognee_directory_path = str(pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".cognee_system")).resolve()) + data_directory_path = str( + pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".data_storage")).resolve() + ) + cognee_directory_path = str( + pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".cognee_system")).resolve() + ) cognee.config.data_root_directory(data_directory_path) cognee.config.system_root_directory(cognee_directory_path) @@ -153,6 +167,7 @@ async def setup_and_process_data(): return pokemons + async def pokemon_cognify(pokemons): """Process Pokemon data with Cognee and perform search""" # Setup and run Cognee tasks @@ -165,7 +180,7 @@ async def pokemon_cognify(pokemons): tasks=tasks, data=pokemons, dataset_id=uuid5(NAMESPACE_OID, "Pokemon"), - pipeline_name='pokemon_pipeline', + pipeline_name="pokemon_pipeline", ) async for result in results: @@ -174,17 +189,18 @@ async def pokemon_cognify(pokemons): # Perform search search_results = await cognee.search( - query_type=SearchType.GRAPH_COMPLETION, - query_text="pokemons?" + query_type=SearchType.GRAPH_COMPLETION, query_text="pokemons?" ) print("Search results:") for result_text in search_results: print(result_text) + async def main(): pokemons = await setup_and_process_data() await pokemon_cognify(pokemons) + if __name__ == "__main__": asyncio.run(main())