mirror of
https://dev.azure.com/schwarzit/schwarzit.stackit-public/_git/audit-go
synced 2026-02-08 00:57:24 +00:00
148 lines
4.1 KiB
Go
148 lines
4.1 KiB
Go
package api
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
internalMessaging "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/internal/messaging"
|
|
pkgLog "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/log"
|
|
pkgMessagingCommon "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/common"
|
|
)
|
|
|
|
// AmqpApi implements Api.
|
|
type AmqpApi struct {
|
|
connection *internalMessaging.AmqpConnection
|
|
connectionPool internalMessaging.ConnectionPool
|
|
connectionPoolHandle *internalMessaging.ConnectionPoolHandle
|
|
senderCache map[string]*internalMessaging.AmqpSenderSession
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
var _ Api = &AmqpApi{}
|
|
|
|
func NewDefaultAmqpApi(amqpConfig pkgMessagingCommon.AmqpConnectionConfig) (Api, error) {
|
|
connectionPool, err := internalMessaging.NewDefaultAmqpConnectionPool(amqpConfig, "sdk")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("new amqp connection pool: %w", err)
|
|
}
|
|
|
|
amqpApi := &AmqpApi{
|
|
connectionPool: connectionPool,
|
|
connectionPoolHandle: connectionPool.NewHandle(),
|
|
senderCache: make(map[string]*internalMessaging.AmqpSenderSession),
|
|
}
|
|
|
|
var messagingApi Api = amqpApi
|
|
return messagingApi, nil
|
|
}
|
|
|
|
func NewAmqpApi(amqpConfig pkgMessagingCommon.AmqpConnectionPoolConfig) (Api, error) {
|
|
connectionPool, err := internalMessaging.NewAmqpConnectionPool(amqpConfig, "sdk")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("new amqp connection pool: %w", err)
|
|
}
|
|
|
|
amqpApi := &AmqpApi{
|
|
connectionPool: connectionPool,
|
|
connectionPoolHandle: connectionPool.NewHandle(),
|
|
senderCache: make(map[string]*internalMessaging.AmqpSenderSession),
|
|
}
|
|
|
|
var messagingApi Api = amqpApi
|
|
return messagingApi, nil
|
|
}
|
|
|
|
// Send implements Api.Send.
|
|
// If errors occur the connection to the messaging system will be closed and re-established.
|
|
func (a *AmqpApi) Send(_ context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
|
|
|
|
// create or get sender from cache
|
|
var sender = a.senderFromCache(topic)
|
|
if sender == nil {
|
|
if err := a.newSender(topic); err != nil {
|
|
return err
|
|
}
|
|
sender = a.senderFromCache(topic)
|
|
}
|
|
|
|
// first attempt to send
|
|
var sendErr error
|
|
wrappedData := [][]byte{data}
|
|
if err := sender.Send(topic, wrappedData, contentType, applicationProperties); err != nil {
|
|
sendErr = fmt.Errorf("send: %w", err)
|
|
} else {
|
|
return nil
|
|
}
|
|
|
|
// renew sender
|
|
if err := a.newSender(topic); err != nil {
|
|
return errors.Join(sendErr, err)
|
|
}
|
|
sender = a.senderFromCache(topic)
|
|
|
|
// retry send
|
|
if err := sender.Send(topic, wrappedData, contentType, applicationProperties); err != nil {
|
|
return errors.Join(sendErr, fmt.Errorf("retry send: %w", err))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *AmqpApi) senderFromCache(topic string) *internalMessaging.AmqpSenderSession {
|
|
a.lock.RLock()
|
|
defer a.lock.RUnlock()
|
|
return a.senderCache[topic]
|
|
}
|
|
|
|
func (a *AmqpApi) newSender(topic string) error {
|
|
a.lock.Lock()
|
|
defer a.lock.Unlock()
|
|
|
|
connectionIsClosed := a.connection == nil || a.connection.IsClosed()
|
|
if connectionIsClosed {
|
|
connection, err := a.connectionPool.GetConnection(a.connectionPoolHandle)
|
|
if err != nil {
|
|
return fmt.Errorf("get connection: %w", err)
|
|
}
|
|
a.connection = connection
|
|
}
|
|
|
|
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
|
|
sender, err := a.connection.NewSender(ctx, topic)
|
|
cancelFn()
|
|
if err != nil {
|
|
return fmt.Errorf("new sender: %w", err)
|
|
}
|
|
|
|
a.senderCache[topic] = sender
|
|
return nil
|
|
}
|
|
|
|
// Close implements Api.Close
|
|
func (a *AmqpApi) Close(_ context.Context) error {
|
|
pkgLog.AuditLogger.Info("close audit amqp connection pool")
|
|
|
|
a.lock.Lock()
|
|
defer a.lock.Unlock()
|
|
|
|
// cached senders
|
|
var closeErrors []error
|
|
for _, session := range a.senderCache {
|
|
if err := session.Close(); err != nil {
|
|
closeErrors = append(closeErrors, fmt.Errorf("close session: %w", err))
|
|
}
|
|
}
|
|
clear(a.senderCache)
|
|
|
|
// pool
|
|
if err := a.connectionPool.Close(); err != nil {
|
|
closeErrors = append(closeErrors, fmt.Errorf("close pool: %w", err))
|
|
}
|
|
|
|
if len(closeErrors) > 0 {
|
|
return fmt.Errorf("close: %w", errors.Join(closeErrors...))
|
|
}
|
|
return nil
|
|
}
|