package messaging import ( "context" "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log" "errors" "fmt" "sync" "time" ) // Api is an abstraction for a messaging system that can be used to send // audit logs to the audit log system. type Api interface { // Send method will send the given data to the specified topic synchronously. // Parameters: // * ctx - the context object // * topic - the messaging topic where to send the data to // * data - the serialized data as byte array // * contentType - the contentType of the serialized data // * applicationProperties - properties to send with the message (i.e. cloud event headers) // // It returns technical errors for connection issues or sending problems. Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error // Close the underlying connection to the messaging system. // Parameters: // * ctx - the context object // // It returns an error if the connection cannot be closed successfully Close(ctx context.Context) error } // AmqpApi implements Api. type AmqpApi struct { config AmqpConnectionPoolConfig connection *AmqpConnection connectionPool ConnectionPool connectionPoolHandle *ConnectionPoolHandle senderCache map[string]*AmqpSenderSession lock sync.RWMutex } var _ Api = &AmqpApi{} func NewAmqpApi(amqpConfig AmqpConnectionPoolConfig) (Api, error) { connectionPool, err := NewAmqpConnectionPool(amqpConfig, "sdk") if err != nil { return nil, fmt.Errorf("new amqp connection pool: %w", err) } amqpApi := &AmqpApi{config: amqpConfig, connectionPool: connectionPool, connectionPoolHandle: connectionPool.NewHandle(), senderCache: make(map[string]*AmqpSenderSession), } var messagingApi Api = amqpApi return messagingApi, nil } // Send implements Api.Send. // If errors occur the connection to the messaging system will be closed and re-established. func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error { a.lock.RLock() connectionIsClosed := a.connection == nil || a.connection.IsClosed() a.lock.RUnlock() if connectionIsClosed { connection, err := a.connectionPool.GetConnection(a.connectionPoolHandle) if err != nil { return fmt.Errorf("get connection: %w", err) } a.lock.Lock() a.connection = connection a.lock.Unlock() } a.lock.RLock() var sender = a.senderCache[topic] a.lock.RUnlock() if sender == nil { a.lock.RLock() ctx, cancelFn := context.WithTimeout(ctx, 10*time.Second) senderSession, err := a.connection.NewSender(ctx, topic) cancelFn() a.lock.RUnlock() if err != nil { return fmt.Errorf("new sender: %w", err) } a.lock.Lock() a.senderCache[topic] = senderSession a.lock.Unlock() sender = senderSession } wrappedData := [][]byte{data} if err := sender.Send(topic, wrappedData, contentType, applicationProperties); err != nil { return fmt.Errorf("send: %w", err) } return nil } // Close implements Api.Close func (a *AmqpApi) Close(_ context.Context) error { log.AuditLogger.Info("close audit amqp connection pool") a.lock.Lock() defer a.lock.Unlock() // cached senders var closeErrors []error for _, session := range a.senderCache { if err := session.Close(); err != nil { closeErrors = append(closeErrors, fmt.Errorf("close session: %w", err)) } } clear(a.senderCache) // pool if err := a.connectionPool.Close(); err != nil { closeErrors = append(closeErrors, fmt.Errorf("close pool: %w", err)) } if len(closeErrors) > 0 { return fmt.Errorf("close: %w", errors.Join(closeErrors...)) } return nil }