Skip to content

Commit f9ce46f

Browse files
toddbaertchrfwowthisthat
authored
feat: multi-project support via selectors and flagSetId namespacing (#1702)
Sorry for how big this PR seems (a lot of the change lines are not significant (DB schema or tests), so I've tried to highlight the important parts. This is a substantial PR that builds on our recent changes to use go-memdb for storage. It primarily supports 2 new crucial features: - support for duplicate flag keys from multiple sources (namespaced by `flagSetId`) as described in [this ADR](https://github.com/open-feature/flagd/blob/main/docs/architecture-decisions/duplicate-flag-keys.md) - support for a robust query syntax in the "selector" fields and headers, as proposed by @tangenti in [this ADR](https://github.com/open-feature/flagd/blob/main/docs/architecture-decisions/decouple-flag-source-and-set.md) Both of these were accomplished using the new go-memdb module. **The built-in "watcher" functionality also allows us to _completely delete_ our "mux" layer, which was responsible for fanning out changes from sync-sources to listeners**; this is something the go-memdb module providers for free (the ability to watch a query for changes). Now, we completely rely on this feature for all change notifications. Additionally, unlike before, change notifications are now scoped to particular _selectors_ (ie: if a client is only interested in changes for flags from `flagSetId: x` or `source: y`, they will only get change notifications pertaining to that selection. Currently, the only supported query fields for the `selector` are `"source"` and `"flagSetId"`, but this functionality can easily be extended. By default, if no `selector` is specified, the previous key-overwrite by source priority apples (this logic has also been simplified using the new database). Most of the new functionality is tested [here](https://github.com/open-feature/flagd/pull/1702/files#diff-0cdd4fe716f4b8a94466279fd1b11187fcf4d74e6434727c33d57ed78c89fe27R163-R478). Selector in action for `Sync` API: ![demo](https://github.com/user-attachments/assets/9a4fd33e-80ef-4b5b-80d1-4fa3fc92b8e7) Selector in action for `Evaluation` API: ![demo2](https://github.com/user-attachments/assets/abf49f9c-2247-4a03-9b8f-15b6f540aad1) To test, run the new make target `make run-flagd-selector-demo`, then use the OFREP or gRPC endpoints to experiment. This new functionality is available on all APIs and endpoints. The command I ran in the gifs above are: streaming: ```shell grpcurl -d '{"selector":"flagSetId=example"}' -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq grpcurl -d '{"selector":"flagSetId=example,source=../config/samples/example_flags.flagd.json"}' -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq grpcurl -d '{"selector":"flagSetId=other"}' -import-path schemas/protobuf/flagd/sync/v1/ -proto sync.proto -plaintext localhost:8015 flagd.sync.v1.FlagSyncService/SyncFlags | jq ``` single-evaluations: ```shell curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags' | jq curl -X POST -H 'flagd-selector:flagSetId=other' -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags' | jq ``` :warning: There's no breaking changes here. Besides the new features, there is one behavioral change - the top level "metadata" object returned for bulk evaluations (and failed evaluations) was previously very nonsensical in it's behavior (we basically just aggregated the metadata from all sources, discarding duplicates, and sent it back. This field was used by providers for telemetry purposes. Now, since flag evaluations and subscripts are "query based" and can aggregate data from multiple sources, we've opted to simply reflect the selector queries contents here. So if you used a selector like `"flagSetId=1234,source=../my/source"`, the top-level metadata object in the response would be: ``` "metadata": { "flagSetId": 1234, "source": "../my/source" } ``` This is useful for the provider's ability to report errors, etc in telemetry. Fixes: #1675 Fixes: #1695 Fixes: #1611 Fixes: #1700 Fixes: #1610 --------- Signed-off-by: Todd Baert <[email protected]> Co-authored-by: chrfwow <[email protected]> Co-authored-by: Giovanni Liva <[email protected]>
1 parent 3228ad8 commit f9ce46f

33 files changed

+1640
-1092
lines changed

CONTRIBUTING.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ curl -X POST -d '{"context":{}}' 'http://localhost:8016/ofrep/v1/evaluate/flags
8383
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{}' localhost:8013 flagd.evaluation.v1.Service/ResolveAll | jq
8484
```
8585

86+
#### Remote event streaming via gRPC
87+
88+
```sh
89+
# notifies of flag changes (but does not evaluate)
90+
grpcurl -import-path schemas/protobuf/flagd/evaluation/v1/ -proto evaluation.proto -plaintext -d '{}' localhost:8013 flagd.evaluation.v1.Service/EventStream
91+
```
92+
8693
#### Flag configuration fetch via gRPC
8794

8895
```sh

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@ test-flagd:
4747
go test -race -covermode=atomic -cover -short ./flagd/pkg/... -coverprofile=flagd-coverage.out
4848
test-flagd-proxy:
4949
go test -race -covermode=atomic -cover -short ./flagd-proxy/pkg/... -coverprofile=flagd-proxy-coverage.out
50-
flagd-integration-test: # dependent on ./bin/flagd start -f file:test-harness/flags/testing-flags.json -f file:test-harness/flags/custom-ops.json -f file:test-harness/flags/evaluator-refs.json -f file:test-harness/flags/zero-flags.json
51-
go test -cover ./test/integration $(ARGS)
5250
flagd-benchmark-test:
5351
go test -bench=Bench -short -benchtime=5s -benchmem ./core/... | tee benchmark.txt
5452
flagd-integration-test-harness:
@@ -59,7 +57,9 @@ flagd-integration-test: # dependent on flagd-e2e-test-harness if not running in
5957
run: # default to flagd
6058
make run-flagd
6159
run-flagd:
62-
cd flagd; go run main.go start -f file:../config/samples/example_flags.flagd.json
60+
cd flagd; go run main.go start -f file:../config/samples/example_flags.flagd.json
61+
run-flagd-selector-demo:
62+
cd flagd; go run main.go start -f file:../config/samples/example_flags.flagd.json -f file:../config/samples/example_flags.flagd.2.json
6363
install:
6464
cp systemd/flagd.service /etc/systemd/system/flagd.service
6565
mkdir -p /etc/flagd
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"$schema": "https://flagd.dev/schema/v0/flags.json",
3+
"metadata": {
4+
"flagSetId": "other",
5+
"version": "v1"
6+
},
7+
"flags": {
8+
"myStringFlag": {
9+
"state": "ENABLED",
10+
"variants": {
11+
"dupe1": "dupe1",
12+
"dupe2": "dupe2"
13+
},
14+
"defaultVariant": "dupe1"
15+
}
16+
}
17+
}

core/pkg/evaluator/fractional_test.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
)
1212

