diff --git a/audit/api/api_common_test.go b/audit/api/api_common_test.go index c775f43..5d988c9 100644 --- a/audit/api/api_common_test.go +++ b/audit/api/api_common_test.go @@ -34,6 +34,10 @@ func (m *MessagingApiMock) Send( return args.Error(0) } +func (m *MessagingApiMock) Close(_ context.Context) error { + return nil +} + type ProtobufValidatorMock struct { mock.Mock } diff --git a/audit/messaging/messaging.go b/audit/messaging/messaging.go index 5e0f977..0c72a8f 100644 --- a/audit/messaging/messaging.go +++ b/audit/messaging/messaging.go @@ -28,6 +28,13 @@ type Api interface { // // It returns technical errors for connection issues or sending problems. Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error + + // Close the underlying connection to the messaging system. + // Parameters: + // * ctx - the context object + // + // It returns an error if the connection cannot be closed successfully + Close(ctx context.Context) error } // MutexApi is wrapper around an API implementation that controls mutual exclusive access to the api. @@ -36,6 +43,8 @@ type MutexApi struct { api *Api } +var _ Api = &MutexApi{} + func NewMutexApi(api *Api) (*Api, error) { if api == nil { return nil, errors.New("api is nil") @@ -56,6 +65,12 @@ func (m *MutexApi) Send(ctx context.Context, topic string, data []byte, contentT return (*m.api).Send(ctx, topic, data, contentType, applicationProperties) } +func (m *MutexApi) Close(ctx context.Context) error { + m.mutex.Lock() + defer m.mutex.Unlock() + return (*m.api).Close(ctx) +} + // AmqpConfig provides AMQP connection related parameters. type AmqpConfig struct { URL string @@ -96,6 +111,8 @@ type AmqpApi struct { session *AmqpSession } +var _ Api = &AmqpApi{} + func NewAmqpApi(amqpConfig AmqpConfig) (*Api, error) { amqpApi := &AmqpApi{config: amqpConfig} @@ -111,7 +128,7 @@ func NewAmqpApi(amqpConfig AmqpConfig) (*Api, error) { // connect opens a new connection and session to the AMQP messaging system. // The connection attempt will be cancelled after connectionTimeoutSeconds. func (a *AmqpApi) connect() error { - log.AuditLogger.Info("connecting to messaging system") + log.AuditLogger.Info("connecting to audit messaging system") // Set credentials if specified auth := amqp.SASLTypeAnonymous() @@ -211,8 +228,16 @@ func (a *AmqpApi) resetConnection(ctx context.Context) error { _ = (*a.session).Close(ctx) err := a.connection.Close() if err != nil { - log.AuditLogger.Error("failed to close message connection", err) + log.AuditLogger.Error("failed to close audit messaging connection", err) } return a.connect() } + +// Close implements Api.Close +func (a *AmqpApi) Close(ctx context.Context) error { + log.AuditLogger.Info("close audit messaging connection") + _ = (*a.session).Close(ctx) + err := a.connection.Close() + return err +} diff --git a/audit/messaging/messaging_test.go b/audit/messaging/messaging_test.go index 973b17f..5cde8ae 100644 --- a/audit/messaging/messaging_test.go +++ b/audit/messaging/messaging_test.go @@ -160,4 +160,23 @@ func Test_AmqpMessagingApi_Send(t *testing.T) { assert.Equal(t, value, string(message.Data[0])) assert.Equal(t, topicName, *message.Properties.To) }) + + t.Run("Close connection without errors", func(t *testing.T) { + defer solaceContainer.StopOnError() + + // Initialize the solace queue + topicSubscriptionTopicPattern := "auditlog/>" + queueName := "close-connection-without-error" + assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName)) + assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern)) + topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-close-connection") + assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName)) + + api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}} + err := api.connect() + assert.NoError(t, err) + + err = (*api).Close(ctx) + assert.NoError(t, err) + }) }