package messaging import ( "errors" "fmt" "log/slog" "sync" ) type connectionProvider interface { NewAmqpConnection(config AmqpConnectionConfig, connectionName string) *AmqpConnection } type defaultAmqpConnectionProvider struct{} func (p defaultAmqpConnectionProvider) NewAmqpConnection(config AmqpConnectionConfig, connectionName string) *AmqpConnection { return NewAmqpConnection(config, connectionName) } var _ connectionProvider = (*defaultAmqpConnectionProvider)(nil) type ConnectionPool interface { Close() error NewHandle() *ConnectionPoolHandle GetConnection(handle *ConnectionPoolHandle) (*AmqpConnection, error) } type AmqpConnectionPool struct { config AmqpConnectionPoolConfig connectionName string connections []*AmqpConnection connectionProvider connectionProvider handleOffset int lock sync.RWMutex } type ConnectionPoolHandle struct { connectionOffset int } func NewDefaultAmqpConnectionPool(config AmqpConnectionConfig, connectionName string) (ConnectionPool, error) { poolConfig := AmqpConnectionPoolConfig{ Parameters: config, PoolSize: 1, } return NewAmqpConnectionPool(poolConfig, connectionName) } func NewAmqpConnectionPool(config AmqpConnectionPoolConfig, connectionName string) (ConnectionPool, error) { if config.PoolSize == 0 { config.PoolSize = 1 } pool := &AmqpConnectionPool{ config: config, connectionName: connectionName, connections: make([]*AmqpConnection, 0), connectionProvider: defaultAmqpConnectionProvider{}, handleOffset: 0, lock: sync.RWMutex{}, } if err := pool.initializeConnections(); err != nil { if closeErr := pool.Close(); closeErr != nil { return nil, errors.Join(err, fmt.Errorf("initialize amqp connection: pool closed: %w", closeErr)) } return nil, fmt.Errorf("initialize connections: %w", err) } return pool, nil } func (p *AmqpConnectionPool) initializeConnections() error { if len(p.connections) < p.config.PoolSize { p.lock.Lock() defer p.lock.Unlock() numMissingConnections := p.config.PoolSize - len(p.connections) for i := 0; i < numMissingConnections; i++ { if err := p.internalAddConnection(); err != nil { return err } } } return nil } func (p *AmqpConnectionPool) internalAddConnection() error { newConnection, err := p.internalNewConnection() if err != nil { return fmt.Errorf("new connection: %w", err) } p.connections = append(p.connections, newConnection) return nil } func (p *AmqpConnectionPool) internalNewConnection() (*AmqpConnection, error) { conn := p.connectionProvider.NewAmqpConnection(p.config.Parameters, p.connectionName) if err := conn.Connect(); err != nil { slog.Warn("amqp connection: failed to connect to amqp broker", slog.Any("err", err)) // retry if err = conn.Connect(); err != nil { connectErr := fmt.Errorf("new internal connection: %w", err) if closeErr := conn.Close(); closeErr != nil { // this case should never happen as the inner connection should always be null, therefore // it should not have to be closed, i.e. be able to return errors. return nil, errors.Join(connectErr, fmt.Errorf("close connection: %w", closeErr)) } return nil, connectErr } } return conn, nil } func (p *AmqpConnectionPool) Close() error { p.lock.Lock() defer p.lock.Unlock() closeErrors := make([]error, 0) for _, conn := range p.connections { if conn != nil { if err := conn.Close(); err != nil { closeErrors = append(closeErrors, fmt.Errorf("pooled connection: %w", err)) } } } p.connections = make([]*AmqpConnection, p.config.PoolSize) if len(closeErrors) > 0 { return errors.Join(closeErrors...) } return nil } func (p *AmqpConnectionPool) NewHandle() *ConnectionPoolHandle { p.lock.Lock() defer p.lock.Unlock() offset := p.handleOffset p.handleOffset += 1 offset = offset % p.config.PoolSize return &ConnectionPoolHandle{ connectionOffset: offset, } } func (p *AmqpConnectionPool) GetConnection(handle *ConnectionPoolHandle) (*AmqpConnection, error) { // get the requested connection or another one conn, addConnection := p.nextConnectionForHandle(handle) // renew the requested connection if the request connection is closed if conn == nil || addConnection { p.lock.Lock() // check that accessing the pool only with a valid index (out of bounds should only occur on shutdown) connectionIndex := p.connectionIndex(handle, 0) if p.connections[connectionIndex] == nil { connection, err := p.internalNewConnection() if err != nil { if conn == nil { // case: connection could not be renewed and no connection to return has been found p.lock.Unlock() return nil, fmt.Errorf("renew connection: %w", err) } // case: connection could not be renewed but another connection will be returned slog.Warn("amqp connection pool: get connection: renew connection: ", slog.Any("err", err)) } else { // case: connection could be renewed and will be added to pool p.connections[connectionIndex] = connection conn = connection } } p.lock.Unlock() } if conn == nil { return nil, fmt.Errorf("amqp connection pool: get connection: failed to obtain connection") } return conn, nil } func (p *AmqpConnectionPool) nextConnectionForHandle(handle *ConnectionPoolHandle) (*AmqpConnection, bool) { // retry as long as there are remaining connections in the pool var conn *AmqpConnection var addConnection bool for i := 0; i < p.config.PoolSize; i++ { // get the next possible connection (considering the retry index) idx := p.connectionIndex(handle, i) p.lock.RLock() if idx < len(p.connections) { conn = p.connections[idx] } else { // handle the edge case that the pool is empty on shutdown conn = nil } p.lock.RUnlock() // remember that the requested is closed, retry with the next if conn == nil { addConnection = true continue } // if the connection is closed, mark it by setting it to nil if conn.IsClosed() { p.lock.Lock() p.connections[idx] = nil p.lock.Unlock() addConnection = true continue } return conn, addConnection } return nil, true } func (p *AmqpConnectionPool) connectionIndex(handle *ConnectionPoolHandle, iteration int) int { if iteration+handle.connectionOffset >= p.config.PoolSize { return (iteration + handle.connectionOffset) % p.config.PoolSize } else { return iteration + handle.connectionOffset } }