mirror of
https://dev.azure.com/schwarzit/schwarzit.stackit-public/_git/audit-go
synced 2026-02-08 00:57:24 +00:00
437 lines
12 KiB
Go
437 lines
12 KiB
Go
package messaging
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/log"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/Azure/go-amqp"
|
|
"github.com/testcontainers/testcontainers-go"
|
|
"github.com/testcontainers/testcontainers-go/wait"
|
|
"io"
|
|
"net/http"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// AmqpQueuePrefix is the prefix an AMQP target address must carry to address
// a queue, e.g. "queue://my-queue".
const AmqpQueuePrefix = "queue://"
|
|
|
|
// ErrResourceNotFound is wrapped into the error returned for a SEMP response
// whose error status is "NOT_FOUND". Detect it with errors.Is.
var (
	ErrResourceNotFound = errors.New("resource not found")
)
|
|
|
|
// SempClient is a small HTTP client for the Solace SEMP management API.
// Every request is sent with basic authentication.
type SempClient struct {
	// client executes the HTTP requests.
	client http.Client

	// sempApiBaseUrl is prepended to every request path.
	sempApiBaseUrl string

	// username is the basic-auth user name.
	username string

	// password is the basic-auth password.
	password string
}
|
|
|
|
func (c SempClient) RequestWithoutBody(ctx context.Context, method string, url string) error {
|
|
request, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s%s", c.sempApiBaseUrl, url), nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
response, err := c.doRequest(request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = c.parseResponseAsObject(response)
|
|
return err
|
|
}
|
|
|
|
func (c SempClient) RequestWithBody(ctx context.Context, method string, url string, body any) error {
|
|
data, err := json.Marshal(body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
request, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s%s", c.sempApiBaseUrl, url), bytes.NewBuffer(data))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
response, err := c.doRequest(request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = c.parseResponseAsObject(response)
|
|
return err
|
|
}
|
|
|
|
func (c SempClient) doRequest(request *http.Request) ([]byte, error) {
|
|
request.SetBasicAuth(c.username, c.password)
|
|
if request.Method != http.MethodGet {
|
|
request.Header.Set("Content-Type", "application/json")
|
|
}
|
|
response, err := c.client.Do(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer response.Body.Close()
|
|
rawBody, err := io.ReadAll(response.Body)
|
|
if err != nil || (response.StatusCode != http.StatusOK && response.StatusCode != http.StatusBadRequest) {
|
|
return nil, fmt.Errorf("request to %v failes with status %v (%v), response:\n%s", response.StatusCode, response.Status, request.URL, rawBody)
|
|
}
|
|
if _, err := io.Copy(io.Discard, response.Body); err != nil {
|
|
return nil, fmt.Errorf("response processing error for call to %v", request.URL)
|
|
}
|
|
return rawBody, nil
|
|
}
|
|
|
|
func (c SempClient) parseResponseAsObject(dataResponse []byte) (map[string]any, error) {
|
|
data := map[string]any{}
|
|
err := json.Unmarshal(dataResponse, &data)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not parse response:\n%s", dataResponse)
|
|
}
|
|
rawData, ok := data["data"]
|
|
if ok {
|
|
data, _ = rawData.(map[string]any)
|
|
return data, nil
|
|
} else {
|
|
metadata, ok := data["meta"]
|
|
if ok {
|
|
data, _ = metadata.(map[string]any)
|
|
if data["responseCode"].(float64) == http.StatusOK {
|
|
// http-delete
|
|
return nil, nil
|
|
}
|
|
description := data["error"].(map[string]interface{})["description"].(string)
|
|
status := data["error"].(map[string]interface{})["status"].(string)
|
|
if status == "NOT_FOUND" {
|
|
// resource not found
|
|
return nil, fmt.Errorf("request failed - description: %v, status: %v, %w", description, status, ErrResourceNotFound)
|
|
}
|
|
return nil, fmt.Errorf("request failed - description: %v, status: %v", description, status)
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("could not parse response:\n%s", dataResponse)
|
|
}
|
|
|
|
// SolaceContainer wraps a testcontainers docker container instance of solace.
|
|
//
|
|
// The container must be terminated by calling:
|
|
// solaceContainer.Terminate(ctx)
|
|
type SolaceContainer struct {
|
|
testcontainers.Container
|
|
AmqpConnectionString string
|
|
sempClient SempClient
|
|
}
|
|
|
|
// NewSolaceContainer starts a container and waits until it is ready to be used.
|
|
func NewSolaceContainer(ctx context.Context) (*SolaceContainer, error) {
|
|
|
|
env := make(map[string]string)
|
|
env["username_admin_globalaccesslevel"] = "admin"
|
|
env["username_admin_password"] = "admin"
|
|
|
|
// Start docker container
|
|
request := testcontainers.ContainerRequest{
|
|
Image: "solace/solace-pubsub-standard:10.8",
|
|
ExposedPorts: []string{"5672/tcp", "8080/tcp"},
|
|
SkipReaper: true,
|
|
AutoRemove: true,
|
|
ShmSize: 1024 * 1024 * 1024, // 1 GB,
|
|
Env: env,
|
|
WaitingFor: wait.ForLog("Running pre-startup checks:").
|
|
WithStartupTimeout(90 * time.Second),
|
|
}
|
|
container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
|
|
ContainerRequest: request,
|
|
Started: true,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Extract host and port information
|
|
host, err := container.Host(ctx)
|
|
if err != nil {
|
|
_ = container.Terminate(ctx)
|
|
return nil, err
|
|
}
|
|
|
|
amqpPort, err := container.MappedPort(ctx, "5672")
|
|
if err != nil {
|
|
_ = container.Terminate(ctx)
|
|
return nil, err
|
|
}
|
|
|
|
sempPort, err := container.MappedPort(ctx, "8080")
|
|
if err != nil {
|
|
_ = container.Terminate(ctx)
|
|
return nil, err
|
|
}
|
|
log.AuditLogger.Info("UI Port: " + sempPort.Port())
|
|
|
|
// Construct connection strings
|
|
amqpConnectionString := fmt.Sprintf("amqp://%s:%s/", host, amqpPort.Port())
|
|
sempApiBaseUrl := fmt.Sprintf("http://%s:%s/SEMP/v2", host, sempPort.Port())
|
|
|
|
// Construct SEMP client
|
|
sempClient := SempClient{client: http.Client{}, sempApiBaseUrl: sempApiBaseUrl, username: "admin", password: "admin"}
|
|
|
|
// Poll queue endpoint until solace is ready to interact
|
|
solaceStarting := true
|
|
for solaceStarting {
|
|
err := sempClient.RequestWithoutBody(
|
|
ctx,
|
|
"GET",
|
|
"/config/msgVpns/default/queues/test",
|
|
)
|
|
if err != nil && strings.Contains(err.Error(), "NOT_FOUND") {
|
|
solaceStarting = false
|
|
}
|
|
time.Sleep(1000 * time.Millisecond)
|
|
}
|
|
|
|
// Return container object
|
|
return &SolaceContainer{
|
|
Container: container,
|
|
AmqpConnectionString: amqpConnectionString,
|
|
sempClient: sempClient,
|
|
}, nil
|
|
}
|
|
|
|
// QueueCreate creates a queue with the given name.
|
|
func (c SolaceContainer) QueueCreate(ctx context.Context, queueName string) error {
|
|
|
|
// Construct parameters
|
|
var queueConfig = make(map[string]any)
|
|
queueConfig["accessType"] = "non-exclusive"
|
|
queueConfig["egressEnabled"] = true
|
|
queueConfig["ingressEnabled"] = true
|
|
queueConfig["permission"] = "consume"
|
|
queueConfig["queueName"] = queueName
|
|
queueConfig["maxBindCount"] = 100
|
|
|
|
// Create the queue
|
|
err := c.sempClient.RequestWithBody(
|
|
ctx,
|
|
"POST",
|
|
"/config/msgVpns/default/queues",
|
|
queueConfig)
|
|
|
|
return err
|
|
}
|
|
|
|
// QueueExists checks if a queue with the given name exists.
|
|
func (c SolaceContainer) QueueExists(ctx context.Context, queueName string) (bool, error) {
|
|
|
|
// Check if exists
|
|
err := c.sempClient.RequestWithoutBody(
|
|
ctx,
|
|
"GET",
|
|
fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName),
|
|
)
|
|
// Check if response contains "NOT_FOUND" string indicating that the queue doesn't exist
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "NOT_FOUND") {
|
|
return false, nil
|
|
}
|
|
|
|
// Return technical errors
|
|
return false, err
|
|
}
|
|
|
|
// Return queue exists
|
|
return true, nil
|
|
}
|
|
|
|
// QueueDeleteIfExists deletes the queue with the given name if it exists.
|
|
func (c SolaceContainer) QueueDeleteIfExists(ctx context.Context, queueName string) error {
|
|
|
|
// Check if queue exists
|
|
exists, err := c.QueueExists(ctx, queueName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Delete if exists
|
|
if exists {
|
|
err := c.sempClient.RequestWithoutBody(
|
|
ctx,
|
|
"DELETE",
|
|
fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName),
|
|
)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// TopicSubscriptionCreate creates a topic subscription for a (underlying) queue.
|
|
//
|
|
// Parameters:
|
|
// * ctx - the context object
|
|
// * queueName - the name of the queue where the topic(s) should be subscribed
|
|
// * topicName - the name of the topic with optional wildcards (e.g. "organizations/org-*")
|
|
func (c SolaceContainer) TopicSubscriptionCreate(ctx context.Context, queueName string, topicName string) error {
|
|
|
|
// Construct url and parameters
|
|
url := fmt.Sprintf("/config/msgVpns/default/queues/%s/subscriptions", queueName)
|
|
|
|
subscriptionConfig := make(map[string]any)
|
|
subscriptionConfig["subscriptionTopic"] = topicName
|
|
|
|
// Create the subscription
|
|
err := c.sempClient.RequestWithBody(ctx, "POST", url, subscriptionConfig)
|
|
|
|
return err
|
|
}
|
|
|
|
func (c SolaceContainer) NewAmqpConnection(ctx context.Context) (*amqp.Conn, error) {
|
|
return amqp.Dial(ctx, c.AmqpConnectionString, nil)
|
|
}
|
|
|
|
// ValidateTopicName checks that topicName and topicSubscriptionTopicPattern are valid and compatible
|
|
// Solace topic name constraints can be found here:
|
|
// https://docs.solace.com/Messaging/SMF-Topics.htm
|
|
func (c SolaceContainer) ValidateTopicName(topicSubscriptionTopicPattern string, topicName string) error {
|
|
// Cut off the topic:// prefix
|
|
var name string
|
|
if strings.HasPrefix(topicName, "topic://") {
|
|
name = topicName[len("topic://"):]
|
|
} else {
|
|
name = topicName
|
|
}
|
|
|
|
// Check input
|
|
if topicSubscriptionTopicPattern == "" {
|
|
return errors.New("topicSubscriptionTopicPattern is empty")
|
|
}
|
|
if name == "" {
|
|
return errors.New("topicName is empty")
|
|
}
|
|
|
|
// Check topic name
|
|
allowedTopicCharacters, err := regexp.Compile(`[0-9A-Za-z-.]+(?:/[0-9A-Za-z-.]+)+|[0-9A-Za-z-.]+`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !allowedTopicCharacters.MatchString(name) {
|
|
return errors.New("invalid topic name")
|
|
}
|
|
|
|
// Check topic subscription topic pattern
|
|
allowedTopicSubscriptionCharacters, err := regexp.Compile(
|
|
`(?:(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*)(?:/(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*))+|(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*)|/>)|>`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !allowedTopicSubscriptionCharacters.MatchString(topicSubscriptionTopicPattern) {
|
|
return errors.New("invalid topic subscription name")
|
|
}
|
|
|
|
// Check compatibility
|
|
subscriptionIndex := 0
|
|
var expectedNextCharacter uint8 = 0
|
|
var nextError error
|
|
for i := 0; i < len(name); i++ {
|
|
if expectedNextCharacter != 0 {
|
|
if expectedNextCharacter != name[i] {
|
|
return nextError
|
|
} else {
|
|
expectedNextCharacter = 0
|
|
nextError = nil
|
|
}
|
|
}
|
|
|
|
switch topicSubscriptionTopicPattern[subscriptionIndex] {
|
|
case '*':
|
|
if name[i] == '/' {
|
|
expectedNextCharacter = '/'
|
|
nextError = fmt.Errorf("invalid character '/' at index %d", i)
|
|
subscriptionIndex++
|
|
}
|
|
case '/':
|
|
if name[i] != '/' {
|
|
return fmt.Errorf("expected character '/', got %c at index %d", name[i], i)
|
|
}
|
|
subscriptionIndex++
|
|
case '>':
|
|
// everything is allowed
|
|
break
|
|
default:
|
|
if name[i] != topicSubscriptionTopicPattern[subscriptionIndex] {
|
|
return fmt.Errorf(
|
|
"expected character %c, got %c at index %d",
|
|
topicSubscriptionTopicPattern[subscriptionIndex],
|
|
name[i],
|
|
i,
|
|
)
|
|
} else {
|
|
subscriptionIndex++
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NextMessageFromQueue returns the next message from the queue.
|
|
// It is important that the topic subscription matches the topic name.
|
|
// Otherwise, no message is returned and the test will fail by exceeding the timeout.
|
|
func (c SolaceContainer) NextMessageFromQueue(
|
|
ctx context.Context,
|
|
queueName string,
|
|
accept bool,
|
|
) (*amqp.Message, error) {
|
|
|
|
return c.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), accept)
|
|
}
|
|
|
|
func (c SolaceContainer) NextMessage(ctx context.Context, target string, accept bool) (*amqp.Message, error) {
|
|
if !strings.HasPrefix(target, AmqpTopicPrefix) && !strings.HasPrefix(target, AmqpQueuePrefix) {
|
|
return nil, fmt.Errorf(
|
|
"solace receive: target %q name lacks mandatory prefix %q, %q",
|
|
target,
|
|
AmqpTopicPrefix,
|
|
AmqpQueuePrefix,
|
|
)
|
|
}
|
|
|
|
connection, err := c.NewAmqpConnection(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
session, err := connection.NewSession(ctx, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
receiver, err := session.NewReceiver(ctx, target, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
message, err := receiver.Receive(ctx, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if accept {
|
|
err := receiver.AcceptMessage(ctx, message)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return message, nil
|
|
}
|
|
|
|
func (c SolaceContainer) Stop() {
|
|
_ = c.Terminate(context.Background())
|
|
}
|
|
|
|
func (c SolaceContainer) StopOnError() {
|
|
if r := recover(); r != nil {
|
|
c.Stop()
|
|
}
|
|
}
|