diff options
-rw-r--r-- | account.go | 16 | ||||
-rw-r--r-- | connector/external/external.go | 48 | ||||
-rw-r--r-- | connector/irc/irc.go | 2 | ||||
-rw-r--r-- | connector/marshal.go | 78 | ||||
-rw-r--r-- | connector/mattermost/mattermost.go | 2 | ||||
-rw-r--r-- | connector/mediaobject.go | 38 | ||||
-rw-r--r-- | connector/xmpp/xmpp.go | 2 | ||||
-rwxr-xr-x | external/messenger.py | 85 |
8 files changed, 190 insertions, 81 deletions
@@ -293,12 +293,14 @@ func (a *Account) userInfoUpdatedInternal(user UserID, info *UserInfo) error { } } - if MediaObject(info.Avatar) != nil { + if info.Avatar.MediaObject != nil { cache_key := fmt.Sprintf("%s/user_avatar/%s", a.Protocol, user) cache_val := info.Avatar.Filename() - if cache_val == "" || dbKvTestAndSet(cache_key, cache_val) { + if cache_val == "" || dbKvGet(cache_key) != cache_val { err2 := mx.ProfileAvatar(mx_user_id, info.Avatar) - if err2 != nil { + if err2 == nil { + dbKvPut(cache_key, cache_val) + } else { err = err2 } } @@ -347,12 +349,14 @@ func (a *Account) roomInfoUpdatedInternal(roomId RoomID, author UserID, info *Ro } } - if MediaObject(info.Picture) != nil { + if info.Picture.MediaObject != nil { cache_key := fmt.Sprintf("%s/room_picture/%s", a.Protocol, roomId) cache_val := info.Picture.Filename() - if cache_val == "" || dbKvTestAndSet(cache_key, cache_val) { + if cache_val == "" || dbKvGet(cache_key) != cache_val { err2 := mx.RoomAvatarAs(mx_room_id, info.Picture, as_mxid) - if err2 != nil { + if err2 == nil { + dbKvPut(cache_key, cache_val) + } else { err = err2 } } diff --git a/connector/external/external.go b/connector/external/external.go index e64ff78..26f3c40 100644 --- a/connector/external/external.go +++ b/connector/external/external.go @@ -1,6 +1,7 @@ package external import ( + "bufio" "encoding/json" "fmt" "io" @@ -179,7 +180,7 @@ func (ext *External) restartLoop(generation int) { } } -func (m *extMessageWithData) DecodeJSON(jj []byte) error { +func (m *extMessageWithData) UnmarshalJSON(jj []byte) error { var c extMessage err := json.Unmarshal(jj, &c) @@ -189,40 +190,59 @@ func (m *extMessageWithData) DecodeJSON(jj []byte) error { *m = extMessageWithData{extMessage: c} switch c.MsgType { case USER_INFO_UPDATED: - var ui UserInfo + var ui struct { + Data UserInfo `json:"data"` + } err := json.Unmarshal(jj, &ui) if err != nil { return err } - m.Data = &ui + m.Data = &ui.Data + return nil case ROOM_INFO_UPDATED: - var ri RoomInfo + var ri struct { + Data RoomInfo `json:"data"` + } err := json.Unmarshal(jj, &ri) if err != nil { return err } - m.Data = &ri + m.Data = &ri.Data + return nil case EVENT: - var ev Event + var ev struct { + Data Event `json:"data"` + } err := json.Unmarshal(jj, &ev) if err != nil { return err } - m.Data = &ev + m.Data = &ev.Data + return nil + case JOINED, LEFT, CACHE_PUT, CACHE_GET, REP_OK, REP_ERROR: + return nil + default: + return fmt.Errorf("Invalid message type for message from external program: '%s'", c.MsgType) } - return nil + } func (ext *External) recvLoop() { - reader := json.NewDecoder(ext.recv) - for { + scanner := bufio.NewScanner(ext.recv) + for scanner.Scan() { var msg extMessageWithData - err := reader.Decode(&msg) + err := json.Unmarshal(scanner.Bytes(), &msg) if err != nil { - log.Warnf("Failed to decode from %s: %s. Stopping reading.", ext.command, err) + log.Warnf("Failed to decode from %s: %s. Skipping line.", ext.command, err.Error()) + continue + } + + if scanner.Err() != nil { + log.Warnf("Failed to read from %s: %s. Stopping here.", ext.command, scanner.Err().Error()) break } + log.Debugf("GOT MESSAGE: %#v %#v", msg, msg.Data) if strings.HasPrefix(msg.MsgType, "rep_") { func() { ext.lock.Lock() @@ -233,7 +253,7 @@ func (ext *External) recvLoop() { } }() } else { - ext.handleCmd(&msg) + go ext.handleCmd(&msg) } } } @@ -328,7 +348,7 @@ func (ext *External) User() UserID { MsgType: GET_USER, }, nil) if err != nil { - log.Warnf("Unable to get user!") + log.Warnf("Unable to get user! %s", err.Error()) return "" } return rep.User diff --git a/connector/irc/irc.go b/connector/irc/irc.go index 4d014e8..bf36dfe 100644 --- a/connector/irc/irc.go +++ b/connector/irc/irc.go @@ -142,7 +142,7 @@ func (irc *IRC) SetRoomInfo(roomId RoomID, info *RoomInfo) error { if info.Name != "" && info.Name != ch { return fmt.Errorf("May not change IRC room name to other than %s", ch) } - if MediaObject(info.Picture) != nil { + if info.Picture.MediaObject != nil { return fmt.Errorf("Room picture not supported on IRC") } return nil diff --git a/connector/marshal.go b/connector/marshal.go index 3321fb0..065c955 100644 --- a/connector/marshal.go +++ b/connector/marshal.go @@ -6,18 +6,28 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" + "net/http" + "strings" +) + +const ( + S_EVENT_JOIN = "join" + S_EVENT_LEAVE = "leave" + S_EVENT_MESSAGE = "message" + S_EVENT_ACTION = "action" ) func (t EventType) MarshalText() ([]byte, error) { switch t { case EVENT_JOIN: - return []byte("join"), nil + return []byte(S_EVENT_JOIN), nil case EVENT_LEAVE: - return []byte("leave"), nil + return []byte(S_EVENT_LEAVE), nil case EVENT_MESSAGE: - return []byte("message"), nil + return []byte(S_EVENT_MESSAGE), nil case EVENT_ACTION: - return []byte("action"), nil + return []byte(S_EVENT_ACTION), nil default: return nil, fmt.Errorf("Invalid event type: %d", t) } @@ -25,16 +35,16 @@ func (t EventType) MarshalText() ([]byte, error) { func (t *EventType) UnmarshalText(text []byte) error { switch string(text) { - case "join": + case S_EVENT_JOIN: *t = EVENT_JOIN return nil - case "leave": + case S_EVENT_LEAVE: *t = EVENT_LEAVE return nil - case "message": + case S_EVENT_MESSAGE: *t = EVENT_MESSAGE return nil - case "action": + case S_EVENT_ACTION: *t = EVENT_ACTION return nil default: @@ -47,31 +57,40 @@ func (t *EventType) UnmarshalText(text []byte) error { type MediaObjectJSON struct { Filename string `json:"filename"` Mimetype string `json:"mime_type"` + Size int64 `json:"size"` ImageSize *ImageSize `json:"image_size"` Data string `json:"data"` + URL string `json:"url"` } func (mo SMediaObject) MarshalJSON() ([]byte, error) { - if MediaObject(mo) == nil { + if mo.MediaObject == nil { return []byte("null"), nil } mod := MediaObjectJSON{ Filename: mo.Filename(), Mimetype: mo.Mimetype(), + Size: mo.Size(), ImageSize: mo.ImageSize(), + URL: mo.URL(), } - rd, err := mo.Read() - if err != nil { - return nil, err - } - defer rd.Close() - buf := bytes.NewBuffer([]byte{}) - _, err = io.Copy(buf, rd) - if err != nil { - return nil, err + + if mod.URL == "" { + // If we don't have a URL, the only way is to pass the blob itself + rd, err := mo.Read() + if err != nil { + return nil, err + } + defer rd.Close() + buf := bytes.NewBuffer([]byte{}) + _, err = io.Copy(buf, rd) + if err != nil { + return nil, err + } + mod.Data = base64.StdEncoding.EncodeToString(buf.Bytes()) } - mod.Data = base64.StdEncoding.EncodeToString(buf.Bytes()) + return json.Marshal(&mod) } @@ -85,6 +104,27 @@ func (mo *SMediaObject) UnmarshalJSON(jdata []byte) error { if err != nil { return err } + + if d.URL != "" { + *mo = SMediaObject{&LazyBlobMediaObject{ + ObjectFilename: d.Filename, + ObjectMimetype: d.Mimetype, + ObjectImageSize: d.ImageSize, + GetFn: func(o *LazyBlobMediaObject) error { + resp, err := http.Get(d.URL) + if err != nil { + return err + } + if o.ObjectMimetype == "" { + o.ObjectMimetype = strings.Join(resp.Header["Content-Type"], "") + } + o.ObjectData, err = ioutil.ReadAll(resp.Body) + return err + }, + }} + return nil + } + bytes, err := base64.StdEncoding.DecodeString(d.Data) if err != nil { return err diff --git a/connector/mattermost/mattermost.go b/connector/mattermost/mattermost.go index e5b5bc9..d39768e 100644 --- a/connector/mattermost/mattermost.go +++ b/connector/mattermost/mattermost.go @@ -210,7 +210,7 @@ func (mm *Mattermost) SetRoomInfo(roomId RoomID, info *RoomInfo) error { mm.conn.UpdateChannelHeader(ch, info.Topic) } - if MediaObject(info.Picture) != nil { + if info.Picture.MediaObject != nil { err = fmt.Errorf("Not supported: channel picture on mattermost") } diff --git a/connector/mediaobject.go b/connector/mediaobject.go index f904459..d66e245 100644 --- a/connector/mediaobject.go +++ b/connector/mediaobject.go @@ -57,44 +57,6 @@ func (m *FileMediaObject) URL() string { // ---- -type UrlMediaObject struct { - ObjectFilename string - ObjectSize int64 - ObjectMimetype string - ObjectURL string - ObjectImageSize *ImageSize -} - -func (m *UrlMediaObject) Filename() string { - return m.ObjectFilename -} - -func (m *UrlMediaObject) Size() int64 { - return m.ObjectSize -} - -func (m *UrlMediaObject) Mimetype() string { - return m.ObjectMimetype -} - -func (m *UrlMediaObject) ImageSize() *ImageSize { - return m.ObjectImageSize -} - -func (m *UrlMediaObject) Read() (io.ReadCloser, error) { - resp, err := http.Get(m.ObjectURL) - if err != nil { - return nil, err - } - return resp.Body, nil -} - -func (m *UrlMediaObject) URL() string { - return m.ObjectURL -} - -// ---- - type BlobMediaObject struct { ObjectFilename string ObjectMimetype string diff --git a/connector/xmpp/xmpp.go b/connector/xmpp/xmpp.go index d858317..1f77fdf 100644 --- a/connector/xmpp/xmpp.go +++ b/connector/xmpp/xmpp.go @@ -272,7 +272,7 @@ func (xm *XMPP) SetRoomInfo(roomId RoomID, info *RoomInfo) error { } } - if MediaObject(info.Picture) != nil { + if info.Picture.MediaObject != nil { // TODO return fmt.Errorf("Room picture change not implemented on xmpp") } diff --git a/external/messenger.py b/external/messenger.py index 495bca9..8403a03 100755 --- a/external/messenger.py +++ b/external/messenger.py @@ -2,10 +2,13 @@ import sys import json +import signal +import threading import hashlib -import gevent + import fbchat +from fbchat.models import * # ---- MESSAGE TYPES ---- @@ -35,21 +38,96 @@ CACHE_GET = "cache_get" REP_OK = "rep_ok" REP_ERROR = "rep_error" +# Event types +EVENT_JOIN = "join" +EVENT_LEAVE = "leave" +EVENT_MESSAGE = "message" +EVENT_ACTION = "action" + # ---- MESSENGER CLIENT CLASS THAT HANDLES EVENTS ---- +def getUserId(user): + if user.url is not None and not "?" in user.url: + return user.url.split("/")[-1] + else: + return user.uid + +def mediaObjectOfURL(url): + return { + "filename": url.split("?")[0].split("/")[-1], + "url": url, + } + + class MessengerBridgeClient(fbchat.Client): def __init__(self, bridge, *args, **kwargs): self.bridge = bridge super(MessengerBridgeClient, self).__init__(*args, **kwargs) +class InitialSyncThread(threading.Thread): + def __init__(self, client, bridge, *args, **kwargs): + super(InitialSyncThread, self).__init__(*args, **kwargs) + + self.client = client + self.bridge = bridge + + def run(self): + threads = self.client.fetchThreadList() + sys.stderr.write("fb thread list: {}\n".format(threads)) + for thread in threads: + sys.stderr.write("fb thread: {}\n".format(thread)) + if thread.type != ThreadType.GROUP: + continue + self.bridge.write({ + "_type": JOINED, + "room": thread.uid, + }) + + room_info = { + "name": thread.name, + } + if thread.photo is not None: + room_info["picture"] = mediaObjectOfURL(thread.photo) + self.bridge.write({ + "_type": ROOM_INFO_UPDATED, + "room": thread.uid, + "data": room_info, + }) + + members = self.client.fetchAllUsersFromThreads([thread]) + for member in members: + sys.stderr.write("fb thread member: {}\n".format(member)) + self.bridge.write({ + "_type": EVENT, + "data": { + "type": EVENT_JOIN, + "author": getUserId(member), + "room": thread.uid, + } + }) + + user_info = { + "display_name": member.name, + } + if member.photo is not None: + user_info["avatar"] = mediaObjectOfURL(member.photo) + self.bridge.write({ + "_type": USER_INFO_UPDATED, + "user": getUserId(member), + "data": user_info, + }) + # TODO: handle events # ---- MAIN LOOP THAT HANDLES REQUESTS FROM BRIDGE ---- class MessengerBridge: + def __init__(self): + pass + def run(self): self.client = None self.keep_running = True @@ -104,10 +182,15 @@ class MessengerBridge: except: pass + InitialSyncThread(self.client, self).start() + elif ty == CLOSE: self.client.logout() self.keep_running = False + elif ty == GET_USER: + return {"_type": REP_OK, "user": self.client.uid} + else: return {"_type": REP_ERROR, "error": "Not implemented"} |