Compare commits

...

2 commits

Author SHA1 Message Date
Christian Schaible
dfbc4cdf93 fix: Fix usage of RWMutex in AmqpApi and wrap errors 2025-01-14 07:27:43 +01:00
Christian Schaible
1aa9ff9eec feat: replace amqp connection 2025-01-13 16:42:12 +01:00
11 changed files with 845 additions and 318 deletions

View file

@ -30,7 +30,11 @@ func TestDynamicLegacyAuditApi(t *testing.T) {
defer solaceContainer.Stop() defer solaceContainer.Stop()
// Instantiate the messaging api // Instantiate the messaging api
messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConfig{URL: solaceContainer.AmqpConnectionString}) messagingApi, err := messaging.NewAmqpApi(
messaging.AmqpConnectionPoolConfig{
Parameters: messaging.AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
PoolSize: 1,
})
assert.NoError(t, err) assert.NoError(t, err)
// Validator // Validator

View file

@ -32,7 +32,10 @@ func TestLegacyAuditApi(t *testing.T) {
defer solaceContainer.Stop() defer solaceContainer.Stop()
// Instantiate the messaging api // Instantiate the messaging api
messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConfig{URL: solaceContainer.AmqpConnectionString}) messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConnectionPoolConfig{
Parameters: messaging.AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
PoolSize: 1,
})
assert.NoError(t, err) assert.NoError(t, err)
// Validator // Validator
@ -579,7 +582,10 @@ func TestLegacyAuditApi_NewLegacyAuditApi(t *testing.T) {
defer solaceContainer.Stop() defer solaceContainer.Stop()
// Instantiate the messaging api // Instantiate the messaging api
messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConfig{URL: solaceContainer.AmqpConnectionString}) messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConnectionPoolConfig{
Parameters: messaging.AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
PoolSize: 1,
})
assert.NoError(t, err) assert.NoError(t, err)
// Validator // Validator

View file

@ -33,7 +33,10 @@ func TestRoutableAuditApi(t *testing.T) {
defer solaceContainer.Stop() defer solaceContainer.Stop()
// Instantiate the messaging api // Instantiate the messaging api
messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConfig{URL: solaceContainer.AmqpConnectionString}) messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConnectionPoolConfig{
Parameters: messaging.AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
PoolSize: 1,
})
assert.NoError(t, err) assert.NoError(t, err)
// Validator // Validator

View file

@ -0,0 +1,16 @@
package messaging
const AmqpTopicPrefix = "topic://"
const connectionTimeoutSeconds = 10
const disconnectionTimeoutSeconds = 10
type AmqpConnectionConfig struct {
BrokerUrl string `json:"brokerUrl"`
Username string `json:"username"`
Password string `json:"password"`
}
type AmqpConnectionPoolConfig struct {
Parameters AmqpConnectionConfig `json:"parameters"`
PoolSize int `json:"poolSize"`
}

View file

