Replace QueryStorage grpc with QueryContext
Instead of making an additional gRPC call from the Admin component to the
node gRPC service, the Admin component now uses the QueryContext directly
to answer query requests.
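In effect, the admin query path shrinks from a gRPC round trip plus Arrow Flight decoding to a single in-process DataFusion call. A minimal sketch of the new flow, assuming only the QueryContext::execute signature that appears in the diff below (answer_query and its error handling are illustrative, not part of the commit):

use futures::StreamExt;
use restate_storage_query_datafusion::context::QueryContext;

// Illustrative helper, not part of the commit: runs a SQL query directly
// against the in-process DataFusion context instead of calling the node
// gRPC service and decoding an Arrow Flight stream.
async fn answer_query(query_context: &QueryContext, query: &str) -> anyhow::Result<()> {
    // QueryContext::execute returns a stream of Arrow record batches.
    let mut batches = query_context.execute(query).await?;
    while let Some(batch) = batches.next().await {
        let batch = batch?;
        println!("received {} rows", batch.num_rows());
    }
    Ok(())
}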
tillrohrmann committed Oct 31, 2024
1 parent e40eee1 commit fdd2c36
Showing 14 changed files with 102 additions and 226 deletions.
24 changes: 1 addition & 23 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion crates/admin/Cargo.toml
@@ -25,13 +25,13 @@ restate-futures-util = { workspace = true }
 restate-metadata-store = { workspace = true }
 restate-service-client = { workspace = true }
 restate-service-protocol = { workspace = true, features = ["discovery"] }
+restate-storage-query-datafusion = { workspace = true }
 restate-types = { workspace = true, features = ["schemars"] }
 restate-wal-protocol = { workspace = true }
 restate-web-ui = { workspace = true, optional = true }
 
 anyhow = { workspace = true }
 arc-swap = { workspace = true }
-arrow-flight = { workspace = true }
 axum = { workspace = true }
 bytes = { workspace = true }
17 changes: 12 additions & 5 deletions crates/admin/src/service.rs
@@ -15,14 +15,13 @@ use http::StatusCode;
 use restate_bifrost::Bifrost;
 use restate_types::config::AdminOptions;
 use restate_types::live::LiveLoad;
-use tonic::transport::Channel;
 use tower::ServiceBuilder;
 
 use restate_core::metadata_store::MetadataStoreClient;
 use restate_core::network::net_util;
-use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient;
 use restate_core::MetadataWriter;
 use restate_service_protocol::discovery::ServiceDiscovery;
+use restate_storage_query_datafusion::context::QueryContext;
 use restate_types::net::BindAddress;
 use restate_types::schema::subscriptions::SubscriptionValidator;
@@ -36,6 +35,7 @@ pub struct BuildError(#[from] restate_service_client::BuildError);
 pub struct AdminService<V> {
     bifrost: Bifrost,
     schema_registry: SchemaRegistry<V>,
+    query_context: Option<QueryContext>,
 }
 
 impl<V> AdminService<V>
@@ -48,6 +48,7 @@ where
         bifrost: Bifrost,
         subscription_validator: V,
         service_discovery: ServiceDiscovery,
+        query_context: Option<QueryContext>,
    ) -> Self {
        Self {
            bifrost,
@@ -57,20 +58,26 @@
                service_discovery,
                subscription_validator,
            ),
+            query_context,
        }
    }
 
    pub async fn run(
        self,
        mut updateable_config: impl LiveLoad<AdminOptions> + Send + 'static,
-        node_svc_client: NodeSvcClient<Channel>,
    ) -> anyhow::Result<()> {
        let opts = updateable_config.live_load();
 
        let rest_state = state::AdminServiceState::new(self.schema_registry, self.bifrost);
 
-        let query_state = Arc::new(state::QueryServiceState { node_svc_client });
-        let router = axum::Router::new().merge(storage_query::create_router(query_state));
+        let router = self
+            .query_context
+            .map(|query_context| {
+                let query_state = Arc::new(state::QueryServiceState { query_context });
+
+                axum::Router::new().merge(storage_query::create_router(query_state))
+            })
+            .unwrap_or_default();
 
        // Merge Web UI router
        #[cfg(feature = "serve-web-ui")]
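The run wiring above relies on axum::Router implementing Default: if the admin service was built without a QueryContext, unwrap_or_default() yields an empty router and the storage-query endpoints are simply not mounted. The same pattern in isolation (a minimal sketch; storage_query_router and the generic stand-in T are illustrative):

use axum::Router;

// Stand-in for the optional storage-query wiring: Some(ctx) yields a router
// with the query routes merged in, None yields an empty default router.
fn storage_query_router<T>(query_context: Option<T>, routes: Router) -> Router {
    query_context
        .map(|_query_context| Router::new().merge(routes))
        .unwrap_or_default()
}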
5 changes: 2 additions & 3 deletions crates/admin/src/state.rs
@@ -11,8 +11,7 @@
 
 use crate::schema_registry::SchemaRegistry;
 use restate_bifrost::Bifrost;
-use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient;
-use tonic::transport::Channel;
+use restate_storage_query_datafusion::context::QueryContext;
 
 #[derive(Clone, derive_builder::Builder)]
 pub struct AdminServiceState<V> {
@@ -22,7 +21,7 @@ pub struct AdminServiceState<V> {
 
 #[derive(Clone)]
 pub struct QueryServiceState {
-    pub node_svc_client: NodeSvcClient<Channel>,
+    pub query_context: QueryContext,
 }
 
 impl<V> AdminServiceState<V> {
6 changes: 3 additions & 3 deletions crates/admin/src/storage_query/error.rs
@@ -11,7 +11,7 @@
 use axum::http::StatusCode;
 use axum::response::{IntoResponse, Response};
 use axum::Json;
-
+use datafusion::error::DataFusionError;
 use okapi_operation::anyhow::Error;
 use okapi_operation::okapi::map;
 use okapi_operation::okapi::openapi3::Responses;
@@ -23,8 +23,8 @@ use serde::Serialize;
 /// and later converted to a response through the IntoResponse implementation
 #[derive(Debug, thiserror::Error)]
 pub enum StorageQueryError {
-    #[error("failed grpc: {0}")]
-    Tonic(#[from] tonic::Status),
+    #[error("datafusion failed: {0}")]
+    DataFusion(#[from] DataFusionError),
 }
 
 /// # Error description response
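The IntoResponse implementation that this doc comment refers to is not part of the diff; presumably the new variant surfaces as a server-side error much as the Tonic variant did. A minimal sketch of such a mapping, assuming a plain 500 with the error text (the real status code and body shape are not shown here):

use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};

// Assumed mapping: any DataFusion failure becomes a 500 with the error text.
impl IntoResponse for StorageQueryError {
    fn into_response(self) -> Response {
        match self {
            StorageQueryError::DataFusion(err) => {
                (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response()
            }
        }
    }
}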
121 changes: 30 additions & 91 deletions crates/admin/src/storage_query/query.rs
@@ -12,9 +12,6 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use arrow_flight::decode::FlightRecordBatchStream;
-use arrow_flight::error::FlightError;
-use arrow_flight::FlightData;
 use axum::extract::State;
 use axum::response::{IntoResponse, Response};
 use axum::{http, Json};
@@ -27,11 +24,12 @@ use datafusion::arrow::datatypes::{ByteArrayType, DataType, Field, FieldRef, Sch
 use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::ipc::writer::StreamWriter;
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::DataFusionError;
+use datafusion::execution::SendableRecordBatchStream;
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 use http_body::Frame;
 use http_body_util::StreamBody;
 use okapi_operation::*;
-use restate_core::network::protobuf::node_svc::StorageQueryRequest;
 use schemars::JsonSchema;
 use serde::Deserialize;
 use serde_with::serde_as;
@@ -62,27 +60,10 @@ pub async fn query(
     State(state): State<Arc<QueryServiceState>>,
     #[request_body(required = true)] Json(payload): Json<QueryRequest>,
 ) -> Result<impl IntoResponse, StorageQueryError> {
-    let mut worker_grpc_client = state.node_svc_client.clone();
-
-    let response_stream = worker_grpc_client
-        .query_storage(StorageQueryRequest {
-            query: payload.query,
-        })
-        .await?
-        .into_inner();
-
-    let record_batch_stream = FlightRecordBatchStream::new_from_flight_data(
-        response_stream
-            .map_ok(|response| FlightData {
-                data_header: response.header,
-                data_body: response.data,
-                ..FlightData::default()
-            })
-            .map_err(FlightError::from),
-    );
+    let record_batch_stream = state.query_context.execute(&payload.query).await?;
 
     // create a stream without LargeUtf8 or LargeBinary columns as JS doesn't support these yet
-    let result_stream = ConvertRecordBatchStream::new(record_batch_stream).map_ok(Frame::data);
+    let result_stream = ConvertRecordBatchStream::new(record_batch_stream)?.map_ok(Frame::data);
 
     Ok(Response::builder()
         .header(
@@ -154,76 +135,49 @@
     )
 }
 
-enum ConversionState {
-    WaitForSchema,
-    WaitForRecords(SchemaRef, StreamWriter<Vec<u8>>),
-}
-
 /// Convert the record batches so that they don't contain LargeUtf8 or LargeBinary columns as JS doesn't
 /// support these yet.
 struct ConvertRecordBatchStream {
     done: bool,
-    state: ConversionState,
-
-    record_batch_stream: FlightRecordBatchStream,
+    record_batch_stream: SendableRecordBatchStream,
+    stream_writer: StreamWriter<Vec<u8>>,
+    schema: SchemaRef,
 }
 
 impl ConvertRecordBatchStream {
-    fn new(record_batch_stream: FlightRecordBatchStream) -> Self {
-        ConvertRecordBatchStream {
+    fn new(record_batch_stream: SendableRecordBatchStream) -> Result<Self, DataFusionError> {
+        let converted_schema = convert_schema(record_batch_stream.schema());
+        let stream_writer = StreamWriter::try_new(Vec::new(), converted_schema.as_ref())?;
+
+        Ok(ConvertRecordBatchStream {
             done: false,
-            state: ConversionState::WaitForSchema,
             record_batch_stream,
-        }
+            stream_writer,
+            schema: converted_schema,
+        })
     }
 }
 
 impl ConvertRecordBatchStream {
-    fn create_stream_writer(
-        record_batch: &RecordBatch,
-    ) -> Result<(SchemaRef, StreamWriter<Vec<u8>>), ArrowError> {
-        let converted_schema = convert_schema(record_batch.schema());
-        let stream_writer = StreamWriter::try_new(Vec::new(), converted_schema.as_ref())?;
-
-        Ok((converted_schema, stream_writer))
-    }
-
-    fn write_batch(
-        converted_schema: &SchemaRef,
-        stream_writer: &mut StreamWriter<Vec<u8>>,
-        record_batch: RecordBatch,
-    ) -> Result<(), ArrowError> {
-        let record_batch = convert_record_batch(converted_schema.clone(), record_batch)?;
-        stream_writer.write(&record_batch)
+    fn write_batch(&mut self, record_batch: RecordBatch) -> Result<(), ArrowError> {
+        let record_batch = convert_record_batch(self.schema.clone(), record_batch)?;
+        self.stream_writer.write(&record_batch)
     }
 
     fn process_record(
         mut self: Pin<&mut Self>,
-        record_batch: Result<RecordBatch, FlightError>,
-    ) -> Result<Bytes, FlightError> {
+        record_batch: Result<RecordBatch, DataFusionError>,
+    ) -> Result<Bytes, DataFusionError> {
         let record_batch = record_batch?;
-        match &mut self.state {
-            ConversionState::WaitForSchema => {
-                let (converted_schema, mut stream_writer) =
-                    Self::create_stream_writer(&record_batch)?;
-                Self::write_batch(&converted_schema, &mut stream_writer, record_batch)?;
-                let bytes = Bytes::copy_from_slice(stream_writer.get_ref());
-                stream_writer.get_mut().clear();
-                self.state = ConversionState::WaitForRecords(converted_schema, stream_writer);
-                Ok(bytes)
-            }
-            ConversionState::WaitForRecords(converted_schema, stream_writer) => {
-                Self::write_batch(converted_schema, stream_writer, record_batch)?;
-                let bytes = Bytes::copy_from_slice(stream_writer.get_ref());
-                stream_writer.get_mut().clear();
-                Ok(bytes)
-            }
-        }
+        self.write_batch(record_batch)?;
+        let bytes = Bytes::copy_from_slice(self.stream_writer.get_ref());
+        self.stream_writer.get_mut().clear();
+        Ok(bytes)
     }
 }
 
 impl Stream for ConvertRecordBatchStream {
-    type Item = Result<Bytes, FlightError>;
+    type Item = Result<Bytes, DataFusionError>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         if self.done {
@@ -242,27 +196,12 @@ impl Stream for ConvertRecordBatchStream {
             }
         } else {
             self.done = true;
-            if let ConversionState::WaitForRecords(_, stream_writer) = &mut self.state {
-                if let Err(err) = stream_writer.finish() {
-                    Poll::Ready(Some(Err(err.into())))
-                } else {
-                    let bytes = Bytes::copy_from_slice(stream_writer.get_ref());
-                    stream_writer.get_mut().clear();
-                    Poll::Ready(Some(Ok(bytes)))
-                }
+            if let Err(err) = self.stream_writer.finish() {
+                Poll::Ready(Some(Err(err.into())))
             } else {
-                // CLI is expecting schema information
-                if let (Some(schema), ConversionState::WaitForSchema) =
-                    (self.record_batch_stream.schema(), &self.state)
-                {
-                    let schema_bytes = StreamWriter::try_new(Vec::new(), schema)
-                        .and_then(|stream_writer| stream_writer.into_inner().map(Bytes::from))
-                        .map_err(FlightError::from);
-
-                    Poll::Ready(Some(schema_bytes))
-                } else {
-                    Poll::Ready(None)
-                }
+                let bytes = Bytes::copy_from_slice(self.stream_writer.get_ref());
+                self.stream_writer.get_mut().clear();
+                Poll::Ready(Some(Ok(bytes)))
             }
         }
     }
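convert_schema and convert_record_batch predate this commit and are not shown in the diff. For orientation, a sketch of the schema mapping the comment above describes, assuming the helper simply downgrades large types (this is not the actual implementation):

use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};

// Assumed shape of convert_schema: rewrite LargeUtf8/LargeBinary fields to
// their 32-bit-offset equivalents so browser-side Arrow readers can decode
// the IPC stream.
fn convert_schema_sketch(schema: SchemaRef) -> SchemaRef {
    let fields: Vec<Field> = schema
        .fields()
        .iter()
        .map(|field| {
            let data_type = match field.data_type() {
                DataType::LargeUtf8 => DataType::Utf8,
                DataType::LargeBinary => DataType::Binary,
                other => other.clone(),
            };
            Field::new(field.name().clone(), data_type, field.is_nullable())
        })
        .collect();
    Arc::new(Schema::new(fields))
}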
13 changes: 1 addition & 12 deletions crates/core/protobuf/node_svc.proto
@@ -19,10 +19,6 @@ service NodeSvc {
   // Get identity information from this node.
   rpc GetIdent(google.protobuf.Empty) returns (IdentResponse);
 
-  // Queries the storage of the worker and returns the result as a stream of
-  // responses
-  rpc QueryStorage(StorageQueryRequest) returns (stream StorageQueryResponse);
-
   // Create a bidirectional node-to-node stream
   rpc CreateConnection(stream restate.node.Message)
       returns (stream restate.node.Message);
@@ -45,11 +41,4 @@
   uint32 logs_version = 11;
   uint32 schema_version = 12;
   uint32 partition_table_version = 13;
-}
-
-message StorageQueryRequest { string query = 1; }
-
-message StorageQueryResponse {
-  bytes header = 1;
-  bytes data = 2;
-}
+}
1 change: 0 additions & 1 deletion crates/node/Cargo.toml
@@ -34,7 +34,6 @@ restate-worker = { workspace = true }
 
 anyhow = { workspace = true }
 arc-swap = { workspace = true }
-arrow-flight = { workspace = true }
 async-trait = { workspace = true }
 axum = { workspace = true }
 bytes = { workspace = true }