feat(opensearch): capture logs from Dask cluster pods (#616)
This commit collects logs from the Dask scheduler and workers and propagates
them to all REANA jobs that use the same Dask cluster. This is not ideal,
since the Dask logs are thereby duplicated across the different steps of the
workflow, which could be confusing for the user.

However, when a user uses Dask to parallelise workflow jobs, the workflow
steps are usually defined only within Dask, so this situation does not
occur. Hence we can afford this behaviour under usual real-life conditions.

Separating Dask scheduler and worker logs from regular Kubernetes job
logs would require a larger architectural change and is therefore
deferred to a future commit.
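
For illustration, a minimal sketch of the per-job log composition that this
commit introduces (the `fetcher` and `job` names are assumed here; the
helper functions match those added in this diff):

    fetcher = build_opensearch_log_fetcher()
    logs = (fetcher.fetch_job_logs(job.backend_job_id) or "")
    logs += fetcher.fetch_dask_scheduler_logs(job.workflow_uuid) or ""
    logs += fetcher.fetch_dask_worker_logs(job.workflow_uuid) or ""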

Closes #610
Alputer authored and tiborsimko committed Jan 22, 2025
1 parent afd1400 commit 51fad95
Showing 2 changed files with 116 additions and 14 deletions.
77 changes: 68 additions & 9 deletions reana_workflow_controller/opensearch.py
@@ -63,11 +63,13 @@ def __init__(
os_client: OpenSearch | None = None,
job_index: str = "fluentbit-job_log",
workflow_index: str = "fluentbit-workflow_log",
dask_index: str = "fluentbit-dask_log",
max_rows: int = 5000,
log_key: str = "log",
order: str = "asc",
job_log_matcher: str = "kubernetes.labels.job-name.keyword",
workflow_log_matcher: str = "kubernetes.labels.reana-run-batch-workflow-uuid.keyword",
dask_log_matcher: str = "kubernetes.labels.dask.org/cluster-name.keyword",
timeout: int = 5,
) -> None:
"""
@@ -91,34 +91,57 @@ def __init__(
self.os_client = os_client
self.job_index = job_index
self.workflow_index = workflow_index
self.dask_index = dask_index
self.max_rows = max_rows
self.log_key = log_key
self.order = order
self.job_log_matcher = job_log_matcher
self.workflow_log_matcher = workflow_log_matcher
self.dask_log_matcher = dask_log_matcher
self.timeout = timeout
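
As a usage sketch of the new constructor parameters (assuming the class is
named OpenSearchLogFetcher; the values shown are simply the defaults above):

    # Sketch only: passing the new Dask-related settings explicitly.
    fetcher = OpenSearchLogFetcher(
        dask_index="fluentbit-dask_log",
        dask_log_matcher="kubernetes.labels.dask.org/cluster-name.keyword",
    )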

def fetch_logs(self, id: str, index: str, match: str) -> str | None:
def fetch_logs(
self, id: str | None, index: str, match: str | None = None, matches: dict | None = None
) -> str | None:
"""
Fetch logs of a specific job or workflow.
Fetch logs of a specific job, workflow or Dask cluster.
:param id: Job or workflow ID.
:param index: Index name for logs.
:param match: Matcher for logs.
:param match: Single matcher for logs (mutually exclusive with `matches`).
:param matches: Dictionary of field-to-value pairs for multiple match conditions.
:return: Job or workflow logs.
:return: Job, workflow or Dask cluster logs matching the conditions.
"""
query = {
"query": {"match": {match: id}},
"sort": [{"@timestamp": {"order": self.order}}],
}
if matches:
# Build a bool query with multiple conditions
query = {
"query": {
"bool": {
"must": [
{"match": {field: value}}
for field, value in matches.items()
]
}
},
"sort": [{"@timestamp": {"order": self.order}}],
}
elif match:
# Build a simple single-match query
query = {
"query": {"match": {match: id}},
"sort": [{"@timestamp": {"order": self.order}}],
}
else:
logging.error("Either `match` or `matches` must be provided.")
return None

try:
response = self.os_client.search(
index=index, body=query, size=self.max_rows, timeout=self.timeout
)
except Exception as e:
logging.error("Failed to fetch logs for {0}: {1}".format(id, e))
logging.error(f"Failed to fetch logs for {id}: {e}")
return None

return self._concat_rows(response["hits"]["hits"])
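
For reference, a sketch of the query body that the `matches` branch builds
when fetching Dask scheduler logs (field names and values are taken from
this diff; the workflow ID is a placeholder):

    query = {
        "query": {
            "bool": {
                "must": [
                    {
                        "match": {
                            "kubernetes.labels.dask.org/cluster-name.keyword": "reana-run-dask-<workflow-id>"
                        }
                    },
                    {"match": {"kubernetes.labels.dask.org/component": "scheduler"}},
                ]
            }
        },
        "sort": [{"@timestamp": {"order": "asc"}}],
    }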
@@ -151,6 +176,40 @@ def fetch_workflow_logs(self, workflow_id: str) -> str | None:
self.workflow_log_matcher,
)

def fetch_dask_scheduler_logs(self, workflow_id: str) -> str | None:
"""
Fetch logs of the scheduler of a Dask cluster.
:param workflow_id: Workflow ID.
:return: Dask cluster scheduler logs.
"""
return self.fetch_logs(
id=None,
index=self.dask_index,
matches={
self.dask_log_matcher: f"reana-run-dask-{workflow_id}",
"kubernetes.labels.dask.org/component": "scheduler",
},
)

def fetch_dask_worker_logs(self, workflow_id: str) -> str | None:
"""
Fetch logs of the workers of a Dask cluster.
:param workflow_id: Workflow ID.
:return: Dask cluster worker logs.
"""
return self.fetch_logs(
id=None,
index=self.dask_index,
matches={
self.dask_log_matcher: f"reana-run-dask-{workflow_id}",
"kubernetes.labels.dask.org/component": "worker",
},
)
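
Both helpers are thin wrappers over fetch_logs; an equivalent direct call
for the worker logs would look like this (a sketch, assuming `fetcher` and
`workflow_id` are in scope):

    logs = fetcher.fetch_logs(
        id=None,
        index="fluentbit-dask_log",
        matches={
            "kubernetes.labels.dask.org/cluster-name.keyword": f"reana-run-dask-{workflow_id}",
            "kubernetes.labels.dask.org/component": "worker",
        },
    )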

def _concat_rows(self, rows: list) -> str | None:
"""
Concatenate log messages from rows.
53 changes: 48 additions & 5 deletions reana_workflow_controller/rest/utils.py
@@ -60,10 +60,12 @@
from werkzeug.exceptions import BadRequest, NotFound

from reana_workflow_controller.config import (
DASK_ENABLED,
PROGRESS_STATUSES,
REANA_GITLAB_HOST,
PREVIEWABLE_MIME_TYPE_PREFIXES,
)
from reana_workflow_controller.dask import requires_dask
from reana_workflow_controller.consumer import _update_workflow_status
from reana_workflow_controller.errors import (
REANAExternalCallError,
@@ -183,11 +185,52 @@ def build_workflow_logs(workflow, steps=None, paginate=None):

open_search_log_fetcher = build_opensearch_log_fetcher()

logs = (
open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
if open_search_log_fetcher
else None
)
logs = None

if DASK_ENABLED and requires_dask(workflow):
logs = (
(open_search_log_fetcher.fetch_job_logs(job.backend_job_id) or "")
+ """
-------------------------------------------------------------------
-------------------------------------------------------------------
---------------- DASK SCHEDULER LOGS ----------------
-------------------------------------------------------------------
-------------------------------------------------------------------
"""
+ (
open_search_log_fetcher.fetch_dask_scheduler_logs(job.workflow_uuid)
or ""
)
+ """
-------------------------------------------------------------------
-------------------------------------------------------------------
---------------- DASK WORKER LOGS ----------------
-------------------------------------------------------------------
-------------------------------------------------------------------
"""
+ (
open_search_log_fetcher.fetch_dask_worker_logs(job.workflow_uuid)
or ""
)
if open_search_log_fetcher
else None
)

else:
logs = (
open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
if open_search_log_fetcher
else None
)
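
As a design note, the section separators above are embedded as literal
strings; a hypothetical helper (not part of this commit) could generate the
same kind of banner:

    def _make_banner(title: str, width: int = 67) -> str:
        # Hypothetical helper, not in this commit: frames a centred title
        # between doubled dashed rules, like the literals used above.
        rule = "-" * width
        return "\n".join(["", rule, rule, title.center(width), rule, rule, ""])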

item = {
"workflow_uuid": str(job.workflow_uuid) or "",
