From 124b367019e8183f9f8e3c747eec0f0ca096dc0d Mon Sep 17 00:00:00 2001 From: Matthew Stavola Date: Sun, 27 May 2018 01:37:28 -0700 Subject: [PATCH 1/5] Upgrade to 0.6 and compile --- Cargo.toml | 4 +-- src/parser.rs | 81 +++++++++++++++++++++++++------------------------- src/rpc.rs | 43 +++++++++++++++++---------- src/util.rs | 6 +++- src/watcher.rs | 2 +- 5 files changed, 75 insertions(+), 61 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1c28941..376a6f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,6 @@ tokio-core = "^0.1" serde_json = "^1.0.0" error-chain = "^0.11.0" serde_derive = "^1.0.0" +sozu-command-lib = "0.6" pretty_env_logger = "^0.1.1" -sozu-command-lib = { git = "https://github.com/sozu-proxy/sozu" } -sozu-command-futures = { git = "https://github.com/sozu-proxy/sozu-command-futures" } \ No newline at end of file +sozu-command-futures = "0.6" \ No newline at end of file diff --git a/src/parser.rs b/src/parser.rs index 742f85e..48e3340 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -1,11 +1,29 @@ use toml; -use sozu_command::config::Config; -use sozu_command::state::ConfigState; -use sozu_command::certificate::{calculate_fingerprint, split_certificate_chain}; -use sozu_command::messages::{Application, CertificateAndKey, CertFingerprint, HttpFront, HttpsFront, Instance, Order}; - -use std::path::PathBuf; -use std::collections::{HashMap, HashSet}; +use sozu_command::{ + state::ConfigState, + certificate::{ + calculate_fingerprint, + split_certificate_chain + }, + config::{ + Config, + ProxyProtocolConfig + }, + messages::{ + Application, + AddCertificate, + CertificateAndKey, + CertFingerprint, + HttpFront, + HttpsFront, + Order, + }, +}; + +use std::{ + path::PathBuf, + collections::{HashMap, HashSet}, +}; use util::errors::*; @@ -28,9 +46,13 @@ fn parse_config(data: &str) -> Result { { let sticky_session = routing_config.sticky_session.unwrap_or(false); + let https_redirect = routing_config.https_redirect.unwrap_or(false); + let add_instance = &Order::AddApplication(Application { app_id: app_id.clone(), - sticky_session + proxy_protocol: routing_config.proxy_protocol, + sticky_session, + https_redirect, }); state.handle_order(add_instance); @@ -64,16 +86,20 @@ fn parse_config(data: &str) -> Result { let certificate_and_key = CertificateAndKey { certificate, key, - certificate_chain + certificate_chain, }; let fingerprint: CertFingerprint; { - let bytes = calculate_fingerprint(&certificate_and_key.certificate.as_bytes()[..])?; + let bytes = calculate_fingerprint(&certificate_and_key.certificate.as_bytes()[..]) + .ok_or_else(|| ErrorKind::FingerprintError)?; fingerprint = CertFingerprint(bytes); } - let add_certificate = &Order::AddCertificate(certificate_and_key); + let add_certificate = &Order::AddCertificate(AddCertificate { + certificate: certificate_and_key, + names: vec![hostname.clone()], + }); let add_https_front = &Order::AddHttpsFront(HttpsFront { app_id: app_id.clone(), hostname: hostname.clone(), @@ -84,35 +110,6 @@ fn parse_config(data: &str) -> Result { state.handle_order(add_certificate); state.handle_order(add_https_front); } - - { - let authorities = routing_config.backends.iter().map(|authority| { - let mut split = authority.split(':'); - - match (split.next(), split.next()) { - (Some(host), Some(port)) => { - port.parse::().map(|port| (host.to_owned(), port)) - .chain_err(|| ErrorKind::ParseError("Could not parse port".to_owned())) - } - (Some(host), None) => Ok((host.to_owned(), 80)), - _ => Err(ErrorKind::ParseError("Missing host".to_owned()).into()) - } - }).collect::>>()?; - - let add_instances: Vec = authorities.iter().map(|authority| { - let (ref host, port): (String, u16) = *authority; - - Order::AddInstance(Instance { - app_id: app_id.clone(), - ip_address: host.clone(), - port - }) - }).collect(); - - for order in add_instances { - state.handle_order(&order); - } - } } } @@ -128,5 +125,7 @@ struct RoutingConfig<'a> { certificate_chain: Option<&'a str>, frontends: HashSet<&'a str>, backends: Vec<&'a str>, - sticky_session: Option + sticky_session: Option, + https_redirect: Option, + proxy_protocol: Option, } \ No newline at end of file diff --git a/src/rpc.rs b/src/rpc.rs index 48160cd..3439376 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,14 +1,24 @@ use serde_json; -use futures::future; -use futures::IntoFuture; use tokio_uds::UnixStream; use rand::{thread_rng, Rng}; -use futures::future::Future; use command::SozuCommandClient; use tokio_core::reactor::Handle; -use sozu_command::messages::Order; -use sozu_command::state::ConfigState; -use sozu_command::data::{ConfigCommand, ConfigMessage, ConfigMessageStatus}; + +use futures::{ + future, + IntoFuture, + future::Future, +}; + +use sozu_command::{ + messages::Order, + state::ConfigState, + data::{ + ConfigCommand, + ConfigMessage, + ConfigMessageStatus, + }, +}; use util::errors::*; @@ -17,8 +27,12 @@ fn generate_id() -> String { format!("ID-{}", s) } -pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> Result, Error=Error>>> { - let stream = UnixStream::connect(socket_path, handle)?; +pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> Box, Error=Error>> { + let stream = match UnixStream::connect(socket_path, handle) { + Ok(stream) => stream, + Err(e) => return Box::new(future::err(e.into())) + }; + let mut client = SozuCommandClient::new(stream); let mut message_futures: Vec>> = Vec::new(); @@ -27,7 +41,7 @@ pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> R let message = ConfigMessage::new( id.clone(), ConfigCommand::ProxyConfiguration(order.clone()), - None + None, ); let order = order.clone(); @@ -57,10 +71,9 @@ pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> R let (item, action) = match order { Order::AddApplication(_) => ("Application", "added"), Order::RemoveApplication(_) => ("Application", "removed"), - Order::AddInstance(_) => ("Backend", "added"), - Order::RemoveInstance(_) => ("Backend", "removed"), Order::AddCertificate(_) => ("Certificate", "added"), Order::RemoveCertificate(_) => ("Certificate", "removed"), + Order::ReplaceCertificate(_) => ("Certificate", "replaced"), Order::AddHttpFront(_) => ("HTTP front", "added"), Order::RemoveHttpFront(_) => ("HTTP front", "removed"), Order::AddHttpsFront(_) => ("HTTPS front", "added"), @@ -81,9 +94,7 @@ pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> R message_futures.push(Box::new(future)); } - let future = future::join_all(message_futures).into_future(); - - Ok(Box::new(future)) + Box::new(future::join_all(message_futures).into_future()) } pub fn get_config_state(socket_path: &str, handle: &Handle) -> Result>> { @@ -93,7 +104,7 @@ pub fn get_config_state(socket_path: &str, handle: &Handle) -> Result Result { id: &'a str, - state: ConfigState + state: ConfigState, } \ No newline at end of file diff --git a/src/util.rs b/src/util.rs index f13cbd6..9ce4d0f 100644 --- a/src/util.rs +++ b/src/util.rs @@ -4,8 +4,8 @@ pub mod errors { use std::io; use toml::de; use serde_json; - use std::sync::mpsc; + use std::sync::mpsc; use std::path::PathBuf; error_chain! { @@ -39,6 +39,10 @@ pub mod errors { description("the proxy encountered an error") display("Proxy responded with an error: {}.", error) } + FingerprintError { + description("could not calculate fingerprint from cert") + display("Unable to calculate a fingerprint for the provided certificate.") + } } } } \ No newline at end of file diff --git a/src/watcher.rs b/src/watcher.rs index 2fc2fb2..e1678cb 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -41,7 +41,7 @@ pub fn watch(application_file: &str, socket_path: &str, watch_interval: Duration if !orders.is_empty() { info!("Sending new configuration to server."); - let execution_future = execute_orders(socket_path, &handle, &orders)?; + let execution_future = execute_orders(socket_path, &handle, &orders); core.run(execution_future)?; } else { warn!("No changes made."); From 437f55a00a44be4246a458b3c6ad1bd43583326c Mon Sep 17 00:00:00 2001 From: Matthew Stavola Date: Sun, 27 May 2018 01:40:21 -0700 Subject: [PATCH 2/5] Convert Result -> Future --- src/rpc.rs | 10 +++++++--- src/watcher.rs | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/rpc.rs b/src/rpc.rs index 3439376..856b704 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -97,8 +97,12 @@ pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> B Box::new(future::join_all(message_futures).into_future()) } -pub fn get_config_state(socket_path: &str, handle: &Handle) -> Result>> { - let stream = UnixStream::connect(socket_path, handle)?; +pub fn get_config_state(socket_path: &str, handle: &Handle) -> Box> { + let stream = match UnixStream::connect(socket_path, handle) { + Ok(stream) => stream, + Err(e) => return Box::new(future::err(e.into())) + }; + let mut client = SozuCommandClient::new(stream); let message = ConfigMessage::new( @@ -124,7 +128,7 @@ pub fn get_config_state(socket_path: &str, handle: &Handle) -> Result { info!("Retrieving current proxy state."); - let orders_future = get_config_state(socket_path, &handle)? + let orders_future = get_config_state(socket_path, &handle) .and_then(|current_state| { info!("Current proxy state retrieved, generating orders."); future::ok(current_state.diff(&new_state)) From c6c614a1e6238820a842f3d61383ed6d811f6aa9 Mon Sep 17 00:00:00 2001 From: Matthew Stavola Date: Sun, 27 May 2018 12:45:22 -0700 Subject: [PATCH 3/5] Update config file and log crate versions --- Cargo.toml | 4 +- assets/config.toml | 182 +++++++++++++++++++++++++++++++++++++++------ src/main.rs | 2 +- 3 files changed, 164 insertions(+), 24 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 376a6f5..563bf4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ license = "AGPL-3.0" categories = ["network-programming"] [dependencies] -log = "^0.3.8" +log = "^0.4.1" clap = "^2.19.0" serde = "^1.0.0" toml = "^0.4" @@ -25,5 +25,5 @@ serde_json = "^1.0.0" error-chain = "^0.11.0" serde_derive = "^1.0.0" sozu-command-lib = "0.6" -pretty_env_logger = "^0.1.1" +pretty_env_logger = "^0.2.3" sozu-command-futures = "0.6" \ No newline at end of file diff --git a/assets/config.toml b/assets/config.toml index 3abca6a..a1d89f7 100644 --- a/assets/config.toml +++ b/assets/config.toml @@ -1,51 +1,191 @@ # sozu proxy test config file +# top level options + +# path to a file sozu can use to load an initial configuration state for its +# routing. You can generate this file from sozu's current routing by running +# the command `sozuctl state save -f state.json` +saved_state = "assets/state.json" + +# logging verbosity. Possible values are "error", "warn", "info", "debug" and +# "trace". For performance reasons, the logs at "debug" or "trace" level are +# not compiled by default. To activate them, pass the "logs-debug" and +# "logs-trace" compilation options to cargo +log_level = "info" + +# where the logs will be sent. It defaults to sending the logs on standard output, +# but they could be written to a UDP address: +# log_target = "udp://127.0.0.1:9876" +# to a TCP address: +# log_target = "tcp://127.0.0.1:9876" +# to a unix socket +# log_target = "unix:///var/sozu/logs +# to a file +# log_target = "file:///var/logs/sozu.log" +# to_stdout +log_target = "stdout" + +# optional different target for access logs (IP addresses, domains, URI, HTTP status, etc) +# It supports the same options as log_target +# log_access_target = "file:///var/logs/sozu-access.log" + +# path to the unix socket file used to send commands to sozu command_socket = "assets/socket/sock" -saved_state = "./state.json" -log_level = "info" -log_target = "stdout" -#log_target = "udp://127.0.0.1:9876" -#log_target = "tcp://127.0.0.1:9876" + +# size in bytes of the buffer used by the command socket protocol. If the message +# sent to sozu is too large, or the data that sozu must return is too large, the +# buffer will grow up to max_command_buffer_size. If the buffer is still not large +# enough sozu will close the connection +# defaults to 1000000 command_buffer_size = 16384 +# defaults to command_buffer_size * 2 +max_command_buffer_size = 163840 + +# the number of worker processes that will handle traffic worker_count = 2 + +# indicates if workers should be automatically restarted if they crash / hang +# should be true for production and false for development +# defaults to true +worker_automatic_restart = true + +# indicates if worker process will be pinned on a core. If you activate this, be sure +# that you do not have more workers than CPU cores (and leave at least one core for +# the kernel and the master process) handle_process_affinity = false -[metrics] -address = "127.0.0.1" -port = 8125 +# maximum number of connections to a worker. If it reached that number and +# there are new connections available, the worker will accept and close them +# immediately to indicate it is too busy to handle traffic +max_connections = 500 +# maximum number of buffers used by the protocol implementations for active +# connections (ie currently serving a request). For now, you should estimate +# that max_buffers = number of concurrent requests * 2 +max_buffers = 500 + +# size of the buffers used by the protocol implementations. Each worker will +# preallocate max_buffers * 2 * buffer_size bytes, so you should plan for this +# memory usage. If you plan to use sozu's runtime upgrade feature, you should +# leave enough memory for one more worker (also for the kernel, etc), so total +# RAM should be larger than (worker count + 1) * max_buffers * 2 * buffer_size bytes +buffer_size = 16384 + +# how much time (in milliseconds) sozuctl will wait for a command to complete. +# Defaults to 1000 milliseconds +#ctl_command_timeout = 1000 + +# PID file is a file containing the PID of the master process of sozu. +# It can be helpful to help systemd or any other service system to keep track +# of the main process across upgrades. PID file is not created unless this option +# is set or if SOZU_PID_FILE_PATH environment variable was defined at build time. +# pid_file_path = "/run/sozu/sozu.pid" + +# various statistics can be sent to a server that supports the statsd protocol +# You can see those statistics with sozuctl, like this: `sozuctl metrics` or +# `sozuctl metrics --json` for machine consumption +# +#[metrics] +# address = "127.0.0.1" +# port = 8125 +# use InfluxDB's statsd protocol flavor to add tags +# tagged_metrics = false + +# options specific to the HTTP (plaintext) proxy [http] +# listening IP address = "127.0.0.1" -max_connections = 500 +# listening port port = 8080 -buffer_size = 16384 -#answer_404 = "./404.html" -#answer_503 = "./503.html" +# path to custom 404 and 503 answers +# a 404 response is sent when sozu does not know about the requested domain or path +# a 503 response is sent if there are no backend servers available +#answer_404 = "../lib/assets/404.html" +#answer_503 = "../lib/assets/503.html" + +# options specific to the HTTPS (OpenSSL basedor rustls based) proxy [https] address = "127.0.0.1" -max_connections = 500 port = 8443 -buffer_size = 16384 -default_app_id = "MyApp" + +#answer_404 = "../lib/assets/404.html" +#answer_503 = "../lib/assets/503.html" + +# defines how the TLS protocol will be handled by sozu. Possible values +# are "openssl" or "rustls". The "openssl" option will only work if sozu +# was built with the "use-openssl" feature. +# tls_provider = "rustls" + +# supported TLS versions. Possible values are "SSLv2", "SSLv3", "TLSv1", +# "TLSv1.1", "TLSv1.2". Defaults to "TLSv1.2" #tls_versions = ["TLSv1.2"] + +# cipher combinations used by OpenSSL #cipher_list = "ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256" + +# application whose certificate will be used by default by OpenSSL +default_app_id = "MyApp" + +# default certificate used by OpenSSL #default_certificate = "../lib/assets/cert_test.pem" +# default certificate chain used by OpenSSL #default_certificate_chain = "../lib/assets/cert__chain_test.pem" +# default key used by OpenSSL #default_key = "../lib/assets/key_test.pem" -#answer_404 = "../lib/assets/404.html" -#answer_503 = "../lib/assets/503.html" +# options specific to the TCP proxy +# +# there is nothing here for now, but to activate the TCP proxy, +# uncomment that section +# [tcp] +# static configuration for applications +# +# those applications will be routed by sozu directly from start [applications] +# every application has an "application id", here it is "MyApp" +# this is an example of a routing configuration for the HTTP and HTTPS proxies [applications.MyApp] -hostname = "lolcatho.st" + +# host name of the application +hostname = "lolcatho.st" + +# an application can receive requests going to a hostname and path prefix #path_begin = "/api" # optional + +# list of backend servers for this application +backends = [ "127.0.0.1:1026" ] # list of IP/port + +# activates sticky sessions for this application +#sticky_session = false +# +# activates automatic redirection to HTTPS for this application +#https_redirect = false + +# TLS related options for the application. If those are active and the HTTPS +# proxy is activated too, the application will be reachable through TLS +# +# certificate for this application certificate = "assets/certs/certificate.pem" # optional +# key for this application key = "assets/certs/key.pem" # optional +# certificate chain for this application certificate_chain = "assets/certs/certificate_chain.pem" # optional -frontends = ["HTTP", "HTTPS"] # list of proxy tags -backends = [ "127.0.0.1:1026" ] # list of IP/port -#sticky_session = true +# this is an example of a routing configuration for the TCP proxy +[applications.TcpTest] +# IP address for the listener +ip_address = "127.0.0.1" +# port for the listener +port = 8081 + +# list of backend servers for this application +backends = [ "127.0.0.1:1026" ] # list of IP/port + +# activates the proxy protocol to send IP information to the backend +# send_proxy = false + +# Configures the client-facing connection to receive a PROXY protocol header +# expect_proxy = false \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 3d839d0..c150c35 100644 --- a/src/main.rs +++ b/src/main.rs @@ -34,7 +34,7 @@ use sozu_command::config::Config; use std::time::Duration; fn main() { - pretty_env_logger::init().unwrap(); + pretty_env_logger::init(); let matches = App::new("sozuconfw") .version(crate_version!()) From 9c4c276b56512978ba1bf8d6bff279a4b85534ee Mon Sep 17 00:00:00 2001 From: Matthew Stavola Date: Sat, 23 Jun 2018 23:53:44 -0700 Subject: [PATCH 4/5] error-chain -> failure --- Cargo.toml | 2 +- src/main.rs | 10 ++++---- src/parser.rs | 42 +++++++++++++++++++------------ src/rpc.rs | 17 +++++++------ src/util.rs | 67 +++++++++++++++++--------------------------------- src/watcher.rs | 17 +++++++------ 6 files changed, 73 insertions(+), 82 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 563bf4b..f83c573 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,12 +17,12 @@ toml = "^0.4" rand = "^0.3.16" notify = "^4.0.0" openssl = "^0.9.0" +failure = "^0.1.1" futures = "^0.1" tokio-io = "^0.1" tokio-uds = "^0.1" tokio-core = "^0.1" serde_json = "^1.0.0" -error-chain = "^0.11.0" serde_derive = "^1.0.0" sozu-command-lib = "0.6" pretty_env_logger = "^0.2.3" diff --git a/src/main.rs b/src/main.rs index c150c35..5440a4c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,14 @@ #[macro_use] -extern crate clap; +extern crate log; #[macro_use] -extern crate serde_derive; +extern crate clap; #[macro_use] -extern crate error_chain; +extern crate failure; #[macro_use] -extern crate log; +extern crate serde_derive; extern crate toml; extern crate rand; @@ -83,7 +83,7 @@ fn main() { info!("Exiting sozuconfw"); } Err(err) => { - error!("{}", err.0); + error!("{}", err); } }; } diff --git a/src/parser.rs b/src/parser.rs index 48e3340..77c8c9f 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -1,13 +1,14 @@ use toml; +use failure::{Error, err_msg}; use sozu_command::{ state::ConfigState, certificate::{ calculate_fingerprint, - split_certificate_chain + split_certificate_chain, }, config::{ Config, - ProxyProtocolConfig + ProxyProtocolConfig, }, messages::{ Application, @@ -25,16 +26,16 @@ use std::{ collections::{HashMap, HashSet}, }; -use util::errors::*; +use util::{OperationError, ParseError}; -pub fn parse_config_file(path: &PathBuf) -> Result { - let path = path.to_str().ok_or_else(|| ErrorKind::InvalidPath(path.to_path_buf()))?; +pub fn parse_config_file(path: &PathBuf) -> Result { + let path = path.to_str().ok_or_else(|| OperationError::FileLoad(path.to_path_buf()))?; let data = Config::load_file(path)?; parse_config(&data) } -fn parse_config(data: &str) -> Result { +fn parse_config(data: &str) -> Result { let mut state = ConfigState::new(); let app_map: HashMap> = toml::from_str(data)?; @@ -69,17 +70,26 @@ fn parse_config(data: &str) -> Result { } if routing_config.frontends.contains(&"HTTPS") { - let certificate = routing_config.certificate - .ok_or_else(|| ErrorKind::MissingItem("Certificate".to_string()).into()) - .and_then(|path| Config::load_file(path).chain_err(|| ErrorKind::FileLoad(path.to_string())))?; - - let key = routing_config.key - .ok_or_else(|| ErrorKind::MissingItem("Key".to_string()).into()) - .and_then(|path| Config::load_file(path).chain_err(|| ErrorKind::FileLoad(path.to_string())))?; + let certificate: String = routing_config.certificate + .ok_or_else(|| { + let new_error: Error = ParseError::MissingItem("Certificate".to_string()).into(); + new_error + }) + .and_then(|path| Config::load_file(path).map_err(|e| e.into()))?; + + let key: String = routing_config.key + .ok_or_else(|| { + let new_error: Error = ParseError::MissingItem("Key".to_string()).into(); + new_error + }) + .and_then(|path| Config::load_file(path).map_err(|e| e.into()))?; let certificate_chain = routing_config.certificate_chain - .ok_or_else(|| ErrorKind::MissingItem("Certificate Chain".to_string()).into()) - .and_then(|path| Config::load_file(path).chain_err(|| ErrorKind::FileLoad(path.to_string()))) + .ok_or_else(|| { + let new_error: Error = ParseError::MissingItem("Certificate Chain".to_string()).into(); + new_error + }) + .and_then(|path| Config::load_file(path).map_err(|e| e.into())) .map(split_certificate_chain) .unwrap_or_default(); @@ -92,7 +102,7 @@ fn parse_config(data: &str) -> Result { let fingerprint: CertFingerprint; { let bytes = calculate_fingerprint(&certificate_and_key.certificate.as_bytes()[..]) - .ok_or_else(|| ErrorKind::FingerprintError)?; + .ok_or_else(|| err_msg("Could not calculate fingerprint for cert and key"))?; fingerprint = CertFingerprint(bytes); } diff --git a/src/rpc.rs b/src/rpc.rs index 856b704..eefece8 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,4 +1,5 @@ use serde_json; +use failure::Error; use tokio_uds::UnixStream; use rand::{thread_rng, Rng}; use command::SozuCommandClient; @@ -20,14 +21,14 @@ use sozu_command::{ }, }; -use util::errors::*; +use util::RpcError; fn generate_id() -> String { let s: String = thread_rng().gen_ascii_chars().take(6).collect(); format!("ID-{}", s) } -pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> Box, Error=Error>> { +pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> Box, Error=Error>> { let stream = match UnixStream::connect(socket_path, handle) { Ok(stream) => stream, Err(e) => return Box::new(future::err(e.into())) @@ -35,7 +36,7 @@ pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> B let mut client = SozuCommandClient::new(stream); - let mut message_futures: Vec>> = Vec::new(); + let mut message_futures: Vec>> = Vec::new(); for order in orders { let id = generate_id(); let message = ConfigMessage::new( @@ -53,7 +54,7 @@ pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> B .and_then(move |response| { if id != response.id { error!("Received message with invalid id: {:?}.", response); - return Err(ErrorKind::ProxyError("Invalid message ID".to_string()).into()); + return Err(RpcError::MalformedMessage("Invalid message ID".to_string()).into()); } match response.status { @@ -65,7 +66,7 @@ pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> B } ConfigMessageStatus::Error => { error!("Could not execute order: {}", response.message); - Err(ErrorKind::ProxyError(response.message).into()) + Err(RpcError::ExecutionFailure(response.message).into()) } ConfigMessageStatus::Ok => { let (item, action) = match order { @@ -80,7 +81,7 @@ pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> B Order::RemoveHttpsFront(_) => ("HTTPS front", "removed"), order => { warn!("Unsupported order: {:?}", order); - return Err(ErrorKind::ProxyError("Unsupported order".to_string()).into()); + return Err(RpcError::UnsupportedOrder(order).into()); } }; @@ -97,7 +98,7 @@ pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> B Box::new(future::join_all(message_futures).into_future()) } -pub fn get_config_state(socket_path: &str, handle: &Handle) -> Box> { +pub fn get_config_state(socket_path: &str, handle: &Handle) -> Box> { let stream = match UnixStream::connect(socket_path, handle) { Ok(stream) => stream, Err(e) => return Box::new(future::err(e.into())) @@ -117,7 +118,7 @@ pub fn get_config_state(socket_path: &str, handle: &Handle) -> Box = serde_json::from_str(&answer.message) + let config_state: Result = serde_json::from_str(&answer.message) .map(|config_state: ConfigStateResponse| config_state.state) .map_err(|e| { let new_error: Error = e.into(); diff --git a/src/util.rs b/src/util.rs index 9ce4d0f..534b6e3 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,48 +1,27 @@ -pub mod errors { - use notify; - use openssl; - use std::io; - use toml::de; - use serde_json; +use sozu_command::messages::Order; - use std::sync::mpsc; - use std::path::PathBuf; +use std::path::PathBuf; - error_chain! { - foreign_links { - Io(io::Error); - Toml(de::Error); - Notify(notify::Error); - Json(serde_json::Error); - Channel(mpsc::RecvError); - OpenSSL(openssl::error::ErrorStack); - } +#[derive(Debug, Fail)] +pub enum OperationError { + #[fail(display = "path provided was not valid: {:?}", _0)] + InvalidPath(PathBuf), + #[fail(display = "could not load file: {:?}", _0)] + FileLoad(PathBuf), +} - errors { - InvalidPath(path: PathBuf) { - description("path is invalid") - display("Path '{:?}' is invalid.", path) - } - FileLoad(filename: String) { - description("could not load file") - display("File '{}' could not be loaded.", filename) - } - ParseError(issue: String) { - description("encountered error while parsing") - display("Parse error: {}.", issue) - } - MissingItem(item: String) { - description("missing required item") - display("Item `{}` required, but not present.", item) - } - ProxyError(error: String) { - description("the proxy encountered an error") - display("Proxy responded with an error: {}.", error) - } - FingerprintError { - description("could not calculate fingerprint from cert") - display("Unable to calculate a fingerprint for the provided certificate.") - } - } - } +#[derive(Debug, Fail)] +pub enum ParseError { + #[fail(display = "missing required item: {}", _0)] + MissingItem(String) +} + +#[derive(Debug, Fail)] +pub enum RpcError { + #[fail(display = "message wasn't properly formed: {}", _0)] + MalformedMessage(String), + #[fail(display = "failed to execute order: {}", _0)] + ExecutionFailure(String), + #[fail(display = "unknown order: {:?}", _0)] + UnsupportedOrder(Order), } \ No newline at end of file diff --git a/src/watcher.rs b/src/watcher.rs index 2ddf98a..c77da30 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -1,3 +1,4 @@ +use failure::Error; use futures::Future; use futures::future; use tokio_core::reactor::Core; @@ -6,11 +7,11 @@ use notify::{RecommendedWatcher, Watcher, RecursiveMode, DebouncedEvent}; use std::time::Duration; use std::sync::mpsc::channel; -use util::errors::*; +use util::OperationError; use parser::parse_config_file; use rpc::{get_config_state, execute_orders}; -pub fn watch(application_file: &str, socket_path: &str, watch_interval: Duration) -> Result<()> { +pub fn watch(application_file: &str, socket_path: &str, watch_interval: Duration) -> Result<(), Error> { let (tx, rx) = channel(); info!("Watching file `{}`. Updating every {} second(s).", application_file, watch_interval.as_secs()); @@ -55,13 +56,13 @@ pub fn watch(application_file: &str, socket_path: &str, watch_interval: Duration } DebouncedEvent::Rename(old_path, new_path) => { // Track changed filename - info!("File renamed:\n\tOld path: {}\n\tNew path: {}.", - old_path.to_str().ok_or_else(|| ErrorKind::InvalidPath(old_path.clone()))?, - new_path.to_str().ok_or_else(|| ErrorKind::InvalidPath(new_path.clone()))? - ); + let old_path_str = old_path.to_str().ok_or_else(|| OperationError::InvalidPath(old_path.clone()))?; + let new_path_str = new_path.to_str().ok_or_else(|| OperationError::InvalidPath(new_path.clone()))?; - watcher.unwatch(old_path)?; - watcher.watch(new_path, RecursiveMode::NonRecursive)?; + info!("File renamed:\n\tOld path: {}\n\tNew path: {}.", old_path_str, new_path_str); + + watcher.unwatch(old_path.clone())?; + watcher.watch(new_path.clone(), RecursiveMode::NonRecursive)?; } event => { debug!("Unhandled event: {:?}.", event); From 32277800290c9c3fa22f405c02947c24426c49c2 Mon Sep 17 00:00:00 2001 From: Matthew Stavola Date: Sun, 24 Jun 2018 00:37:09 -0700 Subject: [PATCH 5/5] Get state properly --- src/rpc.rs | 16 ++++++++-------- src/util.rs | 2 ++ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/rpc.rs b/src/rpc.rs index eefece8..f63d73b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -15,6 +15,7 @@ use sozu_command::{ messages::Order, state::ConfigState, data::{ + AnswerData, ConfigCommand, ConfigMessage, ConfigMessageStatus, @@ -72,6 +73,8 @@ pub fn execute_orders(socket_path: &str, handle: &Handle, orders: &[Order]) -> B let (item, action) = match order { Order::AddApplication(_) => ("Application", "added"), Order::RemoveApplication(_) => ("Application", "removed"), + Order::AddBackend(_) => ("Backend", "added"), + Order::RemoveBackend(_) => ("Backend", "removed"), Order::AddCertificate(_) => ("Certificate", "added"), Order::RemoveCertificate(_) => ("Certificate", "removed"), Order::ReplaceCertificate(_) => ("Certificate", "replaced"), @@ -118,8 +121,11 @@ pub fn get_config_state(socket_path: &str, handle: &Handle) -> Box = serde_json::from_str(&answer.message) - .map(|config_state: ConfigStateResponse| config_state.state) + let config_state: Result = answer.data.ok_or(RpcError::MalformedMessage("Unable to parse response".to_string())) + .and_then(|data: AnswerData| match data { + AnswerData::State(config_state) => Ok(config_state), + _ => Err(RpcError::UnexpectedResponse) + }) .map_err(|e| { let new_error: Error = e.into(); new_error @@ -130,10 +136,4 @@ pub fn get_config_state(socket_path: &str, handle: &Handle) -> Box { - id: &'a str, - state: ConfigState, } \ No newline at end of file diff --git a/src/util.rs b/src/util.rs index 534b6e3..c96357c 100644 --- a/src/util.rs +++ b/src/util.rs @@ -22,6 +22,8 @@ pub enum RpcError { MalformedMessage(String), #[fail(display = "failed to execute order: {}", _0)] ExecutionFailure(String), + #[fail(display = "unexpected order")] + UnexpectedResponse, #[fail(display = "unknown order: {:?}", _0)] UnsupportedOrder(Order), } \ No newline at end of file