package api

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/Azure/go-amqp"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"

	internalMessaging "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/internal/messaging"
	pkgMessagingCommon "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/common"
	pkgMessagingTest "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/test"
)

// amqpConnMock is a testify mock implementing internalMessaging.AmqpConn.
type amqpConnMock struct {
	mock.Mock
}

// Done returns the receive-only channel configured on the mock.
func (m *amqpConnMock) Done() <-chan struct{} {
	args := m.Called()
	return args.Get(0).(<-chan struct{})
}

// NewSession returns the session and error configured on the mock.
func (m *amqpConnMock) NewSession(ctx context.Context, opts *amqp.SessionOptions) (internalMessaging.AmqpSession, error) {
	args := m.Called(ctx, opts)
	return args.Get(0).(internalMessaging.AmqpSession), args.Error(1)
}

// Close returns the error configured on the mock.
func (m *amqpConnMock) Close() error {
	args := m.Called()
	return args.Error(0)
}

var _ internalMessaging.AmqpConn = (*amqpConnMock)(nil)

// amqpSenderMock is a testify mock implementing internalMessaging.AmqpSender.
type amqpSenderMock struct {
	mock.Mock
}

// Send records the call and returns the error configured on the mock.
func (m *amqpSenderMock) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
	return m.Called(ctx, msg, opts).Error(0)
}

// Close records the call and returns the error configured on the mock.
func (m *amqpSenderMock) Close(ctx context.Context) error {
	return m.Called(ctx).Error(0)
}

var _ internalMessaging.AmqpSender = (*amqpSenderMock)(nil)

// amqpSessionMock is a testify mock implementing internalMessaging.AmqpSession.
type amqpSessionMock struct {
	mock.Mock
}

// NewSender returns the sender and error configured on the mock. The configured
// sender value must be typed (a typed nil is fine); an untyped nil would make
// the type assertion panic.
func (m *amqpSessionMock) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (internalMessaging.AmqpSender, error) {
	args := m.Called(ctx, target, opts)
	return args.Get(0).(internalMessaging.AmqpSender), args.Error(1)
}

// Close returns the error configured on the mock.
func (m *amqpSessionMock) Close(ctx context.Context) error {
	args := m.Called(ctx)
	return args.Error(0)
}

var _ internalMessaging.AmqpSession = (*amqpSessionMock)(nil)

// connectionPoolMock is a testify mock implementing internalMessaging.ConnectionPool.
type connectionPoolMock struct {
	mock.Mock
}

// Close records the call and returns the error configured on the mock.
func (m *connectionPoolMock) Close() error {
	return m.Called().Error(0)
}

// NewHandle returns the pool handle configured on the mock.
func (m *connectionPoolMock) NewHandle() *internalMessaging.ConnectionPoolHandle {
	return m.Called().Get(0).(*internalMessaging.ConnectionPoolHandle)
}

// GetConnection returns the connection and error configured on the mock.
//
// Called is invoked exactly once per GetConnection call so that the number of
// recorded mock calls equals the number of real invocations; the call-count
// assertions in the tests below rely on this.
func (m *connectionPoolMock) GetConnection(handle *internalMessaging.ConnectionPoolHandle) (*internalMessaging.AmqpConnection, error) {
	args := m.Called(handle)
	return args.Get(0).(*internalMessaging.AmqpConnection), args.Error(1)
}

var _ internalMessaging.ConnectionPool = (*connectionPoolMock)(nil)

// Test_NewAmqpMessagingApi verifies that NewAmqpApi reports an error for a
// broker URL with an unsupported scheme.
func Test_NewAmqpMessagingApi(t *testing.T) {
	_, err := NewAmqpApi(
		pkgMessagingCommon.AmqpConnectionPoolConfig{
			Parameters: pkgMessagingCommon.AmqpConnectionConfig{BrokerUrl: "not-handled-protocol://localhost:5672"},
			PoolSize:   1,
		})
	assert.EqualError(t, err, "new amqp connection pool: initialize connections: new connection: new internal connection: internal connect: dial: unsupported scheme \"not-handled-protocol\"")
}

