Skip to content

Commit

Permalink
defer proxying proof of concept
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Sep 4, 2020
1 parent 0dedab2 commit 421166b
Show file tree
Hide file tree
Showing 14 changed files with 231 additions and 26 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,6 @@
"./website"
],
"resolutions": {
"graphql": "15.3.0"
"graphql": "npm:graphql-experimental"
}
}
24 changes: 22 additions & 2 deletions packages/delegate/src/defaultMergedResolver.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { defaultFieldResolver, GraphQLResolveInfo } from 'graphql';

import { getResponseKeyFromInfo } from '@graphql-tools/utils';
import { getResponseKeyFromInfo, ExecutionResult } from '@graphql-tools/utils';

import { resolveExternalValue } from './resolveExternalValue';
import { getSubschema } from './Subschema';
Expand All @@ -18,7 +18,7 @@ export function defaultMergedResolver(
args: Record<string, any>,
context: Record<string, any>,
info: GraphQLResolveInfo
) {
): any {
if (!parent) {
return null;
}
Expand All @@ -33,7 +33,27 @@ export function defaultMergedResolver(

const data = parent[responseKey];
const unpathedErrors = getUnpathedErrors(parent);

// To Do:
// add support for transforms
// call out to Receiver abstraction that will publish all patches with channel based on path
// edit code below to subscribe to appropriate channel based on path
// so as to handle multiple defer patches and discriminate between them without need for labels

if (data === undefined && 'ASYNC_ITERABLE' in parent) {
const asyncIterable = parent['ASYNC_ITERABLE'];
return asyncIterableToResult(asyncIterable).then(patch => {
return defaultMergedResolver(patch.data, args, context, info);
});
}

const subschema = getSubschema(parent, responseKey);

return resolveExternalValue(data, unpathedErrors, subschema, context, info);
}

async function asyncIterableToResult(asyncIterable: AsyncIterable<ExecutionResult>): Promise<any> {
const asyncIterator = asyncIterable[Symbol.asyncIterator]();
const payload = await asyncIterator.next();
return payload.value;
}
32 changes: 24 additions & 8 deletions packages/delegate/src/delegateToSchema.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
subscribe,
execute,
validate,
GraphQLSchema,
isSchema,
Expand All @@ -13,9 +12,11 @@ import {
GraphQLObjectType,
} from 'graphql';

import { execute } from 'graphql/experimental';

import isPromise from 'is-promise';

import { mapAsyncIterator, ExecutionResult } from '@graphql-tools/utils';
import { mapAsyncIterator, ExecutionResult, isAsyncIterable } from '@graphql-tools/utils';

import {
IDelegateToSchemaOptions,
Expand All @@ -25,6 +26,7 @@ import {
StitchingInfo,
Endpoint,
Transform,
Executor,
} from './types';

import { isSubschemaConfig } from './Subschema';
Expand Down Expand Up @@ -189,8 +191,16 @@ export function delegateRequest({
info,
});

if (isPromise(executionResult)) {
return executionResult.then(originalResult => transformer.transformResult(originalResult));
if (isAsyncIterable(executionResult)) {
return asyncIterableToResult(executionResult).then(originalResult => {
const transformedResult = transformer.transformResult(originalResult);
transformedResult['ASYNC_ITERABLE'] = executionResult;
return transformedResult;
});
} else if (isPromise(executionResult)) {
return (executionResult as Promise<ExecutionResult>).then(originalResult =>
transformer.transformResult(originalResult)
);
}
return transformer.transformResult(executionResult);
}
Expand All @@ -203,7 +213,7 @@ export function delegateRequest({
context,
info,
}).then((subscriptionResult: AsyncIterableIterator<ExecutionResult> | ExecutionResult) => {
if (Symbol.asyncIterator in subscriptionResult) {
if (isAsyncIterable(subscriptionResult)) {
// "subscribe" to the subscription result and map the result through the transforms
return mapAsyncIterator<ExecutionResult, any>(
subscriptionResult as AsyncIterableIterator<ExecutionResult>,
Expand All @@ -229,15 +239,15 @@ function validateRequest(targetSchema: GraphQLSchema, document: DocumentNode) {
}
}

function createDefaultExecutor(schema: GraphQLSchema, rootValue: Record<string, any>) {
return ({ document, context, variables, info }: ExecutionParams) =>
function createDefaultExecutor(schema: GraphQLSchema, rootValue: Record<string, any>): Executor {
return (({ document, context, variables, info }: ExecutionParams) =>
execute({
schema,
document,
contextValue: context,
variableValues: variables,
rootValue: rootValue ?? info?.rootValue,
});
})) as Executor;
}

function createDefaultSubscriber(schema: GraphQLSchema, rootValue: Record<string, any>) {
Expand All @@ -250,3 +260,9 @@ function createDefaultSubscriber(schema: GraphQLSchema, rootValue: Record<string
rootValue: rootValue ?? info?.rootValue,
}) as any;
}

async function asyncIterableToResult(asyncIterable: AsyncIterable<ExecutionResult>): Promise<any> {
const asyncIterator = asyncIterable[Symbol.asyncIterator]();
const payload = await asyncIterator.next();
return payload.value;
}
18 changes: 12 additions & 6 deletions packages/delegate/src/getBatchingExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ import isPromise from 'is-promise';

import DataLoader from 'dataloader';

import { ExecutionResult } from '@graphql-tools/utils';
import { ExecutionResult, isAsyncIterable } from '@graphql-tools/utils';

import { ExecutionParams, Endpoint } from './types';
import { ExecutionParams, Endpoint, Executor } from './types';
import { memoize2of3 } from './memoize';
import { mergeExecutionParams } from './mergeExecutionParams';
import { splitResult } from './splitResult';

export const getBatchingExecutor = memoize2of3(function (
_context: Record<string, any>,
endpoint: Endpoint,
executor: ({ document, context, variables, info }: ExecutionParams) => ExecutionResult | Promise<ExecutionResult>
executor: Executor
) {
const loader = new DataLoader(
createLoadFn(
Expand All @@ -27,7 +27,7 @@ export const getBatchingExecutor = memoize2of3(function (
});

function createLoadFn(
executor: ({ document, context, variables, info }: ExecutionParams) => ExecutionResult | Promise<ExecutionResult>,
executor: Executor,
extensionsReducer: (mergedExtensions: Record<string, any>, executionParams: ExecutionParams) => Record<string, any>
) {
return async (execs: Array<ExecutionParams>): Promise<Array<ExecutionResult>> => {
Expand All @@ -53,10 +53,16 @@ function createLoadFn(
const mergedExecutionParams = mergeExecutionParams(execBatch, extensionsReducer);
const executionResult = executor(mergedExecutionParams);

if (isPromise(executionResult)) {
if (isAsyncIterable(executionResult)) {
throw new Error('batching not yet possible with queries that return an async iterable (defer/stream)');
// requires splitting up the async iterable into multiple async iterables by path versus possibly just promises
// so requires analyzing which of the results would get an async iterable (ie included defer/stream within the subdocument)
// or returning an async iterable even though defer/stream was not actually present, which is probably simpler
// but most probably against the spec.
} else if (isPromise(executionResult)) {
containsPromises = true;
}
executionResults.push(executionResult);
executionResults.push(executionResult as ExecutionResult);
});

if (containsPromises) {
Expand Down
6 changes: 5 additions & 1 deletion packages/delegate/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import {
GraphQLError,
} from 'graphql';

import { AsyncExecutionResult } from 'graphql/experimental';

import DataLoader from 'dataloader';

import { Operation, Request, TypeMap, ExecutionResult } from '@graphql-tools/utils';
Expand Down Expand Up @@ -131,7 +133,9 @@ export type AsyncExecutor = <
export type SyncExecutor = <TReturn = Record<string, any>, TArgs = Record<string, any>, TContext = Record<string, any>>(
params: ExecutionParams<TArgs, TContext>
) => ExecutionResult<TReturn>;
export type Executor = AsyncExecutor | SyncExecutor;
export type Executor = <TReturn = Record<string, any>, TArgs = Record<string, any>, TContext = Record<string, any>>(
params: ExecutionParams<TArgs, TContext>
) => Promise<ExecutionResult<TReturn>> | ExecutionResult<TReturn> | AsyncIterable<AsyncExecutionResult>;
export type Subscriber = <TReturn = Record<string, any>, TArgs = Record<string, any>, TContext = Record<string, any>>(
params: ExecutionParams<TArgs, TContext>
) => Promise<AsyncIterator<ExecutionResult<TReturn>> | ExecutionResult<TReturn>>;
Expand Down
114 changes: 114 additions & 0 deletions packages/delegate/tests/deferStream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { graphql } from 'graphql/experimental';

import { makeExecutableSchema } from '@graphql-tools/schema';
import { stitchSchemas } from '@graphql-tools/stitch';
import { isAsyncIterable } from '@graphql-tools/utils';

describe('defer support', () => {
test('should work for root fields', async () => {
const schema = makeExecutableSchema({
typeDefs: `
type Query {
test: String
}
`,
resolvers: {
Query: {
test: () => 'test',
}
},
});

const stitchedSchema = stitchSchemas({
subschemas: [schema]
});

const result = await graphql(
stitchedSchema,
`
query {
... on Query @defer {
test
}
}
`,
);

const results = [];
if (isAsyncIterable(result)) {
for await (let patch of result) {
results.push(patch);
}
}

expect(results[0]).toEqual({
data: {},
hasNext: true,
});
expect(results[1]).toEqual({
data: {
test: 'test'
},
hasNext: false,
path: [],
});
});

test('should work for nested fields', async () => {
const schema = makeExecutableSchema({
typeDefs: `
type Object {
test: String
}
type Query {
object: Object
}
`,
resolvers: {
Object: {
test: () => 'test',
},
Query: {
object: () => ({}),
}
},
});

const stitchedSchema = stitchSchemas({
subschemas: [schema]
});

const result = await graphql(
stitchedSchema,
`
query {
object {
... on Object @defer {
test
}
}
}
`,
);

const results = [];
if (isAsyncIterable(result)) {
for await (let patch of result) {
results.push(patch);
}
}

expect(results[0]).toEqual({
data: { object: {} },
hasNext: true,
});
expect(results[1]).toEqual({
data: {
test: 'test'
},
hasNext: false,
path: ['object'],
});
});
});

25 changes: 25 additions & 0 deletions packages/load/tests/loaders/schema/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,31 @@ describe('loadSchema', () => {
const schemaStr = printSchema(schema);

expect(schemaStr).toBeSimilarGqlDoc(/* GraphQL */`
"""
Directs the executor to defer this fragment when the \`if\` argument is true or undefined.
"""
directive @defer(
"""Deferred when true or undefined."""
if: Boolean
"""Unique name"""
label: String
) on FRAGMENT_SPREAD | INLINE_FRAGMENT
"""
Directs the executor to stream plural fields when the \`if\` argument is true or undefined.
"""
directive @stream(
"""Stream when true or undefined."""
if: Boolean
"""Unique name"""
label: String
"""Number of items to return immediately"""
initialCount: Int!
) on FIELD
type Query {
a: A
b: B
Expand Down
12 changes: 8 additions & 4 deletions packages/schema/src/buildSchemaFromTypeDefinitions.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { extendSchema, buildASTSchema, GraphQLSchema, DocumentNode, ASTNode } from 'graphql';
import { extendSchema, buildASTSchema, GraphQLSchema, DocumentNode, ASTNode, BuildSchemaOptions } from 'graphql';

import { ITypeDefinitions, GraphQLParseOptions, parseGraphQLSDL } from '@graphql-tools/utils';

Expand All @@ -12,12 +12,16 @@ export function buildSchemaFromTypeDefinitions(
const document = buildDocumentFromTypeDefinitions(typeDefinitions, parseOptions);
const typesAst = filterExtensionDefinitions(document);

const backcompatOptions = { commentDescriptions: true };
let schema: GraphQLSchema = buildASTSchema(typesAst, backcompatOptions);
const options: BuildSchemaOptions = {
commentDescriptions: true,
experimentalDefer: true,
experimentalStream: true,
};
let schema: GraphQLSchema = buildASTSchema(typesAst, options);

const extensionsAst = extractExtensionDefinitions(document);
if (extensionsAst.definitions.length > 0) {
schema = extendSchema(schema, extensionsAst, backcompatOptions);
schema = extendSchema(schema, extensionsAst, options);
}

return schema;
Expand Down
8 changes: 8 additions & 0 deletions packages/schema/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,12 @@ export interface IExecutableSchemaDefinition<TContext = any> {
* Additional options for removing unused types from the schema
*/
pruningOptions?: PruneSchemaOptions;
/**
* Set to `true` to enable support within queries for the experimental `defer` directive
*/
experimentalDefer?: boolean;
/**
* Set to `true` to enable support within queries for the experimental `stream` directive
*/
experimentalStream?: boolean;
}
Loading

0 comments on commit 421166b

Please sign in to comment.