aboutsummaryrefslogblamecommitdiff
path: root/connector/mattermost/mattermost.go
blob: 52eb40f9e99799c0ac8d871357d58a2e05abd46c (plain) (tree)
1
2
3
4
5
6
7
8
9


                  
                       
             

                   

                 
              

              
                                                    
                                                       
                                        









                                                             
                       
                       
                                
 


                                                                                                        
                                              
                                 
 








                                                                         

 




                                             
                                  


















                                                        
                                                                  








                                                                   
                                          


                          





                                                        
 




                                                




                                                  
                                                                             
                                         






                               

























                                                                   
                                                                                                                     



















                                                                  
                                                               

                                            
                                                                        

                                                     

                                                          



                                                                                               










                                                                                                     




                                                       

                                   
 



                                                                             

                                      
                        




































                                                                                




                                                






                                                                
                                                          





                                       



                                                     
                                      
                 
                                   


                                                          
                                      
                 


                                                                                  
                                             




                                                                          
                                      


                                                                      
                
                                                       
         
 




                                                        
                                              



                                                        
                                              



                                                                                                 
                                              




                                                                    
                                                             
                              
                                                             
                                     
         
                                   


                               


                                     






                                          
                                              
                                            



                                                       
 
                                                                              
 



                                                                 
                                      
                                       
                                  


                                                 
                 

         
 











                                                                  


                                                                                
                              
                 
                                     


                                       
                                              
                                         







                                                                                                    
                                                                  





                                                                                                              

                                                                                                                             





                                                                                                           







                                                                     
                                                                                          
                                                                                                   













                                                                                               
                                                                                          
                 
         
 
                       

                                                                                 

















                                                                                                                    
                 
                
















                                                                                                    








                                                                                      

                                                                    


                                                                      



                         
 



                                                                        



                                                                                   



                                               





                                                                                                

                                                                                                                  





                                                                                      
                         
                 
                                                      
                                                        





                                                                        




                                                          

                                        

                                           
                  
                                                      


         
                                                                     



                                                             






                                                      



                                                                                                   










                                                                        
                               
 

                              
                                
                               

                                      




                                          

                                                         
                                                    
                                                          
                                                             
                                                          
                                                              







                                                                                     


                                                                          
                                                           


                                                            
                                                                                     


                 








                                                           
                                                                                         
                



                                                                      



                                                               
                                             

                                                        





                                                                                     


                                                                
                                                                                                 





                                                                               
package mattermost

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	_ "os"
	"strings"
	"sync"
	"time"

	"github.com/42wim/matterbridge/matterclient"
	"github.com/mattermost/mattermost-server/model"
	log "github.com/sirupsen/logrus"

	. "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
)

// User id format: nickname@server
// Room id format: room_name@team@server

type Mattermost struct {
	handler Handler

	server   string
	username string
	teams    map[string]bool

	initial_members int // How many room members (maximum) to load when first joining a channel
	initial_backlog int // How many previous messages (maximum) to load when first joining a channel

	conn            *matterclient.MMClient
	handlerStopChan chan bool

	caches mmCaches
}

type mmCaches struct {
	sync.Mutex

	mmusers     map[string]string // map mm username to mm user id
	sentjoined  map[string]bool   // map username/room name to bool
	displayname map[UserID]string // map username to last displayname
}

func (mm *Mattermost) SetHandler(h Handler) {
	mm.handler = h
}

func (mm *Mattermost) Protocol() string {
	return MATTERMOST_PROTOCOL
}

