diff --git a/distributed/entrypoint.py b/distributed/entrypoint.py index 29c009f33..a2ea601d8 100644 --- a/distributed/entrypoint.py +++ b/distributed/entrypoint.py @@ -59,19 +59,21 @@ async def main(): worker_future = data_point_saver_worker.spawn(total_number_of_workers=len(documents)) consumer_futures.append(worker_future) + producer_futures = [] + def process_chunks_remotely(document_chunks: list[DocumentChunk], document: Document): - return graph_extraction_worker.spawn( + producer_future = graph_extraction_worker.spawn( user=user, document_name=document.name, document_chunks=document_chunks ) + producer_futures.append(producer_future) + return producer_future # Produce chunks and spawn a graph_extraction_worker job for each batch of chunks for i in range(0, len(documents), document_batch_size): batch = documents[i : i + document_batch_size] - producer_futures = [] - for item in batch: - async for run_info in run_tasks( + async for worker_feature in run_tasks( [ Task(classify_documents), Task( @@ -89,7 +91,7 @@ async def main(): user=user, pipeline_name="chunk_processing", ): - producer_futures.append(run_info) + pass batch_results = [] for producer_future in producer_futures: