aboutsummaryrefslogtreecommitdiff
path: root/connector/external/external.go
diff options
context:
space:
mode:
Diffstat (limited to 'connector/external/external.go')
-rw-r--r--connector/external/external.go45
1 files changed, 28 insertions, 17 deletions
diff --git a/connector/external/external.go b/connector/external/external.go
index 6117880..b66ab6c 100644
--- a/connector/external/external.go
+++ b/connector/external/external.go
@@ -79,8 +79,8 @@ type External struct {
config Configuration
- recv io.Reader
- send io.Writer
+ recvPipe io.ReadCloser
+ sendPipe io.WriteCloser
sendJson *json.Encoder
generation int
@@ -114,7 +114,7 @@ func (ext *External) Configure(c Configuration) error {
ext.handlerChan = make(chan *extMessageWithData, 1000)
go ext.handlerLoop(ext.generation)
- err = ext.setupProc()
+ err = ext.setupProc(ext.generation)
if err != nil {
return err
}
@@ -133,26 +133,28 @@ func (ext *External) Configure(c Configuration) error {
// ---- Process management and communication logic
-func (ext *External) setupProc() error {
+func (ext *External) setupProc(generation int) error {
var err error
ext.proc = exec.Command(ext.command)
- ext.recv, err = ext.proc.StdoutPipe()
+ ext.recvPipe, err = ext.proc.StdoutPipe()
if err != nil {
return err
}
- ext.send, err = ext.proc.StdinPipe()
+ ext.sendPipe, err = ext.proc.StdinPipe()
if err != nil {
return err
}
+ send := io.Writer(ext.sendPipe)
+ recv := io.Reader(ext.recvPipe)
if ext.debug {
- ext.recv = io.TeeReader(ext.recv, os.Stderr)
- ext.send = io.MultiWriter(ext.send, os.Stderr)
+ recv = io.TeeReader(recv, os.Stderr)
+ send = io.MultiWriter(send, os.Stderr)
}
- ext.sendJson = json.NewEncoder(ext.send)
+ ext.sendJson = json.NewEncoder(send)
ext.proc.Stderr = os.Stderr
@@ -161,7 +163,7 @@ func (ext *External) setupProc() error {
return err
}
- go ext.recvLoop()
+ go ext.recvLoop(recv, generation)
return nil
}
@@ -175,7 +177,7 @@ func (ext *External) restartLoop(generation int) {
break
}
log.Printf("Process %s stopped, restarting.", ext.command)
- err := ext.setupProc()
+ err := ext.setupProc(generation)
if err != nil {
ext.proc = nil
log.Warnf("Unable to restart %s: %s", ext.command, err)
@@ -232,8 +234,8 @@ func (m *extMessageWithData) UnmarshalJSON(jj []byte) error {
}
-func (ext *External) recvLoop() {
- scanner := bufio.NewScanner(ext.recv)
+func (ext *External) recvLoop(from io.Reader, generation int) {
+ scanner := bufio.NewScanner(from)
for scanner.Scan() {
var msg extMessageWithData
err := json.Unmarshal(scanner.Bytes(), &msg)
@@ -260,6 +262,10 @@ func (ext *External) recvLoop() {
} else {
ext.handlerChan <- &msg
}
+
+ if ext.generation != generation {
+ break
+ }
}
}
@@ -315,20 +321,25 @@ func (ext *External) cmd(msg extMessage, data interface{}) (*extMessageWithData,
}
func (ext *External) Close() {
+ ext.generation += 1
+
ext.sendJson.Encode(&extMessage{
MsgType: CLOSE,
})
- ext.generation += 1
proc := ext.proc
+ proc.Process.Signal(os.Interrupt)
+ ext.recvPipe.Close()
+ ext.sendPipe.Close()
+
ext.proc = nil
- ext.recv = nil
- ext.send = nil
+ ext.recvPipe = nil
+ ext.sendPipe = nil
ext.sendJson = nil
ext.handlerChan = nil
go func() {
- time.Sleep(10 * time.Second)
+ time.Sleep(1 * time.Second)
proc.Process.Kill()
}()
}