// Mirror of https://dev.azure.com/schwarzit/schwarzit.stackit-public/_git/audit-go
// (synced 2026-02-07 16:47:24 +00:00; 81 lines, 1.6 KiB, Go).

package messaging
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/Azure/go-amqp"
|
|
)
|
|
|
|
// amqpTopicPrefix is the prefix a topic name must start with to be accepted
// by AmqpSenderSession.Send; names without it are rejected with an error.
const amqpTopicPrefix = "topic://"
|
|
|
|
type AmqpSender interface {
|
|
Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error
|
|
Close(ctx context.Context) error
|
|
}
|
|
|
|
type AmqpSenderSession struct {
|
|
Session AmqpSession
|
|
Sender AmqpSender
|
|
}
|
|
|
|
func (s *AmqpSenderSession) Send(
|
|
topic string,
|
|
data [][]byte,
|
|
contentType string,
|
|
applicationProperties map[string]any,
|
|
) error {
|
|
// check topic name
|
|
if !strings.HasPrefix(topic, amqpTopicPrefix) {
|
|
return fmt.Errorf(
|
|
"topic %q name lacks mandatory prefix %q",
|
|
topic,
|
|
amqpTopicPrefix,
|
|
)
|
|
}
|
|
|
|
if contentType == "" {
|
|
return errors.New("content-type is required")
|
|
}
|
|
|
|
// prepare the amqp message
|
|
message := amqp.Message{
|
|
Header: &amqp.MessageHeader{
|
|
Durable: true,
|
|
},
|
|
Properties: &amqp.MessageProperties{
|
|
To: &topic,
|
|
ContentType: &contentType,
|
|
},
|
|
ApplicationProperties: applicationProperties,
|
|
Data: data,
|
|
}
|
|
|
|
// send
|
|
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancelFn()
|
|
return s.Sender.Send(ctx, &message, nil)
|
|
}
|
|
|
|
func (s *AmqpSenderSession) Close() error {
|
|
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancelFn()
|
|
|
|
var closeErrors []error
|
|
senderErr := s.Sender.Close(ctx)
|
|
if senderErr != nil {
|
|
closeErrors = append(closeErrors, senderErr)
|
|
}
|
|
sessionErr := s.Session.Close(ctx)
|
|
if sessionErr != nil {
|
|
closeErrors = append(closeErrors, sessionErr)
|
|
}
|
|
|
|
if len(closeErrors) > 0 {
|
|
return errors.Join(closeErrors...)
|
|
}
|
|
return nil
|
|
}
|