mirror of
https://dev.azure.com/schwarzit/schwarzit.stackit-public/_git/audit-go
synced 2026-02-07 16:47:24 +00:00
237 lines
5.2 KiB
Go
237 lines
5.2 KiB
Go
package messaging
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Azure/go-amqp"
|
|
|
|
pkgCommon "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/common"
|
|
)
|
|
|
|
// connectionTimeoutSeconds is the number of seconds Connect allows for
// establishing the connection before its context expires.
const connectionTimeoutSeconds = 10
|
|
|
|
// ErrConnectionClosed is returned by AmqpConnection.NewSender when the
// underlying connection has been closed.
var ErrConnectionClosed = errors.New("amqp connection is closed")
|
|
|
|
type AmqpConnection struct {
|
|
ConnectionName string
|
|
Lock sync.RWMutex
|
|
BrokerUrl string
|
|
Username string
|
|
Password string
|
|
Conn AmqpConn
|
|
Dialer amqpDial
|
|
}
|
|
|
|
// AmqpConn is an abstraction of amqp.Conn
|
|
type AmqpConn interface {
|
|
NewSession(ctx context.Context, opts *amqp.SessionOptions) (AmqpSession, error)
|
|
Close() error
|
|
Done() <-chan struct{}
|
|
}
|
|
|
|
type defaultAmqpConn struct {
|
|
Conn *amqp.Conn
|
|
}
|
|
|
|
func newDefaultAmqpConn(conn *amqp.Conn) *defaultAmqpConn {
|
|
return &defaultAmqpConn{
|
|
Conn: conn,
|
|
}
|
|
}
|
|
|
|
func (d defaultAmqpConn) NewSession(ctx context.Context, opts *amqp.SessionOptions) (AmqpSession, error) {
|
|
session, err := d.Conn.NewSession(ctx, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return newDefaultAmqpSession(session), nil
|
|
}
|
|
|
|
func (d defaultAmqpConn) Close() error {
|
|
return d.Conn.Close()
|
|
}
|
|
|
|
func (d defaultAmqpConn) Done() <-chan struct{} {
|
|
return d.Conn.Done()
|
|
}
|
|
|
|
// Compile-time assertion that *defaultAmqpConn satisfies AmqpConn.
var _ AmqpConn = (*defaultAmqpConn)(nil)
|
|
|
|
type amqpDial interface {
|
|
Dial(ctx context.Context, addr string, opts *amqp.ConnOptions) (AmqpConn, error)
|
|
}
|
|
|
|
type AmqpSession interface {
|
|
NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error)
|
|
Close(ctx context.Context) error
|
|
}
|
|
|
|
type defaultAmqpSession struct {
|
|
Session *amqp.Session
|
|
}
|
|
|
|
func newDefaultAmqpSession(session *amqp.Session) *defaultAmqpSession {
|
|
return &defaultAmqpSession{
|
|
Session: session,
|
|
}
|
|
}
|
|
|
|
func (s *defaultAmqpSession) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) {
|
|
return s.Session.NewSender(ctx, target, opts)
|
|
}
|
|
|
|
func (s *defaultAmqpSession) Close(ctx context.Context) error {
|
|
return s.Session.Close(ctx)
|
|
}
|
|
|
|
// Compile-time assertion that *defaultAmqpSession satisfies AmqpSession.
var _ AmqpSession = (*defaultAmqpSession)(nil)
|
|
|
|
// defaultAmqpDialer is the amqpDial implementation installed by
// NewAmqpConnection; its Dial method calls amqp.Dial.
type defaultAmqpDialer struct{}
|
|
|
|
func (d *defaultAmqpDialer) Dial(ctx context.Context, addr string, opts *amqp.ConnOptions) (AmqpConn, error) {
|
|
dial, err := amqp.Dial(ctx, addr, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return newDefaultAmqpConn(dial), nil
|
|
}
|
|
|
|
// Compile-time assertion that *defaultAmqpDialer satisfies amqpDial.
var _ amqpDial = (*defaultAmqpDialer)(nil)
|
|
|
|
func NewAmqpConnection(config pkgCommon.AmqpConnectionConfig, connectionName string) *AmqpConnection {
|
|
return &AmqpConnection{
|
|
ConnectionName: connectionName,
|
|
Lock: sync.RWMutex{},
|
|
BrokerUrl: config.BrokerUrl,
|
|
Username: config.Username,
|
|
Password: config.Password,
|
|
Dialer: &defaultAmqpDialer{},
|
|
}
|
|
}
|
|
|
|
func (c *AmqpConnection) NewSender(ctx context.Context, topic string) (*AmqpSenderSession, error) {
|
|
if c.Conn == nil {
|
|
return nil, errors.New("connection is not initialized")
|
|
}
|
|
|
|
if c.internalIsClosed() {
|
|
return nil, ErrConnectionClosed
|
|
}
|
|
|
|
c.Lock.RLock()
|
|
defer c.Lock.RUnlock()
|
|
|
|
// new session
|
|
newSession, err := c.Conn.NewSession(ctx, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("new session: %w", err)
|
|
}
|
|
|
|
// new sender
|
|
newSender, err := newSession.NewSender(ctx, topic, nil)
|
|
if err != nil {
|
|
err = fmt.Errorf("new internal sender: %w", err)
|
|
|
|
closeErr := newSession.Close(ctx)
|
|
if closeErr != nil {
|
|
return nil, errors.Join(err, fmt.Errorf("close session: %w", closeErr))
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return &AmqpSenderSession{newSession, newSender}, nil
|
|
}
|
|
|
|
// As converts the result pair (value, err) of a call returning an untyped
// value into a typed pointer.
//
// It returns (nil, err) when err is non-nil, (nil, nil) when value is nil, the
// value asserted to *T when it has that dynamic type, and otherwise an error
// naming the actual dynamic type of value.
func As[T any](value any, err error) (*T, error) {
	switch {
	case err != nil:
		return nil, err
	case value == nil:
		return nil, nil
	}

	if typed, ok := value.(*T); ok {
		return typed, nil
	}
	return nil, fmt.Errorf("could not cast value: %T", value)
}
|
|
|
|
func (c *AmqpConnection) Connect() error {
|
|
c.Lock.Lock()
|
|
defer c.Lock.Unlock()
|
|
|
|
subCtx, cancel := context.WithTimeout(context.Background(), connectionTimeoutSeconds*time.Second)
|
|
defer cancel()
|
|
|
|
if err := c.internalConnect(subCtx); err != nil {
|
|
return fmt.Errorf("internal connect: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *AmqpConnection) internalConnect(ctx context.Context) error {
|
|
if c.Conn == nil {
|
|
// Set credentials if specified
|
|
auth := amqp.SASLTypeAnonymous()
|
|
if c.Username != "" && c.Password != "" {
|
|
auth = amqp.SASLTypePlain(c.Username, c.Password)
|
|
} else {
|
|
slog.Debug("amqp connection: connect: using anonymous messaging")
|
|
}
|
|
options := &amqp.ConnOptions{
|
|
SASLType: auth,
|
|
}
|
|
|
|
// Initialize connection
|
|
conn, err := c.Dialer.Dial(ctx, c.BrokerUrl, options)
|
|
if err != nil {
|
|
return fmt.Errorf("dial: %w", err)
|
|
}
|
|
c.Conn = conn
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *AmqpConnection) Close() error {
|
|
c.Lock.Lock()
|
|
defer c.Lock.Unlock()
|
|
|
|
if err := c.internalClose(); err != nil {
|
|
return fmt.Errorf("internal close: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *AmqpConnection) internalClose() error {
|
|
if c.Conn != nil {
|
|
if err := c.Conn.Close(); err != nil {
|
|
return fmt.Errorf("connection close: %w", err)
|
|
}
|
|
c.Conn = nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *AmqpConnection) IsClosed() bool {
|
|
c.Lock.RLock()
|
|
defer c.Lock.RUnlock()
|
|
|
|
return c.internalIsClosed()
|
|
}
|
|
|
|
func (c *AmqpConnection) internalIsClosed() bool {
|
|
if c.Conn == nil {
|
|
return true
|
|
}
|
|
select {
|
|
case <-c.Conn.Done():
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|