aboutsummaryrefslogblamecommitdiff
path: root/connector/xmpp/xmpp.go
blob: 55efcba5085d583c56359555a48a89de93e1a3af (plain) (tree)
1
2
3
4
5
6
7
8
9


            
                    

                 
              
              
 

                                               
                                        

                                                             










                                                  

                             
 



                            
                            

                            

                          
 







                                         





                                       
                                   
                            









                                                  

















                                                            
                                
                                       
 
                                                                  






                                                  
                                          
 



                                              
                                                          













                                                                   
                                                      


                                                 





                                               





                                                  
                                                                                                                              







                                                                           
 


                                                                                                    
                                                       
                                                                                                                                                 
                                                                            
                                                                      

                                         
                         



                                             
                                                                                                           












                                                                               
                                                                            


















                                        
                                            
 


                                      
 


                                                 
 


                                                         
 


















                                                                                                                        
                                 
                         



                                                                                           
 





                                                    
 



                                                                                       
 


                                                                   
                         




                                                                                 

                                 





















                                                                                        
                                 




                                                                                
                                 
                         



                 





















                                                                    




                                                   



                                                                  
                             
                                                  

                                                

                                            


                                  

         
                                            








                                                                                


                                           






                                                         
 
                                                                 
                                                                       

                       
                                            

         



                                                            




                                                           




                                            


                                   
                                        



                                          

 




                                                                          
                                                    


                                   




                                                                                              
                                                                                            






                                                                            



                                             
                                                
                                     
                                                  
                                       
                                                        
                                           
                  
                                    
                                       


                                                                    
                                                  
                                            
                                                   
                                           
                                         
                  
                                    
                
                                                      


         



                                                          
                         


                                   


                               

                                
                            
 
package xmpp

import (
	"crypto/tls"
	"fmt"
	"strings"
	"sync"
	"time"

	gxmpp "github.com/matterbridge/go-xmpp"
	"github.com/rs/xid"
	log "github.com/sirupsen/logrus"

	. "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
)

// User id format: username@server (= JID)
// OR: nickname@room_name@muc_server

// Room id format: room_name@muc_server (= MUC ID)

// XMPP is a connector that speaks XMPP through the gxmpp client library.
// It is set up with Configure, which starts a background connection loop,
// and is shut down with Close.
type XMPP struct {
	// handler receives the events and system messages produced by this
	// connector. It is installed with SetHandler.
	handler Handler

	// connectorLoopNum is the generation number of the current connection
	// loop. Configure and Close increment it; a connectLoop returns once the
	// number it was started with no longer matches.
	connectorLoopNum int
	// connected is set to true by connectLoop once a client has been created,
	// and back to false after a failed attempt, a disconnection or Close.
	// Configure polls it to know when the connection is up.
	connected        bool
	// timeout is the delay, in seconds, before the next connection attempt.
	// connectLoop starts it at 10 and doubles it after each failure, up to 600.
	timeout          int

	// server is the part of jid after the "@".
	server        string
	// port is the value of the "port" configuration key (default 5222).
	port          int
	// ssl is the value of the "ssl" configuration key (default true); it is
	// handed to the client as its StartTLS option.
	ssl           bool
	// jid is the value of the "jid" configuration key (localpart@server).
	jid           string
	// jid_localpart is the part of jid before the "@".
	jid_localpart string
	// password is the value of the "password" configuration key.
	password      string
	// nickname is the nick used when joining MUC rooms; it comes from the
	// "nickname" configuration key and defaults to jid_localpart.
	nickname      string

	// conn is the current client connection. Close resets it to nil.
	conn *gxmpp.Client

	// stateLock is held by handleXMPPStance, Join, Leave, Send and Close.
	stateLock sync.Mutex
	// muc holds the per-room state of every room passed to Join, keyed by
	// room ID.
	muc       map[RoomID]*mucInfo
}

