---
title: "Offloading: streaming, monoids, and out-of-core fits"
author: "Gilles Colling"
date: "`r Sys.Date()`"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Offloading: streaming, monoids, and out-of-core fits}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r setup, include = FALSE}
knitr::opts_chunk$set(collapse = TRUE, comment = "#>")
set.seed(1)
has_biglm <- requireNamespace("biglm", quietly = TRUE)
library(vectra)
```

This article works through one question: when can a computation run over data
that does not fit in memory, and at what cost. The answer turns out to be a
clean piece of algebra, and `vectra` exposes it through two pieces:
`collect_chunked()` for bounded folds, and `offload()` for spilling a query to
disk (or, with a key, splitting one into shards). Along the way the same
principle explains both what streams cheaply and what needs a sort.

## Two cost models

There are two distinct settings, and conflating them causes most of the
confusion about what is possible.

The **streaming model** assumes one pass over the data, working memory much
smaller than the data, and no ability to store or re-read the input. In this
setting some exact computations are provably impossible: computing the exact
number of distinct values, or the exact median, requires memory proportional to
the data. These are information-theoretic lower bounds, and no cleverness
removes them. The escape is to approximate (sketches such as HyperLogLog and
quantile sketches).

The **external-memory model** assumes bounded RAM and a disk to spill to, with
cost measured in input/output volume. Here the picture changes completely. Any
computable reduction over data that fits on disk is feasible; the lower bounds
above do not bind, because they assumed the data could not be stored. The exact
median is a sort followed by reading the middle element. The exact distinct
count is a sort followed by a scan. What varies between reductions is the I/O
cost, not whether they can be done. The only hard limit is data that exceeds the
disk.

`vectra` lives in the second model. So the design question is never "is this
fittable" but "which cost tier does it land in," and the rest of this article is
about reading that tier off the structure of the reduction.

## When a reduction streams in one pass

The cheapest tier is a single pass with a fixed-size accumulator. A reduction
belongs here exactly when it is a *fold whose accumulator is a bounded monoid*:

```
reduce(data) = combine over chunks of ( accumulate over rows )
```

with three properties carrying their weight:

* **Associativity** of `combine`. `combine(combine(a, b), c)` equals
  `combine(a, combine(b, c))`. This is what lets the data be split into chunks,
  reduced independently, and merged. Algebraically the reduction descends from
  the free monoid (sequences under concatenation): it does not depend on where
  the chunk boundaries fall.
* **Commutativity** of `combine`, when present. The result does not depend on
  the order of the chunks, so the reduction descends to the multiset and the
  pieces may be processed in any order.
* **Bounded carrier**. The accumulator size does not grow with the number of
  rows, so peak memory is one batch plus the accumulator.

`collect_chunked()` is the fold. The engine pulls one batch, applies
`f(acc, chunk)`, frees the batch, and continues, so a result far larger than RAM
reduces to a small summary in bounded memory.

```{r fold-setup}
f <- tempfile(fileext = ".vtr")
df <- data.frame(g = rep(c("a", "b", "c"), length.out = 6000),
                 wt = rnorm(6000, 3, 1), hp = rnorm(6000, 150, 40))
