package messaging

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/Azure/go-amqp"
)

const (
	// amqpTopicPrefix is the prefix every destination passed to
	// AmqpSenderSession.Send must start with.
	amqpTopicPrefix = "topic://"

	// amqpOperationTimeout bounds one Send call and one whole Close call.
	amqpOperationTimeout = 10 * time.Second
)

// AmqpSender is the sender behavior AmqpSenderSession depends on: delivering
// a message and closing the underlying sender.
// NOTE(review): presumably satisfied by *amqp.Sender — confirm against the
// code that constructs AmqpSenderSession.
type AmqpSender interface {
	Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error
	Close(ctx context.Context) error
}

// AmqpSenderSession pairs a sender with the session it belongs to so that
// both can be closed together through Close.
type AmqpSenderSession struct {
	Session AmqpSession
	Sender  AmqpSender
}

// Send publishes data as a durable AMQP message addressed to topic.
//
// topic must start with "topic://" and contentType must be non-empty;
// otherwise an error is returned and nothing is sent. applicationProperties
// is attached to the message unchanged. The send is bounded by
// amqpOperationTimeout, and the error reported by the underlying sender is
// returned as-is.
func (s *AmqpSenderSession) Send(
	topic string,
	data [][]byte,
	contentType string,
	applicationProperties map[string]any,
) error {
	// Validate the caller's input before touching the sender.
	if !strings.HasPrefix(topic, amqpTopicPrefix) {
		return fmt.Errorf(
			"topic %q name lacks mandatory prefix %q",
			topic, amqpTopicPrefix,
		)
	}
	if contentType == "" {
		return errors.New("content-type is required")
	}

	// A zero-value session has no sender; report that instead of panicking
	// on a nil interface call.
	if s.Sender == nil {
		return errors.New("amqp sender is not initialized")
	}

	message := amqp.Message{
		Header: &amqp.MessageHeader{
			Durable: true,
		},
		Properties: &amqp.MessageProperties{
			To:          &topic,
			ContentType: &contentType,
		},
		ApplicationProperties: applicationProperties,
		Data:                  data,
	}

	ctx, cancel := context.WithTimeout(context.Background(), amqpOperationTimeout)
	defer cancel()

	return s.Sender.Send(ctx, &message, nil)
}

// Close closes the sender first and then the session.
//
// Both close calls share one amqpOperationTimeout budget. The session is
// closed even when closing the sender fails, and every failure is reported:
// the result is the errors.Join of both errors, or nil when neither failed.
func (s *AmqpSenderSession) Close() error {
	ctx, cancel := context.WithTimeout(context.Background(), amqpOperationTimeout)
	defer cancel()

	// Skip a missing sender so a partially constructed value can still
	// release its session instead of panicking here.
	var senderErr error
	if s.Sender != nil {
		senderErr = s.Sender.Close(ctx)
	}

	// Session's type is declared outside this file, so it is called
	// unconditionally, exactly as before.
	sessionErr := s.Session.Close(ctx)

	// errors.Join drops nil arguments and returns nil when all are nil.
	return errors.Join(senderErr, sessionErr)
}