audit-go/audit/messaging/amqp_receiver_session.go
2025-01-13 16:42:12 +01:00

71 lines
1.8 KiB
Go

package messaging
import (
"context"
"errors"
"github.com/Azure/go-amqp"
"log/slog"
"time"
)
// AmqpReceiverSession bundles an AMQP session with an AMQP receiver so that
// the pair can be used and closed together through the methods of this type.
type AmqpReceiverSession struct {
// session is the AMQP session; Close closes it after the receiver.
session *amqp.Session
// receiver is the AMQP receiver that Receive and AcceptMessage delegate to.
receiver *amqp.Receiver
}
// NewReceiverSession creates a receiver session for the given source.
//
// When the connection reports itself as closed, ConnectionClosedError is
// returned immediately. Otherwise the receiver is created by calling
// c.NewReceiver from within ResetConnectionAndRetryIfErrorWithReturnValue,
// and the outcome is passed through As to obtain the typed result.
//
// NOTE(review): judging by its name, the wrapper presumably resets the
// connection and retries on error; confirm against its definition.
func (c *AmqpConnection) NewReceiverSession(source string) (*AmqpReceiverSession, error) {
	if c.IsClosed() {
		return nil, ConnectionClosedError
	}

	// The read lock is held for the duration of each NewReceiver call.
	openReceiver := func(ctx context.Context) (any, error) {
		c.lock.RLock()
		defer c.lock.RUnlock()
		return c.NewReceiver(ctx, source)
	}

	return As[AmqpReceiverSession](c.ResetConnectionAndRetryIfErrorWithReturnValue("newReceiverSession", openReceiver))
}
// AcceptMessage forwards the message to the receiver's AcceptMessage and
// returns its error. The call runs under a fresh background context that
// times out after one second.
func (s *AmqpReceiverSession) AcceptMessage(message *amqp.Message) error {
	const acceptTimeout = time.Second

	acceptCtx, cancel := context.WithTimeout(context.Background(), acceptTimeout)
	defer cancel()

	return s.receiver.AcceptMessage(acceptCtx, message)
}
// Receive asks the receiver for a message, waiting at most 100 milliseconds.
//
// If the wait ends with context.DeadlineExceeded, the timeout is not treated
// as a failure: both the message and the error are returned as nil. Any other
// outcome of the underlying Receive call is returned unchanged.
func (s *AmqpReceiverSession) Receive(opts *amqp.ReceiveOptions) (*amqp.Message, error) {
	const receiveTimeout = 100 * time.Millisecond

	receiveCtx, cancel := context.WithTimeout(context.Background(), receiveTimeout)
	defer cancel()

	msg, receiveErr := s.receiver.Receive(receiveCtx, opts)

	// errors.Is reports false for a nil error, so no separate nil check is
	// needed before testing for the deadline.
	if errors.Is(receiveErr, context.DeadlineExceeded) {
		return nil, nil
	}
	return msg, receiveErr
}
// Close closes the receiver first and then the session, sharing a single
// ten-second timeout between both calls. The session is closed even when
// closing the receiver fails.
//
// It returns nil when both calls succeed; otherwise the non-nil errors are
// combined with errors.Join.
func (s *AmqpReceiverSession) Close() error {
	closeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	receiverErr := s.receiver.Close(closeCtx)
	sessionErr := s.session.Close(closeCtx)

	// errors.Join discards nil arguments and yields nil when none remain.
	return errors.Join(receiverErr, sessionErr)
}
// CloseSilently calls Close and, instead of returning a failure to the
// caller, logs it at error level through slog.
func (s *AmqpReceiverSession) CloseSilently() {
	if closeErr := s.Close(); closeErr != nil {
		slog.Error("error closing receiver session", slog.Any("err", closeErr))
	}
}