package external
import (
"bufio"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"strings"
"sync"
"sync/atomic"
"time"
log "github.com/sirupsen/logrus"
. "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
)
// Serialization protocol
//
// extMessage is the JSON envelope for every message exchanged with the
// external program. All message types share this one flat structure; each
// type only uses the fields that are relevant to it.
type extMessage struct {
// Header: message type and identifier
// MsgType is one of the MsgType constants declared in this file.
MsgType string `json:"_type"`
// MsgId is assigned by cmd for outgoing requests; a reply is matched to
// its request through this identifier.
MsgId uint64 `json:"_id"`
// Message fields
// Key and Value carry the cache key/value for cache_put and cache_get;
// Value is also the text of system_message and user_command.
Key string `json:"key"`
Value string `json:"value"`
// EventId is read from the reply to a send request.
EventId string `json:"event_id"`
// Error is the error text carried by a rep_error reply.
Error string `json:"error"`
Room RoomID `json:"room"`
User UserID `json:"user"`
}
// extMessageWithData is an extMessage together with its optional "data"
// payload. When decoding, the concrete type stored in Data depends on MsgType
// (see UnmarshalJSON); when encoding, Data is whatever the caller passed in.
type extMessageWithData struct {
extMessage
Data interface{} `json:"data"`
}
// Possible values for MsgType, grouped by direction.

// Requests sent by ezbr to the external program (ezbr -> external).
const (
	CONFIGURE     = "configure"
	GET_USER      = "get_user"
	SET_USER_INFO = "set_user_info"
	SET_ROOM_INFO = "set_room_info"
	JOIN          = "join"
	INVITE        = "invite"
	LEAVE         = "leave"
	SEARCH        = "search"
	SEND          = "send"
	USER_COMMAND  = "user_command"
	CLOSE         = "close"
)

// Messages sent by the external program to ezbr (external -> ezbr).
const (
	SAVE_CONFIG       = "save_config"
	SYSTEM_MESSAGE    = "system_message"
	JOINED            = "joined"
	LEFT              = "left"
	USER_INFO_UPDATED = "user_info_updated"
	ROOM_INFO_UPDATED = "room_info_updated"
	EVENT             = "event"
	CACHE_PUT         = "cache_put"
	CACHE_GET         = "cache_get"
)

// Reply messages.
//
// ezbr -> external: every request must wait for a reply.
// external -> ezbr: only CACHE_GET produces a reply.
const (
	REP_OK             = "rep_ok"
	REP_SEARCH_RESULTS = "rep_search_results"
	REP_ERROR          = "rep_error"
)
// ----
//
// External is a connector whose protocol logic lives in a separate program.
// The program is started as a child process and spoken to through
// line-delimited JSON messages on its standard input and output.
type External struct {
// handler receives the notifications decoded from the external program.
handler Handler
// protocol is the value returned by Protocol.
protocol string
// command is the executable passed to exec.Command.
command string
// debug, when set, copies all traffic and the child's stderr to os.Stderr.
debug bool
config Configuration
// recvPipe and sendPipe are the child's stdout and stdin pipes.
recvPipe io.ReadCloser
sendPipe io.WriteCloser
// sendJson encodes outgoing messages onto sendPipe.
sendJson *json.Encoder
// generation is incremented by Configure and Close; background goroutines
// compare it with the generation they were started for and stop on mismatch.
generation int
// proc is the currently running child process, or nil.
proc *exec.Cmd
// handlerChan queues non-reply messages from recvLoop for handlerLoop.
handlerChan chan *extMessageWithData
// counter is the source of request identifiers (incremented atomically).
counter uint64
// inflightRequests maps request identifiers to the channel on which the
// reply is delivered. Accesses are made while holding lock.
inflightRequests map[uint64]chan *extMessageWithData
lock sync.Mutex
}
// SetHandler registers the handler that receives the notifications sent by
// the external program.
func (ext *External) SetHandler(h Handler) {
ext.handler = h
}
// Protocol returns the protocol name stored in this connector.
func (ext *External) Protocol() string {
return ext.protocol
}
// Configure (re)starts the external program and sends it the configuration c.
//
// A process that is already running is closed first. A new generation is then
// opened: the in-flight request table and the handler queue are recreated, the
// handler loop is started, the child process is spawned and a restart loop is
// attached to it. Finally a CONFIGURE request carrying c is sent and its
// outcome is returned.
func (ext *External) Configure(c Configuration) error {
	if ext.proc != nil {
		ext.Close()
	}

	ext.inflightRequests = make(map[uint64]chan *extMessageWithData)

	ext.generation++
	gen := ext.generation

	ext.handlerChan = make(chan *extMessageWithData, 1000)
	go ext.handlerLoop(gen)

	if err := ext.setupProc(gen); err != nil {
		return err
	}
	go ext.restartLoop(gen)

	_, err := ext.cmd(extMessage{MsgType: CONFIGURE}, c)
	return err
}
// ---- Process management and communication logic

