여기에서는 RAG의 성능 향상 기법인 Agentic RAG, Corrective RAG, Self RAG를 구현하는 방법을 설명합니다. 또한 RAG의 데이터 수집에 필요한 PDF의 header/footer의 처리, 이미지의 추출 및 분석과 함께 contextual retrieval을 활용하는 방법을 설명합니다. 이를 통해서 생성형 AI 애플리케이션을 위한 데이터를 효과적으로 수집하여 활용할 수 있습니다. 여기서는 오픈소스 LLM Framework인 LangGraph을 이용하고, 구현된 workflow들은 Streamlit을 이용해 개발 및 테스트를 수행할 수 있습니다. AWS CDK를 이용하고 한번에 배포할 수 있고, CloudFront - ALB 구조를 이용해 HTTPS로 안전하게 접속할 수 있습니다.
전체적인 architecture는 아래와 같습니다. Streamlit이 설치된 EC2는 private subnet에 있고, CloudFront-ALB를 이용해 외부와 연결됩니다. RAG는 OpenSearch를 활용하고 있습니다. 인터넷 검색은 tavily를 사용하고 날씨 API를 추가로 활용합니다.

여기에서는 Lambda-Document를 이용해 입력된 문서를 parsing하여 OpenSearch에 push합니다. 이를 위한 event driven 방식의 데이터 처리 방식은 아래와 같습니다.
Agentic workflow (tool use)는 아래와 같이 구현할 수 있습니다. 상세한 내용은 chat.py을 참조합니다.
일반적인 대화는 아래와 같이 stream으로 결과를 얻을 수 있습니다. 여기에서는 LangChain의 ChatBedrock을 이용합니다. Model ID로 사용할 모델을 지정합니다. 아래 예제에서는 Nova Pro의 모델명인 "us.amazon.nova-pro-v1:0"을 활용하고 있습니다. Nova 모델는 동급 모델대비 빠르고, 높은 가성비와 함께 훌륭한 멀티모달 성능을 가지고 있습니다. 만약 Claude Sonnet 3.5을 사용한 다면 "anthropic.claude-3-5-sonnet-20240620-v1:0"을 입력합니다.
modelId = "us.amazon.nova-pro-v1:0"
bedrock_region = "us-west-2"
boto3_bedrock = boto3.client(
service_name='bedrock-runtime',
region_name=bedrock_region,
config=Config(
retries = {
'max_attempts': 30
}
)
)
parameters = {
"max_tokens":maxOutputTokens,
"temperature":0.1,
"top_k":250,
"top_p":0.9,
"stop_sequences": ["\n\n<thinking>", "\n<thinking>", " <thinking>"]
}
chat = ChatBedrock(
model_id=modelId,
client=boto3_bedrock,
model_kwargs=parameters,
region_name=bedrock_region
)
system = (
"당신의 이름은 서연이고, 질문에 대해 친절하게 답변하는 사려깊은 인공지능 도우미입니다."
"상황에 맞는 구체적인 세부 정보를 충분히 제공합니다."
"모르는 질문을 받으면 솔직히 모른다고 말합니다."
)
human = "Question: {input}"
prompt = ChatPromptTemplate.from_messages([
("system", system),
MessagesPlaceholder(variable_name="history"),
("human", human)
])
history = memory_chain.load_memory_variables({})["chat_history"]
chain = prompt | chat | StrOutputParser()
stream = chain.stream(
{
"history": history,
"input": query,
}
)
print('stream: ', stream)
여기에서는 RAG 구현을 위하여 OpenSearch를 이용합니다.
LangChain의 OpenSearchVectorSearch을 이용하여 관련된 문서를 가져옵니다.
vectorstore_opensearch = OpenSearchVectorSearch(
index_name = index_name,
is_aoss = False,
ef_search = 1024,
m=48,
embedding_function = bedrock_embedding,
opensearch_url=opensearch_url,
http_auth=(opensearch_account, opensearch_passwd)
)
relevant_documents = vectorstore_opensearch.similarity_search_with_score(
query = query,
k = top_k
)
for i, document in enumerate(relevant_documents):
name = document[0].metadata['name']
url = document[0].metadata['url']
content = document[0].page_content
얻어온 문서가 적절한지를 판단하기 위하여 아래와 같이 prompt를 이용해 관련도를 평가하고 structured output을 이용해 관련도를 평가합니다.
system = (
"You are a grader assessing relevance of a retrieved document to a user question."
"If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant."
"Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."
)
grade_prompt = ChatPromptTemplate.from_messages(
[
("system", system),
("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
]
)
structured_llm_grader = chat.with_structured_output(GradeDocuments)
retrieval_grader = grade_prompt | structured_llm_grader
filtered_docs = []
for i, doc in enumerate(documents):
score = retrieval_grader.invoke({"question": question, "document": doc.page_content})
grade = score.binary_score
if grade.lower() == "yes":
print("---GRADE: DOCUMENT RELEVANT---")
filtered_docs.append(doc)
else:
print("---GRADE: DOCUMENT NOT RELEVANT---")
continue
이후 아래와 같이 RAG를 활용하여 원하는 응답을 얻습니다.
system = (
"당신의 이름은 서연이고, 질문에 대해 친절하게 답변하는 사려깊은 인공지능 도우미입니다."
"다음의 Reference texts을 이용하여 user의 질문에 답변합니다."
"모르는 질문을 받으면 솔직히 모른다고 말합니다."
"답변의 이유를 풀어서 명확하게 설명합니다."
)
human = (
"Question: {input}"
"Reference texts: "
"{context}"
)
prompt = ChatPromptTemplate.from_messages([("system", system), ("human", human)])
chain = prompt | chat
stream = chain.invoke(
{
"context": context,
"input": revised_question,
}
)
print(stream.content)
문서를 크기에 따라 parent chunk와 child chunk로 나누어서 child chunk를 찾은 후에 LLM의 context에는 parent chunk를 사용하면, 검색의 정확도는 높이고 충분한 문서를 context로 활용할 수 있습니다. 아래에서는 parent doc을 생성후에 다시 child doc을 생성합니다. child doc은 metadata에 parent doc의 id를 가지고 있습니다. parent, child의 문서 id는 저장하여 문서 삭제, 업데이트시에 활용됩니다. 세부 코드는 Lambda-Document을 참조합니다.
parent_splitter = RecursiveCharacterTextSplitter(
chunk_size=2000,
chunk_overlap=100,
separators=["\n\n", "\n", ".", " ", ""],
length_function = len,
)
child_splitter = RecursiveCharacterTextSplitter(
chunk_size=400,
chunk_overlap=50,
# separators=["\n\n", "\n", ".", " ", ""],
length_function = len,
)
parent_docs = parent_splitter.split_documents(docs)
parent_doc_ids = vectorstore.add_documents(parent_docs, bulk_size = 10000)
ids = parent_doc_ids
for i, doc in enumerate(parent_docs):
_id = parent_doc_ids[i]
sub_docs = child_splitter.split_documents([doc])
for _doc in sub_docs:
_doc.metadata["parent_doc_id"] = _id
_doc.metadata["doc_level"] = "child"
child_doc_ids = vectorstore.add_documents(sub_docs, bulk_size = 10000)
ids += child_doc_ids
chat.py와 같이 pre_filter를 이용해 child 문서를 검색하여, parent_doc_id를 이용해 parent 문서를 context로 활용합니다. 하나의 parent doc에서 여러개의 child doc이 선택될 수 있으로 parent_doc_id를 이용해 중복을 확인하여 제거합니다.
result = vectorstore_opensearch.similarity_search_with_score(
query = query,
k = top_k*2,
search_type="script_scoring",
pre_filter={"term": {"metadata.doc_level": "child"}}
)
relevant_documents = []
docList = []
for re in result:
if 'parent_doc_id' in re[0].metadata:
parent_doc_id = re[0].metadata['parent_doc_id']
doc_level = re[0].metadata['doc_level']
if doc_level == 'child':
if parent_doc_id in docList:
print('duplicated!')
else:
relevant_documents.append(re)
docList.append(parent_doc_id)
if len(relevant_documents)>=top_k:
break
검색된 child 문서에서 parent_doc_id를 추출하여 parent 문서를 가져와 활용합니다.
for i, document in enumerate(relevant_documents):
parent_doc_id = document[0].metadata['parent_doc_id']
doc_level = document[0].metadata['doc_level']
content, name, url = get_parent_content(parent_doc_id)
def get_parent_content(parent_doc_id):
response = os_client.get(
index = index_name,
id = parent_doc_id
)
source = response['_source']
return source['text']
Header/Footer에서는 이미지의 header/footer를 제거하는 것을 보여주고 있습니다. 문서의 header/footer는 문서마다 다를 수 있으므로 문서마다 header/footer의 높이를 지정하여야 합니다.
image_obj = s3_client.get_object(Bucket=s3_bucket, Key=key)
image_content = image_obj['Body'].read()
img = Image.open(BytesIO(image_content))
width, height = img.size
pos = key.rfind('/')
prefix = key[pos+1:pos+5]
print('img_prefix: ', prefix)
if pdf_profile=='ocean' and prefix == "img_":
area = (0, 175, width, height-175)
img = img.crop(area)
width, height = img.size
if width < 100 or height < 100: # skip small size image
return []
문서의 이미지나 표에는 본문에 없는 중요한 정보가 있을 수 있습니다. PDF와 같은 문서에서 이미지를 추출하여 RAG에서 활용합니다. 이미지는 LLM에서 처리할 수 있도록 resize후에 텍스트를 추출합니다.
isResized = False
while(width*height > 5242880):
width = int(width/2)
height = int(height/2)
isResized = True
if isResized:
img = img.resize((width, height))
buffer = BytesIO()
img.save(buffer, format="PNG")
img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
chat = get_multimodal()
summary = summary_image(img_base64, subject_company)
텍스트 추출시 아래와 같이 prompt를 이용해 이미지의 내용을 활용합니다.
def summary_image(img_base64):
chat = get_chat()
query = "이미지가 의미하는 내용을 풀어서 자세히 알려주세요. markdown 포맷으로 답변을 작성합니다."
messages = [
HumanMessage(
content=[
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{img_base64}",
},
},
{
"type": "text", "text": query
},
]
)
]
result = chat.invoke(messages)
return result.content
Contextual Retrieval와 같이 contextual embedding을 이용하여 chunk에 대한 설명을 추가하면, 검색의 정확도를 높일 수 있습니다. 또한 BM25(keyword) 검색은 OpenSearch의 hybrid 검색을 통해 구현할 수 있습니다. 상세한 코드는 lambda_function.py를 참조합니다.
def get_contexual_docs(whole_doc, splitted_docs):
contextual_template = (
"<document>"
"{WHOLE_DOCUMENT}"
"</document>"
"Here is the chunk we want to situate within the whole document."
"<chunk>"
"{CHUNK_CONTENT}"
"</chunk>"
"Please give a short succinct context to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk."
"Answer only with the succinct context and nothing else."
"Put it in <result> tags."
)
contextual_prompt = ChatPromptTemplate([
('human', contextual_template)
])
docs = []
for i, doc in enumerate(splitted_docs):
chat = get_contexual_retrieval_chat()
contexual_chain = contextual_prompt | chat
response = contexual_chain.invoke(
{
"WHOLE_DOCUMENT": whole_doc.page_content,
"CHUNK_CONTENT": doc.page_content
}
)
output = response.content
contextualized_chunk = output[output.find('<result>')+8:len(output)-9]
docs.append(
Document(
page_content=contextualized_chunk+"\n\n"+doc.page_content,
metadata=doc.metadata
)
)
return docs
"Strawberry의 'r'은 몇개인가요?"의 질문을 하면 code interpreter가 생성한 코드를 실행하여 아래와 같은 결과를 얻을 수 있습니다.

