Skip to content

feat: Add ScanOrder and concurrent_files to ArrowScan for bounded-memory reads#3046

Open
sumedhsakdeo wants to merge 27 commits intoapache:mainfrom
sumedhsakdeo:fix/arrow-scan-benchmark-3036
Open

feat: Add ScanOrder and concurrent_files to ArrowScan for bounded-memory reads#3046
sumedhsakdeo wants to merge 27 commits intoapache:mainfrom
sumedhsakdeo:fix/arrow-scan-benchmark-3036

Conversation

@sumedhsakdeo
Copy link

@sumedhsakdeo sumedhsakdeo commented Feb 15, 2026

Summary

Addresses #3036 — ArrowScan.to_record_batches() uses executor.map + list() which eagerly materializes all record batches per file into memory, causing OOM on large tables.

This PR adds three parameters to to_arrow_batch_reader() that give users control over memory usage and parallelism:

  • batch_size — Controls the number of rows per batch passed to PyArrow's ds.Scanner. Default is PyArrow's built-in 131,072 rows.
  • order: ScanOrder — Controls batch ordering. ScanOrder.TASK (default) preserves existing behavior of returning batches grouped by file in task submission order, with each file fully materialized before proceeding to the next. ScanOrder.ARRIVAL yields batches as they are produced across files without materializing entire files into memory.
  • concurrent_files — Number of files to read concurrently when order=ScanOrder.ARRIVAL. A per-scan ThreadPoolExecutor(max_workers=concurrent_files) bounds concurrency, and a bounded queue (max 16 batches) provides backpressure to cap memory usage.

Problem

The current implementation materializes all batches from each file via list() inside executor.map, which runs up to min(32, cpu_count+4) files in parallel. For large files this means all batches from ~20 files are held in memory simultaneously before any are yielded to the consumer.

Solution

Before: OOM on large tables

 batches = table.scan().to_arrow_batch_reader()

After: bounded memory, tunable parallelism

from pyiceberg.table import ScanOrder

batches = table.scan().to_arrow_batch_reader(
    order=ScanOrder.ARRIVAL,
    concurrent_files=4,
    batch_size=10000,
)

Default behavior is unchanged — ScanOrder.TASK preserves the existing executor.map + list() path for backwards compatibility.

Architecture

When order=ScanOrder.ARRIVAL, batches flow through _bounded_concurrent_batches:

  1. All file tasks are submitted to a per-scan ThreadPoolExecutor(max_workers=concurrent_files)
  2. Workers push batches into a bounded Queue(maxsize=16) — when full, workers block (backpressure)
  3. The consumer yields batches from the queue via blocking queue.get()
  4. A sentinel value signals completion — no timeout-based polling
  5. On early termination (consumer stops), a cancel event is set and the queue is drained to unblock workers
  6. The executor context manager handles deterministic shutdown

Refactored to_record_batches into helpers: _prepare_tasks_and_deletes, _iter_batches_arrival, _iter_batches_materialized, _apply_limit.

Ordering semantics

Configuration File ordering Within-file ordering
ScanOrder.TASK (default) Batches grouped by file, in task submission order Row order
ScanOrder.ARRIVAL, concurrent_files=1 Grouped by file, sequential Row order
ScanOrder.ARRIVAL, concurrent_files>1 Interleaved (no grouping guarantee) Row order within each file

PR Stack

Breakdown of this large PR into smaller PRs:

  1. PR 0: batch_size forwarding
  2. PR 1: ScanOrder enum — stop materializing entire files
  3. PR 2: concurrent_files — bounded concurrent reads in arrival order
  4. PR 3: benchmark + docs guidance

Benchmark results

32 files × 500K rows, 5 columns (int64, float64, string, bool, timestamp), batch_size=131,072 (PyArrow default):

Config Throughput (rows/s) TTFR (ms) Peak Arrow Memory
default (TASK, all threads) 197,074,805 53.5 574.6 MB
arrival, cf=1 59,319,076 28.1 10.3 MB
arrival, cf=2 104,812,584 28.6 42.0 MB
arrival, cf=4 165,764,584 30.2 116.0 MB
arrival, cf=8 212,274,577 32.9 270.0 MB
arrival, cf=16 211,132,082 43.5 463.9 MB

TTFR = Time to First Record

Note on throughput plateau at cf=8: This benchmark runs against local filesystem where Parquet reads are CPU-bound (decompression + decoding). Throughput plateaus once enough threads saturate available cores. On cloud storage (S3/GCS/ADLS), reads are I/O-bound with 50-200ms per-file latency, so higher concurrent_files values (16-64+) would continue to show throughput gains until network bandwidth saturates. The optimal concurrent_files will be higher for remote storage than what this local benchmark suggests.

Positional deletes, row filters, and limit are handled correctly in all modes.

Are these changes tested?

