aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--account.go17
-rw-r--r--connector/connector.go10
-rw-r--r--connector/irc/irc.go16
-rw-r--r--connector/mattermost/mattermost.go24
-rw-r--r--connector/xmpp/xmpp.go17
-rw-r--r--db.go12
-rw-r--r--server.go14
7 files changed, 70 insertions, 40 deletions
diff --git a/account.go b/account.go
index 732ff77..8365275 100644
--- a/account.go
+++ b/account.go
@@ -418,10 +418,17 @@ func (a *Account) eventInternal(event *Event) error {
}
}
+ var cache_key string
if event.Id != "" {
- cache_key := fmt.Sprintf("%s/event_seen/%s/%s",
+ // If the event has an ID, make sure it is processed only once
+ cache_key = fmt.Sprintf("%s/event_seen/%s/%s",
a.Protocol, mx_room_id, event.Id)
- if !dbKvTestAndSet(cache_key, "yes") {
+ slot_key := dbKvSlotKey(cache_key)
+
+ dbLockSlot(slot_key)
+ defer dbUnlockSlot(slot_key)
+
+ if dbKvGet(cache_key) == "yes" {
// false: cache key was not modified, meaning we
// already saw the event
return nil
@@ -470,6 +477,12 @@ func (a *Account) eventInternal(event *Event) error {
}
}
}
+
+ // Mark message as received in db
+ if cache_key != "" {
+ dbKvPutLocked(cache_key, "yes")
+ }
+
return nil
}
}
diff --git a/connector/connector.go b/connector/connector.go
index 38ce828..e8c382d 100644
--- a/connector/connector.go
+++ b/connector/connector.go
@@ -59,8 +59,14 @@ type Connector interface {
// Leave a channel
Leave(roomId RoomID)
- // Send an event
- Send(event *Event) error
+ // Send an event. Returns the ID of the created remote message.
+ // This ID is used to deduplicate messages: if it comes back, it should have the same Id
+ // than the one returned here.
+ // For backends that do not implement IDs (e.g. IRC), an empty string is returned.
+ // (FIXME how to deduplicate IRC messages?)
+ // The event that is fed in this function may have its ID already set,
+ // in which case the backend is free to re-use the ID or select a new one.
+ Send(event *Event) (string, error)
// Close the connection
Close()
diff --git a/connector/irc/irc.go b/connector/irc/irc.go
index dafe9db..4e5f4fd 100644
--- a/connector/irc/irc.go
+++ b/connector/irc/irc.go
@@ -198,9 +198,9 @@ func (irc *IRC) Leave(roomId RoomID) {
irc.conn.Cmd.Part(ch)
}
-func (irc *IRC) Send(event *Event) error {
+func (irc *IRC) Send(event *Event) (string, error) {
if irc.conn == nil {
- return fmt.Errorf("Not connected")
+ return "", fmt.Errorf("Not connected")
}
// Workaround girc bug
@@ -212,17 +212,17 @@ func (irc *IRC) Send(event *Event) error {
if event.Room != "" {
ch, err := irc.checkRoomId(event.Room)
if err != nil {
- return err
+ return "", err
}
dest = ch
} else if event.Recipient != "" {
ui, err := irc.checkUserId(event.Recipient)
if err != nil {
- return err
+ return "", err
}
dest = ui
} else {
- return fmt.Errorf("Invalid target")
+ return "", fmt.Errorf("Invalid target")
}
if event.Attachments != nil && len(event.Attachments) > 0 {
@@ -230,7 +230,7 @@ func (irc *IRC) Send(event *Event) error {
url := at.URL()
if url == "" {
// TODO find a way to send them using some hosing of some kind
- return fmt.Errorf("Attachment without URL sent to IRC")
+ return "", fmt.Errorf("Attachment without URL sent to IRC")
} else {
irc.conn.Cmd.Message(dest, fmt.Sprintf("%s (%s, %dkb)",
url, at.Mimetype(), at.Size()/1024))
@@ -243,9 +243,9 @@ func (irc *IRC) Send(event *Event) error {
} else if event.Type == EVENT_ACTION {
irc.conn.Cmd.Action(dest, event.Text)
} else {
- return fmt.Errorf("Invalid event type")
+ return "", fmt.Errorf("Invalid event type")
}
- return nil
+ return "", nil
}
func (irc *IRC) Close() {
diff --git a/connector/mattermost/mattermost.go b/connector/mattermost/mattermost.go
index 12ac604..52eb40f 100644
--- a/connector/mattermost/mattermost.go
+++ b/connector/mattermost/mattermost.go
@@ -243,7 +243,7 @@ func (mm *Mattermost) Leave(roomId RoomID) {
// Not supported? TODO
}
-func (mm *Mattermost) Send(event *Event) error {
+func (mm *Mattermost) Send(event *Event) (string, error) {
post := &model.Post{
Message: event.Text,
}
@@ -254,29 +254,29 @@ func (mm *Mattermost) Send(event *Event) error {
if event.Room != "" {
ch, err := mm.checkRoomId(event.Room)
if err != nil {
- return err
+ return "", err
}
post.ChannelId = ch
} else if event.Recipient != "" {
ui, err := mm.checkUserId(event.Recipient)
if err != nil {
- return err
+ return "", err
}
_, resp := mm.conn.Client.CreateDirectChannel(mm.conn.User.Id, ui)
if resp.Error != nil {
- return resp.Error
+ return "", resp.Error
}
channelName := model.GetDMNameFromIds(ui, mm.conn.User.Id)
err = mm.conn.UpdateChannels()
if err != nil {
- return err
+ return "", err
}
post.ChannelId = mm.conn.GetChannelId(channelName, "")
} else {
- return fmt.Errorf("Invalid target")
+ return "", fmt.Errorf("Invalid target")
}
if event.Attachments != nil {
@@ -284,28 +284,28 @@ func (mm *Mattermost) Send(event *Event) error {
for _, file := range event.Attachments {
rdr, err := file.Read()
if err != nil {
- return err
+ return "", err
}
defer rdr.Close()
data, err := ioutil.ReadAll(rdr)
if err != nil {
- return err
+ return "", err
}
up_file, err := mm.conn.UploadFile(data, post.ChannelId, file.Filename())
if err != nil {
log.Warnf("UploadFile error: %s", err)
- return err
+ return "", err
}
post.FileIds = append(post.FileIds, up_file)
}
}
- _, resp := mm.conn.Client.CreatePost(post)
+ created_post, resp := mm.conn.Client.CreatePost(post)
if resp.Error != nil {
log.Warnf("CreatePost error: %s", resp.Error)
- return resp.Error
+ return "", resp.Error
}
- return nil
+ return created_post.Id, nil
}
func (mm *Mattermost) Close() {
diff --git a/connector/xmpp/xmpp.go b/connector/xmpp/xmpp.go
index dcf1db6..efaaf64 100644
--- a/connector/xmpp/xmpp.go
+++ b/connector/xmpp/xmpp.go
@@ -306,13 +306,13 @@ func (xm *XMPP) Leave(roomId RoomID) {
xm.conn.LeaveMUC(string(roomId))
}
-func (xm *XMPP) Send(event *Event) error {
+func (xm *XMPP) Send(event *Event) (string, error) {
if event.Attachments != nil && len(event.Attachments) > 0 {
for _, at := range event.Attachments {
url := at.URL()
if url == "" {
// TODO find a way to send them using some hosing of some kind
- return fmt.Errorf("Attachment without URL sent to XMPP")
+ return "", fmt.Errorf("Attachment without URL sent to XMPP")
} else {
event.Text += fmt.Sprintf("\n%s (%s, %dkb)",
url, at.Mimetype(), at.Size()/1024)
@@ -320,6 +320,10 @@ func (xm *XMPP) Send(event *Event) error {
}
}
+ if event.Id == "" {
+ event.Id = xid.New().String()
+ }
+
log.Tracef("xm *XMPP Send %#v\n", event)
if len(event.Recipient) > 0 {
_, err := xm.conn.Send(gxmpp.Chat{
@@ -327,20 +331,17 @@ func (xm *XMPP) Send(event *Event) error {
Remote: string(event.Recipient),
Text: event.Text,
})
- return err
+ return event.Id, err
} else if len(event.Room) > 0 {
- if event.Id == "" {
- event.Id = xid.New().String()
- }
_, err := xm.conn.Send(gxmpp.Chat{
Type: "groupchat",
Remote: string(event.Room),
Text: event.Text,
ID: event.Id,
})
- return err
+ return event.Id, err
} else {
- return fmt.Errorf("Invalid event")
+ return "", fmt.Errorf("Invalid event")
}
}
diff --git a/db.go b/db.go
index f9bed06..cdbdca1 100644
--- a/db.go
+++ b/db.go
@@ -163,6 +163,14 @@ func dbKvPut(key string, value string) {
dbLockSlot(slot_key)
defer dbUnlockSlot(slot_key)
+ dbKvPutLocked(key, value)
+}
+
+// Variant of dbKvPut that does not take a lock,
+// use this if the slot is already locked
+func dbKvPutLocked(key string, value string) {
+ slot_key := dbKvSlotKey(key)
+
var entry DbKv
db.Where(&DbKv{Key: key}).Assign(&DbKv{Value: value}).FirstOrCreate(&entry)
dbCache.Add(slot_key, value)
@@ -179,9 +187,7 @@ func dbKvTestAndSet(key string, value string) bool {
return false
}
- var entry DbKv
- db.Where(&DbKv{Key: key}).Assign(&DbKv{Value: value}).FirstOrCreate(&entry)
- dbCache.Add(slot_key, value)
+ dbKvPutLocked(key, value)
return true
}
diff --git a/server.go b/server.go
index 241a977..dc04006 100644
--- a/server.go
+++ b/server.go
@@ -145,18 +145,22 @@ func handleTxnEvent(e *mxlib.Event) error {
} else if e.Sender == pm_room.MxUserID {
ev.Author = acct.Conn.User()
ev.Recipient = pm_room.UserID
- return acct.Conn.Send(ev)
+ _, err := acct.Conn.Send(ev)
+ return err
}
} else if room := dbIsPublicRoom(e.RoomId); room != nil {
- cache_key := fmt.Sprintf("%s/event_seen/%s/%s",
- room.Protocol, e.RoomId, ev.Id)
- dbKvPut(cache_key, "yes")
// If this is a regular room
acct := FindJoinedAccount(e.Sender, room.Protocol, room.RoomID)
if acct != nil {
ev.Author = acct.Conn.User()
ev.Room = room.RoomID
- return acct.Conn.Send(ev)
+ created_ev_id, err := acct.Conn.Send(ev)
+ if err == nil && created_ev_id != "" {
+ cache_key := fmt.Sprintf("%s/event_seen/%s/%s",
+ room.Protocol, e.RoomId, created_ev_id)
+ dbKvPut(cache_key, "yes")
+ }
+ return err
} else {
mx.RoomKick(e.RoomId, e.Sender, fmt.Sprintf("Not present in %s on %s, please talk with Easybridge to rejoin", room.RoomID, room.Protocol))
return fmt.Errorf("not joined %s on %s", room.RoomID, room.Protocol)