diff --git a/audit/messaging/messaging.go b/audit/messaging/messaging.go index 7fe7d43..d94b349 100644 --- a/audit/messaging/messaging.go +++ b/audit/messaging/messaging.go @@ -90,9 +90,7 @@ type AmqpSessionWrapper struct { } func (w AmqpSessionWrapper) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) { - sender, err := w.session.NewSender(ctx, target, opts) - var amqpSender AmqpSender = sender - return amqpSender, err + return w.session.NewSender(ctx, target, opts) } func (w AmqpSessionWrapper) Close(ctx context.Context) error { @@ -117,13 +115,11 @@ var _ Api = &AmqpApi{} func NewAmqpApi(amqpConfig AmqpConfig) (Api, error) { amqpApi := &AmqpApi{config: amqpConfig} - err := amqpApi.connect() - if err != nil { - return nil, err + if err := amqpApi.connect(); err != nil { + return nil, fmt.Errorf("connect to broker: %w", err) } - var messagingApi Api = amqpApi - return messagingApi, nil + return amqpApi, nil } // connect opens a new connection and session to the AMQP messaging system. @@ -152,14 +148,14 @@ func (a *AmqpApi) connect() error { // Initialize connection conn, err := amqp.Dial(subCtx, a.config.URL, options) if err != nil { - return err + return fmt.Errorf("dial connection to broker: %w", err) } a.connection = conn // Initialize session session, err := conn.NewSession(context.Background(), nil) if err != nil { - return err + return fmt.Errorf("create session: %w", err) } var amqpSession AmqpSession = &AmqpSessionWrapper{session: session} @@ -181,7 +177,7 @@ func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentTy err = a.resetConnection(ctx) if err != nil { - return err + return fmt.Errorf("reset connection: %w", err) } return a.trySend(ctx, topic, data, contentType, applicationProperties) @@ -199,7 +195,7 @@ func (a *AmqpApi) trySend(ctx context.Context, topic string, data []byte, conten sender, err := a.session.NewSender(ctx, topic, nil) if err != nil { - return err + return fmt.Errorf("new sender: %w", err) } defer func() { if err := sender.Close(ctx); err != nil { @@ -222,7 +218,7 @@ func (a *AmqpApi) trySend(ctx context.Context, topic string, data []byte, conten err = sender.Send(ctx, &message, nil) if err != nil { - return err + return fmt.Errorf("send message: %w", err) } return nil diff --git a/audit/messaging/messaging_test.go b/audit/messaging/messaging_test.go index e57a3eb..d6b1a5b 100644 --- a/audit/messaging/messaging_test.go +++ b/audit/messaging/messaging_test.go @@ -4,11 +4,12 @@ import ( "context" "errors" "fmt" + "testing" + "time" + "github.com/Azure/go-amqp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "testing" - "time" ) type AmqpSessionMock struct { @@ -46,7 +47,7 @@ func (m *AmqpSenderMock) Close(ctx context.Context) error { func Test_NewAmqpMessagingApi(t *testing.T) { _, err := NewAmqpApi(AmqpConfig{URL: "not-handled-protocol://localhost:5672"}) - assert.EqualError(t, err, "unsupported scheme \"not-handled-protocol\"") + assert.EqualError(t, err, "connect to broker: dial connection to broker: unsupported scheme \"not-handled-protocol\"") } func Test_AmqpMessagingApi_Send(t *testing.T) {