mirror of
https://dev.azure.com/schwarzit/schwarzit.stackit-public/_git/audit-go
synced 2026-02-08 00:57:24 +00:00
309 lines
8.6 KiB
Go
309 lines
8.6 KiB
Go
package messaging
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/Azure/go-amqp"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/mock"
|
|
|
|
pkgCommon "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/common"
|
|
)
|
|
|
|
type amqpConnMock struct {
|
|
mock.Mock
|
|
}
|
|
|
|
func (m *amqpConnMock) Done() <-chan struct{} {
|
|
args := m.Called()
|
|
return args.Get(0).(<-chan struct{})
|
|
}
|
|
|
|
func (m *amqpConnMock) NewSession(ctx context.Context, opts *amqp.SessionOptions) (AmqpSession, error) {
|
|
args := m.Called(ctx, opts)
|
|
return args.Get(0).(AmqpSession), args.Error(1)
|
|
}
|
|
|
|
func (m *amqpConnMock) Close() error {
|
|
args := m.Called()
|
|
return args.Error(0)
|
|
}
|
|
|
|
var _ AmqpConn = (*amqpConnMock)(nil)
|
|
|
|
type amqpDialMock struct {
|
|
mock.Mock
|
|
}
|
|
|
|
func (m *amqpDialMock) Dial(ctx context.Context, addr string, opts *amqp.ConnOptions) (AmqpConn, error) {
|
|
args := m.Called(ctx, addr, opts)
|
|
return args.Get(0).(AmqpConn), args.Error(1)
|
|
}
|
|
|
|
var _ amqpDial = (*amqpDialMock)(nil)
|
|
|
|
type amqpSessionMock struct {
|
|
mock.Mock
|
|
}
|
|
|
|
func (m *amqpSessionMock) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) {
|
|
args := m.Called(ctx, target, opts)
|
|
return args.Get(0).(AmqpSender), args.Error(1)
|
|
}
|
|
|
|
func (m *amqpSessionMock) Close(ctx context.Context) error {
|
|
args := m.Called(ctx)
|
|
return args.Error(0)
|
|
}
|
|
|
|
var _ AmqpSession = (*amqpSessionMock)(nil)
|
|
|
|
func Test_AmqpConnection_IsClosed(t *testing.T) {
|
|
connection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
}
|
|
|
|
channelReceiver := func(channel chan struct{}) <-chan struct{} {
|
|
return channel
|
|
}
|
|
|
|
t.Run("is closed - connection nil", func(t *testing.T) {
|
|
assert.True(t, connection.IsClosed())
|
|
})
|
|
|
|
t.Run("is closed", func(t *testing.T) {
|
|
channel := make(chan struct{})
|
|
close(channel)
|
|
amqpConnMock := &amqpConnMock{}
|
|
amqpConnMock.On("Done").Return(channelReceiver(channel))
|
|
connection.Conn = amqpConnMock
|
|
|
|
assert.True(t, connection.IsClosed())
|
|
})
|
|
|
|
t.Run("is not closed", func(t *testing.T) {
|
|
channel := make(chan struct{})
|
|
amqpConnMock := &amqpConnMock{}
|
|
amqpConnMock.On("Done").Return(channelReceiver(channel))
|
|
connection.Conn = amqpConnMock
|
|
|
|
assert.False(t, connection.IsClosed())
|
|
})
|
|
}
|
|
|
|
func Test_AmqpConnection_Close(t *testing.T) {
|
|
connection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
}
|
|
|
|
t.Run("already closed", func(t *testing.T) {
|
|
assert.NoError(t, connection.Close())
|
|
})
|
|
|
|
t.Run("close error", func(t *testing.T) {
|
|
err := errors.New("test error")
|
|
|
|
amqpConnMock := &amqpConnMock{}
|
|
amqpConnMock.On("Close").Return(err)
|
|
connection.Conn = amqpConnMock
|
|
|
|
assert.EqualError(t, connection.Close(), "internal close: connection close: test error")
|
|
assert.NotNil(t, connection.Conn)
|
|
amqpConnMock.AssertNumberOfCalls(t, "Close", 1)
|
|
})
|
|
|
|
t.Run("close without error", func(t *testing.T) {
|
|
amqpConnMock := &amqpConnMock{}
|
|
amqpConnMock.On("Close").Return(nil)
|
|
connection.Conn = amqpConnMock
|
|
|
|
assert.Nil(t, connection.Close())
|
|
assert.Nil(t, connection.Conn)
|
|
amqpConnMock.AssertNumberOfCalls(t, "Close", 1)
|
|
})
|
|
}
|
|
|
|
func Test_AmqpConnection_Connect(t *testing.T) {
|
|
connection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
}
|
|
|
|
t.Run("already connected", func(t *testing.T) {
|
|
connection.Conn = &amqpConnMock{}
|
|
assert.NoError(t, connection.Connect())
|
|
})
|
|
|
|
t.Run("dial error", func(t *testing.T) {
|
|
connection.Conn = nil
|
|
connection.Username = "user"
|
|
connection.Password = "pass"
|
|
|
|
amqpDialMock := &amqpDialMock{}
|
|
var c *amqpConnMock = nil
|
|
amqpDialMock.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(c, errors.New("test error"))
|
|
connection.Dialer = amqpDialMock
|
|
|
|
assert.EqualError(t, connection.Connect(), "internal connect: dial: test error")
|
|
assert.Nil(t, connection.Conn)
|
|
})
|
|
|
|
t.Run("connect without error", func(t *testing.T) {
|
|
connection.Conn = nil
|
|
|
|
amqpDialMock := &amqpDialMock{}
|
|
amqpConn := &amqpConnMock{}
|
|
amqpDialMock.On("Dial", mock.Anything, mock.Anything, mock.Anything).Return(amqpConn, nil)
|
|
connection.Dialer = amqpDialMock
|
|
|
|
assert.NoError(t, connection.Connect())
|
|
assert.Equal(t, amqpConn, connection.Conn)
|
|
})
|
|
}
|
|
|
|
func Test_AmqpConnection_NewSender(t *testing.T) {
|
|
connection := &AmqpConnection{
|
|
ConnectionName: "test",
|
|
Lock: sync.RWMutex{},
|
|
}
|
|
|
|
channelReceiver := func(channel chan struct{}) <-chan struct{} {
|
|
return channel
|
|
}
|
|
|
|
t.Run("connection not initialized", func(t *testing.T) {
|
|
sender, err := connection.NewSender(context.Background(), "topic")
|
|
assert.EqualError(t, err, "connection is not initialized")
|
|
assert.Nil(t, sender)
|
|
})
|
|
|
|
t.Run("connection is closed", func(t *testing.T) {
|
|
channel := make(chan struct{})
|
|
close(channel)
|
|
|
|
conn := &amqpConnMock{}
|
|
conn.On("Done").Return(channelReceiver(channel))
|
|
connection.Conn = conn
|
|
|
|
sender, err := connection.NewSender(context.Background(), "topic")
|
|
assert.EqualError(t, err, "amqp connection is closed")
|
|
assert.Nil(t, sender)
|
|
})
|
|
|
|
t.Run("session error", func(t *testing.T) {
|
|
channel := make(chan struct{})
|
|
|
|
var session *amqpSessionMock = nil
|
|
conn := &amqpConnMock{}
|
|
conn.On("NewSession", mock.Anything, mock.Anything).Return(session, errors.New("test error"))
|
|
conn.On("Done").Return(channelReceiver(channel))
|
|
connection.Conn = conn
|
|
|
|
sender, err := connection.NewSender(context.Background(), "topic")
|
|
assert.EqualError(t, err, "new session: test error")
|
|
assert.Nil(t, sender)
|
|
})
|
|
|
|
t.Run("sender error", func(t *testing.T) {
|
|
channel := make(chan struct{})
|
|
|
|
sessionMock := &amqpSessionMock{}
|
|
var amqpSender *amqp.Sender = nil
|
|
sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(amqpSender, errors.New("test error"))
|
|
sessionMock.On("Close", mock.Anything).Return(nil)
|
|
|
|
conn := &amqpConnMock{}
|
|
conn.On("Done").Return(channelReceiver(channel))
|
|
conn.On("NewSession", mock.Anything, mock.Anything).Return(sessionMock, nil)
|
|
connection.Conn = conn
|
|
|
|
sender, err := connection.NewSender(context.Background(), "topic")
|
|
assert.EqualError(t, err, "new internal sender: test error")
|
|
assert.Nil(t, sender)
|
|
})
|
|
|
|
t.Run("session close error", func(t *testing.T) {
|
|
channel := make(chan struct{})
|
|
|
|
sessionMock := &amqpSessionMock{}
|
|
var amqpSender *amqp.Sender = nil
|
|
sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(amqpSender, errors.New("test error"))
|
|
sessionMock.On("Close", mock.Anything).Return(errors.New("close error"))
|
|
|
|
conn := &amqpConnMock{}
|
|
conn.On("Done").Return(channelReceiver(channel))
|
|
conn.On("NewSession", mock.Anything, mock.Anything).Return(sessionMock, nil)
|
|
connection.Conn = conn
|
|
|
|
sender, err := connection.NewSender(context.Background(), "topic")
|
|
assert.EqualError(t, err, "new internal sender: test error\nclose session: close error")
|
|
assert.Nil(t, sender)
|
|
})
|
|
|
|
t.Run("get sender", func(t *testing.T) {
|
|
channel := make(chan struct{})
|
|
|
|
amqpSender := &amqp.Sender{}
|
|
sessionMock := &amqpSessionMock{}
|
|
sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(amqpSender, nil)
|
|
|
|
conn := &amqpConnMock{}
|
|
conn.On("Done").Return(channelReceiver(channel))
|
|
conn.On("NewSession", mock.Anything, mock.Anything).Return(sessionMock, nil)
|
|
connection.Conn = conn
|
|
|
|
sender, err := connection.NewSender(context.Background(), "topic")
|
|
assert.NoError(t, err)
|
|
assert.NotNil(t, sender)
|
|
assert.Equal(t, amqpSender, sender.Sender)
|
|
assert.Equal(t, sessionMock, sender.Session)
|
|
})
|
|
}
|
|
|
|
func Test_AmqpConnection_NewAmqpConnection(t *testing.T) {
|
|
config := pkgCommon.AmqpConnectionConfig{
|
|
BrokerUrl: "brokerUrl",
|
|
Username: "username",
|
|
Password: "password",
|
|
}
|
|
connection := NewAmqpConnection(config, "connectionName")
|
|
assert.NotNil(t, connection)
|
|
assert.Equal(t, connection.ConnectionName, "connectionName")
|
|
assert.Equal(t, connection.BrokerUrl, "brokerUrl")
|
|
assert.Equal(t, connection.Username, "username")
|
|
assert.Equal(t, connection.Password, "password")
|
|
assert.NotNil(t, connection.Dialer)
|
|
}
|
|
|
|
func Test_As(t *testing.T) {
|
|
|
|
t.Run("error", func(t *testing.T) {
|
|
value, err := As[amqp.Message](nil, errors.New("test error"))
|
|
assert.EqualError(t, err, "test error")
|
|
assert.Nil(t, value)
|
|
})
|
|
|
|
t.Run("value nil", func(t *testing.T) {
|
|
value, err := As[amqp.Message](nil, nil)
|
|
assert.NoError(t, err)
|
|
assert.Nil(t, value)
|
|
})
|
|
|
|
t.Run("value not not type", func(t *testing.T) {
|
|
value, err := As[amqp.Message](struct{}{}, nil)
|
|
assert.EqualError(t, err, "could not cast value: struct {}")
|
|
assert.Nil(t, value)
|
|
})
|
|
|
|
t.Run("cast", func(t *testing.T) {
|
|
var sessionAny any = &amqpSessionMock{}
|
|
value, err := As[amqpSessionMock](sessionAny, nil)
|
|
assert.NoError(t, err)
|
|
assert.NotNil(t, value)
|
|
})
|
|
}
|