diff options
Diffstat (limited to 'connector/external')
-rw-r--r-- | connector/external/config.go | 50 | ||||
-rw-r--r-- | connector/external/external.go | 387 |
2 files changed, 437 insertions, 0 deletions
diff --git a/connector/external/config.go b/connector/external/config.go new file mode 100644 index 0000000..caa2b7e --- /dev/null +++ b/connector/external/config.go @@ -0,0 +1,50 @@ +package external + +import ( + . "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector" +) + +const DUMMYEXT_PROTOCOL = "DummyExt" +const MESSENGER_PROTOCOL = "Messenger" + +func init() { + Register(DUMMYEXT_PROTOCOL, Protocol{ + NewConnector: func() Connector { + return &External{ + protocol: DUMMYEXT_PROTOCOL, + command: "./external/dummy.py", + debug: true, + } + }, + Schema: ConfigSchema{ + &ConfigEntry{ + Name: "user", + Description: "Username", + Required: true, + }, + }, + }) + + Register(MESSENGER_PROTOCOL, Protocol{ + NewConnector: func() Connector { + return &External{ + protocol: MESSENGER_PROTOCOL, + command: "./external/messenger.py", + debug: true, + } + }, + Schema: ConfigSchema{ + &ConfigEntry{ + Name: "email", + Description: "Email address", + Required: true, + }, + &ConfigEntry{ + Name: "password", + Description: "Password", + IsPassword: true, + Required: true, + }, + }, + }) +} diff --git a/connector/external/external.go b/connector/external/external.go new file mode 100644 index 0000000..e64ff78 --- /dev/null +++ b/connector/external/external.go @@ -0,0 +1,387 @@ +package external + +import ( + "encoding/json" + "fmt" + "io" + "os" + "os/exec" + "strings" + "sync" + "sync/atomic" + "time" + + log "github.com/sirupsen/logrus" + + . "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector" +) + +// Serialization protocol + +type extMessage struct { + // Header: message type and identifier + MsgType string `json:"_type"` + MsgId uint64 `json:"_id"` + + // Message fields + Key string `json:"key"` + Value string `json:"value"` + EventId string `json:"event_id"` + Error string `json:"error"` + Room RoomID `json:"room"` + User UserID `json:"user"` +} + +type extMessageWithData struct { + extMessage + + Data interface{} `json:"data"` +} + +// Possible values for MsgType +const ( + // ezbr -> external + CONFIGURE = "configure" + GET_USER = "get_user" + SET_USER_INFO = "set_user_info" + SET_ROOM_INFO = "set_room_info" + JOIN = "join" + INVITE = "invite" + LEAVE = "leave" + SEND = "send" + CLOSE = "close" + + // external -> ezbr + JOINED = "joined" + LEFT = "left" + USER_INFO_UPDATED = "user_info_updated" + ROOM_INFO_UPDATED = "room_info_updated" + EVENT = "event" + CACHE_PUT = "cache_put" + CACHE_GET = "cache_get" + + // reply messages + // ezbr -> external: all must wait for a reply! + // external -> ezbr: only CACHE_GET produces a reply + REP_OK = "rep_ok" + REP_ERROR = "rep_error" +) + +// ---- + +type External struct { + handler Handler + + protocol string + command string + debug bool + + config Configuration + + recv io.Reader + send io.Writer + sendJson *json.Encoder + + generation int + proc *exec.Cmd + + counter uint64 + inflightRequests map[uint64]chan *extMessageWithData + lock sync.Mutex +} + +func (ext *External) SetHandler(h Handler) { + ext.handler = h +} + +func (ext *External) Protocol() string { + return ext.protocol +} + +func (ext *External) Configure(c Configuration) error { + var err error + + if ext.proc != nil { + ext.Close() + } + + ext.inflightRequests = map[uint64]chan *extMessageWithData{} + + ext.generation += 1 + + err = ext.setupProc() + if err != nil { + return err + } + + go ext.restartLoop(ext.generation) + + _, err = ext.cmd(extMessage{ + MsgType: CONFIGURE, + }, c) + if err != nil { + return err + } + + return nil +} + +// ---- Process management and communication logic + +func (ext *External) setupProc() error { + var err error + + ext.proc = exec.Command(ext.command) + + ext.recv, err = ext.proc.StdoutPipe() + if err != nil { + return err + } + ext.send, err = ext.proc.StdinPipe() + if err != nil { + return err + } + + if ext.debug { + ext.recv = io.TeeReader(ext.recv, os.Stderr) + ext.send = io.MultiWriter(ext.send, os.Stderr) + } + + ext.sendJson = json.NewEncoder(ext.send) + + ext.proc.Stderr = os.Stderr + + err = ext.proc.Start() + if err != nil { + return err + } + + go ext.recvLoop() + return nil +} + +func (ext *External) restartLoop(generation int) { + for { + if ext.proc == nil { + break + } + ext.proc.Wait() + if ext.generation != generation { + break + } + log.Printf("Process %s stopped, restarting.", ext.command) + err := ext.setupProc() + if err != nil { + ext.proc = nil + log.Warnf("Unable to restart %s: %s", ext.command, err) + break + } + } +} + +func (m *extMessageWithData) DecodeJSON(jj []byte) error { + var c extMessage + + err := json.Unmarshal(jj, &c) + if err != nil { + return err + } + *m = extMessageWithData{extMessage: c} + switch c.MsgType { + case USER_INFO_UPDATED: + var ui UserInfo + err := json.Unmarshal(jj, &ui) + if err != nil { + return err + } + m.Data = &ui + case ROOM_INFO_UPDATED: + var ri RoomInfo + err := json.Unmarshal(jj, &ri) + if err != nil { + return err + } + m.Data = &ri + case EVENT: + var ev Event + err := json.Unmarshal(jj, &ev) + if err != nil { + return err + } + m.Data = &ev + } + return nil +} + +func (ext *External) recvLoop() { + reader := json.NewDecoder(ext.recv) + for { + var msg extMessageWithData + err := reader.Decode(&msg) + if err != nil { + log.Warnf("Failed to decode from %s: %s. Stopping reading.", ext.command, err) + break + } + + if strings.HasPrefix(msg.MsgType, "rep_") { + func() { + ext.lock.Lock() + defer ext.lock.Unlock() + if ch, ok := ext.inflightRequests[msg.MsgId]; ok { + ch <- &msg + delete(ext.inflightRequests, msg.MsgId) + } + }() + } else { + ext.handleCmd(&msg) + } + } +} + +func (ext *External) cmd(msg extMessage, data interface{}) (*extMessageWithData, error) { + msg_id := atomic.AddUint64(&ext.counter, 1) + + msg.MsgId = msg_id + + fullMsg := extMessageWithData{ + extMessage: msg, + Data: data, + } + + ch := make(chan *extMessageWithData) + + func() { + ext.lock.Lock() + defer ext.lock.Unlock() + ext.inflightRequests[msg_id] = ch + }() + + defer func() { + ext.lock.Lock() + defer ext.lock.Unlock() + delete(ext.inflightRequests, msg_id) + }() + + err := ext.sendJson.Encode(&fullMsg) + if err != nil { + return nil, err + } + + select { + case rep := <-ch: + if rep.MsgType == REP_ERROR { + return nil, fmt.Errorf("%s: %s", msg.MsgType, rep.Error) + } else { + return rep, nil + } + case <-time.After(5 * time.Second): + return nil, fmt.Errorf("(%s) timeout", msg.MsgType) + } +} + +func (ext *External) Close() { + ext.sendJson.Encode(&extMessage{ + MsgType: CLOSE, + }) + ext.generation += 1 + + proc := ext.proc + ext.proc = nil + ext.recv = nil + ext.send = nil + ext.sendJson = nil + + go func() { + time.Sleep(10 * time.Second) + proc.Process.Kill() + }() +} + +// ---- Actual message handling :) + +func (ext *External) handleCmd(msg *extMessageWithData) { + switch msg.MsgType { + case JOINED: + ext.handler.Joined(msg.Room) + case LEFT: + ext.handler.Left(msg.Room) + case USER_INFO_UPDATED: + ext.handler.UserInfoUpdated(msg.User, msg.Data.(*UserInfo)) + case ROOM_INFO_UPDATED: + ext.handler.RoomInfoUpdated(msg.Room, msg.User, msg.Data.(*RoomInfo)) + case EVENT: + ext.handler.Event(msg.Data.(*Event)) + case CACHE_PUT: + ext.handler.CachePut(msg.Key, msg.Value) + case CACHE_GET: + value := ext.handler.CacheGet(msg.Key) + ext.sendJson.Encode(&extMessage{ + MsgType: REP_OK, + MsgId: msg.MsgId, + Value: value, + }) + } +} + +func (ext *External) User() UserID { + rep, err := ext.cmd(extMessage{ + MsgType: GET_USER, + }, nil) + if err != nil { + log.Warnf("Unable to get user!") + return "" + } + return rep.User +} + +func (ext *External) SetUserInfo(info *UserInfo) error { + _, err := ext.cmd(extMessage{ + MsgType: SET_USER_INFO, + }, info) + return err +} + +func (ext *External) SetRoomInfo(room RoomID, info *RoomInfo) error { + _, err := ext.cmd(extMessage{ + MsgType: SET_ROOM_INFO, + Room: room, + }, info) + return err +} + +func (ext *External) Join(room RoomID) error { + _, err := ext.cmd(extMessage{ + MsgType: JOIN, + Room: room, + }, nil) + return err +} + +func (ext *External) Invite(user UserID, room RoomID) error { + _, err := ext.cmd(extMessage{ + MsgType: LEAVE, + User: user, + Room: room, + }, nil) + return err +} + +func (ext *External) Leave(room RoomID) { + _, err := ext.cmd(extMessage{ + MsgType: LEAVE, + Room: room, + }, nil) + if err != nil { + log.Warnf("Could not leave %s: %s", room, err.Error()) + } +} + +func (ext *External) Send(event *Event) (string, error) { + rep, err := ext.cmd(extMessage{ + MsgType: SEND, + }, event) + if err != nil { + return "", err + } + return rep.EventId, nil +} |