// mucInfo is the state kept for one multi-user chat room.
type mucInfo struct {
	// joined is set after a successful Join and cleared by Leave, or when
	// rejoining the room after a reconnection fails.
	joined        bool
	// pendingJoins maps users whose arrival was seen in a presence stanza,
	// but not yet reported to the handler, to their nickname in the room
	// (reported as their display name when flushed).
	pendingJoins  map[UserID]string
	// pendingLeaves is the set of users whose departure was seen in a
	// presence stanza but not yet reported to the handler.
	pendingLeaves map[UserID]struct{}
}

// SetHandler installs h as the receiver of the events and system messages
// emitted by this connector.
func (xm *XMPP) SetHandler(h Handler) {
	xm.handler = h
}

// Protocol returns the protocol identifier of this connector, XMPP_PROTOCOL.
func (xm *XMPP) Protocol() string {
	return XMPP_PROTOCOL
}

// Configure (re)initializes the connector from c and starts connecting.
//
// Configuration keys read:
//   - "port" (int, default 5222)
//   - "ssl" (bool, default true)
//   - "jid" (string, no default), of the form localpart@server
//   - "nickname" (string, defaults to the localpart of the JID)
//   - "password" (string, no default)
//
// An existing connection is closed first. The room table is then reset and a
// new connection loop is started in the background; Configure checks once per
// second, for 42 seconds, whether that loop reports a connection.
//
// It returns an error if a configuration value cannot be read, if the JID is
// malformed, or if no connection was established within 42 seconds. In the
// latter case the background loop is left running and keeps retrying.
func (xm *XMPP) Configure(c Configuration) error {
	if xm.conn != nil {
		xm.Close()
	}

	// Parse and validate configuration
	var err error

	xm.port, err = c.GetInt("port", 5222)
	if err != nil {
		return err
	}

	xm.ssl, err = c.GetBool("ssl", true)
	if err != nil {
		return err
	}

	xm.jid, err = c.GetString("jid")
	if err != nil {
		return err
	}
	// A usable JID has exactly one "@" with text on both sides of it.
	jidParts := strings.Split(xm.jid, "@")
	if len(jidParts) != 2 || jidParts[0] == "" || jidParts[1] == "" {
		return fmt.Errorf("Invalid JID: %s", xm.jid)
	}
	xm.jid_localpart = jidParts[0]
	xm.server = jidParts[1]

	// Report an unreadable nickname like any other bad value instead of
	// silently continuing with whatever was returned alongside the error.
	xm.nickname, err = c.GetString("nickname", xm.jid_localpart)
	if err != nil {
		return err
	}

	xm.password, err = c.GetString("password")
	if err != nil {
		return err
	}

	// Try to connect
	xm.muc = make(map[RoomID]*mucInfo)

	xm.connectorLoopNum += 1
	go xm.connectLoop(xm.connectorLoopNum)

	for i := 0; i < 42; i++ {
		time.Sleep(time.Duration(1) * time.Second)
		if xm.connected {
			return nil
		}
	}
	return fmt.Errorf("Failed to connect after 42s attempting")
}