// Test_AmqpMessagingApi_Send exercises Send against a solace container started
// through pkgMessagingTest.NewSolaceContainer.
func Test_AmqpMessagingApi_Send(t *testing.T) {
	// Specify test timeout
	ctx, cancelFn := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancelFn()

	// Start solace docker container. Nothing below can run without it, so abort
	// immediately instead of dereferencing an unusable container.
	solaceContainer, err := pkgMessagingTest.NewSolaceContainer(context.Background())
	if err != nil {
		t.Fatalf("start solace container: %v", err)
	}
	defer solaceContainer.Stop()

	t.Run("Missing topic prefix", func(t *testing.T) {
		defer solaceContainer.StopOnError()

		api, err := NewAmqpApi(pkgMessagingCommon.AmqpConnectionPoolConfig{
			Parameters: pkgMessagingCommon.AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
			PoolSize:   1,
		})
		if !assert.NoError(t, err) {
			// Without an api instance there is nothing left to exercise.
			return
		}

		err = api.Send(ctx, "topic-name", []byte{}, "application/json", make(map[string]any))
		assert.EqualError(t, err, "send: topic \"topic-name\" name lacks mandatory prefix \"topic://\"\nretry send: topic \"topic-name\" name lacks mandatory prefix \"topic://\"")
	})

	t.Run("send successfully", func(t *testing.T) {
		defer solaceContainer.StopOnError()

		// Initialize the solace queue
		topicSubscriptionTopicPattern := "auditlog/>"
		queueName := "send-successfully"
		assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
		assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))

		topicName := "topic://auditlog/amqp-send-successfully"
		assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))

		api, err := NewDefaultAmqpApi(pkgMessagingCommon.AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString})
		if !assert.NoError(t, err) {
			return
		}
		// Deferred so the api is closed even when the test returns early below.
		defer func() {
			assert.NoError(t, api.Close(ctx))
		}()

		data := []byte("data")
		applicationProperties := map[string]any{"key": "value"}

		err = api.Send(ctx, topicName, data, "application/json", applicationProperties)
		if !assert.NoError(t, err) {
			// Do not wait for a message that was never sent.
			return
		}

		message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true)
		if !assert.NoError(t, err) {
			return
		}
		if assert.NotEmpty(t, message.Data) {
			assert.Equal(t, "data", string(message.Data[0]))
		}
		assert.Equal(t, topicName, *message.Properties.To)
		assert.Equal(t, "application/json", *message.Properties.ContentType)
		assert.Equal(t, applicationProperties, message.ApplicationProperties)
	})
}

