From 6041fd105a0489f2f8ed873ca565b6c122a87ea4 Mon Sep 17 00:00:00 2001 From: Lukas Schmitt Date: Fri, 3 Jan 2025 12:19:11 +0000 Subject: [PATCH] Merged PR 710571: refactor connection, session and sender handling during reconnect * logging all errors related to the session and connection close * closing the sender properly not only in case of an error Security-concept-update-needed: false. JIRA Work Item: STACKITALO-61 --- audit/messaging/messaging.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/audit/messaging/messaging.go b/audit/messaging/messaging.go index bc9ee76..7fe7d43 100644 --- a/audit/messaging/messaging.go +++ b/audit/messaging/messaging.go @@ -2,13 +2,14 @@ package messaging import ( "context" - "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log" "errors" "fmt" - "github.com/Azure/go-amqp" "strings" "sync" "time" + + "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log" + "github.com/Azure/go-amqp" ) // Default connection timeout for the AMQP connection @@ -200,6 +201,11 @@ func (a *AmqpApi) trySend(ctx context.Context, topic string, data []byte, conten if err != nil { return err } + defer func() { + if err := sender.Close(ctx); err != nil { + log.AuditLogger.Error("failed to close session sender", err) + } + }() bytes := [][]byte{data} message := amqp.Message{ @@ -216,7 +222,6 @@ func (a *AmqpApi) trySend(ctx context.Context, topic string, data []byte, conten err = sender.Send(ctx, &message, nil) if err != nil { - _ = sender.Close(ctx) return err } @@ -225,9 +230,7 @@ func (a *AmqpApi) trySend(ctx context.Context, topic string, data []byte, conten // resetConnection closes the current session and connection and reconnects to the messaging system. func (a *AmqpApi) resetConnection(ctx context.Context) error { - _ = a.session.Close(ctx) - err := a.connection.Close() - if err != nil { + if err := a.Close(ctx); err != nil { log.AuditLogger.Error("failed to close audit messaging connection", err) } @@ -237,7 +240,5 @@ func (a *AmqpApi) resetConnection(ctx context.Context) error { // Close implements Api.Close func (a *AmqpApi) Close(ctx context.Context) error { log.AuditLogger.Info("close audit messaging connection") - _ = a.session.Close(ctx) - err := a.connection.Close() - return err + return errors.Join(a.session.Close(ctx), a.connection.Close()) }