audit-go/audit/messaging/messaging_test.go
2024-11-14 13:30:25 +00:00

182 lines
6.4 KiB
Go

package messaging
import (
"context"
"errors"
"fmt"
"github.com/Azure/go-amqp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"testing"
"time"
)
// AmqpSessionMock is a testify mock that the tests in this file assign to an
// AmqpSession variable in order to script the results of NewSender and Close.
// Call recording and expectation matching come from the embedded mock.Mock.
type AmqpSessionMock struct {
mock.Mock
}
// NewSender records the call on the embedded mock and hands back whatever the
// matching expectation was configured to return.
//
// The first configured return value must be either nil or a value that
// satisfies AmqpSender (anything else panics in the type assertion); the
// second is returned as the error.
func (m *AmqpSessionMock) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) {
	ret := m.Called(ctx, target, opts)

	// No sender configured: only the error part of the expectation matters.
	raw := ret.Get(0)
	if raw == nil {
		return nil, ret.Error(1)
	}

	sender := raw.(AmqpSender)
	return sender, ret.Error(1)
}
// Close records the call on the embedded mock and returns the error that the
// matching expectation was configured with (its first return value).
func (m *AmqpSessionMock) Close(ctx context.Context) error {
	return m.Called(ctx).Error(0)
}
// AmqpSenderMock is a testify mock that the tests in this file assign to an
// AmqpSender variable in order to script the results of Send and Close.
// Call recording and expectation matching come from the embedded mock.Mock.
type AmqpSenderMock struct {
mock.Mock
}
// Send records the call (context, message and options) on the embedded mock
// and returns the error that the matching expectation was configured with.
func (m *AmqpSenderMock) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
	return m.Called(ctx, msg, opts).Error(0)
}
// Close records the call on the embedded mock and returns the error that the
// matching expectation was configured with (its first return value).
func (m *AmqpSenderMock) Close(ctx context.Context) error {
	return m.Called(ctx).Error(0)
}
func Test_NewAmqpMessagingApi(t *testing.T) {
_, err := NewAmqpApi(AmqpConfig{URL: "not-handled-protocol://localhost:5672"})
assert.EqualError(t, err, "unsupported scheme \"not-handled-protocol\"")
}
// Test_AmqpMessagingApi_Send runs the AmqpApi send/close scenarios against a
// Solace broker that is started in a docker container for the duration of the
// test.
//
// Subtests:
//   - "Missing topic prefix": Send rejects a topic name without "topic://".
//   - "Close connection without errors": connect followed by Close succeeds.
//   - "New sender call returns error": the session's NewSender fails once; Send
//     is still expected to succeed and the message to arrive on the queue.
//   - "Send call on sender returns error": the sender's Send fails once; Send
//     is still expected to succeed and the message to arrive on the queue.
func Test_AmqpMessagingApi_Send(t *testing.T) {
	// Specify test timeout
	ctx, cancelFn := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancelFn()

	// Start solace docker container
	solaceContainer, err := NewSolaceContainer(context.Background())
	if !assert.NoError(t, err) {
		// Nothing below can run without a broker, and the container handle
		// must not be touched after a failed start.
		return
	}
	defer solaceContainer.Stop()

	// Every subtest subscribes its queue with the same topic pattern.
	const topicSubscriptionTopicPattern = "auditlog/>"

	// prepareTopic creates the queue, subscribes it to the shared topic
	// pattern and returns the validated "topic://auditlog/<suffix>" name.
	prepareTopic := func(t *testing.T, queueName, topicSuffix string) string {
		t.Helper()
		assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
		assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))
		topicName := fmt.Sprintf("topic://auditlog/%s", topicSuffix)
		assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
		return topicName
	}

	// connectApi builds an AmqpApi pointing at the container and connects it.
	// It returns nil (after recording the failure) when connecting fails.
	connectApi := func(t *testing.T) *AmqpApi {
		t.Helper()
		api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}}
		if !assert.NoError(t, api.connect()) {
			return nil
		}
		return api
	}

	// assertDelivered fetches the next message from the queue and checks its
	// payload and destination. Every dereference is guarded so that a failed
	// receive is reported as a test failure instead of a nil-pointer panic.
	assertDelivered := func(t *testing.T, queueName, topicName, value string) {
		t.Helper()
		message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true)
		if !assert.NoError(t, err) || !assert.NotNil(t, message) {
			return
		}
		if assert.NotEmpty(t, message.Data) {
			assert.Equal(t, value, string(message.Data[0]))
		}
		if assert.NotNil(t, message.Properties) && assert.NotNil(t, message.Properties.To) {
			assert.Equal(t, topicName, *message.Properties.To)
		}
	}

	t.Run("Missing topic prefix", func(t *testing.T) {
		defer solaceContainer.StopOnError()

		api, err := NewAmqpApi(AmqpConfig{URL: solaceContainer.AmqpConnectionString})
		if !assert.NoError(t, err) {
			return
		}

		err = api.Send(ctx, "topic-name", []byte{}, "application/json", make(map[string]any))
		assert.EqualError(t, err, "topic \"topic-name\" name lacks mandatory prefix \"topic://\"")
	})

	t.Run("Close connection without errors", func(t *testing.T) {
		defer solaceContainer.StopOnError()

		// Initialize the solace queue
		prepareTopic(t, "close-connection-without-error", "amqp-close-connection")

		api := connectApi(t)
		if api == nil {
			return
		}
		assert.NoError(t, api.Close(ctx))
	})

	t.Run("New sender call returns error", func(t *testing.T) {
		defer solaceContainer.StopOnError()

		// Initialize the solace queue
		queueName := "messaging-new-sender"
		topicName := prepareTopic(t, queueName, "amqp-no-new-sender")

		api := connectApi(t)
		if api == nil {
			return
		}

		expectedError := errors.New("expected error")

		// Set mock session
		sessionMock := AmqpSessionMock{}
		sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(nil, expectedError)
		sessionMock.On("Close", mock.Anything).Return(nil)
		var amqpSession AmqpSession = &sessionMock
		api.session = amqpSession

		// It's expected that the test succeeds.
		// First the session is closed as it returns the expected error
		// Then the retry mechanism restarts the connection and successfully sends the data
		value := "test"
		err := api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any))
		assert.NoError(t, err)

		// Check that the mock was called
		sessionMock.AssertNumberOfCalls(t, "NewSender", 1)
		sessionMock.AssertNumberOfCalls(t, "Close", 1)

		assertDelivered(t, queueName, topicName, value)
	})

	t.Run("Send call on sender returns error", func(t *testing.T) {
		defer solaceContainer.StopOnError()

		// Initialize the solace queue
		queueName := "messaging-sender-error"
		topicName := prepareTopic(t, queueName, "amqp-sender-error")

		api := connectApi(t)
		if api == nil {
			return
		}

		expectedError := errors.New("expected error")

		// Instantiate mock sender; Send takes three arguments (ctx, msg, opts).
		senderMock := AmqpSenderMock{}
		senderMock.On("Send", mock.Anything, mock.Anything, mock.Anything).Return(expectedError)
		senderMock.On("Close", mock.Anything).Return(nil)
		var amqpSender AmqpSender = &senderMock

		// Set mock session. The sender is returned as an AmqpSender value, not
		// as a pointer to the interface variable: AmqpSessionMock.NewSender
		// type-asserts the configured value to AmqpSender, and a *AmqpSender
		// has no methods, so that assertion would panic.
		sessionMock := AmqpSessionMock{}
		sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(amqpSender, nil)
		sessionMock.On("Close", mock.Anything).Return(nil)
		var amqpSession AmqpSession = &sessionMock
		api.session = amqpSession

		// It's expected that the test succeeds.
		// First the sender and session are closed as the sender returns the expected error
		// Then the retry mechanism restarts the connection and successfully sends the data
		value := "test"
		err := api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any))
		assert.NoError(t, err)

		// Check that the mocks were called
		sessionMock.AssertNumberOfCalls(t, "NewSender", 1)
		sessionMock.AssertNumberOfCalls(t, "Close", 1)
		senderMock.AssertNumberOfCalls(t, "Send", 1)
		senderMock.AssertNumberOfCalls(t, "Close", 1)

		assertDelivered(t, queueName, topicName, value)
	})
}