package messaging

import (
	"context"
	"errors"
	"sync"
	"testing"

	"github.com/Azure/go-amqp"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"

	pkgCommon "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/common"
)

// amqpConnMock is a testify mock that stands in for AmqpConn.
type amqpConnMock struct {
	mock.Mock
}

// Done returns the receive-only channel configured for the "Done" expectation.
func (m *amqpConnMock) Done() <-chan struct{} {
	args := m.Called()
	return args.Get(0).(<-chan struct{})
}

// NewSession records the call and returns the configured session and error.
// An untyped nil configured as the first return value is handed back as a nil
// AmqpSession instead of panicking in the type assertion.
func (m *amqpConnMock) NewSession(ctx context.Context, opts *amqp.SessionOptions) (AmqpSession, error) {
	args := m.Called(ctx, opts)
	if v := args.Get(0); v != nil {
		return v.(AmqpSession), args.Error(1)
	}
	return nil, args.Error(1)
}

// Close records the call and returns the configured error.
func (m *amqpConnMock) Close() error {
	args := m.Called()
	return args.Error(0)
}

var _ AmqpConn = (*amqpConnMock)(nil)

// amqpDialMock is a testify mock that stands in for amqpDial.
type amqpDialMock struct {
	mock.Mock
}

// Dial records the call and returns the configured connection and error.
// An untyped nil configured as the first return value is handed back as a nil
// AmqpConn instead of panicking in the type assertion.
func (m *amqpDialMock) Dial(ctx context.Context, addr string, opts *amqp.ConnOptions) (AmqpConn, error) {
	args := m.Called(ctx, addr, opts)
	if v := args.Get(0); v != nil {
		return v.(AmqpConn), args.Error(1)
	}
	return nil, args.Error(1)
}

var _ amqpDial = (*amqpDialMock)(nil)

// amqpSessionMock is a testify mock that stands in for AmqpSession.
type amqpSessionMock struct {
	mock.Mock
}

// NewSender records the call and returns the configured sender and error.
// An untyped nil configured as the first return value is handed back as a nil
// AmqpSender instead of panicking in the type assertion.
func (m *amqpSessionMock) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) {
	args := m.Called(ctx, target, opts)
	if v := args.Get(0); v != nil {
		return v.(AmqpSender), args.Error(1)
	}
	return nil, args.Error(1)
}

// Close records the call and returns the configured error.
func (m *amqpSessionMock) Close(ctx context.Context) error {
	args := m.Called(ctx)
	return args.Error(0)
}

var _ AmqpSession = (*amqpSessionMock)(nil)

// Test_AmqpConnection_IsClosed checks IsClosed for a nil connection, a
// connection whose Done channel is closed, and one whose Done channel is open.
// The subtests share one AmqpConnection and overwrite its Conn field in turn.
func Test_AmqpConnection_IsClosed(t *testing.T) {
	connection := &AmqpConnection{
		ConnectionName: "test",
		Lock:           sync.RWMutex{},
	}
	// The Done mock asserts on <-chan struct{}, so a bidirectional channel has
	// to be narrowed before it is passed to Return.
	channelReceiver := func(channel chan struct{}) <-chan struct{} { return channel }

	t.Run("is closed - connection nil", func(t *testing.T) {
		assert.True(t, connection.IsClosed())
	})

	t.Run("is closed", func(t *testing.T) {
		channel := make(chan struct{})
		close(channel)
		connMock := &amqpConnMock{}
		connMock.On("Done").Return(channelReceiver(channel))
		connection.Conn = connMock

		assert.True(t, connection.IsClosed())
	})

	t.Run("is not closed", func(t *testing.T) {
		channel := make(chan struct{})
		connMock := &amqpConnMock{}
		connMock.On("Done").Return(channelReceiver(channel))
		connection.Conn = connMock

		assert.False(t, connection.IsClosed())
	})
}

