audit-go/audit/messaging/messaging.go
Christian Schaible 5742604629 Merged PR 716929: feat: Replace AMQP connection management
Until now, the SDK provided a messaging API that was not thread-safe (i.e. not goroutine-safe). Additionally, the SDK provided a MutexAPI that made it thread-safe at the cost of concurrency. The changes in this commit replace both implementations with a thread-safe solution based on a connection pool.

The API gateway is an SDK user that requires reliable, high-performance send capabilities with a limited number of AMQP connections. The changes in this PR try to address these requirements by moving the responsibility for connection management into the SDK. Other SDK users will benefit from this change as well.

Security-concept-update-needed: false.

JIRA Work Item: STACKITALO-62
2025-01-27 13:23:54 +00:00

153 lines
4.2 KiB
Go

package messaging
import (
"context"
"dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log"
"errors"
"fmt"
"sync"
"time"
)
// Api abstracts the messaging system that is used to deliver audit logs to
// the audit log system.
type Api interface {
	// Send synchronously delivers the given data to the specified topic.
	//
	// Parameters:
	//   - ctx: the context object
	//   - topic: the messaging topic the data is sent to
	//   - data: the serialized payload as a byte slice
	//   - contentType: the content type of the serialized payload
	//   - applicationProperties: properties to send along with the message
	//     (e.g. cloud event headers)
	//
	// It returns technical errors for connection issues or sending problems.
	Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error

	// Close closes the underlying connection to the messaging system.
	//
	// Parameters:
	//   - ctx: the context object
	//
	// It returns an error if the connection cannot be closed successfully.
	Close(ctx context.Context) error
}
// AmqpApi is the AMQP backed implementation of Api.
type AmqpApi struct {
	// config is the connection pool configuration the api was created with.
	config AmqpConnectionPoolConfig
	// connection is the connection used to create senders. It is fetched from
	// the pool when a sender is created and no open connection is held yet.
	connection *AmqpConnection
	// connectionPool provides the connections and is closed by Close.
	connectionPool ConnectionPool
	// connectionPoolHandle is passed to the pool whenever a connection is requested.
	connectionPoolHandle *ConnectionPoolHandle
	// senderCache holds one sender session per topic.
	senderCache map[string]*AmqpSenderSession
	// lock guards connection and senderCache.
	lock sync.RWMutex
}

// Compile-time check that *AmqpApi satisfies Api.
var _ Api = (*AmqpApi)(nil)
// NewAmqpApi builds an Api on top of a new AMQP connection pool that is
// created from amqpConfig.
//
// The returned api starts with an empty sender cache and a fresh pool handle.
// Its own connection is not set here; it is fetched from the pool when the
// first sender is created.
//
// It returns an error if the connection pool cannot be created.
func NewAmqpApi(amqpConfig AmqpConnectionPoolConfig) (Api, error) {
	pool, err := NewAmqpConnectionPool(amqpConfig, "sdk")
	if err != nil {
		return nil, fmt.Errorf("new amqp connection pool: %w", err)
	}

	return &AmqpApi{
		config:               amqpConfig,
		connectionPool:       pool,
		connectionPoolHandle: pool.NewHandle(),
		senderCache:          make(map[string]*AmqpSenderSession),
	}, nil
}
// Send implements Api.Send.
//
// The sender session for the topic is taken from the cache or created on
// first use. If the first send attempt fails, the sender is renewed once
// (which also fetches a new connection from the pool when the current one is
// closed) and the send is retried a single time.
//
// Parameters:
//   - the context is currently not used by this implementation
//   - topic: the messaging topic the data is sent to
//   - data: the serialized payload; it is sent as a single-element batch
//   - contentType: the content type of the serialized payload
//   - applicationProperties: properties to send along with the message
//
// It returns the error of creating the sender, or the joined errors of the
// first attempt and the failed renewal/retry. An error is also returned when
// the sender vanished from the cache between its creation and its lookup,
// which happens if Close runs concurrently.
func (a *AmqpApi) Send(_ context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
	// create or get sender from cache
	sender := a.senderFromCache(topic)
	if sender == nil {
		if err := a.newSender(topic); err != nil {
			return err
		}
		// Close may have cleared the cache between newSender and this lookup;
		// never call Send on a nil session.
		if sender = a.senderFromCache(topic); sender == nil {
			return fmt.Errorf("get sender for topic %q: sender not available", topic)
		}
	}

	// first attempt to send
	wrappedData := [][]byte{data}
	firstErr := sender.Send(topic, wrappedData, contentType, applicationProperties)
	if firstErr == nil {
		return nil
	}
	sendErr := fmt.Errorf("send: %w", firstErr)

	// renew sender
	if err := a.newSender(topic); err != nil {
		return errors.Join(sendErr, err)
	}
	if sender = a.senderFromCache(topic); sender == nil {
		return errors.Join(sendErr, fmt.Errorf("get sender for topic %q: sender not available", topic))
	}

	// retry send
	if err := sender.Send(topic, wrappedData, contentType, applicationProperties); err != nil {
		return errors.Join(sendErr, fmt.Errorf("retry send: %w", err))
	}
	return nil
}
// senderFromCache looks up the cached sender session for topic while holding
// the read lock. It returns nil when no sender is cached for the topic.
func (a *AmqpApi) senderFromCache(topic string) *AmqpSenderSession {
	a.lock.RLock()
	cached := a.senderCache[topic]
	a.lock.RUnlock()
	return cached
}
// newSender creates a sender session for topic and stores it in the sender
// cache, replacing any entry that is already cached for that topic.
//
// The whole operation runs under the write lock. When no connection is held
// yet, or the held connection reports being closed, a connection is fetched
// from the pool first. Creating the sender is bounded by a 10 second timeout.
//
// It returns an error if no connection can be obtained or the sender cannot
// be created; the cache is left untouched in that case.
func (a *AmqpApi) newSender(topic string) error {
	const senderCreationTimeout = 10 * time.Second

	a.lock.Lock()
	defer a.lock.Unlock()

	if a.connection == nil || a.connection.IsClosed() {
		conn, err := a.connectionPool.GetConnection(a.connectionPoolHandle)
		if err != nil {
			return fmt.Errorf("get connection: %w", err)
		}
		a.connection = conn
	}

	timeoutCtx, cancel := context.WithTimeout(context.Background(), senderCreationTimeout)
	session, err := a.connection.NewSender(timeoutCtx, topic)
	// release the timeout context as soon as the sender creation has finished
	cancel()
	if err != nil {
		return fmt.Errorf("new sender: %w", err)
	}

	a.senderCache[topic] = session
	return nil
}
// Close implements Api.Close.
//
// Under the write lock it closes every cached sender session, empties the
// sender cache and then closes the connection pool. Closing continues after a
// failure; all failures are collected and returned as one wrapped error. The
// context is not used.
func (a *AmqpApi) Close(_ context.Context) error {
	log.AuditLogger.Info("close audit amqp connection pool")

	a.lock.Lock()
	defer a.lock.Unlock()

	var errs []error

	// cached senders
	for _, sender := range a.senderCache {
		if closeErr := sender.Close(); closeErr != nil {
			errs = append(errs, fmt.Errorf("close session: %w", closeErr))
		}
	}
	clear(a.senderCache)

	// pool
	if poolErr := a.connectionPool.Close(); poolErr != nil {
		errs = append(errs, fmt.Errorf("close pool: %w", poolErr))
	}

	if len(errs) == 0 {
		return nil
	}
	return fmt.Errorf("close: %w", errors.Join(errs...))
}