aboutsummaryrefslogblamecommitdiff
path: root/connector/external/external.go
blob: e64ff78238f529b4af77684e48e7d65937f45ea2 (plain) (tree)


































































































































































































































































































































































































                                                                                                      
package external

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"os/exec"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	log "github.com/sirupsen/logrus"

	. "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
)

// Serialization protocol

type extMessage struct {
	// Header: message type and identifier
	MsgType string `json:"_type"`
	MsgId   uint64 `json:"_id"`

	// Message fields
	Key     string `json:"key"`
	Value   string `json:"value"`
	EventId string `json:"event_id"`
	Error   string `json:"error"`
	Room    RoomID `json:"room"`
	User    UserID `json:"user"`
}

type extMessageWithData struct {
	extMessage

	Data interface{} `json:"data"`
}

// Possible values for MsgType
const (
	// ezbr -> external
	CONFIGURE     = "configure"
	GET_USER      = "get_user"
	SET_USER_INFO = "set_user_info"
	SET_ROOM_INFO = "set_room_info"
	JOIN          = "join"
	INVITE        = "invite"
	LEAVE         = "leave"
	SEND          = "send"
	CLOSE         = "close"

	// external -> ezbr
	JOINED            = "joined"
	LEFT              = "left"
	USER_INFO_UPDATED = "user_info_updated"
	ROOM_INFO_UPDATED = "room_info_updated"
	EVENT             = "event"
	CACHE_PUT         = "cache_put"
	CACHE_GET         = "cache_get"

	// reply messages
	// ezbr -> external: all must wait for a reply!
	// external -> ezbr: only CACHE_GET produces a reply
	REP_OK    = "rep_ok"
	REP_ERROR = "rep_error"
)

// ----

type External struct {
	handler Handler

	protocol string
	command  string
	debug    bool

	config Configuration

	recv     io.Reader
	send     io.Writer
	sendJson *json.Encoder

	generation int
	proc       *exec.Cmd

	counter          uint64
	inflightRequests map[uint64]chan *extMessageWithData
	lock             sync.Mutex
}

func (ext *External) SetHandler(h Handler) {
	ext.handler = h
}

func (ext *External) Protocol() string {
	return ext.protocol
}

func (ext *External) Configure(c Configuration) error {
	var err error

	if ext.proc != nil {
		ext.Close()
	}

	ext.inflightRequests = map[uint64]chan *extMessageWithData{}

	ext.generation += 1

	err = ext.setupProc()
	if err != nil {
		return err
	}

	go ext.restartLoop(ext.generation)

	_, err = ext.cmd(extMessage{
		MsgType: CONFIGURE,
	}, c)
	if err != nil {
		return err
	}

	return nil
}

// ---- Process management and communication logic

func (ext *External) setupProc() error {
	var err error

	ext.proc = exec.Command(ext.command)

	ext.recv, err = ext.proc.StdoutPipe()
	if err != nil {
		return err
	}
	ext.send, err = ext.proc.StdinPipe()
	if err != nil {
		return err
	}

	if ext.debug {
		ext.recv = io.TeeReader(ext.recv, os.Stderr)
		ext.send = io.MultiWriter(ext.send, os.Stderr)
	}

	ext.sendJson = json.NewEncoder(ext.send)

	ext.proc.Stderr = os.Stderr

	err = ext.proc.Start()
	if err != nil {
		return err
	}

	go ext.recvLoop()
	return nil
}

func (ext *External) restartLoop(generation int) {
	for {
		if ext.proc == nil {
			break
		}
		ext.proc.Wait()
		if ext.generation != generation {
			break
		}
		log.Printf("Process %s stopped, restarting.", ext.command)
		err := ext.setupProc()
		if err != nil {
			ext.proc = nil
			log.Warnf("Unable to restart %s: %s", ext.command, err)
			break
		}
	}
}

func (m *extMessageWithData) DecodeJSON(jj []byte) error {
	var c extMessage

	err := json.Unmarshal(jj, &c)
	if err != nil {
		return err
	}
	*m = extMessageWithData{extMessage: c}
	switch c.MsgType {
	case USER_INFO_UPDATED:
		var ui UserInfo
		err := json.Unmarshal(jj, &ui)
		if err != nil {
			return err
		}
		m.Data = &ui
	case ROOM_INFO_UPDATED:
		var ri RoomInfo
		err := json.Unmarshal(jj, &ri)
		if err != nil {
			return err
		}
		m.Data = &ri
	case EVENT:
		var ev Event
		err := json.Unmarshal(jj, &ev)
		if err != nil {
			return err
		}
		m.Data = &ev
	}
	return nil
}

