This article works through one question: when can a computation run
over data that does not fit in memory, and at what cost. The answer
turns out to be a clean piece of algebra, and vectra
exposes it through two pieces: collect_chunked() for
bounded folds, and offload() for spilling a query to disk
(or, with a key, splitting one into shards). Along the way the same
principle explains both what streams cheaply and what needs a sort.
There are two distinct settings, and conflating them causes most of the confusion about what is possible.
The streaming model assumes one pass over the data, working memory much smaller than the data, and no ability to store or re-read the input. In this setting some exact computations are provably impossible: computing the exact number of distinct values, or the exact median, requires memory proportional to the data. These are information-theoretic lower bounds, and no cleverness removes them. The escape is to approximate (sketches such as HyperLogLog and quantile sketches).
The external-memory model assumes bounded RAM and a disk to spill to, with cost measured in input/output volume. Here the picture changes completely. Any computable reduction over data that fits on disk is feasible; the lower bounds above do not bind, because they assumed the data could not be stored. The exact median is a sort followed by reading the middle element. The exact distinct count is a sort followed by a scan. What varies between reductions is the I/O cost, not whether they can be done. The only hard limit is data that exceeds the disk.
vectra lives in the second model. So the design question
is never “is this fittable” but “which cost tier does it land in,” and
the rest of this article is about reading that tier off the structure of
the reduction.
The cheapest tier is a single pass with a fixed-size accumulator. A reduction belongs here exactly when it is a fold whose accumulator is a bounded monoid:
reduce(data) = combine over chunks of ( accumulate over rows )
with three properties carrying their weight:
combine.
combine(combine(a, b), c) equals
combine(a, combine(b, c)). This is what lets the data be
split into chunks, reduced independently, and merged. Algebraically the
reduction descends from the free monoid (sequences under concatenation):
it does not depend on where the chunk boundaries fall.combine, when
present. The result does not depend on the order of the chunks, so the
reduction descends to the multiset and the pieces may be processed in
any order.collect_chunked() is the fold. The engine pulls one
batch, applies f(acc, chunk), frees the batch, and
continues, so a result far larger than RAM reduces to a small summary in
bounded memory.
f <- tempfile(fileext = ".vtr")
df <- data.frame(g = rep(c("a", "b", "c"), length.out = 6000),
wt = rnorm(6000, 3, 1), hp = rnorm(6000, 150, 40))
df$mpg <- 30 - 5 * df$wt - 0.05 * df$hp + rnorm(6000, 0, 1)
write_vtr(df, f)A row count is the simplest monoid (the accumulator is an integer,
combine is addition):
The normal-equation pieces X'X and X'y form
a monoid too: each is a matrix, and addition is the
combine. Accumulating them in one pass gives an exact
least-squares fit without ever holding the design matrix.
acc <- collect_chunked(
tbl(f) |> select(mpg, wt, hp),
function(acc, chunk) {
X <- cbind(1, chunk$wt, chunk$hp); y <- chunk$mpg
list(XtX = acc$XtX + crossprod(X), Xty = acc$Xty + crossprod(X, y))
},
.init = list(XtX = matrix(0, 3, 3), Xty = matrix(0, 3, 1))
)
drop(solve(acc$XtX, acc$Xty))
#> [1] 30.00280427 -5.00469529 -0.04991014
coef(lm(mpg ~ wt + hp, data = df))
#> (Intercept) wt hp
#> 30.00280427 -5.00469529 -0.04991014The two agree because summing per-batch cross-products is
associative: the fit is a monoid homomorphism from the data to the
3 x 3 accumulator.
A generalized linear model does not fit in one pass. Iteratively
reweighted least squares reweights every row by the current coefficient
estimate, so the data must be read once per iteration.
biglm::bigglm() does exactly this, and
chunk_feeder() hands it a resettable stream. The catch is
that re-reading a query re-runs the whole pipeline (scan, joins, mutate)
on every pass.
offload() removes that cost. It materializes a query
once to a .vtr file and returns a node that streams from
the file. The returned node holds the same rows as the input; the only
change is where the bytes live and that replaying is now a disk scan. In
categorical terms it is an endofunctor on query streams that is the
identity on values and changes only the cost profile:
s <- offload(tbl(f) |> select(mpg, wt, hp))
identical(collect(s), collect(tbl(f) |> select(mpg, wt, hp)))
#> [1] TRUE
s
#> vectra query node
#> Columns (3):
#> mpg <double>
#> wt <double>
#> hp <double>
#> <offload grade: replay cache>
#> passes over data : 1 to spill, then O(1) re-reads
#> peak memory : O(one batch)
#> I/O cost : O(n) to write, O(n) per replay
#> note : temp spill, removed when the node is garbage-collectedPrinting an offloaded node shows its cost grade, the honest label for
what the stream costs. A plain node reports a streaming scan; an
offloaded node reports the replay cache. explain() shows
the same grade alongside the plan.
Feeding the replay cache to a fit is the point.
chunk_feeder() accepts an offloaded node directly, so each
reweighted pass reads the prepared columns from disk rather than
rebuilding them:
s <- offload(tbl(f) |> select(mpg, wt, hp))
fit <- biglm::bigglm(mpg ~ wt + hp, data = chunk_feeder(s),
family = gaussian())
coef(fit)
#> (Intercept) wt hp
#> 30.00280427 -5.00469529 -0.04991014Because the spill holds exactly the modelling columns, replay does no further work. Bake the selects and mutates into the query you offload, and the per-pass cost is one scan of the prepared table.
The one-pass tier covers reductions that decompose over independent
rows. Many models do not: a mixed model couples rows within a group, a
Cox model couples each event with everyone still at risk, a model with a
scale() or spline term couples through a quantity estimated
from all the data. The move that rescues them is to reorder so the
coupling becomes local, then handle each local piece on its own.
Two orderings cover most cases. Sorting by a key makes a cumulative coupling into a running aggregate; partitioning by a key makes a within-group coupling into independent blocks.
arrange() is already an external merge sort: it spills
sorted runs to .vtr and merges them with a bounded-memory
heap, so sorting larger-than-RAM data is built in.
tbl(f) |> arrange(desc(mpg)) |> head(3) |> collect()
#> g wt hp mpg
#> 1 b 0.0602263 76.83606 27.82284
#> 2 a -0.2021101 99.33357 26.93803
#> 3 a 0.1110793 82.67477 26.52015offload(x, by = ...) splits a query into disjoint shards
on disk in a single streaming pass: one per key value for a discrete
key, or per value range (method = "range") or hash bucket
(method = "hash") for a continuous or high-cardinality key.
The union of the shards reproduces the input, and row totals are
checked.
p <- offload(tbl(f), by = "g")
p
#> <vectra partition: 3 shards by 'g'>
#> a 2000 rows
#> b 2000 rows
#> c 2000 rows
#> <offload grade: partition by 'g' (level)>
#> passes over data : 1 spill + 1 routing pass
#> peak memory : O(routing budget = 1e+06 rows), or O(one shard) when collected
#> I/O cost : O(n)
#> note : localizes coupling so each shard fits in RAMEach shard is an ordinary node that fits in memory on its own, so a
per-group model is a function run over the shards.
group_map() reads each shard into a data.frame, hands it to
the function with its key, and returns the results keyed by shard:
fits <- group_map(p, function(d, g) coef(lm(mpg ~ wt + hp, data = d)))
fits
#> $a
#> (Intercept) wt hp
#> 30.09466375 -5.05029777 -0.04956424
#>
#> $b
#> (Intercept) wt hp
#> 29.93969075 -4.98416691 -0.04993128
#>
#> $c
#> (Intercept) wt hp
#> 29.98165286 -4.97939793 -0.05028584When the per-shard result is itself a data.frame,
group_modify() binds the pieces into one table and restores
the key as a column:
group_modify(p, function(d, g)
data.frame(n = nrow(d), mean_mpg = mean(d$mpg)))
#> g n mean_mpg
#> 1 a 2000 7.479311
#> 2 b 2000 7.564069
#> 3 c 2000 7.604408Partitioning also lets a single global reduction run as independent
per-shard folds whose results merge. This is where the algebra returns:
merging partial accumulators needs combine, and
collect_chunked() takes it as an argument. Supplying
combine declares the reduction a monoid, with
.init as its identity.
accumulate <- function(acc, chunk) {
X <- cbind(1, chunk$wt, chunk$hp); y <- chunk$mpg
list(XtX = acc$XtX + crossprod(X), Xty = acc$Xty + crossprod(X, y))
}
combine <- function(a, b) list(XtX = a$XtX + b$XtX, Xty = a$Xty + b$Xty)
acc <- collect_chunked(
offload(tbl(f) |> select(g, mpg, wt, hp), by = "g"),
accumulate,
.init = list(XtX = matrix(0, 3, 3), Xty = matrix(0, 3, 1)),
combine = combine, commutative = TRUE
)
drop(solve(acc$XtX, acc$Xty))
#> [1] 30.00280427 -5.00469529 -0.04991014
coef(lm(mpg ~ wt + hp, data = df))
#> (Intercept) wt hp
#> 30.00280427 -5.00469529 -0.04991014The partitioned reduce reproduces the global fit, and that is the
correctness law made concrete. Offloading preserves the answer exactly
when the reduction factors through the quotient the spill plan quotients
by. Associativity of combine lets the chunk boundaries move
(the reduction descends from the free monoid). Commutativity lets the
shards merge in any order (it descends to the multiset). Here
combine is matrix addition, which is both, so the partition
order is irrelevant and commutative = TRUE records
that.
The declared algebra also fixes the cheapest legal plan:
| declared | what it permits | cheapest plan |
|---|---|---|
semigroup (associative combine) |
moving chunk boundaries | sequential spill and merge |
monoid (identity .init too) |
empty and parallel merges | partition, k-way merge |
| commutative monoid | reordering shards | shuffle, no stable sort |
Reading the structure of a reduction gives its tier directly:
| tier | I/O | peak memory | examples |
|---|---|---|---|
| monoid fold | O(n), one pass |
O(1) |
sum, count, mean, X'X |
| sort or partition localizable | O(n log n) |
O(1) |
median, distinct, per-group fits, Cox |
| all-to-all, no structure | O(n^2) or more |
varies | dense kernel or Gaussian-process Gram |
Every tier is feasible with a disk; the tier sets the cost. The first
is collect_chunked() on a plain node. The second is
arrange() or offload(x, by = ...) ahead of a
fold or a per-shard fit. The third still runs, at quadratic I/O, and is
usually attacked with approximation (inducing points, low rank).
It is tempting to reach for heavier type theory here, since the
correctness condition is a statement about quotients. The lighter
framing is the one that fits the engine. The equality that matters,
collect(offload(x)) equals collect(x), is a
plain proposition with no higher structure, and nothing about it is
checkable inside C or R at compile time. So the contract is a declared
monoid plus property tests: the test suite asserts the identity on
values, replay equivalence, and that a partitioned reduce equals the
single-pass fold. Those tests are the working proof of the quotient
law.
collect_chunked().biglm::bigglm()):
offload() the prepared query once and feed it to
chunk_feeder().offload(x, by = ...), then
group_map() for a fit per shard,
group_modify() to recombine per-shard tables, or a monoidal
reduce with collect_chunked(..., combine = ).arrange(), which sorts
externally.