Skip to content

Commit

Permalink
Merge branch '1.1.0' into 'main'
Browse files Browse the repository at this point in the history
1.1.0 release

See merge request belle2/software/b2luigi!42
  • Loading branch information
AlexanderHeidelbach committed Nov 20, 2024
2 parents 011775c + be25dc9 commit 0bf35b7
Show file tree
Hide file tree
Showing 28 changed files with 980 additions and 137 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.0.2
current_version = 1.1.0
commit = True
tag = True
commit_args = --no-verify
Expand Down
20 changes: 12 additions & 8 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ variables:
GITLAB_BELLE2_TOOLS_B2SETUP: "/cvmfs/belle.cern.ch/tools/b2setup"
FLIT_INDEX_URL: "https://upload.pypi.org/legacy/"
FLIT_USERNAME: __token__
BASF2_RELEASE_VERSION: "light-2409-toyger"

workflow:
rules:
Expand All @@ -22,7 +23,7 @@ workflow:
- if: $GITLAB_FORCE_RUNNING_PIPELINE == 'yes'

default:
image: belle2/externals-ubuntu2004:v01-12-01
image: belle2/belle2-base-el9:latest
tags:
- extagent48
before_script:
Expand All @@ -32,24 +33,27 @@ default:
- git config --global --add safe.directory $(dirname ${GITLAB_BELLE2_TOOLS_B2SETUP})
# Setup of basf2
- source ${GITLAB_BELLE2_TOOLS_B2SETUP}
- b2setup $(b2help-releases)
# Add $HOME/.local/bin to the path
- export PATH=${PATH}:${HOME}/.local/bin
- echo ${BASF2_RELEASE_VERSION}
- b2venv ${BASF2_RELEASE_VERSION}
- source venv/bin/activate

pre_commit:
stage: pre_commit
script:
- pip install -U --user pre-commit
- pip install pre-commit
- pre-commit run --all-files

test:
stage: test
script:
- pip install -U --user pytest pytest-cov python-coveralls flit
- pip install pytest pytest-cov python-coveralls flit
- flit install -s
- pytest -v --cov b2luigi --junitxml=$ARTIFACTS_TEST_PATH --cov-report=xml --cov-report=html --cov-report=term tests
- mv htmlcov $ARTIFACTS_COV_PATH
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
parallel:
matrix:
- BASF2_RELEASE_VERSION: [$BASF2_PYTHON38_RELEASE, $BASF2_PYTHON311_RELEASE]
artifacts:
reports:
junit: $ARTIFACTS_TEST_PATH
Expand All @@ -67,7 +71,7 @@ docs:
- job: test
artifacts: false
script:
- pip install -U --user flit
- pip install flit
- flit install -s
- sphinx-build docs/ $ARTIFACTS_DOCS_PATH
artifacts:
Expand All @@ -85,7 +89,7 @@ publish:
- job: docs
artifacts: false
script:
- pip install -U --user setuptools wheel twine flit
- pip install setuptools wheel twine flit
- flit publish
only:
- tags
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ repos:
- id: trailing-whitespace
- id: check-added-large-files
- id: check-symlinks
- id: no-commit-to-branch
# - id: no-commit-to-branch
- id: end-of-file-fixer
exclude: ^tests/batch/_gbasf2_project_download_stdouts/
- id: fix-byte-order-marker
Expand Down
7 changes: 4 additions & 3 deletions b2luigi/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Task scheduling and batch running for basf2 jobs made simple"""

from luigi import *
from luigi.util import copies

# version must be defined after importing the luigi namespace,
# otherwise the b2luigi.__version__ gets overwritten by the one from luigi
__version__ = "1.0.2"
__version__ = "1.1.0"

from b2luigi.core.parameter import wrap_parameter, BoolParameter
from typing import Optional, Union, Collection
Expand All @@ -14,7 +15,8 @@
from b2luigi.core.task import Task, ExternalTask, WrapperTask
from b2luigi.core.temporary_wrapper import on_temporary_files
from b2luigi.core.dispatchable_task import DispatchableTask, dispatch
from b2luigi.core.settings import get_setting, set_setting, clear_setting
from b2luigi.core.settings import get_setting, set_setting, clear_setting, _setting_file_iterator
from b2luigi.core.xrootd_targets import XrootDSystem, XrootDTarget
from b2luigi.cli.process import process


Expand Down Expand Up @@ -72,7 +74,6 @@ def requires(_self):


class inherits(object):

"""
This copies the luigi.inherits functionality but allows specifying parameters you
don't want to inherit.
Expand Down
105 changes: 105 additions & 0 deletions b2luigi/batch/processes/apptainer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import os
import subprocess

from b2luigi.batch.processes import BatchProcess, JobStatus
from b2luigi.core.utils import create_cmd_from_task, create_apptainer_command, get_log_file_dir


