Skip to content

Commit

Permalink
Merge pull request #51 from uw-labs/support-proximo-offset
Browse files Browse the repository at this point in the history
support offset
  • Loading branch information
thinktainer authored Mar 13, 2019
2 parents b3795f8 + 19016fd commit 67626b3
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 19 deletions.
23 changes: 17 additions & 6 deletions proximo/proximo_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ import (
"github.com/uw-labs/sync/rungroup"
)

// Offset is the type used to specify the initial subscription offset
type Offset int64

const (
// OffsetOldest indicates the oldest appropriate message available on the broker.
OffsetOldest int64 = 1
OffsetOldest Offset = 1
// OffsetNewest indicates the next appropriate message available on the broker.
OffsetNewest int64 = 2
OffsetNewest Offset = 2
)

var (
Expand All @@ -31,8 +34,8 @@ type AsyncMessageSourceConfig struct {
ConsumerGroup string
Topic string
Broker string
// Offset int64 TODO: offset for proximo
Insecure bool
Offset Offset
Insecure bool
}

func NewAsyncMessageSource(c AsyncMessageSourceConfig) (substrate.AsyncMessageSource, error) {
Expand All @@ -53,6 +56,7 @@ type asyncMessageSource struct {
conn *grpc.ClientConn
consumerGroup string
topic string
offset Offset
}

type consMsg struct {
Expand Down Expand Up @@ -91,10 +95,17 @@ func (ams *asyncMessageSource) ConsumeMessages(ctx context.Context, messages cha
return errors.Wrap(err, "fail to consume")
}

var offset proximoc.Offset
if ams.offset == OffsetOldest {
offset = proximoc.Offset_OFFSET_OLDEST
} else {
offset = proximoc.Offset_OFFSET_NEWEST
}
if err := stream.Send(&proximoc.ConsumerRequest{
StartRequest: &proximoc.StartConsumeRequest{
Topic: ams.topic,
Consumer: ams.consumerGroup,
Topic: ams.topic,
Consumer: ams.consumerGroup,
InitialOffset: offset,
},
}); err != nil {
return err
Expand Down
20 changes: 9 additions & 11 deletions proximo/proximo_url.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,15 @@ func newProximoSource(u *url.URL) (substrate.AsyncMessageSource, error) {
conf.Insecure = false
}

/*
switch q.Get("offset") {
case "newest":
conf.Offset = OffsetNewest
case "oldest":
conf.Offset = OffsetOldest
case "":
default:
return nil, fmt.Errorf("ignoring unknown offset value '%s'", q.Get("offset"))
}
*/
switch q.Get("offset") {
case "newest":
conf.Offset = OffsetNewest
case "oldest":
conf.Offset = OffsetOldest
case "":
default:
return nil, fmt.Errorf("unknown offset value '%s'", q.Get("offset"))
}

return proximoSourcer(conf)
}
Expand Down
4 changes: 2 additions & 2 deletions proximo/proximo_url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func TestProximoSource(t *testing.T) {
expected: AsyncMessageSourceConfig{
Broker: "localhost:123",
ConsumerGroup: "g1",
//Offset: foo,
Topic: "t1",
Offset: OffsetNewest,
Topic: "t1",
},
expectedErr: nil,
},
Expand Down

0 comments on commit 67626b3

Please sign in to comment.