Stopping Jobs

Introduction

You can save compute resources by stopping jobs that are no longer needed. If a job is running when it is stopped, its background process is terminated. Terminated background processes are automatically replaced by new ones.

There are several ways to stop a job: stopping it immediately, replacing or combining it with another job, or setting a timeout.

Setup

library(jobqueue)
jq <- jobqueue(workers = 1)

Immediate Stop

As soon as a job is created, it can be stopped by either calling <job>$stop() or by assigning to <job>$output. These methods work up until the job state is 'done'. Once the job state is 'done', <job>$output is immutable and calling <job>$stop() will have no effect (since it’s already stopped). Calling <job>$stop() and assigning to <job>$output both have the side effect of setting the job state to 'done'.

Interrupting with <job>$stop()

job <- job_class$new({ Sys.sleep(5); 10 })
job$stop()
job$result
#> <interrupt: job stopped by user>

job <- job_class$new({ Sys.sleep(5); 10 })
job$stop('my reason')
job$result
#> <interrupt: my reason>

class(job$result)
#> [1] "interrupt" "condition"
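
Because a stopped job's result is a condition object rather than a regular value, downstream code usually needs to check for it before using the result. The sketch below uses only base R and a stand-in object built with structure() to mimic the classes shown above. The helper name describe_result() is hypothetical, and the sketch assumes the interrupt's reason can be retrieved with conditionMessage(), which the printed form suggests but this vignette does not state.

```r
# Stand-in for job$result after job$stop('my reason').
# Assumption: the real object carries its reason in the `message` field.
stopped <- structure(
  class = c("interrupt", "condition"),
  list(message = "my reason", call = NULL)
)

# Hypothetical helper: report whether a result is usable or was interrupted.
describe_result <- function(result) {
  if (inherits(result, "interrupt")) {
    paste("stopped:", conditionMessage(result))
  } else {
    paste("finished:", format(result))
  }
}

describe_result(stopped)
#> [1] "stopped: my reason"
describe_result(10)
#> [1] "finished: 10"
```

Checking with inherits() rather than comparing class(result) directly keeps the test valid even if the object carries additional classes.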

You can call <job>$stop() from within callback hooks.

hooks <- list(
  'created' = function (job) {
    if (length(job$vars$x) > 100)
      job$stop('Job is too large.')
  }
)

job <- jq$run({ sum(x) }, vars = list(x = 1:200), hooks = hooks)
job$result
#> <interrupt: Job is too large.>

Assigning to <job>$output

<job>$output can only be assigned to once. If the job is not yet done when <job>$output is assigned, then any ongoing processing will be halted.

job <- job_class$new({ Sys.sleep(5); 10 })

job$state
#> [1] "created"

job$output <- "Custom result"

job$state
#> [1] "done"

job$result
#> [1] "Custom result"

If you’ve set <job>$reformat, it will determine <job>$result as usual. See vignette('results').

Replacing and Combining

Sometimes duplicate jobs are sent to the jobqueue, for instance when a user clicks a ‘submit’ button repeatedly. stop_id and copy_id can prevent such requests from overloading the server.

Stop ID

New jobs will replace existing jobs with the same stop_id.

job1 <- jq$run({ Sys.sleep(5); 'A' }, stop_id = 123)
job2 <- jq$run({ 'B' },               stop_id = 123)
job1$result
#> <interrupt: duplicated stop_id>
job2$result
#> [1] "B"

Copy ID

New jobs will mirror the output of existing jobs with the same copy_id.

job1 <- jq$run({ Sys.sleep(5); 'A' }, copy_id = 456)
job2 <- jq$run({ 'B' },               copy_id = 456)
job1$result
#> [1] "A"
job2$result
#> [1] "A"

In jobqueue()

If you set stop_id and/or copy_id in jobqueue(), then they must be functions (or lambda notation functions) that accept a job object as input and return a value, id. If id is NULL, then no comparisons will be done. Otherwise, id can be any data type and will be compared to other ids with identical().

jq <- jobqueue(stop_id = function (job) list(job$vars$f, job$user_id))

job1 <- jq$run({ f(x) }, vars = list(f = sum,  x = 1:10), user_id = 'A')
job2 <- jq$run({ f(x) }, vars = list(f = sum,  x = 1:2),  user_id = 'A')
job3 <- jq$run({ f(x) }, vars = list(f = mean, x = 1:10), user_id = 'A')

job1$result
#> <interrupt: duplicated stop_id>
job2$result
#> [1] 3
job3$result
#> [1] 5.5
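
Since ids are compared with identical(), two ids match only when both their values and their types agree exactly. The base R sketch below mirrors the ids that the stop_id function above would return for job1, job2, and job3 (the variable names id1, id2, and id3 are illustrative), and assumes the queue applies identical() to them without any coercion.

```r
# Ids as returned by function (job) list(job$vars$f, job$user_id)
id1 <- list(sum,  'A')
id2 <- list(sum,  'A')
id3 <- list(mean, 'A')

identical(id1, id2)  # same function and user, so job2 replaces job1
#> [1] TRUE
identical(id2, id3)  # different function, so job3 leaves job2 alone
#> [1] FALSE

# identical() does not coerce types: a double and an integer differ.
identical(123, 123L)
#> [1] FALSE
```

The last comparison is worth keeping in mind when ids come from mixed sources, such as a numeric literal in one place and an integer database key in another.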

Timeouts

job <- job_class$new({ Sys.sleep(5); 'Zzzzz' }, timeout = 0.2)
jq$submit(job)
job$result
#> <interrupt: total runtime exceeded 0.2 seconds>

job <- job_class$new({ 10 }, timeout = list(created = 0.2))
job$result
#> <interrupt: exceeded 0.2 seconds while in "created" state>

Limits (in seconds) can be set on: