diff --git a/audit/messaging/client.go b/audit/messaging/client.go deleted file mode 100644 index c2d0563..0000000 --- a/audit/messaging/client.go +++ /dev/null @@ -1,220 +0,0 @@ -package messaging - -// terraform-provider-solacebroker -// -// Copyright 2024 Solace Corporation. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import ( - "bytes" - "context" - "crypto/tls" - "encoding/json" - "errors" - "fmt" - "io" - "log/slog" - "net/http" - "net/http/cookiejar" - "time" - - "github.com/hashicorp/go-retryablehttp" -) - -var ( - ErrResourceNotFound = errors.New("resource not found") -) - -var firstRequest = true - -type Client struct { - *retryablehttp.Client - url string - username string - password string - bearerToken string - retries uint - retryMinInterval time.Duration - retryMaxInterval time.Duration - requestMinInterval time.Duration - requestTimeout time.Duration - rateLimiter <-chan time.Time -} - -type Option func(*Client) - -func BasicAuth(username, password string) Option { - return func(client *Client) { - client.username = username - client.password = password - } -} - -//func BearerToken(bearerToken string) Option { -// return func(client *Client) { -// client.bearerToken = bearerToken -// } -//} -// -//func Retries(numRetries uint, retryMinInterval, retryMaxInterval time.Duration) Option { -// return func(client *Client) { -// client.retries = numRetries -// client.retryMinInterval = retryMinInterval -// client.retryMaxInterval = retryMaxInterval -// } -//} -// -//func RequestLimits(requestTimeoutDuration, requestMinInterval time.Duration) Option { -// return func(client *Client) { -// client.requestTimeout = requestTimeoutDuration -// client.requestMinInterval = requestMinInterval -// } -//} - -func NewClient(url string, insecure_skip_verify bool, providerClient bool, options ...Option) *Client { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure_skip_verify}, - MaxIdleConnsPerHost: 10, - } - retryClient := retryablehttp.NewClient() - retryClient.HTTPClient.Transport = tr - if !providerClient { - retryClient.Logger = nil - } - client := &Client{ - Client: retryClient, - url: url, - retries: 10, // default 3 - retryMinInterval: time.Second, - retryMaxInterval: time.Second * 10, - } - for _, o := range options { - o(client) - } - client.Client.RetryMax = int(client.retries) - client.Client.RetryWaitMin = client.retryMinInterval - client.Client.RetryWaitMax = client.retryMaxInterval - client.HTTPClient.Timeout = client.requestTimeout - client.HTTPClient.Jar, _ = cookiejar.New(nil) - if client.requestMinInterval > 0 { - client.rateLimiter = time.NewTicker(client.requestMinInterval).C - } else { - ch := make(chan time.Time) - // closing the channel will make receiving from the channel non-blocking (the value received will be the - // zero value) - close(ch) - client.rateLimiter = ch - } - firstRequest = true - return client -} - -func (c *Client) RequestWithBody(ctx context.Context, method, url string, body any) (map[string]any, error) { - data, err := json.Marshal(body) - if err != nil { - return nil, err - } - request, err := http.NewRequestWithContext(ctx, method, c.url+url, bytes.NewBuffer(data)) - if err != nil { - return nil, err - } - slog.Debug(fmt.Sprintf("===== %v to %v =====", request.Method, request.URL)) - rawBody, err := c.doRequest(request) - if err != nil { - return nil, err - } - return parseResponseAsObject(ctx, request, rawBody) -} - -func (c *Client) doRequest(request *http.Request) ([]byte, error) { - if !firstRequest { - // the value doesn't matter, it is waiting for the value that matters - <-c.rateLimiter - } else { - // only skip rate limiter for the first request - firstRequest = false - } - if request.Method != http.MethodGet { - request.Header.Set("Content-Type", "application/json") - } - // Prefer OAuth even if Basic Auth credentials provided - if c.bearerToken != "" { - request.Header.Set("Authorization", "Bearer "+c.bearerToken) - } else if c.username != "" { - request.SetBasicAuth(c.username, c.password) - } else { - return nil, fmt.Errorf("either username or bearer token must be provided to access the broker") - } - var response *http.Response - var err error - response, err = c.StandardClient().Do(request) - if err != nil || response == nil { - return nil, err - } - defer response.Body.Close() - rawBody, err := io.ReadAll(response.Body) - if err != nil || (response.StatusCode != http.StatusOK && response.StatusCode != http.StatusBadRequest) { - return nil, fmt.Errorf("could not perform request: status %v (%v) during %v to %v, response body:\n%s", response.StatusCode, response.Status, request.Method, request.URL, rawBody) - } - if _, err := io.Copy(io.Discard, response.Body); err != nil { - return nil, fmt.Errorf("response processing error: during %v to %v", request.Method, request.URL) - } - return rawBody, nil -} - -func parseResponseAsObject(_ context.Context, request *http.Request, dataResponse []byte) (map[string]any, error) { - data := map[string]any{} - err := json.Unmarshal(dataResponse, &data) - if err != nil { - return nil, fmt.Errorf("could not parse response body from %v to %v, response body was:\n%s", request.Method, request.URL, dataResponse) - } - rawData, ok := data["data"] - if ok { - // Valid data - data, _ = rawData.(map[string]any) - return data, nil - } else { - // Analize response metadata details - rawData, ok = data["meta"] - if ok { - data, _ = rawData.(map[string]any) - if data["responseCode"].(float64) == http.StatusOK { - // this is valid response for delete - return nil, nil - } - description := data["error"].(map[string]interface{})["description"].(string) - status := data["error"].(map[string]interface{})["status"].(string) - if status == "NOT_FOUND" { - // resource not found is a special type we want to return - return nil, fmt.Errorf("request failed from %v to %v, %v, %v, %w", request.Method, request.URL, description, status, ErrResourceNotFound) - } - slog.Error(fmt.Sprintf("SEMP request returned %v, %v", description, status)) - return nil, fmt.Errorf("request failed for %v using %v, %v, %v", request.URL, request.Method, description, status) - } - } - return nil, fmt.Errorf("could not parse response details from %v to %v, response body was:\n%s", request.Method, request.URL, dataResponse) -} - -func (c *Client) RequestWithoutBody(ctx context.Context, method, url string) (map[string]interface{}, error) { - request, err := http.NewRequestWithContext(ctx, method, c.url+url, nil) - if err != nil { - return nil, err - } - slog.Debug(fmt.Sprintf("===== %v to %v =====", request.Method, request.URL)) - rawBody, err := c.doRequest(request) - if err != nil { - return nil, err - } - return parseResponseAsObject(ctx, request, rawBody) -} diff --git a/audit/messaging/solace.go b/audit/messaging/solace.go index 102d65a..237a500 100644 --- a/audit/messaging/solace.go +++ b/audit/messaging/solace.go @@ -1,13 +1,17 @@ package messaging import ( + "bytes" "context" + "encoding/json" "errors" "fmt" "github.com/Azure/go-amqp" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" + "io" "log/slog" + "net/http" "regexp" "strings" "time" @@ -18,6 +22,98 @@ const ( AmqpQueuePrefix = "queue://" ) +var ErrResourceNotFound = errors.New("resource not found") + +type SempClient struct { + client http.Client + sempApiBaseUrl string + username string + password string +} + +func (c SempClient) RequestWithoutBody(ctx context.Context, method string, url string) error { + request, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s%s", c.sempApiBaseUrl, url), nil) + if err != nil { + return err + } + + response, err := c.doRequest(request) + if err != nil { + return err + } + _, err = c.parseResponseAsObject(response) + return err +} + +func (c SempClient) RequestWithBody(ctx context.Context, method string, url string, body any) error { + data, err := json.Marshal(body) + if err != nil { + return err + } + + request, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s%s", c.sempApiBaseUrl, url), bytes.NewBuffer(data)) + if err != nil { + return err + } + + response, err := c.doRequest(request) + if err != nil { + return err + } + _, err = c.parseResponseAsObject(response) + return err +} + +func (c SempClient) doRequest(request *http.Request) ([]byte, error) { + request.SetBasicAuth(c.username, c.password) + if request.Method != http.MethodGet { + request.Header.Set("Content-Type", "application/json") + } + response, err := c.client.Do(request) + if err != nil { + return nil, err + } + defer response.Body.Close() + rawBody, err := io.ReadAll(response.Body) + if err != nil || (response.StatusCode != http.StatusOK && response.StatusCode != http.StatusBadRequest) { + return nil, fmt.Errorf("request to %v failes with status %v (%v), response:\n%s", response.StatusCode, response.Status, request.URL, rawBody) + } + if _, err := io.Copy(io.Discard, response.Body); err != nil { + return nil, fmt.Errorf("response processing error for call to %v", request.URL) + } + return rawBody, nil +} + +func (c SempClient) parseResponseAsObject(dataResponse []byte) (map[string]any, error) { + data := map[string]any{} + err := json.Unmarshal(dataResponse, &data) + if err != nil { + return nil, fmt.Errorf("could not parse response:\n%s", dataResponse) + } + rawData, ok := data["data"] + if ok { + data, _ = rawData.(map[string]any) + return data, nil + } else { + metadata, ok := data["meta"] + if ok { + data, _ = metadata.(map[string]any) + if data["responseCode"].(float64) == http.StatusOK { + // http-delete + return nil, nil + } + description := data["error"].(map[string]interface{})["description"].(string) + status := data["error"].(map[string]interface{})["status"].(string) + if status == "NOT_FOUND" { + // resource not found + return nil, fmt.Errorf("request failed - description: %v, status: %v, %w", description, status, ErrResourceNotFound) + } + return nil, fmt.Errorf("request failed - description: %v, status: %v", description, status) + } + } + return nil, fmt.Errorf("could not parse response:\n%s", dataResponse) +} + // SolaceContainer wraps a testcontainers docker container instance of solace. // // The container must be terminated by calling: @@ -25,7 +121,7 @@ const ( type SolaceContainer struct { testcontainers.Container AmqpConnectionString string - sempClient *Client + sempClient SempClient } // NewSolaceContainer starts a container and @@ -79,23 +175,19 @@ func NewSolaceContainer(ctx context.Context) (*SolaceContainer, error) { sempApiBaseUrl := fmt.Sprintf("http://%s:%s/SEMP/v2", host, sempPort.Port()) // Construct SEMP client - sempClient := NewClient( - sempApiBaseUrl, - true, - false, - BasicAuth("admin", "admin"), - ) + sempClient := SempClient{client: http.Client{}, sempApiBaseUrl: sempApiBaseUrl, username: "admin", password: "admin"} // Poll queue endpoint until solace is ready to interact solaceStarting := true for solaceStarting { - _, err := sempClient.RequestWithoutBody( + err := sempClient.RequestWithoutBody( ctx, "GET", "/config/msgVpns/default/queues/test", ) if err != nil && strings.Contains(err.Error(), "NOT_FOUND") { solaceStarting = false + time.Sleep(1000 * time.Millisecond) } else { time.Sleep(100 * time.Millisecond) } @@ -122,7 +214,7 @@ func (c SolaceContainer) QueueCreate(ctx context.Context, queueName string) erro queueConfig["maxBindCount"] = 100 // Create the queue - _, err := c.sempClient.RequestWithBody( + err := c.sempClient.RequestWithBody( ctx, "POST", "/config/msgVpns/default/queues", @@ -135,7 +227,7 @@ func (c SolaceContainer) QueueCreate(ctx context.Context, queueName string) erro func (c SolaceContainer) QueueExists(ctx context.Context, queueName string) (bool, error) { // Check if exists - _, err := c.sempClient.RequestWithoutBody( + err := c.sempClient.RequestWithoutBody( ctx, "GET", fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName), @@ -165,7 +257,7 @@ func (c SolaceContainer) QueueDeleteIfExists(ctx context.Context, queueName stri // Delete if exists if exists { - _, err := c.sempClient.RequestWithoutBody( + err := c.sempClient.RequestWithoutBody( ctx, "DELETE", fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName), @@ -191,7 +283,7 @@ func (c SolaceContainer) TopicSubscriptionCreate(ctx context.Context, queueName subscriptionConfig["subscriptionTopic"] = topicName // Create the subscription - _, err := c.sempClient.RequestWithBody(ctx, "POST", url, subscriptionConfig) + err := c.sempClient.RequestWithBody(ctx, "POST", url, subscriptionConfig) return err } diff --git a/go.mod b/go.mod index 02e7333..bf2310c 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/Azure/go-amqp v1.1.0 github.com/bufbuild/protovalidate-go v0.6.4 github.com/google/uuid v1.6.0 - github.com/hashicorp/go-retryablehttp v0.7.7 github.com/stretchr/testify v1.9.0 github.com/testcontainers/testcontainers-go v0.33.0 go.opentelemetry.io/otel v1.24.0 @@ -36,7 +35,6 @@ require ( github.com/go-ole/go-ole v1.2.6 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/cel-go v0.21.0 // indirect - github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/klauspost/compress v1.17.4 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.7 // indirect diff --git a/go.sum b/go.sum index 897ba5e..381efa0 100644 --- a/go.sum +++ b/go.sum @@ -39,8 +39,6 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM= github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= -github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= -github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= @@ -64,12 +62,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= -github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= -github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= -github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= -github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= -github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= -github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= @@ -82,10 +74,6 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= -github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= -github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= -github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= -github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk=