Skip to content

Commit

Permalink
ariadne: Autoremove orphaned data even for old/running/completed jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
ximion committed Oct 25, 2024
1 parent f4a1e5c commit 5a2c92f
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 59 deletions.
6 changes: 6 additions & 0 deletions src/laniakea/ariadne/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@
#
# SPDX-License-Identifier: LGPL-3.0+

from laniakea.ariadne.maintenance import (
delete_orphaned_jobs,
remove_superfluous_pending_jobs,
)
from laniakea.ariadne.package_jobs import schedule_package_builds_for_source

__all__ = [
'schedule_package_builds_for_source',
'remove_superfluous_pending_jobs',
'delete_orphaned_jobs',
]
110 changes: 110 additions & 0 deletions src/laniakea/ariadne/maintenance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2023-2024 Matthias Klumpp <[email protected]>
#
# SPDX-License-Identifier: LGPL-3.0+

import os

from laniakea import LkModule, LocalConfig
from laniakea.db import Job, JobKind, JobStatus, SourcePackage, config_get_value
from laniakea.utils import get_dir_shorthand_for_uuid
from laniakea.logging import log


def remove_superfluous_pending_jobs(session, simulate: bool = False, arch_indep_affinity: str | None = None):
    """Remove pending jobs that are no longer needed.

    Pending jobs may become superfluous because their binaries have shown up by other
    means (e.g. due to a manual upload), or because the source package that triggered
    them is no longer available.

    :param session: A SQLAlchemy session
    :param simulate: Do not perform any changes, just log what would be done
    :param arch_indep_affinity: The architecture to use for arch-independent packages.
    """

    if not arch_indep_affinity:
        # fall back to the module-wide configured affinity architecture
        arch_indep_affinity = config_get_value(LkModule.ARIADNE, 'indep_arch_affinity')

    pending_jobs = (
        session.query(Job)
        .filter(Job.module == LkModule.ARIADNE)
        .filter(Job.status.in_((JobStatus.UNKNOWN, JobStatus.WAITING, JobStatus.DEPWAIT)))
        .all()
    )
    for job in pending_jobs:
        # The job is only superfluous if the source package that triggered it no
        # longer exists with the given version number, or if its binaries have
        # already appeared by other means.
        spkg = (
            session.query(SourcePackage)
            .filter(SourcePackage.source_uuid == job.trigger)
            .filter(SourcePackage.version == job.version)
            .one_or_none()
        )
        if spkg:
            # The source package still exists: check if we have binaries on the
            # requested architecture already - if not, the job is still needed
            # and must be kept.
            binaries_available = False
            for binary in spkg.binaries:  # not named `bin` to avoid shadowing the builtin
                bin_arch = binary.architecture.name
                if bin_arch == 'all':
                    # arch:all packages are built on the affinity architecture
                    bin_arch = arch_indep_affinity
                if bin_arch == job.architecture:
                    binaries_available = True
                    break
            if not binaries_available:
                continue

        # At this point the job is superfluous: either its source package was removed
        # entirely from all archive suites while the job had not finished yet (so it
        # can never be processed), or its binaries already exist on the target
        # architecture.
        if simulate:
            log.info('Delete orphaned pending job: %s', job.uuid)
        else:
            log.debug('Deleting orphaned pending job: %s', job.uuid)
            session.delete(job)


def delete_orphaned_jobs(
    session,
    simulate: bool = False,
):
    """Expire records of all jobs that no longer have a source package in the archive.

    Also removes the jobs' on-disk log and firehose files.

    :param session: A SQLAlchemy session
    :param simulate: Do not perform any changes, just log what would be done
    """

    lconf = LocalConfig()
    # find all package-build jobs where the source package has gone missing
    pkgbuild_jobs = session.query(Job).filter(Job.module == LkModule.ARIADNE, Job.kind == JobKind.PACKAGE_BUILD).all()
    for job in pkgbuild_jobs:
        spkg = (
            session.query(SourcePackage)
            .filter(SourcePackage.source_uuid == job.trigger)
            .filter(SourcePackage.version == job.version)
            .one_or_none()
        )
        if not spkg:
            log.info('Deleting old job %s (package that triggered it is no longer available)', job.uuid)

            # don't perform any action if we're just simulating
            if simulate:
                continue

            # remove log files from disk, if they exist
            job_id = str(job.uuid)
            log_dir = os.path.join(lconf.logs_metadata_dir, get_dir_shorthand_for_uuid(job_id))
            firehose_dir = os.path.join(log_dir, 'firehose')

            # use EAFP removal instead of exists()+remove() to avoid a race if the
            # file vanishes between the check and the removal
            log_fname = os.path.join(log_dir, job_id + '.log')
            try:
                os.remove(log_fname)
                log.debug(f"Removing old log file {log_fname}")
            except FileNotFoundError:
                pass  # already gone, nothing to clean up
            fh_fname = os.path.join(firehose_dir, job_id + '.firehose.xml')
            try:
                os.remove(fh_fname)
                log.debug(f"Removing old firehose file {fh_fname}")
            except FileNotFoundError:
                pass  # already gone, nothing to clean up

            # drop the job record itself
            session.delete(job)
75 changes: 23 additions & 52 deletions src/lkarchive/ariadne.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@

from laniakea import LkModule
from laniakea.db import (
Job,
JobStatus,
SourcePackage,
ArchiveRepoSuiteSettings,
session_scope,
config_get_value,
)
from laniakea.utils import process_file_lock
from laniakea.ariadne import schedule_package_builds_for_source
from laniakea.ariadne import (
delete_orphaned_jobs,
remove_superfluous_pending_jobs,
schedule_package_builds_for_source,
)


