feat: task-based archiving
- introduce a task-based archiver
- the new archiver is currently part of the persistence service
- updates to the reading logic to accommodate proper zip entry naming
- currently we rely on a gap between the latest live-written block and the blocks to be zipped for proper operation, due to missing infrastructure (see the sketch below)
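
For illustration, a minimal walkthrough of the trigger arithmetic, assuming a hypothetical archiveBatchSize of 1000 (the real value comes from PersistenceStorageConfig.archiveBatchSize()); the concrete block numbers here are assumptions, not part of the commit:

// Hypothetical walkthrough, assuming archiveBatchSize = 1000.
final int archiveGroupSize = 1000; // assumption: PersistenceStorageConfig.archiveBatchSize()
final long blockNumber = 3000L;    // the block that was just handed to a writer
if (blockNumber % archiveGroupSize == 0) {
    // the archiver task will zip blocks up to upperBound = 3000 - 1 = 2999,
    // staying behind the live tip to preserve the gap noted above
    archiver.signalThresholdPassed(blockNumber);
}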

Signed-off-by: Atanas Atanasov <[email protected]>
ata-nas committed Feb 19, 2025
1 parent 5845f60 commit de7fdea
Showing 17 changed files with 342 additions and 366 deletions.
1 change: 1 addition & 0 deletions server/docker/logging.properties
@@ -38,6 +38,7 @@ io.helidon.common.level=INFO
#com.hedera.block.server.pbj.PbjBlockStreamServiceProxy.level=FINE
#com.hedera.block.server.persistence.StreamPersistenceHandlerImpl.level=FINE
#com.hedera.block.server.persistence.storage.write.level=FINE
com.hedera.block.server.persistence.storage.archive.AsyncBlockAsLocalFileArchiver.level=FINE

# Helidon PBJ Plugin loggers
#com.hedera.pbj.grpc.helidon.PbjProtocolHandler.level=FINE
@@ -10,6 +10,10 @@
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig.CompressionType;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig.StorageType;
import com.hedera.block.server.persistence.storage.archive.AsyncBlockAsLocalFileFactory;
import com.hedera.block.server.persistence.storage.archive.AsyncLocalBlockArchiverFactory;
import com.hedera.block.server.persistence.storage.archive.BlockAsLocalFileArchiver;
import com.hedera.block.server.persistence.storage.archive.LocalBlockArchiver;
import com.hedera.block.server.persistence.storage.compression.Compression;
import com.hedera.block.server.persistence.storage.compression.NoOpCompression;
import com.hedera.block.server.persistence.storage.compression.ZstdCompression;
@@ -141,6 +145,30 @@ static Compression providesCompression(@NonNull final PersistenceStorageConfig c
};
}

@Provides
@Singleton
static AsyncLocalBlockArchiverFactory providesAsyncLocalBlockArchiverFactory(
@NonNull final PersistenceStorageConfig config, @NonNull final BlockPathResolver blockPathResolver) {
final StorageType persistenceType = config.type();
return switch (persistenceType) {
case BLOCK_AS_LOCAL_FILE -> new AsyncBlockAsLocalFileFactory(config, blockPathResolver);
case NO_OP -> throw new UnsupportedOperationException(); // todo implement
};
}

@Provides
@Singleton
static LocalBlockArchiver providesLocalBlockArchiver(
@NonNull final PersistenceStorageConfig config,
@NonNull final AsyncLocalBlockArchiverFactory asyncLocalBlockArchiverFactory) {
final StorageType persistenceType = config.type();
return switch (persistenceType) {
case BLOCK_AS_LOCAL_FILE -> new BlockAsLocalFileArchiver(
config, asyncLocalBlockArchiverFactory, Executors.newFixedThreadPool(10));
case NO_OP -> throw new UnsupportedOperationException();
};
}

/**
* Provides a block node event handler singleton (stream persistence handler)
* @param subscriptionHandler the subscription handler
@@ -159,14 +187,18 @@ static BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>> provi
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final ServiceStatus serviceStatus,
@NonNull final AckHandler ackHandler,
@NonNull final AsyncBlockWriterFactory asyncBlockWriterFactory) {
@NonNull final AsyncBlockWriterFactory asyncBlockWriterFactory,
@NonNull final PersistenceStorageConfig persistenceStorageConfig,
@NonNull final LocalBlockArchiver localBlockArchiver) {
return new StreamPersistenceHandlerImpl(
subscriptionHandler,
notifier,
blockNodeContext,
serviceStatus,
ackHandler,
asyncBlockWriterFactory,
localBlockArchiver,
persistenceStorageConfig,
Executors.newFixedThreadPool(5));
}
}
@@ -13,6 +13,8 @@
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.archive.LocalBlockArchiver;
import com.hedera.block.server.persistence.storage.write.AsyncBlockWriter;
import com.hedera.block.server.persistence.storage.write.AsyncBlockWriterFactory;
import com.hedera.block.server.persistence.storage.write.BlockPersistenceResult;
@@ -57,8 +59,29 @@ public class StreamPersistenceHandlerImpl
private final AckHandler ackHandler;
private final AsyncBlockWriterFactory asyncBlockWriterFactory;
private final CompletionService<Void> completionService;
private final LocalBlockArchiver archiver;
private final int archiveGroupSize;
private TransferQueue<BlockItemUnparsed> currentWriterQueue;

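// Convenience constructor without archiving support: archiver stays null and
// archiveGroupSize is 0 (note: the modulo trigger in handleBlockItems would
// throw ArithmeticException if invoked on an instance built this way).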
public StreamPersistenceHandlerImpl(
@NonNull final SubscriptionHandler<SubscribeStreamResponseUnparsed> subscriptionHandler,
@NonNull final Notifier notifier,
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final ServiceStatus serviceStatus,
@NonNull final AckHandler ackHandler,
@NonNull final AsyncBlockWriterFactory asyncBlockWriterFactory,
@NonNull final Executor executor) {
this.subscriptionHandler = Objects.requireNonNull(subscriptionHandler);
this.notifier = Objects.requireNonNull(notifier);
this.metricsService = blockNodeContext.metricsService();
this.serviceStatus = Objects.requireNonNull(serviceStatus);
this.ackHandler = Objects.requireNonNull(ackHandler);
this.asyncBlockWriterFactory = Objects.requireNonNull(asyncBlockWriterFactory);
this.completionService = new ExecutorCompletionService<>(Objects.requireNonNull(executor));
this.archiver = null;
this.archiveGroupSize = 0;
}

/**
* Constructor.
*
@@ -78,13 +101,17 @@ public StreamPersistenceHandlerImpl(
@NonNull final ServiceStatus serviceStatus,
@NonNull final AckHandler ackHandler,
@NonNull final AsyncBlockWriterFactory asyncBlockWriterFactory,
@NonNull final LocalBlockArchiver archiver,
@NonNull final PersistenceStorageConfig persistenceStorageConfig,
@NonNull final Executor executor) {
this.subscriptionHandler = Objects.requireNonNull(subscriptionHandler);
this.notifier = Objects.requireNonNull(notifier);
this.metricsService = blockNodeContext.metricsService();
this.serviceStatus = Objects.requireNonNull(serviceStatus);
this.ackHandler = Objects.requireNonNull(ackHandler);
this.asyncBlockWriterFactory = Objects.requireNonNull(asyncBlockWriterFactory);
this.archiver = Objects.requireNonNull(archiver);
this.archiveGroupSize = persistenceStorageConfig.archiveBatchSize();
this.completionService = new ExecutorCompletionService<>(Objects.requireNonNull(executor));
}

@@ -166,6 +193,11 @@ private void handleBlockItems(final List<BlockItemUnparsed> blockItems)
final AsyncBlockWriter writer = asyncBlockWriterFactory.create(blockNumber);
currentWriterQueue = writer.getQueue();
completionService.submit(writer);
if (blockNumber % archiveGroupSize == 0) {
// threshold reached; archive the blocks one order of magnitude
// below the threshold, relative to the archive group size
archiver.signalThresholdPassed(blockNumber);
}
} else {
// we need to notify the ackHandler that the block number is invalid
// IMPORTANT: the currentWriterQueue MUST be null after we have
@@ -0,0 +1,115 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.persistence.storage.archive;

import com.hedera.block.common.utils.FileUtilities;
import com.hedera.block.common.utils.Preconditions;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.path.BlockPathResolver;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.File;
import java.io.IOException;
import java.lang.System.Logger.Level;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/**
* TODO: add documentation
*/
public final class AsyncBlockAsLocalFileArchiver implements AsyncLocalBlockArchiver {
private static final System.Logger LOGGER = System.getLogger(AsyncBlockAsLocalFileArchiver.class.getName());
private final BlockPathResolver pathResolver;
private final long blockNumberThreshold;

AsyncBlockAsLocalFileArchiver(
final long blockNumberThreshold,
@NonNull final PersistenceStorageConfig config,
@NonNull final BlockPathResolver pathResolver) {
this.blockNumberThreshold = blockNumberThreshold;
this.pathResolver = Objects.requireNonNull(pathResolver);
Preconditions.requireWhole(blockNumberThreshold);
final int archiveGroupSize = config.archiveBatchSize();
if (blockNumberThreshold % archiveGroupSize != 0) { // @todo(517) require divisible exactly by
throw new IllegalArgumentException("Block number threshold must be divisible by " + archiveGroupSize);
}
}

@Override
public void run() {
try {
doArchive();
} catch (final IOException e) {
// todo return a result instead of exception
throw new RuntimeException(e);
}
}

private void doArchive() throws IOException {
final long upperBound = blockNumberThreshold - 1;
final Path rootToArchive = pathResolver.resolveRawPathToArchiveParentUnderLive(upperBound);
LOGGER.log(Level.DEBUG, "Archiving Block Files under [%s]".formatted(rootToArchive));
final List<Path> pathsToArchive;
try (final Stream<Path> tree = Files.walk(rootToArchive)) {
pathsToArchive = tree.sorted(Comparator.reverseOrder())
.filter(Files::isRegularFile)
.toList();
}
if (!pathsToArchive.isEmpty()) {
final Path zipFilePath = archiveInZip(upperBound, pathsToArchive, rootToArchive);
createSymlink(rootToArchive, zipFilePath);
deleteLive(rootToArchive);
}
}

private Path archiveInZip(final long upperBound, final List<Path> pathsToArchive, final Path rootToArchive)
throws IOException {
final Path zipFilePath = pathResolver.resolveRawPathToArchiveParentUnderArchive(upperBound);
if (!Files.exists(zipFilePath)) {
// @todo(517) should we assume something if the zip file already exists? If yes, what and how to
// handle?
FileUtilities.createFile(zipFilePath);
}
LOGGER.log(Level.DEBUG, "Target Zip Path [%s]".formatted(zipFilePath));
try (final ZipOutputStream out = new ZipOutputStream(Files.newOutputStream(zipFilePath))) {
for (int i = 0; i < pathsToArchive.size(); i++) {
final Path pathToArchive = pathsToArchive.get(i);
final String relativizedEntryName =
rootToArchive.relativize(pathToArchive).toString();
final ZipEntry zipEntry = new ZipEntry(relativizedEntryName);
LOGGER.log(Level.TRACE, "Adding Zip Entry [%s] to zip file [%s]".formatted(zipEntry, zipFilePath));
out.putNextEntry(zipEntry);
Files.copy(pathToArchive, out);
out.closeEntry();
LOGGER.log(
Level.TRACE,
"Zip Entry [%s] successfully added to zip file [%s]".formatted(zipEntry, zipFilePath));
}
}
LOGGER.log(Level.DEBUG, "Zip File [%s] successfully created".formatted(zipFilePath));
return zipFilePath;
}

private static void createSymlink(final Path rootToArchive, final Path zipFilePath) throws IOException {
// we need to create a symlink to the zip file we just created so readers can find it
final Path liveSymlink = FileUtilities.appendExtension(rootToArchive, ".zip");
Files.createSymbolicLink(liveSymlink, zipFilePath);
LOGGER.log(Level.DEBUG, "Symlink [%s <-> %s] created".formatted(liveSymlink, zipFilePath));
}

private void deleteLive(final Path rootToArchive) throws IOException {
// we need to move the live dir that we just archived so readers will no longer
// be able to find it; they will then fall back to searching for the symlink we
// just created. In the meantime, while readers get data via the symlink, we can
// safely delete the live dir.
final Path movedToDelete = FileUtilities.appendExtension(rootToArchive, "del");
Files.move(rootToArchive, movedToDelete);
try (Stream<Path> paths = Files.walk(movedToDelete)) {
paths.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
}
}
}
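
A note on wiring: the coordinating BlockAsLocalFileArchiver constructed in the module above (with the config, the factory, and a fixed thread pool) is not itself shown in this section of the diff. A minimal sketch of the shape such a coordinator could take, inferred only from its constructor call and the signalThresholdPassed call site — the class body below is an assumption, not the committed implementation, and the config parameter is omitted for brevity:

// Sketch only: inferred from the call sites in this commit; the real
// BlockAsLocalFileArchiver may differ.
import java.util.Objects;
import java.util.concurrent.Executor;

public final class LocalBlockArchiverSketch implements LocalBlockArchiver {
    private final AsyncLocalBlockArchiverFactory factory;
    private final Executor executor;

    public LocalBlockArchiverSketch(
            final AsyncLocalBlockArchiverFactory factory, final Executor executor) {
        this.factory = Objects.requireNonNull(factory);
        this.executor = Objects.requireNonNull(executor);
    }

    @Override
    public void signalThresholdPassed(final long blockNumber) {
        // create a one-shot archiving task for this threshold and hand it to the pool
        executor.execute(factory.create(blockNumber));
    }
}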
@@ -0,0 +1,26 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.persistence.storage.archive;

import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.path.BlockPathResolver;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;

/**
* TODO: add documentation
*/
public class AsyncBlockAsLocalFileFactory implements AsyncLocalBlockArchiverFactory {
private final PersistenceStorageConfig config;
private final BlockPathResolver pathResolver;

public AsyncBlockAsLocalFileFactory(
@NonNull final PersistenceStorageConfig config, @NonNull final BlockPathResolver pathResolver) {
this.config = Objects.requireNonNull(config);
this.pathResolver = Objects.requireNonNull(pathResolver);
}

@Override
public AsyncLocalBlockArchiver create(final long blockNumber) {
return new AsyncBlockAsLocalFileArchiver(blockNumber, config, pathResolver);
}
}
@@ -0,0 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.persistence.storage.archive;

/**
* TODO: add documentation
*/
public interface AsyncLocalBlockArchiver extends Runnable {}
@@ -0,0 +1,9 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.persistence.storage.archive;

/**
* TODO: add documentation
*/
public interface AsyncLocalBlockArchiverFactory {
AsyncLocalBlockArchiver create(final long blockNumber);
}

This file was deleted.
