package api import ( "context" "errors" "fmt" "strings" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" auditV1 "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/gen/go/audit/v1" internalAuditApi "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/internal/audit/api" pkgAuditCommon "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/audit/common" pkgMessagingApi "dev.azure.com/schwarzit/schwarzit.stackit-public/audit-go.git/pkg/messaging/api" ) // routableTopicNameResolver implements TopicNameResolver. // Resolves topic names by concatenating topic type prefixes with routing identifiers. type routableTopicNameResolver struct { folderTopicPrefix string organizationTopicPrefix string projectTopicPrefix string // If no identifier is provided for routing, it will be routed to a system topic systemTopicName string } // Resolve implements TopicNameResolver.Resolve. func (r *routableTopicNameResolver) Resolve(routableIdentifier *pkgAuditCommon.RoutableIdentifier) (string, error) { if routableIdentifier == nil { return "", pkgAuditCommon.ErrObjectIdentifierNil } switch routableIdentifier.Type { case pkgAuditCommon.ObjectTypeOrganization: return fmt.Sprintf("topic://%s/%s", r.organizationTopicPrefix, routableIdentifier.Identifier), nil case pkgAuditCommon.ObjectTypeProject: return fmt.Sprintf("topic://%s/%s", r.projectTopicPrefix, routableIdentifier.Identifier), nil case pkgAuditCommon.ObjectTypeFolder: return fmt.Sprintf("topic://%s/%s", r.folderTopicPrefix, routableIdentifier.Identifier), nil case pkgAuditCommon.ObjectTypeSystem: return r.systemTopicName, nil default: return "", pkgAuditCommon.ErrUnsupportedObjectIdentifierType } } // topicNameConfig provides topic name information required for the topic name resolution. type topicNameConfig struct { FolderTopicPrefix string OrganizationTopicPrefix string ProjectTopicPrefix string SystemTopicName string } // routableAuditApi is an implementation of AuditApi. // Warning: It is only there for local (compatibility) testing. // DO NOT USE IT! type routableAuditApi struct { messagingApi pkgMessagingApi.Api topicNameResolver pkgAuditCommon.TopicNameResolver tracer trace.Tracer validator pkgAuditCommon.ProtobufValidator } // NewRoutableAuditApi can be used to initialize the audit log api. func newRoutableAuditApi( messagingApi pkgMessagingApi.Api, topicNameConfig topicNameConfig, validator pkgAuditCommon.ProtobufValidator, ) (pkgAuditCommon.AuditApi, error) { if messagingApi == nil { return nil, errors.New("messaging api nil") } // Topic resolver if topicNameConfig.FolderTopicPrefix == "" { return nil, errors.New("folder topic prefix is required") } if topicNameConfig.OrganizationTopicPrefix == "" { return nil, errors.New("organization topic prefix is required") } if topicNameConfig.ProjectTopicPrefix == "" { return nil, errors.New("project topic prefix is required") } if topicNameConfig.SystemTopicName == "" { return nil, errors.New("system topic name is required") } var topicNameResolver pkgAuditCommon.TopicNameResolver = &routableTopicNameResolver{ folderTopicPrefix: topicNameConfig.FolderTopicPrefix, organizationTopicPrefix: topicNameConfig.OrganizationTopicPrefix, projectTopicPrefix: topicNameConfig.ProjectTopicPrefix, systemTopicName: topicNameConfig.SystemTopicName, } // Audit api var auditApi pkgAuditCommon.AuditApi = &routableAuditApi{ messagingApi: messagingApi, topicNameResolver: topicNameResolver, tracer: otel.Tracer("routable-audit-api"), validator: validator, } return auditApi, nil } // Log implements AuditApi.Log func (a *routableAuditApi) Log( ctx context.Context, event *auditV1.AuditLogEntry, visibility auditV1.Visibility, routableIdentifier *pkgAuditCommon.RoutableIdentifier, ) error { cloudEvent, err := a.ValidateAndSerialize(ctx, event, visibility, routableIdentifier) if err != nil { return err } return a.Send(ctx, routableIdentifier, cloudEvent) } // ValidateAndSerialize implements AuditApi.ValidateAndSerialize func (a *routableAuditApi) ValidateAndSerialize( ctx context.Context, event *auditV1.AuditLogEntry, visibility auditV1.Visibility, routableIdentifier *pkgAuditCommon.RoutableIdentifier, ) (*pkgAuditCommon.CloudEvent, error) { ctx, span := a.tracer.Start(ctx, "validate-and-serialize") defer span.End() routableEvent, err := internalAuditApi.ValidateAndSerializePartially( a.validator, event, visibility, routableIdentifier) if err != nil { return nil, err } // Reject event type data-access as the downstream services // cannot handle it at the moment if strings.HasSuffix(event.LogName, string(pkgAuditCommon.EventTypeDataAccess)) { return nil, pkgAuditCommon.ErrUnsupportedEventTypeDataAccess } routableEventBytes, err := proto.Marshal(routableEvent) if err != nil { return nil, err } traceParent, traceState := internalAuditApi.TraceParentAndStateFromContext(ctx) message := pkgAuditCommon.CloudEvent{ SpecVersion: "1.0", Source: event.ProtoPayload.ServiceName, Id: event.InsertId, Time: event.ProtoPayload.RequestMetadata.RequestAttributes.Time.AsTime(), DataContentType: pkgAuditCommon.ContentTypeCloudEventsProtobuf, DataType: fmt.Sprintf("%v", routableEvent.ProtoReflect().Descriptor().FullName()), Subject: event.ProtoPayload.ResourceName, Data: routableEventBytes, TraceParent: &traceParent, TraceState: &traceState, } return &message, nil } // Send implements AuditApi.Send func (a *routableAuditApi) Send( ctx context.Context, routableIdentifier *pkgAuditCommon.RoutableIdentifier, cloudEvent *pkgAuditCommon.CloudEvent, ) error { if cloudEvent != nil && cloudEvent.TraceParent != nil && cloudEvent.TraceState != nil { ctx = internalAuditApi.AddTraceParentAndStateToContext(ctx, *cloudEvent.TraceParent, *cloudEvent.TraceState) } ctx, span := a.tracer.Start(ctx, "send") defer span.End() return internalAuditApi.Send(a.topicNameResolver, a.messagingApi, ctx, routableIdentifier, cloudEvent) }