Skip to content

Commit

Permalink
Update sdks (#148)
Browse files Browse the repository at this point in the history
* Update Kotlin examples to Kotlin 2.0

Signed-off-by: slinkydeveloper <[email protected]>

* Update usages of CoreSerdes to JsonSerdes

Signed-off-by: slinkydeveloper <[email protected]>

* Update all SDKs to 1.0.0

Signed-off-by: slinkydeveloper <[email protected]>

---------

Signed-off-by: slinkydeveloper <[email protected]>
  • Loading branch information
slinkydeveloper authored Jun 7, 2024
1 parent f222361 commit 69533b4
Show file tree
Hide file tree
Showing 42 changed files with 159 additions and 150 deletions.
2 changes: 1 addition & 1 deletion basics/basics-java/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ repositories {
mavenCentral()
}

val restateVersion = "0.9.2"
val restateVersion = "1.0.0"

dependencies {
annotationProcessor("dev.restate:sdk-api-gen:$restateVersion")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

package durable_execution;

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.Context;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.Service;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;
import utils.Permission;
import durable_execution.utils.UpdateRequest;
Expand Down Expand Up @@ -45,7 +45,7 @@ public void applyRoleUpdate(Context ctx, UpdateRequest req) {
// Apply a change to one system (e.g., DB upsert, API call, ...).
// The side effect persists the result with a consensus method so
// any later code relies on a deterministic result.
boolean success = ctx.run(CoreSerdes.JSON_BOOLEAN, () ->
boolean success = ctx.run(JsonSerdes.BOOLEAN, () ->
applyUserRole(req.getUserId(), req.getRole()));
if (!success) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@
// entire partitions.
//

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;
import events_state.ProfileService;
import utils.UserUpdate;

import java.time.Duration;
Expand Down Expand Up @@ -54,19 +53,19 @@ public class UserUpdatesService {
public void updateUserEvent(ObjectContext ctx, UserUpdate update) {

// event handler is a durably executed function that can use all the features of Restate
String userId = ctx.run(CoreSerdes.JSON_STRING, () -> updateUserProfile(update.getProfile()));
String userId = ctx.run(JsonSerdes.STRING, () -> updateUserProfile(update.getProfile()));
while(userId.equals("NOT_READY")) {
// Delay the processing of the event by sleeping.
// The other events for this Virtual Object / key are queued.
// Events for other keys are processed concurrently.
// The sleep suspends the function (e.g., when running on FaaS).
ctx.sleep(Duration.ofMillis(5000));
userId = ctx.run(CoreSerdes.JSON_STRING, () -> updateUserProfile(update.getProfile()));
userId = ctx.run(JsonSerdes.STRING, () -> updateUserProfile(update.getProfile()));
}


String finalUserId = userId;
String roleId = ctx.run(CoreSerdes.JSON_STRING,
String roleId = ctx.run(JsonSerdes.STRING,
() -> setUserPermissions(finalUserId, update.getPermissions()));
ctx.run(() -> provisionResources(finalUserId, roleId, update.getResources()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

package events_state;

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;
import utils.UserProfile;
Expand All @@ -27,10 +27,10 @@
public class ProfileService {

private static final StateKey<String> NAME =
StateKey.of("name", CoreSerdes.JSON_STRING);
StateKey.of("name", JsonSerdes.STRING);

private static final StateKey<String> EMAIL =
StateKey.of("email", CoreSerdes.JSON_STRING);
StateKey.of("email", JsonSerdes.STRING);

@Handler
public void registration(ObjectContext ctx, String name){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
*/
package virtual_objects;

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;

Expand All @@ -32,7 +32,7 @@
public class GreeterObject {

private static final StateKey<Integer> COUNT =
StateKey.of("available-drivers", CoreSerdes.JSON_INT);
StateKey.of("available-drivers", JsonSerdes.INT);

@Handler
public String greet(ObjectContext ctx, String greeting) {
Expand Down
3 changes: 2 additions & 1 deletion basics/basics-typescript/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"example-4": "ts-node-dev --transpile-only src/4_virtual_objects.ts"
},
"dependencies": {
"@restatedev/restate-sdk": "^0.9.2"
"@restatedev/restate-sdk": "^1.0.0",
"@restatedev/restate-sdk-clients": "^1.0.0"
},
"devDependencies": {
"@restatedev/restate": "^0.9.2",
Expand Down
117 changes: 60 additions & 57 deletions basics/basics-typescript/src/3_workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/

import * as restate from "@restatedev/restate-sdk";
import { workflow as wf } from "@restatedev/restate-sdk";
import * as restateClients from "@restatedev/restate-sdk-clients";
import { createUserEntry, sendEmailWithLink } from "./utils/workflow_stubs";

//
Expand All @@ -23,87 +23,90 @@ import { createUserEntry, sendEmailWithLink } from "./utils/workflow_stubs";
// Workflow instances always have a unique ID that identifies the workflow execution.
// Each workflow instance (ID) can run only once (to success or failure).
//
const myWorkflow = wf.workflow("usersignup", {
const myWorkflow = restate.workflow({
// --- The workflow logic is in the run() function ---
name: "usersignup",
handlers: {
run: async (ctx: restate.WorkflowContext, params: { name: string; email: string }) => {
const {name, email} = params;
const userId = ctx.key;

// publish state, for the world to see our progress
ctx.set("stage", "Creating User");

// use all the standard durable execution features here
await ctx.run(() => createUserEntry({userId, name}));

ctx.set("stage", "Email Verification");

// send the email with the verification secret
const secret = await ctx.run(() => crypto.randomUUID());
ctx.run(() => sendEmailWithLink({email, secret}));

try {
// the promise here is resolved or rejected by the additional workflow methods below
const clickSecret = await ctx.promise<string>("email-link");
if (clickSecret !== secret) {
throw new restate.TerminalError("Wrong secret from email link");
}
} catch (err: any) {
ctx.set("stage", "Verification failed: " + err.message);
return;
}

run: async (ctx: wf.WfContext, params: { name: string; email: string }) => {
const { name, email } = params;
const userId = ctx.workflowId();

// publish state, for the world to see our progress
ctx.set("stage", "Creating User");
ctx.set("stage", "User verified");
},

// use all the standard durable execution features here
await ctx.run(() => createUserEntry({ userId, name }));
// --- various interactions for queries and signals ---

ctx.set("stage", "Email Verification");
getStage: (ctx: restate.WorkflowSharedContext) => {
// read the state to get the stage where the workflow is
return ctx.get("stage");
},

// send the email with the verification secret
const secret = await ctx.run(() => crypto.randomUUID());
ctx.run(() => sendEmailWithLink({ email, secret }));
verifyEmail: async (ctx: restate.WorkflowSharedContext, request: { secret: string }) => {
// resolve the durable promise to let the awaiter know
await ctx.promise<string>("email-link").resolve(request.secret);
},

try {
// the promise here is resolved or rejected by the additional workflow methods below
const clickSecret = await ctx.promise<string>("email-link");
if (clickSecret !== secret) {
throw new restate.TerminalError("Wrong secret from email link");
}
} catch (err: any) {
ctx.set("stage", "Verification failed: " + err.message);
return;
}

ctx.set("stage", "User verified");
},

// --- various interactions for queries and signals ---

getStage: (ctx: wf.SharedWfContext) => {
// read the state to get the stage where the workflow is
return ctx.get("stage");
},

verifyEmail: async (ctx: wf.SharedWfContext, request: { secret: string }) => {
// resolve the durable promise to let the awaiter know
ctx.promise<string>("email-link").resolve(request.secret);
},

abortVerification: async (ctx: wf.SharedWfContext) => {
// failing the durable promise will throw an Error for the awaiting thread
ctx.promise<string>("email-link").reject("User aborted verification");
},
abortVerification: async (ctx: restate.WorkflowSharedContext) => {
// failing the durable promise will throw an Error for the awaiting thread
await ctx.promise<string>("email-link").reject("User aborted verification");
},
}
});

export const workflowApi = myWorkflow.api;
restate.endpoint().bind(myWorkflow).listen();

export type WorkflowApi = typeof myWorkflow;

// ---------- ⬆️⬆️ deploy this as a container, lambda, etc. ⬆️⬆️ ----------

// start it via an HTTP call.
// `curl restate:8080/usersignup/submit --json '{ "request": {
// "workflowId": "signup-userid1",
// "name": "Bob",
// "email": "[email protected]"
// } }'
// `curl restate:8080/usersignup/signup-userid1/run/send --json '{ "name": "Bob", "email": "[email protected]" }'

// or programmatically
async function signupUser(userId: string, name: string, email: string) {
const rs = restate.clients.connect("http://restate:8080");
const { client, status } = await rs.submitWorkflow(workflowApi, "signup-" + userId, {
const rs = restateClients.connect({ url: "http://restate:8080" });
const workflow: WorkflowApi = { name: "usersignup" };
const workflowClient = rs.workflowClient(workflow, "signup-" + userId);
const { status } = await workflowClient.workflowSubmit({
name,
email,
});

if (status != wf.WorkflowStartResult.STARTED) {
if (status != "Accepted") {
throw new Error("User ID already taken");
}

await client.result();
await workflowClient.workflowAttach();
}

// interact with the workflow from any other code
async function verifyEmail(userId: string, emailSecret: string) {
const rs = restate.clients.connect("http://restate:8080");
const { client, status } = await rs.connectToWorkflow(workflowApi, "signup-" + userId);
const rs = restateClients.connect({ url: "http://restate:8080" });
const workflow: WorkflowApi = { name: "usersignup" };
const workflowClient = rs.workflowClient(workflow, "signup-" + userId);

client.workflowInterface().verifyEmail({ secret: emailSecret });
workflowClient.verifyEmail({ secret: emailSecret });
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repositories {
mavenCentral()
}

val restateVersion = "0.9.2"
val restateVersion = "1.0.0"

dependencies {
// Restate SDK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

package dev.restate.sdk.examples;

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.Serde;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.common.TerminalException;
Expand Down Expand Up @@ -59,7 +59,7 @@ public void start(ObjectContext ctx, DeliveryRequest request) throws TerminalExc
ctx.set(DELIVERY_INFO, deliveryInfo);

// Acquire a driver
var driverAwakeable = ctx.awakeable(CoreSerdes.JSON_STRING);
var driverAwakeable = ctx.awakeable(JsonSerdes.STRING);
DriverDeliveryMatcherClient.fromContext(ctx, GeoUtils.DEMO_REGION)
.send()
.requestDriverForDelivery(driverAwakeable.id());
Expand Down Expand Up @@ -119,7 +119,7 @@ public void notifyDeliveryDelivered(ObjectContext ctx) throws TerminalException
ctx.clear(DELIVERY_INFO);

// Notify the OrderService that the delivery has been completed
ctx.awakeableHandle(delivery.getCallbackId()).resolve(CoreSerdes.VOID, null);
ctx.awakeableHandle(delivery.getCallbackId()).resolve(Serde.VOID, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
package dev.restate.sdk.examples;

import com.fasterxml.jackson.core.type.TypeReference;
import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.serde.jackson.JacksonSerdes;
Expand Down Expand Up @@ -52,7 +52,7 @@ public void setDriverAvailable(ObjectContext ctx, String driverId) throws Termin
// Update the queue in state. Delivery was removed.
ctx.set(PENDING_DELIVERIES, pendingDeliveries);
// Notify that delivery is ongoing
ctx.awakeableHandle(nextDelivery).resolve(CoreSerdes.JSON_STRING, driverId);
ctx.awakeableHandle(nextDelivery).resolve(JsonSerdes.STRING, driverId);
return;
}

Expand All @@ -77,7 +77,7 @@ public void requestDriverForDelivery(ObjectContext ctx, String deliveryCallbackI
// Remove driver from the pool
ctx.set(AVAILABLE_DRIVERS, availableDrivers);
// Notify that delivery is ongoing
ctx.awakeableHandle(deliveryCallbackId).resolve(CoreSerdes.JSON_STRING, nextAvailableDriver);
ctx.awakeableHandle(deliveryCallbackId).resolve(JsonSerdes.STRING, nextAvailableDriver);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

package dev.restate.sdk.examples;

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.examples.types.StatusEnum;
Expand All @@ -25,7 +25,7 @@ public class OrderStatusService {

private static final StateKey<StatusEnum> ORDER_STATUS =
StateKey.of("order-status", JacksonSerdes.of(StatusEnum.class));
private static final StateKey<Long> ORDER_ETA = StateKey.of("order-eta", CoreSerdes.JSON_LONG);
private static final StateKey<Long> ORDER_ETA = StateKey.of("order-eta", JsonSerdes.LONG);

public static class OrderStatus {
private final StatusEnum status;
Expand Down
Loading

0 comments on commit 69533b4

Please sign in to comment.