-
Notifications
You must be signed in to change notification settings - Fork 14
Basic serialization and deserialization of diff filters #71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
d90d6f0
3e33988
c143f4f
fc9c45d
aa215ce
42f76b8
a80c3e6
3fc9ff3
0b63f37
14c5b08
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,7 @@ use crate::{Count, Diff}; | |
|
|
||
| mod bitvec; | ||
| mod config; | ||
| mod serde; | ||
| mod sim_hash; | ||
|
|
||
| use bitvec::*; | ||
|
|
@@ -77,7 +78,7 @@ impl<C: GeoConfig<Diff>> std::fmt::Debug for GeoDiffCount<'_, C> { | |
| } | ||
| } | ||
|
|
||
| impl<C: GeoConfig<Diff>> GeoDiffCount<'_, C> { | ||
| impl<'a, C: GeoConfig<Diff>> GeoDiffCount<'a, C> { | ||
| pub fn new(config: C) -> Self { | ||
| Self { | ||
| config, | ||
|
|
@@ -208,16 +209,23 @@ impl<C: GeoConfig<Diff>> GeoDiffCount<'_, C> { | |
| /// that makes the cost of the else case negligible. | ||
| fn xor_bit(&mut self, bucket: C::BucketType) { | ||
| if bucket.into_usize() < self.lsb.num_bits() { | ||
| // The bit being toggled is within our LSB bit vector | ||
| // so toggle it directly. | ||
| self.lsb.toggle(bucket.into_usize()); | ||
| } else { | ||
| let msb = self.msb.to_mut(); | ||
| match msb.binary_search_by(|k| bucket.cmp(k)) { | ||
| Ok(idx) => { | ||
| // The bit is already set in the MSB sparse bitset, remove it (XOR) | ||
| msb.remove(idx); | ||
|
|
||
| // We have removed a value from our MSB, move a value in the | ||
| // LSB into the MSB | ||
|
||
| let (first, second) = { | ||
| let mut lsb = iter_ones(self.lsb.bit_chunks().peekable()); | ||
| (lsb.next(), lsb.next()) | ||
| }; | ||
|
|
||
itsibitzi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| let new_smallest = if let Some(smallest) = first { | ||
| msb.push(C::BucketType::from_usize(smallest)); | ||
| second.map(|_| smallest).unwrap_or(0) | ||
|
|
@@ -229,15 +237,19 @@ impl<C: GeoConfig<Diff>> GeoDiffCount<'_, C> { | |
| Err(idx) => { | ||
| msb.insert(idx, bucket); | ||
| if msb.len() > self.config.max_msb_len() { | ||
| // We have too many values in the MSB sparse index vector, | ||
| // let's move the smalles MSB value into the LSB bit vector | ||
itsibitzi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| let smallest = msb | ||
| .pop() | ||
| .expect("we should have at least one element!") | ||
| .into_usize(); | ||
| // ensure vector covers smallest | ||
|
|
||
| let new_smallest = msb | ||
| .last() | ||
| .expect("should have at least one element") | ||
| .into_usize(); | ||
|
|
||
| // ensure LSB bit vector has the space for `smallest` | ||
| self.lsb.resize(new_smallest); | ||
| self.lsb.toggle(smallest); | ||
| } | ||
|
|
@@ -360,7 +372,7 @@ impl<C: GeoConfig<Diff>> Count<Diff> for GeoDiffCount<'_, C> { | |
| #[cfg(test)] | ||
| mod tests { | ||
| use itertools::Itertools; | ||
| use rand::{RngCore, SeedableRng}; | ||
| use rand::{seq::IteratorRandom, RngCore, SeedableRng}; | ||
|
|
||
| use crate::{ | ||
| build_hasher::UnstableDefaultBuildHasher, | ||
|
|
@@ -580,4 +592,58 @@ mod tests { | |
| iter_ones(self.bit_chunks().peekable()).map(C::BucketType::from_usize) | ||
| } | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_serialization_empty() { | ||
| let before = GeoDiffCount7::default(); | ||
|
|
||
| let mut writer = vec![]; | ||
| before.write(&mut writer).unwrap(); | ||
|
|
||
| assert_eq!(writer.len(), 0); | ||
|
|
||
| let after = GeoDiffCount7::from_bytes(before.config.clone(), &writer); | ||
|
|
||
| assert_eq!(before, after); | ||
| } | ||
|
|
||
| // This helper exists in order to easily test serializing types with different | ||
| // bucket types in the MSB sparse bit field representation. See tests below. | ||
| fn serialization_round_trip<C: GeoConfig<Diff> + Default>() { | ||
| let mut rnd = rand::rngs::StdRng::from_os_rng(); | ||
|
|
||
| // Run 100 simulations of random values being put into | ||
| // a diff counter. "Serializing" to a vector to emulate | ||
| // writing to a disk, and then deserializing and asserting | ||
| // the filters are equal. | ||
| for _ in 0..100 { | ||
| let mut before = GeoDiffCount::<'_, C>::default(); | ||
|
|
||
| // Select a random number of items to insert | ||
itsibitzi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| let items = (1..1000).choose(&mut rnd).unwrap(); | ||
|
|
||
| for _ in 0..items { | ||
| before.push_hash(rnd.next_u64()); | ||
| } | ||
|
|
||
| let mut writer = vec![]; | ||
| before.write(&mut writer).unwrap(); | ||
|
|
||
| let after = GeoDiffCount::<'_, C>::from_bytes(before.config.clone(), &writer); | ||
|
|
||
| assert_eq!(before, after); | ||
| } | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_serialization_round_trip_7() { | ||
| // Uses a u16 for MSB buckets | ||
| serialization_round_trip::<GeoDiffConfig7>(); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_serialization_round_trip_13() { | ||
| // Uses a u32 for MSB buckets | ||
| serialization_round_trip::<GeoDiffConfig7>(); | ||
itsibitzi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,11 +2,11 @@ use std::borrow::Cow; | |||||||||||||
| use std::cmp::Ordering; | ||||||||||||||
| use std::iter::Peekable; | ||||||||||||||
| use std::mem::{size_of, size_of_val}; | ||||||||||||||
| use std::ops::{Index, Range}; | ||||||||||||||
| use std::ops::{Deref as _, Index, Range}; | ||||||||||||||
|
|
||||||||||||||
| use crate::config::BitChunk; | ||||||||||||||
| use crate::config::IsBucketType; | ||||||||||||||
| use crate::config::BITS_PER_BLOCK; | ||||||||||||||
| use crate::config::{BitChunk, BYTES_PER_BLOCK}; | ||||||||||||||
|
|
||||||||||||||
| /// A bit vector where every bit occupies exactly one bit (in contrast to `Vec<bool>` where each | ||||||||||||||
| /// bit consumes 1 byte). It only implements the minimum number of operations that we need for our | ||||||||||||||
|
|
@@ -81,6 +81,10 @@ impl BitVec<'_> { | |||||||||||||
| self.num_bits | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| pub fn is_empty(&self) -> bool { | ||||||||||||||
| self.num_bits() == 0 | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /// Tests the bit specified by the provided zero-based bit position. | ||||||||||||||
| pub fn test_bit(&self, index: usize) -> bool { | ||||||||||||||
| assert!(index < self.num_bits); | ||||||||||||||
|
|
@@ -142,6 +146,60 @@ impl BitVec<'_> { | |||||||||||||
| let Self { num_bits, blocks } = self; | ||||||||||||||
| size_of_val(num_bits) + blocks.len() * size_of::<u64>() | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| pub fn from_bytes(mut buf: &[u8]) -> Self { | ||||||||||||||
| if buf.is_empty() { | ||||||||||||||
| return Self::default(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
itsibitzi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||
| // The first byte of the serialized BitVec is used to indicate how many | ||||||||||||||
| // of the bits in the left-most byte are *unoccupied*. | ||||||||||||||
itsibitzi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||
| // See [`BitVec::write`] implementation for how this is done. | ||||||||||||||
| assert!( | ||||||||||||||
| buf[0] < 64, | ||||||||||||||
| "Number of unoccupied bits should be <64, got {}", | ||||||||||||||
itsibitzi marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||
| buf[0] | ||||||||||||||
| ); | ||||||||||||||
|
|
||||||||||||||
| let num_bits = (buf.len() - 1) * 8 - buf[0] as usize; | ||||||||||||||
| buf = &buf[1..]; | ||||||||||||||
|
|
||||||||||||||
| assert_eq!( | ||||||||||||||
| buf.len() % BYTES_PER_BLOCK, | ||||||||||||||
| 0, | ||||||||||||||
| "buffer should be a multiple of 8 bytes, got {}", | ||||||||||||||
| buf.len() | ||||||||||||||
| ); | ||||||||||||||
|
|
||||||||||||||
| let blocks = unsafe { | ||||||||||||||
| std::mem::transmute(std::slice::from_raw_parts( | ||||||||||||||
| buf.as_ptr(), | ||||||||||||||
| buf.len() / BYTES_PER_BLOCK, | ||||||||||||||
| )) | ||||||||||||||
| }; | ||||||||||||||
|
||||||||||||||
| }; | |
| let (prefix, blocks, suffix) = unsafe { buf.align_to::<u64>() }; | |
| assert!( | |
| prefix.is_empty() && suffix.is_empty(), | |
| "Buffer is not properly aligned for u64" | |
| ); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| //! Convert a [`GeoDiffCount`] to and from byte arrays. | ||
itsibitzi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| //! | ||
| //! Since most of our target platforms are little endian there are more optimised approaches | ||
| //! for little endian platforms, just splatting the bytes into the writer. This is contrary | ||
| //! to the usual "network endian" approach where big endian is the default, but most of our | ||
| //! consumers are little endian so it makes sense for this to be the optimal approach. | ||
| //! | ||
| //! For now we do not support big endian platforms. In the future we might add a big endian | ||
| //! platform specific implementation which is able to read the little endian serialized | ||
| //! representation. For now, if you attempt to serialize a filter on a big endian platform | ||
| //! you get a panic. | ||
|
|
||
| use std::{borrow::Cow, ops::Deref as _}; | ||
|
|
||
| use crate::{config::GeoConfig, Diff}; | ||
|
|
||
| use super::{bitvec::BitVec, GeoDiffCount}; | ||
|
|
||
| impl<'a, C: GeoConfig<Diff>> GeoDiffCount<'a, C> { | ||
| /// Create a new [`GeoDiffCount`] from a slice of bytes | ||
| #[cfg(target_endian = "little")] | ||
itsibitzi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| pub fn from_bytes(c: C, buf: &'a [u8]) -> Self { | ||
| if buf.is_empty() { | ||
| return Self::new(c); | ||
| } | ||
|
|
||
| // The number of most significant bits stores in the MSB sparse repr | ||
| let msb_len = (buf.len() / size_of::<C::BucketType>()).min(c.max_msb_len()); | ||
|
|
||
| let msb = | ||
| unsafe { std::slice::from_raw_parts(buf.as_ptr() as *const C::BucketType, msb_len) }; | ||
|
|
||
| // The number of bytes representing the MSB - this is how many bytes we need to | ||
| // skip over to reach the LSB | ||
| let msb_bytes_len = msb_len * size_of::<C::BucketType>(); | ||
|
|
||
| Self { | ||
| config: c, | ||
| msb: Cow::Borrowed(msb), | ||
| lsb: BitVec::from_bytes(&buf[msb_bytes_len..]), | ||
| } | ||
| } | ||
|
|
||
| /// Create a new [`GeoDiffCount`] from a slice of bytes | ||
| #[cfg(target_endian = "big")] | ||
| pub fn from_bytes(c: C, buf: &'a [u8]) -> Self { | ||
| unimplemented!("not supported on big endian platforms") | ||
| } | ||
itsibitzi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| #[cfg(target_endian = "little")] | ||
| pub fn write<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<usize> { | ||
| if self.msb.is_empty() { | ||
| return Ok(0); | ||
| } | ||
|
|
||
| let msb_buckets = self.msb.deref(); | ||
| let msb_bytes = unsafe { | ||
| std::slice::from_raw_parts( | ||
| msb_buckets.as_ptr() as *const u8, | ||
| msb_buckets.len() * size_of::<C::BucketType>(), | ||
| ) | ||
| }; | ||
| writer.write_all(msb_bytes)?; | ||
|
|
||
| let mut bytes_written = msb_bytes.len(); | ||
|
|
||
| bytes_written += self.lsb.write(writer)?; | ||
|
|
||
| Ok(bytes_written) | ||
| } | ||
|
|
||
| #[cfg(target_endian = "big")] | ||
| pub fn write<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<usize> { | ||
| unimplemented!("not supported on big endian platforms") | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.