package messaging import ( "context" "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log" "errors" "fmt" "sync" "time" ) // Api is an abstraction for a messaging system that can be used to send // audit logs to the audit log system. type Api interface { // Send method will send the given data to the specified topic synchronously. // Parameters: // * ctx - the context object // * topic - the messaging topic where to send the data to // * data - the serialized data as byte array // * contentType - the contentType of the serialized data // * applicationProperties - properties to send with the message (i.e. cloud event headers) // // It returns technical errors for connection issues or sending problems. Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error // Close the underlying connection to the messaging system. // Parameters: // * ctx - the context object // // It returns an error if the connection cannot be closed successfully Close(ctx context.Context) error } // AmqpApi implements Api. type AmqpApi struct { config AmqpConnectionPoolConfig connection *AmqpConnection connectionPool ConnectionPool connectionPoolHandle *ConnectionPoolHandle senderCache map[string]*AmqpSenderSession lock sync.RWMutex } var _ Api = &AmqpApi{} func NewAmqpApi(amqpConfig AmqpConnectionPoolConfig) (Api, error) { connectionPool, err := NewAmqpConnectionPool(amqpConfig, "sdk") if err != nil { return nil, fmt.Errorf("new amqp connection pool: %w", err) } amqpApi := &AmqpApi{config: amqpConfig, connectionPool: connectionPool, connectionPoolHandle: connectionPool.NewHandle(), senderCache: make(map[string]*AmqpSenderSession), } var messagingApi Api = amqpApi return messagingApi, nil } // Send implements Api.Send. // If errors occur the connection to the messaging system will be closed and re-established. func (a *AmqpApi) Send(_ context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error { // create or get sender from cache var sender = a.senderFromCache(topic) if sender == nil { if err := a.newSender(topic); err != nil { return err } sender = a.senderFromCache(topic) } // first attempt to send var sendErr error wrappedData := [][]byte{data} if err := sender.Send(topic, wrappedData, contentType, applicationProperties); err != nil { sendErr = fmt.Errorf("send: %w", err) } else { return nil } // renew sender if err := a.newSender(topic); err != nil { return errors.Join(sendErr, err) } sender = a.senderFromCache(topic) // retry send if err := sender.Send(topic, wrappedData, contentType, applicationProperties); err != nil { return errors.Join(sendErr, fmt.Errorf("retry send: %w", err)) } return nil } func (a *AmqpApi) senderFromCache(topic string) *AmqpSenderSession { a.lock.RLock() defer a.lock.RUnlock() return a.senderCache[topic] } func (a *AmqpApi) newSender(topic string) error { a.lock.Lock() defer a.lock.Unlock() connectionIsClosed := a.connection == nil || a.connection.IsClosed() if connectionIsClosed { connection, err := a.connectionPool.GetConnection(a.connectionPoolHandle) if err != nil { return fmt.Errorf("get connection: %w", err) } a.connection = connection } ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) sender, err := a.connection.NewSender(ctx, topic) cancelFn() if err != nil { return fmt.Errorf("new sender: %w", err) } a.senderCache[topic] = sender return nil } // Close implements Api.Close func (a *AmqpApi) Close(_ context.Context) error { log.AuditLogger.Info("close audit amqp connection pool") a.lock.Lock() defer a.lock.Unlock() // cached senders var closeErrors []error for _, session := range a.senderCache { if err := session.Close(); err != nil { closeErrors = append(closeErrors, fmt.Errorf("close session: %w", err)) } } clear(a.senderCache) // pool if err := a.connectionPool.Close(); err != nil { closeErrors = append(closeErrors, fmt.Errorf("close pool: %w", err)) } if len(closeErrors) > 0 { return fmt.Errorf("close: %w", errors.Join(closeErrors...)) } return nil }