@ -0,0 +1,241 @@
package messaging
import (
"context"
"errors"
"fmt"
"github.com/Azure/go-amqp"
"log/slog"
"sync"
"time"
)
var ConnectionClosedError = errors.New("amqp connection is closed")
type AmqpConnection struct {
connectionName string
lock sync.RWMutex
brokerUrl string
username string
password string
conn *amqp.Conn
}
func NewAmqpConnection(config *AmqpConnectionConfig, connectionName string) *AmqpConnection {
return &AmqpConnection{
connectionName: connectionName,
lock: sync.RWMutex{},
brokerUrl: config.BrokerUrl,
username: config.Username,
password: config.Password,
}
}
func (c *AmqpConnection) NewReceiver(ctx context.Context, source string) (*AmqpReceiverSession, error) {
if c.conn == nil {
return nil, errors.New("connection is not initialized")
}
if c.internalIsClosed() {
return nil, ConnectionClosedError
}
c.lock.RLock()
defer c.lock.RUnlock()
// new session
session, err := c.conn.NewSession(ctx, nil)
if err != nil {
return nil, fmt.Errorf("new session: %w", err)
}
// new receiver
receiver, err := session.NewReceiver(ctx, source, nil)
if err != nil {
err = fmt.Errorf("new internal receiver: %w", err)
closeErr := session.Close(ctx)
if closeErr != nil {
return nil, errors.Join(fmt.Errorf("close session: %w", err), err)
}
return nil, err
}
return &AmqpReceiverSession{session, receiver}, nil
}
func (c *AmqpConnection) NewSender(ctx context.Context, topic string) (*AmqpSenderSession, error) {
if c.conn == nil {
return nil, errors.New("connection is not initialized")
}
if c.internalIsClosed() {
return nil, ConnectionClosedError
}
c.lock.RLock()
defer c.lock.RUnlock()
// new session
newSession, err := c.conn.NewSession(ctx, nil)
if err != nil {
return nil, fmt.Errorf("new session: %w", err)
}
// new sender
newSender, err := newSession.NewSender(ctx, topic, nil)
if err != nil {
err = fmt.Errorf("new internal sender: %w", err)
closeErr := newSession.Close(ctx)
if closeErr != nil {
return nil, errors.Join(fmt.Errorf("close session: %w", err), err)
}
return nil, err
}
return &AmqpSenderSession{newSession, newSender}, nil
}
func (c *AmqpConnection) ResetConnectionAndRetryIfErrorWithReturnValue(opName string, fn func(ctx context.Context) (any, error)) (any, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
result, err := fn(ctx)
cancelFn()
if err != nil {
slog.Info(fmt.Sprintf("amqp connection: %s", opName), slog.Any("connection", c.connectionName), slog.Any("err", err))
err := c.ResetConnection(context.Background())
if err != nil {
return nil, fmt.Errorf("reset connection: %w", err)
}
newCtx, closeFn := context.WithTimeout(context.Background(), 10*time.Second)
defer closeFn()
// Retry
return fn(newCtx)
}
return result, nil
}
func (c *AmqpConnection) ResetConnection(ctx context.Context) error {
c.lock.Lock()
defer c.lock.Unlock()
subCtx, cancel := context.WithTimeout(ctx, disconnectionTimeoutSeconds*time.Second)
err := c.internalClose(subCtx)
cancel()
if err != nil {
slog.Warn("amqp connection: reset: failed to close amqp connection", slog.Any("err", err))
}
subCtx, cancel = context.WithTimeout(ctx, connectionTimeoutSeconds*time.Second)
err = c.internalConnect(subCtx)
cancel()
if err != nil {
slog.Warn("amqp connection: reset: failed to connect to amqp server, retry..", slog.Any("err", err))
subCtx, cancel = context.WithTimeout(ctx, connectionTimeoutSeconds*time.Second)
err = c.internalConnect(subCtx)
cancel()
if err != nil {
return fmt.Errorf("connect: %w", err)
}
}
return nil
}
func As[T any](value any, err error) (*T, error) {
if err != nil {
return nil, err
}
if value == nil {
return nil, nil
}
castedValue, isType := value.(*T)
if !isType {
return nil, fmt.Errorf("could not cast value: %T", value)
}
return castedValue, nil
}
func (c *AmqpConnection) Connect(ctx context.Context) error {
c.lock.Lock()
defer c.lock.Unlock()
subCtx, cancel := context.WithTimeout(ctx, connectionTimeoutSeconds*time.Second)
defer cancel()
return c.internalConnect(subCtx)
}
func (c *AmqpConnection) internalConnect(ctx context.Context) error {
if c.conn == nil {
// Set credentials if specified
auth := amqp.SASLTypeAnonymous()
if c.username != "" && c.password != "" {
auth = amqp.SASLTypePlain(c.username, c.password)
} else {
slog.Debug("amqp connection: connect: using anonymous messaging")
}
options := &amqp.ConnOptions{
SASLType: auth,
}
// Initialize connection
conn, err := amqp.Dial(ctx, c.brokerUrl, options)
if err != nil {
return fmt.Errorf("dial: %w", err)
}
c.conn = conn
}
return nil
}
func (c *AmqpConnection) Close(ctx context.Context) error {
c.lock.Lock()
defer c.lock.Unlock()
subCtx, cancel := context.WithTimeout(ctx, disconnectionTimeoutSeconds*time.Second)
defer cancel()
return c.internalClose(subCtx)
}
func (c *AmqpConnection) internalClose(ctx context.Context) error {
if c.conn != nil {
closeErrors := make([]error, 0)
closeErrors = c.internalCloseConnection(closeErrors)
// return errors or nil
if len(closeErrors) > 0 {
return errors.Join(closeErrors...)
}
}
return nil
}
func (c *AmqpConnection) internalCloseConnection(closeErrors []error) []error {
err := c.conn.Close()
if err != nil {
closeErrors = append(closeErrors, err)
}
return closeErrors
}
func (c *AmqpConnection) IsClosed() bool {
c.lock.RLock()
defer c.lock.RUnlock()
return c.internalIsClosed()
}
func (c *AmqpConnection) internalIsClosed() bool {
if c.conn == nil {
return true
}
select {
case <-c.conn.Done():
return true
default:
return false
}
}

View file

