Throw an error depending on the expiration of TemporalConfigStorage #1513

Open
wants to merge 1 commit into
base: master
10 changes: 10 additions & 0 deletions digdag-spi/src/main/java/io/digdag/spi/Storage.java
@@ -35,4 +35,14 @@ default Optional<DirectUploadHandle> getDirectUploadHandle(String key)
{
return Optional.absent();
}

default Optional<Long> getDirectDownloadExpiration()
{
return Optional.absent();
}

default Optional<Long> getDirectUploadExpiration()
{
return Optional.absent();
}
}
@@ -225,6 +225,8 @@ CommandStatus runOnKubernetes(final CommandContext context,
nextStatus.set("cluster_name", FACTORY.textNode(client.getConfig().getName()));
nextStatus.set("pod_name", FACTORY.textNode(pod.getName()));
nextStatus.set("pod_creation_timestamp", FACTORY.numberNode(pod.getCreationTimestamp()));
nextStatus.set("in_temporal_config_storage_expiration", FACTORY.numberNode(inConfigStorage.getDirectDownloadExpiration().get()));
nextStatus.set("out_temporal_config_storage_expiration", FACTORY.numberNode(outConfigStorage.getDirectUploadExpiration().get()));
nextStatus.set("io_directory", FACTORY.textNode(ioDirectoryPath.toString()));
nextStatus.set("executor_state", FACTORY.objectNode());
return createCommandStatus(pod, false, nextStatus);
@@ -256,6 +258,21 @@ CommandStatus getCommandStatusFromKubernetes(final CommandContext context,
log(logMessage, clog);
nextExecutorState.set("log_offset", FACTORY.numberNode(offset + logMessage.length())); // update log_offset
}
else if(isLaunchingLongerThanInConfigStorageExpiration(previousStatusJson)) {
// Fail the task because the pod has been launching for longer than the inTemporalConfigStorage expiration.
TaskRequest request = context.getTaskRequest();
long attemptId = request.getAttemptId();
long taskId = request.getTaskId();

final String message = s("Pod launch timeout: attempt=%d, task=%d", attemptId, taskId);
logger.warn(message);

logger.info(s("Delete pod %s", pod.getName()));
client.deletePod(pod.getName());

// Throw exception to stop the task as failure
throw new TaskExecutionException(message);
}
else { // 'waiting'
// Write pod status to the command logger to avoid confusing users. For example, the container
// stays in the waiting state if downloading container images takes a long time.
@@ -276,7 +293,7 @@ CommandStatus getCommandStatusFromKubernetes(final CommandContext context,
final InputStream in = outConfigStorage.getContentInputStream(outputArchiveKey);
ProjectArchives.extractTarArchive(context.getLocalProjectPath(), in); // runtime exception
}
else if (defaultPodTTL.isPresent() && isRunningLongerThanTTL(previousStatusJson)) {
else if (isRunningLongerThanOutConfigStorageExpiration(previousStatusJson) || (defaultPodTTL.isPresent() && isRunningLongerThanTTL(previousStatusJson))) {
Member
I guess you are using a non-blocking operator such as py>. In that case, IIRC, (defaultPodTTL.isPresent() && isRunningLongerThanTTL(previousStatusJson)) will never become true. In addition, this block will not execute when a user or WorkflowExecutionTimeoutEnforcer requests cancellation of the attempt. Is that acceptable for you?
We had the same kind of issue, so we introduced a cleanup function in the Operator and CommandExecutor interfaces and implemented it in PyOperatorFactory and EcsCommandExecutor. Please see #1480. This function performs the cleanup process when the attempt receives a cancel request. I think it is a good idea to handle this case the same way to avoid such an issue.
How about adding it to KubernetesCommandExecutor as well? What do you think?
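
To make the suggestion concrete, here is a minimal, self-contained sketch of what a cleanup hook for the Kubernetes executor might look like. Everything in it is hypothetical: `PodClient`, `CleanupCapableExecutor`, `KubernetesCleanupSketch`, and `RecordingPodClient` are illustrative names, and the actual `cleanup` signature introduced in #1480 may differ. The sketch only shows the idea that pod deletion lives in a dedicated method the engine can call on cancel, rather than in a branch of the status-polling code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Kubernetes client used by the executor.
interface PodClient {
    void deletePod(String podName);
}

// Hypothetical interface with a cleanup hook (assumption: the real
// CommandExecutor cleanup method from #1480 takes different arguments).
interface CleanupCapableExecutor {
    void cleanup(String podName);
}

final class KubernetesCleanupSketch implements CleanupCapableExecutor {
    private final PodClient client;

    KubernetesCleanupSketch(PodClient client) {
        this.client = client;
    }

    @Override
    public void cleanup(String podName) {
        // Nothing to delete if the pod was never launched.
        if (podName == null || podName.isEmpty()) {
            return;
        }
        client.deletePod(podName);
    }
}

// In-memory client that records deletions, so the sketch runs without a cluster.
final class RecordingPodClient implements PodClient {
    final List<String> deleted = new ArrayList<>();

    @Override
    public void deletePod(String podName) {
        deleted.add(podName);
    }
}
```

With a hook like this, the engine would call `cleanup` once an attempt is cancel-requested, so the pod is removed even when none of the polling branches above is reached.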

TaskRequest request = context.getTaskRequest();
long attemptId = request.getAttemptId();
long taskId = request.getTaskId();
@@ -307,6 +324,22 @@ protected List<String> setArgumentsAfterScriptCommandLine()
return ImmutableList.of();
}

private boolean isLaunchingLongerThanInConfigStorageExpiration(final ObjectNode previousStatusJson)
{
long creationTimestamp = previousStatusJson.get("pod_creation_timestamp").asLong();
long inTemporalConfigStorageExpiration = previousStatusJson.get("in_temporal_config_storage_expiration").asLong();
long currentTimestamp = Instant.now().getEpochSecond();
return currentTimestamp > creationTimestamp + inTemporalConfigStorageExpiration;
}

private boolean isRunningLongerThanOutConfigStorageExpiration(final ObjectNode previousStatusJson)
{
long creationTimestamp = previousStatusJson.get("pod_creation_timestamp").asLong();
long outTemporalConfigStorageExpiration = previousStatusJson.get("out_temporal_config_storage_expiration").asLong();
long currentTimestamp = Instant.now().getEpochSecond();
return currentTimestamp > creationTimestamp + outTemporalConfigStorageExpiration;
}

private boolean isRunningLongerThanTTL(final ObjectNode previousStatusJson)
{
long creationTimestamp = previousStatusJson.get("pod_creation_timestamp").asLong();
@@ -12,6 +12,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import com.google.common.base.Optional;

public class TemporalConfigStorage
{
@@ -75,4 +76,14 @@ public InputStream getContentInputStream(final String key)
throw Throwables.propagate(e);
}
}

public Optional<Long> getDirectDownloadExpiration()
{
return storage.getDirectDownloadExpiration();
}

public Optional<Long> getDirectUploadExpiration()
{
return storage.getDirectUploadExpiration();
}
}
@@ -144,7 +144,7 @@ public void list(String objectPrefix, FileListing callback)
@Override
public Optional<DirectDownloadHandle> getDirectDownloadHandle(String object)
{
final long secondsToExpire = config.get("direct_download_expiration", Long.class, 10L*60);
final long secondsToExpire = getDirectDownloadExpiration().get();

BlobInfo blobInfo = BlobInfo.newBuilder(bucket, object).build();
URL signedUrl = this.storage.signUrl(blobInfo, secondsToExpire, TimeUnit.SECONDS, Storage.SignUrlOption.httpMethod(HttpMethod.GET), Storage.SignUrlOption.withV4Signature());
@@ -156,7 +156,7 @@ public Optional<DirectDownloadHandle> getDirectDownloadHandle(String object)
@Override
public Optional<DirectUploadHandle> getDirectUploadHandle(String object)
{
final long secondsToExpire = config.get("direct_upload_expiration", Long.class, 10L*60);
final long secondsToExpire = getDirectUploadExpiration().get();

BlobInfo blobInfo = BlobInfo.newBuilder(bucket, object).build();
URL signedUrl = this.storage.signUrl(blobInfo, secondsToExpire, TimeUnit.SECONDS, Storage.SignUrlOption.httpMethod(HttpMethod.PUT), Storage.SignUrlOption.withV4Signature());
@@ -165,6 +165,18 @@ public Optional<DirectUploadHandle> getDirectUploadHandle(String object)
return Optional.of(DirectUploadHandle.of(url));
}

@Override
public Optional<Long> getDirectDownloadExpiration()
{
return Optional.of(config.get("direct_download_expiration", Long.class, 10L*60));
}

@Override
public Optional<Long> getDirectUploadExpiration()
{
return Optional.of(config.get("direct_upload_expiration", Long.class, 10L*60));
}

private <T> T getWithRetry(String message, Callable<T> callable)
throws StorageException
{
@@ -200,7 +200,7 @@ public void list(String keyPrefix, FileListing callback)
@Override
public Optional<DirectDownloadHandle> getDirectDownloadHandle(String key)
{
final long secondsToExpire = config.get("direct_download_expiration", Long.class, 10L*60);
final long secondsToExpire = getDirectDownloadExpiration().get();

GeneratePresignedUrlRequest req = new GeneratePresignedUrlRequest(bucket, key);
req.setExpiration(Date.from(Instant.now().plusSeconds(secondsToExpire)));
@@ -213,7 +213,8 @@ public Optional<DirectDownloadHandle> getDirectDownloadHandle(String key)
@Override
public Optional<DirectUploadHandle> getDirectUploadHandle(String key)
{
final long secondsToExpire = config.get("direct_upload_expiration", Long.class, 10L*60);
final long secondsToExpire = getDirectUploadExpiration().get();


GeneratePresignedUrlRequest req = new GeneratePresignedUrlRequest(bucket, key);
req.setMethod(HttpMethod.PUT);
@@ -224,6 +225,18 @@ public Optional<DirectUploadHandle> getDirectUploadHandle(String key)
return Optional.of(DirectUploadHandle.of(url));
}

@Override
public Optional<Long> getDirectDownloadExpiration()
{
return Optional.of(config.get("direct_download_expiration", Long.class, 10L*60));
}

@Override
public Optional<Long> getDirectUploadExpiration()
{
return Optional.of(config.get("direct_upload_expiration", Long.class, 10L*60));
}

private <T> T getWithRetry(String message, Callable<T> callable)
throws StorageFileNotFoundException
{