mirror of
https://dev.azure.com/schwarzit/schwarzit.stackit-public/_git/audit-go
synced 2026-02-08 00:57:24 +00:00
190 lines
7 KiB
Go
190 lines
7 KiB
Go
package messaging
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/stretchr/testify/assert"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
//type AmqpSessionMock struct {
|
|
// mock.Mock
|
|
//}
|
|
//
|
|
//func (m *AmqpSessionMock) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) {
|
|
// args := m.Called(ctx, target, opts)
|
|
// var sender AmqpSender = nil
|
|
// if args.Get(0) != nil {
|
|
// sender = args.Get(0).(AmqpSender)
|
|
// }
|
|
// err := args.Error(1)
|
|
// return sender, err
|
|
//}
|
|
//
|
|
//func (m *AmqpSessionMock) Close(ctx context.Context) error {
|
|
// args := m.Called(ctx)
|
|
// return args.Error(0)
|
|
//}
|
|
//
|
|
//type AmqpSenderMock struct {
|
|
// mock.Mock
|
|
//}
|
|
//
|
|
//func (m *AmqpSenderMock) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
|
|
// args := m.Called(ctx, msg, opts)
|
|
// return args.Error(0)
|
|
//}
|
|
//
|
|
//func (m *AmqpSenderMock) Close(ctx context.Context) error {
|
|
// args := m.Called(ctx)
|
|
// return args.Error(0)
|
|
//}
|
|
|
|
func Test_NewAmqpMessagingApi(t *testing.T) {
|
|
_, err := NewAmqpApi(
|
|
AmqpConnectionPoolConfig{
|
|
Parameters: AmqpConnectionConfig{BrokerUrl: "not-handled-protocol://localhost:5672"},
|
|
PoolSize: 1,
|
|
})
|
|
assert.EqualError(t, err, "new amqp connection pool: initialize connections: new connection: failed to connect to amqp broker: dial: unsupported scheme \"not-handled-protocol\"")
|
|
}
|
|
|
|
func Test_AmqpMessagingApi_Send(t *testing.T) {
|
|
// Specify test timeout
|
|
ctx, cancelFn := context.WithTimeout(context.Background(), 120*time.Second)
|
|
defer cancelFn()
|
|
|
|
// Start solace docker container
|
|
solaceContainer, err := NewSolaceContainer(context.Background())
|
|
assert.NoError(t, err)
|
|
defer solaceContainer.Stop()
|
|
|
|
t.Run("Missing topic prefix", func(t *testing.T) {
|
|
defer solaceContainer.StopOnError()
|
|
|
|
api, err := NewAmqpApi(AmqpConnectionPoolConfig{
|
|
Parameters: AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
|
|
PoolSize: 1,
|
|
})
|
|
assert.NoError(t, err)
|
|
|
|
err = api.Send(ctx, "topic-name", []byte{}, "application/json", make(map[string]any))
|
|
assert.EqualError(t, err, "send: topic \"topic-name\" name lacks mandatory prefix \"topic://\"")
|
|
})
|
|
|
|
t.Run("Close connection without errors", func(t *testing.T) {
|
|
defer solaceContainer.StopOnError()
|
|
|
|
// Initialize the solace queue
|
|
topicSubscriptionTopicPattern := "auditlog/>"
|
|
queueName := "close-connection-without-error"
|
|
assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
|
|
assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))
|
|
topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-close-connection")
|
|
assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
|
|
|
|
api, err := NewAmqpApi(AmqpConnectionPoolConfig{
|
|
Parameters: AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
|
|
PoolSize: 1,
|
|
})
|
|
assert.NoError(t, err)
|
|
|
|
err = api.Close(ctx)
|
|
assert.NoError(t, err)
|
|
})
|
|
|
|
//t.Run("New sender call returns error", func(t *testing.T) {
|
|
// defer solaceContainer.StopOnError()
|
|
//
|
|
// // Initialize the solace queue
|
|
// topicSubscriptionTopicPattern := "auditlog/>"
|
|
// queueName := "messaging-new-sender"
|
|
// assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
|
|
// assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))
|
|
// topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-no-new-sender")
|
|
// assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
|
|
//
|
|
// api, err := NewAmqpApi(AmqpConnectionPoolConfig{
|
|
// Parameters: AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
|
|
// PoolSize: 1,
|
|
// })
|
|
// assert.NoError(t, err)
|
|
//
|
|
// expectedError := errors.New("expected error")
|
|
//
|
|
// // Set mock session
|
|
// sessionMock := AmqpSessionMock{}
|
|
// sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(nil, expectedError)
|
|
// sessionMock.On("Close", mock.Anything).Return(nil)
|
|
//
|
|
// var amqpSession AmqpSession = &sessionMock
|
|
// api.session = amqpSession
|
|
//
|
|
// // It's expected that the test succeeds.
|
|
// // First the session is closed as it returns the expected error
|
|
// // Then the retry mechanism restarts the connection and successfully sends the data
|
|
// value := "test"
|
|
// err = api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any))
|
|
// assert.NoError(t, err)
|
|
//
|
|
// // Check that the mock was called
|
|
// assert.True(t, sessionMock.AssertNumberOfCalls(t, "NewSender", 1))
|
|
// assert.True(t, sessionMock.AssertNumberOfCalls(t, "Close", 1))
|
|
//
|
|
// message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true)
|
|
// assert.NoError(t, err)
|
|
// assert.Equal(t, value, string(message.Data[0]))
|
|
// assert.Equal(t, topicName, *message.Properties.To)
|
|
//})
|
|
//
|
|
//t.Run("Send call on sender returns error", func(t *testing.T) {
|
|
// defer solaceContainer.StopOnError()
|
|
//
|
|
// // Initialize the solace queue
|
|
// topicSubscriptionTopicPattern := "auditlog/>"
|
|
// queueName := "messaging-sender-error"
|
|
// assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
|
|
// assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))
|
|
// topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-sender-error")
|
|
// assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
|
|
//
|
|
// api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}}
|
|
// err := api.connect()
|
|
// assert.NoError(t, err)
|
|
//
|
|
// expectedError := errors.New("expected error")
|
|
//
|
|
// // Instantiate mock sender
|
|
// senderMock := AmqpSenderMock{}
|
|
// senderMock.On("Send", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedError)
|
|
// senderMock.On("Close", mock.Anything).Return(nil)
|
|
// var amqpSender AmqpSender = &senderMock
|
|
//
|
|
// // Set mock session
|
|
// sessionMock := AmqpSessionMock{}
|
|
// sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(&amqpSender, nil)
|
|
// sessionMock.On("Close", mock.Anything).Return(nil)
|
|
//
|
|
// var amqpSession AmqpSession = &sessionMock
|
|
// api.session = amqpSession
|
|
//
|
|
// // It's expected that the test succeeds.
|
|
// // First the sender and session are closed as the sender returns the expected error
|
|
// // Then the retry mechanism restarts the connection and successfully sends the data
|
|
// value := "test"
|
|
// err = api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any))
|
|
// assert.NoError(t, err)
|
|
//
|
|
// // Check that the mocks were called
|
|
// assert.True(t, sessionMock.AssertNumberOfCalls(t, "NewSender", 1))
|
|
// assert.True(t, sessionMock.AssertNumberOfCalls(t, "Close", 1))
|
|
// assert.True(t, senderMock.AssertNumberOfCalls(t, "Send", 1))
|
|
// assert.True(t, senderMock.AssertNumberOfCalls(t, "Close", 1))
|
|
//
|
|
// message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true)
|
|
// assert.NoError(t, err)
|
|
// assert.Equal(t, value, string(message.Data[0]))
|
|
// assert.Equal(t, topicName, *message.Properties.To)
|
|
//})
|
|
}
|