func (mm *Mattermost) Configure(c Configuration) error {
	if mm.conn != nil {
		mm.Close()
	}

	var err error

	mm.server, err = c.GetString("server")
	if err != nil {
		return err
	}

	mm.username, err = c.GetString("username")
	if err != nil {
		return err
	}

	mm.initial_members, err = c.GetInt("initial_members", 100)
	if err != nil {
		return err
	}

	mm.initial_backlog, err = c.GetInt("initial_backlog", 1000)
	if err != nil {
		return err
	}

	teams, err := c.GetString("teams")
	if err != nil {
		return err
	}
	mm.teams = map[string]bool{}
	anyteam := ""
	for _, team := range strings.Split(teams, ",") {
		anyteam = strings.TrimSpace(team)
		mm.teams[anyteam] = true
	}

	notls, err := c.GetBool("no_tls", false)
	if err != nil {
		return err
	}

	password, _ := c.GetString("password", "")
	token, _ := c.GetString("token", "")
	if token != "" {
		password = "token=" + token
	}
	mm.conn = matterclient.New(mm.username, password, anyteam, mm.server)
	mm.conn.Credentials.NoTLS = notls
	err = mm.conn.Login()
	if err != nil {
		return err
	}
	go mm.conn.WsReceiver()
	go mm.conn.StatusLoop()

	for i := 0; i < 42; i++ {
		time.Sleep(time.Duration(1) * time.Second)
		if mm.conn.WsConnected {
			mm.handleConnected()
			return nil
		}
	}
	return fmt.Errorf("Failed to connect after 42s attempting")
}

func (mm *Mattermost) User() UserID {
	return UserID(mm.username + "@" + mm.server)
}

func (mm *Mattermost) getTeamIdByName(name string) string {
	for _, team := range mm.conn.OtherTeams {
		if team.Team.Name == name {
			return team.Id
		}
	}
	return ""
}

func (mm *Mattermost) checkRoomId(id RoomID) (string, error) {
	x := strings.Split(string(id), "@")
	if len(x) == 1 {
		return "", fmt.Errorf("Please write whole room ID with team and server: %s@<team>@%s", id, mm.server)
	}
	if len(x) == 2 {
		return x[0], nil
	}
	if len(x) != 3 || x[2] != mm.server {
		return "", fmt.Errorf("Invalid room ID: %s", id)
	}

	team_id := mm.getTeamIdByName(x[1])
	if team_id == "" {
		return "", fmt.Errorf("Team not found: %s", id)
	}

	ch_id := mm.conn.GetChannelId(x[0], team_id)
	if ch_id == "" {
		return "", fmt.Errorf("Channel not found: %s", id)
	}
	return ch_id, nil
}

func (mm *Mattermost) reverseRoomId(id string) (bool, RoomID) {
	team := mm.conn.GetChannelTeamId(id)
	if team == "" {
		return true, RoomID(fmt.Sprintf("%s@%s", id, mm.server))
	} else {
		teamName := mm.conn.GetTeamName(team)
		if u, ok := mm.teams[teamName]; ok && u {
			name := mm.conn.GetChannelName(id)
			return true, RoomID(fmt.Sprintf("%s@%s@%s", name, teamName, mm.server))
		} else {
			return false, ""
		}
	}
}

func (mm *Mattermost) checkUserId(id UserID) (string, error) {
	x := strings.Split(string(id), "@")
	if len(x) == 1 {
		return "", fmt.Errorf("Please write whole user ID with server: %s@%s", id, mm.server)
	}
	if len(x) != 2 || x[1] != mm.server {
		return "", fmt.Errorf("Invalid user ID: %s", id)
	}

	mm.caches.Lock()
	defer mm.caches.Unlock()

	if user_id, ok := mm.caches.mmusers[x[0]]; ok {
		return user_id, nil
	}

	u, resp := mm.conn.Client.GetUserByUsername(x[0], "")
	if u == nil || resp.Error != nil {
		return "", fmt.Errorf("Not found: %s (%s)", x[0], resp.Error)
	}
	mm.caches.mmusers[x[0]] = u.Id

	return u.Id, nil
}

func (mm *Mattermost) SetUserInfo(info *UserInfo) error {
	return fmt.Errorf("Not implemented")
}

func (mm *Mattermost) SetRoomInfo(roomId RoomID, info *RoomInfo) error {
	ch, err := mm.checkRoomId(roomId)
	if err != nil {
		return err
	}

	if info.Topic != "" {
		mm.conn.UpdateChannelHeader(ch, info.Topic)
	}

	if info.Picture != nil {
		err = fmt.Errorf("Not supported: channel picture on mattermost")
	}

	if info.Name != "" {
		err = fmt.Errorf("Not supported: channel name on mattermost")
	}

	return err
}

func (mm *Mattermost) Join(roomId RoomID) error {
	ch, err := mm.checkRoomId(roomId)
	if err != nil {
		return err
	}

	return mm.conn.JoinChannel(ch)
}

