aboutsummaryrefslogtreecommitdiff
path: root/connector
diff options
context:
space:
mode:
Diffstat (limited to 'connector')
-rw-r--r--connector/connector.go32
-rw-r--r--connector/external/config.go50
-rw-r--r--connector/external/external.go387
-rw-r--r--connector/irc/irc.go2
-rw-r--r--connector/marshal.go99
-rw-r--r--connector/mattermost/mattermost.go14
-rw-r--r--connector/xmpp/xmpp.go2
7 files changed, 563 insertions, 23 deletions
diff --git a/connector/connector.go b/connector/connector.go
index e8c382d..210d4ad 100644
--- a/connector/connector.go
+++ b/connector/connector.go
@@ -109,48 +109,48 @@ const (
)
type Event struct {
- Type EventType
+ Type EventType `json:"type"`
// If non-empty, the event Id is used to deduplicate events in a channel
// This is usefull for backends that provide a backlog of channel messages
// when (re-)joining a room
- Id string
+ Id string `json:"id"`
// UserID of the user that sent the event
// If this is a direct message event, this event can only have been authored
// by the user we are talking to (and not by ourself)
- Author UserID
+ Author UserID `json:"author"`
// UserID of the targetted user in the case of a direct message,
// empty if targetting a room
- Recipient UserID
+ Recipient UserID `json:"recipient"`
// RoomID of the room where the event happenned or of the targetted room,
// or empty string if it happenned by direct message
- Room RoomID
+ Room RoomID `json:"room"`
// Message text or action text
- Text string
+ Text string `json:"text`
// Attached files such as images
- Attachments []MediaObject
+ Attachments []SMediaObject `json:"attachments"`
}
type UserInfo struct {
- DisplayName string
+ DisplayName string `json:"display_name"`
// If non-empty, the Filename of the avatar object will be used by Easybridge
// to deduplicate the update events and prevent needless reuploads.
// Example strategy that works for the mattermost backend: use the update timestamp as fictious file name
- Avatar MediaObject
+ Avatar SMediaObject `json:"avatar"`
}
type RoomInfo struct {
- Name string
- Topic string
+ Name string `json:"name"`
+ Topic string `json:"topic"`
// Same deduplication comment as for UserInfo.Avatar
- Picture MediaObject
+ Picture SMediaObject `json:"picture"`
}
type MediaObject interface {
@@ -169,7 +169,11 @@ type MediaObject interface {
URL() string
}
+type SMediaObject struct {
+ MediaObject
+}
+
type ImageSize struct {
- Width int
- Height int
+ Width int `json:"width"`
+ Height int `json:"height"`
}
diff --git a/connector/external/config.go b/connector/external/config.go
new file mode 100644
index 0000000..caa2b7e
--- /dev/null
+++ b/connector/external/config.go
@@ -0,0 +1,50 @@
+package external
+
+import (
+ . "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
+)
+
+const DUMMYEXT_PROTOCOL = "DummyExt"
+const MESSENGER_PROTOCOL = "Messenger"
+
+func init() {
+ Register(DUMMYEXT_PROTOCOL, Protocol{
+ NewConnector: func() Connector {
+ return &External{
+ protocol: DUMMYEXT_PROTOCOL,
+ command: "./external/dummy.py",
+ debug: true,
+ }
+ },
+ Schema: ConfigSchema{
+ &ConfigEntry{
+ Name: "user",
+ Description: "Username",
+ Required: true,
+ },
+ },
+ })
+
+ Register(MESSENGER_PROTOCOL, Protocol{
+ NewConnector: func() Connector {
+ return &External{
+ protocol: MESSENGER_PROTOCOL,
+ command: "./external/messenger.py",
+ debug: true,
+ }
+ },
+ Schema: ConfigSchema{
+ &ConfigEntry{
+ Name: "email",
+ Description: "Email address",
+ Required: true,
+ },
+ &ConfigEntry{
+ Name: "password",
+ Description: "Password",
+ IsPassword: true,
+ Required: true,
+ },
+ },
+ })
+}
diff --git a/connector/external/external.go b/connector/external/external.go
new file mode 100644
index 0000000..e64ff78
--- /dev/null
+++ b/connector/external/external.go
@@ -0,0 +1,387 @@
+package external
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "os"
+ "os/exec"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+
+ . "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
+)
+
+// Serialization protocol
+
+type extMessage struct {
+ // Header: message type and identifier
+ MsgType string `json:"_type"`
+ MsgId uint64 `json:"_id"`
+
+ // Message fields
+ Key string `json:"key"`
+ Value string `json:"value"`
+ EventId string `json:"event_id"`
+ Error string `json:"error"`
+ Room RoomID `json:"room"`
+ User UserID `json:"user"`
+}
+
+type extMessageWithData struct {
+ extMessage
+
+ Data interface{} `json:"data"`
+}
+
+// Possible values for MsgType
+const (
+ // ezbr -> external
+ CONFIGURE = "configure"
+ GET_USER = "get_user"
+ SET_USER_INFO = "set_user_info"
+ SET_ROOM_INFO = "set_room_info"
+ JOIN = "join"
+ INVITE = "invite"
+ LEAVE = "leave"
+ SEND = "send"
+ CLOSE = "close"
+
+ // external -> ezbr
+ JOINED = "joined"
+ LEFT = "left"
+ USER_INFO_UPDATED = "user_info_updated"
+ ROOM_INFO_UPDATED = "room_info_updated"
+ EVENT = "event"
+ CACHE_PUT = "cache_put"
+ CACHE_GET = "cache_get"
+
+ // reply messages
+ // ezbr -> external: all must wait for a reply!
+ // external -> ezbr: only CACHE_GET produces a reply
+ REP_OK = "rep_ok"
+ REP_ERROR = "rep_error"
+)
+
+// ----
+
+type External struct {
+ handler Handler
+
+ protocol string
+ command string
+ debug bool
+
+ config Configuration
+
+ recv io.Reader
+ send io.Writer
+ sendJson *json.Encoder
+
+ generation int
+ proc *exec.Cmd
+
+ counter uint64
+ inflightRequests map[uint64]chan *extMessageWithData
+ lock sync.Mutex
+}
+
+func (ext *External) SetHandler(h Handler) {
+ ext.handler = h
+}
+
+func (ext *External) Protocol() string {
+ return ext.protocol
+}
+
+func (ext *External) Configure(c Configuration) error {
+ var err error
+
+ if ext.proc != nil {
+ ext.Close()
+ }
+
+ ext.inflightRequests = map[uint64]chan *extMessageWithData{}
+
+ ext.generation += 1
+
+ err = ext.setupProc()
+ if err != nil {
+ return err
+ }
+
+ go ext.restartLoop(ext.generation)
+
+ _, err = ext.cmd(extMessage{
+ MsgType: CONFIGURE,
+ }, c)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// ---- Process management and communication logic
+
+func (ext *External) setupProc() error {
+ var err error
+
+ ext.proc = exec.Command(ext.command)
+
+ ext.recv, err = ext.proc.StdoutPipe()
+ if err != nil {
+ return err
+ }
+ ext.send, err = ext.proc.StdinPipe()
+ if err != nil {
+ return err
+ }
+
+ if ext.debug {
+ ext.recv = io.TeeReader(ext.recv, os.Stderr)
+ ext.send = io.MultiWriter(ext.send, os.Stderr)
+ }
+
+ ext.sendJson = json.NewEncoder(ext.send)
+
+ ext.proc.Stderr = os.Stderr
+
+ err = ext.proc.Start()
+ if err != nil {
+ return err
+ }
+
+ go ext.recvLoop()
+ return nil
+}
+
+func (ext *External) restartLoop(generation int) {
+ for {
+ if ext.proc == nil {
+ break
+ }
+ ext.proc.Wait()
+ if ext.generation != generation {
+ break
+ }
+ log.Printf("Process %s stopped, restarting.", ext.command)
+ err := ext.setupProc()
+ if err != nil {
+ ext.proc = nil
+ log.Warnf("Unable to restart %s: %s", ext.command, err)
+ break
+ }
+ }
+}
+
+func (m *extMessageWithData) DecodeJSON(jj []byte) error {
+ var c extMessage
+
+ err := json.Unmarshal(jj, &c)
+ if err != nil {
+ return err
+ }
+ *m = extMessageWithData{extMessage: c}
+ switch c.MsgType {
+ case USER_INFO_UPDATED:
+ var ui UserInfo
+ err := json.Unmarshal(jj, &ui)
+ if err != nil {
+ return err
+ }
+ m.Data = &ui
+ case ROOM_INFO_UPDATED:
+ var ri RoomInfo
+ err := json.Unmarshal(jj, &ri)
+ if err != nil {
+ return err
+ }
+ m.Data = &ri
+ case EVENT:
+ var ev Event
+ err := json.Unmarshal(jj, &ev)
+ if err != nil {
+ return err
+ }
+ m.Data = &ev
+ }
+ return nil
+}
+
+func (ext *External) recvLoop() {
+ reader := json.NewDecoder(ext.recv)
+ for {
+ var msg extMessageWithData
+ err := reader.Decode(&msg)
+ if err != nil {
+ log.Warnf("Failed to decode from %s: %s. Stopping reading.", ext.command, err)
+ break
+ }
+
+ if strings.HasPrefix(msg.MsgType, "rep_") {
+ func() {
+ ext.lock.Lock()
+ defer ext.lock.Unlock()
+ if ch, ok := ext.inflightRequests[msg.MsgId]; ok {
+ ch <- &msg
+ delete(ext.inflightRequests, msg.MsgId)
+ }
+ }()
+ } else {
+ ext.handleCmd(&msg)
+ }
+ }
+}
+
+func (ext *External) cmd(msg extMessage, data interface{}) (*extMessageWithData, error) {
+ msg_id := atomic.AddUint64(&ext.counter, 1)
+
+ msg.MsgId = msg_id
+
+ fullMsg := extMessageWithData{
+ extMessage: msg,
+ Data: data,
+ }
+
+ ch := make(chan *extMessageWithData)
+
+ func() {
+ ext.lock.Lock()
+ defer ext.lock.Unlock()
+ ext.inflightRequests[msg_id] = ch
+ }()
+
+ defer func() {
+ ext.lock.Lock()
+ defer ext.lock.Unlock()
+ delete(ext.inflightRequests, msg_id)
+ }()
+
+ err := ext.sendJson.Encode(&fullMsg)
+ if err != nil {
+ return nil, err
+ }
+
+ select {
+ case rep := <-ch:
+ if rep.MsgType == REP_ERROR {
+ return nil, fmt.Errorf("%s: %s", msg.MsgType, rep.Error)
+ } else {
+ return rep, nil
+ }
+ case <-time.After(5 * time.Second):
+ return nil, fmt.Errorf("(%s) timeout", msg.MsgType)
+ }
+}
+
+func (ext *External) Close() {
+ ext.sendJson.Encode(&extMessage{
+ MsgType: CLOSE,
+ })
+ ext.generation += 1
+
+ proc := ext.proc
+ ext.proc = nil
+ ext.recv = nil
+ ext.send = nil
+ ext.sendJson = nil
+
+ go func() {
+ time.Sleep(10 * time.Second)
+ proc.Process.Kill()
+ }()
+}
+
+// ---- Actual message handling :)
+
+func (ext *External) handleCmd(msg *extMessageWithData) {
+ switch msg.MsgType {
+ case JOINED:
+ ext.handler.Joined(msg.Room)
+ case LEFT:
+ ext.handler.Left(msg.Room)
+ case USER_INFO_UPDATED:
+ ext.handler.UserInfoUpdated(msg.User, msg.Data.(*UserInfo))
+ case ROOM_INFO_UPDATED:
+ ext.handler.RoomInfoUpdated(msg.Room, msg.User, msg.Data.(*RoomInfo))
+ case EVENT:
+ ext.handler.Event(msg.Data.(*Event))
+ case CACHE_PUT:
+ ext.handler.CachePut(msg.Key, msg.Value)
+ case CACHE_GET:
+ value := ext.handler.CacheGet(msg.Key)
+ ext.sendJson.Encode(&extMessage{
+ MsgType: REP_OK,
+ MsgId: msg.MsgId,
+ Value: value,
+ })
+ }
+}
+
+func (ext *External) User() UserID {
+ rep, err := ext.cmd(extMessage{
+ MsgType: GET_USER,
+ }, nil)
+ if err != nil {
+ log.Warnf("Unable to get user!")
+ return ""
+ }
+ return rep.User
+}
+
+func (ext *External) SetUserInfo(info *UserInfo) error {
+ _, err := ext.cmd(extMessage{
+ MsgType: SET_USER_INFO,
+ }, info)
+ return err
+}
+
+func (ext *External) SetRoomInfo(room RoomID, info *RoomInfo) error {
+ _, err := ext.cmd(extMessage{
+ MsgType: SET_ROOM_INFO,
+ Room: room,
+ }, info)
+ return err
+}
+
+func (ext *External) Join(room RoomID) error {
+ _, err := ext.cmd(extMessage{
+ MsgType: JOIN,
+ Room: room,
+ }, nil)
+ return err
+}
+
+func (ext *External) Invite(user UserID, room RoomID) error {
+ _, err := ext.cmd(extMessage{
+ MsgType: LEAVE,
+ User: user,
+ Room: room,
+ }, nil)
+ return err
+}
+
+func (ext *External) Leave(room RoomID) {
+ _, err := ext.cmd(extMessage{
+ MsgType: LEAVE,
+ Room: room,
+ }, nil)
+ if err != nil {
+ log.Warnf("Could not leave %s: %s", room, err.Error())
+ }
+}
+
+func (ext *External) Send(event *Event) (string, error) {
+ rep, err := ext.cmd(extMessage{
+ MsgType: SEND,
+ }, event)
+ if err != nil {
+ return "", err
+ }
+ return rep.EventId, nil
+}
diff --git a/connector/irc/irc.go b/connector/irc/irc.go
index 4e5f4fd..4d014e8 100644
--- a/connector/irc/irc.go
+++ b/connector/irc/irc.go
@@ -142,7 +142,7 @@ func (irc *IRC) SetRoomInfo(roomId RoomID, info *RoomInfo) error {
if info.Name != "" && info.Name != ch {
return fmt.Errorf("May not change IRC room name to other than %s", ch)
}
- if info.Picture != nil {
+ if MediaObject(info.Picture) != nil {
return fmt.Errorf("Room picture not supported on IRC")
}
return nil
diff --git a/connector/marshal.go b/connector/marshal.go
new file mode 100644
index 0000000..3321fb0
--- /dev/null
+++ b/connector/marshal.go
@@ -0,0 +1,99 @@
+package connector
+
+import (
+ "bytes"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "io"
+)
+
+func (t EventType) MarshalText() ([]byte, error) {
+ switch t {
+ case EVENT_JOIN:
+ return []byte("join"), nil
+ case EVENT_LEAVE:
+ return []byte("leave"), nil
+ case EVENT_MESSAGE:
+ return []byte("message"), nil
+ case EVENT_ACTION:
+ return []byte("action"), nil
+ default:
+ return nil, fmt.Errorf("Invalid event type: %d", t)
+ }
+}
+
+func (t *EventType) UnmarshalText(text []byte) error {
+ switch string(text) {
+ case "join":
+ *t = EVENT_JOIN
+ return nil
+ case "leave":
+ *t = EVENT_LEAVE
+ return nil
+ case "message":
+ *t = EVENT_MESSAGE
+ return nil
+ case "action":
+ *t = EVENT_ACTION
+ return nil
+ default:
+ return fmt.Errorf("Invalid event type: %s", string(text))
+ }
+}
+
+// ----
+
+type MediaObjectJSON struct {
+ Filename string `json:"filename"`
+ Mimetype string `json:"mime_type"`
+ ImageSize *ImageSize `json:"image_size"`
+ Data string `json:"data"`
+}
+
+func (mo SMediaObject) MarshalJSON() ([]byte, error) {
+ if MediaObject(mo) == nil {
+ return []byte("null"), nil
+ }
+
+ mod := MediaObjectJSON{
+ Filename: mo.Filename(),
+ Mimetype: mo.Mimetype(),
+ ImageSize: mo.ImageSize(),
+ }
+ rd, err := mo.Read()
+ if err != nil {
+ return nil, err
+ }
+ defer rd.Close()
+ buf := bytes.NewBuffer([]byte{})
+ _, err = io.Copy(buf, rd)
+ if err != nil {
+ return nil, err
+ }
+ mod.Data = base64.StdEncoding.EncodeToString(buf.Bytes())
+ return json.Marshal(&mod)
+}
+
+func (mo *SMediaObject) UnmarshalJSON(jdata []byte) error {
+ if string(jdata) == "null" {
+ return nil
+ }
+
+ var d MediaObjectJSON
+ err := json.Unmarshal(jdata, &d)
+ if err != nil {
+ return err
+ }
+ bytes, err := base64.StdEncoding.DecodeString(d.Data)
+ if err != nil {
+ return err
+ }
+ *mo = SMediaObject{&BlobMediaObject{
+ ObjectFilename: d.Filename,
+ ObjectMimetype: d.Mimetype,
+ ObjectImageSize: d.ImageSize,
+ ObjectData: bytes,
+ }}
+ return nil
+}
diff --git a/connector/mattermost/mattermost.go b/connector/mattermost/mattermost.go
index 52eb40f..e5b5bc9 100644
--- a/connector/mattermost/mattermost.go
+++ b/connector/mattermost/mattermost.go
@@ -210,7 +210,7 @@ func (mm *Mattermost) SetRoomInfo(roomId RoomID, info *RoomInfo) error {
mm.conn.UpdateChannelHeader(ch, info.Topic)
}
- if info.Picture != nil {
+ if MediaObject(info.Picture) != nil {
err = fmt.Errorf("Not supported: channel picture on mattermost")
}
@@ -374,7 +374,7 @@ func (mm *Mattermost) initSyncChannel(ch *model.Channel) {
room_info.Name = t.Team.Name + " / " + room_info.Name
}
if t.Team.LastTeamIconUpdate > 0 {
- room_info.Picture = &LazyBlobMediaObject{
+ room_info.Picture = SMediaObject{&LazyBlobMediaObject{
ObjectFilename: fmt.Sprintf("%s-%d",
t.Team.Name,
t.Team.LastTeamIconUpdate),
@@ -388,7 +388,7 @@ func (mm *Mattermost) initSyncChannel(ch *model.Channel) {
o.ObjectMimetype = http.DetectContentType(team_img)
return nil
},
- }
+ }}
}
break
}
@@ -487,7 +487,7 @@ func (mm *Mattermost) updateUserInfo(user *model.User) {
DisplayName: userDisp,
}
if user.LastPictureUpdate > 0 {
- ui.Avatar = &LazyBlobMediaObject{
+ ui.Avatar = SMediaObject{&LazyBlobMediaObject{
ObjectFilename: fmt.Sprintf("%s-%d",
user.Username,
user.LastPictureUpdate),
@@ -501,7 +501,7 @@ func (mm *Mattermost) updateUserInfo(user *model.User) {
o.ObjectMimetype = http.DetectContentType(img)
return nil
},
- }
+ }}
}
mm.handler.UserInfoUpdated(userId, ui)
mm.caches.displayname[userId] = userDisp
@@ -567,7 +567,7 @@ func (mm *Mattermost) handlePost(channel_name string, post *model.Post, only_mes
// Handle files
if post.FileIds != nil && len(post.FileIds) > 0 {
- msg_ev.Attachments = []MediaObject{}
+ msg_ev.Attachments = []SMediaObject{}
for _, file := range post.Metadata.Files {
media_object := &LazyBlobMediaObject{
ObjectFilename: file.Name,
@@ -587,7 +587,7 @@ func (mm *Mattermost) handlePost(channel_name string, post *model.Post, only_mes
Height: file.Height,
}
}
- msg_ev.Attachments = append(msg_ev.Attachments, media_object)
+ msg_ev.Attachments = append(msg_ev.Attachments, SMediaObject{media_object})
}
}
diff --git a/connector/xmpp/xmpp.go b/connector/xmpp/xmpp.go
index efaaf64..d858317 100644
--- a/connector/xmpp/xmpp.go
+++ b/connector/xmpp/xmpp.go
@@ -272,7 +272,7 @@ func (xm *XMPP) SetRoomInfo(roomId RoomID, info *RoomInfo) error {
}
}
- if info.Picture != nil {
+ if MediaObject(info.Picture) != nil {
// TODO
return fmt.Errorf("Room picture change not implemented on xmpp")
}