Skip to content

Commit

Permalink
Merge pull request #4 from n4-networks/bug_1_2
Browse files Browse the repository at this point in the history
fix for issue # 2, 3
  • Loading branch information
openuspio authored Nov 15, 2023
2 parents 30106ba + 2363dac commit 74251eb
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 20 deletions.
2 changes: 1 addition & 1 deletion configs/openusp.env
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ DB_NAME=usp
STOMP_MODE=nondtls
STOMP_ADDR=openusp-broker:61613
STOMP_TLS_ADDR=openusp-broker:61614
STOMP_AGENT_QUEUE=/queue/agent-1
STOMP_CNTLR_QUEUE=/queue/controller-notify-dest
STOMP_USER=
STOMP_PASSWD=
Expand All @@ -35,6 +34,7 @@ CACHE_ADDR=openusp-cache:6379
CNTLR_GRPC_PORT=9001
CNTLR_EPID=self::usp-controller
CNTLR_USP_PROTO_VERSION=1.2
CNTLR_USP_PROTO_VERSION_CHECK=false

# API Server: Required by ApiServer and CLI
HTTP_PORT=8081
Expand Down
12 changes: 10 additions & 2 deletions pkg/cntlr/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cntlr
import (
"log"
"os"
"strconv"

"github.com/joho/godotenv"
)
Expand All @@ -37,8 +38,9 @@ type cacheCfg struct {
}

type uspCfg struct {
endpointId string
protoVersion string
endpointId string
protoVersion string
protoVersionCheck bool
}

type cntlrCfg struct {
Expand Down Expand Up @@ -79,6 +81,12 @@ func (c *Cntlr) loadConfigFromEnv() error {
} else {
log.Println("CNTLR USP Protocol Version is not set")
}
if env, ok := os.LookupEnv("CNTLR_USP_PROTO_VERSION_CHECK"); ok {
boolValue, _ := strconv.ParseBool(env)
c.cfg.usp.protoVersionCheck = boolValue
} else {
log.Println("CNTLR USP Protocol Version Check is not enforced")
}

log.Printf("CNTLR Config params: %+v\n", c.cfg)

Expand Down
2 changes: 1 addition & 1 deletion pkg/cntlr/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *Cntlr) Cli() {
Func: c.cliSendDeleteInstanceMsg,
})

/* Shibu: TODO
/* TODO
shell.AddCmd(&ishell.Cmd{
Name: "getresp",
Help: "receive respone from agent, ex: getresp",
Expand Down
6 changes: 3 additions & 3 deletions pkg/cntlr/mtp.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *Cntlr) MtpStart() error {
func (c *Cntlr) MtpReceiveThread() {
for {
chanData := <-c.mtpH.rxChannel
log.Println("Shibu: Rx'd USP record from mtp type: ", chanData.MtpType)
log.Println("Rx'd USP record from mtp type: ", chanData.MtpType)

rData, err := c.parseUspRecord(chanData.Rec)
if err != nil {
Expand All @@ -94,7 +94,7 @@ func (c *Cntlr) MtpReceiveThread() {
if rData.recordType == "STOMP_CONNECT" {
aStomp := &mtp.AgentStomp{}
aStomp.Conn = c.mtpH.stomp.Conn
aStomp.DestQueue = "/queue/agent-1" //agentId
aStomp.DestQueue = rData.destQueue

initData := &agentInitData{}
initData.epId = agentId
Expand All @@ -119,7 +119,7 @@ func (c *Cntlr) MtpReceiveThread() {
}
aStomp := &mtp.AgentStomp{}
aStomp.Conn = c.mtpH.stomp.Conn
aStomp.DestQueue = agentId
aStomp.DestQueue = rData.destQueue

if mData.notify.nType == NotifyEvent && mData.notify.evt.name == "Boot!" {
log.Println("Received Boot event from agent")
Expand Down
14 changes: 10 additions & 4 deletions pkg/cntlr/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type uspRecordData struct {
fromId string
toId string
recordType string
destQueue string
}

func (c *Cntlr) parseUspRecord(s []byte) (*uspRecordData, error) {
Expand All @@ -44,7 +45,10 @@ func (c *Cntlr) parseUspRecord(s []byte) (*uspRecordData, error) {
rData.fromId = r.GetFromId()
switch r.RecordType.(type) {
case *usp_record.Record_StompConnect:
sc := r.GetStompConnect()
log.Println("Record Type:", r.GetStompConnect())
rData.destQueue = sc.GetSubscribedDestination()
log.Println("Subscribed Destination:", sc.GetSubscribedDestination())
rData.recordType = "STOMP_CONNECT"
default:
log.Println("Invalid record type")
Expand All @@ -65,15 +69,17 @@ func (c *Cntlr) parseUspRecord(s []byte) (*uspRecordData, error) {
return rData, nil
}
func (c *Cntlr) validateUspRecord(rData *uspRecordData) error {
if rData.version != c.cfg.usp.protoVersion {
log.Printf("Wrong USP Rx Version: %v, supproted Ver: ", rData.version, c.cfg.usp.protoVersion)
return errors.New("USP version mismatch")
if c.cfg.usp.protoVersionCheck {
if rData.version != c.cfg.usp.protoVersion {
log.Printf("Wrong USP Rx Version: %v, supproted Ver: %v", rData.version, c.cfg.usp.protoVersion)
return errors.New("USP version mismatch")
}
}
if rData.toId != c.cfg.usp.endpointId {
log.Printf("Wrong USP Rx ToId: %v, controller Id: %v", rData.toId, c.cfg.usp.endpointId)
return errors.New("USP ToId/Controller id mismatch")
}
log.Printf("Record: USP version: %v, toId: %v", rData.version, rData.toId)
log.Printf("Rx Record: USP version: %v, toId: %v", rData.version, rData.toId)
log.Println("Validated controller Id and USP protocol version")
return nil
}
10 changes: 1 addition & 9 deletions pkg/mtp/stomp.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type stompCfg struct {
mode string
serverAddr string
serverAddrTLS string
agentQueue string
controllerQueue string
userName string
passwd string
Expand Down Expand Up @@ -72,13 +71,6 @@ func loadStompConfigFromEnv() error {
return errors.New("STOMP_ADDR is not set")
}

if env, ok := os.LookupEnv("STOMP_AGENT_QUEUE"); ok {
sCfg.agentQueue = env
} else {
log.Println("STOMP Agent Queue is not set")
return errors.New("STOMP_AGENT_QUEUE is not set")
}

if env, ok := os.LookupEnv("STOMP_CNTLR_QUEUE"); ok {
sCfg.controllerQueue = env
} else {
Expand Down Expand Up @@ -118,8 +110,8 @@ func (s AgentStomp) SendMsg(msg []byte) error {
id := stompngo.Uuid()
h = h.Add("id", id)
h = h.Add("destination", s.DestQueue)
log.Printf("Stomp destination: %v", s.DestQueue)
h = h.Add("content-type", "application/vnd.bbf.usp.msg")
log.Printf("Sending USP record to destination: %v, Success", s.DestQueue)
return s.Conn.SendBytes(h, msg)
}

Expand Down

0 comments on commit 74251eb

Please sign in to comment.