df$mpg <- 30 - 5 * df$wt - 0.05 * df$hp + rnorm(6000, 0, 1)
write_vtr(df, f)
```

A row count is the simplest monoid (the accumulator is an integer, `combine` is
addition):

```{r fold-count}
collect_chunked(tbl(f), function(acc, chunk) acc + nrow(chunk), .init = 0L)
```

The normal-equation pieces `X'X` and `X'y` form a monoid too: each is a matrix,
and addition is the `combine`. Accumulating them in one pass gives an exact
least-squares fit without ever holding the design matrix.

```{r fold-ols}
acc <- collect_chunked(
  tbl(f) |> select(mpg, wt, hp),
  function(acc, chunk) {
    X <- cbind(1, chunk$wt, chunk$hp); y <- chunk$mpg
    list(XtX = acc$XtX + crossprod(X), Xty = acc$Xty + crossprod(X, y))
  },
  .init = list(XtX = matrix(0, 3, 3), Xty = matrix(0, 3, 1))
)
drop(solve(acc$XtX, acc$Xty))
coef(lm(mpg ~ wt + hp, data = df))
```

The two agree because summing per-batch cross-products is associative: the fit
is a monoid homomorphism from the data to the `3 x 3` accumulator.

## The offload functor

A generalized linear model does not fit in one pass. Iteratively reweighted
least squares reweights every row by the current coefficient estimate, so the
data must be read once per iteration. `biglm::bigglm()` does exactly this, and
`chunk_feeder()` hands it a resettable stream. The catch is that re-reading a
query re-runs the whole pipeline (scan, joins, mutate) on every pass.

`offload()` removes that cost. It materializes a query once to a `.vtr` file and
returns a node that streams from the file. The returned node holds the same rows
as the input; the only change is where the bytes live and that replaying is now
a disk scan. In categorical terms it is an endofunctor on query streams that is
the identity on values and changes only the cost profile:

```{r offload-identity}
s <- offload(tbl(f) |> select(mpg, wt, hp))

identical(collect(s), collect(tbl(f) |> select(mpg, wt, hp)))
s
```

Printing an offloaded node shows its cost grade, the honest label for what the
stream costs. A plain node reports a streaming scan; an offloaded node reports
the replay cache. `explain()` shows the same grade alongside the plan.

Feeding the replay cache to a fit is the point. `chunk_feeder()` accepts an
offloaded node directly, so each reweighted pass reads the prepared columns from
disk rather than rebuilding them:

```{r offload-bigglm, eval = has_biglm}
s <- offload(tbl(f) |> select(mpg, wt, hp))
fit <- biglm::bigglm(mpg ~ wt + hp, data = chunk_feeder(s),
                     family = gaussian())
coef(fit)
```

```{r offload-bigglm-note, echo = FALSE, results = "asis", eval = !has_biglm}
cat("> The out-of-core GLM example needs the **biglm** package, which is not",
    "installed, so it was not run here.\n")
```

Because the spill holds exactly the modelling columns, replay does no further
work. Bake the selects and mutates into the query you offload, and the per-pass
cost is one scan of the prepared table.

## Localizing coupling by sorting and partitioning

The one-pass tier covers reductions that decompose over independent rows. Many
models do not: a mixed model couples rows within a group, a Cox model couples
each event with everyone still at risk, a model with a `scale()` or spline term
couples through a quantity estimated from all the data. The move that rescues
them is to reorder so the coupling becomes local, then handle each local piece
on its own.

Two orderings cover most cases. Sorting by a key makes a cumulative coupling
into a running aggregate; partitioning by a key makes a within-group coupling
into independent blocks.

`arrange()` is already an external merge sort: it spills sorted runs to `.vtr`
and merges them with a bounded-memory heap, so sorting larger-than-RAM data is
built in.

```{r arrange}
tbl(f) |> arrange(desc(mpg)) |> head(3) |> collect()
```

`offload(x, by = ...)` splits a query into disjoint shards on disk in a single
streaming pass: one per key value for a discrete key, or per value range
(`method = "range"`) or hash bucket (`method = "hash"`) for a continuous or
high-cardinality key. The union of the shards reproduces the input, and row
totals are checked.

```{r partition}
p <- offload(tbl(f), by = "g")
p
```

Each shard is an ordinary node that fits in memory on its own, so a per-group
model is a function run over the shards. `group_map()` reads each shard into a
data.frame, hands it to the function with its key, and returns the results
keyed by shard:

```{r partition-fits}
fits <- group_map(p, function(d, g) coef(lm(mpg ~ wt + hp, data = d)))
fits
```

When the per-shard result is itself a data.frame, `group_modify()` binds the
pieces into one table and restores the key as a column:

```{r partition-modify}
group_modify(p, function(d, g)
  data.frame(n = nrow(d), mean_mpg = mean(d$mpg)))
