-
Notifications
You must be signed in to change notification settings - Fork 594
HDDS-8655. Optimize listKeys by eliminating redundant seek. #9542
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,9 +29,18 @@ private static byte[] copyPrefix(byte[] prefix) { | |
| } | ||
|
|
||
| RDBStoreByteArrayIterator(ManagedRocksIterator iterator, | ||
| RDBTable table, byte[] prefix, IteratorType type) { | ||
| RDBTable table, byte[] prefix, IteratorType type) { | ||
| this(iterator, table, prefix, type, null); | ||
| } | ||
|
|
||
| RDBStoreByteArrayIterator(ManagedRocksIterator iterator, | ||
| RDBTable table, byte[] prefix, IteratorType type, byte[] seekKey) { | ||
|
Comment on lines
+36
to
+37
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Please do not format method signature like this. Whenever visibility / return type / method name / other modifiers are changed, we would have to reindent all parameters. |
||
| super(iterator, table, copyPrefix(prefix), type); | ||
| seekToFirst(); | ||
| if (seekKey != null) { | ||
| seek(seekKey); | ||
| } else { | ||
| seekToFirst(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -146,7 +146,7 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException | |||||
| * Deletes a range of keys from the metadata store. | ||||||
| * | ||||||
| * @param beginKey start metadata key | ||||||
| * @param endKey end metadata key | ||||||
| * @param endKey end metadata key | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| */ | ||||||
| void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException, CodecException; | ||||||
|
|
||||||
|
|
@@ -177,6 +177,20 @@ default KeyValueIterator<KEY, VALUE> iterator(KEY prefix) throws RocksDatabaseEx | |||||
| KeyValueIterator<KEY, VALUE> iterator(KEY prefix, IteratorType type) | ||||||
| throws RocksDatabaseException, CodecException; | ||||||
|
|
||||||
| /** | ||||||
| * Iterate the table with a seek key. | ||||||
| * | ||||||
| * @param prefix The prefix of the elements to be iterated. | ||||||
| * @param seek The key to seek to. | ||||||
| * @return an iterator. | ||||||
| */ | ||||||
| default KeyValueIterator<KEY, VALUE> iterator(KEY prefix, KEY seek) | ||||||
| throws RocksDatabaseException, CodecException { | ||||||
| KeyValueIterator<KEY, VALUE> iterator = iterator(prefix, IteratorType.KEY_AND_VALUE); | ||||||
| iterator.seek(seek); | ||||||
| return iterator; | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * @param prefix The prefix of the elements to be iterated. | ||||||
| * @return a key-only iterator | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -401,6 +401,20 @@ public KeyValueIterator<KEY, VALUE> iterator(KEY prefix, IteratorType type) | |||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Override | ||||||||||
| public KeyValueIterator<KEY, VALUE> iterator(KEY prefix, KEY seek) | ||||||||||
| throws RocksDatabaseException, CodecException { | ||||||||||
| if (supportCodecBuffer) { | ||||||||||
| KeyValueIterator<KEY, VALUE> iterator = iterator(prefix, IteratorType.KEY_AND_VALUE); | ||||||||||
| iterator.seek(seek); | ||||||||||
| return iterator; | ||||||||||
| } else { | ||||||||||
| final byte[] prefixBytes = encodeKey(prefix); | ||||||||||
| final byte[] seekBytes = encodeKey(seek); | ||||||||||
| return new TypedTableIterator(rawTable.iterator(prefixBytes, seekBytes)); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Override | ||||||||||
| public String getName() { | ||||||||||
| return rawTable.getName(); | ||||||||||
|
|
@@ -573,7 +587,9 @@ abstract class RawIterator<RAW> | |||||||||
| this.rawIterator = rawIterator; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /** Covert the given key to the {@link RAW} type. */ | ||||||||||
| /** | ||||||||||
| * Covert the given key to the {@link RAW} type. | ||||||||||
| */ | ||||||||||
|
Comment on lines
+590
to
+592
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: let's not change this unnecessarily
Suggested change
|
||||||||||
| abstract AutoCloseSupplier<RAW> convert(KEY key) throws CodecException; | ||||||||||
|
|
||||||||||
| /** | ||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,12 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import org.apache.commons.lang3.RandomStringUtils; | ||
| import org.apache.hadoop.hdds.StringUtils; | ||
| import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions; | ||
|
|
@@ -403,4 +409,136 @@ private void compareSstWithSameName(File checkpoint1, File checkpoint2) | |
| } | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testIteratorWithSeek() throws Exception { | ||
| final Table<byte[], byte[]> table = rdbStore.getTable(families.get(0)); | ||
| // Write keys: a1, a3, a5, b2, b4 | ||
| table.put(getBytesUtf16("a1"), getBytesUtf16("val1")); | ||
| table.put(getBytesUtf16("a3"), getBytesUtf16("val3")); | ||
| table.put(getBytesUtf16("a5"), getBytesUtf16("val5")); | ||
| table.put(getBytesUtf16("b2"), getBytesUtf16("val2")); | ||
| table.put(getBytesUtf16("b4"), getBytesUtf16("val4")); | ||
|
|
||
| // Case 1: Seek to existing key, no prefix | ||
| try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null, | ||
| getBytesUtf16("a3"))) { | ||
| assertTrue(iter.hasNext()); | ||
| assertArrayEquals(getBytesUtf16("a3"), iter.next().getKey()); | ||
| assertTrue(iter.hasNext()); | ||
| assertArrayEquals(getBytesUtf16("a5"), iter.next().getKey()); | ||
|
Comment on lines
+426
to
+429
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd like to suggest adding a helper method to verify the list of keys iterated. Something like: private static void assertIterator(String prefix, String seek, Table<byte[], ?> table, List<String> expected)
throws RocksDatabaseException, CodecException {
final byte[] prefixBytes = prefix != null ? getBytesUtf16(prefix) : null;
final byte[] seekBytes = seek != null ? getBytesUtf16(seek) : null;
try (TableIterator<byte[], ? extends Table.KeyValue<byte[], ?>> iter = table.iterator(prefixBytes, seekBytes)) {
for (String e : expected) {
assertTrue(iter.hasNext());
assertArrayEquals(getBytesUtf16(e), iter.next().getKey());
}
assertFalse(iter.hasNext());
}
}Then we can both simplify cases and make sure the iterator stops (like already done in two of the five). // Case 1: Seek to existing key, no prefix
assertIterator(null, "a3", table, asList("a3", "a5", "b2", "b4"));
// Case 2: Seek to non-existent key (should land on next greater), no prefix
assertIterator(null, "a2", table, asList("a3", "a5", "b2", "b4"));
// Case 3: Seek past all keys, no prefix
assertIterator(null, "z9", table, emptyList());
// Case 4: Seek with prefix
assertIterator("b", "b3", table, singletonList("b4"));
// Case 5: Seek with prefix to start of prefix
assertIterator("b", "b2", table, asList("b2", "b4"));It can be reused in |
||
| } | ||
|
|
||
| // Case 2: Seek to non-existent key (should land on next greater), no prefix | ||
| try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null, | ||
| getBytesUtf16("a2"))) { | ||
| assertTrue(iter.hasNext()); | ||
| assertArrayEquals(getBytesUtf16("a3"), iter.next().getKey()); | ||
| } | ||
|
|
||
| // Case 3: Seek past all keys, no prefix | ||
| try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null, | ||
| getBytesUtf16("z9"))) { | ||
| assertFalse(iter.hasNext()); | ||
| } | ||
|
|
||
| // Case 4: Seek with prefix | ||
| try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(getBytesUtf16("b"), | ||
| getBytesUtf16("b3"))) { | ||
| assertTrue(iter.hasNext()); | ||
| assertArrayEquals(getBytesUtf16("b4"), iter.next().getKey()); | ||
| assertFalse(iter.hasNext()); | ||
| } | ||
|
|
||
| // Case 5: Seek with prefix to start of prefix | ||
| try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(getBytesUtf16("b"), | ||
| getBytesUtf16("b2"))) { | ||
| assertTrue(iter.hasNext()); | ||
| assertArrayEquals(getBytesUtf16("b2"), iter.next().getKey()); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testIteratorSeekEdgeCases() throws Exception { | ||
| final Table<byte[], byte[]> table = rdbStore.getTable(families.get(0)); | ||
| // Empty table check | ||
| try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null, | ||
| getBytesUtf16("a1"))) { | ||
| assertFalse(iter.hasNext()); | ||
| } | ||
|
|
||
| table.put(getBytesUtf16("a1"), getBytesUtf16("val1")); | ||
|
|
||
| // Seek before first key | ||
| try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null, | ||
| getBytesUtf16("00"))) { | ||
| assertTrue(iter.hasNext()); | ||
| assertArrayEquals(getBytesUtf16("a1"), iter.next().getKey()); | ||
| } | ||
|
|
||
| // Seek after last key | ||
| try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null, | ||
| getBytesUtf16("b1"))) { | ||
| assertFalse(iter.hasNext()); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testConcurrentIteratorWithWrites() throws Exception { | ||
| final Table<byte[], byte[]> table = rdbStore.getTable(families.get(1)); | ||
| final int keyCount = 5000; | ||
| final CountDownLatch readyLatch = new CountDownLatch(1); | ||
| final CountDownLatch startLatch = new CountDownLatch(1); | ||
| final AtomicLong writtenKeys = new AtomicLong(0); | ||
|
|
||
| // Writer thread | ||
| ExecutorService executor = Executors.newFixedThreadPool(2); | ||
| Future<Void> writer = executor.submit(() -> { | ||
| readyLatch.countDown(); | ||
| startLatch.await(); | ||
| for (int i = 0; i < keyCount; i++) { | ||
| String key = String.format("key-%05d", i); | ||
| table.put(getBytesUtf16(key), getBytesUtf16("value-" + i)); | ||
| writtenKeys.incrementAndGet(); | ||
| if (i % 100 == 0) { | ||
| Thread.yield(); // Give reader a chance | ||
| } | ||
| } | ||
| return null; | ||
| }); | ||
|
|
||
| // Reader thread (using the optimization) | ||
| Future<Void> reader = executor.submit(() -> { | ||
| readyLatch.countDown(); | ||
| startLatch.await(); | ||
| // Wait for some data to be written | ||
| while (writtenKeys.get() < 100) { | ||
| Thread.sleep(1); | ||
| } | ||
|
|
||
| int seeks = 0; | ||
| for (int i = 0; i < 100; i++) { | ||
| // Randomly seek to a key that should exist (or be close to existing) | ||
| long targetId = (long) (Math.random() * writtenKeys.get()); | ||
| String seekKeyStr = String.format("key-%05d", targetId); | ||
| try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iter = table.iterator(null, | ||
| getBytesUtf16(seekKeyStr))) { | ||
| if (iter.hasNext()) { | ||
| assertNotNull(iter.next().getKey()); | ||
| } | ||
| seeks++; | ||
| } | ||
| } | ||
| assertTrue(seeks > 0); | ||
| return null; | ||
| }); | ||
|
|
||
| readyLatch.await(); | ||
| startLatch.countDown(); | ||
|
|
||
| writer.get(10, TimeUnit.SECONDS); | ||
| reader.get(10, TimeUnit.SECONDS); | ||
|
|
||
| executor.shutdown(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -193,7 +193,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager, | |
| // to ensure uniqueness of objectIDs. | ||
| private final long omEpoch; | ||
|
|
||
| private Map<String, Table> tableMap = new HashMap<>(); | ||
| private final Map<String, Table> tableMap = new HashMap<>(); | ||
| private final Map<String, TableCacheMetrics> tableCacheMetricsMap = | ||
| new HashMap<>(); | ||
| private SnapshotChainManager snapshotChainManager; | ||
|
|
@@ -675,7 +675,7 @@ public String getMultipartKeyFSO(String volume, String bucket, String key, Strin | |
|
|
||
| final String fileName = OzoneFSUtils.getFileName(key); | ||
| return getMultipartKey(volumeId, bucketId, parentId, | ||
| fileName, uploadId); | ||
| fileName, uploadId); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -718,10 +718,7 @@ public boolean isVolumeEmpty(String volume) throws IOException { | |
| return false; | ||
| } | ||
|
|
||
| if (isKeyPresentInTable(volumePrefix, bucketTable)) { | ||
| return false; // we found at least one key with this vol/ | ||
| } | ||
| return true; | ||
| return !isKeyPresentInTable(volumePrefix, bucketTable); // we found at least one key with this vol/ | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1103,7 +1100,7 @@ public ListKeysResult listKeys(String volumeName, String bucketName, | |
| long readFromRDbStartNs, readFromRDbStopNs = 0; | ||
| // Get maxKeys from DB if it has. | ||
| try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> | ||
| keyIter = getKeyTable(getBucketLayout()).iterator()) { | ||
| keyIter = getKeyTable(getBucketLayout()).iterator(null, seekKey)) { | ||
| readFromRDbStartNs = Time.monotonicNowNanos(); | ||
| KeyValue< String, OmKeyInfo > kv; | ||
| keyIter.seek(seekKey); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this |
||
|
|
@@ -1875,11 +1872,7 @@ public boolean containsIncompleteMPUs(String volume, String bucket) | |
| } | ||
|
|
||
| // Check in table | ||
| if (isKeyPresentInTable(keyPrefix, multipartInfoTable)) { | ||
| return true; | ||
| } | ||
|
|
||
| return false; | ||
| return isKeyPresentInTable(keyPrefix, multipartInfoTable); | ||
| } | ||
|
|
||
| // NOTE: Update both getTableBucketPrefix(volume, bucket) & getTableBucketPrefix(tableName, volume, bucket) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Please disable
Align when multilinein settings (if using IDEA).