@ -0,0 +1,193 @@
package messaging
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
)
type AmqpConnectionPool struct {
config AmqpConnectionPoolConfig
connectionName string
connections []*AmqpConnection
handleOffset int
lock sync.RWMutex
}
type ConnectionPoolHandle struct {
connectionOffset int
}
func NewAmqpConnectionPool(ctx context.Context, config AmqpConnectionPoolConfig, connectionName string) (*AmqpConnectionPool, error) {
pool := &AmqpConnectionPool{
config: config,
connectionName: connectionName,
connections: make([]*AmqpConnection, 0),
handleOffset: 0,
lock: sync.RWMutex{},
}
if err := pool.initializeConnections(); err != nil {
if closeErr := pool.Close(ctx); closeErr != nil {
return nil, errors.Join(err, fmt.Errorf("initialize amqp connection: pool closed: %w", closeErr))
}
return nil, fmt.Errorf("initialize connections: %w", err)
}
return pool, nil
}
func (p *AmqpConnectionPool) initializeConnections() error {
if len(p.connections) < p.config.PoolSize {
p.lock.Lock()
defer p.lock.Unlock()
numMissingConnections := p.config.PoolSize - len(p.connections)
for i := 0; i < numMissingConnections; i++ {
if err := p.internalAddConnection(); err != nil {
return err
}
}
}
return nil
}
func (p *AmqpConnectionPool) internalAddConnection() error {
newConnection, err := p.internalNewConnection(context.Background())
if err != nil {
return fmt.Errorf("new connection: %w", err)
}
p.connections = append(p.connections, newConnection)
return nil
}
func (p *AmqpConnectionPool) internalNewConnection(ctx context.Context) (*AmqpConnection, error) {
conn := NewAmqpConnection(&p.config.Parameters, p.connectionName)
if err := conn.Connect(ctx); err != nil {
slog.Warn("amqp connection: failed to connect to amqp broker", slog.Any("err", err))
// retry
if err = conn.Connect(ctx); err != nil {
connectErr := fmt.Errorf("failed to connect to amqp broker: %w", err)
if closeErr := conn.Close(ctx); closeErr != nil {
return nil, errors.Join(connectErr, fmt.Errorf("close connection: %w", closeErr))
}
return nil, connectErr
}
}
return conn, nil
}
func (p *AmqpConnectionPool) Close(ctx context.Context) error {
p.lock.Lock()
defer p.lock.Unlock()
closeErrors := make([]error, 0)
for _, conn := range p.connections {
if err := conn.Close(ctx); err != nil {
closeErrors = append(closeErrors, fmt.Errorf("connection: close: %w", err))
}
}
if len(closeErrors) > 0 {
return errors.Join(closeErrors...)
}
p.connections = make([]*AmqpConnection, 0)
return nil
}
func (p *AmqpConnectionPool) NewHandle() *ConnectionPoolHandle {
p.lock.Lock()
defer p.lock.Unlock()
offset := p.handleOffset
p.handleOffset += 1
offset = offset % p.config.PoolSize
return &ConnectionPoolHandle{
connectionOffset: offset,
}
}
func (p *AmqpConnectionPool) GetConnection(handle *ConnectionPoolHandle) (*AmqpConnection, error) {
// get the requested connection or another one
conn, addConnection := p.nextConnectionFromQueue(handle)
// renew the requested connection if the request connection is closed
if conn == nil || addConnection {
p.lock.Lock()
// check that accessing the pool only with a valid index (out of bounds should only occur on shutdown)
if handle.connectionOffset < len(p.connections) && p.connections[handle.connectionOffset] == nil {
ctx, cancelFn := context.WithTimeout(context.Background(), connectionTimeoutSeconds)
connection, err := p.internalNewConnection(ctx)
cancelFn()
if err != nil {
p.lock.Unlock()
return nil, fmt.Errorf("renew connection: %w", err)
}
p.connections[handle.connectionOffset] = connection
}
p.lock.Unlock()
}
// return the previously returned connection if it is not nil
if conn != nil {
return conn, nil
}
// try to return the renewed connection or another one
conn, _ = p.nextConnectionFromQueue(handle)
if conn == nil {
return nil, errors.New("pool is empty")
}
return conn, nil
}
func (p *AmqpConnectionPool) nextConnectionFromQueue(handle *ConnectionPoolHandle) (*AmqpConnection, bool) {
// return the next possible index (including the retry offset)
nextIndex := func(idx int) int {
if idx+handle.connectionOffset >= p.config.PoolSize {
return idx + handle.connectionOffset - p.config.PoolSize
} else {
return idx + handle.connectionOffset
}
}
// retry as long as there are remaining connections in the pool
var conn *AmqpConnection
var addConnection bool
for i := 0; i < p.config.PoolSize; i++ {
// get the next possible connection (considering the retry index)
idx := nextIndex(i)
p.lock.RLock()
if idx < len(p.connections) {
conn = p.connections[idx]
} else {
// handle the edge case that the pool is empty on shutdown
conn = nil
}
p.lock.RUnlock()
// remember that the requested is closed, retry with the next
if conn == nil {
addConnection = true
continue
}
// if the connection is closed, mark it by setting it to nil
if conn.IsClosed() {
p.lock.Lock()
p.connections[idx] = nil
p.lock.Unlock()
continue
}
return conn, addConnection
}
return nil, true
}