// Test_AmqpConnection_Close checks Close when no connection is set, when the
// underlying Close fails, and when it succeeds.
func Test_AmqpConnection_Close(t *testing.T) {
	connection := &AmqpConnection{
		ConnectionName: "test",
		Lock:           sync.RWMutex{},
	}

	t.Run("already closed", func(t *testing.T) {
		assert.NoError(t, connection.Close())
	})

	t.Run("close error", func(t *testing.T) {
		closeErr := errors.New("test error")
		connMock := &amqpConnMock{}
		connMock.On("Close").Return(closeErr)
		connection.Conn = connMock

		assert.EqualError(t, connection.Close(), "internal close: connection close: test error")
		// The connection is expected to stay set when closing fails.
		assert.NotNil(t, connection.Conn)
		connMock.AssertNumberOfCalls(t, "Close", 1)
	})

	t.Run("close without error", func(t *testing.T) {
		connMock := &amqpConnMock{}
		connMock.On("Close").Return(nil)
		connection.Conn = connMock

		assert.NoError(t, connection.Close())
		assert.Nil(t, connection.Conn)
		connMock.AssertNumberOfCalls(t, "Close", 1)
	})
}

// Test_AmqpConnection_Connect checks Connect when a connection is already
// set, when the dialer fails, and when the dialer succeeds.
func Test_AmqpConnection_Connect(t *testing.T) {
	connection := &AmqpConnection{
		ConnectionName: "test",
		Lock:           sync.RWMutex{},
	}

	t.Run("already connected", func(t *testing.T) {
		connection.Conn = &amqpConnMock{}

		assert.NoError(t, connection.Connect())
	})

	t.Run("dial error", func(t *testing.T) {
		connection.Conn = nil
		connection.Username = "user"
		connection.Password = "pass"

		dialMock := &amqpDialMock{}
		// Typed nil pointer: Dial yields an AmqpConn wrapping a nil
		// *amqpConnMock alongside the error.
		var nilConn *amqpConnMock
		dialMock.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(nilConn, errors.New("test error"))
		connection.Dialer = dialMock

		assert.EqualError(t, connection.Connect(), "internal connect: dial: test error")
		assert.Nil(t, connection.Conn)
	})

	t.Run("connect without error", func(t *testing.T) {
		connection.Conn = nil

		dialMock := &amqpDialMock{}
		amqpConn := &amqpConnMock{}
		dialMock.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(amqpConn, nil)
		connection.Dialer = dialMock

		assert.NoError(t, connection.Connect())
		assert.Equal(t, amqpConn, connection.Conn)
	})
}

// Test_AmqpConnection_NewSender walks through the failure paths of NewSender
// (no connection, closed connection, session error, sender error, sender error
// combined with a session close error) and the success path.
func Test_AmqpConnection_NewSender(t *testing.T) {
	connection := &AmqpConnection{
		ConnectionName: "test",
		Lock:           sync.RWMutex{},
	}
	// The Done mock asserts on <-chan struct{}, so a bidirectional channel has
	// to be narrowed before it is passed to Return.
	channelReceiver := func(channel chan struct{}) <-chan struct{} { return channel }

	t.Run("connection not initialized", func(t *testing.T) {
		sender, err := connection.NewSender(context.Background(), "topic")

		assert.EqualError(t, err, "connection is not initialized")
		assert.Nil(t, sender)
	})

	t.Run("connection is closed", func(t *testing.T) {
		channel := make(chan struct{})
		close(channel)
		conn := &amqpConnMock{}
		conn.On("Done").Return(channelReceiver(channel))
		connection.Conn = conn

		sender, err := connection.NewSender(context.Background(), "topic")

		assert.EqualError(t, err, "amqp connection is closed")
		assert.Nil(t, sender)
	})

	t.Run("session error", func(t *testing.T) {
		channel := make(chan struct{})
		// Typed nil pointer: NewSession yields an AmqpSession wrapping a nil
		// *amqpSessionMock alongside the error.
		var session *amqpSessionMock
		conn := &amqpConnMock{}
		conn.On("NewSession", mock.Anything, mock.Anything).Return(session, errors.New("test error"))
		conn.On("Done").Return(channelReceiver(channel))
		connection.Conn = conn

		sender, err := connection.NewSender(context.Background(), "topic")

		assert.EqualError(t, err, "new session: test error")
		assert.Nil(t, sender)
	})

	t.Run("sender error", func(t *testing.T) {
		channel := make(chan struct{})
		sessionMock := &amqpSessionMock{}
		var amqpSender *amqp.Sender
		sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(amqpSender, errors.New("test error"))
		sessionMock.On("Close", mock.Anything).Return(nil)
		conn := &amqpConnMock{}
		conn.On("Done").Return(channelReceiver(channel))
		conn.On("NewSession", mock.Anything, mock.Anything).Return(sessionMock, nil)
		connection.Conn = conn

		sender, err := connection.NewSender(context.Background(), "topic")

		assert.EqualError(t, err, "new internal sender: test error")
		assert.Nil(t, sender)
	})

	t.Run("session close error", func(t *testing.T) {
		channel := make(chan struct{})
		sessionMock := &amqpSessionMock{}
		var amqpSender *amqp.Sender
		sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(amqpSender, errors.New("test error"))
		sessionMock.On("Close", mock.Anything).Return(errors.New("close error"))
		conn := &amqpConnMock{}
		conn.On("Done").Return(channelReceiver(channel))
		conn.On("NewSession", mock.Anything, mock.Anything).Return(sessionMock, nil)
		connection.Conn = conn

		sender, err := connection.NewSender(context.Background(), "topic")

		// Both the sender error and the session close error must be reported.
		assert.EqualError(t, err, "new internal sender: test error\nclose session: close error")
		assert.Nil(t, sender)
	})

	t.Run("get sender", func(t *testing.T) {
		channel := make(chan struct{})
		amqpSender := &amqp.Sender{}
		sessionMock := &amqpSessionMock{}
		sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(amqpSender, nil)
		conn := &amqpConnMock{}
		conn.On("Done").Return(channelReceiver(channel))
		conn.On("NewSession", mock.Anything, mock.Anything).Return(sessionMock, nil)
		connection.Conn = conn

		sender, err := connection.NewSender(context.Background(), "topic")

		assert.NoError(t, err)
		assert.NotNil(t, sender)
		assert.Equal(t, amqpSender, sender.Sender)
		assert.Equal(t, sessionMock, sender.Session)
	})
}

// Test_AmqpConnection_NewAmqpConnection checks that NewAmqpConnection copies
// the config values and the connection name and sets a dialer.
func Test_AmqpConnection_NewAmqpConnection(t *testing.T) {
	config := pkgCommon.AmqpConnectionConfig{
		BrokerUrl: "brokerUrl",
		Username:  "username",
		Password:  "password",
	}

	connection := NewAmqpConnection(config, "connectionName")

	assert.NotNil(t, connection)
	// assert.Equal takes (expected, actual); keeping that order makes failure
	// output label the two values correctly.
	assert.Equal(t, "connectionName", connection.ConnectionName)
	assert.Equal(t, "brokerUrl", connection.BrokerUrl)
	assert.Equal(t, "username", connection.Username)
	assert.Equal(t, "password", connection.Password)
	assert.NotNil(t, connection.Dialer)
}

// Test_As checks the generic As helper: error pass-through, nil input, a value
// of the wrong type, and a successful cast.
func Test_As(t *testing.T) {
	t.Run("error", func(t *testing.T) {
		value, err := As[amqp.Message](nil, errors.New("test error"))

		assert.EqualError(t, err, "test error")
		assert.Nil(t, value)
	})

	t.Run("value nil", func(t *testing.T) {
		value, err := As[amqp.Message](nil, nil)

		assert.NoError(t, err)
		assert.Nil(t, value)
	})

	t.Run("value not not type", func(t *testing.T) {
		value, err := As[amqp.Message](struct{}{}, nil)

		assert.EqualError(t, err, "could not cast value: struct {}")
		assert.Nil(t, value)
	})

	t.Run("cast", func(t *testing.T) {
		var sessionAny any = &amqpSessionMock{}

		value, err := As[amqpSessionMock](sessionAny, nil)

		assert.NoError(t, err)
		assert.NotNil(t, value)
	})
}