이때 실행된 코드는 아래와 같습니다.
os.environ[ 'MPLCONFIGDIR' ] = '/tmp/'
word = "Strawberry"
r_count = word.lower().count(\'r\')
print(f"\'Strawberry\'에서 \'r\'의 개수는 {r_count}개 입니다.")
LangSmith에서 확인한 동작은 아래와 같습니다.
아래와 같이 activity diagram을 이용하여 node/edge/conditional edge로 구성되는 tool use 방식의 agent를 구현할 수 있습니다.

Tool use 방식 agent의 workflow는 아래와 같습니다. Fuction을 선택하는 call model 노드과 실행하는 tool 노드로 구성됩니다. 선택된 tool의 결과에 따라 cycle형태로 추가 실행을 하거나 종료하면서 결과를 전달할 수 있습니다.
workflow = StateGraph(State)
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
"agent",
should_continue,
{
"continue": "action",
"end": END,
},
)
workflow.add_edge("action", "agent")
app = workflow.compile()
inputs = [HumanMessage(content=query)]
config = {
"recursion_limit": 50
}
message = app.invoke({"messages": inputs}, config)
Tool use 패턴의 agent는 정의된 tool 함수의 docstring을 이용해 목적에 맞는 tool을 선택합니다. 아래의 search_by_knowledge_base는 OpenSearch를 데이터 저장소로 사용하는 knowledbe base로 부터 관련된 문서를 얻어오는 tool의 예입니다. "Search technical information by keyword"로 정의하였으므로 질문이 기술적인 내용이라면 search_by_knowledge_base가 호출되게 됩니다.
@tool
def search_by_knowledge_base(keyword: str) -> str:
"""
Search technical information by keyword and then return the result as a string.
keyword: search keyword
return: the technical information of keyword
"""
relevant_docs = []
if knowledge_base_id:
retriever = AmazonKnowledgeBasesRetriever(
knowledge_base_id=knowledge_base_id,
retrieval_config={"vectorSearchConfiguration": {
"numberOfResults": top_k,
"overrideSearchType": "HYBRID"
}},
)
docs = retriever.invoke(keyword)
relevant_context = ""
for i, doc in enumerate(docs):
relevant_context += doc.page_content + "\n\n"
return relevant_context
아래와 같이 tool들로 tools를 정의한 후에 bind_tools을 이용하여 call_model 노드를 정의합니다.
tools = [get_current_time, get_book_list, get_weather_info, search_by_tavily, search_by_knowledge_base]
def call_model(state: State, config):
system = (
"당신의 이름은 서연이고, 질문에 친근한 방식으로 대답하도록 설계된 대화형 AI입니다."
"상황에 맞는 구체적인 세부 정보를 충분히 제공합니다."
"모르는 질문을 받으면 솔직히 모른다고 말합니다."
)
prompt = ChatPromptTemplate.from_messages(
[
("system", system),
MessagesPlaceholder(variable_name="messages"),
]
)
model = chat.bind_tools(tools)
chain = prompt | model
response = chain.invoke(state["messages"])
return {"messages": [response]}
또한, tool 노드는 아래와 같이 ToolNode을 이용해 정의합니다.
from langgraph.prebuilt import ToolNode
tool_node = ToolNode(tools)
Corrective RAG(CRAG)는 retrival/grading 후에 질문을 rewrite한 후 인터넷 검색에서 얻어진 결과로 RAG의 성능을 강화하는 방법입니다.
CRAG의 workflow는 아래와 같습니다.
workflow = StateGraph(State)
# Define the nodes
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("grade_documents", grade_documents_node)
workflow.add_node("generate", generate_node)
workflow.add_node("rewrite", rewrite_node)
workflow.add_node("websearch", web_search_node)
# Build graph
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
"grade_documents",
decide_to_generate,
{
"rewrite": "rewrite",
"generate": "generate",
},
)
workflow.add_edge("rewrite", "websearch")
workflow.add_edge("websearch", "generate")
workflow.add_edge("generate", END)
Self RAG는 retrieve/grading 후에 generation을 수행하는데, grading의 결과에 따라 필요시 rewtire후 retrieve를 수행하며, 생성된 결과가 hallucination인지, 답변이 적절한지를 판단하여 필요시 rewtire / retrieve를 반복합니다.
Self RAG의 workflow는 아래와 같습니다.
workflow = StateGraph(State)
# Define the nodes
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("grade_documents", grade_documents_node)
workflow.add_node("generate", generate_node)
workflow.add_node("rewrite", rewrite_node)
# Build graph
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
"grade_documents",
decide_to_generate,
{
"no document": "rewrite",
"document": "generate",
"not available": "generate",
},
)
workflow.add_edge("rewrite", "retrieve")
workflow.add_conditional_edges(
"generate",
grade_generation,
{
"not supported": "generate",
"useful": END,
"not useful": "rewrite",
"not available": END,
},
)
Self Corrective RAG는 Self RAG처럼 retrieve / generate 후에 hallucination인지 답변이 적절한지 확인후 필요시 질문을 rewrite하거나 인터넷 검색을 통해 RAG의 성능을 향상시키는 방법입니다.
Self Corrective RAG의 workflow는 아래와 같습니다.
workflow = StateGraph(State)
# Define the nodes
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_node("rewrite", rewrite_node)
workflow.add_node("websearch", web_search_node)
workflow.add_node("finalize_response", finalize_response_node)
# Build graph
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("rewrite", "retrieve")
workflow.add_edge("websearch", "generate")
workflow.add_edge("finalize_response", END)
workflow.add_conditional_edges(
"generate",
grade_generation,
{
"generate": "generate",
"websearch": "websearch",
"rewrite": "rewrite",
"finalize_response": "finalize_response",
},
)
EC2는 Private Subnet에 있으므로 SSL로 접속할 수 없습니다. 따라서, Console-EC2에 접속하여 "app-for-llm-streamlit"를 선택한 후에 Connect에서 sesseion manager를 선택하여 접속합니다.
Github에서 app에 대한 코드를 업데이트 하였다면, session manager에 접속하여 아래 명령어로 업데이트 합니다.
sudo runuser -l ec2-user -c 'cd /home/ec2-user/agentic-rag && git pull'
Streamlit의 재시작이 필요하다면 아래 명령어로 service를 stop/start 시키고 동작을 확인할 수 있습니다.
sudo systemctl stop streamlit
sudo systemctl start streamlit
sudo systemctl status streamlit -l
Local에서 디버깅을 빠르게 진행하고 싶다면 Local에서 실행하기에 따라서 Local에 필요한 패키지와 환경변수를 업데이트 합니다. 이후 아래 명령어서 실행합니다.
streamlit run application/app.py
EC2에서 debug을 하면서 개발할때 사용하는 명령어입니다.
먼저, 시스템에 등록된 streamlit을 종료합니다.
sudo systemctl stop streamlit
이후 EC2를 session manager를 이용해 접속한 이후에 아래 명령어를 이용해 실행하면 로그를 보면서 수정을 할 수 있습니다.
sudo runuser -l ec2-user -c "/home/ec2-user/.local/bin/streamlit run /home/ec2-user/agentic-rag/application/app.py"
이 솔루션을 사용하기 위해서는 사전에 아래와 같은 준비가 되어야 합니다.
- AWS Account 생성에 따라 계정을 준비합니다.
본 실습에서는 us-west-2 리전을 사용합니다. 인프라 설치에 따라 CDK로 인프라 설치를 진행합니다.
메뉴에서는 아래와 항목들을 제공하고 있습니다.
메뉴에서 [이미지 분석]과 모델로 [Claude 3.5 Sonnet]을 선택한 후에 기다리는 사람들 사진을 다운받아서 업로드합니다. 이후 "사진속에 있는 사람들은 모두 몇명인가요?"라고 입력후 결과를 확인하면 아래와 같습니다.
