Replace SempClient implementation

This commit is contained in:
Christian Schaible 2024-10-07 10:27:17 +02:00
parent 8263ad9d5f
commit 3bbf1cca71
4 changed files with 104 additions and 246 deletions

View file

@ -1,220 +0,0 @@
package messaging
// terraform-provider-solacebroker
//
// Copyright 2024 Solace Corporation. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/http/cookiejar"
"time"
"github.com/hashicorp/go-retryablehttp"
)
var (
ErrResourceNotFound = errors.New("resource not found")
)
var firstRequest = true
type Client struct {
*retryablehttp.Client
url string
username string
password string
bearerToken string
retries uint
retryMinInterval time.Duration
retryMaxInterval time.Duration
requestMinInterval time.Duration
requestTimeout time.Duration
rateLimiter <-chan time.Time
}
type Option func(*Client)
func BasicAuth(username, password string) Option {
return func(client *Client) {
client.username = username
client.password = password
}
}
//func BearerToken(bearerToken string) Option {
// return func(client *Client) {
// client.bearerToken = bearerToken
// }
//}
//
//func Retries(numRetries uint, retryMinInterval, retryMaxInterval time.Duration) Option {
// return func(client *Client) {
// client.retries = numRetries
// client.retryMinInterval = retryMinInterval
// client.retryMaxInterval = retryMaxInterval
// }
//}
//
//func RequestLimits(requestTimeoutDuration, requestMinInterval time.Duration) Option {
// return func(client *Client) {
// client.requestTimeout = requestTimeoutDuration
// client.requestMinInterval = requestMinInterval
// }
//}
func NewClient(url string, insecure_skip_verify bool, providerClient bool, options ...Option) *Client {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure_skip_verify},
MaxIdleConnsPerHost: 10,
}
retryClient := retryablehttp.NewClient()
retryClient.HTTPClient.Transport = tr
if !providerClient {
retryClient.Logger = nil
}
client := &Client{
Client: retryClient,
url: url,
retries: 10, // default 3
retryMinInterval: time.Second,
retryMaxInterval: time.Second * 10,
}
for _, o := range options {
o(client)
}
client.Client.RetryMax = int(client.retries)
client.Client.RetryWaitMin = client.retryMinInterval
client.Client.RetryWaitMax = client.retryMaxInterval
client.HTTPClient.Timeout = client.requestTimeout
client.HTTPClient.Jar, _ = cookiejar.New(nil)
if client.requestMinInterval > 0 {
client.rateLimiter = time.NewTicker(client.requestMinInterval).C
} else {
ch := make(chan time.Time)
// closing the channel will make receiving from the channel non-blocking (the value received will be the
// zero value)
close(ch)
client.rateLimiter = ch
}
firstRequest = true
return client
}
func (c *Client) RequestWithBody(ctx context.Context, method, url string, body any) (map[string]any, error) {
data, err := json.Marshal(body)
if err != nil {
return nil, err
}
request, err := http.NewRequestWithContext(ctx, method, c.url+url, bytes.NewBuffer(data))
if err != nil {
return nil, err
}
slog.Debug(fmt.Sprintf("===== %v to %v =====", request.Method, request.URL))
rawBody, err := c.doRequest(request)
if err != nil {
return nil, err
}
return parseResponseAsObject(ctx, request, rawBody)
}
func (c *Client) doRequest(request *http.Request) ([]byte, error) {
if !firstRequest {
// the value doesn't matter, it is waiting for the value that matters
<-c.rateLimiter
} else {
// only skip rate limiter for the first request
firstRequest = false
}
if request.Method != http.MethodGet {
request.Header.Set("Content-Type", "application/json")
}
// Prefer OAuth even if Basic Auth credentials provided
if c.bearerToken != "" {
request.Header.Set("Authorization", "Bearer "+c.bearerToken)
} else if c.username != "" {
request.SetBasicAuth(c.username, c.password)
} else {
return nil, fmt.Errorf("either username or bearer token must be provided to access the broker")
}
var response *http.Response
var err error
response, err = c.StandardClient().Do(request)
if err != nil || response == nil {
return nil, err
}
defer response.Body.Close()
rawBody, err := io.ReadAll(response.Body)
if err != nil || (response.StatusCode != http.StatusOK && response.StatusCode != http.StatusBadRequest) {
return nil, fmt.Errorf("could not perform request: status %v (%v) during %v to %v, response body:\n%s", response.StatusCode, response.Status, request.Method, request.URL, rawBody)
}
if _, err := io.Copy(io.Discard, response.Body); err != nil {
return nil, fmt.Errorf("response processing error: during %v to %v", request.Method, request.URL)
}
return rawBody, nil
}
func parseResponseAsObject(_ context.Context, request *http.Request, dataResponse []byte) (map[string]any, error) {
data := map[string]any{}
err := json.Unmarshal(dataResponse, &data)
if err != nil {
return nil, fmt.Errorf("could not parse response body from %v to %v, response body was:\n%s", request.Method, request.URL, dataResponse)
}
rawData, ok := data["data"]
if ok {
// Valid data
data, _ = rawData.(map[string]any)
return data, nil
} else {
// Analize response metadata details
rawData, ok = data["meta"]
if ok {
data, _ = rawData.(map[string]any)
if data["responseCode"].(float64) == http.StatusOK {
// this is valid response for delete
return nil, nil
}
description := data["error"].(map[string]interface{})["description"].(string)
status := data["error"].(map[string]interface{})["status"].(string)
if status == "NOT_FOUND" {
// resource not found is a special type we want to return
return nil, fmt.Errorf("request failed from %v to %v, %v, %v, %w", request.Method, request.URL, description, status, ErrResourceNotFound)
}
slog.Error(fmt.Sprintf("SEMP request returned %v, %v", description, status))
return nil, fmt.Errorf("request failed for %v using %v, %v, %v", request.URL, request.Method, description, status)
}
}
return nil, fmt.Errorf("could not parse response details from %v to %v, response body was:\n%s", request.Method, request.URL, dataResponse)
}
func (c *Client) RequestWithoutBody(ctx context.Context, method, url string) (map[string]interface{}, error) {
request, err := http.NewRequestWithContext(ctx, method, c.url+url, nil)
if err != nil {
return nil, err
}
slog.Debug(fmt.Sprintf("===== %v to %v =====", request.Method, request.URL))
rawBody, err := c.doRequest(request)
if err != nil {
return nil, err
}
return parseResponseAsObject(ctx, request, rawBody)
}

