aboutsummaryrefslogblamecommitdiff
path: root/external/messenger.py
blob: e45a45e40fefee7f86f30447219bab7a0c7ad7db (plain) (tree)
1
2
3
4
5
6
7
8
9



                      

                

             
           

                                              
 


              
 
             
                           










                               
                 



                  
                           











                                                   
                                         

                       





                         
 





                                                         





                                                          
 


















                                                                    
 
                                                             
 
                                     
                                                              
                                                           
 
                            
                                        

                  

                                                      

                                                                       
 



                                                                 
 


                            
                                                                      
                            




                                                       
                       
                                      
 













                                                                                                                             
                              
                     
                                                        
                                            
             
                             
 

                                           
                                           





                                                                  
                        
                                           
                               



                                  






                                                      

                                 







                                                                  
 




                                     
 


                                

                            
                            

                                


                                           
                                                                    
                            
                     
 









                                                                          
                                                                                
















                                                                
                                                                          
 

                                                                                                 



                                                                     
                                                                               
                                                                                                
                                                                      
 
                                            
                                                                                         
 






                                                                                    

                                       

                                                                    



                                                                              
                                                  
                                          
 
                                                     
                                                              
                                            

                                                  
                                                   
 
                         
                        
 
                            
                                                             
 












                                                                       
 









                                                             


                                                              








                                                                                     




                                                   



                                                                                                                                             

                                                            
                                                                            



                                                                                                                                  







                                                                           


                                                                   



                                   





                                                                



                                               


                                

                                                                    
 



                                                 

                                                  
                                                                      
                                              

                                          
                                                                          













                                                                    
                                              

                                         

























                                                                                                    
                                                                              

                                                                 




























                                                                                                        

                                           
                                             
                        






                                       


                                              
 
                                                                          

                                       



                                                    

                                                                                       


                                                             
                                         


                                            
                                 
             


                                 










                                                                      







                                                 
                                             

                                            



                                                                        

                                           

                                                

                                                                


                                                                    

                                                                   



                                                                            

                                                                      


                                         
                                  















                                                        

                                                                                           
                                       
                                           
                        



                                            
 







                                                                
                                                                                             





                                                 
                          




                                          
 
#!/usr/bin/env python3

import sys
import json
import signal
import threading
import queue
import pickle
import time
import traceback
from urllib.parse import unquote as UrlUnquote

import base64
import getpass
import zlib

import fbchat
from fbchat.models import *

# ---- MESSAGE TYPES ----

# ezbr -> external
# Commands read from stdin; MessengerBridge.handle_cmd dispatches on these.
# SET_USER_INFO and SET_ROOM_INFO have no branch in handle_cmd and therefore
# get its "Not implemented" error reply.
CONFIGURE = "configure"
GET_USER = "get_user"
SET_USER_INFO = "set_user_info"
SET_ROOM_INFO = "set_room_info"
JOIN = "join"
INVITE = "invite"
LEAVE = "leave"
SEARCH = "search"
SEND = "send"
CLOSE = "close"

# external -> ezbr
# Messages this process writes to stdout as one JSON object per line.
SAVE_CONFIG = "save_config"
JOINED = "joined"
LEFT = "left"
USER_INFO_UPDATED = "user_info_updated"
ROOM_INFO_UPDATED = "room_info_updated"
EVENT = "event"
CACHE_PUT = "cache_put"
CACHE_GET = "cache_get"

# reply messages
# ezbr -> external: all must wait for a reply!
# external -> ezbr: only CACHE_GET produces a reply
# A reply carries the "_id" of the message it answers.
REP_OK = "rep_ok"
REP_SEARCH_RESULTS = "rep_search_results"
REP_ERROR = "rep_error"

# Event types
# Values of the "type" field inside the "data" payload of an EVENT message
# (and of the event carried by a SEND command).
EVENT_JOIN = "join"
EVENT_LEAVE = "leave"
EVENT_MESSAGE = "message"
EVENT_ACTION = "action"


def mediaObjectOfURL(url):
    """Describe a remote file as a media object for the bridge.

    The filename is the last "/"-separated segment of `url` once everything
    from the first "?" onwards has been dropped; the URL itself is passed
    through unchanged.
    """
    without_query = url.partition("?")[0]
    basename = without_query.rpartition("/")[2]
    return {"filename": basename, "url": url}