// Test_AmqpMessagingApi_Send_Special_Cases drives AmqpApi.Send with mocked
// connections, sessions, senders and connection pools.
func Test_AmqpMessagingApi_Send_Special_Cases(t *testing.T) {
	const topic = "topic://some-topic"

	// newConnection wraps a mocked AMQP connection; its Done channel is already
	// closed when closed is true and stays open otherwise.
	newConnection := func(closed bool) *internalMessaging.AmqpConnection {
		done := make(chan struct{})
		if closed {
			close(done)
		}

		conn := &amqpConnMock{}
		// The channel is converted to receive-only because amqpConnMock.Done
		// type-asserts the configured value to <-chan struct{}.
		conn.On("Done").Return((<-chan struct{})(done))

		return &internalMessaging.AmqpConnection{
			ConnectionName: "test",
			Lock:           sync.RWMutex{},
			Conn:           conn,
		}
	}
	newActiveConnection := func() *internalMessaging.AmqpConnection { return newConnection(false) }
	newClosedConnection := func() *internalMessaging.AmqpConnection { return newConnection(true) }

	// send invokes Send on the api with the payload shared by all subtests.
	send := func(api *AmqpApi) error {
		return api.Send(context.Background(), topic, []byte("data"), "application/json", make(map[string]any))
	}

	t.Run("connection nil sender nil", func(t *testing.T) {
		sender := &amqpSenderMock{}
		sender.On("Send", mock.Anything, mock.Anything, mock.Anything).Return(nil)

		session := &amqpSessionMock{}
		session.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(sender, nil)

		connection := newActiveConnection()
		conn := connection.Conn.(*amqpConnMock)
		conn.On("NewSession", mock.Anything, mock.Anything).Return(session, nil)

		pool := &connectionPoolMock{}
		pool.On("GetConnection", mock.Anything).Return(connection, nil)

		amqpApi := &AmqpApi{
			connectionPool:       pool,
			connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
			senderCache:          make(map[string]*internalMessaging.AmqpSenderSession),
		}

		err := send(amqpApi)
		assert.NoError(t, err)

		sender.AssertNumberOfCalls(t, "Send", 1)
		session.AssertNumberOfCalls(t, "NewSender", 1)
		pool.AssertNumberOfCalls(t, "GetConnection", 1)
	})

	t.Run("connection closed sender nil", func(t *testing.T) {
		sender := &amqpSenderMock{}
		sender.On("Send", mock.Anything, mock.Anything, mock.Anything).Return(nil)

		session := &amqpSessionMock{}
		session.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(sender, nil)

		connection := newActiveConnection()
		conn := connection.Conn.(*amqpConnMock)
		conn.On("NewSession", mock.Anything, mock.Anything).Return(session, nil)

		pool := &connectionPoolMock{}
		pool.On("GetConnection", mock.Anything).Return(connection, nil)

		closedConnection := newClosedConnection()
		closedConnMock := closedConnection.Conn.(*amqpConnMock)

		amqpApi := &AmqpApi{
			connection:           closedConnection,
			connectionPool:       pool,
			connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
			senderCache:          make(map[string]*internalMessaging.AmqpSenderSession),
		}

		err := send(amqpApi)
		assert.NoError(t, err)

		sender.AssertNumberOfCalls(t, "Send", 1)
		session.AssertNumberOfCalls(t, "NewSender", 1)
		pool.AssertNumberOfCalls(t, "GetConnection", 1)
		closedConnMock.AssertNumberOfCalls(t, "Done", 1)
	})

	t.Run("connection nil get connection fail", func(t *testing.T) {
		// Typed nil: connectionPoolMock.GetConnection type-asserts the value
		// to *internalMessaging.AmqpConnection.
		var connection *internalMessaging.AmqpConnection

		pool := &connectionPoolMock{}
		pool.On("GetConnection", mock.Anything).Return(connection, errors.New("connection error"))

		amqpApi := &AmqpApi{
			connectionPool:       pool,
			connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
			senderCache:          make(map[string]*internalMessaging.AmqpSenderSession),
		}

		err := send(amqpApi)
		assert.EqualError(t, err, "get connection: connection error")

		pool.AssertNumberOfCalls(t, "GetConnection", 1)
	})

	t.Run("connection active sender nil", func(t *testing.T) {
		sender := &amqpSenderMock{}
		sender.On("Send", mock.Anything, mock.Anything, mock.Anything).Return(nil)

		session := &amqpSessionMock{}
		session.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(sender, nil)

		connection := newActiveConnection()
		conn := connection.Conn.(*amqpConnMock)
		conn.On("NewSession", mock.Anything, mock.Anything).Return(session, nil)

		amqpApi := &AmqpApi{
			connection:  connection,
			senderCache: make(map[string]*internalMessaging.AmqpSenderSession),
		}

		err := send(amqpApi)
		assert.NoError(t, err)

		sender.AssertNumberOfCalls(t, "Send", 1)
		session.AssertNumberOfCalls(t, "NewSender", 1)
	})

	t.Run("connection active new sender fail", func(t *testing.T) {
		// Typed nil: amqpSessionMock.NewSender type-asserts the value to
		// internalMessaging.AmqpSender, which would panic on an untyped nil.
		var sender *amqpSenderMock

		session := &amqpSessionMock{}
		session.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(sender, errors.New("new sender error"))
		session.On("Close", mock.Anything).Return(nil)

		connection := newActiveConnection()
		conn := connection.Conn.(*amqpConnMock)
		conn.On("NewSession", mock.Anything, mock.Anything).Return(session, nil)

		amqpApi := &AmqpApi{
			connection:  connection,
			senderCache: make(map[string]*internalMessaging.AmqpSenderSession),
		}

		err := send(amqpApi)
		assert.EqualError(t, err, "new sender: new internal sender: new sender error")

		session.AssertNumberOfCalls(t, "NewSender", 1)
		session.AssertNumberOfCalls(t, "Close", 1)
	})

	t.Run("connection active sender set", func(t *testing.T) {
		sender := &amqpSenderMock{}
		sender.On("Send", mock.Anything, mock.Anything, mock.Anything).Return(nil)

		amqpApi := &AmqpApi{
			connection:  newActiveConnection(),
			senderCache: map[string]*internalMessaging.AmqpSenderSession{topic: {Sender: sender}},
		}

		err := send(amqpApi)
		assert.NoError(t, err)

		sender.AssertNumberOfCalls(t, "Send", 1)
	})

	t.Run("send fail", func(t *testing.T) {
		sender := &amqpSenderMock{}
		sender.On("Send", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("send error"))

		session := &amqpSessionMock{}
		session.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(sender, nil)

		connection := newActiveConnection()
		connection.Conn.(*amqpConnMock).On("NewSession", mock.Anything, mock.Anything).Return(session, nil)

		amqpApi := &AmqpApi{
			connection:  connection,
			senderCache: map[string]*internalMessaging.AmqpSenderSession{topic: {Sender: sender}},
		}

		err := send(amqpApi)
		assert.EqualError(t, err, "send: send error\nretry send: send error")

		// One call for the initial attempt and one for the retry.
		sender.AssertNumberOfCalls(t, "Send", 2)
	})
}

