package messaging import ( "context" "errors" "fmt" "github.com/Azure/go-amqp" "log/slog" "strings" "time" ) type AmqpSenderSession struct { session *amqp.Session sender *amqp.Sender } func (c *AmqpConnection) NewSenderSession(source string) (*AmqpSenderSession, error) { if c.IsClosed() { return nil, ConnectionClosedError } return As[AmqpSenderSession](c.ResetConnectionAndRetryIfErrorWithReturnValue("newSenderSession", func(ctx context.Context) (any, error) { c.lock.RLock() defer c.lock.RUnlock() return c.NewSender(ctx, source) })) } func (s *AmqpSenderSession) Send( topic string, data [][]byte, contentType string, applicationProperties map[string]any, ) error { // check topic name if !strings.HasPrefix(topic, AmqpTopicPrefix) { return fmt.Errorf( "topic %q name lacks mandatory prefix %q", topic, AmqpTopicPrefix, ) } if contentType == "" { return errors.New("content-type is required") } // prepare the amqp message message := amqp.Message{ Header: &amqp.MessageHeader{ Durable: true, }, Properties: &amqp.MessageProperties{ To: &topic, ContentType: &contentType, }, ApplicationProperties: applicationProperties, Data: data, } // send ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) defer cancelFn() return s.sender.Send(ctx, &message, nil) } func (s *AmqpSenderSession) Close() error { ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) defer cancelFn() var closeErrors []error senderErr := s.sender.Close(ctx) if senderErr != nil { closeErrors = append(closeErrors, senderErr) } sessionErr := s.session.Close(ctx) if sessionErr != nil { closeErrors = append(closeErrors, sessionErr) } if len(closeErrors) > 0 { return errors.Join(closeErrors...) } return nil } func (s *AmqpSenderSession) CloseSilently() { err := s.Close() if err != nil { slog.Error("error closing sender session", slog.Any("err", err)) } }