package messaging import ( "context" "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log" "errors" "fmt" "github.com/Azure/go-amqp" "strings" "sync" "time" ) // Default connection timeout for the AMQP connection const connectionTimeoutSeconds = 10 // Api is an abstraction for a messaging system that can be used to send // audit logs to the audit log system. type Api interface { // Send method will send the given data to the specified topic synchronously. // Parameters: // * ctx - the context object // * topic - the messaging topic where to send the data to // * data - the serialized data as byte array // * contentType - the contentType of the serialized data // * applicationProperties - properties to send with the message (i.e. cloud event headers) // // It returns technical errors for connection issues or sending problems. Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error } // MutexApi is wrapper around an API implementation that controls mutual exclusive access to the api. type MutexApi struct { mutex *sync.Mutex api *Api } func NewMutexApi(api *Api) (*Api, error) { if api == nil { return nil, errors.New("api is nil") } mutexApi := MutexApi{ mutex: &sync.Mutex{}, api: api, } var genericApi Api = &mutexApi return &genericApi, nil } // Send implements Api.Send func (m *MutexApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error { m.mutex.Lock() defer m.mutex.Unlock() return (*m.api).Send(ctx, topic, data, contentType, applicationProperties) } // AmqpConfig provides AMQP connection related parameters. type AmqpConfig struct { URL string User string Password string } // AmqpSession is an abstraction providing a subset of the methods of amqp.Session type AmqpSession interface { NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (*AmqpSender, error) Close(ctx context.Context) error } type AmqpSessionWrapper struct { session *amqp.Session } func (w AmqpSessionWrapper) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (*AmqpSender, error) { sender, err := w.session.NewSender(ctx, target, opts) var amqpSender AmqpSender = sender return &amqpSender, err } func (w AmqpSessionWrapper) Close(ctx context.Context) error { return w.session.Close(ctx) } // AmqpSender is an abstraction providing a subset of the methods of amqp.Sender type AmqpSender interface { Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error Close(ctx context.Context) error } // AmqpApi implements Api. type AmqpApi struct { config AmqpConfig connection *amqp.Conn session *AmqpSession } func NewAmqpApi(amqpConfig AmqpConfig) (*Api, error) { amqpApi := &AmqpApi{config: amqpConfig} err := amqpApi.connect() if err != nil { return nil, err } var messagingApi Api = amqpApi return &messagingApi, nil } // connect opens a new connection and session to the AMQP messaging system. // The connection attempt will be cancelled after connectionTimeoutSeconds. func (a *AmqpApi) connect() error { log.AuditLogger.Info("connecting to messaging system") // Set credentials if specified auth := amqp.SASLTypeAnonymous() if a.config.User != "" && a.config.Password != "" { auth = amqp.SASLTypePlain(a.config.User, a.config.Password) log.AuditLogger.Info("using username and password for messaging") } else { log.AuditLogger.Warn("using anonymous messaging!") } options := &amqp.ConnOptions{ SASLType: auth, } // Create new context with timeout for the connection initialization subCtx, cancel := context.WithTimeout(context.Background(), connectionTimeoutSeconds*time.Second) defer cancel() // Initialize connection conn, err := amqp.Dial(subCtx, a.config.URL, options) if err != nil { return err } a.connection = conn // Initialize session session, err := conn.NewSession(context.Background(), nil) if err != nil { return err } var amqpSession AmqpSession = AmqpSessionWrapper{session: session} a.session = &amqpSession return nil } // Send implements Api.Send. // If errors occur the connection to the messaging system will be closed and re-established. func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error { err := a.trySend(ctx, topic, data, contentType, applicationProperties) if err == nil { return nil } // Drop the current sender, as it cannot connect to the broker anymore log.AuditLogger.Error("message sender error, recreating", err) err = a.resetConnection(ctx) if err != nil { return err } return a.trySend(ctx, topic, data, contentType, applicationProperties) } // trySend actually sends the given data as amqp.Message to the messaging system. func (a *AmqpApi) trySend(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error { if !strings.HasPrefix(topic, AmqpTopicPrefix) { return fmt.Errorf( "topic %q name lacks mandatory prefix %q", topic, AmqpTopicPrefix, ) } sender, err := (*a.session).NewSender(ctx, topic, nil) if err != nil { return err } bytes := [][]byte{data} message := amqp.Message{ Header: &amqp.MessageHeader{ Durable: true, }, Properties: &amqp.MessageProperties{ To: &topic, ContentType: &contentType, }, ApplicationProperties: applicationProperties, Data: bytes, } err = (*sender).Send(ctx, &message, nil) if err != nil { _ = (*sender).Close(ctx) return err } return nil } // resetConnection closes the current session and connection and reconnects to the messaging system. func (a *AmqpApi) resetConnection(ctx context.Context) error { _ = (*a.session).Close(ctx) err := a.connection.Close() if err != nil { log.AuditLogger.Error("failed to close message connection", err) } return a.connect() }