// Test_AmqpMessagingApi_Close drives AmqpApi.Close with mocked pools, sessions
// and senders and checks the returned error text and the calls made.
func Test_AmqpMessagingApi_Close(t *testing.T) {
	t.Run("close without cached senders", func(t *testing.T) {
		pool := &connectionPoolMock{}
		pool.On("Close").Return(nil)

		amqpApi := &AmqpApi{
			connectionPool:       pool,
			connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
			senderCache:          make(map[string]*internalMessaging.AmqpSenderSession),
		}

		err := amqpApi.Close(context.Background())
		assert.NoError(t, err)

		pool.AssertNumberOfCalls(t, "Close", 1)
	})

	t.Run("close fail without cached senders", func(t *testing.T) {
		pool := &connectionPoolMock{}
		pool.On("Close").Return(errors.New("close error"))

		amqpApi := &AmqpApi{
			connectionPool:       pool,
			connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
			senderCache:          make(map[string]*internalMessaging.AmqpSenderSession),
		}

		err := amqpApi.Close(context.Background())
		assert.EqualError(t, err, "close: close pool: close error")

		pool.AssertNumberOfCalls(t, "Close", 1)
	})

	t.Run("close with cached senders", func(t *testing.T) {
		pool := &connectionPoolMock{}
		pool.On("Close").Return(nil)

		session := &amqpSessionMock{}
		session.On("Close", mock.Anything).Return(nil)

		sender := &amqpSenderMock{}
		sender.On("Close", mock.Anything).Return(nil)

		senderSession := &internalMessaging.AmqpSenderSession{
			Session: session,
			Sender:  sender,
		}

		amqpApi := &AmqpApi{
			connectionPool:       pool,
			connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
			senderCache:          map[string]*internalMessaging.AmqpSenderSession{"key": senderSession},
		}

		err := amqpApi.Close(context.Background())
		assert.NoError(t, err)
		assert.Equal(t, 0, len(amqpApi.senderCache))

		pool.AssertNumberOfCalls(t, "Close", 1)
		session.AssertNumberOfCalls(t, "Close", 1)
		sender.AssertNumberOfCalls(t, "Close", 1)
	})

	t.Run("close fail with cached senders", func(t *testing.T) {
		pool := &connectionPoolMock{}
		pool.On("Close").Return(nil)

		session := &amqpSessionMock{}
		session.On("Close", mock.Anything).Return(nil)

		sender := &amqpSenderMock{}
		sender.On("Close", mock.Anything).Return(errors.New("close sender error"))

		senderSession := &internalMessaging.AmqpSenderSession{
			Session: session,
			Sender:  sender,
		}

		amqpApi := &AmqpApi{
			connectionPool:       pool,
			connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
			senderCache:          map[string]*internalMessaging.AmqpSenderSession{"key": senderSession},
		}

		err := amqpApi.Close(context.Background())
		assert.EqualError(t, err, "close: close session: close sender error")
		assert.Equal(t, 0, len(amqpApi.senderCache))

		pool.AssertNumberOfCalls(t, "Close", 1)
		session.AssertNumberOfCalls(t, "Close", 1)
		sender.AssertNumberOfCalls(t, "Close", 1)
	})

	t.Run("close fail", func(t *testing.T) {
		pool := &connectionPoolMock{}
		pool.On("Close").Return(errors.New("close pool error"))

		session := &amqpSessionMock{}
		session.On("Close", mock.Anything).Return(errors.New("close session error"))

		sender := &amqpSenderMock{}
		sender.On("Close", mock.Anything).Return(errors.New("close sender error"))

		senderSession := &internalMessaging.AmqpSenderSession{
			Session: session,
			Sender:  sender,
		}

		amqpApi := &AmqpApi{
			connectionPool:       pool,
			connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
			senderCache:          map[string]*internalMessaging.AmqpSenderSession{"key": senderSession},
		}

		err := amqpApi.Close(context.Background())
		assert.EqualError(t, err, "close: close session: close sender error\nclose session error\nclose pool: close pool error")
		assert.Equal(t, 0, len(amqpApi.senderCache))

		pool.AssertNumberOfCalls(t, "Close", 1)
		session.AssertNumberOfCalls(t, "Close", 1)
		sender.AssertNumberOfCalls(t, "Close", 1)
	})
}