[major] DCS: Fetch (encrypted) object size from S3 incoming event (#84)
mephenor authored Jan 17, 2025
1 parent 3d03bef commit fec8533
Showing 8 changed files with 43 additions and 37 deletions.
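The gist of the change: the DCS previously issued a live `get_object_size` call against S3 on every download request; with this commit, the encrypted object size arrives once in the incoming S3 registration event, is persisted on the DRS object model, and is read from there when building the DRS response. A minimal sketch of the resulting data flow (the model and field names are taken from the diff below; everything else is illustrative):

```python
from pydantic import BaseModel


class DrsObjectBase(BaseModel):
    """Subset of dcs.core.models.DrsObjectBase, as changed in this commit."""

    decrypted_size: int
    encrypted_size: int  # new: persisted from the incoming event payload


def drs_response_size(drs_object: DrsObjectBase) -> int:
    """Size reported via DRS. Before this commit it required a round trip to
    object storage (object_storage.get_object_size); now it is read straight
    from the stored model."""
    return drs_object.encrypted_size


# Values borrowed from the test fixture further down:
obj = DrsObjectBase(decrypted_size=12345, encrypted_size=23456)
assert drs_response_size(obj) == 23456
```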
6 changes: 3 additions & 3 deletions services/dcs/README.md
@@ -43,21 +43,21 @@ We recommend using the provided Docker container.
 
 A pre-build version is available at [docker hub](https://hub.docker.com/repository/docker/ghga/download-controller-service):
 ```bash
-docker pull ghga/download-controller-service:3.0.0
+docker pull ghga/download-controller-service:4.0.0
 ```
 
 Or you can build the container yourself from the [`./Dockerfile`](./Dockerfile):
 ```bash
 # Execute in the repo's root dir:
-docker build -t ghga/download-controller-service:3.0.0 .
+docker build -t ghga/download-controller-service:4.0.0 .
 ```
 
 For production-ready deployment, we recommend using Kubernetes, however,
 for simple use cases, you could execute the service using docker
 on a single server:
 ```bash
 # The entrypoint is preconfigured:
-docker run -p 8080:8080 ghga/download-controller-service:3.0.0 --help
+docker run -p 8080:8080 ghga/download-controller-service:4.0.0 --help
 ```
 
 If you prefer not to use containers, you may install the service from source:
2 changes: 1 addition & 1 deletion services/dcs/openapi.yaml
@@ -206,7 +206,7 @@ info:
     \ Object Storage. \n\nThis is an implementation of the DRS standard from the Global\
     \ Alliance for Genomics and Health, please find more information at: https://github.com/ga4gh/data-repository-service-schemas"
   title: Download Controller Service
-  version: 3.0.0
+  version: 4.0.0
 openapi: 3.1.0
 paths:
   /health:
2 changes: 1 addition & 1 deletion services/dcs/pyproject.toml
@@ -21,7 +21,7 @@ classifiers = [
     "Intended Audience :: Developers",
 ]
 name = "dcs"
-version = "3.0.0"
+version = "4.0.0"
 description = "Download Controller Service - a GA4GH DRS-compliant service for delivering files from S3 encrypted according to the GA4GH Crypt4GH standard."
 
 
1 change: 1 addition & 0 deletions services/dcs/src/dcs/adapters/inbound/event_sub.py
@@ -87,6 +87,7 @@ async def _consume_files_to_register(self, *, payload: JsonObject) -> None:
             decryption_secret_id=validated_payload.decryption_secret_id,
             decrypted_sha256=validated_payload.decrypted_sha256,
             decrypted_size=validated_payload.decrypted_size,
+            encrypted_size=validated_payload.encrypted_size,
             creation_date=validated_payload.upload_date,
             s3_endpoint_alias=validated_payload.s3_endpoint_alias,
         )
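For context, `validated_payload` is an instance of an event schema from `ghga_event_schemas`, which with this change must carry `encrypted_size`. A hedged sketch of such a payload model (the class name `FileInternallyRegistered` and the exact field set are assumptions inferred from the consumer code above, not confirmed by the diff):

```python
from pydantic import BaseModel, ValidationError


class FileInternallyRegistered(BaseModel):
    """Hypothetical stand-in for the consumed event schema."""

    file_id: str
    decryption_secret_id: str
    decrypted_sha256: str
    decrypted_size: int
    encrypted_size: int  # now required by the DCS consumer
    upload_date: str
    s3_endpoint_alias: str


# A payload without encrypted_size no longer validates:
try:
    FileInternallyRegistered(
        file_id="examplefile",
        decryption_secret_id="some-secret",
        decrypted_sha256="0" * 64,
        decrypted_size=12345,
        upload_date="2025-01-17T00:00:00+00:00",
        s3_endpoint_alias="test",
    )
except ValidationError:
    print("payload rejected: encrypted_size is missing")
```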
66 changes: 34 additions & 32 deletions services/dcs/src/dcs/core/data_repository.py
@@ -20,6 +20,7 @@
 import re
 import uuid
 from datetime import timedelta
+from time import perf_counter
 
 from ghga_event_schemas import pydantic_ as event_schemas
 from ghga_service_commons.utils import utc_dates
@@ -171,7 +172,10 @@ async def access_drs_object(self, *, drs_id: str) -> models.DrsObjectResponseMod
         """
         # make sure that metadata for the DRS object exists in the database:
         try:
+            started = perf_counter()
             drs_object_with_access_time = await self._drs_object_dao.get_by_id(drs_id)
+            stopped = perf_counter() - started
+            log.debug("Fetched DRS object model in %.3f seconds.", stopped)
         except ResourceNotFoundError as error:
             drs_object_not_found = self.DrsObjectNotFoundError(drs_id=drs_id)
             log.error(drs_object_not_found)
@@ -180,11 +184,7 @@ async def access_drs_object(self, *, drs_id: str) -> models.DrsObjectResponseMod
         drs_object = models.DrsObject(
             **drs_object_with_access_time.model_dump(exclude={"last_accessed"})
         )
-
-        drs_object_with_uri = self._get_model_with_self_uri(drs_object=drs_object)
-
         storage_alias = drs_object.s3_endpoint_alias
-
         try:
             bucket_id, object_storage = self._object_storages.for_alias(storage_alias)
         except KeyError as exc:
@@ -194,11 +194,17 @@ async def access_drs_object(self, *, drs_id: str) -> models.DrsObjectResponseMod
             log.critical(storage_alias_not_configured)
             raise storage_alias_not_configured from exc
 
-        # check if the file corresponding to the DRS object is already in the outbox:
-        if not await object_storage.does_object_exist(
-            bucket_id=bucket_id, object_id=drs_object.object_id
-        ):
-            log.info(f"File not in outbox for '{drs_id}'. Request staging...")
+        try:
+            started = perf_counter()
+            drs_object_with_access = await self._get_access_model(
+                drs_object=drs_object,
+                object_storage=object_storage,
+                bucket_id=bucket_id,
+            )
+            stopped = perf_counter() - started
+            log.debug("Fetched new presigned URL in %.3f seconds.", stopped)
+        except object_storage.ObjectNotFoundError as exc:
+            log.info("File not in outbox for '%s'. Request staging...", drs_id)
 
             # publish an outbox event to request a stage of the corresponding file:
             unstaged_file_dto = event_schemas.NonStagedFileRequested(
@@ -209,7 +215,10 @@ async def access_drs_object(self, *, drs_id: str) -> models.DrsObjectResponseMod
                 decrypted_sha256=drs_object.decrypted_sha256,
             )
 
+            started = perf_counter()
             await self._nonstaged_file_requested_dao.upsert(unstaged_file_dto)
+            stopped = perf_counter() - started
+            log.debug("Upserted outbox DAO in %.3f seconds.", stopped)
 
             # calculate the required time in seconds based on the decrypted file size
             # (actually the encrypted file is staged, but this is an estimate anyway)
@@ -219,31 +228,30 @@ async def access_drs_object(self, *, drs_id: str) -> models.DrsObjectResponseMod
             retry_after = max(retry_after, config.retry_after_min)
             retry_after = min(retry_after, config.retry_after_max)
             # instruct to retry later:
-            raise self.RetryAccessLaterError(retry_after=retry_after)
+            raise self.RetryAccessLaterError(retry_after=retry_after) from exc
 
         # Successfully staged, update access information now
-        log.debug(f"Updating access time of for '{drs_id}'.")
+        log.debug("Updating access time of for '%s'.", drs_id)
         drs_object_with_access_time.last_accessed = utc_dates.now_as_utc()
+        started = perf_counter()
         await self._drs_object_dao.update(drs_object_with_access_time)
-
-        drs_object_with_access = await self._get_access_model(
-            drs_object=drs_object,
-            object_storage=object_storage,
-            bucket_id=bucket_id,
-        )
+        stopped = perf_counter() - started
+        log.debug("Updated last access time in %.3f seconds.", stopped)
 
         # publish an event indicating the served download:
+        drs_object_with_uri = self._get_model_with_self_uri(drs_object=drs_object)
+        started = perf_counter()
         await self._event_publisher.download_served(
             drs_object=drs_object_with_uri,
             target_bucket_id=bucket_id,
         )
-        log.info(f"Sent download served event for '{drs_id}'.")
-
-        # CLI needs to have the encrypted size to correctly download all file parts
-        encrypted_size = await object_storage.get_object_size(
-            bucket_id=bucket_id, object_id=drs_object.object_id
+        stopped = perf_counter() - started
+        log.info(
+            "Sent download served event for '%s' in %.3f seconds.", drs_id, stopped
         )
-        return drs_object_with_access.convert_to_drs_response_model(size=encrypted_size)
+        return drs_object_with_access.convert_to_drs_response_model(
+            size=drs_object.encrypted_size
+        )
 
     async def cleanup_outbox_buckets(
         self, *, object_storages_config: S3ObjectStoragesConfig
Expand All @@ -262,9 +270,7 @@ async def cleanup_outbox(self, *, storage_alias: str):
"""
# Run on demand through CLI, so crashing should be ok if the alias is not configured
log.info(
f"Starting outbox cleanup for storage identified by alias '{
storage_alias
}'."
f"Starting outbox cleanup for storage identified by alias '{storage_alias}'."
)
try:
bucket_id, object_storage = self._object_storages.for_alias(storage_alias)
@@ -321,9 +327,7 @@ async def register_new_file(self, *, file: models.DrsObjectBase):
         with contextlib.suppress(ResourceNotFoundError):
             await self._drs_object_dao.get_by_id(file.file_id)
             log.error(
-                f"Could not register file with id '{
-                    file.file_id
-                }' as an entry already exists for this id."
+                f"Could not register file with id '{file.file_id}' as an entry already exists for this id."
             )
             return
 
@@ -426,9 +430,7 @@ async def delete_file(self, *, file_id: str) -> None:
             bucket_id=bucket_id, object_id=drs_object.object_id
         )
         log.debug(
-            f"Successfully deleted file object for '{
-                file_id
-            }' from object storage identified by '{alias}'."
+            f"Successfully deleted file object for '{file_id}' from object storage identified by '{alias}'."
         )
 
         # Remove file from database and send success event
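Two patterns recur in the data_repository.py changes above: EAFP staging (attempt `_get_access_model` and treat `ObjectNotFoundError` as "not staged yet", rather than a separate `does_object_exist` check, saving a storage round trip on the hot path) and manual `perf_counter` timing around each awaited call. The repeated started/stopped bookkeeping could also be wrapped in a small context manager; a sketch of such a helper (`timed` is hypothetical, not part of the service):

```python
import logging
from contextlib import contextmanager
from time import perf_counter

log = logging.getLogger(__name__)


@contextmanager
def timed(label: str):
    """Log how long the wrapped block took, mirroring the started/stopped
    pattern added throughout the diff above."""
    started = perf_counter()
    try:
        yield
    finally:
        stopped = perf_counter() - started
        log.debug("%s in %.3f seconds.", label, stopped)


# Usage inside an async method would look like:
#     with timed("Fetched DRS object model"):
#         drs_object_with_access_time = await self._drs_object_dao.get_by_id(drs_id)
```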
1 change: 1 addition & 0 deletions services/dcs/src/dcs/core/models.py
@@ -51,6 +51,7 @@ class DrsObjectBase(BaseModel):
     decryption_secret_id: str
     decrypted_sha256: str
     decrypted_size: int
+    encrypted_size: int
     creation_date: str
     s3_endpoint_alias: str
 
1 change: 1 addition & 0 deletions services/dcs/tests_dcs/fixtures/joint.py
@@ -73,6 +73,7 @@
     creation_date=utc_dates.now_as_utc().isoformat(),
     decrypted_size=12345,
     decryption_secret_id="some-secret",
+    encrypted_size=23456,
     s3_endpoint_alias=STORAGE_ALIAS,
     last_accessed=utc_dates.now_as_utc(),
 )
1 change: 1 addition & 0 deletions services/dcs/tests_dcs/test_edge_cases.py
@@ -183,6 +183,7 @@ async def test_register_file_twice(populated_fixture: PopulatedFixture, caplog):
         decrypted_sha256=example_file.decrypted_sha256,
         decrypted_size=example_file.decrypted_size,
         creation_date=now_as_utc().isoformat(),
+        encrypted_size=example_file.encrypted_size,
         s3_endpoint_alias=example_file.s3_endpoint_alias,
     )
Expand Down