View file

@ -0,0 +1,71 @@
package messaging
import (
"context"
"errors"
"github.com/Azure/go-amqp"
"log/slog"
"time"
)
type AmqpReceiverSession struct {
session *amqp.Session
receiver *amqp.Receiver
}
func (c *AmqpConnection) NewReceiverSession(source string) (*AmqpReceiverSession, error) {
if c.IsClosed() {
return nil, ConnectionClosedError
}
return As[AmqpReceiverSession](c.ResetConnectionAndRetryIfErrorWithReturnValue("newReceiverSession", func(ctx context.Context) (any, error) {
c.lock.RLock()
defer c.lock.RUnlock()
return c.NewReceiver(ctx, source)
}))
}
func (s *AmqpReceiverSession) AcceptMessage(message *amqp.Message) error {
ctx, cancelFn := context.WithTimeout(context.Background(), 1*time.Second)
defer cancelFn()
return s.receiver.AcceptMessage(ctx, message)
}
func (s *AmqpReceiverSession) Receive(opts *amqp.ReceiveOptions) (*amqp.Message, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancelFn()
message, err := s.receiver.Receive(ctx, opts)
// return nil if the timeout exceeded
if err != nil && errors.Is(err, context.DeadlineExceeded) {
return nil, nil
}
return message, err
}
func (s *AmqpReceiverSession) Close() error {
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFn()
var closeErrors []error
receiverErr := s.receiver.Close(ctx)
if receiverErr != nil {
closeErrors = append(closeErrors, receiverErr)
}
sessionErr := s.session.Close(ctx)
if sessionErr != nil {
closeErrors = append(closeErrors, sessionErr)
}
if len(closeErrors) > 0 {
return errors.Join(closeErrors...)
}
return nil
}
func (s *AmqpReceiverSession) CloseSilently() {
err := s.Close()
if err != nil {
slog.Error("error closing receiver session", slog.Any("err", err))
}
}

View file

@ -0,0 +1,94 @@
package messaging
import (
"context"
"errors"
"fmt"
"github.com/Azure/go-amqp"
"log/slog"
"strings"
"time"
)
type AmqpSenderSession struct {
session *amqp.Session
sender *amqp.Sender
}
func (c *AmqpConnection) NewSenderSession(source string) (*AmqpSenderSession, error) {
if c.IsClosed() {
return nil, ConnectionClosedError
}
return As[AmqpSenderSession](c.ResetConnectionAndRetryIfErrorWithReturnValue("newSenderSession", func(ctx context.Context) (any, error) {
c.lock.RLock()
defer c.lock.RUnlock()
return c.NewSender(ctx, source)
}))
}
func (s *AmqpSenderSession) Send(
topic string,
data [][]byte,
contentType string,
applicationProperties map[string]any,
) error {
// check topic name
if !strings.HasPrefix(topic, AmqpTopicPrefix) {
return fmt.Errorf(
"topic %q name lacks mandatory prefix %q",
topic,
AmqpTopicPrefix,
)
}
if contentType == "" {
return errors.New("content-type is required")
}
// prepare the amqp message
message := amqp.Message{
Header: &amqp.MessageHeader{
Durable: true,
},
Properties: &amqp.MessageProperties{
To: &topic,
ContentType: &contentType,
},
ApplicationProperties: applicationProperties,
Data: data,
}
// send
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFn()
return s.sender.Send(ctx, &message, nil)
}
func (s *AmqpSenderSession) Close() error {
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFn()
var closeErrors []error
senderErr := s.sender.Close(ctx)
if senderErr != nil {
closeErrors = append(closeErrors, senderErr)
}
sessionErr := s.session.Close(ctx)
if sessionErr != nil {
closeErrors = append(closeErrors, sessionErr)
}
if len(closeErrors) > 0 {
return errors.Join(closeErrors...)
}
return nil
}
func (s *AmqpSenderSession) CloseSilently() {
err := s.Close()
if err != nil {
slog.Error("error closing sender session", slog.Any("err", err))
}
}

View file

