diff --git a/cluster.go b/cluster.go index 13e62f3b0..da3d30841 100644 --- a/cluster.go +++ b/cluster.go @@ -214,6 +214,14 @@ type ClusterConfig struct { // See https://issues.apache.org/jira/browse/CASSANDRA-10786 DisableSkipMetadata bool + // QueryInterceptor will set the provided query interceptor on all queries created from this session. + // Use it to intercept and modify queries by providing an implementation of QueryInterceptor. + QueryInterceptor QueryInterceptor + + // QueryInterceptor will set the provided query interceptor on all queries created from this session. + // Use it to intercept and modify batches by providing an implementation of BatchInterceptor. + BatchInterceptor BatchInterceptor + // QueryObserver will set the provided query observer on all queries created from this session. // Use it to collect metrics / stats from queries by providing an implementation of QueryObserver. QueryObserver QueryObserver diff --git a/doc.go b/doc.go index f23e812c5..699af685f 100644 --- a/doc.go +++ b/doc.go @@ -362,6 +362,26 @@ // // See Example_userDefinedTypesMap, Example_userDefinedTypesStruct, ExampleUDTMarshaler, ExampleUDTUnmarshaler. // +// # Query Interceptors +// +// Query Interceptors wrap query execution and can be used to inject logic that should apply to all query and batch +// executions. For example, interceptors can be used for rate limiting, logging, attaching distributed tracing +// metadata to the context, modifying queries, and inspecting query results. +// +// Interceptors are invoked once prior to query execution and are not re-invoked on retry attempts or speculative +// execution attempts. Interceptors are responsible for calling the provided handler and returning a non-nil Iter +// or error. +// +// type MyQueryInterceptor struct {} +// +// func (q MyQueryInterceptor) InterceptQuery(qry *gocql.Query, handler gocql.QueryHandler) (*gocql.Iter, error) { +// return handler(qry.WithContext(context.WithValue(qry.Context(), "trace-id", "123"))) +// } +// +// func (q MyQueryInterceptor) InterceptBatch(batch *gocql.Batch, handler gocql.BatchHandler) (*gocql.Iter, error) { +// return handler(batch.WithContext(context.WithValue(batch.Context(), "trace-id", "456"))) +// } +// // # Metrics and tracing // // It is possible to provide observer implementations that could be used to gather metrics: diff --git a/session.go b/session.go index a600b95f3..d5d91c9a3 100644 --- a/session.go +++ b/session.go @@ -57,6 +57,8 @@ type Session struct { routingKeyInfoCache routingKeyInfoLRU schemaDescriber *schemaDescriber trace Tracer + queryInterceptor QueryInterceptor + batchInterceptor BatchInterceptor queryObserver QueryObserver batchObserver BatchObserver connectObserver ConnectObserver @@ -182,6 +184,8 @@ func NewSession(cfg ClusterConfig) (*Session, error) { policy: cfg.PoolConfig.HostSelectionPolicy, } + s.queryInterceptor = cfg.QueryInterceptor + s.batchInterceptor = cfg.BatchInterceptor s.queryObserver = cfg.QueryObserver s.batchObserver = cfg.BatchObserver s.connectObserver = cfg.ConnectObserver @@ -543,7 +547,14 @@ func (s *Session) executeQuery(qry *Query) (it *Iter) { return &Iter{err: ErrSessionClosed} } - iter, err := s.executor.executeQuery(qry) + var iter *Iter + var err error + if s.queryInterceptor != nil { + iter, err = s.queryInterceptor.InterceptQuery(qry, s.executor.executeQuery) + } else { + iter, err = s.executor.executeQuery(qry) + } + if err != nil { return &Iter{err: err} } @@ -744,7 +755,14 @@ func (s *Session) executeBatch(batch *Batch) *Iter { return &Iter{err: ErrTooManyStmts} } - iter, err := s.executor.executeQuery(batch) + var iter *Iter + var err error + if s.batchInterceptor != nil { + iter, err = s.batchInterceptor.InterceptBatch(batch, s.executor.executeQuery) + } else { + iter, err = s.executor.executeQuery(batch) + } + if err != nil { return &Iter{err: err} } @@ -2205,6 +2223,36 @@ type ObservedQuery struct { Attempt int } +// QueryHandler is a function that executes a single query. +type QueryHandler = func(*Query) (*Iter, error) + +// QueryInterceptor is the interface implemented by query interceptors / middleware. +// +// Interceptors are well-suited to logic that is not specific to a single query or batch. +type QueryInterceptor interface { + // InterceptQuery is invoked once immediately before a query execution. It is not invoked before retry attempts or + // speculative execution attempts. + + // The interceptor is responsible for calling the `handler` function and returning the handler result. Failure to + // call the handler will panic. If the interceptor wants to skip query execution, it should return an error. + InterceptQuery(qry *Query, handler QueryHandler) (*Iter, error) +} + +// BatchHandler is a function that executes a single batch. +type BatchHandler = func(*Batch) (*Iter, error) + +// BatchInterceptor is the interface implemented by batch interceptors / middleware. +// +// Interceptors are well-suited to logic that is not specific to a single query or batch. +type BatchInterceptor interface { + // InterceptBatch is invoked once immediately before a batch execution. It is not invoked before retry attempts or + // speculative execution attempts. + + // The interceptor is responsible for calling the `handler` function and returning the handler result. Failure to + // call the handler will panic. If the interceptor wants to skip batch execution, it should return an error. + InterceptBatch(batch *Batch, handler BatchHandler) (*Iter, error) +} + // QueryObserver is the interface implemented by query observers / stat collectors. // // Experimental, this interface and use may change