aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--account.go9
-rwxr-xr-xexternal/messenger.py117
-rw-r--r--main.go20
3 files changed, 114 insertions, 32 deletions
diff --git a/account.go b/account.go
index 44173c6..b4e7118 100644
--- a/account.go
+++ b/account.go
@@ -128,6 +128,15 @@ func RemoveAccount(mxUser string, name string) {
}
}
+func CloseAllAcountsForShutdown() {
+ accountsLock.Lock()
+ for _, accl := range registeredAccounts {
+ for _, acct := range accl {
+ acct.Conn.Close()
+ }
+ }
+}
+
// ----
func SaveDbAccounts(mxid string, key *[32]byte) {
diff --git a/external/messenger.py b/external/messenger.py
index 792a356..6af2ec2 100755
--- a/external/messenger.py
+++ b/external/messenger.py
@@ -6,6 +6,7 @@ import signal
import threading
import queue
import pickle
+import time
import hashlib
@@ -47,8 +48,6 @@ EVENT_MESSAGE = "message"
EVENT_ACTION = "action"
-# ---- MESSENGER CLIENT CLASS THAT HANDLES EVENTS ----
-
def mediaObjectOfURL(url):
return {
@@ -57,12 +56,27 @@ def mediaObjectOfURL(url):
}
-# class MessengerBridgeClient(fbchat.Client):
-# def __init__(self, bridge, *args, **kwargs):
-# super(MessengerBridgeClient, self).__init__(*args, **kwargs)
-#
-# # TODO: handle events
+# ---- MESSENGER CLIENT CLASS THAT HANDLES EVENTS ----
+
+class MessengerBridgeClient(fbchat.Client):
+ def __init__(self, *args, **kwargs):
+ super(MessengerBridgeClient, self).__init__(*args, **kwargs)
+ self.bridge = None
+
+ def setBridge(self, bridge):
+ self.bridge = bridge
+
+ ## Redirect all interesting events to Bridge
+ def onMessage(self, *args, **kwargs):
+ self.bridge.onMessage(*args, **kwargs)
+ def onPeopleAdded(self, *args, **kwargs):
+ self.bridge.onPeopleAdded(*args, **kwargs)
+ def onPersonRemoved(self, *args, **kwargs):
+ self.bridge.onPersonRemoved(*args, **kwargs)
+ def onTitleChange(self, *args, **kwargs):
+ self.bridge.onTitleChange(*args, **kwargs)
+# ---- SEPARATE THREADS FOR INITIAL SYNC & CLIENT LISTEN ----
class InitialSyncThread(threading.Thread):
def __init__(self, client, bridge, *args, **kwargs):
@@ -83,7 +97,7 @@ class InitialSyncThread(threading.Thread):
"_type": JOINED,
"room": thread.uid,
})
-
+
self.send_room_info(thread, members)
self.send_room_members(thread, members)
elif thread.type == ThreadType.USER:
@@ -147,7 +161,7 @@ class InitialSyncThread(threading.Thread):
else:
messages = []
found = False
- while not Found:
+ while not found:
before = None
if len(messages) > 0:
before = messages[-1].timestamp
@@ -159,8 +173,19 @@ class InitialSyncThread(threading.Thread):
else:
messages.append(m)
for m in reversed(messages):
- self.bridge.handleMessage(thread, m)
+ self.bridge.onMessage(thread_id=thread.uid,
+ thread_type=thread.type,
+ message_object=m)
+
+class ClientListenThread(threading.Thread):
+ def __init__(self, client, *args, **kwargs):
+ super(ClientListenThread, self).__init__(*args, **kwargs)
+ self.client = client
+
+ def run(self):
+ sys.stderr.write("Start client.listen()\n")
+ self.client.listen()
# ---- MAIN LOOP THAT HANDLES REQUESTS FROM BRIDGE ----
@@ -175,9 +200,17 @@ class MessengerBridge:
user_id = user.url.split("/")[-1]
self.rev_uid[user_id] = user.uid
self.uid_map[user.uid] = user_id
- return user_id
else:
- return user.uid
+ self.uid_map[user.uid] = user.uid
+
+ return self.uid_map[user.uid]
+
+ def getUserIdFromUid(self, uid):
+ if uid in self.uid_map:
+ return self.uid_map[uid]
+ else:
+ user = self.client.fetchUserInfo(uid)[uid]
+ return self.getUserId(user)
def revUserId(self, user_id):
if user_id in self.rev_uid:
@@ -199,7 +232,14 @@ class MessengerBridge:
self.num = 0
while self.keep_running:
- line = sys.stdin.readline()
+ try:
+ line = sys.stdin.readline()
+ except KeyboardInterrupt:
+ sys.stderr.write("(messenger) shutting down")
+ self.close()
+ time.sleep(5)
+ sys.exit(0)
+
sys.stderr.write("(python) reading {}\n".format(line.strip()))
cmd = json.loads(line)
@@ -239,7 +279,7 @@ class MessengerBridge:
if self.client is None:
email, password = cmd["data"]["email"], cmd["data"]["password"]
- self.client = fbchat.Client(email=email, password=password, max_tries=1)
+ self.client = MessengerBridgeClient(email=email, password=password, max_tries=1)
if self.client.isLoggedIn():
try:
@@ -249,13 +289,16 @@ class MessengerBridge:
except:
pass
+ self.client.setBridge(self)
InitialSyncThread(self.client, self).start()
+ ClientListenThread(self.client).start()
elif ty == CLOSE:
- self.keep_running = False
+ self.close()
elif ty == GET_USER:
- return {"_type": REP_OK, "user": self.client.uid}
+ userId = self.getUserIdFromUid(self.client.uid)
+ return {"_type": REP_OK, "user": userId}
elif ty == INVITE and cmd["room"] == "":
return {"_type": REP_OK}
@@ -284,6 +327,10 @@ class MessengerBridge:
else:
return {"_type": REP_ERROR, "error": "Not implemented"}
+ def close(self):
+ self.keep_running = False
+ self.client.stopListening()
+
def cache_get(self, key):
self.num += 1
num = self.num
@@ -294,19 +341,33 @@ class MessengerBridge:
del self.cache_gets[num]
return rep
- def handleMessage(self, thread, m):
- author = m.author
- if author in self.uid_map:
- author = self.uid_map[m.author]
+ def cache_put(self, key, value):
+ self.write({"_type": CACHE_PUT, "key": key, "value": value})
- event = {
- "type": EVENT_MESSAGE,
- "author": author,
- "text": m.text,
- }
- if thread.type == ThreadType.GROUP:
- event["room"] = thread.uid
- self.write({"_type": EVENT, "data": event})
+ def onMessage(self, thread_id, thread_type, message_object, **kwargs):
+ if message_object.author == self.client.uid:
+ # Ignore our own messages
+ return
+
+ author = self.getUserIdFromUid(message_object.author)
+
+ event = {
+ "type": EVENT_MESSAGE,
+ "author": author,
+ "text": message_object.text,
+ }
+ if thread_type == ThreadType.GROUP:
+ event["room"] = thread_id
+ self.write({"_type": EVENT, "data": event})
+
+ self.cache_put("last_seen_%s"%thread_id, message_object.uid)
+
+ def onPeopleAdded(self, *args, **kwargs):
+ pass
+ def onPersonRemoved(self, *args, **kwargs):
+ pass
+ def onTitleChange(self, *args, **kwargs):
+ pass
if __name__ == "__main__":
diff --git a/main.go b/main.go
index 514bd4c..d6597a2 100644
--- a/main.go
+++ b/main.go
@@ -7,6 +7,8 @@ import (
"flag"
"io/ioutil"
"os"
+ "os/signal"
+ "syscall"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
@@ -174,6 +176,9 @@ func main() {
// Start appservice and web management interface
errch := make(chan error)
+ sigch := make(chan os.Signal)
+ signal.Notify(sigch, os.Interrupt, syscall.SIGTERM)
+
err = StartAppService(errch)
if err != nil {
log.Fatal(err)
@@ -181,9 +186,16 @@ func main() {
StartWeb(errch)
- // Wait for an error somewhere
- err = <-errch
- if err != nil {
- log.Fatal(err)
+ // Wait for an error somewhere or interrupt signal
+ select {
+ case err = <-errch:
+ if err != nil {
+ log.Error(err)
+ }
+ case sig := <-sigch:
+ log.Warnf("Got signal %s", sig.String())
}
+
+ log.Warn("Shuttind down")
+ CloseAllAcountsForShutdown()
}