From 23f99b06680136a53ac51283b89a1b198c573435 Mon Sep 17 00:00:00 2001 From: Christian Schaible Date: Tue, 30 Jul 2024 13:17:02 +0200 Subject: [PATCH] Add mutex api wrapper --- README.md | 1 - audit/api/api_legacy.go | 2 +- audit/messaging/messaging.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 304706b..9232c05 100644 --- a/README.md +++ b/README.md @@ -197,7 +197,6 @@ Python) will be extracted into separate repositories. - Finalizing messaging schema - Check if fields and constraints are correct and compatible with expected use cases - Check that routing api specific parameters are independent of AuditEventLog -- Clarify if a mutex is needed? Should we provide a MutexMessagingApi? - Clarify if `client.go` file can be used for licence / legal reasons - Extraction of python / java configurations and code - Clean up repo (delete main.go, etc. files) diff --git a/audit/api/api_legacy.go b/audit/api/api_legacy.go index 026c3f7..2dc832e 100644 --- a/audit/api/api_legacy.go +++ b/audit/api/api_legacy.go @@ -42,7 +42,7 @@ type LegacyAuditApi struct { validator *ProtobufValidator } -// NewLegacyAuditApi can be used to initialize the audit log api with LegacyAuditLogConnectionDetails. +// NewLegacyAuditApi can be used to initialize the audit log api. // // Note: The NewLegacyAuditApi method will be deprecated and replaced with "newRoutableAuditApi" once the new audit log routing is implemented func NewLegacyAuditApi( diff --git a/audit/messaging/messaging.go b/audit/messaging/messaging.go index 3e16f8d..cfae248 100644 --- a/audit/messaging/messaging.go +++ b/audit/messaging/messaging.go @@ -2,10 +2,12 @@ package messaging import ( "context" + "errors" "fmt" "github.com/Azure/go-amqp" "log/slog" "strings" + "sync" "time" ) @@ -28,6 +30,32 @@ type Api interface { Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error } +// MutexApi is wrapper around an API implementation that controls mutual exclusive access to the api. +type MutexApi struct { + mutex *sync.Mutex + api *Api +} + +func NewMutexApi(api *Api) (*Api, error) { + if api == nil { + return nil, errors.New("api is nil") + } + mutexApi := MutexApi{ + mutex: &sync.Mutex{}, + api: api, + } + + var genericApi Api = &mutexApi + return &genericApi, nil +} + +// Send implements Api.Send +func (m *MutexApi) Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error { + m.mutex.Lock() + defer m.mutex.Unlock() + return (*m.api).Send(ctx, topic, data, contentType, applicationProperties) +} + // AmqpConfig provides AMQP connection related parameters. type AmqpConfig struct { URL string