Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor sending and receiving of packets #25

Merged
merged 15 commits into from
Jul 22, 2017
5 changes: 0 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ go:
- 1.8
- tip

before_install:
- if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then sudo apt-get install -q libpcap-dev ; fi
- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then brew update ; fi
- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then brew install libpcap ; fi

install: make install_ci

script: make test
Expand Down
18 changes: 11 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ system. It provides fast and easy active end-to-end functional testing
of all the components in Data Center and Cloud infrastructures.
Arachne is able to detect intra-DC, inter-DC, DC-to-Cloud, and
DC-to-External-Services issues by generating minimal traffic:

- Reachability
- Round-trip and 1-way latency
- Silent packet drops and black holes
Expand All @@ -14,10 +14,6 @@ DC-to-External-Services issues by generating minimal traffic:
(accidental or not)
- Whether network-level SLAs are met

## Requirements

* libpcap

## Usage

There are two ways to use the Arachne package.
Expand All @@ -33,8 +29,8 @@ Import this package and call Arachne from your program/service with
where the option provided above is among the few optional ones.


Below is the list of all the CLI options available, when Arachne is
used as a standalone program. The default options should be good
Below is the list of all the CLI options available, when Arachne is
used as a standalone program. The default options should be good
enough for most users.

```
Expand Down Expand Up @@ -73,6 +69,14 @@ as root user, by being granted `CAP_NET_RAW` capability
(see: [capabilities][]).


### Note on BPF filtering

When receiving packets, Arachne attempts to apply a BPF filter to the raw socket
so that processing of packets occurs on a much smaller set (ones destined
specifically for Arachne agent testing). This is currently supported only on
Linux and thus performance will be worse on BSD-based systems where a larger
number of packets must be inspected.

<hr>

Released under the [MIT License](LICENSE.md).
Expand Down
28 changes: 21 additions & 7 deletions bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/uber/arachne/collector"
"github.com/uber/arachne/config"
d "github.com/uber/arachne/defines"
"github.com/uber/arachne/internal/ip"
"github.com/uber/arachne/internal/log"
"github.com/uber/arachne/internal/tcp"
"github.com/uber/arachne/internal/util"
Expand Down Expand Up @@ -86,12 +87,15 @@ func Run(ec *config.Extended, opts ...Option) {
logger.Error("error initializing stats", zap.Error(err))
}

// Hold raw socket connection for IPv4 packets
var connIPv4 *ip.Conn

logger.Info("Starting up arachne")

for {
var (
err error
currentDSCP tcp.DSCPValue
currentDSCP ip.DSCPValue
dnsWg sync.WaitGroup
finishedCycleUpload sync.WaitGroup
)
Expand Down Expand Up @@ -120,7 +124,7 @@ func Run(ec *config.Extended, opts ...Option) {
dnsRefresh := time.NewTicker(d.DNSRefreshInterval)
dnsWg.Add(1)
killC.DNSRefresh = make(chan struct{})
config.ResolveDnsTargets(gl.Remotes, gl.RemoteConfig, dnsRefresh, &dnsWg,
config.ResolveDNSTargets(gl.Remotes, gl.RemoteConfig, dnsRefresh, &dnsWg,
killC.DNSRefresh, logger)
dnsWg.Wait()
logger.Debug("Remotes after DNS resolution include",
Expand All @@ -132,6 +136,16 @@ func Run(ec *config.Extended, opts ...Option) {
sentC := make(chan tcp.Message, d.ChannelOutBufferSize)
rcvdC := make(chan tcp.Message, d.ChannelInBufferSize)

// Connection for IPv4 packets
if connIPv4 == nil {
connIPv4 = ip.NewConn(
d.AfInet,
gl.RemoteConfig.TargetTCPPort,
gl.RemoteConfig.InterfaceName,
gl.RemoteConfig.SrcAddress,
logger)
}

// Actual echoing is a percentage of the total configured batch cycle duration.
realBatchInterval := time.Duration(float32(gl.RemoteConfig.BatchInterval) *
d.BatchIntervalEchoingPerc)
Expand All @@ -150,8 +164,7 @@ func Run(ec *config.Extended, opts ...Option) {
if !*gl.CLI.SenderOnlyMode {
// Listen for responses or probes from other IPv4 arachne agents.
killC.Receiver = make(chan struct{})
err = tcp.Receiver("ip4", &gl.RemoteConfig.SrcAddress, gl.RemoteConfig.TargetTCPPort,
gl.RemoteConfig.InterfaceName, sentC, rcvdC, killC.Receiver, logger)
err = tcp.Receiver(connIPv4, sentC, rcvdC, killC.Receiver, logger)
if err != nil {
logger.Fatal("IPv4 receiver failed to start", zap.Error(err))
}
Expand All @@ -163,23 +176,24 @@ func Run(ec *config.Extended, opts ...Option) {
logger.Debug("Echoing...")
// Start echoing all targets.
killC.Echo = make(chan struct{})
tcp.EchoTargets(gl.Remotes, &gl.RemoteConfig.SrcAddress, gl.RemoteConfig.TargetTCPPort,
tcp.EchoTargets(gl.Remotes, connIPv4, gl.RemoteConfig.TargetTCPPort,
gl.RemoteConfig.SrcTCPPortRange, gl.RemoteConfig.QoSEnabled, &currentDSCP,
realBatchInterval, batchEndCycle, sentC, *gl.CLI.SenderOnlyMode,
completeCycleUpload, &finishedCycleUpload, killC.Echo, logger)
}

select {
case <-configRefresh.C:
util.CleanUpRefresh(killC, *gl.CLI.ReceiverOnlyMode, *gl.CLI.SenderOnlyMode, gl.RemoteConfig.ResolveDNS)
util.CleanUpRefresh(killC, *gl.CLI.ReceiverOnlyMode,
*gl.CLI.SenderOnlyMode, gl.RemoteConfig.ResolveDNS)
log.ResetLogFiles(gl.App.Logging.OutputPaths, d.LogFileSizeMaxMB, d.LogFileSizeKeepKB, logger)
logger.Info("Refreshing target list file, if needed")
configRefresh.Stop()
case <-sigC:
logger.Debug("Received SIG")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 187 exceeds the 120 char width limit.

configRefresh.Stop()
util.CleanUpAll(killC, *gl.CLI.ReceiverOnlyMode, *gl.CLI.SenderOnlyMode,
gl.RemoteConfig.ResolveDNS, gl.App.PIDPath, sr, logger)
gl.RemoteConfig.ResolveDNS, connIPv4, gl.App.PIDPath, sr, logger)
logger.Info("Exiting")
os.Exit(0)
}
Expand Down
63 changes: 33 additions & 30 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ import (

"github.com/uber/arachne/config"
"github.com/uber/arachne/defines"
"github.com/uber/arachne/internal/ip"
"github.com/uber/arachne/internal/log"
"github.com/uber/arachne/internal/tcp"
"github.com/uber/arachne/metrics"

"github.com/fatih/color"
"github.com/google/gopacket/layers"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand All @@ -49,14 +51,14 @@ type report struct {
}

// map[target address string] => *[QOS_DCSP_VALUE] =>map[source port]
type resultStore map[string]*[defines.NumQOSDCSPValues]map[uint16]report
type resultStore map[string]*[defines.NumQOSDCSPValues]map[layers.TCPPort]report
type messageStore map[string]*[defines.NumQOSDCSPValues]srcPortScopedMessageStore

type srcPortScopedMessageStore struct {
sent srcPortScopedMessages
rcvd srcPortScopedMessages
}
type srcPortScopedMessages map[uint16]tcp.Message
type srcPortScopedMessages map[layers.TCPPort]tcp.Message

func (ms messageStore) target(target string, QosDSCPIndex uint8) *srcPortScopedMessageStore {
// TODO: validate dscp is in range or create a dscp type alias
Expand All @@ -72,19 +74,19 @@ func (ms messageStore) target(target string, QosDSCPIndex uint8) *srcPortScopedM
return &ms[target][QosDSCPIndex]
}

func (spsm srcPortScopedMessages) add(srcPort uint16, message tcp.Message) {
func (spsm srcPortScopedMessages) add(srcPort layers.TCPPort, message tcp.Message) {
spsm[srcPort] = message
}

func (ms messageStore) sentAdd(target string, QosDSCPIndex uint8, srcPort uint16, message tcp.Message) {
func (ms messageStore) sentAdd(target string, QosDSCPIndex uint8, srcPort layers.TCPPort, message tcp.Message) {
ms.target(target, QosDSCPIndex).sent.add(srcPort, message)
}

func (ms messageStore) rcvdAdd(target string, QosDSCPIndex uint8, srcPort uint16, message tcp.Message) {
func (ms messageStore) rcvdAdd(target string, QosDSCPIndex uint8, srcPort layers.TCPPort, message tcp.Message) {
ms.target(target, QosDSCPIndex).rcvd.add(srcPort, message)
}

func (ms messageStore) existsRcvd(target string, QosDSCPIndex uint8, srcPort uint16) (tcp.Message, bool) {
func (ms messageStore) existsRcvd(target string, QosDSCPIndex uint8, srcPort layers.TCPPort) (tcp.Message, bool) {

if _, exists := ms[target]; !exists {
return tcp.Message{}, false
Expand All @@ -99,7 +101,7 @@ func (ms messageStore) existsRcvd(target string, QosDSCPIndex uint8, srcPort uin
return matchedMsg, true
}

func (ms messageStore) existsSent(target string, QosDSCPIndex uint8, srcPort uint16) (tcp.Message, bool) {
func (ms messageStore) existsSent(target string, QosDSCPIndex uint8, srcPort layers.TCPPort) (tcp.Message, bool) {

if _, exists := ms[target]; !exists {
return tcp.Message{}, false
Expand All @@ -114,23 +116,23 @@ func (ms messageStore) existsSent(target string, QosDSCPIndex uint8, srcPort uin
return matchedMsg, true
}

func (rs resultStore) add(target string, QosDSCPIndex uint8, srcPort uint16, r report) {
func (rs resultStore) add(target string, QosDSCPIndex uint8, srcPort layers.TCPPort, r report) {

if rs[target] == nil {
var resDSCP [defines.NumQOSDCSPValues]map[uint16]report
var resDSCP [defines.NumQOSDCSPValues]map[layers.TCPPort]report
rs[target] = &resDSCP
}
if rs[target][QosDSCPIndex] == nil {
rs[target][QosDSCPIndex] = make(map[uint16]report)
rs[target][QosDSCPIndex] = make(map[layers.TCPPort]report)
}
rs[target][QosDSCPIndex][srcPort] = r
}

type resultWalker func(report, string, string, uint16, bool, *log.Logger)
type resultWalker func(report, string, string, layers.TCPPort, bool, *log.Logger)

func (rs resultStore) walkResults(
remotes config.RemoteStore,
currentDSCP *tcp.DSCPValue,
currentDSCP *ip.DSCPValue,
foreground bool,
logger *log.Logger,
walkerF ...resultWalker) {
Expand All @@ -144,9 +146,10 @@ func (rs resultStore) walkResults(

qos := *currentDSCP
if remote.External {
qos = tcp.DSCPBeLow
qos = ip.DSCPBeLow
}
for srcPort, rep := range r[(tcp.GetDSCP).Pos(qos, logger)] {

for srcPort, rep := range r[(ip.GetDSCP).Pos(qos, logger)] {
walkerF[0](rep, remote.Hostname, remote.Location, srcPort, foreground, logger)
}
if len(walkerF) > 1 {
Expand Down Expand Up @@ -184,7 +187,7 @@ func (rs resultStore) processResults(

// Store processed report to 'result' data structure for stdout, if needed
if !*(gl.CLI.SenderOnlyMode) {
QosDSCPIndex := (tcp.GetDSCP).Pos(req.QosDSCP, logger)
QosDSCPIndex := (ip.GetDSCP).Pos(req.QosDSCP, logger)
rs.add(target, QosDSCPIndex, req.SrcPort, r)
}

Expand All @@ -194,7 +197,7 @@ func (rs resultStore) processResults(
func (rs resultStore) printResults(
gl *config.Global,
remotes config.RemoteStore,
currentDSCP *tcp.DSCPValue,
currentDSCP *ip.DSCPValue,
logger *log.Logger,
) {
foreground := *gl.CLI.Foreground
Expand All @@ -210,7 +213,7 @@ func Run(
sentC chan tcp.Message,
rcvdC chan tcp.Message,
remotes config.RemoteStore,
currentDSCP *tcp.DSCPValue,
currentDSCP *ip.DSCPValue,
sr metrics.Reporter,
completeCycleUpload chan bool,
wg *sync.WaitGroup,
Expand Down Expand Up @@ -246,7 +249,7 @@ func batchWorker(
remotes config.RemoteStore,
ms messageStore,
rs resultStore,
currentDSCP *tcp.DSCPValue,
currentDSCP *ip.DSCPValue,
sfn statsUploader,
sr metrics.Reporter,
completeCycleUpload chan bool,
Expand All @@ -263,7 +266,7 @@ func batchWorker(
zap.Any("type", out.Type))
continue
}
QosDSCPIndex := (tcp.GetDSCP).Pos(out.QosDSCP, logger)
QosDSCPIndex := (ip.GetDSCP).Pos(out.QosDSCP, logger)

// SYN sent
targetKey := out.DstAddr.String()
Expand All @@ -285,7 +288,7 @@ func batchWorker(
zap.Any("type", in.Type))
continue
}
QosDSCPIndex := (tcp.GetDSCP).Pos(in.QosDSCP, logger)
QosDSCPIndex := (ip.GetDSCP).Pos(in.QosDSCP, logger)

// SYN+ACK received
targetKey := in.SrcAddr.String()
Expand Down Expand Up @@ -359,8 +362,8 @@ type statsUploader func(
sr metrics.Reporter,
target string,
remotes config.RemoteStore,
QOSDSCP tcp.DSCPValue,
srcPort uint16,
QOSDSCP ip.DSCPValue,
srcPort layers.TCPPort,
r *report,
logger *log.Logger,
)
Expand All @@ -370,8 +373,8 @@ func statsUpload(
sr metrics.Reporter,
target string,
remotes config.RemoteStore,
QOSDSCP tcp.DSCPValue,
srcPort uint16,
QOSDSCP ip.DSCPValue,
srcPort layers.TCPPort,
r *report,
logger *log.Logger,
) {
Expand Down Expand Up @@ -416,12 +419,12 @@ func zeroOutResults(
for targetKey := range ms {
_, existsTarget := rs[targetKey]
if !existsTarget {
var resDSCP [defines.NumQOSDCSPValues]map[uint16]report
var resDSCP [defines.NumQOSDCSPValues]map[layers.TCPPort]report
rs[targetKey] = &resDSCP
}
for qosDSCP := 0; qosDSCP < defines.NumQOSDCSPValues; qosDSCP++ {
if rs[targetKey][qosDSCP] == nil {
rs[targetKey][qosDSCP] = make(map[uint16]report)
rs[targetKey][qosDSCP] = make(map[layers.TCPPort]report)
}
for srcPort := range ms[targetKey][qosDSCP].sent {
if _, existsSrc := rs[targetKey][qosDSCP][srcPort]; existsSrc {
Expand All @@ -430,7 +433,7 @@ func zeroOutResults(
rs[targetKey][qosDSCP][srcPort] = timedOutReport

// Upload timed out results
sfn(glr, sr, targetKey, remotes, tcp.GetDSCP[qosDSCP], srcPort, &timedOutReport, logger)
sfn(glr, sr, targetKey, remotes, ip.GetDSCP[qosDSCP], srcPort, &timedOutReport, logger)
time.Sleep(1 * time.Millisecond)
}
}
Expand Down Expand Up @@ -465,7 +468,7 @@ func printTableHeader(gl *config.Global, currentDSCP string, logger *log.Logger)
zap.String("version", defines.ArachneVersion),
zap.String("host", gl.RemoteConfig.HostName),
zap.String("host_location", gl.RemoteConfig.Location),
zap.Uint16("target_TCP_port", gl.RemoteConfig.TargetTCPPort),
zap.Any("target_TCP_port", gl.RemoteConfig.TargetTCPPort),
zap.String("QoS_DSCP", currentDSCP),
)
}
Expand All @@ -486,7 +489,7 @@ func printTableEntry(
r report,
targetHost string,
targetLocation string,
srcPort uint16,
srcPort layers.TCPPort,
foreground bool,
logger *log.Logger,
) {
Expand Down Expand Up @@ -532,7 +535,7 @@ func printTableEntry(
logger.Info("Result",
zap.String("target", targetHost),
zap.String("target_location", targetLocation),
zap.Uint16("source_port", srcPort),
zap.Any("source_port", srcPort),
twoWay,
oneWay,
zap.String("timed_out", timedOut))
Expand Down
Loading