```

### The monoidal reduce, and the law it rests on

Partitioning also lets a single global reduction run as independent per-shard
folds whose results merge. This is where the algebra returns: merging partial
accumulators needs `combine`, and `collect_chunked()` takes it as an argument.
Supplying `combine` declares the reduction a monoid, with `.init` as its
identity.

```{r partition-reduce}
accumulate <- function(acc, chunk) {
  X <- cbind(1, chunk$wt, chunk$hp); y <- chunk$mpg
  list(XtX = acc$XtX + crossprod(X), Xty = acc$Xty + crossprod(X, y))
}
combine <- function(a, b) list(XtX = a$XtX + b$XtX, Xty = a$Xty + b$Xty)

acc <- collect_chunked(
  offload(tbl(f) |> select(g, mpg, wt, hp), by = "g"),
  accumulate,
  .init = list(XtX = matrix(0, 3, 3), Xty = matrix(0, 3, 1)),
  combine = combine, commutative = TRUE
)
drop(solve(acc$XtX, acc$Xty))
coef(lm(mpg ~ wt + hp, data = df))
```

The partitioned reduce reproduces the global fit, and that is the correctness
law made concrete. Offloading preserves the answer exactly when the reduction
factors through the quotient the spill plan quotients by. Associativity of
`combine` lets the chunk boundaries move (the reduction descends from the free
monoid). Commutativity lets the shards merge in any order (it descends to the
multiset). Here `combine` is matrix addition, which is both, so the partition
order is irrelevant and `commutative = TRUE` records that.

The declared algebra also fixes the cheapest legal plan:

| declared | what it permits | cheapest plan |
|---|---|---|
| semigroup (associative `combine`) | moving chunk boundaries | sequential spill and merge |
| monoid (identity `.init` too) | empty and parallel merges | partition, k-way merge |
| commutative monoid | reordering shards | shuffle, no stable sort |

## The cost tiers

Reading the structure of a reduction gives its tier directly:

| tier | I/O | peak memory | examples |
|---|---|---|---|
| monoid fold | `O(n)`, one pass | `O(1)` | sum, count, mean, `X'X` |
| sort or partition localizable | `O(n log n)` | `O(1)` | median, distinct, per-group fits, Cox |
| all-to-all, no structure | `O(n^2)` or more | varies | dense kernel or Gaussian-process Gram |

Every tier is feasible with a disk; the tier sets the cost. The first is
`collect_chunked()` on a plain node. The second is `arrange()` or
`offload(x, by = ...)` ahead of a fold or a per-shard fit. The third still runs,
at quadratic I/O, and is usually attacked with approximation (inducing points,
low rank).

## A note on the abstraction

It is tempting to reach for heavier type theory here, since the correctness
condition is a statement about quotients. The lighter framing is the one that
fits the engine. The equality that matters, `collect(offload(x))` equals
`collect(x)`, is a plain proposition with no higher structure, and nothing about
it is checkable inside C or R at compile time. So the contract is a declared
monoid plus property tests: the test suite asserts the identity on values,
replay equivalence, and that a partitioned reduce equals the single-pass fold.
Those tests are the working proof of the quotient law.

## Choosing a path

* A bounded summary of a large result (count, mean, sufficient statistics): fold
  it with `collect_chunked()`.
* An iterative fit (a GLM through `biglm::bigglm()`): `offload()` the prepared
  query once and feed it to `chunk_feeder()`.
* A model that couples within groups, or a global reduce you want to merge from
  pieces: `offload(x, by = ...)`, then `group_map()` for a fit per shard,
  `group_modify()` to recombine per-shard tables, or a monoidal reduce with
  `collect_chunked(..., combine = )`.
* Data that must be in key order: `arrange()`, which sorts externally.

```{r cleanup, include = FALSE}
unlink(f)
```
