mirror of
https://dev.azure.com/schwarzit/schwarzit.stackit-public/_git/audit-go
synced 2026-02-08 09:07:26 +00:00
Compare commits
2 commits
main
...
v0.1.8.1-d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dfbc4cdf93 | ||
|
|
1aa9ff9eec |
11 changed files with 845 additions and 318 deletions
|
|
@ -30,7 +30,11 @@ func TestDynamicLegacyAuditApi(t *testing.T) {
|
||||||
defer solaceContainer.Stop()
|
defer solaceContainer.Stop()
|
||||||
|
|
||||||
// Instantiate the messaging api
|
// Instantiate the messaging api
|
||||||
messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConfig{URL: solaceContainer.AmqpConnectionString})
|
messagingApi, err := messaging.NewAmqpApi(
|
||||||
|
messaging.AmqpConnectionPoolConfig{
|
||||||
|
Parameters: messaging.AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
|
||||||
|
PoolSize: 1,
|
||||||
|
})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Validator
|
// Validator
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,10 @@ func TestLegacyAuditApi(t *testing.T) {
|
||||||
defer solaceContainer.Stop()
|
defer solaceContainer.Stop()
|
||||||
|
|
||||||
// Instantiate the messaging api
|
// Instantiate the messaging api
|
||||||
messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConfig{URL: solaceContainer.AmqpConnectionString})
|
messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConnectionPoolConfig{
|
||||||
|
Parameters: messaging.AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
|
||||||
|
PoolSize: 1,
|
||||||
|
})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Validator
|
// Validator
|
||||||
|
|
@ -579,7 +582,10 @@ func TestLegacyAuditApi_NewLegacyAuditApi(t *testing.T) {
|
||||||
defer solaceContainer.Stop()
|
defer solaceContainer.Stop()
|
||||||
|
|
||||||
// Instantiate the messaging api
|
// Instantiate the messaging api
|
||||||
messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConfig{URL: solaceContainer.AmqpConnectionString})
|
messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConnectionPoolConfig{
|
||||||
|
Parameters: messaging.AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
|
||||||
|
PoolSize: 1,
|
||||||
|
})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Validator
|
// Validator
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,10 @@ func TestRoutableAuditApi(t *testing.T) {
|
||||||
defer solaceContainer.Stop()
|
defer solaceContainer.Stop()
|
||||||
|
|
||||||
// Instantiate the messaging api
|
// Instantiate the messaging api
|
||||||
messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConfig{URL: solaceContainer.AmqpConnectionString})
|
messagingApi, err := messaging.NewAmqpApi(messaging.AmqpConnectionPoolConfig{
|
||||||
|
Parameters: messaging.AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
|
||||||
|
PoolSize: 1,
|
||||||
|
})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Validator
|
// Validator
|
||||||
|
|
|
||||||
16
audit/messaging/amqp_config.go
Normal file
16
audit/messaging/amqp_config.go
Normal file
|
|
@ -0,0 +1,16 @@
|
||||||
|
package messaging
|
||||||
|
|
||||||
|
const AmqpTopicPrefix = "topic://"
|
||||||
|
const connectionTimeoutSeconds = 10
|
||||||
|
const disconnectionTimeoutSeconds = 10
|
||||||
|
|
||||||
|
type AmqpConnectionConfig struct {
|
||||||
|
BrokerUrl string `json:"brokerUrl"`
|
||||||
|
Username string `json:"username"`
|
||||||
|
Password string `json:"password"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type AmqpConnectionPoolConfig struct {
|
||||||
|
Parameters AmqpConnectionConfig `json:"parameters"`
|
||||||
|
PoolSize int `json:"poolSize"`
|
||||||
|
}
|
||||||
241
audit/messaging/amqp_connection.go
Normal file
241
audit/messaging/amqp_connection.go
Normal file
|
|
@ -0,0 +1,241 @@
|
||||||
|
package messaging
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"github.com/Azure/go-amqp"
|
||||||
|
"log/slog"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ConnectionClosedError = errors.New("amqp connection is closed")
|
||||||
|
|
||||||
|
type AmqpConnection struct {
|
||||||
|
connectionName string
|
||||||
|
lock sync.RWMutex
|
||||||
|
brokerUrl string
|
||||||
|
username string
|
||||||
|
password string
|
||||||
|
conn *amqp.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAmqpConnection(config *AmqpConnectionConfig, connectionName string) *AmqpConnection {
|
||||||
|
return &AmqpConnection{
|
||||||
|
connectionName: connectionName,
|
||||||
|
lock: sync.RWMutex{},
|
||||||
|
brokerUrl: config.BrokerUrl,
|
||||||
|
username: config.Username,
|
||||||
|
password: config.Password,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *AmqpConnection) NewReceiver(ctx context.Context, source string) (*AmqpReceiverSession, error) {
|
||||||
|
if c.conn == nil {
|
||||||
|
return nil, errors.New("connection is not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.internalIsClosed() {
|
||||||
|
return nil, ConnectionClosedError
|
||||||
|
}
|
||||||
|
|
||||||
|
c.lock.RLock()
|
||||||
|
defer c.lock.RUnlock()
|
||||||
|
|
||||||
|
// new session
|
||||||
|
session, err := c.conn.NewSession(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("new session: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// new receiver
|
||||||
|
receiver, err := session.NewReceiver(ctx, source, nil)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("new internal receiver: %w", err)
|
||||||
|
|
||||||
|
closeErr := session.Close(ctx)
|
||||||
|
if closeErr != nil {
|
||||||
|
return nil, errors.Join(fmt.Errorf("close session: %w", err), err)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &AmqpReceiverSession{session, receiver}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *AmqpConnection) NewSender(ctx context.Context, topic string) (*AmqpSenderSession, error) {
|
||||||
|
if c.conn == nil {
|
||||||
|
return nil, errors.New("connection is not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.internalIsClosed() {
|
||||||
|
return nil, ConnectionClosedError
|
||||||
|
}
|
||||||
|
|
||||||
|
c.lock.RLock()
|
||||||
|
defer c.lock.RUnlock()
|
||||||
|
|
||||||
|
// new session
|
||||||
|
newSession, err := c.conn.NewSession(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("new session: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// new sender
|
||||||
|
newSender, err := newSession.NewSender(ctx, topic, nil)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("new internal sender: %w", err)
|
||||||
|
|
||||||
|
closeErr := newSession.Close(ctx)
|
||||||
|
if closeErr != nil {
|
||||||
|
return nil, errors.Join(fmt.Errorf("close session: %w", err), err)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &AmqpSenderSession{newSession, newSender}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *AmqpConnection) ResetConnectionAndRetryIfErrorWithReturnValue(opName string, fn func(ctx context.Context) (any, error)) (any, error) {
|
||||||
|
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
result, err := fn(ctx)
|
||||||
|
cancelFn()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
slog.Info(fmt.Sprintf("amqp connection: %s", opName), slog.Any("connection", c.connectionName), slog.Any("err", err))
|
||||||
|
err := c.ResetConnection(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("reset connection: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
newCtx, closeFn := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer closeFn()
|
||||||
|
|
||||||
|
// Retry
|
||||||
|
return fn(newCtx)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *AmqpConnection) ResetConnection(ctx context.Context) error {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
|
||||||
|
subCtx, cancel := context.WithTimeout(ctx, disconnectionTimeoutSeconds*time.Second)
|
||||||
|
err := c.internalClose(subCtx)
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("amqp connection: reset: failed to close amqp connection", slog.Any("err", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
subCtx, cancel = context.WithTimeout(ctx, connectionTimeoutSeconds*time.Second)
|
||||||
|
err = c.internalConnect(subCtx)
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("amqp connection: reset: failed to connect to amqp server, retry..", slog.Any("err", err))
|
||||||
|
subCtx, cancel = context.WithTimeout(ctx, connectionTimeoutSeconds*time.Second)
|
||||||
|
err = c.internalConnect(subCtx)
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("connect: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func As[T any](value any, err error) (*T, error) {
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if value == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
castedValue, isType := value.(*T)
|
||||||
|
if !isType {
|
||||||
|
return nil, fmt.Errorf("could not cast value: %T", value)
|
||||||
|
}
|
||||||
|
return castedValue, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *AmqpConnection) Connect(ctx context.Context) error {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
|
||||||
|
subCtx, cancel := context.WithTimeout(ctx, connectionTimeoutSeconds*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
return c.internalConnect(subCtx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *AmqpConnection) internalConnect(ctx context.Context) error {
|
||||||
|
if c.conn == nil {
|
||||||
|
// Set credentials if specified
|
||||||
|
auth := amqp.SASLTypeAnonymous()
|
||||||
|
if c.username != "" && c.password != "" {
|
||||||
|
auth = amqp.SASLTypePlain(c.username, c.password)
|
||||||
|
} else {
|
||||||
|
slog.Debug("amqp connection: connect: using anonymous messaging")
|
||||||
|
}
|
||||||
|
options := &amqp.ConnOptions{
|
||||||
|
SASLType: auth,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize connection
|
||||||
|
conn, err := amqp.Dial(ctx, c.brokerUrl, options)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("dial: %w", err)
|
||||||
|
}
|
||||||
|
c.conn = conn
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *AmqpConnection) Close(ctx context.Context) error {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
|
||||||
|
subCtx, cancel := context.WithTimeout(ctx, disconnectionTimeoutSeconds*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
return c.internalClose(subCtx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *AmqpConnection) internalClose(ctx context.Context) error {
|
||||||
|
if c.conn != nil {
|
||||||
|
closeErrors := make([]error, 0)
|
||||||
|
closeErrors = c.internalCloseConnection(closeErrors)
|
||||||
|
|
||||||
|
// return errors or nil
|
||||||
|
if len(closeErrors) > 0 {
|
||||||
|
return errors.Join(closeErrors...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *AmqpConnection) internalCloseConnection(closeErrors []error) []error {
|
||||||
|
err := c.conn.Close()
|
||||||
|
if err != nil {
|
||||||
|
closeErrors = append(closeErrors, err)
|
||||||
|
}
|
||||||
|
return closeErrors
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *AmqpConnection) IsClosed() bool {
|
||||||
|
c.lock.RLock()
|
||||||
|
defer c.lock.RUnlock()
|
||||||
|
|
||||||
|
return c.internalIsClosed()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *AmqpConnection) internalIsClosed() bool {
|
||||||
|
if c.conn == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-c.conn.Done():
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
193
audit/messaging/amqp_connection_pool.go
Normal file
193
audit/messaging/amqp_connection_pool.go
Normal file
|
|
@ -0,0 +1,193 @@
|
||||||
|
package messaging
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AmqpConnectionPool struct {
|
||||||
|
config AmqpConnectionPoolConfig
|
||||||
|
connectionName string
|
||||||
|
connections []*AmqpConnection
|
||||||
|
handleOffset int
|
||||||
|
lock sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConnectionPoolHandle struct {
|
||||||
|
connectionOffset int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAmqpConnectionPool(ctx context.Context, config AmqpConnectionPoolConfig, connectionName string) (*AmqpConnectionPool, error) {
|
||||||
|
pool := &AmqpConnectionPool{
|
||||||
|
config: config,
|
||||||
|
connectionName: connectionName,
|
||||||
|
connections: make([]*AmqpConnection, 0),
|
||||||
|
handleOffset: 0,
|
||||||
|
lock: sync.RWMutex{},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pool.initializeConnections(); err != nil {
|
||||||
|
if closeErr := pool.Close(ctx); closeErr != nil {
|
||||||
|
return nil, errors.Join(err, fmt.Errorf("initialize amqp connection: pool closed: %w", closeErr))
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("initialize connections: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return pool, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *AmqpConnectionPool) initializeConnections() error {
|
||||||
|
if len(p.connections) < p.config.PoolSize {
|
||||||
|
p.lock.Lock()
|
||||||
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
|
numMissingConnections := p.config.PoolSize - len(p.connections)
|
||||||
|
|
||||||
|
for i := 0; i < numMissingConnections; i++ {
|
||||||
|
if err := p.internalAddConnection(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *AmqpConnectionPool) internalAddConnection() error {
|
||||||
|
newConnection, err := p.internalNewConnection(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("new connection: %w", err)
|
||||||
|
}
|
||||||
|
p.connections = append(p.connections, newConnection)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *AmqpConnectionPool) internalNewConnection(ctx context.Context) (*AmqpConnection, error) {
|
||||||
|
conn := NewAmqpConnection(&p.config.Parameters, p.connectionName)
|
||||||
|
if err := conn.Connect(ctx); err != nil {
|
||||||
|
slog.Warn("amqp connection: failed to connect to amqp broker", slog.Any("err", err))
|
||||||
|
|
||||||
|
// retry
|
||||||
|
if err = conn.Connect(ctx); err != nil {
|
||||||
|
connectErr := fmt.Errorf("failed to connect to amqp broker: %w", err)
|
||||||
|
if closeErr := conn.Close(ctx); closeErr != nil {
|
||||||
|
return nil, errors.Join(connectErr, fmt.Errorf("close connection: %w", closeErr))
|
||||||
|
}
|
||||||
|
return nil, connectErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *AmqpConnectionPool) Close(ctx context.Context) error {
|
||||||
|
p.lock.Lock()
|
||||||
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
|
closeErrors := make([]error, 0)
|
||||||
|
for _, conn := range p.connections {
|
||||||
|
if err := conn.Close(ctx); err != nil {
|
||||||
|
closeErrors = append(closeErrors, fmt.Errorf("connection: close: %w", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(closeErrors) > 0 {
|
||||||
|
return errors.Join(closeErrors...)
|
||||||
|
}
|
||||||
|
p.connections = make([]*AmqpConnection, 0)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *AmqpConnectionPool) NewHandle() *ConnectionPoolHandle {
|
||||||
|
p.lock.Lock()
|
||||||
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
|
offset := p.handleOffset
|
||||||
|
p.handleOffset += 1
|
||||||
|
|
||||||
|
offset = offset % p.config.PoolSize
|
||||||
|
|
||||||
|
return &ConnectionPoolHandle{
|
||||||
|
connectionOffset: offset,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *AmqpConnectionPool) GetConnection(handle *ConnectionPoolHandle) (*AmqpConnection, error) {
|
||||||
|
// get the requested connection or another one
|
||||||
|
conn, addConnection := p.nextConnectionFromQueue(handle)
|
||||||
|
|
||||||
|
// renew the requested connection if the request connection is closed
|
||||||
|
if conn == nil || addConnection {
|
||||||
|
p.lock.Lock()
|
||||||
|
|
||||||
|
// check that accessing the pool only with a valid index (out of bounds should only occur on shutdown)
|
||||||
|
if handle.connectionOffset < len(p.connections) && p.connections[handle.connectionOffset] == nil {
|
||||||
|
ctx, cancelFn := context.WithTimeout(context.Background(), connectionTimeoutSeconds)
|
||||||
|
connection, err := p.internalNewConnection(ctx)
|
||||||
|
cancelFn()
|
||||||
|
if err != nil {
|
||||||
|
p.lock.Unlock()
|
||||||
|
return nil, fmt.Errorf("renew connection: %w", err)
|
||||||
|
}
|
||||||
|
p.connections[handle.connectionOffset] = connection
|
||||||
|
}
|
||||||
|
p.lock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the previously returned connection if it is not nil
|
||||||
|
if conn != nil {
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// try to return the renewed connection or another one
|
||||||
|
conn, _ = p.nextConnectionFromQueue(handle)
|
||||||
|
if conn == nil {
|
||||||
|
return nil, errors.New("pool is empty")
|
||||||
|
}
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *AmqpConnectionPool) nextConnectionFromQueue(handle *ConnectionPoolHandle) (*AmqpConnection, bool) {
|
||||||
|
// return the next possible index (including the retry offset)
|
||||||
|
nextIndex := func(idx int) int {
|
||||||
|
if idx+handle.connectionOffset >= p.config.PoolSize {
|
||||||
|
return idx + handle.connectionOffset - p.config.PoolSize
|
||||||
|
} else {
|
||||||
|
return idx + handle.connectionOffset
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// retry as long as there are remaining connections in the pool
|
||||||
|
var conn *AmqpConnection
|
||||||
|
var addConnection bool
|
||||||
|
for i := 0; i < p.config.PoolSize; i++ {
|
||||||
|
|
||||||
|
// get the next possible connection (considering the retry index)
|
||||||
|
idx := nextIndex(i)
|
||||||
|
p.lock.RLock()
|
||||||
|
if idx < len(p.connections) {
|
||||||
|
conn = p.connections[idx]
|
||||||
|
} else {
|
||||||
|
// handle the edge case that the pool is empty on shutdown
|
||||||
|
conn = nil
|
||||||
|
}
|
||||||
|
p.lock.RUnlock()
|
||||||
|
|
||||||
|
// remember that the requested is closed, retry with the next
|
||||||
|
if conn == nil {
|
||||||
|
addConnection = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the connection is closed, mark it by setting it to nil
|
||||||
|
if conn.IsClosed() {
|
||||||
|
p.lock.Lock()
|
||||||
|
p.connections[idx] = nil
|
||||||
|
p.lock.Unlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return conn, addConnection
|
||||||
|
}
|
||||||
|
return nil, true
|
||||||
|
}
|
||||||
71
audit/messaging/amqp_receiver_session.go
Normal file
71
audit/messaging/amqp_receiver_session.go
Normal file
|
|
@ -0,0 +1,71 @@
|
||||||
|
package messaging
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"github.com/Azure/go-amqp"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AmqpReceiverSession struct {
|
||||||
|
session *amqp.Session
|
||||||
|
receiver *amqp.Receiver
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *AmqpConnection) NewReceiverSession(source string) (*AmqpReceiverSession, error) {
|
||||||
|
if c.IsClosed() {
|
||||||
|
return nil, ConnectionClosedError
|
||||||
|
}
|
||||||
|
|
||||||
|
return As[AmqpReceiverSession](c.ResetConnectionAndRetryIfErrorWithReturnValue("newReceiverSession", func(ctx context.Context) (any, error) {
|
||||||
|
c.lock.RLock()
|
||||||
|
defer c.lock.RUnlock()
|
||||||
|
|
||||||
|
return c.NewReceiver(ctx, source)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *AmqpReceiverSession) AcceptMessage(message *amqp.Message) error {
|
||||||
|
ctx, cancelFn := context.WithTimeout(context.Background(), 1*time.Second)
|
||||||
|
defer cancelFn()
|
||||||
|
return s.receiver.AcceptMessage(ctx, message)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *AmqpReceiverSession) Receive(opts *amqp.ReceiveOptions) (*amqp.Message, error) {
|
||||||
|
ctx, cancelFn := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||||
|
defer cancelFn()
|
||||||
|
message, err := s.receiver.Receive(ctx, opts)
|
||||||
|
// return nil if the timeout exceeded
|
||||||
|
if err != nil && errors.Is(err, context.DeadlineExceeded) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return message, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *AmqpReceiverSession) Close() error {
|
||||||
|
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancelFn()
|
||||||
|
|
||||||
|
var closeErrors []error
|
||||||
|
receiverErr := s.receiver.Close(ctx)
|
||||||
|
if receiverErr != nil {
|
||||||
|
closeErrors = append(closeErrors, receiverErr)
|
||||||
|
}
|
||||||
|
sessionErr := s.session.Close(ctx)
|
||||||
|
if sessionErr != nil {
|
||||||
|
closeErrors = append(closeErrors, sessionErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(closeErrors) > 0 {
|
||||||
|
return errors.Join(closeErrors...)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *AmqpReceiverSession) CloseSilently() {
|
||||||
|
err := s.Close()
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("error closing receiver session", slog.Any("err", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
94
audit/messaging/amqp_sender_session.go
Normal file
94
audit/messaging/amqp_sender_session.go
Normal file
|
|
@ -0,0 +1,94 @@
|
||||||
|
package messaging
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"github.com/Azure/go-amqp"
|
||||||
|
"log/slog"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AmqpSenderSession struct {
|
||||||
|
session *amqp.Session
|
||||||
|
sender *amqp.Sender
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *AmqpConnection) NewSenderSession(source string) (*AmqpSenderSession, error) {
|
||||||
|
if c.IsClosed() {
|
||||||
|
return nil, ConnectionClosedError
|
||||||
|
}
|
||||||
|
|
||||||
|
return As[AmqpSenderSession](c.ResetConnectionAndRetryIfErrorWithReturnValue("newSenderSession", func(ctx context.Context) (any, error) {
|
||||||
|
c.lock.RLock()
|
||||||
|
defer c.lock.RUnlock()
|
||||||
|
|
||||||
|
return c.NewSender(ctx, source)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *AmqpSenderSession) Send(
|
||||||
|
topic string,
|
||||||
|
data [][]byte,
|
||||||
|
contentType string,
|
||||||
|
applicationProperties map[string]any,
|
||||||
|
) error {
|
||||||
|
// check topic name
|
||||||
|
if !strings.HasPrefix(topic, AmqpTopicPrefix) {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"topic %q name lacks mandatory prefix %q",
|
||||||
|
topic,
|
||||||
|
AmqpTopicPrefix,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
if contentType == "" {
|
||||||
|
return errors.New("content-type is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
// prepare the amqp message
|
||||||
|
message := amqp.Message{
|
||||||
|
Header: &amqp.MessageHeader{
|
||||||
|
Durable: true,
|
||||||
|
},
|
||||||
|
Properties: &amqp.MessageProperties{
|
||||||
|
To: &topic,
|
||||||
|
ContentType: &contentType,
|
||||||
|
},
|
||||||
|
ApplicationProperties: applicationProperties,
|
||||||
|
Data: data,
|
||||||
|
}
|
||||||
|
|
||||||
|
// send
|
||||||
|
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancelFn()
|
||||||
|
return s.sender.Send(ctx, &message, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *AmqpSenderSession) Close() error {
|
||||||
|
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancelFn()
|
||||||
|
|
||||||
|
var closeErrors []error
|
||||||
|
senderErr := s.sender.Close(ctx)
|
||||||
|
if senderErr != nil {
|
||||||
|
closeErrors = append(closeErrors, senderErr)
|
||||||
|
}
|
||||||
|
sessionErr := s.session.Close(ctx)
|
||||||
|
if sessionErr != nil {
|
||||||
|
closeErrors = append(closeErrors, sessionErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(closeErrors) > 0 {
|
||||||
|
return errors.Join(closeErrors...)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *AmqpSenderSession) CloseSilently() {
|
||||||
|
err := s.Close()
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("error closing sender session", slog.Any("err", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -2,19 +2,13 @@ package messaging
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log"
|
|
||||||
"github.com/Azure/go-amqp"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Default connection timeout for the AMQP connection
|
|
||||||
const connectionTimeoutSeconds = 10
|
|
||||||
|
|
||||||
// Api is an abstraction for a messaging system that can be used to send
|
// Api is an abstraction for a messaging system that can be used to send
|
||||||
// audit logs to the audit log system.
|
// audit logs to the audit log system.
|
||||||
type Api interface {
|
type Api interface {
|
||||||
|
|
@ -38,203 +32,102 @@ type Api interface {
|
||||||
Close(ctx context.Context) error
|
Close(ctx context.Context) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// MutexApi is wrapper around an API implementation that controls mutual exclusive access to the api.
|
|
||||||
type MutexApi struct {
|
|
||||||
mutex sync.Mutex
|
|
||||||
api Api
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ Api = &MutexApi{}
|
|
||||||
|
|
||||||
func NewMutexApi(api Api) (Api, error) {
|
|
||||||
if api == nil {
|
|
||||||
return nil, errors.New("api is nil")
|
|
||||||
}
|
|
||||||
mutexApi := MutexApi{
|
|
||||||
mutex: sync.Mutex{},
|
|
||||||
api: api,
|
|
||||||
}
|
|
||||||
|
|
||||||
var genericApi Api = &mutexApi
|
|
||||||
return genericApi, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send implements Api.Send
|
|
||||||
func (m *MutexApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
|
|
||||||
m.mutex.Lock()
|
|
||||||
defer m.mutex.Unlock()
|
|
||||||
return m.api.Send(ctx, topic, data, contentType, applicationProperties)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MutexApi) Close(ctx context.Context) error {
|
|
||||||
m.mutex.Lock()
|
|
||||||
defer m.mutex.Unlock()
|
|
||||||
return m.api.Close(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
// AmqpConfig provides AMQP connection related parameters.
|
|
||||||
type AmqpConfig struct {
|
|
||||||
URL string
|
|
||||||
User string
|
|
||||||
Password string
|
|
||||||
}
|
|
||||||
|
|
||||||
// AmqpSession is an abstraction providing a subset of the methods of amqp.Session
|
|
||||||
type AmqpSession interface {
|
|
||||||
NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error)
|
|
||||||
Close(ctx context.Context) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type AmqpSessionWrapper struct {
|
|
||||||
session *amqp.Session
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w AmqpSessionWrapper) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) {
|
|
||||||
return w.session.NewSender(ctx, target, opts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w AmqpSessionWrapper) Close(ctx context.Context) error {
|
|
||||||
return w.session.Close(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
// AmqpSender is an abstraction providing a subset of the methods of amqp.Sender
|
|
||||||
type AmqpSender interface {
|
|
||||||
Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error
|
|
||||||
Close(ctx context.Context) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// AmqpApi implements Api.
|
// AmqpApi implements Api.
|
||||||
type AmqpApi struct {
|
type AmqpApi struct {
|
||||||
config AmqpConfig
|
config AmqpConnectionPoolConfig
|
||||||
connection *amqp.Conn
|
connection *AmqpConnection
|
||||||
session AmqpSession
|
connectionPool *AmqpConnectionPool
|
||||||
|
connectionPoolHandle *ConnectionPoolHandle
|
||||||
|
senderCache map[string]*AmqpSenderSession
|
||||||
|
lock sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Api = &AmqpApi{}
|
var _ Api = &AmqpApi{}
|
||||||
|
|
||||||
func NewAmqpApi(amqpConfig AmqpConfig) (Api, error) {
|
func NewAmqpApi(amqpConfig AmqpConnectionPoolConfig) (Api, error) {
|
||||||
amqpApi := &AmqpApi{config: amqpConfig}
|
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancelFn()
|
||||||
if err := amqpApi.connect(); err != nil {
|
connectionPool, err := NewAmqpConnectionPool(ctx, amqpConfig, "sdk")
|
||||||
return nil, fmt.Errorf("connect to broker: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return amqpApi, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// connect opens a new connection and session to the AMQP messaging system.
|
|
||||||
// The connection attempt will be cancelled after connectionTimeoutSeconds.
|
|
||||||
func (a *AmqpApi) connect() error {
|
|
||||||
log.AuditLogger.Info("connecting to audit messaging system")
|
|
||||||
|
|
||||||
// Set credentials if specified
|
|
||||||
auth := amqp.SASLTypeAnonymous()
|
|
||||||
|
|
||||||
if a.config.User != "" && a.config.Password != "" {
|
|
||||||
auth = amqp.SASLTypePlain(a.config.User, a.config.Password)
|
|
||||||
log.AuditLogger.Info("using username and password for messaging")
|
|
||||||
} else {
|
|
||||||
log.AuditLogger.Warn("using anonymous messaging!")
|
|
||||||
}
|
|
||||||
|
|
||||||
options := &amqp.ConnOptions{
|
|
||||||
SASLType: auth,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create new context with timeout for the connection initialization
|
|
||||||
subCtx, cancel := context.WithTimeout(context.Background(), connectionTimeoutSeconds*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// Initialize connection
|
|
||||||
conn, err := amqp.Dial(subCtx, a.config.URL, options)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("dial connection to broker: %w", err)
|
return nil, fmt.Errorf("new amqp connection pool: %w", err)
|
||||||
}
|
|
||||||
a.connection = conn
|
|
||||||
|
|
||||||
// Initialize session
|
|
||||||
session, err := conn.NewSession(context.Background(), nil)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("create session: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var amqpSession AmqpSession = &AmqpSessionWrapper{session: session}
|
amqpApi := &AmqpApi{config: amqpConfig,
|
||||||
a.session = amqpSession
|
connectionPool: connectionPool,
|
||||||
|
connectionPoolHandle: connectionPool.NewHandle(),
|
||||||
|
senderCache: make(map[string]*AmqpSenderSession),
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
var messagingApi Api = amqpApi
|
||||||
|
return messagingApi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send implements Api.Send.
|
// Send implements Api.Send.
|
||||||
// If errors occur the connection to the messaging system will be closed and re-established.
|
// If errors occur the connection to the messaging system will be closed and re-established.
|
||||||
func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
|
func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
|
||||||
err := a.trySend(ctx, topic, data, contentType, applicationProperties)
|
|
||||||
if err == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Drop the current sender, as it cannot connect to the broker anymore
|
a.lock.RLock()
|
||||||
log.AuditLogger.Error("message sender error, recreating", err)
|
connectionIsClosed := a.connection == nil || a.connection.IsClosed()
|
||||||
|
a.lock.RUnlock()
|
||||||
err = a.resetConnection(ctx)
|
if connectionIsClosed {
|
||||||
if err != nil {
|
connection, err := a.connectionPool.GetConnection(a.connectionPoolHandle)
|
||||||
return fmt.Errorf("reset connection: %w", err)
|
if err != nil {
|
||||||
}
|
return fmt.Errorf("get connection: %w", err)
|
||||||
|
|
||||||
return a.trySend(ctx, topic, data, contentType, applicationProperties)
|
|
||||||
}
|
|
||||||
|
|
||||||
// trySend actually sends the given data as amqp.Message to the messaging system.
|
|
||||||
func (a *AmqpApi) trySend(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
|
|
||||||
if !strings.HasPrefix(topic, AmqpTopicPrefix) {
|
|
||||||
return fmt.Errorf(
|
|
||||||
"topic %q name lacks mandatory prefix %q",
|
|
||||||
topic,
|
|
||||||
AmqpTopicPrefix,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
sender, err := a.session.NewSender(ctx, topic, nil)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("new sender: %w", err)
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if err := sender.Close(ctx); err != nil {
|
|
||||||
log.AuditLogger.Error("failed to close session sender", err)
|
|
||||||
}
|
}
|
||||||
}()
|
a.lock.Lock()
|
||||||
|
a.connection = connection
|
||||||
bytes := [][]byte{data}
|
a.lock.Unlock()
|
||||||
message := amqp.Message{
|
|
||||||
Header: &amqp.MessageHeader{
|
|
||||||
Durable: true,
|
|
||||||
},
|
|
||||||
Properties: &amqp.MessageProperties{
|
|
||||||
To: &topic,
|
|
||||||
ContentType: &contentType,
|
|
||||||
},
|
|
||||||
ApplicationProperties: applicationProperties,
|
|
||||||
Data: bytes,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = sender.Send(ctx, &message, nil)
|
a.lock.RLock()
|
||||||
if err != nil {
|
var sender = a.senderCache[topic]
|
||||||
return fmt.Errorf("send message: %w", err)
|
a.lock.RUnlock()
|
||||||
|
if sender == nil {
|
||||||
|
a.lock.RLock()
|
||||||
|
ctx, cancelFn := context.WithTimeout(ctx, 10*time.Second)
|
||||||
|
senderSession, err := a.connection.NewSender(ctx, topic)
|
||||||
|
cancelFn()
|
||||||
|
a.lock.RUnlock()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("new sender: %w", err)
|
||||||
|
}
|
||||||
|
a.lock.Lock()
|
||||||
|
a.senderCache[topic] = senderSession
|
||||||
|
a.lock.Unlock()
|
||||||
|
sender = senderSession
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wrappedData := [][]byte{data}
|
||||||
|
if err := sender.Send(topic, wrappedData, contentType, applicationProperties); err != nil {
|
||||||
|
return fmt.Errorf("send: %w", err)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// resetConnection closes the current session and connection and reconnects to the messaging system.
|
|
||||||
func (a *AmqpApi) resetConnection(ctx context.Context) error {
|
|
||||||
if err := a.Close(ctx); err != nil {
|
|
||||||
log.AuditLogger.Error("failed to close audit messaging connection", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return a.connect()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close implements Api.Close
|
// Close implements Api.Close
|
||||||
func (a *AmqpApi) Close(ctx context.Context) error {
|
func (a *AmqpApi) Close(ctx context.Context) error {
|
||||||
log.AuditLogger.Info("close audit messaging connection")
|
log.AuditLogger.Info("close audit messaging connection")
|
||||||
return errors.Join(a.session.Close(ctx), a.connection.Close())
|
var closeErrors []error
|
||||||
|
a.lock.RLock()
|
||||||
|
numSenderCacheItems := len(a.senderCache)
|
||||||
|
a.lock.RUnlock()
|
||||||
|
if numSenderCacheItems > 0 {
|
||||||
|
a.lock.Lock()
|
||||||
|
for _, session := range a.senderCache {
|
||||||
|
if err := session.Close(); err != nil {
|
||||||
|
closeErrors = append(closeErrors, fmt.Errorf("close session: %w", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
clear(a.senderCache)
|
||||||
|
a.lock.Unlock()
|
||||||
|
}
|
||||||
|
if a.connectionPool != nil {
|
||||||
|
if err := a.connectionPool.Close(ctx); err != nil {
|
||||||
|
closeErrors = append(closeErrors, fmt.Errorf("close pool: %w", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(closeErrors) > 0 {
|
||||||
|
return fmt.Errorf("close: %w", errors.Join(closeErrors...))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,52 +2,52 @@ package messaging
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Azure/go-amqp"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/mock"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type AmqpSessionMock struct {
|
//type AmqpSessionMock struct {
|
||||||
mock.Mock
|
// mock.Mock
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *AmqpSessionMock) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) {
|
//func (m *AmqpSessionMock) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) {
|
||||||
args := m.Called(ctx, target, opts)
|
// args := m.Called(ctx, target, opts)
|
||||||
var sender AmqpSender = nil
|
// var sender AmqpSender = nil
|
||||||
if args.Get(0) != nil {
|
// if args.Get(0) != nil {
|
||||||
sender = args.Get(0).(AmqpSender)
|
// sender = args.Get(0).(AmqpSender)
|
||||||
}
|
// }
|
||||||
err := args.Error(1)
|
// err := args.Error(1)
|
||||||
return sender, err
|
// return sender, err
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *AmqpSessionMock) Close(ctx context.Context) error {
|
//func (m *AmqpSessionMock) Close(ctx context.Context) error {
|
||||||
args := m.Called(ctx)
|
// args := m.Called(ctx)
|
||||||
return args.Error(0)
|
// return args.Error(0)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
type AmqpSenderMock struct {
|
//type AmqpSenderMock struct {
|
||||||
mock.Mock
|
// mock.Mock
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *AmqpSenderMock) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
|
//func (m *AmqpSenderMock) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
|
||||||
args := m.Called(ctx, msg, opts)
|
// args := m.Called(ctx, msg, opts)
|
||||||
return args.Error(0)
|
// return args.Error(0)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
func (m *AmqpSenderMock) Close(ctx context.Context) error {
|
//func (m *AmqpSenderMock) Close(ctx context.Context) error {
|
||||||
args := m.Called(ctx)
|
// args := m.Called(ctx)
|
||||||
return args.Error(0)
|
// return args.Error(0)
|
||||||
}
|
//}
|
||||||
|
|
||||||
func Test_NewAmqpMessagingApi(t *testing.T) {
|
func Test_NewAmqpMessagingApi(t *testing.T) {
|
||||||
_, err := NewAmqpApi(AmqpConfig{URL: "not-handled-protocol://localhost:5672"})
|
_, err := NewAmqpApi(
|
||||||
assert.EqualError(t, err, "connect to broker: dial connection to broker: unsupported scheme \"not-handled-protocol\"")
|
AmqpConnectionPoolConfig{
|
||||||
|
Parameters: AmqpConnectionConfig{BrokerUrl: "not-handled-protocol://localhost:5672"},
|
||||||
|
PoolSize: 1,
|
||||||
|
})
|
||||||
|
assert.EqualError(t, err, "new amqp connection pool: initialize connections: new connection: failed to connect to amqp broker: dial: unsupported scheme \"not-handled-protocol\"")
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_AmqpMessagingApi_Send(t *testing.T) {
|
func Test_AmqpMessagingApi_Send(t *testing.T) {
|
||||||
|
|
@ -63,11 +63,14 @@ func Test_AmqpMessagingApi_Send(t *testing.T) {
|
||||||
t.Run("Missing topic prefix", func(t *testing.T) {
|
t.Run("Missing topic prefix", func(t *testing.T) {
|
||||||
defer solaceContainer.StopOnError()
|
defer solaceContainer.StopOnError()
|
||||||
|
|
||||||
api, err := NewAmqpApi(AmqpConfig{URL: solaceContainer.AmqpConnectionString})
|
api, err := NewAmqpApi(AmqpConnectionPoolConfig{
|
||||||
|
Parameters: AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
|
||||||
|
PoolSize: 1,
|
||||||
|
})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
err = api.Send(ctx, "topic-name", []byte{}, "application/json", make(map[string]any))
|
err = api.Send(ctx, "topic-name", []byte{}, "application/json", make(map[string]any))
|
||||||
assert.EqualError(t, err, "topic \"topic-name\" name lacks mandatory prefix \"topic://\"")
|
assert.EqualError(t, err, "send: topic \"topic-name\" name lacks mandatory prefix \"topic://\"")
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Close connection without errors", func(t *testing.T) {
|
t.Run("Close connection without errors", func(t *testing.T) {
|
||||||
|
|
@ -81,103 +84,107 @@ func Test_AmqpMessagingApi_Send(t *testing.T) {
|
||||||
topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-close-connection")
|
topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-close-connection")
|
||||||
assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
|
assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
|
||||||
|
|
||||||
api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}}
|
api, err := NewAmqpApi(AmqpConnectionPoolConfig{
|
||||||
err := api.connect()
|
Parameters: AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
|
||||||
|
PoolSize: 1,
|
||||||
|
})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
err = api.Close(ctx)
|
err = api.Close(ctx)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("New sender call returns error", func(t *testing.T) {
|
//t.Run("New sender call returns error", func(t *testing.T) {
|
||||||
defer solaceContainer.StopOnError()
|
// defer solaceContainer.StopOnError()
|
||||||
|
//
|
||||||
// Initialize the solace queue
|
// // Initialize the solace queue
|
||||||
topicSubscriptionTopicPattern := "auditlog/>"
|
// topicSubscriptionTopicPattern := "auditlog/>"
|
||||||
queueName := "messaging-new-sender"
|
// queueName := "messaging-new-sender"
|
||||||
assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
|
// assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
|
||||||
assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))
|
// assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))
|
||||||
topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-no-new-sender")
|
// topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-no-new-sender")
|
||||||
assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
|
// assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
|
||||||
|
//
|
||||||
api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}}
|
// api, err := NewAmqpApi(AmqpConnectionPoolConfig{
|
||||||
err := api.connect()
|
// Parameters: AmqpConnectionConfig{BrokerUrl: solaceContainer.AmqpConnectionString},
|
||||||
assert.NoError(t, err)
|
// PoolSize: 1,
|
||||||
|
// })
|
||||||
expectedError := errors.New("expected error")
|
// assert.NoError(t, err)
|
||||||
|
//
|
||||||
// Set mock session
|
// expectedError := errors.New("expected error")
|
||||||
sessionMock := AmqpSessionMock{}
|
//
|
||||||
sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(nil, expectedError)
|
// // Set mock session
|
||||||
sessionMock.On("Close", mock.Anything).Return(nil)
|
// sessionMock := AmqpSessionMock{}
|
||||||
|
// sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(nil, expectedError)
|
||||||
var amqpSession AmqpSession = &sessionMock
|
// sessionMock.On("Close", mock.Anything).Return(nil)
|
||||||
api.session = amqpSession
|
//
|
||||||
|
// var amqpSession AmqpSession = &sessionMock
|
||||||
// It's expected that the test succeeds.
|
// api.session = amqpSession
|
||||||
// First the session is closed as it returns the expected error
|
//
|
||||||
// Then the retry mechanism restarts the connection and successfully sends the data
|
// // It's expected that the test succeeds.
|
||||||
value := "test"
|
// // First the session is closed as it returns the expected error
|
||||||
err = api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any))
|
// // Then the retry mechanism restarts the connection and successfully sends the data
|
||||||
assert.NoError(t, err)
|
// value := "test"
|
||||||
|
// err = api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any))
|
||||||
// Check that the mock was called
|
// assert.NoError(t, err)
|
||||||
assert.True(t, sessionMock.AssertNumberOfCalls(t, "NewSender", 1))
|
//
|
||||||
assert.True(t, sessionMock.AssertNumberOfCalls(t, "Close", 1))
|
// // Check that the mock was called
|
||||||
|
// assert.True(t, sessionMock.AssertNumberOfCalls(t, "NewSender", 1))
|
||||||
message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true)
|
// assert.True(t, sessionMock.AssertNumberOfCalls(t, "Close", 1))
|
||||||
assert.NoError(t, err)
|
//
|
||||||
assert.Equal(t, value, string(message.Data[0]))
|
// message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true)
|
||||||
assert.Equal(t, topicName, *message.Properties.To)
|
// assert.NoError(t, err)
|
||||||
})
|
// assert.Equal(t, value, string(message.Data[0]))
|
||||||
|
// assert.Equal(t, topicName, *message.Properties.To)
|
||||||
t.Run("Send call on sender returns error", func(t *testing.T) {
|
//})
|
||||||
defer solaceContainer.StopOnError()
|
//
|
||||||
|
//t.Run("Send call on sender returns error", func(t *testing.T) {
|
||||||
// Initialize the solace queue
|
// defer solaceContainer.StopOnError()
|
||||||
topicSubscriptionTopicPattern := "auditlog/>"
|
//
|
||||||
queueName := "messaging-sender-error"
|
// // Initialize the solace queue
|
||||||
assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
|
// topicSubscriptionTopicPattern := "auditlog/>"
|
||||||
assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))
|
// queueName := "messaging-sender-error"
|
||||||
topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-sender-error")
|
// assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
|
||||||
assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
|
// assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))
|
||||||
|
// topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-sender-error")
|
||||||
api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}}
|
// assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
|
||||||
err := api.connect()
|
//
|
||||||
assert.NoError(t, err)
|
// api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}}
|
||||||
|
// err := api.connect()
|
||||||
expectedError := errors.New("expected error")
|
// assert.NoError(t, err)
|
||||||
|
//
|
||||||
// Instantiate mock sender
|
// expectedError := errors.New("expected error")
|
||||||
senderMock := AmqpSenderMock{}
|
//
|
||||||
senderMock.On("Send", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedError)
|
// // Instantiate mock sender
|
||||||
senderMock.On("Close", mock.Anything).Return(nil)
|
// senderMock := AmqpSenderMock{}
|
||||||
var amqpSender AmqpSender = &senderMock
|
// senderMock.On("Send", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedError)
|
||||||
|
// senderMock.On("Close", mock.Anything).Return(nil)
|
||||||
// Set mock session
|
// var amqpSender AmqpSender = &senderMock
|
||||||
sessionMock := AmqpSessionMock{}
|
//
|
||||||
sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(&amqpSender, nil)
|
// // Set mock session
|
||||||
sessionMock.On("Close", mock.Anything).Return(nil)
|
// sessionMock := AmqpSessionMock{}
|
||||||
|
// sessionMock.On("NewSender", mock.Anything, mock.Anything, mock.Anything).Return(&amqpSender, nil)
|
||||||
var amqpSession AmqpSession = &sessionMock
|
// sessionMock.On("Close", mock.Anything).Return(nil)
|
||||||
api.session = amqpSession
|
//
|
||||||
|
// var amqpSession AmqpSession = &sessionMock
|
||||||
// It's expected that the test succeeds.
|
// api.session = amqpSession
|
||||||
// First the sender and session are closed as the sender returns the expected error
|
//
|
||||||
// Then the retry mechanism restarts the connection and successfully sends the data
|
// // It's expected that the test succeeds.
|
||||||
value := "test"
|
// // First the sender and session are closed as the sender returns the expected error
|
||||||
err = api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any))
|
// // Then the retry mechanism restarts the connection and successfully sends the data
|
||||||
assert.NoError(t, err)
|
// value := "test"
|
||||||
|
// err = api.Send(ctx, topicName, []byte(value), "application/json", make(map[string]any))
|
||||||
// Check that the mocks were called
|
// assert.NoError(t, err)
|
||||||
assert.True(t, sessionMock.AssertNumberOfCalls(t, "NewSender", 1))
|
//
|
||||||
assert.True(t, sessionMock.AssertNumberOfCalls(t, "Close", 1))
|
// // Check that the mocks were called
|
||||||
assert.True(t, senderMock.AssertNumberOfCalls(t, "Send", 1))
|
// assert.True(t, sessionMock.AssertNumberOfCalls(t, "NewSender", 1))
|
||||||
assert.True(t, senderMock.AssertNumberOfCalls(t, "Close", 1))
|
// assert.True(t, sessionMock.AssertNumberOfCalls(t, "Close", 1))
|
||||||
|
// assert.True(t, senderMock.AssertNumberOfCalls(t, "Send", 1))
|
||||||
message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true)
|
// assert.True(t, senderMock.AssertNumberOfCalls(t, "Close", 1))
|
||||||
assert.NoError(t, err)
|
//
|
||||||
assert.Equal(t, value, string(message.Data[0]))
|
// message, err := solaceContainer.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), true)
|
||||||
assert.Equal(t, topicName, *message.Properties.To)
|
// assert.NoError(t, err)
|
||||||
})
|
// assert.Equal(t, value, string(message.Data[0]))
|
||||||
|
// assert.Equal(t, topicName, *message.Properties.To)
|
||||||
|
//})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
AmqpTopicPrefix = "topic://"
|
|
||||||
AmqpQueuePrefix = "queue://"
|
AmqpQueuePrefix = "queue://"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue