fix(command): wait for all stdout and stderr streams to complete correctly

This commit is contained in:
Quentin McGaw
2026-06-11 13:30:59 +00:00
parent 48c1f2bf6a
commit acab89b91a
4 changed files with 50 additions and 30 deletions
+10 -4
View File
@@ -9,8 +9,9 @@ import (
) )
// Start launches a command and streams stdout and stderr to channels. // Start launches a command and streams stdout and stderr to channels.
// All the channels returned are ready only and won't be closed // stdoutLines and stderrLines channels will be closed when there is no more
// if the command fails later. // output to read, in order for the caller to catch all lines even after the
// command has finished. The waitError channel returned will never be closed.
func (c *Cmder) Start(cmd *exec.Cmd) ( func (c *Cmder) Start(cmd *exec.Cmd) (
stdoutLines, stderrLines <-chan string, stdoutLines, stderrLines <-chan string,
waitError <-chan error, startErr error, waitError <-chan error, startErr error,
@@ -38,6 +39,7 @@ func start(cmd execCmd) (stdoutLines, stderrLines <-chan string,
if err != nil { if err != nil {
_ = stdout.Close() _ = stdout.Close()
<-stdoutDone <-stdoutDone
close(stdoutLinesCh)
return nil, nil, nil, err return nil, nil, nil, err
} }
go streamToChannel(stderrReady, stderrDone, stderr, stderrLinesCh) go streamToChannel(stderrReady, stderrDone, stderr, stderrLinesCh)
@@ -45,9 +47,11 @@ func start(cmd execCmd) (stdoutLines, stderrLines <-chan string,
err = cmd.Start() err = cmd.Start()
if err != nil { if err != nil {
_ = stdout.Close() _ = stdout.Close()
_ = stderr.Close()
<-stdoutDone <-stdoutDone
close(stdoutLinesCh)
_ = stderr.Close()
<-stderrDone <-stderrDone
close(stderrLinesCh)
return nil, nil, nil, err return nil, nil, nil, err
} }
@@ -55,8 +59,10 @@ func start(cmd execCmd) (stdoutLines, stderrLines <-chan string,
go func() { go func() {
err := cmd.Wait() err := cmd.Wait()
<-stdoutDone <-stdoutDone
<-stderrDone close(stdoutLinesCh)
_ = stdout.Close() _ = stdout.Close()
<-stderrDone
close(stderrLinesCh)
_ = stderr.Close() _ = stderr.Close()
waitErrorCh <- err waitErrorCh <- err
}() }()
+19 -13
View File
@@ -18,31 +18,37 @@ func (c *Cmder) RunAndLog(ctx context.Context, command string, logger Logger) (e
return err return err
} }
streamCtx, streamCancel := context.WithCancel(context.Background())
streamDone := make(chan struct{}) streamDone := make(chan struct{})
go streamLines(streamCtx, streamDone, logger, stdout, stderr) go streamLines(streamDone, logger, stdout, stderr)
err = <-waitError err = <-waitError
streamCancel()
<-streamDone <-streamDone
return err return err
} }
func streamLines(ctx context.Context, done chan<- struct{}, func streamLines(done chan<- struct{}, logger Logger,
logger Logger, stdout, stderr <-chan string, stdout, stderr <-chan string,
) { ) {
defer close(done) defer close(done)
var line string
for { for {
select { select {
case <-ctx.Done(): case line, ok := <-stdout:
return if ok {
case line = <-stdout: logger.Info(line)
logger.Info(line) }
case line = <-stderr: if stderr == nil {
logger.Error(line) return
}
stdout = nil
case line, ok := <-stderr:
if ok {
logger.Error(line)
}
if stdout == nil {
return
}
stderr = nil
} }
} }
} }
+1 -4
View File
@@ -29,19 +29,16 @@ func (r *Runner) Run(ctx context.Context, errCh chan<- error, ready chan<- struc
return return
} }
streamCtx, streamCancel := context.WithCancel(context.Background())
streamDone := make(chan struct{}) streamDone := make(chan struct{})
go streamLines(streamCtx, streamDone, r.logger, go streamLines(streamDone, r.logger,
stdoutLines, stderrLines, ready) stdoutLines, stderrLines, ready)
select { select {
case <-ctx.Done(): case <-ctx.Done():
<-waitError <-waitError
streamCancel()
<-streamDone <-streamDone
errCh <- ctx.Err() errCh <- ctx.Err()
case err := <-waitError: case err := <-waitError:
streamCancel()
<-streamDone <-streamDone
errCh <- err errCh <- err
} }
+20 -9
View File
@@ -1,26 +1,37 @@
package openvpn package openvpn
import ( import (
"context"
"strings" "strings"
) )
func streamLines(ctx context.Context, done chan<- struct{}, func streamLines(done chan<- struct{},
logger Logger, stdout, stderr <-chan string, logger Logger, stdout, stderr <-chan string,
tunnelReady chan<- struct{}, tunnelReady chan<- struct{},
) { ) {
defer close(done) defer close(done)
var line string
for { for {
var line string
var ok bool
errLine := false errLine := false
select { select {
case <-ctx.Done(): case line, ok = <-stdout:
return if ok {
case line = <-stdout: break
case line = <-stderr: }
errLine = true if stderr == nil {
return
}
stdout = nil
case line, ok = <-stderr:
if ok {
errLine = true
break
}
if stdout == nil {
return
}
stderr = nil
} }
line, level := processLogLine(line) line, level := processLogLine(line)
if line == "" { if line == "" {