Tasks

What are tasks, and how do they work in Sykdomspulsen Core?

2021-05-26

Introduction

A task is the basic operational unit of Sykdomspulsen Core. It is based on plnr.

In short, you can think of a Sykdomspulsen Core task as multiple plnr plans plus Sykdomspulsen Core db schemas.

Definitions

Object Description
argset A named list containing arguments.
plnr analysis These are the fundamental units that are scheduled in plnr:
  • 1 argset
  • 1 function that takes two (or more) arguments:
    • data (named list)
    • argset (named list)
    • … (optional arguments)
data_selector_fn A function that takes two arguments:
  • argset (named list)
  • schema (named list)
This function provides a named list to be used as the data argument to action_fn.
action_fn A function that takes three arguments:
  • data (named list, returned from data_selector_fn)
  • argset (named list)
  • schema (named list)
This is the thing that ‘does stuff’ in Sykdomspulsen Core.
sc analysis An sc analysis is essentially a plnr analysis with database schemas:
  • 1 argset
  • 1 action_fn
plan
  • 1 data-pull (using data_selector_fn)
  • 1 list of sc analyses
task This is the unit that Airflow schedules.
  • 1 list of plans
We sometimes run the list of plans in parallel.
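
In R terms, the definitions above can be sketched as follows. This is an illustrative sketch only: the variable names, the placeholder some_data_source(), and the body of action_fn are all hypothetical.

```r
# argset: a named list containing arguments
argset <- list(location_code = "county03", year = 2021)

# data_selector_fn: takes (argset, schema) and returns a named list,
# which becomes the data argument of action_fn
data_selector_fn <- function(argset, schema) {
  list(weather = some_data_source())  # some_data_source() is a placeholder
}

# action_fn: takes (data, argset, schema) and "does stuff"
action_fn <- function(data, argset, schema) {
  d <- data$weather[data$weather$location_code == argset$location_code, ]
  # ... analyse d, then write results via schema$... if needed
}

# an sc analysis = 1 argset + 1 action_fn
# a plan         = 1 data-pull (data_selector_fn) + a list of analyses
# a task         = a list of plans (scheduled by Airflow)
```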

General tasks


Figure 1: A general task showing the many options of a task.

Figure 1 shows the full range of what a task can do.

Data can be read from any source. Within a plan, the data is extracted once by data_selector_fn (i.e. "one data-pull") and then provided to each analysis, which runs action_fn on the data, the argset, and the db schemas.

The action_fn can then perform any combination of the operations shown in Figure 1, for example running analyses and writing results to db tables or exporting files.

Typically only a subset of this would be done in a single task.

Plan-heavy or analysis-heavy tasks?

A plan-heavy task is one that has many plans and a few analyses per plan.

An analysis-heavy task is one that has few plans and many analyses per plan.

In general, a data-pull is slow and wastes time. It is therefore preferable to reduce the number of data-pulls by having each data-pull extract larger quantities of data; each analysis can then subset the data as required (identified via its argset). That is, where possible an analysis-heavy task is preferable because it will be faster (at the cost of needing more RAM).

Obviously, if a plan’s data-pull is larger, it will use more RAM. If you need to conserve RAM, then you should use a plan-heavy approach.

Figure 1 shows only 2 location-based analyses, but in reality there were 356 municipalities in Norway in 2021. If Figure 1 had 2 plans (1 for 2021 data, 1 for 2020 data) and 356 analyses per plan (1 for each location_code), then we would be taking an analysis-heavy approach.
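
As a sketch, the municipality example could be configured either way. The code below is hypothetical; in particular, the granularity_geo value "municip" is an assumption about the fhidata coding, so check fhidata::norway_locations_names() before using it.

```r
# Analysis-heavy (preferred when RAM allows):
# 2 plans (one data-pull per year), 356 analyses per plan (one per municipality)
for_each_plan <- plnr::expand_list(year = c(2020, 2021))
for_each_analysis <- plnr::expand_list(
  location_code = fhidata::norway_locations_names()[
    granularity_geo %in% "municip"
  ]$location_code
)

# Plan-heavy (preferred when RAM is scarce):
# 2 x 356 plans with 1 analysis each; each data-pull extracts only one
# municipality-year, so less data is held in memory at once
for_each_plan <- plnr::expand_list(
  year = c(2020, 2021),
  location_code = fhidata::norway_locations_names()[
    granularity_geo %in% "municip"
  ]$location_code
)
for_each_analysis <- NULL
```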

Putting it together


Figure 2: A typical file setup for an implementation of Sykdomspulsen Core. plan_argset_fn is rarely used, and is therefore shown as blacked out in most of the tasks.

Figure 2 shows a typical implementation of Sykdomspulsen Core.

config_db.r contains all of the Sykdomspulsen Core db schema definitions, i.e. a long list of sc::add_schema_v8 commands.

config_tasks.r contains all of the task definitions, i.e. a long list of sc::add_task_from_config_v8 commands.

Then we have one file for each task, containing the action_fn, data_selector_fn, and other functions relevant to the task at hand.
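
Laid out as files, such an implementation might look like the tree below. The file and folder names other than config_db.r and config_tasks.r are illustrative.

```
R/
├── config_db.r           # all sc::add_schema_v8 commands
├── config_tasks.r        # all sc::add_task_from_config_v8 commands
├── example_weather.r     # action_fn + data_selector_fn for the weather task
└── ...                   # one file per task
```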

Weather example

We will now go through an example of how a person would design and implement tasks relating to weather.

db schema

As documented in more detail here, we create a db schema that fits our needs (recording weather data).

sc::add_schema_v8(
  name_access = c("anon"),
  name_grouping = "example_weather",
  name_variant = NULL,
  db_configs = sc::config$db_configs,
  field_types =  c(
    "granularity_time" = "TEXT",
    "granularity_geo" = "TEXT",
    "country_iso3" = "TEXT",
    "location_code" = "TEXT",
    "border" = "INTEGER",
    "age" = "TEXT",
    "sex" = "TEXT",
    
    "date" = "DATE",
    
    "isoyear" = "INTEGER",
    "isoweek" = "INTEGER",
    "isoyearweek" = "TEXT",
    "season" = "TEXT",
    "seasonweek" = "DOUBLE",
    
    "calyear" = "INTEGER",
    "calmonth" = "INTEGER",
    "calyearmonth" = "TEXT",

    "tg" = "DOUBLE",
    "tx" = "DOUBLE",
    "tn" = "DOUBLE",
    "rr" = "DOUBLE"
  ),
  keys = c(
    "granularity_time",
    "location_code",
    "date",
    "age",
    "sex"
  ),
  censors = list(
    anon = list(
      
    )
  ),
  validator_field_types = sc::validator_field_types_sykdomspulsen,
  validator_field_contents = sc::validator_field_contents_sykdomspulsen,
  info = "This db table is used for..."
)

task_from_config_v8

To “register” our task, we use the RStudio addin task_from_config.

# tm_run_task("example_weather_import_data_from_api")
sc::add_task_from_config_v8(
  name_grouping = "example_weather",
  name_action = "import_data_from_api",
  name_variant = NULL,
  cores = 1,
  plan_analysis_fn_name = NULL, # "PACKAGE::TASK_NAME_plan_analysis"
  for_each_plan = plnr::expand_list(
    location_code = "county03" # fhidata::norway_locations_names()[granularity_geo %in% c("county")]$location_code
  ),
  for_each_analysis = NULL,
  universal_argset = NULL,
  upsert_at_end_of_each_plan = FALSE,
  insert_at_end_of_each_plan = FALSE,
  action_fn_name = "example_weather_import_data_from_api_action",
  data_selector_fn_name = "example_weather_import_data_from_api_data_selector",
  schema = list(
    # input

    # output
    "anon_example_weather" = sc::config$schemas$anon_example_weather
  ),
  info = "This task does..."
)

There are a number of important things in this code that need highlighting.

for_each_plan

for_each_plan expects a list. Each component of the list will correspond to a plan, with the values added to the argset of all the analyses inside the plan.

For example, the following code would give 4 plans, with 1 analysis per plan, and each analysis containing argset$var_1 and argset$var_2 as appropriate.

for_each_plan <- list()
for_each_plan[[1]] <- list(
  var_1 = 1,
  var_2 = "a"
)
for_each_plan[[2]] <- list(
  var_1 = 2,
  var_2 = "b"
)
for_each_plan[[3]] <- list(
  var_1 = 1,
  var_2 = "a"
)
for_each_plan[[4]] <- list(
  var_1 = 2,
  var_2 = "b"
)

You always need at least 1 plan. The simplest possible plan is:

plnr::expand_list(
  x = 1
)
[[1]]
[[1]]$x
[1] 1

plnr::expand_list

plnr::expand_list is essentially the same as expand.grid, except that it returns a list instead of a data.frame.

The code above could be simplified as follows.

for_each_plan <- plnr::expand_list(
  var_1 = c(1,2),
  var_2 = c("a", "b")
)
for_each_plan
[[1]]
[[1]]$var_1
[1] 1

[[1]]$var_2
[1] "a"


[[2]]
[[2]]$var_1
[1] 2

[[2]]$var_2
[1] "a"


[[3]]
[[3]]$var_1
[1] 1

[[3]]$var_2
[1] "b"


[[4]]
[[4]]$var_1
[1] 2

[[4]]$var_2
[1] "b"

for_each_analysis

for_each_plan expects a list, which will generate length(for_each_plan) plans.

for_each_analysis is the same, except it will generate analyses within each of the plans.

universal_argset

A named list that will add the values to the argset of all the analyses.
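
For example (with hypothetical values), the three arguments combine as follows: for_each_plan and for_each_analysis determine how many argsets exist, while universal_argset contributes to every one of them.

```r
# 2 plans x 2 analyses = 4 argsets in total; border is added to all of them
for_each_plan = plnr::expand_list(location_code = c("county03", "county11"))
for_each_analysis = plnr::expand_list(outcome = c("tx", "tn"))
universal_argset = list(border = 2020)

# e.g. the argset of plan 1, analysis 1 then contains:
#   argset$location_code == "county03"
#   argset$outcome       == "tx"
#   argset$border        == 2020
```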

upsert_at_end_of_each_plan

If TRUE and schema contains a schema called output, then the returned values of action_fn will be stored and upserted to schema$output at the end of each plan.

If you choose to upsert/insert manually from within action_fn, you can only do so at the end of each analysis.
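
The two styles can be sketched as follows. This is a hypothetical sketch: do_something() is a placeholder, and the schema name output mirrors the convention described above.

```r
# Style 1: upsert_at_end_of_each_plan = TRUE
# action_fn simply returns its results; the returned values from all analyses
# in the plan are collected and upserted to schema$output when the plan finishes
my_action <- function(data, argset, schema) {
  res <- do_something(data$raw)
  return(res)
}

# Style 2: manual upsert from within action_fn
# (can only be done at the end of each analysis)
my_action <- function(data, argset, schema) {
  res <- do_something(data$raw)
  schema$output$upsert_data(res)
}
```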

insert_at_end_of_each_plan

If TRUE and schema contains a schema called output, then the returned values of action_fn will be stored and inserted to schema$output at the end of each plan.

If you choose to upsert/insert manually from within action_fn, you can only do so at the end of each analysis.

action_fn_name

A character string giving the name of the action_fn, preferably including the package name.

data_selector_fn_name

A character string giving the name of the data_selector_fn, preferably including the package name.

schema

A named list containing the schemas used in this task.

data_selector_fn

Use the addins dropdown to easily add in boilerplate code.

The data_selector_fn is used to extract the data for each plan.

The lines inside if(plnr::is_run_directly()){ are used to help developers. You can run the code manually/interactively to “load” the values of argset and schema.

index_plan <- 1

argset <- sc::tm_get_argset("example_weather_import_data_from_api", index_plan = index_plan)
schema <- sc::tm_get_schema("example_weather_import_data_from_api")

print(argset)
$`**universal**`
[1] "*"

$`**plan**`
[1] "*"

$location_code
[1] "county03"

$`**analysis**`
[1] "*"

$`**automatic**`
[1] "*"

$index
[1] 1

$today
[1] "2021-09-02"

$yesterday
[1] "2021-09-01"

$first_analysis
[1] TRUE

$first_argset
[1] TRUE

$last_analysis
[1] TRUE

$last_argset
[1] TRUE
print(names(schema))
[1] "anon_example_weather"
# **** data_selector **** ----
#' example_weather_import_data_from_api (data selector)
#' @param argset Argset
#' @param schema DB Schema
#' @export
example_weather_import_data_from_api_data_selector = function(argset, schema){
  if(plnr::is_run_directly()){
    # sc::tm_get_plans_argsets_as_dt("example_weather_import_data_from_api")

    index_plan <- 1

    argset <- sc::tm_get_argset("example_weather_import_data_from_api", index_plan = index_plan)
    schema <- sc::tm_get_schema("example_weather_import_data_from_api")
  }

  # find the mid lat/long for the specified location_code
  gps <- fhimaps::norway_nuts3_map_b2020_default_dt[location_code == argset$location_code,.(
    lat = mean(lat),
    long = mean(long)
  )]
  
  # download the forecast for the specified location_code
  d <- httr::GET(glue::glue("https://api.met.no/weatherapi/locationforecast/2.0/classic?lat={gps$lat}&lon={gps$long}"), httr::content_type_xml())
  d <- xml2::read_xml(d$content)

  # The variable returned must be a named list
  retval <- list(
    "data" = d
  )
  retval
}

action_fn

The lines inside if(plnr::is_run_directly()){ are used to help developers. You can run the code manually/interactively to “load” the values of argset and schema.

index_plan <- 1
index_analysis <- 1

data <- sc::tm_get_data("example_weather_import_data_from_api", index_plan = index_plan)
argset <- sc::tm_get_argset("example_weather_import_data_from_api", index_plan = index_plan, index_analysis = index_analysis)
schema <- sc::tm_get_schema("example_weather_import_data_from_api")

print(data)
$data
{xml_document}
<weatherdata noNamespaceSchemaLocation="https://schema.api.met.no/schemas/weatherapi-0.4.xsd" created="2021-09-01T23:33:38Z" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
[1] <meta>\n  <model name="met_public_forecast" termin="2021-09-01T ...
[2] <product class="pointData">\n  <time datatype="forecast" from=" ...
print(argset)
$`**universal**`
[1] "*"

$`**plan**`
[1] "*"

$location_code
[1] "county03"

$`**analysis**`
[1] "*"

$`**automatic**`
[1] "*"

$index
[1] 1

$today
[1] "2021-09-02"

$yesterday
[1] "2021-09-01"

$first_analysis
[1] TRUE

$first_argset
[1] TRUE

$last_analysis
[1] TRUE

$last_argset
[1] TRUE
print(names(schema))
[1] "anon_example_weather"
# **** action **** ----
#' example_weather_import_data_from_api (action)
#' @param data Data
#' @param argset Argset
#' @param schema DB Schema
#' @export
example_weather_import_data_from_api_action <- function(data, argset, schema) {
  # tm_run_task("example_weather_import_data_from_api")

  if(plnr::is_run_directly()){
    # sc::tm_get_plans_argsets_as_dt("example_weather_import_data_from_api")

    index_plan <- 1
    index_analysis <- 1

    data <- sc::tm_get_data("example_weather_import_data_from_api", index_plan = index_plan)
    argset <- sc::tm_get_argset("example_weather_import_data_from_api", index_plan = index_plan, index_analysis = index_analysis)
    schema <- sc::tm_get_schema("example_weather_import_data_from_api")
  }

  # code goes here
  # special case that runs before everything
  if(argset$first_analysis == TRUE){

  }
  
  a <- data$data
  
  baz <- xml2::xml_find_all(a, ".//maxTemperature")
  res <- vector("list", length = length(baz))
  for (i in seq_along(baz)) {
    parent <- xml2::xml_parent(baz[[i]])
    grandparent <- xml2::xml_parent(parent)
    time_from <- xml2::xml_attr(grandparent, "from")
    time_to <- xml2::xml_attr(grandparent, "to")
    x <- xml2::xml_find_all(parent, ".//minTemperature")
    temp_min <- xml2::xml_attr(x, "value")
    x <- xml2::xml_find_all(parent, ".//maxTemperature")
    temp_max <- xml2::xml_attr(x, "value")
    x <- xml2::xml_find_all(parent, ".//precipitation")
    precip <- xml2::xml_attr(x, "value")
    res[[i]] <- data.frame(
      time_from = as.character(time_from),
      time_to = as.character(time_to),
      tx = as.numeric(temp_max),
      tn = as.numeric(temp_min),
      rr = as.numeric(precip)
    )
  }
  res <- data.table::rbindlist(res)
  res <- res[stringr::str_sub(time_from, 12, 13) %in% c("00", "06", "12", "18")]
  res[, date := as.Date(stringr::str_sub(time_from, 1, 10))]
  res[, N := .N, by = date]
  res <- res[N == 4]
  res <- res[
    , 
    .(
      tg = NA,
      tx = max(tx),
      tn = min(tn),
      rr = sum(rr)
    ),
    keyby = .(date)
  ]
  
  # we look at the downloaded data
  print("Data after downloading")
  print(res)
  
  # we now need to format it
  res[, granularity_time := "day"]
  res[, sex := "total"]
  res[, age := "total"]
  res[, location_code := argset$location_code]
  
  # fill in missing structural variables
  sc::fill_in_missing_v8(res, border = 2020)
  
  # we look at the downloaded data
  print("Data after missing structural variables filled in")
  print(res)

  # put data in db table
  # schema$SCHEMA_NAME$insert_data(d)
  schema$anon_example_weather$upsert_data(res)
  # schema$SCHEMA_NAME$drop_all_rows_and_then_upsert_data(d)

  # special case that runs after everything
  # copy to anon_web?
  if(argset$last_analysis == TRUE){
    # sc::copy_into_new_table_where(
    #   table_from = "anon_X",
    #   table_to = "anon_webkht"
    # )
  }
}

Run the task

tm_run_task("example_weather_import_data_from_api")
[1] "Data after downloading"
         date tg   tx   tn rr
1: 2021-09-02 NA 24.8 11.4  0
2: 2021-09-03 NA 17.8 10.2  0
3: 2021-09-04 NA 14.2  8.9  0
4: 2021-09-05 NA 17.7  7.9  0
5: 2021-09-06 NA 18.6  8.6  0
6: 2021-09-07 NA 20.1  9.9  0
7: 2021-09-08 NA 21.2  9.9  0
8: 2021-09-09 NA 20.4  9.8  0
9: 2021-09-10 NA 19.4  9.5  0
[1] "Data after missing structural variables filled in"
         date tg   tx   tn rr granularity_time   sex   age
1: 2021-09-02 NA 24.8 11.4  0              day total total
2: 2021-09-03 NA 17.8 10.2  0              day total total
3: 2021-09-04 NA 14.2  8.9  0              day total total
4: 2021-09-05 NA 17.7  7.9  0              day total total
5: 2021-09-06 NA 18.6  8.6  0              day total total
6: 2021-09-07 NA 20.1  9.9  0              day total total
7: 2021-09-08 NA 21.2  9.9  0              day total total
8: 2021-09-09 NA 20.4  9.8  0              day total total
9: 2021-09-10 NA 19.4  9.5  0              day total total
   location_code granularity_geo border isoyearweek    season isoyear
1:      county03          county   2020     2021-35 2021/2022    2021
2:      county03          county   2020     2021-35 2021/2022    2021
3:      county03          county   2020     2021-35 2021/2022    2021
4:      county03          county   2020     2021-35 2021/2022    2021
5:      county03          county   2020     2021-36 2021/2022    2021
6:      county03          county   2020     2021-36 2021/2022    2021
7:      county03          county   2020     2021-36 2021/2022    2021
8:      county03          county   2020     2021-36 2021/2022    2021
9:      county03          county   2020     2021-36 2021/2022    2021
   isoweek seasonweek calyear calmonth calyearmonth country_iso3
1:      35          6    2021        9     2021-M09          nor
2:      35          6    2021        9     2021-M09          nor
3:      35          6    2021        9     2021-M09          nor
4:      35          6    2021        9     2021-M09          nor
5:      36          7    2021        9     2021-M09          nor
6:      36          7    2021        9     2021-M09          nor
7:      36          7    2021        9     2021-M09          nor
8:      36          7    2021        9     2021-M09          nor
9:      36          7    2021        9     2021-M09          nor

Examples of different types of tasks

Importing data

knitr::include_graphics("analytics_tasks_introduction/task_import_data.png")

sc::add_task_from_config_v8(
  name_grouping = "example",
  name_action = "import_data",
  name_variant = NULL,
  cores = 1,
  plan_analysis_fn_name = NULL,
  for_each_plan = plnr::expand_list(
    x = 1
  ),
  for_each_analysis = NULL,
  universal_argset = list(
    folder = sc::path("input", "example")
  ),
  upsert_at_end_of_each_plan = FALSE,
  insert_at_end_of_each_plan = FALSE,
  action_fn_name = "example_import_data_action",
  data_selector_fn_name = "example_import_data_data_selector",
  schema = list(
    # input

    # output
    "output" = sc::config$schemas$output
  ),
  info = "This task does..."
)

Analysis

sc::add_task_from_config_v8(
  name_grouping = "example",
  name_action = "analysis",
  name_variant = NULL,
  cores = 1,
  plan_analysis_fn_name = NULL, 
  for_each_plan = plnr::expand_list(
    location_code = fhidata::norway_locations_names()[granularity_geo %in% c("county")]$location_code
  ),
  for_each_analysis = NULL,
  universal_argset = NULL,
  upsert_at_end_of_each_plan = FALSE,
  insert_at_end_of_each_plan = FALSE,
  action_fn_name = "example_analysis_action",
  data_selector_fn_name = "example_analysis_data_selector",
  schema = list(
    # input
    "input" = sc::config$schemas$input,

    # output
    "output" = sc::config$schemas$output
  ),
  info = "This task does..."
)

Exporting multiple sets of results

sc::add_task_from_config_v8(
  name_grouping = "example",
  name_action = "export_results",
  name_variant = NULL,
  cores = 1,
  plan_analysis_fn_name = NULL, 
  for_each_plan = plnr::expand_list(
    location_code = fhidata::norway_locations_names()[granularity_geo %in% c("county")]$location_code
  ),
  for_each_analysis = NULL,
  universal_argset = list(
    folder = sc::path("output", "example")
  ),
  upsert_at_end_of_each_plan = FALSE,
  insert_at_end_of_each_plan = FALSE,
  action_fn_name = "example_export_results_action",
  data_selector_fn_name = "example_export_results_data_selector",
  schema = list(
    # input
    "input" = sc::config$schemas$input

    # output
  ),
  info = "This task does..."
)

Exporting combined results

sc::add_task_from_config_v8(
  name_grouping = "example",
  name_action = "export_results",
  name_variant = NULL,
  cores = 1,
  plan_analysis_fn_name = NULL, 
  for_each_plan = plnr::expand_list(
    x = 1
  ),
  for_each_analysis = NULL,
  universal_argset = list(
    folder = sc::path("output", "example"),
    granularity_geos = c("nation", "county")
  ),
  upsert_at_end_of_each_plan = FALSE,
  insert_at_end_of_each_plan = FALSE,
  action_fn_name = "example_export_results_action",
  data_selector_fn_name = "example_export_results_data_selector",
  schema = list(
    # input
    "input" = sc::config$schemas$input

    # output
  ),
  info = "This task does..."
)

Changelog

2021-05-26: Draft created.

Corrections

If you see mistakes or want to suggest changes, please create an issue on the source repository.

Reuse

Text and figures are licensed under Creative Commons Attribution CC BY 4.0. Source code is available at https://github.com/folkehelseinstituttet/sykdomspulsen-dokumentasjon, unless otherwise noted. The figures that have been reused from other sources don't fall under this license and can be recognized by a note in their caption: "Figure from ...".