[EAGLE-5342] Added Model Upload Tests (#495)
* created temp dummy_models_path

* created temp dummy_models_path

* created temp dummy_models_path in run_locally

* Added hf model run locally tests

* Added hf_mbart_model dummy model for tests

* remove xformers

* Added model upload tests

* Fix minor status_code_pb2 issue

* Fix issues

* Fix minor status_code_pb2 issue

* Fix minor issue

* Fix minor issue

* reduce transformers version

* fix requirements version

* fix path for windows

* fix tests for windows

* use python builtin tarfile module for tarring
luv-bansal authored Jan 24, 2025
1 parent 4a7a506 commit b85883f
Showing 6 changed files with 414 additions and 19 deletions.
20 changes: 11 additions & 9 deletions clarifai/runners/models/model_upload.py
@@ -1,6 +1,7 @@
 import os
 import re
 import sys
+import tarfile
 import time
 from string import Template
@@ -54,7 +55,7 @@ def __init__(self, folder: str, validate_api_ids: bool = True, download_validati
   def _validate_folder(self, folder):
     if folder == ".":
       folder = ""  # will getcwd() next which ends with /
-    if not folder.startswith("/"):
+    if not os.path.isabs(folder):
       folder = os.path.join(os.getcwd(), folder)
     logger.info(f"Validating folder: {folder}")
     if not os.path.exists(folder):
@@ -428,14 +429,15 @@ def upload_model_version(self, download_checkpoints):

     model_version_proto = self.get_model_version_proto()

-    if download_checkpoints:
-      tar_cmd = f"tar --exclude=*~ --exclude={self.tar_file} -czvf {self.tar_file} -C {self.folder} ."
-    else:  # we don't want to send the checkpoints up even if they are in the folder.
-      logger.info(f"Skipping {self.checkpoint_path} in the tar file that is uploaded.")
-      tar_cmd = f"tar --exclude={self.checkpoint_suffix} --exclude=*~ --exclude={self.tar_file} -czvf {self.tar_file} -C {self.folder} ."
-    # Tar the folder
-    logger.debug(tar_cmd)
-    os.system(tar_cmd)
+    def filter_func(tarinfo):
+      name = tarinfo.name
+      exclude = [self.tar_file, "*~"]
+      if not download_checkpoints:
+        exclude.append(self.checkpoint_suffix)
+      return None if any(name.endswith(ex) for ex in exclude) else tarinfo
+
+    with tarfile.open(self.tar_file, "w:gz") as tar:
+      tar.add(self.folder, arcname=".", filter=filter_func)
     logger.info("Tarring complete, about to start upload.")

     file_size = os.path.getsize(self.tar_file)
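
Switching from a shelled-out tar command to Python's built-in tarfile module is what makes packaging work on Windows, where no tar binary is guaranteed; likewise os.path.isabs replaces the POSIX-only startswith("/") check. The exclusion logic is easy to exercise standalone; a minimal sketch with illustrative folder and file names (not the uploader's actual paths):

import tarfile

def tar_model_folder(folder: str, tar_file: str, exclude_suffixes: list) -> None:
  """Gzip-tar `folder`, dropping members whose names end with an excluded suffix."""

  def filter_func(tarinfo: tarfile.TarInfo):
    # Returning None from a tarfile filter omits the member from the archive.
    if any(tarinfo.name.endswith(suffix) for suffix in exclude_suffixes):
      return None
    return tarinfo

  with tarfile.open(tar_file, "w:gz") as tar:
    tar.add(folder, arcname=".", filter=filter_func)

# os.path.isabs("C:\\models") is True on Windows, while "C:\\models".startswith("/")
# is False everywhere, which is why the absolute-path check also had to change.
tar_model_folder("dummy_runner_models", "model.tar.gz",
                 exclude_suffixes=["model.tar.gz", "~"])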
68 changes: 68 additions & 0 deletions tests/runners/hf_mbart_model/1/model.py
@@ -0,0 +1,68 @@
import os
from typing import Iterator

import torch
from clarifai_grpc.grpc.api import resources_pb2, service_pb2
from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

from clarifai.runners.models.model_runner import ModelRunner
from clarifai.utils.logging import logger

NUM_GPUS = 1


def set_output(texts: list):
  assert isinstance(texts, list)
  output_protos = []
  for text in texts:
    output_protos.append(
        resources_pb2.Output(
            data=resources_pb2.Data(text=resources_pb2.Text(raw=text)),
            status=status_pb2.Status(code=status_code_pb2.SUCCESS)))
  return output_protos


class MyRunner(ModelRunner):
  """A custom runner that loads the model and generates text using Hugging Face
  transformers inference.
  """

  def load_model(self):
    """Load the model here"""
    self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
    logger.info(f"Running on device: {self.device}")
    checkpoints = os.path.join(os.path.dirname(__file__), "checkpoints")

    for root, dirs, files in os.walk(checkpoints):
      for f in files:
        logger.info(os.path.join(root, f))

    # If a checkpoints section is present in the config.yaml file, the checkpoints
    # are downloaded to this path at model upload time.
    self.tokenizer = AutoTokenizer.from_pretrained(checkpoints)
    self.model = AutoModelForSeq2SeqLM.from_pretrained(
        checkpoints, torch_dtype="auto", device_map=self.device)

  def predict(self,
              request: service_pb2.PostModelOutputsRequest) -> service_pb2.MultiOutputResponse:
    """This is the method that will be called when the runner is run. It takes in an input and
    returns an output.
    """
    texts = [inp.data.text.raw for inp in request.inputs]

    raw_texts = []
    for t in texts:
      inputs = self.tokenizer.encode(t, return_tensors="pt").to(self.device)
      outputs = self.model.generate(inputs)
      raw_texts.append(self.tokenizer.decode(outputs[0]))
    output_protos = set_output(raw_texts)

    return service_pb2.MultiOutputResponse(outputs=output_protos)

  def generate(self, request: service_pb2.PostModelOutputsRequest
              ) -> Iterator[service_pb2.MultiOutputResponse]:
    """Example yielding a whole batch of streamed stuff back."""
    raise NotImplementedError("This method is not implemented yet.")

  def stream(self, request_iterator: Iterator[service_pb2.PostModelOutputsRequest]
            ) -> Iterator[service_pb2.MultiOutputResponse]:
    pass
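
predict consumes a PostModelOutputsRequest and returns a MultiOutputResponse; the run-locally tests exercise it through a subprocess, but the request shape is easy to see in isolation. A minimal sketch of building such a request (the input text is arbitrary):

from clarifai_grpc.grpc.api import resources_pb2, service_pb2

# One Input per text prompt; predict() reads inp.data.text.raw from each.
request = service_pb2.PostModelOutputsRequest(inputs=[
    resources_pb2.Input(data=resources_pb2.Data(text=resources_pb2.Text(raw="hello world")))
])

# With a loaded MyRunner instance (construction is wired up by the runner
# framework, so it is omitted here), the call would look like:
#   response = runner.predict(request)
#   print(response.outputs[0].data.text.raw)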
20 changes: 20 additions & 0 deletions tests/runners/hf_mbart_model/config.yaml
@@ -0,0 +1,20 @@
# Config file for the mBART runner

model:
  id: "hf-mbart-model"
  user_id: "user_id"
  app_id: "app_id"
  model_type_id: "text-to-text"

build_info:
  python_version: "3.12"

inference_compute_info:
  cpu_limit: "500m"
  cpu_memory: "500Mi"
  num_accelerators: 0

checkpoints:
  type: "huggingface"
  repo_id: "sshleifer/tiny-mbart"
  hf_token: ""
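
The checkpoints section tells the uploader to fetch sshleifer/tiny-mbart from Hugging Face before packaging (the empty hf_token works because the repo is public). Functionally this is comparable to a huggingface_hub snapshot download; a rough sketch, where the local_dir is an assumption mirroring where model.py looks for weights, not necessarily the uploader's actual download path:

from huggingface_hub import snapshot_download

# Approximation of what the "huggingface" checkpoints type triggers.
snapshot_download(repo_id="sshleifer/tiny-mbart",
                  local_dir="hf_mbart_model/1/checkpoints")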
7 changes: 7 additions & 0 deletions tests/runners/hf_mbart_model/requirements.txt
@@ -0,0 +1,7 @@
torch==2.4.0
tokenizers>=0.19.0
transformers>=4.44
accelerate>=1.0.1
optimum>=1.20.0
sentencepiece==0.2.0
requests==2.23.0
152 changes: 142 additions & 10 deletions tests/runners/test_model_run_locally.py
@@ -1,23 +1,97 @@
 import os
+import shutil
 import subprocess
+from pathlib import Path

 import pytest
+import yaml

 from clarifai.runners.models.model_run_locally import ModelRunLocally

-MODEL_PATH = os.path.join(os.path.dirname(__file__), "dummy_runner_models")
 CLARIFAI_USER_ID = os.environ["CLARIFAI_USER_ID"]
 CLARIFAI_PAT = os.environ["CLARIFAI_PAT"]


 @pytest.fixture
-def model_run_locally():
+def dummy_models_path(tmp_path):
+  """
+  Copy the dummy_runner_models folder to a temp directory and update user_id in config.yaml
+  so that the e2e tests run under your own Clarifai account.
+  """
+  tests_dir = Path(__file__).parent.resolve()
+  original_dummy_path = tests_dir / "dummy_runner_models"
+  if not original_dummy_path.exists():
+    # Adjust or raise an error if you cannot locate the dummy_runner_models folder
+    raise FileNotFoundError(f"Could not find dummy_runner_models at {original_dummy_path}. "
+                            "Adjust path or ensure it exists.")
+
+  # Copy the entire folder to tmp_path
+  target_folder = tmp_path / "dummy_runner_models"
+  shutil.copytree(original_dummy_path, target_folder)
+
+  # Update config.yaml to override the user_id with the one from the environment
+  config_yaml_path = target_folder / "config.yaml"
+  with config_yaml_path.open("r") as f:
+    config = yaml.safe_load(f)
+
+  # Overwrite the user_id with CLARIFAI_USER_ID
+  config["model"]["user_id"] = CLARIFAI_USER_ID
+
+  # Rewrite config.yaml
+  with config_yaml_path.open("w") as f:
+    yaml.dump(config, f, sort_keys=False)
+
+  return str(target_folder)
+
+
+@pytest.fixture
+def model_run_locally(dummy_models_path):
+  """
+  Fixture that instantiates the ModelRunLocally class
+  with the dummy model_path that already exists.
+  """
+  return ModelRunLocally(dummy_models_path)
+
+
+@pytest.fixture
+def dummy_hf_models_path(tmp_path):
+  """
+  Copy the hf_mbart_model folder to a temp directory and update user_id in config.yaml
+  so that the e2e tests run under your own Clarifai account.
+  """
+  tests_dir = Path(__file__).parent.resolve()
+  original_dummy_path = tests_dir / "hf_mbart_model"
+  if not original_dummy_path.exists():
+    # Adjust or raise an error if you cannot locate the hf_mbart_model folder
+    raise FileNotFoundError(f"Could not find hf_mbart_model at {original_dummy_path}. "
+                            "Adjust path or ensure it exists.")
+
+  # Copy the entire folder to tmp_path
+  target_folder = tmp_path / "hf_mbart_model"
+  shutil.copytree(original_dummy_path, target_folder)
+
+  # Update config.yaml to override the user_id with the one from the environment
+  config_yaml_path = target_folder / "config.yaml"
+  with config_yaml_path.open("r") as f:
+    config = yaml.safe_load(f)
+
+  # Overwrite the user_id with CLARIFAI_USER_ID
+  config["model"]["user_id"] = CLARIFAI_USER_ID
+
+  # Rewrite config.yaml
+  with config_yaml_path.open("w") as f:
+    yaml.dump(config, f, sort_keys=False)
+
+  return str(target_folder)
+
+
+@pytest.fixture
+def hf_model_run_locally(dummy_hf_models_path):
   """
   Fixture that instantiates the ModelRunLocally class
   with the dummy model_path that already exists.
   """
-  return ModelRunLocally(MODEL_PATH)
+  return ModelRunLocally(dummy_hf_models_path)


 def test_get_model_runner(model_run_locally):
@@ -68,9 +142,6 @@ def test_install_requirements(model_run_locally):
     model_run_locally.install_requirements()
   except SystemExit:
     pytest.fail("install_requirements() failed and exited.")
-  # You might want to verify the presence of installed packages by checking
-  # the venv's site-packages or something similar. For simplicity, we'll only
-  # verify that no exception was raised.
   # Clean up
   model_run_locally.clean_up()

@@ -103,10 +174,6 @@ def test_docker_build_and_test_container(model_run_locally):
   Test building a Docker image and running a container test using the dummy model.
   This test will be skipped if Docker is not installed.
   """
-  # Setup
-  # download_checkpoints & createDockerfile are called in the main()
-  # but we can do it here if needed. The code calls them automatically
-  # in main if inside_container is True, we directly test the method:

   # Test if Docker is installed
   assert model_run_locally.is_docker_installed(), "Docker not installed, skipping."
@@ -138,3 +205,68 @@

   # Remove the image
   model_run_locally.remove_docker_image(image_name)
+
+
+def test_hf_test_model_success(hf_model_run_locally):
+  """
+  Test that test_model succeeds with the dummy model.
+  This calls the script's test_model method, which runs a subprocess.
+  """
+  hf_model_run_locally.uploader.download_checkpoints()
+  hf_model_run_locally.create_temp_venv()
+  hf_model_run_locally.install_requirements()
+
+  # Catch the subprocess call. If the dummy model is correct, the exit code should be 0.
+  try:
+    hf_model_run_locally.test_model()
+  except SystemExit:
+    pytest.fail("test_model() triggered a system exit with a non-zero code.")
+  except subprocess.CalledProcessError:
+    # If the process didn't return code 0, fail the test
+    pytest.fail("The model test did not complete successfully in the subprocess.")
+  finally:
+    # Clean up
+    hf_model_run_locally.clean_up()
+
+
+# @pytest.mark.skipif(shutil.which("docker") is None, reason="Docker not installed or not in PATH.")
+@pytest.mark.skip(reason="Will add later after new clarifai package is released")
+def test_hf_docker_build_and_test_container(hf_model_run_locally):
+  """
+  Test building a Docker image and running a container test using the dummy model.
+  This test will be skipped if Docker is not installed.
+  """
+
+  # Download the checkpoints for the model
+  hf_model_run_locally.uploader.download_checkpoints()
+
+  # Test if Docker is installed
+  assert hf_model_run_locally.is_docker_installed(), "Docker not installed, skipping."
+
+  # Build or re-build the Docker image
+  hf_model_run_locally.uploader.create_dockerfile()
+  image_tag = hf_model_run_locally._docker_hash()
+  image_name = f"{hf_model_run_locally.config['model']['id']}:{image_tag}"
+
+  if not hf_model_run_locally.docker_image_exists(image_name):
+    hf_model_run_locally.build_docker_image(image_name=image_name)
+
+  # Run tests inside the container
+  try:
+    hf_model_run_locally.test_model_container(
+        image_name=image_name,
+        container_name="test_clarifai_model_container",
+        env_vars={
+            'CLARIFAI_PAT': CLARIFAI_PAT,
+            'CLARIFAI_API_BASE': os.environ.get('CLARIFAI_API_BASE', 'https://api.clarifai.com')
+        })
+  except subprocess.CalledProcessError:
+    pytest.fail("Failed to test the model inside the docker container.")
+  finally:
+    # Clean up the container if it still exists
+    if hf_model_run_locally.container_exists("test_clarifai_model_container"):
+      hf_model_run_locally.stop_docker_container("test_clarifai_model_container")
+      hf_model_run_locally.remove_docker_container("test_clarifai_model_container")
+
+    # Remove the image
+    hf_model_run_locally.remove_docker_image(image_name)
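
The dummy_models_path and dummy_hf_models_path fixtures above are identical apart from the folder name, so a shared helper would keep a third dummy model from copying the pattern again. A possible refactor, not part of this commit (_copy_model_to_tmp is a hypothetical name; CLARIFAI_USER_ID is the module-level constant defined above):

import shutil
from pathlib import Path

import pytest
import yaml

def _copy_model_to_tmp(tmp_path, folder_name: str) -> str:
  """Copy a dummy model folder into tmp_path and point its config at CLARIFAI_USER_ID."""
  source = Path(__file__).parent.resolve() / folder_name
  if not source.exists():
    raise FileNotFoundError(f"Could not find {folder_name} at {source}.")
  target = tmp_path / folder_name
  shutil.copytree(source, target)

  config_path = target / "config.yaml"
  config = yaml.safe_load(config_path.read_text())
  config["model"]["user_id"] = CLARIFAI_USER_ID  # same override as both fixtures
  config_path.write_text(yaml.dump(config, sort_keys=False))
  return str(target)

@pytest.fixture
def dummy_models_path(tmp_path):
  return _copy_model_to_tmp(tmp_path, "dummy_runner_models")

@pytest.fixture
def dummy_hf_models_path(tmp_path):
  return _copy_model_to_tmp(tmp_path, "hf_mbart_model")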