audit-go/audit/messaging/amqp_connection_pool.go
Christian Schaible 5742604629 Merged PR 716929: feat: Replace AMQP connection management
So far, the SDK provided a messaging API that was not thread-safe (i.e. not goroutine-safe). Additionally, the SDK provided a MutexAPI which made it thread-safe at the cost of concurrency. The changes implemented in this commit replace both implementations with a thread-safe solution based on a connection pool.

The API gateway is an SDK user that requires reliable, high-performance send capabilities with a limited number of AMQP connections. The changes in this PR try to address these requirements by moving the responsibility for connection management into the SDK. Other SDK users will benefit from this change as well.

Security-concept-update-needed: false.

JIRA Work Item: STACKITALO-62
2025-01-27 13:23:54 +00:00

219 lines
6 KiB
Go

package messaging
import (
"errors"
"fmt"
"log/slog"
"sync"
)
// connectionProvider abstracts how the pool constructs AmqpConnection values.
// The pool keeps one in a field and calls it whenever it needs a new
// connection (presumably so tests can substitute their own — confirm against
// the test files).
type connectionProvider interface {
// NewAmqpConnection builds a connection from the given config and connection
// name. The pool calls Connect on the result itself.
NewAmqpConnection(config AmqpConnectionConfig, connectionName string) *AmqpConnection
}
// defaultAmqpConnectionProvider is the connectionProvider that
// NewAmqpConnectionPool installs on every pool it creates.
type defaultAmqpConnectionProvider struct{}
// NewAmqpConnection delegates to the package-level NewAmqpConnection
// constructor without modifying its arguments or result.
func (p defaultAmqpConnectionProvider) NewAmqpConnection(config AmqpConnectionConfig, connectionName string) *AmqpConnection {
return NewAmqpConnection(config, connectionName)
}
// Compile-time check that *defaultAmqpConnectionProvider satisfies connectionProvider.
var _ connectionProvider = (*defaultAmqpConnectionProvider)(nil)
// ConnectionPool is the interface returned by NewAmqpConnectionPool. Callers
// obtain a handle once and then use it to request connections.
type ConnectionPool interface {
// Close closes the pooled connections and reports any close errors.
Close() error
// NewHandle returns a handle that is later passed to GetConnection.
NewHandle() *ConnectionPoolHandle
// GetConnection returns a connection for the given handle, or an error if
// none could be obtained.
GetConnection(handle *ConnectionPoolHandle) (*AmqpConnection, error)
}
// AmqpConnectionPool is the ConnectionPool implementation created by
// NewAmqpConnectionPool. It keeps a slice of AMQP connections and hands them
// out by handle.
type AmqpConnectionPool struct {
// config holds the pool settings; this file reads its PoolSize and
// Parameters fields.
config AmqpConnectionPoolConfig
// connectionName is passed to the connection provider for every connection
// the pool creates.
connectionName string
// connections holds the pooled connections. A nil entry marks a slot whose
// connection was found closed (or released by Close) and has not been
// renewed yet.
connections []*AmqpConnection
// connectionProvider constructs new connections for the pool.
connectionProvider connectionProvider
// handleOffset is the counter NewHandle uses to spread handles over the
// pool slots.
handleOffset int
// lock guards connections and handleOffset.
lock sync.RWMutex
}
// ConnectionPoolHandle identifies the pool slot a caller prefers. Handles are
// issued by NewHandle and passed back to GetConnection.
type ConnectionPoolHandle struct {
// connectionOffset is the index of the preferred slot; lookups start there
// and move on to the following slots when that connection is unavailable.
connectionOffset int
}
// NewAmqpConnectionPool creates a pool and eagerly opens config.PoolSize
// connections, each built from config.Parameters and connectionName.
//
// It returns an error when config.PoolSize is not positive or when one of the
// initial connections cannot be established. In the latter case the
// connections opened so far are closed again before returning; a failure to
// close them is joined onto the returned error.
func NewAmqpConnectionPool(config AmqpConnectionPoolConfig, connectionName string) (ConnectionPool, error) {
	// The pool's index arithmetic works modulo PoolSize, so a pool without
	// slots would panic on first use instead of failing here with an error.
	if config.PoolSize <= 0 {
		return nil, fmt.Errorf("initialize connections: pool size must be positive, got %d", config.PoolSize)
	}

	pool := &AmqpConnectionPool{
		config:             config,
		connectionName:     connectionName,
		connections:        make([]*AmqpConnection, 0, config.PoolSize),
		connectionProvider: defaultAmqpConnectionProvider{},
	}

	if err := pool.initializeConnections(); err != nil {
		// Release whatever was opened before the failure.
		if closeErr := pool.Close(); closeErr != nil {
			return nil, errors.Join(err, fmt.Errorf("initialize amqp connection: pool closed: %w", closeErr))
		}
		return nil, fmt.Errorf("initialize connections: %w", err)
	}
	return pool, nil
}
// initializeConnections tops the pool up until it holds config.PoolSize
// connections.
//
// The write lock is taken before the length of the slice is inspected, so the
// check and the appends happen under the same lock. The function stops at the
// first connection that cannot be established and returns that error;
// connections appended before the failure stay in the pool.
func (p *AmqpConnectionPool) initializeConnections() error {
	p.lock.Lock()
	defer p.lock.Unlock()

	// Each successful internalAddConnection appends exactly one connection.
	for len(p.connections) < p.config.PoolSize {
		if err := p.internalAddConnection(); err != nil {
			return err
		}
	}
	return nil
}
// internalAddConnection opens one more connection and appends it to the pool.
// It takes no lock itself; the caller is expected to hold p.lock for writing.
// On failure the pool is left unchanged and the error is wrapped.
func (p *AmqpConnectionPool) internalAddConnection() error {
	conn, err := p.internalNewConnection()
	if err != nil {
		return fmt.Errorf("new connection: %w", err)
	}

	p.connections = append(p.connections, conn)
	return nil
}
// internalNewConnection asks the connection provider for a connection and
// connects it, making at most two attempts.
//
// The first failure is only logged as a warning. If the second attempt fails
// as well, the connection is closed and the wrapped connect error is
// returned, joined with the close error should closing fail too.
func (p *AmqpConnectionPool) internalNewConnection() (*AmqpConnection, error) {
	conn := p.connectionProvider.NewAmqpConnection(p.config.Parameters, p.connectionName)

	firstErr := conn.Connect()
	if firstErr == nil {
		return conn, nil
	}
	slog.Warn("amqp connection: failed to connect to amqp broker", slog.Any("err", firstErr))

	// Second and last attempt.
	retryErr := conn.Connect()
	if retryErr == nil {
		return conn, nil
	}

	connectErr := fmt.Errorf("new internal connection: %w", retryErr)
	// Per the original author's note, Close is not expected to fail here
	// because the inner connection was never established; the error is still
	// reported in case it does.
	if closeErr := conn.Close(); closeErr != nil {
		return nil, errors.Join(connectErr, fmt.Errorf("close connection: %w", closeErr))
	}
	return nil, connectErr
}
// Close closes every non-nil pooled connection while holding the write lock
// and then replaces the slice with config.PoolSize nil slots.
//
// All connections are attempted even if some fail to close. The individual
// failures are wrapped and joined into the returned error; nil is returned
// when every close succeeded.
func (p *AmqpConnectionPool) Close() error {
	p.lock.Lock()
	defer p.lock.Unlock()

	var errs []error
	for _, conn := range p.connections {
		if conn == nil {
			continue
		}
		if err := conn.Close(); err != nil {
			errs = append(errs, fmt.Errorf("pooled connection: %w", err))
		}
	}

	p.connections = make([]*AmqpConnection, p.config.PoolSize)

	// errors.Join yields nil when there is nothing to join.
	return errors.Join(errs...)
}
// NewHandle returns a handle bound to the next pool slot in round-robin
// order, so consecutive handles are spread evenly over the connections.
//
// The internal counter is kept within [0, PoolSize) so it can never overflow
// into a negative offset. For a pool whose PoolSize is not positive, a handle
// for slot 0 is returned instead of panicking on the modulo operation.
func (p *AmqpConnectionPool) NewHandle() *ConnectionPoolHandle {
	p.lock.Lock()
	defer p.lock.Unlock()

	size := p.config.PoolSize
	if size <= 0 {
		return &ConnectionPoolHandle{}
	}

	offset := p.handleOffset % size
	p.handleOffset = (offset + 1) % size
	return &ConnectionPoolHandle{connectionOffset: offset}
}
// GetConnection returns an open connection for handle.
//
// The slot the handle points at is preferred. If that connection is missing
// or closed, another open connection from the pool is used as a fallback and
// the preferred slot is renewed under the write lock. Renewal connects to the
// broker while the lock is held.
//
// An error is returned when handle is nil, when the pool has no slots, or
// when neither a fallback connection nor a renewed connection is available.
// A failed renewal is only logged when a fallback connection can be returned.
func (p *AmqpConnectionPool) GetConnection(handle *ConnectionPoolHandle) (*AmqpConnection, error) {
	if handle == nil {
		return nil, fmt.Errorf("amqp connection pool: get connection: handle must not be nil")
	}
	if p.config.PoolSize <= 0 {
		// Without slots the index arithmetic below would divide by zero.
		return nil, fmt.Errorf("amqp connection pool: get connection: failed to obtain connection")
	}

	// Get the requested connection or another one.
	conn, addConnection := p.nextConnectionForHandle(handle)
	if conn != nil && !addConnection {
		return conn, nil
	}

	// The requested slot needs to be renewed. The lock is released by defer so
	// that no return path (or panic) can leave it held.
	p.lock.Lock()
	defer p.lock.Unlock()

	connectionIndex := p.connectionIndex(handle, 0)
	if connectionIndex < 0 || connectionIndex >= len(p.connections) {
		// The slot does not exist; nothing can be renewed, so fall back to
		// whatever the scan found.
		if conn == nil {
			return nil, fmt.Errorf("amqp connection pool: get connection: failed to obtain connection")
		}
		return conn, nil
	}

	if current := p.connections[connectionIndex]; current != nil {
		// Another goroutine renewed the slot between the scan and taking the
		// lock. Use its connection if the scan found no fallback.
		if conn == nil && !current.IsClosed() {
			conn = current
		}
	} else {
		renewed, err := p.internalNewConnection()
		switch {
		case err == nil:
			// case: connection could be renewed and is added to the pool
			p.connections[connectionIndex] = renewed
			conn = renewed
		case conn == nil:
			// case: connection could not be renewed and there is no fallback
			return nil, fmt.Errorf("renew connection: %w", err)
		default:
			// case: connection could not be renewed but a fallback is returned
			slog.Warn("amqp connection pool: get connection: renew connection: ", slog.Any("err", err))
		}
	}

	if conn == nil {
		return nil, fmt.Errorf("amqp connection pool: get connection: failed to obtain connection")
	}
	return conn, nil
}
// nextConnectionForHandle scans the pool, starting at the handle's slot and
// wrapping around, and returns the first connection that is not closed.
//
// The boolean result reports whether a slot visited before the returned one
// was empty or closed, i.e. whether the caller should renew the handle's
// slot. When no open connection exists at all, it returns (nil, true).
//
// Closed connections are removed from their slot. Because the closed check
// runs without the lock, the slot is cleared only if it still holds the very
// connection that was inspected; otherwise a connection renewed by another
// goroutine in the meantime would be thrown away.
func (p *AmqpConnectionPool) nextConnectionForHandle(handle *ConnectionPoolHandle) (*AmqpConnection, bool) {
	addConnection := false
	for i := 0; i < p.config.PoolSize; i++ {
		// Next candidate slot, taking the retry iteration into account.
		idx := p.connectionIndex(handle, i)

		var conn *AmqpConnection
		p.lock.RLock()
		// An out-of-range index is treated like an empty slot.
		if idx >= 0 && idx < len(p.connections) {
			conn = p.connections[idx]
		}
		p.lock.RUnlock()

		if conn == nil {
			addConnection = true
			continue
		}
		if !conn.IsClosed() {
			return conn, addConnection
		}

		// Mark the closed connection's slot as empty, unless the slot changed
		// while no lock was held.
		p.lock.Lock()
		if idx < len(p.connections) && p.connections[idx] == conn {
			p.connections[idx] = nil
		}
		p.lock.Unlock()
		addConnection = true
	}
	return nil, true
}
// connectionIndex maps a handle and a retry iteration to a slot index: the
// handle's offset plus the iteration, wrapped around at config.PoolSize.
func (p *AmqpConnectionPool) connectionIndex(handle *ConnectionPoolHandle, iteration int) int {
	idx := iteration + handle.connectionOffset
	if size := p.config.PoolSize; idx >= size {
		return idx % size
	}
	return idx
}