audit-go/audit/messaging/messaging.go
2025-01-14 07:27:43 +01:00

133 lines
3.9 KiB
Go

package messaging
import (
"context"
"dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log"
"errors"
"fmt"
"sync"
"time"
)
// Api abstracts the messaging system that is used to deliver audit logs
// to the audit log system.
type Api interface {
	// Send delivers data to topic synchronously.
	//
	// Parameters:
	//   - ctx: the context object
	//   - topic: the messaging topic to send the data to
	//   - data: the serialized payload as a byte slice
	//   - contentType: the content type of the serialized payload
	//   - applicationProperties: properties sent along with the message
	//     (e.g. cloud event headers)
	//
	// The returned error reports technical failures such as connection
	// issues or sending problems.
	Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error

	// Close shuts down the underlying connection to the messaging system.
	//
	// Parameters:
	//   - ctx: the context object
	//
	// An error is returned if the connection cannot be closed successfully.
	Close(ctx context.Context) error
}
// AmqpApi is the AMQP based implementation of Api.
type AmqpApi struct {
	// config is the connection pool configuration handed to NewAmqpApi.
	config AmqpConnectionPoolConfig

	// connection is the connection currently used for creating senders.
	// It is obtained from connectionPool by Send when it is nil or closed.
	connection *AmqpConnection

	// connectionPool provides connections and is closed by Close.
	connectionPool *AmqpConnectionPool

	// connectionPoolHandle is the handle passed to the pool when
	// requesting a connection.
	connectionPoolHandle *ConnectionPoolHandle

	// senderCache holds one sender session per topic.
	senderCache map[string]*AmqpSenderSession

	// lock guards connection and senderCache.
	lock sync.RWMutex
}

// Compile-time proof that *AmqpApi satisfies Api.
var _ Api = (*AmqpApi)(nil)
// NewAmqpApi creates an Api implementation backed by an AMQP connection pool.
//
// The pool is created from amqpConfig with a setup timeout of 10 seconds.
// A handle on that pool and an empty sender cache are prepared up front;
// no connection is requested from the pool here.
//
// It returns an error if the connection pool cannot be created.
func NewAmqpApi(amqpConfig AmqpConnectionPoolConfig) (Api, error) {
	const poolSetupTimeout = 10 * time.Second

	setupCtx, cancel := context.WithTimeout(context.Background(), poolSetupTimeout)
	defer cancel()

	pool, err := NewAmqpConnectionPool(setupCtx, amqpConfig, "sdk")
	if err != nil {
		return nil, fmt.Errorf("new amqp connection pool: %w", err)
	}

	return &AmqpApi{
		config:               amqpConfig,
		connectionPool:       pool,
		connectionPoolHandle: pool.NewHandle(),
		senderCache:          make(map[string]*AmqpSenderSession),
	}, nil
}
// Send implements Api.Send.
//
// The connection and the per-topic sender session are set up lazily: a new
// connection is requested from the pool whenever the current one is missing
// or reports itself as closed, and a sender session is created (with a
// 10 second timeout derived from ctx) the first time a topic is used.
//
// Setup is performed under the exclusive lock and re-validated after the
// lock is acquired, so concurrent callers neither replace each other's
// connection nor create duplicate sender sessions for the same topic (a
// duplicate would be overwritten in the cache and never closed). Calls that
// need setup are therefore serialized; calls that find a live connection and
// a cached sender only take the shared lock.
//
// Parameters:
//   - ctx: parent context for creating a sender session
//   - topic: the messaging topic to send the data to
//   - data: the serialized payload; it is sent as a single-element batch
//   - contentType: the content type of the serialized payload
//   - applicationProperties: properties sent along with the message
//
// It returns an error wrapped with "get connection", "new sender" or "send"
// depending on the step that failed.
func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
	// Fast path: a live connection and a cached sender need only the shared lock.
	a.lock.RLock()
	connected := a.connection != nil && !a.connection.IsClosed()
	sender := a.senderCache[topic]
	a.lock.RUnlock()

	if !connected || sender == nil {
		// Slow path: (re-)establish whatever is missing while holding the
		// exclusive lock. The closure lets the unlock be deferred.
		prepare := func() error {
			a.lock.Lock()
			defer a.lock.Unlock()

			// Re-check: another goroutine may have reconnected in the meantime.
			if a.connection == nil || a.connection.IsClosed() {
				connection, err := a.connectionPool.GetConnection(a.connectionPoolHandle)
				if err != nil {
					return fmt.Errorf("get connection: %w", err)
				}
				a.connection = connection
			}

			// Re-check: another goroutine may have created the sender already.
			// NOTE(review): a cached sender may stem from a previous, now closed
			// connection - confirm that AmqpSenderSession copes with that.
			if sender = a.senderCache[topic]; sender != nil {
				return nil
			}

			senderCtx, cancelFn := context.WithTimeout(ctx, 10*time.Second)
			defer cancelFn()

			senderSession, err := a.connection.NewSender(senderCtx, topic)
			if err != nil {
				return fmt.Errorf("new sender: %w", err)
			}
			a.senderCache[topic] = senderSession
			sender = senderSession
			return nil
		}
		if err := prepare(); err != nil {
			return err
		}
	}

	// The sender expects a batch of payloads; wrap the single message.
	if err := sender.Send(topic, [][]byte{data}, contentType, applicationProperties); err != nil {
		return fmt.Errorf("send: %w", err)
	}
	return nil
}
// Close implements Api.Close.
//
// It closes every cached sender session, empties the sender cache and then
// closes the connection pool (if one is set), passing ctx to the pool.
// Failures do not abort the shutdown: all of them are collected and returned
// as one joined error wrapped with "close". It returns nil when nothing failed.
func (a *AmqpApi) Close(ctx context.Context) error {
	log.AuditLogger.Info("close audit messaging connection")

	// Peek at the cache under the shared lock; the exclusive lock is only
	// taken when there is something to close.
	a.lock.RLock()
	hasSessions := len(a.senderCache) != 0
	a.lock.RUnlock()

	var errs []error

	if hasSessions {
		a.lock.Lock()
		for _, senderSession := range a.senderCache {
			if err := senderSession.Close(); err != nil {
				errs = append(errs, fmt.Errorf("close session: %w", err))
			}
		}
		clear(a.senderCache)
		a.lock.Unlock()
	}

	if pool := a.connectionPool; pool != nil {
		if err := pool.Close(ctx); err != nil {
			errs = append(errs, fmt.Errorf("close pool: %w", err))
		}
	}

	if len(errs) == 0 {
		return nil
	}
	return fmt.Errorf("close: %w", errors.Join(errs...))
}