func (mm *Mattermost) Invite(userId UserID, roomId RoomID) error {
	if roomId == "" {
		_, err := mm.checkUserId(userId)
		return err
	}

	return fmt.Errorf("Not supported: invite on mattermost")
}

func (mm *Mattermost) Leave(roomId RoomID) {
	// Not supported? TODO
}

func (mm *Mattermost) Send(event *Event) (string, error) {
	post := &model.Post{
		Message: event.Text,
	}
	if event.Type == EVENT_ACTION {
		post.Type = "me"
	}

	if event.Room != "" {
		ch, err := mm.checkRoomId(event.Room)
		if err != nil {
			return "", err
		}
		post.ChannelId = ch
	} else if event.Recipient != "" {
		ui, err := mm.checkUserId(event.Recipient)
		if err != nil {
			return "", err
		}

		_, resp := mm.conn.Client.CreateDirectChannel(mm.conn.User.Id, ui)
		if resp.Error != nil {
			return "", resp.Error
		}
		channelName := model.GetDMNameFromIds(ui, mm.conn.User.Id)

		err = mm.conn.UpdateChannels()
		if err != nil {
			return "", err
		}

		post.ChannelId = mm.conn.GetChannelId(channelName, "")
	} else {
		return "", fmt.Errorf("Invalid target")
	}

	if event.Attachments != nil {
		post.FileIds = []string{}
		for _, file := range event.Attachments {
			rdr, err := file.Read()
			if err != nil {
				return "", err
			}
			defer rdr.Close()
			data, err := ioutil.ReadAll(rdr)
			if err != nil {
				return "", err
			}
			up_file, err := mm.conn.UploadFile(data, post.ChannelId, file.Filename())
			if err != nil {
				log.Warnf("UploadFile error: %s", err)
				return "", err
			}
			post.FileIds = append(post.FileIds, up_file)
		}
	}

	created_post, resp := mm.conn.Client.CreatePost(post)
	if resp.Error != nil {
		log.Warnf("CreatePost error: %s", resp.Error)
		return "", resp.Error
	}
	return created_post.Id, nil
}

func (mm *Mattermost) Close() {
	if mm.conn != nil {
		mm.conn.WsQuit = true
	}
	if mm.handlerStopChan != nil {
		mm.handlerStopChan <- true
		mm.handlerStopChan = nil
	}
}

func (mm *Mattermost) handleConnected() {
	// Reinitialize shared data structures
	mm.handlerStopChan = make(chan bool)

	mm.caches.mmusers = make(map[string]string)
	mm.caches.sentjoined = make(map[string]bool)
	mm.caches.displayname = make(map[UserID]string)

	log.Debugf("Connected to mattermost: %s@%s\n", mm.username, mm.server)

	// Handle incoming messages
	go mm.handleLoop(mm.conn.MessageChan, mm.handlerStopChan)

	// Initial channel sync
	chans := mm.conn.GetChannels()
	doneCh := make(map[string]bool)
	for _, ch := range chans {
		if _, ok := doneCh[ch.Id]; !ok {
			doneCh[ch.Id] = true
			go mm.initSyncChannel(ch)
		}
	}
}

