audit-go/audit/messaging/amqp_sender_session.go
2025-01-13 16:42:12 +01:00

94 lines
2 KiB
Go

package messaging
import (
"context"
"errors"
"fmt"
"github.com/Azure/go-amqp"
"log/slog"
"strings"
"time"
)
type AmqpSenderSession struct {
session *amqp.Session
sender *amqp.Sender
}
func (c *AmqpConnection) NewSenderSession(source string) (*AmqpSenderSession, error) {
if c.IsClosed() {
return nil, ConnectionClosedError
}
return As[AmqpSenderSession](c.ResetConnectionAndRetryIfErrorWithReturnValue("newSenderSession", func(ctx context.Context) (any, error) {
c.lock.RLock()
defer c.lock.RUnlock()
return c.NewSender(ctx, source)
}))
}
func (s *AmqpSenderSession) Send(
topic string,
data [][]byte,
contentType string,
applicationProperties map[string]any,
) error {
// check topic name
if !strings.HasPrefix(topic, AmqpTopicPrefix) {
return fmt.Errorf(
"topic %q name lacks mandatory prefix %q",
topic,
AmqpTopicPrefix,
)
}
if contentType == "" {
return errors.New("content-type is required")
}
// prepare the amqp message
message := amqp.Message{
Header: &amqp.MessageHeader{
Durable: true,
},
Properties: &amqp.MessageProperties{
To: &topic,
ContentType: &contentType,
},
ApplicationProperties: applicationProperties,
Data: data,
}
// send
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFn()
return s.sender.Send(ctx, &message, nil)
}
func (s *AmqpSenderSession) Close() error {
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFn()
var closeErrors []error
senderErr := s.sender.Close(ctx)
if senderErr != nil {
closeErrors = append(closeErrors, senderErr)
}
sessionErr := s.session.Close(ctx)
if sessionErr != nil {
closeErrors = append(closeErrors, sessionErr)
}
if len(closeErrors) > 0 {
return errors.Join(closeErrors...)
}
return nil
}
func (s *AmqpSenderSession) CloseSilently() {
err := s.Close()
if err != nil {
slog.Error("error closing sender session", slog.Any("err", err))
}
}