diff options
Diffstat (limited to 'connector')
-rw-r--r-- | connector/external/external.go | 45 |
1 files changed, 28 insertions, 17 deletions
diff --git a/connector/external/external.go b/connector/external/external.go index 6117880..b66ab6c 100644 --- a/connector/external/external.go +++ b/connector/external/external.go @@ -79,8 +79,8 @@ type External struct { config Configuration - recv io.Reader - send io.Writer + recvPipe io.ReadCloser + sendPipe io.WriteCloser sendJson *json.Encoder generation int @@ -114,7 +114,7 @@ func (ext *External) Configure(c Configuration) error { ext.handlerChan = make(chan *extMessageWithData, 1000) go ext.handlerLoop(ext.generation) - err = ext.setupProc() + err = ext.setupProc(ext.generation) if err != nil { return err } @@ -133,26 +133,28 @@ func (ext *External) Configure(c Configuration) error { // ---- Process management and communication logic -func (ext *External) setupProc() error { +func (ext *External) setupProc(generation int) error { var err error ext.proc = exec.Command(ext.command) - ext.recv, err = ext.proc.StdoutPipe() + ext.recvPipe, err = ext.proc.StdoutPipe() if err != nil { return err } - ext.send, err = ext.proc.StdinPipe() + ext.sendPipe, err = ext.proc.StdinPipe() if err != nil { return err } + send := io.Writer(ext.sendPipe) + recv := io.Reader(ext.recvPipe) if ext.debug { - ext.recv = io.TeeReader(ext.recv, os.Stderr) - ext.send = io.MultiWriter(ext.send, os.Stderr) + recv = io.TeeReader(recv, os.Stderr) + send = io.MultiWriter(send, os.Stderr) } - ext.sendJson = json.NewEncoder(ext.send) + ext.sendJson = json.NewEncoder(send) ext.proc.Stderr = os.Stderr @@ -161,7 +163,7 @@ func (ext *External) setupProc() error { return err } - go ext.recvLoop() + go ext.recvLoop(recv, generation) return nil } @@ -175,7 +177,7 @@ func (ext *External) restartLoop(generation int) { break } log.Printf("Process %s stopped, restarting.", ext.command) - err := ext.setupProc() + err := ext.setupProc(generation) if err != nil { ext.proc = nil log.Warnf("Unable to restart %s: %s", ext.command, err) @@ -232,8 +234,8 @@ func (m *extMessageWithData) UnmarshalJSON(jj []byte) error { } -func (ext *External) recvLoop() { - scanner := bufio.NewScanner(ext.recv) +func (ext *External) recvLoop(from io.Reader, generation int) { + scanner := bufio.NewScanner(from) for scanner.Scan() { var msg extMessageWithData err := json.Unmarshal(scanner.Bytes(), &msg) @@ -260,6 +262,10 @@ func (ext *External) recvLoop() { } else { ext.handlerChan <- &msg } + + if ext.generation != generation { + break + } } } @@ -315,20 +321,25 @@ func (ext *External) cmd(msg extMessage, data interface{}) (*extMessageWithData, } func (ext *External) Close() { + ext.generation += 1 + ext.sendJson.Encode(&extMessage{ MsgType: CLOSE, }) - ext.generation += 1 proc := ext.proc + proc.Process.Signal(os.Interrupt) + ext.recvPipe.Close() + ext.sendPipe.Close() + ext.proc = nil - ext.recv = nil - ext.send = nil + ext.recvPipe = nil + ext.sendPipe = nil ext.sendJson = nil ext.handlerChan = nil go func() { - time.Sleep(10 * time.Second) + time.Sleep(1 * time.Second) proc.Process.Kill() }() } |