audit-go/audit/messaging/messaging.go
2024-10-07 13:57:06 +02:00

218 lines
5.9 KiB
Go

package messaging
import (
"context"
"dev.azure.com/schwarzit/schwarzit.stackit-core-platform/audit-go.git/log"
"errors"
"fmt"
"github.com/Azure/go-amqp"
"strings"
"sync"
"time"
)
// connectionTimeoutSeconds is the default timeout, in seconds, for
// establishing the AMQP connection. It is an untyped constant that is
// multiplied by time.Second at the point of use.
const connectionTimeoutSeconds = 10
// Api is an abstraction for a messaging system that can be used to send
// audit logs to the audit log system.
type Api interface {
	// Send sends the given data to the specified topic synchronously.
	//
	// Parameters:
	//   - ctx: the context object
	//   - topic: the messaging topic to send the data to
	//   - data: the serialized data as a byte slice
	//   - contentType: the content type of the serialized data
	//   - applicationProperties: properties to send with the message
	//     (e.g. cloud event headers)
	//
	// It returns technical errors for connection issues or sending problems.
	Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error
}
// MutexApi is a wrapper around an Api implementation that provides mutually
// exclusive access to it: its Send method holds the mutex while the call is
// delegated to the wrapped implementation.
type MutexApi struct {
	// mutex is locked for the duration of every delegated Send call.
	mutex *sync.Mutex
	// api points to the wrapped implementation that Send delegates to.
	api *Api
}
// NewMutexApi wraps the given Api in a MutexApi so that calls to Send on the
// returned Api are executed one at a time.
//
// It returns an error if api is a nil pointer or points to a nil interface
// value; in both cases the wrapper would otherwise panic on the first Send.
func NewMutexApi(api *Api) (*Api, error) {
	if api == nil || *api == nil {
		return nil, errors.New("api is nil")
	}

	var wrapped Api = &MutexApi{
		mutex: &sync.Mutex{},
		api:   api,
	}
	return &wrapped, nil
}
// Send implements Api.Send by forwarding the call to the wrapped Api while
// holding the mutex, so that only one Send runs at any time.
func (m *MutexApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	delegate := *m.api
	return delegate.Send(ctx, topic, data, contentType, applicationProperties)
}
// AmqpConfig provides AMQP connection related parameters.
type AmqpConfig struct {
	// URL is the address passed to amqp.Dial when connecting.
	URL string
	// User is the user name for SASL PLAIN authentication. Credentials are
	// only used when both User and Password are non-empty; otherwise the
	// connection is made anonymously.
	User string
	// Password is the password for SASL PLAIN authentication (see User).
	Password string
}
// AmqpSession is an abstraction providing a subset of the methods of amqp.Session.
type AmqpSession interface {
	// NewSender creates a sender for the given target and returns it behind
	// the AmqpSender abstraction.
	NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (*AmqpSender, error)
	// Close closes the session.
	Close(ctx context.Context) error
}
// AmqpSessionWrapper adapts an *amqp.Session to the AmqpSession interface.
type AmqpSessionWrapper struct {
	// session is the underlying session all calls are forwarded to.
	session *amqp.Session
}
// NewSender creates a sender for the given target on the wrapped session and
// returns it behind the AmqpSender abstraction.
//
// On failure it returns nil together with the error. The sender is only boxed
// into the interface after the error check, because a nil *amqp.Sender stored
// in an AmqpSender would yield a non-nil interface value that panics on use.
func (w AmqpSessionWrapper) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (*AmqpSender, error) {
	sender, err := w.session.NewSender(ctx, target, opts)
	if err != nil {
		return nil, err
	}

	var amqpSender AmqpSender = sender
	return &amqpSender, nil
}
// Close closes the wrapped session and returns the resulting error, if any.
func (w AmqpSessionWrapper) Close(ctx context.Context) error {
	wrapped := w.session
	return wrapped.Close(ctx)
}
// AmqpSender is an abstraction providing a subset of the methods of amqp.Sender.
type AmqpSender interface {
	// Send sends the given message.
	Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error
	// Close closes the sender.
	Close(ctx context.Context) error
}
// AmqpApi implements Api on top of an AMQP connection.
type AmqpApi struct {
	// config holds the connection parameters used by connect.
	config AmqpConfig
	// connection is the connection established by connect; resetConnection
	// closes it before reconnecting.
	connection *amqp.Conn
	// session is the session opened by connect; senders are created from it.
	session *AmqpSession
}
// NewAmqpApi creates an AmqpApi for the given configuration and connects it
// to the messaging system right away.
//
// It returns the connected instance behind the Api interface, or the
// connection error if the messaging system could not be reached.
func NewAmqpApi(amqpConfig AmqpConfig) (*Api, error) {
	impl := &AmqpApi{config: amqpConfig}
	if err := impl.connect(); err != nil {
		return nil, err
	}

	var result Api = impl
	return &result, nil
}
// connect opens a new connection and session to the AMQP messaging system.
//
// SASL PLAIN authentication is used when both a user and a password are
// configured; otherwise the connection is anonymous and a warning is logged.
//
// Dialing and session creation share a single timeout of
// connectionTimeoutSeconds, so neither step can block the caller indefinitely.
//
// The receiver is only updated when both steps succeed. If the session cannot
// be created, the freshly dialed connection is closed again rather than leaked.
func (a *AmqpApi) connect() error {
	log.AuditLogger.Info("connecting to messaging system")

	// Set credentials if specified
	auth := amqp.SASLTypeAnonymous()
	if a.config.User != "" && a.config.Password != "" {
		auth = amqp.SASLTypePlain(a.config.User, a.config.Password)
		log.AuditLogger.Info("using username and password for messaging")
	} else {
		log.AuditLogger.Warn("using anonymous messaging!")
	}
	options := &amqp.ConnOptions{
		SASLType: auth,
	}

	// The timeout only bounds the setup; it is not needed once connect returns.
	setupCtx, cancel := context.WithTimeout(context.Background(), connectionTimeoutSeconds*time.Second)
	defer cancel()

	// Initialize connection
	conn, err := amqp.Dial(setupCtx, a.config.URL, options)
	if err != nil {
		return err
	}

	// Initialize session
	session, err := conn.NewSession(setupCtx, nil)
	if err != nil {
		// Nobody else holds a reference to conn yet, so it has to be released here.
		if closeErr := conn.Close(); closeErr != nil {
			log.AuditLogger.Error("failed to close message connection", closeErr)
		}
		return err
	}

	var amqpSession AmqpSession = AmqpSessionWrapper{session: session}
	a.connection = conn
	a.session = &amqpSession
	return nil
}
// Send implements Api.Send.
//
// The topic is validated first: a topic without the mandatory AmqpTopicPrefix
// is a caller error that a reconnect cannot fix, so it is rejected without
// touching the connection.
//
// If sending fails, the connection to the messaging system is closed and
// re-established and the message is sent once more. The error of the second
// attempt (or of the reconnect) is returned.
func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
	if !strings.HasPrefix(topic, AmqpTopicPrefix) {
		return fmt.Errorf(
			"topic %q name lacks mandatory prefix %q",
			topic,
			AmqpTopicPrefix,
		)
	}

	err := a.trySend(ctx, topic, data, contentType, applicationProperties)
	if err == nil {
		return nil
	}

	// Drop the current sender, as it cannot connect to the broker anymore
	log.AuditLogger.Error("message sender error, recreating", err)

	if err := a.resetConnection(ctx); err != nil {
		return err
	}

	return a.trySend(ctx, topic, data, contentType, applicationProperties)
}
// trySend sends the given data as a durable amqp.Message to the messaging
// system, using a sender that is created for this single message.
//
// It returns an error if the topic lacks the mandatory AmqpTopicPrefix, if the
// sender cannot be created, or if sending fails. The sender is closed on every
// path so that links do not accumulate on the session.
func (a *AmqpApi) trySend(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
	if !strings.HasPrefix(topic, AmqpTopicPrefix) {
		return fmt.Errorf(
			"topic %q name lacks mandatory prefix %q",
			topic,
			AmqpTopicPrefix,
		)
	}

	sender, err := (*a.session).NewSender(ctx, topic, nil)
	if err != nil {
		return err
	}

	message := amqp.Message{
		Header: &amqp.MessageHeader{
			Durable: true,
		},
		Properties: &amqp.MessageProperties{
			To:          &topic,
			ContentType: &contentType,
		},
		ApplicationProperties: applicationProperties,
		Data:                  [][]byte{data},
	}

	if err := (*sender).Send(ctx, &message, nil); err != nil {
		// Best effort: the send error is the one worth reporting.
		_ = (*sender).Close(ctx)
		return err
	}

	// Send returned without error, so a failing close is only logged. Returning
	// it would make the caller reconnect and send the same message again.
	if err := (*sender).Close(ctx); err != nil {
		log.AuditLogger.Error("failed to close message sender", err)
	}
	return nil
}
// resetConnection tears down the current session and connection and then
// reconnects to the messaging system.
//
// Errors while closing are not fatal: a session close error is discarded and a
// connection close error is only logged. The returned error is the result of
// the reconnect.
func (a *AmqpApi) resetConnection(ctx context.Context) error {
	currentSession := *a.session
	_ = currentSession.Close(ctx)

	if closeErr := a.connection.Close(); closeErr != nil {
		log.AuditLogger.Error("failed to close message connection", closeErr)
	}

	return a.connect()
}