---
title: "Advanced Chunking and Backends in delarr"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Advanced Chunking and Backends in delarr}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r setup, include = FALSE}
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>",
  message = FALSE,
  warning = FALSE
)
has_shard <- requireNamespace("shard", quietly = TRUE)
has_hdf5 <- requireNamespace("hdf5r", quietly = TRUE)
```

```{r load-delarr}
library(delarr)
```

## When should you reach for the advanced tools?

The basic workflow in `vignette("delarr-getting-started", package = "delarr")`
is enough when you only need a lazy pipeline and a final `collect()`. This
vignette is for the next step: understanding chunk plans, running several
summaries in one pass, streaming to backends, and checking whether an optional
parallel path behaves the way you expect.

All examples use one small dense matrix, and hidden `stopifnot()` chunks check
the key claims against base R.

```{r make-example-matrix}
set.seed(11)
mat <- matrix(
  rnorm(96),
  nrow = 12,
  ncol = 8,
  dimnames = list(paste0("sample_", 1:12), paste0("feature_", 1:8))
)
```

## What will the execution plan do?

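`explain()` shows the effective output shape, the chunk axis, the chosen chunk
size, and the recorded operations after optimization. The pipeline below drops
the first column, so chunking the remaining 7 columns with `chunk_size = 3L`
gives `ceiling(7 / 3) = 3` chunks.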

```{r inspect-plan}
pipe <- delarr(mat)[, -1] |>
  d_map(~ .x^2 + 1) |>
  d_where(function(x) x > 1.25, fill = 0)

plan <- explain(pipe, chunk_size = 3L)
plan
```

```{r check-plan, include = FALSE}
stopifnot(
  identical(plan$output_dim, dim(pipe)),
  identical(plan$chunk_margin, "cols"),
  isTRUE(all.equal(plan$chunk_count, ceiling(ncol(pipe) / 3)))
)
```
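
The chunk count follows from simple arithmetic on the chunked axis. As a rough
base R sketch (not `delarr`'s internal code; all variable names are
illustrative), splitting 7 columns into chunks of at most 3 looks like this:

```{r sketch-chunk-bounds}
# Illustrative only: base R arithmetic, not delarr internals.
sketch_n_cols <- 7L       # 8 columns minus the dropped first column
sketch_chunk_size <- 3L

# First and last column index of each chunk; the last chunk may be shorter.
chunk_start <- seq(1L, sketch_n_cols, by = sketch_chunk_size)
chunk_end <- pmin(chunk_start + sketch_chunk_size - 1L, sketch_n_cols)

chunk_bounds <- data.frame(
  chunk = seq_along(chunk_start),
  start = chunk_start,
  end = chunk_end
)
chunk_bounds
```

The final chunk holds a single column, which is why the count is the ceiling of
the ratio rather than the ratio itself.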

## How do you let `delarr` choose a chunk size?

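If you do not want to hard-code `chunk_size`, pass a memory budget with
`target_bytes` and let `delarr` pick the chunk size. The budget changes how the
work is split, not the result: collecting with a budget returns the same values
as collecting with a fixed `chunk_size`.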

```{r adaptive-collect}
adaptive_plan <- explain(pipe, target_bytes = 256)
adaptive_plan

adaptive_result <- collect(pipe, target_bytes = 256)
dim(adaptive_result)
```

```{r check-adaptive-collect, include = FALSE}
fixed_result <- collect(pipe, chunk_size = 3L)
stopifnot(
  all(is.finite(adaptive_result)),
  isTRUE(all.equal(adaptive_result, fixed_result))
)
```
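
How `delarr` turns `target_bytes` into a chunk size is not spelled out in this
vignette. As a hedged illustration of the general idea only, one plausible
heuristic divides the budget by the size of a single column of doubles (8 bytes
per value). This is an assumption for illustration; the rule `delarr` actually
applies may differ, for example by accounting for intermediate copies.

```{r sketch-byte-budget}
# Assumption: a hypothetical heuristic, not delarr's documented behavior.
budget_bytes <- 256
rows_per_col <- 12L
bytes_per_double <- 8

# Memory needed to hold one full column of doubles.
bytes_per_col <- rows_per_col * bytes_per_double

# Whole columns that fit in the budget, but never fewer than one.
cols_per_chunk <- max(1L, floor(budget_bytes / bytes_per_col))
cols_per_chunk
```

The `max(1L, ...)` guard keeps at least one column per chunk even when the
budget is smaller than a single column.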

## How do you compute several summaries in one pass?

`d_reduce_many()` runs several built-in reducers together and returns a matrix
when the outputs have a common length.

```{r multi-reduce}
row_summary <- d_reduce_many(
  delarr(mat),
  fns = list(sum = sum, mean = mean, max = max),
  dim = "rows",
  chunk_size = 3L
)

row_summary[1:4, , drop = FALSE]
```

```{r check-multi-reduce, include = FALSE}
stopifnot(
  is.matrix(row_summary),
  isTRUE(all.equal(row_summary[, "sum"], rowSums(mat))),
  isTRUE(all.equal(row_summary[, "mean"], rowMeans(mat))),
  isTRUE(all.equal(row_summary[, "max"], apply(mat, 1L, max)))
)
```
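
To see why several reducers can share one pass, here is a minimal base R sketch
of the idea. It is not `delarr`'s implementation, and every name in it is
illustrative: the matrix is visited once in column chunks, and each chunk
updates a running sum and a running maximum per row.

```{r sketch-one-pass-reduce}
# Illustrative only: a hand-rolled single pass over column chunks.
set.seed(1)
red_mat <- matrix(rnorm(96), nrow = 12, ncol = 8)
red_chunk <- 3L

run_sum <- numeric(nrow(red_mat))
run_max <- rep(-Inf, nrow(red_mat))

for (red_start in seq(1L, ncol(red_mat), by = red_chunk)) {
  red_cols <- red_start:min(red_start + red_chunk - 1L, ncol(red_mat))
  red_block <- red_mat[, red_cols, drop = FALSE]
  # Each block is read once and feeds every accumulator.
  run_sum <- run_sum + rowSums(red_block)
  run_max <- pmax(run_max, apply(red_block, 1L, max))
}

# The mean is derived from the accumulated sum, so it needs no extra pass.
sketch_summary <- cbind(
  sum = run_sum,
  mean = run_sum / ncol(red_mat),
  max = run_max
)
sketch_summary[1:4, , drop = FALSE]
```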

## How do you work block-by-block?

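`block_apply()` is useful when you want chunk-local summaries or diagnostics
without materializing the whole array. It produces one result per block; here
each block of up to three columns yields its column means, and concatenating the
blocks recovers the means of all 8 columns.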

```{r block-apply}
col_blocks <- block_apply(
  delarr(mat),
  margin = "cols",
  size = 3L,
  fn = function(block) colMeans(block)
)

block_means <- unlist(col_blocks, use.names = FALSE)
block_means
```

```{r check-block-apply, include = FALSE}
stopifnot(
  all(is.finite(block_means)),
  isTRUE(all.equal(block_means, unname(colMeans(mat))))
)
```
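
The same block-wise pattern can be written in plain base R, which may help when
reasoning about what a block function receives. This sketch is an analogy
rather than `delarr` code, and its names are illustrative:

```{r sketch-block-means}
# Illustrative only: block-wise column means with split() and lapply().
set.seed(2)
blk_mat <- matrix(rnorm(96), nrow = 12, ncol = 8)
blk_size <- 3L

# Group column indices into consecutive blocks of at most `blk_size`.
col_groups <- split(
  seq_len(ncol(blk_mat)),
  ceiling(seq_len(ncol(blk_mat)) / blk_size)
)

# Apply the summary to one block at a time.
per_block <- lapply(
  col_groups,
  function(cols) colMeans(blk_mat[, cols, drop = FALSE])
)

lengths(per_block)
sketch_block_means <- unlist(per_block, use.names = FALSE)
```

Because column means depend only on the columns inside a block, the
concatenated block results equal `colMeans()` on the full matrix.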

## How do delayed matrix products fit into a pipeline?

`d_matmul()` returns another `delarr`, so you can materialize only the block you
need from a larger product.

```{r delayed-matmul}
rhs <- matrix(rnorm(30), nrow = 6, ncol = 5)
lhs <- delarr(mat[, 1:6, drop = FALSE])
product_block <- d_matmul(lhs, delarr(rhs))[1:4, 1:3] |>
  collect(chunk_size = 2L)

product_block
```

```{r check-delayed-matmul, include = FALSE}
expected_block <- (mat[, 1:6, drop = FALSE] %*% rhs)[1:4, 1:3, drop = FALSE]
stopifnot(
  all(is.finite(product_block)),
  isTRUE(all.equal(product_block, expected_block))
)
```
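
A block of a product needs only the matching rows of the left operand and the
matching columns of the right operand. The base R sketch below checks that
identity; whether `delarr` pushes the subset into the operands in exactly this
way is an assumption, and the names are illustrative.

```{r sketch-partial-product}
# Illustrative only: the algebraic identity behind partial products.
set.seed(3)
mm_lhs <- matrix(rnorm(72), nrow = 12, ncol = 6)
mm_rhs <- matrix(rnorm(30), nrow = 6, ncol = 5)

# Compute the full 12 x 5 product, then keep a 4 x 3 block.
full_then_subset <- (mm_lhs %*% mm_rhs)[1:4, 1:3, drop = FALSE]

# Subset the operands first, then multiply only what is needed.
subset_then_multiply <-
  mm_lhs[1:4, , drop = FALSE] %*% mm_rhs[, 1:3, drop = FALSE]

all.equal(full_then_subset, subset_then_multiply)
```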

## How do you stream a transformed matrix to disk?

The writer interface is useful when the result is too large to hold in memory:
passing a writer to `collect(into = ...)` streams the output to disk chunk by
chunk. The HDF5 backend is optional; the chunks below run only when the `hdf5r`
package is installed.

```{r prepare-scaled-hdf5, include = FALSE, eval = has_hdf5}
tf_in <- tempfile(fileext = ".h5")
tf_out <- tempfile(fileext = ".h5")

write_hdf5(mat, tf_in, "X")
```

```{r stream-scaled-hdf5, eval = has_hdf5}
X <- delarr_hdf5(tf_in, "X")
scaled <- X |> d_scale(dim = "cols", center = TRUE, scale = TRUE)
writer <- hdf5_writer(tf_out, "X_scaled", ncol = ncol(X), chunk = c(6L, 4L))

collect(scaled, into = writer, chunk_size = 4L)
```

```{r inspect-scaled-hdf5, eval = has_hdf5}
disk_result <- read_hdf5(tf_out, "X_scaled")
rbind(
  mean = round(colMeans(disk_result), 6),
  sd = round(apply(disk_result, 2L, stats::sd), 6)
)
```

```{r check-scaled-hdf5, include = FALSE, eval = has_hdf5}
centered <- sweep(mat, 2L, colMeans(mat), "-")
reference <- sweep(centered, 2L, apply(mat, 2L, stats::sd), "/")
stopifnot(
  all(is.finite(disk_result)),
  isTRUE(all.equal(unname(disk_result), unname(reference), tolerance = 1e-8)),
  all(abs(colMeans(disk_result)) < 1e-8),
  all(abs(apply(disk_result, 2L, stats::sd) - 1) < 1e-8)
)
unlink(c(tf_in, tf_out))
```
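
If `hdf5r` is not installed, the streaming pattern can still be pictured with
base R. In this sketch a preallocated matrix stands in for the on-disk dataset,
and each column chunk is scaled and written before the next one is read. It is
a simplified analogy with illustrative names, not the HDF5 writer itself.

```{r sketch-stream-scale}
# Illustrative only: chunked scale-and-write into a stand-in destination.
set.seed(5)
sc_mat <- matrix(rnorm(96), nrow = 12, ncol = 8)
sc_chunk <- 4L

# Stand-in for an on-disk dataset; a real writer would not hold this in memory.
sc_out <- matrix(NA_real_, nrow = nrow(sc_mat), ncol = ncol(sc_mat))

for (sc_start in seq(1L, ncol(sc_mat), by = sc_chunk)) {
  sc_cols <- sc_start:min(sc_start + sc_chunk - 1L, ncol(sc_mat))
  sc_block <- sc_mat[, sc_cols, drop = FALSE]
  # Column scaling uses only values from the same column, so each column
  # chunk can be processed independently of the others.
  sc_out[, sc_cols] <- scale(sc_block, center = TRUE, scale = TRUE)
}

round(colMeans(sc_out), 6)
```

Chunking along columns works here because centering and scaling by column
never need data from another chunk; chunking along rows would require the
column statistics to be computed in a separate pass first.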

## How do you use shared-memory workers?

If you install the optional `shard` package, `collect_shard()` can evaluate a
supported pipeline in worker processes while keeping the underlying matrix in
shared memory.

```{r shard-collect, eval = has_shard}
shard_result <- delarr_shard(mat) |>
  d_map(~ .x * 2) |>
  d_reduce(sum, dim = "rows") |>
  collect_shard(workers = 2L, chunk_size = 3L)

head(shard_result)
```

```{r check-shard-collect, include = FALSE, eval = has_shard}
stopifnot(
  all(is.finite(shard_result)),
  isTRUE(all.equal(shard_result, rowSums(mat * 2)))
)
```
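
For comparison, the sketch below splits the same kind of row reduction across
two workers with the base `parallel` package. This is a different technique
from the `shard` path: a PSOCK cluster copies the matrix to each worker instead
of sharing it, so it illustrates only how row chunks are distributed and
recombined. All names are illustrative.

```{r sketch-parallel-rows}
# Illustrative only: base `parallel`, which copies data rather than sharing it.
library(parallel)

set.seed(4)
par_mat <- matrix(rnorm(96), nrow = 12, ncol = 8)

# Consecutive row chunks of size 3.
row_chunks <- split(
  seq_len(nrow(par_mat)),
  ceiling(seq_len(nrow(par_mat)) / 3L)
)

par_cluster <- makeCluster(2L)
partial_sums <- parLapply(
  par_cluster,
  row_chunks,
  function(rows, m) rowSums(m[rows, , drop = FALSE] * 2),
  m = par_mat
)
stopCluster(par_cluster)

# Chunks come back in order, so concatenation restores the row order.
worker_sums <- unlist(partial_sums, use.names = FALSE)
head(worker_sums)
```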

## How do you profile a candidate pipeline?

`profile_collect()` repeats `collect()` and records elapsed time plus the size
of the realized output.

```{r profile-pipeline}
profile <- profile_collect(pipe, reps = 2L, chunk_size = 3L)
profile
```

```{r check-profile-pipeline, include = FALSE}
stopifnot(
  identical(profile$reps, 2L),
  all(is.finite(profile$elapsed)),
  profile$min_sec >= 0
)
```
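
The measurement itself is easy to approximate in base R, which can be handy for
timing a non-`delarr` baseline. The sketch below repeats a stand-in computation
and records elapsed time and output size. The field names mirror the ones
checked in this section, but the structure is an assumption and not the actual
return value of `profile_collect()`.

```{r sketch-timing}
# Illustrative only: a hand-rolled timing loop with system.time().
set.seed(6)
tm_mat <- matrix(rnorm(96), nrow = 12, ncol = 8)

# Stand-in workload, loosely modeled on the pipeline's map step.
tm_work <- function() tm_mat[, -1]^2 + 1

tm_reps <- 2L
tm_elapsed <- numeric(tm_reps)
for (tm_i in seq_len(tm_reps)) {
  tm_elapsed[tm_i] <- system.time(tm_result <- tm_work())[["elapsed"]]
}

tm_profile <- list(
  reps = tm_reps,
  elapsed = tm_elapsed,
  min_sec = min(tm_elapsed),
  bytes = as.numeric(object.size(tm_result))
)
str(tm_profile)
```

On a matrix this small the elapsed times are close to zero, so comparisons only
become meaningful on realistically sized inputs.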

## Where should you go after this?

Return to `vignette("delarr-getting-started", package = "delarr")` for the core
lazy workflow, then use `explain()`, `block_apply()`, `d_reduce_many()`, and
`collect_shard()` as you tune real pipelines for storage layout, chunking, and
execution strategy.
