audit-go/internal/messaging/amqp_connection_pool_test.go
Christian Schaible (EXT) 85aae1c2e7 Merged PR 779949: feat: Refactor module structure to reflect best practices
Security-concept-update-needed: false.

JIRA Work Item: STACKITALO-259
2025-05-19 11:54:00 +00:00

581 lines
19 KiB
Go

package messaging
import (
"errors"
"fmt"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
pkgMessagingCommon "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/common"
)
type connectionProviderMock struct {
mock.Mock
}
func (p *connectionProviderMock) NewAmqpConnection(config pkgMessagingCommon.AmqpConnectionConfig, connectionName string) *AmqpConnection {
args := p.Called(config, connectionName)
return args.Get(0).(*AmqpConnection)
}
var _ connectionProvider = (*connectionProviderMock)(nil)
func Test_AmqpConnectionPool_GetHandle(t *testing.T) {
t.Run("next handle", func(t *testing.T) {
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
}
handle := pool.NewHandle()
assert.NotNil(t, handle)
assert.Equal(t, 0, handle.ConnectionOffset)
assert.Equal(t, 1, pool.HandleOffset)
})
t.Run("next handle high offset", func(t *testing.T) {
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 13,
Lock: sync.RWMutex{},
}
handle := pool.NewHandle()
assert.NotNil(t, handle)
assert.Equal(t, 3, handle.ConnectionOffset)
assert.Equal(t, 14, pool.HandleOffset)
})
}
func Test_AmqpConnectionPool_internalAddConnection(t *testing.T) {
t.Run("internal add connection", func(t *testing.T) {
conn := &amqpConnMock{}
dialer := &amqpDialMock{}
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil)
connection := &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Dialer: dialer,
}
connectionProvider := &connectionProviderMock{}
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
ConnectionProvider: connectionProvider,
}
err := pool.internalAddConnection()
assert.NoError(t, err)
assert.Equal(t, 1, len(pool.Connections))
connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 1)
dialer.AssertNumberOfCalls(t, "Dial", 1)
})
t.Run("dialer error", func(t *testing.T) {
conn := &amqpConnMock{}
dialer := &amqpDialMock{}
var c *amqpConnMock = nil
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, errors.New("test error")).Once()
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil)
connection := &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Dialer: dialer,
}
connectionProvider := &connectionProviderMock{}
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
ConnectionProvider: connectionProvider,
}
err := pool.internalAddConnection()
assert.NoError(t, err)
assert.Equal(t, 1, len(pool.Connections))
connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 1)
dialer.AssertNumberOfCalls(t, "Dial", 2)
})
t.Run("repetitive dialer error", func(t *testing.T) {
dialer := &amqpDialMock{}
var c *amqpConnMock = nil
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, errors.New("test error"))
connection := &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Dialer: dialer,
}
connectionProvider := &connectionProviderMock{}
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
ConnectionProvider: connectionProvider,
}
err := pool.internalAddConnection()
assert.EqualError(t, err, "new connection: new internal connection: internal connect: dial: test error")
assert.Equal(t, 0, len(pool.Connections))
connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 1)
dialer.AssertNumberOfCalls(t, "Dial", 2)
})
}
func Test_AmqpConnectionPool_initializeConnections(t *testing.T) {
t.Run("initialize connections successfully", func(t *testing.T) {
conn := &amqpConnMock{}
dialer := &amqpDialMock{}
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil)
connection := &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Dialer: dialer,
}
connectionProvider := &connectionProviderMock{}
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
ConnectionProvider: connectionProvider,
}
err := pool.initializeConnections()
assert.NoError(t, err)
assert.Equal(t, 5, len(pool.Connections))
connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 5)
})
t.Run("fail initialization of connections", func(t *testing.T) {
var c *amqpConnMock = nil
failingDialer := &amqpDialMock{}
failingDialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, errors.New("test error"))
failingConnection := &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Dialer: failingDialer,
}
conn := &amqpConnMock{}
successfulDialer := &amqpDialMock{}
successfulDialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil)
successfulConnection := &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Dialer: successfulDialer,
}
connectionProvider := &connectionProviderMock{}
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(successfulConnection).Times(4)
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(failingConnection)
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
ConnectionProvider: connectionProvider,
}
err := pool.initializeConnections()
assert.EqualError(t, err, "new connection: new internal connection: internal connect: dial: test error")
assert.Equal(t, 4, len(pool.Connections))
connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 5)
})
}
func Test_AmqpConnectionPool_Close(t *testing.T) {
t.Run("close connection successfully", func(t *testing.T) {
// add 5 connections to the pool
conn := &amqpConnMock{}
conn.On("Close").Return(nil)
dialer := &amqpDialMock{}
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil)
connection := &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Dialer: dialer,
}
connectionProvider := &connectionProviderMock{}
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
ConnectionProvider: connectionProvider,
}
err := pool.initializeConnections()
assert.NoError(t, err)
assert.Equal(t, 5, len(pool.Connections))
// close the pool
err = pool.Close()
assert.NoError(t, err)
assert.Equal(t, 5, len(pool.Connections))
for _, c := range pool.Connections {
assert.Nil(t, c)
}
})
t.Run("close connection fail", func(t *testing.T) {
// add 5 connections to the pool
failingConn := &amqpConnMock{}
failingConn.On("Close").Return(errors.New("test error"))
failingDialer := &amqpDialMock{}
failingDialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(failingConn, nil)
failingConnection := &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Dialer: failingDialer,
}
successfulConn := &amqpConnMock{}
successfulConn.On("Close").Return(nil)
successfulDialer := &amqpDialMock{}
successfulDialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(successfulConn, nil)
successfulConnection := &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Dialer: successfulDialer,
}
connectionProvider := &connectionProviderMock{}
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(successfulConnection).Times(2)
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(failingConnection).Times(2)
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(successfulConnection).Times(1)
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
ConnectionProvider: connectionProvider,
}
err := pool.initializeConnections()
assert.NoError(t, err)
assert.Equal(t, 5, len(pool.Connections))
// close the pool
err = pool.Close()
assert.EqualError(t, err, "pooled connection: internal close: connection close: test error\npooled connection: internal close: connection close: test error")
assert.Equal(t, 5, len(pool.Connections))
for _, c := range pool.Connections {
assert.Nil(t, c)
}
})
}
func Test_AmqpConnectionPool_nextConnectionForHandle(t *testing.T) {
channelReceiver := func(channel chan struct{}) <-chan struct{} {
return channel
}
newActiveConnection := func() *AmqpConnection {
channel := make(chan struct{})
conn := &amqpConnMock{}
conn.On("Done", mock.Anything).Return(channelReceiver(channel))
return &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Conn: conn,
}
}
newClosedConnection := func() *AmqpConnection {
channel := make(chan struct{})
close(channel)
conn := &amqpConnMock{}
conn.On("Done", mock.Anything).Return(channelReceiver(channel))
return &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Conn: conn,
}
}
t.Run("next connection for requested handle", func(t *testing.T) {
connections := make([]*AmqpConnection, 0)
for i := 0; i < 5; i++ {
connections = append(connections, newActiveConnection())
}
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
Connections: connections,
}
connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 1})
assert.NotNil(t, connection)
assert.False(t, addConnection)
})
t.Run("nil connection for requested handle", func(t *testing.T) {
connections := make([]*AmqpConnection, 0)
connections = append(connections, newActiveConnection())
connections = append(connections, nil)
connections = append(connections, nil)
connections = append(connections, newActiveConnection())
connections = append(connections, newActiveConnection())
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
Connections: connections,
}
connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 1})
assert.NotNil(t, connection)
assert.True(t, addConnection)
})
t.Run("closed connection for requested handle", func(t *testing.T) {
connections := make([]*AmqpConnection, 0)
connections = append(connections, newActiveConnection())
connections = append(connections, newClosedConnection())
connections = append(connections, newClosedConnection())
connections = append(connections, newActiveConnection())
connections = append(connections, newActiveConnection())
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
Connections: connections,
}
connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 1})
assert.NotNil(t, connection)
assert.True(t, addConnection)
})
t.Run("no connection for requested handle", func(t *testing.T) {
connections := make([]*AmqpConnection, 0)
connections = append(connections, nil)
connections = append(connections, nil)
connections = append(connections, nil)
connections = append(connections, nil)
connections = append(connections, nil)
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
Connections: connections,
}
connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 1})
assert.Nil(t, connection)
assert.True(t, addConnection)
})
t.Run("connection for requested handle with large index", func(t *testing.T) {
connections := make([]*AmqpConnection, 0)
connections = append(connections, nil)
connections = append(connections, nil)
connections = append(connections, nil)
connections = append(connections, newActiveConnection())
connections = append(connections, nil)
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
Connections: connections,
}
connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 23})
assert.NotNil(t, connection)
assert.False(t, addConnection)
})
t.Run("connection for requested handle nil with large index", func(t *testing.T) {
connections := make([]*AmqpConnection, 0)
connections = append(connections, nil)
connections = append(connections, nil)
connections = append(connections, nil)
connections = append(connections, nil)
connections = append(connections, newActiveConnection())
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
Connections: connections,
}
connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 23})
assert.NotNil(t, connection)
assert.True(t, addConnection)
})
}
func Test_AmqpConnectionPool_GetConnection(t *testing.T) {
channelReceiver := func(channel chan struct{}) <-chan struct{} {
return channel
}
newActiveConnection := func() *AmqpConnection {
channel := make(chan struct{})
conn := &amqpConnMock{}
conn.On("Done", mock.Anything).Return(channelReceiver(channel))
return &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Conn: conn,
}
}
t.Run("get connection for requested handle", func(t *testing.T) {
connections := make([]*AmqpConnection, 0)
for i := 0; i < 5; i++ {
connections = append(connections, newActiveConnection())
}
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
Connections: connections,
}
connection, err := pool.GetConnection(&ConnectionPoolHandle{ConnectionOffset: 1})
assert.NoError(t, err)
assert.NotNil(t, connection)
assert.Equal(t, connections[1], connection)
assert.Equal(t, 5, len(connections))
})
t.Run("add connection if missing", func(t *testing.T) {
connections := make([]*AmqpConnection, 5)
connectionProvider := &connectionProviderMock{}
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(newActiveConnection())
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
Connections: connections,
ConnectionProvider: connectionProvider,
}
connection, err := pool.GetConnection(&ConnectionPoolHandle{ConnectionOffset: 1})
assert.NoError(t, err)
assert.NotNil(t, connection)
assert.Equal(t, connections[1], connection)
assert.Equal(t, 5, len(connections))
})
t.Run("add connection fails returns alternative connection", func(t *testing.T) {
connections := make([]*AmqpConnection, 0)
connections = append(connections, newActiveConnection())
connections = append(connections, nil)
connections = append(connections, newActiveConnection())
connections = append(connections, newActiveConnection())
connections = append(connections, newActiveConnection())
connectionProvider := &connectionProviderMock{}
dialer := &amqpDialMock{}
var c *amqpConnMock = nil
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, fmt.Errorf("dial error"))
connection := &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Dialer: dialer,
}
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
Connections: connections,
ConnectionProvider: connectionProvider,
}
connection, err := pool.GetConnection(&ConnectionPoolHandle{ConnectionOffset: 1})
assert.NoError(t, err)
assert.NotNil(t, connection)
assert.Nil(t, connections[1])
assert.Equal(t, connections[2], connection)
assert.Equal(t, 5, len(connections))
})
t.Run("add connection fails", func(t *testing.T) {
connections := make([]*AmqpConnection, 0)
connections = append(connections, nil)
connections = append(connections, nil)
connections = append(connections, nil)
connections = append(connections, nil)
connections = append(connections, nil)
connectionProvider := &connectionProviderMock{}
dialer := &amqpDialMock{}
var c *amqpConnMock = nil
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, fmt.Errorf("dial error"))
connection := &AmqpConnection{
ConnectionName: "test",
Lock: sync.RWMutex{},
Dialer: dialer,
}
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
pool := AmqpConnectionPool{
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
HandleOffset: 0,
Lock: sync.RWMutex{},
Connections: connections,
ConnectionProvider: connectionProvider,
}
connection, err := pool.GetConnection(&ConnectionPoolHandle{ConnectionOffset: 1})
assert.EqualError(t, err, "renew connection: new internal connection: internal connect: dial: dial error")
assert.Nil(t, connection)
assert.Equal(t, 5, len(connections))
})
}