Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query interceptors #1810

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,14 @@ type ClusterConfig struct {
// See https://issues.apache.org/jira/browse/CASSANDRA-10786
DisableSkipMetadata bool

// QueryInterceptor will set the provided query interceptor on all queries created from this session.
// Use it to intercept and modify queries by providing an implementation of QueryInterceptor.
QueryInterceptor QueryInterceptor

// QueryInterceptor will set the provided query interceptor on all queries created from this session.
// Use it to intercept and modify batches by providing an implementation of BatchInterceptor.
BatchInterceptor BatchInterceptor

// QueryObserver will set the provided query observer on all queries created from this session.
// Use it to collect metrics / stats from queries by providing an implementation of QueryObserver.
QueryObserver QueryObserver
Expand Down
20 changes: 20 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,26 @@
//
// See Example_userDefinedTypesMap, Example_userDefinedTypesStruct, ExampleUDTMarshaler, ExampleUDTUnmarshaler.
//
// # Query Interceptors
//
// Query Interceptors wrap query execution and can be used to inject logic that should apply to all query and batch
// executions. For example, interceptors can be used for rate limiting, logging, attaching distributed tracing
// metadata to the context, modifying queries, and inspecting query results.
//
// Interceptors are invoked once prior to query execution and are not re-invoked on retry attempts or speculative
// execution attempts. Interceptors are responsible for calling the provided handler and returning a non-nil Iter
// or error.
//
// type MyQueryInterceptor struct {}
//
// func (q MyQueryInterceptor) InterceptQuery(qry *gocql.Query, handler gocql.QueryHandler) (*gocql.Iter, error) {
// return handler(qry.WithContext(context.WithValue(qry.Context(), "trace-id", "123")))
// }
//
// func (q MyQueryInterceptor) InterceptBatch(batch *gocql.Batch, handler gocql.BatchHandler) (*gocql.Iter, error) {
// return handler(batch.WithContext(context.WithValue(batch.Context(), "trace-id", "456")))
// }
//
// # Metrics and tracing
//
// It is possible to provide observer implementations that could be used to gather metrics:
Expand Down
52 changes: 50 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Session struct {
routingKeyInfoCache routingKeyInfoLRU
schemaDescriber *schemaDescriber
trace Tracer
queryInterceptor QueryInterceptor
batchInterceptor BatchInterceptor
queryObserver QueryObserver
batchObserver BatchObserver
connectObserver ConnectObserver
Expand Down Expand Up @@ -182,6 +184,8 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
policy: cfg.PoolConfig.HostSelectionPolicy,
}

s.queryInterceptor = cfg.QueryInterceptor
s.batchInterceptor = cfg.BatchInterceptor
s.queryObserver = cfg.QueryObserver
s.batchObserver = cfg.BatchObserver
s.connectObserver = cfg.ConnectObserver
Expand Down Expand Up @@ -543,7 +547,14 @@ func (s *Session) executeQuery(qry *Query) (it *Iter) {
return &Iter{err: ErrSessionClosed}
}

iter, err := s.executor.executeQuery(qry)
var iter *Iter
var err error
if s.queryInterceptor != nil {
iter, err = s.queryInterceptor.InterceptQuery(qry, s.executor.executeQuery)
} else {
iter, err = s.executor.executeQuery(qry)
}

if err != nil {
return &Iter{err: err}
}
Expand Down Expand Up @@ -744,7 +755,14 @@ func (s *Session) executeBatch(batch *Batch) *Iter {
return &Iter{err: ErrTooManyStmts}
}

iter, err := s.executor.executeQuery(batch)
var iter *Iter
var err error
if s.batchInterceptor != nil {
iter, err = s.batchInterceptor.InterceptBatch(batch, s.executor.executeQuery)
} else {
iter, err = s.executor.executeQuery(batch)
}

if err != nil {
return &Iter{err: err}
}
Expand Down Expand Up @@ -2205,6 +2223,36 @@ type ObservedQuery struct {
Attempt int
}

// QueryHandler is a function that executes a single query.
type QueryHandler = func(*Query) (*Iter, error)

// QueryInterceptor is the interface implemented by query interceptors / middleware.
//
// Interceptors are well-suited to logic that is not specific to a single query or batch.
type QueryInterceptor interface {
// InterceptQuery is invoked once immediately before a query execution. It is not invoked before retry attempts or
// speculative execution attempts.

// The interceptor is responsible for calling the `handler` function and returning the handler result. Failure to
// call the handler will panic. If the interceptor wants to skip query execution, it should return an error.
InterceptQuery(qry *Query, handler QueryHandler) (*Iter, error)
}

// BatchHandler is a function that executes a single batch.
type BatchHandler = func(*Batch) (*Iter, error)

// BatchInterceptor is the interface implemented by batch interceptors / middleware.
//
// Interceptors are well-suited to logic that is not specific to a single query or batch.
type BatchInterceptor interface {
// InterceptBatch is invoked once immediately before a batch execution. It is not invoked before retry attempts or
// speculative execution attempts.

// The interceptor is responsible for calling the `handler` function and returning the handler result. Failure to
// call the handler will panic. If the interceptor wants to skip batch execution, it should return an error.
InterceptBatch(batch *Batch, handler BatchHandler) (*Iter, error)
}

// QueryObserver is the interface implemented by query observers / stat collectors.
//
// Experimental, this interface and use may change
Expand Down