Merged PR 710739: feat: Add error wrapping in amqp messaging

Security-concept-update-needed: false.

JIRA Work Item: STACKITALO-63
This commit is contained in:
Lukas Schmitt 2025-01-08 09:16:52 +00:00 committed by Christian Schaible
parent 6041fd105a
commit c90ce29c51
2 changed files with 13 additions and 16 deletions

View file

@ -90,9 +90,7 @@ type AmqpSessionWrapper struct {
} }
func (w AmqpSessionWrapper) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) { func (w AmqpSessionWrapper) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AmqpSender, error) {
sender, err := w.session.NewSender(ctx, target, opts) return w.session.NewSender(ctx, target, opts)
var amqpSender AmqpSender = sender
return amqpSender, err
} }
func (w AmqpSessionWrapper) Close(ctx context.Context) error { func (w AmqpSessionWrapper) Close(ctx context.Context) error {
@ -117,13 +115,11 @@ var _ Api = &AmqpApi{}
func NewAmqpApi(amqpConfig AmqpConfig) (Api, error) { func NewAmqpApi(amqpConfig AmqpConfig) (Api, error) {
amqpApi := &AmqpApi{config: amqpConfig} amqpApi := &AmqpApi{config: amqpConfig}
err := amqpApi.connect() if err := amqpApi.connect(); err != nil {
if err != nil { return nil, fmt.Errorf("connect to broker: %w", err)
return nil, err
} }
var messagingApi Api = amqpApi return amqpApi, nil
return messagingApi, nil
} }
// connect opens a new connection and session to the AMQP messaging system. // connect opens a new connection and session to the AMQP messaging system.
@ -152,14 +148,14 @@ func (a *AmqpApi) connect() error {
// Initialize connection // Initialize connection
conn, err := amqp.Dial(subCtx, a.config.URL, options) conn, err := amqp.Dial(subCtx, a.config.URL, options)
if err != nil { if err != nil {
return err return fmt.Errorf("dial connection to broker: %w", err)
} }
a.connection = conn a.connection = conn
// Initialize session // Initialize session
session, err := conn.NewSession(context.Background(), nil) session, err := conn.NewSession(context.Background(), nil)
if err != nil { if err != nil {
return err return fmt.Errorf("create session: %w", err)
} }
var amqpSession AmqpSession = &AmqpSessionWrapper{session: session} var amqpSession AmqpSession = &AmqpSessionWrapper{session: session}
@ -181,7 +177,7 @@ func (a *AmqpApi) Send(ctx context.Context, topic string, data []byte, contentTy
err = a.resetConnection(ctx) err = a.resetConnection(ctx)
if err != nil { if err != nil {
return err return fmt.Errorf("reset connection: %w", err)
} }
return a.trySend(ctx, topic, data, contentType, applicationProperties) return a.trySend(ctx, topic, data, contentType, applicationProperties)
@ -199,7 +195,7 @@ func (a *AmqpApi) trySend(ctx context.Context, topic string, data []byte, conten
sender, err := a.session.NewSender(ctx, topic, nil) sender, err := a.session.NewSender(ctx, topic, nil)
if err != nil { if err != nil {
return err return fmt.Errorf("new sender: %w", err)
} }
defer func() { defer func() {
if err := sender.Close(ctx); err != nil { if err := sender.Close(ctx); err != nil {
@ -222,7 +218,7 @@ func (a *AmqpApi) trySend(ctx context.Context, topic string, data []byte, conten
err = sender.Send(ctx, &message, nil) err = sender.Send(ctx, &message, nil)
if err != nil { if err != nil {
return err return fmt.Errorf("send message: %w", err)
} }
return nil return nil

View file

@ -4,11 +4,12 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"testing"
"time"
"github.com/Azure/go-amqp" "github.com/Azure/go-amqp"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"testing"
"time"
) )
type AmqpSessionMock struct { type AmqpSessionMock struct {
@ -46,7 +47,7 @@ func (m *AmqpSenderMock) Close(ctx context.Context) error {
func Test_NewAmqpMessagingApi(t *testing.T) { func Test_NewAmqpMessagingApi(t *testing.T) {
_, err := NewAmqpApi(AmqpConfig{URL: "not-handled-protocol://localhost:5672"}) _, err := NewAmqpApi(AmqpConfig{URL: "not-handled-protocol://localhost:5672"})
assert.EqualError(t, err, "unsupported scheme \"not-handled-protocol\"") assert.EqualError(t, err, "connect to broker: dial connection to broker: unsupported scheme \"not-handled-protocol\"")
} }
func Test_AmqpMessagingApi_Send(t *testing.T) { func Test_AmqpMessagingApi_Send(t *testing.T) {