// connectLoop keeps the connector connected to the XMPP server for as long as
// num is the current loop generation (xm.connectorLoopNum).
//
// Each iteration opens a client connection. On failure the handler is told
// about it and the loop sleeps for xm.timeout seconds, doubling that delay up
// to a maximum of 600. On success the delay is reset to 10, the rooms that
// were joined are rejoined, and incoming stanzas are processed until the
// connection fails, after which the loop reconnects.
//
// The generation is checked again after every blocking step, so that a loop
// superseded by Close or by a later Configure neither installs its connection
// nor reports a reconnection.
func (xm *XMPP) connectLoop(num int) {
	xm.timeout = 10
	for {
		if xm.connectorLoopNum != num {
			return
		}

		// The client accepts either "hostname" or "hostname:port"; pass the
		// configured port along so that it is actually honoured.
		host := xm.server
		if xm.port > 0 {
			host = fmt.Sprintf("%s:%d", xm.server, xm.port)
		}

		// NOTE(review): InsecureSkipVerify disables verification of the
		// server certificate. Kept as is because existing deployments may
		// rely on it, but it should probably become a configuration option.
		tc := &tls.Config{
			ServerName:         xm.server,
			InsecureSkipVerify: true,
		}
		options := gxmpp.Options{
			Host:      host,
			User:      xm.jid,
			Password:  xm.password,
			NoTLS:     true,
			StartTLS:  xm.ssl,
			Session:   true,
			TLSConfig: tc,
		}

		conn, err := options.NewClient()
		if xm.connectorLoopNum != num {
			// Superseded while connecting: drop the connection instead of
			// installing it over the state of the newer generation.
			if err == nil {
				conn.Close()
			}
			return
		}
		xm.conn = conn

		if err != nil {
			xm.connected = false
			xm.handler.SystemMessage(fmt.Sprintf("XMPP failed to connect (%s). Retrying in %ds", err, xm.timeout))
			time.Sleep(time.Duration(xm.timeout) * time.Second)
			xm.timeout *= 2
			if xm.timeout > 600 {
				xm.timeout = 600
			}
			continue
		}

		xm.connected = true
		xm.timeout = 10

		xm.rejoinRooms(conn)

		err = xm.handleXMPP()

		if xm.connectorLoopNum != num {
			// The connection was closed on purpose: nothing to report, and
			// xm.connected now belongs to whoever superseded this loop.
			return
		}
		xm.connected = false
		xm.handler.SystemMessage(fmt.Sprintf("XMPP disconnected (%s), reconnecting", err))
	}
}

// rejoinRooms re-enters, over the freshly opened connection conn, every room
// that is marked as joined. A room that cannot be rejoined is reported to the
// handler as left and marked as not joined.
//
// The room table is only accessed with stateLock held, like everywhere else,
// but the lock is released around the network and handler calls.
func (xm *XMPP) rejoinRooms(conn *gxmpp.Client) {
	type joinedRoom struct {
		id   RoomID
		info *mucInfo
	}

	xm.stateLock.Lock()
	rooms := make([]joinedRoom, 0, len(xm.muc))
	for id, info := range xm.muc {
		if info.joined {
			rooms = append(rooms, joinedRoom{id: id, info: info})
		}
	}
	xm.stateLock.Unlock()

	for _, room := range rooms {
		if _, err := conn.JoinMUCNoHistory(string(room.id), xm.nickname); err != nil {
			xm.handler.SystemMessage(fmt.Sprintf("Could not rejoin MUC %s after reconnection: %s", room.id, err))
			xm.handler.Left(room.id)

			xm.stateLock.Lock()
			room.info.joined = false
			xm.stateLock.Unlock()
		}
	}
}

// xmppKeepAlive starts a goroutine that pings the server every 90 seconds
// over the connection that is current at the time of the call. Ping failures
// are only logged at debug level.
//
// The goroutine stops when the returned channel is closed (or receives a
// value); the caller is responsible for doing so.
func (xm *XMPP) xmppKeepAlive() chan bool {
	done := make(chan bool)

	// Pin the connection: xm.conn may be reset to nil (Close) or replaced
	// while the goroutine is still running, and pinging through a nil client
	// would panic.
	conn := xm.conn
	if conn == nil {
		return done
	}

	go func() {
		ticker := time.NewTicker(90 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := conn.PingC2S("", ""); err != nil {
					log.Debugf("PING failed %#v\n", err)
				}
			case <-done:
				return
			}
		}
	}()
	return done
}

// handleXMPP receives stanzas from the current connection and dispatches each
// of them to handleXMPPStance, with a keep-alive pinger running alongside.
//
// It only returns when receiving fails (or when there is no connection to
// read from), and always returns a non-nil error describing why.
func (xm *XMPP) handleXMPP() error {
	// Read from the connection that was current when we started: Close resets
	// xm.conn to nil, and calling Recv through it would panic.
	conn := xm.conn
	if conn == nil {
		return fmt.Errorf("connection closed")
	}

	done := xm.xmppKeepAlive()
	defer close(done)

	for {
		m, err := conn.Recv()
		if err != nil {
			return err
		}

		log.Tracef("XMPP: %#v\n", m)

		xm.handleXMPPStance(m)
	}
}

