Skip to content

Commit

Permalink
fix(cat-gateway): Cip36 indexing working, improve bad cassandra query…
Browse files Browse the repository at this point in the history
… reporting.
  • Loading branch information
stevenj committed Sep 20, 2024
1 parent be0d3fc commit 43152e4
Show file tree
Hide file tree
Showing 19 changed files with 109 additions and 27 deletions.
2 changes: 1 addition & 1 deletion catalyst-gateway/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ build-info = "0.0.38"
ed25519-dalek = "2.1.1"
scylla = { version = "0.14.0", features = ["cloud", "full-serialization"] }
strum = { version = "0.26.3", features = ["derive"] }
# strum_macros = "0.26.4"
strum_macros = "0.26.4"
openssl = { version = "0.10.66", features = ["vendored"] }
num-bigint = "0.4.6"
futures = "0.3.30"
Expand Down
34 changes: 33 additions & 1 deletion catalyst-gateway/bin/src/db/index/block/certs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Index certs found in a transaction.

use std::sync::Arc;
use std::{fmt::Debug, sync::Arc};

use cardano_chain_follower::MultiEraBlock;
use pallas::ledger::primitives::{alonzo, conway};
Expand Down Expand Up @@ -37,6 +37,38 @@ pub(crate) struct StakeRegistrationInsertQuery {
pool_delegation: MaybeUnset<Vec<u8>>,
}

impl Debug for StakeRegistrationInsertQuery {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
let stake_address = match self.stake_address {
MaybeUnset::Unset => "UNSET",
MaybeUnset::Set(ref v) => &hex::encode(v),
};
let register = match self.register {
MaybeUnset::Unset => "UNSET",
MaybeUnset::Set(v) => &format!("{v:?}"),
};
let deregister = match self.deregister {
MaybeUnset::Unset => "UNSET",
MaybeUnset::Set(v) => &format!("{v:?}"),
};
let pool_delegation = match self.pool_delegation {
MaybeUnset::Unset => "UNSET",
MaybeUnset::Set(ref v) => &hex::encode(v),
};

f.debug_struct("StakeRegistrationInsertQuery")
.field("stake_hash", &hex::encode(hex::encode(&self.stake_hash)))
.field("slot_no", &self.slot_no)
.field("txn", &self.txn)
.field("stake_address", &stake_address)
.field("script", &self.script)
.field("register", &register)
.field("deregister", &deregister)
.field("pool_delegation", &pool_delegation)
.finish()
}
}

