diff --git a/app.py b/app.py index 9bceb8ee..a1ce13a9 100644 --- a/app.py +++ b/app.py @@ -9,12 +9,13 @@ import glob from datetime import datetime, timedelta import numpy as np -from utilsAPI import getAPIURL, getWorkerType, getASInstance, unprotect_current_instance, get_number_of_pending_trials +from utilsAPI import getAPIURL, getWorkerType, getErrorLogBool, getASInstance, unprotect_current_instance, get_number_of_pending_trials from utilsAuth import getToken from utils import (getDataDirectory, checkTime, checkResourceUsage, sendStatusEmail, checkForTrialsWithStatus, getCommitHash, getHostname, postLocalClientInfo, - postProcessedDuration, makeRequestWithRetry) + postProcessedDuration, makeRequestWithRetry, + writeToErrorLog) logging.basicConfig(level=logging.INFO) @@ -24,6 +25,9 @@ autoScalingInstance = getASInstance() logging.info(f"AUTOSCALING TEST INSTANCE: {autoScalingInstance}") +ERROR_LOG = getErrorLogBool() +error_log_path = "/data/error_log.json" + # if true, will delete entire data directory when finished with a trial isDocker = True @@ -160,10 +164,24 @@ time.sleep(0.5) except Exception as e: - r = makeRequestWithRetry('PATCH', - trial_url, data={"status": "error"}, - headers = {"Authorization": "Token {}".format(API_TOKEN)}) - traceback.print_exc() + try: + r = makeRequestWithRetry('PATCH', + trial_url, data={"status": "error"}, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + traceback.print_exc() + + if ERROR_LOG: + stack = traceback.format_exc() + writeToErrorLog(error_log_path, trial["session"], trial["id"], + e, stack) + + except: + traceback.print_exc() + + if ERROR_LOG: + stack = traceback.format_exc() + writeToErrorLog(error_log_path, trial["session"], trial["id"], + e, stack) # Antoine: Removing this, it is too often causing the machines to stop. 
Not because # the machines are failing, but because for instance the video is very long with a lot @@ -178,8 +196,16 @@ finally: # End process duration timer and post duration to database - process_end_time = datetime.now() - postProcessedDuration(trial_url, process_end_time - process_start_time) + try: + process_end_time = datetime.now() + postProcessedDuration(trial_url, process_end_time - process_start_time) + except Exception as e: + traceback.print_exc() + + if ERROR_LOG: + stack = traceback.format_exc() + writeToErrorLog(error_log_path, trial["session"], trial["id"], + e, stack) justProcessed = True diff --git a/docker/Makefile b/docker/Makefile index fda79023..ff2197b5 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -3,6 +3,10 @@ REPO_NAME := opencap PROD_BRANCH := main DEV_BRANCH := dev +# Initialize variables if not passed in +INSTANCE_ID ?= 0 +CPU_SET ?= "" + # Determine the branch name CURRENT_BRANCH := $(shell git rev-parse --abbrev-ref HEAD) @@ -68,12 +72,13 @@ endif .PHONY: run run: -ifeq ($(CURRENT_BRANCH),$(PROD_BRANCH)) - aws ecr get-login-password --region us-west-2 --profile opencap | docker login --username AWS --password-stdin 660440363484.dkr.ecr.us-west-2.amazonaws.com - -else ifeq ($(CURRENT_BRANCH),$(DEV_BRANCH)) - aws ecr get-login-password --region us-west-2 --profile opencap | docker login --username AWS --password-stdin 660440363484.dkr.ecr.us-west-2.amazonaws.com - -endif - - OPENCAP_IMAGE_NAME=$(OPENCAP_IMAGE_NAME) OPENPOSE_IMAGE_NAME=$(OPENPOSE_IMAGE_NAME) MMPOSE_IMAGE_NAME=$(MMPOSE_IMAGE_NAME) docker-compose up \ No newline at end of file + @echo "Usage: sudo make run INSTANCE_ID= CPU_SET=" + @echo "Defaults: INSTANCE_ID=0, CPU_SET=\"\"" + + COMPOSE_PROJECT_NAME=opencap_$(INSTANCE_ID) \ + OPENCAP_IMAGE_NAME=$(OPENCAP_IMAGE_NAME) \ + OPENPOSE_IMAGE_NAME=$(OPENPOSE_IMAGE_NAME) \ + MMPOSE_IMAGE_NAME=$(MMPOSE_IMAGE_NAME) \ + INSTANCE_ID=$(INSTANCE_ID) \ + CPU_SET=$(CPU_SET) \ + docker compose up -d diff --git 
a/docker/check-containers-health.sh b/docker/check-containers-health.sh new file mode 100755 index 00000000..e87cfc0c --- /dev/null +++ b/docker/check-containers-health.sh @@ -0,0 +1,34 @@ +#!/bin/bash + +# Function to check if a container is running +is_container_alive() { + local container_name=$1 + docker ps --filter "name=^/${container_name}$" --filter "status=running" --format '{{.Names}}' | grep -wq "$container_name" + return $? +} + +# Loop through numbers 0 to 7 +for n in {0..7}; do + # Container names + opencap_openpose="opencap_${n}-openpose-1" + opencap_mmpose="opencap_${n}-mmpose-1" + opencap_mobilecap="opencap_${n}-mobilecap-1" + + # Check if all three containers are alive + if is_container_alive "$opencap_openpose" && \ + is_container_alive "$opencap_mmpose" && \ + is_container_alive "$opencap_mobilecap"; then + echo "All containers for instance $n are alive. Skipping." + continue + fi + + # Check if any container exists + if docker ps -a --filter "name=^/opencap_${n}-(openpose|mmpose|mobilecap)-1$" --format '{{.Names}}' | grep -q "opencap_${n}"; then + echo "Some containers for instance $n are not alive. Stopping instance." + ./stop-container.sh "$n" + ./start-container.sh "$n" + else + echo "No containers for instance $n. Skipping." 
+ fi + +done diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 169ff406..55acdbdc 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -13,8 +13,14 @@ services: reservations: devices: - driver: nvidia - count: 1 capabilities: [gpu] + device_ids: ["${INSTANCE_ID}"] + cpuset: "${CPU_SET}" + logging: + driver: "json-file" + options: + max-size: "100m" # Rotate when the log reaches 100MB + max-file: "7" # Keep the last 7 log files openpose: image: ${OPENPOSE_IMAGE_NAME} volumes: @@ -24,8 +30,14 @@ services: reservations: devices: - driver: nvidia - count: 1 capabilities: [gpu] + device_ids: ["${INSTANCE_ID}"] + cpuset: "${CPU_SET}" + logging: + driver: "json-file" + options: + max-size: "100m" # Rotate when the log reaches 100MB + max-file: "7" # Keep the last 7 log files mmpose: image: ${MMPOSE_IMAGE_NAME} volumes: @@ -35,7 +47,14 @@ services: reservations: devices: - driver: nvidia - count: 1 capabilities: [gpu] + device_ids: ["${INSTANCE_ID}"] + cpuset: "${CPU_SET}" + logging: + driver: "json-file" + options: + max-size: "100m" # Rotate when the log reaches 100MB + max-file: "7" # Keep the last 7 log files + volumes: data: {} diff --git a/docker/start-container.sh b/docker/start-container.sh new file mode 100755 index 00000000..18a3dd36 --- /dev/null +++ b/docker/start-container.sh @@ -0,0 +1,51 @@ +#!/bin/bash + +# Configuration +MAX_INSTANCES=8 +CPUS_PER_INSTANCE=14 +GPUS_PER_INSTANCE=1 + +# Get the total number of CPUs and GPUs available +TOTAL_CPUS=$(nproc) +TOTAL_GPUS=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l) + +# Check if an instance number is provided +if [ -z "$1" ]; then + echo "Usage: $0 " + echo "Provide the instance number to start (0 to $((MAX_INSTANCES - 1)))." + exit 1 +fi + +INSTANCE_NUMBER=$1 + +# Validate the instance number +if (( INSTANCE_NUMBER < 0 || INSTANCE_NUMBER >= MAX_INSTANCES )); then + echo "Error: Instance number must be between 0 and $((MAX_INSTANCES - 1))." 
+ exit 1 +fi + +# Compute CPU and GPU offsets for the selected instance +CPU_START=$(( INSTANCE_NUMBER * CPUS_PER_INSTANCE )) +CPU_END=$(( CPU_START + CPUS_PER_INSTANCE - 1 )) +CPU_SET="${CPU_START}-${CPU_END}" + +# Validate resource availability +if (( CPU_START + CPUS_PER_INSTANCE > TOTAL_CPUS )); then + echo "Error: Not enough CPUs available for instance $INSTANCE_NUMBER." + exit 1 +fi + +if (( INSTANCE_NUMBER >= TOTAL_GPUS )); then + echo "Error: Not enough GPUs available for instance $INSTANCE_NUMBER." + exit 1 +fi + +# Start the specific instance +echo "Starting instance $INSTANCE_NUMBER with CPU_SET=${CPU_SET} and GPU=${INSTANCE_NUMBER}" + +# Run docker-compose for the specific instance +make run INSTANCE_ID=$INSTANCE_NUMBER CPU_SET=$CPU_SET + +sleep 10 + +echo "Instance $INSTANCE_NUMBER started successfully." diff --git a/docker/start-containers.sh b/docker/start-containers.sh new file mode 100755 index 00000000..fb281e39 --- /dev/null +++ b/docker/start-containers.sh @@ -0,0 +1,60 @@ +#!/bin/bash + +# Configuration +MAX_INSTANCES=8 +CPUS_PER_INSTANCE=14 +GPUS_PER_INSTANCE=1 + +# Get the total number of CPUs and GPUs available +TOTAL_CPUS=$(nproc) +TOTAL_GPUS=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l) + +# Read number of instances to start +if [ -z "$1" ]; then + echo "Usage: $0 " + echo "Provide the number of instances to start (max $MAX_INSTANCES)." + exit 1 +fi + +NUM_INSTANCES=$1 + +# Validate the number of instances +if (( NUM_INSTANCES > MAX_INSTANCES )); then + echo "Error: Maximum number of instances is $MAX_INSTANCES." + exit 1 +fi + +# Check if there are enough resources +if (( NUM_INSTANCES * CPUS_PER_INSTANCE > TOTAL_CPUS )); then + echo "Error: Not enough CPUs. Required: $((NUM_INSTANCES * CPUS_PER_INSTANCE)), Available: $TOTAL_CPUS." + exit 1 +fi + +if (( NUM_INSTANCES * GPUS_PER_INSTANCE > TOTAL_GPUS )); then + echo "Error: Not enough GPUs. Required: $((NUM_INSTANCES * GPUS_PER_INSTANCE)), Available: $TOTAL_GPUS." 
+ exit 1 +fi + +# Display summary +echo "Starting $NUM_INSTANCES instances..." +echo "Total CPUs: $TOTAL_CPUS (using $CPUS_PER_INSTANCE per instance)" +echo "Total GPUs: $TOTAL_GPUS (using $GPUS_PER_INSTANCE per instance)" +echo + +# Start instances +for (( i=0; i" + exit 1 +fi + +INSTANCE_ID=$1 +COMPOSE_PROJECT_NAME="opencap_${INSTANCE_ID}" + +echo "Stopping and removing containers for INSTANCE_ID=${INSTANCE_ID}..." + +# Stop and remove containers associated with the project +docker-compose \ + --project-name $COMPOSE_PROJECT_NAME \ + down + +# Verify if containers are removed +if [ $? -eq 0 ]; then + echo "Successfully stopped and removed containers for INSTANCE_ID=${INSTANCE_ID}." +else + echo "Failed to stop and remove containers for INSTANCE_ID=${INSTANCE_ID}." +fi + diff --git a/utils.py b/utils.py index 2b140eed..1c2445f5 100644 --- a/utils.py +++ b/utils.py @@ -12,6 +12,7 @@ import subprocess import zipfile import time +import datetime import numpy as np import pandas as pd @@ -1585,6 +1586,35 @@ def checkCudaTF(): sendStatusEmail(message=message) raise Exception("No GPU detected. 
Exiting.") +def writeToJsonLog(path, new_dict, max_entries=1000): + dir_name = os.path.dirname(path) + if not os.path.exists(dir_name): + os.makedirs(dir_name) + + if os.path.exists(path): + with open(path, 'r') as f: + data = json.load(f) + else: + data = [] + + data.append(new_dict) + + while len(data) > max_entries: + data.pop(0) + + with open(path, 'w') as f: + json.dump(data, f) + +def writeToErrorLog(path, session_id, trial_id, error, stack, max_entries=1000): + error_entry = { + 'session_id': session_id, + 'trial_id': trial_id, + 'datetime': datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S"), + 'error': str(error), + 'stack': stack + } + writeToJsonLog(path, error_entry, max_entries) + # %% Some functions for loading subject data def getSubjectNumber(subjectName): @@ -1717,28 +1747,22 @@ def makeRequestWithRetry(method, url, Returns: requests.Response: The response object for further processing. """ - try: - retry_strategy = Retry( - total=retries, - backoff_factor=backoff_factor, - status_forcelist=[429, 500, 502, 503, 504], - allowed_methods={'DELETE', 'GET', 'POST', 'PUT', 'PATCH'} - ) - - adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy) - with requests.Session() as session: - session.mount("https://", adapter) - response = session.request(method, - url, - headers=headers, - data=data, - params=params, - files=files) - response.raise_for_status() - return response - - except requests.exceptions.HTTPError as e: - raise Exception(f"HTTP error occurred: {e}") - - except Exception as e: - raise Exception(f"An error occurred: {e}") + retry_strategy = Retry( + total=retries, + backoff_factor=backoff_factor, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods={'DELETE', 'GET', 'POST', 'PUT', 'PATCH'} + ) + + adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy) + with requests.Session() as session: + session.mount("https://", adapter) + response = session.request(method, + url, + 
headers=headers, + data=data, + params=params, + files=files) + response.raise_for_status() + return response + diff --git a/utilsAPI.py b/utilsAPI.py index 982bf4f7..1535406c 100644 --- a/utilsAPI.py +++ b/utilsAPI.py @@ -48,6 +48,9 @@ def getStatusEmails(): return emailInfo +def getErrorLogBool(): + return config('ERROR_LOG', default=False, cast=bool) + def getASInstance(): try: # Check if the ECS_CONTAINER_METADATA_FILE environment variable exists diff --git a/utilsServer.py b/utilsServer.py index 17d5da51..116c5390 100644 --- a/utilsServer.py +++ b/utilsServer.py @@ -4,6 +4,8 @@ import requests import json import logging +import time +import random from main import main from utils import getDataDirectory @@ -463,35 +465,63 @@ def batchReprocess(session_ids,calib_id,static_id,dynamic_trialNames,poseDetecto data=statusData, headers = {"Authorization": "Token {}".format(API_TOKEN)}) -def runTestSession(pose='all',isDocker=True): - trials = {} - - if not any(s in API_URL for s in ['dev.opencap', '127.0']) : # prod trials - trials['openpose'] = '3f2960c7-ca29-45b0-9be5-8d74db6131e5' # session ae2d50f1-537a-44f1-96a5-f5b7717452a3 - trials['hrnet'] = '299ca938-8765-4a84-9adf-6bdf0e072451' # session faef80d3-0c26-452c-a7be-28dbfe04178e - # trials['failure'] = '698162c8-3980-46e5-a3c5-8d4f081db4c4' # failed trial for testing - else: # dev trials - trials['openpose'] = '89d77579-8371-4760-a019-95f2c793622c' # session acd0e19c-6c86-4ba4-95fd-94b97229a926 - trials['hrnet'] = 'e0e02393-42ee-46d4-9ae1-a6fbb0b89c42' # session 3510c726-a1b8-4de4-a4a2-52b021b4aab2 - - if pose == 'all': - trialList = list(trials.values()) - else: - try: - trialList = [trials[pose]] - except: +def runTestSession(pose='all',isDocker=True,maxNumTries=3): + # We retry test sessions because sometimes when different + # containers are processing the test trial, the API can change the + # URL, causing 404 errors. 
+ numTries = 0 + while numTries < maxNumTries: + numTries += 1 + logging.info(f"Starting test trial attempt #{numTries} of {maxNumTries}") + trials = {} + + if not any(s in API_URL for s in ['dev.opencap', '127.0']) : # prod trials + trials['openpose'] = '3f2960c7-ca29-45b0-9be5-8d74db6131e5' # session ae2d50f1-537a-44f1-96a5-f5b7717452a3 + trials['hrnet'] = '299ca938-8765-4a84-9adf-6bdf0e072451' # session faef80d3-0c26-452c-a7be-28dbfe04178e + # trials['failure'] = '698162c8-3980-46e5-a3c5-8d4f081db4c4' # failed trial for testing + else: # dev trials + trials['openpose'] = '89d77579-8371-4760-a019-95f2c793622c' # session acd0e19c-6c86-4ba4-95fd-94b97229a926 + trials['hrnet'] = 'e0e02393-42ee-46d4-9ae1-a6fbb0b89c42' # session 3510c726-a1b8-4de4-a4a2-52b021b4aab2 + + if pose == 'all': trialList = list(trials.values()) + else: + try: + trialList = [trials[pose]] + except: + trialList = list(trials.values()) - try: - for trial_id in trialList: - trial = getTrialJson(trial_id) - logging.info("Running status check on trial name: " + trial['name'] + "_" + str(trial_id) + "\n\n") - processTrial(trial["session"], trial_id, trial_type='static', isDocker=isDocker) - except: - logging.info("test trial failed. stopping machine.") - # send email - message = "A backend OpenCap machine failed the status check. It has been stopped." - sendStatusEmail(message=message) - raise Exception('Failed status check. Stopped.') - - logging.info("\n\n\nStatus check succeeded. \n\n") + try: + for trial_id in trialList: + trial = getTrialJson(trial_id) + logging.info("Running status check on trial name: " + trial['name'] + "_" + str(trial_id) + "\n\n") + processTrial(trial["session"], trial_id, trial_type='static', isDocker=isDocker) + + logging.info("\n\n\nStatus check succeeded. \n\n") + return + + # Catch and re-enter while loop if it's an HTTPError (could be more + # than just 404 errors). Wait between 30 and 60 seconds before + # retrying. 
+ except requests.exceptions.HTTPError as e: + if numTries < maxNumTries: + logging.info(f"test trial failed on try #{numTries} due to HTTPError. Retrying.") + wait_time = random.randint(30,60) + logging.info(f"waiting {wait_time} seconds then retrying...") + time.sleep(wait_time) + continue + else: + logging.info(f"test trial failed on try #{numTries} due to HTTPError.") + # send email + message = "A backend OpenCap machine failed the status check (HTTPError). It has been stopped." + sendStatusEmail(message=message) + raise Exception('Failed status check (HTTPError). Stopped.') + + # Catch other errors and stop + except: + logging.info("test trial failed. stopping machine.") + # send email + message = "A backend OpenCap machine failed the status check. It has been stopped." + sendStatusEmail(message=message) + raise Exception('Failed status check. Stopped.') + \ No newline at end of file