
chore: change the liverootpath and archiverootpath to absolute paths as default (#656)

Signed-off-by: Atanas Atanasov <[email protected]>
ata-nas authored Feb 20, 2025
1 parent addf891 commit cd2284b
Showing 22 changed files with 150 additions and 347 deletions.
2 changes: 1 addition & 1 deletion server/docker/Dockerfile
@@ -88,7 +88,7 @@ COPY logging.properties /app/logs/config/logging.properties
WORKDIR /

# Ensure proper file permissions
-RUN chown -R 2000:2000 /app
+RUN chown -R 2000:2000 /app /opt

########################################
#### Deterministic Build Hack ####
35 changes: 18 additions & 17 deletions server/docs/configuration.md
@@ -9,20 +9,21 @@ The default configuration allows users to quickly get up and running without hav
ease of use at the trade-off of some insecure default configuration. Most configuration settings have appropriate
defaults and can be left unchanged. It is recommended to browse the properties below and adjust to your needs.

-| Environment Variable                   | Description                                                                                    |       Default Value |
-|:---------------------------------------|:-----------------------------------------------------------------------------------------------|--------------------:|
-| PERSISTENCE_STORAGE_LIVE_ROOT_PATH     | The root path for the live storage.                                                            |                     |
-| PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH  | The root path for the archive storage.                                                         |                     |
-| PERSISTENCE_STORAGE_TYPE               | Type of the persistence storage                                                                | BLOCK_AS_LOCAL_FILE |
-| PERSISTENCE_STORAGE_COMPRESSION        | Compression algorithm used during persistence (could be none as well)                          | ZSTD                |
-| PERSISTENCE_STORAGE_COMPRESSION_LEVEL  | Compression level to be used by the compression algorithm                                      | 3                   |
-| CONSUMER_MAX_BLOCK_ITEM_BATCH_SIZE     | Maximum size of block item batches streamed to a client for closed-range historical requests   | 1000                |
-| CONSUMER_TIMEOUT_THRESHOLD_MILLIS      | Time to wait for subscribers before disconnecting in milliseconds                              | 1500                |
-| SERVICE_DELAY_MILLIS                   | Service shutdown delay in milliseconds                                                         | 500                 |
-| MEDIATOR_RING_BUFFER_SIZE              | Size of the ring buffer used by the mediator (must be a power of 2)                            | 67108864            |
-| NOTIFIER_RING_BUFFER_SIZE              | Size of the ring buffer used by the notifier (must be a power of 2)                            | 2048                |
-| SERVER_PORT                            | The port the server will listen on                                                             | 8080                |
-| SERVER_MAX_MESSAGE_SIZE_BYTES          | The maximum size of a message frame in bytes                                                   | 1048576             |
-| VERIFICATION_ENABLED                   | Enables or disables the block verification process                                             | true                |
-| VERIFICATION_SESSION_TYPE              | The type of BlockVerificationSession to use, either `ASYNC` or `SYNC`                          | ASYNC               |
-| VERIFICATION_HASH_COMBINE_BATCH_SIZE   | The number of hashes to combine into a single hash during verification                         | 32                  |
+| Environment Variable                    | Description                                                                                    | Default Value                          |
+|:----------------------------------------|:-----------------------------------------------------------------------------------------------|:---------------------------------------|
+| PERSISTENCE_STORAGE_LIVE_ROOT_PATH      | The root path for the live storage.                                                            | /opt/hashgraph/blocknode/data/live     |
+| PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH   | The root path for the archive storage.                                                         | /opt/hashgraph/blocknode/data/archive  |
+| PERSISTENCE_STORAGE_TYPE                | Type of the persistence storage                                                                | BLOCK_AS_LOCAL_FILE                    |
+| PERSISTENCE_STORAGE_COMPRESSION         | Compression algorithm used during persistence (could be none as well)                          | ZSTD                                   |
+| PERSISTENCE_STORAGE_COMPRESSION_LEVEL   | Compression level to be used by the compression algorithm                                      | 3                                      |
+| PERSISTENCE_STORAGE_ARCHIVE_GROUP_SIZE  | The size of the group of blocks to be archived at once                                         | 1_000                                  |
+| CONSUMER_MAX_BLOCK_ITEM_BATCH_SIZE      | Maximum size of block item batches streamed to a client for closed-range historical requests   | 1000                                   |
+| CONSUMER_TIMEOUT_THRESHOLD_MILLIS       | Time to wait for subscribers before disconnecting in milliseconds                              | 1500                                   |
+| SERVICE_DELAY_MILLIS                    | Service shutdown delay in milliseconds                                                         | 500                                    |
+| MEDIATOR_RING_BUFFER_SIZE               | Size of the ring buffer used by the mediator (must be a power of 2)                            | 67108864                               |
+| NOTIFIER_RING_BUFFER_SIZE               | Size of the ring buffer used by the notifier (must be a power of 2)                            | 2048                                   |
+| SERVER_PORT                             | The port the server will listen on                                                             | 8080                                   |
+| SERVER_MAX_MESSAGE_SIZE_BYTES           | The maximum size of a message frame in bytes                                                   | 1048576                                |
+| VERIFICATION_ENABLED                    | Enables or disables the block verification process                                             | true                                   |
+| VERIFICATION_SESSION_TYPE               | The type of BlockVerificationSession to use, either `ASYNC` or `SYNC`                          | ASYNC                                  |
+| VERIFICATION_HASH_COMBINE_BATCH_SIZE    | The number of hashes to combine into a single hash during verification                         | 32                                     |
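
The two storage roots above now default to fixed absolute paths instead of empty values resolved at runtime. A minimal sketch of the precedence, assuming plain `System.getenv` (the server's real wiring goes through its config library and the `ConfigMapping` table in the next file):

```java
import java.nio.file.Path;

public final class LiveRootPathSketch {
    public static void main(final String[] args) {
        // Hypothetical illustration of precedence only: the environment
        // variable, when set, replaces the documented default.
        final String override = System.getenv("PERSISTENCE_STORAGE_LIVE_ROOT_PATH");
        final Path liveRoot = Path.of(override != null ? override : "/opt/hashgraph/blocknode/data/live");
        System.out.println("live root: " + liveRoot); // absolute on every host
    }
}
```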
@@ -32,7 +32,7 @@ public final class ServerMappedConfigSourceInitializer {
new ConfigMapping("persistence.storage.liveRootPath", "PERSISTENCE_STORAGE_LIVE_ROOT_PATH"),
new ConfigMapping("persistence.storage.type", "PERSISTENCE_STORAGE_TYPE"),
new ConfigMapping("persistence.storage.archiveEnabled", "PERSISTENCE_STORAGE_ARCHIVE_ENABLED"),
new ConfigMapping("persistence.storage.archiveBatchSize", "PERSISTENCE_STORAGE_ARCHIVE_BATCH_SIZE"),
new ConfigMapping("persistence.storage.archiveGroupSize", "PERSISTENCE_STORAGE_ARCHIVE_BATCH_SIZE"),

// Producer Config
new ConfigMapping("producer.type", "PRODUCER_TYPE"),
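Note that while the property key is renamed to `persistence.storage.archiveGroupSize`, this mapping still binds it to the `PERSISTENCE_STORAGE_ARCHIVE_BATCH_SIZE` environment variable. A small sketch, with an assumed simplified `ConfigMapping` shape, of what each pair expresses:

```java
public final class ConfigMappingSketch {
    // Simplified stand-in for the server's ConfigMapping type (assumed shape;
    // the real class lives in the server's config package).
    record ConfigMapping(String propertyKey, String environmentVariable) {}

    public static void main(final String[] args) {
        // The pair binds an internal property key to the environment variable
        // that supplies it; renaming the key does not by itself rename the
        // externally visible variable.
        final ConfigMapping archiveGroup = new ConfigMapping(
                "persistence.storage.archiveGroupSize", "PERSISTENCE_STORAGE_ARCHIVE_BATCH_SIZE");
        System.out.println(archiveGroup.propertyKey() + " <- " + archiveGroup.environmentVariable());
    }
}
```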
@@ -31,6 +31,8 @@
import dagger.Module;
import dagger.Provides;
import edu.umd.cs.findbugs.annotations.NonNull;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
@@ -119,10 +121,14 @@ static BlockRemover providesBlockRemover(
    @Singleton
    static BlockPathResolver providesPathResolver(@NonNull final PersistenceStorageConfig config) {
        final StorageType persistenceType = config.type();
-        return switch (persistenceType) {
-            case BLOCK_AS_LOCAL_FILE -> BlockAsLocalFilePathResolver.of(config);
-            case NO_OP -> NoOpBlockPathResolver.newInstance();
-        };
+        try {
+            return switch (persistenceType) {
+                case BLOCK_AS_LOCAL_FILE -> new BlockAsLocalFilePathResolver(config);
+                case NO_OP -> new NoOpBlockPathResolver();
+            };
+        } catch (final IOException e) {
+            throw new UncheckedIOException(e);
+        }
    }

/**
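`BlockAsLocalFilePathResolver` construction can now touch the filesystem, so it may throw `IOException`; provider methods like this one cannot propagate checked exceptions (Dagger `@Provides` methods may throw only unchecked ones), hence the `UncheckedIOException` wrapper. The general pattern, as a standalone sketch:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public final class UncheckedWrapSketch {
    // Translate a checked IOException into an unchecked one so the call can
    // live inside a method that must not declare checked exceptions.
    static Path ensureDirectory(final Path root) {
        try {
            return Files.createDirectories(root);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) {
        System.out.println(ensureDirectory(Path.of("build", "sketch-demo")));
    }
}
```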
@@ -1,20 +1,12 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.persistence.storage;

-import static com.hedera.block.server.Constants.BLOCK_NODE_ARCHIVE_ROOT_DIRECTORY_SEMANTIC_NAME;
-import static com.hedera.block.server.Constants.BLOCK_NODE_LIVE_ROOT_DIRECTORY_SEMANTIC_NAME;
-
import com.hedera.block.common.utils.Preconditions;
-import com.hedera.block.common.utils.StringUtilities;
import com.hedera.block.server.config.logging.Loggable;
import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;
import com.swirlds.config.api.validation.annotation.Max;
import com.swirlds.config.api.validation.annotation.Min;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;

@@ -27,92 +19,27 @@
 * @param compression compression type to use for the storage
 * @param compressionLevel compression level used by the compression algorithm
 * Non-PRODUCTION values should only be used for troubleshooting and development purposes.
+ * @param archiveEnabled whether to enable archiving
+ * @param archiveGroupSize the number of blocks to archive in a single group
 */
@ConfigData("persistence.storage")
public record PersistenceStorageConfig(
-        // @todo(#371) - the default life/archive root path must be absolute starting from /opt
-        @Loggable @ConfigProperty(defaultValue = "") String liveRootPath,
-        // @todo(#371) - the default life/archive root path must be absolute starting from /opt
-        @Loggable @ConfigProperty(defaultValue = "") String archiveRootPath,
+        @Loggable @ConfigProperty(defaultValue = "/opt/hashgraph/blocknode/data/live") Path liveRootPath,
+        @Loggable @ConfigProperty(defaultValue = "/opt/hashgraph/blocknode/data/archive") Path archiveRootPath,
        @Loggable @ConfigProperty(defaultValue = "BLOCK_AS_LOCAL_FILE") StorageType type,
        @Loggable @ConfigProperty(defaultValue = "ZSTD") CompressionType compression,
        @Loggable @ConfigProperty(defaultValue = "3") @Min(0) @Max(20) int compressionLevel,
        @Loggable @ConfigProperty(defaultValue = "true") boolean archiveEnabled,
-        @Loggable @ConfigProperty(defaultValue = "1_000")
-                int archiveBatchSize) { // @todo(517) rename batch to group size
-    // @todo(#371) - the default life/archive root path must be absolute starting from /opt
-    private static final String LIVE_ROOT_PATH =
-            Path.of("hashgraph/blocknode/data/live/").toAbsolutePath().toString();
-    // @todo(#371) - the default life/archive root path must be absolute starting from /opt
-    private static final String ARCHIVE_ROOT_PATH =
-            Path.of("hashgraph/blocknode/data/archive/").toAbsolutePath().toString();

+        @Loggable @ConfigProperty(defaultValue = "1_000") int archiveGroupSize) {
    /**
     * Constructor.
     */
    public PersistenceStorageConfig {
        Objects.requireNonNull(liveRootPath);
        Objects.requireNonNull(archiveRootPath);
        Objects.requireNonNull(type);
-        Preconditions.requirePositivePowerOf10(archiveBatchSize);
        compression.verifyCompressionLevel(compressionLevel);
-        liveRootPath = resolvePath(liveRootPath, LIVE_ROOT_PATH, BLOCK_NODE_LIVE_ROOT_DIRECTORY_SEMANTIC_NAME);
-        archiveRootPath =
-                resolvePath(archiveRootPath, ARCHIVE_ROOT_PATH, BLOCK_NODE_ARCHIVE_ROOT_DIRECTORY_SEMANTIC_NAME);
-    }

-    /**
-     * This method attempts to resolve a given configured path. If the input
-     * path is blank, a default path is used. The resolved path must be
-     * absolute! If the path resolution is successful, an attempt is made to
-     * create the directory path. If the directory path cannot be created, an
-     * {@link UncheckedIOException} is thrown.
-     *
-     * @param pathToResolve the path to resolve
-     * @param defaultIfBlank the default path if the path to resolve is blank
-     * @param semanticPathName the semantic name of the path used for logging
-     * @return the resolved path
-     * @throws IllegalArgumentException if the resolved path is not absolute
-     * @throws UncheckedIOException if the resolved path cannot be created
-     */
-    @NonNull
-    private String resolvePath(
-            final String pathToResolve, @NonNull final String defaultIfBlank, @NonNull final String semanticPathName) {
-        final Path normalized = getNormalizedPath(pathToResolve, defaultIfBlank);
-        createDirectoryPath(normalized, semanticPathName);
-        return normalized.toString();
-    }
-
-    /**
-     * This method normalizes a given path. If the path to normalize is blank,
-     * a default path is used. The normalized path must be absolute!
-     *
-     * @param pathToNormalize the path to normalize
-     * @param defaultIfBlank the default path if the path to normalize is blank
-     * @throws IllegalArgumentException if the path to normalize is not absolute
-     */
-    @NonNull
-    private Path getNormalizedPath(final String pathToNormalize, @NonNull final String defaultIfBlank) {
-        final String actualToNormalize = StringUtilities.isBlank(pathToNormalize) ? defaultIfBlank : pathToNormalize;
-        return Path.of(actualToNormalize).normalize().toAbsolutePath();
-    }
-
-    /**
-     * This method creates a directory path at the given target path. If the
-     * directory path cannot be created, an {@link UncheckedIOException} is
-     * thrown.
-     *
-     * @param targetPath the target path to create the directory path
-     * @param semanticPathName the semantic name of the path used for logging
-     * @throws UncheckedIOException if the directory path cannot be created
-     */
-    private void createDirectoryPath(@NonNull final Path targetPath, @NonNull final String semanticPathName) {
-        try {
-            Files.createDirectories(targetPath);
-        } catch (final IOException e) {
-            final String classname = this.getClass().getName();
-            final String message = "Unable to instantiate [%s]! Unable to create the [%s] path that was provided!"
-                    .formatted(classname, semanticPathName);
-            throw new UncheckedIOException(message, e);
-        }
+        Preconditions.requirePositivePowerOf10(archiveGroupSize);
    }

/**
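The net effect of this file's change, as a standalone contrast (not code from the commit): the old default was a relative path resolved against the process working directory at startup, while the new default is the same fixed absolute path on every host.

```java
import java.nio.file.Path;

public final class DefaultPathContrast {
    public static void main(final String[] args) {
        // Old behaviour: result depends on where the process was launched.
        final Path oldDefault = Path.of("hashgraph/blocknode/data/live/").toAbsolutePath();
        // New behaviour: identical everywhere, no resolution step needed.
        final Path newDefault = Path.of("/opt/hashgraph/blocknode/data/live");
        System.out.println("old (CWD-dependent): " + oldDefault);
        System.out.println("new (fixed):         " + newDefault);
    }
}
```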
@@ -33,19 +33,15 @@ public final class BlockAsLocalFileArchiver implements LocalBlockArchiver {
private final BlockArchiverRunnable archiverRunnable;
private final ExecutorService executor;

-    private BlockAsLocalFileArchiver(
-            @NonNull final PersistenceStorageConfig config, @NonNull final BlockPathResolver blockPathResolver) {
+    public BlockAsLocalFileArchiver(
+            @NonNull final PersistenceStorageConfig config, @NonNull final BlockPathResolver blockPathResolver)
+            throws IOException {
        this.archiverRunnable =
                new BlockArchiverRunnable(Objects.requireNonNull(config), Objects.requireNonNull(blockPathResolver));
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.submit(archiverRunnable);
    }
-
-    public static BlockArchiver of(
-            @NonNull final PersistenceStorageConfig config, @NonNull final BlockPathResolver blockPathResolver) {
-        return new BlockAsLocalFileArchiver(config, blockPathResolver);
-    }

@Override
public void signalBlockWritten(final long latestBlockNumber) {
this.archiverRunnable.signalBlockWritten(latestBlockNumber);
@@ -66,7 +62,7 @@ public void stop() throws InterruptedException {
private static final class BlockArchiverRunnable implements Runnable {
private static final System.Logger LOGGER = System.getLogger(BlockArchiverRunnable.class.getName());
private final Path archiveRootPath;
-        private final int archiveBatchSize;
+        private final int archiveGroupSize;
private final BlockPathResolver blockPathResolver;
private volatile ThreadSignalCarrier threadSignalCarrier;
private volatile boolean running;
@@ -76,9 +72,11 @@ private static final class BlockArchiverRunnable {
// @todo(517) no state will be needed once we move to task based solution

        private BlockArchiverRunnable(
-                @NonNull final PersistenceStorageConfig config, final BlockPathResolver blockPathResolver) {
-            this.archiveRootPath = Path.of(config.archiveRootPath());
-            this.archiveBatchSize = config.archiveBatchSize();
+                @NonNull final PersistenceStorageConfig config, final BlockPathResolver blockPathResolver)
+                throws IOException {
+            this.archiveRootPath = Objects.requireNonNull(config.archiveRootPath());
+            Files.createDirectories(archiveRootPath);
+            this.archiveGroupSize = config.archiveGroupSize();
            this.blockPathResolver = Objects.requireNonNull(blockPathResolver);
        }

@@ -88,7 +86,7 @@ private void signalBlockWritten(final long latestBlockNumber) {
if (running) {
// the logic inside this block should be accessed only by a single thread
final int currentSignalCount = ++blockWrittenSignalCounter;
-            if (currentSignalCount >= archiveBatchSize) {
+            if (currentSignalCount >= archiveGroupSize) {
// if threshold is reached, notify the worker and reset counter
blockWrittenSignalCounter = 0;
lastWrittenBlockNumber = latestBlockNumber;
@@ -136,10 +134,10 @@ private void archive() throws IOException {
}

            final boolean shouldArchiveBlocks =
-                    lastWrittenLastArchivedGap >= (archiveBatchSize * 2L); // todo is this correct x2 ?
+                    lastWrittenLastArchivedGap >= (archiveGroupSize * 2L); // todo is this correct x2 ?

            if (shouldArchiveBlocks) {
-                final long amountOfBlocksToWrite = lastWrittenLastArchivedGap - archiveBatchSize;
+                final long amountOfBlocksToWrite = lastWrittenLastArchivedGap - archiveGroupSize;
final List<Path> pathsToArchiveAscending = new ArrayList<>();
for (int i = 0; i < amountOfBlocksToWrite; i++) {
final Optional<LiveBlockPath> block =
@@ -158,7 +156,7 @@ private void archive() throws IOException {
final long blockNumber =
Long.parseLong(path.getFileName().toString().split("\\.")[0]);
final Path zipPath =
-                        resolveArchivePathForZipOfBlockNumber(blockNumber, archiveBatchSize, archiveRootPath);
+                        resolveArchivePathForZipOfBlockNumber(blockNumber, archiveGroupSize, archiveRootPath);
if (pathsToArchive.containsKey(zipPath)) {
pathsToArchive.get(zipPath).add(path);
} else {
Expand All @@ -169,10 +167,10 @@ private void archive() throws IOException {
}

                pathsToArchive = pathsToArchive.entrySet().stream()
-                        .filter(entry -> entry.getValue().size() == archiveBatchSize
+                        .filter(entry -> entry.getValue().size() == archiveGroupSize
                                || entry.getKey()
                                        .toString()
-                                        .endsWith("/0".repeat(19 - (int) Math.log10(archiveBatchSize)) + ".zip"))
+                                        .endsWith("/0".repeat(19 - (int) Math.log10(archiveGroupSize)) + ".zip"))
                        .collect(
                                Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, TreeMap::new));

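`archiveGroupSize` must be a positive power of ten (see `Preconditions.requirePositivePowerOf10` above), which appears to be what makes the zip-path filter work: block numbers are rendered as 19-digit zero-padded values, one digit per directory level, so a group of 10^n blocks shares all but the last n digits. A sketch of that arithmetic, under those assumptions:

```java
public final class ArchiveGroupSketch {
    // With archiveGroupSize = 10^n, all blocks whose numbers share the first
    // 19 - n digits fall into the same archive group; the group's base is the
    // block number with its trailing n digits zeroed.
    static long groupBase(final long blockNumber, final int archiveGroupSize) {
        return (blockNumber / archiveGroupSize) * archiveGroupSize;
    }

    public static void main(final String[] args) {
        final int groupSize = 1_000; // default from the docs table above
        System.out.println(groupBase(1_234_567L, groupSize)); // 1234000
        System.out.println(groupBase(1_234_999L, groupSize)); // 1234000
        System.out.println(groupBase(1_235_000L, groupSize)); // 1235000
    }
}
```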