func (mm *Mattermost) initSyncChannel(ch *model.Channel) {
	if len(strings.Split(ch.Name, "__")) == 2 {
		// DM channel
		// Update remote user info
		users := strings.Split(ch.Name, "__")
		for _, uid := range users {
			user := mm.conn.GetUser(uid)
			if user != nil && uid != mm.conn.User.Id {
				mm.updateUserInfo(user)
			}
		}
	} else {
		interested, id := mm.reverseRoomId(ch.Id)
		if !interested {
			// Skip channels that are not in teams we want to bridge
			return
		}
		mm.handler.Joined(id)

		// Update room info
		room_info := &RoomInfo{
			Name:  ch.DisplayName,
			Topic: ch.Header,
		}
		for _, t := range mm.conn.OtherTeams {
			if t.Id == ch.TeamId {
				if t.Team.DisplayName != "" {
					room_info.Name = t.Team.DisplayName + " / " + room_info.Name
				} else {
					room_info.Name = t.Team.Name + " / " + room_info.Name
				}
				if t.Team.LastTeamIconUpdate > 0 {
					room_info.Picture = &LazyBlobMediaObject{
						ObjectFilename: fmt.Sprintf("%s-%d",
							t.Team.Name,
							t.Team.LastTeamIconUpdate),
						GetFn: func(o *LazyBlobMediaObject) error {
							team_img, resp := mm.conn.Client.GetTeamIcon(t.Id, "")
							if resp.Error != nil {
								log.Warnf("Could not get team image: %s", resp.Error.Error())
								return resp.Error
							}
							o.ObjectData = team_img
							o.ObjectMimetype = http.DetectContentType(team_img)
							return nil
						},
					}
				}
				break
			}
		}
		mm.handler.RoomInfoUpdated(id, UserID(""), room_info)

		// Update member list
		// TODO (when this will be slow, i.e. hundreds of members): do only a diff
		members, resp := mm.conn.Client.GetChannelMembers(ch.Id, 0, mm.initial_members, "")
		if resp.Error == nil {
			for _, mem := range *members {
				if mem.UserId == mm.conn.User.Id {
					continue
				}
				user := mm.conn.GetUser(mem.UserId)
				if user != nil {
					mm.ensureJoined(user, id)
					mm.updateUserInfo(user)
				} else {
					log.Warnf("Could not find joined user: %s", mem.UserId)
				}
			}
		} else {
			log.Warnf("Could not get channel members: %s", resp.Error.Error())
		}
	}

	// Read backlog
	last_seen_post := mm.handler.CacheGet(fmt.Sprintf("last_seen_%s", ch.Id))
	if last_seen_post != "" {
		const NUM_PER_PAGE = 100
		page := 0
		backlogs := []*model.PostList{}
		for {
			backlog, resp := mm.conn.Client.GetPostsAfter(ch.Id, last_seen_post, page, NUM_PER_PAGE, "")
			if resp.Error == nil {
				backlogs = append(backlogs, backlog)
				if len(backlog.Order) == NUM_PER_PAGE {
					page += 1
				} else {
					break
				}
			} else {
				break
			}
		}
		for i := 0; i < len(backlogs); i++ {
			mm.processBacklog(ch, backlogs[i])
		}
	} else {
		backlog, resp := mm.conn.Client.GetPostsForChannel(ch.Id, 0, mm.initial_backlog, "")
		if resp.Error == nil {
			mm.processBacklog(ch, backlog)
		} else {
			log.Warnf("Could not get channel backlog: %s", resp.Error)
		}
	}
}

func (mm *Mattermost) processBacklog(ch *model.Channel, backlog *model.PostList) {
	for i := 0; i < len(backlog.Order); i++ {
		post_id := backlog.Order[len(backlog.Order)-i-1]
		post := backlog.Posts[post_id]
		post_time := time.Unix(post.CreateAt/1000, 0)
		post.Message = fmt.Sprintf("[%s] %s",
			post_time.Format("2006-01-02 15:04:05 MST"), post.Message)
		mm.handlePost(ch.Name, post, true)
	}
}

func (mm *Mattermost) handleLoop(msgCh chan *matterclient.Message, quitCh chan bool) {
	for {
		select {
		case <-quitCh:
			break
		case msg := <-msgCh:
			log.Tracef("Mattermost: %#v\n", msg)
			log.Tracef("Mattermost raw: %#v\n", msg.Raw)
			err := mm.handlePosted(msg.Raw)
			if err != nil {
				log.Warnf("Mattermost error: %s", err)
			}
		}
	}
}

func (mm *Mattermost) updateUserInfo(user *model.User) {
	userId := UserID(fmt.Sprintf("%s@%s", user.Username, mm.server))
	userDisp := user.GetDisplayName(model.SHOW_NICKNAME_FULLNAME)

	mm.caches.Lock()
	defer mm.caches.Unlock()

	if lastdn, ok := mm.caches.displayname[userId]; !ok || lastdn != userDisp {
		ui := &UserInfo{
			DisplayName: userDisp,
		}
		if user.LastPictureUpdate > 0 {
			ui.Avatar = &LazyBlobMediaObject{
				ObjectFilename: fmt.Sprintf("%s-%d",
					user.Username,
					user.LastPictureUpdate),
				GetFn: func(o *LazyBlobMediaObject) error {
					img, resp := mm.conn.Client.GetProfileImage(user.Id, "")
					if resp.Error != nil {
						log.Warnf("Could not get profile picture: %s", resp.Error.Error())
						return resp.Error
					}
					o.ObjectData = img
					o.ObjectMimetype = http.DetectContentType(img)
					return nil
				},
			}
		}
		mm.handler.UserInfoUpdated(userId, ui)
		mm.caches.displayname[userId] = userDisp
	}
}

