Skip to content

Commit

Permalink
admin: Add command to retry stalled jobs in one go
Browse files Browse the repository at this point in the history
  • Loading branch information
ximion committed Nov 2, 2024
1 parent aff5807 commit d1d6aca
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/laniakea/ariadne/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# SPDX-License-Identifier: LGPL-3.0+

from laniakea.ariadne.maintenance import (
retry_stalled_jobs,
delete_orphaned_jobs,
remove_superfluous_pending_jobs,
)
Expand All @@ -14,4 +15,5 @@
'schedule_package_builds_for_source',
'remove_superfluous_pending_jobs',
'delete_orphaned_jobs',
'retry_stalled_jobs',
]
46 changes: 44 additions & 2 deletions src/laniakea/ariadne/maintenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@
# SPDX-License-Identifier: LGPL-3.0+

import os
from datetime import UTC, datetime, timedelta

from laniakea import LkModule, LocalConfig
from laniakea.db import Job, JobKind, JobStatus, SourcePackage, config_get_value
from laniakea.db import (
Job,
JobKind,
JobResult,
JobStatus,
SourcePackage,
config_get_value,
)
from laniakea.utils import get_dir_shorthand_for_uuid
from laniakea.logging import log

Expand Down Expand Up @@ -86,7 +94,7 @@ def delete_orphaned_jobs(
.one_or_none()
)
if not spkg:
log.info(f"Deleting old job {job.uuid} (package that triggered it is no longer available)")
log.info(f'Deleting old job {job.uuid} (package that triggered it is no longer available)')

# don't perform any action if we're just simulating
if simulate:
Expand All @@ -108,3 +116,37 @@ def delete_orphaned_jobs(

# drop the job
session.delete(job)


def retry_stalled_jobs(
    session,
    simulate: bool = False,
    *,
    max_age: timedelta = timedelta(days=14),
):
    """Reschedule all jobs that have been in a running/scheduled state for too long.

    A job is considered stalled when its status is ``JobStatus.RUNNING`` or
    ``JobStatus.SCHEDULED`` and its ``time_assigned`` lies at least *max_age*
    in the past. Every stalled job is logged; unless *simulate* is set, it is
    then put back into the ``WAITING`` state with its result and timestamps
    cleared. This function does not commit the session itself.

    :param session: A SQLAlchemy session
    :param simulate: Do not perform any changes, just log what would be done
    :param max_age: How long a job may stay assigned before it is treated as
        stalled. Defaults to two weeks.
    """

    # by default we assume that anything that is stuck in running or scheduled
    # for two weeks is probably stalled for some reason
    stalled_before = datetime.now(UTC) - max_age
    stalled_jobs = (
        session.query(Job)
        .filter(Job.status.in_((JobStatus.RUNNING, JobStatus.SCHEDULED)))
        .filter(Job.time_assigned <= stalled_before)
        .all()
    )

    for job in stalled_jobs:
        # logged in simulation mode as well, so the user sees what would happen
        log.info(f'Rescheduling stalled job: {job.uuid}')

        # don't perform any action if we're just simulating
        if simulate:
            continue

        # if we are here, it should be safe to reschedule the job:
        # reset it to a fresh, unassigned state and leave a note in its log excerpt
        job.status = JobStatus.WAITING
        job.result = JobResult.UNKNOWN
        job.time_assigned = None
        job.time_finished = None
        job.latest_log_excerpt = 'Job has been rescheduled due to inactivity'
18 changes: 18 additions & 0 deletions src/lkadmin/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sqlalchemy.orm import undefer

from laniakea.db import Job, JobResult, JobStatus, session_scope
from laniakea.ariadne import retry_stalled_jobs

from .utils import print_note

Expand Down Expand Up @@ -60,3 +61,20 @@ def retry(id):
job.latest_log_excerpt = None

print_note('Job {}/{}::{} was rescheduled.'.format(str(job.module), str(job.kind), str(job.uuid)))


@job.command()
@click.option(
    '--simulate',
    'simulate',
    default=False,
    is_flag=True,
    help='Run simulation, just display what would be done instead of doing it.',
)
def retry_stalled(simulate: bool = False):
    """Reschedule all jobs that have been in a running/accepted state for too long."""
    # NOTE: click shows the docstring above as this command's help text,
    # so it is user-facing output and is intentionally kept short.

    # Delegate the actual work to the maintenance helper, handing it a
    # database session obtained from session_scope().
    with session_scope() as db_session:
        retry_stalled_jobs(db_session, simulate=simulate)

0 comments on commit d1d6aca

Please sign in to comment.