Skip to content

Commit

Permalink
Remove custom generic stringifiers (#1912)
Browse files Browse the repository at this point in the history
* Remove stringified map

* Remove stringified set

* Fix lint error

* Fix @see usage
  • Loading branch information
Ekrekr authored Feb 6, 2025
1 parent 5995f4e commit 0b5520e
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 371 deletions.
24 changes: 14 additions & 10 deletions cli/api/commands/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { prune } from "df/cli/api/commands/prune";
import { state } from "df/cli/api/commands/state";
import * as dbadapters from "df/cli/api/dbadapters";
import { ExecutionSql } from "df/cli/api/dbadapters/execution_sql";
import { StringifiedMap, StringifiedSet } from "df/common/strings/stringifier";
import { targetStringifier } from "df/core/targets";
import * as utils from "df/core/utils";
import { dataform } from "df/protos/ts";
Expand All @@ -14,15 +13,17 @@ export async function build(
) {
const prunedGraph = prune(compiledGraph, runConfig);

const allInvolvedTargets = new StringifiedSet<dataform.ITarget>(
targetStringifier,
prunedGraph.tables.map(table => table.target)
const allInvolvedTargets = new Set<string>(
prunedGraph.tables.map(table => targetStringifier.stringify(table.target))
);

return new Builder(
prunedGraph,
runConfig,
await state(dbadapter, Array.from(allInvolvedTargets))
await state(
dbadapter,
Array.from(allInvolvedTargets).map(target => targetStringifier.parse(target))
)
).build();
}

Expand All @@ -46,16 +47,19 @@ export class Builder {
throw new Error(`Project has unresolved compilation or validation errors.`);
}

const tableMetadataByTarget = new StringifiedMap<dataform.ITarget, dataform.ITableMetadata>(
targetStringifier
);
const tableMetadataByTarget = new Map<string, dataform.ITableMetadata>();

this.warehouseState.tables.forEach(tableState => {
tableMetadataByTarget.set(tableState.target, tableState);
tableMetadataByTarget.set(targetStringifier.stringify(tableState.target), tableState);
});

const actions: dataform.IExecutionAction[] = [].concat(
this.prunedGraph.tables.map(t =>
this.buildTable(t, tableMetadataByTarget.get(t.target), this.runConfig)
this.buildTable(
t,
tableMetadataByTarget.get(targetStringifier.stringify(t.target)),
this.runConfig
)
),
this.prunedGraph.operations.map(o => this.buildOperation(o)),
this.prunedGraph.assertions.map(a => this.buildAssertion(a))
Expand Down
55 changes: 28 additions & 27 deletions cli/api/commands/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { IBigQueryExecutionOptions } from "df/cli/api/dbadapters/bigquery";
import { Flags } from "df/common/flags";
import { retry } from "df/common/promises";
import { deepClone, equals } from "df/common/protos";
import { StringifiedMap, StringifiedSet } from "df/common/strings/stringifier";
import { targetStringifier } from "df/core/targets";
import { dataform } from "df/protos/ts";

Expand Down Expand Up @@ -45,17 +44,14 @@ export function run(
}

export class Runner {
private readonly warehouseStateByTarget: StringifiedMap<
dataform.ITarget,
dataform.ITableMetadata
>;
private readonly warehouseStateByTarget: Map<string, dataform.ITableMetadata>;

private readonly allActionTargets: StringifiedSet<dataform.ITarget>;
private readonly allActionTargets: Set<string>;
private readonly runResult: dataform.IRunResult;
private readonly changeListeners: Array<(graph: dataform.IRunResult) => void> = [];
private readonly eEmitter: EventEmitter;
private executedActionTargets: StringifiedSet<dataform.ITarget>;
private successfullyExecutedActionTargets: StringifiedSet<dataform.ITarget>;
private executedActionTargets: Set<string>;
private successfullyExecutedActionTargets: Set<string>;
private pendingActions: dataform.IExecutionAction[];
private lastNotificationTimestampMillis = 0;
private stopped = false;
Expand All @@ -71,30 +67,32 @@ export class Runner {
partiallyExecutedRunResult: dataform.IRunResult = {},
private readonly runnerNotificationPeriodMillis: number = flags.runnerNotificationPeriodMillis.get()
) {
this.allActionTargets = new StringifiedSet<dataform.ITarget>(
targetStringifier,
graph.actions.map(action => action.target)
this.allActionTargets = new Set<string>(
graph.actions.map(action => targetStringifier.stringify(action.target))
);
this.runResult = {
actions: [],
...partiallyExecutedRunResult
};
this.warehouseStateByTarget = new StringifiedMap(
targetStringifier,
graph.warehouseState.tables?.map(tableMetadata => [tableMetadata.target, tableMetadata])
this.warehouseStateByTarget = new Map<string, dataform.ITableMetadata>();
graph.warehouseState.tables?.forEach(tableMetadata =>
this.warehouseStateByTarget.set(
targetStringifier.stringify(tableMetadata.target),
tableMetadata
)
);
this.executedActionTargets = new StringifiedSet(
targetStringifier,
this.executedActionTargets = new Set(
this.runResult.actions
.filter(action => action.status !== dataform.ActionResult.ExecutionStatus.RUNNING)
.map(action => action.target)
.map(action => targetStringifier.stringify(action.target))
);
this.successfullyExecutedActionTargets = new StringifiedSet(
targetStringifier,
this.runResult.actions.filter(isSuccessfulAction).map(action => action.target)
this.successfullyExecutedActionTargets = new Set<string>(
this.runResult.actions
.filter(isSuccessfulAction)
.map(action => targetStringifier.stringify(action.target))
);
this.pendingActions = graph.actions.filter(
action => !this.executedActionTargets.has(action.target)
action => !this.executedActionTargets.has(targetStringifier.stringify(action.target))
);
this.eEmitter = new EventEmitter();
// There could feasibly be thousands of listeners to this, 0 makes the limit infinite.
Expand Down Expand Up @@ -247,8 +245,8 @@ export class Runner {
// have executed successfully.
pendingAction.dependencyTargets.every(
dependency =>
!this.allActionTargets.has(dependency) ||
this.successfullyExecutedActionTargets.has(dependency)
!this.allActionTargets.has(targetStringifier.stringify(dependency)) ||
this.successfullyExecutedActionTargets.has(targetStringifier.stringify(dependency))
)
) {
executableActions.push(pendingAction);
Expand All @@ -257,7 +255,8 @@ export class Runner {
// exist in the graph, or have completed execution.
pendingAction.dependencyTargets.every(
dependency =>
!this.allActionTargets.has(dependency) || this.executedActionTargets.has(dependency)
!this.allActionTargets.has(targetStringifier.stringify(dependency)) ||
this.executedActionTargets.has(targetStringifier.stringify(dependency))
)
) {
skippableActions.push(pendingAction);
Expand Down Expand Up @@ -287,9 +286,11 @@ export class Runner {
Promise.all(
executableActions.map(async executableAction => {
const actionResult = await this.executeAction(executableAction);
this.executedActionTargets.add(executableAction.target);
this.executedActionTargets.add(targetStringifier.stringify(executableAction.target));
if (isSuccessfulAction(actionResult)) {
this.successfullyExecutedActionTargets.add(executableAction.target);
this.successfullyExecutedActionTargets.add(
targetStringifier.stringify(executableAction.target)
);
}
await this.executeAllActionsReadyForExecution();
})
Expand Down Expand Up @@ -383,7 +384,7 @@ export class Runner {
}
}

this.warehouseStateByTarget.delete(action.target);
this.warehouseStateByTarget.delete(targetStringifier.stringify(action.target));

if (actionResult.status === dataform.ActionResult.ExecutionStatus.RUNNING) {
actionResult.status = dataform.ActionResult.ExecutionStatus.SUCCESSFUL;
Expand Down
44 changes: 24 additions & 20 deletions cli/api/utils/graphs.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,39 @@
import { StringifiedMap } from "df/common/strings/stringifier";
import { targetStringifier } from "df/core/targets";
import { dataform } from "df/protos/ts";

export const combineAllActions = (graph: dataform.ICompiledGraph) => {
return ([] as Array<
dataform.ITable | dataform.IOperation | dataform.IAssertion | dataform.IDeclaration | dataform.IDataPreparation
>).concat(
type CoreProtoActionTypes =
| dataform.ITable
| dataform.IOperation
| dataform.IAssertion
| dataform.IDeclaration
| dataform.IDataPreparation;

function combineAllActions(graph: dataform.ICompiledGraph) {
return ([] as CoreProtoActionTypes[]).concat(
graph.tables || ([] as dataform.ITable[]),
graph.operations || ([] as dataform.IOperation[]),
graph.assertions || ([] as dataform.IAssertion[]),
graph.declarations || ([] as dataform.IDeclaration[]),
graph.dataPreparations || ([] as dataform.IDataPreparation[])
);
};
}

export function actionsByTarget(compiledGraph: dataform.ICompiledGraph) {
return new StringifiedMap(
targetStringifier,
combineAllActions(compiledGraph)
// Required for backwards compatibility with old versions of @dataform/core.
.filter(action => !!action.target)
.map(action => [action.target, action])
);
const actionsMap = new Map<string, CoreProtoActionTypes>();
combineAllActions(compiledGraph)
// Required for backwards compatibility with old versions of @dataform/core.
.filter(action => !!action.target)
.forEach(action => {
actionsMap.set(targetStringifier.stringify(action.target), action);
});
}

export function actionsByCanonicalTarget(compiledGraph: dataform.ICompiledGraph) {
return new StringifiedMap(
targetStringifier,
combineAllActions(compiledGraph)
// Required for backwards compatibility with old versions of @dataform/core.
.filter(action => !!action.canonicalTarget)
.map(action => [action.canonicalTarget, action])
);
const actionsMap = new Map<string, CoreProtoActionTypes>();
combineAllActions(compiledGraph)
// Required for backwards compatibility with old versions of @dataform/core.
.filter(action => !!action.canonicalTarget)
.forEach(action => {
actionsMap.set(targetStringifier.stringify(action.canonicalTarget), action);
});
}
2 changes: 1 addition & 1 deletion cli/console.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ function printExecutedActionErrors(
executionAction: dataform.IExecutionAction
) {
const failingTasks = executedAction.tasks.filter(
task => task.status === dataform.ActionResult.ExecutionStatus.FAILED
task => task.status === dataform.TaskResult.ExecutionStatus.FAILED
);
failingTasks.forEach((task, i) => {
executionAction.tasks[i].statement.split("\n").forEach(line => {
Expand Down
2 changes: 1 addition & 1 deletion cli/vm/compile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ if (require.main === module) {
}

/**
* @returns a base64 encoded {@see dataform.CoreExecutionRequest} proto.
* @returns a base64 encoded @see {@link dataform.CoreExecutionRequest} proto.
*/
function createCoreExecutionRequest(compileConfig: dataform.ICompileConfig): string {
const filePaths = Array.from(
Expand Down
85 changes: 1 addition & 84 deletions common/strings/stringifier.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import { expect } from "chai";
import { basename } from "path";

import {
JSONObjectStringifier,
StringifiedMap,
StringifiedSet
} from "df/common/strings/stringifier";
import { JSONObjectStringifier } from "df/common/strings/stringifier";
import { suite, test } from "df/testing";

interface IKey {
Expand Down Expand Up @@ -50,83 +46,4 @@ suite(basename(__filename), () => {
expect(stringifier.stringify(value)).equals(stringifier.stringify(value2));
});
});

suite("stringified map", () => {
const jsonStringifiedMap = new StringifiedMap<IKey, string>(new JSONObjectStringifier(), [
[{ a: "1", b: 2 }, "x"],
[{ a: "2", b: 2 }, "y"],
[{ a: "2", b: 3 }, "z"]
]);

test("has correct stringified keys", () => {
expect([...jsonStringifiedMap.keys()]).deep.equals([
{ a: "1", b: 2 },
{ a: "2", b: 2 },
{ a: "2", b: 3 }
]);
});

test("has correct stringified iterator", () => {
expect([...jsonStringifiedMap]).deep.equals([
[{ a: "1", b: 2 }, "x"],
[{ a: "2", b: 2 }, "y"],
[{ a: "2", b: 3 }, "z"]
]);
});

test("set and get", () => {
const map = new StringifiedMap<IKey, string>(new JSONObjectStringifier());
map.set(
{
a: "1",
b: 2
},
"test"
);
expect(
map.get({
a: "1",
b: 2
})
).equals("test");
});
});

suite("stringified set", () => {
const jsonStringifiedSet = new StringifiedSet<IKey>(new JSONObjectStringifier(), [
{ a: "1", b: 2 },
{ a: "2", b: 2 },
{ a: "2", b: 3 }
]);

test("has correct stringified values", () => {
expect([...jsonStringifiedSet.values()]).deep.equals([
{ a: "1", b: 2 },
{ a: "2", b: 2 },
{ a: "2", b: 3 }
]);
});

test("has correct stringified iterator", () => {
expect([...jsonStringifiedSet]).deep.equals([
{ a: "1", b: 2 },
{ a: "2", b: 2 },
{ a: "2", b: 3 }
]);
});

test("add and has", () => {
const set = new StringifiedSet<IKey>(new JSONObjectStringifier());
set.add({
a: "1",
b: 2
});
expect(
set.has({
a: "1",
b: 2
})
).equals(true);
});
});
});
Loading

0 comments on commit 0b5520e

Please sign in to comment.