// handleXMPPStance processes one stanza received from the server, with
// stateLock held for the whole duration. Chat and presence stanzas are
// handled; any other type is ignored.
func (xm *XMPP) handleXMPPStance(m interface{}) {
	xm.stateLock.Lock()
	defer xm.stateLock.Unlock()

	switch v := m.(type) {
	case gxmpp.Chat:
		xm.handleChat(v)
	case gxmpp.Presence:
		xm.handlePresence(v)
	}
}

// handleChat turns a message stanza into handler notifications: room joined
// confirmation, topic change, and text message or action.
// Must be called with stateLock held.
func (xm *XMPP) handleChat(v gxmpp.Chat) {
	// Split at the first "/" only: what follows it (the resource, i.e. the
	// nickname in a room) may itself contain slashes.
	remote := strings.SplitN(v.Remote, "/", 2)
	bare := remote[0]
	hasNick := len(remote) == 2
	isGroupchat := v.Type == "groupchat"

	// Skip self-sent events
	if v.Remote == xm.jid || (isGroupchat && hasNick && remote[1] == xm.nickname) {
		return
	}

	// If empty text, make sure we joined the room
	// We would do this at every incoming message if it were not so costly
	if v.Text == "" && isGroupchat {
		xm.handler.Joined(RoomID(bare))
	}

	// Handle subject change in group chats.
	// Our own changes never get here (filtered out above), so a nickname, if
	// present, always designates another occupant of the room.
	if v.Subject != "" && isGroupchat {
		author := UserID("")
		if hasNick {
			author = UserID(remote[1] + "@" + bare)
		}
		xm.handler.RoomInfoUpdated(RoomID(bare), author, &RoomInfo{
			Topic: v.Subject,
		})
	}

	// Handle text message
	if v.Text == "" {
		return
	}

	event := &Event{
		Type: EVENT_MESSAGE,
		Text: v.Text,
	}
	if strings.HasPrefix(event.Text, "/me ") {
		event.Type = EVENT_ACTION
		event.Text = strings.TrimPrefix(event.Text, "/me ")
	}

	switch {
	case v.Type == "chat":
		event.Author = UserID(bare)
		xm.handler.Event(event)
	case isGroupchat && hasNick:
		// First flush pending leaves and joins
		roomID := RoomID(bare)
		if muc, ok := xm.muc[roomID]; ok {
			muc.flushLeavesJoins(roomID, xm.handler)
		}

		// Now send event
		event.Room = roomID
		event.Author = UserID(remote[1] + "@" + bare)
		event.Id = v.ID
		xm.handler.Event(event)
	}
}

// handlePresence records the arrival or departure of a room occupant. Nothing
// is reported to the handler here: the change is queued in the room's pending
// joins/leaves, and a join followed by a leave (or the reverse) cancel out.
// Must be called with stateLock held.
func (xm *XMPP) handlePresence(v gxmpp.Presence) {
	// Same splitting rule as for messages: only the first "/" separates the
	// room from the nickname.
	remote := strings.SplitN(v.From, "/", 2)
	room := RoomID(remote[0])

	info, ok := xm.muc[room]
	if !ok {
		return
	}

	// skip presence with no user and self-presence
	if len(remote) < 2 || remote[1] == xm.nickname {
		return
	}

	user := UserID(remote[1] + "@" + remote[0])
	if v.Type != "unavailable" {
		if _, leaving := info.pendingLeaves[user]; leaving {
			delete(info.pendingLeaves, user)
		} else {
			info.pendingJoins[user] = remote[1]
		}
		return
	}

	if _, joining := info.pendingJoins[user]; joining {
		delete(info.pendingJoins, user)
	} else {
		info.pendingLeaves[user] = struct{}{}
	}
}

