diff --git a/Makefile b/Makefile
index f48f4c9..4a1e9cb 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,7 @@
-all: clean init gibbon tarball
+all: clean init gibbon agent tarball
+
+agent:
+ cd gibbonagent; GOOS=linux GOARCH=arm go build -o agent; go build -o agent.amd64
init:
mkdir -p output
@@ -11,12 +14,16 @@ gibbon:
# cd gibbonapi && go build
tarball: init gibbon
- cp control.sh output
- cp -r etc output
+ cp gibbond/control.sh output
+ mkdir -p output/etc
+ cp gibbond/etc/conf_product.json output/etc/conf.json
+ cp gibbond/etc/log.xml output/etc/log.xml
+ cp gibbond/etc/supervisord.conf output/etc/
cp misc/setupenv.sh output
tar -czf gibbon.tgz output
clean:
go clean
rm -rf gibbon gibbon.tgz output
-
+ rm -f test/test
+ rm -f gibbonagent/agent gibbonagent/agent.amd64
diff --git a/api/gibbonapi.go b/api/gibbonapi.go
deleted file mode 100644
index 3a38192..0000000
--- a/api/gibbonapi.go
+++ /dev/null
@@ -1,262 +0,0 @@
-package api
-
-import (
- "encoding/json"
- "fmt"
- log "github.com/cihub/seelog"
- "io/ioutil"
- "net/http"
- "os"
- "time"
-
- "github.com/chenyf/gibbon/comet"
- "github.com/chenyf/gibbon/devcenter"
-)
-
-type CommandRequest struct {
- Uid string `json:"uid"`
- Cmd string `json:"cmd"`
-}
-
-type CommandResponse struct {
- Status int `json:"status"`
- Error string `json:"error"`
-}
-
-type RouterInfo struct {
- Rid string `json:"rid"`
- Rname string `json:"rname"`
-}
-
-type ResponseRouterList struct {
- Status int `json:"status"`
- Descr string `json:"descr"`
- List []RouterInfo `json:"list"`
-}
-
-func checkAuthz(uid string, devid string) bool {
- log.Tracef("checkAuthz")
-
- devices, err := devcenter.GetDevices(uid, devcenter.DEV_ROUTER)
- if err != nil {
- log.Errorf("GetDevices failed: %s", err.Error())
- return false
- }
-
- for _, dev := range devices {
- if devid == dev.Id {
- return true
- }
- }
- return false
-}
-
-func getStatus(w http.ResponseWriter, r *http.Request) {
- size := comet.DevMap.Size()
- fmt.Fprintf(w, "total register device: %d\n", size)
-}
-
-func postRouterCommand(w http.ResponseWriter, r *http.Request) {
- log.Tracef("postRouterCommand")
- log.Debugf("Request from RemoterAddr: %s", r.RemoteAddr)
- var (
- uid string
- rid string
- response CommandResponse
- client *comet.Client
- body []byte
- err error
- cmdRequest CommandRequest
- bCmd []byte
- reply chan *comet.Message
- )
- response.Status = 1
- if r.Method != "POST" {
- response.Error = "must using 'POST' method\n"
- goto resp
- }
- r.ParseForm()
- rid = r.FormValue("rid")
- if rid == "" {
- response.Error = "missing 'rid'"
- goto resp
- }
-
- uid = r.FormValue("uid")
- if uid == "" {
- response.Error = "missing 'uid'"
- goto resp
- }
-
- if !checkAuthz(uid, rid) {
- log.Warnf("auth failed. uid: %s, rid: %s", uid, rid)
- response.Error = "authorization failed"
- goto resp
- }
-
- /*
- uid := r.FormValue("uid")
- if uid == "" { fmt.Fprintf(w, "missing 'uid'\n"); return; }
- tid := r.FormValue("tid")
- if tid == "" { fmt.Fprintf(w, "missing 'tid'\n"); return; }
- sign := r.FormValue("sign")
- if sign == "" { fmt.Fprintf(w, "missing 'sign'\n"); return; }
- tm := r.FormValue("tm")
- if tm == "" { fmt.Fprintf(w, "missing 'tm'\n"); return; }
- pmtt := r.FormValue("pmtt")
- if pmtt == "" { fmt.Fprintf(w, "missing 'pmtt'\n"); return; }
- query := map[string]string {
- "uid" : uid,
- "rid" : rid,
- "tid" : tid,
- "src" : "letv",
- "tm" : tm,
- "pmtt" : pmtt,
- }
- path := "/router/command"
- mysign := sign(path, query)
- if mysign != sign {
- response.Error = "sign valication failed"
- b, _ := json.Marshal(response)
- fmt.Fprintf(w, string(b))
- return
- }
- */
-
- if r.Body == nil {
- response.Error = "missing POST data"
- goto resp
- }
-
- if !comet.DevMap.Check(rid) {
- response.Error = fmt.Sprintf("device (%s) offline", rid)
- goto resp
- }
- client = comet.DevMap.Get(rid).(*comet.Client)
-
- body, err = ioutil.ReadAll(r.Body)
- r.Body.Close()
- if err != nil {
- response.Error = "invalid POST body"
- goto resp
- }
-
- cmdRequest = CommandRequest{
- Uid: uid,
- Cmd: string(body),
- }
-
- bCmd, _ = json.Marshal(cmdRequest)
- reply = make(chan *comet.Message)
- client.SendMessage(comet.MSG_REQUEST, bCmd, reply)
- select {
- case msg := <-reply:
- w.Write(msg.Data)
- case <-time.After(10 * time.Second):
- response.Error = "recv response timeout"
- goto resp
- }
- return
-
-resp:
- b, _ := json.Marshal(response)
- log.Debugf("postRouterCommand write: %s", string(b))
- w.Write(b)
-}
-
-func getRouterList(w http.ResponseWriter, r *http.Request) {
- log.Tracef("getRouterList")
- var (
- uid string
- tid string
- response ResponseRouterList
- router RouterInfo
- devices map[string]devcenter.Device
- err error
- )
-
- response.Status = -1
- if r.Method != "GET" {
- response.Descr = "must using 'GET' method\n"
- goto resp
- }
- r.ParseForm()
-
- uid = r.FormValue("uid")
- if uid == "" {
- response.Descr = "missing 'uid'"
- goto resp
- }
-
- tid = r.FormValue("tid")
- if tid == "" {
- response.Descr = "missing 'tid'"
- goto resp
- }
-
- devices, err = devcenter.GetDevices(uid, devcenter.DEV_ROUTER)
- if err != nil {
- log.Errorf("GetDevices failed: %s", err.Error())
- response.Descr = err.Error()
- goto resp
- }
-
- for _, dev := range devices {
- router = RouterInfo{
- Rid: dev.Id,
- Rname: dev.Title,
- }
- response.List = append(response.List, router)
- }
-
- //router = RouterInfo{
- // Rid: "c80e774a1e73",
- // Rname: "router1",
- //}
- //response.List = append(response.List, router)
-
- response.Status = 0
- response.Descr = "OK"
-
-resp:
- b, _ := json.Marshal(response)
- log.Debugf("getRoutelist write: %s", string(b))
- w.Write(b)
-}
-
-func getCommand(w http.ResponseWriter, r *http.Request) {
- log.Tracef("getCommand")
- r.ParseForm()
- devid := r.FormValue("devid")
- if devid == "" {
- fmt.Fprintf(w, "missing devid\n")
- return
- }
- if !comet.DevMap.Check(devid) {
- fmt.Fprintf(w, "(%s) not register\n", devid)
- return
- }
- cmd := r.FormValue("cmd")
- client := comet.DevMap.Get(devid).(*comet.Client)
- reply := make(chan *comet.Message)
- client.SendMessage(comet.MSG_REQUEST, []byte(cmd), reply)
- select {
- case msg := <-reply:
- fmt.Fprintf(w, "recv reply (%s)\n", string(msg.Data))
- case <-time.After(10 * time.Second):
- fmt.Fprintf(w, "recv timeout\n")
- }
-}
-
-func StartHttp(addr string) {
- log.Infof("Starting HTTP server on %s", addr)
- http.HandleFunc("/router/command", postRouterCommand)
- http.HandleFunc("/router/list", getRouterList)
- http.HandleFunc("/command", getCommand)
- http.HandleFunc("/status", getStatus)
- err := http.ListenAndServe(addr, nil)
- if err != nil {
- log.Criticalf("http listen: ", err)
- os.Exit(1)
- }
-}
diff --git a/cloud/def.go b/cloud/def.go
new file mode 100644
index 0000000..7be9a2b
--- /dev/null
+++ b/cloud/def.go
@@ -0,0 +1,16 @@
+package cloud
+
+const (
+ ERR_NOERROR = 10000
+ ERR_CMD_TIMEOUT = 20000
+)
+
+type ApiStatus struct {
+ ErrNo int `json:"errno"`
+ ErrMsg string `json:"errmsg,omitempty"`
+}
+
+type ApiResponse struct {
+ ApiStatus
+ Data interface{} `json:"data,omitempty"`
+}
diff --git a/comet/message.go b/comet/message.go
index c6eb2db..4ee60c7 100644
--- a/comet/message.go
+++ b/comet/message.go
@@ -23,13 +23,14 @@ const (
)
const (
- MSG_HEARTBEAT = uint8(0)
- MSG_REGISTER = uint8(1)
- MSG_REGISTER_REPLY = uint8(2)
- MSG_REQUEST = uint8(3)
- MSG_REQUEST_REPLY = uint8(4)
- MSG_ROUTER_COMMAND = uint8(10)
- MSG_ROUTER_COMMAND_REPLY = uint8(11)
+ MSG_HEARTBEAT = uint8(0)
+ MSG_REGISTER = uint8(1)
+ MSG_REGISTER_REPLY = uint8(2)
+ MSG_ROUTER_COMMAND = uint8(3)
+ MSG_ROUTER_COMMAND_REPLY = uint8(4)
+
+// MSG_REQUEST = uint8(3)
+// MSG_REQUEST_REPLY = uint8(4)
)
// msg to byte
@@ -54,18 +55,25 @@ type RegisterMessage struct {
}
type RegisterReplyMessage struct {
+ HeartbeatInterval int `json:"bt_interval"`
+}
+
+type CommandRequest struct {
+ Uid string `json:"uid"`
+ Cmd string `json:"cmd"`
}
type RouterCommandMessage struct {
- Uid string `json:"uid"`
- Cmd struct {
- Forward string `json:"forward"`
- } `json:"cmd"`
+ Uid string `json:"uid"`
+ Cmd string `json:"cmd"`
}
-type RouterCommandReplyMessage struct {
- Status int `json:"status"`
- Descr string `json:"descr"`
- Result string `json:"result"`
+type RouterCommand struct {
+ Forward string `json:"forward"`
}
+type RouterCommandReplyMessage struct {
+ Status int `json:"status"`
+ Descr string `json:"descr"`
+ Result string `json:"result"`
+}
diff --git a/comet/server.go b/comet/server.go
index 7993cf0..8ab15ca 100644
--- a/comet/server.go
+++ b/comet/server.go
@@ -1,31 +1,35 @@
package comet
import (
- //"log"
log "github.com/cihub/seelog"
"io"
"net"
"sync"
+ "sync/atomic"
"time"
//"strings"
- "github.com/chenyf/gibbon/utils/safemap"
+ "github.com/chenyf/gibbon/storage"
+ "github.com/chenyf/push/utils/safemap"
)
type MsgHandler func(*Client, *Header, []byte) int
type Server struct {
- exitCh chan bool
- waitGroup *sync.WaitGroup
- funcMap map[uint8]MsgHandler
- acceptTimeout time.Duration
- readTimeout time.Duration
- writeTimeout time.Duration
- heartbeatTimeout time.Duration
- maxMsgLen uint32
+ Name string // unique name of this server
+ exitCh chan bool
+ waitGroup *sync.WaitGroup
+ funcMap map[uint8]MsgHandler
+ acceptTimeout time.Duration
+ readTimeout time.Duration
+ writeTimeout time.Duration
+ heartbeatInterval time.Duration
+ heartbeatTimeout time.Duration
+ maxMsgLen uint32
}
func NewServer() *Server {
return &Server{
+ Name: "gibbon",
exitCh: make(chan bool),
waitGroup: &sync.WaitGroup{},
funcMap: make(map[uint8]MsgHandler),
@@ -38,25 +42,21 @@ func NewServer() *Server {
}
type Client struct {
- devId string
+ DevId string
ctrl chan bool
- MsgOut chan *Pack
+ MsgOut chan *Message
WaitingChannels map[uint32]chan *Message
NextSeqId uint32
LastAlive time.Time
+ RegistTime time.Time
}
-type Pack struct {
- msg *Message
- client *Client
- reply chan *Message
-}
-
-func (client *Client) SendMessage(msgType uint8, body []byte, reply chan *Message) {
+func (this *Client) SendMessage(msgType uint8, body []byte, reply chan *Message) (seq uint32) {
+ seq = this.NextSeq()
header := Header{
Type: msgType,
Ver: 0,
- Seq: 0,
+ Seq: seq,
Len: uint32(len(body)),
}
msg := &Message{
@@ -64,47 +64,55 @@ func (client *Client) SendMessage(msgType uint8, body []byte, reply chan *Messag
Data: body,
}
- pack := &Pack{
- msg: msg,
- client: client,
- reply: reply,
+ // add reply channel
+ if reply != nil {
+ this.WaitingChannels[seq] = reply
}
- client.MsgOut <- pack
+ this.MsgOut <- msg
+ return seq
+}
+
+func (this *Client) MsgTimeout(seq uint32) {
+ delete(this.WaitingChannels, seq)
+}
+
+func (this *Client) NextSeq() uint32 {
+ return atomic.AddUint32(&this.NextSeqId, 1)
}
var (
DevMap *safemap.SafeMap = safemap.NewSafeMap()
)
-func InitClient(conn *net.TCPConn, devid string) *Client {
+func (this *Server) InitClient(conn *net.TCPConn, devid string) *Client {
+ // save the client device Id to storage
+ if err := storage.Instance.AddDevice(this.Name, devid); err != nil {
+ log.Infof("failed to put device %s into redis:", devid, err)
+ return nil
+ }
+
client := &Client{
- devId: devid,
+ DevId: devid,
ctrl: make(chan bool),
- MsgOut: make(chan *Pack, 100),
+ MsgOut: make(chan *Message, 100),
WaitingChannels: make(map[uint32]chan *Message),
- NextSeqId: 1,
+ NextSeqId: 0,
LastAlive: time.Now(),
+ RegistTime: time.Now(),
}
DevMap.Set(devid, client)
go func() {
- log.Tracef("start send routine for %s", conn.RemoteAddr().String())
+ log.Tracef("start send routine for [%s] [%s]", devid, conn.RemoteAddr().String())
for {
select {
- case pack := <-client.MsgOut:
- seqid := pack.client.NextSeqId
- pack.msg.Header.Seq = seqid
- b, _ := pack.msg.Header.Serialize()
+ case msg := <-client.MsgOut:
+ b, _ := msg.Header.Serialize()
conn.Write(b)
- conn.Write(pack.msg.Data)
- log.Infof("send msg ok, (%s)", string(pack.msg.Data))
- pack.client.NextSeqId += 1
- // add reply channel
- if pack.reply != nil {
- pack.client.WaitingChannels[seqid] = pack.reply
- }
+ conn.Write(msg.Data)
+ log.Infof("send msg to [%s]. seq: %d. body: (%s)", devid, msg.Header.Seq, string(msg.Data))
case <-client.ctrl:
- log.Tracef("leave send routine for %s", conn.RemoteAddr().String())
+ log.Tracef("leave send routine for [%s] [%s]", devid, conn.RemoteAddr().String())
return
}
}
@@ -112,24 +120,29 @@ func InitClient(conn *net.TCPConn, devid string) *Client {
return client
}
-func CloseClient(client *Client) {
+func (this *Server) CloseClient(client *Client) {
client.ctrl <- true
- DevMap.Delete(client.devId)
+ if err := storage.Instance.RemoveDevice(this.Name, client.DevId); err != nil {
+ log.Errorf("failed to remove device %s from redis:", client.DevId, err)
+ }
+ DevMap.Delete(client.DevId)
}
func handleReply(client *Client, header *Header, body []byte) int {
- log.Debugf("Received reply: %s", body)
+ log.Debugf("Received reply from [%s]. seq: %d.", client.DevId, header.Seq)
ch, ok := client.WaitingChannels[header.Seq]
if ok {
//remove waiting channel from map
delete(client.WaitingChannels, header.Seq)
ch <- &Message{Header: *header, Data: body}
+ } else {
+ log.Warnf("no waiting channel for seq: %d, device: %s", header.Seq, client.DevId)
}
return 0
}
func handleHeartbeat(client *Client, header *Header, body []byte) int {
- log.Debugf("Heartbeat from devid: %s", client.devId)
+ log.Debugf("Heartbeat from devid: %s", client.DevId)
client.LastAlive = time.Now()
return 0
}
@@ -142,6 +155,10 @@ func (this *Server) SetReadTimeout(timeout time.Duration) {
this.readTimeout = timeout
}
+func (this *Server) SetHeartbeatInterval(timeout time.Duration) {
+ this.heartbeatInterval = timeout
+}
+
func (this *Server) SetHeartbeatTimeout(timeout time.Duration) {
this.heartbeatTimeout = timeout
}
@@ -162,7 +179,26 @@ func (this *Server) Init(addr string) (*net.TCPListener, error) {
return nil, err
}
this.funcMap[MSG_HEARTBEAT] = handleHeartbeat
- this.funcMap[MSG_REQUEST_REPLY] = handleReply
+ this.funcMap[MSG_ROUTER_COMMAND_REPLY] = handleReply
+
+ if err := storage.Instance.InitDevices(this.Name); err != nil {
+ log.Errorf("failed to InitDevices: %s", err.Error())
+ return nil, err
+ }
+
+ // keep the data of this node not expired on redis
+ go func() {
+ for {
+ select {
+ case <-this.exitCh:
+ log.Infof("exiting storage refreshing routine")
+ return
+ case <-time.After(10 * time.Second):
+ storage.Instance.RefreshDevices(this.Name, 30)
+ }
+ }
+ }()
+
return l, nil
}
@@ -174,9 +210,13 @@ func (this *Server) Run(listener *net.TCPListener) {
}()
//go this.dealSpamConn()
- log.Infof("Starting comet server on: %s\n", listener.Addr().String())
- log.Infof("Comet server settings: readtimeout [%d], accepttimeout [%d], heartbeattimeout [%d]\n",
- this.readTimeout, this.acceptTimeout, this.heartbeatTimeout)
+ log.Infof("Starting comet server on: %s", listener.Addr().String())
+ log.Infof("Comet server settings: readtimeout [%dms], accepttimeout [%dms], heartbeatinterval [%dms] heartbeattimeout [%dms]",
+ this.readTimeout/time.Millisecond,
+ this.acceptTimeout/time.Millisecond,
+ this.heartbeatInterval/time.Millisecond,
+ this.heartbeatTimeout/time.Millisecond)
+
for {
select {
case <-this.exitCh:
@@ -191,7 +231,7 @@ func (this *Server) Run(listener *net.TCPListener) {
if e, ok := err.(*net.OpError); ok && e.Timeout() {
continue
}
- log.Errorf("accept failed: %v\n", err)
+ log.Errorf("accept failed: %v", err)
continue
}
/*
@@ -213,7 +253,7 @@ func (this *Server) Stop() {
log.Infof("comet server stopped")
}
-func waitRegister(conn *net.TCPConn) *Client {
+func (this *Server) waitRegister(conn *net.TCPConn) *Client {
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
buf := make([]byte, 10)
n, err := io.ReadFull(conn, buf)
@@ -251,20 +291,27 @@ func waitRegister(conn *net.TCPConn) *Client {
conn.Close()
return nil
}
- client := InitClient(conn, devid)
+ client := this.InitClient(conn, devid)
return client
}
// handle a TCP connection
func (this *Server) handleConnection(conn *net.TCPConn) {
- log.Debugf("accept connection (%v)", conn)
- log.Infof("New conn accepted from %s\n", conn.RemoteAddr().String())
+ log.Infof("New conn accepted from %s", conn.RemoteAddr().String())
// handle register first
- client := waitRegister(conn)
+ client := this.waitRegister(conn)
if client == nil {
return
}
+ var (
+ readHeader = true
+ bytesRead = 0
+ data []byte
+ header Header
+ startTime time.Time
+ )
+
for {
/*
select {
@@ -275,49 +322,62 @@ func (this *Server) handleConnection(conn *net.TCPConn) {
}
*/
- var (
- data []byte = nil
- )
-
now := time.Now()
if now.After(client.LastAlive.Add(this.heartbeatTimeout)) {
- log.Warnf("heartbeat timeout")
+ log.Warnf("Device [%s] heartbeat timeout", client.DevId)
break
}
- //conn.SetReadDeadline(time.Now().Add(this.readTimeout))
conn.SetReadDeadline(now.Add(10 * time.Second))
- buf := make([]byte, HEADER_SIZE)
- n, err := io.ReadFull(conn, buf)
- if err != nil {
- if e, ok := err.(*net.OpError); ok && e.Timeout() {
- //log.Printf("read timeout, %d", n)
- continue
+ if readHeader {
+ buf := make([]byte, HEADER_SIZE)
+ n, err := io.ReadFull(conn, buf)
+ if err != nil {
+ if e, ok := err.(*net.OpError); ok && e.Timeout() {
+ //log.Printf("read timeout, %d", n)
+ continue
+ }
+ log.Errorf("readfull failed (%v)", err)
+ break
+ }
+ if err := header.Deserialize(buf[0:n]); err != nil {
+ log.Errorf("Deserialize header failed: %s", err.Error())
+ break
}
- log.Errorf("readfull failed (%v)", err)
- break
- }
-
- var header Header
- if err := header.Deserialize(buf[0:n]); err != nil {
- log.Errorf("Deserialize header failed: %s", err.Error())
- break
- }
- if header.Len > MAX_BODY_LEN {
- log.Warnf("Msg body too big: %d", header.Len)
- break
- }
+ if header.Len > MAX_BODY_LEN {
+ log.Warnf("Msg body too big from device [%s]: %d", header.Len, client.DevId)
+ break
+ }
- if header.Len > 0 {
- data = make([]byte, header.Len)
- if _, err := io.ReadFull(conn, data); err != nil {
+ if header.Len > 0 {
+ data = make([]byte, header.Len)
+ readHeader = false
+ bytesRead = 0
+ startTime = time.Now()
+ continue
+ }
+ } else {
+ n, err := conn.Read(data[bytesRead:])
+ if err != nil {
if e, ok := err.(*net.OpError); ok && e.Timeout() {
- continue
+ if now.After(startTime.Add(this.readTimeout)) {
+ log.Infof("read packet data timeout [%s]", client.DevId)
+ break
+ }
+ } else {
+ log.Infof("read from client [%s] failed: (%s)", client.DevId, err.Error())
+ break
}
- log.Errorf("read from client failed: (%v)", err)
- break
}
+ if n > 0 {
+ bytesRead += n
+ }
+ if uint32(bytesRead) < header.Len {
+ continue
+ }
+ readHeader = true
+ //log.Debugf("%s: body (%s)", client.DevId, data)
}
handler, ok := this.funcMap[header.Type]
@@ -329,7 +389,8 @@ func (this *Server) handleConnection(conn *net.TCPConn) {
}
}
// don't use defer to improve performance
- log.Infof("close connection %s\n", conn.RemoteAddr().String())
- CloseClient(client)
+ log.Infof("closing device [%s] [%s]", client.DevId, conn.RemoteAddr().String())
+ this.CloseClient(client)
+ log.Infof("close connection [%s]", conn.RemoteAddr().String())
conn.Close()
}
diff --git a/conf/conf.go b/conf/conf.go
index fe8fb11..856c916 100644
--- a/conf/conf.go
+++ b/conf/conf.go
@@ -6,13 +6,21 @@ import (
)
type ConfigStruct struct {
- Comet string `json:"comet"`
- AcceptTimeout int `json:"accept_timeout"`
- ReadTimeout int `json:"read_timeout"`
- HeartbeatTimeout int `json:"heartbeat_timeout"`
-
- Web string `json:"web"`
- DevCenter string `json:"devcenter"`
+ Comet string `json:"comet"`
+ AcceptTimeout int `json:"accept_timeout"`
+ ReadTimeout int `json:"read_timeout"`
+ HeartbeatInterval int `json:"heartbeat_interval"`
+ HeartbeatTimeout int `json:"heartbeat_timeout"`
+ Rabbit struct {
+ Enable bool `json:"enable"`
+ Uri string `json:"uri"`
+ } `json:"rabbit"`
+ Redis struct {
+ Server string `json:"server"`
+ Pass string `json:"pass"`
+ PoolSize int `json:"poolsize"`
+ Retry int `json:"retry"`
+ } `json:"redis"`
}
var (
diff --git a/devcenter/gibbonapi.go b/devcenter/gibbonapi.go
deleted file mode 100644
index 16c51e3..0000000
--- a/devcenter/gibbonapi.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package devcenter
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- log "github.com/cihub/seelog"
- "io/ioutil"
- "net/http"
-
- "github.com/chenyf/gibbon/conf"
-)
-
-const (
- DEV_ROUTER = 3
-)
-
-type Device struct {
- Id string `json:"id"`
- Type int `json:"type"`
- Title string `json:"title"`
-}
-
-type devicesResult struct {
- Errno int `json:"errno"`
- Errmsg string `json:"errmsg"`
- Data struct {
- UserOpenId int `json:"userOpenId"`
- DeviceList map[string]Device `json:"device"`
- } `json:"data"`
-}
-
-func GetDevices(uid string, devType int) (map[string]Device, error) {
- log.Tracef("GetDevices")
- url := fmt.Sprintf("http://%s/api/v1/device/bind/?user_id=%s&type=%d", conf.Config.DevCenter, uid, devType)
- res, err := http.Get(url)
- if err != nil {
- return nil, err
- }
- body, err := ioutil.ReadAll(res.Body)
- if err != nil {
- return nil, err
- }
-
- log.Debugf("Got response from device center: %s", body)
-
- var result devicesResult
- err = json.Unmarshal(body, &result)
- if err != nil {
- return nil, err
- }
-
- if result.Errno != 10000 {
- return nil, errors.New(result.Errmsg)
- }
-
- return result.Data.DeviceList, nil
-}
diff --git a/etc/conf.json b/etc/conf.json
deleted file mode 100644
index df59f2d..0000000
--- a/etc/conf.json
+++ /dev/null
@@ -1,9 +0,0 @@
-{
- "comet" : "0.0.0.0:10000",
- "accept_timeout" : 2,
- "read_timeout" : 60,
- "heartbeat_timeout" : 90,
-
- "web" : "0.0.0.0:9090",
- "devcenter" : "10.154.156.90"
-}
diff --git a/gibbonagent/README.md b/gibbonagent/README.md
deleted file mode 100644
index e69de29..0000000
diff --git a/gibbonagent/conf.json b/gibbonagent/conf.json
index 7ace3b5..5fc6825 100644
--- a/gibbonagent/conf.json
+++ b/gibbonagent/conf.json
@@ -1,3 +1,3 @@
{
-"address":"111.206.210.49:10000;111.206.210.50:10000"
+"address":"10.154.156.121:10000"
}
diff --git a/gibbonagent/conf_rdtest.json b/gibbonagent/conf_rdtest.json
new file mode 100644
index 0000000..f1d70bd
--- /dev/null
+++ b/gibbonagent/conf_rdtest.json
@@ -0,0 +1,3 @@
+{
+"address":"10.154.156.121:10000;111.206.210.50:10000"
+}
diff --git a/gibbonagent/gibbonagent.go b/gibbonagent/gibbonagent.go
index 5dec9f2..55f7eed 100644
--- a/gibbonagent/gibbonagent.go
+++ b/gibbonagent/gibbonagent.go
@@ -3,31 +3,50 @@ package main
import (
"bytes"
"encoding/json"
+ "flag"
"fmt"
//"io"
- "io/ioutil"
+ "github.com/chenyf/gibbon/comet"
log "github.com/cihub/seelog"
+ "io/ioutil"
"net"
"net/http"
"os"
+ "os/signal"
"strings"
- "time"
"sync"
"syscall"
- "os/signal"
- "github.com/chenyf/gibbon/comet"
+ "time"
+)
+
+const (
+ defaultLogConfig string = `
+
+
+
+
+
+
+
+
+
+`
+)
+
+var (
+ heartbeatInterval int = 110
)
type MsgHandler func(*Conn, *comet.Header, []byte) int
type Agent struct {
- done chan bool
- funcMap map[uint8]MsgHandler
+ done chan bool
+ funcMap map[uint8]MsgHandler
}
func NewAgent() *Agent {
agent := &Agent{
- done: make(chan bool),
+ done: make(chan bool),
funcMap: make(map[uint8]MsgHandler),
}
agent.funcMap[comet.MSG_REGISTER_REPLY] = handleRegisterReply
@@ -35,19 +54,19 @@ func NewAgent() *Agent {
return agent
}
-func (this *Agent)Run() {
+func (this *Agent) Run() {
var c *Conn = NewConn()
addSlice := strings.Split(Config.Address, ";")
for {
select {
- case <- this.done:
- log.Infof("agent quit")
- return
- default:
+ case <-this.done:
+ log.Infof("agent quit")
+ return
+ default:
}
if c.conn == nil {
if ok := c.Connect(addSlice[0]); !ok {
- time.Sleep(5*time.Second)
+ time.Sleep(10 * time.Second)
continue
}
log.Infof("connect ok")
@@ -56,12 +75,12 @@ func (this *Agent)Run() {
c.conn.SetReadDeadline(time.Now().Add(3 * time.Second))
n := c.Read()
if n < 0 {
- // failed
+ // failed
c.Close()
c = NewConn()
continue
} else if n > 0 {
- // need more data
+ // need more data
continue
}
// ok
@@ -79,7 +98,7 @@ func (this *Agent)Run() {
}
}
-func (this *Agent)Stop() {
+func (this *Agent) Stop() {
close(this.done)
}
@@ -89,6 +108,12 @@ func sendReply(c *Conn, msgType uint8, seq uint32, v interface{}) {
}
func handleRegisterReply(c *Conn, header *comet.Header, body []byte) int {
+ var reply comet.RegisterReplyMessage
+ err := json.Unmarshal(body, &reply)
+ if err != nil {
+ //TODO
+ }
+
return 0
}
@@ -97,17 +122,27 @@ func handleRouterCommand(c *Conn, header *comet.Header, body []byte) int {
var reply comet.RouterCommandReplyMessage
if err := json.Unmarshal(body, &msg); err != nil {
- reply.Status = 2000
+ reply.Status = -2000
+ reply.Descr = "Request message is not JSON"
+ sendReply(c, comet.MSG_ROUTER_COMMAND_REPLY, header.Seq, &reply)
+ return 0
+ }
+
+ var cmd comet.RouterCommand
+ if err := json.Unmarshal([]byte(msg.Cmd), &cmd); err != nil {
+ reply.Status = -2000
reply.Descr = "Request body is not JSON"
sendReply(c, comet.MSG_ROUTER_COMMAND_REPLY, header.Seq, &reply)
return 0
}
- if msg.Cmd.Forward == "" {
- reply.Status = 2001
+
+ if cmd.Forward == "" {
+ reply.Status = -2001
reply.Descr = "'forward' is empty"
sendReply(c, comet.MSG_ROUTER_COMMAND_REPLY, header.Seq, &reply)
return 0
}
+
client := &http.Client{
Transport: &http.Transport{
Dial: func(netw, addr string) (net.Conn, error) {
@@ -115,30 +150,38 @@ func handleRouterCommand(c *Conn, header *comet.Header, body []byte) int {
if err != nil {
return nil, err
}
- conn.SetDeadline(time.Now().Add(time.Second * 2))
+ conn.SetDeadline(time.Now().Add(time.Second * 5))
return conn, nil
},
- ResponseHeaderTimeout: time.Second * 2,
+ ResponseHeaderTimeout: time.Second * 5,
},
}
+ log.Debugf("Sending request to local service: %s", cmd.Forward)
+
response, err := client.Post("http://127.0.0.1:9999/",
"application/json;charset=utf-8",
- bytes.NewBuffer([]byte(msg.Cmd.Forward)))
+ bytes.NewBuffer([]byte(cmd.Forward)))
if err != nil {
- reply.Status = 2002
- reply.Descr = "Talk with local service failed"
+ reply.Status = -2002
+ errMsg := fmt.Sprintf("Talk with local service failed: %s", err.Error())
+ reply.Descr = errMsg
+ log.Errorf(errMsg)
sendReply(c, comet.MSG_ROUTER_COMMAND_REPLY, header.Seq, &reply)
return 0
}
+
result, err := ioutil.ReadAll(response.Body)
response.Body.Close()
if err != nil {
- reply.Status = 2003
- reply.Descr = "Local service response failed"
+ reply.Status = -2003
+ errMsg := fmt.Sprintf("Local service response failed: %s", err.Error())
+ reply.Descr = errMsg
+ log.Errorf(errMsg)
sendReply(c, comet.MSG_ROUTER_COMMAND_REPLY, header.Seq, &reply)
return 0
}
+
reply.Status = 0
reply.Descr = "OK"
reply.Result = string(result)
@@ -154,8 +197,8 @@ type Serverslice struct {
}
type Pack struct {
- msg *comet.Message
- reply chan *comet.Message
+ msg *comet.Message
+ reply chan *comet.Message
}
type Conn struct {
conn *net.TCPConn
@@ -163,23 +206,23 @@ type Conn struct {
outMsgs chan *Pack
readFlag int
nRead int
- headBuf []byte
+ headBuf []byte
dataBuf []byte
header comet.Header
}
func NewConn() *Conn {
return &Conn{
- conn : nil,
- done : make(chan bool),
- outMsgs : make(chan *Pack, 100),
- readFlag : 0,
- nRead : 0,
- headBuf : make([]byte, comet.HEADER_SIZE),
+ conn: nil,
+ done: make(chan bool),
+ outMsgs: make(chan *Pack, 100),
+ readFlag: 0,
+ nRead: 0,
+ headBuf: make([]byte, comet.HEADER_SIZE),
}
}
-func (this *Conn)Connect(service string) bool {
+func (this *Conn) Connect(service string) bool {
log.Infof("try to connect server address: %v\n", service)
tcpAddr, err := net.ResolveTCPAddr("tcp4", service)
if err != nil {
@@ -197,12 +240,12 @@ func (this *Conn)Connect(service string) bool {
return true
}
-func (this *Conn)Start() {
+func (this *Conn) Start() {
macAddr, _ := GetMac()
b := []byte(macAddr)
this.SendMessage(comet.MSG_REGISTER, 0, b, nil)
go func() {
- timer := time.NewTicker(110*time.Second)
+ timer := time.NewTicker(time.Duration(heartbeatInterval) * time.Second)
h := comet.Header{}
h.Type = comet.MSG_HEARTBEAT
heartbeat, _ := h.Serialize()
@@ -211,22 +254,22 @@ func (this *Conn)Start() {
case <-this.done:
return
case pack := <-this.outMsgs:
- //seqid := pack.client.nextSeq
+ //seqid := pack.client.nextSeq
//pack.msg.Header.Seq = seqid
b, _ := pack.msg.Header.Serialize()
this.conn.Write(b)
this.conn.Write(pack.msg.Data)
- log.Infof("send msg: (%d) (%s)", pack.msg.Header.Type, pack.msg.Data)
+ log.Infof("send msg: Type(%d), Seq(%d), Body(%s)", pack.msg.Header.Type, pack.msg.Header.Seq, pack.msg.Data)
//pack.client.nextSeq += 1
time.Sleep(1 * time.Second)
- case <- timer.C:
+ case <-timer.C:
this.conn.Write(heartbeat)
}
}
}()
}
-func (this *Conn)Read() int {
+func (this *Conn) Read() int {
var n int
if this.readFlag == 0 {
n = MyRead(this.conn, this.headBuf[this.nRead:])
@@ -265,20 +308,20 @@ func (this *Conn)Read() int {
return 0
}
-func (this *Conn)BufReset() {
+func (this *Conn) BufReset() {
this.readFlag = 0
this.nRead = 0
}
-func (this *Conn)Close() {
+func (this *Conn) Close() {
this.done <- true
this.conn.Close()
this.conn = nil
this.BufReset()
}
-func (this *Conn)SendMessage(msgType uint8, seq uint32, body []byte, reply chan *comet.Message) {
- header := comet.Header{
+func (this *Conn) SendMessage(msgType uint8, seq uint32, body []byte, reply chan *comet.Message) {
+ header := comet.Header{
Type: msgType,
Ver: 0,
Seq: seq,
@@ -289,32 +332,45 @@ func (this *Conn)SendMessage(msgType uint8, seq uint32, body []byte, reply chan
Data: body,
}
pack := &Pack{
- msg: msg,
- reply: reply,
+ msg: msg,
+ reply: reply,
}
this.outMsgs <- pack
}
func main() {
- //err := LoadConfig("/system/etc/conf.json")
- err := LoadConfig("./conf.json")
+ var (
+ logConfigFile = flag.String("l", "", "Log config file")
+ configFile = flag.String("c", "/system/etc/conf.json", "Config file")
+ )
+
+ flag.Parse()
+
+ err := LoadConfig(*configFile)
if err != nil {
- fmt.Printf("LoadConfig failed: (%s)", err)
+ fmt.Printf("LoadConfig from %s failed: (%s)\n", *configFile, err)
os.Exit(1)
}
- //logger, err := log.LoggerFromConfigAsFile("/system/etc/log.xml")
- logger, err := log.LoggerFromConfigAsFile("./log.xml")
- if err != nil {
- fmt.Printf("Load log config failed: (%s)\n", err)
- os.Exit(1)
+ var logger log.LoggerInterface
+ if *logConfigFile == "" {
+ logger, _ = log.LoggerFromConfigAsBytes([]byte(defaultLogConfig))
+ } else {
+ //logger, err := log.LoggerFromConfigAsFile("/system/etc/log.xml")
+ logger, err = log.LoggerFromConfigAsFile(*logConfigFile)
+ if err != nil {
+ fmt.Printf("Load log config from %s failed: (%s)\n", *logConfigFile, err)
+ os.Exit(1)
+ }
}
log.ReplaceLogger(logger)
+ log.Infof("gibbon agent started")
+
//wg := &sync.WaitGroup{}
agent := NewAgent()
c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
+ signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
wg := sync.WaitGroup{}
wg.Add(1)
@@ -328,4 +384,3 @@ func main() {
agent.Stop()
wg.Wait()
}
-
diff --git a/control.sh b/gibbond/control.sh
similarity index 100%
rename from control.sh
rename to gibbond/control.sh
diff --git a/gibbond/etc/conf.json b/gibbond/etc/conf.json
new file mode 120000
index 0000000..6aa6fd8
--- /dev/null
+++ b/gibbond/etc/conf.json
@@ -0,0 +1 @@
+conf_rdtest.json
\ No newline at end of file
diff --git a/gibbond/etc/conf_product.json b/gibbond/etc/conf_product.json
new file mode 100644
index 0000000..b3415e7
--- /dev/null
+++ b/gibbond/etc/conf_product.json
@@ -0,0 +1,18 @@
+{
+ "comet" : "0.0.0.0:10000",
+ "accept_timeout" : 2,
+ "read_timeout" : 60,
+ "heartbeat_interval" : 110,
+ "heartbeat_timeout" : 180,
+
+ "rabbit" : {
+ "enable" : true,
+ "uri" : "amqp://guest:guest@10.135.28.70:5672/"
+ },
+ "redis" : {
+ "server" : "10.135.28.70:6379",
+ "pass" : "rpasswd",
+ "poolsize" : 50,
+ "retry" : 3
+ }
+}
diff --git a/gibbond/etc/conf_rdtest.json b/gibbond/etc/conf_rdtest.json
new file mode 100644
index 0000000..bd53c72
--- /dev/null
+++ b/gibbond/etc/conf_rdtest.json
@@ -0,0 +1,18 @@
+{
+ "comet" : "0.0.0.0:10000",
+ "accept_timeout" : 2,
+ "read_timeout" : 60,
+ "heartbeat_interval" : 110,
+ "heartbeat_timeout" : 180,
+
+ "rabbit" : {
+ "enable" : true,
+ "uri" : "amqp://guest:guest@10.154.156.121:5672/"
+ },
+ "redis" : {
+ "server" : "10.154.156.122:6380",
+ "pass" : "rpasswd",
+ "poolsize" : 50,
+ "retry" : 3
+ }
+}
diff --git a/etc/log.xml b/gibbond/etc/log.xml
similarity index 83%
rename from etc/log.xml
rename to gibbond/etc/log.xml
index 5cb0f01..3853c62 100644
--- a/etc/log.xml
+++ b/gibbond/etc/log.xml
@@ -7,7 +7,7 @@
-->
-
+
diff --git a/etc/supervisord.conf b/gibbond/etc/supervisord.conf
similarity index 91%
rename from etc/supervisord.conf
rename to gibbond/etc/supervisord.conf
index bce022e..2512b4a 100644
--- a/etc/supervisord.conf
+++ b/gibbond/etc/supervisord.conf
@@ -18,7 +18,7 @@ supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
gibbonurl=unix:///letv/run/gibbon/supervisor.sock
[program:gibbon]
-command=/letv/gibbon/gibbon -c /letv/gibbon/etc/conf.json
+command=/letv/gibbon/gibbond -c /letv/gibbon/etc/conf.json
user=work
log_stdout=true
log_stderr=true
diff --git a/gibbond/gibbond.go b/gibbond/gibbond.go
index 0ae22c4..6c2b35d 100644
--- a/gibbond/gibbond.go
+++ b/gibbond/gibbond.go
@@ -1,45 +1,23 @@
package main
import (
- "crypto/hmac"
- "crypto/sha1"
"flag"
"fmt"
//"log"
log "github.com/cihub/seelog"
"os"
"os/signal"
- "strings"
"sync"
"syscall"
"time"
- "github.com/chenyf/gibbon/api"
"github.com/chenyf/gibbon/comet"
"github.com/chenyf/gibbon/conf"
+ "github.com/chenyf/gibbon/mq"
+ "github.com/chenyf/gibbon/storage"
+ "github.com/chenyf/push/utils"
)
-func sign(path string, query map[string]string) []byte {
- uid := query["uid"]
- rid := query["rid"]
- tid := query["tid"]
- src := query["src"]
- tm := query["tm"]
- pmtt := query["pmtt"]
-
- raw := []string{path, uid, rid, tid, src, tm, pmtt}
- args := []string{}
- x := []int{6, 5, 4, 3, 2, 1, 0}
- for _, item := range x {
- args = append(args, raw[item])
- }
- data := strings.Join(args, "")
- key := "xnRzFxoCDRVRU2mNQ7AoZ5MCxpAR7ntnmlgRGYav"
- mac := hmac.New(sha1.New, []byte(key))
- mac.Write([]byte(data))
- return mac.Sum(nil)
-}
-
func main() {
var (
@@ -54,6 +32,12 @@ func main() {
os.Exit(1)
}
+ err = log.RegisterCustomFormatter("Ms", utils.CreateMsFormatter)
+ if err != nil {
+ fmt.Printf("Failed to create custom formatter: (%s)\n", err)
+ os.Exit(1)
+ }
+
logger, err := log.LoggerFromConfigAsFile("./etc/log.xml")
if err != nil {
fmt.Printf("Load log config failed: (%s)\n", err)
@@ -63,8 +47,13 @@ func main() {
log.ReplaceLogger(logger)
waitGroup := &sync.WaitGroup{}
- cometServer := comet.NewServer()
+ storage.NewInstance(conf.Config.Redis.Server,
+ conf.Config.Redis.Pass,
+ conf.Config.Redis.PoolSize,
+ conf.Config.Redis.Retry)
+
+ cometServer := comet.NewServer()
listener, err := cometServer.Init(conf.Config.Comet)
if err != nil {
log.Criticalf("Failed to start comet server: %s", err.Error())
@@ -73,6 +62,7 @@ func main() {
cometServer.SetAcceptTimeout(time.Duration(conf.Config.AcceptTimeout) * time.Second)
cometServer.SetReadTimeout(time.Duration(conf.Config.ReadTimeout) * time.Second)
+ cometServer.SetHeartbeatInterval(time.Duration(conf.Config.HeartbeatInterval) * time.Second)
cometServer.SetHeartbeatTimeout(time.Duration(conf.Config.HeartbeatTimeout) * time.Second)
c := make(chan os.Signal, 1)
@@ -91,8 +81,12 @@ func main() {
cometServer.Run(listener)
}()
+ _, err = mq.NewRpcServer(conf.Config.Rabbit.Uri, "gibbon_rpc_exchange", "gibbon")
+ if err != nil {
+ log.Critical("failed to start RPC server: ", err)
+ os.Exit(1)
+ }
+
waitGroup.Add(1)
- go api.StartHttp(conf.Config.Web)
waitGroup.Wait()
}
-
diff --git a/misc/setupenv.sh b/misc/setupenv.sh
index d158ae1..8a5cea6 100755
--- a/misc/setupenv.sh
+++ b/misc/setupenv.sh
@@ -3,6 +3,8 @@
USER=work
WORK_ROOT=/letv
+THIS_DIR=$(dirname $(readlink -f $0) )
+
mkdir -p $WORK_ROOT/log/gibbon
chown -R $USER:$USER $WORK_ROOT/log/gibbon
@@ -10,7 +12,7 @@ mkdir -p $WORK_ROOT/run/gibbon
chown -R $USER:$USER $WORK_ROOT/run/gibbon
mkdir -p $WORK_ROOT/gibbon
-cp gibbon $WORK_ROOT/gibbon
-cp control.sh $WORK_ROOT/gibbon
-cp -r etc $WORK_ROOT/gibbon
+cp $THIS_DIR/gibbond $WORK_ROOT/gibbon
+cp $THIS_DIR/control.sh $WORK_ROOT/gibbon
+cp -r $THIS_DIR/etc $WORK_ROOT/gibbon
chown -R $USER:$USER $WORK_ROOT/gibbon
diff --git a/mq/rpc_msg.go b/mq/rpc_msg.go
new file mode 100644
index 0000000..ca6f7ba
--- /dev/null
+++ b/mq/rpc_msg.go
@@ -0,0 +1,23 @@
+package mq
+
+type MQ_Msg_Crtl struct {
+ DeviceId string `json:"dev_id"`
+ Service string `json:"svc"`
+ Cmd string `json:"cmd"`
+}
+
+type MQ_Msg_CtrlReply struct {
+ Status int `json:"status"`
+ Result string `json:"result,omitempty"`
+}
+
+const (
+ STATUS_SUCCESS = 0
+
+ STATUS_INVALID_SERVICE = 1
+ STATUS_EXCEPTION = 2
+
+ STATUS_NO_DEVICE = -1
+ STATUS_SEND_FAILED = -2
+ STATUS_SEND_TIMEOUT = -3
+)
diff --git a/mq/rpc_server.go b/mq/rpc_server.go
new file mode 100644
index 0000000..d2b039a
--- /dev/null
+++ b/mq/rpc_server.go
@@ -0,0 +1,172 @@
+package mq
+
+import (
+ "encoding/json"
+ "time"
+
+ "github.com/chenyf/gibbon/comet"
+ log "github.com/cihub/seelog"
+ "github.com/streadway/amqp"
+)
+
+var (
+ rpcExchangeType string = "direct"
+)
+
+type RpcServer struct {
+ conn *amqp.Connection
+ channel *amqp.Channel
+ exchange string
+}
+
+func NewRpcServer(amqpURI, exchange, bindingKey string) (*RpcServer, error) {
+ server := &RpcServer{
+ exchange: exchange,
+ }
+
+ var err error
+ server.conn, err = amqp.Dial(amqpURI)
+ if err != nil {
+ log.Errorf("Dial: %s", err)
+ return nil, err
+ }
+
+ log.Infof("got Connection, getting Channel")
+ server.channel, err = server.conn.Channel()
+ if err != nil {
+ log.Errorf("Channel: %s", err)
+ return nil, err
+ }
+
+ log.Infof("got Channel, declaring %q Exchange (%q)", rpcExchangeType, exchange)
+
+ if err := server.channel.ExchangeDeclare(
+ exchange, // name
+ rpcExchangeType, // type
+ true, // durable
+ false, // auto-deleted
+ false, // internal
+ false, // noWait
+ nil, // arguments
+ ); err != nil {
+ log.Errorf("Exchange Declare: %s", err)
+ return nil, err
+ }
+
+ rpcQueue, err := server.channel.QueueDeclare(
+ "", // name
+ false, // durable
+ true, // autoDelete
+ true, // exclusive
+ false, // noWait
+ nil, // args
+ )
+ if err != nil {
+ log.Errorf("Queue Declare: %s", err)
+ return nil, err
+ }
+ log.Infof("declared RPC queue [%s]", rpcQueue.Name)
+
+ if err = server.channel.QueueBind(
+ rpcQueue.Name, // name of the queue
+ bindingKey, // bindingKey
+ exchange, // sourceExchange
+ false, // noWait
+ nil, // arguments
+ ); err != nil {
+ log.Errorf("Queue bind: %s", err)
+ return nil, err
+ }
+
+ deliveries, err := server.channel.Consume(
+ rpcQueue.Name, // name
+ "", // consumerTag,
+ false, // noAck
+ false, // exclusive
+ false, // noLocal
+ false, // noWait
+ nil, // arguments
+ )
+ if err != nil {
+ log.Errorf("consume error: %s", err)
+ return nil, err
+ }
+
+ go server.handleDeliveries(deliveries)
+
+ return server, nil
+}
+
+func (this *RpcServer) Stop() {
+ this.conn.Close()
+}
+
+func (this *RpcServer) SendRpcResponse(callbackQueue, correlationId string, resp interface{}) {
+ log.Infof("Sending RPC reply. RequestId: %s", correlationId)
+ data, _ := json.Marshal(resp)
+ if err := this.channel.Publish(
+ "", // publish to an exchange
+ callbackQueue, // routingKey
+ false, // mandatory
+ false, // immediate
+ amqp.Publishing{
+ Headers: amqp.Table{},
+ ContentType: "text/plain",
+ ContentEncoding: "",
+ DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
+ Priority: 0, // 0-9
+ ReplyTo: "",
+ CorrelationId: correlationId,
+ Body: data,
+ },
+ ); err != nil {
+ log.Errorf("Exchange Publish: %s", err)
+ }
+}
+
+func (this *RpcServer) handleDeliveries(deliveries <-chan amqp.Delivery) {
+ for d := range deliveries {
+ log.Debugf("got %dB RPC request [%s]", len(d.Body), d.CorrelationId)
+ d.Ack(false)
+
+ var msg MQ_Msg_Crtl
+ if err := json.Unmarshal(d.Body, &msg); err != nil {
+ log.Errorf("Unknown MQ message: %s", err)
+ continue
+ }
+
+ go this.handleRpcRequest(&msg, d.ReplyTo, d.CorrelationId)
+ }
+
+ log.Infof("handle: deliveries channel closed")
+ //done <- nil
+}
+
+func (this *RpcServer) handleRpcRequest(msg *MQ_Msg_Crtl, replyTo, correlationId string) {
+ rpcReply := MQ_Msg_CtrlReply{}
+ c := comet.DevMap.Get(msg.DeviceId)
+ if c == nil {
+ log.Warnf("RPC: no device %s on this server.", msg.DeviceId)
+ rpcReply.Status = STATUS_NO_DEVICE
+ this.SendRpcResponse(replyTo, correlationId, rpcReply)
+ return
+ }
+ client := c.(*comet.Client)
+ var replyChannel chan *comet.Message = nil
+ wait := 10
+ replyChannel = make(chan *comet.Message)
+ seq := client.SendMessage(comet.MSG_ROUTER_COMMAND, []byte(msg.Cmd), replyChannel)
+ select {
+ case reply := <-replyChannel:
+ rpcReply.Status = 0
+ rpcReply.Result = string(reply.Data)
+ this.SendRpcResponse(replyTo, correlationId, rpcReply)
+ return
+ case <-time.After(time.Duration(wait) * time.Second):
+ log.Warnf("MSG timeout. RequestId: %s, seq: %d", correlationId, seq)
+ client.MsgTimeout(seq)
+ rpcReply.Status = STATUS_SEND_TIMEOUT
+ this.SendRpcResponse(replyTo, correlationId, rpcReply)
+ return
+ }
+}
diff --git a/storage/redis.go b/storage/redis.go
new file mode 100644
index 0000000..7304ad1
--- /dev/null
+++ b/storage/redis.go
@@ -0,0 +1,318 @@
+package storage
+
+import (
+ "encoding/json"
+ "fmt"
+ log "github.com/cihub/seelog"
+ "github.com/garyburd/redigo/redis"
+ "sort"
+ "strings"
+ "time"
+)
+
+type RedisStorage struct {
+ pool *redis.Pool
+ retry int
+}
+
+func NewRedisStorage(server string, pass string, poolsize int, retry int) *RedisStorage {
+ return &RedisStorage{
+ pool: &redis.Pool{
+ MaxActive: poolsize,
+ MaxIdle: poolsize,
+ IdleTimeout: 300 * time.Second,
+ Dial: func() (redis.Conn, error) {
+ c, err := redis.Dial("tcp", server)
+ if err != nil {
+ log.Infof("failed to connect Redis (%s), (%s)", server, err)
+ return nil, err
+ }
+ if _, err := c.Do("AUTH", pass); err != nil {
+ log.Infof("failed to auth Redis (%s), (%s)", server, err)
+ return nil, err
+
+ }
+ log.Infof("connected with Redis (%s)", server)
+ return c, err
+ },
+ TestOnBorrow: func(c redis.Conn, t time.Time) error {
+ _, err := c.Do("PING")
+ return err
+ },
+ },
+ retry: retry,
+ }
+}
+
+func (r *RedisStorage) Do(commandName string, args ...interface{}) (interface{}, error) {
+ var conn redis.Conn
+ i := r.retry
+ for ; i > 0; i-- {
+ conn = r.pool.Get()
+ err := conn.Err()
+ if err == nil {
+ break
+ } else {
+ log.Infof("failed to get conn from pool (%s)", err)
+ }
+ time.Sleep(time.Second)
+ }
+ if i == 0 || conn == nil {
+ return nil, fmt.Errorf("failed to find a useful redis conn")
+ } else {
+ ret, err := conn.Do(commandName, args...)
+ conn.Close()
+ return ret, err
+ }
+}
+
+// 从存储后端获取 > 指定时间的所有消息
+func (r *RedisStorage) GetOfflineMsgs(appId string, regId string, msgId int64) []*RawMessage {
+ key := "db_offline_msg_" + appId
+ ret, err := redis.Strings(r.Do("HKEYS", key))
+ if err != nil {
+ log.Infof("failed to get fields of offline msg:", err)
+ return nil
+ }
+
+ now := time.Now().Unix()
+ skeys := make(map[int64]interface{})
+ var sidxs []float64
+
+ for i := range ret {
+ var (
+ idx int64
+ expire int64
+ )
+ if _, err := fmt.Sscanf(ret[i], "%v_%v", &idx, &expire); err != nil {
+ log.Infof("invaild redis hash field:", err)
+ continue
+ }
+
+ if idx <= msgId || expire <= now {
+ continue
+ } else {
+ skeys[idx] = ret[i]
+ sidxs = append(sidxs, float64(idx))
+ }
+ }
+
+ sort.Float64Slice(sidxs).Sort()
+ args := []interface{}{key}
+ for k := range sidxs {
+ t := int64(sidxs[k])
+ args = append(args, skeys[t])
+ }
+
+ if len(args) == 1 {
+ return nil
+ }
+
+ rmsgs, err := redis.Strings(r.Do("HMGET", args...))
+ if err != nil {
+ log.Infof("failed to get offline rmsg:", err)
+ return nil
+ }
+
+ var msgs []*RawMessage
+ for i := range rmsgs {
+ t := []byte(rmsgs[i])
+ msg := &RawMessage{}
+ if err := json.Unmarshal(t, msg); err != nil {
+ log.Infof("failed to decode raw msg:", err)
+ continue
+ }
+ msgs = append(msgs, msg)
+ }
+ return msgs
+}
+
+// 从存储后端获取指定消息
+func (r *RedisStorage) GetRawMsg(appId string, msgId int64) *RawMessage {
+ key := "db_msg_" + appId
+ ret, err := redis.Bytes(r.Do("HGET", key, msgId))
+ if err != nil {
+ log.Warnf("redis: HGET failed (%s)", err)
+ return nil
+ }
+ rmsg := &RawMessage{}
+ if err := json.Unmarshal(ret, rmsg); err != nil {
+ log.Warnf("failed to decode raw msg:", err)
+ return nil
+ }
+ return rmsg
+}
+
+func (r *RedisStorage) AddDevice(serverName, devId string) error {
+ _, err := redis.Int(r.Do("HSET", "db_comet_"+serverName, devId, nil))
+ if err != nil {
+ log.Warnf("redis: HSET failed (%s)", err)
+ return err
+ }
+ return nil
+}
+
+func (r *RedisStorage) RemoveDevice(serverName, devId string) error {
+ _, err := r.Do("HDEL", "db_comet_"+serverName, devId)
+ if err != nil {
+ log.Warnf("redis: HDEL failed (%s)", err)
+ return err
+ }
+ return nil
+}
+
+func (r *RedisStorage) CheckDevice(devId string) (string, error) {
+ keys, err := redis.Strings(r.Do("KEYS", "db_comet_*"))
+ if err != nil {
+ log.Errorf("failed to get comet nodes KEYS:", err)
+ return "", err
+ }
+ for _, key := range keys {
+ exist, err := r.HashExists(key, devId)
+ if err != nil {
+ log.Errorf("error on HashExists:", err)
+ return "", err
+ }
+ if exist == 1 {
+ return strings.TrimPrefix(key, "db_comet_"), nil
+ }
+ }
+ return "", nil
+}
+
+func (r *RedisStorage) RefreshDevices(serverName string, timeout int) error {
+ _, err := redis.Int(r.Do("EXPIRE", "db_comet_"+serverName, timeout))
+ if err != nil {
+ log.Warnf("redis: EXPIRE failed, (%s)", err)
+ }
+ return err
+}
+
+func (r *RedisStorage) InitDevices(serverName string) error {
+ _, err := redis.Int(r.Do("DEL", "db_comet_"+serverName))
+ if err != nil {
+ log.Warnf("redis: DEL failed, (%s)", err)
+ }
+ return err
+}
+
+func (r *RedisStorage) HashGetAll(db string) ([]string, error) {
+ ret, err := r.Do("HGETALL", db)
+ if err != nil {
+ log.Warnf("redis: HGET failed (%s)", err)
+ return nil, err
+ }
+ if ret != nil {
+ ret, err := redis.Strings(ret, nil)
+ if err != nil {
+ log.Warnf("redis: convert to strings failed (%s)", err)
+ }
+ return ret, err
+ }
+ return nil, nil
+}
+
+func (r *RedisStorage) HashGet(db string, key string) ([]byte, error) {
+ ret, err := r.Do("HGET", db, key)
+ if err != nil {
+ log.Warnf("redis: HGET failed (%s)", err)
+ return nil, err
+ }
+ if ret != nil {
+ ret, err := redis.Bytes(ret, nil)
+ if err != nil {
+ log.Warnf("redis: convert to bytes failed (%s)", err)
+ }
+ return ret, err
+ }
+ return nil, nil
+}
+
+func (r *RedisStorage) HashSet(db string, key string, val []byte) (int, error) {
+ ret, err := redis.Int(r.Do("HSET", db, key, val))
+ if err != nil {
+ log.Warnf("redis: HSET failed (%s)", err)
+ }
+ return ret, err
+}
+
+func (r *RedisStorage) HashExists(db string, key string) (int, error) {
+ ret, err := redis.Int(r.Do("HEXISTS", db, key))
+ if err != nil {
+ log.Warnf("redis: HEXISTS failed (%s)", err)
+ }
+ return ret, err
+}
+
+func (r *RedisStorage) HashSetNotExist(db string, key string, val []byte) (int, error) {
+ ret, err := redis.Int(r.Do("HSETNX", db, key, val))
+ if err != nil {
+ log.Warnf("redis: HSETNX failed (%s)", err)
+ }
+ return ret, err
+}
+
+func (r *RedisStorage) HashDel(db string, key string) (int, error) {
+ ret, err := redis.Int(r.Do("HDEL", db, key))
+ if err != nil {
+ log.Warnf("redis: HDEL failed (%s)", err)
+ }
+ return ret, err
+}
+
+func (r *RedisStorage) HashIncrBy(db string, key string, val int64) (int64, error) {
+ ret, err := redis.Int64(r.Do("HINCRBY", db, key, val))
+ if err != nil {
+ log.Warnf("redis: HINCRBY failed, (%s)", err)
+ }
+ return ret, err
+}
+
+func (r *RedisStorage) SetNotExist(key string, val []byte) (int, error) {
+ ret, err := redis.Int(r.Do("SETNX", key, val))
+ if err != nil {
+ log.Warnf("redis: SETNX failed (%s)", err)
+ }
+ return ret, err
+}
+
+func (r *RedisStorage) IncrBy(key string, val int64) (int64, error) {
+ ret, err := redis.Int64(r.Do("INCRBY", key, val))
+ if err != nil {
+ log.Warnf("redis: INCRBY failed, (%s)", err)
+ }
+ return ret, err
+}
+
+func (r *RedisStorage) SetAdd(key string, val string) (int, error) {
+ ret, err := redis.Int(r.Do("SADD", key, val))
+ if err != nil {
+ log.Warnf("redis: SADD failed, (%s)", err)
+ }
+ return ret, err
+}
+
+func (r *RedisStorage) SetDel(key string, val string) (int, error) {
+ ret, err := redis.Int(r.Do("SREM", key, val))
+ if err != nil {
+ log.Warnf("redis: SREM failed, (%s)", err)
+ }
+ return ret, err
+}
+
+func (r *RedisStorage) SetIsMember(key string, val string) (int, error) {
+ ret, err := redis.Int(r.Do("SISMEMBER", key, val))
+ if err != nil {
+ log.Warnf("redis: SISMEMBER failed, (%s)", err)
+ }
+ return ret, err
+
+}
+
+func (r *RedisStorage) SetMembers(key string) ([]string, error) {
+ ret, err := redis.Strings(r.Do("SMEMBERS", key))
+ if err != nil {
+ log.Warnf("redis: SMEMBERS failed, (%s)", err)
+ }
+ return ret, err
+}
diff --git a/storage/storage.go b/storage/storage.go
new file mode 100644
index 0000000..482c04b
--- /dev/null
+++ b/storage/storage.go
@@ -0,0 +1,80 @@
+package storage
+
+import ()
+
+type RawMessage struct {
+ AppSec string `json:"appsec,omitempty"`
+ Token string `json:"token,omitempty"`
+ MsgId int64 `json:"msgid"`
+ AppId string `json:"appid"`
+ Pkg string `json:"pkg"`
+ CTime int64 `json:"ctime"`
+ Platform string `json:"platform,omitempty"`
+ MsgType int `json:"msg_type"`
+ PushType int `json:"push_type"`
+ PushParams struct {
+ RegId []string `json:"regid,omitempty"`
+ UserId []string `json:"userid,omitempty"`
+ DevId []string `json:"devid,omitempty"`
+ Topic string `json:"topic,omitempty"`
+ } `json:"push_params"`
+ Content string `json:"content,omitempty"`
+ Notification struct {
+ Title string `json:"title"`
+ Desc string `json:"desc"`
+ Type int `json:"type"`
+ SoundUri string `json:"sound_uri,omitempty`
+ Action int `json:"action,omitempty"`
+ IntentUri string `json:"intent_uri,omitempty`
+ WebUri string `json:"web_uri,omitempty`
+ } `json:"notification,omitempty"`
+ Options struct {
+ TTL int64 `json:"ttl,omitempty"`
+ TTS int64 `json:"tts,omitempty"`
+ } `json:"options"`
+}
+
+type RawApp struct {
+ Pkg string `json:"pkg"`
+ UserId string `json:"userid"`
+ AppKey string `json:"appkey,omitempty"`
+ AppSec string `json:"appsec,omitempty"`
+}
+
+type Storage interface {
+ GetOfflineMsgs(appId string, regId string, ctime int64) []*RawMessage
+ GetRawMsg(appId string, msgId int64) *RawMessage
+
+ AddDevice(serverName, devId string) error
+ RemoveDevice(serverName, devId string) error
+
+ // check if the device Id exists, return the server name
+ CheckDevice(devId string) (string, error)
+ RefreshDevices(serverName string, timeout int) error
+ InitDevices(serverName string) error
+
+ HashGetAll(db string) ([]string, error)
+ HashGet(db string, key string) ([]byte, error)
+ HashSet(db string, key string, val []byte) (int, error)
+ HashExists(db string, key string) (int, error)
+ HashSetNotExist(db string, key string, val []byte) (int, error)
+ HashDel(db string, key string) (int, error)
+ HashIncrBy(db string, key string, val int64) (int64, error)
+
+ SetNotExist(key string, val []byte) (int, error)
+ IncrBy(key string, val int64) (int64, error)
+
+ SetAdd(key string, val string) (int, error)
+ SetDel(key string, val string) (int, error)
+ SetIsMember(key string, val string) (int, error)
+ SetMembers(key string) ([]string, error)
+}
+
+var (
+ Instance Storage = nil
+)
+
+func NewInstance(server string, pass string, poolsize int, retry int) bool {
+ Instance = NewRedisStorage(server, pass, poolsize, retry)
+ return true
+}
diff --git a/utils/convert/convert.go b/utils/convert/convert.go
deleted file mode 100644
index 4916d8b..0000000
--- a/utils/convert/convert.go
+++ /dev/null
@@ -1,78 +0,0 @@
-package convert
-
-import (
- "crypto/md5"
- "fmt"
- "io"
- "time"
-)
-
-// 均采用大端字节序,事实上这个字节序没有任何用处,
-// 只要服务器和客户端约定采用相同的字节序就行
-
-func Uint32ToBytes(v uint32) []byte {
- buf := make([]byte, 4)
- buf[0] = byte(v >> 24)
- buf[1] = byte(v >> 16)
- buf[2] = byte(v >> 8)
- buf[3] = byte(v)
- return buf
-}
-
-func Int32ToBytes(v int32) []byte {
- buf := make([]byte, 4)
- buf[0] = byte(v >> 24)
- buf[1] = byte(v >> 16)
- buf[2] = byte(v >> 8)
- buf[3] = byte(v)
- return buf
-}
-
-func Uint16ToBytes(v uint16) []byte {
- buf := make([]byte, 2)
- buf[0] = byte(v >> 8)
- buf[1] = byte(v)
- return buf
-}
-
-func Int16ToBytes(v int16) []byte {
- buf := make([]byte, 2)
- buf[0] = byte(v >> 8)
- buf[1] = byte(v)
- return buf
-}
-
-func BytesToUint32(buf []byte) uint32 {
- v := (uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]))
- return v
-}
-
-func BytesToInt32(buf []byte) int32 {
- v := (int32(buf[0])<<24 | int32(buf[1])<<16 | int32(buf[2])<<8 | int32(buf[3]))
- return v
-}
-
-func BytesToUint16(buf []byte) uint16 {
- v := (uint16(buf[0])<<8 | uint16(buf[1]))
- return v
-}
-
-func BytesToInt16(buf []byte) int16 {
- v := (int16(buf[0])<<8 | int16(buf[1]))
- return v
-}
-
-func TimestampToTimeString(timestamp int64) string {
- return time.Unix(timestamp, 0).Format("2006-01-02 15:04:05")
-}
-
-func TimestampToTime(timestamp int64) time.Time {
- return time.Unix(timestamp, 0)
-}
-
-// 计算string的md5值, 返回长度32的字符串(md5)
-func StringToMd5(s string) string {
- m := md5.New()
- io.WriteString(m, s)
- return fmt.Sprintf("%x", m.Sum(nil))
-}
diff --git a/utils/convert/convert_test.go b/utils/convert/convert_test.go
deleted file mode 100644
index a14eeab..0000000
--- a/utils/convert/convert_test.go
+++ /dev/null
@@ -1,48 +0,0 @@
-package convert
-
-import (
- "fmt"
- "testing"
- "time"
-)
-
-func TestConvert(t *testing.T) {
- var a uint32 = 199211
- bytes := Uint32ToBytes(a)
- var b uint32 = BytesToUint32(bytes)
- if a != b {
- t.Error("convert error")
- }
-
- a = 0xffffffff
- b = BytesToUint32(Uint32ToBytes(a))
- if a != b {
- t.Error("convert error")
- }
-
- var c uint16 = 0xfefe
- bytes2 := Uint16ToBytes(c)
- var d uint16 = BytesToUint16(bytes2)
- if c != d {
- t.Error("convert error")
- }
-
- c = 65535
- d = BytesToUint16(Uint16ToBytes(c))
- if c != d {
- t.Error("convert error")
- }
-
- var e int32 = 98765
- bytes3 := Int32ToBytes(e)
- var f int32 = BytesToInt32(bytes3)
- if e != f {
- t.Error("convert error")
- }
-
- tt := time.Now()
- timestamp := tt.Unix()
- fmt.Println(TimestampToTimeString(timestamp))
- fmt.Println(TimestampToTime(timestamp))
- fmt.Println(tt)
-}
diff --git a/utils/funcmap/funcmap.go b/utils/funcmap/funcmap.go
deleted file mode 100644
index 7aaf758..0000000
--- a/utils/funcmap/funcmap.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package funcmap
-
-import (
- "errors"
- "reflect"
- "strconv"
-)
-
-type FuncMap struct {
- funcs map[uint32]reflect.Value
-}
-
-func NewFuncMap() *FuncMap {
- return &FuncMap{
- funcs: make(map[uint32]reflect.Value),
- }
-}
-
-func (this *FuncMap) Bind(fnid uint32, fn interface{}) (err error) {
- defer func() {
- if e := recover(); e != nil {
- err = errors.New(strconv.Itoa(int(fnid)) + " is not callable.")
- }
- }()
-
- v := reflect.ValueOf(fn)
- v.Type().NumIn() // fn不是函数则panic
- this.funcs[fnid] = v
- return
-}
-
-func (this *FuncMap) Call(fnid uint32, params ...interface{}) (result []reflect.Value, err error) {
- if _, ok := this.funcs[fnid]; !ok {
- err = errors.New(strconv.Itoa(int(fnid)) + " does not exist.")
- return
- }
-
- in := make([]reflect.Value, len(params))
- for k, param := range params {
- in[k] = reflect.ValueOf(param)
- }
-
- result = this.funcs[fnid].Call(in)
- return
-}
-
-func (this *FuncMap) Exist(fnid uint32) bool {
- _, ok := this.funcs[fnid]
- return ok
-}
diff --git a/utils/funcmap/funcmap_test.go b/utils/funcmap/funcmap_test.go
deleted file mode 100644
index bab4b87..0000000
--- a/utils/funcmap/funcmap_test.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package funcmap
-
-import (
- "fmt"
- "testing"
-)
-
-type T struct {
- id int
- name string
- by []byte
-}
-
-func (this *T) ft() {
- this.id = 100
- this.name = "lijie"
- this.by = []byte("hello world")
-}
-
-func (this *T) fp() {
- fmt.Println(this.id, this.name, string(this.by))
-}
-
-func f1() {
- fmt.Println("f1()")
-}
-
-func f2(i int, s string, arr []int, t *T) {
- fmt.Println("start f2()")
- fmt.Println(i, s)
- fmt.Println(arr)
- t.ft()
- t.fp()
- fmt.Println("end f2()")
-}
-
-func f3(in ...interface{}) []int {
- arr := make([]int, 0)
- for _, v := range in {
- arr = append(arr, v.(int))
- }
- return arr
-}
-
-func TestFuncMap(t *testing.T) {
-
- funcMap := NewFuncMap()
- funcMap.Bind(1, f1)
- funcMap.Bind(2, f2)
- funcMap.Bind(3, f3)
-
- funcMap.Call(1)
- funcMap.Call(2, 99, "sss", []int{1, 2, 3}, &T{})
- arr, err := funcMap.Call(3, 7, 8, 9, 10)
-
- if err == nil {
- for i := 0; i < arr[0].Len(); i++ {
- fmt.Println(arr[0].Index(i).Int())
- }
- } else {
- t.Error(err)
- }
-
- _, err = funcMap.Call(4, 9)
- if err == nil {
- t.Error(err)
- }
-
- if funcMap.Exist(10) {
- t.Error("Exist error")
- }
- if !funcMap.Exist(3) {
- t.Error("Exist error")
- }
-
-}
diff --git a/utils/jsonmessage.go b/utils/jsonmessage.go
deleted file mode 100644
index cdd01ac..0000000
--- a/utils/jsonmessage.go
+++ /dev/null
@@ -1,156 +0,0 @@
-package utils
-
-import (
- "encoding/json"
- "fmt"
- //"github.com/dotcloud/docker/pkg/term"
- "io"
- "strings"
- "time"
-)
-
-type JSONError struct {
- Code int `json:"code,omitempty"`
- Message string `json:"message,omitempty"`
-}
-
-func (e *JSONError) Error() string {
- return e.Message
-}
-
-type JSONProgress struct {
- terminalFd uintptr
- Current int `json:"current,omitempty"`
- Total int `json:"total,omitempty"`
- Start int64 `json:"start,omitempty"`
-}
-
-func (p *JSONProgress) String() string {
- var (
- width = 200
- pbBox string
- numbersBox string
- timeLeftBox string
- )
-
- if p.Current <= 0 && p.Total <= 0 {
- return ""
- }
- current := HumanSize(int64(p.Current))
- if p.Total <= 0 {
- return fmt.Sprintf("%8v", current)
- }
- total := HumanSize(int64(p.Total))
- percentage := int(float64(p.Current)/float64(p.Total)*100) / 2
- if width > 110 {
- pbBox = fmt.Sprintf("[%s>%s] ", strings.Repeat("=", percentage), strings.Repeat(" ", 50-percentage))
- }
- numbersBox = fmt.Sprintf("%8v/%v", current, total)
-
- if p.Start > 0 && percentage < 50 {
- fromStart := time.Now().UTC().Sub(time.Unix(int64(p.Start), 0))
- perEntry := fromStart / time.Duration(p.Current)
- left := time.Duration(p.Total-p.Current) * perEntry
- left = (left / time.Second) * time.Second
-
- if width > 50 {
- timeLeftBox = " " + left.String()
- }
- }
- return pbBox + numbersBox + timeLeftBox
-}
-
-type JSONMessage struct {
- Stream string `json:"stream,omitempty"`
- Status string `json:"status,omitempty"`
- Progress *JSONProgress `json:"progressDetail,omitempty"`
- ProgressMessage string `json:"progress,omitempty"` //deprecated
- ID string `json:"id,omitempty"`
- From string `json:"from,omitempty"`
- Time int64 `json:"time,omitempty"`
- Error *JSONError `json:"errorDetail,omitempty"`
- ErrorMessage string `json:"error,omitempty"` //deprecated
-}
-
-func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
- if jm.Error != nil {
- if jm.Error.Code == 401 {
- return fmt.Errorf("Authentication is required.")
- }
- return jm.Error
- }
- var endl string
- if isTerminal {
- // [2K = erase entire current line
- fmt.Fprintf(out, "%c[2K\r", 27)
- endl = "\r"
- } else if jm.Progress != nil { //disable progressbar in non-terminal
- return nil
- }
- if jm.Time != 0 {
- fmt.Fprintf(out, "[%s] ", time.Unix(jm.Time, 0))
- }
- if jm.ID != "" {
- fmt.Fprintf(out, "%s: ", jm.ID)
- }
- if jm.From != "" {
- fmt.Fprintf(out, "(from %s) ", jm.From)
- }
- if jm.Progress != nil {
- fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress.String(), endl)
- } else if jm.ProgressMessage != "" { //deprecated
- fmt.Fprintf(out, "%s %s%s", jm.Status, jm.ProgressMessage, endl)
- } else if jm.Stream != "" {
- fmt.Fprintf(out, "%s%s", jm.Stream, endl)
- } else {
- fmt.Fprintf(out, "%s%s\n", jm.Status, endl)
- }
- return nil
-}
-
-func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool) error {
- var (
- dec = json.NewDecoder(in)
- ids = make(map[string]int)
- diff = 0
- )
- for {
- var jm JSONMessage
- if err := dec.Decode(&jm); err != nil {
- if err == io.EOF {
- break
- }
- return err
- }
-
- if jm.Progress != nil {
- jm.Progress.terminalFd = terminalFd
- }
- if jm.Progress != nil || jm.ProgressMessage != "" {
- line, ok := ids[jm.ID]
- if !ok {
- line = len(ids)
- ids[jm.ID] = line
- fmt.Fprintf(out, "\n")
- diff = 0
- } else {
- diff = len(ids) - line
- }
- if isTerminal {
- // [{diff}A = move cursor up diff rows
- fmt.Fprintf(out, "%c[%dA", 27, diff)
- }
- }
- err := jm.Display(out, isTerminal)
- if jm.ID != "" {
- if isTerminal {
- // [{diff}B = move cursor down diff rows
- fmt.Fprintf(out, "%c[%dB", 27, diff)
- }
- }
- if err != nil {
- return err
- }
- }
- return nil
-}
diff --git a/utils/random.go b/utils/random.go
deleted file mode 100644
index d4d33c6..0000000
--- a/utils/random.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package utils
-
-import (
- "crypto/rand"
- "encoding/hex"
- "io"
-)
-
-func RandomString() string {
- id := make([]byte, 32)
- _, err := io.ReadFull(rand.Reader, id)
- if err != nil {
- panic(err) // This shouldn't happen
- }
- return hex.EncodeToString(id)
-}
diff --git a/utils/safemap/safemap.go b/utils/safemap/safemap.go
deleted file mode 100644
index 348aaab..0000000
--- a/utils/safemap/safemap.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package safemap
-
-import (
- "sync"
-)
-
-type SafeMap struct {
- lock *sync.RWMutex
- mp map[interface{}]interface{}
-}
-
-// NewSafeMap return new safemap.
-func NewSafeMap() *SafeMap {
- return &SafeMap{
- lock: new(sync.RWMutex),
- mp: make(map[interface{}]interface{}),
- }
-}
-
-// Get from maps return the k's value.
-func (this *SafeMap) Get(k interface{}) interface{} {
- this.lock.RLock()
- defer this.lock.RUnlock()
- if val, ok := this.mp[k]; ok {
- return val
- }
- return nil
-}
-
-// Maps the given key and value.
-// Returns false if the key is already in the map and changes nothing.
-func (this *SafeMap) Set(k interface{}, v interface{}) bool {
- this.lock.Lock()
- defer this.lock.Unlock()
- if val, ok := this.mp[k]; !ok {
- this.mp[k] = v
- } else if val != v {
- this.mp[k] = v
- } else {
- return false
- }
- return true
-}
-
-// Returns true if k is exist in the map.
-func (this *SafeMap) Check(k interface{}) bool {
- this.lock.RLock()
- defer this.lock.RUnlock()
- if _, ok := this.mp[k]; !ok {
- return false
- }
- return true
-}
-
-// Delete the given key and value.
-func (this *SafeMap) Delete(k interface{}) {
- this.lock.Lock()
- defer this.lock.Unlock()
- delete(this.mp, k)
-}
-
-// Items returns all items in safemap
-func (this *SafeMap) Items() map[interface{}]interface{} {
- this.lock.RLock()
- defer this.lock.RUnlock()
- return this.mp
-}
-
-// Size returns the size of the safemap
-func (this *SafeMap) Size() int {
- this.lock.RLock()
- defer this.lock.RUnlock()
- return len(this.mp)
-}
diff --git a/utils/safemap/safemap_test.go b/utils/safemap/safemap_test.go
deleted file mode 100644
index 9c9aa7e..0000000
--- a/utils/safemap/safemap_test.go
+++ /dev/null
@@ -1,31 +0,0 @@
-package safemap
-
-import (
- "testing"
-)
-
-func TestSafeMap(t *testing.T) {
- mp := NewSafeMap()
- if !mp.Set("lijie", int(1)) {
- t.Error("set error")
- }
-
- if !mp.Check("lijie") {
- t.Error("check error")
- }
-
- if v := mp.Get("lijie"); v.(int) != 1 {
- t.Error("get error")
- }
-
- if mp.Size() != 1 {
- t.Error("size error")
- }
-
- mp.Delete("lijie")
- if mp.Check("lijie") {
- t.Error("delete error")
- }
-
- mp.Delete("no")
-}
diff --git a/utils/utils.go b/utils/utils.go
deleted file mode 100644
index 0567547..0000000
--- a/utils/utils.go
+++ /dev/null
@@ -1,1137 +0,0 @@
-package utils
-
-import (
- "bytes"
- "crypto/sha1"
- "crypto/sha256"
- "encoding/hex"
- "encoding/json"
- "fmt"
- "index/suffixarray"
- "io"
- "io/ioutil"
- "net/http"
- "os"
- "os/exec"
- "path/filepath"
- "regexp"
- "runtime"
- "strconv"
- "strings"
- "sync"
- "time"
-)
-
-var (
- IAMSTATIC bool // whether or not Docker itself was compiled statically via ./hack/make.sh binary
- INITSHA1 string // sha1sum of separate static dockerinit, if Docker itself was compiled dynamically via ./hack/make.sh dynbinary
- INITPATH string // custom location to search for a valid dockerinit binary (available for packagers as a last resort escape hatch)
-)
-
-// A common interface to access the Fatal method of
-// both testing.B and testing.T.
-type Fataler interface {
- Fatal(args ...interface{})
-}
-
-// Go is a basic promise implementation: it wraps calls a function in a goroutine,
-// and returns a channel which will later return the function's return value.
-func Go(f func() error) chan error {
- ch := make(chan error)
- go func() {
- ch <- f()
- }()
- return ch
-}
-
-// Request a given URL and return an io.Reader
-func Download(url string) (resp *http.Response, err error) {
- if resp, err = http.Get(url); err != nil {
- return nil, err
- }
- if resp.StatusCode >= 400 {
- return nil, fmt.Errorf("Got HTTP status code >= 400: %s", resp.Status)
- }
- return resp, nil
-}
-
-func logf(level string, format string, a ...interface{}) {
- // Retrieve the stack infos
- _, file, line, ok := runtime.Caller(2)
- if !ok {
- file = ""
- line = -1
- } else {
- file = file[strings.LastIndex(file, "/")+1:]
- }
-
- fmt.Fprintf(os.Stderr, fmt.Sprintf("[%s] %s:%d %s\n", level, file, line, format), a...)
-}
-
-// Debug function, if the debug flag is set, then display. Do nothing otherwise
-// If Docker is in damon mode, also send the debug info on the socket
-func Debugf(format string, a ...interface{}) {
- if os.Getenv("DEBUG") != "" {
- logf("debug", format, a...)
- }
-}
-
-func Errorf(format string, a ...interface{}) {
- logf("error", format, a...)
-}
-
-// HumanDuration returns a human-readable approximation of a duration
-// (eg. "About a minute", "4 hours ago", etc.)
-func HumanDuration(d time.Duration) string {
- if seconds := int(d.Seconds()); seconds < 1 {
- return "Less than a second"
- } else if seconds < 60 {
- return fmt.Sprintf("%d seconds", seconds)
- } else if minutes := int(d.Minutes()); minutes == 1 {
- return "About a minute"
- } else if minutes < 60 {
- return fmt.Sprintf("%d minutes", minutes)
- } else if hours := int(d.Hours()); hours == 1 {
- return "About an hour"
- } else if hours < 48 {
- return fmt.Sprintf("%d hours", hours)
- } else if hours < 24*7*2 {
- return fmt.Sprintf("%d days", hours/24)
- } else if hours < 24*30*3 {
- return fmt.Sprintf("%d weeks", hours/24/7)
- } else if hours < 24*365*2 {
- return fmt.Sprintf("%d months", hours/24/30)
- }
- return fmt.Sprintf("%f years", d.Hours()/24/365)
-}
-
-// HumanSize returns a human-readable approximation of a size
-// using SI standard (eg. "44kB", "17MB")
-func HumanSize(size int64) string {
- i := 0
- var sizef float64
- sizef = float64(size)
- units := []string{"B", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"}
- for sizef >= 1000.0 {
- sizef = sizef / 1000.0
- i++
- }
- return fmt.Sprintf("%.4g %s", sizef, units[i])
-}
-
-// Parses a human-readable string representing an amount of RAM
-// in bytes, kibibytes, mebibytes or gibibytes, and returns the
-// number of bytes, or -1 if the string is unparseable.
-// Units are case-insensitive, and the 'b' suffix is optional.
-func RAMInBytes(size string) (bytes int64, err error) {
- re, error := regexp.Compile("^(\\d+)([kKmMgG])?[bB]?$")
- if error != nil {
- return -1, error
- }
-
- matches := re.FindStringSubmatch(size)
-
- if len(matches) != 3 {
- return -1, fmt.Errorf("Invalid size: '%s'", size)
- }
-
- memLimit, error := strconv.ParseInt(matches[1], 10, 0)
- if error != nil {
- return -1, error
- }
-
- unit := strings.ToLower(matches[2])
-
- if unit == "k" {
- memLimit *= 1024
- } else if unit == "m" {
- memLimit *= 1024 * 1024
- } else if unit == "g" {
- memLimit *= 1024 * 1024 * 1024
- }
-
- return memLimit, nil
-}
-
-func Trunc(s string, maxlen int) string {
- if len(s) <= maxlen {
- return s
- }
- return s[:maxlen]
-}
-
-// Figure out the absolute path of our own binary (if it's still around).
-func SelfPath() string {
- path, err := exec.LookPath(os.Args[0])
- if err != nil {
- if os.IsNotExist(err) {
- return ""
- }
- if execErr, ok := err.(*exec.Error); ok && os.IsNotExist(execErr.Err) {
- return ""
- }
- panic(err)
- }
- path, err = filepath.Abs(path)
- if err != nil {
- if os.IsNotExist(err) {
- return ""
- }
- panic(err)
- }
- return path
-}
-
-func dockerInitSha1(target string) string {
- f, err := os.Open(target)
- if err != nil {
- return ""
- }
- defer f.Close()
- h := sha1.New()
- _, err = io.Copy(h, f)
- if err != nil {
- return ""
- }
- return hex.EncodeToString(h.Sum(nil))
-}
-
-func isValidDockerInitPath(target string, selfPath string) bool { // target and selfPath should be absolute (InitPath and SelfPath already do this)
- if target == "" {
- return false
- }
- if IAMSTATIC {
- if selfPath == "" {
- return false
- }
- if target == selfPath {
- return true
- }
- targetFileInfo, err := os.Lstat(target)
- if err != nil {
- return false
- }
- selfPathFileInfo, err := os.Lstat(selfPath)
- if err != nil {
- return false
- }
- return os.SameFile(targetFileInfo, selfPathFileInfo)
- }
- return INITSHA1 != "" && dockerInitSha1(target) == INITSHA1
-}
-
-// Figure out the path of our dockerinit (which may be SelfPath())
-func DockerInitPath(localCopy string) string {
- selfPath := SelfPath()
- if isValidDockerInitPath(selfPath, selfPath) {
- // if we're valid, don't bother checking anything else
- return selfPath
- }
- var possibleInits = []string{
- localCopy,
- INITPATH,
- filepath.Join(filepath.Dir(selfPath), "dockerinit"),
-
- // FHS 3.0 Draft: "/usr/libexec includes internal binaries that are not intended to be executed directly by users or shell scripts. Applications may use a single subdirectory under /usr/libexec."
- // http://www.linuxbase.org/betaspecs/fhs/fhs.html#usrlibexec
- "/usr/libexec/docker/dockerinit",
- "/usr/local/libexec/docker/dockerinit",
-
- // FHS 2.3: "/usr/lib includes object files, libraries, and internal binaries that are not intended to be executed directly by users or shell scripts."
- // http://refspecs.linuxfoundation.org/FHS_2.3/fhs-2.3.html#USRLIBLIBRARIESFORPROGRAMMINGANDPA
- "/usr/lib/docker/dockerinit",
- "/usr/local/lib/docker/dockerinit",
- }
- for _, dockerInit := range possibleInits {
- if dockerInit == "" {
- continue
- }
- path, err := exec.LookPath(dockerInit)
- if err == nil {
- path, err = filepath.Abs(path)
- if err != nil {
- // LookPath already validated that this file exists and is executable (following symlinks), so how could Abs fail?
- panic(err)
- }
- if isValidDockerInitPath(path, selfPath) {
- return path
- }
- }
- }
- return ""
-}
-
-type NopWriter struct{}
-
-func (*NopWriter) Write(buf []byte) (int, error) {
- return len(buf), nil
-}
-
-type nopWriteCloser struct {
- io.Writer
-}
-
-func (w *nopWriteCloser) Close() error { return nil }
-
-func NopWriteCloser(w io.Writer) io.WriteCloser {
- return &nopWriteCloser{w}
-}
-
-type bufReader struct {
- sync.Mutex
- buf *bytes.Buffer
- reader io.Reader
- err error
- wait sync.Cond
-}
-
-func NewBufReader(r io.Reader) *bufReader {
- reader := &bufReader{
- buf: &bytes.Buffer{},
- reader: r,
- }
- reader.wait.L = &reader.Mutex
- go reader.drain()
- return reader
-}
-
-func (r *bufReader) drain() {
- buf := make([]byte, 1024)
- for {
- n, err := r.reader.Read(buf)
- r.Lock()
- if err != nil {
- r.err = err
- } else {
- r.buf.Write(buf[0:n])
- }
- r.wait.Signal()
- r.Unlock()
- if err != nil {
- break
- }
- }
-}
-
-func (r *bufReader) Read(p []byte) (n int, err error) {
- r.Lock()
- defer r.Unlock()
- for {
- n, err = r.buf.Read(p)
- if n > 0 {
- return n, err
- }
- if r.err != nil {
- return 0, r.err
- }
- r.wait.Wait()
- }
-}
-
-func (r *bufReader) Close() error {
- closer, ok := r.reader.(io.ReadCloser)
- if !ok {
- return nil
- }
- return closer.Close()
-}
-
-type WriteBroadcaster struct {
- sync.Mutex
- buf *bytes.Buffer
- writers map[StreamWriter]bool
-}
-
-type StreamWriter struct {
- wc io.WriteCloser
- stream string
-}
-
-func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) {
- w.Lock()
- sw := StreamWriter{wc: writer, stream: stream}
- w.writers[sw] = true
- w.Unlock()
-}
-
-type JSONLog struct {
- Log string `json:"log,omitempty"`
- Stream string `json:"stream,omitempty"`
- Created time.Time `json:"time"`
-}
-
-func (w *WriteBroadcaster) Write(p []byte) (n int, err error) {
- w.Lock()
- defer w.Unlock()
- w.buf.Write(p)
- for sw := range w.writers {
- lp := p
- if sw.stream != "" {
- lp = nil
- for {
- line, err := w.buf.ReadString('\n')
- if err != nil {
- w.buf.Write([]byte(line))
- break
- }
- b, err := json.Marshal(&JSONLog{Log: line, Stream: sw.stream, Created: time.Now().UTC()})
- if err != nil {
- // On error, evict the writer
- delete(w.writers, sw)
- continue
- }
- lp = append(lp, b...)
- lp = append(lp, '\n')
- }
- }
- if n, err := sw.wc.Write(lp); err != nil || n != len(lp) {
- // On error, evict the writer
- delete(w.writers, sw)
- }
- }
- return len(p), nil
-}
-
-func (w *WriteBroadcaster) CloseWriters() error {
- w.Lock()
- defer w.Unlock()
- for sw := range w.writers {
- sw.wc.Close()
- }
- w.writers = make(map[StreamWriter]bool)
- return nil
-}
-
-func NewWriteBroadcaster() *WriteBroadcaster {
- return &WriteBroadcaster{writers: make(map[StreamWriter]bool), buf: bytes.NewBuffer(nil)}
-}
-
-func GetTotalUsedFds() int {
- if fds, err := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid())); err != nil {
- Errorf("Error opening /proc/%d/fd: %s", os.Getpid(), err)
- } else {
- return len(fds)
- }
- return -1
-}
-
-// TruncIndex allows the retrieval of string identifiers by any of their unique prefixes.
-// This is used to retrieve image and container IDs by more convenient shorthand prefixes.
-type TruncIndex struct {
- index *suffixarray.Index
- ids map[string]bool
- bytes []byte
-}
-
-func NewTruncIndex() *TruncIndex {
- return &TruncIndex{
- index: suffixarray.New([]byte{' '}),
- ids: make(map[string]bool),
- bytes: []byte{' '},
- }
-}
-
-func (idx *TruncIndex) Add(id string) error {
- if strings.Contains(id, " ") {
- return fmt.Errorf("Illegal character: ' '")
- }
- if _, exists := idx.ids[id]; exists {
- return fmt.Errorf("Id already exists: %s", id)
- }
- idx.ids[id] = true
- idx.bytes = append(idx.bytes, []byte(id+" ")...)
- idx.index = suffixarray.New(idx.bytes)
- return nil
-}
-
-func (idx *TruncIndex) Delete(id string) error {
- if _, exists := idx.ids[id]; !exists {
- return fmt.Errorf("No such id: %s", id)
- }
- before, after, err := idx.lookup(id)
- if err != nil {
- return err
- }
- delete(idx.ids, id)
- idx.bytes = append(idx.bytes[:before], idx.bytes[after:]...)
- idx.index = suffixarray.New(idx.bytes)
- return nil
-}
-
-func (idx *TruncIndex) lookup(s string) (int, int, error) {
- offsets := idx.index.Lookup([]byte(" "+s), -1)
- //log.Printf("lookup(%s): %v (index bytes: '%s')\n", s, offsets, idx.index.Bytes())
- if offsets == nil || len(offsets) == 0 || len(offsets) > 1 {
- return -1, -1, fmt.Errorf("No such id: %s", s)
- }
- offsetBefore := offsets[0] + 1
- offsetAfter := offsetBefore + strings.Index(string(idx.bytes[offsetBefore:]), " ")
- return offsetBefore, offsetAfter, nil
-}
-
-func (idx *TruncIndex) Get(s string) (string, error) {
- before, after, err := idx.lookup(s)
- //log.Printf("Get(%s) bytes=|%s| before=|%d| after=|%d|\n", s, idx.bytes, before, after)
- if err != nil {
- return "", err
- }
- return string(idx.bytes[before:after]), err
-}
-
-// TruncateID returns a shorthand version of a string identifier for convenience.
-// A collision with other shorthands is very unlikely, but possible.
-// In case of a collision a lookup with TruncIndex.Get() will fail, and the caller
-// will need to use a langer prefix, or the full-length Id.
-func TruncateID(id string) string {
- shortLen := 12
- if len(id) < shortLen {
- shortLen = len(id)
- }
- return id[:shortLen]
-}
-
-// Code c/c from io.Copy() modified to handle escape sequence
-func CopyEscapable(dst io.Writer, src io.ReadCloser) (written int64, err error) {
- buf := make([]byte, 32*1024)
- for {
- nr, er := src.Read(buf)
- if nr > 0 {
- // ---- Docker addition
- // char 16 is C-p
- if nr == 1 && buf[0] == 16 {
- nr, er = src.Read(buf)
- // char 17 is C-q
- if nr == 1 && buf[0] == 17 {
- if err := src.Close(); err != nil {
- return 0, err
- }
- return 0, nil
- }
- }
- // ---- End of docker
- nw, ew := dst.Write(buf[0:nr])
- if nw > 0 {
- written += int64(nw)
- }
- if ew != nil {
- err = ew
- break
- }
- if nr != nw {
- err = io.ErrShortWrite
- break
- }
- }
- if er == io.EOF {
- break
- }
- if er != nil {
- err = er
- break
- }
- }
- return written, err
-}
-
-func HashData(src io.Reader) (string, error) {
- h := sha256.New()
- if _, err := io.Copy(h, src); err != nil {
- return "", err
- }
- return "sha256:" + hex.EncodeToString(h.Sum(nil)), nil
-}
-
-type KernelVersionInfo struct {
- Kernel int
- Major int
- Minor int
- Flavor string
-}
-
-func (k *KernelVersionInfo) String() string {
- flavor := ""
- if len(k.Flavor) > 0 {
- flavor = fmt.Sprintf("-%s", k.Flavor)
- }
- return fmt.Sprintf("%d.%d.%d%s", k.Kernel, k.Major, k.Minor, flavor)
-}
-
-// Compare two KernelVersionInfo struct.
-// Returns -1 if a < b, = if a == b, 1 it a > b
-func CompareKernelVersion(a, b *KernelVersionInfo) int {
- if a.Kernel < b.Kernel {
- return -1
- } else if a.Kernel > b.Kernel {
- return 1
- }
-
- if a.Major < b.Major {
- return -1
- } else if a.Major > b.Major {
- return 1
- }
-
- if a.Minor < b.Minor {
- return -1
- } else if a.Minor > b.Minor {
- return 1
- }
-
- return 0
-}
-/*
-func GetKernelVersion() (*KernelVersionInfo, error) {
- var (
- err error
- )
-
- uts, err := uname()
- if err != nil {
- return nil, err
- }
-
- release := make([]byte, len(uts.Release))
-
- i := 0
- for _, c := range uts.Release {
- release[i] = byte(c)
- i++
- }
-
- // Remove the \x00 from the release for Atoi to parse correctly
- release = release[:bytes.IndexByte(release, 0)]
-
- return ParseRelease(string(release))
-}
-*/
-func ParseRelease(release string) (*KernelVersionInfo, error) {
- var (
- flavor string
- kernel, major, minor int
- err error
- )
-
- tmp := strings.SplitN(release, "-", 2)
- tmp2 := strings.Split(tmp[0], ".")
-
- if len(tmp2) > 0 {
- kernel, err = strconv.Atoi(tmp2[0])
- if err != nil {
- return nil, err
- }
- }
-
- if len(tmp2) > 1 {
- major, err = strconv.Atoi(tmp2[1])
- if err != nil {
- return nil, err
- }
- }
-
- if len(tmp2) > 2 {
- // Removes "+" because git kernels might set it
- minorUnparsed := strings.Trim(tmp2[2], "+")
- minor, err = strconv.Atoi(minorUnparsed)
- if err != nil {
- return nil, err
- }
- }
-
- if len(tmp) == 2 {
- flavor = tmp[1]
- } else {
- flavor = ""
- }
-
- return &KernelVersionInfo{
- Kernel: kernel,
- Major: major,
- Minor: minor,
- Flavor: flavor,
- }, nil
-}
-
-// FIXME: this is deprecated by CopyWithTar in archive.go
-func CopyDirectory(source, dest string) error {
- if output, err := exec.Command("cp", "-ra", source, dest).CombinedOutput(); err != nil {
- return fmt.Errorf("Error copy: %s (%s)", err, output)
- }
- return nil
-}
-
-type NopFlusher struct{}
-
-func (f *NopFlusher) Flush() {}
-
-type WriteFlusher struct {
- sync.Mutex
- w io.Writer
- flusher http.Flusher
-}
-
-func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
- wf.Lock()
- defer wf.Unlock()
- n, err = wf.w.Write(b)
- wf.flusher.Flush()
- return n, err
-}
-
-// Flush the stream immediately.
-func (wf *WriteFlusher) Flush() {
- wf.Lock()
- defer wf.Unlock()
- wf.flusher.Flush()
-}
-
-func NewWriteFlusher(w io.Writer) *WriteFlusher {
- var flusher http.Flusher
- if f, ok := w.(http.Flusher); ok {
- flusher = f
- } else {
- flusher = &NopFlusher{}
- }
- return &WriteFlusher{w: w, flusher: flusher}
-}
-
-func NewHTTPRequestError(msg string, res *http.Response) error {
- return &JSONError{
- Message: msg,
- Code: res.StatusCode,
- }
-}
-
-func IsURL(str string) bool {
- return strings.HasPrefix(str, "http://") || strings.HasPrefix(str, "https://")
-}
-
-func IsGIT(str string) bool {
- return strings.HasPrefix(str, "git://") || strings.HasPrefix(str, "github.com/")
-}
-
-// GetResolvConf opens and read the content of /etc/resolv.conf.
-// It returns it as byte slice.
-func GetResolvConf() ([]byte, error) {
- resolv, err := ioutil.ReadFile("/etc/resolv.conf")
- if err != nil {
- Errorf("Error openning resolv.conf: %s", err)
- return nil, err
- }
- return resolv, nil
-}
-
-// CheckLocalDns looks into the /etc/resolv.conf,
-// it returns true if there is a local nameserver or if there is no nameserver.
-func CheckLocalDns(resolvConf []byte) bool {
- var parsedResolvConf = StripComments(resolvConf, []byte("#"))
- if !bytes.Contains(parsedResolvConf, []byte("nameserver")) {
- return true
- }
- for _, ip := range [][]byte{
- []byte("127.0.0.1"),
- []byte("127.0.1.1"),
- } {
- if bytes.Contains(parsedResolvConf, ip) {
- return true
- }
- }
- return false
-}
-
-// StripComments parses input into lines and strips away comments.
-func StripComments(input []byte, commentMarker []byte) []byte {
- lines := bytes.Split(input, []byte("\n"))
- var output []byte
- for _, currentLine := range lines {
- var commentIndex = bytes.Index(currentLine, commentMarker)
- if commentIndex == -1 {
- output = append(output, currentLine...)
- } else {
- output = append(output, currentLine[:commentIndex]...)
- }
- output = append(output, []byte("\n")...)
- }
- return output
-}
-
-// GetNameserversAsCIDR returns nameservers (if any) listed in
-// /etc/resolv.conf as CIDR blocks (e.g., "1.2.3.4/32")
-// This function's output is intended for net.ParseCIDR
-func GetNameserversAsCIDR(resolvConf []byte) []string {
- var parsedResolvConf = StripComments(resolvConf, []byte("#"))
- nameservers := []string{}
- re := regexp.MustCompile(`^\s*nameserver\s*(([0-9]+\.){3}([0-9]+))\s*$`)
- for _, line := range bytes.Split(parsedResolvConf, []byte("\n")) {
- var ns = re.FindSubmatch(line)
- if len(ns) > 0 {
- nameservers = append(nameservers, string(ns[1])+"/32")
- }
- }
-
- return nameservers
-}
-
-// FIXME: Change this not to receive default value as parameter
-func ParseHost(defaultHost string, defaultPort int, defaultUnix, addr string) (string, error) {
- var (
- proto string
- host string
- port int
- )
- addr = strings.TrimSpace(addr)
- switch {
- case strings.HasPrefix(addr, "unix://"):
- proto = "unix"
- addr = strings.TrimPrefix(addr, "unix://")
- if addr == "" {
- addr = defaultUnix
- }
- case strings.HasPrefix(addr, "tcp://"):
- proto = "tcp"
- addr = strings.TrimPrefix(addr, "tcp://")
- case addr == "":
- proto = "unix"
- addr = defaultUnix
- default:
- if strings.Contains(addr, "://") {
- return "", fmt.Errorf("Invalid bind address protocol: %s", addr)
- }
- proto = "tcp"
- }
-
- if proto != "unix" && strings.Contains(addr, ":") {
- hostParts := strings.Split(addr, ":")
- if len(hostParts) != 2 {
- return "", fmt.Errorf("Invalid bind address format: %s", addr)
- }
- if hostParts[0] != "" {
- host = hostParts[0]
- } else {
- host = defaultHost
- }
-
- if p, err := strconv.Atoi(hostParts[1]); err == nil && p != 0 {
- port = p
- } else {
- port = defaultPort
- }
-
- } else {
- host = addr
- port = defaultPort
- }
- if proto == "unix" {
- return fmt.Sprintf("%s://%s", proto, host), nil
- }
- return fmt.Sprintf("%s://%s:%d", proto, host, port), nil
-}
-
-func GetReleaseVersion() string {
- resp, err := http.Get("http://get.docker.io/latest")
- if err != nil {
- return ""
- }
- defer resp.Body.Close()
- if resp.ContentLength > 24 || resp.StatusCode != 200 {
- return ""
- }
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return ""
- }
- return strings.TrimSpace(string(body))
-}
-
-// Get a repos name and returns the right reposName + tag
-// The tag can be confusing because of a port in a repository name.
-// Ex: localhost.localdomain:5000/samalba/hipache:latest
-func ParseRepositoryTag(repos string) (string, string) {
- n := strings.LastIndex(repos, ":")
- if n < 0 {
- return repos, ""
- }
- if tag := repos[n+1:]; !strings.Contains(tag, "/") {
- return repos[:n], tag
- }
- return repos, ""
-}
-
-type User struct {
- Uid string // user id
- Gid string // primary group id
- Username string
- Name string
- HomeDir string
-}
-
-// UserLookup check if the given username or uid is present in /etc/passwd
-// and returns the user struct.
-// If the username is not found, an error is returned.
-func UserLookup(uid string) (*User, error) {
- file, err := ioutil.ReadFile("/etc/passwd")
- if err != nil {
- return nil, err
- }
- for _, line := range strings.Split(string(file), "\n") {
- data := strings.Split(line, ":")
- if len(data) > 5 && (data[0] == uid || data[2] == uid) {
- return &User{
- Uid: data[2],
- Gid: data[3],
- Username: data[0],
- Name: data[4],
- HomeDir: data[5],
- }, nil
- }
- }
- return nil, fmt.Errorf("User not found in /etc/passwd")
-}
-
-type DependencyGraph struct {
- nodes map[string]*DependencyNode
-}
-
-type DependencyNode struct {
- id string
- deps map[*DependencyNode]bool
-}
-
-func NewDependencyGraph() DependencyGraph {
- return DependencyGraph{
- nodes: map[string]*DependencyNode{},
- }
-}
-
-func (graph *DependencyGraph) addNode(node *DependencyNode) string {
- if graph.nodes[node.id] == nil {
- graph.nodes[node.id] = node
- }
- return node.id
-}
-
-func (graph *DependencyGraph) NewNode(id string) string {
- if graph.nodes[id] != nil {
- return id
- }
- nd := &DependencyNode{
- id: id,
- deps: map[*DependencyNode]bool{},
- }
- graph.addNode(nd)
- return id
-}
-
-func (graph *DependencyGraph) AddDependency(node, to string) error {
- if graph.nodes[node] == nil {
- return fmt.Errorf("Node %s does not belong to this graph", node)
- }
-
- if graph.nodes[to] == nil {
- return fmt.Errorf("Node %s does not belong to this graph", to)
- }
-
- if node == to {
- return fmt.Errorf("Dependency loops are forbidden!")
- }
-
- graph.nodes[node].addDependency(graph.nodes[to])
- return nil
-}
-
-func (node *DependencyNode) addDependency(to *DependencyNode) bool {
- node.deps[to] = true
- return node.deps[to]
-}
-
-func (node *DependencyNode) Degree() int {
- return len(node.deps)
-}
-
-// The magic happens here ::
-func (graph *DependencyGraph) GenerateTraversalMap() ([][]string, error) {
- Debugf("Generating traversal map. Nodes: %d", len(graph.nodes))
- result := [][]string{}
- processed := map[*DependencyNode]bool{}
- // As long as we haven't processed all nodes...
- for len(processed) < len(graph.nodes) {
- // Use a temporary buffer for processed nodes, otherwise
- // nodes that depend on each other could end up in the same round.
- tmpProcessed := []*DependencyNode{}
- for _, node := range graph.nodes {
- // If the node has more dependencies than what we have cleared,
- // it won't be valid for this round.
- if node.Degree() > len(processed) {
- continue
- }
- // If it's already processed, get to the next one
- if processed[node] {
- continue
- }
- // It's not been processed yet and has 0 deps. Add it!
- // (this is a shortcut for what we're doing below)
- if node.Degree() == 0 {
- tmpProcessed = append(tmpProcessed, node)
- continue
- }
- // If at least one dep hasn't been processed yet, we can't
- // add it.
- ok := true
- for dep := range node.deps {
- if !processed[dep] {
- ok = false
- break
- }
- }
- // All deps have already been processed. Add it!
- if ok {
- tmpProcessed = append(tmpProcessed, node)
- }
- }
- Debugf("Round %d: found %d available nodes", len(result), len(tmpProcessed))
- // If no progress has been made this round,
- // that means we have circular dependencies.
- if len(tmpProcessed) == 0 {
- return nil, fmt.Errorf("Could not find a solution to this dependency graph")
- }
- round := []string{}
- for _, nd := range tmpProcessed {
- round = append(round, nd.id)
- processed[nd] = true
- }
- result = append(result, round)
- }
- return result, nil
-}
-
-// An StatusError reports an unsuccessful exit by a command.
-type StatusError struct {
- Status string
- StatusCode int
-}
-
-func (e *StatusError) Error() string {
- return fmt.Sprintf("Status: %s, Code: %d", e.Status, e.StatusCode)
-}
-
-func quote(word string, buf *bytes.Buffer) {
- // Bail out early for "simple" strings
- if word != "" && !strings.ContainsAny(word, "\\'\"`${[|&;<>()~*?! \t\n") {
- buf.WriteString(word)
- return
- }
-
- buf.WriteString("'")
-
- for i := 0; i < len(word); i++ {
- b := word[i]
- if b == '\'' {
- // Replace literal ' with a close ', a \', and a open '
- buf.WriteString("'\\''")
- } else {
- buf.WriteByte(b)
- }
- }
-
- buf.WriteString("'")
-}
-
-// Take a list of strings and escape them so they will be handled right
-// when passed as arguments to an program via a shell
-func ShellQuoteArguments(args []string) string {
- var buf bytes.Buffer
- for i, arg := range args {
- if i != 0 {
- buf.WriteByte(' ')
- }
- quote(arg, &buf)
- }
- return buf.String()
-}
-
-func IsClosedError(err error) bool {
- /* This comparison is ugly, but unfortunately, net.go doesn't export errClosing.
- * See:
- * http://golang.org/src/pkg/net/net.go
- * https://code.google.com/p/go/issues/detail?id=4337
- * https://groups.google.com/forum/#!msg/golang-nuts/0_aaCvBmOcM/SptmDyX1XJMJ
- */
- return strings.HasSuffix(err.Error(), "use of closed network connection")
-}
-
-func PartParser(template, data string) (map[string]string, error) {
- // ip:public:private
- var (
- templateParts = strings.Split(template, ":")
- parts = strings.Split(data, ":")
- out = make(map[string]string, len(templateParts))
- )
- if len(parts) != len(templateParts) {
- return nil, fmt.Errorf("Invalid format to parse. %s should match template %s", data, template)
- }
-
- for i, t := range templateParts {
- value := ""
- if len(parts) > i {
- value = parts[i]
- }
- out[t] = value
- }
- return out, nil
-}
-
-var globalTestID string
-
-// TestDirectory creates a new temporary directory and returns its path.
-// The contents of directory at path `templateDir` is copied into the
-// new directory.
-func TestDirectory(templateDir string) (dir string, err error) {
- if globalTestID == "" {
- globalTestID = RandomString()[:4]
- }
- prefix := fmt.Sprintf("docker-test%s-%s-", globalTestID, GetCallerName(2))
- if prefix == "" {
- prefix = "docker-test-"
- }
- dir, err = ioutil.TempDir("", prefix)
- if err = os.Remove(dir); err != nil {
- return
- }
- if templateDir != "" {
- if err = CopyDirectory(templateDir, dir); err != nil {
- return
- }
- }
- return
-}
-
-// GetCallerName introspects the call stack and returns the name of the
-// function `depth` levels down in the stack.
-func GetCallerName(depth int) string {
- // Use the caller function name as a prefix.
- // This helps trace temp directories back to their test.
- pc, _, _, _ := runtime.Caller(depth + 1)
- callerLongName := runtime.FuncForPC(pc).Name()
- parts := strings.Split(callerLongName, ".")
- callerShortName := parts[len(parts)-1]
- return callerShortName
-}
-
-func CopyFile(src, dst string) (int64, error) {
- if src == dst {
- return 0, nil
- }
- sf, err := os.Open(src)
- if err != nil {
- return 0, err
- }
- defer sf.Close()
- if err := os.Remove(dst); err != nil && !os.IsNotExist(err) {
- return 0, err
- }
- df, err := os.Create(dst)
- if err != nil {
- return 0, err
- }
- defer df.Close()
- return io.Copy(df, sf)
-}