aboutsummaryrefslogtreecommitdiff
path: root/connector/external
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-02-29 18:30:43 +0100
committerAlex Auvolat <alex@adnab.me>2020-02-29 18:34:14 +0100
commit74314696328a52779b83777eddac6bb506a3846b (patch)
tree599cf464488ee5137ab1b02d3b5d6c8381a483bc /connector/external
parent2649e41c85283c680b9e1aa3294868b985aecc22 (diff)
downloadeasybridge-74314696328a52779b83777eddac6bb506a3846b.tar.gz
easybridge-74314696328a52779b83777eddac6bb506a3846b.zip
Support for external processes; stub FB messenger bridge
Diffstat (limited to 'connector/external')
-rw-r--r--connector/external/config.go50
-rw-r--r--connector/external/external.go387
2 files changed, 437 insertions, 0 deletions
diff --git a/connector/external/config.go b/connector/external/config.go
new file mode 100644
index 0000000..caa2b7e
--- /dev/null
+++ b/connector/external/config.go
@@ -0,0 +1,50 @@
+package external
+
+import (
+ . "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
+)
+
+const DUMMYEXT_PROTOCOL = "DummyExt"
+const MESSENGER_PROTOCOL = "Messenger"
+
+func init() {
+ Register(DUMMYEXT_PROTOCOL, Protocol{
+ NewConnector: func() Connector {
+ return &External{
+ protocol: DUMMYEXT_PROTOCOL,
+ command: "./external/dummy.py",
+ debug: true,
+ }
+ },
+ Schema: ConfigSchema{
+ &ConfigEntry{
+ Name: "user",
+ Description: "Username",
+ Required: true,
+ },
+ },
+ })
+
+ Register(MESSENGER_PROTOCOL, Protocol{
+ NewConnector: func() Connector {
+ return &External{
+ protocol: MESSENGER_PROTOCOL,
+ command: "./external/messenger.py",
+ debug: true,
+ }
+ },
+ Schema: ConfigSchema{
+ &ConfigEntry{
+ Name: "email",
+ Description: "Email address",
+ Required: true,
+ },
+ &ConfigEntry{
+ Name: "password",
+ Description: "Password",
+ IsPassword: true,
+ Required: true,
+ },
+ },
+ })
+}
diff --git a/connector/external/external.go b/connector/external/external.go
new file mode 100644
index 0000000..e64ff78
--- /dev/null
+++ b/connector/external/external.go
@@ -0,0 +1,387 @@
+package external
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "os"
+ "os/exec"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+
+ . "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
+)
+
+// Serialization protocol
+
+type extMessage struct {
+ // Header: message type and identifier
+ MsgType string `json:"_type"`
+ MsgId uint64 `json:"_id"`
+
+ // Message fields
+ Key string `json:"key"`
+ Value string `json:"value"`
+ EventId string `json:"event_id"`
+ Error string `json:"error"`
+ Room RoomID `json:"room"`
+ User UserID `json:"user"`
+}
+
+type extMessageWithData struct {
+ extMessage
+
+ Data interface{} `json:"data"`
+}
+
+// Possible values for MsgType
+const (
+ // ezbr -> external
+ CONFIGURE = "configure"
+ GET_USER = "get_user"
+ SET_USER_INFO = "set_user_info"
+ SET_ROOM_INFO = "set_room_info"
+ JOIN = "join"
+ INVITE = "invite"
+ LEAVE = "leave"
+ SEND = "send"
+ CLOSE = "close"
+
+ // external -> ezbr
+ JOINED = "joined"
+ LEFT = "left"
+ USER_INFO_UPDATED = "user_info_updated"
+ ROOM_INFO_UPDATED = "room_info_updated"
+ EVENT = "event"
+ CACHE_PUT = "cache_put"
+ CACHE_GET = "cache_get"
+
+ // reply messages
+ // ezbr -> external: all must wait for a reply!
+ // external -> ezbr: only CACHE_GET produces a reply
+ REP_OK = "rep_ok"
+ REP_ERROR = "rep_error"
+)
+
+// ----
+
+type External struct {
+ handler Handler
+
+ protocol string
+ command string
+ debug bool
+
+ config Configuration
+
+ recv io.Reader
+ send io.Writer
+ sendJson *json.Encoder
+
+ generation int
+ proc *exec.Cmd
+
+ counter uint64
+ inflightRequests map[uint64]chan *extMessageWithData
+ lock sync.Mutex
+}
+
+func (ext *External) SetHandler(h Handler) {
+ ext.handler = h
+}
+
+func (ext *External) Protocol() string {
+ return ext.protocol
+}
+
+func (ext *External) Configure(c Configuration) error {
+ var err error
+
+ if ext.proc != nil {
+ ext.Close()
+ }
+
+ ext.inflightRequests = map[uint64]chan *extMessageWithData{}
+
+ ext.generation += 1
+
+ err = ext.setupProc()
+ if err != nil {
+ return err
+ }
+
+ go ext.restartLoop(ext.generation)
+
+ _, err = ext.cmd(extMessage{
+ MsgType: CONFIGURE,
+ }, c)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// ---- Process management and communication logic
+
+func (ext *External) setupProc() error {
+ var err error
+
+ ext.proc = exec.Command(ext.command)
+
+ ext.recv, err = ext.proc.StdoutPipe()
+ if err != nil {
+ return err
+ }
+ ext.send, err = ext.proc.StdinPipe()
+ if err != nil {
+ return err
+ }
+
+ if ext.debug {
+ ext.recv = io.TeeReader(ext.recv, os.Stderr)
+ ext.send = io.MultiWriter(ext.send, os.Stderr)
+ }
+
+ ext.sendJson = json.NewEncoder(ext.send)
+
+ ext.proc.Stderr = os.Stderr
+
+ err = ext.proc.Start()
+ if err != nil {
+ return err
+ }
+
+ go ext.recvLoop()
+ return nil
+}
+
+func (ext *External) restartLoop(generation int) {
+ for {
+ if ext.proc == nil {
+ break
+ }
+ ext.proc.Wait()
+ if ext.generation != generation {
+ break
+ }
+ log.Printf("Process %s stopped, restarting.", ext.command)
+ err := ext.setupProc()
+ if err != nil {
+ ext.proc = nil
+ log.Warnf("Unable to restart %s: %s", ext.command, err)
+ break
+ }
+ }
+}
+
+func (m *extMessageWithData) DecodeJSON(jj []byte) error {
+ var c extMessage
+
+ err := json.Unmarshal(jj, &c)
+ if err != nil {
+ return err
+ }
+ *m = extMessageWithData{extMessage: c}
+ switch c.MsgType {
+ case USER_INFO_UPDATED:
+ var ui UserInfo
+ err := json.Unmarshal(jj, &ui)
+ if err != nil {
+ return err
+ }
+ m.Data = &ui
+ case ROOM_INFO_UPDATED:
+ var ri RoomInfo
+ err := json.Unmarshal(jj, &ri)
+ if err != nil {
+ return err
+ }
+ m.Data = &ri
+ case EVENT:
+ var ev Event
+ err := json.Unmarshal(jj, &ev)
+ if err != nil {
+ return err
+ }
+ m.Data = &ev
+ }
+ return nil
+}
+
+func (ext *External) recvLoop() {
+ reader := json.NewDecoder(ext.recv)
+ for {
+ var msg extMessageWithData
+ err := reader.Decode(&msg)
+ if err != nil {
+ log.Warnf("Failed to decode from %s: %s. Stopping reading.", ext.command, err)
+ break
+ }
+
+ if strings.HasPrefix(msg.MsgType, "rep_") {
+ func() {
+ ext.lock.Lock()
+ defer ext.lock.Unlock()
+ if ch, ok := ext.inflightRequests[msg.MsgId]; ok {
+ ch <- &msg
+ delete(ext.inflightRequests, msg.MsgId)
+ }
+ }()
+ } else {
+ ext.handleCmd(&msg)
+ }
+ }
+}
+
+func (ext *External) cmd(msg extMessage, data interface{}) (*extMessageWithData, error) {
+ msg_id := atomic.AddUint64(&ext.counter, 1)
+
+ msg.MsgId = msg_id
+
+ fullMsg := extMessageWithData{
+ extMessage: msg,
+ Data: data,
+ }
+
+ ch := make(chan *extMessageWithData)
+
+ func() {
+ ext.lock.Lock()
+ defer ext.lock.Unlock()
+ ext.inflightRequests[msg_id] = ch
+ }()
+
+ defer func() {
+ ext.lock.Lock()
+ defer ext.lock.Unlock()
+ delete(ext.inflightRequests, msg_id)
+ }()
+
+ err := ext.sendJson.Encode(&fullMsg)
+ if err != nil {
+ return nil, err
+ }
+
+ select {
+ case rep := <-ch:
+ if rep.MsgType == REP_ERROR {
+ return nil, fmt.Errorf("%s: %s", msg.MsgType, rep.Error)
+ } else {
+ return rep, nil
+ }
+ case <-time.After(5 * time.Second):
+ return nil, fmt.Errorf("(%s) timeout", msg.MsgType)
+ }
+}
+
+func (ext *External) Close() {
+ ext.sendJson.Encode(&extMessage{
+ MsgType: CLOSE,
+ })
+ ext.generation += 1
+
+ proc := ext.proc
+ ext.proc = nil
+ ext.recv = nil
+ ext.send = nil
+ ext.sendJson = nil
+
+ go func() {
+ time.Sleep(10 * time.Second)
+ proc.Process.Kill()
+ }()
+}
+
+// ---- Actual message handling :)
+
+func (ext *External) handleCmd(msg *extMessageWithData) {
+ switch msg.MsgType {
+ case JOINED:
+ ext.handler.Joined(msg.Room)
+ case LEFT:
+ ext.handler.Left(msg.Room)
+ case USER_INFO_UPDATED:
+ ext.handler.UserInfoUpdated(msg.User, msg.Data.(*UserInfo))
+ case ROOM_INFO_UPDATED:
+ ext.handler.RoomInfoUpdated(msg.Room, msg.User, msg.Data.(*RoomInfo))
+ case EVENT:
+ ext.handler.Event(msg.Data.(*Event))
+ case CACHE_PUT:
+ ext.handler.CachePut(msg.Key, msg.Value)
+ case CACHE_GET:
+ value := ext.handler.CacheGet(msg.Key)
+ ext.sendJson.Encode(&extMessage{
+ MsgType: REP_OK,
+ MsgId: msg.MsgId,
+ Value: value,
+ })
+ }
+}
+
+func (ext *External) User() UserID {
+ rep, err := ext.cmd(extMessage{
+ MsgType: GET_USER,
+ }, nil)
+ if err != nil {
+ log.Warnf("Unable to get user!")
+ return ""
+ }
+ return rep.User
+}
+
+func (ext *External) SetUserInfo(info *UserInfo) error {
+ _, err := ext.cmd(extMessage{
+ MsgType: SET_USER_INFO,
+ }, info)
+ return err
+}
+
+func (ext *External) SetRoomInfo(room RoomID, info *RoomInfo) error {
+ _, err := ext.cmd(extMessage{
+ MsgType: SET_ROOM_INFO,
+ Room: room,
+ }, info)
+ return err
+}
+
+func (ext *External) Join(room RoomID) error {
+ _, err := ext.cmd(extMessage{
+ MsgType: JOIN,
+ Room: room,
+ }, nil)
+ return err
+}
+
+func (ext *External) Invite(user UserID, room RoomID) error {
+ _, err := ext.cmd(extMessage{
+ MsgType: LEAVE,
+ User: user,
+ Room: room,
+ }, nil)
+ return err
+}
+
+func (ext *External) Leave(room RoomID) {
+ _, err := ext.cmd(extMessage{
+ MsgType: LEAVE,
+ Room: room,
+ }, nil)
+ if err != nil {
+ log.Warnf("Could not leave %s: %s", room, err.Error())
+ }
+}
+
+func (ext *External) Send(event *Event) (string, error) {
+ rep, err := ext.cmd(extMessage{
+ MsgType: SEND,
+ }, event)
+ if err != nil {
+ return "", err
+ }
+ return rep.EventId, nil
+}