diff options
author | Alex Auvolat <alex@adnab.me> | 2020-03-01 20:37:34 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-03-01 20:37:34 +0100 |
commit | d2791094d90359c84c6bf103e6ca36ce9b96ee20 (patch) | |
tree | 7a59b9b6a549190629ff517977d5a872a271b86d | |
parent | 1d8b7ef84a3487d626fc05194ce869290b7dd30e (diff) | |
download | easybridge-d2791094d90359c84c6bf103e6ca36ce9b96ee20.tar.gz easybridge-d2791094d90359c84c6bf103e6ca36ce9b96ee20.zip |
Update how mattermost connector works to be more resilient to deco
-rw-r--r-- | connector/mattermost/mattermost.go | 71 |
1 files changed, 48 insertions, 23 deletions
diff --git a/connector/mattermost/mattermost.go b/connector/mattermost/mattermost.go index 6c0e091..0a632a9 100644 --- a/connector/mattermost/mattermost.go +++ b/connector/mattermost/mattermost.go @@ -42,6 +42,7 @@ type mmCaches struct { mmusers map[string]string // map mm username to mm user id sentjoined map[string]bool // map username/room name to bool displayname map[UserID]string // map username to last displayname + initsynced map[string]bool // chans for which init sync has been done } func (mm *Mattermost) SetHandler(h Handler) { @@ -57,6 +58,15 @@ func (mm *Mattermost) Configure(c Configuration) error { mm.Close() } + // Reinitialize shared data structures + mm.handlerStopChan = make(chan bool) + + mm.caches.mmusers = make(map[string]string) + mm.caches.sentjoined = make(map[string]bool) + mm.caches.displayname = make(map[UserID]string) + mm.caches.initsynced = make(map[string]bool) + + // Read config var err error mm.server, err = c.GetString("server") @@ -100,23 +110,23 @@ func (mm *Mattermost) Configure(c Configuration) error { if token != "" { password = "token=" + token } + + // Try to log in mm.conn = matterclient.New(mm.username, password, anyteam, mm.server) mm.conn.Credentials.NoTLS = notls err = mm.conn.Login() if err != nil { return err } + + // Try to start listening for messages + // Everytime the listener reconnects, mm.handleConnected does a sync of room status + mm.conn.OnWsConnect = mm.handleConnected go mm.conn.WsReceiver() go mm.conn.StatusLoop() + go mm.handleLoop(mm.conn.MessageChan, mm.handlerStopChan) - for i := 0; i < 42; i++ { - time.Sleep(time.Duration(1) * time.Second) - if mm.conn.WsConnected { - mm.handleConnected() - return nil - } - } - return fmt.Errorf("Failed to connect after 42s attempting") + return nil } func (mm *Mattermost) User() UserID { @@ -319,17 +329,7 @@ func (mm *Mattermost) Close() { } func (mm *Mattermost) handleConnected() { - // Reinitialize shared data structures - mm.handlerStopChan = make(chan bool) - - mm.caches.mmusers = make(map[string]string) - mm.caches.sentjoined = make(map[string]bool) - mm.caches.displayname = make(map[UserID]string) - - log.Debugf("Connected to mattermost: %s@%s\n", mm.username, mm.server) - - // Handle incoming messages - go mm.handleLoop(mm.conn.MessageChan, mm.handlerStopChan) + log.Debugf("(Re-)connected to mattermost: %s@%s ; doing channel sync\n", mm.username, mm.server) // Initial channel sync chans := mm.conn.GetChannels() @@ -337,20 +337,43 @@ func (mm *Mattermost) handleConnected() { for _, ch := range chans { if _, ok := doneCh[ch.Id]; !ok { doneCh[ch.Id] = true - go mm.initSyncChannel(ch) + go mm.syncChannel(ch) } } } +func (mm *Mattermost) syncChannel(ch *model.Channel) { + // The first time we see a chan, sync everything (member list, names, profile pictures) + must_initsync := func() bool { + mm.caches.Lock() + defer mm.caches.Unlock() + + if x, ok := mm.caches.initsynced[ch.Id]; ok && x { + return false + } + mm.caches.initsynced[ch.Id] = true + return true + }() + + if must_initsync { + mm.initSyncChannel(ch) + } + + // The following times, only sync missing messages + mm.backlogChannel(ch) +} + func (mm *Mattermost) initSyncChannel(ch *model.Channel) { if len(strings.Split(ch.Name, "__")) == 2 { // DM channel // Update remote user info users := strings.Split(ch.Name, "__") for _, uid := range users { - user := mm.conn.GetUser(uid) - if user != nil && uid != mm.conn.User.Id { - mm.updateUserInfo(user) + if uid != mm.conn.User.Id { + user := mm.conn.GetUser(uid) + if user != nil { + mm.updateUserInfo(user) + } } } } else { @@ -415,7 +438,9 @@ func (mm *Mattermost) initSyncChannel(ch *model.Channel) { log.Warnf("Could not get channel members: %s", resp.Error.Error()) } } +} +func (mm *Mattermost) backlogChannel(ch *model.Channel) { // Read backlog last_seen_post := mm.handler.CacheGet(fmt.Sprintf("last_seen_%s", ch.Id)) if last_seen_post != "" { |