func (mm *Mattermost) ensureJoined(user *model.User, roomId RoomID) {
	userId := UserID(fmt.Sprintf("%s@%s", user.Username, mm.server))
	cache_key := fmt.Sprintf("%s / %s", userId, roomId)

	mm.caches.Lock()
	defer mm.caches.Unlock()

	if _, ok := mm.caches.sentjoined[cache_key]; !ok {
		mm.handler.Event(&Event{
			Author: userId,
			Room:   roomId,
			Type:   EVENT_JOIN,
		})
		mm.caches.sentjoined[cache_key] = true
	}
}

func (mm *Mattermost) handlePosted(msg *model.WebSocketEvent) error {
	channel_name, ok := msg.Data["channel_name"].(string)
	if !ok {
		return nil
	}
	post_str := msg.Data["post"].(string)
	var post model.Post
	err := json.Unmarshal([]byte(post_str), &post)
	if err != nil {
		return err
	}

	return mm.handlePost(channel_name, &post, false)
}

func (mm *Mattermost) handlePost(channel_name string, post *model.Post, only_messages bool) error {
	// Skip self messages
	if post.UserId == mm.conn.User.Id {
		return nil
	}

	// Find sending user
	user := mm.conn.GetUser(post.UserId)
	if user == nil {
		return fmt.Errorf("Invalid user")
	}
	userId := UserID(fmt.Sprintf("%s@%s", user.Username, mm.server))
	mm.updateUserInfo(user)

	// Build message event
	msg_ev := &Event{
		Id:     post.Id,
		Author: userId,
		Text:   post.Message,
		Type:   EVENT_MESSAGE,
	}
	if post.Type == "me" {
		msg_ev.Type = EVENT_ACTION
	}

	// Handle files
	if post.FileIds != nil && len(post.FileIds) > 0 {
		msg_ev.Attachments = []MediaObject{}
		for _, file := range post.Metadata.Files {
			media_object := &LazyBlobMediaObject{
				ObjectFilename: file.Name,
				ObjectMimetype: file.MimeType,
				GetFn: func(o *LazyBlobMediaObject) error {
					blob, resp := mm.conn.Client.GetFile(file.Id)
					if resp.Error != nil {
						return resp.Error
					}
					o.ObjectData = blob
					return nil
				},
			}
			if file.Width > 0 {
				media_object.ObjectImageSize = &ImageSize{
					Width:  file.Width,
					Height: file.Height,
				}
			}
			msg_ev.Attachments = append(msg_ev.Attachments, media_object)
		}
	}

	// Dispatch as PM or as room message
	if len(strings.Split(channel_name, "__")) == 2 {
		// Private message, no need to find room id
		if user.Id == mm.conn.User.Id {
			// Skip self sent messages
			return nil
		}

		mm.handler.Event(msg_ev)
		mm.handler.CachePut(fmt.Sprintf("last_seen_%s", post.ChannelId), post.Id)
	} else {
		interested, roomId := mm.reverseRoomId(post.ChannelId)
		if !interested {
			return nil
		}
		if roomId == "" {
			return fmt.Errorf("Invalid channel id")
		}

		mm.ensureJoined(user, roomId)

		if post.Type == "system_header_change" {
			if !only_messages {
				new_header := post.Props["new_header"].(string)
				mm.handler.RoomInfoUpdated(roomId, userId, &RoomInfo{
					Topic: new_header,
				})
			}
		} else if post.Type == "" || post.Type == "me" {
			msg_ev.Room = roomId
			mm.handler.Event(msg_ev)
			mm.handler.CachePut(fmt.Sprintf("last_seen_%s", post.ChannelId), post.Id)
		} else {
			return fmt.Errorf("Unhandled post type: %s", post.Type)
		}
	}
	return nil
}