Skip to content

Commit

Permalink
Merge pull request #50 from toni-moreno/feat/add_tls_support
Browse files Browse the repository at this point in the history
add TLS support to connect with influxdb
  • Loading branch information
sbengo authored Nov 19, 2021
2 parents 2bacda9 + 2dc02b7 commit b1da934
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 122 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# v 0.6.8 (unreleased)
# v 0.7.0 (unreleased)

## New features

* Support for uint64 columns (#44)
* add TLS support to connect with influxdb
* Support for uint64 columns (#44) (thanks to https://github.com/ptoews)

# v 0.6.7 (2020-05-03)

Expand Down
11 changes: 10 additions & 1 deletion conf/sample.syncflux.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,20 @@
admin-user = "admin"
admin-passwd = "admin"
timeout = "10s"

tls-insecure-skip-verify = true
tls-ca = ""
tls-cert = ""
tls-key = ""

[[influxdb]]
release = "1x"
name = "influxdb02"
location = "http://127.0.0.1:8087/"
admin-user = "admin"
admin-passwd = "admin"
timeout = "10s"
tls-insecure-skip-verify = true
tls-ca = ""
tls-cert = ""
tls-key = ""

3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/toni-moreno/syncflux
go 1.13

require (
github.com/coreos/go-etcd v2.0.0+incompatible // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gammazero/deque v0.0.0-20200310222745-50fa758af896 // indirect
github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6
Expand All @@ -21,7 +20,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.6.3
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8 // indirect
github.com/stretchr/testify v1.5.1 // indirect
github.com/unknwon/com v1.0.1 // indirect
golang.org/x/crypto v0.0.0-20200427165652-729f1e841bcc // indirect
golang.org/x/sys v0.0.0-20200427175716-29b57079015a // indirect
Expand Down
40 changes: 2 additions & 38 deletions go.sum

Large diffs are not rendered by default.

102 changes: 44 additions & 58 deletions pkg/agent/client.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package agent

import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"strconv"

"encoding/json"

"time"

"github.com/influxdata/influxdb1-client/v2"
client "github.com/influxdata/influxdb1-client/v2"
"github.com/toni-moreno/syncflux/pkg/agent/try"
)

Expand Down Expand Up @@ -37,52 +38,46 @@ func (rp *RetPol) GetFirstTime(max time.Duration) time.Time {
return time.Now().Add(-rp.Duration)
}

func DBclient(location string, user string, pass string) (client.Client, error) {

info := client.HTTPConfig{
Addr: location,
Username: user,
Password: pass,
UserAgent: "syncflux-agent",
Timeout: 120 * time.Second,
func GetTLSConfig(
SSLCert, SSLKey, SSLCA string,
InsecureSkipVerify bool,
) (*tls.Config, error) {
if SSLCert == "" && SSLKey == "" && SSLCA == "" && !InsecureSkipVerify {
return nil, nil
}

con, err2 := client.NewHTTPClient(info)
if err2 != nil {
log.Printf("Fail to build newclient to database %s, error: %s\n", location, err2)
return nil, err2
t := &tls.Config{
InsecureSkipVerify: InsecureSkipVerify,
}

dur, ver, err3 := con.Ping(time.Duration(10) * time.Second)
if err3 != nil {
log.Printf("Fail to build newclient to database %s, error: %s\n", location, err3)
return nil, err3
}
if SSLCA != "" {
caCert, err := ioutil.ReadFile(SSLCA)
if err != nil {
return nil, fmt.Errorf("Could not load TLS CA: %s", err)
}

q := client.Query{
Command: "show databases",
Database: "",
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
t.RootCAs = caCertPool
}
response, err4 := con.Query(q)
if err4 == nil && response.Error() == nil {
log.Println(response.Results)
return con, nil
} else {
if err4 != nil {
return nil, err4
}
if response.Error() != nil {
return nil, response.Error()

if SSLCert != "" && SSLKey != "" {
cert, err := tls.LoadX509KeyPair(SSLCert, SSLKey)
if err != nil {
return nil, fmt.Errorf(
"Could not load TLS client key/certificate from %s:%s: %s",
SSLKey, SSLCert, err)
}

t.Certificates = []tls.Certificate{cert}
t.BuildNameToCertificate()
}
log.Debugf("Happy as a hippo! %v, %s", dur, ver)

return con, nil
// will be nil by default if nothing is provided
return t, nil
}

func CreateDB(con client.Client, db string, rp *RetPol) error {

if db == "_internal" {
return nil
}
Expand All @@ -108,7 +103,6 @@ func CreateDB(con client.Client, db string, rp *RetPol) error {
}

func CreateRP(con client.Client, db string, rp *RetPol) error {

cmd := "CREATE RETENTION POLICY \"" + rp.Name + "\" ON \"" + db + "\" DURATION " + rp.Duration.String() + " REPLICATION " + strconv.FormatInt(rp.NReplicas, 10) + " SHARD DURATION " + rp.ShardGroupDuration.String()
if rp.Def {
cmd += " DEFAULT"
Expand All @@ -133,7 +127,6 @@ func CreateRP(con client.Client, db string, rp *RetPol) error {
}

func SetDefaultRP(con client.Client, db string, rp *RetPol) error {

cmd := "ALTER RETENTION POLICY \"" + rp.Name + "\" ON \"" + db + "\" DEFAULT"

log.Debugf("Influx QUERY: %s", cmd)
Expand Down Expand Up @@ -233,11 +226,10 @@ func GetRetentionPolicies(con client.Client, db string) ([]*RetPol, error) {
}

func GetFields(c client.Client, sdb string, meas string, rp string) map[string]*FieldSch {

fields := make(map[string]*FieldSch)

cmd := "show field keys from \"" + meas + "\""
//get measurements from database
// get measurements from database
q := client.Query{
Command: cmd,
Database: sdb,
Expand All @@ -256,7 +248,7 @@ func GetFields(c client.Client, sdb string, meas string, rp string) map[string]*
} else {

values := res[0].Series[0].Values
//show progress of getting measurements
// show progress of getting measurements
for _, row := range values {
fieldname := row[0].(string)
fieldtype := row[1].(string)
Expand All @@ -269,9 +261,8 @@ func GetFields(c client.Client, sdb string, meas string, rp string) map[string]*
}

func GetMeasurements(c client.Client, sdb string, rp string, mesafilter string) []*MeasurementSch {

cmd := "show measurements"
//get measurements from database
// get measurements from database
q := client.Query{
Command: cmd,
Database: sdb,
Expand All @@ -285,7 +276,7 @@ func GetMeasurements(c client.Client, sdb string, rp string, mesafilter string)
log.Printf("Fail to get response from database, get measurements error: %s\n", err.Error())
}

//log.Debugf("%s: %+v", cmd, response)
// log.Debugf("%s: %+v", cmd, response)

res := response.Results

Expand All @@ -295,7 +286,7 @@ func GetMeasurements(c client.Client, sdb string, rp string, mesafilter string)

values := res[0].Series[0].Values

//show progress of getting measurements
// show progress of getting measurements

for _, row := range values {
measurement := fmt.Sprintf("%v", row[0])
Expand All @@ -306,7 +297,6 @@ func GetMeasurements(c client.Client, sdb string, rp string, mesafilter string)

}
return measurements

}

func UnixNano2Time(tstamp int64) (time.Time, error) {
Expand All @@ -332,8 +322,8 @@ func ReadDB(c client.Client, sdb, srp, ddb, drp, cmd string, fieldmap map[string
RWRetryDelay := MainConfig.General.RWRetryDelay
totalpoints = 0

//param := make(map[string]interface{})
//param["wait_for_leader"] = "2000s"
// param := make(map[string]interface{})
// param["wait_for_leader"] = "2000s"

q := client.Query{
Command: cmd,
Expand All @@ -342,7 +332,7 @@ func ReadDB(c client.Client, sdb, srp, ddb, drp, cmd string, fieldmap map[string
Precision: "ns",
Chunked: true,
ChunkSize: 10000,
//Parameters: param,
// Parameters: param,
}

bpcfg := client.BatchPointsConfig{
Expand All @@ -358,7 +348,7 @@ func ReadDB(c client.Client, sdb, srp, ddb, drp, cmd string, fieldmap map[string
}
var response *client.Response

//Retry query if some error happens
// Retry query if some error happens

err = try.Do(func(attempt int) (bool, error) {
var qerr error
Expand All @@ -373,7 +363,6 @@ func ReadDB(c client.Client, sdb, srp, ddb, drp, cmd string, fieldmap map[string
}

return attempt < RWMaxRetries, qerr

})
if err != nil {
log.Errorf("Max Retries (%d) exceeded on read Data: Last error %s ", RWMaxRetries, err)
Expand All @@ -387,7 +376,7 @@ func ReadDB(c client.Client, sdb, srp, ddb, drp, cmd string, fieldmap map[string
resLength := len(res)
for k := 0; k < resLength; k++ {

//show progress of reading series
// show progress of reading series
log.Tracef("Reading %d Series for db %s", len(res[k].Series), sdb)
for _, ser := range res[k].Series {
log.Tracef("ROW Result [%d] [%#+v]", k, ser)
Expand Down Expand Up @@ -460,11 +449,10 @@ func ReadDB(c client.Client, sdb, srp, ddb, drp, cmd string, fieldmap map[string
case string, bool, int64, float64:
field[ser.Columns[i]] = v[i]
default:
//Supposed to be ok
// Supposed to be ok
log.Warnf("Error unknown type %T on field %s don't know about type %T! value %#+v \n", vt, ser.Columns[i], vt)
field[ser.Columns[i]] = v[i]
}

}
}
log.Tracef("POINT TIME [%s] - NOW[%s] | MEAS: %s | TAGS: %#+v | FIELDS: %#+v| ", timestamp.String(), time.Now().String(), ser.Name, ser.Tags, field)
Expand Down Expand Up @@ -492,7 +480,6 @@ func min(a, b int) int {
}

func BpSplit(bp client.BatchPoints, splitnum int) []client.BatchPoints {

points := bp.Points()
len := len(points)
lim := (len / splitnum) + 1
Expand Down Expand Up @@ -529,12 +516,11 @@ func BpSplit(bp client.BatchPoints, splitnum int) []client.BatchPoints {
}

func WriteDB(c client.Client, bp client.BatchPoints) error {

RWMaxRetries := MainConfig.General.RWMaxRetries
RWRetryDelay := MainConfig.General.RWRetryDelay
MaxPointsOnSingleWrite := MainConfig.General.MaxPointsOnSingleWrite

//spliting writtes max of MaxPointsOnSingleWrite points
// spliting writtes max of MaxPointsOnSingleWrite points
log.Debugf("MaxPointsOnSingleWrite [%d] ", MaxPointsOnSingleWrite)
sbp := BpSplit(bp, MaxPointsOnSingleWrite)

Expand Down
43 changes: 33 additions & 10 deletions pkg/agent/influxmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package agent

import (
"fmt"
"net/http"
"net/url"
"sync"
"time"

"github.com/influxdata/influxdb1-client/v2"
client "github.com/influxdata/influxdb1-client/v2"
"github.com/toni-moreno/syncflux/pkg/config"
)

Expand All @@ -27,7 +29,6 @@ func (im *InfluxMonitor) setStatError() {
im.statsData.Lock()
defer im.statsData.Unlock()
im.statusOK = false

}

func (im *InfluxMonitor) setStatOK(t time.Duration, version string) {
Expand All @@ -46,15 +47,38 @@ func (im *InfluxMonitor) GetState() (bool, time.Time, time.Duration) {
}

func (im *InfluxMonitor) InitPing() (client.Client, time.Duration, string, error) {

info := client.HTTPConfig{
Addr: im.cfg.Location,
Username: im.cfg.AdminUser,
Password: im.cfg.AdminPasswd,
Timeout: im.cfg.Timeout,
curl, err := url.Parse(im.cfg.Location)
if err != nil {
return nil, 0, "", err
}

con, err2 := client.NewHTTPClient(info)
var conf client.HTTPConfig
if curl.Scheme == "https" {
tls, err := GetTLSConfig(im.cfg.TLSCert, im.cfg.TLSKey, im.cfg.TLSCA, im.cfg.TLSInsecureSkipVerify)
if err != nil {
log.Errorf("Error on Create TLS config: %s", err)
return nil, 0, "", err
}
conf = client.HTTPConfig{
Addr: im.cfg.Location,
Username: im.cfg.AdminUser,
Password: im.cfg.AdminPasswd,
UserAgent: "syncflux-agent",
Timeout: time.Duration(im.cfg.Timeout) * time.Second,
TLSConfig: tls,
Proxy: http.ProxyFromEnvironment,
}
} else {
conf = client.HTTPConfig{
Addr: im.cfg.Location,
Username: im.cfg.AdminUser,
Password: im.cfg.AdminPasswd,
UserAgent: "syncflux-agent",
Timeout: time.Duration(im.cfg.Timeout) * time.Second,
Proxy: http.ProxyFromEnvironment,
}
}
con, err2 := client.NewHTTPClient(conf)
if err2 != nil {
log.Errorf("Fail to build newclient to database %s, error: %s\n", im.cfg.Location, err2)
return nil, 0, "", err2
Expand Down Expand Up @@ -113,7 +137,6 @@ func (im *InfluxMonitor) UpdateCli() client.Client {
}

func (im *InfluxMonitor) Ping() (time.Duration, string, error) {

cli := im.GetCli()
if cli == nil {
return 0, "", fmt.Errorf("can not ping database, the client is not initialized")
Expand Down
Loading

0 comments on commit b1da934

Please sign in to comment.