audit-go/audit/messaging/solace.go
2025-01-13 16:42:12 +01:00

437 lines
12 KiB
Go

package messaging
import (
"bytes"
"context"
"dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log"
"encoding/json"
"errors"
"fmt"
"github.com/Azure/go-amqp"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"io"
"net/http"
"regexp"
"strings"
"time"
)
const (
AmqpQueuePrefix = "queue://"
)
var ErrResourceNotFound = errors.New("resource not found")
type SempClient struct {
client http.Client
sempApiBaseUrl string
username string
password string
}
func (c SempClient) RequestWithoutBody(ctx context.Context, method string, url string) error {
request, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s%s", c.sempApiBaseUrl, url), nil)
if err != nil {
return err
}
response, err := c.doRequest(request)
if err != nil {
return err
}
_, err = c.parseResponseAsObject(response)
return err
}
func (c SempClient) RequestWithBody(ctx context.Context, method string, url string, body any) error {
data, err := json.Marshal(body)
if err != nil {
return err
}
request, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s%s", c.sempApiBaseUrl, url), bytes.NewBuffer(data))
if err != nil {
return err
}
response, err := c.doRequest(request)
if err != nil {
return err
}
_, err = c.parseResponseAsObject(response)
return err
}
func (c SempClient) doRequest(request *http.Request) ([]byte, error) {
request.SetBasicAuth(c.username, c.password)
if request.Method != http.MethodGet {
request.Header.Set("Content-Type", "application/json")
}
response, err := c.client.Do(request)
if err != nil {
return nil, err
}
defer response.Body.Close()
rawBody, err := io.ReadAll(response.Body)
if err != nil || (response.StatusCode != http.StatusOK && response.StatusCode != http.StatusBadRequest) {
return nil, fmt.Errorf("request to %v failes with status %v (%v), response:\n%s", response.StatusCode, response.Status, request.URL, rawBody)
}
if _, err := io.Copy(io.Discard, response.Body); err != nil {
return nil, fmt.Errorf("response processing error for call to %v", request.URL)
}
return rawBody, nil
}
func (c SempClient) parseResponseAsObject(dataResponse []byte) (map[string]any, error) {
data := map[string]any{}
err := json.Unmarshal(dataResponse, &data)
if err != nil {
return nil, fmt.Errorf("could not parse response:\n%s", dataResponse)
}
rawData, ok := data["data"]
if ok {
data, _ = rawData.(map[string]any)
return data, nil
} else {
metadata, ok := data["meta"]
if ok {
data, _ = metadata.(map[string]any)
if data["responseCode"].(float64) == http.StatusOK {
// http-delete
return nil, nil
}
description := data["error"].(map[string]interface{})["description"].(string)
status := data["error"].(map[string]interface{})["status"].(string)
if status == "NOT_FOUND" {
// resource not found
return nil, fmt.Errorf("request failed - description: %v, status: %v, %w", description, status, ErrResourceNotFound)
}
return nil, fmt.Errorf("request failed - description: %v, status: %v", description, status)
}
}
return nil, fmt.Errorf("could not parse response:\n%s", dataResponse)
}
// SolaceContainer wraps a testcontainers docker container instance of solace.
//
// The container must be terminated by calling:
// solaceContainer.Terminate(ctx)
type SolaceContainer struct {
testcontainers.Container
AmqpConnectionString string
sempClient SempClient
}
// NewSolaceContainer starts a container and waits until it is ready to be used.
func NewSolaceContainer(ctx context.Context) (*SolaceContainer, error) {
env := make(map[string]string)
env["username_admin_globalaccesslevel"] = "admin"
env["username_admin_password"] = "admin"
// Start docker container
request := testcontainers.ContainerRequest{
Image: "solace/solace-pubsub-standard:10.8",
ExposedPorts: []string{"5672/tcp", "8080/tcp"},
SkipReaper: true,
AutoRemove: true,
ShmSize: 1024 * 1024 * 1024, // 1 GB,
Env: env,
WaitingFor: wait.ForLog("Running pre-startup checks:").
WithStartupTimeout(90 * time.Second),
}
container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: request,
Started: true,
})
if err != nil {
return nil, err
}
// Extract host and port information
host, err := container.Host(ctx)
if err != nil {
_ = container.Terminate(ctx)
return nil, err
}
amqpPort, err := container.MappedPort(ctx, "5672")
if err != nil {
_ = container.Terminate(ctx)
return nil, err
}
sempPort, err := container.MappedPort(ctx, "8080")
if err != nil {
_ = container.Terminate(ctx)
return nil, err
}
log.AuditLogger.Info("UI Port: " + sempPort.Port())
// Construct connection strings
amqpConnectionString := fmt.Sprintf("amqp://%s:%s/", host, amqpPort.Port())
sempApiBaseUrl := fmt.Sprintf("http://%s:%s/SEMP/v2", host, sempPort.Port())
// Construct SEMP client
sempClient := SempClient{client: http.Client{}, sempApiBaseUrl: sempApiBaseUrl, username: "admin", password: "admin"}
// Poll queue endpoint until solace is ready to interact
solaceStarting := true
for solaceStarting {
err := sempClient.RequestWithoutBody(
ctx,
"GET",
"/config/msgVpns/default/queues/test",
)
if err != nil && strings.Contains(err.Error(), "NOT_FOUND") {
solaceStarting = false
}
time.Sleep(1000 * time.Millisecond)
}
// Return container object
return &SolaceContainer{
Container: container,
AmqpConnectionString: amqpConnectionString,
sempClient: sempClient,
}, nil
}
// QueueCreate creates a queue with the given name.
func (c SolaceContainer) QueueCreate(ctx context.Context, queueName string) error {
// Construct parameters
var queueConfig = make(map[string]any)
queueConfig["accessType"] = "non-exclusive"
queueConfig["egressEnabled"] = true
queueConfig["ingressEnabled"] = true
queueConfig["permission"] = "consume"
queueConfig["queueName"] = queueName
queueConfig["maxBindCount"] = 100
// Create the queue
err := c.sempClient.RequestWithBody(
ctx,
"POST",
"/config/msgVpns/default/queues",
queueConfig)
return err
}
// QueueExists checks if a queue with the given name exists.
func (c SolaceContainer) QueueExists(ctx context.Context, queueName string) (bool, error) {
// Check if exists
err := c.sempClient.RequestWithoutBody(
ctx,
"GET",
fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName),
)
// Check if response contains "NOT_FOUND" string indicating that the queue doesn't exist
if err != nil {
if strings.Contains(err.Error(), "NOT_FOUND") {
return false, nil
}
// Return technical errors
return false, err
}
// Return queue exists
return true, nil
}
// QueueDeleteIfExists deletes the queue with the given name if it exists.
func (c SolaceContainer) QueueDeleteIfExists(ctx context.Context, queueName string) error {
// Check if queue exists
exists, err := c.QueueExists(ctx, queueName)
if err != nil {
return err
}
// Delete if exists
if exists {
err := c.sempClient.RequestWithoutBody(
ctx,
"DELETE",
fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName),
)
return err
}
return nil
}
// TopicSubscriptionCreate creates a topic subscription for a (underlying) queue.
//
// Parameters:
// * ctx - the context object
// * queueName - the name of the queue where the topic(s) should be subscribed
// * topicName - the name of the topic with optional wildcards (e.g. "organizations/org-*")
func (c SolaceContainer) TopicSubscriptionCreate(ctx context.Context, queueName string, topicName string) error {
// Construct url and parameters
url := fmt.Sprintf("/config/msgVpns/default/queues/%s/subscriptions", queueName)
subscriptionConfig := make(map[string]any)
subscriptionConfig["subscriptionTopic"] = topicName
// Create the subscription
err := c.sempClient.RequestWithBody(ctx, "POST", url, subscriptionConfig)
return err
}
func (c SolaceContainer) NewAmqpConnection(ctx context.Context) (*amqp.Conn, error) {
return amqp.Dial(ctx, c.AmqpConnectionString, nil)
}
// ValidateTopicName checks that topicName and topicSubscriptionTopicPattern are valid and compatible
// Solace topic name constraints can be found here:
// https://docs.solace.com/Messaging/SMF-Topics.htm
func (c SolaceContainer) ValidateTopicName(topicSubscriptionTopicPattern string, topicName string) error {
// Cut off the topic:// prefix
var name string
if strings.HasPrefix(topicName, "topic://") {
name = topicName[len("topic://"):]
} else {
name = topicName
}
// Check input
if topicSubscriptionTopicPattern == "" {
return errors.New("topicSubscriptionTopicPattern is empty")
}
if name == "" {
return errors.New("topicName is empty")
}
// Check topic name
allowedTopicCharacters, err := regexp.Compile(`[0-9A-Za-z-.]+(?:/[0-9A-Za-z-.]+)+|[0-9A-Za-z-.]+`)
if err != nil {
return err
}
if !allowedTopicCharacters.MatchString(name) {
return errors.New("invalid topic name")
}
// Check topic subscription topic pattern
allowedTopicSubscriptionCharacters, err := regexp.Compile(
`(?:(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*)(?:/(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*))+|(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*)|/>)|>`)
if err != nil {
return err
}
if !allowedTopicSubscriptionCharacters.MatchString(topicSubscriptionTopicPattern) {
return errors.New("invalid topic subscription name")
}
// Check compatibility
subscriptionIndex := 0
var expectedNextCharacter uint8 = 0
var nextError error
for i := 0; i < len(name); i++ {
if expectedNextCharacter != 0 {
if expectedNextCharacter != name[i] {
return nextError
} else {
expectedNextCharacter = 0
nextError = nil
}
}
switch topicSubscriptionTopicPattern[subscriptionIndex] {
case '*':
if name[i] == '/' {
expectedNextCharacter = '/'
nextError = fmt.Errorf("invalid character '/' at index %d", i)
subscriptionIndex++
}
case '/':
if name[i] != '/' {
return fmt.Errorf("expected character '/', got %c at index %d", name[i], i)
}
subscriptionIndex++
case '>':
// everything is allowed
break
default:
if name[i] != topicSubscriptionTopicPattern[subscriptionIndex] {
return fmt.Errorf(
"expected character %c, got %c at index %d",
topicSubscriptionTopicPattern[subscriptionIndex],
name[i],
i,
)
} else {
subscriptionIndex++
}
}
}
return nil
}
// NextMessageFromQueue returns the next message from the queue.
// It is important that the topic subscription matches the topic name.
// Otherwise, no message is returned and the test will fail by exceeding the timeout.
func (c SolaceContainer) NextMessageFromQueue(
ctx context.Context,
queueName string,
accept bool,
) (*amqp.Message, error) {
return c.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), accept)
}
func (c SolaceContainer) NextMessage(ctx context.Context, target string, accept bool) (*amqp.Message, error) {
if !strings.HasPrefix(target, AmqpTopicPrefix) && !strings.HasPrefix(target, AmqpQueuePrefix) {
return nil, fmt.Errorf(
"solace receive: target %q name lacks mandatory prefix %q, %q",
target,
AmqpTopicPrefix,
AmqpQueuePrefix,
)
}
connection, err := c.NewAmqpConnection(ctx)
if err != nil {
return nil, err
}
session, err := connection.NewSession(ctx, nil)
if err != nil {
return nil, err
}
receiver, err := session.NewReceiver(ctx, target, nil)
if err != nil {
return nil, err
}
message, err := receiver.Receive(ctx, nil)
if err != nil {
return nil, err
}
if accept {
err := receiver.AcceptMessage(ctx, message)
if err != nil {
return nil, err
}
}
return message, nil
}
func (c SolaceContainer) Stop() {
_ = c.Terminate(context.Background())
}
func (c SolaceContainer) StopOnError() {
if r := recover(); r != nil {
c.Stop()
}
}