package api import ( "context" "errors" "fmt" "sync" "time" internalMessaging "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/internal/messaging" pkgLog "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/log" pkgMessagingCommon "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/common" ) // AmqpApi implements Api. type AmqpApi struct { connection *internalMessaging.AmqpConnection connectionPool internalMessaging.ConnectionPool connectionPoolHandle *internalMessaging.ConnectionPoolHandle senderCache map[string]*internalMessaging.AmqpSenderSession lock sync.RWMutex } var _ Api = &AmqpApi{} func NewDefaultAmqpApi(amqpConfig pkgMessagingCommon.AmqpConnectionConfig) (Api, error) { connectionPool, err := internalMessaging.NewDefaultAmqpConnectionPool(amqpConfig, "sdk") if err != nil { return nil, fmt.Errorf("new amqp connection pool: %w", err) } amqpApi := &AmqpApi{ connectionPool: connectionPool, connectionPoolHandle: connectionPool.NewHandle(), senderCache: make(map[string]*internalMessaging.AmqpSenderSession), } var messagingApi Api = amqpApi return messagingApi, nil } func NewAmqpApi(amqpConfig pkgMessagingCommon.AmqpConnectionPoolConfig) (Api, error) { connectionPool, err := internalMessaging.NewAmqpConnectionPool(amqpConfig, "sdk") if err != nil { return nil, fmt.Errorf("new amqp connection pool: %w", err) } amqpApi := &AmqpApi{ connectionPool: connectionPool, connectionPoolHandle: connectionPool.NewHandle(), senderCache: make(map[string]*internalMessaging.AmqpSenderSession), } var messagingApi Api = amqpApi return messagingApi, nil } // Send implements Api.Send. // If errors occur the connection to the messaging system will be closed and re-established. func (a *AmqpApi) Send(_ context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error { // create or get sender from cache var sender = a.senderFromCache(topic) if sender == nil { if err := a.newSender(topic); err != nil { return err } sender = a.senderFromCache(topic) } // first attempt to send var sendErr error wrappedData := [][]byte{data} if err := sender.Send(topic, wrappedData, contentType, applicationProperties); err != nil { sendErr = fmt.Errorf("send: %w", err) } else { return nil } // renew sender if err := a.newSender(topic); err != nil { return errors.Join(sendErr, err) } sender = a.senderFromCache(topic) // retry send if err := sender.Send(topic, wrappedData, contentType, applicationProperties); err != nil { return errors.Join(sendErr, fmt.Errorf("retry send: %w", err)) } return nil } func (a *AmqpApi) senderFromCache(topic string) *internalMessaging.AmqpSenderSession { a.lock.RLock() defer a.lock.RUnlock() return a.senderCache[topic] } func (a *AmqpApi) newSender(topic string) error { a.lock.Lock() defer a.lock.Unlock() connectionIsClosed := a.connection == nil || a.connection.IsClosed() if connectionIsClosed { connection, err := a.connectionPool.GetConnection(a.connectionPoolHandle) if err != nil { return fmt.Errorf("get connection: %w", err) } a.connection = connection } ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) sender, err := a.connection.NewSender(ctx, topic) cancelFn() if err != nil { return fmt.Errorf("new sender: %w", err) } a.senderCache[topic] = sender return nil } // Close implements Api.Close func (a *AmqpApi) Close(_ context.Context) error { pkgLog.AuditLogger.Info("close audit amqp connection pool") a.lock.Lock() defer a.lock.Unlock() // cached senders var closeErrors []error for _, session := range a.senderCache { if err := session.Close(); err != nil { closeErrors = append(closeErrors, fmt.Errorf("close session: %w", err)) } } clear(a.senderCache) // pool if err := a.connectionPool.Close(); err != nil { closeErrors = append(closeErrors, fmt.Errorf("close pool: %w", err)) } if len(closeErrors) > 0 { return fmt.Errorf("close: %w", errors.Join(closeErrors...)) } return nil }