func (ext *External) recvLoop() {
	reader := json.NewDecoder(ext.recv)
	for {
		var msg extMessageWithData
		err := reader.Decode(&msg)
		if err != nil {
			log.Warnf("Failed to decode from %s: %s. Stopping reading.", ext.command, err)
			break
		}

		if strings.HasPrefix(msg.MsgType, "rep_") {
			func() {
				ext.lock.Lock()
				defer ext.lock.Unlock()
				if ch, ok := ext.inflightRequests[msg.MsgId]; ok {
					ch <- &msg
					delete(ext.inflightRequests, msg.MsgId)
				}
			}()
		} else {
			ext.handleCmd(&msg)
		}
	}
}

func (ext *External) cmd(msg extMessage, data interface{}) (*extMessageWithData, error) {
	msg_id := atomic.AddUint64(&ext.counter, 1)

	msg.MsgId = msg_id

	fullMsg := extMessageWithData{
		extMessage: msg,
		Data:       data,
	}

	ch := make(chan *extMessageWithData)

	func() {
		ext.lock.Lock()
		defer ext.lock.Unlock()
		ext.inflightRequests[msg_id] = ch
	}()

	defer func() {
		ext.lock.Lock()
		defer ext.lock.Unlock()
		delete(ext.inflightRequests, msg_id)
	}()

	err := ext.sendJson.Encode(&fullMsg)
	if err != nil {
		return nil, err
	}

	select {
	case rep := <-ch:
		if rep.MsgType == REP_ERROR {
			return nil, fmt.Errorf("%s: %s", msg.MsgType, rep.Error)
		} else {
			return rep, nil
		}
	case <-time.After(5 * time.Second):
		return nil, fmt.Errorf("(%s) timeout", msg.MsgType)
	}
}

func (ext *External) Close() {
	ext.sendJson.Encode(&extMessage{
		MsgType: CLOSE,
	})
	ext.generation += 1

	proc := ext.proc
	ext.proc = nil
	ext.recv = nil
	ext.send = nil
	ext.sendJson = nil

	go func() {
		time.Sleep(10 * time.Second)
		proc.Process.Kill()
	}()
}

// ---- Actual message handling :)

func (ext *External) handleCmd(msg *extMessageWithData) {
	switch msg.MsgType {
	case JOINED:
		ext.handler.Joined(msg.Room)
	case LEFT:
		ext.handler.Left(msg.Room)
	case USER_INFO_UPDATED:
		ext.handler.UserInfoUpdated(msg.User, msg.Data.(*UserInfo))
	case ROOM_INFO_UPDATED:
		ext.handler.RoomInfoUpdated(msg.Room, msg.User, msg.Data.(*RoomInfo))
	case EVENT:
		ext.handler.Event(msg.Data.(*Event))
	case CACHE_PUT:
		ext.handler.CachePut(msg.Key, msg.Value)
	case CACHE_GET:
		value := ext.handler.CacheGet(msg.Key)
		ext.sendJson.Encode(&extMessage{
			MsgType: REP_OK,
			MsgId:   msg.MsgId,
			Value:   value,
		})
	}
}

func (ext *External) User() UserID {
	rep, err := ext.cmd(extMessage{
		MsgType: GET_USER,
	}, nil)
	if err != nil {
		log.Warnf("Unable to get user!")
		return ""
	}
	return rep.User
}

func (ext *External) SetUserInfo(info *UserInfo) error {
	_, err := ext.cmd(extMessage{
		MsgType: SET_USER_INFO,
	}, info)
	return err
}

func (ext *External) SetRoomInfo(room RoomID, info *RoomInfo) error {
	_, err := ext.cmd(extMessage{
		MsgType: SET_ROOM_INFO,
		Room:    room,
	}, info)
	return err
}

func (ext *External) Join(room RoomID) error {
	_, err := ext.cmd(extMessage{
		MsgType: JOIN,
		Room:    room,
	}, nil)
	return err
}

func (ext *External) Invite(user UserID, room RoomID) error {
	_, err := ext.cmd(extMessage{
		MsgType: LEAVE,
		User:    user,
		Room:    room,
	}, nil)
	return err
}

func (ext *External) Leave(room RoomID) {
	_, err := ext.cmd(extMessage{
		MsgType: LEAVE,
		Room:    room,
	}, nil)
	if err != nil {
		log.Warnf("Could not leave %s: %s", room, err.Error())
	}
}

func (ext *External) Send(event *Event) (string, error) {
	rep, err := ext.cmd(extMessage{
		MsgType: SEND,
	}, event)
	if err != nil {
		return "", err
	}
	return rep.EventId, nil
}