mirror of
https://dev.azure.com/schwarzit/schwarzit.stackit-public/_git/audit-go
synced 2026-02-11 02:27:25 +00:00
71 lines
1.8 KiB
Go
71 lines
1.8 KiB
Go
package messaging
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"github.com/Azure/go-amqp"
|
|
"log/slog"
|
|
"time"
|
|
)
|
|
|
|
type AmqpReceiverSession struct {
|
|
session *amqp.Session
|
|
receiver *amqp.Receiver
|
|
}
|
|
|
|
func (c *AmqpConnection) NewReceiverSession(source string) (*AmqpReceiverSession, error) {
|
|
if c.IsClosed() {
|
|
return nil, ConnectionClosedError
|
|
}
|
|
|
|
return As[AmqpReceiverSession](c.ResetConnectionAndRetryIfErrorWithReturnValue("newReceiverSession", func(ctx context.Context) (any, error) {
|
|
c.lock.RLock()
|
|
defer c.lock.RUnlock()
|
|
|
|
return c.NewReceiver(ctx, source)
|
|
}))
|
|
}
|
|
|
|
func (s *AmqpReceiverSession) AcceptMessage(message *amqp.Message) error {
|
|
ctx, cancelFn := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancelFn()
|
|
return s.receiver.AcceptMessage(ctx, message)
|
|
}
|
|
|
|
func (s *AmqpReceiverSession) Receive(opts *amqp.ReceiveOptions) (*amqp.Message, error) {
|
|
ctx, cancelFn := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
defer cancelFn()
|
|
message, err := s.receiver.Receive(ctx, opts)
|
|
// return nil if the timeout exceeded
|
|
if err != nil && errors.Is(err, context.DeadlineExceeded) {
|
|
return nil, nil
|
|
}
|
|
return message, err
|
|
}
|
|
|
|
func (s *AmqpReceiverSession) Close() error {
|
|
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancelFn()
|
|
|
|
var closeErrors []error
|
|
receiverErr := s.receiver.Close(ctx)
|
|
if receiverErr != nil {
|
|
closeErrors = append(closeErrors, receiverErr)
|
|
}
|
|
sessionErr := s.session.Close(ctx)
|
|
if sessionErr != nil {
|
|
closeErrors = append(closeErrors, sessionErr)
|
|
}
|
|
|
|
if len(closeErrors) > 0 {
|
|
return errors.Join(closeErrors...)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *AmqpReceiverSession) CloseSilently() {
|
|
err := s.Close()
|
|
if err != nil {
|
|
slog.Error("error closing receiver session", slog.Any("err", err))
|
|
}
|
|
}
|