From acab89b91a21a4d7534f3551eed4769470385f9e Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Thu, 11 Jun 2026 13:30:59 +0000 Subject: [PATCH] fix(command): wait for all stdout and stderr streams to complete correctly --- internal/command/start.go | 14 ++++++++++---- internal/command/startnlog.go | 32 +++++++++++++++++++------------- internal/openvpn/run.go | 5 +---- internal/openvpn/stream.go | 29 ++++++++++++++++++++--------- 4 files changed, 50 insertions(+), 30 deletions(-) diff --git a/internal/command/start.go b/internal/command/start.go index b2405cde..ac545b45 100644 --- a/internal/command/start.go +++ b/internal/command/start.go @@ -9,8 +9,9 @@ import ( ) // Start launches a command and streams stdout and stderr to channels. -// All the channels returned are ready only and won't be closed -// if the command fails later. +// stdoutLines and stderrLines channels will be closed when there is no more +// output to read, in order for the caller to catch all lines even after the +// command has finished. The waitError channel returned will never be closed. func (c *Cmder) Start(cmd *exec.Cmd) ( stdoutLines, stderrLines <-chan string, waitError <-chan error, startErr error, @@ -38,6 +39,7 @@ func start(cmd execCmd) (stdoutLines, stderrLines <-chan string, if err != nil { _ = stdout.Close() <-stdoutDone + close(stdoutLinesCh) return nil, nil, nil, err } go streamToChannel(stderrReady, stderrDone, stderr, stderrLinesCh) @@ -45,9 +47,11 @@ func start(cmd execCmd) (stdoutLines, stderrLines <-chan string, err = cmd.Start() if err != nil { _ = stdout.Close() - _ = stderr.Close() <-stdoutDone + close(stdoutLinesCh) + _ = stderr.Close() <-stderrDone + close(stderrLinesCh) return nil, nil, nil, err } @@ -55,8 +59,10 @@ func start(cmd execCmd) (stdoutLines, stderrLines <-chan string, go func() { err := cmd.Wait() <-stdoutDone - <-stderrDone + close(stdoutLinesCh) _ = stdout.Close() + <-stderrDone + close(stderrLinesCh) _ = stderr.Close() waitErrorCh <- err }() diff --git a/internal/command/startnlog.go b/internal/command/startnlog.go index 59699df7..b5660ecc 100644 --- a/internal/command/startnlog.go +++ b/internal/command/startnlog.go @@ -18,31 +18,37 @@ func (c *Cmder) RunAndLog(ctx context.Context, command string, logger Logger) (e return err } - streamCtx, streamCancel := context.WithCancel(context.Background()) streamDone := make(chan struct{}) - go streamLines(streamCtx, streamDone, logger, stdout, stderr) + go streamLines(streamDone, logger, stdout, stderr) err = <-waitError - streamCancel() <-streamDone return err } -func streamLines(ctx context.Context, done chan<- struct{}, - logger Logger, stdout, stderr <-chan string, +func streamLines(done chan<- struct{}, logger Logger, + stdout, stderr <-chan string, ) { defer close(done) - var line string - for { select { - case <-ctx.Done(): - return - case line = <-stdout: - logger.Info(line) - case line = <-stderr: - logger.Error(line) + case line, ok := <-stdout: + if ok { + logger.Info(line) + } + if stderr == nil { + return + } + stdout = nil + case line, ok := <-stderr: + if ok { + logger.Error(line) + } + if stdout == nil { + return + } + stderr = nil } } } diff --git a/internal/openvpn/run.go b/internal/openvpn/run.go index fde49606..eaaf3f68 100644 --- a/internal/openvpn/run.go +++ b/internal/openvpn/run.go @@ -29,19 +29,16 @@ func (r *Runner) Run(ctx context.Context, errCh chan<- error, ready chan<- struc return } - streamCtx, streamCancel := context.WithCancel(context.Background()) streamDone := make(chan struct{}) - go streamLines(streamCtx, streamDone, r.logger, + go streamLines(streamDone, r.logger, stdoutLines, stderrLines, ready) select { case <-ctx.Done(): <-waitError - streamCancel() <-streamDone errCh <- ctx.Err() case err := <-waitError: - streamCancel() <-streamDone errCh <- err } diff --git a/internal/openvpn/stream.go b/internal/openvpn/stream.go index a6c6883e..c11a4d1a 100644 --- a/internal/openvpn/stream.go +++ b/internal/openvpn/stream.go @@ -1,26 +1,37 @@ package openvpn import ( - "context" "strings" ) -func streamLines(ctx context.Context, done chan<- struct{}, +func streamLines(done chan<- struct{}, logger Logger, stdout, stderr <-chan string, tunnelReady chan<- struct{}, ) { defer close(done) - var line string - for { + var line string + var ok bool errLine := false select { - case <-ctx.Done(): - return - case line = <-stdout: - case line = <-stderr: - errLine = true + case line, ok = <-stdout: + if ok { + break + } + if stderr == nil { + return + } + stdout = nil + case line, ok = <-stderr: + if ok { + errLine = true + break + } + if stdout == nil { + return + } + stderr = nil } line, level := processLogLine(line) if line == "" {