Tested and ran the script to verify that the outputs make sense and the actions are completed

This commit is contained in:
Vasilije 2023-08-27 20:05:19 +02:00
parent 40b93f11ca
commit e7698af597
3 changed files with 187 additions and 263 deletions

View file

@ -9,7 +9,8 @@ Initial code lets you do four operations:
1. Add to memory
2. Retrieve from memory
3. Structure the data to schema and load to duckdb
3. Structure the data to schema
4. Load to a database
# How to use
@ -38,13 +39,17 @@ The Memory API provides the following endpoints:
- /run-buffer (POST)
- /buffer/create-context (POST)
Here is a payload example:
## How To Get Started
1. We make a POST request to the `/semantic/add-memory` endpoint with the following payload, which uploads Jack London's "Call of the Wild" to SEMANTIC memory:
```
{
curl -X POST http://localhost:8000/semantic/add-memory -H "Content-Type: application/json" -d '{
"payload": {
"user_id": "681",
"prompt": "I want ",
"prompt": "I am adding docs",
"params": {
"version": "1.0",
"agreement_id": "AG123456",
@ -59,13 +64,45 @@ Here is a payload example:
"validity_end": "2024-07-31"
},
"loader_settings": {
"format": "PDF", // or "HTML" // or 'DOCX' // or 'TXT'
"source": "url", // or "file"
"path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"
"format": "PDF",
"source": "url",
"path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"
}
}
}'
```
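A successful call returns the handler output wrapped in a `response` key, as in (a sketch; the exact contents depend on what the memory backend returns):
```
{
    "response": "..."
}
```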
2. We run the buffer with the prompt "I want to know how does Buck adapt to life in the wild and then have that info translated to German "
```
curl -X POST http://localhost:8000/run-buffer -H "Content-Type: application/json" -d '{
"payload": {
"user_id": "681",
"prompt": "I want to know how does Buck adapt to life in the wild and then have that info translated to German ",
"params": {
"version": "1.0",
"agreement_id": "AG123456",
"privacy_policy": "https://example.com/privacy",
"terms_of_service": "https://example.com/terms",
"format": "json",
"schema_version": "1.1",
"checksum": "a1b2c3d4e5f6",
"owner": "John Doe",
"license": "MIT",
"validity_start": "2023-08-01",
"validity_end": "2024-07-31"
},
"attention_modulators": {
"relevance": 0.5,
"saliency": 0.5,
"relevance": 0.0,
"saliency": 0.1
}
}
}'
```
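3. Optionally, we can preview the decomposed task list before running the buffer. A sketch of the matching `/buffer/get-tasks` call, reusing the same payload shape (`params` trimmed here for brevity):
```
curl -X POST http://localhost:8000/buffer/get-tasks -H "Content-Type: application/json" -d '{
    "payload": {
        "user_id": "681",
        "prompt": "I want to know how does Buck adapt to life in the wild and then have that info translated to German ",
        "params": {
            "version": "1.0"
        },
        "attention_modulators": {
            "relevance": 0.0,
            "saliency": 0.1
        }
    }
}'
```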
Other attention modulators that could be implemented:
"frequency": 0.5,
"repetition": 0.5,
"length": 0.5,
@ -96,8 +133,4 @@ Here is a payload example:
"vividness": 0.5,
"originality": 0.5,
"creativity": 0.5,
"humor": 0.5,
},
}
}
```
"humor": 0.5,

View file

@ -1,24 +1,14 @@
from io import BytesIO
import logging
import os
from typing import Dict, Any
from langchain.document_loaders import PyPDFLoader
from level_2_pdf_vectorstore__dlt_contracts import Memory
import uvicorn
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Dict, Any
import re
import json
import logging
import os
import uvicorn
from fastapi import Request
import yaml
from fastapi import HTTPException
from fastapi import FastAPI, UploadFile, File
from typing import List
import requests
from level_2_pdf_vectorstore__dlt_contracts import Memory
from dotenv import load_dotenv
# Set up logging
logging.basicConfig(
level=logging.INFO, # Set the logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL)
@ -26,23 +16,18 @@ logging.basicConfig(
)
logger = logging.getLogger(__name__)
from dotenv import load_dotenv
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
app = FastAPI(debug=True)
from fastapi import Depends
class ImageResponse(BaseModel):
success: bool
message: str
@app.get(
"/",
)
@ -52,83 +37,15 @@ async def root():
"""
return {"message": "Hello, World, I am alive!"}
@app.get("/health")
def health_check():
"""
Health check endpoint that returns the server status.
"""
return {"status": "OK"}
# curl -X POST -H "Content-Type: application/json" -d '{"data": "YourPayload"}' -F "files=@/path/to/your/pdf/file.pdf" http://127.0.0.1:8000/upload/
class Payload(BaseModel):
payload: Dict[str, Any]
# @app.post("/upload/", response_model=dict)
# async def upload_pdf_and_payload(
# payload: Payload,
# # files: List[UploadFile] = File(...),
# ):
# try:
# # Process the payload
# decoded_payload = payload.payload
# # except:
# # pass
# #
# # return JSONResponse(content={"response": decoded_payload}, status_code=200)
#
# # Download the remote PDF if URL is provided
# if 'pdf_url' in decoded_payload:
# pdf_response = requests.get(decoded_payload['pdf_url'])
# pdf_content = pdf_response.content
#
# logging.info("Downloaded PDF from URL")
#
# # Create an in-memory file-like object for the PDF content
# pdf_stream = BytesIO(pdf_content)
#
# contents = pdf_stream.read()
#
# tmp_location = os.path.join('/tmp', "tmp.pdf")
# with open(tmp_location, 'wb') as tmp_file:
# tmp_file.write(contents)
#
# logging.info("Wrote PDF from URL")
#
# # Process the PDF using PyPDFLoader
# loader = PyPDFLoader(tmp_location)
# pages = loader.load_and_split()
# logging.info(" PDF split into pages")
# Memory_ = Memory(index_name="my-agent", user_id='555' )
# await Memory_.async_init()
# Memory_._add_episodic_memory(user_input="I want to get a schema for my data", content =pages)
#
#
# # Run the buffer
# response = Memory_._run_buffer(user_input="I want to get a schema for my data")
# return JSONResponse(content={"response": response}, status_code=200)
#
# #to do: add the user id to the payload
# #to do add the raw pdf to payload
# # bb = await Memory_._run_buffer(user_input=decoded_payload['prompt'])
# # print(bb)
#
#
# except Exception as e:
#
# return {"error": str(e)}
# # Here you can perform your processing on the PDF contents
# # results.append({"filename": file.filename, "size": len(contents)})
#
# # Append the in-memory file to the files list
# # files.append(UploadFile(pdf_stream, filename="downloaded.pdf"))
#
def memory_factory(memory_type):
load_dotenv()
@ -231,7 +148,7 @@ async def available_buffer_actions(
@app.post("/run-buffer", response_model=dict)
async def available_buffer_actions(
async def run_buffer(
payload: Payload,
# files: List[UploadFile] = File(...),
):
@ -243,8 +160,8 @@ async def available_buffer_actions(
await Memory_.async_init()
# memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
output = await Memory_._run_buffer(
user_input=decoded_payload["prompt"], params=decoded_payload["params"]
output = await Memory_._run_main_buffer(
user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"]
)
return JSONResponse(content={"response": output}, status_code=200)
@ -253,7 +170,7 @@ async def available_buffer_actions(
@app.post("/buffer/create-context", response_model=dict)
async def available_buffer_actions(
async def create_context(
payload: Payload,
# files: List[UploadFile] = File(...),
):
@ -266,7 +183,7 @@ async def available_buffer_actions(
# memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
output = await Memory_._create_buffer_context(
user_input=decoded_payload["prompt"], params=decoded_payload["params"]
user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"]
)
return JSONResponse(content={"response": output}, status_code=200)
@ -274,29 +191,26 @@ async def available_buffer_actions(
return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
#
# # Process each uploaded PDF file
# results = []
# for file in files:
# contents = await file.read()
# tmp_location = os.path.join('/tmp', "tmp.pdf")
# with open(tmp_location, 'wb') as tmp_file:
# tmp_file.write(contents)
# loader = PyPDFLoader(tmp_location)
# pages = loader.load_and_split()
#
# stm = ShortTermMemory(user_id=decoded_payload['user_id'])
# stm.episodic_buffer.main_buffer(prompt=decoded_payload['prompt'], pages=pages)
# # Here you can perform your processing on the PDF contents
# results.append({"filename": file.filename, "size": len(contents)})
#
# return {"message": "Upload successful", "results": results}
#
# except Exception as e:
# return {"error": str(e)}
@app.post("/buffer/get-tasks", response_model=dict)
async def create_context(
payload: Payload,
# files: List[UploadFile] = File(...),
):
try:
decoded_payload = payload.payload
Memory_ = Memory(user_id=decoded_payload["user_id"])
#
await Memory_.async_init()
# memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
output = await Memory_._get_task_list(
user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"]
)
return JSONResponse(content={"response": output}, status_code=200)
except Exception as e:
return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
def start_api_server(host: str = "0.0.0.0", port: int = 8000):
"""

View file

@ -208,11 +208,13 @@ class WeaviateVectorDB(VectorDB):
async def add_memories(
self, observation: str, loader_settings: dict = None, params: dict = None
self, observation: str, loader_settings: dict = None, params: dict = None, namespace: str = None
):
# Update Weaviate memories here
print(self.namespace)
retriever = self.init_weaviate(self.namespace)
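# Default to the instance-level namespace when the caller does not supply one.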
if namespace is None:
namespace = self.namespace
retriever = self.init_weaviate(namespace)
def _stuct(observation, params):
"""Utility function to not repeat metadata structure"""
@ -348,7 +350,9 @@ class WeaviateVectorDB(VectorDB):
.with_additional(
["id", "creationTimeUnix", "lastUpdateTimeUnix", "score", 'distance']
)
.with_near_text({"concepts": [observation]})
.with_hybrid(
query=observation,
)
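# Weaviate hybrid search blends vector similarity with BM25 keyword scoring for the query.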
.with_autocut(1)
.with_where(params_user_id)
.do()
@ -444,10 +448,11 @@ class BaseMemory:
observation: Optional[str] = None,
loader_settings: dict = None,
params: Optional[dict] = None,
namespace: Optional[str] = None,
):
if self.db_type == "weaviate":
return await self.vector_db.add_memories(
observation=observation, loader_settings=loader_settings, params=params
observation=observation, loader_settings=loader_settings, params=params, namespace=namespace
)
# Add other db_type conditions if necessary
@ -552,7 +557,6 @@ class EpisodicBuffer(BaseMemory):
last_update_datetime = datetime.fromtimestamp(int(unix_t) / 1000)
time_difference = datetime.now() - last_update_datetime
time_difference_text = humanize.naturaltime(time_difference)
print("WE GOT THIS FAR")
namespace = await self.memory_route(str(time_difference_text))
return [namespace.value, lookup_value]
@ -569,12 +573,6 @@ class EpisodicBuffer(BaseMemory):
number_of_total_events = (
weaviate_client.query.aggregate(namespace).with_meta_count().do()
)
print(number_of_total_events)
print(
number_of_total_events["data"]["Aggregate"]["EPISODICMEMORY"][0]["meta"][
"count"
]
)
frequency = float(number_of_relevant_events) / float(
number_of_total_events["data"]["Aggregate"]["EPISODICMEMORY"][0]["meta"][
"count"
@ -587,33 +585,37 @@ class EpisodicBuffer(BaseMemory):
Stored in the episodic memory, mainly to show how well a buffer did the job
Starts at 0, gets updated based on the user feedback"""
print("WE START WITH RELEVANCE")
return ["0", "memory"]
weaviate_client = self.init_client(namespace=namespace)
async def saliency(self, observation: str, namespace=None) -> list[str]:
"""Determines saliency by scoring the set of retrieved documents against each other and trying to determine saliency
"""
class SaliencyRawList(BaseModel):
"""Schema for documentGroups"""
original_document: str = Field(
...,
description="The original document retrieved from the database")
saliency_score: str = Field(
None, description="The score between 0 and 1")
class SaliencyContextList(BaseModel):
"""Raw context processed by the buffer"""
result_output = await self.fetch_memories(
observation=observation, params=None, namespace=namespace
docs: List[SaliencyRawList] = Field(..., description="List of docs")
observation: str = Field(..., description="The original user query")
parser = PydanticOutputParser(pydantic_object=SaliencyContextList)
prompt = PromptTemplate(
template="Determine saliency of documents compared to the other documents retrieved \n{format_instructions}\nOriginal observation is: {query}\n",
input_variables=["query"],
partial_variables={"format_instructions": parser.get_format_instructions()},
)
print("HERE IS THE RESULT OUTPUT TO DETERMINE RELEVANCE: ", result_output)
_input = prompt.format_prompt(query=observation)
document_context_result = self.llm_base(_input.to_string())
document_context_result_parsed = parser.parse(document_context_result)
return document_context_result_parsed.json()
return ["0", "memory"]
async def saliency(self, observation: str) -> list[str]:
"""Determines saliency by finding relevance between user input and document schema values.
After finding the document schema value relevant to the user, it forms a new query based on that value and the user input
"""
return ["0", "memory"]
async def encoding(
self, document: str, namespace: str = "EPISODICBUFFER", params: dict = None
) -> list[str]:
"""Encoding for the buffer, stores raw data in the buffer
Note: this is not encoding in the computer-science sense, but rather storing the content in the buffer
"""
query = await self.add_memories(document, params=params)
return query
async def handle_modulator(
self,
@ -639,6 +641,7 @@ class EpisodicBuffer(BaseMemory):
"freshness": lambda obs, ns: self.freshness(observation=obs, namespace=ns),
"frequency": lambda obs, ns: self.frequency(observation=obs, namespace=ns),
"relevance": lambda obs, ns: self.relevance(observation=obs, namespace=ns),
"saliency": lambda obs, ns: self.saliency(observation=obs, namespace=ns),
}
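# Look up the async scorer for the requested modulator; unknown names resolve to None.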
result_func = modulator_functions.get(modulator_name)
@ -663,16 +666,15 @@ class EpisodicBuffer(BaseMemory):
return [
"translate",
"structure",
"load to database",
"load to semantic memory",
"load to episodic memory",
"load to buffer",
"fetch from vector store"
# "load to semantic memory",
# "load to episodic memory",
# "load to buffer",
]
async def buffer_context(
self,
user_input=None,
content=None,
params=None,
attention_modulators: dict = None,
):
@ -685,7 +687,7 @@ class EpisodicBuffer(BaseMemory):
pass
# we just filter the data here to make sure input is clean
prompt_filter = ChatPromptTemplate.from_template(
"Filter and remove uneccessary information that is not relevant in the user query, keep it as original as possbile: {query}"
"Filter and remove uneccessary information that is not relevant in the query to the vector store to get more information, keep it as original as possbile: {query}"
)
chain_filter = prompt_filter | self.llm
output = await chain_filter.ainvoke({"query": user_input})
@ -739,8 +741,8 @@ class EpisodicBuffer(BaseMemory):
...,
description="The search term to use to get relevant input based on user query",
)
document_description: str = Field(
None, description="The short summary of what the document is about"
document_content: str = Field(
None, description="Shortened original content of the document"
)
document_relevance: str = Field(
None,
@ -767,21 +769,15 @@ class EpisodicBuffer(BaseMemory):
_input = prompt.format_prompt(query=user_input, context=context)
document_context_result = self.llm_base(_input.to_string())
document_context_result_parsed = parser.parse(document_context_result)
print("HERE ARE THE DOCS PARSED AND STRUCTURED", document_context_result_parsed)
print("HERE ARE THE DOCS PARSED AND STRUCTURED", document_context_result_parsed.json())
return document_context_result_parsed
async def get_task_list(
self, user_input=None, content=None, params=None, attention_modulators=None
self, user_input=None, params=None, attention_modulators=None,
):
"""Gets the task list from the document context result to enchance it and be able to pass to the agent"""
list_of_operations = await self.available_operations()
document_context_result_parsed = await self.buffer_context(
user_input=user_input,
content=content,
params=params,
attention_modulators=attention_modulators,
)
class Task(BaseModel):
"""Schema for an individual task."""
@ -802,19 +798,17 @@ class EpisodicBuffer(BaseMemory):
tasks: List[Task] = Field(..., description="List of tasks")
prompt_filter_chunk = f"The raw context data is {str(document_context_result_parsed)} Based on available operations {list_of_operations} determine only the relevant list of steps and operations sequentially based {user_input}"
# chain_filter_chunk = prompt_filter_chunk | self.llm.bind(function_call={"TaskList": "tasks"}, functions=TaskList)
# output_chunk = await chain_filter_chunk.ainvoke({"query": output, "list_of_operations": list_of_operations})
prompt_filter_chunk = f" Based on available operations {list_of_operations} determine only the relevant list of steps and operations sequentially based {user_input}"
prompt_msgs = [
SystemMessage(
content="You are a world class algorithm for decomposing prompts into steps and operations and choosing relevant ones"
),
HumanMessage(content="Decompose based on the following prompt:"),
HumanMessage(content="Decompose based on the following prompt and provide relevant document context reponse:"),
HumanMessagePromptTemplate.from_template("{input}"),
HumanMessage(content="Tips: Make sure to answer in the correct format"),
HumanMessage(
content="Tips: Only choose actions that are relevant to the user query and ignore others"
),
)
]
prompt_ = ChatPromptTemplate(messages=prompt_msgs)
chain = create_structured_output_chain(
@ -828,29 +822,36 @@ class EpisodicBuffer(BaseMemory):
# output = json.dumps(output)
my_object = parse_obj_as(TaskList, output)
print("HERE IS THE OUTPUT", my_object.json())
data = json.loads(my_object.json())
# Extract the list of tasks
tasks_list = data["tasks"]
return tasks_list
async def main_buffer(
self, user_input=None, content=None, params=None, attention_modulators=None
self, user_input=None, params=None, attention_modulators=None
):
"""AI buffer to understand user PDF query, prioritize memory info and process it based on available operations"""
"""AI buffer to run the AI agent to execute the set of tasks"""
memory = Memory(user_id=self.user_id)
await memory.async_init()
tasks_list = await self.get_task_list(
document_context_result_parsed = await self.buffer_context(
user_input=user_input,
content=content,
params=params,
attention_modulators=attention_modulators,
)
tasks_list = await self.get_task_list(
user_input=user_input,
params=params,
attention_modulators=attention_modulators
)
result_tasks = []
document_context_result_parsed = document_context_result_parsed.dict()
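# Extract the shortened document contents so each agent task can be grounded in the retrieved context.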
document_from_vectorstore = [doc["document_content"] for doc in document_context_result_parsed["docs"]]
for task in tasks_list:
print("HERE IS THE TASK", task)
complete_agent_prompt= f" Document context is: {document_from_vectorstore} \n Task is : {task['task_order']} {task['task_name']} {task['operation']} "
# task['vector_store_context_results']=document_context_result_parsed.dict()
class PromptWrapper(BaseModel):
observation: str = Field(
description="observation we want to fetch from vectordb"
@ -901,6 +902,17 @@ class EpisodicBuffer(BaseMemory):
result = run_open_ai_mapper(observation, json_schema)
return result
class FetchText(BaseModel):
observation: str = Field(description="observation we want to translate")
@tool("fetch_from_vector_store", args_schema=FetchText, return_direct=True)
def fetch_from_vector_store(observation, args_schema=FetchText):
"""Fetch from vectorstore if data doesn't exist in the context"""
if document_context_result_parsed:
return document_context_result_parsed
else:
out = self.fetch_memories(observation, namespace="SEMANTICMEMORY")
return out
class TranslateText(BaseModel):
observation: str = Field(description="observation we want to translate")
@ -914,23 +926,21 @@ class EpisodicBuffer(BaseMemory):
agent = initialize_agent(
llm=self.llm,
tools=[translate_to_de, convert_to_structured],
tools=[fetch_from_vector_store,translate_to_de, convert_to_structured],
agent=AgentType.OPENAI_FUNCTIONS,
verbose=True,
)
print("HERE IS THE TASK", task)
output = agent.run(input=task)
print(output)
output = agent.run(input=complete_agent_prompt )
result_tasks.append(task)
result_tasks.append(output)
print("HERE IS THE RESULT TASKS", str(result_tasks))
await self.encoding(str(result_tasks), self.namespace, params=params)
buffer_result = await self.fetch_memories(observation=str(user_input))
print("HERE IS THE RESULT TASKS", str(buffer_result))
# print("HERE IS THE RESULT TASKS", str(result_tasks))
#
# buffer_result = await self.fetch_memories(observation=str(user_input))
#
# print("HERE IS THE RESULT TASKS", str(buffer_result))
class EpisodicTask(BaseModel):
"""Schema for an individual task."""
@ -969,19 +979,19 @@ class EpisodicBuffer(BaseMemory):
)
_input = prompt.format_prompt(
query=user_input, steps=str(tasks_list), buffer=buffer_result
query=user_input, steps=str(tasks_list)
, buffer=str(result_tasks)
)
# return "a few things to do like load episodic memory in a structured format"
output = self.llm_base(_input.to_string())
result_parsing = parser.parse(output)
print("here is the parsing result", result_parsing)
lookup_value = await memory._add_episodic_memory(
observation=str(output), params=params
lookup_value = await self.add_memories(
observation=str(result_parsing.json()), params=params, namespace='EPISODICMEMORY'
)
# print("THE RESULT OF THIS QUERY IS ", result_parsing.json())
await self.delete_memories()
return lookup_value
return result_parsing.json()
class LongTermMemory:
@ -1116,12 +1126,6 @@ class Memory:
params=params
)
async def _run_buffer(
self, user_input: str, content: str = None, params: str = None
):
return await self.short_term_memory.episodic_buffer.main_buffer(
user_input=user_input, content=content, params=params
)
async def _add_buffer_memory(
self,
@ -1147,27 +1151,33 @@ class Memory:
async def _create_buffer_context(
self,
user_input: str,
content: str = None,
params: str = None,
params: dict = None,
attention_modulators: dict = None,
):
return await self.short_term_memory.episodic_buffer.buffer_context(
user_input=user_input,
content=content,
params=params,
attention_modulators=attention_modulators,
)
async def _get_task_list(
self,
user_input: str,
params: str = None,
attention_modulators: dict = None,
):
return await self.short_term_memory.episodic_buffer.get_task_list(
user_input=user_input,
params=params,
attention_modulators=attention_modulators,
)
async def _run_main_buffer(
self,
user_input: str,
content: str = None,
params: str = None,
attention_modulators: str = None,
params: dict = None,
attention_modulators: dict = None,
):
return await self.short_term_memory.episodic_buffer.main_buffer(
user_input=user_input,
content=content,
params=params,
attention_modulators=attention_modulators,
)
@ -1199,35 +1209,15 @@ async def main():
# gg = await memory._fetch_buffer_memory(user_input="i TO GERMAN ")
# print(gg)
episodic = """{
"start_date": "2023-08-23",
"end_date": "2023-08-30",
"user_query": "How can I plan a healthy diet?",
"action_steps": [
{
"step_number": 1,
"description": "Research and gather information about basic principles of a healthy diet."
},
{
"step_number": 2,
"description": "Create a weekly meal plan that includes a variety of nutritious foods."
},
{
"step_number": 3,
"description": "Prepare and cook meals according to your meal plan. Include fruits, vegetables, lean proteins, and whole grains."
}
]
}"""
# modulator = {"relevance": 0.2, "saliency": 0.0, "frequency": 0.7}
modulator = {"relevance": 0.0, "saliency": 0.0, "frequency": 0.0}
# #
# ggur = await memory._create_buffer_context(
# user_input="i NEED TRANSLATION TO GERMAN ",
# content="i NEED TRANSLATION TO GERMAN ",
# params=str(params),
# attention_modulators=modulator,
# )
# print(ggur)
ggur = await memory._run_main_buffer(
user_input="I want to know how does Buck adapt to life in the wild and then have that info translated to german ",
params=params,
attention_modulators=modulator,
)
print(ggur)
ll = {
"format": "PDF",
@ -1239,8 +1229,8 @@ async def main():
# fff = await memory._delete_semantic_memory()
# print(fff)
fff = await memory._fetch_semantic_memory(observation = "dog pulling sleds ", params=None)
print(fff)
# fff = await memory._fetch_semantic_memory(observation = "dog pulling sleds ", params=None)
# print(fff)
# print(len(fff["data"]["Get"]["EPISODICMEMORY"]))
@ -1249,16 +1239,3 @@ if __name__ == "__main__":
asyncio.run(main())
# bb = agent._update_semantic_memory(semantic_memory="Users core summary")
# bb = agent._fetch_semantic_memory(observation= "Users core summary", params = {
# "path": ["inserted_at"],
# "operator": "Equal",
# "valueText": "*2023*"
# })
# buffer = agent._run_buffer(user_input="I want to get a schema for my data")
# print(bb)
# rrr = {
# "path": ["year"],
# "operator": "Equal",
# "valueText": "2017*"
# }