def stripFbLinkPrefix(url):
    """Unwrap a Facebook link-redirect URL.

    URLs starting with the l.facebook.com redirect prefix carry the real
    target, percent-encoded, in the "u" parameter: the target is cut at the
    first "&" and decoded. Any other URL is returned untouched.
    """
    redirect_prefix = "https://l.facebook.com/l.php?u="
    if not url.startswith(redirect_prefix):
        return url
    encoded_target = url[len(redirect_prefix):].split('&')[0]
    return UrlUnquote(encoded_target)

# ---- MESSENGER CLIENT CLASS THAT HANDLES EVENTS ----

class MessengerBridgeClient(fbchat.Client):
    """fbchat client that hands selected events over to a bridge object.

    The bridge is attached after construction with setBridge(); until then
    `self.bridge` is None.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.bridge = None

    def setBridge(self, bridge):
        """Attach the object that will receive the forwarded events."""
        self.bridge = bridge

    # Every handler below passes its arguments on unchanged to the method of
    # the same name on the bridge.

    def onMessage(self, *args, **kwargs):
        self.bridge.onMessage(*args, **kwargs)

    def onPeopleAdded(self, *args, **kwargs):
        self.bridge.onPeopleAdded(*args, **kwargs)

    def onPersonRemoved(self, *args, **kwargs):
        self.bridge.onPersonRemoved(*args, **kwargs)

    def onTitleChange(self, *args, **kwargs):
        self.bridge.onTitleChange(*args, **kwargs)

# ---- SEPARATE THREADS FOR INITIAL SYNC & CLIENT LISTEN ----

class SyncerThread(threading.Thread):
    """Worker that sets up joined threads taken one by one from a queue.

    Args:
        bridge: object providing setup_joined_thread(thread).
        thread_queue: queue the threads to set up are read from.
        *args, **kwargs: passed through to threading.Thread.
    """

    def __init__(self, bridge, thread_queue, *args, **kwargs):
        super(SyncerThread, self).__init__(*args, **kwargs)

        self.bridge = bridge
        self.thread_queue = thread_queue

    def run(self):
        """Process queued threads forever, blocking while the queue is empty."""
        while True:
            thread = self.thread_queue.get(block=True)
            sys.stderr.write("(python) fb thread: {}\n".format(thread))
            try:
                self.bridge.setup_joined_thread(thread)
            except Exception:
                # One failing thread must not end the worker: everything
                # queued after it would otherwise never be synced.
                sys.stderr.write("(python) failed to sync fb thread:\n{}\n".format(
                    traceback.format_exc()))


class ClientListenThread(threading.Thread):
    """Thread whose whole job is to run `client.listen()`."""

    def __init__(self, client, *args, **kwargs):
        # Remaining arguments are passed through to threading.Thread.
        super().__init__(*args, **kwargs)
        self.client = client

    def run(self):
        """Log the start, then hand control to the client's listen call."""
        start_notice = "(python messenger) Start client.listen()\n"
        sys.stderr.write(start_notice)
        self.client.listen()


# ---- MAIN LOOP THAT HANDLES REQUESTS FROM BRIDGE ----

