package messaging import ( "context" "errors" "fmt" "log/slog" "sync" "time" "github.com/Azure/go-amqp" pkgCommon "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/common" ) const connectionTimeoutSeconds = 10 var ErrConnectionClosed = errors.New("amqp connection is closed") type AmqpConnection struct { ConnectionName string Lock sync.RWMutex BrokerUrl string Username string Password string Conn AmqpConn Dialer amqpDial } // AmqpConn is an abstraction of amqp.Conn type AmqpConn interface { NewSession(ctx context.Context, opts *amqp.SessionOptions) (AmqpSession, error) Close() error Done() <-chan struct{} } type defaultAmqpConn struct { Conn *amqp.Conn } func newDefaultAmqpConn(conn *amqp.Conn) *defaultAmqpConn { return &defaultAmqpConn{ Conn: conn, } } func (d defaultAmqpConn) NewSession(ctx context.Context, opts *amqp.SessionOptions) (AmqpSession, error) { session, err := d.Conn.NewSession(ctx, opts) if err != nil { return nil, err } return newDefaultAmqpSession(session), nil } func (d defaultAmqpConn) Close() error { return d.Conn.Close() } func (d defaultAmqpConn) Done() <-chan struct{} { return d.Conn.Done() } var _ AmqpConn = (*defaultAmqpConn)(nil) type amqpDial interface { Dial(ctx context.Context, addr string, opts *amqp.ConnOptions) (AmqpConn, error) } type AmqpSession interface { NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) Close(ctx context.Context) error } type defaultAmqpSession struct { Session *amqp.Session } func newDefaultAmqpSession(session *amqp.Session) *defaultAmqpSession { return &defaultAmqpSession{ Session: session, } } func (s *defaultAmqpSession) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) { return s.Session.NewSender(ctx, target, opts) } func (s *defaultAmqpSession) Close(ctx context.Context) error { return s.Session.Close(ctx) } var _ AmqpSession = (*defaultAmqpSession)(nil) type defaultAmqpDialer struct{} func (d *defaultAmqpDialer) Dial(ctx context.Context, addr string, opts *amqp.ConnOptions) (AmqpConn, error) { dial, err := amqp.Dial(ctx, addr, opts) if err != nil { return nil, err } return newDefaultAmqpConn(dial), nil } var _ amqpDial = (*defaultAmqpDialer)(nil) func NewAmqpConnection(config pkgCommon.AmqpConnectionConfig, connectionName string) *AmqpConnection { return &AmqpConnection{ ConnectionName: connectionName, Lock: sync.RWMutex{}, BrokerUrl: config.BrokerUrl, Username: config.Username, Password: config.Password, Dialer: &defaultAmqpDialer{}, } } func (c *AmqpConnection) NewSender(ctx context.Context, topic string) (*AmqpSenderSession, error) { if c.Conn == nil { return nil, errors.New("connection is not initialized") } if c.internalIsClosed() { return nil, ErrConnectionClosed } c.Lock.RLock() defer c.Lock.RUnlock() // new session newSession, err := c.Conn.NewSession(ctx, nil) if err != nil { return nil, fmt.Errorf("new session: %w", err) } // new sender newSender, err := newSession.NewSender(ctx, topic, nil) if err != nil { err = fmt.Errorf("new internal sender: %w", err) closeErr := newSession.Close(ctx) if closeErr != nil { return nil, errors.Join(err, fmt.Errorf("close session: %w", closeErr)) } return nil, err } return &AmqpSenderSession{newSession, newSender}, nil } func As[T any](value any, err error) (*T, error) { if err != nil { return nil, err } if value == nil { return nil, nil } castedValue, isType := value.(*T) if !isType { return nil, fmt.Errorf("could not cast value: %T", value) } return castedValue, nil } func (c *AmqpConnection) Connect() error { c.Lock.Lock() defer c.Lock.Unlock() subCtx, cancel := context.WithTimeout(context.Background(), connectionTimeoutSeconds*time.Second) defer cancel() if err := c.internalConnect(subCtx); err != nil { return fmt.Errorf("internal connect: %w", err) } return nil } func (c *AmqpConnection) internalConnect(ctx context.Context) error { if c.Conn == nil { // Set credentials if specified auth := amqp.SASLTypeAnonymous() if c.Username != "" && c.Password != "" { auth = amqp.SASLTypePlain(c.Username, c.Password) } else { slog.Debug("amqp connection: connect: using anonymous messaging") } options := &amqp.ConnOptions{ SASLType: auth, } // Initialize connection conn, err := c.Dialer.Dial(ctx, c.BrokerUrl, options) if err != nil { return fmt.Errorf("dial: %w", err) } c.Conn = conn } return nil } func (c *AmqpConnection) Close() error { c.Lock.Lock() defer c.Lock.Unlock() if err := c.internalClose(); err != nil { return fmt.Errorf("internal close: %w", err) } return nil } func (c *AmqpConnection) internalClose() error { if c.Conn != nil { if err := c.Conn.Close(); err != nil { return fmt.Errorf("connection close: %w", err) } c.Conn = nil } return nil } func (c *AmqpConnection) IsClosed() bool { c.Lock.RLock() defer c.Lock.RUnlock() return c.internalIsClosed() } func (c *AmqpConnection) internalIsClosed() bool { if c.Conn == nil { return true } select { case <-c.Conn.Done(): return true default: return false } }