Files
Quentin McGaw 4a78989d9d chore: do not use sentinel errors when unneeded
- main reason: it is a burden to always define sentinel errors at global scope and wrap them with `%w` instead of using a string directly
- only use sentinel errors when they have to be checked using `errors.Is`
- replace all usage of these sentinel errors in `fmt.Errorf` with direct strings that were in the sentinel error
- exclude the sentinel error definition requirement from .golangci.yml
- update unit tests to use ErrorContains instead of ErrorIs so each test stays "not a change detector test" without requiring a sentinel error
2026-05-02 03:29:46 +00:00

191 lines
4.6 KiB
Go

package portforward
import (
"context"
"errors"
"fmt"
"net/http"
"sync"
"time"
"github.com/qdm12/gluetun/internal/configuration/settings"
"github.com/qdm12/gluetun/internal/portforward/service"
)
// Loop owns the port forwarding service and the goroutine (see run) that
// stops and recreates that service whenever the settings are updated, the
// service reports a run error, or a start retry is due.
type Loop struct {
	// State
	// settings holds the current settings; run replaces it while holding
	// settingsMutex once an update has been accepted.
	settings      Settings
	settingsMutex sync.RWMutex
	// service is the current service instance. NewLoop leaves it nil; run
	// assigns a new instance each time it (re)starts the service.
	service Service
	// Fixed injected objects
	// These are passed as-is to service.New every time a service is created.
	routing     Routing
	client      *http.Client
	portAllower PortAllower
	logger      Logger
	cmder       Cmder
	// Fixed parameters
	// uid and gid are passed to service.New.
	// NOTE(review): presumably the ownership applied to files written by the
	// service - confirm against the service package.
	uid, gid int
	// Internal channels and locks
	// runCtx is used to detect when the loop has exited
	// when performing an update
	// It is created by Start and cancelled by Stop through runCancel.
	runCtx    context.Context //nolint:containedctx
	runCancel context.CancelFunc
	// runDone is closed when the run goroutine returns.
	runDone <-chan struct{}
	// updateTrigger carries partial settings updates from UpdateWith to run.
	updateTrigger chan<- Settings
	// updatedResult carries the outcome of an update from run back to
	// UpdateWith.
	updatedResult <-chan error
}
// NewLoop creates a port forwarding loop from the given port forwarding
// settings and injected dependencies. The loop is created with the VPN
// marked as down; the service itself is only created once the loop runs.
//
// The Filepath, UpCommand and DownCommand pointers of settings are
// dereferenced here, so they must not be nil.
func NewLoop(settings settings.PortForwarding, routing Routing,
	client *http.Client, portAllower PortAllower,
	logger Logger, cmder Cmder, uid, gid int,
) *Loop {
	vpnIsUp := ptrTo(false)
	serviceSettings := service.Settings{
		Enabled:        settings.Enabled,
		Filepath:       *settings.Filepath,
		UpCommand:      *settings.UpCommand,
		DownCommand:    *settings.DownCommand,
		ListeningPorts: settings.ListeningPorts,
		PortsCount:     settings.PortsCount,
	}

	loop := &Loop{
		routing:     routing,
		client:      client,
		portAllower: portAllower,
		logger:      logger,
		cmder:       cmder,
		uid:         uid,
		gid:         gid,
	}
	loop.settings = Settings{
		VPNIsUp: vpnIsUp,
		Service: serviceSettings,
	}
	return loop
}
// String returns a fixed human readable name for the loop.
func (l *Loop) String() string {
	const name = "port forwarding loop"
	return name
}
// Start creates the loop's internal context and channels and launches the
// run goroutine. The context argument is ignored: the loop runs on its own
// context derived from context.Background, which Stop cancels.
//
// The returned channel receives an error if the loop has to give up (see
// run). The returned error is always nil.
func (l *Loop) Start(_ context.Context) (runError <-chan error, _ error) {
	var (
		done     = make(chan struct{})
		trigger  = make(chan Settings)
		result   = make(chan error)
		runErrCh = make(chan error)
	)

	ctx, cancel := context.WithCancel(context.Background())
	l.runCtx, l.runCancel = ctx, cancel
	l.runDone, l.updateTrigger, l.updatedResult = done, trigger, result

	go l.run(ctx, done, runErrCh, trigger, result)

	return runErrCh, nil
}
// run is the body of the goroutine launched by Start. It closes runDone
// when it returns.
//
// Each iteration waits for one of: cancellation of runCtx, a partial
// settings update on updateTrigger, an error from the running service, or
// the retry timer. Apart from cancellation and a rejected update, every
// event leads to the current service being stopped and a new one being
// created and started from the current settings, with port forwarding only
// enabled if the VPN is marked as up.
//
// Outcomes are reported as follows:
//   - a rejected update sends its error on updateResult;
//   - a start triggered by an update sends the (wrapped) start error, or
//     nil on success, on updateResult;
//   - a failed start not triggered by an update is logged and retried
//     after retryDelay;
//   - a failure to stop the previous service is sent on runErrorCh and
//     ends the loop.
//
// Every send on updateResult and runErrorCh also watches runCtx. These
// channels are unbuffered and their receivers may give up once runCtx is
// cancelled; an unconditional send would then block forever, runDone would
// never be closed and Stop would hang.
func (l *Loop) run(runCtx context.Context, runDone chan<- struct{},
	runErrorCh chan<- error, updateTrigger <-chan Settings,
	updateResult chan<- error,
) {
	defer close(runDone)

	var serviceRunError <-chan error
	var retryAfter <-chan time.Time
	const retryDelay = 5 * time.Second

	for {
		updateReceived := false
		select {
		case <-runCtx.Done():
			// Stop call takes care of stopping the service
			return
		case partialUpdate := <-updateTrigger:
			updatedSettings, err := l.settings.updateWith(partialUpdate, *l.settings.VPNIsUp)
			if err != nil {
				// Report the rejected update, unless the loop is being
				// stopped and the updater is no longer listening.
				select {
				case updateResult <- err:
				case <-runCtx.Done():
					return
				}
				continue
			}
			updateReceived = true
			l.settingsMutex.Lock()
			l.settings = updatedSettings
			l.settingsMutex.Unlock()
		case err := <-serviceRunError:
			l.logger.Error(err.Error())
		case <-retryAfter:
			// Retry starting the service after a delay
			retryAfter = nil
		}

		if l.service != nil {
			err := l.service.Stop()
			if err != nil {
				// NOTE(review): if this happens while handling an update,
				// no value is sent on updateResult before returning.
				select {
				case runErrorCh <- fmt.Errorf("stopping previous service: %w", err):
				case <-runCtx.Done():
				}
				return
			}
		}

		serviceSettings := l.settings.Service.Copy()
		// Only enable port forward if the VPN tunnel is up
		*serviceSettings.Enabled = *serviceSettings.Enabled && *l.settings.VPNIsUp

		l.service = service.New(serviceSettings, l.routing, l.client,
			l.portAllower, l.logger, l.cmder, l.uid, l.gid)

		var err error
		serviceRunError, err = l.service.Start(runCtx)
		if updateReceived {
			// Signal to the Update call that the service has started
			// and if it failed to start.
			if err != nil {
				err = fmt.Errorf("starting port forwarding service: %w", err)
			}
			select {
			case updateResult <- err:
			case <-runCtx.Done():
				return
			}
		} else if err != nil {
			// Log the error and schedule a retry
			l.logger.Errorf("starting port forwarding service: %s - retrying in %s", err, retryDelay)
			retryAfter = time.After(retryDelay)
		}
	}
}
// UpdateWith hands a partial settings update to the running loop and waits
// for its outcome. It returns the error reported by the loop for that
// update, or the loop context's error if the loop is stopped before the
// update is accepted or before its result is received.
func (l *Loop) UpdateWith(partialUpdate Settings) (err error) {
	// First step: hand the update over to the loop.
	select {
	case l.updateTrigger <- partialUpdate:
	case <-l.runCtx.Done():
		// loop has been stopped, no update can be done
		return l.runCtx.Err()
	}

	// Second step: wait for the loop to report the result.
	select {
	case result := <-l.updatedResult:
		return result
	case <-l.runCtx.Done():
		return l.runCtx.Err()
	}
}
// Stop cancels the loop context, waits for the run goroutine to exit and
// then stops the current service, if there is one, returning its error.
func (l *Loop) Stop() (err error) {
	l.runCancel()
	<-l.runDone

	if l.service != nil {
		err = l.service.Stop()
	}
	return err
}
// GetPortsForwarded returns the ports reported by the current service, or
// nil if no service has been created yet.
func (l *Loop) GetPortsForwarded() (ports []uint16) {
	if l.service != nil {
		ports = l.service.GetPortsForwarded()
	}
	return ports
}
// SetPortsForwarded forwards the given ports to the current service, using
// the loop context. It returns an error if no service has been created yet,
// or the error returned by the service.
func (l *Loop) SetPortsForwarded(ports []uint16) (err error) {
	if l.service != nil {
		return l.service.SetPortsForwarded(l.runCtx, ports)
	}
	return errors.New("port forwarding service not started")
}
// ptrTo returns a pointer to a fresh copy of value.
func ptrTo[T any](value T) *T {
	pointer := new(T)
	*pointer = value
	return pointer
}