package messaging import ( "context" "fmt" "github.com/stretchr/testify/assert" "testing" "time" ) //type AmqpSessionMock struct { // mock.Mock //} // //func (m *AmqpSessionMock) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) { // args := m.Called(ctx, target, opts) // var sender AmqpSender = nil // if args.Get(0) != nil { // sender = args.Get(0).(AmqpSender) // } // err := args.Error(1) // return sender, err //} // //func (m *AmqpSessionMock) Close(ctx context.Context) error { // args := m.Called(ctx) // return args.Error(0) //} // //type AmqpSenderMock struct { // mock.Mock //} // //func (m *AmqpSenderMock) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error { // args := m.Called(ctx, msg, opts) // return args.Error(0) //} // //func (m *AmqpSenderMock) Close(ctx context.Context) error { // args := m.Called(ctx) // return args.Error(0) //} func Test_NewAmqpMessagingApi(t *testing.T) { _, err := NewAmqpApi( AmqpConnectionPoolConfig{ Parameters: AmqpConnectionConfig{BrokerUrl: "not-handled-protocol://localhost:5672"}, PoolSize: 1, }) assert.EqualError(t, err, "new amqp connection pool: initialize connections: new connection: failed to connect to amqp broker: dial: unsupported scheme \"not-handled-protocol\"") } func Test_AmqpMessagingApi_Send(t *testing.T) { // Specify test timeout ctx, cancelFn := context.WithTimeout(context.Background(), 120*time.Second) defer cancelFn() // Start solace docker container solaceContainer, err := NewSolaceContainer(context.Background()) assert.NoError(t, err) defer solaceContainer.Stop() t.Run("Missing topic prefix", func(t *testing.T) { defer solaceContainer.StopOnError() api, err := NewAmqpApi(AmqpConnectionPoolConfig{ Parameters: AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString}, PoolSize: 1, }) assert.NoError(t, err) err = api.Send(ctx, "topic-name", []byte{}, "application/json", make(map[string]any)) assert.EqualError(t, err, "send: topic \"topic-name\" name lacks mandatory prefix \"topic://\"") }) t.Run("Close connection without errors", func(t *testing.T) { defer solaceContainer.StopOnError() // Initialize the solace queue topicSubscriptionTopicPattern := "auditlog/>" queueName := "close-connection-without-error" assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName)) assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern)) topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-close-connection") assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName)) api, err := NewAmqpApi(AmqpConnectionPoolConfig{ Parameters: AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString}, PoolSize: 1, }) assert.NoError(t, err) err = api.Close(ctx) assert.NoError(t, err) }) //t.Run("New sender call returns error", func(t *testing.T) { // defer solaceContainer.StopOnError() // // // Initialize the solace queue // topicSubscriptionTopicPattern := "auditlog/>" // queueName := "messaging-new-sender" // assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName)) // assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern)) // topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-no-new-sender") // assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName)) // // api, err := NewAmqpApi(AmqpConnectionPoolConfig{ // Parameters: AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString}, // PoolSize: 1, // }) // assert.NoError(t, err) // // expectedError := errors.New("expected error") // // // Set mock session // sessionMock := AmqpSessionMock{} // sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(nil, expectedError) // sessionMock.On("Close", mock.Anything).Return(nil) // // var amqpSession AmqpSession = &sessionMock // api.session = amqpSession // // // It's expected that the test succeeds. // // First the session is closed as it returns the expected error // // Then the retry mechanism restarts the connection and successfully sends the data // value := "test" // err = api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any)) // assert.NoError(t, err) // // // Check that the mock was called // assert.True(t, sessionMock.AssertNumberOfCalls(t, "NewSender", 1)) // assert.True(t, sessionMock.AssertNumberOfCalls(t, "Close", 1)) // // message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true) // assert.NoError(t, err) // assert.Equal(t, value, string(message.Data[0])) // assert.Equal(t, topicName, *message.Properties.To) //}) // //t.Run("Send call on sender returns error", func(t *testing.T) { // defer solaceContainer.StopOnError() // // // Initialize the solace queue // topicSubscriptionTopicPattern := "auditlog/>" // queueName := "messaging-sender-error" // assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName)) // assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern)) // topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-sender-error") // assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName)) // // api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}} // err := api.connect() // assert.NoError(t, err) // // expectedError := errors.New("expected error") // // // Instantiate mock sender // senderMock := AmqpSenderMock{} // senderMock.On("Send", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedError) // senderMock.On("Close", mock.Anything).Return(nil) // var amqpSender AmqpSender = &senderMock // // // Set mock session // sessionMock := AmqpSessionMock{} // sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(&amqpSender, nil) // sessionMock.On("Close", mock.Anything).Return(nil) // // var amqpSession AmqpSession = &sessionMock // api.session = amqpSession // // // It's expected that the test succeeds. // // First the sender and session are closed as the sender returns the expected error // // Then the retry mechanism restarts the connection and successfully sends the data // value := "test" // err = api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any)) // assert.NoError(t, err) // // // Check that the mocks were called // assert.True(t, sessionMock.AssertNumberOfCalls(t, "NewSender", 1)) // assert.True(t, sessionMock.AssertNumberOfCalls(t, "Close", 1)) // assert.True(t, senderMock.AssertNumberOfCalls(t, "Send", 1)) // assert.True(t, senderMock.AssertNumberOfCalls(t, "Close", 1)) // // message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true) // assert.NoError(t, err) // assert.Equal(t, value, string(message.Data[0])) // assert.Equal(t, topicName, *message.Properties.To) //}) }