mirror of
https://dev.azure.com/schwarzit/schwarzit.stackit-public/_git/audit-go
synced 2026-02-07 16:47:24 +00:00
463 lines
16 KiB
Go
463 lines
16 KiB
Go
package api
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/Azure/go-amqp"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/mock"
|
|
|
|
internalMessaging "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/internal/messaging"
|
|
pkgMessagingCommon "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/common"
|
|
pkgMessagingTest "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/test"
|
|
)
|
|
|
|
type amqpConnMock struct {
|
|
mock.Mock
|
|
}
|
|
|
|
func (m *amqpConnMock) Done() <-chan struct{} {
|
|
args := m.Called()
|
|
return args.Get(0).(<-chan struct{})
|
|
}
|
|
|
|
func (m *amqpConnMock) NewSession(ctx context.Context, opts *amqp.SessionOptions) (internalMessaging.AmqpSession, error) {
|
|
args := m.Called(ctx, opts)
|
|
return args.Get(0).(internalMessaging.AmqpSession), args.Error(1)
|
|
}
|
|
|
|
func (m *amqpConnMock) Close() error {
|
|
args := m.Called()
|
|
return args.Error(0)
|
|
}
|
|
|
|
// Compile-time check that *amqpConnMock satisfies internalMessaging.AmqpConn.
var _ internalMessaging.AmqpConn = (*amqpConnMock)(nil)
|
|
|
|
type amqpSenderMock struct {
|
|
mock.Mock
|
|
}
|
|
|
|
func (m *amqpSenderMock) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
|
|
return m.Called(ctx, msg, opts).Error(0)
|
|
}
|
|
|
|
func (m *amqpSenderMock) Close(ctx context.Context) error {
|
|
return m.Called(ctx).Error(0)
|
|
}
|
|
|
|
// Compile-time check that *amqpSenderMock satisfies internalMessaging.AmqpSender.
var _ internalMessaging.AmqpSender = (*amqpSenderMock)(nil)
|
|
|
|
type amqpSessionMock struct {
|
|
mock.Mock
|
|
}
|
|
|
|
func (m *amqpSessionMock) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (internalMessaging.AmqpSender, error) {
|
|
args := m.Called(ctx, target, opts)
|
|
return args.Get(0).(internalMessaging.AmqpSender), args.Error(1)
|
|
}
|
|
|
|
func (m *amqpSessionMock) Close(ctx context.Context) error {
|
|
args := m.Called(ctx)
|
|
return args.Error(0)
|
|
}
|
|
|
|
// Compile-time check that *amqpSessionMock satisfies internalMessaging.AmqpSession.
var _ internalMessaging.AmqpSession = (*amqpSessionMock)(nil)
|
|
|
|
type connectionPoolMock struct {
|
|
mock.Mock
|
|
}
|
|
|
|
func (m *connectionPoolMock) Close() error {
|
|
return m.Called().Error(0)
|
|
}
|
|
|
|
func (m *connectionPoolMock) NewHandle() *internalMessaging.ConnectionPoolHandle {
|
|
return m.Called().Get(0).(*internalMessaging.ConnectionPoolHandle)
|
|
}
|
|
|
|
func (m *connectionPoolMock) GetConnection(handle *internalMessaging.ConnectionPoolHandle) (*internalMessaging.AmqpConnection, error) {
|
|
return m.Called(handle).Get(0).(*internalMessaging.AmqpConnection), m.Called(handle).Error(1)
|
|
}
|
|
|
|
// Compile-time check that *connectionPoolMock satisfies internalMessaging.ConnectionPool.
var _ internalMessaging.ConnectionPool = (*connectionPoolMock)(nil)
|
|
|
|
func Test_NewAmqpMessagingApi(t *testing.T) {
|
|
_, err := NewAmqpApi(
|
|
pkgMessagingCommon.AmqpConnectionPoolConfig{
|
|
Parameters: pkgMessagingCommon.AmqpConnectionConfig{BrokerUrl: "not-handled-protocol://localhost:5672"},
|
|
PoolSize: 1,
|
|
})
|
|
assert.EqualError(t, err, "new amqp connection pool: initialize connections: new connection: new internal connection: internal connect: dial: unsupported scheme \"not-handled-protocol\"")
|
|
}
|
|
|
|
func Test_AmqpMessagingApi_Send(t *testing.T) {
|
|
// Specify test timeout
|
|
ctx, cancelFn := context.WithTimeout(context.Background(), 120*time.Second)
|
|
defer cancelFn()
|
|
|
|
// Start solace docker container
|
|
solaceContainer, err := pkgMessagingTest.NewSolaceContainer(context.Background())
|
|
assert.NoError(t, err)
|
|
defer solaceContainer.Stop()
|
|
|
|
t.Run("Missing topic prefix", func(t *testing.T) {
|
|
defer solaceContainer.StopOnError()
|
|
|
|
api, err := NewAmqpApi(pkgMessagingCommon.AmqpConnectionPoolConfig{
|
|
Parameters: pkgMessagingCommon.AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
|
|
PoolSize: 1,
|
|
})
|
|
assert.NoError(t, err)
|
|
|
|
err = api.Send(ctx, "topic-name", []byte{}, "application/json", make(map[string]any))
|
|
assert.EqualError(t, err, "send: topic \"topic-name\" name lacks mandatory prefix \"topic://\"\nretry send: topic \"topic-name\" name lacks mandatory prefix \"topic://\"")
|
|
})
|
|
|
|
t.Run("send successfully", func(t *testing.T) {
|
|
defer solaceContainer.StopOnError()
|
|
|
|
// Initialize the solace queue
|
|
topicSubscriptionTopicPattern := "auditlog/>"
|
|
queueName := "send-successfully"
|
|
assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
|
|
assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))
|
|
topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-send-successfully")
|
|
assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
|
|
|
|
api, err := NewDefaultAmqpApi(pkgMessagingCommon.AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString})
|
|
assert.NoError(t, err)
|
|
|
|
data := []byte("data")
|
|
applicationProperties := make(map[string]interface{})
|
|
applicationProperties["key"] = "value"
|
|
|
|
err = api.Send(ctx, topicName, data, "application/json", applicationProperties)
|
|
assert.NoError(t, err)
|
|
|
|
message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, "data", string(message.Data[0]))
|
|
assert.Equal(t, topicName, *message.Properties.To)
|
|
assert.Equal(t, "application/json", *message.Properties.ContentType)
|
|
assert.Equal(t, applicationProperties, message.ApplicationProperties)
|
|
|
|
err = api.Close(ctx)
|
|
assert.NoError(t, err)
|
|
})
|
|
}
|
|
|
|
func Test_AmqpMessagingApi_Send_Special_Cases(t *testing.T) {
|
|
|
|
channelReceiver := func(channel chan struct{}) <-chan struct{} {
|
|
return channel
|
|
}
|
|
|
|
newActiveConnection := func() *internalMessaging.AmqpConnection {
|
|
channel := make(chan struct{})
|
|
conn := &amqpConnMock{}
|
|
conn.On("Done", mock.Anything).Return(channelReceiver(channel))
|
|
return &internalMessaging.AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Conn: conn,
|
|
}
|
|
}
|
|
|
|
newClosedConnection := func() *internalMessaging.AmqpConnection {
|
|
channel := make(chan struct{})
|
|
close(channel)
|
|
|
|
conn := &amqpConnMock{}
|
|
conn.On("Done", mock.Anything).Return(channelReceiver(channel))
|
|
return &internalMessaging.AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
Conn: conn,
|
|
}
|
|
}
|
|
|
|
t.Run("connection nil sender nil", func(t *testing.T) {
|
|
sender := &amqpSenderMock{}
|
|
sender.On("Send", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
|
|
|
session := &amqpSessionMock{}
|
|
session.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(sender, nil)
|
|
|
|
connection := newActiveConnection()
|
|
conn := connection.Conn.(*amqpConnMock)
|
|
conn.On("NewSession", mock.Anything, mock.Anything).Return(session, nil)
|
|
|
|
pool := &connectionPoolMock{}
|
|
pool.On("GetConnection", mock.Anything).Return(connection, nil)
|
|
|
|
amqpApi := &AmqpApi{
|
|
connectionPool: pool,
|
|
connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
|
|
senderCache: make(map[string]*internalMessaging.AmqpSenderSession),
|
|
}
|
|
|
|
err := amqpApi.Send(context.Background(), "topic://some-topic", []byte("data"), "application/json", make(map[string]any))
|
|
assert.NoError(t, err)
|
|
|
|
sender.AssertNumberOfCalls(t, "Send", 1)
|
|
session.AssertNumberOfCalls(t, "NewSender", 1)
|
|
pool.AssertNumberOfCalls(t, "GetConnection", 2)
|
|
})
|
|
|
|
t.Run("connection closed sender nil", func(t *testing.T) {
|
|
sender := &amqpSenderMock{}
|
|
sender.On("Send", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
|
|
|
session := &amqpSessionMock{}
|
|
session.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(sender, nil)
|
|
|
|
connection := newActiveConnection()
|
|
conn := connection.Conn.(*amqpConnMock)
|
|
conn.On("NewSession", mock.Anything, mock.Anything).Return(session, nil)
|
|
|
|
pool := &connectionPoolMock{}
|
|
pool.On("GetConnection", mock.Anything).Return(connection, nil)
|
|
|
|
closedConnection := newClosedConnection()
|
|
closedConnMock := closedConnection.Conn.(*amqpConnMock)
|
|
amqpApi := &AmqpApi{
|
|
connection: closedConnection,
|
|
connectionPool: pool,
|
|
connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
|
|
senderCache: make(map[string]*internalMessaging.AmqpSenderSession),
|
|
}
|
|
|
|
err := amqpApi.Send(context.Background(), "topic://some-topic", []byte("data"), "application/json", make(map[string]any))
|
|
assert.NoError(t, err)
|
|
|
|
sender.AssertNumberOfCalls(t, "Send", 1)
|
|
session.AssertNumberOfCalls(t, "NewSender", 1)
|
|
pool.AssertNumberOfCalls(t, "GetConnection", 2)
|
|
closedConnMock.AssertNumberOfCalls(t, "Done", 1)
|
|
})
|
|
|
|
t.Run("connection nil get connection fail", func(t *testing.T) {
|
|
var connection *internalMessaging.AmqpConnection = nil
|
|
|
|
pool := &connectionPoolMock{}
|
|
pool.On("GetConnection", mock.Anything).Return(connection, errors.New("connection error"))
|
|
|
|
amqpApi := &AmqpApi{
|
|
connectionPool: pool,
|
|
connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
|
|
senderCache: make(map[string]*internalMessaging.AmqpSenderSession),
|
|
}
|
|
|
|
err := amqpApi.Send(context.Background(), "topic://some-topic", []byte("data"), "application/json", make(map[string]any))
|
|
assert.EqualError(t, err, "get connection: connection error")
|
|
|
|
pool.AssertNumberOfCalls(t, "GetConnection", 2)
|
|
})
|
|
|
|
t.Run("connection active sender nil", func(t *testing.T) {
|
|
sender := &amqpSenderMock{}
|
|
sender.On("Send", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
|
|
|
session := &amqpSessionMock{}
|
|
session.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(sender, nil)
|
|
|
|
connection := newActiveConnection()
|
|
conn := connection.Conn.(*amqpConnMock)
|
|
conn.On("NewSession", mock.Anything, mock.Anything).Return(session, nil)
|
|
|
|
amqpApi := &AmqpApi{
|
|
connection: connection,
|
|
senderCache: make(map[string]*internalMessaging.AmqpSenderSession),
|
|
}
|
|
|
|
err := amqpApi.Send(context.Background(), "topic://some-topic", []byte("data"), "application/json", make(map[string]any))
|
|
assert.NoError(t, err)
|
|
|
|
sender.AssertNumberOfCalls(t, "Send", 1)
|
|
session.AssertNumberOfCalls(t, "NewSender", 1)
|
|
})
|
|
|
|
t.Run("connection active new sender fail", func(t *testing.T) {
|
|
var sender *amqpSenderMock = nil
|
|
|
|
session := &amqpSessionMock{}
|
|
session.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(sender, errors.New("new sender error"))
|
|
session.On("Close", mock.Anything).Return(nil)
|
|
|
|
connection := newActiveConnection()
|
|
conn := connection.Conn.(*amqpConnMock)
|
|
conn.On("NewSession", mock.Anything, mock.Anything).Return(session, nil)
|
|
|
|
amqpApi := &AmqpApi{
|
|
connection: connection,
|
|
senderCache: make(map[string]*internalMessaging.AmqpSenderSession),
|
|
}
|
|
|
|
err := amqpApi.Send(context.Background(), "topic://some-topic", []byte("data"), "application/json", make(map[string]any))
|
|
assert.EqualError(t, err, "new sender: new internal sender: new sender error")
|
|
|
|
session.AssertNumberOfCalls(t, "NewSender", 1)
|
|
session.AssertNumberOfCalls(t, "Close", 1)
|
|
})
|
|
|
|
t.Run("connection active sender set", func(t *testing.T) {
|
|
sender := &amqpSenderMock{}
|
|
sender.On("Send", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
|
|
|
topic := "topic://some-topic"
|
|
amqpApi := &AmqpApi{
|
|
connection: newActiveConnection(),
|
|
senderCache: map[string]*internalMessaging.AmqpSenderSession{topic: {Sender: sender}},
|
|
}
|
|
|
|
err := amqpApi.Send(context.Background(), topic, []byte("data"), "application/json", make(map[string]any))
|
|
assert.NoError(t, err)
|
|
|
|
sender.AssertNumberOfCalls(t, "Send", 1)
|
|
})
|
|
|
|
t.Run("send fail", func(t *testing.T) {
|
|
sender := &amqpSenderMock{}
|
|
sender.On("Send", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("send error"))
|
|
|
|
session := &amqpSessionMock{}
|
|
session.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(sender, nil)
|
|
|
|
topic := "topic://some-topic"
|
|
connection := newActiveConnection()
|
|
connection.Conn.(*amqpConnMock).On("NewSession", mock.Anything, mock.Anything, mock.Anything).Return(session, nil)
|
|
amqpApi := &AmqpApi{
|
|
connection: connection,
|
|
senderCache: map[string]*internalMessaging.AmqpSenderSession{topic: {Sender: sender}},
|
|
}
|
|
|
|
err := amqpApi.Send(context.Background(), topic, []byte("data"), "application/json", make(map[string]any))
|
|
assert.EqualError(t, err, "send: send error\nretry send: send error")
|
|
|
|
sender.AssertNumberOfCalls(t, "Send", 2)
|
|
})
|
|
}
|
|
|
|
func Test_AmqpMessagingApi_Close(t *testing.T) {
|
|
|
|
t.Run("close without cached senders", func(t *testing.T) {
|
|
pool := &connectionPoolMock{}
|
|
pool.On("Close").Return(nil)
|
|
|
|
amqpApi := &AmqpApi{
|
|
connectionPool: pool,
|
|
connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
|
|
senderCache: make(map[string]*internalMessaging.AmqpSenderSession),
|
|
}
|
|
|
|
err := amqpApi.Close(context.Background())
|
|
assert.NoError(t, err)
|
|
|
|
pool.AssertNumberOfCalls(t, "Close", 1)
|
|
})
|
|
|
|
t.Run("close fail without cached senders", func(t *testing.T) {
|
|
pool := &connectionPoolMock{}
|
|
pool.On("Close").Return(errors.New("close error"))
|
|
|
|
amqpApi := &AmqpApi{
|
|
connectionPool: pool,
|
|
connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
|
|
senderCache: make(map[string]*internalMessaging.AmqpSenderSession),
|
|
}
|
|
|
|
err := amqpApi.Close(context.Background())
|
|
assert.EqualError(t, err, "close: close pool: close error")
|
|
|
|
pool.AssertNumberOfCalls(t, "Close", 1)
|
|
})
|
|
|
|
t.Run("close with cached senders", func(t *testing.T) {
|
|
pool := &connectionPoolMock{}
|
|
pool.On("Close").Return(nil)
|
|
|
|
session := &amqpSessionMock{}
|
|
session.On("Close", mock.Anything).Return(nil)
|
|
sender := &amqpSenderMock{}
|
|
sender.On("Close", mock.Anything).Return(nil)
|
|
senderSession := &internalMessaging.AmqpSenderSession{
|
|
Session: session,
|
|
Sender: sender,
|
|
}
|
|
|
|
amqpApi := &AmqpApi{
|
|
connectionPool: pool,
|
|
connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
|
|
senderCache: map[string]*internalMessaging.AmqpSenderSession{"key": senderSession},
|
|
}
|
|
|
|
err := amqpApi.Close(context.Background())
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, 0, len(amqpApi.senderCache))
|
|
|
|
pool.AssertNumberOfCalls(t, "Close", 1)
|
|
session.AssertNumberOfCalls(t, "Close", 1)
|
|
sender.AssertNumberOfCalls(t, "Close", 1)
|
|
})
|
|
|
|
t.Run("close fail with cached senders", func(t *testing.T) {
|
|
pool := &connectionPoolMock{}
|
|
pool.On("Close").Return(nil)
|
|
|
|
session := &amqpSessionMock{}
|
|
session.On("Close", mock.Anything).Return(nil)
|
|
sender := &amqpSenderMock{}
|
|
sender.On("Close", mock.Anything).Return(errors.New("close sender error"))
|
|
senderSession := &internalMessaging.AmqpSenderSession{
|
|
Session: session,
|
|
Sender: sender,
|
|
}
|
|
|
|
amqpApi := &AmqpApi{
|
|
connectionPool: pool,
|
|
connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
|
|
senderCache: map[string]*internalMessaging.AmqpSenderSession{"key": senderSession},
|
|
}
|
|
|
|
err := amqpApi.Close(context.Background())
|
|
assert.EqualError(t, err, "close: close session: close sender error")
|
|
assert.Equal(t, 0, len(amqpApi.senderCache))
|
|
|
|
pool.AssertNumberOfCalls(t, "Close", 1)
|
|
session.AssertNumberOfCalls(t, "Close", 1)
|
|
sender.AssertNumberOfCalls(t, "Close", 1)
|
|
})
|
|
|
|
t.Run("close fail", func(t *testing.T) {
|
|
pool := &connectionPoolMock{}
|
|
pool.On("Close").Return(errors.New("close pool error"))
|
|
|
|
session := &amqpSessionMock{}
|
|
session.On("Close", mock.Anything).Return(errors.New("close session error"))
|
|
sender := &amqpSenderMock{}
|
|
sender.On("Close", mock.Anything).Return(errors.New("close sender error"))
|
|
senderSession := &internalMessaging.AmqpSenderSession{
|
|
Session: session,
|
|
Sender: sender,
|
|
}
|
|
|
|
amqpApi := &AmqpApi{
|
|
connectionPool: pool,
|
|
connectionPoolHandle: &internalMessaging.ConnectionPoolHandle{ConnectionOffset: 0},
|
|
senderCache: map[string]*internalMessaging.AmqpSenderSession{"key": senderSession},
|
|
}
|
|
|
|
err := amqpApi.Close(context.Background())
|
|
assert.EqualError(t, err, "close: close session: close sender error\nclose session error\nclose pool: close pool error")
|
|
assert.Equal(t, 0, len(amqpApi.senderCache))
|
|
|
|
pool.AssertNumberOfCalls(t, "Close", 1)
|
|
session.AssertNumberOfCalls(t, "Close", 1)
|
|
sender.AssertNumberOfCalls(t, "Close", 1)
|
|
})
|
|
}
|