Skip to content

Commit

Permalink
chore(bors): merge pull request #409
Browse files Browse the repository at this point in the history
409: fix(v1/concurrency): enable openapi client concurrency r=tiagolobocastro a=tiagolobocastro

Upgrade openapi-generator which includes a new concurrency_limit option. Added env var "MAX_CONCURRENCY_RPC" to both csi-controller and core agent, with defaults of 10 and 3, respectively.
On develop there is no limit for core, but we haven't tested this before on v1 and that's why we're setting it conservatively to 3.

Signed-off-by: Tiago Castro <[email protected]>

Co-authored-by: Tiago Castro <[email protected]>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Jan 27, 2023
2 parents b852b4f + d5ed581 commit 9a45458
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 69 deletions.
34 changes: 23 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 20 additions & 8 deletions control-plane/agents/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub enum ServiceError {
GetMessageId { channel: Channel, source: Error },
#[snafu(display("Failed to find subscription '{}' on Channel '{}'", id.to_string(), channel.to_string()))]
FindSubscription { channel: Channel, id: MessageId },
#[snafu(display("Subscription Semaphore closed"))]
SubscriptionClosed {},
#[snafu(display("Failed to handle message id '{}' on Channel '{}', details: {}", id.to_string(), channel.to_string(), details))]
HandleMessage {
channel: Channel,
Expand All @@ -58,6 +60,12 @@ pub enum ServiceError {
},
}

impl From<tokio::sync::AcquireError> for ServiceError {
fn from(_: tokio::sync::AcquireError) -> Self {
Self::SubscriptionClosed {}
}
}

/// Runnable service with N subscriptions which listen on a given
/// message bus channel on a specific ID
pub struct Service {
Expand Down Expand Up @@ -323,10 +331,14 @@ impl Service {

// Gated access to a subscription. This means we can concurrently handle CreateVolume and
// GetVolume but we handle CreateVolume one at a time.
let concurrency: usize = std::env::var("MAX_CONCURRENT_RPC")
.ok()
.and_then(|i| i.parse().ok())
.unwrap_or(3usize);
let gated_subs = Arc::new(
subscriptions
.into_iter()
.map(tokio::sync::Mutex::new)
.map(|i| (tokio::sync::Semaphore::new(concurrency), i))
.collect::<Vec<_>>(),
);

Expand Down Expand Up @@ -391,7 +403,7 @@ impl Service {

async fn process_message(
arguments: Arguments<'_>,
subscriptions: &Arc<Vec<tokio::sync::Mutex<Box<dyn ServiceSubscriber>>>>,
subscriptions: &Arc<Vec<(tokio::sync::Semaphore, Box<dyn ServiceSubscriber>)>>,
) -> Result<(), ServiceError> {
let channel = arguments.request.channel();
let id = &arguments.request.id().context(GetMessageId {
Expand All @@ -403,18 +415,18 @@ impl Service {
id: id.clone(),
});
for sub in subscriptions.iter() {
let sub_inner = sub.lock().await;
if sub_inner.filter().iter().any(|find_id| find_id == id) {
if sub.1.filter().iter().any(|find_id| find_id == id) {
subscription = Ok(sub);
break;
}
}
let subscription = subscription?;
let _guard = subscription.0.acquire().await?;

match subscription?
.lock()
.with_context(arguments.request.context())
.await
match subscription
.1
.handler(arguments.clone())
.with_context(arguments.request.context())
.await
{
Ok(_) => Ok(()),
Expand Down
26 changes: 18 additions & 8 deletions control-plane/csi-controller/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,24 @@ impl MayastorApiClient {

let url = clients::tower::Url::parse(endpoint)
.map_err(|error| anyhow!("Invalid API endpoint URL {}: {:?}", endpoint, error))?;
let tower =
clients::tower::Configuration::new(url, Duration::from_secs(5), None, None, true)
.map_err(|error| {
anyhow::anyhow!(
"Failed to create openapi configuration, Error: '{:?}'",
error
)
})?;
let concurrency: usize = std::env::var("MAX_CONCURRENT_RPC")
.ok()
.and_then(|i| i.parse().ok())
.unwrap_or(10usize);
let tower = clients::tower::Configuration::new(
url,
Duration::from_secs(5),
None,
None,
true,
Some(concurrency),
)
.map_err(|error| {
anyhow::anyhow!(
"Failed to create openapi configuration, Error: '{:?}'",
error
)
})?;

REST_CLIENT.get_or_init(|| Self {
rest_client: clients::tower::ApiClient::new(tower),
Expand Down
47 changes: 25 additions & 22 deletions control-plane/csi-controller/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,29 +329,32 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
let vt_mapper = VolumeTopologyMapper::init().await?;

// First check if the volume already exists.
if let Some(existing_volume) = MayastorApiClient::get_client()
.list_volumes()
.await?
.into_iter()
.find(|v| v.spec.uuid == u)
{
check_existing_volume(&existing_volume, replica_count, size, pinned_volume)?;
debug!(
"Volume {} already exists and is compatible with requested config",
volume_uuid
);
} else {
let volume_topology =
CreateVolumeTopology::new(allowed_nodes, preferred_nodes, inclusive_label_topology);

MayastorApiClient::get_client()
.create_volume(&u, replica_count, size, volume_topology, pinned_volume)
.await?;
match MayastorApiClient::get_client().get_volume(&u).await {
Ok(volume) => {
check_existing_volume(&volume, replica_count, size, pinned_volume)?;
debug!(
"Volume {} already exists and is compatible with requested config",
volume_uuid
);
}
// If the volume doesn't exist, create it.
Err(ApiClientError::ResourceNotExists(_)) => {
let volume_topology = CreateVolumeTopology::new(
allowed_nodes,
preferred_nodes,
inclusive_label_topology,
);

MayastorApiClient::get_client()
.create_volume(&u, replica_count, size, volume_topology, pinned_volume)
.await?;

debug!(
"Volume {} successfully created, pinned volume = {}",
volume_uuid, pinned_volume
);
debug!(
"Volume {} successfully created, pinned volume = {}",
volume_uuid, pinned_volume
);
}
Err(e) => return Err(e.into()),
}

let volume = rpc::csi::Volume {
Expand Down
15 changes: 8 additions & 7 deletions control-plane/msp-operator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,13 +912,14 @@ async fn pool_controller(args: ArgMatches<'_>) -> anyhow::Result<()> {
let msp: Api<MayastorPool> = Api::namespaced(k8s.clone(), namespace);
let lp = ListParams::default();
let url = Url::parse(args.value_of("endpoint").unwrap()).expect("endpoint is not a valid URL");
let cfg = clients::tower::Configuration::new(url, Duration::from_secs(5), None, None, true)
.map_err(|error| {
anyhow::anyhow!(
"Failed to create openapi configuration, Error: '{:?}'",
error
)
})?;
let cfg =
clients::tower::Configuration::new(url, Duration::from_secs(5), None, None, true, None)
.map_err(|error| {
anyhow::anyhow!(
"Failed to create openapi configuration, Error: '{:?}'",
error
)
})?;

let context = Context::new(OperatorContext {
k8s,
Expand Down
4 changes: 2 additions & 2 deletions control-plane/rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl RestClient {
let cert_file = &std::include_bytes!("../certs/rsa/ca.cert")[..];

let openapi_client_config =
client::Configuration::new(url, timeout, bearer_token, Some(cert_file), trace)
client::Configuration::new(url, timeout, bearer_token, Some(cert_file), trace, None)
.map_err(|e| anyhow::anyhow!("Failed to create rest client config: '{:?}'", e))?;
let openapi_client = client::direct::ApiClient::new(openapi_client_config);

Expand All @@ -76,7 +76,7 @@ impl RestClient {
trace: bool,
) -> anyhow::Result<Self> {
let openapi_client_config =
client::Configuration::new(url, timeout, bearer_token, None, trace)
client::Configuration::new(url, timeout, bearer_token, None, trace, None)
.map_err(|e| anyhow::anyhow!("Failed to create rest client config: '{:?}'", e))?;
let openapi_client = client::direct::ApiClient::new(openapi_client_config);
Ok(Self {
Expand Down
2 changes: 1 addition & 1 deletion kubectl-plugin/src/rest_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl RestClient {
url.set_port(Some(30011))
.map_err(|_| anyhow::anyhow!("Failed to set REST client port"))?;
}
let cfg = Configuration::new(url, timeout, None, None, true).map_err(|error| {
let cfg = Configuration::new(url, timeout, None, None, true, None).map_err(|error| {
anyhow::anyhow!(
"Failed to create openapi configuration, Error: '{:?}'",
error
Expand Down
11 changes: 7 additions & 4 deletions nix/pkgs/openapi-generator/default.nix
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
{ pkgs, lib, stdenv, fetchFromGitHub, maven, jdk, jre, makeWrapper }:

let
src = fetchFromGitHub (lib.importJSON ./source.json);
version = "5.2.1-${src.rev}";
version = "6.1.0-${src.rev}";

# perform fake build to make a fixed-output derivation out of the files downloaded from maven central
deps = stdenv.mkDerivation {
Expand All @@ -19,10 +20,11 @@ let
runHook postBuild
'';
# keep only *.{pom,jar,sha1,nbm} and delete all ephemeral files with lastModified timestamps inside
installPhase = ''find $out/.m2 -type f -regex '.+\(\.lastUpdated\|resolver-status\.properties\|_remote\.repositories\)' -delete'';
installPhase =
"find $out/.m2 -type f -regex '.+\\(\\.lastUpdated\\|resolver-status\\.properties\\|_remote\\.repositories\\)' -delete";
outputHashAlgo = "sha256";
outputHashMode = "recursive";
outputHash = "0f30vfvqrwa4gdgid9c94kvv83yfrgpx6ii1npjxspdawqr3whrj";
outputHash = if stdenv.hostPlatform.isDarwin then "sha256-9Li0uSD39ZwptIRgOXeBkLeZvfy/9w69faNDm75zdws=" else "sha256-MieSA5Y8u35H1xdP27A+YDekyyQ6CThNXOjQ82ArM7U=";
};
in
stdenv.mkDerivation rec {
Expand Down Expand Up @@ -56,7 +58,8 @@ stdenv.mkDerivation rec {
'';

meta = with lib; {
description = "Allows generation of API client libraries (SDK generation), server stubs and documentation automatically given an OpenAPI Spec";
description =
"Allows generation of API client libraries (SDK generation), server stubs and documentation automatically given an OpenAPI Spec";
homepage = "https://github.com/openebs/openapi-generator";
license = licenses.asl20;
maintainers = [ maintainers.tiagolobocastro ];
Expand Down
4 changes: 2 additions & 2 deletions nix/pkgs/openapi-generator/source.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"owner": "openebs",
"repo": "openapi-generator",
"rev": "27b71d9cc49bc24c8af83e4fb2b693c5350c397f",
"sha256": "00axipm0nkhp05dqiyzvgflwdfdwj4kw9q22g39mb061piwm9zqc"
"rev": "c40274969da7ed4f04686ac0224f47b157ae0284",
"hash": "sha256-Vcre5OoCcut7jIIH8eijZ2tnVezEvHxjFdYrsV5BdFw="
}
2 changes: 1 addition & 1 deletion openapi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ awc = { version = "3.0.0-beta.7", optional = true }

# tower and hyper dependencies
hyper = { version = "0.14.13", features = [ "client", "http1", "http2", "tcp", "stream" ], optional = true }
tower = { version = "0.4.8", features = [ "timeout", "util" ], optional = true }
tower = { version = "0.4.13", features = [ "timeout", "util", "limit" ], optional = true }
tower-http = { version = "0.1.1", features = [ "trace", "map-response-body", "auth" ], optional = true }
bytes = {version = "1.1.0", optional = true }
tokio = { version = "1.12.0", features = ["full"], optional = true }
Expand Down
2 changes: 1 addition & 1 deletion scripts/generate-openapi-bindings.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fi
tmpd=$(mktemp -d /tmp/openapi-gen-XXXXXXX)

# Generate a new openapi crate
openapi-generator-cli generate -i "$SPEC" -g rust-mayastor -o "$tmpd" --additional-properties actixWebVersion="4.0.0-beta.8" --additional-properties actixWebTelemetryVersion='"0.11.0-beta.4"'
openapi-generator-cli generate -i "$SPEC" -g rust-mayastor -o "$tmpd" --additional-properties actixWeb4Beta="True"

if [[ $default_toml = "no" ]]; then
cp "$CARGO_TOML" "$tmpd"
Expand Down
4 changes: 2 additions & 2 deletions tests/tests-mayastor/src/rest_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl RestClient {
let cert_file = &std::include_bytes!("../../../control-plane/rest/certs/rsa/ca.cert")[..];

let openapi_client_config =
client::Configuration::new(url, timeout, bearer_token, Some(cert_file), trace)
client::Configuration::new(url, timeout, bearer_token, Some(cert_file), trace, None)
.map_err(|e| anyhow::anyhow!("Failed to create rest client config: '{:?}'", e))?;
let openapi_client = client::direct::ApiClient::new(openapi_client_config);

Expand All @@ -58,7 +58,7 @@ impl RestClient {
trace: bool,
) -> anyhow::Result<Self> {
let openapi_client_config =
client::Configuration::new(url, timeout, bearer_token, None, trace)
client::Configuration::new(url, timeout, bearer_token, None, trace, None)
.map_err(|e| anyhow::anyhow!("Failed to create rest client config: '{:?}'", e))?;
let openapi_client = client::direct::ApiClient::new(openapi_client_config);
Ok(Self {
Expand Down

0 comments on commit 9a45458

Please sign in to comment.