package messaging import ( "context" "errors" "fmt" "github.com/Azure/go-amqp" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" "log/slog" "regexp" "strings" "time" ) const ( AmqpTopicPrefix = "topic://" AmqpQueuePrefix = "queue://" ) // SolaceContainer wraps a testcontainers docker container instance of solace. // // The container must be terminated by calling: // solaceContainer.Terminate(ctx) type SolaceContainer struct { testcontainers.Container AmqpConnectionString string sempClient *Client } // NewSolaceContainer starts a container and func NewSolaceContainer(ctx context.Context) (*SolaceContainer, error) { env := make(map[string]string) env["username_admin_globalaccesslevel"] = "admin" env["username_admin_password"] = "admin" // Start docker container request := testcontainers.ContainerRequest{ Image: "solace/solace-pubsub-standard:10.8", ExposedPorts: []string{"5672/tcp", "8080/tcp"}, SkipReaper: true, AutoRemove: true, ShmSize: 1024 * 1024 * 1024, // 1 GB, Env: env, WaitingFor: wait.ForLog("Running pre-startup checks:"). WithStartupTimeout(60 * time.Second), } container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: request, Started: true, }) if err != nil { return nil, err } // Extract host and port information host, err := container.Host(ctx) if err != nil { _ = container.Terminate(ctx) return nil, err } amqpPort, err := container.MappedPort(ctx, "5672") if err != nil { _ = container.Terminate(ctx) return nil, err } sempPort, err := container.MappedPort(ctx, "8080") if err != nil { _ = container.Terminate(ctx) return nil, err } slog.Info("UI Port: " + sempPort.Port()) // Construct connection strings amqpConnectionString := fmt.Sprintf("amqp://%s:%s/", host, amqpPort.Port()) sempApiBaseUrl := fmt.Sprintf("http://%s:%s/SEMP/v2", host, sempPort.Port()) // Construct SEMP client sempClient := NewClient( sempApiBaseUrl, true, false, BasicAuth("admin", "admin"), ) // Poll queue endpoint until solace is ready to interact solaceStarting := true for solaceStarting { _, err := sempClient.RequestWithoutBody( ctx, "GET", "/config/msgVpns/default/queues/test", ) if err != nil && strings.Contains(err.Error(), "NOT_FOUND") { solaceStarting = false } else { time.Sleep(100 * time.Millisecond) } } // Return container object return &SolaceContainer{ Container: container, AmqpConnectionString: amqpConnectionString, sempClient: sempClient, }, nil } // QueueCreate creates a queue with the given name. func (c SolaceContainer) QueueCreate(ctx context.Context, queueName string) error { // Construct parameters var queueConfig = make(map[string]any) queueConfig["accessType"] = "non-exclusive" queueConfig["egressEnabled"] = true queueConfig["ingressEnabled"] = true queueConfig["permission"] = "consume" queueConfig["queueName"] = queueName queueConfig["maxBindCount"] = 100 // Create the queue _, err := c.sempClient.RequestWithBody( ctx, "POST", "/config/msgVpns/default/queues", queueConfig) return err } // QueueExists checks if a queue with the given name exists. func (c SolaceContainer) QueueExists(ctx context.Context, queueName string) (bool, error) { // Check if exists _, err := c.sempClient.RequestWithoutBody( ctx, "GET", fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName), ) // Check if response contains "NOT_FOUND" string indicating that the queue doesn't exist if err != nil { if strings.Contains(err.Error(), "NOT_FOUND") { return false, nil } // Return technical errors return false, err } // Return queue exists return true, nil } // QueueDeleteIfExists deletes the queue with the given name if it exists. func (c SolaceContainer) QueueDeleteIfExists(ctx context.Context, queueName string) error { // Check if queue exists exists, err := c.QueueExists(ctx, queueName) if err != nil { return err } // Delete if exists if exists { _, err := c.sempClient.RequestWithoutBody( ctx, "DELETE", fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName), ) return err } return nil } // TopicSubscriptionCreate creates a topic subscription for a (underlying) queue. // // Parameters: // * ctx - the context object // * queueName - the name of the queue where the topic(s) should be subscribed // * topicName - the name of the topic with optional wildcards (e.g. "organizations/org-*") func (c SolaceContainer) TopicSubscriptionCreate(ctx context.Context, queueName string, topicName string) error { // Construct url and parameters url := fmt.Sprintf("/config/msgVpns/default/queues/%s/subscriptions", queueName) subscriptionConfig := make(map[string]any) subscriptionConfig["subscriptionTopic"] = topicName // Create the subscription _, err := c.sempClient.RequestWithBody(ctx, "POST", url, subscriptionConfig) return err } func (c SolaceContainer) NewAmqpConnection(ctx context.Context) (*amqp.Conn, error) { return amqp.Dial(ctx, c.AmqpConnectionString, nil) } // ValidateTopicName checks that topicName and topicSubscriptionTopicPattern are valid and compatible // Solace topic name constraints can be found here: // https://docs.solace.com/Messaging/SMF-Topics.htm func (c SolaceContainer) ValidateTopicName(topicSubscriptionTopicPattern string, topicName string) error { // Cut off the topic:// prefix var name string if strings.HasPrefix(topicName, "topic://") { name = topicName[len("topic://"):] } else { name = topicName } // Check input if topicSubscriptionTopicPattern == "" { return errors.New("topicSubscriptionTopicPattern is empty") } if name == "" { return errors.New("topicName is empty") } // Check topic name allowedTopicCharacters, err := regexp.Compile(`[0-9A-Za-z-.]+(?:/[0-9A-Za-z-.]+)+|[0-9A-Za-z-.]+`) if err != nil { return err } if !allowedTopicCharacters.MatchString(name) { return errors.New("invalid topic name") } // Check topic subscription topic pattern allowedTopicSubscriptionCharacters, err := regexp.Compile( `(?:(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*)(?:/(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*))+|(?:[0-9A-Za-z-.]+|[0-9A-Za-z-.]*\*)|/>)|>`) if err != nil { return err } if !allowedTopicSubscriptionCharacters.MatchString(topicSubscriptionTopicPattern) { return errors.New("invalid topic subscription name") } // Check compatibility subscriptionIndex := 0 var expectedNextCharacter uint8 = 0 var nextError error for i := 0; i < len(name); i++ { if expectedNextCharacter != 0 { if expectedNextCharacter != name[i] { return nextError } else { expectedNextCharacter = 0 nextError = nil } } switch topicSubscriptionTopicPattern[subscriptionIndex] { case '*': if name[i] == '/' { expectedNextCharacter = '/' nextError = fmt.Errorf("invalid character '/' at index %d", i) subscriptionIndex++ } case '/': if name[i] != '/' { return fmt.Errorf("expected character '/', got %c at index %d", name[i], i) } subscriptionIndex++ case '>': // everything is allowed break default: if name[i] != topicSubscriptionTopicPattern[subscriptionIndex] { return fmt.Errorf( "expected character %c, got %c at index %d", topicSubscriptionTopicPattern[subscriptionIndex], name[i], i, ) } else { subscriptionIndex++ } } } return nil } // NextMessageFromQueue returns the next message from the queue. // It is important that the topic subscription matches the topic name. // Otherwise, no message is returned and the test will fail by exceeding the timeout. func (c SolaceContainer) NextMessageFromQueue( ctx context.Context, queueName string, accept bool, ) (*amqp.Message, error) { return c.NextMessage(ctx, fmt.Sprintf("queue://%s", queueName), accept) } func (c SolaceContainer) NextMessage(ctx context.Context, target string, accept bool) (*amqp.Message, error) { if !strings.HasPrefix(target, AmqpTopicPrefix) && !strings.HasPrefix(target, AmqpQueuePrefix) { return nil, fmt.Errorf( "solace receive: target %q name lacks mandatory prefix %q, %q", target, AmqpTopicPrefix, AmqpQueuePrefix, ) } connection, err := c.NewAmqpConnection(ctx) if err != nil { return nil, err } session, err := connection.NewSession(ctx, nil) if err != nil { return nil, err } receiver, err := session.NewReceiver(ctx, target, nil) if err != nil { return nil, err } message, err := receiver.Receive(ctx, nil) if err != nil { return nil, err } if accept { err := receiver.AcceptMessage(ctx, message) if err != nil { return nil, err } } return message, nil } func (c SolaceContainer) Stop() { _ = c.Terminate(context.Background()) } func (c SolaceContainer) StopOnError() { if r := recover(); r != nil { c.Stop() } }