fix: Fix usage of RWMutex in AmqpApi and wrap errors

This commit is contained in:
Christian Schaible 2025-01-14 07:27:43 +01:00
parent 1aa9ff9eec
commit dfbc4cdf93
2 changed files with 23 additions and 8 deletions

View file

@ -49,7 +49,7 @@ func NewAmqpApi(amqpConfig AmqpConnectionPoolConfig) (Api, error) {
defer cancelFn()
connectionPool, err := NewAmqpConnectionPool(ctx, amqpConfig, "sdk")
if err != nil {
return nil, err
return nil, fmt.Errorf("new amqp connection pool: %w", err)
}
amqpApi := &AmqpApi{config: amqpConfig,
@ -65,26 +65,35 @@ func NewAmqpApi(amqpConfig AmqpConnectionPoolConfig) (Api, error) {
// Send implements Api.Send.
// If errors occur the connection to the messaging system will be closed and re-established.
func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error {
a.lock.RLock()
defer a.lock.RUnlock()
if a.connection == nil || a.connection.IsClosed() {
a.lock.RLock()
connectionIsClosed := a.connection == nil || a.connection.IsClosed()
a.lock.RUnlock()
if connectionIsClosed {
connection, err := a.connectionPool.GetConnection(a.connectionPoolHandle)
if err != nil {
return err
return fmt.Errorf("get connection: %w", err)
}
a.lock.Lock()
a.connection = connection
a.lock.Unlock()
}
a.lock.RLock()
var sender = a.senderCache[topic]
a.lock.RUnlock()
if sender == nil {
a.lock.RLock()
ctx, cancelFn := context.WithTimeout(ctx, 10*time.Second)
senderSession, err := a.connection.NewSender(ctx, topic)
cancelFn()
a.lock.RUnlock()
if err != nil {
return err
return fmt.Errorf("new sender: %w", err)
}
a.lock.Lock()
a.senderCache[topic] = senderSession
a.lock.Unlock()
sender = senderSession
}
@ -99,12 +108,18 @@ func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentTy
func (a *AmqpApi) Close(ctx context.Context) error {
log.AuditLogger.Info("close audit messaging connection")
var closeErrors []error
if len(a.senderCache) > 0 {
a.lock.RLock()
numSenderCacheItems := len(a.senderCache)
a.lock.RUnlock()
if numSenderCacheItems > 0 {
a.lock.Lock()
for _, session := range a.senderCache {
if err := session.Close(); err != nil {
closeErrors = append(closeErrors, fmt.Errorf("close session: %w", err))
}
}
clear(a.senderCache)
a.lock.Unlock()
}
if a.connectionPool != nil {
if err := a.connectionPool.Close(ctx); err != nil {

View file

@ -47,7 +47,7 @@ func Test_NewAmqpMessagingApi(t *testing.T) {
Parameters: AmqpConnectionConfig{BrokerUrl: "not-handled-protocol://localhost:5672"},
PoolSize: 1,
})
assert.EqualError(t, err, "initialize connections: new connection: failed to connect to amqp broker: dial: unsupported scheme \"not-handled-protocol\"")
assert.EqualError(t, err, "new amqp connection pool: initialize connections: new connection: failed to connect to amqp broker: dial: unsupported scheme \"not-handled-protocol\"")
}
func Test_AmqpMessagingApi_Send(t *testing.T) {