From 769a5c1be198c3c661f70f0e2a9722741c7d3dba Mon Sep 17 00:00:00 2001 From: "mykyta.oleksiienko" Date: Mon, 28 Oct 2024 17:15:18 +0200 Subject: [PATCH] CASSGO-26 consistency serial was added MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The user should be able to set consistency to SERIAL or LOCAL_SERIAL for Paxos reads, but the previous implementation doesn't support such a feature. patch by Mykyta Oleksiienko; reviewed by João Reis for CASSGO-26 --- CHANGELOG.md | 2 ++ cassandra_test.go | 88 +++++++++++++++++++++++++++++++++++++++++++++++ cluster.go | 2 +- frame.go | 58 +++++++++++-------------------- session.go | 18 +++++++--- 5 files changed, 125 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 20da746a0..0f4e8936e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Allow SERIAL and LOCAL_SERIAL on SELECT statements [CASSGO-26](https://issues.apache.org/jira/browse/CASSGO-26) + ### Changed - Don't restrict server authenticator unless PasswordAuthentictor.AllowedAuthenticators is provided (CASSGO-19) diff --git a/cassandra_test.go b/cassandra_test.go index 797a7cf7f..e92ae648b 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -45,6 +45,8 @@ import ( "unicode" inf "gopkg.in/inf.v0" + + "github.com/stretchr/testify/require" ) func TestEmptyHosts(t *testing.T) { @@ -504,6 +506,92 @@ func TestCAS(t *testing.T) { } } +func TestConsistencySerial(t *testing.T) { + session := createSession(t) + defer session.Close() + + type testStruct struct { + name string + id int + consistency Consistency + expected string + } + + testCases := []testStruct{ + { + name: "Any", + consistency: Any, + expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got ANY", + }, { + name: "One", + consistency: One, + expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got ONE", + }, { + name: "Two", + consistency: Two, + expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got TWO", + }, { + name: "Three", + consistency: Three, + expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got THREE", + }, { + name: "Quorum", + consistency: Quorum, + expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got QUORUM", + }, { + name: "LocalQuorum", + consistency: LocalQuorum, + expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got LOCAL_QUORUM", + }, { + name: "EachQuorum", + consistency: EachQuorum, + expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got EACH_QUORUM", + }, { + name: "Serial", + id: 8, + consistency: Serial, + expected: "", + }, { + name: "LocalSerial", + id: 9, + consistency: LocalSerial, + expected: "", + }, { + name: "LocalOne", + consistency: LocalOne, + expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got LOCAL_ONE", + }, + } + + err := session.Query("CREATE TABLE IF NOT EXISTS gocql_test.consistency_serial (id int PRIMARY KEY)").Exec() + if err != nil { + t.Fatalf("can't create consistency_serial table:%v", err) + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if tc.expected == "" { + err = session.Query("INSERT INTO gocql_test.consistency_serial (id) VALUES (?)", tc.id).SerialConsistency(tc.consistency).Exec() + if err != nil { + t.Fatal(err) + } + + var receivedID int + err = session.Query("SELECT * FROM gocql_test.consistency_serial WHERE id=?", tc.id).Scan(&receivedID) + if err != nil { + t.Fatal(err) + } + + require.Equal(t, tc.id, receivedID) + } else { + require.PanicsWithValue(t, tc.expected, func() { + session.Query("INSERT INTO gocql_test.consistency_serial (id) VALUES (?)", tc.id).SerialConsistency(tc.consistency) + }) + } + }) + } +} + func TestDurationType(t *testing.T) { session := createSession(t) defer session.Close() diff --git a/cluster.go b/cluster.go index 13e62f3b0..74a0c26f1 100644 --- a/cluster.go +++ b/cluster.go @@ -150,7 +150,7 @@ type ClusterConfig struct { // Consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL. // Default: unset - SerialConsistency SerialConsistency + SerialConsistency Consistency // SslOpts configures TLS use when HostDialer is not set. // SslOpts is ignored if HostDialer is set. diff --git a/frame.go b/frame.go index d374ae574..a8252da9a 100644 --- a/frame.go +++ b/frame.go @@ -192,6 +192,9 @@ const ( type Consistency uint16 +// SerialConsistency is deprecated. Use Consistency instead. +type SerialConsistency = Consistency + const ( Any Consistency = 0x00 One Consistency = 0x01 @@ -201,6 +204,8 @@ const ( All Consistency = 0x05 LocalQuorum Consistency = 0x06 EachQuorum Consistency = 0x07 + Serial Consistency = 0x08 + LocalSerial Consistency = 0x09 LocalOne Consistency = 0x0A ) @@ -224,6 +229,10 @@ func (c Consistency) String() string { return "EACH_QUORUM" case LocalOne: return "LOCAL_ONE" + case Serial: + return "SERIAL" + case LocalSerial: + return "LOCAL_SERIAL" default: return fmt.Sprintf("UNKNOWN_CONS_0x%x", uint16(c)) } @@ -253,6 +262,10 @@ func (c *Consistency) UnmarshalText(text []byte) error { *c = EachQuorum case "LOCAL_ONE": *c = LocalOne + case "SERIAL": + *c = Serial + case "LOCAL_SERIAL": + *c = LocalSerial default: return fmt.Errorf("invalid consistency %q", string(text)) } @@ -260,6 +273,10 @@ func (c *Consistency) UnmarshalText(text []byte) error { return nil } +func (c Consistency) IsSerial() bool { + return c == Serial || c == LocalSerial + +} func ParseConsistency(s string) Consistency { var c Consistency if err := c.UnmarshalText([]byte(strings.ToUpper(s))); err != nil { @@ -286,41 +303,6 @@ func MustParseConsistency(s string) (Consistency, error) { return c, nil } -type SerialConsistency uint16 - -const ( - Serial SerialConsistency = 0x08 - LocalSerial SerialConsistency = 0x09 -) - -func (s SerialConsistency) String() string { - switch s { - case Serial: - return "SERIAL" - case LocalSerial: - return "LOCAL_SERIAL" - default: - return fmt.Sprintf("UNKNOWN_SERIAL_CONS_0x%x", uint16(s)) - } -} - -func (s SerialConsistency) MarshalText() (text []byte, err error) { - return []byte(s.String()), nil -} - -func (s *SerialConsistency) UnmarshalText(text []byte) error { - switch string(text) { - case "SERIAL": - *s = Serial - case "LOCAL_SERIAL": - *s = LocalSerial - default: - return fmt.Errorf("invalid consistency %q", string(text)) - } - - return nil -} - const ( apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal." ) @@ -1452,7 +1434,7 @@ type queryParams struct { values []queryValues pageSize int pagingState []byte - serialConsistency SerialConsistency + serialConsistency Consistency // v3+ defaultTimestamp bool defaultTimestampValue int64 @@ -1541,7 +1523,7 @@ func (f *framer) writeQueryParams(opts *queryParams) { } if opts.serialConsistency > 0 { - f.writeConsistency(Consistency(opts.serialConsistency)) + f.writeConsistency(opts.serialConsistency) } if f.proto > protoVersion2 && opts.defaultTimestamp { @@ -1653,7 +1635,7 @@ type writeBatchFrame struct { consistency Consistency // v3+ - serialConsistency SerialConsistency + serialConsistency Consistency defaultTimestamp bool defaultTimestampValue int64 diff --git a/session.go b/session.go index a600b95f3..85d584d3d 100644 --- a/session.go +++ b/session.go @@ -144,6 +144,10 @@ func NewSession(cfg ClusterConfig) (*Session, error) { return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.") } + if cfg.SerialConsistency > 0 && !cfg.SerialConsistency.IsSerial() { + return nil, fmt.Errorf("the default SerialConsistency level is not allowed to be anything else but SERIAL or LOCAL_SERIAL. Recived value: %v", cfg.SerialConsistency) + } + // TODO: we should take a context in here at some point ctx, cancel := context.WithCancel(context.TODO()) @@ -915,7 +919,7 @@ type Query struct { rt RetryPolicy spec SpeculativeExecutionPolicy binding func(q *QueryInfo) ([]interface{}, error) - serialCons SerialConsistency + serialCons Consistency defaultTimestamp bool defaultTimestampValue int64 disableSkipMetadata bool @@ -1264,7 +1268,10 @@ func (q *Query) Bind(v ...interface{}) *Query { // either SERIAL or LOCAL_SERIAL and if not present, it defaults to // SERIAL. This option will be ignored for anything else that a // conditional update/insert. -func (q *Query) SerialConsistency(cons SerialConsistency) *Query { +func (q *Query) SerialConsistency(cons Consistency) *Query { + if !cons.IsSerial() { + panic("serial consistency can only be SERIAL or LOCAL_SERIAL got " + cons.String()) + } q.serialCons = cons return q } @@ -1735,7 +1742,7 @@ type Batch struct { trace Tracer observer BatchObserver session *Session - serialCons SerialConsistency + serialCons Consistency defaultTimestamp bool defaultTimestampValue int64 context context.Context @@ -1914,7 +1921,10 @@ func (b *Batch) Size() int { // conditional update/insert. // // Only available for protocol 3 and above -func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch { +func (b *Batch) SerialConsistency(cons Consistency) *Batch { + if !cons.IsSerial() { + panic("serial consistency can only be SERIAL or LOCAL_SERIAL got " + cons.String()) + } b.serialCons = cons return b }