package messaging import ( "errors" "fmt" "sync" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" pkgMessagingCommon "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/common" ) type connectionProviderMock struct { mock.Mock } func (p *connectionProviderMock) NewAmqpConnection(config pkgMessagingCommon.AmqpConnectionConfig, connectionName string) *AmqpConnection { args := p.Called(config, connectionName) return args.Get(0).(*AmqpConnection) } var _ connectionProvider = (*connectionProviderMock)(nil) func Test_AmqpConnectionPool_GetHandle(t *testing.T) { t.Run("next handle", func(t *testing.T) { pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, } handle := pool.NewHandle() assert.NotNil(t, handle) assert.Equal(t, 0, handle.ConnectionOffset) assert.Equal(t, 1, pool.HandleOffset) }) t.Run("next handle high offset", func(t *testing.T) { pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 13, Lock: sync.RWMutex{}, } handle := pool.NewHandle() assert.NotNil(t, handle) assert.Equal(t, 3, handle.ConnectionOffset) assert.Equal(t, 14, pool.HandleOffset) }) } func Test_AmqpConnectionPool_internalAddConnection(t *testing.T) { t.Run("internal add connection", func(t *testing.T) { conn := &amqpConnMock{} dialer := &amqpDialMock{} dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil) connection := &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Dialer: dialer, } connectionProvider := &connectionProviderMock{} connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, ConnectionProvider: connectionProvider, } err := pool.internalAddConnection() assert.NoError(t, err) assert.Equal(t, 1, len(pool.Connections)) connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 1) dialer.AssertNumberOfCalls(t, "Dial", 1) }) t.Run("dialer error", func(t *testing.T) { conn := &amqpConnMock{} dialer := &amqpDialMock{} var c *amqpConnMock = nil dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, errors.New("test error")).Once() dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil) connection := &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Dialer: dialer, } connectionProvider := &connectionProviderMock{} connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, ConnectionProvider: connectionProvider, } err := pool.internalAddConnection() assert.NoError(t, err) assert.Equal(t, 1, len(pool.Connections)) connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 1) dialer.AssertNumberOfCalls(t, "Dial", 2) }) t.Run("repetitive dialer error", func(t *testing.T) { dialer := &amqpDialMock{} var c *amqpConnMock = nil dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, errors.New("test error")) connection := &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Dialer: dialer, } connectionProvider := &connectionProviderMock{} connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, ConnectionProvider: connectionProvider, } err := pool.internalAddConnection() assert.EqualError(t, err, "new connection: new internal connection: internal connect: dial: test error") assert.Equal(t, 0, len(pool.Connections)) connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 1) dialer.AssertNumberOfCalls(t, "Dial", 2) }) } func Test_AmqpConnectionPool_initializeConnections(t *testing.T) { t.Run("initialize connections successfully", func(t *testing.T) { conn := &amqpConnMock{} dialer := &amqpDialMock{} dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil) connection := &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Dialer: dialer, } connectionProvider := &connectionProviderMock{} connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, ConnectionProvider: connectionProvider, } err := pool.initializeConnections() assert.NoError(t, err) assert.Equal(t, 5, len(pool.Connections)) connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 5) }) t.Run("fail initialization of connections", func(t *testing.T) { var c *amqpConnMock = nil failingDialer := &amqpDialMock{} failingDialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, errors.New("test error")) failingConnection := &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Dialer: failingDialer, } conn := &amqpConnMock{} successfulDialer := &amqpDialMock{} successfulDialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil) successfulConnection := &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Dialer: successfulDialer, } connectionProvider := &connectionProviderMock{} connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(successfulConnection).Times(4) connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(failingConnection) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, ConnectionProvider: connectionProvider, } err := pool.initializeConnections() assert.EqualError(t, err, "new connection: new internal connection: internal connect: dial: test error") assert.Equal(t, 4, len(pool.Connections)) connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 5) }) } func Test_AmqpConnectionPool_Close(t *testing.T) { t.Run("close connection successfully", func(t *testing.T) { // add 5 connections to the pool conn := &amqpConnMock{} conn.On("Close").Return(nil) dialer := &amqpDialMock{} dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil) connection := &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Dialer: dialer, } connectionProvider := &connectionProviderMock{} connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, ConnectionProvider: connectionProvider, } err := pool.initializeConnections() assert.NoError(t, err) assert.Equal(t, 5, len(pool.Connections)) // close the pool err = pool.Close() assert.NoError(t, err) assert.Equal(t, 5, len(pool.Connections)) for _, c := range pool.Connections { assert.Nil(t, c) } }) t.Run("close connection fail", func(t *testing.T) { // add 5 connections to the pool failingConn := &amqpConnMock{} failingConn.On("Close").Return(errors.New("test error")) failingDialer := &amqpDialMock{} failingDialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(failingConn, nil) failingConnection := &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Dialer: failingDialer, } successfulConn := &amqpConnMock{} successfulConn.On("Close").Return(nil) successfulDialer := &amqpDialMock{} successfulDialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(successfulConn, nil) successfulConnection := &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Dialer: successfulDialer, } connectionProvider := &connectionProviderMock{} connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(successfulConnection).Times(2) connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(failingConnection).Times(2) connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(successfulConnection).Times(1) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, ConnectionProvider: connectionProvider, } err := pool.initializeConnections() assert.NoError(t, err) assert.Equal(t, 5, len(pool.Connections)) // close the pool err = pool.Close() assert.EqualError(t, err, "pooled connection: internal close: connection close: test error\npooled connection: internal close: connection close: test error") assert.Equal(t, 5, len(pool.Connections)) for _, c := range pool.Connections { assert.Nil(t, c) } }) } func Test_AmqpConnectionPool_nextConnectionForHandle(t *testing.T) { channelReceiver := func(channel chan struct{}) <-chan struct{} { return channel } newActiveConnection := func() *AmqpConnection { channel := make(chan struct{}) conn := &amqpConnMock{} conn.On("Done", mock.Anything).Return(channelReceiver(channel)) return &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Conn: conn, } } newClosedConnection := func() *AmqpConnection { channel := make(chan struct{}) close(channel) conn := &amqpConnMock{} conn.On("Done", mock.Anything).Return(channelReceiver(channel)) return &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Conn: conn, } } t.Run("next connection for requested handle", func(t *testing.T) { connections := make([]*AmqpConnection, 0) for i := 0; i < 5; i++ { connections = append(connections, newActiveConnection()) } pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, Connections: connections, } connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 1}) assert.NotNil(t, connection) assert.False(t, addConnection) }) t.Run("nil connection for requested handle", func(t *testing.T) { connections := make([]*AmqpConnection, 0) connections = append(connections, newActiveConnection()) connections = append(connections, nil) connections = append(connections, nil) connections = append(connections, newActiveConnection()) connections = append(connections, newActiveConnection()) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, Connections: connections, } connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 1}) assert.NotNil(t, connection) assert.True(t, addConnection) }) t.Run("closed connection for requested handle", func(t *testing.T) { connections := make([]*AmqpConnection, 0) connections = append(connections, newActiveConnection()) connections = append(connections, newClosedConnection()) connections = append(connections, newClosedConnection()) connections = append(connections, newActiveConnection()) connections = append(connections, newActiveConnection()) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, Connections: connections, } connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 1}) assert.NotNil(t, connection) assert.True(t, addConnection) }) t.Run("no connection for requested handle", func(t *testing.T) { connections := make([]*AmqpConnection, 0) connections = append(connections, nil) connections = append(connections, nil) connections = append(connections, nil) connections = append(connections, nil) connections = append(connections, nil) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, Connections: connections, } connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 1}) assert.Nil(t, connection) assert.True(t, addConnection) }) t.Run("connection for requested handle with large index", func(t *testing.T) { connections := make([]*AmqpConnection, 0) connections = append(connections, nil) connections = append(connections, nil) connections = append(connections, nil) connections = append(connections, newActiveConnection()) connections = append(connections, nil) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, Connections: connections, } connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 23}) assert.NotNil(t, connection) assert.False(t, addConnection) }) t.Run("connection for requested handle nil with large index", func(t *testing.T) { connections := make([]*AmqpConnection, 0) connections = append(connections, nil) connections = append(connections, nil) connections = append(connections, nil) connections = append(connections, nil) connections = append(connections, newActiveConnection()) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, Connections: connections, } connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 23}) assert.NotNil(t, connection) assert.True(t, addConnection) }) } func Test_AmqpConnectionPool_GetConnection(t *testing.T) { channelReceiver := func(channel chan struct{}) <-chan struct{} { return channel } newActiveConnection := func() *AmqpConnection { channel := make(chan struct{}) conn := &amqpConnMock{} conn.On("Done", mock.Anything).Return(channelReceiver(channel)) return &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Conn: conn, } } t.Run("get connection for requested handle", func(t *testing.T) { connections := make([]*AmqpConnection, 0) for i := 0; i < 5; i++ { connections = append(connections, newActiveConnection()) } pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, Connections: connections, } connection, err := pool.GetConnection(&ConnectionPoolHandle{ConnectionOffset: 1}) assert.NoError(t, err) assert.NotNil(t, connection) assert.Equal(t, connections[1], connection) assert.Equal(t, 5, len(connections)) }) t.Run("add connection if missing", func(t *testing.T) { connections := make([]*AmqpConnection, 5) connectionProvider := &connectionProviderMock{} connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(newActiveConnection()) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, Connections: connections, ConnectionProvider: connectionProvider, } connection, err := pool.GetConnection(&ConnectionPoolHandle{ConnectionOffset: 1}) assert.NoError(t, err) assert.NotNil(t, connection) assert.Equal(t, connections[1], connection) assert.Equal(t, 5, len(connections)) }) t.Run("add connection fails returns alternative connection", func(t *testing.T) { connections := make([]*AmqpConnection, 0) connections = append(connections, newActiveConnection()) connections = append(connections, nil) connections = append(connections, newActiveConnection()) connections = append(connections, newActiveConnection()) connections = append(connections, newActiveConnection()) connectionProvider := &connectionProviderMock{} dialer := &amqpDialMock{} var c *amqpConnMock = nil dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, fmt.Errorf("dial error")) connection := &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Dialer: dialer, } connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, Connections: connections, ConnectionProvider: connectionProvider, } connection, err := pool.GetConnection(&ConnectionPoolHandle{ConnectionOffset: 1}) assert.NoError(t, err) assert.NotNil(t, connection) assert.Nil(t, connections[1]) assert.Equal(t, connections[2], connection) assert.Equal(t, 5, len(connections)) }) t.Run("add connection fails", func(t *testing.T) { connections := make([]*AmqpConnection, 0) connections = append(connections, nil) connections = append(connections, nil) connections = append(connections, nil) connections = append(connections, nil) connections = append(connections, nil) connectionProvider := &connectionProviderMock{} dialer := &amqpDialMock{} var c *amqpConnMock = nil dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, fmt.Errorf("dial error")) connection := &AmqpConnection{ ConnectionName: "test", Lock: sync.RWMutex{}, Dialer: dialer, } connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection) pool := AmqpConnectionPool{ Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5}, HandleOffset: 0, Lock: sync.RWMutex{}, Connections: connections, ConnectionProvider: connectionProvider, } connection, err := pool.GetConnection(&ConnectionPoolHandle{ConnectionOffset: 1}) assert.EqualError(t, err, "renew connection: new internal connection: internal connect: dial: dial error") assert.Nil(t, connection) assert.Equal(t, 5, len(connections)) }) }