def get_newest_sources_index(session, rss: ArchiveRepoSuiteSettings):
Expand Down Expand Up @@ -61,54 +63,6 @@ def get_newest_sources_index(session, rss: ArchiveRepoSuiteSettings):
return latest_spkg


def delete_orphaned_jobs(session, simulate: bool = False, arch_indep_affinity: str | None = None):
    """Clean up jobs scheduled for source packages that have meanwhile been removed
    from the archive entirely.

    :param session: A SQLAlchemy session
    :param simulate: Do not perform any changes, just log what would be done
    :param arch_indep_affinity: The architecture to use for arch-independent packages.
    """

    if not arch_indep_affinity:
        arch_indep_affinity = config_get_value(LkModule.ARIADNE, 'indep_arch_affinity')

    pending_jobs = (
        session.query(Job)
        .filter(Job.module == LkModule.ARIADNE)
        .filter(Job.status.in_((JobStatus.UNKNOWN, JobStatus.WAITING, JobStatus.DEPWAIT)))
        .all()
    )
    for job in pending_jobs:
        # A job is orphaned only when the source package that triggered it no longer
        # exists with the exact version recorded on the job.
        spkg = (
            session.query(SourcePackage)
            .filter(SourcePackage.source_uuid == job.trigger, SourcePackage.version == job.version)
            .one_or_none()
        )
        if spkg is not None:
            # Binaries may have shown up by other means (e.g. a manual upload):
            # map arch:all onto the affinity architecture and look for a match on
            # the job's target architecture.
            has_binaries = any(
                (arch_indep_affinity if b.architecture.name == 'all' else b.architecture.name) == job.architecture
                for b in spkg.binaries
            )
            if not has_binaries:
                # source still present and not yet built - keep this job
                continue

        # Either the source package vanished from all suites (the job can never be
        # processed) or its binaries already exist - drop the job.
        if simulate:
            log.info('Delete orphaned job: {}'.format(str(job.uuid)))
        else:
            log.debug('Deleting orphaned job: {}'.format(str(job.uuid)))
            session.delete(job)


def update_package_build_schedule(
session, rss: ArchiveRepoSuiteSettings, simulate=False, limit_architecture=None, limit_count=0
) -> int:
Expand Down Expand Up @@ -154,7 +108,7 @@ def update_package_build_schedule(
break

# cleanup
delete_orphaned_jobs(session, simulate, arch_indep_affinity=arch_indep_affinity)
remove_superfluous_pending_jobs(session, simulate, arch_indep_affinity=arch_indep_affinity)

# write all changes to database
session.commit()
Expand Down Expand Up @@ -251,3 +205,20 @@ def update_jobs(
file=sys.stderr,
)
sys.exit(3)


@click.command('cleanup-jobs')
@click.option(
    '--simulate',
    'simulate',
    is_flag=True,
    default=False,
    help='Run simulation, don\'t remove any jobs and instead just display what would be done.',
)
def cleanup_jobs(
    simulate: bool = False,
):
    """Remove orphaned completed jobs and their logs."""

    # delegate the actual cleanup work to the ariadne maintenance helper,
    # within one database session
    with session_scope() as db_session:
        delete_orphaned_jobs(session=db_session, simulate=simulate)
1 change: 1 addition & 0 deletions src/lkarchive/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def _register_commands():
import lkarchive.ariadne as ariadne

cli.add_command(ariadne.update_jobs)
cli.add_command(ariadne.cleanup_jobs)


def run(mainfile, args):
Expand Down
27 changes: 22 additions & 5 deletions src/lkscheduler/sdaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ def task_repository_publish(registry: JobsRegistry):


def task_repository_expire(registry: JobsRegistry):
"""Expire old packages in all repositories."""
"""Expire old packages and data in all repositories."""
import subprocess

# expire packages
with registry.lock_publish_job():
conf = SchedulerConfig()
log.info('Cleaning up repository data')
Expand All @@ -73,10 +74,26 @@ def task_repository_expire(registry: JobsRegistry):
start_new_session=True,
check=False,
)
if proc.returncode == 0:
scheduler_log.info('Archive-Expire: Success.')
else:
scheduler_log.error('Archive-Expire: Error: %s', str(proc.stdout, 'utf-8'))
if proc.returncode == 0:
scheduler_log.info('Archive-Expire: Success.')
else:
scheduler_log.error('Archive-Expire: Error: %s', str(proc.stdout, 'utf-8'))

# remove job data that is no longer needed
with registry.lock_publish_job():
log.info('Cleaning up job data')
proc = subprocess.run(
[conf.lk_archive_exe, 'cleanup-jobs'],
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
start_new_session=True,
check=False,
)
if proc.returncode == 0:
scheduler_log.info('Archive-JobCleanup: Success.')
else:
scheduler_log.error('Archive-JobCleanup: Error: %s', str(proc.stdout, 'utf-8'))


def task_rubicon_scan(registry: JobsRegistry):
Expand Down
4 changes: 2 additions & 2 deletions src/rubicon/rubiconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ def load_from_file(self, fname):
with open(fname) as json_file:
cdata = tomlkit.load(json_file)

self.log_storage_dir = cdata.get('LogStorage', self._lconf.logs_metadata_dir)
self.log_storage_dir = self._lconf.logs_metadata_dir
if not self.log_storage_dir:
raise Exception('No "LogStorage" entry in Rubicon configuration: We need to know where to store log files.')
raise Exception('No "LogStorage" entry in base configuration: We need to know where to store log files.')

self.rejected_dir = cdata.get('RejectedDir', self._lconf.upload_rejected_dir)
if not self.rejected_dir:
Expand Down

0 comments on commit 5a2c92f

Please sign in to comment.