audit-go/audit/api/api_routable.go
2024-10-30 15:41:48 +01:00

189 lines
5.5 KiB
Go

package api
import (
"context"
"errors"
"fmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"strings"
"google.golang.org/protobuf/proto"
"dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/audit/messaging"
auditV1 "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/gen/go/audit/v1"
)
// routableTopicNameResolver is the TopicNameResolver implementation used by the
// routable audit api. It derives a topic name from a routable identifier by
// combining a type-specific prefix with the identifier, or by falling back to
// a fixed system topic.
type routableTopicNameResolver struct {
	// Prefixes placed in front of the identifier when resolving folder,
	// organization and project topics respectively.
	folderTopicPrefix, organizationTopicPrefix, projectTopicPrefix string
	// Name of the system topic. If no identifier is provided for routing,
	// the event is routed to this topic.
	systemTopicName string
}
// Resolve implements TopicNameResolver.Resolve.
//
// For organization, project and folder identifiers the result has the form
// "topic://<prefix>/<identifier>", where the prefix depends on the identifier
// type. System identifiers resolve to the configured system topic name as-is.
//
// It returns ErrObjectIdentifierNil when routableIdentifier is nil and
// ErrUnsupportedObjectIdentifierType for any other identifier type.
func (r *routableTopicNameResolver) Resolve(routableIdentifier *RoutableIdentifier) (string, error) {
	if routableIdentifier == nil {
		return "", ErrObjectIdentifierNil
	}

	// Pick the prefix first; the topic name is assembled once below.
	var prefix string
	switch routableIdentifier.Type {
	case ObjectTypeOrganization:
		prefix = r.organizationTopicPrefix
	case ObjectTypeProject:
		prefix = r.projectTopicPrefix
	case ObjectTypeFolder:
		prefix = r.folderTopicPrefix
	case ObjectTypeSystem:
		// The system topic is a complete name, not a prefix.
		return r.systemTopicName, nil
	default:
		return "", ErrUnsupportedObjectIdentifierType
	}

	return fmt.Sprintf("topic://%s/%s", prefix, routableIdentifier.Identifier), nil
}
// topicNameConfig bundles the topic name settings needed to construct the
// topic name resolver of the routable audit api.
type topicNameConfig struct {
	// Topic prefixes for folder, organization and project routing.
	FolderTopicPrefix, OrganizationTopicPrefix, ProjectTopicPrefix string
	// Full name of the topic used for system routing.
	SystemTopicName string
}
// routableAuditApi is an implementation of AuditApi.
// Warning: It is only there for local (compatibility) testing.
// DO NOT USE IT!
type routableAuditApi struct {
// messagingApi is handed to send when an event is published.
messagingApi *messaging.Api
// topicNameResolver is handed to send together with the routable identifier;
// it is stored as a pointer to the TopicNameResolver value.
topicNameResolver *TopicNameResolver
// tracer starts the "validate-and-serialize" and "send" spans.
tracer trace.Tracer
// validator is passed to validateAndSerializePartially for every event.
validator *ProtobufValidator
}
// newRoutableAuditApi initializes the routable audit log api.
//
// messagingApi must not be nil and every field of topicNameConfig must be
// non-empty; otherwise an error naming the missing value is returned. On
// success the result is a pointer to an AuditApi value backed by a
// routableAuditApi whose tracer is named "routable-audit-api".
func newRoutableAuditApi(
	messagingApi *messaging.Api,
	topicNameConfig topicNameConfig,
	validator ProtobufValidator,
) (*AuditApi, error) {
	if messagingApi == nil {
		return nil, errors.New("messaging api nil")
	}

	// All topic settings are mandatory; they are checked in a fixed order so
	// the first missing one determines the error.
	required := [...]struct{ value, message string }{
		{topicNameConfig.FolderTopicPrefix, "folder topic prefix is required"},
		{topicNameConfig.OrganizationTopicPrefix, "organization topic prefix is required"},
		{topicNameConfig.ProjectTopicPrefix, "project topic prefix is required"},
		{topicNameConfig.SystemTopicName, "system topic name is required"},
	}
	for _, setting := range required {
		if setting.value == "" {
			return nil, errors.New(setting.message)
		}
	}

	// Topic resolver
	var resolver TopicNameResolver = &routableTopicNameResolver{
		folderTopicPrefix:       topicNameConfig.FolderTopicPrefix,
		organizationTopicPrefix: topicNameConfig.OrganizationTopicPrefix,
		projectTopicPrefix:      topicNameConfig.ProjectTopicPrefix,
		systemTopicName:         topicNameConfig.SystemTopicName,
	}

	// Audit api
	var instance AuditApi = &routableAuditApi{
		messagingApi:      messagingApi,
		topicNameResolver: &resolver,
		tracer:            otel.Tracer("routable-audit-api"),
		validator:         &validator,
	}
	return &instance, nil
}
// Log implements AuditApi.Log.
//
// It first runs ValidateAndSerialize on the event and, if that succeeds,
// passes the resulting cloud event to Send. The first error encountered is
// returned unchanged.
func (a *routableAuditApi) Log(
	ctx context.Context,
	event *auditV1.AuditLogEntry,
	visibility auditV1.Visibility,
	routableIdentifier *RoutableIdentifier,
) error {
	serialized, err := a.ValidateAndSerialize(ctx, event, visibility, routableIdentifier)
	if err != nil {
		// Nothing is sent when validation or serialization fails.
		return err
	}

	return a.Send(ctx, routableIdentifier, serialized)
}
// ValidateAndSerialize implements AuditApi.ValidateAndSerialize.
//
// Inside a "validate-and-serialize" span it delegates validation and partial
// serialization to validateAndSerializePartially, rejects events whose log
// name ends with EventTypeDataAccess, marshals the routable event and wraps
// the bytes in a CloudEvent. The trace parent and trace state taken from the
// context are attached to the returned event.
//
// Any error from validation or marshalling is returned as-is; data-access
// events yield ErrUnsupportedEventTypeDataAccess.
func (a *routableAuditApi) ValidateAndSerialize(
	ctx context.Context,
	event *auditV1.AuditLogEntry,
	visibility auditV1.Visibility,
	routableIdentifier *RoutableIdentifier,
) (*CloudEvent, error) {
	ctx, span := a.tracer.Start(ctx, "validate-and-serialize")
	defer span.End()

	routable, err := validateAndSerializePartially(a.validator, event, visibility, routableIdentifier)
	if err != nil {
		return nil, err
	}

	// Reject event type data-access as the downstream services
	// cannot handle it at the moment
	isDataAccess := strings.HasSuffix(event.LogName, string(EventTypeDataAccess))
	if isDataAccess {
		return nil, ErrUnsupportedEventTypeDataAccess
	}

	data, err := proto.Marshal(routable)
	if err != nil {
		return nil, err
	}

	traceParent, traceState := TraceParentAndStateFromContext(ctx)
	payload := event.ProtoPayload

	return &CloudEvent{
		SpecVersion:     "1.0",
		Source:          payload.ServiceName,
		Id:              event.InsertId,
		Time:            payload.RequestMetadata.RequestAttributes.Time.AsTime(),
		DataContentType: ContentTypeCloudEventsProtobuf,
		// The data type is the fully-qualified protobuf message name.
		DataType:    string(routable.ProtoReflect().Descriptor().FullName()),
		Subject:     payload.ResourceName,
		Data:        data,
		TraceParent: &traceParent,
		TraceState:  &traceState,
	}, nil
}
// Send implements AuditApi.Send.
//
// When the cloud event carries both a trace parent and a trace state, they
// are added to the context before the "send" span is started. The actual
// delivery is delegated to send, whose result is returned.
func (a *routableAuditApi) Send(
	ctx context.Context,
	routableIdentifier *RoutableIdentifier,
	cloudEvent *CloudEvent,
) error {
	// Trace information is only propagated when it is complete.
	hasTraceInfo := cloudEvent != nil &&
		cloudEvent.TraceParent != nil &&
		cloudEvent.TraceState != nil
	if hasTraceInfo {
		ctx = AddTraceParentAndStateToContext(ctx, *cloudEvent.TraceParent, *cloudEvent.TraceState)
	}

	spanCtx, span := a.tracer.Start(ctx, "send")
	defer span.End()

	return send(a.topicNameResolver, a.messagingApi, spanCtx, routableIdentifier, cloudEvent)
}