aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-03-01 20:38:00 +0100
committerAlex Auvolat <alex@adnab.me>2020-03-01 20:38:00 +0100
commitd5d74b0b73590fcee103f3beea7ee5ff7ca83653 (patch)
tree67615702804f1d76f76bdc9a3e46062d57fa3f87
parentd2791094d90359c84c6bf103e6ca36ce9b96ee20 (diff)
downloadeasybridge-d5d74b0b73590fcee103f3beea7ee5ff7ca83653.tar.gz
easybridge-d5d74b0b73590fcee103f3beea7ee5ff7ca83653.zip
Messenger: hopefully handle joins correctly
-rw-r--r--connector/external/external.go2
-rwxr-xr-xexternal/messenger.py179
2 files changed, 114 insertions, 67 deletions
diff --git a/connector/external/external.go b/connector/external/external.go
index 9a4137d..6d25230 100644
--- a/connector/external/external.go
+++ b/connector/external/external.go
@@ -111,7 +111,7 @@ func (ext *External) Configure(c Configuration) error {
ext.generation += 1
- ext.handlerChan = make(chan *extMessageWithData)
+ ext.handlerChan = make(chan *extMessageWithData, 1000)
go ext.handlerLoop(ext.generation)
err = ext.setupProc()
diff --git a/external/messenger.py b/external/messenger.py
index cb48410..9728c9d 100755
--- a/external/messenger.py
+++ b/external/messenger.py
@@ -95,52 +95,12 @@ class InitialSyncThread(threading.Thread):
self.threads = threads
def run(self):
- sys.stderr.write("fb thread list: {}\n".format(self.threads))
+ sys.stderr.write("(python) fb thread list: {}\n".format(self.threads))
for thread in self.threads:
- sys.stderr.write("fb thread: {}\n".format(thread))
- if thread.type == ThreadType.GROUP:
- members = self.client.fetchAllUsersFromThreads([thread])
+ sys.stderr.write("(python) fb thread: {}\n".format(thread))
+ self.bridge.setup_joined_thread(thread)
- self.bridge.write({
- "_type": JOINED,
- "room": thread.uid,
- })
-
- self.bridge.send_room_info(thread, members)
- self.bridge.send_room_members(thread, members)
-
- self.backlog_room(thread)
-
-
- def backlog_room(self, thread):
- prev_last_seen = self.bridge.cache_get("last_seen_%s"%thread.uid)
- if prev_last_seen == "":
- prev_last_seen = None
-
- messages = []
- found = False
- while not found:
- before = None
- if len(messages) > 0:
- before = messages[-1].timestamp
- page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20)
- for m in page:
- if m.uid == prev_last_seen or len(messages) > self.bridge.init_backlog_length:
- found = True
- break
- else:
- messages.append(m)
-
- for m in reversed(messages):
- if m.text is None:
- m.text = ""
- m.text = "[{}] {}".format(
- time.strftime("%Y-%m-%d %H:%M %Z", time.localtime(float(m.timestamp)/1000)).strip(),
- m.text)
- self.bridge.onMessage(thread_id=thread.uid,
- thread_type=thread.type,
- message_object=m)
class ClientListenThread(threading.Thread):
def __init__(self, client, *args, **kwargs):
@@ -149,7 +109,7 @@ class ClientListenThread(threading.Thread):
self.client = client
def run(self):
- sys.stderr.write("Start client.listen()\n")
+ sys.stderr.write("(python messenger) Start client.listen()\n")
self.client.listen()
@@ -159,7 +119,8 @@ class MessengerBridge:
def __init__(self):
self.rev_uid = {}
self.uid_map = {}
- self.joined_map = {}
+ self.others_joined_map = {}
+ self.my_joined_rooms = {}
self.init_backlog_length = 100
def getUserId(self, user):
@@ -215,7 +176,7 @@ class MessengerBridge:
try:
line = sys.stdin.readline()
except KeyboardInterrupt:
- sys.stderr.write("(messenger) shutting down")
+ sys.stderr.write("(python messenger) shutting down")
self.close()
time.sleep(5)
sys.exit(0)
@@ -230,7 +191,7 @@ class MessengerBridge:
if "_type" not in rep:
rep["_type"] = REP_OK
except Exception as e:
- sys.stderr.write("{}\n".format(traceback.format_exc()))
+ sys.stderr.write("(python) {}\n".format(traceback.format_exc()))
rep = {
"_type": REP_ERROR,
"error": "{}".format(e)
@@ -292,8 +253,19 @@ class MessengerBridge:
userId = self.getUserIdFromUid(self.client.uid)
return {"_type": REP_OK, "user": userId}
- elif ty == INVITE and cmd["room"] == "":
- return {"_type": REP_OK}
+ elif ty == JOIN:
+ self.ensure_i_joined(cmd["room"])
+
+ elif ty == LEAVE:
+ thread_id = cmd["room"]
+ self.client.removeUserFromGroup(self.client.uid, thread_id)
+ if thread_id in self.my_joined_rooms:
+ del self.my_joined_rooms[thread_id]
+
+ elif ty == INVITE:
+ if cmd["room"] != "":
+ uid = self.revUserId(cmd["user"])
+ self.client.addUsersToGroup([uid], cmd["room"])
elif ty == SEND:
event = cmd["data"]
@@ -318,7 +290,7 @@ class MessengerBridge:
msg_id = self.client.send(msg, thread_id=event["room"], thread_type=ThreadType.GROUP)
elif event["recipient"] != "":
uid = self.revUserId(event["recipient"])
- sys.stderr.write("Sending to {}\n".format(uid))
+ sys.stderr.write("(python) Sending to {}\n".format(uid))
if len(attachments) > 0:
msg_id = self.client.sendRemoteFiles(attachments, message=msg, thread_id=uid, thread_type=ThreadType.USER)
else:
@@ -344,13 +316,40 @@ class MessengerBridge:
q = queue.Queue(1)
self.cache_gets[num] = q
self.write({"_type": CACHE_GET, "_id": num, "key": key})
- rep = q.get(block=True, timeout=30)
+ try:
+ rep = q.get(block=True, timeout=30)
+ except queue.Empty:
+ rep = ""
del self.cache_gets[num]
return rep
def cache_put(self, key, value):
self.write({"_type": CACHE_PUT, "key": key, "value": value})
+ # ---- Info sync ----
+
+ def ensure_i_joined(self, thread_id):
+ if thread_id not in self.my_joined_rooms:
+ thread = self.client.fetchThreadInfo(thread_id)[thread_id]
+ self.setup_joined_thread(thread)
+
+ def setup_joined_thread(self, thread):
+ self.my_joined_rooms[thread.uid] = True
+
+ if thread.type == ThreadType.GROUP:
+ members = self.client.fetchAllUsersFromThreads([thread])
+
+ self.write({
+ "_type": JOINED,
+ "room": thread.uid,
+ })
+
+ self.send_room_info(thread, members)
+ self.send_room_members(thread, members)
+
+ self.backlog_room(thread)
+
+
def send_room_info(self, thread, members):
room_info = {}
if thread.name is not None:
@@ -378,12 +377,41 @@ class MessengerBridge:
def send_room_members(self, thread, members):
for member in members:
- sys.stderr.write("fb thread member: {}\n".format(member))
+ sys.stderr.write("(python) fb thread member: {}\n".format(member))
self.ensureJoined(self.getUserId(member), thread.uid)
+ def backlog_room(self, thread):
+ prev_last_seen = self.cache_get("last_seen_%s"%thread.uid)
+ if prev_last_seen == "":
+ prev_last_seen = None
+
+ messages = []
+ found = False
+ while not found:
+ before = None
+ if len(messages) > 0:
+ before = messages[-1].timestamp
+ page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20)
+ for m in page:
+ if m.uid == prev_last_seen or len(messages) > self.init_backlog_length:
+ found = True
+ break
+ else:
+ messages.append(m)
+
+ for m in reversed(messages):
+ if m.text is None:
+ m.text = ""
+ m.text = "[{}] {}".format(
+ time.strftime("%Y-%m-%d %H:%M %Z", time.localtime(float(m.timestamp)/1000)).strip(),
+ m.text)
+ self.onMessage(thread_id=thread.uid,
+ thread_type=thread.type,
+ message_object=m)
+
def ensureJoined(self, userId, room):
key = "{}--{}".format(userId, room)
- if not key in self.joined_map:
+ if not key in self.others_joined_map:
self.write({
"_type": EVENT,
"data": {
@@ -392,9 +420,13 @@ class MessengerBridge:
"room": room,
}
})
- self.joined_map[key] = True
+ self.others_joined_map[key] = True
+
+ # ---- Event handlers ----
def onMessage(self, thread_id, thread_type, message_object, **kwargs):
+ self.ensure_i_joined(thread_id)
+
if message_object.author == self.client.uid:
# Ignore our own messages
return
@@ -442,29 +474,44 @@ class MessengerBridge:
event["room"] = thread_id
self.ensureJoined(author, thread_id)
- self.write({"_type": EVENT, "data": event})
+ if event["text"] != "" or len(event["attachments"]) > 0:
+ self.write({"_type": EVENT, "data": event})
self.cache_put("last_seen_%s"%thread_id, message_object.uid)
def onPeopleAdded(self, added_ids, thread_id, *args, **kwargs):
for user_id in added_ids:
- self.ensureJoined(self.getUserIdFromUid(user_id), thread_id)
+ if user_id == self.client.uid:
+ self.ensure_i_joined(thread_id)
+ else:
+ self.ensureJoined(self.getUserIdFromUid(user_id), thread_id)
def onPersonRemoved(self, removed_id, thread_id, *args, **kwargs):
- userId = self.getUserIdFromUid(removed_id),
- self.write({
- "_type": EVENT,
- "data": {
- "type": EVENT_JOIN,
- "author": userId,
+ if removed_id == self.client.uid:
+ self.write({
+ "_type": LEFT,
"room": thread_id,
- }
- })
- del self.joined_map["{}--{}".format(userId, thread_id)]
+ })
+ if thread_id in self.my_joined_rooms:
+ del self.my_joined_rooms[thread_id]
+ else:
+ userId = self.getUserIdFromUid(removed_id),
+ self.write({
+ "_type": EVENT,
+ "data": {
+ "type": EVENT_JOIN,
+ "author": userId,
+ "room": thread_id,
+ }
+ })
+ map_key = "{}--{}".format(userId, thread_id)
+ if map_key in self.others_joined_map:
+ del self.others_joined_map[map_key]
def onTitleChange(self, author_id, new_title, thread_id, thread_type, *args, **kwargs):
+ self.ensure_i_joined(thread_id)
if thread_type == ThreadType.GROUP:
- self.bridge.write({
+ self.write({
"_type": ROOM_INFO_UPDATED,
"room": thread_id,
"data": {"name": new_title},