aboutsummaryrefslogblamecommitdiff
path: root/connector/external/external.go
blob: 9aae0f1a315422078c6dc1a7f8ef03149b38908c (plain) (tree)
1
2
3
4


                
               














































                                                             
                                



                               
                                         










                                                            


                                                 












                            

                               




                              
                                                 























                                                                    
                                                              

                                          
                                           

















                                                  
                                                      



                                            
                                                 


                          
                                                



                          

                                       
                      

                                                      
                                           

         
                                            
 




                              
                                         



                                                  
                                







                                                                          
                                                                             
                                                





                                                                               
                                                                         

 
                                                             







                                              









                                                        
                               


                                                   



                                              

                                 
                               


                                                   



                                              

                                 
                   


                                                



                                              

                                 
                                
                               

                                                             
                                              


                                  
                                
                          



                                                                                                            
         
 

 

                                                               
                            
                                          
                                                            
                               





                                                                                                                   


                             
                                                                 









                                                                                  

                                               



                                                 








                                                                    







































                                                                                         
                                            




                                                                   

                           


                                        
                                             
 


                            

                                           



                                                                                                           

                       
                                           
 
                      

                          
                          
                             





                                                         

                                                                
















                                                                                     
                                         









                                       
                                                                





























                                                                     
                                















                                                                      












                                                                                            








                                                         
package external

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"os/exec"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	log "github.com/sirupsen/logrus"

	. "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
)

// Serialization protocol

type extMessage struct {
	// Header: message type and identifier
	MsgType string `json:"_type"`
	MsgId   uint64 `json:"_id"`

	// Message fields
	Key     string `json:"key"`
	Value   string `json:"value"`
	EventId string `json:"event_id"`
	Error   string `json:"error"`
	Room    RoomID `json:"room"`
	User    UserID `json:"user"`
}

type extMessageWithData struct {
	extMessage

	Data interface{} `json:"data"`
}

// Possible values for MsgType
const (
	// ezbr -> external
	CONFIGURE     = "configure"
	GET_USER      = "get_user"
	SET_USER_INFO = "set_user_info"
	SET_ROOM_INFO = "set_room_info"
	JOIN          = "join"
	INVITE        = "invite"
	LEAVE         = "leave"
	SEARCH        = "search"
	SEND          = "send"
	CLOSE         = "close"

	// external -> ezbr
	SAVE_CONFIG       = "save_config"
	JOINED            = "joined"
	LEFT              = "left"
	USER_INFO_UPDATED = "user_info_updated"
	ROOM_INFO_UPDATED = "room_info_updated"
	EVENT             = "event"
	CACHE_PUT         = "cache_put"
	CACHE_GET         = "cache_get"

	// reply messages
	// ezbr -> external: all must wait for a reply!
	// external -> ezbr: only CACHE_GET produces a reply
	REP_OK             = "rep_ok"
	REP_SEARCH_RESULTS = "rep_search_results"
	REP_ERROR          = "rep_error"
)

// ----

type External struct {
	handler Handler

	protocol string
	command  string
	debug    bool

	config Configuration

	recvPipe io.ReadCloser
	sendPipe io.WriteCloser
	sendJson *json.Encoder

	generation int
	proc       *exec.Cmd

	handlerChan      chan *extMessageWithData
	counter          uint64
	inflightRequests map[uint64]chan *extMessageWithData
	lock             sync.Mutex
}

func (ext *External) SetHandler(h Handler) {
	ext.handler = h
}

func (ext *External) Protocol() string {
	return ext.protocol
}

func (ext *External) Configure(c Configuration) error {
	var err error

	if ext.proc != nil {
		ext.Close()
	}

	ext.inflightRequests = map[uint64]chan *extMessageWithData{}

	ext.generation += 1

	ext.handlerChan = make(chan *extMessageWithData, 1000)
	go ext.handlerLoop(ext.generation)

	err = ext.setupProc(ext.generation)
	if err != nil {
		return err
	}

	go ext.restartLoop(ext.generation)

	_, err = ext.cmd(extMessage{
		MsgType: CONFIGURE,
	}, c)
	if err != nil {
		return err
	}

	return nil
}

// ---- Process management and communication logic