View file

@ -1,13 +1,17 @@
package messaging
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/Azure/go-amqp"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"io"
"log/slog"
"net/http"
"regexp"
"strings"
"time"
@ -18,6 +22,98 @@ const (
AmqpQueuePrefix = "queue://"
)
var ErrResourceNotFound = errors.New("resource not found")
type SempClient struct {
client http.Client
sempApiBaseUrl string
username string
password string
}
func (c SempClient) RequestWithoutBody(ctx context.Context, method string, url string) error {
request, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s%s", c.sempApiBaseUrl, url), nil)
if err != nil {
return err
}
response, err := c.doRequest(request)
if err != nil {
return err
}
_, err = c.parseResponseAsObject(response)
return err
}
func (c SempClient) RequestWithBody(ctx context.Context, method string, url string, body any) error {
data, err := json.Marshal(body)
if err != nil {
return err
}
request, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s%s", c.sempApiBaseUrl, url), bytes.NewBuffer(data))
if err != nil {
return err
}
response, err := c.doRequest(request)
if err != nil {
return err
}
_, err = c.parseResponseAsObject(response)
return err
}
func (c SempClient) doRequest(request *http.Request) ([]byte, error) {
request.SetBasicAuth(c.username, c.password)
if request.Method != http.MethodGet {
request.Header.Set("Content-Type", "application/json")
}
response, err := c.client.Do(request)
if err != nil {
return nil, err
}
defer response.Body.Close()
rawBody, err := io.ReadAll(response.Body)
if err != nil || (response.StatusCode != http.StatusOK && response.StatusCode != http.StatusBadRequest) {
return nil, fmt.Errorf("request to %v failes with status %v (%v), response:\n%s", response.StatusCode, response.Status, request.URL, rawBody)
}
if _, err := io.Copy(io.Discard, response.Body); err != nil {
return nil, fmt.Errorf("response processing error for call to %v", request.URL)
}
return rawBody, nil
}
func (c SempClient) parseResponseAsObject(dataResponse []byte) (map[string]any, error) {
data := map[string]any{}
err := json.Unmarshal(dataResponse, &data)
if err != nil {
return nil, fmt.Errorf("could not parse response:\n%s", dataResponse)
}
rawData, ok := data["data"]
if ok {
data, _ = rawData.(map[string]any)
return data, nil
} else {
metadata, ok := data["meta"]
if ok {
data, _ = metadata.(map[string]any)
if data["responseCode"].(float64) == http.StatusOK {
// http-delete
return nil, nil
}
description := data["error"].(map[string]interface{})["description"].(string)
status := data["error"].(map[string]interface{})["status"].(string)
if status == "NOT_FOUND" {
// resource not found
return nil, fmt.Errorf("request failed - description: %v, status: %v, %w", description, status, ErrResourceNotFound)
}
return nil, fmt.Errorf("request failed - description: %v, status: %v", description, status)
}
}
return nil, fmt.Errorf("could not parse response:\n%s", dataResponse)
}
// SolaceContainer wraps a testcontainers docker container instance of solace.
//
// The container must be terminated by calling:
@ -25,7 +121,7 @@ const (
type SolaceContainer struct {
testcontainers.Container
AmqpConnectionString string
sempClient *Client
sempClient SempClient
}
// NewSolaceContainer starts a container and
@ -79,23 +175,19 @@ func NewSolaceContainer(ctx context.Context) (*SolaceContainer, error) {
sempApiBaseUrl := fmt.Sprintf("http://%s:%s/SEMP/v2", host, sempPort.Port())
// Construct SEMP client
sempClient := NewClient(
sempApiBaseUrl,
true,
false,
BasicAuth("admin", "admin"),
)
sempClient := SempClient{client: http.Client{}, sempApiBaseUrl: sempApiBaseUrl, username: "admin", password: "admin"}
// Poll queue endpoint until solace is ready to interact
solaceStarting := true
for solaceStarting {
_, err := sempClient.RequestWithoutBody(
err := sempClient.RequestWithoutBody(
ctx,
"GET",
"/config/msgVpns/default/queues/test",
)
if err != nil && strings.Contains(err.Error(), "NOT_FOUND") {
solaceStarting = false
time.Sleep(1000 * time.Millisecond)
} else {
time.Sleep(100 * time.Millisecond)
}
@ -122,7 +214,7 @@ func (c SolaceContainer) QueueCreate(ctx context.Context, queueName string) erro
queueConfig["maxBindCount"] = 100
// Create the queue
_, err := c.sempClient.RequestWithBody(
err := c.sempClient.RequestWithBody(
ctx,
"POST",
"/config/msgVpns/default/queues",
@ -135,7 +227,7 @@ func (c SolaceContainer) QueueCreate(ctx context.Context, queueName string) erro
func (c SolaceContainer) QueueExists(ctx context.Context, queueName string) (bool, error) {
// Check if exists
_, err := c.sempClient.RequestWithoutBody(
err := c.sempClient.RequestWithoutBody(
ctx,
"GET",
fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName),
@ -165,7 +257,7 @@ func (c SolaceContainer) QueueDeleteIfExists(ctx context.Context, queueName stri
// Delete if exists
if exists {
_, err := c.sempClient.RequestWithoutBody(
err := c.sempClient.RequestWithoutBody(
ctx,
"DELETE",
fmt.Sprintf("/config/msgVpns/default/queues/%s", queueName),
@ -191,7 +283,7 @@ func (c SolaceContainer) TopicSubscriptionCreate(ctx context.Context, queueName
subscriptionConfig["subscriptionTopic"] = topicName
// Create the subscription
_, err := c.sempClient.RequestWithBody(ctx, "POST", url, subscriptionConfig)
err := c.sempClient.RequestWithBody(ctx, "POST", url, subscriptionConfig)
return err
}

2
go.mod
View file

@ -7,7 +7,6 @@ require (
github.com/Azure/go-amqp v1.1.0
github.com/bufbuild/protovalidate-go v0.6.4
github.com/google/uuid v1.6.0
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.33.0
go.opentelemetry.io/otel v1.24.0
@ -36,7 +35,6 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/cel-go v0.21.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect

12
go.sum
View file

@ -39,8 +39,6 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM=
github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
@ -64,12 +62,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU=
github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
@ -82,10 +74,6 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk=