audit-go/audit/messaging/amqp_connection_pool.go
Christian Schaible 3eb803ae1c Merged PR 724980: fix: Set default connection pool size to 2
Updating the default connection pool size after clarifying with the Integration platform team.

Security-concept-update-needed: false.

JIRA Work Item: STACKITRMA-68
2025-01-29 15:07:27 +00:00

230 lines
6.4 KiB
Go

package messaging
import (
"errors"
"fmt"
"log/slog"
"sync"
)
// connectionProvider abstracts the creation of AmqpConnection values so that
// the pool does not depend on a concrete constructor. The pool calls it from
// internalNewConnection whenever a new connection has to be created.
type connectionProvider interface {
// NewAmqpConnection returns a connection object for the given parameters
// and connection name. The pool calls Connect on the returned value itself.
NewAmqpConnection(config AmqpConnectionConfig, connectionName string) *AmqpConnection
}
// defaultAmqpConnectionProvider is the connectionProvider that
// NewAmqpConnectionPool installs in every pool it creates.
type defaultAmqpConnectionProvider struct{}
// NewAmqpConnection delegates to the package-level NewAmqpConnection
// constructor, passing both arguments through unchanged.
func (p defaultAmqpConnectionProvider) NewAmqpConnection(config AmqpConnectionConfig, connectionName string) *AmqpConnection {
return NewAmqpConnection(config, connectionName)
}
// Compile-time check that *defaultAmqpConnectionProvider satisfies connectionProvider.
var _ connectionProvider = (*defaultAmqpConnectionProvider)(nil)
// ConnectionPool is the interface returned by the pool constructors in this
// file; AmqpConnectionPool is the implementation they return.
type ConnectionPool interface {
// Close closes the pool and reports any errors that occurred while doing so.
Close() error
// NewHandle returns a handle that is later passed to GetConnection.
NewHandle() *ConnectionPoolHandle
// GetConnection returns a connection for the given handle, or an error if
// no connection could be obtained.
GetConnection(handle *ConnectionPoolHandle) (*AmqpConnection, error)
}
// AmqpConnectionPool is a fixed-size pool of AMQP connections. Handles created
// by NewHandle are spread round-robin over the pool slots.
type AmqpConnectionPool struct {
// config carries the connection parameters and the pool size.
config AmqpConnectionPoolConfig
// connectionName is passed to the connectionProvider for every new connection.
connectionName string
// connections holds one slot per pooled connection. A nil slot marks a
// connection that was found closed and still has to be renewed.
connections []*AmqpConnection
// connectionProvider creates the connection objects used to fill the slots.
connectionProvider connectionProvider
// handleOffset is the counter NewHandle uses to derive the next handle's slot.
handleOffset int
// lock guards connections and handleOffset.
lock sync.RWMutex
}
// ConnectionPoolHandle identifies the pool slot a caller prefers. It is
// created by NewHandle and passed to GetConnection.
type ConnectionPoolHandle struct {
// connectionOffset is the slot index assigned by NewHandle (its counter
// modulo the pool size); lookups start at this slot.
connectionOffset int
}
// NewDefaultAmqpConnectionPool creates a connection pool for the given
// connection parameters with a fixed pool size of 2.
//
// connectionName is forwarded unchanged to NewAmqpConnectionPool, whose pool
// and error are returned as they are.
func NewDefaultAmqpConnectionPool(config AmqpConnectionConfig, connectionName string) (ConnectionPool, error) {
	return NewAmqpConnectionPool(
		AmqpConnectionPoolConfig{Parameters: config, PoolSize: 2},
		connectionName,
	)
}
// NewAmqpConnectionPool creates a pool and eagerly opens config.PoolSize
// connections named connectionName.
//
// A non-positive PoolSize is replaced by the default of 2. A negative size
// would otherwise produce an empty pool whose index arithmetic and Close call
// panic later on.
//
// If a connection cannot be established, the pool is closed again and an error
// is returned. When closing fails as well, both errors are joined.
func NewAmqpConnectionPool(config AmqpConnectionPoolConfig, connectionName string) (ConnectionPool, error) {
	if config.PoolSize <= 0 {
		config.PoolSize = 2
	}

	// handleOffset and lock are usable in their zero value; the slice is
	// pre-sized because the final number of connections is known.
	pool := &AmqpConnectionPool{
		config:             config,
		connectionName:     connectionName,
		connections:        make([]*AmqpConnection, 0, config.PoolSize),
		connectionProvider: defaultAmqpConnectionProvider{},
	}

	if err := pool.initializeConnections(); err != nil {
		// Release whatever was opened before the failure.
		if closeErr := pool.Close(); closeErr != nil {
			return nil, errors.Join(err, fmt.Errorf("initialize amqp connection: pool closed: %w", closeErr))
		}
		return nil, fmt.Errorf("initialize connections: %w", err)
	}
	return pool, nil
}
// initializeConnections opens connections until the pool holds
// config.PoolSize of them.
//
// The lock is taken before the current length is inspected, so the length
// check and the appends happen in one critical section. It returns the first
// error reported by internalAddConnection; connections added before the
// failure stay in the pool.
func (p *AmqpConnectionPool) initializeConnections() error {
	p.lock.Lock()
	defer p.lock.Unlock()

	missing := p.config.PoolSize - len(p.connections)
	for i := 0; i < missing; i++ {
		if err := p.internalAddConnection(); err != nil {
			return err
		}
	}
	return nil
}
// internalAddConnection creates one connection and appends it to the pool.
//
// It does not take the pool lock itself; initializeConnections calls it while
// holding the lock. A failure to create the connection is returned wrapped
// and leaves the pool unchanged.
func (p *AmqpConnectionPool) internalAddConnection() error {
	created, createErr := p.internalNewConnection()
	if createErr != nil {
		return fmt.Errorf("new connection: %w", createErr)
	}

	p.connections = append(p.connections, created)
	return nil
}
// internalNewConnection asks the connection provider for a connection object
// and connects it, retrying exactly once if the first attempt fails.
//
// It returns the connected connection. If the retry fails too, the connection
// is closed and the connect error is returned, joined with the close error if
// closing failed as well.
func (p *AmqpConnectionPool) internalNewConnection() (*AmqpConnection, error) {
	conn := p.connectionProvider.NewAmqpConnection(p.config.Parameters, p.connectionName)

	firstErr := conn.Connect()
	if firstErr == nil {
		return conn, nil
	}
	slog.Warn("amqp connection: failed to connect to amqp broker", slog.Any("err", firstErr))

	// Second and last attempt.
	retryErr := conn.Connect()
	if retryErr == nil {
		return conn, nil
	}

	connectErr := fmt.Errorf("new internal connection: %w", retryErr)
	closeErr := conn.Close()
	if closeErr == nil {
		return nil, connectErr
	}
	// Not expected in practice: after a failed connect the inner connection
	// should be nil, so there should be nothing to close that could fail.
	return nil, errors.Join(connectErr, fmt.Errorf("close connection: %w", closeErr))
}
// Close closes every non-nil connection in the pool while holding the write
// lock, then replaces the connection slice with config.PoolSize nil slots.
//
// All close failures are collected and returned as one joined error; nil is
// returned when every connection closed cleanly.
func (p *AmqpConnectionPool) Close() error {
	p.lock.Lock()
	defer p.lock.Unlock()

	var failures []error
	for i := range p.connections {
		pooled := p.connections[i]
		if pooled == nil {
			continue
		}
		if err := pooled.Close(); err != nil {
			failures = append(failures, fmt.Errorf("pooled connection: %w", err))
		}
	}

	p.connections = make([]*AmqpConnection, p.config.PoolSize)

	// errors.Join yields nil when there is nothing to join.
	return errors.Join(failures...)
}
// NewHandle returns a handle bound to the next pool slot in round-robin order.
//
// The internal counter is kept inside [0, PoolSize) instead of growing without
// bound. An ever-increasing counter eventually overflows, and the remainder of
// a negative number is negative in Go, which would produce a handle with a
// negative slot index. A counter that is already out of range is normalised
// the same way.
//
// If the pool size is not positive, a handle for slot 0 is returned rather
// than dividing by zero.
func (p *AmqpConnectionPool) NewHandle() *ConnectionPoolHandle {
	p.lock.Lock()
	defer p.lock.Unlock()

	size := p.config.PoolSize
	if size <= 0 {
		return &ConnectionPoolHandle{}
	}

	offset := p.handleOffset % size
	if offset < 0 {
		offset += size
	}
	p.handleOffset = (offset + 1) % size

	return &ConnectionPoolHandle{connectionOffset: offset}
}
// GetConnection returns an open connection for handle.
//
// The handle's own slot is preferred. If that slot is empty or its connection
// is closed, the slot is renewed; if renewal fails but another pooled
// connection is open, that one is returned and the renewal error is only
// logged.
//
// An error is returned when handle is nil, when renewal fails and no other
// connection is available, or when no connection can be obtained at all (for
// example because the slot index lies outside the pool).
func (p *AmqpConnectionPool) GetConnection(handle *ConnectionPoolHandle) (*AmqpConnection, error) {
	if handle == nil {
		return nil, errors.New("amqp connection pool: get connection: handle is nil")
	}

	// Get the requested connection or, failing that, another open one.
	conn, needsRenewal := p.nextConnectionForHandle(handle)
	if conn != nil && !needsRenewal {
		return conn, nil
	}

	renewed, err := p.renewConnection(handle)
	switch {
	case renewed != nil:
		// The handle's slot holds a usable connection again.
		return renewed, nil
	case conn != nil:
		// Renewal was not possible, but a fallback connection exists.
		if err != nil {
			slog.Warn("amqp connection pool: get connection: renew connection: ", slog.Any("err", err))
		}
		return conn, nil
	case err != nil:
		return nil, fmt.Errorf("renew connection: %w", err)
	default:
		return nil, fmt.Errorf("amqp connection pool: get connection: failed to obtain connection")
	}
}

// renewConnection fills the handle's slot with a new connection while holding
// the write lock.
//
// It returns the connection now stored in the slot: the new one, or the
// existing one if another goroutine refilled the slot in the meantime. It
// returns (nil, nil) when the slot index is outside the pool, and
// (nil, err) when creating the connection failed.
func (p *AmqpConnectionPool) renewConnection(handle *ConnectionPoolHandle) (*AmqpConnection, error) {
	p.lock.Lock()
	defer p.lock.Unlock()

	idx := p.connectionIndex(handle, 0)
	if idx < 0 || idx >= len(p.connections) {
		return nil, nil
	}
	if existing := p.connections[idx]; existing != nil {
		return existing, nil
	}

	created, err := p.internalNewConnection()
	if err != nil {
		return nil, err
	}
	p.connections[idx] = created
	return created, nil
}
// nextConnectionForHandle walks the pool starting at the handle's slot and
// returns the first connection that is not closed.
//
// The boolean result reports whether at least one slot visited before the
// returned connection was empty or closed, i.e. whether the caller should try
// to renew the handle's slot. When no open connection exists, (nil, true) is
// returned.
//
// A connection found closed is removed from its slot, but only if the slot
// still holds that same connection. The slot is read under the read lock and
// cleared under the write lock, so another goroutine may have renewed it in
// between; clearing unconditionally would drop that fresh connection without
// closing it.
func (p *AmqpConnectionPool) nextConnectionForHandle(handle *ConnectionPoolHandle) (*AmqpConnection, bool) {
	addConnection := false

	// Try each slot at most once.
	for i := 0; i < p.config.PoolSize; i++ {
		idx := p.connectionIndex(handle, i)

		// An out-of-range index is treated like an empty slot (the pool may
		// be empty on shutdown).
		var conn *AmqpConnection
		p.lock.RLock()
		if idx >= 0 && idx < len(p.connections) {
			conn = p.connections[idx]
		}
		p.lock.RUnlock()

		if conn == nil {
			addConnection = true
			continue
		}
		if !conn.IsClosed() {
			return conn, addConnection
		}

		// Mark the closed connection by clearing its slot, unless the slot
		// was replaced since it was read.
		p.lock.Lock()
		if idx < len(p.connections) && p.connections[idx] == conn {
			p.connections[idx] = nil
		}
		p.lock.Unlock()
		addConnection = true
	}
	return nil, true
}
// connectionIndex maps a handle and a retry iteration to a pool slot:
// (handle.connectionOffset + iteration) modulo the pool size.
//
// The result is normalised into [0, PoolSize), so a negative offset cannot
// yield a negative slice index. PoolSize must be positive;
// NewAmqpConnectionPool replaces a zero size with 2 before the pool is used.
func (p *AmqpConnectionPool) connectionIndex(handle *ConnectionPoolHandle, iteration int) int {
	size := p.config.PoolSize
	idx := (iteration + handle.connectionOffset) % size
	if idx < 0 {
		// In Go the remainder takes the sign of the dividend.
		idx += size
	}
	return idx
}