class ApptainerProcess(BatchProcess):
"""
Simple implementation of a batch process for running jobs in an Apptainer container. Strictly speaking,
this is not a batch process, but it is a simple way to run jobs in a container environment.
This process inherits the basic properties from :class:`b2luigi.batch.processes.BatchProcess` but does not
need to be executed in the ``batch`` context. However, running in ``batch`` mode is possible for the
``lsf`` and the ``htcondor`` batch systems. Although, for the latter batch system it is not recommended
to use apptainer images since HTCondor is already running in a container environment.
The core pinciple of this process is to run the task in an Apptainer container. To achive the execution of
tasks, an ``apptainer exec`` command is build within this class and executed in a subprocess. To stear the
execution, one can steer the following settings:
* ``apptainer_image``: The image to use for the Apptainer container.s
This parameter is mandatory and needs to be set if the task should be executed in an Apptainer container.
The image needs to be accessible from the machine where the task is executed. There are no further checks
if the image is available or valid. When using custom images, it may be helpfull to fisrt check the image
with ``apptainer inspect``. For people with access to the Belle II own ``/cvmfs`` directory, images are
provided in the ``/cvmfs/belle.cern.ch/images`` directory. The description of the images (the repository
contains the docker images which are transformed to Apptainer images) and instructions on how to create them
can be found in https://gitlab.desy.de/belle2/software/docker-images.
* ``apptainer_mounts``: A list of directories to mount into the Apptainer container.
This parameter is optional and can be used to mount directories into the Apptainer container. The directories
need to be accessible from the machine where the task is executed. The directories are mounted under the exact
same path as they are provided/on the host machine. For most usecases mounts need to be provided to access software
or data locations. For people using for example ``basf2`` software in the Apptainer container, the ``/cvmfs``
directory needs to be mounted. Caution is required when system specific directories are mounted.
* ``apptainer_mount_defaults``: Boolean parameter to mount ``log_dir`` and ``result_dir`` by default.
The default value is ``True`` meaning the ``result_dir`` and ``log_dir`` are automacially created and mounted if
they are not accessible from the execution location. When using custom targets with non local output directories,
this parameter should be set to ``False`` to avoid mounting unexisting directories.
* ``apptainer_additional_params``: Additional parameters to pass to the ``apptainer exec`` command.
This parameter should be a string and will be directly appended to the ``apptainer exec`` command. It can be used to
pass additional parameters to the ``apptainer exec`` command as they would be added in the CLI. A very
usefull parameter is the ``--cleanenv`` parameter which will clean the environment before executing the task in the
Apptainer container. This can be usefull to avoid conflicts with the environment in the container.
A prominent usecase is the usage of software which dependes on the operating system.
A simple example of how an Apptainer based task can be defined is shown below:
.. code-block:: python
class MyApptainerTask(luigi.Task):
apptainer_image = "/cvmfs/belle.cern.ch/images/belle2-base-el9"
apptainer_mounts = ["/cvmfs"]
apptainer_mount_defaults = True
apptainer_additional_params = "--cleanenv"
<rest of the task definition>
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._process = None
self._sdtout = None
self._stderr = None

def get_job_status(self):
if self._process is None:
return JobStatus.aborted

# Poll the process to check if it is still running
if self._process.poll() is None:
return JobStatus.running
else:
# If the process has finished, write output and return the appropriate status
self._stdout, self._stderr = self._process.communicate()
self._write_output()
return JobStatus.successful if self._process.returncode == 0 else JobStatus.aborted

def start_job(self):
command = " ".join(create_cmd_from_task(self.task))
exec_command = create_apptainer_command(command, task=self.task)

# Start the job and capture the job ID
self._process = subprocess.Popen(exec_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

def terminate_job(self):
if self._process is not None:
self._process.terminate()

def _write_output(self):
log_file_dir = get_log_file_dir(self.task)

with open(os.path.join(log_file_dir, "stdout"), "w") as f:
if self._stdout is not None:
f.write(self._stdout.decode())

with open(os.path.join(log_file_dir, "stderr"), "w") as f:
if self._stderr is not None:
f.write(self._stderr.decode())
18 changes: 9 additions & 9 deletions b2luigi/batch/processes/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,24 @@ class LSFProcess(BatchProcess):
For example:
.. code-block:: python
.. code-block:: python
class MyLongTask(b2luigi.Task):
queue = "l"
job_name = "my_long_task"
The default queue is the short queue ``"s"``. If no ``job_name`` is set the task
will appear as ::
The default queue is the short queue ``"s"``. If no ``job_name`` is set the task
will appear as ::
<result_dir>/parameter1=value/.../executable_wrapper.sh"
when running ``bjobs``.
when running ``bjobs``.
* By default, the environment variables from the scheduler are copied to
the workers.
This also applies we start in the same working directory and can reuse
the same executable etc.
Normally, you do not need to supply ``env_script`` or alike.
By default, the environment variables from the scheduler are copied to
the workers.
This also implies we start in the same working directory, can reuse
the same executable, etc.
Normally, you do not need to supply ``env_script`` or alike.
"""