Yes. 25 new unit tests across two test files, plus a micro-benchmark:

  • tests/io/test_pyarrow.py (16 tests): batch_size controls rows per batch, arrival order yields all rows correctly, arrival order respects limit, within-file ordering preserved, positional deletes applied correctly in all three modes (task order, arrival order, concurrent), positional deletes with limit, concurrent_files < 1 raises ValueError
  • tests/io/test_bounded_concurrent_batches.py (9 tests): single/multi-file correctness, incremental streaming, backpressure blocks producers when queue is full, error propagation from workers to consumer, early termination cancels workers cleanly, concurrency limit enforced, empty task list, ArrowScan integration with limit
  • tests/benchmark/test_read_benchmark.py: read throughput micro-benchmark across 6 configurations measuring rows/sec, TTFR, and peak Arrow memory

Are there any user-facing changes?

Yes. Three new optional parameters on DataScan.to_arrow_batch_reader():

  • batch_size: int | None — number of rows per batch (default: PyArrow's 131,072)
  • order: ScanOrder — controls batch ordering (ScanOrder.TASK default, ScanOrder.ARRIVAL for streaming)
  • concurrent_files: int — number of files to read concurrently in arrival order (default: 1)

New public enum ScanOrder exported from pyiceberg.table.

All parameters are optional with backwards-compatible defaults. Existing code is unaffected.

Documentation updated in mkdocs/docs/api.md with usage examples, ordering semantics, and configuration guidance table.

@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch 3 times, most recently from ab8c31b to 7ad9910 Compare February 15, 2026 01:47
Add batch_size parameter to _task_to_record_batches,
_record_batches_from_scan_tasks_and_deletes, ArrowScan.to_record_batches,
and DataScan.to_arrow_batch_reader so users can control the number of
rows per RecordBatch returned by PyArrow's Scanner.

Closes partially apache#3036

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch from 7ad9910 to c86f0be Compare February 15, 2026 02:10
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch from c86f0be to 05e07d1 Compare February 15, 2026 02:27
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch 2 times, most recently from 65a5007 to 1da7eb6 Compare February 17, 2026 03:39
sumedhsakdeo and others added 13 commits February 16, 2026 21:00
Introduce ScanOrder.TASK (default) and ScanOrder.ARRIVAL to control
batch ordering. TASK materializes each file before yielding; ARRIVAL
yields batches as produced for lower memory usage.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add _bounded_concurrent_batches() with proper lock discipline:
- Queue backpressure caps memory (scan.max-buffered-batches, default 16)
- Semaphore limits concurrent file reads (concurrent_files param)
- Cancel event with timeouts on all blocking ops (no lock over IO)
- Error propagation and early termination support

When streaming=True and concurrent_files > 1, batches are yielded
as they arrive from parallel file reads. File ordering is not
guaranteed (documented).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace shared ExecutorFactory + Semaphore with per-scan
ThreadPoolExecutor(max_workers=concurrent_files) for deterministic
shutdown and simpler concurrency control.

Refactor to_record_batches into helpers:
- _prepare_tasks_and_deletes: resolve delete files
- _iter_batches_streaming: bounded concurrent streaming path
- _iter_batches_materialized: executor.map materialization path
- _apply_limit: unified row limit logic (was duplicated)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tests and docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Setting `mock.call_count = 0` does not actually reset the mock's
internal call tracking, causing the second assertion to see accumulated
calls from both test phases. Use `reset_mock()` instead.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a parametrized benchmark case for default (executor.map) with
max_workers=4 to compare memory/throughput against unbounded threading.
Add TTFR (time to first record) measurement across all configurations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a "which config should I use?" tip box with recommended starting
points for common use cases, and clarify that batch_size is an advanced
tuning knob.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove @pytest.mark.benchmark so the read throughput tests are included
in the default `make test` filter as parametrize-marked tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…and docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch from 1da7eb6 to afb244c Compare February 17, 2026 05:07
@sumedhsakdeo sumedhsakdeo changed the title feat: Add streaming and concurrent file reads to ArrowScan to reduce memory usage and increase throughput feat: Add ScanOrder enum and concurrent file reads to ArrowScan to reduce memory usage and increase throughput Feb 17, 2026
@sumedhsakdeo sumedhsakdeo changed the title feat: Add ScanOrder enum and concurrent file reads to ArrowScan to reduce memory usage and increase throughput feat: Add ScanOrder and concurrent_files to ArrowScan for bounded-memory reads Feb 17, 2026
@sumedhsakdeo sumedhsakdeo requested a review from cbb330 February 17, 2026 05:24
finally:
cancel.set()
# Drain the queue to unblock any workers stuck on put()
while not batch_queue.empty():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could cause a worker to hang when concurrent_files > 1 and max_buffered_batches=1. Here is an example.

Starting state

max_buffered_batches=1, concurrent_files=3. Queue is full (1 item). Workers A, B, and C are all blocked on batch_queue.put().

Timeline

Step Main thread Workers A, B, C
1 cancel.set()
2 get_nowait() → removes 1 item. Queue: 0. Internally notifies Worker A Worker A: woken but hasn't run yet
3 empty()True (queue IS empty because A hasn't put yet). Exits drain loop.
4 executor.__exit__()shutdown(wait=True), joins all threads... Worker A runs, put() completes → Queue: 1. Checks cancel → returns. ✓
5 DEADLOCK — waiting for B and C to finish Workers B, C: still blocked on put(). Queue is full, nobody will ever drain.

Fix
In the worker use put with a timeout so it can check if the thread is canceled periodically.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I did have timeout in the previous version of the code, but it was causing performance regression. Exploring few other alternatives like condition variables, more complex but does not result in the bug.

tasks: list[FileScanTask],
batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
concurrent_files: int,
max_buffered_batches: int = 16,
Copy link

@robreeves robreeves Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be hardcoded. This means the only control a user has over the memory is lowering the batch size or making files with less than 16 batches and setting concurrent_files. It feels like they have to be too in the weeds to tune this.

Instead, WDYT you think of consolidating max_buffered_batches and concurrent_files? Instead have a single parallelism config. It would be used for the max worker count and number of batches. In the end the caller still gets the same number of parallel batches loaded.

One downside is it could result in a large number of files open if they want high parallelism. So a simpler alternative could be to expose max_buffered_batches so a user can set it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point - the challenge today is Arrow doesn't support truly vectorized reads that allow concurrent I/O to different sections of the same file, hence we need concurrent_files to drive throughput and max_buffered_batches to control memory as orthogonal concerns.

While the hardcoded limit might lose control users can have, I would lean on going with a reasonable default for this, for now.

In future, if there is a demand, we could expose max_buffered_batches as a parameter to give users direct memory control, as future use cases may require different buffer depths for optimal performance, or use some heuristic of concurrent_files x scale_factor, where scale_factor can be based on memory used and memory available.

If concurrent_files variable name is tying us to Arrow implementation, we can rename it to something more general, concurrent_streams, wdyt?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we can leave it as a default for now and iterate if the need comes up. concurrent_streams works. Other ideas--concurrent_readers, concurrent_reader_count

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ARRIVAL is not used should it through an exception if concurrent_files is set? It feels confusing to have it in the method params when it is not always used. Could it be inside a ScanOrder class, where each ScanOrder has a subclass with params specific to it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored it from enum -> class. PTAL.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

sumedhsakdeo and others added 4 commits February 18, 2026 12:34
Replace ScanOrder(str, Enum) with:
- ScanOrder(ABC): Base class for ordering strategies
- TaskOrder(ScanOrder): Default behavior, preserves existing API
- ArrivalOrder(ScanOrder): Encapsulates concurrent_streams and max_buffered_batches

This addresses reviewer feedback about unused parameters and improves type safety.
Parameters are now scoped to their appropriate ordering mode.

Rename concurrent_files → concurrent_streams for clarity.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Replace ScanOrder.TASK/ARRIVAL with TaskOrder()/ArrivalOrder() instances.
Update concurrent_files → concurrent_streams parameter usage.

All existing test scenarios preserved with new type-safe API.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Restructure parameterized benchmark tests to use ScanOrder class instances:
- TaskOrder() for default behavior
- ArrivalOrder(concurrent_streams=N) for streaming configurations

Simplifies test parameters by eliminating separate concurrent_files argument.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Replace ScanOrder enum examples with new class-based API:
- TaskOrder() for default behavior
- ArrivalOrder(concurrent_streams=N) for streaming
- ArrivalOrder(concurrent_streams=N, max_buffered_batches=M) for memory control

Add configuration guidance table and update ordering semantics.
Rename concurrent_files → concurrent_streams throughout examples.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
sumedhsakdeo and others added 7 commits February 18, 2026 17:29
- Remove ABC inheritance from ScanOrder since no abstract methods are defined
- Remove unused enum.Enum import
- Fix B008 error by moving TaskOrder() call from function default to inside function
- Clean up dataclass formatting

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Break long line in _iter_batches_arrival call for better readability
- Fix B008 error by moving TaskOrder() call from function default to inside function
- Sort imports alphabetically

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Update all test function calls to use concurrent_streams parameter
- Fix parameter name mismatch with _bounded_concurrent_batches function signature
- Update variable names and comments to match new parameter name

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Sort imports alphabetically as required by ruff formatting
- No functional changes

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Add batch_size parameter to ArrivalOrder class with comprehensive documentation
- Include memory formula: Peak memory ≈ concurrent_streams × batch_size × max_buffered_batches × (average row size)
- Update default concurrent_streams from 1 to 8 for better performance out-of-the-box
- Remove batch_size parameter from to_arrow_batch_reader() and to_record_batches() methods
- Simplify API by putting batch_size where it has direct memory impact (streaming orders)
- TaskOrder uses PyArrow defaults, ArrivalOrder provides full memory control

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Update benchmark tests to use simplified parameter structure
- Remove separate batch_size parameter from test calls
- Fix concurrent_streams validation error message in unit tests
- Maintain all existing test coverage and functionality

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Update all examples to use new ArrivalOrder(batch_size=X) syntax
- Add comprehensive memory formula with row size calculation
- Remove backward compatibility references (batch_size is new in this PR)
- Include performance characteristics and use case recommendations
- Provide clear guidance on TaskOrder vs ArrivalOrder memory behavior

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants

Comments