@ -2,19 +2,13 @@ package messaging
import ( import (
"context" "context"
"dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log"
"errors" "errors"
"fmt" "fmt"
"strings"
"sync" "sync"
"time" "time"
"dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log"
"github.com/Azure/go-amqp"
) )
// Default connection timeout for the AMQP connection
const connectionTimeoutSeconds = 10
// Api is an abstraction for a messaging system that can be used to send // Api is an abstraction for a messaging system that can be used to send
// audit logs to the audit log system. // audit logs to the audit log system.
type Api interface { type Api interface {
@ -38,203 +32,102 @@ type Api interface {
Close(ctx context.Context) error Close(ctx context.Context) error
} }
// MutexApi is wrapper around an API implementation that controls mutual exclusive access to the api.
type MutexApi struct {
mutex sync.Mutex
api Api
}
var _ Api = &MutexApi{}
func NewMutexApi(api Api) (Api, error) {
if api == nil {
return nil, errors.New("api is nil")
}
mutexApi := MutexApi{
mutex: sync.Mutex{},
api: api,
}
var genericApi Api = &mutexApi
return genericApi, nil
}
// Send implements Api.Send
func (m *MutexApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.api.Send(ctx, topic, data, contentType, applicationProperties)
}
func (m *MutexApi) Close(ctx context.Context) error {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.api.Close(ctx)
}
// AmqpConfig provides AMQP connection related parameters.
type AmqpConfig struct {
URL string
User string
Password string
}
// AmqpSession is an abstraction providing a subset of the methods of amqp.Session
type AmqpSession interface {
NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error)
Close(ctx context.Context) error
}
type AmqpSessionWrapper struct {
session *amqp.Session
}
func (w AmqpSessionWrapper) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) {
return w.session.NewSender(ctx, target, opts)
}
func (w AmqpSessionWrapper) Close(ctx context.Context) error {
return w.session.Close(ctx)
}
// AmqpSender is an abstraction providing a subset of the methods of amqp.Sender
type AmqpSender interface {
Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error
Close(ctx context.Context) error
}
// AmqpApi implements Api. // AmqpApi implements Api.
type AmqpApi struct { type AmqpApi struct {
config AmqpConfig config AmqpConnectionPoolConfig
connection *amqp.Conn connection *AmqpConnection
session AmqpSession connectionPool *AmqpConnectionPool
connectionPoolHandle *ConnectionPoolHandle
senderCache map[string]*AmqpSenderSession
lock sync.RWMutex
} }
var _ Api = &AmqpApi{} var _ Api = &AmqpApi{}
func NewAmqpApi(amqpConfig AmqpConfig) (Api, error) { func NewAmqpApi(amqpConfig AmqpConnectionPoolConfig) (Api, error) {
amqpApi := &AmqpApi{config: amqpConfig} ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFn()
if err := amqpApi.connect(); err != nil { connectionPool, err := NewAmqpConnectionPool(ctx, amqpConfig, "sdk")
return nil, fmt.Errorf("connect to broker: %w", err)
}
return amqpApi, nil
}
// connect opens a new connection and session to the AMQP messaging system.
// The connection attempt will be cancelled after connectionTimeoutSeconds.
func (a *AmqpApi) connect() error {
log.AuditLogger.Info("connecting to audit messaging system")
// Set credentials if specified
auth := amqp.SASLTypeAnonymous()
if a.config.User != "" && a.config.Password != "" {
auth = amqp.SASLTypePlain(a.config.User, a.config.Password)
log.AuditLogger.Info("using username and password for messaging")
} else {
log.AuditLogger.Warn("using anonymous messaging!")
}
options := &amqp.ConnOptions{
SASLType: auth,
}
// Create new context with timeout for the connection initialization
subCtx, cancel := context.WithTimeout(context.Background(), connectionTimeoutSeconds*time.Second)
defer cancel()
// Initialize connection
conn, err := amqp.Dial(subCtx, a.config.URL, options)
if err != nil { if err != nil {
return fmt.Errorf("dial connection to broker: %w", err) return nil, fmt.Errorf("new amqp connection pool: %w", err)
}
a.connection = conn
// Initialize session
session, err := conn.NewSession(context.Background(), nil)
if err != nil {
return fmt.Errorf("create session: %w", err)
} }
var amqpSession AmqpSession = &AmqpSessionWrapper{session: session} amqpApi := &AmqpApi{config: amqpConfig,
a.session = amqpSession connectionPool: connectionPool,
connectionPoolHandle: connectionPool.NewHandle(),
senderCache: make(map[string]*AmqpSenderSession),
}
return nil var messagingApi Api = amqpApi
return messagingApi, nil
} }
// Send implements Api.Send. // Send implements Api.Send.
// If errors occur the connection to the messaging system will be closed and re-established. // If errors occur the connection to the messaging system will be closed and re-established.
func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error { func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
err := a.trySend(ctx, topic, data, contentType, applicationProperties)
if err == nil {
return nil
}
// Drop the current sender, as it cannot connect to the broker anymore a.lock.RLock()
log.AuditLogger.Error("message sender error, recreating", err) connectionIsClosed := a.connection == nil || a.connection.IsClosed()
a.lock.RUnlock()
err = a.resetConnection(ctx) if connectionIsClosed {
if err != nil { connection, err := a.connectionPool.GetConnection(a.connectionPoolHandle)
return fmt.Errorf("reset connection: %w", err) if err != nil {
} return fmt.Errorf("get connection: %w", err)
return a.trySend(ctx, topic, data, contentType, applicationProperties)
}
// trySend actually sends the given data as amqp.Message to the messaging system.
func (a *AmqpApi) trySend(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
if !strings.HasPrefix(topic, AmqpTopicPrefix) {
return fmt.Errorf(
"topic %q name lacks mandatory prefix %q",
topic,
AmqpTopicPrefix,
)
}
sender, err := a.session.NewSender(ctx, topic, nil)
if err != nil {
return fmt.Errorf("new sender: %w", err)
}
defer func() {
if err := sender.Close(ctx); err != nil {
log.AuditLogger.Error("failed to close session sender", err)
} }
}() a.lock.Lock()
a.connection = connection
bytes := [][]byte{data} a.lock.Unlock()
message := amqp.Message{
Header: &amqp.MessageHeader{
Durable: true,
},
Properties: &amqp.MessageProperties{
To: &topic,
ContentType: &contentType,
},
ApplicationProperties: applicationProperties,
Data: bytes,
} }
err = sender.Send(ctx, &message, nil) a.lock.RLock()
if err != nil { var sender = a.senderCache[topic]
return fmt.Errorf("send message: %w", err) a.lock.RUnlock()
if sender == nil {
a.lock.RLock()
ctx, cancelFn := context.WithTimeout(ctx, 10*time.Second)
senderSession, err := a.connection.NewSender(ctx, topic)
cancelFn()
a.lock.RUnlock()
if err != nil {
return fmt.Errorf("new sender: %w", err)
}
a.lock.Lock()
a.senderCache[topic] = senderSession
a.lock.Unlock()
sender = senderSession
} }
wrappedData := [][]byte{data}
if err := sender.Send(topic, wrappedData, contentType, applicationProperties); err != nil {
return fmt.Errorf("send: %w", err)
}
return nil return nil
} }
// resetConnection closes the current session and connection and reconnects to the messaging system.
func (a *AmqpApi) resetConnection(ctx context.Context) error {
if err := a.Close(ctx); err != nil {
log.AuditLogger.Error("failed to close audit messaging connection", err)
}
return a.connect()
}
// Close implements Api.Close // Close implements Api.Close
func (a *AmqpApi) Close(ctx context.Context) error { func (a *AmqpApi) Close(ctx context.Context) error {
log.AuditLogger.Info("close audit messaging connection") log.AuditLogger.Info("close audit messaging connection")
return errors.Join(a.session.Close(ctx), a.connection.Close()) var closeErrors []error
a.lock.RLock()
numSenderCacheItems := len(a.senderCache)
a.lock.RUnlock()
if numSenderCacheItems > 0 {
a.lock.Lock()
for _, session := range a.senderCache {
if err := session.Close(); err != nil {
closeErrors = append(closeErrors, fmt.Errorf("close session: %w", err))
}
}
clear(a.senderCache)
a.lock.Unlock()
}
if a.connectionPool != nil {
if err := a.connectionPool.Close(ctx); err != nil {
closeErrors = append(closeErrors, fmt.Errorf("close pool: %w", err))
}
}
if len(closeErrors) > 0 {
return fmt.Errorf("close: %w", errors.Join(closeErrors...))
}
return nil
} }

