audit-go/audit/messaging/solace.go
Christian Schaible 5742604629 Merged PR 716929: feat: Replace AMQP connection management
So far the SDK provided a messaging API that was not thread-safe (i.e. goroutine-safe). Additionally the SDK provided a MutexAPI which made it thread-safe at the cost of removed concurrency possibilities. The changes implemented in this commit replace both implementations with a thread-safe connection pool based solution.

The api gateway is a SDK user that requires reliable high performance send capabilities with a limit amount of amqp connections. These changes in the PR try address their requirements by moving the responsibility of connection management into the SDK. From this change other SDK users will benefit as well.

Security-concept-update-needed: false.

JIRA Work Item: STACKITALO-62
2025-01-27 13:23:54 +00:00

437 lines
12 KiB
Go

package messaging
import (
"bytes"
"context"
"dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log"
"encoding/json"
"errors"
"fmt"
"github.com/Azure/go-amqp"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"io"
"net/http"
"regexp"
"strings"
"time"
)
const (
AmqpQueuePrefix = "queue://"
)
var ErrResourceNotFound = errors.New("resource not found")
type SempClient struct {
client http.Client
sempApiBaseUrl string
username string
password string
}
func (c SempClient) RequestWithoutBody(ctx context.Context, method string, url string) error {
request, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s%s", c.sempApiBaseUrl, url), nil)
if err != nil {
return err
}
response, err := c.doRequest(request)
if err != nil {
return err
}
_, err = c.parseResponseAsObject(response)
return err
}
func (c SempClient) RequestWithBody(ctx context.Context, method string, url string, body any) error {
data, err := json.Marshal(body)
if err != nil {
return err
}
request, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s%s", c.sempApiBaseUrl, url), bytes.NewBuffer(data))
if err != nil {
return err
}
response, err := c.doRequest(request)
if err != nil {
return err
}
_, err = c.parseResponseAsObject(response)
return err
}
func (c SempClient) doRequest(request *http.Request) ([]byte, error) {
request.SetBasicAuth(c.username, c.password)
if request.Method != http.MethodGet {
request.Header.Set("Content-Type", "application/json")
}
response, err := c.client.Do(request)
if err != nil {
return nil, err
}
defer response.Body.Close()
rawBody, err := io.ReadAll(response.Body)
if err != nil || (response.StatusCode != http.StatusOK && response.StatusCode != http.StatusBadRequest) {
return nil, fmt.Errorf("request to %v failes with status %v (%v), response:\n%s", response.StatusCode, response.Status, request.URL, rawBody)
}
if _, err := io.Copy(io.Discard, response.Body); err != nil {
return nil, fmt.Errorf("response processing error for call to %v", request.URL)
}
return rawBody, nil
}
func (c SempClient) parseResponseAsObject(dataResponse []byte) (map[string]any, error) {
data := map[string]any{}
err := json.Unmarshal(dataResponse, &data)
if err != nil {
return nil, fmt.Errorf("could not parse response:\n%s", dataResponse)
}
rawData, ok := data["data"]
if ok {
data, _ = rawData.(map[string]any)
return data, nil
} else {
metadata, ok := data["meta"]
if ok {
data, _ = metadata.(map[string]any)
if data["responseCode"].(float64) == http.StatusOK {
// http-delete
return nil, nil
}
description := data["error"].(map[string]interface{})["description"].(string)
status := data["error"].(map[string]interface{})["status"].(string)
if status == "NOT_FOUND" {
// resource not found
return nil, fmt.Errorf("request failed - description: %v, status: %v, %w", description, status, ErrResourceNotFound)
}
return nil, fmt.Errorf("request failed - description: %v, status: %v", description, status)
}
}
return nil, fmt.Errorf("could not parse response:\n%s", dataResponse)
}
// SolaceContainer wraps a testcontainers docker container instance of solace.
//
// The container must be terminated by calling:
// solaceContainer.Terminate(ctx)
type SolaceContainer struct {
testcontainers.Container
AmqpConnectionString string
sempClient SempClient
}
// NewSolaceContainer starts a container and waits until it is ready to be used.
func NewSolaceContainer(ctx context.Context) (*SolaceContainer, error) {
env := make(map[string]string)
env["username_admin_globalaccesslevel"] = "admin"
env["username_admin_password"] = "admin"
// Start docker container
request := testcontainers.ContainerRequest{
Image: "solace/solace-pubsub-standard:10.8",
ExposedPorts: []string{"5672/tcp", "8080/tcp"},
SkipReaper: true,
AutoRemove: true,
ShmSize: 1024 * 1024 * 1024, // 1 GB,
Env: env,
WaitingFor: wait.ForLog("Running pre-startup checks:").
WithStartupTimeout(90 * time.Second),
}
container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: request,
Started: true,
})
if err != nil {
return nil, err
}
// Extract host and port information
host, err := container.Host(ctx)
if err != nil {
_ = container.Terminate(ctx)
return nil, err
}
amqpPort, err := container.MappedPort(ctx, "5672")
if err != nil {
_ = container.Terminate(ctx)
return nil, err
}
sempPort, err := container.MappedPort(ctx, "8080")
if err != nil {
_ = container.Terminate(ctx)
return nil, err
}
log.AuditLogger.Info("UI Port: " + sempPort.Port())
// Construct connection strings
amqpConnectionString := fmt.Sprintf("amqp://%s:%s/", host, amqpPort.Port())
sempApiBaseUrl := fmt.Sprintf("http://%s:%s/SEMP/v2", host, sempPort.Port())
// Construct SEMP client
sempClient := SempClient{client: http.Client{}, sempApiBaseUrl: sempApiBaseUrl, username: "admin", password: "admin"}
// Poll queue endpoint until solace is ready to interact
solaceStarting := true
for solaceStarting {
err := sempClient.RequestWithoutBody(
ctx,
"GET",
"/config/msgVpns/default/queues/test",
)
if err != nil && strings.Contains(err.Error(), "NOT_FOUND") {
solaceStarting = false
}
time.Sleep(1000 * time.Millisecond)
}
// Return container object
return &SolaceContainer{
Container: container,
AmqpConnectionString: amqpConnectionString,
sempClient: sempClient,
}, nil
}
// QueueCreate creates a queue with the given name.
func (c SolaceContainer) QueueCreate(ctx context.Context, queueName string) error {
// Construct parameters
var queueConfig = make(map[string]any)
queueConfig["accessType"] = "non-exclusive"
queueConfig["egressEnabled"] = true
queueConfig["ingressEnabled"] = true
queueConfig["permission"] = "consume"
queueConfig["queueName"] = queueName
queueConfig["maxBindCount"] = 100
// Create the queue
err := c.sempClient.RequestWithBody(
ctx,
"POST",
"/config/msgVpns/default/queues",
queueConfig)
return err
}
// QueueExists checks if a queue with the given name exists.
func (c SolaceContainer) QueueExists(ctx context.Context, queueName string) (bool, error) {
// Check if exists
err := c.sempClient.RequestWithoutBody(
ctx,
"GET",
fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName),
)
// Check if response contains "NOT_FOUND" string indicating that the queue doesn't exist
if err != nil {
if strings.Contains(err.Error(), "NOT_FOUND") {
return false, nil
}
// Return technical errors
return false, err
}
// Return queue exists
return true, nil
}
// QueueDeleteIfExists deletes the queue with the given name if it exists.
func (c SolaceContainer) QueueDeleteIfExists(ctx context.Context, queueName string) error {
// Check if queue exists
exists, err := c.QueueExists(ctx, queueName)
if err != nil {
return err
}
// Delete if exists
if exists {
err := c.sempClient.RequestWithoutBody(
ctx,
"DELETE",
fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName),
)
return err
}
return nil
}
// TopicSubscriptionCreate creates a topic subscription for a (underlying) queue.
//
// Parameters:
// * ctx - the context object
// * queueName - the name of the queue where the topic(s) should be subscribed
// * topicName - the name of the topic with optional wildcards (e.g. "organizations/org-*")
func (c SolaceContainer) TopicSubscriptionCreate(ctx context.Context, queueName string, topicName string) error {
// Construct url and parameters
url := fmt.Sprintf("/config/msgVpns/default/queues/%s/subscriptions", queueName)
subscriptionConfig := make(map[string]any)
subscriptionConfig["subscriptionTopic"] = topicName
// Create the subscription
err := c.sempClient.RequestWithBody(ctx, "POST", url, subscriptionConfig)
return err
}
func (c SolaceContainer) NewAmqpConnection(ctx context.Context) (*amqp.Conn, error) {
return amqp.Dial(ctx, c.AmqpConnectionString, nil)
}
// ValidateTopicName checks that topicName and topicSubscriptionTopicPattern are valid and compatible
// Solace topic name constraints can be found here:
// https://docs.solace.com/Messaging/SMF-Topics.htm
func (c SolaceContainer) ValidateTopicName(topicSubscriptionTopicPattern string, topicName string) error {
// Cut off the topic:// prefix
var name string
if strings.HasPrefix(topicName, "topic://") {
name = topicName[len("topic://"):]
} else {
name = topicName
}
// Check input
if topicSubscriptionTopicPattern == "" {
return errors.New("topicSubscriptionTopicPattern is empty")
}
if name == "" {
return errors.New("topicName is empty")
}
// Check topic name
allowedTopicCharacters, err := regexp.Compile(`[0-9A-Za-z-.]+(?:/[0-9A-Za-z-.]+)+|[0-9A-Za-z-.]+`)
if err != nil {
return err
}
if !allowedTopicCharacters.MatchString(name) {
return errors.New("invalid topic name")
}
// Check topic subscription topic pattern
allowedTopicSubscriptionCharacters, err := regexp.Compile(
`(?:(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*)(?:/(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*))+|(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*)|/>)|>`)
if err != nil {
return err
}
if !allowedTopicSubscriptionCharacters.MatchString(topicSubscriptionTopicPattern) {
return errors.New("invalid topic subscription name")
}
// Check compatibility
subscriptionIndex := 0
var expectedNextCharacter uint8 = 0
var nextError error
for i := 0; i < len(name); i++ {
if expectedNextCharacter != 0 {
if expectedNextCharacter != name[i] {
return nextError
} else {
expectedNextCharacter = 0
nextError = nil
}
}
switch topicSubscriptionTopicPattern[subscriptionIndex] {
case '*':
if name[i] == '/' {
expectedNextCharacter = '/'
nextError = fmt.Errorf("invalid character '/' at index %d", i)
subscriptionIndex++
}
case '/':
if name[i] != '/' {
return fmt.Errorf("expected character '/', got %c at index %d", name[i], i)
}
subscriptionIndex++
case '>':
// everything is allowed
break
default:
if name[i] != topicSubscriptionTopicPattern[subscriptionIndex] {
return fmt.Errorf(
"expected character %c, got %c at index %d",
topicSubscriptionTopicPattern[subscriptionIndex],
name[i],
i,
)
} else {
subscriptionIndex++
}
}
}
return nil
}
// NextMessageFromQueue returns the next message from the queue.
// It is important that the topic subscription matches the topic name.
// Otherwise, no message is returned and the test will fail by exceeding the timeout.
func (c SolaceContainer) NextMessageFromQueue(
ctx context.Context,
queueName string,
accept bool,
) (*amqp.Message, error) {
return c.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), accept)
}
func (c SolaceContainer) NextMessage(ctx context.Context, target string, accept bool) (*amqp.Message, error) {
if !strings.HasPrefix(target, AmqpTopicPrefix) && !strings.HasPrefix(target, AmqpQueuePrefix) {
return nil, fmt.Errorf(
"solace receive: target %q name lacks mandatory prefix %q, %q",
target,
AmqpTopicPrefix,
AmqpQueuePrefix,
)
}
connection, err := c.NewAmqpConnection(ctx)
if err != nil {
return nil, err
}
session, err := connection.NewSession(ctx, nil)
if err != nil {
return nil, err
}
receiver, err := session.NewReceiver(ctx, target, nil)
if err != nil {
return nil, err
}
message, err := receiver.Receive(ctx, nil)
if err != nil {
return nil, err
}
if accept {
err := receiver.AcceptMessage(ctx, message)
if err != nil {
return nil, err
}
}
return message, nil
}
func (c SolaceContainer) Stop() {
_ = c.Terminate(context.Background())
}
func (c SolaceContainer) StopOnError() {
if r := recover(); r != nil {
c.Stop()
}
}