mirror of
https://dev.azure.com/schwarzit/schwarzit.stackit-public/_git/audit-go
synced 2026-02-08 09:07:26 +00:00
So far the SDK provided a messaging API that was not thread-safe (i.e. not goroutine-safe). Additionally, the SDK provided a MutexAPI which made it thread-safe at the cost of losing any concurrency. The changes implemented in this commit replace both implementations with a thread-safe, connection-pool-based solution. The API gateway is an SDK user that requires reliable, high-performance send capabilities with a limited number of AMQP connections. The changes in this PR try to address these requirements by moving the responsibility for connection management into the SDK. Other SDK users will benefit from this change as well. Security-concept-update-needed: false. JIRA Work Item: STACKITALO-62
153 lines
4.2 KiB
Go
153 lines
4.2 KiB
Go
package messaging
|
|
|
|
import (
|
|
"context"
|
|
"dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Api abstracts the messaging system that transports audit logs to the
// audit log system.
type Api interface {

	// Send synchronously publishes data to the given topic.
	//
	// Parameters:
	//   - ctx: the context object
	//   - topic: the messaging topic the data is sent to
	//   - data: the serialized payload as a byte slice
	//   - contentType: the content type of the serialized payload
	//   - applicationProperties: properties sent along with the message
	//     (i.e. cloud event headers)
	//
	// It returns technical errors for connection issues or sending problems.
	Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error

	// Close closes the underlying connection to the messaging system.
	//
	// Parameters:
	//   - ctx: the context object
	//
	// It returns an error if the connection cannot be closed successfully.
	Close(ctx context.Context) error
}
|
|
|
|
// AmqpApi implements Api.
|
|
type AmqpApi struct {
|
|
config AmqpConnectionPoolConfig
|
|
connection *AmqpConnection
|
|
connectionPool ConnectionPool
|
|
connectionPoolHandle *ConnectionPoolHandle
|
|
senderCache map[string]*AmqpSenderSession
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
// Compile-time check that *AmqpApi satisfies Api.
var _ Api = (*AmqpApi)(nil)
|
|
|
|
func NewAmqpApi(amqpConfig AmqpConnectionPoolConfig) (Api, error) {
|
|
connectionPool, err := NewAmqpConnectionPool(amqpConfig, "sdk")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("new amqp connection pool: %w", err)
|
|
}
|
|
|
|
amqpApi := &AmqpApi{config: amqpConfig,
|
|
connectionPool: connectionPool,
|
|
connectionPoolHandle: connectionPool.NewHandle(),
|
|
senderCache: make(map[string]*AmqpSenderSession),
|
|
}
|
|
|
|
var messagingApi Api = amqpApi
|
|
return messagingApi, nil
|
|
}
|
|
|
|
// Send implements Api.Send.
|
|
// If errors occur the connection to the messaging system will be closed and re-established.
|
|
func (a *AmqpApi) Send(_ context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
|
|
|
|
// create or get sender from cache
|
|
var sender = a.senderFromCache(topic)
|
|
if sender == nil {
|
|
if err := a.newSender(topic); err != nil {
|
|
return err
|
|
}
|
|
sender = a.senderFromCache(topic)
|
|
}
|
|
|
|
// first attempt to send
|
|
var sendErr error
|
|
wrappedData := [][]byte{data}
|
|
if err := sender.Send(topic, wrappedData, contentType, applicationProperties); err != nil {
|
|
sendErr = fmt.Errorf("send: %w", err)
|
|
} else {
|
|
return nil
|
|
}
|
|
|
|
// renew sender
|
|
if err := a.newSender(topic); err != nil {
|
|
return errors.Join(sendErr, err)
|
|
}
|
|
sender = a.senderFromCache(topic)
|
|
|
|
// retry send
|
|
if err := sender.Send(topic, wrappedData, contentType, applicationProperties); err != nil {
|
|
return errors.Join(sendErr, fmt.Errorf("retry send: %w", err))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *AmqpApi) senderFromCache(topic string) *AmqpSenderSession {
|
|
a.lock.RLock()
|
|
defer a.lock.RUnlock()
|
|
return a.senderCache[topic]
|
|
}
|
|
|
|
func (a *AmqpApi) newSender(topic string) error {
|
|
a.lock.Lock()
|
|
defer a.lock.Unlock()
|
|
|
|
connectionIsClosed := a.connection == nil || a.connection.IsClosed()
|
|
if connectionIsClosed {
|
|
connection, err := a.connectionPool.GetConnection(a.connectionPoolHandle)
|
|
if err != nil {
|
|
return fmt.Errorf("get connection: %w", err)
|
|
}
|
|
a.connection = connection
|
|
}
|
|
|
|
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
|
|
sender, err := a.connection.NewSender(ctx, topic)
|
|
cancelFn()
|
|
if err != nil {
|
|
return fmt.Errorf("new sender: %w", err)
|
|
}
|
|
|
|
a.senderCache[topic] = sender
|
|
return nil
|
|
}
|
|
|
|
// Close implements Api.Close
|
|
func (a *AmqpApi) Close(_ context.Context) error {
|
|
log.AuditLogger.Info("close audit amqp connection pool")
|
|
|
|
a.lock.Lock()
|
|
defer a.lock.Unlock()
|
|
|
|
// cached senders
|
|
var closeErrors []error
|
|
for _, session := range a.senderCache {
|
|
if err := session.Close(); err != nil {
|
|
closeErrors = append(closeErrors, fmt.Errorf("close session: %w", err))
|
|
}
|
|
}
|
|
clear(a.senderCache)
|
|
|
|
// pool
|
|
if err := a.connectionPool.Close(); err != nil {
|
|
closeErrors = append(closeErrors, fmt.Errorf("close pool: %w", err))
|
|
}
|
|
|
|
if len(closeErrors) > 0 {
|
|
return fmt.Errorf("close: %w", errors.Join(closeErrors...))
|
|
}
|
|
return nil
|
|
}
|