Skip to content

Commit

Permalink
add weight and leastConn mode.change monitor tpl.
Browse files Browse the repository at this point in the history
  • Loading branch information
devtw committed Mar 18, 2014
1 parent 7000f13 commit 51fa21b
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 36 deletions.
Binary file modified GoHAProxy
Binary file not shown.
4 changes: 2 additions & 2 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"Src": "",
"SrcPort": "9000",
"Mode":"tcp",
"Type":"Source",
"Type":"Weight",
"CheckTime":1,
"DstList":
[
Expand Down Expand Up @@ -72,7 +72,7 @@
"Src": "",
"SrcPort": "9003",
"Mode":"tcp",
"Type":"Health",
"Type":"LeastConn",
"CheckTime":1,
"DstList":
[
Expand Down
90 changes: 69 additions & 21 deletions forwardServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ import (
"time"
)

var (
count int = 0
)

type ForwardServer struct {
ClientList *list.List
Expand All @@ -27,7 +24,7 @@ type Client struct {
}

func (fs *ForwardServer) CheckHealth(connType string, uri string) (bool, int) {
//fmt.Printf("checkHealth:Type:%s Addr:%s\n",connType,uri)

conn, err := net.Dial(connType, uri)
errCode := 0

Expand Down Expand Up @@ -83,8 +80,7 @@ func (fs *ForwardServer) GetClientElement(RemoteAddrs string) *list.Element {
return nil
}

func (fs *ForwardServer) GetClient(RemoteAddrs string) *Client {
//RemoteAddrs := _RemoteAddr
func (fs *ForwardServer) GetClient(RemoteAddrs string) *Client {
RemoteAddr, _ := GetRemoteAddrInfo(RemoteAddrs)

for e := fs.ClientList.Front(); e != nil; e = e.Next() {
Expand All @@ -103,18 +99,18 @@ func GetRemoteAddrInfo(RemoteAddrs string) (string, string) {
}

func (fs *ForwardServer) Forward(localConn net.Conn, index int) {
// Setup server Conn
count++
// Setup server Conn
srvConn, err := net.Dial(fs.srvProxy.Mode, fs.srvProxy.GetDstAddr(index))
if err != nil {
fmt.Printf("forward Err: %v\n", err)
return
}
fs.srvProxy.DstList[index].Connections++
//fmt.Printf("connType:%s serverAddr:%s fowarding[%d] \n",connType,serverAddrString,count)

// Copy localConn.Reader to sshConn.Writer
fs.CheckTimeout(localConn)
fs.CheckTimeout(srvConn)

if srvConn != nil {
go func() {
_, err := io.Copy(srvConn, localConn)
Expand Down Expand Up @@ -147,10 +143,8 @@ func (fs *ForwardServer) Forward(localConn net.Conn, index int) {
localConn.Close()
return
}
}()
//defer srvConn.Close()
}
//defer localConn.Close()
}()
}
}

func (fs *ForwardServer) Check() {
Expand All @@ -161,8 +155,7 @@ func (fs *ForwardServer) Check() {
fs.srvProxy.DstList[k].Health, _ = fs.CheckHealth("tcp", fs.srvProxy.GetDstAddr(k))
} else {
fs.srvProxy.DstList[k].Health = true
}
//fmt.Printf("Check Mode:%s DstAddr:%s Health:%v SrvHealth:%v\n",fs.srvProxy.Mode,fs.srvProxy.GetDstAddr(k),dstObj.Health,fs.srvProxy.DstList[k].Health)
}
}
time.Sleep(time.Duration(fs.srvProxy.CheckTime) * time.Second)

Expand Down Expand Up @@ -206,9 +199,53 @@ func (fs *ForwardServer) TurnToNode(localConn net.Conn) {

switch fs.srvProxy.Type {
case "LeastConn":

DstIndex := 0

for k,Dst := range fs.srvProxy.DstList {
if fs.srvProxy.DstList[DstIndex].Connections > Dst.Connections {
DstIndex = k
}
}

DstIndex = fs.GetHealthNode(DstIndex)
fs.srvProxy.DstList[DstIndex].Counter++
if *configInfo.Debug {
fmt.Printf("DstAddr:%s Remote:%s\n", fs.srvProxy.GetDstAddr(DstIndex), localConn.RemoteAddr().String())
}
go fs.Forward(localConn, DstIndex)

case "Weight":

DstIndex := -1

for k,Dst := range fs.srvProxy.DstList {
if Dst.WeightCounter % Dst.Weight != 0 {
DstIndex = k
fs.srvProxy.DstList[k].WeightCounter++
break
}
}
//fmt.Printf("First DstIndex:%d\n",DstIndex)
if DstIndex == -1 {

for k,Dst := range fs.srvProxy.DstList {
if Dst.WeightCounter % Dst.Weight == 0 && DstIndex == -1 {
DstIndex = k
}/* else {
fs.srvProxy.DstList[k].WeightCounter = 0
} */
fs.srvProxy.DstList[k].WeightCounter = 1

//fmt.Printf("Name:%s Weight:%v WeightCounter:%v DstIndex:%d\n",Dst.Name,Dst.Weight,Dst.WeightCounter,DstIndex)
}
}
//fmt.Printf("Final DstIndex:%d\n",DstIndex)
DstIndex = fs.GetHealthNode(DstIndex)
fs.srvProxy.DstList[DstIndex].Counter++
if *configInfo.Debug {
fmt.Printf("DstAddr:%s Remote:%s\n", fs.srvProxy.GetDstAddr(DstIndex), localConn.RemoteAddr().String())
}
go fs.Forward(localConn, DstIndex)

case "Source":
client := fs.GetClient(localConn.RemoteAddr().String())
if client == nil {
Expand Down Expand Up @@ -250,15 +287,14 @@ func (fs *ForwardServer) Stop() {
if fs.localListener != nil {
fs.localListener.Close()
}
//fmt.Printf("FS:%s ready to stop.%v\n",fs.srvProxy.Name,fs.Run)
}

func (fs *ForwardServer) Listen(srvProxy Proxy) {

if *configInfo.Debug {
fmt.Printf("forwardServer connType:%s serverAddr:%s Type:%s\n", srvProxy.Mode, srvProxy.GetSrcAddr(), srvProxy.Type)
fmt.Printf("ForwardServer connType:%s serverAddr:%s Type:%s\n", srvProxy.Mode, srvProxy.GetSrcAddr(), srvProxy.Type)
}
//已經使用var 宣告則物件已建立,不需要再用new
//FS := new(ForwardServer)

fs.ClientList = list.New()
fs.Run = true
fs.srvProxy = srvProxy
Expand All @@ -268,6 +304,18 @@ func (fs *ForwardServer) Listen(srvProxy Proxy) {
}
fs.srvProxy.DstLen = len(fs.srvProxy.DstList)
if fs.srvProxy.Mode == "tcp" || fs.srvProxy.Mode == "http" || fs.srvProxy.Mode == "health" {

//Init
switch fs.srvProxy.Type {
case "Weight":
for k,Dst := range fs.srvProxy.DstList {
fs.srvProxy.DstList[k].Weight += 2
fs.srvProxy.DstList[k].WeightCounter = 1
fmt.Printf("Name:%s Weight:%v WeightCounter:%v \n",Dst.Name,fs.srvProxy.DstList[k].Weight,fs.srvProxy.DstList[k].WeightCounter)
}

}

switch fs.srvProxy.Mode {
case "health":
go fs.Check()
Expand Down
17 changes: 4 additions & 13 deletions goHAProxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ var configInfo ConfigInfo
var help *bool

func main() {
//fmt.Printf("GoHAProxy version:%s\n",version)
configInfo.FileName = flag.String("config", "./config.json", "set config file path.")
configInfo.Debug = flag.Bool("debug", false, "show debug trace message.")
configInfo.Version = flag.Bool("version", false, "GoHAProxy version.")
Expand All @@ -36,16 +35,12 @@ func main() {
if *configInfo.Version {
fmt.Println("GoHAProxy Version", version)
os.Exit(0)
}

fmt.Printf("GoHAProxy FileName:%s\n", *configInfo.FileName)

}
ok,haConfig := loadConfigs()
if ok {
for _, proxy := range haConfig.Configs.ProxyList {
FS := new(ForwardServer)
proxyServer.ServerList = append(proxyServer.ServerList, FS)
//proxyServer.ServerList[k] = FS
go FS.Listen(proxy)
}
}
Expand Down Expand Up @@ -95,12 +90,8 @@ func configWatcher() {
fmt.Printf("Delete Proxy[%v]: %v\n", k, sProxy.srvProxy.Name)
}
sProxy.Stop()
//proxyServer.ServerList = append(proxyServer.ServerList[:k], proxyServer.ServerList[k+1:])
proxyServer.ServerList = proxyServer.ServerList[:k+copy(proxyServer.ServerList[k:], proxyServer.ServerList[k+1:])]
//proxyServer.ServerList = copy(proxyServer.ServerList[k:], proxyServer.ServerList[k+1:])

//delete(proxyServer.ServerList, k)
//time.Sleep(1 * time.Second)

}
}

Expand All @@ -119,8 +110,7 @@ func configWatcher() {
}
if newProxy {
FS := new(ForwardServer)
proxyServer.ServerList = append(proxyServer.ServerList, FS)
//proxyServer.ServerList[lastkey+1] = FS
proxyServer.ServerList = append(proxyServer.ServerList, FS)
if *configInfo.Debug {
fmt.Printf("Add New Proxy: %v\n", proxy)
}
Expand All @@ -134,6 +124,7 @@ func configWatcher() {
}

func loadConfigs() (bool,HAConfig) {
fmt.Printf("GoHAProxy load config file:%s\n", *configInfo.FileName)
file, e := ioutil.ReadFile(*configInfo.FileName)
if e != nil {
fmt.Printf("Load GoHAProxy config error: %v\n", e)
Expand Down
1 change: 1 addition & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type DstConfig struct {
Dst string
DstPort string
Weight int
WeightCounter int
Check bool
Health bool
Counter int
Expand Down

0 comments on commit 51fa21b

Please sign in to comment.