// setupProc spawns the external command, wires its stdin/stdout pipes into
// the connector, and starts the receive loop for the given generation.
//
// In debug mode everything read from and written to the child, as well as the
// child's stderr, is copied to os.Stderr.
func (ext *External) setupProc(generation int) error {
	child := exec.Command(ext.command)
	ext.proc = child

	stdout, err := child.StdoutPipe()
	ext.recvPipe = stdout
	if err != nil {
		return err
	}

	stdin, err := child.StdinPipe()
	ext.sendPipe = stdin
	if err != nil {
		return err
	}

	var (
		out io.Writer = ext.sendPipe
		in  io.Reader = ext.recvPipe
	)
	if ext.debug {
		in = io.TeeReader(in, os.Stderr)
		out = io.MultiWriter(out, os.Stderr)
		child.Stderr = os.Stderr
	}
	ext.sendJson = json.NewEncoder(out)

	if err := child.Start(); err != nil {
		return err
	}

	go ext.recvLoop(in, generation)
	return nil
}
// restartLoop watches the child process of the given generation and restarts
// it when it exits, at most twice.
//
// The loop ends silently when there is no process to watch, when the
// generation has changed (Close or Configure took over), or when a restart
// fails (which is reported by its own warning). The "abandoning" warning is
// only emitted when the restart budget is actually exhausted.
//
// NOTE(review): a restarted process is set up through setupProc only; this
// loop does not send it a new CONFIGURE message.
func (ext *External) restartLoop(generation int) {
	for i := 0; i < 2; i++ {
		// Read the field once so that a concurrent reset to nil cannot
		// happen between the nil check and the call to Wait.
		proc := ext.proc
		if proc == nil {
			return
		}

		// Only the fact that the process ended matters here, not its status.
		proc.Wait()

		if ext.generation != generation {
			// The process was stopped on purpose; nothing to restart.
			return
		}

		log.Printf("Process %s stopped, restarting.", ext.command)
		log.Printf("Generation %d vs %d", ext.generation, generation)

		if err := ext.setupProc(generation); err != nil {
			ext.proc = nil
			log.Warnf("Unable to restart %s: %s", ext.command, err)
			return
		}
	}

	log.Warnf("More than 3 attempts (%s); abandonning.", ext.command)
}
// UnmarshalJSON decodes a message received from the external program.
//
// The common fields are decoded first; the "data" field is then decoded
// according to the message type:
//
//	SAVE_CONFIG        -> Configuration
//	USER_INFO_UPDATED  -> *UserInfo
//	ROOM_INFO_UPDATED  -> *RoomInfo
//	EVENT              -> *Event
//	REP_SEARCH_RESULTS -> []UserSearchResult
//
// The other accepted types carry no data. Any message type that the external
// program is not allowed to send results in an error.
func (m *extMessageWithData) UnmarshalJSON(jj []byte) error {
	var header extMessage
	if err := json.Unmarshal(jj, &header); err != nil {
		return err
	}
	*m = extMessageWithData{extMessage: header}

	switch header.MsgType {
	case SYSTEM_MESSAGE, JOINED, LEFT, CACHE_PUT, CACHE_GET, REP_OK, REP_ERROR:
		// No payload to decode.

	case SAVE_CONFIG:
		var payload struct {
			Data Configuration `json:"data"`
		}
		if err := json.Unmarshal(jj, &payload); err != nil {
			return err
		}
		m.Data = payload.Data

	case USER_INFO_UPDATED:
		var payload struct {
			Data UserInfo `json:"data"`
		}
		if err := json.Unmarshal(jj, &payload); err != nil {
			return err
		}
		m.Data = &payload.Data

	case ROOM_INFO_UPDATED:
		var payload struct {
			Data RoomInfo `json:"data"`
		}
		if err := json.Unmarshal(jj, &payload); err != nil {
			return err
		}
		m.Data = &payload.Data

	case EVENT:
		var payload struct {
			Data Event `json:"data"`
		}
		if err := json.Unmarshal(jj, &payload); err != nil {
			return err
		}
		m.Data = &payload.Data

	case REP_SEARCH_RESULTS:
		var payload struct {
			Data []UserSearchResult `json:"data"`
		}
		if err := json.Unmarshal(jj, &payload); err != nil {
			return err
		}
		m.Data = payload.Data

	default:
		return fmt.Errorf("Invalid message type for message from external program: '%s'", header.MsgType)
	}

	return nil
}
// recvLoop reads line-delimited JSON messages from the external program until
// the stream ends or the connector moves on to another generation.
//
// Lines that cannot be decoded are logged and skipped. Replies (message types
// starting with "rep_") are handed to the request waiting on the same message
// identifier, if any; every other message is queued for the handler loop.
//
// A read error is reported once the scan loop has ended: bufio.Scanner only
// exposes its error after Scan has returned false, so checking inside the
// loop can never see it. The warning is suppressed when the generation has
// changed, since the pipe is then expected to have been closed on purpose.
func (ext *External) recvLoop(from io.Reader, generation int) {
	scanner := bufio.NewScanner(from)
	for scanner.Scan() {
		var msg extMessageWithData
		if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil {
			log.Warnf("Failed to decode from %s: %s. Skipping line.", ext.command, err.Error())
			continue
		}

		log.Tracef("GOT MESSAGE: %#v %#v", msg, msg.Data)

		if strings.HasPrefix(msg.MsgType, "rep_") {
			// Deliver the reply to the pending request and retire its entry.
			// Replies with an unknown identifier are dropped.
			ext.lock.Lock()
			if ch, ok := ext.inflightRequests[msg.MsgId]; ok {
				ch <- &msg
				delete(ext.inflightRequests, msg.MsgId)
			}
			ext.lock.Unlock()
		} else {
			ext.handlerChan <- &msg
		}

		if ext.generation != generation {
			return
		}
	}

	if err := scanner.Err(); err != nil && ext.generation == generation {
		log.Warnf("Failed to read from %s: %s. Stopping here.", ext.command, err.Error())
	}
}
// handlerLoop dispatches queued messages to handleCmd for as long as the
// handler queue exists and the connector is still at the given generation.
//
// The ten-second timeout has no action of its own: it only makes the loop
// come back and re-evaluate its exit condition when no message arrives.
func (ext *External) handlerLoop(generation int) {
	for {
		if ext.handlerChan == nil || ext.generation != generation {
			return
		}

		select {
		case incoming := <-ext.handlerChan:
			ext.handleCmd(incoming)
		case <-time.After(10 * time.Second):
			// Nothing received; loop to re-check the exit condition.
		}
	}
}
// cmd sends a request to the external program and waits for its reply.
//
// msg is the request envelope (its MsgId is overwritten with a fresh
// identifier) and data is the optional payload placed in the "data" field.
//
// It returns the reply on success. It returns an error when the connector has
// no encoder to write to (not configured, or closed), when the request cannot
// be written, when the external program answers with REP_ERROR, or when no
// reply arrives within 30 seconds.
func (ext *External) cmd(msg extMessage, data interface{}) (*extMessageWithData, error) {
	// Read the encoder once: it is nil before Configure and after Close.
	enc := ext.sendJson
	if enc == nil {
		return nil, fmt.Errorf("(%s) external process is not running", msg.MsgType)
	}

	id := atomic.AddUint64(&ext.counter, 1)
	msg.MsgId = id

	fullMsg := extMessageWithData{
		extMessage: msg,
		Data:       data,
	}

	// The channel has room for one reply so that the receive loop, which
	// sends while holding ext.lock, never blocks on it. With an unbuffered
	// channel a reply arriving right after the timeout would block the sender
	// forever while the deferred cleanup below waits for the same lock.
	ch := make(chan *extMessageWithData, 1)

	ext.lock.Lock()
	ext.inflightRequests[id] = ch
	ext.lock.Unlock()

	defer func() {
		ext.lock.Lock()
		defer ext.lock.Unlock()
		delete(ext.inflightRequests, id)
	}()

	if err := enc.Encode(&fullMsg); err != nil {
		return nil, err
	}

	timeout := time.NewTimer(30 * time.Second)
	defer timeout.Stop()

	select {
	case rep := <-ch:
		if rep.MsgType == REP_ERROR {
			return nil, fmt.Errorf("%s: %s", msg.MsgType, rep.Error)
		}
		return rep, nil
	case <-timeout.C:
		return nil, fmt.Errorf("(%s) timeout", msg.MsgType)
	}
}
// Close stops the external program and releases the connector's resources.
//
// The generation is bumped first so that the background loops of the current
// generation stop. If a process is running it is asked to quit (CLOSE
// message, interrupt signal, pipes closed) and killed if it has not exited
// within one second. Calling Close when no process is running only resets
// the connector's state.
func (ext *External) Close() {
	ext.generation += 1

	// Work on a local copy: the watchdog below must only ever act on the
	// process being closed here, never on one started by a later Configure.
	if proc := ext.proc; proc != nil {
		if ext.sendJson != nil {
			// Best effort: the process is being terminated regardless.
			ext.sendJson.Encode(&extMessage{
				MsgType: CLOSE,
			})
		}

		// Process is nil when the command was never successfully started.
		process := proc.Process
		if process != nil {
			process.Signal(os.Interrupt)
		}

		if ext.recvPipe != nil {
			ext.recvPipe.Close()
		}
		if ext.sendPipe != nil {
			ext.sendPipe.Close()
		}

		exited := make(chan struct{})
		go func() {
			select {
			case <-exited:
				// Terminated in time; nothing to do.
			case <-time.After(1 * time.Second):
				if process != nil {
					log.Info("Sending SIGKILL to external process (did not terminate within 1 second)")
					process.Kill()
				}
			}
		}()

		proc.Wait()
		close(exited)
		log.Info("External process exited")
	}

	ext.proc = nil
	ext.recvPipe = nil
	ext.sendPipe = nil
	ext.sendJson = nil
	ext.handlerChan = nil
}
// ---- Actual message handling :)

