diff options
Diffstat (limited to 'connector')
-rw-r--r-- | connector/xmpp/xmpp.go | 211 |
1 files changed, 137 insertions, 74 deletions
diff --git a/connector/xmpp/xmpp.go b/connector/xmpp/xmpp.go index 2d0260c..55efcba 100644 --- a/connector/xmpp/xmpp.go +++ b/connector/xmpp/xmpp.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "fmt" "strings" + "sync" "time" gxmpp "github.com/matterbridge/go-xmpp" @@ -35,8 +36,14 @@ type XMPP struct { conn *gxmpp.Client - isMUC map[string]bool - joinedMUC map[string]bool + stateLock sync.Mutex + muc map[RoomID]*mucInfo +} + +type mucInfo struct { + joined bool + pendingJoins map[UserID]string + pendingLeaves map[UserID]struct{} } func (xm *XMPP) SetHandler(h Handler) { @@ -84,10 +91,7 @@ func (xm *XMPP) Configure(c Configuration) error { } // Try to connect - if xm.isMUC == nil { - xm.isMUC = make(map[string]bool) - } - xm.joinedMUC = make(map[string]bool) + xm.muc = make(map[RoomID]*mucInfo) xm.connectorLoopNum += 1 go xm.connectLoop(xm.connectorLoopNum) @@ -134,13 +138,13 @@ func (xm *XMPP) connectLoop(num int) { xm.connected = true xm.timeout = 10 - for muc, joined := range xm.joinedMUC { - if joined { - _, err := xm.conn.JoinMUCNoHistory(muc, xm.nickname) + for muc, mucInfo := range xm.muc { + if mucInfo.joined { + _, err := xm.conn.JoinMUCNoHistory(string(muc), xm.nickname) if err != nil { xm.handler.SystemMessage(fmt.Sprintf("Could not rejoin MUC %s after reconnection: %s", muc, err)) xm.handler.Left(RoomID(muc)) - delete(xm.joinedMUC, muc) + mucInfo.joined = false } } } @@ -184,85 +188,123 @@ func (xm *XMPP) handleXMPP() error { log.Tracef("XMPP: %#v\n", m) - switch v := m.(type) { - case gxmpp.Chat: - remote_sp := strings.Split(v.Remote, "/") + xm.handleXMPPStance(m) + } +} - // Skip self-sent events - if v.Remote == xm.jid || (v.Type == "groupchat" && len(remote_sp) == 2 && remote_sp[1] == xm.nickname) { - continue - } +func (xm *XMPP) handleXMPPStance(m interface{}) { + xm.stateLock.Lock() + defer xm.stateLock.Unlock() - // If empty text, make sure we joined the room - // We would do this at every incoming message if it were not so costly - if v.Text == "" && v.Type == "groupchat" { - xm.handler.Joined(RoomID(remote_sp[0])) - } + switch v := m.(type) { + case gxmpp.Chat: + remote_sp := strings.Split(v.Remote, "/") - // Handle subject change in group chats - if v.Subject != "" && v.Type == "groupchat" { - author := UserID("") - if len(remote_sp) == 2 { - if remote_sp[1] == xm.nickname { - author = xm.User() - } else { - author = UserID(remote_sp[1] + "@" + remote_sp[0]) - } + // Skip self-sent events + if v.Remote == xm.jid || (v.Type == "groupchat" && len(remote_sp) == 2 && remote_sp[1] == xm.nickname) { + return + } + + // If empty text, make sure we joined the room + // We would do this at every incoming message if it were not so costly + if v.Text == "" && v.Type == "groupchat" { + xm.handler.Joined(RoomID(remote_sp[0])) + } + + // Handle subject change in group chats + if v.Subject != "" && v.Type == "groupchat" { + author := UserID("") + if len(remote_sp) == 2 { + if remote_sp[1] == xm.nickname { + author = xm.User() + } else { + author = UserID(remote_sp[1] + "@" + remote_sp[0]) } - xm.handler.RoomInfoUpdated(RoomID(remote_sp[0]), author, &RoomInfo{ - Topic: v.Subject, - }) } + xm.handler.RoomInfoUpdated(RoomID(remote_sp[0]), author, &RoomInfo{ + Topic: v.Subject, + }) + } - // Handle text message - if v.Text != "" { - event := &Event{ - Type: EVENT_MESSAGE, - Text: v.Text, - } + // Handle text message + if v.Text != "" { + event := &Event{ + Type: EVENT_MESSAGE, + Text: v.Text, + } - if strings.HasPrefix(event.Text, "/me ") { - event.Type = EVENT_ACTION - event.Text = strings.Replace(event.Text, "/me ", "", 1) - } + if strings.HasPrefix(event.Text, "/me ") { + event.Type = EVENT_ACTION + event.Text = strings.Replace(event.Text, "/me ", "", 1) + } - if v.Type == "chat" { - event.Author = UserID(remote_sp[0]) - xm.handler.Event(event) - } - if v.Type == "groupchat" && len(remote_sp) == 2 { - event.Room = RoomID(remote_sp[0]) - event.Author = UserID(remote_sp[1] + "@" + remote_sp[0]) - event.Id = v.ID - xm.handler.Event(event) - } + if v.Type == "chat" { + event.Author = UserID(remote_sp[0]) + xm.handler.Event(event) } - case gxmpp.Presence: - remote := strings.Split(v.From, "/") - if ismuc, ok := xm.isMUC[remote[0]]; ok && ismuc { - // skip presence with no user and self-presence - if len(remote) < 2 || remote[1] == xm.nickname { - continue + if v.Type == "groupchat" && len(remote_sp) == 2 { + // First flush pending leaves and joins + room_id := RoomID(remote_sp[0]) + if muc, ok := xm.muc[room_id]; ok { + muc.flushLeavesJoins(room_id, xm.handler) } - user := UserID(remote[1] + "@" + remote[0]) - event := &Event{ - Type: EVENT_JOIN, - Room: RoomID(remote[0]), - Author: user, + // Now send event + event.Room = room_id + event.Author = UserID(remote_sp[1] + "@" + remote_sp[0]) + event.Id = v.ID + xm.handler.Event(event) + } + } + case gxmpp.Presence: + remote := strings.Split(v.From, "/") + room := RoomID(remote[0]) + if mucInfo, ok := xm.muc[room]; ok { + // skip presence with no user and self-presence + if len(remote) < 2 || remote[1] == xm.nickname { + return + } + + user := UserID(remote[1] + "@" + remote[0]) + if v.Type != "unavailable" { + if _, ok := mucInfo.pendingLeaves[user]; ok { + delete(mucInfo.pendingLeaves, user) + } else { + mucInfo.pendingJoins[user] = remote[1] } - if v.Type == "unavailable" { - event.Type = EVENT_LEAVE + } else { + if _, ok := mucInfo.pendingJoins[user]; ok { + delete(mucInfo.pendingJoins, user) + } else { + mucInfo.pendingLeaves[user] = struct{}{} } - xm.handler.Event(event) - xm.handler.UserInfoUpdated(user, &UserInfo{ - DisplayName: remote[1], - }) } } } } +func (muc *mucInfo) flushLeavesJoins(room RoomID, handler Handler) { + for user, display_name := range muc.pendingJoins { + handler.Event(&Event{ + Type: EVENT_JOIN, + Room: room, + Author: user, + }) + handler.UserInfoUpdated(user, &UserInfo{ + DisplayName: display_name, + }) + } + for user, _ := range muc.pendingLeaves { + handler.Event(&Event{ + Type: EVENT_LEAVE, + Room: room, + Author: user, + }) + } + muc.pendingJoins = make(map[UserID]string) + muc.pendingLeaves = make(map[UserID]struct{}) +} + func (xm *XMPP) User() UserID { return UserID(xm.jid) } @@ -296,13 +338,19 @@ func (xm *XMPP) SetRoomInfo(roomId RoomID, info *RoomInfo) error { } func (xm *XMPP) Join(roomId RoomID) error { - xm.isMUC[string(roomId)] = true + xm.stateLock.Lock() + defer xm.stateLock.Unlock() + + xm.muc[roomId] = &mucInfo{ + pendingJoins: make(map[UserID]string), + pendingLeaves: make(map[UserID]struct{}), + } log.Tracef("Join %s with nick %s\n", roomId, xm.nickname) _, err := xm.conn.JoinMUCNoHistory(string(roomId), xm.nickname) if err == nil { - xm.joinedMUC[string(roomId)] = true + xm.muc[roomId].joined = true } return err @@ -319,8 +367,14 @@ func (xm *XMPP) Invite(userId UserID, roomId RoomID) error { } func (xm *XMPP) Leave(roomId RoomID) { + xm.stateLock.Lock() + defer xm.stateLock.Unlock() + xm.conn.LeaveMUC(string(roomId)) - delete(xm.joinedMUC, string(roomId)) + + if muc, ok := xm.muc[roomId]; ok { + muc.joined = false + } } func (xm *XMPP) SearchForUsers(query string) ([]UserSearchResult, error) { @@ -329,6 +383,9 @@ func (xm *XMPP) SearchForUsers(query string) ([]UserSearchResult, error) { } func (xm *XMPP) Send(event *Event) (string, error) { + xm.stateLock.Lock() + defer xm.stateLock.Unlock() + if event.Attachments != nil && len(event.Attachments) > 0 { for _, at := range event.Attachments { url := at.URL() @@ -355,6 +412,9 @@ func (xm *XMPP) Send(event *Event) (string, error) { }) return event.Id, err } else if len(event.Room) > 0 { + if muc, ok := xm.muc[event.Room]; ok { + muc.flushLeavesJoins(event.Room, xm.handler) + } _, err := xm.conn.Send(gxmpp.Chat{ Type: "groupchat", Remote: string(event.Room), @@ -372,6 +432,9 @@ func (xm *XMPP) UserCommand(cmd string) { } func (xm *XMPP) Close() { + xm.stateLock.Lock() + defer xm.stateLock.Unlock() + if xm.conn != nil { xm.conn.Close() } |