package test

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"regexp"
	"strings"
	"time"

	"github.com/Azure/go-amqp"
	docker "github.com/docker/docker/api/types/container"
	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"

	pkgLog "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/log"
)

// Prefixes of AMQP target addresses.
const (
	AmqpQueuePrefix = "queue://"
	AmqpTopicPrefix = "topic://"
)

// dockerImage is the image the solace test container is started from.
const dockerImage = "schwarzit-docker.jfrog.io/solace/solace-pubsub-standard:10.8.1.241"

// ErrResourceNotFound is wrapped by the errors returned from SempClient
// requests when the response reports the status "NOT_FOUND".
// Check for it with errors.Is.
var ErrResourceNotFound = errors.New("resource not found")

// SempClient is a minimal HTTP client that sends basic-auth requests to a
// SEMP API and interprets the JSON envelope of the responses.
type SempClient struct {
	client         http.Client
	sempApiBaseUrl string
	username       string
	password       string
}

// RequestWithoutBody sends a request without a payload to the given url
// (relative to the SEMP API base url) and returns an error if the request
// fails or the response reports a failure.
func (c SempClient) RequestWithoutBody(ctx context.Context, method string, url string) error {
	return c.send(ctx, method, url, nil)
}

// RequestWithBody marshals body to JSON, sends it to the given url (relative
// to the SEMP API base url) and returns an error if the request fails or the
// response reports a failure.
func (c SempClient) RequestWithBody(ctx context.Context, method string, url string, body any) error {
	data, err := json.Marshal(body)
	if err != nil {
		return err
	}
	return c.send(ctx, method, url, bytes.NewReader(data))
}

// send builds the request, executes it and validates the response envelope.
// payload may be nil for requests without a body.
func (c SempClient) send(ctx context.Context, method string, url string, payload io.Reader) error {
	request, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s%s", c.sempApiBaseUrl, url), payload)
	if err != nil {
		return err
	}
	response, err := c.doRequest(request)
	if err != nil {
		return err
	}
	_, err = c.parseResponseAsObject(response)
	return err
}

// doRequest adds the credentials (and the JSON content type for non-GET
// requests), executes the request and returns the raw response body.
//
// Only the status codes 200 and 400 are passed on to the caller; the body of
// a 400 response is interpreted by parseResponseAsObject. Every other status
// code is returned as an error.
func (c SempClient) doRequest(request *http.Request) ([]byte, error) {
	request.SetBasicAuth(c.username, c.password)
	if request.Method != http.MethodGet {
		request.Header.Set("Content-Type", "application/json")
	}

	response, err := c.client.Do(request)
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()

	// Reading the body up to EOF also allows the transport to reuse the connection.
	rawBody, err := io.ReadAll(response.Body)
	if err != nil {
		return nil, fmt.Errorf("reading response of call to %v: %w", request.URL, err)
	}
	if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusBadRequest {
		return nil, fmt.Errorf(
			"request to %v failed with status %v (%v), response:\n%s",
			request.URL, response.StatusCode, response.Status, rawBody,
		)
	}
	return rawBody, nil
}

// parseResponseAsObject decodes the JSON envelope of a response.
//
// Returns:
//   - the "data" object if the envelope contains one (nil if it is not a JSON object)
//   - nil, nil if there is no "data" but "meta.responseCode" is 200 (e.g. http-delete)
//   - an error built from "meta.error" otherwise; it wraps ErrResourceNotFound
//     if the reported status is "NOT_FOUND"
//   - an error if the envelope cannot be interpreted at all
func (c SempClient) parseResponseAsObject(dataResponse []byte) (map[string]any, error) {
	var envelope map[string]any
	if err := json.Unmarshal(dataResponse, &envelope); err != nil {
		return nil, fmt.Errorf("could not parse response (%w):\n%s", err, dataResponse)
	}

	if rawData, ok := envelope["data"]; ok {
		object, _ := rawData.(map[string]any)
		return object, nil
	}

	// All assertions below are checked: an unexpected response shape must
	// result in an error and not in a panic.
	meta, ok := envelope["meta"].(map[string]any)
	if !ok {
		return nil, fmt.Errorf("could not parse response:\n%s", dataResponse)
	}
	if code, ok := meta["responseCode"].(float64); ok && code == http.StatusOK {
		// http-delete
		return nil, nil
	}

	// Reading from a nil map is safe, missing details result in empty strings.
	errorDetails, _ := meta["error"].(map[string]any)
	description, _ := errorDetails["description"].(string)
	status, _ := errorDetails["status"].(string)
	if status == "NOT_FOUND" {
		// resource not found
		return nil, fmt.Errorf(
			"request failed - description: %v, status: %v, %w",
			description, status, ErrResourceNotFound,
		)
	}
	return nil, fmt.Errorf("request failed - description: %v, status: %v", description, status)
}

