Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor sending and receiving of packets #25

Merged
merged 15 commits into from
Jul 22, 2017
Merged

Conversation

henridevieux
Copy link
Contributor

This marks a very large set of changes on how Arachne sends and receives Echo requests. The agents will now open a single file descriptor for each the sending and receiving of packets - thus eliminating the need to create a socket for each outbound echo.

As a side effect of that change, the need arose for agents to manually craft IP headers, as the DSCP value can no longer be set on a per-socket basis. This change uses gopacket for the creation of these headers as well as for the decoding of incoming packets.

Lastly, pcap proved to be too inaccurate for testing in a sub-millisecond environment. This was due gopacket's pcap timeout minimum value of 1ms. And, although pcap_set_immediate_mode could be used, it would a very tight loop of requesting packets and error handling which is ultimately more overhead. The decision here is that creating a raw socket and applying a BPF filter (much to the same overall effect as pcap would achieve) is a more efficient and controllable solution.

Copy link

@billf billf left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

first pass

@@ -7,7 +7,6 @@ import:
- package: github.com/google/gopacket
version: ^1
subpackages:
- pcap
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was due gopacket's pcap timeout minimum value of 1ms.

That is unfortunate.

}
if filter, err := getBPFFilter(ipHeaderOffset, uint32(listenPort)); err != nil {
logger.Warn("Failed to compile BPF Filter", zap.Error(err))
} else {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} else {
// Attempt to attach the BPF filter.
// This is currently only supported on Linux systems.
if err = rs.attachBPF(filter); err != nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err := rs.attachBPF

@@ -22,13 +22,20 @@ package ip

import (
"net"
"syscall"
"unsafe"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in general, minimize "unsafe" code to the absolute minimum. in this case, that means splitting this file into iplayer_linux.go that contains the non-unsafe, linux-specific code.

if err := unByteTime.UnmarshalBinary(ts); err == nil {
pkt.Ts = unByteTime
// parsePktIP parses the IP header of an incoming packet and extracts the src IP addr and DSCP value.
func parsePktIP(pkt gopacket.Packet) (srcIP net.IP, dscpv DSCPValue, err error) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err != nil {
ackNum := tcpHeader.Seq + 1
flags := tcpFlags{syn: true, ack: true}
if err = send(conn, &srcIP, listenPort, srcPortRange, DSCPv,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err := send(

Payload: pkt.tcpPayload.Ts},
Seq: pkt.seqNum,
Ack: pkt.ackNum,
Payload: payloadTime},
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider writing as

Ts: Timestamp{
  Run:     receiveTime,
  Payload: payloadTime,
},

as you come across these

func TestParsePktTCP(t *testing.T) {
assert := assert.New(t)

pktData := []byte{69, 0, 0, 55, 145, 204, 64, 0, 56, 6, 81, 145, 10, 1, 1, 10, 10, 0, 0, 10, 121, 24, 172, 79, 118, 141, 174, 112, 251, 246, 199, 17, 80, 18, 170, 170, 154, 237, 0, 0, 1, 0, 0, 0, 14, 208, 222, 40, 104, 36, 169, 82, 116, 0, 0}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

humans can't read this so it should be documented in some way. what is the nature of this packet that makes it suitable for this test.

assert.Equal(tcpHeader.DstPort, layers.TCPPort(44111), "unexpectedly formatted TCP Dst Port")

expectedTime, err := time.Parse(time.RFC3339, "2017-06-22T21:06:48.615076468+00:00")
if err != nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

require.NoError(t, err, "Error parsing timestamp")

t.Fatal("Non IPv4 or IPv6 family should not be handled")
}

func TestParsePktIP(t *testing.T) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comments as above

"net"
"syscall"

"golang.org/x/net/bpf"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should go in the bottom of the imports with the rest of the external ones.

README.md Outdated
### Note on BPF filtering

When receiving packets, Arachne attempts to apply a BPF filter to the raw socket
so that processing of packets occurs on a much smaller set of (ones destined
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"of ..."? Parenthesis needed or not?

README.md Outdated
When receiving packets, Arachne attempts to apply a BPF filter to the raw socket
so that processing of packets occurs on a much smaller set of (ones destined
specifically for Arachne agent testing). This is currently supported only on
Linux and thus performance will be significantly worse on BSD-based systems
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

significantly?

bootstrap.go Outdated
@@ -150,8 +164,7 @@ func Run(ec *config.Extended, opts ...Option) {
if !*gl.CLI.SenderOnlyMode {
// Listen for responses or probes from other IPv4 arachne agents.
killC.Receiver = make(chan struct{})
err = tcp.Receiver("ip4", &gl.RemoteConfig.SrcAddress, gl.RemoteConfig.TargetTCPPort,
gl.RemoteConfig.InterfaceName, sentC, rcvdC, killC.Receiver, logger)
err = tcp.Receiver(connIPv4, gl.RemoteConfig.TargetTCPPort, sentC, rcvdC, killC.Receiver, logger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is gl.RemoteConfig.TargetTCPPort redundant here? Can it not be extracted from connIPv4?

@@ -117,7 +117,7 @@ func TestRun(t *testing.T) {
Type: tcp.EchoRequest,
SrcAddr: source,
DstAddr: target,
Af: network.Family(&target),
Af: defines.AfInet,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind if we keep the following import consistent?
import defines as d

Copy link
Contributor Author

@henridevieux henridevieux Jul 10, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Importing of defines was previous code. We can address changing defines to d throughout the entire codebase in a separate diff.

@@ -81,47 +75,14 @@ func (q echoType) text(logger *log.Logger) string {
}

const tcpHdrSize int = 20 // 20 bytes without any TCP Options
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move these constants to defines.go?

}...)
default:
return 0, errors.New("unhandled AF family")
ipLayer, err := ip.GetIPHeaderLayer(af, uint8(dscpv), uint16(tcpLen), srcAddr, dstAddr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't dscpv already uint8? If not, we can fix the argument of the calling function.

if !destinedToArachne {
srcIP, DSCPv, err := parsePktIP(pkt)
if err != nil {
logger.Error("Error parsing packet IP layer", zap.Error(err), zap.Any("packet", pkt))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capitalized error message... See previous comment please.


// Replying with SYN+ACK to Arachne agent
srcPortRange := PortRange{pkt.srcPort, pkt.srcPort}
srcPortRange := PortRange{uint16(tcpHeader.SrcPort), uint16(tcpHeader.SrcPort)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why cast here? Can we take care of this above (in PortRange for instance if needed)?


"github.com/google/gopacket"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imports on line 30 and 31 should come with set of line 34.

// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ip
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See previous conversation - I agreed with @billf that non-unsafe code should be separated out from unsafe code.

Copy link
Contributor

@Vasilis Vasilis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once these are addressed, I believe we're good for prime time!

@@ -179,7 +192,7 @@ func Run(ec *config.Extended, opts ...Option) {
logger.Debug("Received SIG")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 187 exceeds the 120 char width limit.

@@ -27,8 +27,10 @@ import (
"sync"
"time"

"github.com/google/gopacket/layers"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import should follow after the uber arachne imports with one line of separation from latter.

config/config.go Outdated
@@ -37,6 +37,7 @@ import (
"github.com/uber/arachne/internal/tcp"
"github.com/uber/arachne/metrics"

"github.com/google/gopacket/layers"
"github.com/jawher/mow.cli"
"github.com/pkg/errors"
"github.com/uber/arachne/defines"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please move this under line 38 above?

@@ -0,0 +1,55 @@
// Copyright (c) 2016 Uber Technologies, Inc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is really the equivalent of ip_darwin.go in terms of functions included, I would call this file ip_linux.go and the other linux file I would call it ip_linux_bpf.go or ip_linux_WHATEVER.go so that it follows alphabetically the original file.

"time"

"github.com/google/gopacket"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above.


if pkt.dataOffset > 5 {
binary.Read(r, binary.BigEndian, &pkt.options)
tcp := layer.(*layers.TCP)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warning: name of variable starts with package name.

if err != nil {
ackNum := tcpHeader.Seq + 1
flags := tcpFlags{syn: true, ack: true}
// Replies are sent to the same port as this agent is listening on (conn.ListenPort)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"as the one this agent..."

}
// Send 'echo' reply message received to collector
in <- inMsg

if inMsg.FromExternalTarget(listenPort) {
if inMsg.FromExternalTarget(conn.ListenPort) {
//TODO verify
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

@@ -612,14 +360,14 @@ func EchoTargets(
) {
go func() {
for {
for i := range GetDSCP {
for i := range ip.GetDSCP {
t0 := time.Now()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to monoNow()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can address in a separate change.

flag = "RST"
default:
flag = ""
}

go func() {
defer syscall.Close(sendSocket)

rand.Seed(time.Now().UnixNano())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be in Receiver()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

@henridevieux henridevieux merged commit f930c36 into master Jul 22, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants