package messaging

import (
	"context"
	"errors"
	"log/slog"
	"time"

	"github.com/Azure/go-amqp"
)

// AmqpReceiverSession pairs an AMQP session with a receiver so that both can
// be used and closed together.
// NOTE(review): presumably the receiver was opened on this same session —
// confirm against the NewReceiver implementation.
type AmqpReceiverSession struct {
	session  *amqp.Session
	receiver *amqp.Receiver
}

// NewReceiverSession creates a receiver session for the given source.
//
// It returns ConnectionClosedError without attempting anything when the
// connection reports itself closed. Otherwise the receiver is created via
// c.NewReceiver while holding the connection's read lock, and the call is
// routed through ResetConnectionAndRetryIfErrorWithReturnValue.
// NOTE(review): judging by its name, that helper resets the connection and
// retries on error — confirm against its definition.
func (c *AmqpConnection) NewReceiverSession(source string) (*AmqpReceiverSession, error) {
	if c.IsClosed() {
		return nil, ConnectionClosedError
	}

	// The read lock is held for the duration of each NewReceiver attempt.
	openReceiver := func(ctx context.Context) (any, error) {
		c.lock.RLock()
		defer c.lock.RUnlock()
		return c.NewReceiver(ctx, source)
	}

	return As[AmqpReceiverSession](c.ResetConnectionAndRetryIfErrorWithReturnValue("newReceiverSession", openReceiver))
}

// AcceptMessage accepts the given message on the underlying receiver, giving
// the operation at most one second to complete.
func (s *AmqpReceiverSession) AcceptMessage(message *amqp.Message) error {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	return s.receiver.AcceptMessage(ctx, message)
}

// Receive waits up to 100 milliseconds for a message on the underlying
// receiver.
//
// When the wait ends with context.DeadlineExceeded the result is (nil, nil),
// so callers must check the message for nil before using it. Any other error
// is returned together with whatever message the receiver produced.
func (s *AmqpReceiverSession) Receive(opts *amqp.ReceiveOptions) (*amqp.Message, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	msg, err := s.receiver.Receive(ctx, opts)
	// A timeout is not treated as a failure: report "no message" instead.
	// errors.Is is false for a nil err, so no separate nil check is needed.
	if errors.Is(err, context.DeadlineExceeded) {
		return nil, nil
	}
	return msg, err
}

// Close closes the receiver first and then the session. Both calls share a
// single ten-second timeout, and the session is closed even if closing the
// receiver failed. Any errors are combined with errors.Join; the result is
// nil when both calls succeed.
func (s *AmqpReceiverSession) Close() error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	receiverErr := s.receiver.Close(ctx)
	sessionErr := s.session.Close(ctx)

	// errors.Join drops nil arguments and yields nil when nothing failed.
	return errors.Join(receiverErr, sessionErr)
}

// CloseSilently closes the session and, instead of returning a failure to the
// caller, logs it at error level.
func (s *AmqpReceiverSession) CloseSilently() {
	if err := s.Close(); err != nil {
		slog.Error("error closing receiver session", slog.Any("err", err))
	}
}