[Improve] Refactor Table API #340

Closed
2 tasks done
wuchong opened this issue Feb 5, 2025 · 1 comment · Fixed by #341

wuchong commented Feb 5, 2025

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Currently, the Table API of the Fluss client is neither clean nor consistent, and extending it to support new features is becoming increasingly complicated.

The problems with the current Table API:

  • limitScan, getLogScanner, and getSnapshotScanner all create scanners, but their APIs are very different.
  • SnapshotScanner needs refactoring:
    • It is not an interface. We need a more general interface for bounded scans rather than a scanner interface designed only for snapshots.
    • The snapshot scanner should be created from a snapshot id instead of snapshot files, because snapshot files are internal objects that users do not know about.
  • getLookuper and getPrefixLookuper can be unified: the lookuper interface is the same for both, and only the configuration differs.
  • It is not a fluent API.

Solution

So here is a proposal to refactor the Table API.

The new Table API:

public interface Table extends AutoCloseable {
    Scan newScan();
    Lookup newLookup();
    Append newAppend();
    Upsert newUpsert();
}

/**
 * Used to configure and create scanners to scan data for a table.
 *
 * <p>{@link Scan} objects are immutable and can be shared between threads. Refinement methods, like
 * {@link #project} and {@link #limit(int)}, create new Scan instances.
 */
public interface Scan {
    Scan project(@Nullable int[] projectedColumns);
    Scan project(List<String> projectedColumnNames);
    Scan limit(int rowNumber);

    LogScanner createLogScanner();
    BatchScanner createBatchScanner(TableBucket tableBucket);
    BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId);
}
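
The immutability contract described in the Javadoc above can be sketched as follows. This is only an illustration under assumptions: `ImmutableScanSketch` and its getters are hypothetical names, not the Fluss implementation, and the scanner factory methods are omitted. Each refinement method copies the current settings into a new instance, so a shared base object is never mutated.

```java
import java.util.Arrays;

/** Hypothetical stand-in for the proposed Scan; not the Fluss implementation. */
final class ImmutableScanSketch {
    private final int[] projectedColumns; // null means "all columns"
    private final int limit;              // -1 means "no limit"

    ImmutableScanSketch() {
        this(null, -1);
    }

    private ImmutableScanSketch(int[] projectedColumns, int limit) {
        // Defensive copy so callers cannot change the array afterwards.
        this.projectedColumns = projectedColumns == null ? null : projectedColumns.clone();
        this.limit = limit;
    }

    /** Returns a new instance with the projection set; this instance is unchanged. */
    ImmutableScanSketch project(int[] columns) {
        return new ImmutableScanSketch(columns, limit);
    }

    /** Returns a new instance with the row limit set; this instance is unchanged. */
    ImmutableScanSketch limit(int rowNumber) {
        return new ImmutableScanSketch(projectedColumns, rowNumber);
    }

    int[] getProjectedColumns() {
        return projectedColumns == null ? null : projectedColumns.clone();
    }

    int getLimit() {
        return limit;
    }

    public static void main(String[] args) {
        ImmutableScanSketch base = new ImmutableScanSketch();
        ImmutableScanSketch refined = base.project(new int[] {0, 2}).limit(10);
        System.out.println(base.getLimit());                                // prints -1
        System.out.println(refined.getLimit());                             // prints 10
        System.out.println(Arrays.toString(refined.getProjectedColumns())); // prints [0, 2]
    }
}
```

Because every refinement returns a fresh object, a base scan can be shared between threads and specialized independently by each caller.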

// no changes to LogScanner

public interface BatchScanner extends Closeable {
    /**
     * Polls one batch of records. Returns null when the end of the input is reached.
     */
    @Nullable
    CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException;
}
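
A caller would typically drain a BatchScanner by polling until null is returned. The sketch below shows one way a helper such as `collectRows` could be written. It is a minimal sketch under assumptions: the nested `CloseableIterator` and `BatchScanner` types are simplified generic stand-ins for the interfaces in this proposal, `inMemory` is a hypothetical test double, and the one-second timeout is arbitrary.

```java
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class CollectRowsSketch {
    /** Stand-in for the client's CloseableIterator (assumed shape). */
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close();
    }

    /** Stand-in mirroring the proposed BatchScanner, generic over the row type. */
    interface BatchScanner<T> extends Closeable {
        CloseableIterator<T> pollBatch(Duration timeout) throws IOException;
    }

    /** Polls batches until pollBatch returns null, then closes the scanner. */
    static <T> List<T> collectRows(BatchScanner<T> scanner) throws IOException {
        List<T> rows = new ArrayList<>();
        try (BatchScanner<T> s = scanner) {
            CloseableIterator<T> batch;
            while ((batch = s.pollBatch(Duration.ofSeconds(1))) != null) {
                // Close each batch iterator once its rows are consumed.
                try (CloseableIterator<T> it = batch) {
                    while (it.hasNext()) {
                        rows.add(it.next());
                    }
                }
            }
        }
        return rows;
    }

    /** Hypothetical in-memory scanner: serves the given batches, then null. */
    static <T> BatchScanner<T> inMemory(List<List<T>> batches) {
        Iterator<List<T>> remaining = batches.iterator();
        return new BatchScanner<T>() {
            @Override
            public CloseableIterator<T> pollBatch(Duration timeout) {
                if (!remaining.hasNext()) {
                    return null; // end of input
                }
                Iterator<T> rows = remaining.next().iterator();
                return new CloseableIterator<T>() {
                    @Override public boolean hasNext() { return rows.hasNext(); }
                    @Override public T next() { return rows.next(); }
                    @Override public void close() {}
                };
            }

            @Override
            public void close() {}
        };
    }

    public static void main(String[] args) throws IOException {
        List<List<String>> batches =
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println(collectRows(inMemory(batches))); // prints [a, b, c]
    }
}
```

Treating null as the only end-of-input signal means an empty batch simply causes another poll, which matches the contract stated in the Javadoc.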

public interface Lookup {
    Lookup lookupBy(List<String> lookupColumnNames);
    Lookuper createLookuper();
}

public interface Lookuper {
    CompletableFuture<LookupResult> lookup(InternalRow lookupKey);
}
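
To illustrate how a single Lookuper interface could serve both primary-key and prefix lookups, here is a small in-memory sketch. Everything in it is an assumption made for illustration: the generic `Lookuper` and `LookupResult` stand-ins, the `byPrimaryKey` and `byPrefix` factories, the string keys, and in particular the behavior of `getSingletonRow` (null for no match, an exception for more than one row) are not taken from the Fluss code base.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

final class LookupSketch {
    /** Stand-in result type: zero, one, or many rows (assumed semantics). */
    static final class LookupResult<R> {
        private final List<R> rows;

        LookupResult(List<R> rows) {
            this.rows = Collections.unmodifiableList(new ArrayList<>(rows));
        }

        List<R> getRowList() {
            return rows;
        }

        /** Returns the only row, or null if there is none. */
        R getSingletonRow() {
            if (rows.isEmpty()) {
                return null;
            }
            if (rows.size() > 1) {
                throw new IllegalStateException("Expected at most one row, got " + rows.size());
            }
            return rows.get(0);
        }
    }

    /** Stand-in mirroring the proposed Lookuper, generic over key and row types. */
    interface Lookuper<K, R> {
        CompletableFuture<LookupResult<R>> lookup(K lookupKey);
    }

    /** Exact-match lookup: at most one row per key. */
    static Lookuper<String, String> byPrimaryKey(Map<String, String> table) {
        return key -> {
            String row = table.get(key);
            List<String> rows = row == null
                    ? Collections.<String>emptyList()
                    : Collections.singletonList(row);
            return CompletableFuture.completedFuture(new LookupResult<>(rows));
        };
    }

    /** Prefix lookup: every row whose key starts with the given prefix. */
    static Lookuper<String, String> byPrefix(Map<String, String> table) {
        return prefix -> {
            List<String> rows = new ArrayList<>();
            for (Map.Entry<String, String> e : table.entrySet()) {
                if (e.getKey().startsWith(prefix)) {
                    rows.add(e.getValue());
                }
            }
            return CompletableFuture.completedFuture(new LookupResult<>(rows));
        };
    }

    public static void main(String[] args) {
        Map<String, String> table = new TreeMap<>();
        table.put("a:1", "row1");
        table.put("a:2", "row2");
        table.put("b:1", "row3");
        System.out.println(byPrimaryKey(table).lookup("a:2").join().getSingletonRow()); // prints row2
        System.out.println(byPrefix(table).lookup("a:").join().getRowList()); // prints [row1, row2]
    }
}
```

Both lookupers share one method signature; only how they are configured differs, which is the unification the proposal aims for.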

public interface Upsert {
    Upsert partialUpdate(@Nullable int[] targetColumns);
    Upsert partialUpdate(String... targetColumnNames);
    UpsertWriter createWriter();
}



public interface UpsertWriter extends TableWriter {
    CompletableFuture<UpsertResult> upsert(InternalRow row);
    CompletableFuture<DeleteResult> delete(InternalRow row);
}

How to use:

  1. Appending data
AppendWriter appendWriter = table.newAppend().createWriter();
appendWriter.append(row);
  2. Upserting and deleting data
UpsertWriter upsertWriter = table.newUpsert().createWriter();
// or with a partial update of the specified columns
// UpsertWriter upsertWriter = table.newUpsert().partialUpdate("a", "b").createWriter();
upsertWriter.upsert(row1);
upsertWriter.delete(key1);
  3. Scanning log data
LogScanner logScanner = table.newScan().createLogScanner();
// or with projection pushdown
// LogScanner logScanner = table.newScan().project(projectedFields).createLogScanner();
logScanner.subscribeFromBeginning(bucketId);
ScanRecords scanRecords = logScanner.poll();
...
  4. Batch scanning data with a limit
BatchScanner scanner = table.newScan().limit(limitSize).createBatchScanner(tableBucket);
// or with projection pushdown
// BatchScanner scanner = table.newScan().limit(limitSize).project(projectedFields).createBatchScanner(tableBucket);
List<InternalRow> result = collectRows(scanner);
  5. Scanning snapshot data
BatchScanner scanner = table.newScan().createBatchScanner(tableBucket, snapshotId);
// or with projection pushdown
// BatchScanner scanner = table.newScan().project(projectedFields).createBatchScanner(tableBucket, snapshotId);
...
  6. Lookup with primary key
Lookuper lookuper = table.newLookup().createLookuper();
InternalRow row = lookuper.lookup(keyRow).get().getSingletonRow();
  7. Lookup with prefix key
// lookupBy takes a List<String>, matching the Lookup interface above
Lookuper lookuper = table.newLookup().lookupBy(Arrays.asList("k1", "k2")).createLookuper();
List<InternalRow> rows = lookuper.lookup(prefixKey).get().getRowList();

Anything else?

To make the new snapshot scanner API work (table.newScan().createBatchScanner(tableBucket, snapshotId)), we need to refactor the Admin interface and the RPC layer slightly so that clients can get the latest snapshot id for a given table.

The existing CompletableFuture<KvSnapshotInfo> getKvSnapshot(TablePath tablePath) and CompletableFuture<PartitionSnapshotInfo> getPartitionSnapshot(TablePath tablePath, String partitionName) APIs will be removed, and the following two methods will be introduced:

    /**
     * Get the latest kv snapshots of the given table asynchronously. A kv snapshot is a snapshot of
     * a bucket of a primary key table at a certain point in time. Therefore, there are at most
     * {@code N} snapshots for a primary key table, where {@code N} is the number of buckets.
     *
     * <p>The following exceptions can be anticipated when calling {@code get()} on the returned
     * future.
     *
     * <ul>
     *   <li>{@link TableNotExistException} if the table does not exist.
     *   <li>{@link NonPrimaryKeyTableException} if the table is not a primary key table.
     *   <li>{@link InvalidTableException} if the table is partitioned; use {@link
     *       #getLatestKvSnapshots(TablePath, String)} instead to get the latest kv snapshots of a
     *       partition of a partitioned table.
     * </ul>
     *
     * @param tablePath the table path of the table.
     */
    CompletableFuture<KvSnapshots> getLatestKvSnapshots(TablePath tablePath);

    // Overload for a single partition of a partitioned table.
    CompletableFuture<KvSnapshots> getLatestKvSnapshots(TablePath tablePath, String partitionName);

    /**
     * Get the kv snapshot metadata of the given kv snapshot asynchronously. The kv snapshot
     * metadata includes the snapshot files for the kv tablet and the log offset for the changelog
     * at the snapshot time.
     *
     * <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
     *
     * <ul>
     *   <li>{@link KvSnapshotNotExistException} if the snapshot does not exist.
     * </ul>
     *
     * @param bucket the table bucket of the kv snapshot.
     * @param snapshotId the snapshot id.
     */
    CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
            TableBucket bucket, long snapshotId);

Corresponding RPC methods need to be added, and the existing RPC methods (GetKvSnapshot, GetPartitionSnapshot) will be removed.

Willingness to contribute

  • I'm willing to submit a PR!
@wuchong wuchong added the feature label Feb 5, 2025
@wuchong wuchong self-assigned this Feb 5, 2025
wuchong commented Feb 5, 2025

What do you think? @loserwang1024 @luoyuxia @swuferhong

@wuchong wuchong removed the feature label Feb 6, 2025
@wuchong wuchong added this to the v0.6 milestone Feb 7, 2025