Skip to content

Commit

Permalink
Merge pull request #4 from apache/master
Browse files Browse the repository at this point in the history
3rd Dec 2019 21:59PM
  • Loading branch information
amoght authored Dec 3, 2019
2 parents 4f89355 + c1e759c commit 87d1f0a
Show file tree
Hide file tree
Showing 14 changed files with 347 additions and 151 deletions.
10 changes: 6 additions & 4 deletions model/job-management/src/main/proto/beam_job_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ option java_outer_classname = "JobApi";
import "beam_runner_api.proto";
import "endpoints.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";
import "metrics.proto";

// Job Service for running RunnerAPI pipelines
Expand All @@ -47,7 +48,7 @@ service JobService {
rpc GetJobs (GetJobsRequest) returns (GetJobsResponse);

// Get the current state of the job
rpc GetState (GetJobStateRequest) returns (GetJobStateResponse);
rpc GetState (GetJobStateRequest) returns (JobStateEvent);

// Get the job's pipeline
rpc GetPipeline (GetJobPipelineRequest) returns (GetJobPipelineResponse);
Expand All @@ -56,7 +57,7 @@ service JobService {
rpc Cancel (CancelJobRequest) returns (CancelJobResponse);

// Subscribe to a stream of state changes of the job, will immediately return the current state of the job as the first response.
rpc GetStateStream (GetJobStateRequest) returns (stream GetJobStateResponse);
rpc GetStateStream (GetJobStateRequest) returns (stream JobStateEvent);

// Subscribe to a stream of state changes and messages from the job
rpc GetMessageStream (JobMessagesRequest) returns (stream JobMessagesResponse);
Expand Down Expand Up @@ -151,8 +152,9 @@ message GetJobStateRequest {

}

message GetJobStateResponse {
message JobStateEvent {
JobState.Enum state = 1; // (required)
google.protobuf.Timestamp timestamp = 2; // (required)
}


Expand Down Expand Up @@ -196,7 +198,7 @@ message JobMessage {
message JobMessagesResponse {
oneof response {
JobMessage message_response = 1;
GetJobStateResponse state_response = 2;
JobStateEvent state_response = 2;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobPipelineRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobPipelineResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobsRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobsResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobInfo;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobStateEvent;
import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
Expand Down Expand Up @@ -190,8 +190,8 @@ public void run(RunJobRequest request, StreamObserver<RunJobResponse> responseOb
String invocationId = invocation.getId();

invocation.addStateListener(
state -> {
if (!JobInvocation.isTerminated(state)) {
event -> {
if (!JobInvocation.isTerminated(event.getState())) {
return;
}
String stagingSessionToken = stagingSessionTokens.get(preparationId);
Expand Down Expand Up @@ -243,14 +243,12 @@ public void getJobs(GetJobsRequest request, StreamObserver<GetJobsResponse> resp
}

@Override
public void getState(
GetJobStateRequest request, StreamObserver<GetJobStateResponse> responseObserver) {
public void getState(GetJobStateRequest request, StreamObserver<JobStateEvent> responseObserver) {
LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request);
String invocationId = request.getJobId();
try {
JobInvocation invocation = getInvocation(invocationId);
JobState.Enum state = invocation.getState();
GetJobStateResponse response = GetJobStateResponse.newBuilder().setState(state).build();
JobStateEvent response = invocation.getStateEvent();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (StatusRuntimeException | StatusException e) {
Expand Down Expand Up @@ -308,15 +306,16 @@ public void cancel(CancelJobRequest request, StreamObserver<CancelJobResponse> r

@Override
public void getStateStream(
GetJobStateRequest request, StreamObserver<GetJobStateResponse> responseObserver) {
GetJobStateRequest request, StreamObserver<JobStateEvent> responseObserver) {
LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request);
String invocationId = request.getJobId();
try {
JobInvocation invocation = getInvocation(invocationId);
Consumer<JobState.Enum> stateListener =
state -> {
responseObserver.onNext(GetJobStateResponse.newBuilder().setState(state).build());
if (JobInvocation.isTerminated(state)) {

Consumer<JobStateEvent> stateListener =
event -> {
responseObserver.onNext(event);
if (JobInvocation.isTerminated(event.getState())) {
responseObserver.onCompleted();
}
};
Expand All @@ -341,13 +340,11 @@ public void getMessageStream(
// and message listener.
StreamObserver<JobMessagesResponse> syncResponseObserver =
SynchronizedStreamObserver.wrapping(responseObserver);
Consumer<JobState.Enum> stateListener =
state -> {
Consumer<JobStateEvent> stateListener =
event -> {
syncResponseObserver.onNext(
JobMessagesResponse.newBuilder()
.setStateResponse(GetJobStateResponse.newBuilder().setState(state).build())
.build());
if (JobInvocation.isTerminated(state)) {
JobMessagesResponse.newBuilder().setStateResponse(event).build());
if (JobInvocation.isTerminated(event.getState())) {
responseObserver.onCompleted();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobStateEvent;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.util.Timestamps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.FutureCallback;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -51,12 +53,12 @@ public class JobInvocation {
private final PortablePipelineRunner pipelineRunner;
private final JobInfo jobInfo;
private final ListeningExecutorService executorService;
private List<Consumer<Enum>> stateObservers;
private List<Consumer<JobStateEvent>> stateObservers;
private List<Consumer<JobMessage>> messageObservers;
private JobState.Enum jobState;
private JobApi.MetricResults metrics;
private PortablePipelineResult resultHandle;
@Nullable private ListenableFuture<PortablePipelineResult> invocationFuture;
private List<JobStateEvent> stateHistory;

public JobInvocation(
JobInfo jobInfo,
Expand All @@ -70,8 +72,9 @@ public JobInvocation(
this.stateObservers = new ArrayList<>();
this.messageObservers = new ArrayList<>();
this.invocationFuture = null;
this.jobState = JobState.Enum.STOPPED;
this.stateHistory = new ArrayList<>();
this.metrics = JobApi.MetricResults.newBuilder().build();
this.setState(JobState.Enum.STOPPED);
}

private PortablePipelineResult runPipeline() throws Exception {
Expand Down Expand Up @@ -191,7 +194,12 @@ public JobApi.MetricResults getMetrics() {

/** Retrieve the job's current state. */
public JobState.Enum getState() {
return this.jobState;
return getStateEvent().getState();
}

/** Retrieve the job's current state. */
public JobStateEvent getStateEvent() {
return stateHistory.get(stateHistory.size() - 1);
}

/** Retrieve the job's pipeline. */
Expand All @@ -200,8 +208,10 @@ public RunnerApi.Pipeline getPipeline() {
}

/** Listen for job state changes with a {@link Consumer}. */
public synchronized void addStateListener(Consumer<JobState.Enum> stateStreamObserver) {
stateStreamObserver.accept(getState());
public synchronized void addStateListener(Consumer<JobStateEvent> stateStreamObserver) {
for (JobStateEvent event : stateHistory) {
stateStreamObserver.accept(event);
}
stateObservers.add(stateStreamObserver);
}

Expand All @@ -221,9 +231,14 @@ public JobApi.JobInfo toProto() {
}

private synchronized void setState(JobState.Enum state) {
this.jobState = state;
for (Consumer<JobState.Enum> observer : stateObservers) {
observer.accept(state);
JobStateEvent event =
JobStateEvent.newBuilder()
.setState(state)
.setTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
.build();
this.stateHistory.add(event);
for (Consumer<JobStateEvent> observer : stateObservers) {
observer.accept(event);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobStateEvent;
import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
Expand Down Expand Up @@ -61,7 +61,7 @@ public State getState() {
return terminationState;
}
JobServiceBlockingStub stub = jobService.get();
GetJobStateResponse response =
JobStateEvent response =
stub.getState(GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build());
return getJavaState(response.getState());
}
Expand Down Expand Up @@ -103,7 +103,7 @@ public State waitUntilFinish() {
}
JobServiceBlockingStub stub = jobService.get();
GetJobStateRequest request = GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build();
GetJobStateResponse response = stub.getState(request);
JobStateEvent response = stub.getState(request);
State lastState = getJavaState(response.getState());
while (!lastState.isTerminal()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobStateEvent;
import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
Expand Down Expand Up @@ -75,9 +75,8 @@ public void run(RunJobRequest request, StreamObserver<RunJobResponse> responseOb
}

@Override
public void getState(
GetJobStateRequest request, StreamObserver<GetJobStateResponse> responseObserver) {
responseObserver.onNext(GetJobStateResponse.newBuilder().setState(jobState).build());
public void getState(GetJobStateRequest request, StreamObserver<JobStateEvent> responseObserver) {
responseObserver.onNext(JobStateEvent.newBuilder().setState(jobState).build());
responseObserver.onCompleted();
}

Expand Down
Loading

0 comments on commit 87d1f0a

Please sign in to comment.