View file

@ -2,52 +2,52 @@ package messaging
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"github.com/stretchr/testify/assert"
"testing" "testing"
"time" "time"
"github.com/Azure/go-amqp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
) )
type AmqpSessionMock struct { //type AmqpSessionMock struct {
mock.Mock // mock.Mock
} //}
//
func (m *AmqpSessionMock) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) { //func (m *AmqpSessionMock) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) {
args := m.Called(ctx, target, opts) // args := m.Called(ctx, target, opts)
var sender AmqpSender = nil // var sender AmqpSender = nil
if args.Get(0) != nil { // if args.Get(0) != nil {
sender = args.Get(0).(AmqpSender) // sender = args.Get(0).(AmqpSender)
} // }
err := args.Error(1) // err := args.Error(1)
return sender, err // return sender, err
} //}
//
func (m *AmqpSessionMock) Close(ctx context.Context) error { //func (m *AmqpSessionMock) Close(ctx context.Context) error {
args := m.Called(ctx) // args := m.Called(ctx)
return args.Error(0) // return args.Error(0)
} //}
//
type AmqpSenderMock struct { //type AmqpSenderMock struct {
mock.Mock // mock.Mock
} //}
//
func (m *AmqpSenderMock) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error { //func (m *AmqpSenderMock) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
args := m.Called(ctx, msg, opts) // args := m.Called(ctx, msg, opts)
return args.Error(0) // return args.Error(0)
} //}
//
func (m *AmqpSenderMock) Close(ctx context.Context) error { //func (m *AmqpSenderMock) Close(ctx context.Context) error {
args := m.Called(ctx) // args := m.Called(ctx)
return args.Error(0) // return args.Error(0)
} //}
func Test_NewAmqpMessagingApi(t *testing.T) { func Test_NewAmqpMessagingApi(t *testing.T) {
_, err := NewAmqpApi(AmqpConfig{URL: "not-handled-protocol://localhost:5672"}) _, err := NewAmqpApi(
assert.EqualError(t, err, "connect to broker: dial connection to broker: unsupported scheme \"not-handled-protocol\"") AmqpConnectionPoolConfig{
Parameters: AmqpConnectionConfig{BrokerUrl: "not-handled-protocol://localhost:5672"},
PoolSize: 1,
})
assert.EqualError(t, err, "new amqp connection pool: initialize connections: new connection: failed to connect to amqp broker: dial: unsupported scheme \"not-handled-protocol\"")
} }
func Test_AmqpMessagingApi_Send(t *testing.T) { func Test_AmqpMessagingApi_Send(t *testing.T) {
@ -63,11 +63,14 @@ func Test_AmqpMessagingApi_Send(t *testing.T) {
t.Run("Missing topic prefix", func(t *testing.T) { t.Run("Missing topic prefix", func(t *testing.T) {
defer solaceContainer.StopOnError() defer solaceContainer.StopOnError()
api, err := NewAmqpApi(AmqpConfig{URL: solaceContainer.AmqpConnectionString}) api, err := NewAmqpApi(AmqpConnectionPoolConfig{
Parameters: AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
PoolSize: 1,
})
assert.NoError(t, err) assert.NoError(t, err)
err = api.Send(ctx, "topic-name", []byte{}, "application/json", make(map[string]any)) err = api.Send(ctx, "topic-name", []byte{}, "application/json", make(map[string]any))
assert.EqualError(t, err, "topic \"topic-name\" name lacks mandatory prefix \"topic://\"") assert.EqualError(t, err, "send: topic \"topic-name\" name lacks mandatory prefix \"topic://\"")
}) })
t.Run("Close connection without errors", func(t *testing.T) { t.Run("Close connection without errors", func(t *testing.T) {
@ -81,103 +84,107 @@ func Test_AmqpMessagingApi_Send(t *testing.T) {
topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-close-connection") topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-close-connection")
assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName)) assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}} api, err := NewAmqpApi(AmqpConnectionPoolConfig{
err := api.connect() Parameters: AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
PoolSize: 1,
})
assert.NoError(t, err) assert.NoError(t, err)
err = api.Close(ctx) err = api.Close(ctx)
assert.NoError(t, err) assert.NoError(t, err)
}) })
t.Run("New sender call returns error", func(t *testing.T) { //t.Run("New sender call returns error", func(t *testing.T) {
defer solaceContainer.StopOnError() // defer solaceContainer.StopOnError()
//
// Initialize the solace queue // // Initialize the solace queue
topicSubscriptionTopicPattern := "auditlog/>" // topicSubscriptionTopicPattern := "auditlog/>"
queueName := "messaging-new-sender" // queueName := "messaging-new-sender"
assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName)) // assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern)) // assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))
topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-no-new-sender") // topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-no-new-sender")
assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName)) // assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
//
api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}} // api, err := NewAmqpApi(AmqpConnectionPoolConfig{
err := api.connect() // Parameters: AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
assert.NoError(t, err) // PoolSize: 1,
// })
expectedError := errors.New("expected error") // assert.NoError(t, err)
//
// Set mock session // expectedError := errors.New("expected error")
sessionMock := AmqpSessionMock{} //
sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(nil, expectedError) // // Set mock session
sessionMock.On("Close", mock.Anything).Return(nil) // sessionMock := AmqpSessionMock{}
// sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(nil, expectedError)
var amqpSession AmqpSession = &sessionMock // sessionMock.On("Close", mock.Anything).Return(nil)
api.session = amqpSession //
// var amqpSession AmqpSession = &sessionMock
// It's expected that the test succeeds. // api.session = amqpSession
// First the session is closed as it returns the expected error //
// Then the retry mechanism restarts the connection and successfully sends the data // // It's expected that the test succeeds.
value := "test" // // First the session is closed as it returns the expected error
err = api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any)) // // Then the retry mechanism restarts the connection and successfully sends the data
assert.NoError(t, err) // value := "test"
// err = api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any))
// Check that the mock was called // assert.NoError(t, err)
assert.True(t, sessionMock.AssertNumberOfCalls(t, "NewSender", 1)) //
assert.True(t, sessionMock.AssertNumberOfCalls(t, "Close", 1)) // // Check that the mock was called
// assert.True(t, sessionMock.AssertNumberOfCalls(t, "NewSender", 1))
message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true) // assert.True(t, sessionMock.AssertNumberOfCalls(t, "Close", 1))
assert.NoError(t, err) //
assert.Equal(t, value, string(message.Data[0])) // message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true)
assert.Equal(t, topicName, *message.Properties.To) // assert.NoError(t, err)
}) // assert.Equal(t, value, string(message.Data[0]))
// assert.Equal(t, topicName, *message.Properties.To)
t.Run("Send call on sender returns error", func(t *testing.T) { //})
defer solaceContainer.StopOnError() //
//t.Run("Send call on sender returns error", func(t *testing.T) {
// Initialize the solace queue // defer solaceContainer.StopOnError()
topicSubscriptionTopicPattern := "auditlog/>" //
queueName := "messaging-sender-error" // // Initialize the solace queue
assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName)) // topicSubscriptionTopicPattern := "auditlog/>"
assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern)) // queueName := "messaging-sender-error"
topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-sender-error") // assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName)) // assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))
// topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-sender-error")
api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}} // assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
err := api.connect() //
assert.NoError(t, err) // api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}}
// err := api.connect()
expectedError := errors.New("expected error") // assert.NoError(t, err)
//
// Instantiate mock sender // expectedError := errors.New("expected error")
senderMock := AmqpSenderMock{} //
senderMock.On("Send", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedError) // // Instantiate mock sender
senderMock.On("Close", mock.Anything).Return(nil) // senderMock := AmqpSenderMock{}
var amqpSender AmqpSender = &senderMock // senderMock.On("Send", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedError)
// senderMock.On("Close", mock.Anything).Return(nil)
// Set mock session // var amqpSender AmqpSender = &senderMock
sessionMock := AmqpSessionMock{} //
sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(&amqpSender, nil) // // Set mock session
sessionMock.On("Close", mock.Anything).Return(nil) // sessionMock := AmqpSessionMock{}
// sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(&amqpSender, nil)
var amqpSession AmqpSession = &sessionMock // sessionMock.On("Close", mock.Anything).Return(nil)
api.session = amqpSession //
// var amqpSession AmqpSession = &sessionMock
// It's expected that the test succeeds. // api.session = amqpSession
// First the sender and session are closed as the sender returns the expected error //
// Then the retry mechanism restarts the connection and successfully sends the data // // It's expected that the test succeeds.
value := "test" // // First the sender and session are closed as the sender returns the expected error
err = api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any)) // // Then the retry mechanism restarts the connection and successfully sends the data
assert.NoError(t, err) // value := "test"
// err = api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any))
// Check that the mocks were called // assert.NoError(t, err)
assert.True(t, sessionMock.AssertNumberOfCalls(t, "NewSender", 1)) //
assert.True(t, sessionMock.AssertNumberOfCalls(t, "Close", 1)) // // Check that the mocks were called
assert.True(t, senderMock.AssertNumberOfCalls(t, "Send", 1)) // assert.True(t, sessionMock.AssertNumberOfCalls(t, "NewSender", 1))
assert.True(t, senderMock.AssertNumberOfCalls(t, "Close", 1)) // assert.True(t, sessionMock.AssertNumberOfCalls(t, "Close", 1))
// assert.True(t, senderMock.AssertNumberOfCalls(t, "Send", 1))
message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true) // assert.True(t, senderMock.AssertNumberOfCalls(t, "Close", 1))
assert.NoError(t, err) //
assert.Equal(t, value, string(message.Data[0])) // message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true)
assert.Equal(t, topicName, *message.Properties.To) // assert.NoError(t, err)
}) // assert.Equal(t, value, string(message.Data[0]))
// assert.Equal(t, topicName, *message.Properties.To)
//})
} }

View file

@ -18,7 +18,6 @@ import (
) )
const ( const (
AmqpTopicPrefix = "topic://"
AmqpQueuePrefix = "queue://" AmqpQueuePrefix = "queue://"
) )