func (ext *External) setupProc(generation int) error {
	var err error

	ext.proc = exec.Command(ext.command)

	ext.recvPipe, err = ext.proc.StdoutPipe()
	if err != nil {
		return err
	}
	ext.sendPipe, err = ext.proc.StdinPipe()
	if err != nil {
		return err
	}

	send := io.Writer(ext.sendPipe)
	recv := io.Reader(ext.recvPipe)
	if ext.debug {
		recv = io.TeeReader(recv, os.Stderr)
		send = io.MultiWriter(send, os.Stderr)
		ext.proc.Stderr = os.Stderr
	}

	ext.sendJson = json.NewEncoder(send)

	err = ext.proc.Start()
	if err != nil {
		return err
	}

	go ext.recvLoop(recv, generation)
	return nil
}

func (ext *External) restartLoop(generation int) {
	for i := 0; i < 2; i++ {
		if ext.proc == nil {
			break
		}
		ext.proc.Wait()
		if ext.generation != generation {
			break
		}
		log.Printf("Process %s stopped, restarting.", ext.command)
		log.Printf("Generation %d vs %d", ext.generation, generation)
		err := ext.setupProc(generation)
		if err != nil {
			ext.proc = nil
			log.Warnf("Unable to restart %s: %s", ext.command, err)
			break
		}
	}
	log.Warnf("More than 3 attempts (%s); abandonning.", ext.command)
}

func (m *extMessageWithData) UnmarshalJSON(jj []byte) error {
	var c extMessage

	err := json.Unmarshal(jj, &c)
	if err != nil {
		return err
	}
	*m = extMessageWithData{extMessage: c}
	switch c.MsgType {
	case SAVE_CONFIG:
		var cf struct {
			Data Configuration `json:"data"`
		}
		err := json.Unmarshal(jj, &cf)
		if err != nil {
			return err
		}
		m.Data = cf.Data
		return nil
	case USER_INFO_UPDATED:
		var ui struct {
			Data UserInfo `json:"data"`
		}
		err := json.Unmarshal(jj, &ui)
		if err != nil {
			return err
		}
		m.Data = &ui.Data
		return nil
	case ROOM_INFO_UPDATED:
		var ri struct {
			Data RoomInfo `json:"data"`
		}
		err := json.Unmarshal(jj, &ri)
		if err != nil {
			return err
		}
		m.Data = &ri.Data
		return nil
	case EVENT:
		var ev struct {
			Data Event `json:"data"`
		}
		err := json.Unmarshal(jj, &ev)
		if err != nil {
			return err
		}
		m.Data = &ev.Data
		return nil
	case REP_SEARCH_RESULTS:
		var sr struct {
			Data []UserSearchResult `json:"data"`
		}
		err := json.Unmarshal(jj, &sr)
		if err != nil {
			return err
		}
		m.Data = sr.Data
		return nil
	case JOINED, LEFT, CACHE_PUT, CACHE_GET, REP_OK, REP_ERROR:
		return nil
	default:
		return fmt.Errorf("Invalid message type for message from external program: '%s'", c.MsgType)
	}

}

func (ext *External) recvLoop(from io.Reader, generation int) {
	scanner := bufio.NewScanner(from)
	for scanner.Scan() {
		var msg extMessageWithData
		err := json.Unmarshal(scanner.Bytes(), &msg)
		if err != nil {
			log.Warnf("Failed to decode from %s: %s. Skipping line.", ext.command, err.Error())
			continue
		}

		if scanner.Err() != nil {
			log.Warnf("Failed to read from %s: %s. Stopping here.", ext.command, scanner.Err().Error())
			break
		}

		log.Tracef("GOT MESSAGE: %#v %#v", msg, msg.Data)
		if strings.HasPrefix(msg.MsgType, "rep_") {
			func() {
				ext.lock.Lock()
				defer ext.lock.Unlock()
				if ch, ok := ext.inflightRequests[msg.MsgId]; ok {
					ch <- &msg
					delete(ext.inflightRequests, msg.MsgId)
				}
			}()
		} else {
			ext.handlerChan <- &msg
		}

		if ext.generation != generation {
			break
		}
	}
}

func (ext *External) handlerLoop(generation int) {
	for ext.handlerChan != nil && ext.generation == generation {
		select {
		case msg := <-ext.handlerChan:
			ext.handleCmd(msg)
		case <-time.After(10 * time.Second):
		}
	}
}

