audit-go/pkg/messaging/api/amqp.go
Christian Schaible (EXT) 85aae1c2e7 Merged PR 779949: feat: Refactor module structure to reflect best practices
Security-concept-update-needed: false.

JIRA Work Item: STACKITALO-259
2025-05-19 11:54:00 +00:00

148 lines
4.1 KiB
Go

package api
import (
"context"
"errors"
"fmt"
"sync"
"time"
internalMessaging "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/internal/messaging"
pkgLog "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/log"
pkgMessagingCommon "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/common"
)
// AmqpApi implements Api on top of an AMQP connection pool.
//
// Senders are created per topic on demand and kept in senderCache so that
// subsequent sends to the same topic reuse them.
type AmqpApi struct {
	// connection is the connection currently used to create senders. It is
	// fetched from connectionPool when it is nil or reported as closed.
	connection *internalMessaging.AmqpConnection
	// connectionPool provides connections and is closed by Close.
	connectionPool internalMessaging.ConnectionPool
	// connectionPoolHandle is the handle passed to connectionPool when a
	// connection is requested.
	connectionPoolHandle *internalMessaging.ConnectionPoolHandle
	// senderCache maps a topic to the sender created for it.
	senderCache map[string]*internalMessaging.AmqpSenderSession
	// lock guards connection and senderCache.
	lock sync.RWMutex
}

// Compile-time check that *AmqpApi satisfies Api.
var _ Api = (*AmqpApi)(nil)
// NewDefaultAmqpApi creates an Api backed by the default AMQP connection pool
// built from the given connection config.
//
// It returns an error when the connection pool cannot be created. The returned
// Api starts with an empty sender cache; senders are created on first use.
func NewDefaultAmqpApi(amqpConfig pkgMessagingCommon.AmqpConnectionConfig) (Api, error) {
	pool, err := internalMessaging.NewDefaultAmqpConnectionPool(amqpConfig, "sdk")
	if err != nil {
		return nil, fmt.Errorf("new amqp connection pool: %w", err)
	}

	return &AmqpApi{
		connectionPool:       pool,
		connectionPoolHandle: pool.NewHandle(),
		senderCache:          make(map[string]*internalMessaging.AmqpSenderSession),
	}, nil
}
// NewAmqpApi creates an Api backed by an AMQP connection pool built from the
// given connection pool config.
//
// It returns an error when the connection pool cannot be created. The returned
// Api starts with an empty sender cache; senders are created on first use.
func NewAmqpApi(amqpConfig pkgMessagingCommon.AmqpConnectionPoolConfig) (Api, error) {
	pool, err := internalMessaging.NewAmqpConnectionPool(amqpConfig, "sdk")
	if err != nil {
		return nil, fmt.Errorf("new amqp connection pool: %w", err)
	}

	return &AmqpApi{
		connectionPool:       pool,
		connectionPoolHandle: pool.NewHandle(),
		senderCache:          make(map[string]*internalMessaging.AmqpSenderSession),
	}, nil
}
// Send implements Api.Send.
//
// The payload is sent through the sender cached for topic; the sender is
// created on first use. If the send fails, the sender for the topic is
// re-created (newSender also fetches a connection from the pool when the
// current one is nil or closed) and the send is retried exactly once.
//
// Errors:
//   - creating the initial sender fails: that error is returned as is;
//   - the first send fails and renewing the sender fails: both errors joined;
//   - the first send and the retry fail: both errors joined.
//
// The context parameter is currently not used.
func (a *AmqpApi) Send(_ context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
	// create or get sender from cache
	sender := a.senderFromCache(topic)
	if sender == nil {
		created, err := a.refreshSender(topic)
		if err != nil {
			return err
		}
		sender = created
	}

	// The sender takes a slice of payloads, so wrap the single message.
	wrappedData := [][]byte{data}

	// first attempt to send
	firstErr := sender.Send(topic, wrappedData, contentType, applicationProperties)
	if firstErr == nil {
		return nil
	}
	sendErr := fmt.Errorf("send: %w", firstErr)

	// renew sender
	sender, err := a.refreshSender(topic)
	if err != nil {
		return errors.Join(sendErr, err)
	}

	// retry send
	if err := sender.Send(topic, wrappedData, contentType, applicationProperties); err != nil {
		return errors.Join(sendErr, fmt.Errorf("retry send: %w", err))
	}
	return nil
}

// refreshSender creates a new sender for topic via newSender and returns the
// sender that is in the cache afterwards.
//
// newSender and the cache lookup acquire the lock separately, so the entry may
// be gone again by the time it is read (Close clears the cache). In that case
// an error is returned instead of a nil sender, so callers never invoke Send
// on a nil pointer.
func (a *AmqpApi) refreshSender(topic string) (*internalMessaging.AmqpSenderSession, error) {
	if err := a.newSender(topic); err != nil {
		return nil, err
	}
	sender := a.senderFromCache(topic)
	if sender == nil {
		return nil, fmt.Errorf("sender for topic %q not available", topic)
	}
	return sender, nil
}
// senderFromCache returns the sender cached for topic, or nil when no sender
// has been cached for it. The lookup is done under the read lock.
func (a *AmqpApi) senderFromCache(topic string) *internalMessaging.AmqpSenderSession {
	a.lock.RLock()
	cached := a.senderCache[topic]
	a.lock.RUnlock()
	return cached
}
// newSender creates a sender for topic and stores it in the sender cache,
// replacing any sender previously cached for that topic.
//
// If there is no current connection, or the current one reports itself as
// closed, a connection is first requested from the connection pool. Sender
// creation is bounded by a 10 second timeout. The whole operation runs under
// the write lock.
func (a *AmqpApi) newSender(topic string) error {
	const createTimeout = 10 * time.Second

	a.lock.Lock()
	defer a.lock.Unlock()

	// (re-)acquire a connection when none is usable
	if current := a.connection; current == nil || current.IsClosed() {
		fresh, err := a.connectionPool.GetConnection(a.connectionPoolHandle)
		if err != nil {
			return fmt.Errorf("get connection: %w", err)
		}
		a.connection = fresh
	}

	timeoutCtx, cancel := context.WithTimeout(context.Background(), createTimeout)
	session, err := a.connection.NewSender(timeoutCtx, topic)
	// release the timeout context as soon as the call has returned
	cancel()
	if err != nil {
		return fmt.Errorf("new sender: %w", err)
	}

	a.senderCache[topic] = session
	return nil
}
// Close implements Api.Close.
//
// It closes every cached sender, empties the sender cache and then closes the
// connection pool, all under the write lock. Closing continues after a failure;
// every error encountered is collected and returned as one joined error
// prefixed with "close". The context parameter is not used.
func (a *AmqpApi) Close(_ context.Context) error {
	pkgLog.AuditLogger.Info("close audit amqp connection pool")

	a.lock.Lock()
	defer a.lock.Unlock()

	var errs []error

	// cached senders
	for _, sender := range a.senderCache {
		err := sender.Close()
		if err == nil {
			continue
		}
		errs = append(errs, fmt.Errorf("close session: %w", err))
	}
	clear(a.senderCache)

	// pool
	if err := a.connectionPool.Close(); err != nil {
		errs = append(errs, fmt.Errorf("close pool: %w", err))
	}

	// errors.Join yields nil when nothing was collected.
	if joined := errors.Join(errs...); joined != nil {
		return fmt.Errorf("close: %w", joined)
	}
	return nil
}