From b85883f37384f64205e51f1d11477427ecfa5298 Mon Sep 17 00:00:00 2001 From: Luv Bansal <70321430+luv-bansal@users.noreply.github.com> Date: Fri, 24 Jan 2025 20:35:05 +0530 Subject: [PATCH] [EAGLE-5342] Added Model Upload Tests (#495) * created temp dummy_models_path * created temp dummy_models_path * created temp dummy_models_path in run_locally * Added hf model run locally tests * Added hf_mbart_model dummy model for tests * remove xformers * Added model upload tests * Fix minor status_code_pb2 issue * Fix issues * Fix minor status_code_pb2 issue * Fix minor issue * Fix minor issue * reduce transformers version * fix requirements version * fix path for windows * fix tests for windows * use python builtin tar function for taring --- clarifai/runners/models/model_upload.py | 20 ++- tests/runners/hf_mbart_model/1/model.py | 68 +++++++ tests/runners/hf_mbart_model/config.yaml | 20 +++ tests/runners/hf_mbart_model/requirements.txt | 7 + tests/runners/test_model_run_locally.py | 152 ++++++++++++++-- tests/runners/test_model_upload.py | 166 ++++++++++++++++++ 6 files changed, 414 insertions(+), 19 deletions(-) create mode 100644 tests/runners/hf_mbart_model/1/model.py create mode 100644 tests/runners/hf_mbart_model/config.yaml create mode 100644 tests/runners/hf_mbart_model/requirements.txt create mode 100644 tests/runners/test_model_upload.py diff --git a/clarifai/runners/models/model_upload.py b/clarifai/runners/models/model_upload.py index 5b3f19a0..1daa89e7 100644 --- a/clarifai/runners/models/model_upload.py +++ b/clarifai/runners/models/model_upload.py @@ -1,6 +1,7 @@ import os import re import sys +import tarfile import time from string import Template @@ -54,7 +55,7 @@ def __init__(self, folder: str, validate_api_ids: bool = True, download_validati def _validate_folder(self, folder): if folder == ".": folder = "" # will getcwd() next which ends with / - if not folder.startswith("/"): + if not os.path.isabs(folder): folder = os.path.join(os.getcwd(), folder) 
logger.info(f"Validating folder: {folder}") if not os.path.exists(folder): @@ -428,14 +429,15 @@ def upload_model_version(self, download_checkpoints): model_version_proto = self.get_model_version_proto() - if download_checkpoints: - tar_cmd = f"tar --exclude=*~ --exclude={self.tar_file} -czvf {self.tar_file} -C {self.folder} ." - else: # we don't want to send the checkpoints up even if they are in the folder. - logger.info(f"Skipping {self.checkpoint_path} in the tar file that is uploaded.") - tar_cmd = f"tar --exclude={self.checkpoint_suffix} --exclude=*~ --exclude={self.tar_file} -czvf {self.tar_file} -C {self.folder} ." - # Tar the folder - logger.debug(tar_cmd) - os.system(tar_cmd) + def filter_func(tarinfo): + name = tarinfo.name + exclude = [self.tar_file, "~"] # suffixes matched with endswith(), not globs: "~" skips editor backup files like the old --exclude=*~ + if not download_checkpoints: + exclude.append(self.checkpoint_suffix) + return None if any(name.endswith(ex) for ex in exclude) else tarinfo + + with tarfile.open(self.tar_file, "w:gz") as tar: + tar.add(self.folder, arcname=".", filter=filter_func) logger.info("Tarring complete, about to start upload.") file_size = os.path.getsize(self.tar_file) diff --git a/tests/runners/hf_mbart_model/1/model.py b/tests/runners/hf_mbart_model/1/model.py new file mode 100644 index 00000000..aded9e1b --- /dev/null +++ b/tests/runners/hf_mbart_model/1/model.py @@ -0,0 +1,68 @@ +import os +from typing import Iterator + +import torch +from clarifai_grpc.grpc.api import resources_pb2, service_pb2 +from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2 +from transformers import AutoModelForSeq2SeqLM, AutoTokenizer + +from clarifai.runners.models.model_runner import ModelRunner +from clarifai.utils.logging import logger + +NUM_GPUS = 1 + + +def set_output(texts: list): + assert isinstance(texts, list) + output_protos = [] + for text in texts: + output_protos.append( + resources_pb2.Output( + data=resources_pb2.Data(text=resources_pb2.Text(raw=text)), 
+ status=status_pb2.Status(code=status_code_pb2.SUCCESS))) + return output_protos + + +class MyRunner(ModelRunner): + """A custom runner that loads a Hugging Face transformers seq2seq model and generates text with it. + """ + + def load_model(self): + """Load the model here""" + self.device = 'cuda' if torch.cuda.is_available() else 'cpu' + logger.info(f"Running on device: {self.device}") + checkpoints = os.path.join(os.path.dirname(__file__), "checkpoints") + + for root, dirs, files in os.walk(checkpoints): + for f in files: + logger.info(os.path.join(root, f)) + + # if checkpoints section is in config.yaml file then checkpoints will be downloaded at this path during model upload time. + self.tokenizer = AutoTokenizer.from_pretrained(checkpoints) + self.model = AutoModelForSeq2SeqLM.from_pretrained( + checkpoints, torch_dtype="auto", device_map=self.device) + + def predict(self, request: service_pb2.PostModelOutputsRequest + ) -> service_pb2.MultiOutputResponse: + """This is the method that will be called when the runner is run. It takes in an input and + returns an output. 
+ """ + texts = [inp.data.text.raw for inp in request.inputs] + + raw_texts = [] + for t in texts: + inputs = self.tokenizer.encode(t, return_tensors="pt").to(self.device) + outputs = self.model.generate(inputs) + raw_texts.append(self.tokenizer.decode(outputs[0])) + output_protos = set_output(raw_texts) + + return service_pb2.MultiOutputResponse(outputs=output_protos) + + def generate(self, request: service_pb2.PostModelOutputsRequest + ) -> Iterator[service_pb2.MultiOutputResponse]: + """Example yielding a whole batch of streamed stuff back.""" + raise NotImplementedError("This method is not implemented yet.") + + def stream(self, request_iterator: Iterator[service_pb2.PostModelOutputsRequest] + ) -> Iterator[service_pb2.MultiOutputResponse]: + pass diff --git a/tests/runners/hf_mbart_model/config.yaml b/tests/runners/hf_mbart_model/config.yaml new file mode 100644 index 00000000..3bceb411 --- /dev/null +++ b/tests/runners/hf_mbart_model/config.yaml @@ -0,0 +1,20 @@ +# Config file for the Hugging Face mBART test model + +model: + id: "hf-mbart-model" + user_id: "user_id" + app_id: "app_id" + model_type_id: "text-to-text" + +build_info: + python_version: "3.12" + +inference_compute_info: + cpu_limit: "500m" + cpu_memory: "500Mi" + num_accelerators: 0 + +checkpoints: + type: "huggingface" + repo_id: "sshleifer/tiny-mbart" + hf_token: "" diff --git a/tests/runners/hf_mbart_model/requirements.txt b/tests/runners/hf_mbart_model/requirements.txt new file mode 100644 index 00000000..fdb027d6 --- /dev/null +++ b/tests/runners/hf_mbart_model/requirements.txt @@ -0,0 +1,7 @@ +torch==2.4.0 +tokenizers>=0.19.0 +transformers>=4.44 +accelerate>=1.0.1 +optimum>=1.20.0 +sentencepiece==0.2.0 +requests==2.23.0 diff --git a/tests/runners/test_model_run_locally.py b/tests/runners/test_model_run_locally.py index c9a6b29b..5cece674 100644 --- a/tests/runners/test_model_run_locally.py +++ b/tests/runners/test_model_run_locally.py @@ -1,23 +1,97 @@ import os +import shutil import subprocess from pathlib 
import Path import pytest +import yaml from clarifai.runners.models.model_run_locally import ModelRunLocally -MODEL_PATH = os.path.join(os.path.dirname(__file__), "dummy_runner_models") CLARIFAI_USER_ID = os.environ["CLARIFAI_USER_ID"] CLARIFAI_PAT = os.environ["CLARIFAI_PAT"] @pytest.fixture -def model_run_locally(): +def dummy_models_path(tmp_path): + """ + Copy the dummy_runner_models folder to a temp directory and set user_id in the copied + config.yaml to CLARIFAI_USER_ID; this fixture leaves the checked-in folder unmodified. + """ + tests_dir = Path(__file__).parent.resolve() + original_dummy_path = tests_dir / "dummy_runner_models" + if not original_dummy_path.exists(): + # Adjust or raise an error if you cannot locate the dummy_runner_models folder + raise FileNotFoundError(f"Could not find dummy_runner_models at {original_dummy_path}. " + "Adjust path or ensure it exists.") + + # Copy the entire folder to tmp_path + target_folder = tmp_path / "dummy_runner_models" + shutil.copytree(original_dummy_path, target_folder) + + # Load the copied config.yaml so that its user_id can be overridden + config_yaml_path = target_folder / "config.yaml" + with config_yaml_path.open("r") as f: + config = yaml.safe_load(f) + + # Overwrite the user_id with CLARIFAI_USER_ID from the environment (app_id is left as-is) + config["model"]["user_id"] = CLARIFAI_USER_ID + + # Rewrite config.yaml + with config_yaml_path.open("w") as f: + yaml.dump(config, f, sort_keys=False) + + return str(target_folder) + + +@pytest.fixture +def model_run_locally(dummy_models_path): + """ + Fixture that instantiates the ModelRunLocally class + with the dummy model_path that already exists. + """ + return ModelRunLocally(dummy_models_path) + + +@pytest.fixture +def dummy_hf_models_path(tmp_path): + """ + Copy the hf_mbart_model folder to a temp directory and set user_id in the copied + config.yaml to CLARIFAI_USER_ID; this fixture leaves the checked-in folder unmodified. 
+ """ + tests_dir = Path(__file__).parent.resolve() + original_dummy_path = tests_dir / "hf_mbart_model" + if not original_dummy_path.exists(): + # Adjust or raise an error if you cannot locate the hf_mbart_model folder + raise FileNotFoundError(f"Could not find hf_mbart_model at {original_dummy_path}. " + "Adjust path or ensure it exists.") + + # Copy the entire folder to tmp_path + target_folder = tmp_path / "hf_mbart_model" + shutil.copytree(original_dummy_path, target_folder) + + # Load the copied config.yaml so that its user_id can be overridden + config_yaml_path = target_folder / "config.yaml" + with config_yaml_path.open("r") as f: + config = yaml.safe_load(f) + + # Overwrite the user_id with CLARIFAI_USER_ID from the environment (app_id is left as-is) + config["model"]["user_id"] = CLARIFAI_USER_ID + + # Rewrite config.yaml + with config_yaml_path.open("w") as f: + yaml.dump(config, f, sort_keys=False) + + return str(target_folder) + + +@pytest.fixture +def hf_model_run_locally(dummy_hf_models_path): """ Fixture that instantiates the ModelRunLocally class with the dummy model_path that already exists. """ - return ModelRunLocally(MODEL_PATH) + return ModelRunLocally(dummy_hf_models_path) def test_get_model_runner(model_run_locally): @@ -68,9 +142,6 @@ def test_install_requirements(model_run_locally): model_run_locally.install_requirements() except SystemExit: pytest.fail("install_requirements() failed and exited.") - # You might want to verify the presence of installed packages by checking - # the venv's site-packages or something similar. For simplicity, we'll only - # verify that no exception was raised. # Clean up model_run_locally.clean_up() @@ -103,10 +174,6 @@ def test_docker_build_and_test_container(model_run_locally): Test building a Docker image and running a container test using the dummy model. This test will be skipped if Docker is not installed. """ - # Setup - # download_checkpoints & createDockerfile are called in the main() - # but we can do it here if needed. 
The code calls them automatically - # in main if inside_container is True, we directly test the method: # Test if Docker is installed assert model_run_locally.is_docker_installed(), "Docker not installed, skipping." @@ -138,3 +205,68 @@ def test_docker_build_and_test_container(model_run_locally): # Remove the image model_run_locally.remove_docker_image(image_name) + + +def test_hf_test_model_success(hf_model_run_locally): + """ + Test that test_model succeeds with the dummy model. + This calls the script's test_model method, which runs a subprocess. + """ + hf_model_run_locally.uploader.download_checkpoints() + hf_model_run_locally.create_temp_venv() + hf_model_run_locally.install_requirements() + + # Catch the subprocess call. If the dummy model is correct, exit code should be 0. + try: + hf_model_run_locally.test_model() + except SystemExit: + pytest.fail("test_model() triggered a system exit with non-zero code.") + except subprocess.CalledProcessError: + # If the process didn't return code 0, fail the test + pytest.fail("The model test did not complete successfully in the subprocess.") + finally: + # Clean up + hf_model_run_locally.clean_up() + + +# @pytest.mark.skipif(shutil.which("docker") is None, reason="Docker not installed or not in PATH.") +@pytest.mark.skip(reason="Will add later after new clarifai package is released") +def test_hf_docker_build_and_test_container(hf_model_run_locally): + """ + Test building a Docker image and running a container test using the dummy model. + This test will be skipped if Docker is not installed. + """ + + # Download the checkpoints for the model + hf_model_run_locally.uploader.download_checkpoints() + + # Test if Docker is installed + assert hf_model_run_locally.is_docker_installed(), "Docker not installed, skipping." 
+ + # Build or re-build the Docker image + hf_model_run_locally.uploader.create_dockerfile() + image_tag = hf_model_run_locally._docker_hash() + image_name = f"{hf_model_run_locally.config['model']['id']}:{image_tag}" + + if not hf_model_run_locally.docker_image_exists(image_name): + hf_model_run_locally.build_docker_image(image_name=image_name) + + # Run tests inside the container + try: + hf_model_run_locally.test_model_container( + image_name=image_name, + container_name="test_clarifai_model_container", + env_vars={ + 'CLARIFAI_PAT': CLARIFAI_PAT, + 'CLARIFAI_API_BASE': os.environ.get('CLARIFAI_API_BASE', 'https://api.clarifai.com') + }) + except subprocess.CalledProcessError: + pytest.fail("Failed to test the model inside the docker container.") + finally: + # Clean up the container if it still exists + if hf_model_run_locally.container_exists("test_clarifai_model_container"): + hf_model_run_locally.stop_docker_container("test_clarifai_model_container") + hf_model_run_locally.remove_docker_container("test_clarifai_model_container") + + # Remove the image + hf_model_run_locally.remove_docker_image(image_name) diff --git a/tests/runners/test_model_upload.py b/tests/runners/test_model_upload.py new file mode 100644 index 00000000..979aba90 --- /dev/null +++ b/tests/runners/test_model_upload.py @@ -0,0 +1,166 @@ +import os +import shutil +import uuid +from pathlib import Path + +import pytest +import yaml +from clarifai_grpc.grpc.api.status import status_code_pb2 + +from clarifai.client import User +from clarifai.runners.models.model_upload import ModelUploader + +MODEL_PATH = os.path.join(os.path.dirname(__file__), "dummy_runner_models") +CLARIFAI_USER_ID = os.environ["CLARIFAI_USER_ID"] +CLARIFAI_PAT = os.environ["CLARIFAI_PAT"] +NOW = uuid.uuid4().hex[:10] +CREATE_APP_ID = f"pytest-model-upload-test-{NOW}" + + +def check_app_exists(): + """ + Check if the app exists on the user account. 
+ """ + user = User( + user_id=CLARIFAI_USER_ID, + base_url=os.environ.get('CLARIFAI_API_BASE', 'https://api.clarifai.com'), + pat=CLARIFAI_PAT, + ) + apps = user.list_apps() + for app in apps: + if app.id == CREATE_APP_ID: + return True + return False + + +def create_app(): + """ + Creates a Clarifai app for testing purposes. + """ + + user = User( + user_id=CLARIFAI_USER_ID, + base_url=os.environ.get('CLARIFAI_API_BASE', 'https://api.clarifai.com'), + pat=CLARIFAI_PAT, + ) + if check_app_exists(): + print(f"App '{CREATE_APP_ID}' already exists.") + else: + print(f"Creating app '{CREATE_APP_ID}'...") + user.create_app(app_id=CREATE_APP_ID) + return CREATE_APP_ID, user + + +@pytest.fixture(scope="module") +def clarifai_app(): + """ + Fixture to create and clean up a Clarifai app before/after running the tests. + """ + app_id, user = create_app() + yield app_id # Provide the app_id to the tests + # Cleanup: delete the app after tests + try: + user.delete_app(app_id=app_id) + print(f"Deleted app '{app_id}' successfully.") + except Exception as e: + print(f"Failed to delete app '{app_id}': {e}") + + +@pytest.fixture +def dummy_models_path(tmp_path, clarifai_app): + """ + Copy the dummy_runner_models folder to a temp directory and update app_id in config.yaml + so that your e2e tests use a newly created ephemeral app on your Clarifai account. + """ + tests_dir = Path(__file__).parent.resolve() + original_dummy_path = tests_dir / "dummy_runner_models" + if not original_dummy_path.exists(): + # Adjust or raise an error if you cannot locate the dummy_runner_models folder + raise FileNotFoundError(f"Could not find dummy_runner_models at {original_dummy_path}. 
" + "Adjust path or ensure it exists.") + + # Copy the entire folder to tmp_path + target_folder = tmp_path / "dummy_runner_models" + shutil.copytree(original_dummy_path, target_folder) + + # Update the config.yaml to override the app_id with the ephemeral one + config_yaml_path = target_folder / "config.yaml" + with config_yaml_path.open("r") as f: + config = yaml.safe_load(f) + + # Overwrite the app_id with the newly created clarifai_app + config["model"]["user_id"] = CLARIFAI_USER_ID + config["model"]["app_id"] = clarifai_app + + # Rewrite config.yaml + with config_yaml_path.open("w") as f: + yaml.dump(config, f, sort_keys=False) + + return str(target_folder) + + +@pytest.fixture +def model_uploader(dummy_models_path): + """ + Returns a ModelUploader instance for general usage in tests. + """ + uploader = ModelUploader(folder=dummy_models_path, validate_api_ids=False) + return uploader + + +def test_init_valid_folder(model_uploader): + """ + Ensure that creating a ModelUploader with a valid folder + does not raise any exceptions and sets up the object correctly. + """ + assert os.path.exists(model_uploader.folder) + assert "config.yaml" in os.listdir(model_uploader.folder) + + +def test_model_uploader_flow(dummy_models_path): + """ + End-to-end test that: + 1. Initializes the ModelUploader on the dummy_runner_models folder + 2. Checks folder validation + 3. Creates or reuses an existing model + 4. Uploads a new model version + 5. 
Waits for the build + """ + # Initialize + uploader = ModelUploader(folder=str(dummy_models_path)) + assert uploader.folder == str(dummy_models_path), "Uploader folder mismatch" + + # Basic checks on config + assert uploader.config["model"]["id"] == "dummy-runner-model" + assert uploader.config["model"]["user_id"] == os.environ["CLARIFAI_USER_ID"] + # The app_id should be updated to the newly created ephemeral one + assert uploader.config["model"]["app_id"] == CREATE_APP_ID + + # # Validate that the model doesn't exist yet + # # Because we are using a new ephemeral app, it's unlikely to exist + # assert uploader.check_model_exists() is False, "Model should not exist on new ephemeral app" + + # Create the model (on Clarifai side) + create_resp = uploader.maybe_create_model() + + if create_resp: + returned_code = create_resp.status.code + assert returned_code in [ + status_code_pb2.SUCCESS, + ], f"Model creation failed with {returned_code}" + + # Now the model should exist + assert uploader.check_model_exists() is True, "Model should exist after creation" + + # Create the Dockerfile (not crucial for the actual build, but tested in the script) + uploader.create_dockerfile() + dockerfile_path = Path(uploader.folder) / "Dockerfile" + assert dockerfile_path.exists(), "Dockerfile was not created." + + # Upload a new version + uploader.upload_model_version(download_checkpoints=False) + + # After starting the upload/build, we expect model_version_id to be set if it began building + assert uploader.model_version_id is not None, "Model version upload failed to initialize" + + print(f"Test completed successfully with model_version_id={uploader.model_version_id}")