diff --git a/audit/messaging/amqp_connection_pool.go b/audit/messaging/amqp_connection_pool.go index 7b38e15..fec9805 100644 --- a/audit/messaging/amqp_connection_pool.go +++ b/audit/messaging/amqp_connection_pool.go @@ -38,7 +38,18 @@ type ConnectionPoolHandle struct { connectionOffset int } +func NewDefaultAmqpConnectionPool(config AmqpConnectionConfig, connectionName string) (ConnectionPool, error) { + poolConfig := AmqpConnectionPoolConfig{ + Parameters: config, + PoolSize: 1, + } + return NewAmqpConnectionPool(poolConfig, connectionName) +} + func NewAmqpConnectionPool(config AmqpConnectionPoolConfig, connectionName string) (ConnectionPool, error) { + if config.PoolSize == 0 { + config.PoolSize = 1 + } pool := &AmqpConnectionPool{ config: config, connectionName: connectionName, diff --git a/audit/messaging/messaging.go b/audit/messaging/messaging.go index 7e9b131..d3c6cfa 100644 --- a/audit/messaging/messaging.go +++ b/audit/messaging/messaging.go @@ -34,7 +34,6 @@ type Api interface { // AmqpApi implements Api. type AmqpApi struct { - config AmqpConnectionPoolConfig connection *AmqpConnection connectionPool ConnectionPool connectionPoolHandle *ConnectionPoolHandle @@ -44,13 +43,29 @@ type AmqpApi struct { var _ Api = &AmqpApi{} +func NewDefaultAmqpApi(amqpConfig AmqpConnectionConfig) (Api, error) { + connectionPool, err := NewDefaultAmqpConnectionPool(amqpConfig, "sdk") + if err != nil { + return nil, fmt.Errorf("new amqp connection pool: %w", err) + } + + amqpApi := &AmqpApi{ + connectionPool: connectionPool, + connectionPoolHandle: connectionPool.NewHandle(), + senderCache: make(map[string]*AmqpSenderSession), + } + + var messagingApi Api = amqpApi + return messagingApi, nil +} + func NewAmqpApi(amqpConfig AmqpConnectionPoolConfig) (Api, error) { connectionPool, err := NewAmqpConnectionPool(amqpConfig, "sdk") if err != nil { return nil, fmt.Errorf("new amqp connection pool: %w", err) } - amqpApi := &AmqpApi{config: amqpConfig, + amqpApi := &AmqpApi{ connectionPool: connectionPool, connectionPoolHandle: connectionPool.NewHandle(), senderCache: make(map[string]*AmqpSenderSession), diff --git a/audit/messaging/messaging_test.go b/audit/messaging/messaging_test.go index 1174266..b70e43a 100644 --- a/audit/messaging/messaging_test.go +++ b/audit/messaging/messaging_test.go @@ -72,10 +72,7 @@ func Test_AmqpMessagingApi_Send(t *testing.T) { topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-send-successfully") assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName)) - api, err := NewAmqpApi(AmqpConnectionPoolConfig{ - Parameters: AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString}, - PoolSize: 1, - }) + api, err := NewDefaultAmqpApi(AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString}) assert.NoError(t, err) data := []byte("data") @@ -141,7 +138,7 @@ func Test_AmqpMessagingApi_Send_Special_Cases(t *testing.T) { pool := &connectionPoolMock{} pool.On("GetConnection", mock.Anything).Return(connection, nil) - amqpApi := &AmqpApi{config: AmqpConnectionPoolConfig{}, + amqpApi := &AmqpApi{ connectionPool: pool, connectionPoolHandle: &ConnectionPoolHandle{connectionOffset: 0}, senderCache: make(map[string]*AmqpSenderSession), @@ -171,7 +168,7 @@ func Test_AmqpMessagingApi_Send_Special_Cases(t *testing.T) { closedConnection := newClosedConnection() closedConnMock := closedConnection.conn.(*amqpConnMock) - amqpApi := &AmqpApi{config: AmqpConnectionPoolConfig{}, + amqpApi := &AmqpApi{ connection: closedConnection, connectionPool: pool, connectionPoolHandle: &ConnectionPoolHandle{connectionOffset: 0}, @@ -193,7 +190,7 @@ func Test_AmqpMessagingApi_Send_Special_Cases(t *testing.T) { pool := &connectionPoolMock{} pool.On("GetConnection", mock.Anything).Return(connection, errors.New("connection error")) - amqpApi := &AmqpApi{config: AmqpConnectionPoolConfig{}, + amqpApi := &AmqpApi{ connectionPool: pool, connectionPoolHandle: &ConnectionPoolHandle{connectionOffset: 0}, senderCache: make(map[string]*AmqpSenderSession), @@ -216,7 +213,7 @@ func Test_AmqpMessagingApi_Send_Special_Cases(t *testing.T) { conn := connection.conn.(*amqpConnMock) conn.On("NewSession", mock.Anything, mock.Anything).Return(session, nil) - amqpApi := &AmqpApi{config: AmqpConnectionPoolConfig{}, + amqpApi := &AmqpApi{ connection: connection, senderCache: make(map[string]*AmqpSenderSession), } @@ -239,7 +236,7 @@ func Test_AmqpMessagingApi_Send_Special_Cases(t *testing.T) { conn := connection.conn.(*amqpConnMock) conn.On("NewSession", mock.Anything, mock.Anything).Return(session, nil) - amqpApi := &AmqpApi{config: AmqpConnectionPoolConfig{}, + amqpApi := &AmqpApi{ connection: connection, senderCache: make(map[string]*AmqpSenderSession), } @@ -256,7 +253,7 @@ func Test_AmqpMessagingApi_Send_Special_Cases(t *testing.T) { sender.On("Send", mock.Anything, mock.Anything, mock.Anything).Return(nil) topic := "topic://some-topic" - amqpApi := &AmqpApi{config: AmqpConnectionPoolConfig{}, + amqpApi := &AmqpApi{ connection: newActiveConnection(), senderCache: map[string]*AmqpSenderSession{topic: {sender: sender}}, } @@ -277,7 +274,7 @@ func Test_AmqpMessagingApi_Send_Special_Cases(t *testing.T) { topic := "topic://some-topic" connection := newActiveConnection() connection.conn.(*amqpConnMock).On("NewSession", mock.Anything, mock.Anything, mock.Anything).Return(session, nil) - amqpApi := &AmqpApi{config: AmqpConnectionPoolConfig{}, + amqpApi := &AmqpApi{ connection: connection, senderCache: map[string]*AmqpSenderSession{topic: {sender: sender}}, } @@ -295,7 +292,7 @@ func Test_AmqpMessagingApi_Close(t *testing.T) { pool := &connectionPoolMock{} pool.On("Close").Return(nil) - amqpApi := &AmqpApi{config: AmqpConnectionPoolConfig{}, + amqpApi := &AmqpApi{ connectionPool: pool, connectionPoolHandle: &ConnectionPoolHandle{connectionOffset: 0}, senderCache: make(map[string]*AmqpSenderSession), @@ -311,7 +308,7 @@ func Test_AmqpMessagingApi_Close(t *testing.T) { pool := &connectionPoolMock{} pool.On("Close").Return(errors.New("close error")) - amqpApi := &AmqpApi{config: AmqpConnectionPoolConfig{}, + amqpApi := &AmqpApi{ connectionPool: pool, connectionPoolHandle: &ConnectionPoolHandle{connectionOffset: 0}, senderCache: make(map[string]*AmqpSenderSession), @@ -336,7 +333,7 @@ func Test_AmqpMessagingApi_Close(t *testing.T) { sender: sender, } - amqpApi := &AmqpApi{config: AmqpConnectionPoolConfig{}, + amqpApi := &AmqpApi{ connectionPool: pool, connectionPoolHandle: &ConnectionPoolHandle{connectionOffset: 0}, senderCache: map[string]*AmqpSenderSession{"key": senderSession}, @@ -364,7 +361,7 @@ func Test_AmqpMessagingApi_Close(t *testing.T) { sender: sender, } - amqpApi := &AmqpApi{config: AmqpConnectionPoolConfig{}, + amqpApi := &AmqpApi{ connectionPool: pool, connectionPoolHandle: &ConnectionPoolHandle{connectionOffset: 0}, senderCache: map[string]*AmqpSenderSession{"key": senderSession}, @@ -392,7 +389,7 @@ func Test_AmqpMessagingApi_Close(t *testing.T) { sender: sender, } - amqpApi := &AmqpApi{config: AmqpConnectionPoolConfig{}, + amqpApi := &AmqpApi{ connectionPool: pool, connectionPoolHandle: &ConnectionPoolHandle{connectionOffset: 0}, senderCache: map[string]*AmqpSenderSession{"key": senderSession},