audit-go/audit/messaging/messaging.go
Lukas Schmitt 6041fd105a Merged PR 710571: refactor connection, session and sender handling during reconnect
* logging all errors related to the session and connection close
* closing the sender properly not only in case of an error

Security-concept-update-needed: false.

JIRA Work Item: STACKITALO-61
2025-01-03 12:19:11 +00:00

244 lines
6.5 KiB
Go

package messaging
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log"
"github.com/Azure/go-amqp"
)
// Default connection timeout for the AMQP connection
const connectionTimeoutSeconds = 10
// Api is an abstraction for a messaging system that can be used to send
// audit logs to the audit log system.
type Api interface {
// Send method will send the given data to the specified topic synchronously.
// Parameters:
// * ctx - the context object
// * topic - the messaging topic where to send the data to
// * data - the serialized data as byte array
// * contentType - the contentType of the serialized data
// * applicationProperties - properties to send with the message (i.e. cloud event headers)
//
// It returns technical errors for connection issues or sending problems.
Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error
// Close the underlying connection to the messaging system.
// Parameters:
// * ctx - the context object
//
// It returns an error if the connection cannot be closed successfully
Close(ctx context.Context) error
}
// MutexApi is wrapper around an API implementation that controls mutual exclusive access to the api.
type MutexApi struct {
mutex sync.Mutex
api Api
}
var _ Api = &MutexApi{}
func NewMutexApi(api Api) (Api, error) {
if api == nil {
return nil, errors.New("api is nil")
}
mutexApi := MutexApi{
mutex: sync.Mutex{},
api: api,
}
var genericApi Api = &mutexApi
return genericApi, nil
}
// Send implements Api.Send
func (m *MutexApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.api.Send(ctx, topic, data, contentType, applicationProperties)
}
func (m *MutexApi) Close(ctx context.Context) error {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.api.Close(ctx)
}
// AmqpConfig provides AMQP connection related parameters.
type AmqpConfig struct {
URL string
User string
Password string
}
// AmqpSession is an abstraction providing a subset of the methods of amqp.Session
type AmqpSession interface {
NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error)
Close(ctx context.Context) error
}
type AmqpSessionWrapper struct {
session *amqp.Session
}
func (w AmqpSessionWrapper) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) {
sender, err := w.session.NewSender(ctx, target, opts)
var amqpSender AmqpSender = sender
return amqpSender, err
}
func (w AmqpSessionWrapper) Close(ctx context.Context) error {
return w.session.Close(ctx)
}
// AmqpSender is an abstraction providing a subset of the methods of amqp.Sender
type AmqpSender interface {
Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error
Close(ctx context.Context) error
}
// AmqpApi implements Api.
type AmqpApi struct {
config AmqpConfig
connection *amqp.Conn
session AmqpSession
}
var _ Api = &AmqpApi{}
func NewAmqpApi(amqpConfig AmqpConfig) (Api, error) {
amqpApi := &AmqpApi{config: amqpConfig}
err := amqpApi.connect()
if err != nil {
return nil, err
}
var messagingApi Api = amqpApi
return messagingApi, nil
}
// connect opens a new connection and session to the AMQP messaging system.
// The connection attempt will be cancelled after connectionTimeoutSeconds.
func (a *AmqpApi) connect() error {
log.AuditLogger.Info("connecting to audit messaging system")
// Set credentials if specified
auth := amqp.SASLTypeAnonymous()
if a.config.User != "" && a.config.Password != "" {
auth = amqp.SASLTypePlain(a.config.User, a.config.Password)
log.AuditLogger.Info("using username and password for messaging")
} else {
log.AuditLogger.Warn("using anonymous messaging!")
}
options := &amqp.ConnOptions{
SASLType: auth,
}
// Create new context with timeout for the connection initialization
subCtx, cancel := context.WithTimeout(context.Background(), connectionTimeoutSeconds*time.Second)
defer cancel()
// Initialize connection
conn, err := amqp.Dial(subCtx, a.config.URL, options)
if err != nil {
return err
}
a.connection = conn
// Initialize session
session, err := conn.NewSession(context.Background(), nil)
if err != nil {
return err
}
var amqpSession AmqpSession = &AmqpSessionWrapper{session: session}
a.session = amqpSession
return nil
}
// Send implements Api.Send.
// If errors occur the connection to the messaging system will be closed and re-established.
func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
err := a.trySend(ctx, topic, data, contentType, applicationProperties)
if err == nil {
return nil
}
// Drop the current sender, as it cannot connect to the broker anymore
log.AuditLogger.Error("message sender error, recreating", err)
err = a.resetConnection(ctx)
if err != nil {
return err
}
return a.trySend(ctx, topic, data, contentType, applicationProperties)
}
// trySend actually sends the given data as amqp.Message to the messaging system.
func (a *AmqpApi) trySend(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
if !strings.HasPrefix(topic, AmqpTopicPrefix) {
return fmt.Errorf(
"topic %q name lacks mandatory prefix %q",
topic,
AmqpTopicPrefix,
)
}
sender, err := a.session.NewSender(ctx, topic, nil)
if err != nil {
return err
}
defer func() {
if err := sender.Close(ctx); err != nil {
log.AuditLogger.Error("failed to close session sender", err)
}
}()
bytes := [][]byte{data}
message := amqp.Message{
Header: &amqp.MessageHeader{
Durable: true,
},
Properties: &amqp.MessageProperties{
To: &topic,
ContentType: &contentType,
},
ApplicationProperties: applicationProperties,
Data: bytes,
}
err = sender.Send(ctx, &message, nil)
if err != nil {
return err
}
return nil
}
// resetConnection closes the current session and connection and reconnects to the messaging system.
func (a *AmqpApi) resetConnection(ctx context.Context) error {
if err := a.Close(ctx); err != nil {
log.AuditLogger.Error("failed to close audit messaging connection", err)
}
return a.connect()
}
// Close implements Api.Close
func (a *AmqpApi) Close(ctx context.Context) error {
log.AuditLogger.Info("close audit messaging connection")
return errors.Join(a.session.Close(ctx), a.connection.Close())
}