aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--connector/xmpp/xmpp.go211
1 files changed, 137 insertions, 74 deletions
diff --git a/connector/xmpp/xmpp.go b/connector/xmpp/xmpp.go
index 2d0260c..55efcba 100644
--- a/connector/xmpp/xmpp.go
+++ b/connector/xmpp/xmpp.go
@@ -4,6 +4,7 @@ import (
"crypto/tls"
"fmt"
"strings"
+ "sync"
"time"
gxmpp "github.com/matterbridge/go-xmpp"
@@ -35,8 +36,14 @@ type XMPP struct {
conn *gxmpp.Client
- isMUC map[string]bool
- joinedMUC map[string]bool
+ stateLock sync.Mutex
+ muc map[RoomID]*mucInfo
+}
+
+type mucInfo struct {
+ joined bool
+ pendingJoins map[UserID]string
+ pendingLeaves map[UserID]struct{}
}
func (xm *XMPP) SetHandler(h Handler) {
@@ -84,10 +91,7 @@ func (xm *XMPP) Configure(c Configuration) error {
}
// Try to connect
- if xm.isMUC == nil {
- xm.isMUC = make(map[string]bool)
- }
- xm.joinedMUC = make(map[string]bool)
+ xm.muc = make(map[RoomID]*mucInfo)
xm.connectorLoopNum += 1
go xm.connectLoop(xm.connectorLoopNum)
@@ -134,13 +138,13 @@ func (xm *XMPP) connectLoop(num int) {
xm.connected = true
xm.timeout = 10
- for muc, joined := range xm.joinedMUC {
- if joined {
- _, err := xm.conn.JoinMUCNoHistory(muc, xm.nickname)
+ for muc, mucInfo := range xm.muc {
+ if mucInfo.joined {
+ _, err := xm.conn.JoinMUCNoHistory(string(muc), xm.nickname)
if err != nil {
xm.handler.SystemMessage(fmt.Sprintf("Could not rejoin MUC %s after reconnection: %s", muc, err))
xm.handler.Left(RoomID(muc))
- delete(xm.joinedMUC, muc)
+ mucInfo.joined = false
}
}
}
@@ -184,85 +188,123 @@ func (xm *XMPP) handleXMPP() error {
log.Tracef("XMPP: %#v\n", m)
- switch v := m.(type) {
- case gxmpp.Chat:
- remote_sp := strings.Split(v.Remote, "/")
+ xm.handleXMPPStance(m)
+ }
+}
- // Skip self-sent events
- if v.Remote == xm.jid || (v.Type == "groupchat" && len(remote_sp) == 2 && remote_sp[1] == xm.nickname) {
- continue
- }
+func (xm *XMPP) handleXMPPStance(m interface{}) {
+ xm.stateLock.Lock()
+ defer xm.stateLock.Unlock()
- // If empty text, make sure we joined the room
- // We would do this at every incoming message if it were not so costly
- if v.Text == "" && v.Type == "groupchat" {
- xm.handler.Joined(RoomID(remote_sp[0]))
- }
+ switch v := m.(type) {
+ case gxmpp.Chat:
+ remote_sp := strings.Split(v.Remote, "/")
- // Handle subject change in group chats
- if v.Subject != "" && v.Type == "groupchat" {
- author := UserID("")
- if len(remote_sp) == 2 {
- if remote_sp[1] == xm.nickname {
- author = xm.User()
- } else {
- author = UserID(remote_sp[1] + "@" + remote_sp[0])
- }
+ // Skip self-sent events
+ if v.Remote == xm.jid || (v.Type == "groupchat" && len(remote_sp) == 2 && remote_sp[1] == xm.nickname) {
+ return
+ }
+
+ // If empty text, make sure we joined the room
+ // We would do this at every incoming message if it were not so costly
+ if v.Text == "" && v.Type == "groupchat" {
+ xm.handler.Joined(RoomID(remote_sp[0]))
+ }
+
+ // Handle subject change in group chats
+ if v.Subject != "" && v.Type == "groupchat" {
+ author := UserID("")
+ if len(remote_sp) == 2 {
+ if remote_sp[1] == xm.nickname {
+ author = xm.User()
+ } else {
+ author = UserID(remote_sp[1] + "@" + remote_sp[0])
}
- xm.handler.RoomInfoUpdated(RoomID(remote_sp[0]), author, &RoomInfo{
- Topic: v.Subject,
- })
}
+ xm.handler.RoomInfoUpdated(RoomID(remote_sp[0]), author, &RoomInfo{
+ Topic: v.Subject,
+ })
+ }
- // Handle text message
- if v.Text != "" {
- event := &Event{
- Type: EVENT_MESSAGE,
- Text: v.Text,
- }
+ // Handle text message
+ if v.Text != "" {
+ event := &Event{
+ Type: EVENT_MESSAGE,
+ Text: v.Text,
+ }
- if strings.HasPrefix(event.Text, "/me ") {
- event.Type = EVENT_ACTION
- event.Text = strings.Replace(event.Text, "/me ", "", 1)
- }
+ if strings.HasPrefix(event.Text, "/me ") {
+ event.Type = EVENT_ACTION
+ event.Text = strings.Replace(event.Text, "/me ", "", 1)
+ }
- if v.Type == "chat" {
- event.Author = UserID(remote_sp[0])
- xm.handler.Event(event)
- }
- if v.Type == "groupchat" && len(remote_sp) == 2 {
- event.Room = RoomID(remote_sp[0])
- event.Author = UserID(remote_sp[1] + "@" + remote_sp[0])
- event.Id = v.ID
- xm.handler.Event(event)
- }
+ if v.Type == "chat" {
+ event.Author = UserID(remote_sp[0])
+ xm.handler.Event(event)
}
- case gxmpp.Presence:
- remote := strings.Split(v.From, "/")
- if ismuc, ok := xm.isMUC[remote[0]]; ok && ismuc {
- // skip presence with no user and self-presence
- if len(remote) < 2 || remote[1] == xm.nickname {
- continue
+ if v.Type == "groupchat" && len(remote_sp) == 2 {
+ // First flush pending leaves and joins
+ room_id := RoomID(remote_sp[0])
+ if muc, ok := xm.muc[room_id]; ok {
+ muc.flushLeavesJoins(room_id, xm.handler)
}
- user := UserID(remote[1] + "@" + remote[0])
- event := &Event{
- Type: EVENT_JOIN,
- Room: RoomID(remote[0]),
- Author: user,
+ // Now send event
+ event.Room = room_id
+ event.Author = UserID(remote_sp[1] + "@" + remote_sp[0])
+ event.Id = v.ID
+ xm.handler.Event(event)
+ }
+ }
+ case gxmpp.Presence:
+ remote := strings.Split(v.From, "/")
+ room := RoomID(remote[0])
+ if mucInfo, ok := xm.muc[room]; ok {
+ // skip presence with no user and self-presence
+ if len(remote) < 2 || remote[1] == xm.nickname {
+ return
+ }
+
+ user := UserID(remote[1] + "@" + remote[0])
+ if v.Type != "unavailable" {
+ if _, ok := mucInfo.pendingLeaves[user]; ok {
+ delete(mucInfo.pendingLeaves, user)
+ } else {
+ mucInfo.pendingJoins[user] = remote[1]
}
- if v.Type == "unavailable" {
- event.Type = EVENT_LEAVE
+ } else {
+ if _, ok := mucInfo.pendingJoins[user]; ok {
+ delete(mucInfo.pendingJoins, user)
+ } else {
+ mucInfo.pendingLeaves[user] = struct{}{}
}
- xm.handler.Event(event)
- xm.handler.UserInfoUpdated(user, &UserInfo{
- DisplayName: remote[1],
- })
}
}
}
}
+func (muc *mucInfo) flushLeavesJoins(room RoomID, handler Handler) {
+ for user, display_name := range muc.pendingJoins {
+ handler.Event(&Event{
+ Type: EVENT_JOIN,
+ Room: room,
+ Author: user,
+ })
+ handler.UserInfoUpdated(user, &UserInfo{
+ DisplayName: display_name,
+ })
+ }
+ for user, _ := range muc.pendingLeaves {
+ handler.Event(&Event{
+ Type: EVENT_LEAVE,
+ Room: room,
+ Author: user,
+ })
+ }
+ muc.pendingJoins = make(map[UserID]string)
+ muc.pendingLeaves = make(map[UserID]struct{})
+}
+
func (xm *XMPP) User() UserID {
return UserID(xm.jid)
}
@@ -296,13 +338,19 @@ func (xm *XMPP) SetRoomInfo(roomId RoomID, info *RoomInfo) error {
}
func (xm *XMPP) Join(roomId RoomID) error {
- xm.isMUC[string(roomId)] = true
+ xm.stateLock.Lock()
+ defer xm.stateLock.Unlock()
+
+ xm.muc[roomId] = &mucInfo{
+ pendingJoins: make(map[UserID]string),
+ pendingLeaves: make(map[UserID]struct{}),
+ }
log.Tracef("Join %s with nick %s\n", roomId, xm.nickname)
_, err := xm.conn.JoinMUCNoHistory(string(roomId), xm.nickname)
if err == nil {
- xm.joinedMUC[string(roomId)] = true
+ xm.muc[roomId].joined = true
}
return err
@@ -319,8 +367,14 @@ func (xm *XMPP) Invite(userId UserID, roomId RoomID) error {
}
func (xm *XMPP) Leave(roomId RoomID) {
+ xm.stateLock.Lock()
+ defer xm.stateLock.Unlock()
+
xm.conn.LeaveMUC(string(roomId))
- delete(xm.joinedMUC, string(roomId))
+
+ if muc, ok := xm.muc[roomId]; ok {
+ muc.joined = false
+ }
}
func (xm *XMPP) SearchForUsers(query string) ([]UserSearchResult, error) {
@@ -329,6 +383,9 @@ func (xm *XMPP) SearchForUsers(query string) ([]UserSearchResult, error) {
}
func (xm *XMPP) Send(event *Event) (string, error) {
+ xm.stateLock.Lock()
+ defer xm.stateLock.Unlock()
+
if event.Attachments != nil && len(event.Attachments) > 0 {
for _, at := range event.Attachments {
url := at.URL()
@@ -355,6 +412,9 @@ func (xm *XMPP) Send(event *Event) (string, error) {
})
return event.Id, err
} else if len(event.Room) > 0 {
+ if muc, ok := xm.muc[event.Room]; ok {
+ muc.flushLeavesJoins(event.Room, xm.handler)
+ }
_, err := xm.conn.Send(gxmpp.Chat{
Type: "groupchat",
Remote: string(event.Room),
@@ -372,6 +432,9 @@ func (xm *XMPP) UserCommand(cmd string) {
}
func (xm *XMPP) Close() {
+ xm.stateLock.Lock()
+ defer xm.stateLock.Unlock()
+
if xm.conn != nil {
xm.conn.Close()
}