// handleCmd dispatches one message received from the external program to the
// matching method of the registered handler.
//
// The payload type assertions rely on UnmarshalJSON having stored the type
// that corresponds to the message type. CACHE_GET is the only message that is
// answered: the cached value is sent back in a REP_OK reply carrying the same
// message identifier. A failure to send that reply is logged.
func (ext *External) handleCmd(msg *extMessageWithData) {
	switch msg.MsgType {
	case SAVE_CONFIG:
		ext.handler.SaveConfig(msg.Data.(Configuration))
	case SYSTEM_MESSAGE:
		ext.handler.SystemMessage(msg.Value)
	case JOINED:
		ext.handler.Joined(msg.Room)
	case LEFT:
		ext.handler.Left(msg.Room)
	case USER_INFO_UPDATED:
		ext.handler.UserInfoUpdated(msg.User, msg.Data.(*UserInfo))
	case ROOM_INFO_UPDATED:
		ext.handler.RoomInfoUpdated(msg.Room, msg.User, msg.Data.(*RoomInfo))
	case EVENT:
		ext.handler.Event(msg.Data.(*Event))
	case CACHE_PUT:
		ext.handler.CachePut(msg.Key, msg.Value)
	case CACHE_GET:
		value := ext.handler.CacheGet(msg.Key)
		err := ext.sendJson.Encode(&extMessage{
			MsgType: REP_OK,
			MsgId:   msg.MsgId,
			Key:     msg.Key,
			Value:   value,
		})
		if err != nil {
			log.Warnf("Unable to send %s reply to %s: %s", CACHE_GET, ext.command, err)
		}
	}
}
// User asks the external program for the identifier of the connected user.
// When the request fails, a warning is logged and the empty identifier is
// returned.
func (ext *External) User() UserID {
	request := extMessage{MsgType: GET_USER}

	reply, err := ext.cmd(request, nil)
	if err == nil {
		return reply.User
	}

	log.Warnf("Unable to get user! %s", err.Error())
	return ""
}
// SetUserInfo sends a SET_USER_INFO request carrying info as its payload and
// returns the error of that request, if any.
func (ext *External) SetUserInfo(info *UserInfo) error {
	request := extMessage{MsgType: SET_USER_INFO}
	if _, err := ext.cmd(request, info); err != nil {
		return err
	}
	return nil
}
// SetRoomInfo sends a SET_ROOM_INFO request for room carrying info as its
// payload and returns the error of that request, if any.
func (ext *External) SetRoomInfo(room RoomID, info *RoomInfo) error {
	request := extMessage{
		MsgType: SET_ROOM_INFO,
		Room:    room,
	}
	if _, err := ext.cmd(request, info); err != nil {
		return err
	}
	return nil
}
// Join sends a JOIN request for room and returns the error of that request,
// if any.
func (ext *External) Join(room RoomID) error {
	request := extMessage{
		MsgType: JOIN,
		Room:    room,
	}
	if _, err := ext.cmd(request, nil); err != nil {
		return err
	}
	return nil
}
// Invite sends an INVITE request naming user and room and returns the error
// of that request, if any.
func (ext *External) Invite(user UserID, room RoomID) error {
	request := extMessage{
		MsgType: INVITE,
		User:    user,
		Room:    room,
	}
	if _, err := ext.cmd(request, nil); err != nil {
		return err
	}
	return nil
}
// Leave sends a LEAVE request for room. A failure is only logged, since the
// method has no error result.
func (ext *External) Leave(room RoomID) {
	request := extMessage{
		MsgType: LEAVE,
		Room:    room,
	}
	if _, err := ext.cmd(request, nil); err != nil {
		log.Warnf("Could not leave %s: %s", room, err.Error())
	}
}
// SearchForUsers sends a SEARCH request with query as its payload and returns
// the results carried by the reply. An error is returned when the request
// fails or when the reply is not of type REP_SEARCH_RESULTS.
func (ext *External) SearchForUsers(query string) ([]UserSearchResult, error) {
	reply, err := ext.cmd(extMessage{MsgType: SEARCH}, query)
	switch {
	case err != nil:
		return nil, err
	case reply.MsgType != REP_SEARCH_RESULTS:
		return nil, fmt.Errorf("Invalid result type from external: %s", reply.MsgType)
	}

	results := reply.Data.([]UserSearchResult)
	return results, nil
}
// Send sends a SEND request with event as its payload and returns the event
// identifier found in the reply.
func (ext *External) Send(event *Event) (string, error) {
	request := extMessage{MsgType: SEND}

	reply, err := ext.cmd(request, event)
	if err != nil {
		return "", err
	}
	return reply.EventId, nil
}
// UserCommand forwards the command text cm to the external program in a
// USER_COMMAND request. The method has no error result, so a failed request
// is logged instead of being silently dropped.
func (ext *External) UserCommand(cm string) {
	_, err := ext.cmd(extMessage{
		MsgType: USER_COMMAND,
		Value:   cm,
	}, nil)
	if err != nil {
		log.Warnf("Could not run user command: %s", err.Error())
	}
}