Merge pull request #125 from cloudera/mob/main
Datasource Metrics
ewilliams-cloudera authored Feb 11, 2025
2 parents 82fa9c2 + 169dd65 commit abcc1bc
Showing 23 changed files with 1,615 additions and 512 deletions.
@@ -1,4 +1,4 @@
/*******************************************************************************
/*
* CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
* (C) Cloudera, Inc. 2024
* All rights reserved.
@@ -76,5 +76,5 @@ public String extractUsername(Cookie[] cookies) {
return "unknown";
}

public record JwtCookie(String username) {}
record JwtCookie(String username) {}
}
118 changes: 114 additions & 4 deletions llm-service/app/routers/index/data_source/__init__.py
@@ -27,18 +27,21 @@
# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
# DATA.
# ##############################################################################

import json
import logging
import pathlib
import tempfile
from collections import Counter
from http import HTTPStatus
from typing import Any, Dict, Optional

import mlflow
import pandas as pd
from fastapi import APIRouter, Depends, HTTPException
from fastapi_utils.cbv import cbv
from llama_index.core.llms import LLM
from llama_index.core.node_parser import SentenceSplitter
from mlflow.entities import Experiment
from mlflow.entities import Experiment, Run, FileInfo
from pydantic import BaseModel

from .... import exceptions
@@ -47,10 +50,18 @@
from ....ai.indexing.summary_indexer import SummaryIndexer
from ....ai.vector_stores.qdrant import QdrantVectorStore
from ....ai.vector_stores.vector_store import VectorStore
from ....services.metadata_apis import data_sources_metadata_api
from ....services import document_storage, models
from ....services.metadata_apis import data_sources_metadata_api
from ....services.metadata_apis.data_sources_metadata_api import RagDataSource

STANDARD_FEEDBACK = [
"Inaccurate",
"Not Helpful",
"Out of date",
"Too short",
"Too long",
]

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/data_sources/{data_source_id}", tags=["Data Sources"])
@@ -65,7 +76,6 @@ class SummarizeDocumentRequest(BaseModel):


class RagIndexDocumentConfiguration(BaseModel):
# TODO: Add more params
chunk_size: int = 512 # this is llama-index's default
chunk_overlap: int = 10 # percentage of tokens in a chunk (chunk_size)

@@ -82,6 +92,19 @@ class ChunkContentsResponse(BaseModel):
metadata: Dict[str, Any]


class Metrics(BaseModel):
positive_ratings: int
negative_ratings: int
no_ratings: int
count_of_interactions: int
count_of_direct_interactions: int
aggregated_feedback: dict[str, int]
unique_users: int
max_score_over_time: list[tuple[int, float]]
input_word_count_over_time: list[tuple[int, int]]
output_word_count_over_time: list[tuple[int, int]]


@cbv(router)
class DataSourceController:
chunks_vector_store: VectorStore = Depends(
@@ -308,3 +331,90 @@ def visualize_with_query(
self, request: VisualizationRequest
) -> list[tuple[tuple[float, float], str]]:
return self.chunks_vector_store.visualize(request.user_query)

@router.get("/metrics")
@exceptions.propagates
def metrics(self, data_source_id: int) -> Metrics:
runs: list[Run] = mlflow.search_runs(
output_format="list", search_all_experiments=True
)
relevant_runs: list[Run] = list(
filter(
lambda r: data_source_id
in json.loads(r.data.params.get("data_source_ids", "[]"))
or [],
runs,
)
)
positive_ratings = len(
list(filter(lambda r: r.data.metrics.get("rating", 0) > 0, relevant_runs))
)
negative_ratings = len(
list(filter(lambda r: r.data.metrics.get("rating", 0) < 0, relevant_runs))
)
no_ratings = len(
list(filter(lambda r: r.data.metrics.get("rating", 0) == 0, relevant_runs))
)

run: Run
scores: list[float] = list()
feedback_entries: list[str] = list()
unique_users = len(
set(map(lambda r: r.data.params.get("user_name", "unknown"), runs))
)
count_of_direct_interactions = 0
max_score_over_time: list[tuple[int, float]] = []
input_word_count_over_time: list[tuple[int, int]] = []
output_word_count_over_time: list[tuple[int, int]] = []
for run in relevant_runs:
base_artifact_uri: str = run.info.artifact_uri
artifacts: list[FileInfo] = mlflow.artifacts.list_artifacts(
base_artifact_uri
)
if run.data.tags.get("direct_llm") == "True":
count_of_direct_interactions += 1

artifact: FileInfo
for artifact in artifacts:
## get the last segment of the path
name = pathlib.Path(artifact.path).name
if name == "response_details.json":
df = self.load_dataframe_from_artifact(base_artifact_uri, name)
if "score" in df.columns:
scores.extend(df["score"].to_list())
if name == "feedback.json":
df = self.load_dataframe_from_artifact(base_artifact_uri, name)
if "feedback" in df.columns:
feedback_entries.extend(df["feedback"].to_list())
max_score_over_time.append(
(run.info.start_time, run.data.metrics.get("max_score", 0))
)
input_word_count_over_time.append(
(run.info.start_time, run.data.metrics.get("input_word_count", 0))
)
output_word_count_over_time.append(
(run.info.start_time, run.data.metrics.get("output_word_count", 0))
)
cleaned_feedback = list(
map(
lambda feedback: feedback if feedback in STANDARD_FEEDBACK else "Other",
feedback_entries,
)
)
return Metrics(
positive_ratings=positive_ratings,
negative_ratings=negative_ratings,
no_ratings=no_ratings,
count_of_interactions=len(relevant_runs),
count_of_direct_interactions=count_of_direct_interactions,
aggregated_feedback=(dict(Counter(cleaned_feedback))),
unique_users=unique_users,
max_score_over_time=max_score_over_time,
input_word_count_over_time=input_word_count_over_time,
output_word_count_over_time=output_word_count_over_time,
)

def load_dataframe_from_artifact(self, uri: str, name: str) -> pd.DataFrame:
artifact_loc = uri + "/" + name
data = mlflow.artifacts.load_text(artifact_loc)
return pd.read_json(data, orient="split")
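
As a companion to the new GET /data_sources/{data_source_id}/metrics route added in this file, a minimal client sketch (not part of the commit); the base URL, port, and data source id below are assumptions for illustration only.

```python
# Hypothetical client call against the new metrics endpoint (sketch only).
import requests  # assumed to be available in the client environment

BASE_URL = "http://localhost:8000"  # assumed llm-service address
DATA_SOURCE_ID = 1                  # hypothetical data source id

response = requests.get(f"{BASE_URL}/data_sources/{DATA_SOURCE_ID}/metrics")
response.raise_for_status()
metrics = response.json()

# Fields mirror the Metrics model defined above.
print(metrics["positive_ratings"], metrics["negative_ratings"], metrics["no_ratings"])
print(metrics["aggregated_feedback"])       # e.g. {"Too long": 2, "Other": 1}
print(metrics["max_score_over_time"][:3])   # [start_time_ms, max_score] pairs
```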
115 changes: 67 additions & 48 deletions llm-service/app/routers/index/sessions/__init__.py
@@ -35,21 +35,23 @@
# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
# DATA.
# ##############################################################################
import time
import uuid
import base64
import json
import logging
from typing import Annotated

import mlflow
from fastapi import APIRouter
from fastapi import APIRouter, Cookie
from mlflow.entities import Experiment, Run
from pydantic import BaseModel

from .... import exceptions
from ....rag_types import RagPredictConfiguration
from ....services import llm_completion
from ....services.chat import generate_suggested_questions, v2_chat
from ....services.chat_store import ChatHistoryManager, RagStudioChatMessage, RagMessage
from ....services.chat import generate_suggested_questions, v2_chat, direct_llm_chat
from ....services.chat_store import ChatHistoryManager, RagStudioChatMessage
from ....services.metadata_apis import session_metadata_api

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/sessions/{session_id}", tags=["Sessions"])


@@ -78,16 +80,46 @@ def delete_chat_history(session_id: int) -> str:
return "Chat history deleted."


class ChatResponseFeedback(BaseModel):
class ChatResponseRating(BaseModel):
rating: bool


@router.post(
"/responses/{response_id}/rating", summary="Provide a rating on a chat response."
)
@exceptions.propagates
def rating(
session_id: int,
response_id: str,
request: ChatResponseRating,
) -> ChatResponseRating:
session = session_metadata_api.get_session(session_id)
experiment: Experiment = mlflow.set_experiment(
experiment_name=f"session_{session.name}_{session.id}"
)
runs: list[Run] = mlflow.search_runs(
[experiment.experiment_id],
filter_string=f"tags.response_id='{response_id}'",
output_format="list",
)
for run in runs:
value: int = 1 if request.rating else -1
mlflow.log_metric("rating", value, run_id=run.info.run_id)
return ChatResponseRating(rating=request.rating)


class ChatResponseFeedback(BaseModel):
feedback: str


@router.post(
"/responses/{response_id}/feedback", summary="Provide feedback on a chat response."
)
@exceptions.propagates
def evaluate(
session_id: int, response_id: str, feedback: ChatResponseFeedback
def feedback(
session_id: int,
response_id: str,
request: ChatResponseFeedback,
) -> ChatResponseFeedback:
session = session_metadata_api.get_session(session_id)
experiment: Experiment = mlflow.set_experiment(
@@ -99,62 +131,49 @@ def evaluate(
output_format="list",
)
for run in runs:
mlflow.log_metric("rating", feedback.rating, run_id=run.info.run_id)
return ChatResponseFeedback(
rating=feedback.rating,
)
mlflow.log_table(
data={"feedback": request.feedback},
artifact_file="feedback.json",
run_id=run.info.run_id,
)
return ChatResponseFeedback(feedback=request.feedback)


class RagStudioChatRequest(BaseModel):
query: str
configuration: RagPredictConfiguration | None = None


def parse_jwt_cookie(jwt_cookie: str | None) -> str:
if jwt_cookie is None:
return "unknown"
try:
cookie_crumbs = jwt_cookie.strip().split(".")
if len(cookie_crumbs) != 3:
return "unknown"
base_64_user_info = cookie_crumbs[1]
user_info_json = base64.b64decode(base_64_user_info + "===")
user_info = json.loads(user_info_json)
return str(user_info["username"])
except Exception:
logger.exception("Failed to parse JWT cookie")
return "unknown"


@router.post("/chat", summary="Chat with your documents in the requested datasource")
@exceptions.propagates
def chat(
session_id: int,
request: RagStudioChatRequest,
_basusertoken: Annotated[str | None, Cookie()] = None,
) -> RagStudioChatMessage:
user_name = parse_jwt_cookie(_basusertoken)
mlflow.llama_index.autolog()

configuration = request.configuration or RagPredictConfiguration()
if configuration.exclude_knowledge_base:
return llm_talk(session_id, request.query)
return v2_chat(session_id, request.query, configuration)


def llm_talk(
session_id: int,
query: str,
) -> RagStudioChatMessage:
session = session_metadata_api.get_session(session_id)
experiment = mlflow.set_experiment(
experiment_name=f"session_{session.name}_{session.id}"
)
response_id = str(uuid.uuid4())
with mlflow.start_run(
experiment_id=experiment.experiment_id, run_name=f"{response_id}"
):
mlflow.set_tag("response_id", response_id)

chat_response = llm_completion.completion(
session_id, query, session.inference_model
)
new_chat_message = RagStudioChatMessage(
id=response_id,
source_nodes=[],
inference_model=session.inference_model,
evaluations=[],
rag_message=RagMessage(
user=query,
assistant=str(chat_response.message.content),
),
timestamp=time.time(),
condensed_question=None,
)
ChatHistoryManager().append_to_history(session_id, [new_chat_message])
return new_chat_message
return direct_llm_chat(session_id, request.query, user_name)
return v2_chat(session_id, request.query, configuration, user_name)


class RagSuggestedQuestionsResponse(BaseModel):
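
For the new rating and feedback routes added above, a minimal client sketch (not part of the commit); the base URL, session id, and response id below are assumptions for illustration only.

```python
# Hypothetical client calls against the new rating and feedback endpoints (sketch only).
import requests  # assumed to be available in the client environment

BASE_URL = "http://localhost:8000"   # assumed llm-service address
SESSION_ID = 42                      # hypothetical session id
RESPONSE_ID = "hypothetical-uuid"    # id of a chat response returned by the /chat route

# Thumbs-up: the server logs an mlflow "rating" metric of 1 (-1 for a thumbs-down).
requests.post(
    f"{BASE_URL}/sessions/{SESSION_ID}/responses/{RESPONSE_ID}/rating",
    json={"rating": True},
).raise_for_status()

# Free-form feedback: logged to the matching mlflow run as a feedback.json artifact.
requests.post(
    f"{BASE_URL}/sessions/{SESSION_ID}/responses/{RESPONSE_ID}/feedback",
    json={"feedback": "Too long"},
).raise_for_status()
```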