func (ext *External) cmd(msg extMessage, data interface{}) (*extMessageWithData, error) {
	msg_id := atomic.AddUint64(&ext.counter, 1)

	msg.MsgId = msg_id

	fullMsg := extMessageWithData{
		extMessage: msg,
		Data:       data,
	}

	ch := make(chan *extMessageWithData)

	func() {
		ext.lock.Lock()
		defer ext.lock.Unlock()
		ext.inflightRequests[msg_id] = ch
	}()

	defer func() {
		ext.lock.Lock()
		defer ext.lock.Unlock()
		delete(ext.inflightRequests, msg_id)
	}()

	err := ext.sendJson.Encode(&fullMsg)
	if err != nil {
		return nil, err
	}

	select {
	case rep := <-ch:
		if rep.MsgType == REP_ERROR {
			return nil, fmt.Errorf("%s: %s", msg.MsgType, rep.Error)
		} else {
			return rep, nil
		}
	case <-time.After(30 * time.Second):
		return nil, fmt.Errorf("(%s) timeout", msg.MsgType)
	}
}

func (ext *External) Close() {
	ext.generation += 1

	ext.sendJson.Encode(&extMessage{
		MsgType: CLOSE,
	})
	ext.proc.Process.Signal(os.Interrupt)

	ext.recvPipe.Close()
	ext.sendPipe.Close()

	go func() {
		time.Sleep(1 * time.Second)
		if ext.proc != nil {
			log.Info("Sending SIGKILL to external process (did not terminate within 1 second)")
			ext.proc.Process.Kill()
		}
	}()
	ext.proc.Wait()
	log.Info("External process exited")

	ext.proc = nil
	ext.recvPipe = nil
	ext.sendPipe = nil
	ext.sendJson = nil
	ext.handlerChan = nil
}

// ---- Actual message handling :)

func (ext *External) handleCmd(msg *extMessageWithData) {
	switch msg.MsgType {
	case SAVE_CONFIG:
		ext.handler.SaveConfig(msg.Data.(Configuration))
	case JOINED:
		ext.handler.Joined(msg.Room)
	case LEFT:
		ext.handler.Left(msg.Room)
	case USER_INFO_UPDATED:
		ext.handler.UserInfoUpdated(msg.User, msg.Data.(*UserInfo))
	case ROOM_INFO_UPDATED:
		ext.handler.RoomInfoUpdated(msg.Room, msg.User, msg.Data.(*RoomInfo))
	case EVENT:
		ext.handler.Event(msg.Data.(*Event))
	case CACHE_PUT:
		ext.handler.CachePut(msg.Key, msg.Value)
	case CACHE_GET:
		value := ext.handler.CacheGet(msg.Key)
		ext.sendJson.Encode(&extMessage{
			MsgType: REP_OK,
			MsgId:   msg.MsgId,
			Key:     msg.Key,
			Value:   value,
		})
	}
}

func (ext *External) User() UserID {
	rep, err := ext.cmd(extMessage{
		MsgType: GET_USER,
	}, nil)
	if err != nil {
		log.Warnf("Unable to get user! %s", err.Error())
		return ""
	}
	return rep.User
}

func (ext *External) SetUserInfo(info *UserInfo) error {
	_, err := ext.cmd(extMessage{
		MsgType: SET_USER_INFO,
	}, info)
	return err
}

func (ext *External) SetRoomInfo(room RoomID, info *RoomInfo) error {
	_, err := ext.cmd(extMessage{
		MsgType: SET_ROOM_INFO,
		Room:    room,
	}, info)
	return err
}

func (ext *External) Join(room RoomID) error {
	_, err := ext.cmd(extMessage{
		MsgType: JOIN,
		Room:    room,
	}, nil)
	return err
}

func (ext *External) Invite(user UserID, room RoomID) error {
	_, err := ext.cmd(extMessage{
		MsgType: INVITE,
		User:    user,
		Room:    room,
	}, nil)
	return err
}

func (ext *External) Leave(room RoomID) {
	_, err := ext.cmd(extMessage{
		MsgType: LEAVE,
		Room:    room,
	}, nil)
	if err != nil {
		log.Warnf("Could not leave %s: %s", room, err.Error())
	}
}

func (ext *External) SearchForUsers(query string) ([]UserSearchResult, error) {
	rep, err := ext.cmd(extMessage{
		MsgType: SEARCH,
	}, query)
	if err != nil {
		return nil, err
	}
	if rep.MsgType != REP_SEARCH_RESULTS {
		return nil, fmt.Errorf("Invalid result type from external: %s", rep.MsgType)
	}
	return rep.Data.([]UserSearchResult), nil
}

func (ext *External) Send(event *Event) (string, error) {
	rep, err := ext.cmd(extMessage{
		MsgType: SEND,
	}, event)
	if err != nil {
		return "", err
	}
	return rep.EventId, nil
}