From a51481073564b931b261b796cc55bbf1299998c2 Mon Sep 17 00:00:00 2001 From: TrsNium Date: Wed, 23 Dec 2020 14:35:32 +0900 Subject: [PATCH] Throw an error depending on the expiration date of TemporalConfigStorage --- .../src/main/java/io/digdag/spi/Storage.java | 4 +++ .../command/KubernetesCommandExecutor.java | 35 ++++++++++++++++++- .../kubernetes/TemporalConfigStorage.java | 10 ++++++ .../io/digdag/storage/gcs/GCSStorage.java | 16 +++++++-- .../java/io/digdag/storage/s3/S3Storage.java | 17 +++++++-- 5 files changed, 77 insertions(+), 5 deletions(-) diff --git a/digdag-spi/src/main/java/io/digdag/spi/Storage.java b/digdag-spi/src/main/java/io/digdag/spi/Storage.java index 40d0213842..6d7b0c87e4 100644 --- a/digdag-spi/src/main/java/io/digdag/spi/Storage.java +++ b/digdag-spi/src/main/java/io/digdag/spi/Storage.java @@ -35,4 +35,8 @@ default Optional getDirectUploadHandle(String key) { return Optional.absent(); } + + Long getDirectDownloadExpiration(); + + Long getDirectUploadExpiration(); } diff --git a/digdag-standards/src/main/java/io/digdag/standards/command/KubernetesCommandExecutor.java b/digdag-standards/src/main/java/io/digdag/standards/command/KubernetesCommandExecutor.java index 872927484c..7409c3aedb 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/command/KubernetesCommandExecutor.java +++ b/digdag-standards/src/main/java/io/digdag/standards/command/KubernetesCommandExecutor.java @@ -225,6 +225,8 @@ CommandStatus runOnKubernetes(final CommandContext context, nextStatus.set("cluster_name", FACTORY.textNode(client.getConfig().getName())); nextStatus.set("pod_name", FACTORY.textNode(pod.getName())); nextStatus.set("pod_creation_timestamp", FACTORY.numberNode(pod.getCreationTimestamp())); + nextStatus.set("in_temporal_config_storage_expiration", FACTORY.numberNode(inConfigStorage.getDirectDownloadExpiration())); + nextStatus.set("out_temporal_config_storage_expiration", FACTORY.numberNode(outConfigStorage.getDirectUploadExpiration())); 
nextStatus.set("io_directory", FACTORY.textNode(ioDirectoryPath.toString())); nextStatus.set("executor_state", FACTORY.objectNode()); return createCommandStatus(pod, false, nextStatus); @@ -256,6 +258,21 @@ CommandStatus getCommandStatusFromKubernetes(final CommandContext context, log(logMessage, clog); nextExecutorState.set("log_offset", FACTORY.numberNode(offset + logMessage.length())); // update log_offset } + else if(isLaunchingLongerThanInConfigStorageExpiration(previousStatusJson)){ + // Fail the task: the pod has been launching for longer than the inTemporalConfigStorage expiration. + TaskRequest request = context.getTaskRequest(); + long attemptId = request.getAttemptId(); + long taskId = request.getTaskId(); + + final String message = s("Pod launch timeout: attempt=%d, task=%d", attemptId, taskId); + logger.warn(message); + + logger.info(s("Delete pod %s", pod.getName())); + client.deletePod(pod.getName()); + + // Throw exception to stop the task as failure + throw new TaskExecutionException(message); + } else { // 'waiting' // Write pod status to the command logger to avoid users confusing. For example, the container // waits starting if it will take long time to download container images. 
@@ -276,7 +293,7 @@ CommandStatus getCommandStatusFromKubernetes(final CommandContext context, final InputStream in = outConfigStorage.getContentInputStream(outputArchiveKey); ProjectArchives.extractTarArchive(context.getLocalProjectPath(), in); // runtime exception } - else if (defaultPodTTL.isPresent() && isRunningLongerThanTTL(previousStatusJson)) { + else if (isRunningLongerThanOutConfigStorageExpiration(previousStatusJson) || (defaultPodTTL.isPresent() && isRunningLongerThanTTL(previousStatusJson))) { TaskRequest request = context.getTaskRequest(); long attemptId = request.getAttemptId(); long taskId = request.getTaskId(); @@ -307,6 +324,22 @@ protected List setArgumentsAfterScriptCommandLine() return ImmutableList.of(); } + private boolean isLaunchingLongerThanInConfigStorageExpiration(final ObjectNode previousStatusJson) + { + long creationTimestamp = previousStatusJson.get("pod_creation_timestamp").asLong(); + long inTemporalConfigStorageExpiration = previousStatusJson.get("in_temporal_config_storage_expiration").asLong(); + long currentTimestamp = Instant.now().getEpochSecond(); + return currentTimestamp > creationTimestamp + inTemporalConfigStorageExpiration; + } + + private boolean isRunningLongerThanOutConfigStorageExpiration(final ObjectNode previousStatusJson) + { + long creationTimestamp = previousStatusJson.get("pod_creation_timestamp").asLong(); + long outTemporalConfigStorageExpiration = previousStatusJson.get("out_temporal_config_storage_expiration").asLong(); + long currentTimestamp = Instant.now().getEpochSecond(); + return currentTimestamp > creationTimestamp + outTemporalConfigStorageExpiration; + } + private boolean isRunningLongerThanTTL(final ObjectNode previousStatusJson) { long creationTimestamp = previousStatusJson.get("pod_creation_timestamp").asLong(); diff --git a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/TemporalConfigStorage.java 
b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/TemporalConfigStorage.java index 5e5c6e612f..bf82538650 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/TemporalConfigStorage.java +++ b/digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/TemporalConfigStorage.java @@ -75,4 +75,14 @@ public InputStream getContentInputStream(final String key) throw Throwables.propagate(e); } } + + public Long getDirectDownloadExpiration() + { + return storage.getDirectDownloadExpiration(); + } + + public Long getDirectUploadExpiration() + { + return storage.getDirectUploadExpiration(); + } } diff --git a/digdag-storage-gcs/src/main/java/io/digdag/storage/gcs/GCSStorage.java b/digdag-storage-gcs/src/main/java/io/digdag/storage/gcs/GCSStorage.java index e772fb644c..03c2fa1de2 100644 --- a/digdag-storage-gcs/src/main/java/io/digdag/storage/gcs/GCSStorage.java +++ b/digdag-storage-gcs/src/main/java/io/digdag/storage/gcs/GCSStorage.java @@ -144,7 +144,7 @@ public void list(String objectPrefix, FileListing callback) @Override public Optional getDirectDownloadHandle(String object) { - final long secondsToExpire = config.get("direct_download_expiration", Long.class, 10L*60); + final long secondsToExpire = getDirectDownloadExpiration(); BlobInfo blobInfo = BlobInfo.newBuilder(bucket, object).build(); URL signedUrl = this.storage.signUrl(blobInfo, secondsToExpire, TimeUnit.SECONDS, Storage.SignUrlOption.httpMethod(HttpMethod.GET), Storage.SignUrlOption.withV4Signature()); @@ -156,7 +156,7 @@ public Optional getDirectDownloadHandle(String object) @Override public Optional getDirectUploadHandle(String object) { - final long secondsToExpire = config.get("direct_upload_expiration", Long.class, 10L*60); + final long secondsToExpire = getDirectUploadExpiration(); BlobInfo blobInfo = BlobInfo.newBuilder(bucket, object).build(); URL signedUrl = this.storage.signUrl(blobInfo, secondsToExpire, TimeUnit.SECONDS, 
Storage.SignUrlOption.httpMethod(HttpMethod.PUT), Storage.SignUrlOption.withV4Signature()); @@ -165,6 +165,18 @@ public Optional getDirectUploadHandle(String object) return Optional.of(DirectUploadHandle.of(url)); } + @Override + public Long getDirectDownloadExpiration() + { + return config.get("direct_download_expiration", Long.class, 10L*60); + } + + @Override + public Long getDirectUploadExpiration() + { + return config.get("direct_upload_expiration", Long.class, 10L*60); + } + private T getWithRetry(String message, Callable callable) throws StorageException { diff --git a/digdag-storage-s3/src/main/java/io/digdag/storage/s3/S3Storage.java b/digdag-storage-s3/src/main/java/io/digdag/storage/s3/S3Storage.java index 5a364062c5..2b721a00f5 100644 --- a/digdag-storage-s3/src/main/java/io/digdag/storage/s3/S3Storage.java +++ b/digdag-storage-s3/src/main/java/io/digdag/storage/s3/S3Storage.java @@ -200,7 +200,7 @@ public void list(String keyPrefix, FileListing callback) @Override public Optional getDirectDownloadHandle(String key) { - final long secondsToExpire = config.get("direct_download_expiration", Long.class, 10L*60); + final long secondsToExpire = getDirectDownloadExpiration(); GeneratePresignedUrlRequest req = new GeneratePresignedUrlRequest(bucket, key); req.setExpiration(Date.from(Instant.now().plusSeconds(secondsToExpire))); @@ -213,7 +213,8 @@ public Optional getDirectDownloadHandle(String key) @Override public Optional getDirectUploadHandle(String key) { - final long secondsToExpire = config.get("direct_upload_expiration", Long.class, 10L*60); + final long secondsToExpire = getDirectUploadExpiration(); + GeneratePresignedUrlRequest req = new GeneratePresignedUrlRequest(bucket, key); req.setMethod(HttpMethod.PUT); @@ -224,6 +225,18 @@ public Optional getDirectUploadHandle(String key) return Optional.of(DirectUploadHandle.of(url)); } + @Override + public Long getDirectDownloadExpiration() + { + return config.get("direct_download_expiration", Long.class, 
10L*60); + } + + @Override + public Long getDirectUploadExpiration() + { + return config.get("direct_upload_expiration", Long.class, 10L*60); + } + private T getWithRetry(String message, Callable callable) throws StorageFileNotFoundException {