mirror of
https://dev.azure.com/schwarzit/schwarzit.stackit-public/_git/audit-go
synced 2026-02-08 09:07:26 +00:00
218 lines
5.9 KiB
Go
218 lines
5.9 KiB
Go
package messaging
|
|
|
|
import (
|
|
"context"
|
|
"dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/Azure/go-amqp"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Default connection timeout for the AMQP connection
|
|
const connectionTimeoutSeconds = 10
|
|
|
|
// Api is an abstraction for a messaging system that can be used to send
|
|
// audit logs to the audit log system.
|
|
type Api interface {
|
|
|
|
// Send method will send the given data to the specified topic synchronously.
|
|
// Parameters:
|
|
// * ctx - the context object
|
|
// * topic - the messaging topic where to send the data to
|
|
// * data - the serialized data as byte array
|
|
// * contentType - the contentType of the serialized data
|
|
// * applicationProperties - properties to send with the message (i.e. cloud event headers)
|
|
//
|
|
// It returns technical errors for connection issues or sending problems.
|
|
Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error
|
|
}
|
|
|
|
// MutexApi is wrapper around an API implementation that controls mutual exclusive access to the api.
|
|
type MutexApi struct {
|
|
mutex *sync.Mutex
|
|
api *Api
|
|
}
|
|
|
|
func NewMutexApi(api *Api) (*Api, error) {
|
|
if api == nil {
|
|
return nil, errors.New("api is nil")
|
|
}
|
|
mutexApi := MutexApi{
|
|
mutex: &sync.Mutex{},
|
|
api: api,
|
|
}
|
|
|
|
var genericApi Api = &mutexApi
|
|
return &genericApi, nil
|
|
}
|
|
|
|
// Send implements Api.Send
|
|
func (m *MutexApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
return (*m.api).Send(ctx, topic, data, contentType, applicationProperties)
|
|
}
|
|
|
|
// AmqpConfig provides AMQP connection related parameters.
|
|
type AmqpConfig struct {
|
|
URL string
|
|
User string
|
|
Password string
|
|
}
|
|
|
|
// AmqpSession is an abstraction providing a subset of the methods of amqp.Session
|
|
type AmqpSession interface {
|
|
NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (*AmqpSender, error)
|
|
Close(ctx context.Context) error
|
|
}
|
|
|
|
type AmqpSessionWrapper struct {
|
|
session *amqp.Session
|
|
}
|
|
|
|
func (w AmqpSessionWrapper) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (*AmqpSender, error) {
|
|
sender, err := w.session.NewSender(ctx, target, opts)
|
|
var amqpSender AmqpSender = sender
|
|
return &amqpSender, err
|
|
}
|
|
|
|
func (w AmqpSessionWrapper) Close(ctx context.Context) error {
|
|
return w.session.Close(ctx)
|
|
}
|
|
|
|
// AmqpSender is an abstraction providing a subset of the methods of amqp.Sender
|
|
type AmqpSender interface {
|
|
Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error
|
|
Close(ctx context.Context) error
|
|
}
|
|
|
|
// AmqpApi implements Api.
|
|
type AmqpApi struct {
|
|
config AmqpConfig
|
|
connection *amqp.Conn
|
|
session *AmqpSession
|
|
}
|
|
|
|
func NewAmqpApi(amqpConfig AmqpConfig) (*Api, error) {
|
|
amqpApi := &AmqpApi{config: amqpConfig}
|
|
|
|
err := amqpApi.connect()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var messagingApi Api = amqpApi
|
|
return &messagingApi, nil
|
|
}
|
|
|
|
// connect opens a new connection and session to the AMQP messaging system.
|
|
// The connection attempt will be cancelled after connectionTimeoutSeconds.
|
|
func (a *AmqpApi) connect() error {
|
|
log.AuditLogger.Info("connecting to messaging system")
|
|
|
|
// Set credentials if specified
|
|
auth := amqp.SASLTypeAnonymous()
|
|
|
|
if a.config.User != "" && a.config.Password != "" {
|
|
auth = amqp.SASLTypePlain(a.config.User, a.config.Password)
|
|
log.AuditLogger.Info("using username and password for messaging")
|
|
} else {
|
|
log.AuditLogger.Warn("using anonymous messaging!")
|
|
}
|
|
|
|
options := &amqp.ConnOptions{
|
|
SASLType: auth,
|
|
}
|
|
|
|
// Create new context with timeout for the connection initialization
|
|
subCtx, cancel := context.WithTimeout(context.Background(), connectionTimeoutSeconds*time.Second)
|
|
defer cancel()
|
|
|
|
// Initialize connection
|
|
conn, err := amqp.Dial(subCtx, a.config.URL, options)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
a.connection = conn
|
|
|
|
// Initialize session
|
|
session, err := conn.NewSession(context.Background(), nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var amqpSession AmqpSession = AmqpSessionWrapper{session: session}
|
|
a.session = &amqpSession
|
|
|
|
return nil
|
|
}
|
|
|
|
// Send implements Api.Send.
|
|
// If errors occur the connection to the messaging system will be closed and re-established.
|
|
func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
|
|
err := a.trySend(ctx, topic, data, contentType, applicationProperties)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
// Drop the current sender, as it cannot connect to the broker anymore
|
|
log.AuditLogger.Error("message sender error, recreating", err)
|
|
|
|
err = a.resetConnection(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return a.trySend(ctx, topic, data, contentType, applicationProperties)
|
|
}
|
|
|
|
// trySend actually sends the given data as amqp.Message to the messaging system.
|
|
func (a *AmqpApi) trySend(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
|
|
if !strings.HasPrefix(topic, AmqpTopicPrefix) {
|
|
return fmt.Errorf(
|
|
"topic %q name lacks mandatory prefix %q",
|
|
topic,
|
|
AmqpTopicPrefix,
|
|
)
|
|
}
|
|
|
|
sender, err := (*a.session).NewSender(ctx, topic, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
bytes := [][]byte{data}
|
|
message := amqp.Message{
|
|
Header: &amqp.MessageHeader{
|
|
Durable: true,
|
|
},
|
|
Properties: &amqp.MessageProperties{
|
|
To: &topic,
|
|
ContentType: &contentType,
|
|
},
|
|
ApplicationProperties: applicationProperties,
|
|
Data: bytes,
|
|
}
|
|
|
|
err = (*sender).Send(ctx, &message, nil)
|
|
if err != nil {
|
|
_ = (*sender).Close(ctx)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// resetConnection closes the current session and connection and reconnects to the messaging system.
|
|
func (a *AmqpApi) resetConnection(ctx context.Context) error {
|
|
_ = (*a.session).Close(ctx)
|
|
err := a.connection.Close()
|
|
if err != nil {
|
|
log.AuditLogger.Error("failed to close message connection", err)
|
|
}
|
|
|
|
return a.connect()
|
|
}
|