/// TXI by Txn hash Index
const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ INSERT INTO cip36_registration (
payment_address,
is_payable,
raw_nonce,
cip36,
cip36
) VALUES (
:stake_address,
:nonce,
Expand All @@ -18,5 +18,5 @@ INSERT INTO cip36_registration (
:payment_address,
:is_payable,
:raw_nonce,
:cip36,
:cip36
);
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
-- Index CIP-36 Registration (Valid)
INSERT INTO cip36_registration_for_stake_addr (
-- Index CIP-36 Registration (For each Vote Key)
INSERT INTO cip36_registration_for_vote_key (
vote_key,
stake_address,
slot_no,
txn,
valid,
valid
) VALUES (
:vote_key,
:stake_address,
:slot_no,
:txn,
:valid,
:valid
);
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ INSERT INTO cip36_registration_invalid (
nonce,
cip36,
signed,
error_report,
error_report
) VALUES (
:stake_address,
:slot_no,
Expand Down
22 changes: 21 additions & 1 deletion catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Insert CIP36 Registration Query

use std::sync::Arc;
use std::{fmt::Debug, sync::Arc};

use cardano_chain_follower::Metadata::cip36::{Cip36, VotingPubKey};
use scylla::{frame::value::MaybeUnset, SerializeRow, Session};
Expand Down Expand Up @@ -37,6 +37,26 @@ pub(super) struct Params {
cip36: bool,
}

impl Debug for Params {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let payment_address = match self.payment_address {
MaybeUnset::Unset => "UNSET",
MaybeUnset::Set(ref v) => &hex::encode(v),
};
f.debug_struct("Params")
.field("stake_address", &self.stake_address)
.field("nonce", &self.nonce)
.field("slot_no", &self.slot_no)
.field("txn", &self.txn)
.field("vote_key", &self.vote_key)
.field("payment_address", &payment_address)
.field("is_payable", &self.is_payable)
.field("raw_nonce", &self.raw_nonce)
.field("cip36", &self.cip36)
.finish()
}
}

impl Params {
/// Create a new Insert Query.
pub fn new(vote_key: &VotingPubKey, slot_no: u64, txn: i16, cip36: &Cip36) -> Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const INSERT_CIP36_REGISTRATION_FOR_VOTE_KEY_QUERY: &str =
include_str!("./cql/insert_cip36_for_vote_key.cql");

/// Insert CIP-36 Registration Invalid Query Parameters
#[derive(SerializeRow, Clone)]
#[derive(SerializeRow, Debug)]
pub(super) struct Params {
/// Voting Public Key
vote_key: Vec<u8>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Insert CIP36 Registration Query (Invalid Records)

use std::sync::Arc;
use std::{fmt::Debug, sync::Arc};

use cardano_chain_follower::Metadata::cip36::{Cip36, VotingPubKey};
use scylla::{frame::value::MaybeUnset, SerializeRow, Session};
Expand Down Expand Up @@ -42,6 +42,28 @@ pub(super) struct Params {
error_report: Vec<String>,
}

impl Debug for Params {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let cip36 = match self.cip36 {
MaybeUnset::Unset => "UNSET",
MaybeUnset::Set(v) => &format!("{v:?}"),
};
f.debug_struct("Params")
.field("stake_address", &self.stake_address)
.field("slot_no", &self.slot_no)
.field("txn", &self.txn)
.field("vote_key", &self.vote_key)
.field("payment_address", &self.payment_address)
.field("is_payable", &self.is_payable)
.field("raw_nonce", &self.raw_nonce)
.field("nonce", &self.nonce)
.field("cip36", &cip36)
.field("signed", &self.signed)
.field("error_report", &self.error_report)
.finish()
}
}

impl Params {
/// Create a new Insert Query.
pub fn new(
Expand Down
3 changes: 2 additions & 1 deletion catalyst-gateway/bin/src/db/index/block/cip36/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ impl Cip36InsertQuery {
vote_key, slot_no, txn_index, cip36, true,
));
}
} else {
} else if cip36.stake_pk.is_some() {
// We can't index an error, if there is no stake public key.
if cip36.voting_keys.is_empty() {
self.invalid.push(insert_cip36_invalid::Params::new(
None,
Expand Down
2 changes: 1 addition & 1 deletion catalyst-gateway/bin/src/db/index/block/txi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
};

/// Insert TXI Query and Parameters
#[derive(SerializeRow)]
#[derive(SerializeRow, Debug)]
pub(crate) struct TxiInsertParams {
/// Spent Transactions Hash
txn_hash: Vec<u8>,
Expand Down
2 changes: 1 addition & 1 deletion catalyst-gateway/bin/src/db/index/block/txo/insert_txo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const INSERT_TXO_QUERY: &str = include_str!("./cql/insert_txo.cql");

/// Insert TXO Query Parameters
/// (Superset of data to support both Staked and Unstaked TXO records.)
#[derive(SerializeRow)]
#[derive(SerializeRow, Debug)]
pub(super) struct Params {
/// Stake Address - Binary 28 bytes. 0 bytes = not staked.
stake_address: Vec<u8>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const INSERT_TXO_ASSET_QUERY: &str = include_str!("./cql/insert_txo_asset.cql");

/// Insert TXO Asset Query Parameters
/// (Superset of data to support both Staked and Unstaked TXO records.)
#[derive(SerializeRow)]
#[derive(SerializeRow, Debug)]
pub(super) struct Params {
/// Stake Address - Binary 28 bytes. 0 bytes = not staked.
stake_address: Vec<u8>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const INSERT_UNSTAKED_TXO_QUERY: &str = include_str!("./cql/insert_unstaked_txo.

/// Insert TXO Unstaked Query Parameters
/// (Superset of data to support both Staked and Unstaked TXO records.)
#[derive(SerializeRow)]
#[derive(SerializeRow, Debug)]
pub(super) struct Params {
/// Transactions hash.
txn_hash: Vec<u8>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const INSERT_UNSTAKED_TXO_ASSET_QUERY: &str = include_str!("./cql/insert_unstake

/// Insert TXO Asset Query Parameters
/// (Superset of data to support both Staked and Unstaked TXO records.)
#[derive(SerializeRow)]
#[derive(SerializeRow, Debug)]
pub(super) struct Params {
/// Transactions hash.
txn_hash: Vec<u8>,
Expand Down
16 changes: 11 additions & 5 deletions catalyst-gateway/bin/src/db/index/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

pub(crate) mod staked_ada;

use std::sync::Arc;
use std::{fmt::Debug, sync::Arc};

use anyhow::bail;
use anyhow::{bail, Context};
use crossbeam_skiplist::SkipMap;
use scylla::{
batch::Batch, prepared_statement::PreparedStatement, serialize::row::SerializeRow,
Expand All @@ -26,7 +26,8 @@ use crate::settings::{CassandraEnvVars, CASSANDRA_MIN_BATCH_SIZE};
pub(crate) type SizedBatch = SkipMap<u16, Arc<Batch>>;

/// All Prepared Queries that we know about.
#[allow(clippy::enum_variant_names, dead_code)]
#[derive(strum_macros::Display)]
#[allow(clippy::enum_variant_names)]
pub(crate) enum PreparedQuery {
/// TXO Insert query.
TxoAdaInsertQuery,
Expand Down Expand Up @@ -206,7 +207,7 @@ impl PreparedQueries {
///
/// This will divide the batch into optimal sized chunks and execute them until all
/// values have been executed or the first error is encountered.
pub(crate) async fn execute_batch<T: SerializeRow>(
pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
&self, session: Arc<Session>, cfg: Arc<CassandraEnvVars>, query: PreparedQuery,
values: Vec<T>,
) -> FallibleQueryResults {
Expand Down Expand Up @@ -238,7 +239,12 @@ impl PreparedQueries {
bail!("No batch query found for size {}", chunk_size);
};
let batch_query_statements = batch_query.value().clone();
results.push(session.batch(&batch_query_statements, chunk).await?);
results.push(
session
.batch(&batch_query_statements, chunk)
.await
.context(format!("query={query}, chunk={chunk:?}"))?,
);
}

Ok(results)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
const UPDATE_TXO_SPENT_QUERY: &str = include_str!("../cql/update_txo_spent.cql");

/// Update TXO spent query params.
#[derive(SerializeRow)]
#[derive(SerializeRow, Debug)]
pub(crate) struct UpdateTxoSpentQueryParams {
/// TXO stake address.
pub stake_address: Vec<u8>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- Index of CIP-36 registrations searchable by Stake Address.
-- Full registration data needs to be queried from the man cip36 registration tables.
-- Includes both Valid and Invalid registrations.
CREATE TABLE IF NOT EXISTS cip36_registration_for_stake_addr (
CREATE TABLE IF NOT EXISTS cip36_registration_for_vote_key (
-- Primary Key Data
vote_key blob, -- 32 Bytes of Vote Key.
stake_address blob, -- 32 Bytes of Stake Address.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ CREATE TABLE IF NOT EXISTS cip36_registration_invalid (
signed boolean, -- Signature validates.
error_report list<text>, -- List of serialization errors in the registration.

PRIMARY KEY (vote_key, slot_no, txn)
PRIMARY KEY (stake_address, slot_no, txn)
)
WITH CLUSTERING ORDER BY (slot_no DESC, txn DESC);
3 changes: 2 additions & 1 deletion catalyst-gateway/bin/src/db/index/session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Session creation and storage

use std::{
fmt::Debug,
path::PathBuf,
sync::{Arc, OnceLock},
time::Duration,
Expand Down Expand Up @@ -122,7 +123,7 @@ impl CassandraSession {
///
/// This will divide the batch into optimal sized chunks and execute them until all
/// values have been executed or the first error is encountered.
pub(crate) async fn execute_batch<T: SerializeRow>(
pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
&self, query: PreparedQuery, values: Vec<T>,
) -> FallibleQueryResults {
let session = self.session.clone();
Expand Down

0 comments on commit 43152e4

Please sign in to comment.