class MessengerBridge:
    def __init__(self):
        """Create the bridge with empty caches and the default backlog size."""
        # Two kinds of user identifiers are in play:
        # - uid: the facebook uid of a user
        # - id:  the identifier given to the bridge, which is the username
        #        when one is defined and the uid otherwise
        # DO NOT USE THESE TWO MAPS DIRECTLY: go through getUserId,
        # getUserIdFromUid and revUserId instead.
        self.uid_map = {}   # fb user uid -> bridge id
        self.rev_uid = {}   # bridge id -> fb user uid

        # Rooms that we (the user of the bridge) have joined, keyed by room uid
        self.my_joined_rooms = {}

        # Other people already announced as present in a room, so that the
        # join is not sent every time (keys = "<userId>--<threadId>")
        self.others_joined_map = {}

        # Default backlog size; the CONFIGURE command overwrites it
        self.init_backlog_length = 100

    def getUserId(self, user):
        retval = None
        if user.url is not None and not "?" in user.url:
            retval = user.url.split("/")[-1]
        else:
            retval = user.uid

        if user.uid not in self.uid_map:
            self.uid_map[user.uid] = retval
            self.rev_uid[retval] = user.uid

            user_info = {
                "display_name": user.name,
            }
            if user.photo is not None:
                user_info["avatar"] = mediaObjectOfURL(user.photo)
            self.write({
                "_type": USER_INFO_UPDATED,
                "user": retval,
                "data": user_info,
            })

        return retval

    def getUserIdFromUid(self, uid):
        """Translate a facebook uid into a bridge id.

        Known uids are answered from `uid_map`; unknown ones are fetched from
        the client and registered through getUserId.
        """
        if uid not in self.uid_map:
            fetched = self.client.fetchUserInfo(uid)
            return self.getUserId(fetched[uid])
        return self.uid_map[uid]

    def revUserId(self, user_id):
        """Translate a bridge id back into a facebook uid.

        An unknown id triggers a user search; every hit is registered through
        getUserId, which may add the id to `rev_uid`.

        Raises:
            ValueError: the id is still unknown after the search.
        """
        if user_id not in self.rev_uid:
            for candidate in self.client.searchForUsers(user_id):
                self.getUserId(candidate)

        if user_id in self.rev_uid:
            return self.rev_uid[user_id]
        raise ValueError("User not found: {}".format(user_id))

    def getUserShortName(self, user):
        if user.first_name != None:
            return user.first_name
        else:
            return user.name

    def run(self):
        self.client = None
        self.keep_running = True
        self.cache_gets = {}
        self.num = 0
        self.my_user_id = ""

        while self.keep_running:
            try:
                line = sys.stdin.readline()
            except KeyboardInterrupt:
                sys.stderr.write("(python messenger) shutting down")
                self.close()
                break

            sys.stderr.write("(python) reading {}\n".format(line.strip()))
            cmd = json.loads(line)

            try:
                rep = self.handle_cmd(cmd)
                if rep is None:
                    rep = {}
                if "_type" not in rep:
                    rep["_type"] = REP_OK
            except Exception as e:
                sys.stderr.write("(python) {}\n".format(traceback.format_exc()))
                rep = {
                        "_type": REP_ERROR,
                        "error": "{}".format(e)
                        }

            rep["_id"] = cmd["_id"]
            self.write(rep)

    def write(self, msg):
        msgstr = json.dumps(msg)
        sys.stderr.write("(python) writing {}\n".format(msgstr))
        sys.stdout.write(msgstr + "\n")
        sys.stdout.flush()

    def handle_cmd(self, cmd):
        """Execute one decoded command and return its reply payload.

        Args:
            cmd: dict decoded from one input line. "_type" selects the
                action; the other keys read depend on that type.

        Returns:
            A reply dict, or None when the command has nothing to report.
            The caller (run) turns None into an empty reply and fills in
            REP_OK when "_type" is absent.

        Raises:
            Anything raised by a missing key or by the client calls made
            here; run converts such exceptions into a REP_ERROR reply.
        """
        ty = cmd["_type"]
        if ty == CONFIGURE:
            self.init_backlog_length = int(cmd["data"]["initial_backlog"])

            has_pickle = "client_pickle" in cmd["data"] and len(cmd["data"]["client_pickle"]) > 0
            if has_pickle:
                # NOTE(security): pickle.loads runs arbitrary code from its
                # input. This is only acceptable if the configuration value
                # comes from a trusted source.
                data = base64.b64decode(cmd["data"]["client_pickle"])
                data = zlib.decompress(data)
                self.client = pickle.loads(data)
            else:
                email, password = cmd["data"]["email"], cmd["data"]["password"]
                self.client = MessengerBridgeClient(email=email, password=password, max_tries=1)

            if not self.client.isLoggedIn():
                return {"_type": REP_ERROR, "error": "Unable to login (invalid pickle?)"}

            if not has_pickle:
                # Fresh login: send the configuration back with the pickled
                # client added so it can be reused next time. This is done
                # before setBridge so the bridge is not part of the pickle.
                new_config = cmd["data"]
                data = pickle.dumps(self.client)
                data = zlib.compress(data)
                new_config["client_pickle"] = base64.b64encode(data).decode('ascii')
                self.write({"_type": SAVE_CONFIG, "data": new_config})

            self.client.setBridge(self)

            self.my_user_id = self.getUserIdFromUid(self.client.uid)

            threads = self.client.fetchThreadList()
            # ensure we have a correct mapping for bridged user IDs to fb uids
            # (this should be fast)
            for thread in threads:
                if thread.type == ThreadType.USER:
                    self.getUserId(thread)

            # Room setup and backlog run on a separate worker thread
            self.sync_thread_queue = queue.Queue(100)
            SyncerThread(self, self.sync_thread_queue).start()
            for thread in reversed(threads):
                self.sync_thread_queue.put(thread)

            ClientListenThread(self.client).start()

        elif ty == CLOSE:
            self.close()

        elif ty == GET_USER:
            return {"_type": REP_OK, "user": self.my_user_id}

        elif ty == JOIN:
            self.ensure_i_joined(cmd["room"])

        elif ty == LEAVE:
            thread_id = cmd["room"]
            self.client.removeUserFromGroup(self.client.uid, thread_id)
            if thread_id in self.my_joined_rooms:
                del self.my_joined_rooms[thread_id]

        elif ty == INVITE:
            if cmd["room"] != "":
                uid = self.revUserId(cmd["user"])
                self.client.addUsersToGroup([uid], cmd["room"])

        elif ty == SEARCH:
            users = self.client.searchForUsers(cmd["data"])
            rep = []
            for user in users:
                rep.append({
                    "id": self.getUserId(user),
                    "display_name": user.name,
                })
            return {"_type": REP_SEARCH_RESULTS, "data": rep}

        elif ty == SEND:
            event = cmd["data"]
            if event["type"] in [EVENT_MESSAGE, EVENT_ACTION]:
                attachments = []
                if "attachments" in event and isinstance(event["attachments"], list):
                    for at in event["attachments"]:
                        if "url" in at:
                            attachments.append(at["url"])
                        else:
                            # TODO
                            # Diagnostics go to stderr: stdout carries the
                            # JSON protocol and must hold nothing else.
                            sys.stderr.write("(python) Unhandled: attachment without URL\n")

                msg = Message(event["text"])
                if event["type"] == EVENT_ACTION:
                    msg.text = "* " + event["text"]

                # A non-empty "room" means a group message; otherwise the
                # message goes to the "recipient" user.
                if event["room"] != "":
                    if len(attachments) > 0:
                        msg_id = self.client.sendRemoteFiles(attachments, message=msg, thread_id=event["room"], thread_type=ThreadType.GROUP)
                    else:
                        msg_id = self.client.send(msg, thread_id=event["room"], thread_type=ThreadType.GROUP)
                elif event["recipient"] != "":
                    uid = self.revUserId(event["recipient"])
                    sys.stderr.write("(python) Sending to {}\n".format(uid))
                    if len(attachments) > 0:
                        msg_id = self.client.sendRemoteFiles(attachments, message=msg, thread_id=uid, thread_type=ThreadType.USER)
                    else:
                        msg_id = self.client.send(msg, thread_id=uid, thread_type=ThreadType.USER)
                else:
                    return {"_type": REP_ERROR, "error": "Invalid message"}

                return {"_type": REP_OK, "event_id": msg_id}

        elif ty == REP_OK and cmd["_id"] in self.cache_gets:
            # Answer to one of our own CACHE_GET requests: hand the value to
            # the cache_get call waiting on this id.
            self.cache_gets[cmd["_id"]].put(cmd["value"])

        else:
            return {"_type": REP_ERROR, "error": "Not implemented"}

    def close(self):
        """Ask the main loop to stop and stop the client's listening.

        Safe to call before a client exists (for example a CLOSE command or
        an interrupt arriving before CONFIGURE): in that case only the
        `keep_running` flag is cleared.
        """
        self.keep_running = False
        client = getattr(self, "client", None)
        if client is not None:
            client.stopListening()

    def cache_get(self, key):
        """Ask the other side for the cached value stored under `key`.

        A CACHE_GET request with a fresh id is written, then the call blocks
        until handle_cmd delivers the matching reply through a one-slot
        queue.

        Returns:
            The replied value, or "" when no reply arrives within 30 seconds.
        """
        self.num += 1
        request_id = self.num

        reply_slot = queue.Queue(1)
        self.cache_gets[request_id] = reply_slot
        self.write({"_type": CACHE_GET, "_id": request_id, "key": key})

        try:
            value = reply_slot.get(block=True, timeout=30)
        except queue.Empty:
            value = ""

        del self.cache_gets[request_id]
        return value

    def cache_put(self, key, value):
        """Ask the other side to store `value` under `key` (no reply awaited)."""
        request = {"_type": CACHE_PUT, "key": key, "value": value}
        self.write(request)

    # ---- Info sync ----

    def ensure_i_joined(self, thread_id):
        if thread_id not in self.my_joined_rooms:
            self.my_joined_rooms[thread_id] = True

            thread = self.client.fetchThreadInfo(thread_id)[thread_id]
            self.sync_thread_queue.put(thread)

    def setup_joined_thread(self, thread):
        """Announce a thread we take part in and replay its backlog.

        For group threads the members are fetched, a JOINED message is
        written and the room info and member list are sent. The backlog is
        replayed for every thread type.
        """
        sys.stderr.write("(python) setup_joined_thread {}\n".format(thread))

        # Record the room as joined before replaying anything. Threads queued
        # straight from the initial thread list never went through
        # ensure_i_joined, so without this the first replayed or live message
        # would queue the same thread again and repeat the whole setup.
        self.my_joined_rooms[thread.uid] = True

        if thread.type == ThreadType.GROUP:
            members = self.client.fetchAllUsersFromThreads([thread])

            self.write({
                "_type": JOINED,
                "room": thread.uid,
            })

            self.send_room_info(thread, members)
            self.send_room_members(thread, members)

        self.backlog_room(thread)


    def send_room_info(self, thread, members):
        """Write a ROOM_INFO_UPDATED message with the room's name and picture.

        `members` is sorted in place by uid. When the thread has no name, one
        is built from the short names of up to three members other than
        ourselves, followed by "..." if there are more. When the thread has
        no photo, the photo of the first other member having one is used.
        """
        members.sort(key=lambda m: m.uid)

        room_info = {}

        if thread.name is None:
            others = [m for m in members if m.uid != self.client.uid]
            name_parts = [self.getUserShortName(m) for m in others[:3]]
            if len(others) > 3:
                name_parts.append("...")
            room_info["name"] = ", ".join(name_parts)
        else:
            room_info["name"] = thread.name

        if thread.photo is None:
            fallback_photo = next(
                (m.photo for m in members
                 if m.uid != self.client.uid and m.photo is not None),
                None)
            if fallback_photo is not None:
                room_info["picture"] = mediaObjectOfURL(fallback_photo)
        else:
            room_info["picture"] = mediaObjectOfURL(thread.photo)

        update = {
            "_type": ROOM_INFO_UPDATED,
            "room": thread.uid,
            "data": room_info,
        }
        self.write(update)

    def send_room_members(self, thread, members):
        for member in members:
            sys.stderr.write("(python) fb thread member: {}\n".format(member))
            self.ensureJoined(self.getUserId(member), thread.uid)

    def backlog_room(self, thread):
        prev_last_seen = self.cache_get("last_seen_%s"%thread.uid)
        if prev_last_seen == "":
            prev_last_seen = None

        messages = []
        found = False
        while not found:
            before = None
            if len(messages) > 0:
                before = messages[-1].timestamp
            page = self.client.fetchThreadMessages(thread.uid, before=before, limit=20)
            for m in page:
                if m.uid == prev_last_seen or len(messages) > self.init_backlog_length:
                    found = True
                    break
                else:
                    messages.append(m)

        for m in reversed(messages):
            if m.text is None:
                m.text = ""
            m.text = "[{}] {}".format(
                    time.strftime("%Y-%m-%d %H:%M %Z", time.localtime(float(m.timestamp)/1000)).strip(),
                    m.text)
            self.onMessage(thread_id=thread.uid,
                                  thread_type=thread.type,
                                  message_object=m)

    def ensureJoined(self, userId, room):
        """Announce once that `userId` is present in `room`.

        The first call for a (user, room) pair writes a join event and
        records the pair in `others_joined_map`; later calls do nothing.
        """
        key = "{}--{}".format(userId, room)
        if key in self.others_joined_map:
            return

        join_event = {
            "type": EVENT_JOIN,
            "author": userId,
            "room": room,
        }
        self.write({"_type": EVENT, "data": join_event})
        self.others_joined_map[key] = True

    # ---- Event handlers ----

    def onMessage(self, thread_id, thread_type, message_object, **kwargs):
        """Forward a received message to the other side as an EVENT.

        Messages we authored ourselves are dropped. Image, file and audio
        attachments become entries of the event's "attachments" list; any
        other attachment is mentioned in the text. An event with neither text
        nor attachments is not written. In every non-dropped case the message
        uid is stored in the cache under "last_seen_<thread_id>".
        """
        self.ensure_i_joined(thread_id)

        # Ignore our own messages
        if message_object.author == self.client.uid:
            return

        sys.stderr.write("(python messenger) Got message: {}\n".format(message_object))

        author = self.getUserIdFromUid(message_object.author)

        text = message_object.text
        if text is None:
            text = ""

        attachments = []
        for at in message_object.attachments:
            if isinstance(at, ImageAttachment):
                full_url = self.client.fetchImageUrl(at.uid)
                attachments.append({
                    "filename": full_url.split("?")[0].split("/")[-1],
                    "url": full_url,
                    "image_size": {"width": at.width, "height": at.height},
                })
            elif isinstance(at, FileAttachment):
                attachments.append({
                    "filename": at.name,
                    "url": stripFbLinkPrefix(at.url),
                })
            elif isinstance(at, AudioAttachment):
                attachments.append({
                    "filename": at.filename,
                    "url": stripFbLinkPrefix(at.url),
                })
            else:
                text += "\nUnhandled attachment: {}".format(at)

        event = {
            "id": message_object.uid,
            "type": EVENT_MESSAGE,
            "author": author,
            "text": text,
            "attachments": attachments,
        }

        # Only group messages carry a room
        if thread_type == ThreadType.GROUP:
            event["room"] = thread_id
            self.ensureJoined(author, thread_id)

        if text != "" or len(attachments) > 0:
            self.write({"_type": EVENT, "data": event})

        self.cache_put("last_seen_%s"%thread_id, message_object.uid)

    def onPeopleAdded(self, added_ids, thread_id, *args, **kwargs):
        for user_id in added_ids:
            if user_id == self.client.uid:
                self.ensure_i_joined(thread_id)
            else:
                self.ensureJoined(self.getUserIdFromUid(user_id), thread_id)

    def onPersonRemoved(self, removed_id, thread_id, *args, **kwargs):
        if removed_id == self.client.uid:
            self.write({
                "_type": LEFT,
                "room": thread_id,
            })
            if thread_id in self.my_joined_rooms:
                del self.my_joined_rooms[thread_id]
        else:
            userId = self.getUserIdFromUid(removed_id),
            self.write({
                "_type": EVENT,
                "data": {
                    "type": EVENT_JOIN,
                    "author": userId,
                    "room": thread_id,
                }
            })
            map_key = "{}--{}".format(userId, thread_id)
            if map_key in self.others_joined_map:
                del self.others_joined_map[map_key]

    def onTitleChange(self, author_id, new_title, thread_id, thread_type, *args, **kwargs):
        """Propagate a changed thread title as the new room name.

        The room is first set up on our side if needed. An update is written
        only for group threads.
        """
        self.ensure_i_joined(thread_id)

        if thread_type != ThreadType.GROUP:
            return

        renamed = {"name": new_title}
        self.write({
            "_type": ROOM_INFO_UPDATED,
            "room": thread_id,
            "data": renamed,
        })

# ---- CLI ----

def createClientPickle():
    """Log in interactively and print the client as a text blob.

    Asks for the account's e-mail address and password, creates a client
    with them and prints it pickled, zlib-compressed and base64-encoded.
    The blob is printed even when the login check fails.
    """
    email = input("Email address of Facebook account: ")
    password = getpass.getpass()

    client = MessengerBridgeClient(email, password, max_tries=1)
    logged_in = client.isLoggedIn()
    if not logged_in:
        print("Could not log in (why???)")
        print("Still creating pickle though, maybe it will work after login was authorized?")
    print("")

    blob = base64.b64encode(zlib.compress(pickle.dumps(client)))
    print(blob.decode('ascii'))

if __name__ == "__main__":
    # Default mode: serve bridge commands from stdin. Passing
    # "create_client_pickle" anywhere on the command line prints a client
    # pickle instead.
    if "create_client_pickle" not in sys.argv:
        bridge = MessengerBridge()
        bridge.run()
    else:
        createClientPickle()