Merged PR 679203: fix: Make messaging.API closable

Related work items: #697210
This commit is contained in:
Christian Schaible 2024-11-13 12:17:40 +00:00
parent 7623ffc225
commit 568c5cdb91
3 changed files with 50 additions and 2 deletions

View file

@ -34,6 +34,10 @@ func (m *MessagingApiMock) Send(
return args.Error(0)
}
func (m *MessagingApiMock) Close(_ context.Context) error {
return nil
}
type ProtobufValidatorMock struct {
mock.Mock
}

View file

@ -28,6 +28,13 @@ type Api interface {
//
// It returns technical errors for connection issues or sending problems.
Send(ctx context.Context, topic string, data []byte, contentType string, applicationProperties map[string]any) error
// Close the underlying connection to the messaging system.
// Parameters:
// * ctx - the context object
//
// It returns an error if the connection cannot be closed successfully
Close(ctx context.Context) error
}
// MutexApi is wrapper around an API implementation that controls mutual exclusive access to the api.
@ -36,6 +43,8 @@ type MutexApi struct {
api *Api
}
var _ Api = &MutexApi{}
func NewMutexApi(api *Api) (*Api, error) {
if api == nil {
return nil, errors.New("api is nil")
@ -56,6 +65,12 @@ func (m *MutexApi) Send(ctx context.Context, topic string, data []byte, contentT
return (*m.api).Send(ctx, topic, data, contentType, applicationProperties)
}
func (m *MutexApi) Close(ctx context.Context) error {
m.mutex.Lock()
defer m.mutex.Unlock()
return (*m.api).Close(ctx)
}
// AmqpConfig provides AMQP connection related parameters.
type AmqpConfig struct {
URL string
@ -96,6 +111,8 @@ type AmqpApi struct {
session *AmqpSession
}
var _ Api = &AmqpApi{}
func NewAmqpApi(amqpConfig AmqpConfig) (*Api, error) {
amqpApi := &AmqpApi{config: amqpConfig}
@ -111,7 +128,7 @@ func NewAmqpApi(amqpConfig AmqpConfig) (*Api, error) {
// connect opens a new connection and session to the AMQP messaging system.
// The connection attempt will be cancelled after connectionTimeoutSeconds.
func (a *AmqpApi) connect() error {
log.AuditLogger.Info("connecting to messaging system")
log.AuditLogger.Info("connecting to audit messaging system")
// Set credentials if specified
auth := amqp.SASLTypeAnonymous()
@ -211,8 +228,16 @@ func (a *AmqpApi) resetConnection(ctx context.Context) error {
_ = (*a.session).Close(ctx)
err := a.connection.Close()
if err != nil {
log.AuditLogger.Error("failed to close message connection", err)
log.AuditLogger.Error("failed to close audit messaging connection", err)
}
return a.connect()
}
// Close implements Api.Close
func (a *AmqpApi) Close(ctx context.Context) error {
log.AuditLogger.Info("close audit messaging connection")
_ = (*a.session).Close(ctx)
err := a.connection.Close()
return err
}

View file

@ -160,4 +160,23 @@ func Test_AmqpMessagingApi_Send(t *testing.T) {
assert.Equal(t, value, string(message.Data[0]))
assert.Equal(t, topicName, *message.Properties.To)
})
t.Run("Close connection without errors", func(t *testing.T) {
defer solaceContainer.StopOnError()
// Initialize the solace queue
topicSubscriptionTopicPattern := "auditlog/>"
queueName := "close-connection-without-error"
assert.NoError(t, solaceContainer.QueueCreate(ctx, queueName))
assert.NoError(t, solaceContainer.TopicSubscriptionCreate(ctx, queueName, topicSubscriptionTopicPattern))
topicName := fmt.Sprintf("topic://auditlog/%s", "amqp-close-connection")
assert.NoError(t, solaceContainer.ValidateTopicName(topicSubscriptionTopicPattern, topicName))
api := &AmqpApi{config: AmqpConfig{URL: solaceContainer.AmqpConnectionString}}
err := api.connect()
assert.NoError(t, err)
err = (*api).Close(ctx)
assert.NoError(t, err)
})
}