Skip to content

Commit

Permalink
Merge pull request #72 from heetch/046-retain-logical-types
Browse files Browse the repository at this point in the history
allow retaining logical types in CanonicalString
  • Loading branch information
rogpeppe authored Apr 9, 2020
2 parents ed946e0 + e7ae23e commit 239e952
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 13 deletions.
2 changes: 1 addition & 1 deletion avroregistry/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (r encodingRegistry) AppendSchemaID(buf []byte, id int64) []byte {
func (r encodingRegistry) IDForSchema(ctx context.Context, schema *avro.Type) (int64, error) {
data, err := json.Marshal(struct {
Schema string `json:"schema"`
}{schema.CanonicalString(avro.LeaveDefaults)})
}{canonical(schema)})
if err != nil {
return 0, err
}
Expand Down
6 changes: 5 additions & 1 deletion avroregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (r *Registry) Register(ctx context.Context, subject string, schema *avro.Ty
// we need to strip metadata from the schema when registering.
data, err := json.Marshal(struct {
Schema string `json:"schema"`
}{schema.CanonicalString(avro.LeaveDefaults)})
}{canonical(schema)})
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -184,3 +184,7 @@ type apiError struct {
func (e *apiError) Error() string {
return fmt.Sprintf("Avro registry error (code %d): %v", e.ErrorCode, e.Message)
}

func canonical(schema *avro.Type) string {
return schema.CanonicalString(avro.RetainDefaults | avro.RetainLogicalTypes)
}
15 changes: 15 additions & 0 deletions avroregistry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ func TestSchemaCompatibility(t *testing.T) {
c.Assert(err, qt.ErrorMatches, `Avro registry error \(code 409\): Schema being registered is incompatible with an earlier schema`)
}

func TestSchemasRetainLogicalTypes(t *testing.T) {
c := qt.New(t)
defer c.Done()
r, subject := newTestRegistry(c)
ctx := context.Background()
type R struct {
T time.Time
}
id, err := r.Register(ctx, subject, schemaOf(nil, R{}))
c.Assert(err, qt.Equals, nil)
schema, err := r.Decoder().SchemaForID(ctx, id)
c.Assert(err, qt.Equals, nil)
c.Assert(schema.String(), qt.Equals, `{"type":"record","name":"R","fields":[{"name":"T","type":{"type":"long","logicalType":"timestamp-micros"},"default":0}]}`)
}

func TestSingleCodec(t *testing.T) {
c := qt.New(t)
defer c.Done()
Expand Down
47 changes: 41 additions & 6 deletions type.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type Type struct {
schema string
// We might not usually need the canonical string, so we
// calculate it lazily and store it in canonical[opts].
canonical [2]string
canonicalOnce [2]sync.Once
canonical [RetainAll + 1]string
canonicalOnce [RetainAll + 1]sync.Once
}

// ParseType parses an Avro schema in the format defined by the Avro
Expand All @@ -44,7 +44,9 @@ type CanonicalOpts int
const (
// LeaveDefaults specifies that default values should be retained in
// the canonicalized schema string.
LeaveDefaults CanonicalOpts = 1 << iota
RetainDefaults CanonicalOpts = 1 << iota
RetainLogicalTypes
RetainAll CanonicalOpts = RetainDefaults | RetainLogicalTypes
)

// CanonicalString returns the canonical string representation of the type,
Expand All @@ -53,7 +55,7 @@ const (
// BUG: Unicode characters \u2028 and \u2029 in strings inside the schema are always escaped, contrary to the
// specification above.
func (t *Type) CanonicalString(opts CanonicalOpts) string {
opts &= LeaveDefaults
opts &= RetainAll
t.canonicalOnce[opts].Do(func() {
c := &canonicalizer{
defined: make(map[schema.QualifiedName]bool),
Expand Down Expand Up @@ -94,11 +96,44 @@ type canonicalFields struct {
// important to store in the registry, so we allow it to be
// kept with the LeaveDefaults option to CanonicalString.
// TODO the Avro spec doesn't define canonicalization for
// floating point values, which could be an issue.
// numeric values, which could be an issue.
Default interface{} `json:"default,omitempty"`
// Logical types aren't mentioned in the specification either,
// but they're important to maintain so that we can potentially
// guard against data corruption due to changed logical types.
LogicalType string `json:"logicalType,omitempty"`
Precision int `json:"precision,omitempty"`
Scale int `json:"scale,omitempty"`
}

func (c *canonicalizer) canonicalValue(at schema.AvroType) interface{} {
s := c.canonicalValue1(at)
ltype := logicalType(at)
if (c.opts&RetainLogicalTypes) == 0 || ltype == "" {
return s
}
var r canonicalFields
switch s := s.(type) {
case string:
r = canonicalFields{
Type: s,
}
case canonicalFields:
r = s
default:
panic("unexpected type returned from canonicalValue1")
}
r.LogicalType = ltype
if ltype == "decimal" {
scale, _ := at.Attribute("scale").(float64)
r.Scale = int(scale)
precision, _ := at.Attribute("precision").(float64)
r.Precision = int(precision)
}
return r
}

func (c *canonicalizer) canonicalValue1(at schema.AvroType) interface{} {
switch at := at.(type) {
case *schema.ArrayField:
return canonicalFields{
Expand Down Expand Up @@ -166,7 +201,7 @@ func (c *canonicalizer) canonicalValue(at schema.AvroType) interface{} {
Name: f.Name(),
Type: c.canonicalValue(f.Type()),
}
if f.HasDefault() && (c.opts&LeaveDefaults) != 0 {
if f.HasDefault() && (c.opts&RetainDefaults) != 0 {
cf.Fields[i].Default = f.Default()
}
}
Expand Down
80 changes: 75 additions & 5 deletions type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var canonicalStringTests = []struct {
out: `"int"`,
}, {
testName: "spec-STRIP",
opts: avro.LeaveDefaults,
in: `{
"type": "record",
"name":"R",
Expand All @@ -37,10 +36,75 @@ var canonicalStringTests = []struct {
"name": "a",
"type": "string",
"default": "hello"
}, {
"name": "b",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}]}`,
out: `{"name":"R","type":"record","fields":[{"name":"a","type":"string"},{"name":"b","type":"long"}]}`,
}, {
testName: "spec-STRIP-retain-defaults",
opts: avro.RetainDefaults,
in: `{
"type": "record",
"name":"R",
"doc": "documentation",
"extra-meta":"hello",
"aliases": ["a", "b"],
"fields": [{
"name": "a",
"type": "string",
"default": "hello"
}, {
"name": "b",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
}]}`,
out: `{"name":"R","type":"record","fields":[{"name":"a","type":"string","default":"hello"},{"name":"b","type":"long"}]}`,
}, {
testName: "spec-STRIP-retain-logical-types",
opts: avro.RetainLogicalTypes,
in: `{
"type": "record",
"name":"R",
"doc": "documentation",
"extra-meta":"hello",
"aliases": ["a", "b"],
"fields": [{
"name": "a",
"type": "string",
"default": "hello"
}, {
"name": "b",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
}, {
"name": "c",
"type": {
"type": "bytes",
"logicalType": "decimal",
"scale": 3,
"precision": 6
}
}, {
"name": "d",
"type": {
"type": "bytes",
"logicalType": "decimal",
"scale": 0,
"precision": 6
}
}]}`,
out: `{"name":"R","type":"record","fields":[{"name":"a","type":"string","default":"hello"}]}`,
out: `{"name":"R","type":"record","fields":[{"name":"a","type":"string"},{"name":"b","type":{"type":"long","logicalType":"timestamp-micros"}},{"name":"c","type":{"type":"bytes","logicalType":"decimal","precision":6,"scale":3}},{"name":"d","type":{"type":"bytes","logicalType":"decimal","precision":6}}]}`,
}, {
testName: "spec-STRIP-include-defaults",
testName: "spec-STRIP-retain-all",
opts: avro.RetainAll,
in: `{
"type": "record",
"name":"R",
Expand All @@ -51,8 +115,14 @@ var canonicalStringTests = []struct {
"name": "a",
"type": "string",
"default": "hello"
}, {
"name": "b",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
}]}`,
out: `{"name":"R","type":"record","fields":[{"name":"a","type":"string"}]}`,
out: `{"name":"R","type":"record","fields":[{"name":"a","type":"string","default":"hello"},{"name":"b","type":{"type":"long","logicalType":"timestamp-micros"}}]}`,
}, {
testName: "spec-ORDER",
in: `{
Expand Down Expand Up @@ -92,7 +162,7 @@ var canonicalStringTests = []struct {
out: `{"name":"R","type":"record","fields":[{"name":"a","type":"string"},{"name":"b","type":{"name":"E","type":"enum","symbols":["x","y"]}},{"name":"c","type":{"type":"array","items":"int"}},{"name":"d","type":{"type":"map","values":"int"}},{"name":"e","type":{"name":"F","type":"fixed","size":20}}]}`,
}, {
testName: "spec-STRINGS",
opts: avro.LeaveDefaults,
opts: avro.RetainDefaults,
in: `{
"name":"\u0052",
"type":"record",
Expand Down

0 comments on commit 239e952

Please sign in to comment.