aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-03-01 13:27:29 +0100
committerAlex Auvolat <alex@adnab.me>2020-03-01 13:27:29 +0100
commit018f4a751ac4bff9113874666a92b4c5d8679af3 (patch)
tree41670a7c39287344562d2992146cd9c3d6bc94f8
parent810e75a34dddd88279bea1cd2ea38816fe872d52 (diff)
downloadeasybridge-018f4a751ac4bff9113874666a92b4c5d8679af3.tar.gz
easybridge-018f4a751ac4bff9113874666a92b4c5d8679af3.zip
Read backlog; handle messages in the correct order
-rw-r--r--connector/external/external.go17
-rwxr-xr-xexternal/messenger.py36
2 files changed, 51 insertions, 2 deletions
diff --git a/connector/external/external.go b/connector/external/external.go
index 741802c..9a4137d 100644
--- a/connector/external/external.go
+++ b/connector/external/external.go
@@ -86,6 +86,7 @@ type External struct {
generation int
proc *exec.Cmd
+ handlerChan chan *extMessageWithData
counter uint64
inflightRequests map[uint64]chan *extMessageWithData
lock sync.Mutex
@@ -110,6 +111,9 @@ func (ext *External) Configure(c Configuration) error {
ext.generation += 1
+ ext.handlerChan = make(chan *extMessageWithData)
+ go ext.handlerLoop(ext.generation)
+
err = ext.setupProc()
if err != nil {
return err
@@ -254,7 +258,17 @@ func (ext *External) recvLoop() {
}
}()
} else {
- go ext.handleCmd(&msg)
+ ext.handlerChan <- &msg
+ }
+ }
+}
+
+func (ext *External) handlerLoop(generation int) {
+ for ext.handlerChan != nil && ext.generation == generation {
+ select {
+ case msg := <-ext.handlerChan:
+ ext.handleCmd(msg)
+ case <-time.After(10 * time.Second):
}
}
}
@@ -311,6 +325,7 @@ func (ext *External) Close() {
ext.recv = nil
ext.send = nil
ext.sendJson = nil
+ ext.handlerChan = nil
go func() {
time.Sleep(10 * time.Second)
diff --git a/external/messenger.py b/external/messenger.py
index 6ec6e9c..792a356 100755
--- a/external/messenger.py
+++ b/external/messenger.py
@@ -141,7 +141,25 @@ class InitialSyncThread(threading.Thread):
})
def backlog_room(self, thread):
- pass # TODO
+ prev_last_seen = self.bridge.cache_get("last_seen_%s"%thread.uid)
+ if prev_last_seen == "":
+ messages = self.client.fetchThreadMessages(thread.uid, limit=100)
+ else:
+ messages = []
+ found = False
+ while not Found:
+ before = None
+ if len(messages) > 0:
+ before = messages[-1].timestamp
+ page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20)
+ for m in page:
+ if m.uid == prev_last_seen:
+ found = True
+ break
+ else:
+ messages.append(m)
+ for m in reversed(messages):
+ self.bridge.handleMessage(thread, m)
@@ -150,11 +168,13 @@ class InitialSyncThread(threading.Thread):
class MessengerBridge:
def __init__(self):
self.rev_uid = {}
+ self.uid_map = {}
def getUserId(self, user):
if user.url is not None and not "?" in user.url:
user_id = user.url.split("/")[-1]
self.rev_uid[user_id] = user.uid
+ self.uid_map[user.uid] = user_id
return user_id
else:
return user.uid
@@ -274,6 +294,20 @@ class MessengerBridge:
del self.cache_gets[num]
return rep
+ def handleMessage(self, thread, m):
+ author = m.author
+ if author in self.uid_map:
+ author = self.uid_map[m.author]
+
+ event = {
+ "type": EVENT_MESSAGE,
+ "author": author,
+ "text": m.text,
+ }
+ if thread.type == ThreadType.GROUP:
+ event["room"] = thread.uid
+ self.write({"_type": EVENT, "data": event})
+
if __name__ == "__main__":
bridge = MessengerBridge()