mirror of
https://github.com/coder/coder.git
synced 2026-06-05 05:58:20 +00:00
2840fdcb54
relates to: https://github.com/coder/internal/issues/1094 This is number 2 of 5 pull requests in an effort to add agent script ordering. It adds a dRPC API that is exposed via a local socket. This API serves access to a lightweight, DAG-based dependency manager that was inspired by systemd. In follow-up PRs: * This unit manager will be plumbed into the workspace agent struct. * CLI commands will use this agentsocket API to express dependencies between coder scripts. I used an LLM to produce some of these changes, but I have conducted thorough self review and consider this contribution to be ready for an external reviewer.
186 lines
4.1 KiB
Go
186 lines
4.1 KiB
Go
package agentsocket
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"net"
|
|
"sync"
|
|
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/hashicorp/yamux"
|
|
"storj.io/drpc/drpcmux"
|
|
"storj.io/drpc/drpcserver"
|
|
|
|
"cdr.dev/slog"
|
|
"github.com/coder/coder/v2/agent/agentsocket/proto"
|
|
"github.com/coder/coder/v2/agent/unit"
|
|
"github.com/coder/coder/v2/codersdk/drpcsdk"
|
|
)
|
|
|
|
// Server provides access to the DRPCAgentSocketService via a Unix domain socket.
|
|
// Do not invoke Server{} directly. Use NewServer() instead.
|
|
type Server struct {
|
|
logger slog.Logger
|
|
path string
|
|
drpcServer *drpcserver.Server
|
|
service *DRPCAgentSocketService
|
|
|
|
mu sync.Mutex
|
|
listener net.Listener
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func NewServer(path string, logger slog.Logger) (*Server, error) {
|
|
logger = logger.Named("agentsocket-server")
|
|
server := &Server{
|
|
logger: logger,
|
|
path: path,
|
|
service: &DRPCAgentSocketService{
|
|
logger: logger,
|
|
unitManager: unit.NewManager(),
|
|
},
|
|
}
|
|
|
|
mux := drpcmux.New()
|
|
err := proto.DRPCRegisterAgentSocket(mux, server.service)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("failed to register drpc service: %w", err)
|
|
}
|
|
|
|
server.drpcServer = drpcserver.NewWithOptions(mux, drpcserver.Options{
|
|
Manager: drpcsdk.DefaultDRPCOptions(nil),
|
|
Log: func(err error) {
|
|
if errors.Is(err, context.Canceled) ||
|
|
errors.Is(err, context.DeadlineExceeded) {
|
|
return
|
|
}
|
|
logger.Debug(context.Background(), "drpc server error", slog.Error(err))
|
|
},
|
|
})
|
|
|
|
if server.path == "" {
|
|
var err error
|
|
server.path, err = getDefaultSocketPath()
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("get default socket path: %w", err)
|
|
}
|
|
}
|
|
|
|
listener, err := createSocket(server.path)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("create socket: %w", err)
|
|
}
|
|
|
|
server.listener = listener
|
|
|
|
// This context is canceled by server.Close().
|
|
// canceling it will close all connections.
|
|
server.ctx, server.cancel = context.WithCancel(context.Background())
|
|
|
|
server.logger.Info(server.ctx, "agent socket server started", slog.F("path", server.path))
|
|
|
|
server.wg.Add(1)
|
|
go func() {
|
|
defer server.wg.Done()
|
|
server.acceptConnections()
|
|
}()
|
|
|
|
return server, nil
|
|
}
|
|
|
|
func (s *Server) Close() error {
|
|
s.mu.Lock()
|
|
|
|
if s.listener == nil {
|
|
s.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
s.logger.Info(s.ctx, "stopping agent socket server")
|
|
|
|
s.cancel()
|
|
|
|
if err := s.listener.Close(); err != nil {
|
|
s.logger.Warn(s.ctx, "error closing socket listener", slog.Error(err))
|
|
}
|
|
|
|
s.listener = nil
|
|
|
|
s.mu.Unlock()
|
|
|
|
// Wait for all connections to finish
|
|
s.wg.Wait()
|
|
|
|
if err := cleanupSocket(s.path); err != nil {
|
|
s.logger.Warn(s.ctx, "error cleaning up socket file", slog.Error(err))
|
|
}
|
|
|
|
s.logger.Info(s.ctx, "agent socket server stopped")
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) acceptConnections() {
|
|
// In an edge case, Close() might race with acceptConnections() and set s.listener to nil.
|
|
// Therefore, we grab a copy of the listener under a lock. We might still get a nil listener,
|
|
// but then we know close has already run and we can return early.
|
|
s.mu.Lock()
|
|
listener := s.listener
|
|
s.mu.Unlock()
|
|
if listener == nil {
|
|
return
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
conn, err := listener.Accept()
|
|
if err != nil {
|
|
s.logger.Warn(s.ctx, "error accepting connection", slog.Error(err))
|
|
continue
|
|
}
|
|
|
|
s.mu.Lock()
|
|
if s.listener == nil {
|
|
s.mu.Unlock()
|
|
_ = conn.Close()
|
|
return
|
|
}
|
|
s.wg.Add(1)
|
|
s.mu.Unlock()
|
|
|
|
go func() {
|
|
defer s.wg.Done()
|
|
s.handleConnection(conn)
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (s *Server) handleConnection(conn net.Conn) {
|
|
defer conn.Close()
|
|
|
|
s.logger.Debug(s.ctx, "new connection accepted", slog.F("remote_addr", conn.RemoteAddr()))
|
|
|
|
config := yamux.DefaultConfig()
|
|
config.LogOutput = nil
|
|
config.Logger = slog.Stdlib(s.ctx, s.logger.Named("agentsocket-yamux"), slog.LevelInfo)
|
|
session, err := yamux.Server(conn, config)
|
|
if err != nil {
|
|
s.logger.Warn(s.ctx, "failed to create yamux session", slog.Error(err))
|
|
return
|
|
}
|
|
defer session.Close()
|
|
|
|
err = s.drpcServer.Serve(s.ctx, session)
|
|
if err != nil {
|
|
s.logger.Debug(s.ctx, "drpc server finished", slog.Error(err))
|
|
}
|
|
}
|