-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.go
65 lines (51 loc) · 1.69 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package kafka
import (
"encoding/json"
"math/rand"
"time"
"github.com/segmentio/kafka-go"
)
// TODO: Consider promoting these constants to configuration options.
const (
	// DefaultLeaderTimeout is how long to wait on a connection to the Kafka leader before it is considered to have
	// timed out.
	DefaultLeaderTimeout = 10 * time.Second

	// MessageMaxBytes is the maximum size, in bytes, that a Kafka message can be (1e9 bytes).
	MessageMaxBytes = 1e9

	// MessageBuffer is the maximum number of Kafka messages to buffer from a reading channel.
	MessageBuffer = 5
)
// CreateKafkaMessage builds a Kafka message addressed to the given topic. The message value is a freshly generated
// fake reading encoded as JSON. If the reading cannot be marshalled, a nil message and the marshalling error are
// returned.
func CreateKafkaMessage(topic string) (message *kafka.Message, err error) {
	// Generate a fake reading and encode it as the JSON payload in one step.
	payload, err := json.Marshal(createReading())
	if err != nil {
		return nil, err
	}

	// Wrap the payload in a Kafka message for the requested topic.
	message = &kafka.Message{Topic: topic, Value: payload}
	return message, nil
}
// createReading produces a fake temperature reading stamped with the current Unix time. The temperature is a random
// value in the half-open range [-25, 45) degrees Celsius.
func createReading() *Reading {
	// Temperature bounds are based on the land surface temperatures shown on this website:
	// https://earthobservatory.nasa.gov/global-maps/MOD_LSTD_M
	const (
		lowestCelsius  = -25
		highestCelsius = 45
	)

	// Draw the fractional part first and then the whole degrees.
	fraction := rand.Float64()
	degrees := rand.Intn(highestCelsius-lowestCelsius) + lowestCelsius

	return &Reading{
		Celsius: fraction + float64(degrees),
		Epoch:   time.Now().Unix(),
	}
}
// UnmarshalMessage decodes the JSON value of the Kafka message into a Reading. If the value is not valid JSON for a
// Reading, a nil reading and the decoding error are returned.
func UnmarshalMessage(message *kafka.Message) (reading *Reading, err error) {
	// Decode into a local value and only hand out its address on success.
	var decoded Reading
	if err = json.Unmarshal(message.Value, &decoded); err != nil {
		return nil, err
	}
	return &decoded, nil
}