// SolaceContainer wraps a testcontainers docker container instance of solace.
//
// The container must be terminated by calling:
// solaceContainer.Terminate(ctx)
type SolaceContainer struct {
	testcontainers.Container
	AmqpConnectionString string
	sempClient           SempClient
}

// NewSolaceContainer starts a container and waits until it is ready to be used.
func NewSolaceContainer(ctx context.Context) (*SolaceContainer, error) { env := make(map[string]string) env["username_admin_globalaccesslevel"] = "admin" env["username_admin_password"] = "admin" // Start docker container request := testcontainers.ContainerRequest{ Image: dockerImage, ExposedPorts: []string{"5672/tcp", "8080/tcp"}, HostConfigModifier: func(config *docker.HostConfig) { config.AutoRemove = true config.ShmSize = 1024 * 1024 * 1024 // 1 GB, }, Env: env, WaitingFor: wait.ForLog("Running pre-startup checks:"). WithStartupTimeout(90 * time.Second), } container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: request, Started: true, }) if err != nil { return nil, err } // Extract host and port information host, err := container.Host(ctx) if err != nil { _ = container.Terminate(ctx) return nil, err } amqpPort, err := container.MappedPort(ctx, "5672") if err != nil { _ = container.Terminate(ctx) return nil, err } sempPort, err := container.MappedPort(ctx, "8080") if err != nil { _ = container.Terminate(ctx) return nil, err } pkgLog.AuditLogger.Info("UI Port: " + sempPort.Port()) // Construct connection strings amqpConnectionString := fmt.Sprintf("amqp://%s:%s/", host, amqpPort.Port()) sempApiBaseUrl := fmt.Sprintf("http://%s:%s/SEMP/v2", host, sempPort.Port()) // Construct SEMP client sempClient := SempClient{client: http.Client{}, sempApiBaseUrl: sempApiBaseUrl, username: "admin", password: "admin"} // Poll queue endpoint until solace is ready to interact solaceStarting := true for solaceStarting { err := sempClient.RequestWithoutBody( ctx, "GET", "/config/msgVpns/default/queues/test", ) if err != nil && strings.Contains(err.Error(), "NOT_FOUND") { solaceStarting = false } time.Sleep(1000 * time.Millisecond) } // Return container object return &SolaceContainer{ Container: container, AmqpConnectionString: amqpConnectionString, sempClient: sempClient, }, nil } // QueueCreate creates a queue with the given 
name. func (c SolaceContainer) QueueCreate(ctx context.Context, queueName string) error { // Construct parameters var queueConfig = make(map[string]any) queueConfig["accessType"] = "non-exclusive" queueConfig["egressEnabled"] = true queueConfig["ingressEnabled"] = true queueConfig["permission"] = "consume" queueConfig["queueName"] = queueName queueConfig["maxBindCount"] = 100 // Create the queue err := c.sempClient.RequestWithBody( ctx, "POST", "/config/msgVpns/default/queues", queueConfig) return err } // QueueExists checks if a queue with the given name exists. func (c SolaceContainer) QueueExists(ctx context.Context, queueName string) (bool, error) { // Check if exists err := c.sempClient.RequestWithoutBody( ctx, "GET", fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName), ) // Check if response contains "NOT_FOUND" string indicating that the queue doesn't exist if err != nil { if strings.Contains(err.Error(), "NOT_FOUND") { return false, nil } // Return technical errors return false, err } // Return queue exists return true, nil } // QueueDeleteIfExists deletes the queue with the given name if it exists. func (c SolaceContainer) QueueDeleteIfExists(ctx context.Context, queueName string) error { // Check if queue exists exists, err := c.QueueExists(ctx, queueName) if err != nil { return err } // Delete if exists if exists { err := c.sempClient.RequestWithoutBody( ctx, "DELETE", fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName), ) return err } return nil } // TopicSubscriptionCreate creates a topic subscription for a (underlying) queue. // // Parameters: // * ctx - the context object // * queueName - the name of the queue where the topic(s) should be subscribed // * topicName - the name of the topic with optional wildcards (e.g. 
"organizations/org-*") func (c SolaceContainer) TopicSubscriptionCreate(ctx context.Context, queueName string, topicName string) error { // Construct url and parameters url := fmt.Sprintf("/config/msgVpns/default/queues/%s/subscriptions", queueName) subscriptionConfig := make(map[string]any) subscriptionConfig["subscriptionTopic"] = topicName // Create the subscription err := c.sempClient.RequestWithBody(ctx, "POST", url, subscriptionConfig) return err } func (c SolaceContainer) NewAmqpConnection(ctx context.Context) (*amqp.Conn, error) { return amqp.Dial(ctx, c.AmqpConnectionString, nil) } // ValidateTopicName checks that topicName and topicSubscriptionTopicPattern are valid and compatible // Solace topic name constraints can be found here: // https://docs.solace.com/Messaging/SMF-Topics.htm func (c SolaceContainer) ValidateTopicName(topicSubscriptionTopicPattern string, topicName string) error { // Cut off the topic:// prefix var name string if strings.HasPrefix(topicName, AmqpTopicPrefix) { name = topicName[len(AmqpTopicPrefix):] } else { name = topicName } // Check input if topicSubscriptionTopicPattern == "" { return errors.New("topicSubscriptionTopicPattern is empty") } if name == "" { return errors.New("topicName is empty") } // Check topic name allowedTopicCharacters, err := regexp.Compile(`[0-9A-Za-z-.]+(?:/[0-9A-Za-z-.]+)+|[0-9A-Za-z-.]+`) if err != nil { return err } if !allowedTopicCharacters.MatchString(name) { return errors.New("invalid topic name") } // Check topic subscription topic pattern allowedTopicSubscriptionCharacters, err := regexp.Compile( `(?:(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*)(?:/(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*))+|(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*)|/>)|>`) if err != nil { return err } if !allowedTopicSubscriptionCharacters.MatchString(topicSubscriptionTopicPattern) { return errors.New("invalid topic subscription name") } // Check compatibility subscriptionIndex := 0 var expectedNextCharacter uint8 = 0 var nextError error for i := 0; i 
< len(name); i++ { if expectedNextCharacter != 0 { if expectedNextCharacter != name[i] { return nextError } else { expectedNextCharacter = 0 nextError = nil } } switch topicSubscriptionTopicPattern[subscriptionIndex] { case '*': if name[i] == '/' { expectedNextCharacter = '/' nextError = fmt.Errorf("invalid character '/' at index %d", i) subscriptionIndex++ } case '/': if name[i] != '/' { return fmt.Errorf("expected character '/', got %c at index %d", name[i], i) } subscriptionIndex++ case '>': // everything is allowed break default: if name[i] != topicSubscriptionTopicPattern[subscriptionIndex] { return fmt.Errorf( "expected character %c, got %c at index %d", topicSubscriptionTopicPattern[subscriptionIndex], name[i], i, ) } else { subscriptionIndex++ } } } return nil } // NextMessageFromQueue returns the next message from the queue. // It is important that the topic subscription matches the topic name. // Otherwise, no message is returned and the test will fail by exceeding the timeout. 
func (c SolaceContainer) NextMessageFromQueue( ctx context.Context, queueName string, accept bool, ) (*amqp.Message, error) { return c.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), accept) } func (c SolaceContainer) NextMessage(ctx context.Context, target string, accept bool) (*amqp.Message, error) { if !strings.HasPrefix(target, AmqpTopicPrefix) && !strings.HasPrefix(target, AmqpQueuePrefix) { return nil, fmt.Errorf( "solace receive: target %q name lacks mandatory prefix %q, %q", target, AmqpTopicPrefix, AmqpQueuePrefix, ) } connection, err := c.NewAmqpConnection(ctx) if err != nil { return nil, err } session, err := connection.NewSession(ctx, nil) if err != nil { return nil, err } receiver, err := session.NewReceiver(ctx, target, nil) if err != nil { return nil, err } message, err := receiver.Receive(ctx, nil) if err != nil { return nil, err } if accept { err := receiver.AcceptMessage(ctx, message) if err != nil { return nil, err } } return message, nil } func (c SolaceContainer) Stop() { _ = c.Terminate(context.Background()) } func (c SolaceContainer) StopOnError() { if r := recover(); r != nil { c.Stop() } }