mirror of
https://dev.azure.com/schwarzit/schwarzit.stackit-public/_git/audit-go
synced 2026-02-07 16:47:24 +00:00
581 lines
19 KiB
Go
581 lines
19 KiB
Go
package messaging
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/mock"
|
|
|
|
pkgMessagingCommon "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/common"
|
|
)
|
|
|
|
type connectionProviderMock struct {
|
|
mock.Mock
|
|
}
|
|
|
|
func (p *connectionProviderMock) NewAmqpConnection(config pkgMessagingCommon.AmqpConnectionConfig, connectionName string) *AmqpConnection {
|
|
args := p.Called(config, connectionName)
|
|
return args.Get(0).(*AmqpConnection)
|
|
}
|
|
|
|
var _ connectionProvider = (*connectionProviderMock)(nil)
|
|
|
|
func Test_AmqpConnectionPool_GetHandle(t *testing.T) {
|
|
|
|
t.Run("next handle", func(t *testing.T) {
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
}
|
|
|
|
handle := pool.NewHandle()
|
|
assert.NotNil(t, handle)
|
|
assert.Equal(t, 0, handle.ConnectionOffset)
|
|
assert.Equal(t, 1, pool.HandleOffset)
|
|
})
|
|
|
|
t.Run("next handle high offset", func(t *testing.T) {
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 13,
|
|
Lock: sync.RWMutex{},
|
|
}
|
|
|
|
handle := pool.NewHandle()
|
|
assert.NotNil(t, handle)
|
|
assert.Equal(t, 3, handle.ConnectionOffset)
|
|
assert.Equal(t, 14, pool.HandleOffset)
|
|
})
|
|
}
|
|
|
|
func Test_AmqpConnectionPool_internalAddConnection(t *testing.T) {
|
|
|
|
t.Run("internal add connection", func(t *testing.T) {
|
|
conn := &amqpConnMock{}
|
|
|
|
dialer := &amqpDialMock{}
|
|
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil)
|
|
|
|
connection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Dialer: dialer,
|
|
}
|
|
|
|
connectionProvider := &connectionProviderMock{}
|
|
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
ConnectionProvider: connectionProvider,
|
|
}
|
|
|
|
err := pool.internalAddConnection()
|
|
assert.NoError(t, err)
|
|
|
|
assert.Equal(t, 1, len(pool.Connections))
|
|
connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 1)
|
|
dialer.AssertNumberOfCalls(t, "Dial", 1)
|
|
})
|
|
|
|
t.Run("dialer error", func(t *testing.T) {
|
|
conn := &amqpConnMock{}
|
|
|
|
dialer := &amqpDialMock{}
|
|
var c *amqpConnMock = nil
|
|
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, errors.New("test error")).Once()
|
|
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil)
|
|
|
|
connection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Dialer: dialer,
|
|
}
|
|
|
|
connectionProvider := &connectionProviderMock{}
|
|
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
ConnectionProvider: connectionProvider,
|
|
}
|
|
|
|
err := pool.internalAddConnection()
|
|
assert.NoError(t, err)
|
|
|
|
assert.Equal(t, 1, len(pool.Connections))
|
|
connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 1)
|
|
dialer.AssertNumberOfCalls(t, "Dial", 2)
|
|
})
|
|
|
|
t.Run("repetitive dialer error", func(t *testing.T) {
|
|
dialer := &amqpDialMock{}
|
|
var c *amqpConnMock = nil
|
|
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, errors.New("test error"))
|
|
|
|
connection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Dialer: dialer,
|
|
}
|
|
|
|
connectionProvider := &connectionProviderMock{}
|
|
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
ConnectionProvider: connectionProvider,
|
|
}
|
|
|
|
err := pool.internalAddConnection()
|
|
assert.EqualError(t, err, "new connection: new internal connection: internal connect: dial: test error")
|
|
|
|
assert.Equal(t, 0, len(pool.Connections))
|
|
connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 1)
|
|
dialer.AssertNumberOfCalls(t, "Dial", 2)
|
|
})
|
|
}
|
|
|
|
func Test_AmqpConnectionPool_initializeConnections(t *testing.T) {
|
|
|
|
t.Run("initialize connections successfully", func(t *testing.T) {
|
|
|
|
conn := &amqpConnMock{}
|
|
dialer := &amqpDialMock{}
|
|
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil)
|
|
|
|
connection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Dialer: dialer,
|
|
}
|
|
|
|
connectionProvider := &connectionProviderMock{}
|
|
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
ConnectionProvider: connectionProvider,
|
|
}
|
|
|
|
err := pool.initializeConnections()
|
|
assert.NoError(t, err)
|
|
|
|
assert.Equal(t, 5, len(pool.Connections))
|
|
connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 5)
|
|
})
|
|
|
|
t.Run("fail initialization of connections", func(t *testing.T) {
|
|
|
|
var c *amqpConnMock = nil
|
|
failingDialer := &amqpDialMock{}
|
|
failingDialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, errors.New("test error"))
|
|
|
|
failingConnection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Dialer: failingDialer,
|
|
}
|
|
|
|
conn := &amqpConnMock{}
|
|
successfulDialer := &amqpDialMock{}
|
|
successfulDialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil)
|
|
|
|
successfulConnection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Dialer: successfulDialer,
|
|
}
|
|
|
|
connectionProvider := &connectionProviderMock{}
|
|
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(successfulConnection).Times(4)
|
|
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(failingConnection)
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
ConnectionProvider: connectionProvider,
|
|
}
|
|
|
|
err := pool.initializeConnections()
|
|
assert.EqualError(t, err, "new connection: new internal connection: internal connect: dial: test error")
|
|
|
|
assert.Equal(t, 4, len(pool.Connections))
|
|
connectionProvider.AssertNumberOfCalls(t, "NewAmqpConnection", 5)
|
|
})
|
|
}
|
|
|
|
func Test_AmqpConnectionPool_Close(t *testing.T) {
|
|
|
|
t.Run("close connection successfully", func(t *testing.T) {
|
|
// add 5 connections to the pool
|
|
conn := &amqpConnMock{}
|
|
conn.On("Close").Return(nil)
|
|
|
|
dialer := &amqpDialMock{}
|
|
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(conn, nil)
|
|
|
|
connection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Dialer: dialer,
|
|
}
|
|
|
|
connectionProvider := &connectionProviderMock{}
|
|
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
ConnectionProvider: connectionProvider,
|
|
}
|
|
|
|
err := pool.initializeConnections()
|
|
assert.NoError(t, err)
|
|
|
|
assert.Equal(t, 5, len(pool.Connections))
|
|
|
|
// close the pool
|
|
err = pool.Close()
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, 5, len(pool.Connections))
|
|
for _, c := range pool.Connections {
|
|
assert.Nil(t, c)
|
|
}
|
|
})
|
|
|
|
t.Run("close connection fail", func(t *testing.T) {
|
|
// add 5 connections to the pool
|
|
failingConn := &amqpConnMock{}
|
|
failingConn.On("Close").Return(errors.New("test error"))
|
|
|
|
failingDialer := &amqpDialMock{}
|
|
failingDialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(failingConn, nil)
|
|
|
|
failingConnection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Dialer: failingDialer,
|
|
}
|
|
|
|
successfulConn := &amqpConnMock{}
|
|
successfulConn.On("Close").Return(nil)
|
|
successfulDialer := &amqpDialMock{}
|
|
successfulDialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(successfulConn, nil)
|
|
|
|
successfulConnection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Dialer: successfulDialer,
|
|
}
|
|
|
|
connectionProvider := &connectionProviderMock{}
|
|
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(successfulConnection).Times(2)
|
|
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(failingConnection).Times(2)
|
|
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(successfulConnection).Times(1)
|
|
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
ConnectionProvider: connectionProvider,
|
|
}
|
|
|
|
err := pool.initializeConnections()
|
|
assert.NoError(t, err)
|
|
|
|
assert.Equal(t, 5, len(pool.Connections))
|
|
|
|
// close the pool
|
|
err = pool.Close()
|
|
assert.EqualError(t, err, "pooled connection: internal close: connection close: test error\npooled connection: internal close: connection close: test error")
|
|
assert.Equal(t, 5, len(pool.Connections))
|
|
for _, c := range pool.Connections {
|
|
assert.Nil(t, c)
|
|
}
|
|
})
|
|
}
|
|
|
|
func Test_AmqpConnectionPool_nextConnectionForHandle(t *testing.T) {
|
|
channelReceiver := func(channel chan struct{}) <-chan struct{} {
|
|
return channel
|
|
}
|
|
|
|
newActiveConnection := func() *AmqpConnection {
|
|
channel := make(chan struct{})
|
|
conn := &amqpConnMock{}
|
|
conn.On("Done", mock.Anything).Return(channelReceiver(channel))
|
|
return &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Conn: conn,
|
|
}
|
|
}
|
|
|
|
newClosedConnection := func() *AmqpConnection {
|
|
channel := make(chan struct{})
|
|
close(channel)
|
|
|
|
conn := &amqpConnMock{}
|
|
conn.On("Done", mock.Anything).Return(channelReceiver(channel))
|
|
return &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Conn: conn,
|
|
}
|
|
}
|
|
|
|
t.Run("next connection for requested handle", func(t *testing.T) {
|
|
connections := make([]*AmqpConnection, 0)
|
|
for i := 0; i < 5; i++ {
|
|
connections = append(connections, newActiveConnection())
|
|
}
|
|
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
Connections: connections,
|
|
}
|
|
|
|
connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 1})
|
|
assert.NotNil(t, connection)
|
|
assert.False(t, addConnection)
|
|
})
|
|
|
|
t.Run("nil connection for requested handle", func(t *testing.T) {
|
|
connections := make([]*AmqpConnection, 0)
|
|
connections = append(connections, newActiveConnection())
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, newActiveConnection())
|
|
connections = append(connections, newActiveConnection())
|
|
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
Connections: connections,
|
|
}
|
|
|
|
connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 1})
|
|
assert.NotNil(t, connection)
|
|
assert.True(t, addConnection)
|
|
})
|
|
|
|
t.Run("closed connection for requested handle", func(t *testing.T) {
|
|
connections := make([]*AmqpConnection, 0)
|
|
connections = append(connections, newActiveConnection())
|
|
connections = append(connections, newClosedConnection())
|
|
connections = append(connections, newClosedConnection())
|
|
connections = append(connections, newActiveConnection())
|
|
connections = append(connections, newActiveConnection())
|
|
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
Connections: connections,
|
|
}
|
|
|
|
connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 1})
|
|
assert.NotNil(t, connection)
|
|
assert.True(t, addConnection)
|
|
})
|
|
|
|
t.Run("no connection for requested handle", func(t *testing.T) {
|
|
connections := make([]*AmqpConnection, 0)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
Connections: connections,
|
|
}
|
|
|
|
connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 1})
|
|
assert.Nil(t, connection)
|
|
assert.True(t, addConnection)
|
|
})
|
|
|
|
t.Run("connection for requested handle with large index", func(t *testing.T) {
|
|
connections := make([]*AmqpConnection, 0)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, newActiveConnection())
|
|
connections = append(connections, nil)
|
|
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
Connections: connections,
|
|
}
|
|
|
|
connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 23})
|
|
assert.NotNil(t, connection)
|
|
assert.False(t, addConnection)
|
|
})
|
|
|
|
t.Run("connection for requested handle nil with large index", func(t *testing.T) {
|
|
connections := make([]*AmqpConnection, 0)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, newActiveConnection())
|
|
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
Connections: connections,
|
|
}
|
|
|
|
connection, addConnection := pool.nextConnectionForHandle(&ConnectionPoolHandle{ConnectionOffset: 23})
|
|
assert.NotNil(t, connection)
|
|
assert.True(t, addConnection)
|
|
})
|
|
}
|
|
|
|
func Test_AmqpConnectionPool_GetConnection(t *testing.T) {
|
|
channelReceiver := func(channel chan struct{}) <-chan struct{} {
|
|
return channel
|
|
}
|
|
|
|
newActiveConnection := func() *AmqpConnection {
|
|
channel := make(chan struct{})
|
|
conn := &amqpConnMock{}
|
|
conn.On("Done", mock.Anything).Return(channelReceiver(channel))
|
|
return &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Conn: conn,
|
|
}
|
|
}
|
|
|
|
t.Run("get connection for requested handle", func(t *testing.T) {
|
|
connections := make([]*AmqpConnection, 0)
|
|
for i := 0; i < 5; i++ {
|
|
connections = append(connections, newActiveConnection())
|
|
}
|
|
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
Connections: connections,
|
|
}
|
|
|
|
connection, err := pool.GetConnection(&ConnectionPoolHandle{ConnectionOffset: 1})
|
|
assert.NoError(t, err)
|
|
assert.NotNil(t, connection)
|
|
assert.Equal(t, connections[1], connection)
|
|
assert.Equal(t, 5, len(connections))
|
|
})
|
|
|
|
t.Run("add connection if missing", func(t *testing.T) {
|
|
connections := make([]*AmqpConnection, 5)
|
|
|
|
connectionProvider := &connectionProviderMock{}
|
|
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(newActiveConnection())
|
|
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
Connections: connections,
|
|
ConnectionProvider: connectionProvider,
|
|
}
|
|
|
|
connection, err := pool.GetConnection(&ConnectionPoolHandle{ConnectionOffset: 1})
|
|
assert.NoError(t, err)
|
|
assert.NotNil(t, connection)
|
|
assert.Equal(t, connections[1], connection)
|
|
assert.Equal(t, 5, len(connections))
|
|
})
|
|
|
|
t.Run("add connection fails returns alternative connection", func(t *testing.T) {
|
|
connections := make([]*AmqpConnection, 0)
|
|
connections = append(connections, newActiveConnection())
|
|
connections = append(connections, nil)
|
|
connections = append(connections, newActiveConnection())
|
|
connections = append(connections, newActiveConnection())
|
|
connections = append(connections, newActiveConnection())
|
|
|
|
connectionProvider := &connectionProviderMock{}
|
|
|
|
dialer := &amqpDialMock{}
|
|
var c *amqpConnMock = nil
|
|
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, fmt.Errorf("dial error"))
|
|
connection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Dialer: dialer,
|
|
}
|
|
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
|
|
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
Connections: connections,
|
|
ConnectionProvider: connectionProvider,
|
|
}
|
|
|
|
connection, err := pool.GetConnection(&ConnectionPoolHandle{ConnectionOffset: 1})
|
|
assert.NoError(t, err)
|
|
assert.NotNil(t, connection)
|
|
assert.Nil(t, connections[1])
|
|
assert.Equal(t, connections[2], connection)
|
|
assert.Equal(t, 5, len(connections))
|
|
})
|
|
|
|
t.Run("add connection fails", func(t *testing.T) {
|
|
connections := make([]*AmqpConnection, 0)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
connections = append(connections, nil)
|
|
|
|
connectionProvider := &connectionProviderMock{}
|
|
|
|
dialer := &amqpDialMock{}
|
|
var c *amqpConnMock = nil
|
|
dialer.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, fmt.Errorf("dial error"))
|
|
connection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Dialer: dialer,
|
|
}
|
|
connectionProvider.On("NewAmqpConnection", mock.Anything, mock.Anything).Return(connection)
|
|
|
|
pool := AmqpConnectionPool{
|
|
Config: pkgMessagingCommon.AmqpConnectionPoolConfig{PoolSize: 5},
|
|
HandleOffset: 0,
|
|
Lock: sync.RWMutex{},
|
|
Connections: connections,
|
|
ConnectionProvider: connectionProvider,
|
|
}
|
|
|
|
connection, err := pool.GetConnection(&ConnectionPoolHandle{ConnectionOffset: 1})
|
|
assert.EqualError(t, err, "renew connection: new internal connection: internal connect: dial: dial error")
|
|
assert.Nil(t, connection)
|
|
assert.Equal(t, 5, len(connections))
|
|
})
|
|
}
|