diff options
Diffstat (limited to 'connector')
-rw-r--r-- | connector/connector.go | 32 | ||||
-rw-r--r-- | connector/external/config.go | 50 | ||||
-rw-r--r-- | connector/external/external.go | 387 | ||||
-rw-r--r-- | connector/irc/irc.go | 2 | ||||
-rw-r--r-- | connector/marshal.go | 99 | ||||
-rw-r--r-- | connector/mattermost/mattermost.go | 14 | ||||
-rw-r--r-- | connector/xmpp/xmpp.go | 2 |
7 files changed, 563 insertions, 23 deletions
diff --git a/connector/connector.go b/connector/connector.go index e8c382d..210d4ad 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -109,48 +109,48 @@ const ( ) type Event struct { - Type EventType + Type EventType `json:"type"` // If non-empty, the event Id is used to deduplicate events in a channel // This is usefull for backends that provide a backlog of channel messages // when (re-)joining a room - Id string + Id string `json:"id"` // UserID of the user that sent the event // If this is a direct message event, this event can only have been authored // by the user we are talking to (and not by ourself) - Author UserID + Author UserID `json:"author"` // UserID of the targetted user in the case of a direct message, // empty if targetting a room - Recipient UserID + Recipient UserID `json:"recipient"` // RoomID of the room where the event happenned or of the targetted room, // or empty string if it happenned by direct message - Room RoomID + Room RoomID `json:"room"` // Message text or action text - Text string + Text string `json:"text` // Attached files such as images - Attachments []MediaObject + Attachments []SMediaObject `json:"attachments"` } type UserInfo struct { - DisplayName string + DisplayName string `json:"display_name"` // If non-empty, the Filename of the avatar object will be used by Easybridge // to deduplicate the update events and prevent needless reuploads. // Example strategy that works for the mattermost backend: use the update timestamp as fictious file name - Avatar MediaObject + Avatar SMediaObject `json:"avatar"` } type RoomInfo struct { - Name string - Topic string + Name string `json:"name"` + Topic string `json:"topic"` // Same deduplication comment as for UserInfo.Avatar - Picture MediaObject + Picture SMediaObject `json:"picture"` } type MediaObject interface { @@ -169,7 +169,11 @@ type MediaObject interface { URL() string } +type SMediaObject struct { + MediaObject +} + type ImageSize struct { - Width int - Height int + Width int `json:"width"` + Height int `json:"height"` } diff --git a/connector/external/config.go b/connector/external/config.go new file mode 100644 index 0000000..caa2b7e --- /dev/null +++ b/connector/external/config.go @@ -0,0 +1,50 @@ +package external + +import ( + . "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector" +) + +const DUMMYEXT_PROTOCOL = "DummyExt" +const MESSENGER_PROTOCOL = "Messenger" + +func init() { + Register(DUMMYEXT_PROTOCOL, Protocol{ + NewConnector: func() Connector { + return &External{ + protocol: DUMMYEXT_PROTOCOL, + command: "./external/dummy.py", + debug: true, + } + }, + Schema: ConfigSchema{ + &ConfigEntry{ + Name: "user", + Description: "Username", + Required: true, + }, + }, + }) + + Register(MESSENGER_PROTOCOL, Protocol{ + NewConnector: func() Connector { + return &External{ + protocol: MESSENGER_PROTOCOL, + command: "./external/messenger.py", + debug: true, + } + }, + Schema: ConfigSchema{ + &ConfigEntry{ + Name: "email", + Description: "Email address", + Required: true, + }, + &ConfigEntry{ + Name: "password", + Description: "Password", + IsPassword: true, + Required: true, + }, + }, + }) +} diff --git a/connector/external/external.go b/connector/external/external.go new file mode 100644 index 0000000..e64ff78 --- /dev/null +++ b/connector/external/external.go @@ -0,0 +1,387 @@ +package external + +import ( + "encoding/json" + "fmt" + "io" + "os" + "os/exec" + "strings" + "sync" + "sync/atomic" + "time" + + log "github.com/sirupsen/logrus" + + . "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector" +) + +// Serialization protocol + +type extMessage struct { + // Header: message type and identifier + MsgType string `json:"_type"` + MsgId uint64 `json:"_id"` + + // Message fields + Key string `json:"key"` + Value string `json:"value"` + EventId string `json:"event_id"` + Error string `json:"error"` + Room RoomID `json:"room"` + User UserID `json:"user"` +} + +type extMessageWithData struct { + extMessage + + Data interface{} `json:"data"` +} + +// Possible values for MsgType +const ( + // ezbr -> external + CONFIGURE = "configure" + GET_USER = "get_user" + SET_USER_INFO = "set_user_info" + SET_ROOM_INFO = "set_room_info" + JOIN = "join" + INVITE = "invite" + LEAVE = "leave" + SEND = "send" + CLOSE = "close" + + // external -> ezbr + JOINED = "joined" + LEFT = "left" + USER_INFO_UPDATED = "user_info_updated" + ROOM_INFO_UPDATED = "room_info_updated" + EVENT = "event" + CACHE_PUT = "cache_put" + CACHE_GET = "cache_get" + + // reply messages + // ezbr -> external: all must wait for a reply! + // external -> ezbr: only CACHE_GET produces a reply + REP_OK = "rep_ok" + REP_ERROR = "rep_error" +) + +// ---- + +type External struct { + handler Handler + + protocol string + command string + debug bool + + config Configuration + + recv io.Reader + send io.Writer + sendJson *json.Encoder + + generation int + proc *exec.Cmd + + counter uint64 + inflightRequests map[uint64]chan *extMessageWithData + lock sync.Mutex +} + +func (ext *External) SetHandler(h Handler) { + ext.handler = h +} + +func (ext *External) Protocol() string { + return ext.protocol +} + +func (ext *External) Configure(c Configuration) error { + var err error + + if ext.proc != nil { + ext.Close() + } + + ext.inflightRequests = map[uint64]chan *extMessageWithData{} + + ext.generation += 1 + + err = ext.setupProc() + if err != nil { + return err + } + + go ext.restartLoop(ext.generation) + + _, err = ext.cmd(extMessage{ + MsgType: CONFIGURE, + }, c) + if err != nil { + return err + } + + return nil +} + +// ---- Process management and communication logic + +func (ext *External) setupProc() error { + var err error + + ext.proc = exec.Command(ext.command) + + ext.recv, err = ext.proc.StdoutPipe() + if err != nil { + return err + } + ext.send, err = ext.proc.StdinPipe() + if err != nil { + return err + } + + if ext.debug { + ext.recv = io.TeeReader(ext.recv, os.Stderr) + ext.send = io.MultiWriter(ext.send, os.Stderr) + } + + ext.sendJson = json.NewEncoder(ext.send) + + ext.proc.Stderr = os.Stderr + + err = ext.proc.Start() + if err != nil { + return err + } + + go ext.recvLoop() + return nil +} + +func (ext *External) restartLoop(generation int) { + for { + if ext.proc == nil { + break + } + ext.proc.Wait() + if ext.generation != generation { + break + } + log.Printf("Process %s stopped, restarting.", ext.command) + err := ext.setupProc() + if err != nil { + ext.proc = nil + log.Warnf("Unable to restart %s: %s", ext.command, err) + break + } + } +} + +func (m *extMessageWithData) DecodeJSON(jj []byte) error { + var c extMessage + + err := json.Unmarshal(jj, &c) + if err != nil { + return err + } + *m = extMessageWithData{extMessage: c} + switch c.MsgType { + case USER_INFO_UPDATED: + var ui UserInfo + err := json.Unmarshal(jj, &ui) + if err != nil { + return err + } + m.Data = &ui + case ROOM_INFO_UPDATED: + var ri RoomInfo + err := json.Unmarshal(jj, &ri) + if err != nil { + return err + } + m.Data = &ri + case EVENT: + var ev Event + err := json.Unmarshal(jj, &ev) + if err != nil { + return err + } + m.Data = &ev + } + return nil +} + +func (ext *External) recvLoop() { + reader := json.NewDecoder(ext.recv) + for { + var msg extMessageWithData + err := reader.Decode(&msg) + if err != nil { + log.Warnf("Failed to decode from %s: %s. Stopping reading.", ext.command, err) + break + } + + if strings.HasPrefix(msg.MsgType, "rep_") { + func() { + ext.lock.Lock() + defer ext.lock.Unlock() + if ch, ok := ext.inflightRequests[msg.MsgId]; ok { + ch <- &msg + delete(ext.inflightRequests, msg.MsgId) + } + }() + } else { + ext.handleCmd(&msg) + } + } +} + +func (ext *External) cmd(msg extMessage, data interface{}) (*extMessageWithData, error) { + msg_id := atomic.AddUint64(&ext.counter, 1) + + msg.MsgId = msg_id + + fullMsg := extMessageWithData{ + extMessage: msg, + Data: data, + } + + ch := make(chan *extMessageWithData) + + func() { + ext.lock.Lock() + defer ext.lock.Unlock() + ext.inflightRequests[msg_id] = ch + }() + + defer func() { + ext.lock.Lock() + defer ext.lock.Unlock() + delete(ext.inflightRequests, msg_id) + }() + + err := ext.sendJson.Encode(&fullMsg) + if err != nil { + return nil, err + } + + select { + case rep := <-ch: + if rep.MsgType == REP_ERROR { + return nil, fmt.Errorf("%s: %s", msg.MsgType, rep.Error) + } else { + return rep, nil + } + case <-time.After(5 * time.Second): + return nil, fmt.Errorf("(%s) timeout", msg.MsgType) + } +} + +func (ext *External) Close() { + ext.sendJson.Encode(&extMessage{ + MsgType: CLOSE, + }) + ext.generation += 1 + + proc := ext.proc + ext.proc = nil + ext.recv = nil + ext.send = nil + ext.sendJson = nil + + go func() { + time.Sleep(10 * time.Second) + proc.Process.Kill() + }() +} + +// ---- Actual message handling :) + +func (ext *External) handleCmd(msg *extMessageWithData) { + switch msg.MsgType { + case JOINED: + ext.handler.Joined(msg.Room) + case LEFT: + ext.handler.Left(msg.Room) + case USER_INFO_UPDATED: + ext.handler.UserInfoUpdated(msg.User, msg.Data.(*UserInfo)) + case ROOM_INFO_UPDATED: + ext.handler.RoomInfoUpdated(msg.Room, msg.User, msg.Data.(*RoomInfo)) + case EVENT: + ext.handler.Event(msg.Data.(*Event)) + case CACHE_PUT: + ext.handler.CachePut(msg.Key, msg.Value) + case CACHE_GET: + value := ext.handler.CacheGet(msg.Key) + ext.sendJson.Encode(&extMessage{ + MsgType: REP_OK, + MsgId: msg.MsgId, + Value: value, + }) + } +} + +func (ext *External) User() UserID { + rep, err := ext.cmd(extMessage{ + MsgType: GET_USER, + }, nil) + if err != nil { + log.Warnf("Unable to get user!") + return "" + } + return rep.User +} + +func (ext *External) SetUserInfo(info *UserInfo) error { + _, err := ext.cmd(extMessage{ + MsgType: SET_USER_INFO, + }, info) + return err +} + +func (ext *External) SetRoomInfo(room RoomID, info *RoomInfo) error { + _, err := ext.cmd(extMessage{ + MsgType: SET_ROOM_INFO, + Room: room, + }, info) + return err +} + +func (ext *External) Join(room RoomID) error { + _, err := ext.cmd(extMessage{ + MsgType: JOIN, + Room: room, + }, nil) + return err +} + +func (ext *External) Invite(user UserID, room RoomID) error { + _, err := ext.cmd(extMessage{ + MsgType: LEAVE, + User: user, + Room: room, + }, nil) + return err +} + +func (ext *External) Leave(room RoomID) { + _, err := ext.cmd(extMessage{ + MsgType: LEAVE, + Room: room, + }, nil) + if err != nil { + log.Warnf("Could not leave %s: %s", room, err.Error()) + } +} + +func (ext *External) Send(event *Event) (string, error) { + rep, err := ext.cmd(extMessage{ + MsgType: SEND, + }, event) + if err != nil { + return "", err + } + return rep.EventId, nil +} diff --git a/connector/irc/irc.go b/connector/irc/irc.go index 4e5f4fd..4d014e8 100644 --- a/connector/irc/irc.go +++ b/connector/irc/irc.go @@ -142,7 +142,7 @@ func (irc *IRC) SetRoomInfo(roomId RoomID, info *RoomInfo) error { if info.Name != "" && info.Name != ch { return fmt.Errorf("May not change IRC room name to other than %s", ch) } - if info.Picture != nil { + if MediaObject(info.Picture) != nil { return fmt.Errorf("Room picture not supported on IRC") } return nil diff --git a/connector/marshal.go b/connector/marshal.go new file mode 100644 index 0000000..3321fb0 --- /dev/null +++ b/connector/marshal.go @@ -0,0 +1,99 @@ +package connector + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "fmt" + "io" +) + +func (t EventType) MarshalText() ([]byte, error) { + switch t { + case EVENT_JOIN: + return []byte("join"), nil + case EVENT_LEAVE: + return []byte("leave"), nil + case EVENT_MESSAGE: + return []byte("message"), nil + case EVENT_ACTION: + return []byte("action"), nil + default: + return nil, fmt.Errorf("Invalid event type: %d", t) + } +} + +func (t *EventType) UnmarshalText(text []byte) error { + switch string(text) { + case "join": + *t = EVENT_JOIN + return nil + case "leave": + *t = EVENT_LEAVE + return nil + case "message": + *t = EVENT_MESSAGE + return nil + case "action": + *t = EVENT_ACTION + return nil + default: + return fmt.Errorf("Invalid event type: %s", string(text)) + } +} + +// ---- + +type MediaObjectJSON struct { + Filename string `json:"filename"` + Mimetype string `json:"mime_type"` + ImageSize *ImageSize `json:"image_size"` + Data string `json:"data"` +} + +func (mo SMediaObject) MarshalJSON() ([]byte, error) { + if MediaObject(mo) == nil { + return []byte("null"), nil + } + + mod := MediaObjectJSON{ + Filename: mo.Filename(), + Mimetype: mo.Mimetype(), + ImageSize: mo.ImageSize(), + } + rd, err := mo.Read() + if err != nil { + return nil, err + } + defer rd.Close() + buf := bytes.NewBuffer([]byte{}) + _, err = io.Copy(buf, rd) + if err != nil { + return nil, err + } + mod.Data = base64.StdEncoding.EncodeToString(buf.Bytes()) + return json.Marshal(&mod) +} + +func (mo *SMediaObject) UnmarshalJSON(jdata []byte) error { + if string(jdata) == "null" { + return nil + } + + var d MediaObjectJSON + err := json.Unmarshal(jdata, &d) + if err != nil { + return err + } + bytes, err := base64.StdEncoding.DecodeString(d.Data) + if err != nil { + return err + } + *mo = SMediaObject{&BlobMediaObject{ + ObjectFilename: d.Filename, + ObjectMimetype: d.Mimetype, + ObjectImageSize: d.ImageSize, + ObjectData: bytes, + }} + return nil +} diff --git a/connector/mattermost/mattermost.go b/connector/mattermost/mattermost.go index 52eb40f..e5b5bc9 100644 --- a/connector/mattermost/mattermost.go +++ b/connector/mattermost/mattermost.go @@ -210,7 +210,7 @@ func (mm *Mattermost) SetRoomInfo(roomId RoomID, info *RoomInfo) error { mm.conn.UpdateChannelHeader(ch, info.Topic) } - if info.Picture != nil { + if MediaObject(info.Picture) != nil { err = fmt.Errorf("Not supported: channel picture on mattermost") } @@ -374,7 +374,7 @@ func (mm *Mattermost) initSyncChannel(ch *model.Channel) { room_info.Name = t.Team.Name + " / " + room_info.Name } if t.Team.LastTeamIconUpdate > 0 { - room_info.Picture = &LazyBlobMediaObject{ + room_info.Picture = SMediaObject{&LazyBlobMediaObject{ ObjectFilename: fmt.Sprintf("%s-%d", t.Team.Name, t.Team.LastTeamIconUpdate), @@ -388,7 +388,7 @@ func (mm *Mattermost) initSyncChannel(ch *model.Channel) { o.ObjectMimetype = http.DetectContentType(team_img) return nil }, - } + }} } break } @@ -487,7 +487,7 @@ func (mm *Mattermost) updateUserInfo(user *model.User) { DisplayName: userDisp, } if user.LastPictureUpdate > 0 { - ui.Avatar = &LazyBlobMediaObject{ + ui.Avatar = SMediaObject{&LazyBlobMediaObject{ ObjectFilename: fmt.Sprintf("%s-%d", user.Username, user.LastPictureUpdate), @@ -501,7 +501,7 @@ func (mm *Mattermost) updateUserInfo(user *model.User) { o.ObjectMimetype = http.DetectContentType(img) return nil }, - } + }} } mm.handler.UserInfoUpdated(userId, ui) mm.caches.displayname[userId] = userDisp @@ -567,7 +567,7 @@ func (mm *Mattermost) handlePost(channel_name string, post *model.Post, only_mes // Handle files if post.FileIds != nil && len(post.FileIds) > 0 { - msg_ev.Attachments = []MediaObject{} + msg_ev.Attachments = []SMediaObject{} for _, file := range post.Metadata.Files { media_object := &LazyBlobMediaObject{ ObjectFilename: file.Name, @@ -587,7 +587,7 @@ func (mm *Mattermost) handlePost(channel_name string, post *model.Post, only_mes Height: file.Height, } } - msg_ev.Attachments = append(msg_ev.Attachments, media_object) + msg_ev.Attachments = append(msg_ev.Attachments, SMediaObject{media_object}) } } diff --git a/connector/xmpp/xmpp.go b/connector/xmpp/xmpp.go index efaaf64..d858317 100644 --- a/connector/xmpp/xmpp.go +++ b/connector/xmpp/xmpp.go @@ -272,7 +272,7 @@ func (xm *XMPP) SetRoomInfo(roomId RoomID, info *RoomInfo) error { } } - if info.Picture != nil { + if MediaObject(info.Picture) != nil { // TODO return fmt.Errorf("Room picture change not implemented on xmpp") } |