-
Notifications
You must be signed in to change notification settings - Fork 80
/
Copy pathhuman_input_chain.py
113 lines (105 loc) · 4.31 KB
/
human_input_chain.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from aws_cdk import (
Duration,
Stack,
aws_bedrock as bedrock,
aws_sns as sns,
aws_stepfunctions as sfn,
aws_stepfunctions_tasks as tasks,
)
from constructs import Construct
class HumanInputChain(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
get_advertisement = tasks.BedrockInvokeModel(
self,
"Generate Advertisement",
# Choose the model to invoke
model=bedrock.FoundationModel.from_foundation_model_id(
self,
"Model",
bedrock.FoundationModelIdentifier.ANTHROPIC_CLAUDE_3_HAIKU_20240307_V1_0,
),
# Provide the input to the model, including the templated prompt and inference properties
body=sfn.TaskInput.from_object(
{
"anthropic_version": "bedrock-2023-05-31",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
# The prompt is templated with the novel name as variable input.
# The input to the Step Functions execution could be:
# "Pride and Prejudice"
"text": sfn.JsonPath.format(
"Write a short advertisement for the book {}.",
sfn.JsonPath.string_at("$$.Execution.Input"),
),
}
],
}
],
"max_tokens": 250,
"temperature": 1,
}
),
)
# Send the generated advertisement to a SNS topic.
# The human receiving the notification is expected to approve or reject the advertisement.
# The human's decision should be sent to the Step Functions execution using the task token.
# aws stepfunctions send-task-success --task-output "{\"decision\": \"yes\"}" --task-token "AQB8A..."
topic = sns.Topic(
self, "Topic", display_name="Human input topic for techniques example"
)
publish_ad_for_approval = tasks.SnsPublish(
self,
"Get Approval For Advertisement",
topic=topic,
integration_pattern=sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
message=sfn.TaskInput.from_object(
{
"advertisement": sfn.JsonPath.string_at("$.Body.content[0].text"),
"task_token": sfn.JsonPath.task_token,
}
),
result_path="$.human_input",
)
extract_ad = sfn.Pass(
self,
"Extract Advertisement",
parameters={
"advertisement": sfn.JsonPath.string_at("$.Body.content[0].text"),
},
)
handle_user_decision = (
sfn.Choice(self, "Is Advertisement Approved?")
.when(
# Human approved the ad - finish the Step Functions execution
sfn.Condition.string_equals("$.human_input.decision", "yes"),
extract_ad,
)
.when(
# Human rejected the ad - loop back to generate a new ad
sfn.Condition.string_equals("$.human_input.decision", "no"),
get_advertisement,
)
.otherwise(
sfn.Fail(
self,
"Invalid Advertisement Approval Value",
cause="Unknown user choice (decision must be yes or no)",
error="Unknown user choice (decision must be yes or no)",
)
)
)
chain = get_advertisement.next(publish_ad_for_approval).next(
handle_user_decision
)
sfn.StateMachine(
self,
"HumanInputExample",
state_machine_name="Techniques-HumanInput",
definition_body=sfn.DefinitionBody.from_chainable(chain),
timeout=Duration.minutes(10),
)