diff --git a/Makefile b/Makefile index f48f4c9..4a1e9cb 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,7 @@ -all: clean init gibbon tarball +all: clean init gibbon agent tarball + +agent: + cd gibbonagent; GOOS=linux GOARCH=arm go build -o agent; go build -o agent.amd64 init: mkdir -p output @@ -11,12 +14,16 @@ gibbon: # cd gibbonapi && go build tarball: init gibbon - cp control.sh output - cp -r etc output + cp gibbond/control.sh output + mkdir -p output/etc + cp gibbond/etc/conf_product.json output/etc/conf.json + cp gibbond/etc/log.xml output/etc/log.xml + cp gibbond/etc/supervisord.conf output/etc/ cp misc/setupenv.sh output tar -czf gibbon.tgz output clean: go clean rm -rf gibbon gibbon.tgz output - + rm -f test/test + rm -f gibbonagent/agent gibbonagent/agent.amd64 diff --git a/api/gibbonapi.go b/api/gibbonapi.go deleted file mode 100644 index 3a38192..0000000 --- a/api/gibbonapi.go +++ /dev/null @@ -1,262 +0,0 @@ -package api - -import ( - "encoding/json" - "fmt" - log "github.com/cihub/seelog" - "io/ioutil" - "net/http" - "os" - "time" - - "github.com/chenyf/gibbon/comet" - "github.com/chenyf/gibbon/devcenter" -) - -type CommandRequest struct { - Uid string `json:"uid"` - Cmd string `json:"cmd"` -} - -type CommandResponse struct { - Status int `json:"status"` - Error string `json:"error"` -} - -type RouterInfo struct { - Rid string `json:"rid"` - Rname string `json:"rname"` -} - -type ResponseRouterList struct { - Status int `json:"status"` - Descr string `json:"descr"` - List []RouterInfo `json:"list"` -} - -func checkAuthz(uid string, devid string) bool { - log.Tracef("checkAuthz") - - devices, err := devcenter.GetDevices(uid, devcenter.DEV_ROUTER) - if err != nil { - log.Errorf("GetDevices failed: %s", err.Error()) - return false - } - - for _, dev := range devices { - if devid == dev.Id { - return true - } - } - return false -} - -func getStatus(w http.ResponseWriter, r *http.Request) { - size := comet.DevMap.Size() - fmt.Fprintf(w, "total register device: %d\n", size) -} - -func postRouterCommand(w http.ResponseWriter, r *http.Request) { - log.Tracef("postRouterCommand") - log.Debugf("Request from RemoterAddr: %s", r.RemoteAddr) - var ( - uid string - rid string - response CommandResponse - client *comet.Client - body []byte - err error - cmdRequest CommandRequest - bCmd []byte - reply chan *comet.Message - ) - response.Status = 1 - if r.Method != "POST" { - response.Error = "must using 'POST' method\n" - goto resp - } - r.ParseForm() - rid = r.FormValue("rid") - if rid == "" { - response.Error = "missing 'rid'" - goto resp - } - - uid = r.FormValue("uid") - if uid == "" { - response.Error = "missing 'uid'" - goto resp - } - - if !checkAuthz(uid, rid) { - log.Warnf("auth failed. uid: %s, rid: %s", uid, rid) - response.Error = "authorization failed" - goto resp - } - - /* - uid := r.FormValue("uid") - if uid == "" { fmt.Fprintf(w, "missing 'uid'\n"); return; } - tid := r.FormValue("tid") - if tid == "" { fmt.Fprintf(w, "missing 'tid'\n"); return; } - sign := r.FormValue("sign") - if sign == "" { fmt.Fprintf(w, "missing 'sign'\n"); return; } - tm := r.FormValue("tm") - if tm == "" { fmt.Fprintf(w, "missing 'tm'\n"); return; } - pmtt := r.FormValue("pmtt") - if pmtt == "" { fmt.Fprintf(w, "missing 'pmtt'\n"); return; } - query := map[string]string { - "uid" : uid, - "rid" : rid, - "tid" : tid, - "src" : "letv", - "tm" : tm, - "pmtt" : pmtt, - } - path := "/router/command" - mysign := sign(path, query) - if mysign != sign { - response.Error = "sign valication failed" - b, _ := json.Marshal(response) - fmt.Fprintf(w, string(b)) - return - } - */ - - if r.Body == nil { - response.Error = "missing POST data" - goto resp - } - - if !comet.DevMap.Check(rid) { - response.Error = fmt.Sprintf("device (%s) offline", rid) - goto resp - } - client = comet.DevMap.Get(rid).(*comet.Client) - - body, err = ioutil.ReadAll(r.Body) - r.Body.Close() - if err != nil { - response.Error = "invalid POST body" - goto resp - } - - cmdRequest = CommandRequest{ - Uid: uid, - Cmd: string(body), - } - - bCmd, _ = json.Marshal(cmdRequest) - reply = make(chan *comet.Message) - client.SendMessage(comet.MSG_REQUEST, bCmd, reply) - select { - case msg := <-reply: - w.Write(msg.Data) - case <-time.After(10 * time.Second): - response.Error = "recv response timeout" - goto resp - } - return - -resp: - b, _ := json.Marshal(response) - log.Debugf("postRouterCommand write: %s", string(b)) - w.Write(b) -} - -func getRouterList(w http.ResponseWriter, r *http.Request) { - log.Tracef("getRouterList") - var ( - uid string - tid string - response ResponseRouterList - router RouterInfo - devices map[string]devcenter.Device - err error - ) - - response.Status = -1 - if r.Method != "GET" { - response.Descr = "must using 'GET' method\n" - goto resp - } - r.ParseForm() - - uid = r.FormValue("uid") - if uid == "" { - response.Descr = "missing 'uid'" - goto resp - } - - tid = r.FormValue("tid") - if tid == "" { - response.Descr = "missing 'tid'" - goto resp - } - - devices, err = devcenter.GetDevices(uid, devcenter.DEV_ROUTER) - if err != nil { - log.Errorf("GetDevices failed: %s", err.Error()) - response.Descr = err.Error() - goto resp - } - - for _, dev := range devices { - router = RouterInfo{ - Rid: dev.Id, - Rname: dev.Title, - } - response.List = append(response.List, router) - } - - //router = RouterInfo{ - // Rid: "c80e774a1e73", - // Rname: "router1", - //} - //response.List = append(response.List, router) - - response.Status = 0 - response.Descr = "OK" - -resp: - b, _ := json.Marshal(response) - log.Debugf("getRoutelist write: %s", string(b)) - w.Write(b) -} - -func getCommand(w http.ResponseWriter, r *http.Request) { - log.Tracef("getCommand") - r.ParseForm() - devid := r.FormValue("devid") - if devid == "" { - fmt.Fprintf(w, "missing devid\n") - return - } - if !comet.DevMap.Check(devid) { - fmt.Fprintf(w, "(%s) not register\n", devid) - return - } - cmd := r.FormValue("cmd") - client := comet.DevMap.Get(devid).(*comet.Client) - reply := make(chan *comet.Message) - client.SendMessage(comet.MSG_REQUEST, []byte(cmd), reply) - select { - case msg := <-reply: - fmt.Fprintf(w, "recv reply (%s)\n", string(msg.Data)) - case <-time.After(10 * time.Second): - fmt.Fprintf(w, "recv timeout\n") - } -} - -func StartHttp(addr string) { - log.Infof("Starting HTTP server on %s", addr) - http.HandleFunc("/router/command", postRouterCommand) - http.HandleFunc("/router/list", getRouterList) - http.HandleFunc("/command", getCommand) - http.HandleFunc("/status", getStatus) - err := http.ListenAndServe(addr, nil) - if err != nil { - log.Criticalf("http listen: ", err) - os.Exit(1) - } -} diff --git a/cloud/def.go b/cloud/def.go new file mode 100644 index 0000000..7be9a2b --- /dev/null +++ b/cloud/def.go @@ -0,0 +1,16 @@ +package cloud + +const ( + ERR_NOERROR = 10000 + ERR_CMD_TIMEOUT = 20000 +) + +type ApiStatus struct { + ErrNo int `json:"errno"` + ErrMsg string `json:"errmsg,omitempty"` +} + +type ApiResponse struct { + ApiStatus + Data interface{} `json:"data,omitempty"` +} diff --git a/comet/message.go b/comet/message.go index c6eb2db..4ee60c7 100644 --- a/comet/message.go +++ b/comet/message.go @@ -23,13 +23,14 @@ const ( ) const ( - MSG_HEARTBEAT = uint8(0) - MSG_REGISTER = uint8(1) - MSG_REGISTER_REPLY = uint8(2) - MSG_REQUEST = uint8(3) - MSG_REQUEST_REPLY = uint8(4) - MSG_ROUTER_COMMAND = uint8(10) - MSG_ROUTER_COMMAND_REPLY = uint8(11) + MSG_HEARTBEAT = uint8(0) + MSG_REGISTER = uint8(1) + MSG_REGISTER_REPLY = uint8(2) + MSG_ROUTER_COMMAND = uint8(3) + MSG_ROUTER_COMMAND_REPLY = uint8(4) + +// MSG_REQUEST = uint8(3) +// MSG_REQUEST_REPLY = uint8(4) ) // msg to byte @@ -54,18 +55,25 @@ type RegisterMessage struct { } type RegisterReplyMessage struct { + HeartbeatInterval int `json:"bt_interval"` +} + +type CommandRequest struct { + Uid string `json:"uid"` + Cmd string `json:"cmd"` } type RouterCommandMessage struct { - Uid string `json:"uid"` - Cmd struct { - Forward string `json:"forward"` - } `json:"cmd"` + Uid string `json:"uid"` + Cmd string `json:"cmd"` } -type RouterCommandReplyMessage struct { - Status int `json:"status"` - Descr string `json:"descr"` - Result string `json:"result"` +type RouterCommand struct { + Forward string `json:"forward"` } +type RouterCommandReplyMessage struct { + Status int `json:"status"` + Descr string `json:"descr"` + Result string `json:"result"` +} diff --git a/comet/server.go b/comet/server.go index 7993cf0..8ab15ca 100644 --- a/comet/server.go +++ b/comet/server.go @@ -1,31 +1,35 @@ package comet import ( - //"log" log "github.com/cihub/seelog" "io" "net" "sync" + "sync/atomic" "time" //"strings" - "github.com/chenyf/gibbon/utils/safemap" + "github.com/chenyf/gibbon/storage" + "github.com/chenyf/push/utils/safemap" ) type MsgHandler func(*Client, *Header, []byte) int type Server struct { - exitCh chan bool - waitGroup *sync.WaitGroup - funcMap map[uint8]MsgHandler - acceptTimeout time.Duration - readTimeout time.Duration - writeTimeout time.Duration - heartbeatTimeout time.Duration - maxMsgLen uint32 + Name string // unique name of this server + exitCh chan bool + waitGroup *sync.WaitGroup + funcMap map[uint8]MsgHandler + acceptTimeout time.Duration + readTimeout time.Duration + writeTimeout time.Duration + heartbeatInterval time.Duration + heartbeatTimeout time.Duration + maxMsgLen uint32 } func NewServer() *Server { return &Server{ + Name: "gibbon", exitCh: make(chan bool), waitGroup: &sync.WaitGroup{}, funcMap: make(map[uint8]MsgHandler), @@ -38,25 +42,21 @@ func NewServer() *Server { } type Client struct { - devId string + DevId string ctrl chan bool - MsgOut chan *Pack + MsgOut chan *Message WaitingChannels map[uint32]chan *Message NextSeqId uint32 LastAlive time.Time + RegistTime time.Time } -type Pack struct { - msg *Message - client *Client - reply chan *Message -} - -func (client *Client) SendMessage(msgType uint8, body []byte, reply chan *Message) { +func (this *Client) SendMessage(msgType uint8, body []byte, reply chan *Message) (seq uint32) { + seq = this.NextSeq() header := Header{ Type: msgType, Ver: 0, - Seq: 0, + Seq: seq, Len: uint32(len(body)), } msg := &Message{ @@ -64,47 +64,55 @@ func (client *Client) SendMessage(msgType uint8, body []byte, reply chan *Messag Data: body, } - pack := &Pack{ - msg: msg, - client: client, - reply: reply, + // add reply channel + if reply != nil { + this.WaitingChannels[seq] = reply } - client.MsgOut <- pack + this.MsgOut <- msg + return seq +} + +func (this *Client) MsgTimeout(seq uint32) { + delete(this.WaitingChannels, seq) +} + +func (this *Client) NextSeq() uint32 { + return atomic.AddUint32(&this.NextSeqId, 1) } var ( DevMap *safemap.SafeMap = safemap.NewSafeMap() ) -func InitClient(conn *net.TCPConn, devid string) *Client { +func (this *Server) InitClient(conn *net.TCPConn, devid string) *Client { + // save the client device Id to storage + if err := storage.Instance.AddDevice(this.Name, devid); err != nil { + log.Infof("failed to put device %s into redis:", devid, err) + return nil + } + client := &Client{ - devId: devid, + DevId: devid, ctrl: make(chan bool), - MsgOut: make(chan *Pack, 100), + MsgOut: make(chan *Message, 100), WaitingChannels: make(map[uint32]chan *Message), - NextSeqId: 1, + NextSeqId: 0, LastAlive: time.Now(), + RegistTime: time.Now(), } DevMap.Set(devid, client) go func() { - log.Tracef("start send routine for %s", conn.RemoteAddr().String()) + log.Tracef("start send routine for [%s] [%s]", devid, conn.RemoteAddr().String()) for { select { - case pack := <-client.MsgOut: - seqid := pack.client.NextSeqId - pack.msg.Header.Seq = seqid - b, _ := pack.msg.Header.Serialize() + case msg := <-client.MsgOut: + b, _ := msg.Header.Serialize() conn.Write(b) - conn.Write(pack.msg.Data) - log.Infof("send msg ok, (%s)", string(pack.msg.Data)) - pack.client.NextSeqId += 1 - // add reply channel - if pack.reply != nil { - pack.client.WaitingChannels[seqid] = pack.reply - } + conn.Write(msg.Data) + log.Infof("send msg to [%s]. seq: %d. body: (%s)", devid, msg.Header.Seq, string(msg.Data)) case <-client.ctrl: - log.Tracef("leave send routine for %s", conn.RemoteAddr().String()) + log.Tracef("leave send routine for [%s] [%s]", devid, conn.RemoteAddr().String()) return } } @@ -112,24 +120,29 @@ func InitClient(conn *net.TCPConn, devid string) *Client { return client } -func CloseClient(client *Client) { +func (this *Server) CloseClient(client *Client) { client.ctrl <- true - DevMap.Delete(client.devId) + if err := storage.Instance.RemoveDevice(this.Name, client.DevId); err != nil { + log.Errorf("failed to remove device %s from redis:", client.DevId, err) + } + DevMap.Delete(client.DevId) } func handleReply(client *Client, header *Header, body []byte) int { - log.Debugf("Received reply: %s", body) + log.Debugf("Received reply from [%s]. seq: %d.", client.DevId, header.Seq) ch, ok := client.WaitingChannels[header.Seq] if ok { //remove waiting channel from map delete(client.WaitingChannels, header.Seq) ch <- &Message{Header: *header, Data: body} + } else { + log.Warnf("no waiting channel for seq: %d, device: %s", header.Seq, client.DevId) } return 0 } func handleHeartbeat(client *Client, header *Header, body []byte) int { - log.Debugf("Heartbeat from devid: %s", client.devId) + log.Debugf("Heartbeat from devid: %s", client.DevId) client.LastAlive = time.Now() return 0 } @@ -142,6 +155,10 @@ func (this *Server) SetReadTimeout(timeout time.Duration) { this.readTimeout = timeout } +func (this *Server) SetHeartbeatInterval(timeout time.Duration) { + this.heartbeatInterval = timeout +} + func (this *Server) SetHeartbeatTimeout(timeout time.Duration) { this.heartbeatTimeout = timeout } @@ -162,7 +179,26 @@ func (this *Server) Init(addr string) (*net.TCPListener, error) { return nil, err } this.funcMap[MSG_HEARTBEAT] = handleHeartbeat - this.funcMap[MSG_REQUEST_REPLY] = handleReply + this.funcMap[MSG_ROUTER_COMMAND_REPLY] = handleReply + + if err := storage.Instance.InitDevices(this.Name); err != nil { + log.Errorf("failed to InitDevices: %s", err.Error()) + return nil, err + } + + // keep the data of this node not expired on redis + go func() { + for { + select { + case <-this.exitCh: + log.Infof("exiting storage refreshing routine") + return + case <-time.After(10 * time.Second): + storage.Instance.RefreshDevices(this.Name, 30) + } + } + }() + return l, nil } @@ -174,9 +210,13 @@ func (this *Server) Run(listener *net.TCPListener) { }() //go this.dealSpamConn() - log.Infof("Starting comet server on: %s\n", listener.Addr().String()) - log.Infof("Comet server settings: readtimeout [%d], accepttimeout [%d], heartbeattimeout [%d]\n", - this.readTimeout, this.acceptTimeout, this.heartbeatTimeout) + log.Infof("Starting comet server on: %s", listener.Addr().String()) + log.Infof("Comet server settings: readtimeout [%dms], accepttimeout [%dms], heartbeatinterval [%dms] heartbeattimeout [%dms]", + this.readTimeout/time.Millisecond, + this.acceptTimeout/time.Millisecond, + this.heartbeatInterval/time.Millisecond, + this.heartbeatTimeout/time.Millisecond) + for { select { case <-this.exitCh: @@ -191,7 +231,7 @@ func (this *Server) Run(listener *net.TCPListener) { if e, ok := err.(*net.OpError); ok && e.Timeout() { continue } - log.Errorf("accept failed: %v\n", err) + log.Errorf("accept failed: %v", err) continue } /* @@ -213,7 +253,7 @@ func (this *Server) Stop() { log.Infof("comet server stopped") } -func waitRegister(conn *net.TCPConn) *Client { +func (this *Server) waitRegister(conn *net.TCPConn) *Client { conn.SetReadDeadline(time.Now().Add(10 * time.Second)) buf := make([]byte, 10) n, err := io.ReadFull(conn, buf) @@ -251,20 +291,27 @@ func waitRegister(conn *net.TCPConn) *Client { conn.Close() return nil } - client := InitClient(conn, devid) + client := this.InitClient(conn, devid) return client } // handle a TCP connection func (this *Server) handleConnection(conn *net.TCPConn) { - log.Debugf("accept connection (%v)", conn) - log.Infof("New conn accepted from %s\n", conn.RemoteAddr().String()) + log.Infof("New conn accepted from %s", conn.RemoteAddr().String()) // handle register first - client := waitRegister(conn) + client := this.waitRegister(conn) if client == nil { return } + var ( + readHeader = true + bytesRead = 0 + data []byte + header Header + startTime time.Time + ) + for { /* select { @@ -275,49 +322,62 @@ func (this *Server) handleConnection(conn *net.TCPConn) { } */ - var ( - data []byte = nil - ) - now := time.Now() if now.After(client.LastAlive.Add(this.heartbeatTimeout)) { - log.Warnf("heartbeat timeout") + log.Warnf("Device [%s] heartbeat timeout", client.DevId) break } - //conn.SetReadDeadline(time.Now().Add(this.readTimeout)) conn.SetReadDeadline(now.Add(10 * time.Second)) - buf := make([]byte, HEADER_SIZE) - n, err := io.ReadFull(conn, buf) - if err != nil { - if e, ok := err.(*net.OpError); ok && e.Timeout() { - //log.Printf("read timeout, %d", n) - continue + if readHeader { + buf := make([]byte, HEADER_SIZE) + n, err := io.ReadFull(conn, buf) + if err != nil { + if e, ok := err.(*net.OpError); ok && e.Timeout() { + //log.Printf("read timeout, %d", n) + continue + } + log.Errorf("readfull failed (%v)", err) + break + } + if err := header.Deserialize(buf[0:n]); err != nil { + log.Errorf("Deserialize header failed: %s", err.Error()) + break } - log.Errorf("readfull failed (%v)", err) - break - } - - var header Header - if err := header.Deserialize(buf[0:n]); err != nil { - log.Errorf("Deserialize header failed: %s", err.Error()) - break - } - if header.Len > MAX_BODY_LEN { - log.Warnf("Msg body too big: %d", header.Len) - break - } + if header.Len > MAX_BODY_LEN { + log.Warnf("Msg body too big from device [%s]: %d", header.Len, client.DevId) + break + } - if header.Len > 0 { - data = make([]byte, header.Len) - if _, err := io.ReadFull(conn, data); err != nil { + if header.Len > 0 { + data = make([]byte, header.Len) + readHeader = false + bytesRead = 0 + startTime = time.Now() + continue + } + } else { + n, err := conn.Read(data[bytesRead:]) + if err != nil { if e, ok := err.(*net.OpError); ok && e.Timeout() { - continue + if now.After(startTime.Add(this.readTimeout)) { + log.Infof("read packet data timeout [%s]", client.DevId) + break + } + } else { + log.Infof("read from client [%s] failed: (%s)", client.DevId, err.Error()) + break } - log.Errorf("read from client failed: (%v)", err) - break } + if n > 0 { + bytesRead += n + } + if uint32(bytesRead) < header.Len { + continue + } + readHeader = true + //log.Debugf("%s: body (%s)", client.DevId, data) } handler, ok := this.funcMap[header.Type] @@ -329,7 +389,8 @@ func (this *Server) handleConnection(conn *net.TCPConn) { } } // don't use defer to improve performance - log.Infof("close connection %s\n", conn.RemoteAddr().String()) - CloseClient(client) + log.Infof("closing device [%s] [%s]", client.DevId, conn.RemoteAddr().String()) + this.CloseClient(client) + log.Infof("close connection [%s]", conn.RemoteAddr().String()) conn.Close() } diff --git a/conf/conf.go b/conf/conf.go index fe8fb11..856c916 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -6,13 +6,21 @@ import ( ) type ConfigStruct struct { - Comet string `json:"comet"` - AcceptTimeout int `json:"accept_timeout"` - ReadTimeout int `json:"read_timeout"` - HeartbeatTimeout int `json:"heartbeat_timeout"` - - Web string `json:"web"` - DevCenter string `json:"devcenter"` + Comet string `json:"comet"` + AcceptTimeout int `json:"accept_timeout"` + ReadTimeout int `json:"read_timeout"` + HeartbeatInterval int `json:"heartbeat_interval"` + HeartbeatTimeout int `json:"heartbeat_timeout"` + Rabbit struct { + Enable bool `json:"enable"` + Uri string `json:"uri"` + } `json:"rabbit"` + Redis struct { + Server string `json:"server"` + Pass string `json:"pass"` + PoolSize int `json:"poolsize"` + Retry int `json:"retry"` + } `json:"redis"` } var ( diff --git a/devcenter/gibbonapi.go b/devcenter/gibbonapi.go deleted file mode 100644 index 16c51e3..0000000 --- a/devcenter/gibbonapi.go +++ /dev/null @@ -1,58 +0,0 @@ -package devcenter - -import ( - "encoding/json" - "errors" - "fmt" - log "github.com/cihub/seelog" - "io/ioutil" - "net/http" - - "github.com/chenyf/gibbon/conf" -) - -const ( - DEV_ROUTER = 3 -) - -type Device struct { - Id string `json:"id"` - Type int `json:"type"` - Title string `json:"title"` -} - -type devicesResult struct { - Errno int `json:"errno"` - Errmsg string `json:"errmsg"` - Data struct { - UserOpenId int `json:"userOpenId"` - DeviceList map[string]Device `json:"device"` - } `json:"data"` -} - -func GetDevices(uid string, devType int) (map[string]Device, error) { - log.Tracef("GetDevices") - url := fmt.Sprintf("http://%s/api/v1/device/bind/?user_id=%s&type=%d", conf.Config.DevCenter, uid, devType) - res, err := http.Get(url) - if err != nil { - return nil, err - } - body, err := ioutil.ReadAll(res.Body) - if err != nil { - return nil, err - } - - log.Debugf("Got response from device center: %s", body) - - var result devicesResult - err = json.Unmarshal(body, &result) - if err != nil { - return nil, err - } - - if result.Errno != 10000 { - return nil, errors.New(result.Errmsg) - } - - return result.Data.DeviceList, nil -} diff --git a/etc/conf.json b/etc/conf.json deleted file mode 100644 index df59f2d..0000000 --- a/etc/conf.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "comet" : "0.0.0.0:10000", - "accept_timeout" : 2, - "read_timeout" : 60, - "heartbeat_timeout" : 90, - - "web" : "0.0.0.0:9090", - "devcenter" : "10.154.156.90" -} diff --git a/gibbonagent/README.md b/gibbonagent/README.md deleted file mode 100644 index e69de29..0000000 diff --git a/gibbonagent/conf.json b/gibbonagent/conf.json index 7ace3b5..5fc6825 100644 --- a/gibbonagent/conf.json +++ b/gibbonagent/conf.json @@ -1,3 +1,3 @@ { -"address":"111.206.210.49:10000;111.206.210.50:10000" +"address":"10.154.156.121:10000" } diff --git a/gibbonagent/conf_rdtest.json b/gibbonagent/conf_rdtest.json new file mode 100644 index 0000000..f1d70bd --- /dev/null +++ b/gibbonagent/conf_rdtest.json @@ -0,0 +1,3 @@ +{ +"address":"10.154.156.121:10000;111.206.210.50:10000" +} diff --git a/gibbonagent/gibbonagent.go b/gibbonagent/gibbonagent.go index 5dec9f2..55f7eed 100644 --- a/gibbonagent/gibbonagent.go +++ b/gibbonagent/gibbonagent.go @@ -3,31 +3,50 @@ package main import ( "bytes" "encoding/json" + "flag" "fmt" //"io" - "io/ioutil" + "github.com/chenyf/gibbon/comet" log "github.com/cihub/seelog" + "io/ioutil" "net" "net/http" "os" + "os/signal" "strings" - "time" "sync" "syscall" - "os/signal" - "github.com/chenyf/gibbon/comet" + "time" +) + +const ( + defaultLogConfig string = ` + + + + + + + + + +` +) + +var ( + heartbeatInterval int = 110 ) type MsgHandler func(*Conn, *comet.Header, []byte) int type Agent struct { - done chan bool - funcMap map[uint8]MsgHandler + done chan bool + funcMap map[uint8]MsgHandler } func NewAgent() *Agent { agent := &Agent{ - done: make(chan bool), + done: make(chan bool), funcMap: make(map[uint8]MsgHandler), } agent.funcMap[comet.MSG_REGISTER_REPLY] = handleRegisterReply @@ -35,19 +54,19 @@ func NewAgent() *Agent { return agent } -func (this *Agent)Run() { +func (this *Agent) Run() { var c *Conn = NewConn() addSlice := strings.Split(Config.Address, ";") for { select { - case <- this.done: - log.Infof("agent quit") - return - default: + case <-this.done: + log.Infof("agent quit") + return + default: } if c.conn == nil { if ok := c.Connect(addSlice[0]); !ok { - time.Sleep(5*time.Second) + time.Sleep(10 * time.Second) continue } log.Infof("connect ok") @@ -56,12 +75,12 @@ func (this *Agent)Run() { c.conn.SetReadDeadline(time.Now().Add(3 * time.Second)) n := c.Read() if n < 0 { - // failed + // failed c.Close() c = NewConn() continue } else if n > 0 { - // need more data + // need more data continue } // ok @@ -79,7 +98,7 @@ func (this *Agent)Run() { } } -func (this *Agent)Stop() { +func (this *Agent) Stop() { close(this.done) } @@ -89,6 +108,12 @@ func sendReply(c *Conn, msgType uint8, seq uint32, v interface{}) { } func handleRegisterReply(c *Conn, header *comet.Header, body []byte) int { + var reply comet.RegisterReplyMessage + err := json.Unmarshal(body, &reply) + if err != nil { + //TODO + } + return 0 } @@ -97,17 +122,27 @@ func handleRouterCommand(c *Conn, header *comet.Header, body []byte) int { var reply comet.RouterCommandReplyMessage if err := json.Unmarshal(body, &msg); err != nil { - reply.Status = 2000 + reply.Status = -2000 + reply.Descr = "Request message is not JSON" + sendReply(c, comet.MSG_ROUTER_COMMAND_REPLY, header.Seq, &reply) + return 0 + } + + var cmd comet.RouterCommand + if err := json.Unmarshal([]byte(msg.Cmd), &cmd); err != nil { + reply.Status = -2000 reply.Descr = "Request body is not JSON" sendReply(c, comet.MSG_ROUTER_COMMAND_REPLY, header.Seq, &reply) return 0 } - if msg.Cmd.Forward == "" { - reply.Status = 2001 + + if cmd.Forward == "" { + reply.Status = -2001 reply.Descr = "'forward' is empty" sendReply(c, comet.MSG_ROUTER_COMMAND_REPLY, header.Seq, &reply) return 0 } + client := &http.Client{ Transport: &http.Transport{ Dial: func(netw, addr string) (net.Conn, error) { @@ -115,30 +150,38 @@ func handleRouterCommand(c *Conn, header *comet.Header, body []byte) int { if err != nil { return nil, err } - conn.SetDeadline(time.Now().Add(time.Second * 2)) + conn.SetDeadline(time.Now().Add(time.Second * 5)) return conn, nil }, - ResponseHeaderTimeout: time.Second * 2, + ResponseHeaderTimeout: time.Second * 5, }, } + log.Debugf("Sending request to local service: %s", cmd.Forward) + response, err := client.Post("http://127.0.0.1:9999/", "application/json;charset=utf-8", - bytes.NewBuffer([]byte(msg.Cmd.Forward))) + bytes.NewBuffer([]byte(cmd.Forward))) if err != nil { - reply.Status = 2002 - reply.Descr = "Talk with local service failed" + reply.Status = -2002 + errMsg := fmt.Sprintf("Talk with local service failed: %s", err.Error()) + reply.Descr = errMsg + log.Errorf(errMsg) sendReply(c, comet.MSG_ROUTER_COMMAND_REPLY, header.Seq, &reply) return 0 } + result, err := ioutil.ReadAll(response.Body) response.Body.Close() if err != nil { - reply.Status = 2003 - reply.Descr = "Local service response failed" + reply.Status = -2003 + errMsg := fmt.Sprintf("Local service response failed: %s", err.Error()) + reply.Descr = errMsg + log.Errorf(errMsg) sendReply(c, comet.MSG_ROUTER_COMMAND_REPLY, header.Seq, &reply) return 0 } + reply.Status = 0 reply.Descr = "OK" reply.Result = string(result) @@ -154,8 +197,8 @@ type Serverslice struct { } type Pack struct { - msg *comet.Message - reply chan *comet.Message + msg *comet.Message + reply chan *comet.Message } type Conn struct { conn *net.TCPConn @@ -163,23 +206,23 @@ type Conn struct { outMsgs chan *Pack readFlag int nRead int - headBuf []byte + headBuf []byte dataBuf []byte header comet.Header } func NewConn() *Conn { return &Conn{ - conn : nil, - done : make(chan bool), - outMsgs : make(chan *Pack, 100), - readFlag : 0, - nRead : 0, - headBuf : make([]byte, comet.HEADER_SIZE), + conn: nil, + done: make(chan bool), + outMsgs: make(chan *Pack, 100), + readFlag: 0, + nRead: 0, + headBuf: make([]byte, comet.HEADER_SIZE), } } -func (this *Conn)Connect(service string) bool { +func (this *Conn) Connect(service string) bool { log.Infof("try to connect server address: %v\n", service) tcpAddr, err := net.ResolveTCPAddr("tcp4", service) if err != nil { @@ -197,12 +240,12 @@ func (this *Conn)Connect(service string) bool { return true } -func (this *Conn)Start() { +func (this *Conn) Start() { macAddr, _ := GetMac() b := []byte(macAddr) this.SendMessage(comet.MSG_REGISTER, 0, b, nil) go func() { - timer := time.NewTicker(110*time.Second) + timer := time.NewTicker(time.Duration(heartbeatInterval) * time.Second) h := comet.Header{} h.Type = comet.MSG_HEARTBEAT heartbeat, _ := h.Serialize() @@ -211,22 +254,22 @@ func (this *Conn)Start() { case <-this.done: return case pack := <-this.outMsgs: - //seqid := pack.client.nextSeq + //seqid := pack.client.nextSeq //pack.msg.Header.Seq = seqid b, _ := pack.msg.Header.Serialize() this.conn.Write(b) this.conn.Write(pack.msg.Data) - log.Infof("send msg: (%d) (%s)", pack.msg.Header.Type, pack.msg.Data) + log.Infof("send msg: Type(%d), Seq(%d), Body(%s)", pack.msg.Header.Type, pack.msg.Header.Seq, pack.msg.Data) //pack.client.nextSeq += 1 time.Sleep(1 * time.Second) - case <- timer.C: + case <-timer.C: this.conn.Write(heartbeat) } } }() } -func (this *Conn)Read() int { +func (this *Conn) Read() int { var n int if this.readFlag == 0 { n = MyRead(this.conn, this.headBuf[this.nRead:]) @@ -265,20 +308,20 @@ func (this *Conn)Read() int { return 0 } -func (this *Conn)BufReset() { +func (this *Conn) BufReset() { this.readFlag = 0 this.nRead = 0 } -func (this *Conn)Close() { +func (this *Conn) Close() { this.done <- true this.conn.Close() this.conn = nil this.BufReset() } -func (this *Conn)SendMessage(msgType uint8, seq uint32, body []byte, reply chan *comet.Message) { - header := comet.Header{ +func (this *Conn) SendMessage(msgType uint8, seq uint32, body []byte, reply chan *comet.Message) { + header := comet.Header{ Type: msgType, Ver: 0, Seq: seq, @@ -289,32 +332,45 @@ func (this *Conn)SendMessage(msgType uint8, seq uint32, body []byte, reply chan Data: body, } pack := &Pack{ - msg: msg, - reply: reply, + msg: msg, + reply: reply, } this.outMsgs <- pack } func main() { - //err := LoadConfig("/system/etc/conf.json") - err := LoadConfig("./conf.json") + var ( + logConfigFile = flag.String("l", "", "Log config file") + configFile = flag.String("c", "/system/etc/conf.json", "Config file") + ) + + flag.Parse() + + err := LoadConfig(*configFile) if err != nil { - fmt.Printf("LoadConfig failed: (%s)", err) + fmt.Printf("LoadConfig from %s failed: (%s)\n", *configFile, err) os.Exit(1) } - //logger, err := log.LoggerFromConfigAsFile("/system/etc/log.xml") - logger, err := log.LoggerFromConfigAsFile("./log.xml") - if err != nil { - fmt.Printf("Load log config failed: (%s)\n", err) - os.Exit(1) + var logger log.LoggerInterface + if *logConfigFile == "" { + logger, _ = log.LoggerFromConfigAsBytes([]byte(defaultLogConfig)) + } else { + //logger, err := log.LoggerFromConfigAsFile("/system/etc/log.xml") + logger, err = log.LoggerFromConfigAsFile(*logConfigFile) + if err != nil { + fmt.Printf("Load log config from %s failed: (%s)\n", *logConfigFile, err) + os.Exit(1) + } } log.ReplaceLogger(logger) + log.Infof("gibbon agent started") + //wg := &sync.WaitGroup{} agent := NewAgent() c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT) + signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT) wg := sync.WaitGroup{} wg.Add(1) @@ -328,4 +384,3 @@ func main() { agent.Stop() wg.Wait() } - diff --git a/control.sh b/gibbond/control.sh similarity index 100% rename from control.sh rename to gibbond/control.sh diff --git a/gibbond/etc/conf.json b/gibbond/etc/conf.json new file mode 120000 index 0000000..6aa6fd8 --- /dev/null +++ b/gibbond/etc/conf.json @@ -0,0 +1 @@ +conf_rdtest.json \ No newline at end of file diff --git a/gibbond/etc/conf_product.json b/gibbond/etc/conf_product.json new file mode 100644 index 0000000..b3415e7 --- /dev/null +++ b/gibbond/etc/conf_product.json @@ -0,0 +1,18 @@ +{ + "comet" : "0.0.0.0:10000", + "accept_timeout" : 2, + "read_timeout" : 60, + "heartbeat_interval" : 110, + "heartbeat_timeout" : 180, + + "rabbit" : { + "enable" : true, + "uri" : "amqp://guest:guest@10.135.28.70:5672/" + }, + "redis" : { + "server" : "10.135.28.70:6379", + "pass" : "rpasswd", + "poolsize" : 50, + "retry" : 3 + } +} diff --git a/gibbond/etc/conf_rdtest.json b/gibbond/etc/conf_rdtest.json new file mode 100644 index 0000000..bd53c72 --- /dev/null +++ b/gibbond/etc/conf_rdtest.json @@ -0,0 +1,18 @@ +{ + "comet" : "0.0.0.0:10000", + "accept_timeout" : 2, + "read_timeout" : 60, + "heartbeat_interval" : 110, + "heartbeat_timeout" : 180, + + "rabbit" : { + "enable" : true, + "uri" : "amqp://guest:guest@10.154.156.121:5672/" + }, + "redis" : { + "server" : "10.154.156.122:6380", + "pass" : "rpasswd", + "poolsize" : 50, + "retry" : 3 + } +} diff --git a/etc/log.xml b/gibbond/etc/log.xml similarity index 83% rename from etc/log.xml rename to gibbond/etc/log.xml index 5cb0f01..3853c62 100644 --- a/etc/log.xml +++ b/gibbond/etc/log.xml @@ -7,7 +7,7 @@ --> - + diff --git a/etc/supervisord.conf b/gibbond/etc/supervisord.conf similarity index 91% rename from etc/supervisord.conf rename to gibbond/etc/supervisord.conf index bce022e..2512b4a 100644 --- a/etc/supervisord.conf +++ b/gibbond/etc/supervisord.conf @@ -18,7 +18,7 @@ supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface gibbonurl=unix:///letv/run/gibbon/supervisor.sock [program:gibbon] -command=/letv/gibbon/gibbon -c /letv/gibbon/etc/conf.json +command=/letv/gibbon/gibbond -c /letv/gibbon/etc/conf.json user=work log_stdout=true log_stderr=true diff --git a/gibbond/gibbond.go b/gibbond/gibbond.go index 0ae22c4..6c2b35d 100644 --- a/gibbond/gibbond.go +++ b/gibbond/gibbond.go @@ -1,45 +1,23 @@ package main import ( - "crypto/hmac" - "crypto/sha1" "flag" "fmt" //"log" log "github.com/cihub/seelog" "os" "os/signal" - "strings" "sync" "syscall" "time" - "github.com/chenyf/gibbon/api" "github.com/chenyf/gibbon/comet" "github.com/chenyf/gibbon/conf" + "github.com/chenyf/gibbon/mq" + "github.com/chenyf/gibbon/storage" + "github.com/chenyf/push/utils" ) -func sign(path string, query map[string]string) []byte { - uid := query["uid"] - rid := query["rid"] - tid := query["tid"] - src := query["src"] - tm := query["tm"] - pmtt := query["pmtt"] - - raw := []string{path, uid, rid, tid, src, tm, pmtt} - args := []string{} - x := []int{6, 5, 4, 3, 2, 1, 0} - for _, item := range x { - args = append(args, raw[item]) - } - data := strings.Join(args, "") - key := "xnRzFxoCDRVRU2mNQ7AoZ5MCxpAR7ntnmlgRGYav" - mac := hmac.New(sha1.New, []byte(key)) - mac.Write([]byte(data)) - return mac.Sum(nil) -} - func main() { var ( @@ -54,6 +32,12 @@ func main() { os.Exit(1) } + err = log.RegisterCustomFormatter("Ms", utils.CreateMsFormatter) + if err != nil { + fmt.Printf("Failed to create custom formatter: (%s)\n", err) + os.Exit(1) + } + logger, err := log.LoggerFromConfigAsFile("./etc/log.xml") if err != nil { fmt.Printf("Load log config failed: (%s)\n", err) @@ -63,8 +47,13 @@ func main() { log.ReplaceLogger(logger) waitGroup := &sync.WaitGroup{} - cometServer := comet.NewServer() + storage.NewInstance(conf.Config.Redis.Server, + conf.Config.Redis.Pass, + conf.Config.Redis.PoolSize, + conf.Config.Redis.Retry) + + cometServer := comet.NewServer() listener, err := cometServer.Init(conf.Config.Comet) if err != nil { log.Criticalf("Failed to start comet server: %s", err.Error()) @@ -73,6 +62,7 @@ func main() { cometServer.SetAcceptTimeout(time.Duration(conf.Config.AcceptTimeout) * time.Second) cometServer.SetReadTimeout(time.Duration(conf.Config.ReadTimeout) * time.Second) + cometServer.SetHeartbeatInterval(time.Duration(conf.Config.HeartbeatInterval) * time.Second) cometServer.SetHeartbeatTimeout(time.Duration(conf.Config.HeartbeatTimeout) * time.Second) c := make(chan os.Signal, 1) @@ -91,8 +81,12 @@ func main() { cometServer.Run(listener) }() + _, err = mq.NewRpcServer(conf.Config.Rabbit.Uri, "gibbon_rpc_exchange", "gibbon") + if err != nil { + log.Critical("failed to start RPC server: ", err) + os.Exit(1) + } + waitGroup.Add(1) - go api.StartHttp(conf.Config.Web) waitGroup.Wait() } - diff --git a/misc/setupenv.sh b/misc/setupenv.sh index d158ae1..8a5cea6 100755 --- a/misc/setupenv.sh +++ b/misc/setupenv.sh @@ -3,6 +3,8 @@ USER=work WORK_ROOT=/letv +THIS_DIR=$(dirname $(readlink -f $0) ) + mkdir -p $WORK_ROOT/log/gibbon chown -R $USER:$USER $WORK_ROOT/log/gibbon @@ -10,7 +12,7 @@ mkdir -p $WORK_ROOT/run/gibbon chown -R $USER:$USER $WORK_ROOT/run/gibbon mkdir -p $WORK_ROOT/gibbon -cp gibbon $WORK_ROOT/gibbon -cp control.sh $WORK_ROOT/gibbon -cp -r etc $WORK_ROOT/gibbon +cp $THIS_DIR/gibbond $WORK_ROOT/gibbon +cp $THIS_DIR/control.sh $WORK_ROOT/gibbon +cp -r $THIS_DIR/etc $WORK_ROOT/gibbon chown -R $USER:$USER $WORK_ROOT/gibbon diff --git a/mq/rpc_msg.go b/mq/rpc_msg.go new file mode 100644 index 0000000..ca6f7ba --- /dev/null +++ b/mq/rpc_msg.go @@ -0,0 +1,23 @@ +package mq + +type MQ_Msg_Crtl struct { + DeviceId string `json:"dev_id"` + Service string `json:"svc"` + Cmd string `json:"cmd"` +} + +type MQ_Msg_CtrlReply struct { + Status int `json:"status"` + Result string `json:"result,omitempty"` +} + +const ( + STATUS_SUCCESS = 0 + + STATUS_INVALID_SERVICE = 1 + STATUS_EXCEPTION = 2 + + STATUS_NO_DEVICE = -1 + STATUS_SEND_FAILED = -2 + STATUS_SEND_TIMEOUT = -3 +) diff --git a/mq/rpc_server.go b/mq/rpc_server.go new file mode 100644 index 0000000..d2b039a --- /dev/null +++ b/mq/rpc_server.go @@ -0,0 +1,172 @@ +package mq + +import ( + "encoding/json" + "time" + + "github.com/chenyf/gibbon/comet" + log "github.com/cihub/seelog" + "github.com/streadway/amqp" +) + +var ( + rpcExchangeType string = "direct" +) + +type RpcServer struct { + conn *amqp.Connection + channel *amqp.Channel + exchange string +} + +func NewRpcServer(amqpURI, exchange, bindingKey string) (*RpcServer, error) { + server := &RpcServer{ + exchange: exchange, + } + + var err error + server.conn, err = amqp.Dial(amqpURI) + if err != nil { + log.Errorf("Dial: %s", err) + return nil, err + } + + log.Infof("got Connection, getting Channel") + server.channel, err = server.conn.Channel() + if err != nil { + log.Errorf("Channel: %s", err) + return nil, err + } + + log.Infof("got Channel, declaring %q Exchange (%q)", rpcExchangeType, exchange) + + if err := server.channel.ExchangeDeclare( + exchange, // name + rpcExchangeType, // type + true, // durable + false, // auto-deleted + false, // internal + false, // noWait + nil, // arguments + ); err != nil { + log.Errorf("Exchange Declare: %s", err) + return nil, err + } + + rpcQueue, err := server.channel.QueueDeclare( + "", // name + false, // durable + true, // autoDelete + true, // exclusive + false, // noWait + nil, // args + ) + if err != nil { + log.Errorf("Queue Declare: %s", err) + return nil, err + } + log.Infof("declared RPC queue [%s]", rpcQueue.Name) + + if err = server.channel.QueueBind( + rpcQueue.Name, // name of the queue + bindingKey, // bindingKey + exchange, // sourceExchange + false, // noWait + nil, // arguments + ); err != nil { + log.Errorf("Queue bind: %s", err) + return nil, err + } + + deliveries, err := server.channel.Consume( + rpcQueue.Name, // name + "", // consumerTag, + false, // noAck + false, // exclusive + false, // noLocal + false, // noWait + nil, // arguments + ) + if err != nil { + log.Errorf("consume error: %s", err) + return nil, err + } + + go server.handleDeliveries(deliveries) + + return server, nil +} + +func (this *RpcServer) Stop() { + this.conn.Close() +} + +func (this *RpcServer) SendRpcResponse(callbackQueue, correlationId string, resp interface{}) { + log.Infof("Sending RPC reply. RequestId: %s", correlationId) + data, _ := json.Marshal(resp) + if err := this.channel.Publish( + "", // publish to an exchange + callbackQueue, // routingKey + false, // mandatory + false, // immediate + amqp.Publishing{ + Headers: amqp.Table{}, + ContentType: "text/plain", + ContentEncoding: "", + DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent + Priority: 0, // 0-9 + ReplyTo: "", + CorrelationId: correlationId, + Body: data, + }, + ); err != nil { + log.Errorf("Exchange Publish: %s", err) + } +} + +func (this *RpcServer) handleDeliveries(deliveries <-chan amqp.Delivery) { + for d := range deliveries { + log.Debugf("got %dB RPC request [%s]", len(d.Body), d.CorrelationId) + d.Ack(false) + + var msg MQ_Msg_Crtl + if err := json.Unmarshal(d.Body, &msg); err != nil { + log.Errorf("Unknown MQ message: %s", err) + continue + } + + go this.handleRpcRequest(&msg, d.ReplyTo, d.CorrelationId) + } + + log.Infof("handle: deliveries channel closed") + //done <- nil +} + +func (this *RpcServer) handleRpcRequest(msg *MQ_Msg_Crtl, replyTo, correlationId string) { + rpcReply := MQ_Msg_CtrlReply{} + c := comet.DevMap.Get(msg.DeviceId) + if c == nil { + log.Warnf("RPC: no device %s on this server.", msg.DeviceId) + rpcReply.Status = STATUS_NO_DEVICE + this.SendRpcResponse(replyTo, correlationId, rpcReply) + return + } + client := c.(*comet.Client) + var replyChannel chan *comet.Message = nil + wait := 10 + replyChannel = make(chan *comet.Message) + seq := client.SendMessage(comet.MSG_ROUTER_COMMAND, []byte(msg.Cmd), replyChannel) + select { + case reply := <-replyChannel: + rpcReply.Status = 0 + rpcReply.Result = string(reply.Data) + this.SendRpcResponse(replyTo, correlationId, rpcReply) + return + case <-time.After(time.Duration(wait) * time.Second): + log.Warnf("MSG timeout. RequestId: %s, seq: %d", correlationId, seq) + client.MsgTimeout(seq) + rpcReply.Status = STATUS_SEND_TIMEOUT + this.SendRpcResponse(replyTo, correlationId, rpcReply) + return + } +} diff --git a/storage/redis.go b/storage/redis.go new file mode 100644 index 0000000..7304ad1 --- /dev/null +++ b/storage/redis.go @@ -0,0 +1,318 @@ +package storage + +import ( + "encoding/json" + "fmt" + log "github.com/cihub/seelog" + "github.com/garyburd/redigo/redis" + "sort" + "strings" + "time" +) + +type RedisStorage struct { + pool *redis.Pool + retry int +} + +func NewRedisStorage(server string, pass string, poolsize int, retry int) *RedisStorage { + return &RedisStorage{ + pool: &redis.Pool{ + MaxActive: poolsize, + MaxIdle: poolsize, + IdleTimeout: 300 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := redis.Dial("tcp", server) + if err != nil { + log.Infof("failed to connect Redis (%s), (%s)", server, err) + return nil, err + } + if _, err := c.Do("AUTH", pass); err != nil { + log.Infof("failed to auth Redis (%s), (%s)", server, err) + return nil, err + + } + log.Infof("connected with Redis (%s)", server) + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + }, + retry: retry, + } +} + +func (r *RedisStorage) Do(commandName string, args ...interface{}) (interface{}, error) { + var conn redis.Conn + i := r.retry + for ; i > 0; i-- { + conn = r.pool.Get() + err := conn.Err() + if err == nil { + break + } else { + log.Infof("failed to get conn from pool (%s)", err) + } + time.Sleep(time.Second) + } + if i == 0 || conn == nil { + return nil, fmt.Errorf("failed to find a useful redis conn") + } else { + ret, err := conn.Do(commandName, args...) + conn.Close() + return ret, err + } +} + +// 从存储后端获取 > 指定时间的所有消息 +func (r *RedisStorage) GetOfflineMsgs(appId string, regId string, msgId int64) []*RawMessage { + key := "db_offline_msg_" + appId + ret, err := redis.Strings(r.Do("HKEYS", key)) + if err != nil { + log.Infof("failed to get fields of offline msg:", err) + return nil + } + + now := time.Now().Unix() + skeys := make(map[int64]interface{}) + var sidxs []float64 + + for i := range ret { + var ( + idx int64 + expire int64 + ) + if _, err := fmt.Sscanf(ret[i], "%v_%v", &idx, &expire); err != nil { + log.Infof("invaild redis hash field:", err) + continue + } + + if idx <= msgId || expire <= now { + continue + } else { + skeys[idx] = ret[i] + sidxs = append(sidxs, float64(idx)) + } + } + + sort.Float64Slice(sidxs).Sort() + args := []interface{}{key} + for k := range sidxs { + t := int64(sidxs[k]) + args = append(args, skeys[t]) + } + + if len(args) == 1 { + return nil + } + + rmsgs, err := redis.Strings(r.Do("HMGET", args...)) + if err != nil { + log.Infof("failed to get offline rmsg:", err) + return nil + } + + var msgs []*RawMessage + for i := range rmsgs { + t := []byte(rmsgs[i]) + msg := &RawMessage{} + if err := json.Unmarshal(t, msg); err != nil { + log.Infof("failed to decode raw msg:", err) + continue + } + msgs = append(msgs, msg) + } + return msgs +} + +// 从存储后端获取指定消息 +func (r *RedisStorage) GetRawMsg(appId string, msgId int64) *RawMessage { + key := "db_msg_" + appId + ret, err := redis.Bytes(r.Do("HGET", key, msgId)) + if err != nil { + log.Warnf("redis: HGET failed (%s)", err) + return nil + } + rmsg := &RawMessage{} + if err := json.Unmarshal(ret, rmsg); err != nil { + log.Warnf("failed to decode raw msg:", err) + return nil + } + return rmsg +} + +func (r *RedisStorage) AddDevice(serverName, devId string) error { + _, err := redis.Int(r.Do("HSET", "db_comet_"+serverName, devId, nil)) + if err != nil { + log.Warnf("redis: HSET failed (%s)", err) + return err + } + return nil +} + +func (r *RedisStorage) RemoveDevice(serverName, devId string) error { + _, err := r.Do("HDEL", "db_comet_"+serverName, devId) + if err != nil { + log.Warnf("redis: HDEL failed (%s)", err) + return err + } + return nil +} + +func (r *RedisStorage) CheckDevice(devId string) (string, error) { + keys, err := redis.Strings(r.Do("KEYS", "db_comet_*")) + if err != nil { + log.Errorf("failed to get comet nodes KEYS:", err) + return "", err + } + for _, key := range keys { + exist, err := r.HashExists(key, devId) + if err != nil { + log.Errorf("error on HashExists:", err) + return "", err + } + if exist == 1 { + return strings.TrimPrefix(key, "db_comet_"), nil + } + } + return "", nil +} + +func (r *RedisStorage) RefreshDevices(serverName string, timeout int) error { + _, err := redis.Int(r.Do("EXPIRE", "db_comet_"+serverName, timeout)) + if err != nil { + log.Warnf("redis: EXPIRE failed, (%s)", err) + } + return err +} + +func (r *RedisStorage) InitDevices(serverName string) error { + _, err := redis.Int(r.Do("DEL", "db_comet_"+serverName)) + if err != nil { + log.Warnf("redis: DEL failed, (%s)", err) + } + return err +} + +func (r *RedisStorage) HashGetAll(db string) ([]string, error) { + ret, err := r.Do("HGETALL", db) + if err != nil { + log.Warnf("redis: HGET failed (%s)", err) + return nil, err + } + if ret != nil { + ret, err := redis.Strings(ret, nil) + if err != nil { + log.Warnf("redis: convert to strings failed (%s)", err) + } + return ret, err + } + return nil, nil +} + +func (r *RedisStorage) HashGet(db string, key string) ([]byte, error) { + ret, err := r.Do("HGET", db, key) + if err != nil { + log.Warnf("redis: HGET failed (%s)", err) + return nil, err + } + if ret != nil { + ret, err := redis.Bytes(ret, nil) + if err != nil { + log.Warnf("redis: convert to bytes failed (%s)", err) + } + return ret, err + } + return nil, nil +} + +func (r *RedisStorage) HashSet(db string, key string, val []byte) (int, error) { + ret, err := redis.Int(r.Do("HSET", db, key, val)) + if err != nil { + log.Warnf("redis: HSET failed (%s)", err) + } + return ret, err +} + +func (r *RedisStorage) HashExists(db string, key string) (int, error) { + ret, err := redis.Int(r.Do("HEXISTS", db, key)) + if err != nil { + log.Warnf("redis: HEXISTS failed (%s)", err) + } + return ret, err +} + +func (r *RedisStorage) HashSetNotExist(db string, key string, val []byte) (int, error) { + ret, err := redis.Int(r.Do("HSETNX", db, key, val)) + if err != nil { + log.Warnf("redis: HSETNX failed (%s)", err) + } + return ret, err +} + +func (r *RedisStorage) HashDel(db string, key string) (int, error) { + ret, err := redis.Int(r.Do("HDEL", db, key)) + if err != nil { + log.Warnf("redis: HDEL failed (%s)", err) + } + return ret, err +} + +func (r *RedisStorage) HashIncrBy(db string, key string, val int64) (int64, error) { + ret, err := redis.Int64(r.Do("HINCRBY", db, key, val)) + if err != nil { + log.Warnf("redis: HINCRBY failed, (%s)", err) + } + return ret, err +} + +func (r *RedisStorage) SetNotExist(key string, val []byte) (int, error) { + ret, err := redis.Int(r.Do("SETNX", key, val)) + if err != nil { + log.Warnf("redis: SETNX failed (%s)", err) + } + return ret, err +} + +func (r *RedisStorage) IncrBy(key string, val int64) (int64, error) { + ret, err := redis.Int64(r.Do("INCRBY", key, val)) + if err != nil { + log.Warnf("redis: INCRBY failed, (%s)", err) + } + return ret, err +} + +func (r *RedisStorage) SetAdd(key string, val string) (int, error) { + ret, err := redis.Int(r.Do("SADD", key, val)) + if err != nil { + log.Warnf("redis: SADD failed, (%s)", err) + } + return ret, err +} + +func (r *RedisStorage) SetDel(key string, val string) (int, error) { + ret, err := redis.Int(r.Do("SREM", key, val)) + if err != nil { + log.Warnf("redis: SREM failed, (%s)", err) + } + return ret, err +} + +func (r *RedisStorage) SetIsMember(key string, val string) (int, error) { + ret, err := redis.Int(r.Do("SISMEMBER", key, val)) + if err != nil { + log.Warnf("redis: SISMEMBER failed, (%s)", err) + } + return ret, err + +} + +func (r *RedisStorage) SetMembers(key string) ([]string, error) { + ret, err := redis.Strings(r.Do("SMEMBERS", key)) + if err != nil { + log.Warnf("redis: SMEMBERS failed, (%s)", err) + } + return ret, err +} diff --git a/storage/storage.go b/storage/storage.go new file mode 100644 index 0000000..482c04b --- /dev/null +++ b/storage/storage.go @@ -0,0 +1,80 @@ +package storage + +import () + +type RawMessage struct { + AppSec string `json:"appsec,omitempty"` + Token string `json:"token,omitempty"` + MsgId int64 `json:"msgid"` + AppId string `json:"appid"` + Pkg string `json:"pkg"` + CTime int64 `json:"ctime"` + Platform string `json:"platform,omitempty"` + MsgType int `json:"msg_type"` + PushType int `json:"push_type"` + PushParams struct { + RegId []string `json:"regid,omitempty"` + UserId []string `json:"userid,omitempty"` + DevId []string `json:"devid,omitempty"` + Topic string `json:"topic,omitempty"` + } `json:"push_params"` + Content string `json:"content,omitempty"` + Notification struct { + Title string `json:"title"` + Desc string `json:"desc"` + Type int `json:"type"` + SoundUri string `json:"sound_uri,omitempty` + Action int `json:"action,omitempty"` + IntentUri string `json:"intent_uri,omitempty` + WebUri string `json:"web_uri,omitempty` + } `json:"notification,omitempty"` + Options struct { + TTL int64 `json:"ttl,omitempty"` + TTS int64 `json:"tts,omitempty"` + } `json:"options"` +} + +type RawApp struct { + Pkg string `json:"pkg"` + UserId string `json:"userid"` + AppKey string `json:"appkey,omitempty"` + AppSec string `json:"appsec,omitempty"` +} + +type Storage interface { + GetOfflineMsgs(appId string, regId string, ctime int64) []*RawMessage + GetRawMsg(appId string, msgId int64) *RawMessage + + AddDevice(serverName, devId string) error + RemoveDevice(serverName, devId string) error + + // check if the device Id exists, return the server name + CheckDevice(devId string) (string, error) + RefreshDevices(serverName string, timeout int) error + InitDevices(serverName string) error + + HashGetAll(db string) ([]string, error) + HashGet(db string, key string) ([]byte, error) + HashSet(db string, key string, val []byte) (int, error) + HashExists(db string, key string) (int, error) + HashSetNotExist(db string, key string, val []byte) (int, error) + HashDel(db string, key string) (int, error) + HashIncrBy(db string, key string, val int64) (int64, error) + + SetNotExist(key string, val []byte) (int, error) + IncrBy(key string, val int64) (int64, error) + + SetAdd(key string, val string) (int, error) + SetDel(key string, val string) (int, error) + SetIsMember(key string, val string) (int, error) + SetMembers(key string) ([]string, error) +} + +var ( + Instance Storage = nil +) + +func NewInstance(server string, pass string, poolsize int, retry int) bool { + Instance = NewRedisStorage(server, pass, poolsize, retry) + return true +} diff --git a/utils/convert/convert.go b/utils/convert/convert.go deleted file mode 100644 index 4916d8b..0000000 --- a/utils/convert/convert.go +++ /dev/null @@ -1,78 +0,0 @@ -package convert - -import ( - "crypto/md5" - "fmt" - "io" - "time" -) - -// 均采用大端字节序,事实上这个字节序没有任何用处, -// 只要服务器和客户端约定采用相同的字节序就行 - -func Uint32ToBytes(v uint32) []byte { - buf := make([]byte, 4) - buf[0] = byte(v >> 24) - buf[1] = byte(v >> 16) - buf[2] = byte(v >> 8) - buf[3] = byte(v) - return buf -} - -func Int32ToBytes(v int32) []byte { - buf := make([]byte, 4) - buf[0] = byte(v >> 24) - buf[1] = byte(v >> 16) - buf[2] = byte(v >> 8) - buf[3] = byte(v) - return buf -} - -func Uint16ToBytes(v uint16) []byte { - buf := make([]byte, 2) - buf[0] = byte(v >> 8) - buf[1] = byte(v) - return buf -} - -func Int16ToBytes(v int16) []byte { - buf := make([]byte, 2) - buf[0] = byte(v >> 8) - buf[1] = byte(v) - return buf -} - -func BytesToUint32(buf []byte) uint32 { - v := (uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])) - return v -} - -func BytesToInt32(buf []byte) int32 { - v := (int32(buf[0])<<24 | int32(buf[1])<<16 | int32(buf[2])<<8 | int32(buf[3])) - return v -} - -func BytesToUint16(buf []byte) uint16 { - v := (uint16(buf[0])<<8 | uint16(buf[1])) - return v -} - -func BytesToInt16(buf []byte) int16 { - v := (int16(buf[0])<<8 | int16(buf[1])) - return v -} - -func TimestampToTimeString(timestamp int64) string { - return time.Unix(timestamp, 0).Format("2006-01-02 15:04:05") -} - -func TimestampToTime(timestamp int64) time.Time { - return time.Unix(timestamp, 0) -} - -// 计算string的md5值, 返回长度32的字符串(md5) -func StringToMd5(s string) string { - m := md5.New() - io.WriteString(m, s) - return fmt.Sprintf("%x", m.Sum(nil)) -} diff --git a/utils/convert/convert_test.go b/utils/convert/convert_test.go deleted file mode 100644 index a14eeab..0000000 --- a/utils/convert/convert_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package convert - -import ( - "fmt" - "testing" - "time" -) - -func TestConvert(t *testing.T) { - var a uint32 = 199211 - bytes := Uint32ToBytes(a) - var b uint32 = BytesToUint32(bytes) - if a != b { - t.Error("convert error") - } - - a = 0xffffffff - b = BytesToUint32(Uint32ToBytes(a)) - if a != b { - t.Error("convert error") - } - - var c uint16 = 0xfefe - bytes2 := Uint16ToBytes(c) - var d uint16 = BytesToUint16(bytes2) - if c != d { - t.Error("convert error") - } - - c = 65535 - d = BytesToUint16(Uint16ToBytes(c)) - if c != d { - t.Error("convert error") - } - - var e int32 = 98765 - bytes3 := Int32ToBytes(e) - var f int32 = BytesToInt32(bytes3) - if e != f { - t.Error("convert error") - } - - tt := time.Now() - timestamp := tt.Unix() - fmt.Println(TimestampToTimeString(timestamp)) - fmt.Println(TimestampToTime(timestamp)) - fmt.Println(tt) -} diff --git a/utils/funcmap/funcmap.go b/utils/funcmap/funcmap.go deleted file mode 100644 index 7aaf758..0000000 --- a/utils/funcmap/funcmap.go +++ /dev/null @@ -1,50 +0,0 @@ -package funcmap - -import ( - "errors" - "reflect" - "strconv" -) - -type FuncMap struct { - funcs map[uint32]reflect.Value -} - -func NewFuncMap() *FuncMap { - return &FuncMap{ - funcs: make(map[uint32]reflect.Value), - } -} - -func (this *FuncMap) Bind(fnid uint32, fn interface{}) (err error) { - defer func() { - if e := recover(); e != nil { - err = errors.New(strconv.Itoa(int(fnid)) + " is not callable.") - } - }() - - v := reflect.ValueOf(fn) - v.Type().NumIn() // fn不是函数则panic - this.funcs[fnid] = v - return -} - -func (this *FuncMap) Call(fnid uint32, params ...interface{}) (result []reflect.Value, err error) { - if _, ok := this.funcs[fnid]; !ok { - err = errors.New(strconv.Itoa(int(fnid)) + " does not exist.") - return - } - - in := make([]reflect.Value, len(params)) - for k, param := range params { - in[k] = reflect.ValueOf(param) - } - - result = this.funcs[fnid].Call(in) - return -} - -func (this *FuncMap) Exist(fnid uint32) bool { - _, ok := this.funcs[fnid] - return ok -} diff --git a/utils/funcmap/funcmap_test.go b/utils/funcmap/funcmap_test.go deleted file mode 100644 index bab4b87..0000000 --- a/utils/funcmap/funcmap_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package funcmap - -import ( - "fmt" - "testing" -) - -type T struct { - id int - name string - by []byte -} - -func (this *T) ft() { - this.id = 100 - this.name = "lijie" - this.by = []byte("hello world") -} - -func (this *T) fp() { - fmt.Println(this.id, this.name, string(this.by)) -} - -func f1() { - fmt.Println("f1()") -} - -func f2(i int, s string, arr []int, t *T) { - fmt.Println("start f2()") - fmt.Println(i, s) - fmt.Println(arr) - t.ft() - t.fp() - fmt.Println("end f2()") -} - -func f3(in ...interface{}) []int { - arr := make([]int, 0) - for _, v := range in { - arr = append(arr, v.(int)) - } - return arr -} - -func TestFuncMap(t *testing.T) { - - funcMap := NewFuncMap() - funcMap.Bind(1, f1) - funcMap.Bind(2, f2) - funcMap.Bind(3, f3) - - funcMap.Call(1) - funcMap.Call(2, 99, "sss", []int{1, 2, 3}, &T{}) - arr, err := funcMap.Call(3, 7, 8, 9, 10) - - if err == nil { - for i := 0; i < arr[0].Len(); i++ { - fmt.Println(arr[0].Index(i).Int()) - } - } else { - t.Error(err) - } - - _, err = funcMap.Call(4, 9) - if err == nil { - t.Error(err) - } - - if funcMap.Exist(10) { - t.Error("Exist error") - } - if !funcMap.Exist(3) { - t.Error("Exist error") - } - -} diff --git a/utils/jsonmessage.go b/utils/jsonmessage.go deleted file mode 100644 index cdd01ac..0000000 --- a/utils/jsonmessage.go +++ /dev/null @@ -1,156 +0,0 @@ -package utils - -import ( - "encoding/json" - "fmt" - //"github.com/dotcloud/docker/pkg/term" - "io" - "strings" - "time" -) - -type JSONError struct { - Code int `json:"code,omitempty"` - Message string `json:"message,omitempty"` -} - -func (e *JSONError) Error() string { - return e.Message -} - -type JSONProgress struct { - terminalFd uintptr - Current int `json:"current,omitempty"` - Total int `json:"total,omitempty"` - Start int64 `json:"start,omitempty"` -} - -func (p *JSONProgress) String() string { - var ( - width = 200 - pbBox string - numbersBox string - timeLeftBox string - ) - - if p.Current <= 0 && p.Total <= 0 { - return "" - } - current := HumanSize(int64(p.Current)) - if p.Total <= 0 { - return fmt.Sprintf("%8v", current) - } - total := HumanSize(int64(p.Total)) - percentage := int(float64(p.Current)/float64(p.Total)*100) / 2 - if width > 110 { - pbBox = fmt.Sprintf("[%s>%s] ", strings.Repeat("=", percentage), strings.Repeat(" ", 50-percentage)) - } - numbersBox = fmt.Sprintf("%8v/%v", current, total) - - if p.Start > 0 && percentage < 50 { - fromStart := time.Now().UTC().Sub(time.Unix(int64(p.Start), 0)) - perEntry := fromStart / time.Duration(p.Current) - left := time.Duration(p.Total-p.Current) * perEntry - left = (left / time.Second) * time.Second - - if width > 50 { - timeLeftBox = " " + left.String() - } - } - return pbBox + numbersBox + timeLeftBox -} - -type JSONMessage struct { - Stream string `json:"stream,omitempty"` - Status string `json:"status,omitempty"` - Progress *JSONProgress `json:"progressDetail,omitempty"` - ProgressMessage string `json:"progress,omitempty"` //deprecated - ID string `json:"id,omitempty"` - From string `json:"from,omitempty"` - Time int64 `json:"time,omitempty"` - Error *JSONError `json:"errorDetail,omitempty"` - ErrorMessage string `json:"error,omitempty"` //deprecated -} - -func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error { - if jm.Error != nil { - if jm.Error.Code == 401 { - return fmt.Errorf("Authentication is required.") - } - return jm.Error - } - var endl string - if isTerminal { - // [2K = erase entire current line - fmt.Fprintf(out, "%c[2K\r", 27) - endl = "\r" - } else if jm.Progress != nil { //disable progressbar in non-terminal - return nil - } - if jm.Time != 0 { - fmt.Fprintf(out, "[%s] ", time.Unix(jm.Time, 0)) - } - if jm.ID != "" { - fmt.Fprintf(out, "%s: ", jm.ID) - } - if jm.From != "" { - fmt.Fprintf(out, "(from %s) ", jm.From) - } - if jm.Progress != nil { - fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress.String(), endl) - } else if jm.ProgressMessage != "" { //deprecated - fmt.Fprintf(out, "%s %s%s", jm.Status, jm.ProgressMessage, endl) - } else if jm.Stream != "" { - fmt.Fprintf(out, "%s%s", jm.Stream, endl) - } else { - fmt.Fprintf(out, "%s%s\n", jm.Status, endl) - } - return nil -} - -func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool) error { - var ( - dec = json.NewDecoder(in) - ids = make(map[string]int) - diff = 0 - ) - for { - var jm JSONMessage - if err := dec.Decode(&jm); err != nil { - if err == io.EOF { - break - } - return err - } - - if jm.Progress != nil { - jm.Progress.terminalFd = terminalFd - } - if jm.Progress != nil || jm.ProgressMessage != "" { - line, ok := ids[jm.ID] - if !ok { - line = len(ids) - ids[jm.ID] = line - fmt.Fprintf(out, "\n") - diff = 0 - } else { - diff = len(ids) - line - } - if isTerminal { - // [{diff}A = move cursor up diff rows - fmt.Fprintf(out, "%c[%dA", 27, diff) - } - } - err := jm.Display(out, isTerminal) - if jm.ID != "" { - if isTerminal { - // [{diff}B = move cursor down diff rows - fmt.Fprintf(out, "%c[%dB", 27, diff) - } - } - if err != nil { - return err - } - } - return nil -} diff --git a/utils/random.go b/utils/random.go deleted file mode 100644 index d4d33c6..0000000 --- a/utils/random.go +++ /dev/null @@ -1,16 +0,0 @@ -package utils - -import ( - "crypto/rand" - "encoding/hex" - "io" -) - -func RandomString() string { - id := make([]byte, 32) - _, err := io.ReadFull(rand.Reader, id) - if err != nil { - panic(err) // This shouldn't happen - } - return hex.EncodeToString(id) -} diff --git a/utils/safemap/safemap.go b/utils/safemap/safemap.go deleted file mode 100644 index 348aaab..0000000 --- a/utils/safemap/safemap.go +++ /dev/null @@ -1,74 +0,0 @@ -package safemap - -import ( - "sync" -) - -type SafeMap struct { - lock *sync.RWMutex - mp map[interface{}]interface{} -} - -// NewSafeMap return new safemap. -func NewSafeMap() *SafeMap { - return &SafeMap{ - lock: new(sync.RWMutex), - mp: make(map[interface{}]interface{}), - } -} - -// Get from maps return the k's value. -func (this *SafeMap) Get(k interface{}) interface{} { - this.lock.RLock() - defer this.lock.RUnlock() - if val, ok := this.mp[k]; ok { - return val - } - return nil -} - -// Maps the given key and value. -// Returns false if the key is already in the map and changes nothing. -func (this *SafeMap) Set(k interface{}, v interface{}) bool { - this.lock.Lock() - defer this.lock.Unlock() - if val, ok := this.mp[k]; !ok { - this.mp[k] = v - } else if val != v { - this.mp[k] = v - } else { - return false - } - return true -} - -// Returns true if k is exist in the map. -func (this *SafeMap) Check(k interface{}) bool { - this.lock.RLock() - defer this.lock.RUnlock() - if _, ok := this.mp[k]; !ok { - return false - } - return true -} - -// Delete the given key and value. -func (this *SafeMap) Delete(k interface{}) { - this.lock.Lock() - defer this.lock.Unlock() - delete(this.mp, k) -} - -// Items returns all items in safemap -func (this *SafeMap) Items() map[interface{}]interface{} { - this.lock.RLock() - defer this.lock.RUnlock() - return this.mp -} - -// Size returns the size of the safemap -func (this *SafeMap) Size() int { - this.lock.RLock() - defer this.lock.RUnlock() - return len(this.mp) -} diff --git a/utils/safemap/safemap_test.go b/utils/safemap/safemap_test.go deleted file mode 100644 index 9c9aa7e..0000000 --- a/utils/safemap/safemap_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package safemap - -import ( - "testing" -) - -func TestSafeMap(t *testing.T) { - mp := NewSafeMap() - if !mp.Set("lijie", int(1)) { - t.Error("set error") - } - - if !mp.Check("lijie") { - t.Error("check error") - } - - if v := mp.Get("lijie"); v.(int) != 1 { - t.Error("get error") - } - - if mp.Size() != 1 { - t.Error("size error") - } - - mp.Delete("lijie") - if mp.Check("lijie") { - t.Error("delete error") - } - - mp.Delete("no") -} diff --git a/utils/utils.go b/utils/utils.go deleted file mode 100644 index 0567547..0000000 --- a/utils/utils.go +++ /dev/null @@ -1,1137 +0,0 @@ -package utils - -import ( - "bytes" - "crypto/sha1" - "crypto/sha256" - "encoding/hex" - "encoding/json" - "fmt" - "index/suffixarray" - "io" - "io/ioutil" - "net/http" - "os" - "os/exec" - "path/filepath" - "regexp" - "runtime" - "strconv" - "strings" - "sync" - "time" -) - -var ( - IAMSTATIC bool // whether or not Docker itself was compiled statically via ./hack/make.sh binary - INITSHA1 string // sha1sum of separate static dockerinit, if Docker itself was compiled dynamically via ./hack/make.sh dynbinary - INITPATH string // custom location to search for a valid dockerinit binary (available for packagers as a last resort escape hatch) -) - -// A common interface to access the Fatal method of -// both testing.B and testing.T. -type Fataler interface { - Fatal(args ...interface{}) -} - -// Go is a basic promise implementation: it wraps calls a function in a goroutine, -// and returns a channel which will later return the function's return value. -func Go(f func() error) chan error { - ch := make(chan error) - go func() { - ch <- f() - }() - return ch -} - -// Request a given URL and return an io.Reader -func Download(url string) (resp *http.Response, err error) { - if resp, err = http.Get(url); err != nil { - return nil, err - } - if resp.StatusCode >= 400 { - return nil, fmt.Errorf("Got HTTP status code >= 400: %s", resp.Status) - } - return resp, nil -} - -func logf(level string, format string, a ...interface{}) { - // Retrieve the stack infos - _, file, line, ok := runtime.Caller(2) - if !ok { - file = "" - line = -1 - } else { - file = file[strings.LastIndex(file, "/")+1:] - } - - fmt.Fprintf(os.Stderr, fmt.Sprintf("[%s] %s:%d %s\n", level, file, line, format), a...) -} - -// Debug function, if the debug flag is set, then display. Do nothing otherwise -// If Docker is in damon mode, also send the debug info on the socket -func Debugf(format string, a ...interface{}) { - if os.Getenv("DEBUG") != "" { - logf("debug", format, a...) - } -} - -func Errorf(format string, a ...interface{}) { - logf("error", format, a...) -} - -// HumanDuration returns a human-readable approximation of a duration -// (eg. "About a minute", "4 hours ago", etc.) -func HumanDuration(d time.Duration) string { - if seconds := int(d.Seconds()); seconds < 1 { - return "Less than a second" - } else if seconds < 60 { - return fmt.Sprintf("%d seconds", seconds) - } else if minutes := int(d.Minutes()); minutes == 1 { - return "About a minute" - } else if minutes < 60 { - return fmt.Sprintf("%d minutes", minutes) - } else if hours := int(d.Hours()); hours == 1 { - return "About an hour" - } else if hours < 48 { - return fmt.Sprintf("%d hours", hours) - } else if hours < 24*7*2 { - return fmt.Sprintf("%d days", hours/24) - } else if hours < 24*30*3 { - return fmt.Sprintf("%d weeks", hours/24/7) - } else if hours < 24*365*2 { - return fmt.Sprintf("%d months", hours/24/30) - } - return fmt.Sprintf("%f years", d.Hours()/24/365) -} - -// HumanSize returns a human-readable approximation of a size -// using SI standard (eg. "44kB", "17MB") -func HumanSize(size int64) string { - i := 0 - var sizef float64 - sizef = float64(size) - units := []string{"B", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"} - for sizef >= 1000.0 { - sizef = sizef / 1000.0 - i++ - } - return fmt.Sprintf("%.4g %s", sizef, units[i]) -} - -// Parses a human-readable string representing an amount of RAM -// in bytes, kibibytes, mebibytes or gibibytes, and returns the -// number of bytes, or -1 if the string is unparseable. -// Units are case-insensitive, and the 'b' suffix is optional. -func RAMInBytes(size string) (bytes int64, err error) { - re, error := regexp.Compile("^(\\d+)([kKmMgG])?[bB]?$") - if error != nil { - return -1, error - } - - matches := re.FindStringSubmatch(size) - - if len(matches) != 3 { - return -1, fmt.Errorf("Invalid size: '%s'", size) - } - - memLimit, error := strconv.ParseInt(matches[1], 10, 0) - if error != nil { - return -1, error - } - - unit := strings.ToLower(matches[2]) - - if unit == "k" { - memLimit *= 1024 - } else if unit == "m" { - memLimit *= 1024 * 1024 - } else if unit == "g" { - memLimit *= 1024 * 1024 * 1024 - } - - return memLimit, nil -} - -func Trunc(s string, maxlen int) string { - if len(s) <= maxlen { - return s - } - return s[:maxlen] -} - -// Figure out the absolute path of our own binary (if it's still around). -func SelfPath() string { - path, err := exec.LookPath(os.Args[0]) - if err != nil { - if os.IsNotExist(err) { - return "" - } - if execErr, ok := err.(*exec.Error); ok && os.IsNotExist(execErr.Err) { - return "" - } - panic(err) - } - path, err = filepath.Abs(path) - if err != nil { - if os.IsNotExist(err) { - return "" - } - panic(err) - } - return path -} - -func dockerInitSha1(target string) string { - f, err := os.Open(target) - if err != nil { - return "" - } - defer f.Close() - h := sha1.New() - _, err = io.Copy(h, f) - if err != nil { - return "" - } - return hex.EncodeToString(h.Sum(nil)) -} - -func isValidDockerInitPath(target string, selfPath string) bool { // target and selfPath should be absolute (InitPath and SelfPath already do this) - if target == "" { - return false - } - if IAMSTATIC { - if selfPath == "" { - return false - } - if target == selfPath { - return true - } - targetFileInfo, err := os.Lstat(target) - if err != nil { - return false - } - selfPathFileInfo, err := os.Lstat(selfPath) - if err != nil { - return false - } - return os.SameFile(targetFileInfo, selfPathFileInfo) - } - return INITSHA1 != "" && dockerInitSha1(target) == INITSHA1 -} - -// Figure out the path of our dockerinit (which may be SelfPath()) -func DockerInitPath(localCopy string) string { - selfPath := SelfPath() - if isValidDockerInitPath(selfPath, selfPath) { - // if we're valid, don't bother checking anything else - return selfPath - } - var possibleInits = []string{ - localCopy, - INITPATH, - filepath.Join(filepath.Dir(selfPath), "dockerinit"), - - // FHS 3.0 Draft: "/usr/libexec includes internal binaries that are not intended to be executed directly by users or shell scripts. Applications may use a single subdirectory under /usr/libexec." - // http://www.linuxbase.org/betaspecs/fhs/fhs.html#usrlibexec - "/usr/libexec/docker/dockerinit", - "/usr/local/libexec/docker/dockerinit", - - // FHS 2.3: "/usr/lib includes object files, libraries, and internal binaries that are not intended to be executed directly by users or shell scripts." - // http://refspecs.linuxfoundation.org/FHS_2.3/fhs-2.3.html#USRLIBLIBRARIESFORPROGRAMMINGANDPA - "/usr/lib/docker/dockerinit", - "/usr/local/lib/docker/dockerinit", - } - for _, dockerInit := range possibleInits { - if dockerInit == "" { - continue - } - path, err := exec.LookPath(dockerInit) - if err == nil { - path, err = filepath.Abs(path) - if err != nil { - // LookPath already validated that this file exists and is executable (following symlinks), so how could Abs fail? - panic(err) - } - if isValidDockerInitPath(path, selfPath) { - return path - } - } - } - return "" -} - -type NopWriter struct{} - -func (*NopWriter) Write(buf []byte) (int, error) { - return len(buf), nil -} - -type nopWriteCloser struct { - io.Writer -} - -func (w *nopWriteCloser) Close() error { return nil } - -func NopWriteCloser(w io.Writer) io.WriteCloser { - return &nopWriteCloser{w} -} - -type bufReader struct { - sync.Mutex - buf *bytes.Buffer - reader io.Reader - err error - wait sync.Cond -} - -func NewBufReader(r io.Reader) *bufReader { - reader := &bufReader{ - buf: &bytes.Buffer{}, - reader: r, - } - reader.wait.L = &reader.Mutex - go reader.drain() - return reader -} - -func (r *bufReader) drain() { - buf := make([]byte, 1024) - for { - n, err := r.reader.Read(buf) - r.Lock() - if err != nil { - r.err = err - } else { - r.buf.Write(buf[0:n]) - } - r.wait.Signal() - r.Unlock() - if err != nil { - break - } - } -} - -func (r *bufReader) Read(p []byte) (n int, err error) { - r.Lock() - defer r.Unlock() - for { - n, err = r.buf.Read(p) - if n > 0 { - return n, err - } - if r.err != nil { - return 0, r.err - } - r.wait.Wait() - } -} - -func (r *bufReader) Close() error { - closer, ok := r.reader.(io.ReadCloser) - if !ok { - return nil - } - return closer.Close() -} - -type WriteBroadcaster struct { - sync.Mutex - buf *bytes.Buffer - writers map[StreamWriter]bool -} - -type StreamWriter struct { - wc io.WriteCloser - stream string -} - -func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) { - w.Lock() - sw := StreamWriter{wc: writer, stream: stream} - w.writers[sw] = true - w.Unlock() -} - -type JSONLog struct { - Log string `json:"log,omitempty"` - Stream string `json:"stream,omitempty"` - Created time.Time `json:"time"` -} - -func (w *WriteBroadcaster) Write(p []byte) (n int, err error) { - w.Lock() - defer w.Unlock() - w.buf.Write(p) - for sw := range w.writers { - lp := p - if sw.stream != "" { - lp = nil - for { - line, err := w.buf.ReadString('\n') - if err != nil { - w.buf.Write([]byte(line)) - break - } - b, err := json.Marshal(&JSONLog{Log: line, Stream: sw.stream, Created: time.Now().UTC()}) - if err != nil { - // On error, evict the writer - delete(w.writers, sw) - continue - } - lp = append(lp, b...) - lp = append(lp, '\n') - } - } - if n, err := sw.wc.Write(lp); err != nil || n != len(lp) { - // On error, evict the writer - delete(w.writers, sw) - } - } - return len(p), nil -} - -func (w *WriteBroadcaster) CloseWriters() error { - w.Lock() - defer w.Unlock() - for sw := range w.writers { - sw.wc.Close() - } - w.writers = make(map[StreamWriter]bool) - return nil -} - -func NewWriteBroadcaster() *WriteBroadcaster { - return &WriteBroadcaster{writers: make(map[StreamWriter]bool), buf: bytes.NewBuffer(nil)} -} - -func GetTotalUsedFds() int { - if fds, err := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid())); err != nil { - Errorf("Error opening /proc/%d/fd: %s", os.Getpid(), err) - } else { - return len(fds) - } - return -1 -} - -// TruncIndex allows the retrieval of string identifiers by any of their unique prefixes. -// This is used to retrieve image and container IDs by more convenient shorthand prefixes. -type TruncIndex struct { - index *suffixarray.Index - ids map[string]bool - bytes []byte -} - -func NewTruncIndex() *TruncIndex { - return &TruncIndex{ - index: suffixarray.New([]byte{' '}), - ids: make(map[string]bool), - bytes: []byte{' '}, - } -} - -func (idx *TruncIndex) Add(id string) error { - if strings.Contains(id, " ") { - return fmt.Errorf("Illegal character: ' '") - } - if _, exists := idx.ids[id]; exists { - return fmt.Errorf("Id already exists: %s", id) - } - idx.ids[id] = true - idx.bytes = append(idx.bytes, []byte(id+" ")...) - idx.index = suffixarray.New(idx.bytes) - return nil -} - -func (idx *TruncIndex) Delete(id string) error { - if _, exists := idx.ids[id]; !exists { - return fmt.Errorf("No such id: %s", id) - } - before, after, err := idx.lookup(id) - if err != nil { - return err - } - delete(idx.ids, id) - idx.bytes = append(idx.bytes[:before], idx.bytes[after:]...) - idx.index = suffixarray.New(idx.bytes) - return nil -} - -func (idx *TruncIndex) lookup(s string) (int, int, error) { - offsets := idx.index.Lookup([]byte(" "+s), -1) - //log.Printf("lookup(%s): %v (index bytes: '%s')\n", s, offsets, idx.index.Bytes()) - if offsets == nil || len(offsets) == 0 || len(offsets) > 1 { - return -1, -1, fmt.Errorf("No such id: %s", s) - } - offsetBefore := offsets[0] + 1 - offsetAfter := offsetBefore + strings.Index(string(idx.bytes[offsetBefore:]), " ") - return offsetBefore, offsetAfter, nil -} - -func (idx *TruncIndex) Get(s string) (string, error) { - before, after, err := idx.lookup(s) - //log.Printf("Get(%s) bytes=|%s| before=|%d| after=|%d|\n", s, idx.bytes, before, after) - if err != nil { - return "", err - } - return string(idx.bytes[before:after]), err -} - -// TruncateID returns a shorthand version of a string identifier for convenience. -// A collision with other shorthands is very unlikely, but possible. -// In case of a collision a lookup with TruncIndex.Get() will fail, and the caller -// will need to use a langer prefix, or the full-length Id. -func TruncateID(id string) string { - shortLen := 12 - if len(id) < shortLen { - shortLen = len(id) - } - return id[:shortLen] -} - -// Code c/c from io.Copy() modified to handle escape sequence -func CopyEscapable(dst io.Writer, src io.ReadCloser) (written int64, err error) { - buf := make([]byte, 32*1024) - for { - nr, er := src.Read(buf) - if nr > 0 { - // ---- Docker addition - // char 16 is C-p - if nr == 1 && buf[0] == 16 { - nr, er = src.Read(buf) - // char 17 is C-q - if nr == 1 && buf[0] == 17 { - if err := src.Close(); err != nil { - return 0, err - } - return 0, nil - } - } - // ---- End of docker - nw, ew := dst.Write(buf[0:nr]) - if nw > 0 { - written += int64(nw) - } - if ew != nil { - err = ew - break - } - if nr != nw { - err = io.ErrShortWrite - break - } - } - if er == io.EOF { - break - } - if er != nil { - err = er - break - } - } - return written, err -} - -func HashData(src io.Reader) (string, error) { - h := sha256.New() - if _, err := io.Copy(h, src); err != nil { - return "", err - } - return "sha256:" + hex.EncodeToString(h.Sum(nil)), nil -} - -type KernelVersionInfo struct { - Kernel int - Major int - Minor int - Flavor string -} - -func (k *KernelVersionInfo) String() string { - flavor := "" - if len(k.Flavor) > 0 { - flavor = fmt.Sprintf("-%s", k.Flavor) - } - return fmt.Sprintf("%d.%d.%d%s", k.Kernel, k.Major, k.Minor, flavor) -} - -// Compare two KernelVersionInfo struct. -// Returns -1 if a < b, = if a == b, 1 it a > b -func CompareKernelVersion(a, b *KernelVersionInfo) int { - if a.Kernel < b.Kernel { - return -1 - } else if a.Kernel > b.Kernel { - return 1 - } - - if a.Major < b.Major { - return -1 - } else if a.Major > b.Major { - return 1 - } - - if a.Minor < b.Minor { - return -1 - } else if a.Minor > b.Minor { - return 1 - } - - return 0 -} -/* -func GetKernelVersion() (*KernelVersionInfo, error) { - var ( - err error - ) - - uts, err := uname() - if err != nil { - return nil, err - } - - release := make([]byte, len(uts.Release)) - - i := 0 - for _, c := range uts.Release { - release[i] = byte(c) - i++ - } - - // Remove the \x00 from the release for Atoi to parse correctly - release = release[:bytes.IndexByte(release, 0)] - - return ParseRelease(string(release)) -} -*/ -func ParseRelease(release string) (*KernelVersionInfo, error) { - var ( - flavor string - kernel, major, minor int - err error - ) - - tmp := strings.SplitN(release, "-", 2) - tmp2 := strings.Split(tmp[0], ".") - - if len(tmp2) > 0 { - kernel, err = strconv.Atoi(tmp2[0]) - if err != nil { - return nil, err - } - } - - if len(tmp2) > 1 { - major, err = strconv.Atoi(tmp2[1]) - if err != nil { - return nil, err - } - } - - if len(tmp2) > 2 { - // Removes "+" because git kernels might set it - minorUnparsed := strings.Trim(tmp2[2], "+") - minor, err = strconv.Atoi(minorUnparsed) - if err != nil { - return nil, err - } - } - - if len(tmp) == 2 { - flavor = tmp[1] - } else { - flavor = "" - } - - return &KernelVersionInfo{ - Kernel: kernel, - Major: major, - Minor: minor, - Flavor: flavor, - }, nil -} - -// FIXME: this is deprecated by CopyWithTar in archive.go -func CopyDirectory(source, dest string) error { - if output, err := exec.Command("cp", "-ra", source, dest).CombinedOutput(); err != nil { - return fmt.Errorf("Error copy: %s (%s)", err, output) - } - return nil -} - -type NopFlusher struct{} - -func (f *NopFlusher) Flush() {} - -type WriteFlusher struct { - sync.Mutex - w io.Writer - flusher http.Flusher -} - -func (wf *WriteFlusher) Write(b []byte) (n int, err error) { - wf.Lock() - defer wf.Unlock() - n, err = wf.w.Write(b) - wf.flusher.Flush() - return n, err -} - -// Flush the stream immediately. -func (wf *WriteFlusher) Flush() { - wf.Lock() - defer wf.Unlock() - wf.flusher.Flush() -} - -func NewWriteFlusher(w io.Writer) *WriteFlusher { - var flusher http.Flusher - if f, ok := w.(http.Flusher); ok { - flusher = f - } else { - flusher = &NopFlusher{} - } - return &WriteFlusher{w: w, flusher: flusher} -} - -func NewHTTPRequestError(msg string, res *http.Response) error { - return &JSONError{ - Message: msg, - Code: res.StatusCode, - } -} - -func IsURL(str string) bool { - return strings.HasPrefix(str, "http://") || strings.HasPrefix(str, "https://") -} - -func IsGIT(str string) bool { - return strings.HasPrefix(str, "git://") || strings.HasPrefix(str, "github.com/") -} - -// GetResolvConf opens and read the content of /etc/resolv.conf. -// It returns it as byte slice. -func GetResolvConf() ([]byte, error) { - resolv, err := ioutil.ReadFile("/etc/resolv.conf") - if err != nil { - Errorf("Error openning resolv.conf: %s", err) - return nil, err - } - return resolv, nil -} - -// CheckLocalDns looks into the /etc/resolv.conf, -// it returns true if there is a local nameserver or if there is no nameserver. -func CheckLocalDns(resolvConf []byte) bool { - var parsedResolvConf = StripComments(resolvConf, []byte("#")) - if !bytes.Contains(parsedResolvConf, []byte("nameserver")) { - return true - } - for _, ip := range [][]byte{ - []byte("127.0.0.1"), - []byte("127.0.1.1"), - } { - if bytes.Contains(parsedResolvConf, ip) { - return true - } - } - return false -} - -// StripComments parses input into lines and strips away comments. -func StripComments(input []byte, commentMarker []byte) []byte { - lines := bytes.Split(input, []byte("\n")) - var output []byte - for _, currentLine := range lines { - var commentIndex = bytes.Index(currentLine, commentMarker) - if commentIndex == -1 { - output = append(output, currentLine...) - } else { - output = append(output, currentLine[:commentIndex]...) - } - output = append(output, []byte("\n")...) - } - return output -} - -// GetNameserversAsCIDR returns nameservers (if any) listed in -// /etc/resolv.conf as CIDR blocks (e.g., "1.2.3.4/32") -// This function's output is intended for net.ParseCIDR -func GetNameserversAsCIDR(resolvConf []byte) []string { - var parsedResolvConf = StripComments(resolvConf, []byte("#")) - nameservers := []string{} - re := regexp.MustCompile(`^\s*nameserver\s*(([0-9]+\.){3}([0-9]+))\s*$`) - for _, line := range bytes.Split(parsedResolvConf, []byte("\n")) { - var ns = re.FindSubmatch(line) - if len(ns) > 0 { - nameservers = append(nameservers, string(ns[1])+"/32") - } - } - - return nameservers -} - -// FIXME: Change this not to receive default value as parameter -func ParseHost(defaultHost string, defaultPort int, defaultUnix, addr string) (string, error) { - var ( - proto string - host string - port int - ) - addr = strings.TrimSpace(addr) - switch { - case strings.HasPrefix(addr, "unix://"): - proto = "unix" - addr = strings.TrimPrefix(addr, "unix://") - if addr == "" { - addr = defaultUnix - } - case strings.HasPrefix(addr, "tcp://"): - proto = "tcp" - addr = strings.TrimPrefix(addr, "tcp://") - case addr == "": - proto = "unix" - addr = defaultUnix - default: - if strings.Contains(addr, "://") { - return "", fmt.Errorf("Invalid bind address protocol: %s", addr) - } - proto = "tcp" - } - - if proto != "unix" && strings.Contains(addr, ":") { - hostParts := strings.Split(addr, ":") - if len(hostParts) != 2 { - return "", fmt.Errorf("Invalid bind address format: %s", addr) - } - if hostParts[0] != "" { - host = hostParts[0] - } else { - host = defaultHost - } - - if p, err := strconv.Atoi(hostParts[1]); err == nil && p != 0 { - port = p - } else { - port = defaultPort - } - - } else { - host = addr - port = defaultPort - } - if proto == "unix" { - return fmt.Sprintf("%s://%s", proto, host), nil - } - return fmt.Sprintf("%s://%s:%d", proto, host, port), nil -} - -func GetReleaseVersion() string { - resp, err := http.Get("http://get.docker.io/latest") - if err != nil { - return "" - } - defer resp.Body.Close() - if resp.ContentLength > 24 || resp.StatusCode != 200 { - return "" - } - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return "" - } - return strings.TrimSpace(string(body)) -} - -// Get a repos name and returns the right reposName + tag -// The tag can be confusing because of a port in a repository name. -// Ex: localhost.localdomain:5000/samalba/hipache:latest -func ParseRepositoryTag(repos string) (string, string) { - n := strings.LastIndex(repos, ":") - if n < 0 { - return repos, "" - } - if tag := repos[n+1:]; !strings.Contains(tag, "/") { - return repos[:n], tag - } - return repos, "" -} - -type User struct { - Uid string // user id - Gid string // primary group id - Username string - Name string - HomeDir string -} - -// UserLookup check if the given username or uid is present in /etc/passwd -// and returns the user struct. -// If the username is not found, an error is returned. -func UserLookup(uid string) (*User, error) { - file, err := ioutil.ReadFile("/etc/passwd") - if err != nil { - return nil, err - } - for _, line := range strings.Split(string(file), "\n") { - data := strings.Split(line, ":") - if len(data) > 5 && (data[0] == uid || data[2] == uid) { - return &User{ - Uid: data[2], - Gid: data[3], - Username: data[0], - Name: data[4], - HomeDir: data[5], - }, nil - } - } - return nil, fmt.Errorf("User not found in /etc/passwd") -} - -type DependencyGraph struct { - nodes map[string]*DependencyNode -} - -type DependencyNode struct { - id string - deps map[*DependencyNode]bool -} - -func NewDependencyGraph() DependencyGraph { - return DependencyGraph{ - nodes: map[string]*DependencyNode{}, - } -} - -func (graph *DependencyGraph) addNode(node *DependencyNode) string { - if graph.nodes[node.id] == nil { - graph.nodes[node.id] = node - } - return node.id -} - -func (graph *DependencyGraph) NewNode(id string) string { - if graph.nodes[id] != nil { - return id - } - nd := &DependencyNode{ - id: id, - deps: map[*DependencyNode]bool{}, - } - graph.addNode(nd) - return id -} - -func (graph *DependencyGraph) AddDependency(node, to string) error { - if graph.nodes[node] == nil { - return fmt.Errorf("Node %s does not belong to this graph", node) - } - - if graph.nodes[to] == nil { - return fmt.Errorf("Node %s does not belong to this graph", to) - } - - if node == to { - return fmt.Errorf("Dependency loops are forbidden!") - } - - graph.nodes[node].addDependency(graph.nodes[to]) - return nil -} - -func (node *DependencyNode) addDependency(to *DependencyNode) bool { - node.deps[to] = true - return node.deps[to] -} - -func (node *DependencyNode) Degree() int { - return len(node.deps) -} - -// The magic happens here :: -func (graph *DependencyGraph) GenerateTraversalMap() ([][]string, error) { - Debugf("Generating traversal map. Nodes: %d", len(graph.nodes)) - result := [][]string{} - processed := map[*DependencyNode]bool{} - // As long as we haven't processed all nodes... - for len(processed) < len(graph.nodes) { - // Use a temporary buffer for processed nodes, otherwise - // nodes that depend on each other could end up in the same round. - tmpProcessed := []*DependencyNode{} - for _, node := range graph.nodes { - // If the node has more dependencies than what we have cleared, - // it won't be valid for this round. - if node.Degree() > len(processed) { - continue - } - // If it's already processed, get to the next one - if processed[node] { - continue - } - // It's not been processed yet and has 0 deps. Add it! - // (this is a shortcut for what we're doing below) - if node.Degree() == 0 { - tmpProcessed = append(tmpProcessed, node) - continue - } - // If at least one dep hasn't been processed yet, we can't - // add it. - ok := true - for dep := range node.deps { - if !processed[dep] { - ok = false - break - } - } - // All deps have already been processed. Add it! - if ok { - tmpProcessed = append(tmpProcessed, node) - } - } - Debugf("Round %d: found %d available nodes", len(result), len(tmpProcessed)) - // If no progress has been made this round, - // that means we have circular dependencies. - if len(tmpProcessed) == 0 { - return nil, fmt.Errorf("Could not find a solution to this dependency graph") - } - round := []string{} - for _, nd := range tmpProcessed { - round = append(round, nd.id) - processed[nd] = true - } - result = append(result, round) - } - return result, nil -} - -// An StatusError reports an unsuccessful exit by a command. -type StatusError struct { - Status string - StatusCode int -} - -func (e *StatusError) Error() string { - return fmt.Sprintf("Status: %s, Code: %d", e.Status, e.StatusCode) -} - -func quote(word string, buf *bytes.Buffer) { - // Bail out early for "simple" strings - if word != "" && !strings.ContainsAny(word, "\\'\"`${[|&;<>()~*?! \t\n") { - buf.WriteString(word) - return - } - - buf.WriteString("'") - - for i := 0; i < len(word); i++ { - b := word[i] - if b == '\'' { - // Replace literal ' with a close ', a \', and a open ' - buf.WriteString("'\\''") - } else { - buf.WriteByte(b) - } - } - - buf.WriteString("'") -} - -// Take a list of strings and escape them so they will be handled right -// when passed as arguments to an program via a shell -func ShellQuoteArguments(args []string) string { - var buf bytes.Buffer - for i, arg := range args { - if i != 0 { - buf.WriteByte(' ') - } - quote(arg, &buf) - } - return buf.String() -} - -func IsClosedError(err error) bool { - /* This comparison is ugly, but unfortunately, net.go doesn't export errClosing. - * See: - * http://golang.org/src/pkg/net/net.go - * https://code.google.com/p/go/issues/detail?id=4337 - * https://groups.google.com/forum/#!msg/golang-nuts/0_aaCvBmOcM/SptmDyX1XJMJ - */ - return strings.HasSuffix(err.Error(), "use of closed network connection") -} - -func PartParser(template, data string) (map[string]string, error) { - // ip:public:private - var ( - templateParts = strings.Split(template, ":") - parts = strings.Split(data, ":") - out = make(map[string]string, len(templateParts)) - ) - if len(parts) != len(templateParts) { - return nil, fmt.Errorf("Invalid format to parse. %s should match template %s", data, template) - } - - for i, t := range templateParts { - value := "" - if len(parts) > i { - value = parts[i] - } - out[t] = value - } - return out, nil -} - -var globalTestID string - -// TestDirectory creates a new temporary directory and returns its path. -// The contents of directory at path `templateDir` is copied into the -// new directory. -func TestDirectory(templateDir string) (dir string, err error) { - if globalTestID == "" { - globalTestID = RandomString()[:4] - } - prefix := fmt.Sprintf("docker-test%s-%s-", globalTestID, GetCallerName(2)) - if prefix == "" { - prefix = "docker-test-" - } - dir, err = ioutil.TempDir("", prefix) - if err = os.Remove(dir); err != nil { - return - } - if templateDir != "" { - if err = CopyDirectory(templateDir, dir); err != nil { - return - } - } - return -} - -// GetCallerName introspects the call stack and returns the name of the -// function `depth` levels down in the stack. -func GetCallerName(depth int) string { - // Use the caller function name as a prefix. - // This helps trace temp directories back to their test. - pc, _, _, _ := runtime.Caller(depth + 1) - callerLongName := runtime.FuncForPC(pc).Name() - parts := strings.Split(callerLongName, ".") - callerShortName := parts[len(parts)-1] - return callerShortName -} - -func CopyFile(src, dst string) (int64, error) { - if src == dst { - return 0, nil - } - sf, err := os.Open(src) - if err != nil { - return 0, err - } - defer sf.Close() - if err := os.Remove(dst); err != nil && !os.IsNotExist(err) { - return 0, err - } - df, err := os.Create(dst) - if err != nil { - return 0, err - } - defer df.Close() - return io.Copy(df, sf) -}