From dfbc4cdf93f9bbcef870fe7775855f70f771a30a Mon Sep 17 00:00:00 2001 From: Christian Schaible Date: Tue, 14 Jan 2025 07:27:43 +0100 Subject: [PATCH] fix: Fix usage of RWMutex in AmqpApi and wrap errors --- audit/messaging/messaging.go | 29 ++++++++++++++++++++++------- audit/messaging/messaging_test.go | 2 +- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/audit/messaging/messaging.go b/audit/messaging/messaging.go index 5d9d12e..b62de84 100644 --- a/audit/messaging/messaging.go +++ b/audit/messaging/messaging.go @@ -49,7 +49,7 @@ func NewAmqpApi(amqpConfig AmqpConnectionPoolConfig) (Api, error) { defer cancelFn() connectionPool, err := NewAmqpConnectionPool(ctx, amqpConfig, "sdk") if err != nil { - return nil, err + return nil, fmt.Errorf("new amqp connection pool: %w", err) } amqpApi := &AmqpApi{config: amqpConfig, @@ -65,26 +65,35 @@ func NewAmqpApi(amqpConfig AmqpConnectionPoolConfig) (Api, error) { // Send implements Api.Send. // If errors occur the connection to the messaging system will be closed and re-established. func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error { - a.lock.RLock() - defer a.lock.RUnlock() - if a.connection == nil || a.connection.IsClosed() { + a.lock.RLock() + connectionIsClosed := a.connection == nil || a.connection.IsClosed() + a.lock.RUnlock() + if connectionIsClosed { connection, err := a.connectionPool.GetConnection(a.connectionPoolHandle) if err != nil { - return err + return fmt.Errorf("get connection: %w", err) } + a.lock.Lock() a.connection = connection + a.lock.Unlock() } + a.lock.RLock() var sender = a.senderCache[topic] + a.lock.RUnlock() if sender == nil { + a.lock.RLock() ctx, cancelFn := context.WithTimeout(ctx, 10*time.Second) senderSession, err := a.connection.NewSender(ctx, topic) cancelFn() + a.lock.RUnlock() if err != nil { - return err + return fmt.Errorf("new sender: %w", err) } + a.lock.Lock() a.senderCache[topic] = senderSession + a.lock.Unlock() sender = senderSession } @@ -99,12 +108,18 @@ func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentTy func (a *AmqpApi) Close(ctx context.Context) error { log.AuditLogger.Info("close audit messaging connection") var closeErrors []error - if len(a.senderCache) > 0 { + a.lock.RLock() + numSenderCacheItems := len(a.senderCache) + a.lock.RUnlock() + if numSenderCacheItems > 0 { + a.lock.Lock() for _, session := range a.senderCache { if err := session.Close(); err != nil { closeErrors = append(closeErrors, fmt.Errorf("close session: %w", err)) } } + clear(a.senderCache) + a.lock.Unlock() } if a.connectionPool != nil { if err := a.connectionPool.Close(ctx); err != nil { diff --git a/audit/messaging/messaging_test.go b/audit/messaging/messaging_test.go index 40d0888..6ebcdd3 100644 --- a/audit/messaging/messaging_test.go +++ b/audit/messaging/messaging_test.go @@ -47,7 +47,7 @@ func Test_NewAmqpMessagingApi(t *testing.T) { Parameters: AmqpConnectionConfig{BrokerUrl: "not-handled-protocol://localhost:5672"}, PoolSize: 1, }) - assert.EqualError(t, err, "initialize connections: new connection: failed to connect to amqp broker: dial: unsupported scheme \"not-handled-protocol\"") + assert.EqualError(t, err, "new amqp connection pool: initialize connections: new connection: failed to connect to amqp broker: dial: unsupported scheme \"not-handled-protocol\"") } func Test_AmqpMessagingApi_Send(t *testing.T) {