
Conversation

@ozgrakkurt (Contributor) commented Nov 26, 2025

Summary by CodeRabbit

  • Chores
    • Consolidated Arrow & Parquet dependencies at workspace level and migrated internal data handling to Arrow RecordBatch; simplified Parquet writing to stream batches directly.
  • Breaking Changes
    • Removed optional Ethers integration and all related conversions/features.
    • Public surface now uses RecordBatch; several legacy batch helpers/aliases and re-exports were removed.
  • Tests
    • Updated tests for RecordBatch access patterns, name-based column access, and row-count semantics.


@coderabbitai bot (Contributor) commented Nov 26, 2025

Walkthrough

This PR migrates code from polars-arrow types to the native arrow crate, promotes workspace-scoped arrow/parquet dependencies, replaces ArrowBatch with arrow::array::RecordBatch across client/schema/format/streaming layers, rewrites parquet output to use AsyncArrowWriter, consolidates FromArrow into RecordBatch-based inherent constructors, and removes the ethers feature and its conversions.

Changes

  • Workspace dependencies (Cargo.toml, hypersync-schema/Cargo.toml, hypersync-client/Cargo.toml): Add workspace-scoped arrow and parquet; remove polars-arrow; drop tokio-util and ethers uses; expand tokio features; add futures/arrayvec.
  • Examples (examples/all_erc20: Cargo.toml, src/main.rs): Switch the example to workspace arrow; update amount extraction to use Arrow compute (Float64Type) and compute::sum (see the sketch after this list).
  • Core API & exports (hypersync-client/src/lib.rs, hypersync-client/src/types.rs, hypersync-schema/src/lib.rs): Remove ArrowBatch/ArrowChunk aliases and exports; replace stored ArrowBatch vectors with RecordBatch; remove the to_ethers module and related public items; switch schema constructors to Schema::new.
  • FromArrow & parsing (hypersync-client/src/from_arrow.rs, hypersync-client/src/parse_response.rs): Replace trait-based FromArrow with per-type from_arrow(&RecordBatch) -> Vec<Self>; unify column access helpers; parse_response now collects Vec<RecordBatch> using FileReader::try_new.
  • Column mapping & utils (hypersync-client/src/column_mapping.rs, hypersync-client/src/util.rs): Rework mapping to Arrow ArrayRef/PrimitiveArray/RecordBatch; update casting/decimal mappings (Decimal128(38, 0)), builders, hex encoding and log decoding; remove map_batch_to_binary_view.
  • Streaming & parquet output (hypersync-client/src/stream.rs, hypersync-client/src/parquet_out.rs): Migrate the pipeline to RecordBatch usage (num_rows()); replace the Polars-based row-group encoder with parquet::AsyncArrowWriter; spawn/run writer channels now send RecordBatch.
  • hypersync-format: remove ethers (hypersync-format/Cargo.toml, hypersync-format/src/types/*.rs): Remove the ethers dependency and feature; delete feature-gated conversions and related error types (FixedSizeData, Quantity, UInt, Withdrawal conversions).
  • Schema util removal (hypersync-schema/src/util.rs): Remove the public project_schema function and related imports.
  • Tests & small fixes (hypersync-client/tests/api_test.rs, hypersync-net-types/src/*, fuzz/fuzz_targets/net_types_serde_capnp.rs, examples/call_decode_output/src/main.rs): Update tests for RecordBatch access (batch.num_rows(), column_by_name(...).as_primitive()), adjust field name access to f.name(), modify the fuzz equality check for QueryId, and remove the obsolete FromArrow import in the example.
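For context, a minimal sketch of the Arrow compute pattern the all_erc20 example switches to (the column name and function name are illustrative, not the example's actual code); it assumes a RecordBatch with a binary or numeric amount column that can be cast to Float64:

use arrow::array::{AsArray, RecordBatch};
use arrow::compute;
use arrow::datatypes::{DataType, Float64Type};

// Cast the column to Float64 and aggregate it with arrow's sum kernel.
fn total_amount(batch: &RecordBatch) -> Option<f64> {
    let col = batch.column_by_name("amount")?;
    let as_f64 = compute::cast(col, &DataType::Float64).ok()?;
    compute::sum(as_f64.as_primitive::<Float64Type>())
}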

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant StreamProcessor
    participant ColumnMapping
    participant ParquetWriter

    Client->>StreamProcessor: request data
    StreamProcessor->>StreamProcessor: collect Vec<RecordBatch>
    StreamProcessor->>ColumnMapping: apply_to_batch(batch, mapping)
    ColumnMapping-->>StreamProcessor: mapped RecordBatch
    alt hex encoding
        StreamProcessor->>StreamProcessor: hex_encode_batch(batch, prefixed)
        StreamProcessor-->>StreamProcessor: encoded RecordBatch
    end
    StreamProcessor->>ParquetWriter: send RecordBatch (mpsc)
    ParquetWriter->>ParquetWriter: AsyncArrowWriter::write(batch)
    ParquetWriter-->>StreamProcessor: finished / ack
    StreamProcessor-->>Client: yield processed data
sequenceDiagram
    participant IPCReader as Arrow IPC reader
    participant Parser as parse_response
    participant FromArrow as per-type constructors
    participant Consumer as App

    IPCReader->>Parser: read file -> RecordBatch chunks
    Parser->>Parser: FileReader::try_new -> Vec<RecordBatch>
    Parser-->>FromArrow: Vec<RecordBatch>
    FromArrow->>FromArrow: from_arrow(&RecordBatch) -> Vec<Struct>
    FromArrow-->>Consumer: Vec<Struct>

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

  • Files/areas needing extra attention:
    • hypersync-client/src/from_arrow.rs — field extraction, null handling, and per-row assembly.
    • hypersync-client/src/column_mapping.rs — casting rules, Decimal128(38,0) mapping, builder correctness.
    • hypersync-client/src/parquet_out.rs — AsyncArrowWriter lifecycle, schema-from-first-batch logic, channel handling.
    • hypersync-client/src/stream.rs and util.rs — batch transformations, hex encoding and decode correctness, and API surface changes.
    • Tests — ensure updated access patterns match runtime behavior.

Possibly related PRs

Suggested reviewers

  • JonoPrest
  • JasoonS

Poem

🐇 I hopped through crates and RecordBatch streams,
swapped Polars dreams for Arrow beams.
Ethers slid off like dew at dawn,
writers hum tidy as I hop on.
🥕 hooray — a rabbit’s tidy code dance

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 51.69%, which is below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.
  • Title check (✅ Passed): The title clearly and concisely describes the main changes: migrating from polars-arrow to arrow-rs and removing the ethers integration, which aligns with the primary objectives of this substantial refactoring PR.


@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
hypersync-client/src/util.rs (1)

147-159: Remove dead code: sequential decoding loop is unused.

The decoded_cols vector built here is never used. The parallel iteration on lines 161-174 performs the same work and its result is what's actually returned.

-        let mut decoded_cols = Vec::with_capacity(event.body().len());
-
-        for (i, ty) in event.body().iter().enumerate() {
-            decoded_cols.push(
-                decode_body_col(
-                    decoded_tuples
-                        .iter()
-                        .map(|t| t.as_ref().map(|t| t.get(i).unwrap())),
-                    ty,
-                )
-                .context("decode body column")?,
-            );
-        }
-
         event
🧹 Nitpick comments (5)
Cargo.toml (1)

18-20: Workspace-wide Arrow/Parquet versions are a good move

Defining arrow = "57" and parquet = "57" at the workspace level should prevent version skew between crates and fits the Arrow‑native refactor.

If compile times or binary size become a concern later, you could consider tightening default features here and enabling only the subsets each crate actually needs.

hypersync-schema/src/lib.rs (1)

156-167: Consider expanding test coverage.

The smoke test verifies that schema constructors don't panic, but doesn't validate the schema contents. Consider adding assertions to verify field counts or specific field properties for regression detection.

hypersync-client/src/stream.rs (2)

377-384: Inconsistent accessor pattern for UInt8Array.

UInt8 uses manual downcast_ref while UInt64 uses as_primitive::<UInt64Type>(). Consider using as_primitive::<UInt8Type>() for consistency:

-        ArrowDataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
-            array
-                .as_any()
-                .downcast_ref::<UInt8Array>()
-                .unwrap()
-                .iter()
-                .rev(),
-        ))),
+        ArrowDataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
+            array.as_primitive::<arrow::datatypes::UInt8Type>().iter().rev(),
+        ))),

345-346: Consider propagating error instead of unwrap.

RecordBatch::try_new().unwrap() could panic if there's a schema/column mismatch. While the current logic preserves schema structure, using ? would provide better error messages:

-        batch = RecordBatch::try_new(batch.schema(), cols).unwrap();
+        batch = RecordBatch::try_new(batch.schema(), cols)
+            .context("reconstruct batch after reverse")?;
hypersync-client/src/from_arrow.rs (1)

86-91: Silent handling of type mismatches in column_as.

column_as returns None both when a column doesn't exist AND when it exists but has an unexpected type. This could mask schema evolution bugs. Consider logging when downcast fails:

 fn column_as<'a, T: 'static>(batch: &'a RecordBatch, col_name: &str) -> Option<&'a T> {
     match batch.column_by_name(col_name) {
         None => None,
-        Some(c) => c.as_any().downcast_ref::<T>(),
+        Some(c) => {
+            let result = c.as_any().downcast_ref::<T>();
+            if result.is_none() {
+                log::trace!("column '{}' exists but has unexpected type", col_name);
+            }
+            result
+        }
     }
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e7991dc and 9c2ff0f.

📒 Files selected for processing (26)
  • Cargo.toml (1 hunks)
  • examples/all_erc20/Cargo.toml (1 hunks)
  • examples/all_erc20/src/main.rs (2 hunks)
  • hypersync-client/Cargo.toml (1 hunks)
  • hypersync-client/src/column_mapping.rs (3 hunks)
  • hypersync-client/src/from_arrow.rs (2 hunks)
  • hypersync-client/src/lib.rs (1 hunks)
  • hypersync-client/src/parquet_out.rs (3 hunks)
  • hypersync-client/src/parse_response.rs (1 hunks)
  • hypersync-client/src/stream.rs (4 hunks)
  • hypersync-client/src/to_ethers.rs (0 hunks)
  • hypersync-client/src/types.rs (1 hunks)
  • hypersync-client/src/util.rs (13 hunks)
  • hypersync-client/tests/api_test.rs (9 hunks)
  • hypersync-format/Cargo.toml (0 hunks)
  • hypersync-format/src/types/fixed_size_data.rs (0 hunks)
  • hypersync-format/src/types/quantity.rs (0 hunks)
  • hypersync-format/src/types/uint.rs (0 hunks)
  • hypersync-format/src/types/withdrawal.rs (0 hunks)
  • hypersync-net-types/src/block.rs (1 hunks)
  • hypersync-net-types/src/log.rs (1 hunks)
  • hypersync-net-types/src/trace.rs (1 hunks)
  • hypersync-net-types/src/transaction.rs (1 hunks)
  • hypersync-schema/Cargo.toml (1 hunks)
  • hypersync-schema/src/lib.rs (4 hunks)
  • hypersync-schema/src/util.rs (0 hunks)
💤 Files with no reviewable changes (7)
  • hypersync-format/Cargo.toml
  • hypersync-format/src/types/withdrawal.rs
  • hypersync-format/src/types/uint.rs
  • hypersync-format/src/types/fixed_size_data.rs
  • hypersync-format/src/types/quantity.rs
  • hypersync-schema/src/util.rs
  • hypersync-client/src/to_ethers.rs
🧰 Additional context used
🧬 Code graph analysis (5)
examples/all_erc20/src/main.rs (1)
hypersync-client/src/stream.rs (4)
  • array (366-366)
  • array (369-369)
  • array (375-375)
  • array (378-380)
hypersync-client/src/from_arrow.rs (1)
hypersync-client/src/types.rs (1)
  • from (56-94)
hypersync-client/src/stream.rs (1)
hypersync-client/src/util.rs (4)
  • decode_logs_batch (72-181)
  • hex_encode_batch (42-70)
  • batch (43-65)
  • batch (101-104)
hypersync-client/src/column_mapping.rs (3)
hypersync-client/src/stream.rs (6)
  • array (366-366)
  • array (369-369)
  • array (375-375)
  • array (378-380)
  • batch (339-343)
  • new (433-449)
hypersync-client/tests/api_test.rs (2)
  • batch (96-99)
  • batch (100-103)
hypersync-client/src/util.rs (4)
  • batch (43-65)
  • batch (101-104)
  • v (332-332)
  • v (333-333)
hypersync-client/src/util.rs (1)
hypersync-client/src/column_mapping.rs (4)
  • batch (81-108)
  • col (116-116)
  • col (151-151)
  • col (177-177)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test_release
🔇 Additional comments (29)
hypersync-net-types/src/block.rs (1)

423-428: Switch to f.name() is the correct adaptation for arrow-rs Field

Using f.name().clone() matches the arrow-rs Field API and keeps the test semantics (collecting owned String names) unchanged. Looks good.
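For reference, a tiny standalone sketch of that access pattern (not the test's exact code):

use std::collections::BTreeSet;
use arrow::datatypes::SchemaRef;

// Collect owned field names from an arrow schema, as the updated tests do.
fn field_names(schema: &SchemaRef) -> BTreeSet<String> {
    schema.fields().iter().map(|f| f.name().clone()).collect()
}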

hypersync-net-types/src/log.rs (1)

546-551: Log schema test correctly updated to use f.name()

The change to .map(|f| f.name().clone()) matches the Arrow Field accessor and preserves the original comparison logic between schema fields and LogField::all().

hypersync-net-types/src/trace.rs (1)

791-797: Trace schema field-name extraction correctly switched to f.name()

This aligns the test with the Arrow Field API while keeping the BTreeSet comparison against TraceField::all() identical in behavior.

hypersync-net-types/src/transaction.rs (1)

1224-1229: Transaction schema test updated to Arrow-style f.name() access

Using f.name().clone() is appropriate for arrow-rs Field and keeps the schema-vs-TransactionField::all() equality check intact.

examples/all_erc20/Cargo.toml (1)

9-9: Arrow as a workspace dependency looks correct

Sharing arrow via workspace = true keeps the example aligned with the main crates’ Arrow version and features. No issues from this change alone.

hypersync-client/Cargo.toml (1)

11-12: Workspace Arrow/Parquet wiring and async stack look consistent

Using workspace-scoped arrow and parquet with ipc_compression and arrow/async features aligns with your IPC + Parquet usage, and the expanded Tokio features match the client’s async patterns.

No manifest issues stand out; just ensure you’ve run cargo tree -e features to confirm there are no unexpected feature combos or duplicate Arrow versions pulled in.

Also applies to: 19-28

hypersync-schema/Cargo.toml (1)

10-10: Schema crate now correctly follows workspace Arrow

Switching to arrow = { workspace = true } removes the polars-arrow dependency and ensures this crate tracks the same Arrow version as the rest of the workspace. Looks good.

hypersync-client/src/parse_response.rs (1)

1-3: IPC FileReader + RecordBatch conversion looks correct

The new read_chunks implementation using Cursor<&[u8]> and ipc::reader::FileReader::try_new to yield Vec<RecordBatch> is idiomatic arrow-rs and integrates cleanly with ArrowResponseData. (arrow.apache.org) The contextual error messages for each stage (create reader, read chunk, parse * data) are also nice.

Just double‑check that the server is indeed emitting Arrow IPC file format (not streaming), as this determines whether FileReader vs StreamReader is appropriate.

Also applies to: 5-5, 8-18, 57-66
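A minimal sketch of that reading pattern under assumed names (not the actual parse_response code); it expects Arrow IPC file-format bytes:

use std::io::Cursor;
use anyhow::{Context, Result};
use arrow::array::RecordBatch;
use arrow::ipc::reader::FileReader;

// Read every record batch out of an Arrow IPC file held in memory.
fn read_chunks(bytes: &[u8]) -> Result<Vec<RecordBatch>> {
    let reader = FileReader::try_new(Cursor::new(bytes), None).context("create ipc reader")?;
    reader.map(|batch| batch.context("read record batch")).collect()
}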

hypersync-client/tests/api_test.rs (2)

4-4: Arrow-based column access in ordering test is appropriate

Using column_by_name("block_number") / "log_index" with as_primitive::<UInt64Type>() and iterating over the resulting primitive arrays is a solid Arrow-native replacement for the previous Polars/ArrowBatch logic. This keeps the ordering assertion semantics the same while matching arrow-rs APIs.

Also applies to: 96-107
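A small sketch of that access pattern, simplified from the test (column names assumed to be selected, UInt64, and non-null):

use arrow::array::{AsArray, RecordBatch};
use arrow::datatypes::UInt64Type;

// Assert that (block_number, log_index) pairs appear in ascending order.
fn assert_sorted(batch: &RecordBatch) {
    let block_numbers = batch
        .column_by_name("block_number")
        .expect("block_number column")
        .as_primitive::<UInt64Type>();
    let log_indices = batch
        .column_by_name("log_index")
        .expect("log_index column")
        .as_primitive::<UInt64Type>();

    let mut prev = None;
    for (block_number, log_index) in block_numbers.iter().zip(log_indices.iter()) {
        let current = (
            block_number.expect("block_number should not be null"),
            log_index.expect("log_index should not be null"),
        );
        if let Some(prev) = prev {
            assert!(prev <= current, "batch should be ordered by (block_number, log_index)");
        }
        prev = Some(current);
    }
}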


177-177: Switch to num_rows() across tests matches RecordBatch semantics

Updating assertions and aggregates to use batch.num_rows() (for decoded logs, blocks, transactions, and logs) is the right adaptation now that ArrowResponseData exposes plain RecordBatch vectors instead of custom ArrowBatch wrappers. The logical intent of the tests is preserved while aligning with the new data model.

Also applies to: 351-352, 357-358, 381-382, 387-387, 411-412, 441-442, 463-464, 489-490

hypersync-client/src/types.rs (1)

3-4: RecordBatch-based ArrowResponseData and FromArrow wiring look coherent

Replacing the custom ArrowBatch wrapper with Vec<RecordBatch> for all ArrowResponseData collections is consistent with arrow-rs and simplifies the type surface. The From<&ArrowResponse> for QueryResponse impl correctly delegates to Block::from_arrow / Transaction::from_arrow / Log::from_arrow / Trace::from_arrow, and importing the FromArrow trait ensures these trait methods resolve.

This refactor should be transparent to external users of ArrowResponse and QueryResponse while aligning the client with arrow-rs’s native batch type.

Also applies to: 5-6, 10-23, 55-95

hypersync-schema/src/lib.rs (1)

1-53: LGTM! Clean migration to arrow-rs schema construction.

The schema definitions correctly use Schema::new(vec![...]).into() to create SchemaRef (Arc). The transition from BinaryView/Utf8View to Binary/Utf8 aligns with arrow-rs standard types.
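For illustration, a minimal constructor in that style (the field names here are examples only, not the crate's full schema):

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

// Build an Arc'd schema the way the hypersync-schema constructors now do.
fn example_schema() -> SchemaRef {
    Schema::new(vec![
        Field::new("number", DataType::UInt64, true),
        Field::new("hash", DataType::Binary, true),
        Field::new("timestamp", DataType::Binary, true),
    ])
    .into()
}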

hypersync-client/src/lib.rs (2)

113-113: Breaking change: ArrowBatch removed from public exports.

This is a breaking API change. Consumers previously using ArrowBatch will need to migrate to using RecordBatch directly. Consider documenting this in release notes or CHANGELOG.


75-113: LGTM! Clean removal of Polars-based abstractions.

The public API is simplified by removing the ArrowBatch indirection. The client now works directly with Arrow's RecordBatch, which is a cleaner API surface.

hypersync-client/src/stream.rs (1)

218-220: LGTM! Clean migration to RecordBatch-based row counting.

The implementation correctly uses RecordBatch::num_rows() which is the idiomatic arrow-rs approach.

hypersync-client/src/parquet_out.rs (1)

175-206: LGTM! Clean AsyncArrowWriter implementation.

The implementation correctly:

  1. Uses the first batch's schema to initialize the writer
  2. Handles the empty batch case by returning early
  3. Uses BufWriter for efficient I/O
  4. Properly closes the writer with close().await
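A minimal sketch of the lifecycle described in the list above (the channel, path, and function name are illustrative, not the crate's parquet_out code; assumes parquet's async feature, a recent AsyncArrowWriter API, and tokio):

use anyhow::{Context, Result};
use arrow::array::RecordBatch;
use parquet::arrow::AsyncArrowWriter;
use tokio::{fs::File, io::BufWriter, sync::mpsc};

async fn run_writer(mut rx: mpsc::Receiver<RecordBatch>, path: &str) -> Result<()> {
    // The schema comes from the first batch; if no batch arrives, write nothing.
    let Some(first) = rx.recv().await else {
        return Ok(());
    };

    let file = File::create(path).await.context("create parquet file")?;
    let mut writer = AsyncArrowWriter::try_new(BufWriter::new(file), first.schema(), None)
        .context("create AsyncArrowWriter")?;

    writer.write(&first).await.context("write first batch")?;
    while let Some(batch) = rx.recv().await {
        writer.write(&batch).await.context("write batch")?;
    }

    writer.close().await.context("close parquet writer")?;
    Ok(())
}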
hypersync-client/src/util.rs (8)

15-24: LGTM!

The change from String::from_utf8_unchecked to String::from_utf8(...).unwrap() is a good safety improvement. The faster_hex::hex_encode output is guaranteed to be valid UTF-8 (hex characters only), so the unwrap is safe, and this avoids the need for an unsafe block.


26-40: LGTM!

The function correctly uses StringBuilder with append_option to handle nullable values, and the branching for prefixed vs. non-prefixed encoding is clean.


42-70: LGTM!

The parallel iteration pattern aligns well with similar code in column_mapping.rs. The downcast_ref().unwrap() on line 50 is safe because the match guard ensures we're only downcasting when data_type() is Binary. The try_new(...).unwrap() on line 69 is safe since the schema is constructed from the same transformed columns.


183-256: LGTM!

The decode_body_col function correctly handles nullable values using builders with append_null() and append_value().


258-328: LGTM!

The decode_col function mirrors the pattern in decode_body_col with proper null handling for topic decoding.


330-345: LGTM!

The function correctly handles various DynSolValue variants and appends binary data appropriately.


347-404: LGTM!

Schema construction and field creation logic is correct, properly separating indexed and non-indexed inputs.


422-469: LGTM!

Tests are properly updated to use the new Arrow API with Schema::new, BinaryBuilder, and direct array access methods.

hypersync-client/src/column_mapping.rs (5)

1-71: LGTM!

The imports are properly organized for the Arrow migration, and the DataType enum with its conversion to ArrowDataType is well-defined. The Decimal128(38, 0) mapping is the standard maximum precision for 128-bit decimals.
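As a small illustration of that mapping (the input array and helper name are illustrative, not the crate's map_to_decimal128):

use arrow::array::{ArrayRef, Int64Array};
use arrow::compute;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

// Cast an integer column to Decimal128 with 38 digits of precision and scale 0,
// the maximum precision for 128-bit decimals.
fn to_decimal128(values: &Int64Array) -> Result<ArrayRef, ArrowError> {
    compute::cast(values, &DataType::Decimal128(38, 0))
}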


73-113: LGTM overall!

The parallel mapping pattern is consistent with util.rs and correctly handles the column transformation with proper error propagation.


130-146: LGTM!

The to_array_ref helper and map_column dispatch are clean and correctly route to the appropriate conversion functions.


282-327: LGTM!

The generic binary_to_target_array function is well-designed with proper null handling via append_option. The float conversion functions correctly handle signed values through the absolute value approach.


329-355: LGTM!

The test covers important edge cases including negative numbers, zero, positive numbers, and boundary values (i64::MAX, i64::MIN).

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
hypersync-client/src/column_mapping.rs (2)

115-128: map_l1_fee_scalar still has multiple unwraps and can panic on bad data

This function will currently panic if:

  • The array is not actually a BinaryArray.
  • The bytes are not valid UTF‑8.
  • The parsed string is not a valid f64.

Given that this operates on external data, these should be treated as errors, not panics. You can mirror the error-handling style used elsewhere in this file:

 pub fn map_l1_fee_scalar(col: &dyn Array, target_data_type: DataType) -> Result<ArrayRef> {
-    let col = col.as_any().downcast_ref::<BinaryArray>().unwrap();
-    let col = Float64Array::from_iter(
-        col.iter()
-            .map(|v| v.map(|v| std::str::from_utf8(v).unwrap().parse().unwrap())),
-    );
+    let col = col
+        .as_any()
+        .downcast_ref::<BinaryArray>()
+        .context("expected BinaryArray for l1_fee_scalar")?;
+
+    let values: Vec<Option<f64>> = col
+        .iter()
+        .map(|opt| {
+            opt.map(|bytes| {
+                let s = std::str::from_utf8(bytes)
+                    .context("l1_fee_scalar: invalid UTF-8")?;
+                let v = s
+                    .parse::<f64>()
+                    .context("l1_fee_scalar: failed to parse float")?;
+                Ok(v)
+            })
+            .transpose()
+        })
+        .collect::<Result<_>>()?;
+
+    let col = Float64Array::from(values);
 
     let arrow_dt = ArrowDataType::from(target_data_type);
 
     let arr = compute::cast(&col, &arrow_dt)
-        .with_context(|| anyhow!("failed to l1_fee_scalar to {:?}", target_data_type))?;
+        .with_context(|| anyhow!("failed to cast l1_fee_scalar to {:?}", target_data_type))?;
 
     Ok(arr)
 }

This keeps the same semantics but turns all malformed inputs into structured errors instead of aborting the process.


243-253: Fix error message in map_to_decimal128 catch-all arm

The catch-all error currently says "int64" even though this function maps to Decimal128, which can be confusing when debugging:

-        dt => Err(anyhow!("Can't convert {:?} to int64", dt)),
+        dt => Err(anyhow!("Can't convert {:?} to decimal128", dt)),

This aligns the message with the actual target type.

🧹 Nitpick comments (2)
hypersync-client/src/column_mapping.rs (2)

81-113: Avoid unwrap on RecordBatch::try_new to prevent panics

The overall apply_to_batch logic (parallel per-column mapping plus schema reconstruction) looks solid, but the unwrap() on RecordBatch::try_new can still panic if a length/schema invariant is ever violated.

Consider propagating the Arrow error instead:

-    let schema = Arc::new(Schema::new(fields));
-
-    Ok(RecordBatch::try_new(schema, cols).unwrap())
+    let schema = Arc::new(Schema::new(fields));
+    let batch = RecordBatch::try_new(schema, cols)
+        .context("failed to build mapped RecordBatch")?;
+    Ok(batch)

This keeps the function panic-free while preserving the same external behavior under normal conditions.


295-301: Make signed_binary_to_target error text type-agnostic

signed_binary_to_target is used for both signed and unsigned targets (e.g., u64, u32), but the error message mentions only “signed type”. A more neutral message avoids confusion:

-        .map_err(|_e| anyhow!("failed to cast number to requested signed type"))
+        .map_err(|_e| anyhow!("failed to cast number to requested integer type"))

This better reflects actual usage without changing behavior.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9c2ff0f and 3715b1b.

📒 Files selected for processing (1)
  • hypersync-client/src/column_mapping.rs (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
hypersync-client/src/column_mapping.rs (4)
hypersync-client/src/stream.rs (6)
  • array (366-366)
  • array (369-369)
  • array (375-375)
  • array (378-380)
  • batch (339-343)
  • new (433-449)
hypersync-client/tests/api_test.rs (2)
  • batch (96-99)
  • batch (100-103)
hypersync-client/src/util.rs (4)
  • batch (43-65)
  • batch (101-104)
  • v (332-332)
  • v (333-333)
hypersync-client/src/types.rs (1)
  • from (56-94)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: fuzz
  • GitHub Check: test_release
🔇 Additional comments (2)
hypersync-client/src/column_mapping.rs (2)

57-69: DataType → ArrowDataType mapping looks consistent

The enum-to-Arrow mapping (including Decimal256(76, 0) and Decimal128(38, 0)) is coherent and matches the rest of the conversion helpers in this module. No changes needed here.


329-355: Nice focused test coverage on numeric/string/decimal round-trips

The test_signed_binary_to_target test validating conversions across i64, float, string, and decimal paths from the same raw bytes is a good sanity check for the shared helpers in this module. No changes needed here.

@ozgrakkurt changed the title from "Use arrow-rs instead of polars-arrow" to "Use arrow-rs instead of polars-arrow and remove ethers integration" on Nov 26, 2025
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
hypersync-client/src/util.rs (1)

109-145: Null handling improved but could be more explicit.

The change from opt.unwrap() (which would panic) to opt.unwrap_or(b"") addresses the previous review concern. However, the current approach is indirect:

  1. Null data → empty bytes
  2. Empty bytes → likely decode failure (caught at lines 132-141)
  3. Decode failure → None tuple → nulls in output columns

Consider making this more explicit by directly checking for null and returning None:

             .map(|opt| {
-                let val = opt.unwrap_or(b"");
+                let val = match opt {
+                    Some(v) => v,
+                    None => return Ok(None),
+                };
                 let tuple = tuple_decoder

This makes the intent clearer: null input data produces null decoded output.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3715b1b and 682dc88.

📒 Files selected for processing (1)
  • hypersync-client/src/util.rs (13 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: fuzz
  • GitHub Check: test_release
🔇 Additional comments (7)
hypersync-client/src/util.rs (7)

26-40: LGTM!

The migration to StringBuilder with proper null handling via append_option is correct.


183-256: LGTM!

The builder pattern usage is correct for arrow-rs, with proper null handling via append_null() and Arc wrapping of finished arrays.


258-328: LGTM!

Consistent with decode_body_col, the arrow-rs builder usage and null handling are correct.


330-345: LGTM!

The migration to arrow-rs BinaryBuilder::append_value() is correct for all handled Solidity value types.


347-375: LGTM!

Using Schema::new(fields) is the correct constructor for arrow-rs.


377-404: LGTM!

Using the Field::name() accessor method is correct for the arrow-rs API.


422-469: LGTM!

Tests correctly updated to use arrow-rs API with Schema::new and BinaryBuilder.

Comment on lines 14 to 15
/// Convert given record batch to a vector of structs
fn from_arrow(batch: &RecordBatch) -> Vec<Self>;
Collaborator: Nice, we don't need this trait any more if we're only dealing with binary/utf8

Contributor Author: Makes sense, it can just be a method on each struct

Contributor Author: Removed this trait

match array {
None => None,
Some(a) => {
if a.is_valid(index) {
Collaborator: Does "is_valid" do a bounds check? Or just if the value is null?

Contributor Author: It checks if the value is null

None => None,
Some(a) => {
if a.is_valid(index) {
Some(a.value(index).try_into().unwrap())
Collaborator: Definitely want to make this safe if possible 🙏🏼

Contributor Author: makes sense

Comment on lines +47 to +56
fn get_bool(array: Option<&BooleanArray>, index: usize) -> Option<bool> {
match array {
None => None,
Some(a) => {
if a.is_valid(index) {
Some(a.value(index))
} else {
None
}
}
Collaborator: I think this function can be made generic probably.

Contributor Author: I don't think it is worth it. These functions are fairly simple now, but it tends to look ugly if there are generics on them. What do you think?

Collaborator: Totally fine 👍🏼

fn column_as<'a, T: 'static>(batch: &'a RecordBatch, col_name: &str) -> Option<&'a T> {
match batch.column_by_name(col_name) {
None => None,
Some(c) => c.as_any().downcast_ref::<T>(),
Collaborator: Does downcast_ref panic?

Contributor Author: It returns None if the type doesn't match. I'll work through this stuff to make sure error handling is solid

@JonoPrest (Collaborator) left a comment

Hey @ozgrakkurt before this is merged and released I'm keen to have some tests, and have a safe api for the end user.

Happy if you want to merge this into a feature branch so long and cut a side release for internal use 👍🏼

@ozgrakkurt (Contributor Author) replied:

Hey Jono, makes sense. I had the same feeling but wasn't sure if it would be worth the time investment.

I'll remove the panics as much as I can, then go through the code to see what is easily testable.

Then I'll also create a testing setup with pre-fetched data from server and add some integration tests like that.

Maybe I'll add fuzz tests for some parts of the code too.

LMK if you have something different in mind

@ozgrakkurt requested a review from JonoPrest on November 27, 2025
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
hypersync-client/tests/api_test.rs (2)

86-145: LGTM!

The test correctly validates the decoding of Uniswap V2 PairCreated events and verifies the extracted fields against expected values.

Minor nit: FixedSizeData<20> likely implements Copy, so .clone() on line 101 can be simplified to just passing contract_addr directly.


260-262: Consider extracting a helper for API token retrieval.

The pattern std::env::var(ENVIO_API_TOKEN).unwrap() is repeated across all tests. Consider extracting a helper function to reduce repetition and provide a more descriptive error message:

fn get_api_token() -> String {
    std::env::var(ENVIO_API_TOKEN)
        .expect("ENVIO_API_TOKEN environment variable must be set for integration tests")
}
hypersync-format/src/types/quantity.rs (2)

48-66: Numeric conversions look correct; consider deduplicating canonicalization logic

The TryFrom<i64> / From<u64> implementations correctly enforce non-negativity and produce canonical big‑endian byte representations that satisfy the Quantity invariants. To avoid duplicating the to_be_bytes + canonicalize_bytes pattern (also used in the Arbitrary impl) and reduce the chance of future drift, you could route TryFrom<i64> through From<u64> (or a tiny shared helper), for example:

 impl TryFrom<i64> for Quantity {
     type Error = Error;

-    fn try_from(value: i64) -> Result<Self> {
-        if value < 0 {
-            return Err(Error::UnexpectedQuantity(value.to_string()));
-        }
-
-        Ok(Quantity::from(canonicalize_bytes(
-            value.to_be_bytes().as_slice().to_vec(),
-        )))
-    }
+    fn try_from(value: i64) -> Result<Self> {
+        let unsigned =
+            u64::try_from(value).map_err(|_| Error::UnexpectedQuantity(value.to_string()))?;
+        Ok(Quantity::from(unsigned))
+    }
 }

(This keeps behavior identical today while centralizing the integer→bytes mapping.)


86-103: Visitor numeric handling is sound; consider adding a negative‑integer test

Delegating visit_i64/visit_u64 to the new TryFrom<i64> / From<u64> keeps serde behavior aligned with the core conversion APIs and correctly rejects negative quantities while accepting both signed/unsigned positives. To lock this in, it would be helpful to add a serde‑test case (and/or a JSON test) that exercises a negative integer (e.g., Token::I64(-1) or "-1") and asserts that deserialization fails with the expected error, so future changes to TryFrom<i64> can’t silently weaken the invariant.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 682dc88 and a14f7cf.

📒 Files selected for processing (3)
  • fuzz/fuzz_targets/net_types_serde_capnp.rs (1 hunks)
  • hypersync-client/tests/api_test.rs (23 hunks)
  • hypersync-format/src/types/quantity.rs (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
hypersync-format/src/types/quantity.rs (1)
hypersync-format/src/types/util.rs (1)
  • canonicalize_bytes (11-22)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: fuzz
🔇 Additional comments (6)
fuzz/fuzz_targets/net_types_serde_capnp.rs (1)

40-46: LGTM: Improved test clarity with explicit dual assertions.

The split into two separate assertions enhances test coverage:

  1. Object equality (lines 40-42): Validates that QueryId derived from the original query matches the QueryId derived from the deserialized query, ensuring Query deserialization preserves QueryId-relevant information.
  2. Bytes consistency (lines 43-46): Validates that the QueryId bytes match what was serialized in the capnp message body, ensuring capnp QueryId serialization correctness.

This approach is more explicit and thorough than a single combined check.

hypersync-client/tests/api_test.rs (5)

1-21: LGTM!

The imports are appropriate for the arrow-rs migration, and centralizing the API token environment variable name in a constant improves maintainability across tests.


22-85: LGTM!

The helper function is well-structured with comprehensive field selections for querying Uniswap V2 pool creation events. The use of BTreeSet for field selections and ArrayVec for topic filters is appropriate.


147-253: LGTM!

Comprehensive test that validates the complete response structure for Uniswap V2 pool creation queries.

Note: The commented-out trace assertions (lines 226-252) with the comment "Doesn't return trace for some reason" suggest an open investigation. Consider tracking this as an issue if trace data is expected to be returned.


345-351: Verify null handling for primitive array iteration.

The iterator over as_primitive::<UInt64Type>() yields Option<u64> values. The .unwrap() on lines 346-347 will panic if the data contains null values.

If block_number and log_index are guaranteed non-null by the query's field selection, this is acceptable. Otherwise, consider using .expect("block_number should not be null") for clearer panic messages or handling nulls explicitly.


591-598: LGTM!

The migration from .chunk.len() to .num_rows() correctly aligns with the arrow-rs RecordBatch API. The pattern is applied consistently across all tests.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

♻️ Duplicate comments (4)
hypersync-client/src/from_arrow.rs (4)

24-38: Critical: try_into().unwrap() can panic on malformed binary data.

The .unwrap() on line 32 will panic if the binary data doesn't match the expected type. This helper is used throughout the codebase for extracting field values, making it a high-impact issue.

Based on past review feedback and your PR comment about removing panics, consider propagating the error:

-fn get_binary<'a, T: TryFrom<&'a [u8]>>(array: Option<&'a BinaryArray>, index: usize) -> Option<T>
+fn get_binary<'a, T: TryFrom<&'a [u8]>>(
+    array: Option<&'a BinaryArray>, 
+    index: usize
+) -> Result<Option<T>, Box<dyn std::error::Error>>
 where
     <T as TryFrom<&'a [u8]>>::Error: Debug,
 {
     match array {
         None => None,
         Some(a) => {
             if a.is_valid(index) {
-                Some(a.value(index).try_into().unwrap())
+                Some(a.value(index).try_into()
+                    .map_err(|e| format!("Failed to convert binary at index {}: {:?}", index, e))?)
             } else {
                 None
             }
         }
-    }
+    }.map(Ok).transpose()
 }

Then update all call sites to propagate errors accordingly.


137-147: Critical: Multiple unwraps can panic on malformed data.

Lines 139 and 147 use .unwrap() which will panic on:

  • Line 139: Invalid 32-byte chunks in uncles data
  • Line 147: Malformed bincode-serialized withdrawals data

Apply error handling consistent with your plan to remove panics:

-                uncles: get_binary(uncles, idx).map(|v: &[u8]| {
+                uncles: get_binary(uncles, idx).and_then(|v: &[u8]| {
                     v.chunks(32)
-                        .map(|chunk| chunk.try_into().unwrap())
-                        .collect()
+                        .map(|chunk| chunk.try_into().ok())
+                        .collect::<Option<Vec<_>>>()
                 }),
-                withdrawals: get_binary(withdrawals, idx).map(|v| bincode::deserialize(v).unwrap()),
+                withdrawals: get_binary(withdrawals, idx).and_then(|v| bincode::deserialize(v).ok()),

227-229: Critical: bincode::deserialize().unwrap() can panic on malformed data.

Both access_list and authorization_list deserialization will panic on corrupted bincode data.

Apply consistent error handling:

-                access_list: get_binary(access_list, idx).map(|v| bincode::deserialize(v).unwrap()),
-                authorization_list: get_binary(authorization_list, idx)
-                    .map(|v| bincode::deserialize(v).unwrap()),
+                access_list: get_binary(access_list, idx).and_then(|v| bincode::deserialize(v).ok()),
+                authorization_list: get_binary(authorization_list, idx).and_then(|v| bincode::deserialize(v).ok()),

354-355: Critical: bincode::deserialize().unwrap() can panic on malformed data.

The trace_address deserialization will panic on corrupted bincode data.

Apply consistent error handling:

-                trace_address: get_binary(trace_address, idx)
-                    .map(|v| bincode::deserialize(v).unwrap()),
+                trace_address: get_binary(trace_address, idx).and_then(|v| bincode::deserialize(v).ok()),
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a14f7cf and 74793b8.

📒 Files selected for processing (4)
  • examples/call_decode_output/src/main.rs (1 hunks)
  • hypersync-client/src/from_arrow.rs (2 hunks)
  • hypersync-client/src/lib.rs (1 hunks)
  • hypersync-client/src/types.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
hypersync-client/src/types.rs (1)
hypersync-client/src/stream.rs (4)
  • array (366-366)
  • array (369-369)
  • array (375-375)
  • array (378-380)
hypersync-client/src/from_arrow.rs (8)
hypersync-format/src/types/uint.rs (1)
  • fmt (86-88)
hypersync-format/src/types/transaction_type.rs (1)
  • fmt (79-81)
hypersync-format/src/types/transaction_status.rs (1)
  • fmt (101-103)
hypersync-client/src/stream.rs (5)
  • array (366-366)
  • array (369-369)
  • array (375-375)
  • array (378-380)
  • batch (339-343)
hypersync-client/src/util.rs (5)
  • batch (43-65)
  • batch (101-104)
  • v (332-332)
  • v (333-333)
  • data (109-145)
examples/all_erc20/src/main.rs (1)
  • batch (87-90)
hypersync-client/tests/api_test.rs (2)
  • batch (336-339)
  • batch (340-343)
hypersync-client/src/simple_types.rs (3)
  • from (49-71)
  • data (104-112)
  • data (114-121)
🔇 Additional comments (4)
examples/call_decode_output/src/main.rs (1)

6-6: LGTM: Import update aligns with API changes.

The removal of FromArrow from imports is correct, as Trace::from_arrow is now an inherent method rather than a trait method.

hypersync-client/src/types.rs (1)

1-20: LGTM: Clean migration to RecordBatch.

The migration from ArrowBatch to RecordBatch is consistent throughout the ArrowResponseData struct, aligning with the PR's goal to use arrow-rs types directly.

hypersync-client/src/lib.rs (1)

112-112: LGTM: Public API cleanup.

Removing ArrowBatch and FromArrow from public exports correctly reflects the migration to using RecordBatch directly.

hypersync-client/src/from_arrow.rs (1)

266-305: LGTM: Safe implementation.

The Log::from_arrow implementation avoids unwraps and handles optional values properly throughout.

Comment on lines +231 to 235
blob_versioned_hashes: get_binary(blob_versioned_hashes, idx).map(|v: &[u8]| {
v.chunks(32)
.map(|chunk| chunk.try_into().unwrap())
.collect()
}),
Contributor:

⚠️ Potential issue | 🔴 Critical

Critical: try_into().unwrap() can panic on invalid hash length.

If blob_versioned_hashes contains chunks that aren't exactly 32 bytes, this will panic.

Apply safe conversion:

-                blob_versioned_hashes: get_binary(blob_versioned_hashes, idx).map(|v: &[u8]| {
+                blob_versioned_hashes: get_binary(blob_versioned_hashes, idx).and_then(|v: &[u8]| {
                     v.chunks(32)
-                        .map(|chunk| chunk.try_into().unwrap())
-                        .collect()
+                        .map(|chunk| chunk.try_into().ok())
+                        .collect::<Option<Vec<_>>>()
                 }),
🤖 Prompt for AI Agents
In hypersync-client/src/from_arrow.rs around lines 231 to 235, the code uses
try_into().unwrap() which can panic if a chunk is not exactly 32 bytes; replace
the unwrap with a safe conversion: for each chunk check chunk.len() == 32, copy
the 32 bytes into a [u8;32] buffer (e.g. create a [u8;32] and copy_from_slice),
and propagate or handle the error instead of panicking (return a Result/Err up
the call chain or convert the outer Option to None) so invalid-length chunks are
handled safely.
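A standalone sketch of that safe conversion (the helper name is illustrative):

// Convert a flat byte buffer into 32-byte hashes, returning None if any chunk
// has the wrong length instead of panicking.
fn chunks_to_hashes(bytes: &[u8]) -> Option<Vec<[u8; 32]>> {
    bytes
        .chunks(32)
        .map(|chunk| <[u8; 32]>::try_from(chunk).ok())
        .collect()
}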

logs_bloom: get_binary(logs_bloom, idx),
type_: get_u8(type_, idx).map(TransactionType::from),
root: get_binary(root, idx),
status: get_u8(status, idx).map(|v| TransactionStatus::from_u8(v).unwrap()),
Collaborator:

⚠️ Potential issue | 🟠 Major

Major: from_u8().unwrap() can panic on invalid status value.

If the status byte doesn't map to a valid TransactionStatus, this will panic.

Handle invalid values gracefully:

-                status: get_u8(status, idx).map(|v| TransactionStatus::from_u8(v).unwrap()),
+                status: get_u8(status, idx).and_then(|v| TransactionStatus::from_u8(v).ok()),
🤖 Prompt for AI Agents
In hypersync-client/src/from_arrow.rs around line 243, the code uses
TransactionStatus::from_u8(v).unwrap() which can panic on invalid status bytes;
replace the unwrap with a safe handling path — e.g., use
and_then(TransactionStatus::from_u8) to produce an Option<TransactionStatus> (or
match the result) and either map invalid values to None or to a defined fallback
like TransactionStatus::Unknown while logging the unexpected byte, or propagate
an error (Result) upstream instead; ensure no unwrap is used so
invalid/malformed status bytes are handled gracefully.

Comment on lines +247 to +248
l1_fee_scalar: get_binary(l1_fee_scalar, idx)
.map(|v| std::str::from_utf8(v).unwrap().parse().unwrap()),
Contributor:

⚠️ Potential issue | 🔴 Critical

Critical: Double .unwrap() creates multiple panic points.

This line can panic on:

  1. Invalid UTF-8 in l1_fee_scalar
  2. Invalid decimal string format

Replace with safe parsing:

                 l1_fee_scalar: get_binary(l1_fee_scalar, idx)
-                    .map(|v| std::str::from_utf8(v).unwrap().parse().unwrap()),
+                    .and_then(|v| {
+                        std::str::from_utf8(v)
+                            .ok()
+                            .and_then(|s| s.parse().ok())
+                    }),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-                l1_fee_scalar: get_binary(l1_fee_scalar, idx)
-                    .map(|v| std::str::from_utf8(v).unwrap().parse().unwrap()),
+                l1_fee_scalar: get_binary(l1_fee_scalar, idx)
+                    .and_then(|v| {
+                        std::str::from_utf8(v)
+                            .ok()
+                            .and_then(|s| s.parse().ok())
+                    }),

Comment on lines +27 to +30
let mut block_field_selection = BTreeSet::new();
block_field_selection.insert(BlockField::Number);
block_field_selection.insert(BlockField::Timestamp);
block_field_selection.insert(BlockField::Hash);
Collaborator:

Check out the query builder docs for next time:
https://docs.rs/hypersync-net-types/0.12.2/hypersync_net_types/query/struct.Query.html#basic-examples

Makes these things much simpler

}]]
);

// Doesn't return trace for some reason
Collaborator: need to use eth-traces.hypersync.xyz url

@JonoPrest (Collaborator) left a comment

Cool! Let's get this in. Won't cut a release yet until I've added my changes though.

@JonoPrest merged commit eedeed4 into enviodev:main on Nov 28, 2025
5 checks passed
@coderabbitai bot mentioned this pull request on Dec 8, 2025