// flushLeavesJoins reports to handler every queued arrival and departure for
// room, then empties both queues.
//
// Each arrival produces an EVENT_JOIN event followed by a user info update
// carrying the stored nickname as display name. Departures are reported after
// all arrivals, as EVENT_LEAVE events.
func (muc *mucInfo) flushLeavesJoins(room RoomID, handler Handler) {
	for joiner, nick := range muc.pendingJoins {
		joinEvent := &Event{Type: EVENT_JOIN, Room: room, Author: joiner}
		handler.Event(joinEvent)

		profile := &UserInfo{DisplayName: nick}
		handler.UserInfoUpdated(joiner, profile)
	}

	for leaver := range muc.pendingLeaves {
		leaveEvent := &Event{Type: EVENT_LEAVE, Room: room, Author: leaver}
		handler.Event(leaveEvent)
	}

	// Start over with fresh, empty queues.
	muc.pendingJoins = make(map[UserID]string)
	muc.pendingLeaves = make(map[UserID]struct{})
}

// User returns the identity this connector logs in as: its configured JID.
func (xm *XMPP) User() UserID {
	self := UserID(xm.jid)
	return self
}

// SetUserInfo is not supported by this connector; it always returns an error
// and ignores info.
func (xm *XMPP) SetUserInfo(info *UserInfo) error {
	err := fmt.Errorf("Not implemented")
	return err
}

// SetRoomInfo applies the changes described by info to the room roomId.
//
// Only the topic can be changed: a non-empty info.Topic is sent to the room
// as a subject change. Asking for a picture, or for a name different from the
// room ID, returns an error — after the topic, if any, has been sent.
// An error is also returned if the topic must be sent while there is no
// connection, or if sending it fails.
func (xm *XMPP) SetRoomInfo(roomId RoomID, info *RoomInfo) error {
	if info.Topic != "" {
		// xm.conn is nil after Close; using it would panic.
		conn := xm.conn
		if conn == nil {
			return fmt.Errorf("XMPP connection is not established")
		}
		_, err := conn.Send(gxmpp.Chat{
			Type:    "groupchat",
			Remote:  string(roomId),
			Subject: info.Topic,
		})
		if err != nil {
			return err
		}
	}

	if info.Picture.MediaObject != nil {
		// TODO
		return fmt.Errorf("Room picture change not implemented on xmpp")
	}

	if info.Name != "" && info.Name != string(roomId) {
		// TODO
		return fmt.Errorf("Room name change not implemented on xmpp")
	}
	return nil
}

// Join enters the room roomId under the configured nickname, without
// requesting history.
//
// The room gets a fresh entry in the room table, replacing any previous one;
// it is marked as joined only if the join request could be sent. Join returns
// an error, and leaves the room table untouched, when there is no connection.
func (xm *XMPP) Join(roomId RoomID) error {
	xm.stateLock.Lock()
	defer xm.stateLock.Unlock()

	// xm.conn is nil after Close; using it would panic.
	conn := xm.conn
	if conn == nil {
		return fmt.Errorf("XMPP connection is not established")
	}

	info := &mucInfo{
		pendingJoins:  make(map[UserID]string),
		pendingLeaves: make(map[UserID]struct{}),
	}
	xm.muc[roomId] = info

	log.Tracef("Join %s with nick %s\n", roomId, xm.nickname)
	if _, err := conn.JoinMUCNoHistory(string(roomId), xm.nickname); err != nil {
		return err
	}

	info.joined = true
	return nil
}

// Invite, when roomId is empty, requests a presence subscription to userId
// and approves the subscription in the other direction. Inviting a user to a
// room is not implemented and returns an error, as does calling Invite while
// there is no connection.
func (xm *XMPP) Invite(userId UserID, roomId RoomID) error {
	if roomId != "" {
		// TODO
		return fmt.Errorf("Not implemented")
	}

	// xm.conn is nil after Close; using it would panic.
	conn := xm.conn
	if conn == nil {
		return fmt.Errorf("XMPP connection is not established")
	}

	conn.RequestSubscription(string(userId))
	conn.ApproveSubscription(string(userId))
	return nil
}

