Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 7.62.x] [NPM-4125] Refactor connection direction to use SYN packets #33081

Open
wants to merge 1 commit into
base: 7.62.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/network/event_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ type ConnectionFamily uint8
type ConnectionDirection uint8

const (
// UNKNOWN represents connections where the direction is not known (yet)
UNKNOWN ConnectionDirection = 0

// INCOMING represents connections inbound to the host
INCOMING ConnectionDirection = 1 // incoming

Expand Down
68 changes: 54 additions & 14 deletions pkg/network/tracer/connection/ebpfless/tcp_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package ebpfless

import (
"fmt"
"github.com/DataDog/datadog-agent/pkg/util/log"
"syscall"
"time"

Expand All @@ -19,6 +18,7 @@ import (

"github.com/DataDog/datadog-agent/pkg/network"
"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

type connectionState struct {
Expand Down Expand Up @@ -61,6 +61,9 @@ type connectionState struct {
// Can we make all connections in TCPProcessor have a ConnectionStats no matter what, and
// filter them out in GetConnections?
lastUpdateEpoch uint64

// connDirection has the direction of the connection, if we saw the SYN packet
connDirection network.ConnectionDirection
}

func (st *connectionState) hasMissedHandshake() bool {
Expand All @@ -71,9 +74,9 @@ func (st *connectionState) hasMissedHandshake() bool {
type TCPProcessor struct {
cfg *config.Config
// pendingConns contains connections with tcpState == connStatAttempted
pendingConns map[network.ConnectionTuple]*connectionState
pendingConns map[PCAPTuple]*connectionState
// establishedConns contains connections with tcpState == connStatEstablished
establishedConns map[network.ConnectionTuple]*connectionState
establishedConns map[PCAPTuple]*connectionState
}

// TODO make this into a config value
Expand All @@ -84,8 +87,8 @@ const pendingConnTimeoutNs = uint64(5 * time.Second)
func NewTCPProcessor(cfg *config.Config) *TCPProcessor {
return &TCPProcessor{
cfg: cfg,
pendingConns: make(map[network.ConnectionTuple]*connectionState, maxPendingConns),
establishedConns: make(map[network.ConnectionTuple]*connectionState, cfg.MaxTrackedConnections),
pendingConns: make(map[PCAPTuple]*connectionState, maxPendingConns),
establishedConns: make(map[PCAPTuple]*connectionState, cfg.MaxTrackedConnections),
}
}

Expand Down Expand Up @@ -135,6 +138,10 @@ func (t *TCPProcessor) updateSynFlag(conn *network.ConnectionStats, st *connecti
if tcp.RST {
return
}
// if this is the initial SYN, store the connection direction
if tcp.SYN && !tcp.ACK {
st.connDirection = connDirectionFromPktType(pktType)
}
// progress the synStates based off this packet
if pktType == unix.PACKET_OUTGOING {
st.localSynState.update(tcp.SYN, tcp.ACK)
Expand Down Expand Up @@ -247,6 +254,10 @@ func (t *TCPProcessor) updateRstFlag(conn *network.ConnectionStats, st *connecti
if st.tcpState == connStatAttempted {
reason = syscall.ECONNREFUSED
}

if conn.TCPFailures == nil {
conn.TCPFailures = make(map[uint16]uint32)
}
conn.TCPFailures[uint16(reason)]++

if st.tcpState == connStatEstablished {
Expand Down Expand Up @@ -278,7 +289,12 @@ func (t *TCPProcessor) Process(conn *network.ConnectionStats, timestampNs uint64
return ProcessResultNone, nil
}

st := t.getConn(conn.ConnectionTuple)
tuple := MakeEbpflessTuple(conn.ConnectionTuple)
st, ok := t.getConn(tuple)
if !ok {
// create a fresh state object that will be stored by moveConn later
st = &connectionState{}
}
origState := st.tcpState

t.updateSynFlag(conn, st, pktType, tcp, payloadLen)
Expand All @@ -288,7 +304,7 @@ func (t *TCPProcessor) Process(conn *network.ConnectionStats, timestampNs uint64

stateChanged := st.tcpState != origState
if stateChanged {
ok := t.moveConn(conn.ConnectionTuple, st)
ok := t.moveConn(tuple, st)
// if the map is full then we are unable to move the connection, report that
if !ok {
return ProcessResultMapFull, nil
Expand All @@ -306,26 +322,25 @@ func (t *TCPProcessor) Process(conn *network.ConnectionStats, timestampNs uint64
return ProcessResultNone, nil
}

func (t *TCPProcessor) getConn(tuple network.ConnectionTuple) *connectionState {
func (t *TCPProcessor) getConn(tuple PCAPTuple) (*connectionState, bool) {
if st, ok := t.establishedConns[tuple]; ok {
return st
return st, true
}
if st, ok := t.pendingConns[tuple]; ok {
return st
return st, true
}
// otherwise, create a fresh state object that will be stored by moveConn later
return &connectionState{}
return nil, false
}

// RemoveConn clears a ConnectionTuple from its internal state.
func (t *TCPProcessor) RemoveConn(tuple network.ConnectionTuple) {
func (t *TCPProcessor) RemoveConn(tuple PCAPTuple) {
delete(t.pendingConns, tuple)
delete(t.establishedConns, tuple)
}

// moveConn moves a connection to the correct map based on its tcpState.
// If it had to drop the connection because the target map was full, it returns false.
func (t *TCPProcessor) moveConn(tuple network.ConnectionTuple, st *connectionState) bool {
func (t *TCPProcessor) moveConn(tuple PCAPTuple, st *connectionState) bool {
t.RemoveConn(tuple)

switch st.tcpState {
Expand Down Expand Up @@ -365,3 +380,28 @@ func (t *TCPProcessor) CleanupExpiredPendingConns(timestampNs uint64) {
}
}
}

// MakeEbpflessTuple converts a network.ConnectionTuple to a PCAPTuple.
// See the PCAPTuple doc for more information.
func MakeEbpflessTuple(tuple network.ConnectionTuple) PCAPTuple {
ret := PCAPTuple(tuple)
ret.Pid = 0
ret.Direction = network.UNKNOWN
return ret
}

// MakeConnStatsTuple converts a PCAPTuple to a network.ConnectionTuple.
func MakeConnStatsTuple(tuple PCAPTuple) network.ConnectionTuple {
// Direction is still 0, this will get set by the ebpfless tracer in finalizeConnectionDirection
return network.ConnectionTuple(tuple)
}

// GetConnDirection returns the direction of the connection.
// If the SYN packet was not seen (for a pre-existing connection), it returns ConnDirUnknown.
func (t *TCPProcessor) GetConnDirection(tuple PCAPTuple) (network.ConnectionDirection, bool) {
conn, ok := t.getConn(tuple)
if !ok {
return network.UNKNOWN, false
}
return conn.connDirection, true
}
55 changes: 51 additions & 4 deletions pkg/network/tracer/connection/ebpfless/tcp_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
package ebpfless

import (
"github.com/DataDog/datadog-agent/pkg/network/config"
"net"
"syscall"
"testing"
"time"

"github.com/DataDog/datadog-agent/pkg/network/config"

"golang.org/x/sys/unix"

"github.com/google/gopacket/layers"
Expand Down Expand Up @@ -176,6 +177,15 @@ func newTCPTestFixture(t *testing.T) *tcpTestFixture {
}
}

func (fixture *tcpTestFixture) getConnectionState() *connectionState {
tuple := MakeEbpflessTuple(fixture.conn.ConnectionTuple)
conn, ok := fixture.tcp.getConn(tuple)
if ok {
return conn
}
return &connectionState{}
}

func (fixture *tcpTestFixture) runPkt(pkt testCapture) ProcessResult {
if fixture.conn == nil {
fixture.conn = makeTCPStates(pkt)
Expand All @@ -200,9 +210,8 @@ func (fixture *tcpTestFixture) runAgainstState(packets []testCapture, expected [
expectedStrs = append(expectedStrs, labelForState(expected[i]))

fixture.runPkt(pkt)
connTuple := fixture.conn.ConnectionTuple
actual := fixture.tcp.getConn(connTuple).tcpState
actualStrs = append(actualStrs, labelForState(actual))
tcpState := fixture.getConnectionState().tcpState
actualStrs = append(actualStrs, labelForState(tcpState))
}
require.Equal(fixture.t, expectedStrs, actualStrs)
}
Expand Down Expand Up @@ -815,3 +824,41 @@ func TestPendingConnExpiry(t *testing.T) {
f.tcp.CleanupExpiredPendingConns(now + tenSecNs)
require.Empty(t, f.tcp.pendingConns)
}

func TestTCPProcessorConnDirection(t *testing.T) {
pb := newPacketBuilder(lowerSeq, higherSeq)

t.Run("outgoing", func(t *testing.T) {
f := newTCPTestFixture(t)
capture := []testCapture{
pb.outgoing(0, 0, 0, SYN),
pb.incoming(0, 0, 1, SYN|ACK),
pb.outgoing(0, 1, 1, ACK),
}
f.runPkts(capture)

require.Equal(t, network.OUTGOING, f.getConnectionState().connDirection)
})
t.Run("incoming", func(t *testing.T) {
f := newTCPTestFixture(t)
capture := []testCapture{
pb.incoming(0, 0, 0, SYN),
pb.outgoing(0, 0, 1, SYN|ACK),
pb.incoming(0, 1, 1, ACK),
}
f.runPkts(capture)

require.Equal(t, network.INCOMING, f.getConnectionState().connDirection)
})
t.Run("preexisting", func(t *testing.T) {
f := newTCPTestFixture(t)
capture := []testCapture{
// just sending data, no SYN
pb.outgoing(1, 10, 10, ACK),
pb.incoming(1, 10, 11, ACK),
}
f.runPkts(capture)

require.Equal(t, network.UNKNOWN, f.getConnectionState().connDirection)
})
}
17 changes: 17 additions & 0 deletions pkg/network/tracer/connection/ebpfless/tcp_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,28 @@ import (

"github.com/google/gopacket/layers"

"github.com/DataDog/datadog-agent/pkg/network"
"github.com/DataDog/datadog-agent/pkg/telemetry"
)

const ebpflessModuleName = "ebpfless_network_tracer"

// PCAPTuple represents a unique key for an ebpfless tracer connection.
// It represents a network.ConnectionTuple with only the fields that are available
// via packet capture: PID and Direction are zeroed out.
type PCAPTuple network.ConnectionTuple

func connDirectionFromPktType(pktType uint8) network.ConnectionDirection {
switch pktType {
case unix.PACKET_HOST:
return network.INCOMING
case unix.PACKET_OUTGOING:
return network.OUTGOING
default:
return network.UNKNOWN
}
}

// ProcessResult represents what the ebpfless tracer should do with ConnectionStats after processing a packet
type ProcessResult uint8

Expand Down
Loading
Loading