package messaging

import (
	"context"
	"errors"
	"fmt"
	"testing"
	"time"

	"github.com/Azure/go-amqp"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
)

// AmqpSessionMock is a testify mock that is used in place of an AmqpSession.
type AmqpSessionMock struct {
	mock.Mock
}

// NewSender records the call and returns whatever was configured through
// On("NewSender", ...).Return(sender, err). A nil first return value is
// passed through as a nil *AmqpSender; a non-nil value of any other type
// panics on the type assertion.
func (m *AmqpSessionMock) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (*AmqpSender, error) {
	args := m.Called(ctx, target, opts)

	var sender *AmqpSender
	if raw := args.Get(0); raw != nil {
		sender = raw.(*AmqpSender)
	}
	return sender, args.Error(1)
}

// Close records the call and returns the configured error.
func (m *AmqpSessionMock) Close(ctx context.Context) error {
	return m.Called(ctx).Error(0)
}

// AmqpSenderMock is a testify mock that is used in place of an AmqpSender.
type AmqpSenderMock struct {
	mock.Mock
}

// Send records the call (three arguments: ctx, msg, opts) and returns the
// configured error.
func (m *AmqpSenderMock) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
	return m.Called(ctx, msg, opts).Error(0)
}

// Close records the call and returns the configured error.
func (m *AmqpSenderMock) Close(ctx context.Context) error {
	return m.Called(ctx).Error(0)
}

// Test_NewAmqpMessagingApi checks that NewAmqpApi rejects a URL whose scheme
// is not supported.
func Test_NewAmqpMessagingApi(t *testing.T) {
	_, err := NewAmqpApi(AmqpConfig{URL: "not-handled-protocol://localhost:5672"})
	assert.EqualError(t, err, "unsupported scheme \"not-handled-protocol\"")
}

// Test_AmqpMessagingApi_Send exercises AmqpApi.Send against a solace docker
// container, including the cases where a mocked session or sender fails
// first and the message is expected to be delivered anyway.
func Test_AmqpMessagingApi_Send(t *testing.T) {
	// Specify test timeout
	ctx, cancelFn := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancelFn()

	// Start solace docker container. Abort immediately on failure: every
	// statement below dereferences the container.
	solaceContainer, err := NewSolaceContainer(context.Background())
	if err != nil {
		t.Fatalf("starting solace container: %v", err)
	}
	defer solaceContainer.Stop()

	const topicSubscriptionTopicPattern = "auditlog/>"

	// prepareQueue creates the queue, subscribes it to the auditlog topic
	// pattern and returns the validated topic name for topicSuffix.
	prepareQueue := func(t *testing.T, queueName, topicSuffix string) string {
		t.Helper()
		assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
		assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))

		topicName := "topic://auditlog/" + topicSuffix
		assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
		return topicName
	}

	// expectDelivered reads the next message from the queue and compares its
	// payload and destination with the expected values.
	expectDelivered := func(t *testing.T, queueName, topicName, value string) {
		t.Helper()
		message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true)
		if !assert.NoError(t, err) {
			// Without a message there is nothing to inspect.
			return
		}
		assert.Equal(t, value, string(message.Data[0]))
		assert.Equal(t, topicName, *message.Properties.To)
	}

	t.Run("Missing topic prefix", func(t *testing.T) {
		defer solaceContainer.StopOnError()

		api, err := NewAmqpApi(AmqpConfig{URL: solaceContainer.AmqpConnectionString})
		if !assert.NoError(t, err) {
			// api must not be dereferenced when construction failed.
			return
		}

		err = (*api).Send(ctx, "topic-name", []byte{}, "application/json", make(map[string]any))
		assert.EqualError(t, err, "topic \"topic-name\" name lacks mandatory prefix \"topic://\"")
	})

	t.Run("New sender call returns error", func(t *testing.T) {
		defer solaceContainer.StopOnError()

		// Initialize the solace queue
		queueName := "messaging-new-sender"
		topicName := prepareQueue(t, queueName, "amqp-no-new-sender")

		api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}}
		assert.NoError(t, api.connect())

		expectedError := errors.New("expected error")

		// Set mock session
		sessionMock := &AmqpSessionMock{}
		sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(nil, expectedError)
		sessionMock.On("Close", mock.Anything).Return(nil)

		var amqpSession AmqpSession = sessionMock
		api.session = &amqpSession

		// It's expected that the test succeeds.
		// First the session is closed as it returns the expected error.
		// Then the retry mechanism restarts the connection and successfully sends the data.
		value := "test"
		err := api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any))
		assert.NoError(t, err)

		// Check that the mock was called. AssertNumberOfCalls reports the
		// failure on t itself, so no extra assert wrapper is needed.
		sessionMock.AssertNumberOfCalls(t, "NewSender", 1)
		sessionMock.AssertNumberOfCalls(t, "Close", 1)

		expectDelivered(t, queueName, topicName, value)
	})

	t.Run("Send call on sender returns error", func(t *testing.T) {
		defer solaceContainer.StopOnError()

		// Initialize the solace queue
		queueName := "messaging-sender-error"
		topicName := prepareQueue(t, queueName, "amqp-sender-error")

		api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}}
		assert.NoError(t, api.connect())

		expectedError := errors.New("expected error")

		// Instantiate mock sender. Send takes exactly three arguments
		// (ctx, msg, opts), so the expectation lists three matchers.
		senderMock := &AmqpSenderMock{}
		senderMock.On("Send", mock.Anything, mock.Anything, mock.Anything).Return(expectedError)
		senderMock.On("Close", mock.Anything).Return(nil)

		var amqpSender AmqpSender = senderMock

		// Set mock session
		sessionMock := &AmqpSessionMock{}
		sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(&amqpSender, nil)
		sessionMock.On("Close", mock.Anything).Return(nil)

		var amqpSession AmqpSession = sessionMock
		api.session = &amqpSession

		// It's expected that the test succeeds.
		// First the sender and session are closed as the sender returns the expected error.
		// Then the retry mechanism restarts the connection and successfully sends the data.
		value := "test"
		err := api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any))
		assert.NoError(t, err)

		// Check that the mocks were called
		sessionMock.AssertNumberOfCalls(t, "NewSender", 1)
		sessionMock.AssertNumberOfCalls(t, "Close", 1)
		senderMock.AssertNumberOfCalls(t, "Send", 1)
		senderMock.AssertNumberOfCalls(t, "Close", 1)

		expectDelivered(t, queueName, topicName, value)
	})
}