// Leave exits the room roomId and marks it as not joined in the room table.
//
// Leaving is best effort: without a connection nothing is sent, and a failure
// to send the request is only logged. The room is marked as not joined in
// every case.
func (xm *XMPP) Leave(roomId RoomID) {
	xm.stateLock.Lock()
	defer xm.stateLock.Unlock()

	// xm.conn is nil after Close; using it would panic.
	if conn := xm.conn; conn != nil {
		if _, err := conn.LeaveMUC(string(roomId)); err != nil {
			log.Debugf("Could not leave MUC %s: %s\n", roomId, err)
		}
	}

	if muc, ok := xm.muc[roomId]; ok {
		muc.joined = false
	}
}

// SearchForUsers is not supported by this connector; it ignores query and
// always returns a nil result together with an error.
func (xm *XMPP) SearchForUsers(query string) ([]UserSearchResult, error) {
	// TODO: search roster
	var none []UserSearchResult
	return none, fmt.Errorf("Not implemented")
}

// Send delivers event over XMPP and returns the ID of the event.
//
// An event with a Recipient is sent as a direct chat message; otherwise an
// event with a Room is sent to that room, after the room's queued joins and
// leaves have been flushed to the handler. An event with neither is rejected.
//
// Attachments are not uploaded: each one is rendered as an extra line of text
// made of its URL, MIME type and size, and an attachment without URL makes
// the call fail.
//
// Send modifies event: the attachment lines are appended to event.Text (only
// once all attachments have been accepted), and event.Id is filled with a
// newly generated ID when it is empty.
//
// An error is returned when there is no connection, when an attachment has no
// URL, when the event has no destination, or when sending fails.
func (xm *XMPP) Send(event *Event) (string, error) {
	xm.stateLock.Lock()
	defer xm.stateLock.Unlock()

	// xm.conn is nil after Close; using it would panic.
	conn := xm.conn
	if conn == nil {
		return "", fmt.Errorf("XMPP connection is not established")
	}

	// Build the final text on the side, so that the event is left untouched
	// if one of the attachments turns out to be unusable.
	text := event.Text
	for _, at := range event.Attachments {
		url := at.URL()
		if url == "" {
			// TODO find a way to send them using some hosting of some kind
			return "", fmt.Errorf("Attachment without URL sent to XMPP")
		}
		text += fmt.Sprintf("\n%s (%s, %dkb)",
			url, at.Mimetype(), at.Size()/1024)
	}
	event.Text = text

	if event.Id == "" {
		event.Id = xid.New().String()
	}

	log.Tracef("xm *XMPP Send %#v\n", event)

	switch {
	case len(event.Recipient) > 0:
		_, err := conn.Send(gxmpp.Chat{
			Type:   "chat",
			Remote: string(event.Recipient),
			Text:   event.Text,
		})
		return event.Id, err
	case len(event.Room) > 0:
		if muc, ok := xm.muc[event.Room]; ok {
			muc.flushLeavesJoins(event.Room, xm.handler)
		}
		_, err := conn.Send(gxmpp.Chat{
			Type:   "groupchat",
			Remote: string(event.Room),
			Text:   event.Text,
			ID:     event.Id,
		})
		return event.Id, err
	default:
		return "", fmt.Errorf("Invalid event")
	}
}

// UserCommand handles a command typed by the user. This connector supports
// none: cmd is ignored and the handler is told so through a system message.
func (xm *XMPP) UserCommand(cmd string) {
	const reply = "Command not supported."
	xm.handler.SystemMessage(reply)
}

// Close shuts the connector down: it invalidates the running connection loop,
// marks the connector as disconnected, closes the current connection if there
// is one and forgets it.
func (xm *XMPP) Close() {
	xm.stateLock.Lock()
	defer xm.stateLock.Unlock()

	// Invalidate the connection loop *before* closing the connection. Closing
	// makes the loop's receive call fail, after which the loop checks its
	// generation number; if the number were bumped only afterwards, the loop
	// could pass that check and reconnect behind our back.
	xm.connectorLoopNum++
	xm.connected = false

	if xm.conn != nil {
		// Best effort: the connection is dropped whatever Close reports.
		xm.conn.Close()
	}
	xm.conn = nil
}