1313
func TestFractionalEvaluation(t *testing.T) {
14+
const source = "testSource"
15+
var sources = []string{source}
1416
ctx := context.Background()
1517

1618
commonFlags := Flags{
@@ -458,8 +460,13 @@ func TestFractionalEvaluation(t *testing.T) {
458460
for name, tt := range tests {
459461
t.Run(name, func(t *testing.T) {
460462
log := logger.NewLogger(nil, false)
461-
je := NewJSON(log, store.NewFlags())
462-
je.store.Update("", "", tt.flags.Flags, model.Metadata{})
463+
s, err := store.NewStore(log, sources)
464+
if err != nil {
465+
t.Fatalf("NewStore failed: %v", err)
466+
}
467+
468+
je := NewJSON(log, s)
469+
je.store.Update(source, tt.flags.Flags, model.Metadata{})
463470

464471
value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)
465472

@@ -486,6 +493,8 @@ func TestFractionalEvaluation(t *testing.T) {
486493
}
487494

488495
func BenchmarkFractionalEvaluation(b *testing.B) {
496+
const source = "testSource"
497+
var sources = []string{source}
489498
ctx := context.Background()
490499

491500
flags := Flags{
@@ -587,8 +596,13 @@ func BenchmarkFractionalEvaluation(b *testing.B) {
587596
for name, tt := range tests {
588597
b.Run(name, func(b *testing.B) {
589598
log := logger.NewLogger(nil, false)
590-
je := NewJSON(log, store.NewFlags())
591-
je.store.Update("", "", tt.flags.Flags, model.Metadata{})
599+
s, err := store.NewStore(log, sources)
600+
if err != nil {
601+
b.Fatalf("NewStore failed: %v", err)
602+
}
603+
je := NewJSON(log, s)
604+
je.store.Update(source, tt.flags.Flags, model.Metadata{})
605+
592606
for i := 0; i < b.N; i++ {
593607
value, variant, reason, _, err := resolve[string](
594608
ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)

core/pkg/evaluator/ievaluator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ IEvaluator is an extension of IResolver, allowing storage updates and retrievals
3535
*/
3636
type IEvaluator interface {
3737
GetState() (string, error)
38-
SetState(payload sync.DataSync) (model.Metadata, bool, error)
38+
SetState(payload sync.DataSync) (map[string]interface{}, bool, error)
3939
IResolver
4040
}
4141

core/pkg/evaluator/json.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (je *JSON) SetState(payload sync.DataSync) (map[string]interface{}, bool, e
118118
var events map[string]interface{}
119119
var reSync bool
120120

121-
events, reSync = je.store.Update(payload.Source, payload.Selector, definition.Flags, definition.Metadata)
121+
events, reSync = je.store.Update(payload.Source, definition.Flags, definition.Metadata)
122122

123123
// Number of events correlates to the number of flags changed through this sync, record it
124124
span.SetAttributes(attribute.Int("feature_flag.change_count", len(events)))
@@ -149,8 +149,12 @@ func (je *Resolver) ResolveAllValues(ctx context.Context, reqID string, context
149149
_, span := je.tracer.Start(ctx, "resolveAll")
150150
defer span.End()
151151

152-
var err error
153-
allFlags, flagSetMetadata, err := je.store.GetAll(ctx)
152+
var selector store.Selector
153+
s := ctx.Value(store.SelectorContextKey{})
154+
if s != nil {
155+
selector = s.(store.Selector)
156+
}
157+
allFlags, flagSetMetadata, err := je.store.GetAll(ctx, &selector)
154158
if err != nil {
155159
return nil, flagSetMetadata, fmt.Errorf("error retreiving flags from the store: %w", err)
156160
}
@@ -301,19 +305,19 @@ func resolve[T constraints](ctx context.Context, reqID string, key string, conte
301305
func (je *Resolver) evaluateVariant(ctx context.Context, reqID string, flagKey string, evalCtx map[string]any) (
302306
variant string, variants map[string]interface{}, reason string, metadata map[string]interface{}, err error,
303307
) {
304-
flag, metadata, ok := je.store.Get(ctx, flagKey)
305-
if !ok {
308+
309+
var selector store.Selector
310+
s := ctx.Value(store.SelectorContextKey{})
311+
if s != nil {
312+
selector = s.(store.Selector)
313+
}
314+
flag, metadata, err := je.store.Get(ctx, flagKey, &selector)
315+
if err != nil {
306316
// flag not found
307317
je.Logger.DebugWithID(reqID, fmt.Sprintf("requested flag could not be found: %s", flagKey))
308318
return "", map[string]interface{}{}, model.ErrorReason, metadata, errors.New(model.FlagNotFoundErrorCode)
309319
}
310320

311-
// add selector to evaluation metadata
312-
selector := je.store.SelectorForFlag(ctx, flag)
313-
if selector != "" {
314-
metadata[SelectorMetadataKey] = selector
315-
}
316-
317321
for key, value := range flag.Metadata {
318322
// If value is not nil or empty, copy to metadata
319323
if value != nil {

0 commit comments

Comments
 (0)