Skip to content

Commit 29f21ed

Browse files
author
Ram Prasad Voleti
committed
Add replication throttling mechanism
Add replication throttling mechanism Signed-off-by: Ram Prasad Voleti <[email protected]>
1 parent c50f841 commit 29f21ed

File tree

12 files changed

+849
-12
lines changed

12 files changed

+849
-12
lines changed

integration/run.sh

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,11 @@ function zap() {
112112
# Check for user provided JSON module path
113113
JSON_MODULE_PATH="${JSON_MODULE_PATH:=}"
114114
if [ -z "${JSON_MODULE_PATH}" ]; then
115-
setup_json_module
116-
JSON_MODULE_PATH=${VALKEY_JSON_PATH}
115+
# Only setup JSON module if ENABLE_JSON_MODULE is set to "yes"
116+
if [ "${ENABLE_JSON_MODULE}" = "yes" ]; then
117+
setup_json_module
118+
JSON_MODULE_PATH=${VALKEY_JSON_PATH}
119+
fi
117120
fi
118121
LOG_INFO "JSON_MODULE_PATH => ${JSON_MODULE_PATH}"
119122

@@ -123,7 +126,10 @@ install_test_framework
123126
# Export variables required by the test framework
124127
export MODULE_PATH=${MODULE_PATH}
125128
export VALKEY_SERVER_PATH=${VALKEY_SERVER_PATH}
126-
export JSON_MODULE_PATH=${JSON_MODULE_PATH}
129+
# Only export JSON_MODULE_PATH if it's actually set
130+
if [ ! -z "${JSON_MODULE_PATH}" ]; then
131+
export JSON_MODULE_PATH=${JSON_MODULE_PATH}
132+
fi
127133
export SKIPLOGCLEAN=1
128134

129135
FILTER_ARGS=""
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
import pytest
2+
import time
3+
import struct
4+
from valkey_search_test_case import ValkeySearchClusterTestCase
5+
from valkey import ResponseError
6+
from valkeytestframework.conftest import resource_port_tracker
7+
8+
@pytest.mark.cluster
class TestAsyncClientThrottling(ValkeySearchClusterTestCase):
    """
    Test async client throttling during slot migration.

    Throttling has 3 phases:
    - Phase 1: Queue <= threshold → no blocking
    - Phase 2: Queue > threshold, sync=0 → block all async
    - Phase 3: Queue > threshold, sync>0 → ratio-based blocking

    Tests reduced to 1 writer thread to slow down processing and ensure
    sustained queue buildup.
    """

    # Two primaries: one migration source, one migration target.
    CLUSTER_SIZE = 2
    # Disable JSON module to avoid atomic slot migration incompatibility
    LOAD_JSON_MODULE = False

    def append_startup_args(self, args):
        """Reduce threads to slow down backfill for testing."""
        args = super().append_startup_args(args)
        args.update({
            "search.writer-threads": "1",
            "search.reader-threads": "1",
        })
        return args

    @staticmethod
    def _reply_to_dict(reply):
        """Convert a flat [key, value, key, value, ...] RESP reply into a dict,
        decoding any bytes keys/values to str."""
        parsed = {}
        for i in range(0, len(reply), 2):
            key = reply[i].decode('utf-8') if isinstance(reply[i], bytes) else reply[i]
            value = reply[i + 1]
            if isinstance(value, bytes):
                value = value.decode('utf-8')
            parsed[key] = value
        return parsed

    def test_threshold_async_client_throttle(self):
        """
        Test threshold-based blocking with async mutations.
        Verifies blocking when async mutations exceed threshold.
        """
        source_client = self.client_for_primary(0)
        target_client = self.client_for_primary(1)

        # Create index on source (may propagate via coordinator)
        source_client.execute_command(
            "FT.CREATE", "idx", "SCHEMA",
            "description", "VECTOR", "HNSW", "6",
            "TYPE", "FLOAT32", "DIM", "3", "DISTANCE_METRIC", "L2"
        )

        # Wait for coordinator propagation
        time.sleep(1)

        # Enable throttling on BOTH nodes
        for client in [source_client, target_client]:
            client.execute_command(
                "CONFIG", "SET", "search.async-client-throttling-enabled", "yes"
            )
            client.execute_command(
                "CONFIG", "SET", "search.async-clients-block-threshold", "1"
            )

        # Add 100 documents with vector data to ensure throttling
        num_keys = 100
        keys = [f"{{migrate}}:p{i}" for i in range(num_keys)]

        for i, key in enumerate(keys):
            # Create binary vector data (3 float32 values)
            vector_data = struct.pack('fff', float(i + 1), float(i + 2), float(i + 3))
            source_client.execute_command("HSET", key, "description", vector_data)

        # All keys share the {migrate} hash tag, so they map to a single slot
        slot = source_client.execute_command("CLUSTER", "KEYSLOT", keys[0])

        # Only the target node ID is needed for MIGRATESLOTS
        target_id = target_client.execute_command("CLUSTER", "MYID").decode('utf-8')

        # Perform atomic slot migration
        source_client.execute_command(
            "CLUSTER", "MIGRATESLOTS",
            "SLOTSRANGE", str(slot), str(slot),
            "NODE", target_id
        )

        # Wait for backfill to complete
        time.sleep(3)

        # Check post-migration metrics
        search_info = target_client.execute_command("INFO", "search")
        ft_dict = self._reply_to_dict(
            target_client.execute_command("FT.INFO", "idx")
        )

        num_docs = int(ft_dict.get("num_docs", 0))
        last_blocked_duration = int(
            search_info.get("search_last_throttled_duration_us", 0)
        )

        # Validate migration completed successfully
        assert num_docs == num_keys, \
            f"Expected num_docs == {num_keys}, got {num_docs}"

        # Validate that blocking occurred with measurable duration
        assert last_blocked_duration > 0, \
            f"Expected blocking duration > 0, got {last_blocked_duration}us"

        # Validate queue counters
        async_queue = int(search_info.get("search_async_mutation_queue", -1))
        sync_queue = int(search_info.get("search_sync_mutation_queue", -1))
        assert async_queue >= 0, f"Invalid async_mutation_queue: {async_queue}"
        assert sync_queue == 0, f"Expected sync_queue=0 (Phase 2), got {sync_queue}"

        # Note: throttled_clients_count might be 0 after unblocking completes.
        # The duration metric proves blocking occurred even if count is currently 0.

        # Verify all throttling metrics exist
        required_metrics = [
            "search_async_mutation_queue",
            "search_sync_mutation_queue",
            "search_throttled_clients_count",
            "search_last_throttled_duration_us",
            "search_current_throttled_duration_us",
        ]
        for metric in required_metrics:
            assert metric in search_info, f"Missing required metric: {metric}"

    def test_ratio_based_throttle_with_sync_traffic(self):
        """
        Test Phase 3 ratio-based blocking with concurrent async + sync traffic.
        """
        import threading

        source_client = self.client_for_primary(0)
        target_client = self.client_for_primary(1)
        cluster_client = self.new_cluster_client()

        source_client.execute_command(
            "FT.CREATE", "idx_phase3", "SCHEMA",
            "description", "VECTOR", "HNSW", "6",
            "TYPE", "FLOAT32", "DIM", "3", "DISTANCE_METRIC", "L2"
        )
        time.sleep(1)

        for client in [source_client, target_client]:
            client.execute_command("CONFIG", "SET", "search.async-client-throttling-enabled", "yes")
            client.execute_command("CONFIG", "SET", "search.async-clients-block-threshold", "3")

        # Sync keys on target
        sync_keys = [f"{{sync}}:s{i}" for i in range(50)]
        for i, key in enumerate(sync_keys):
            vector_data = struct.pack('fff', float(200 + i), float(201 + i), float(202 + i))
            target_client.execute_command("HSET", key, "description", vector_data)

        # Migration keys on source
        migration_keys = [f"{{phase3}}:p{i}" for i in range(150)]
        for i, key in enumerate(migration_keys):
            vector_data = struct.pack('fff', float(i + 1), float(i + 2), float(i + 3))
            source_client.execute_command("HSET", key, "description", vector_data)
        migration_slot = source_client.execute_command("CLUSTER", "KEYSLOT", migration_keys[0])

        # Background sync threads generate steady HSET traffic during migration
        stop_background = threading.Event()
        bg_count = [0]

        def sync_traffic(tid):
            # Each thread owns an interleaved slice of sync_keys (stride 5,
            # matching the 5 background threads started below).
            my_keys = sync_keys[tid::5]
            while not stop_background.is_set():
                for key in my_keys:
                    try:
                        vector_data = struct.pack('fff', float(300 + tid), float(301 + tid), float(302 + tid))
                        cluster_client.execute_command("HSET", key, "description", vector_data)
                        bg_count[0] += 1
                    except Exception:
                        # Best-effort traffic generator: ignore transient errors
                        # (e.g. redirects during migration) but never swallow
                        # KeyboardInterrupt/SystemExit like a bare except would.
                        pass
                time.sleep(0.002)

        # Pass tid via args= instead of a default-argument lambda to avoid
        # the late-binding-closure pitfall.
        bg_threads = [
            threading.Thread(target=sync_traffic, args=(i,), daemon=True)
            for i in range(5)
        ]
        for t in bg_threads:
            t.start()
        time.sleep(0.3)

        target_id = target_client.execute_command("CLUSTER", "MYID").decode('utf-8')
        source_client.execute_command(
            "CLUSTER", "MIGRATESLOTS",
            "SLOTSRANGE", str(migration_slot), str(migration_slot),
            "NODE", target_id
        )
        time.sleep(1.5)

        stop_background.set()
        for t in bg_threads:
            t.join(timeout=1)
        time.sleep(1)

        search_info = target_client.execute_command("INFO", "search")
        duration = int(search_info.get("search_last_throttled_duration_us", 0))
        count = target_client.execute_command("CLUSTER", "COUNTKEYSINSLOT", migration_slot)

        assert count == len(migration_keys), \
            f"Expected {len(migration_keys)} keys in slot {migration_slot}, got {count}"
        assert duration > 0, f"Expected throttling duration > 0, got {duration}us"
        assert bg_count[0] > 0, "Background sync traffic never succeeded"

integration/valkey_search_test_case.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ def get_replicas(self) -> List[Node]:
218218

219219

220220
class ValkeySearchTestCaseBase(ValkeySearchTestCaseCommon):
221+
# Override in subclass to control JSON module loading
222+
LOAD_JSON_MODULE = True
221223

222224
@pytest.fixture(autouse=True)
223225
def setup_test(self, request):
@@ -248,12 +250,19 @@ def setup_test(self, request):
248250
ReplicationGroup.cleanup(self.rg)
249251

250252
def get_config_file_lines(self, testdir, port) -> List[str]:
251-
return [
253+
config_lines = [
252254
"enable-debug-command yes",
253-
f"loadmodule {os.getenv('JSON_MODULE_PATH')}",
255+
]
256+
# Only load JSON module if class allows it AND JSON_MODULE_PATH is set
257+
if self.LOAD_JSON_MODULE:
258+
json_module_path = os.getenv('JSON_MODULE_PATH')
259+
if json_module_path and json_module_path.strip():
260+
config_lines.append(f"loadmodule {json_module_path}")
261+
config_lines.extend([
254262
f"dir {testdir}",
255263
f"loadmodule {os.getenv('MODULE_PATH')}",
256-
]
264+
])
265+
return config_lines
257266

258267
def verify_error_response(self, client, cmd, expected_err_reply):
259268
try:
@@ -320,6 +329,8 @@ class ValkeySearchClusterTestCase(ValkeySearchTestCaseCommon):
320329
CLUSTER_SIZE = 3
321330
# Default value for replication
322331
REPLICAS_COUNT = 0
332+
# Override in subclass to control JSON module loading
333+
LOAD_JSON_MODULE = True
323334

324335
def _split_range_pairs(self, start, end, n):
325336
points = [start + i * (end - start) // n for i in range(n + 1)]
@@ -456,14 +467,21 @@ def setup_test(self, request):
456467
ReplicationGroup.cleanup(rg)
457468

458469
def get_config_file_lines(self, testdir, port) -> List[str]:
459-
return [
470+
config_lines = [
460471
"enable-debug-command yes",
461-
f"loadmodule {os.getenv('JSON_MODULE_PATH')}",
472+
]
473+
# Only load JSON module if class allows it AND JSON_MODULE_PATH is set
474+
if self.LOAD_JSON_MODULE:
475+
json_module_path = os.getenv('JSON_MODULE_PATH')
476+
if json_module_path and json_module_path.strip():
477+
config_lines.append(f"loadmodule {json_module_path}")
478+
config_lines.extend([
462479
f"dir {testdir}",
463480
"cluster-enabled yes",
464481
f"cluster-config-file nodes_{port}.conf",
465482
f"loadmodule {os.getenv('MODULE_PATH')} --use-coordinator",
466-
]
483+
])
484+
return config_lines
467485

468486
def _wait_for_meet(self, count: int) -> bool:
469487
for primary in self.replication_groups:
@@ -520,4 +538,3 @@ class ValkeySearchClusterTestCaseDebugMode(ValkeySearchClusterTestCase):
520538
'''
521539
def get_config_file_lines(self, testdir, port) -> List[str]:
522540
return EnableDebugMode(super(ValkeySearchClusterTestCaseDebugMode, self).get_config_file_lines(testdir, port))
523-

0 commit comments

Comments
 (0)