def __init__(self, *args, **kwargs):
Expand Down
24 changes: 20 additions & 4 deletions b2luigi/batch/workers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import enum
import shutil

import luigi.interface
import luigi.worker

from b2luigi.batch.processes.lsf import LSFProcess
from b2luigi.batch.processes.htcondor import HTCondorProcess
from b2luigi.batch.processes.gbasf2 import Gbasf2Process
from b2luigi.batch.processes.apptainer import ApptainerProcess
from b2luigi.batch.processes.test import TestProcess
from b2luigi.core.settings import get_setting
from b2luigi.core.utils import create_output_dirs
Expand All @@ -20,9 +22,20 @@ class BatchSystems(enum.Enum):


class SendJobWorker(luigi.worker.Worker):
def _create_task_process(self, task):
batch_system = BatchSystems(get_setting("batch_system", default=BatchSystems.lsf, task=task))
def detect_batch_system(self, task):
batch_system_setting = get_setting("batch_system", default=BatchSystems.lsf, task=task)
if batch_system_setting == "auto":
if shutil.which("bsub"):
batch_system_setting = "lsf"
elif shutil.which("condor_submit"):
batch_system_setting = "htcondor"
else:
batch_system_setting = "local"

return BatchSystems(batch_system_setting)

def _create_task_process(self, task):
batch_system = self.detect_batch_system(task)
if batch_system == BatchSystems.lsf:
process_class = LSFProcess
elif batch_system == BatchSystems.htcondor:
Expand All @@ -32,8 +45,11 @@ def _create_task_process(self, task):
elif batch_system == BatchSystems.test:
process_class = TestProcess
elif batch_system == BatchSystems.local:
create_output_dirs(task)
return super()._create_task_process(task)
if get_setting("apptainer_image", default="", task=task):
process_class = ApptainerProcess
else:
create_output_dirs(task)
return super()._create_task_process(task)
else:
raise NotImplementedError

Expand Down
21 changes: 18 additions & 3 deletions b2luigi/core/executable.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
get_log_file_dir,
get_task_file_dir,
map_folder,
create_apptainer_command,
)


Expand Down Expand Up @@ -48,10 +49,24 @@ def create_executable_wrapper(task):

executable_wrapper_content.append("echo 'Current environment:'; env")

# 3. Third part is to call the actual program
# 3. Third part is to build the actual program
command = " ".join(create_cmd_from_task(task))
executable_wrapper_content.append("echo 'Will now execute the program'")
executable_wrapper_content.append(f"exec {command}")

# 4. Forth part is to create the correct execution command
# (a) If a valid apptainer image is provided, build an apptainer command
apptainer_image = get_setting("apptainer_image", task=task, default="")
if apptainer_image:
executable_wrapper_content.append(f"echo 'Will now execute the program with the image {apptainer_image}'")
apptainer_command_list = create_apptainer_command(command, task=task)
apptainer_command = " ".join(apptainer_command_list[:-1])
apptainer_command += f" '{apptainer_command_list[-1]}'"

executable_wrapper_content.append(apptainer_command)

# (b) Otherwise, just execute the command
else:
executable_wrapper_content.append("echo 'Will now execute the program'")
executable_wrapper_content.append(f"exec {command}")

# Now we can write the file
executable_file_dir = get_task_file_dir(task)
Expand Down
24 changes: 14 additions & 10 deletions b2luigi/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,20 @@ def clear_setting(key):


def _setting_file_iterator():
path = os.getcwd()

while True:
json_file = os.path.join(path, "settings.json")
if os.path.exists(json_file):
yield json_file

path = os.path.split(path)[0]
if path == "/":
break
# first, check if B2LUIGI_SETTINGS_JSON is set in the enviroment
if "B2LUIGI_SETTINGS_JSON" in os.environ:
yield os.environ["B2LUIGI_SETTINGS_JSON"]
# if it is not set, search in the durrent working dir (old behaviour)
else:
path = os.getcwd()
while True:
json_file = os.path.join(path, "settings.json")
if os.path.exists(json_file):
yield json_file

path = os.path.split(path)[0]
if path == "/":
break


@contextlib.contextmanager
Expand Down
2 changes: 1 addition & 1 deletion b2luigi/core/task.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections.abc import Iterable, Iterator
from typing import Iterable, Iterator
from b2luigi.core import utils
from typing import Any, Union, List, Dict, Optional

Expand Down
Loading

0 comments on commit 0bf35b7

Please sign in to comment.