Tutorial 1: Introduction

Let's start building tasks!


Setup

Implementing Sykdomspulsen Core requires a number of functions to be called in the correct order. To make this as simple as possible, we have provided a skeleton implementation at https://github.com/folkehelseinstituttet/scskeleton.

You should clone this GitHub repo (https://github.com/folkehelseinstituttet/scskeleton) to your server. This will be the package that you will be working on throughout this tutorial. You may choose to do a global find/replace on scskeleton with the name you want for your R package. We will refer to this R package as your “sc implementation”.

You should also clone https://github.com/folkehelseinstituttet/scexample to your server. This is the end product of the tutorial, and you should refer to it in order to check your work.

For the purposes of this tutorial, we assume that the reader is either using RStudio Server Open Source or RStudio Workbench inside Docker containers that have been built according to the Sykdomspulsen specifications. We will refer to your implementation of RStudio Server Open Source/RStudio Workbench with the generic term “RStudio”.

Load the code

Open scskeleton in RStudio project mode. Restart the R session via Ctrl+Shift+F10, rstudioapi::restartSession(), or Session > Restart R. This will ensure that you have a clean working environment before you begin. You may now load your sc implementation. This can be done via Ctrl+Shift+L, devtools::load_all("."), or Build > Load All.

rstudioapi::restartSession()
devtools::load_all(".")

You can now see which schemas have been loaded. These schemas were included in the skeleton. Note that schemas beginning with config_* are special schemas that are automatically generated by sc.
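For example, these names can be listed directly (a sketch, assuming sc::config$schemas is the named list of schema objects used throughout this tutorial):

names(sc::config$schemas)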

[1] "config_last_updated"          "config_structure_time"       
[3] "rundate"                      "config_datetime"             
[5] "anon_example_weather_rawdata" "anon_example_weather_data"   

You can now see which tasks have been loaded. These tasks were included in the skeleton.

[1] "weather_download_and_import_rawdata"
[2] "weather_clean_data"                 
[3] "weather_export_plots"               

Running

You can now run these tasks. Note that we use scskeleton::tm_run_task instead of sc::tm_run_task. This is because we want to ensure that scskeleton::.onLoad has been called.

scskeleton::tm_run_task("weather_download_and_import_rawdata")
scskeleton::tm_run_task("weather_clean_data")
scskeleton::tm_run_task("weather_export_plots")

Developing weather_download_and_import_rawdata

We will walk you through the development of a task that downloads weather data from an API and imports the raw data into a database table.

1. Schemas

The first step when developing any task is specifying the schemas that will be used.

It is strongly recommended that you use the RStudio Addins menu to help you quickly insert code templates.

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/03_db_schemas.r#L18-L64

18 |   ## > anon_example_weather_rawdata ----
19 |   sc::add_schema_v8(
20 |     name_access = c("anon"),
21 |     name_grouping = "example_weather",
22 |     name_variant = "rawdata",
23 |     db_configs = sc::config$db_configs,
24 |     field_types =  c(
25 |       "granularity_time" = "TEXT",
26 |       "granularity_geo" = "TEXT",
27 |       "country_iso3" = "TEXT",
28 |       "location_code" = "TEXT",
29 |       "border" = "INTEGER",
30 |       "age" = "TEXT",
31 |       "sex" = "TEXT",
32 | 
33 |       "date" = "DATE",
34 | 
35 |       "isoyear" = "INTEGER",
36 |       "isoweek" = "INTEGER",
37 |       "isoyearweek" = "TEXT",
38 |       "season" = "TEXT",
39 |       "seasonweek" = "DOUBLE",
40 | 
41 |       "calyear" = "INTEGER",
42 |       "calmonth" = "INTEGER",
43 |       "calyearmonth" = "TEXT",
44 | 
45 |       "temp_max" = "DOUBLE",
46 |       "temp_min" = "DOUBLE",
47 |       "precip" = "DOUBLE"
48 |     ),
49 |     keys = c(
50 |       "granularity_time",
51 |       "location_code",
52 |       "date",
53 |       "age",
54 |       "sex"
55 |     ),
56 |     censors = list(
57 |       anon = list(
58 | 
59 |       )
60 |     ),
61 |     validator_field_types = sc::validator_field_types_sykdomspulsen,
62 |     validator_field_contents = sc::validator_field_contents_sykdomspulsen,
63 |     info = "This db table is used for..."
64 |   )

Schema name

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/03_db_schemas.r#L20-L22

20 |     name_access = c("anon"),
21 |     name_grouping = "example_weather",
22 |     name_variant = "rawdata",

Here we define the name of the schema to be anon_example_weather_rawdata.
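The full name is simply the three parts joined by underscores (a sketch of the convention; name_variant is dropped when it is NULL):

paste("anon", "example_weather", "rawdata", sep = "_")
[1] "anon_example_weather_rawdata"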

Validators

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/03_db_schemas.r#L61-L63

61 |     validator_field_types = sc::validator_field_types_sykdomspulsen,
62 |     validator_field_contents = sc::validator_field_contents_sykdomspulsen,
63 |     info = "This db table is used for..."

These validators check that the schema conforms to the expected standard: validator_field_types checks the column names and their types, while validator_field_contents checks the values stored in the columns. info is a short free-text description of what the database table is used for.

When using validator_field_types = sc::validator_field_types_sykdomspulsen we expect that the first 16 columns are always as follows (i.e. standardized structural data).

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/03_db_schemas.r#L25-L43

25 |       "granularity_time" = "TEXT",
26 |       "granularity_geo" = "TEXT",
27 |       "country_iso3" = "TEXT",
28 |       "location_code" = "TEXT",
29 |       "border" = "INTEGER",
30 |       "age" = "TEXT",
31 |       "sex" = "TEXT",
32 | 
33 |       "date" = "DATE",
34 | 
35 |       "isoyear" = "INTEGER",
36 |       "isoweek" = "INTEGER",
37 |       "isoyearweek" = "TEXT",
38 |       "season" = "TEXT",
39 |       "seasonweek" = "DOUBLE",
40 | 
41 |       "calyear" = "INTEGER",
42 |       "calmonth" = "INTEGER",
43 |       "calyearmonth" = "TEXT",

Field types/column names

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/03_db_schemas.r#L45-L47

45 |       "temp_max" = "DOUBLE",
46 |       "temp_min" = "DOUBLE",
47 |       "precip" = "DOUBLE"

These are the extra columns that contain the actual data, i.e. everything that is not part of the standardized structural columns.

Keys

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/03_db_schemas.r#L49-L55

49 |     keys = c(
50 |       "granularity_time",
51 |       "location_code",
52 |       "date",
53 |       "age",
54 |       "sex"
55 |     ),

The combination of these columns represents a unique row in the dataset.
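In other words, no two rows may share the same values across all of these columns. A quick way to check a candidate dataset for key violations before inserting it (a sketch, assuming res is a data.table):

# rows that would collide on the keys (should return an empty data.table)
res[, .N, keyby = .(granularity_time, location_code, date, age, sex)][N > 1]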

Censoring

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/03_db_schemas.r#L56-L60

56 |     censors = list(
57 |       anon = list(
58 | 
59 |       )
60 |     ),

This specifies the censoring that is applied to the dataset for each access level. Here, no censoring is applied for the anon access level.

2. Task definition (task_from_config)

The second step is defining the task.

It is strongly recommended that you use the RStudio Addins menu to help you quickly insert code templates.

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L21-L43

21 |   sc::add_task_from_config_v8(
22 |     name_grouping = "weather",
23 |     name_action = "download_and_import_rawdata",
24 |     name_variant = NULL,
25 |     cores = 1,
26 |     plan_analysis_fn_name = NULL,
27 |     for_each_plan = plnr::expand_list(
28 |       location_code = fhidata::norway_locations_names()[granularity_geo %in% c("municip")]$location_code
29 |     ),
30 |     for_each_analysis = NULL,
31 |     universal_argset = NULL,
32 |     upsert_at_end_of_each_plan = FALSE,
33 |     insert_at_end_of_each_plan = FALSE,
34 |     action_fn_name = "scskeleton::weather_download_and_import_rawdata_action",
35 |     data_selector_fn_name = "scskeleton::weather_download_and_import_rawdata_data_selector",
36 |     schema = list(
37 |       # input
38 | 
39 |       # output
40 |       "anon_example_weather_rawdata" = sc::config$schemas$anon_example_weather_rawdata
41 |     ),
42 |     info = "This task downloads and imports the raw weather data from MET's API at the municipal level"
43 |   )

Task name

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L22-L24

22 |     name_grouping = "weather",
23 |     name_action = "download_and_import_rawdata",
24 |     name_variant = NULL,

Here we define the name of the task to be weather_download_and_import_rawdata.

CPU cores

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L25-L25

25 |     cores = 1,

We specify that the plans will run sequentially with 1 CPU core. If the number of CPU cores is 2 or higher, the first and last plans still run sequentially, while all the plans in the middle run in parallel. The first and last plans always run sequentially because this allows us to write “special” code for them (i.e. “do this before everything runs” and “do this after everything runs”).

Plan/analysis structure

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L26-L30

26 |     plan_analysis_fn_name = NULL,
27 |     for_each_plan = plnr::expand_list(
28 |       location_code = fhidata::norway_locations_names()[granularity_geo %in% c("municip")]$location_code
29 |     ),
30 |     for_each_analysis = NULL,

We specify the plan/analysis structure here. Three arguments control it:

plan_analysis_fn_name is a (rarely used) function that will provide a list containing the plan/analysis structure. It is generally only used when the plan/analysis structure needs to be reactive depending upon some external data (e.g. “an unknown number of data files are provided each day and need to be cleaned”).

for_each_plan is a list, with each element corresponding to a plan defined by a named list. Within this named list, each of the named elements will be translated into argset elements that are available for the respective plans. This particular for_each_plan defines a task with 356 plans (one for each municipality).

for_each_analysis is nearly the same as for_each_plan. It specifies what kind of analyses you would like to perform within each plan. It is a named list, with each element corresponding to an analysis defined by a named list. Within this named list, each of the named elements will be translated into argset elements that are available for the respective analyses.

An example of a for_each_plan that would correspond to 11 plans (one for each county):

options(width = 150)
for_each_plan = plnr::expand_list(
  location_code = fhidata::norway_locations_names()[granularity_geo %in% c("county")]$location_code
)
for_each_plan
[[1]]
[[1]]$location_code
[1] "county03"


[[2]]
[[2]]$location_code
[1] "county11"


[[3]]
[[3]]$location_code
[1] "county15"


[[4]]
[[4]]$location_code
[1] "county18"


[[5]]
[[5]]$location_code
[1] "county30"


[[6]]
[[6]]$location_code
[1] "county34"


[[7]]
[[7]]$location_code
[1] "county38"


[[8]]
[[8]]$location_code
[1] "county42"


[[9]]
[[9]]$location_code
[1] "county46"


[[10]]
[[10]]$location_code
[1] "county50"


[[11]]
[[11]]$location_code
[1] "county54"

Universal argset

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L31-L31

31 |     universal_argset = NULL,

Here we can specify a named list, where each of the named elements will be translated into argset elements that are available for all plans/analyses.
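For example (hypothetical values), this would make argset$border and argset$date_min available inside every data_selector_fn and action_fn:

universal_argset = list(
  border = 2020,
  date_min = as.Date("2021-01-01")
)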

Upsert/insert at end of each plan

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L32-L33

32 |     upsert_at_end_of_each_plan = FALSE,
33 |     insert_at_end_of_each_plan = FALSE,

If you include a schema called output, then these options will let you upsert/insert the returned value from action_fn_name at the end of each plan. This is an important nuance, because when you write/develop your task, you can (typically) only write one function (action_fn_name) that is applied to all analyses. This means that if your action_fn wants to upsert/insert data to a schema, it (typically) will do this within every analysis. If you have an analysis-heavy task, then this will be a lot of frequent traffic to the databases, which may affect performance. By using these flags, you can restrict the upsert/insert to the end of the plan, which may increase performance.
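A sketch of how this looks in practice (the schema named output and the flag are the key points; my_action and clean_one_analysis are hypothetical names):

# in the task definition:
upsert_at_end_of_each_plan = TRUE,
schema = list(
  "output" = sc::config$schemas$anon_example_weather_rawdata
),

# in the action_fn: return the data instead of upserting it yourself
my_action <- function(data, argset, schema) {
  res <- clean_one_analysis(data$data, argset)
  # the returned values are collected across the plan's analyses and
  # upserted into schema$output at the end of the plan
  res
}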

action_fn_name

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L34-L34

34 |     action_fn_name = "scskeleton::weather_download_and_import_rawdata_action",

Here we specify the name of the function that corresponds to the action. That is, the function that is called in every analysis. Note that the function is referred to by its full name as a string, including the package prefix (scskeleton::).

data_selector_fn_name

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L35-L35

35 |     data_selector_fn_name = "scskeleton::weather_download_and_import_rawdata_data_selector",

Here we specify the name of the function that corresponds to the data selector. That is, the function that is called at the start of every plan to provide data to all of the analyses inside the plan. Note that this function is also referred to by its full name as a string, and that it is called once per plan, not once per analysis.

Schemas

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L36-L41

36 |     schema = list(
37 |       # input
38 | 
39 |       # output
40 |       "anon_example_weather_rawdata" = sc::config$schemas$anon_example_weather_rawdata
41 |     ),

Here we specify a named list, where each element consists of a schema. The names will be passed through as schema$name in action_fn_name and data_selector_fn_name.

3. data_selector_fn

The third step is defining a data selector function. This is the function that will perform the “one data-pull per plan” and subsequently provide the data to the action.

It is strongly recommended that you use the RStudio Addins menu to help you quickly insert code templates.

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_download_and_import_rawdata.r#L88-L119

 88 | # **** data_selector **** ----
 89 | #' weather_download_and_import_rawdata (data selector)
 90 | #' @param argset Argset
 91 | #' @param schema DB Schema
 92 | #' @export
 93 | weather_download_and_import_rawdata_data_selector <- function(argset, schema) {
 94 |   if (plnr::is_run_directly()) {
 95 |     # sc::tm_get_plans_argsets_as_dt("weather_download_and_import_rawdata")
 96 | 
 97 |     index_plan <- 1
 98 | 
 99 |     argset <- sc::tm_get_argset("weather_download_and_import_rawdata", index_plan = index_plan)
100 |     schema <- sc::tm_get_schema("weather_download_and_import_rawdata")
101 |   }
102 | 
103 |   # find the mid lat/long for the specified location_code
104 |   gps <- fhimaps::norway_lau2_map_b2020_default_dt[location_code == argset$location_code,.(
105 |     lat = mean(lat),
106 |     long = mean(long)
107 |   )]
108 | 
109 |   # download the forecast for the specified location_code
110 |   d <- httr::GET(glue::glue("https://api.met.no/weatherapi/locationforecast/2.0/classic?lat={gps$lat}&lon={gps$long}"), httr::content_type_xml())
111 |   d <- xml2::read_xml(d$content)
112 | 
113 |   # The variable returned must be a named list
114 |   retval <- list(
115 |     "data" = d
116 |   )
117 | 
118 |   retval
119 | }

plnr::is_run_directly()

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_download_and_import_rawdata.r#L94-L101

 94 |   if (plnr::is_run_directly()) {
 95 |     # sc::tm_get_plans_argsets_as_dt("weather_download_and_import_rawdata")
 96 | 
 97 |     index_plan <- 1
 98 | 
 99 |     argset <- sc::tm_get_argset("weather_download_and_import_rawdata", index_plan = index_plan)
100 |     schema <- sc::tm_get_schema("weather_download_and_import_rawdata")
101 |   }

At the top of all data_selector_fns you will see a section of code wrapped inside if (plnr::is_run_directly()) {. This code will only be run if it is manually highlighted inside RStudio and then “run”. This is extremely beneficial to the user, because it means that the user can easily write small pieces of code that are only used during development, which will not be run when the code is run “properly”.

Sykdomspulsen core uses these sections to let the user “jump” directly into the function. Look at the arguments for weather_download_and_import_rawdata_data_selector and you will see that it needs argset and schema.

The code inside if (plnr::is_run_directly()) { loads argset and schema for index_plan = 1. By running these lines, you can treat the inside of weather_download_and_import_rawdata_data_selector as an interactive script!

This makes the development of the code extremely easy as “everything is an interactive script”.

Getting data

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_download_and_import_rawdata.r#L103-L111

103 |   # find the mid lat/long for the specified location_code
104 |   gps <- fhimaps::norway_lau2_map_b2020_default_dt[location_code == argset$location_code,.(
105 |     lat = mean(lat),
106 |     long = mean(long)
107 |   )]
108 | 
109 |   # download the forecast for the specified location_code
110 |   d <- httr::GET(glue::glue("https://api.met.no/weatherapi/locationforecast/2.0/classic?lat={gps$lat}&lon={gps$long}"), httr::content_type_xml())
111 |   d <- xml2::read_xml(d$content)

The majority of the data_selector_fn is concerned with selecting data (obviously). Remember that the data should be selected to meet the needs of the plan. If you have 11 plans (one for each county), then your data_selector_fn should only extract data for the county of interest.

Returning data

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_download_and_import_rawdata.r#L113-L116

113 |   # The variable returned must be a named list
114 |   retval <- list(
115 |     "data" = d
116 |   )

data_selector_fn needs to return a named list. This will be made available to the user in action_fn (weather_download_and_import_rawdata_action) via the argument data.

4. action_fn

The fourth step is defining an action function. This is the function that performs the “action” within the analysis. That is, given the data (from data_selector_fn), the argset, and the schemas: what do you actually want to do with them?

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_download_and_import_rawdata.r#L1-L86

 1 | # **** action **** ----
 2 | #' weather_download_and_import_rawdata (action)
 3 | #' @param data Data
 4 | #' @param argset Argset
 5 | #' @param schema DB Schema
 6 | #' @export
 7 | weather_download_and_import_rawdata_action <- function(data, argset, schema) {
 8 |   # tm_run_task("weather_download_and_import_rawdata")
 9 | 
10 |   if (plnr::is_run_directly()) {
11 |     # sc::tm_get_plans_argsets_as_dt("weather_download_and_import_rawdata")
12 | 
13 |     index_plan <- 1
14 |     index_analysis <- 1
15 | 
16 |     data <- sc::tm_get_data("weather_download_and_import_rawdata", index_plan = index_plan)
17 |     argset <- sc::tm_get_argset("weather_download_and_import_rawdata", index_plan = index_plan, index_analysis = index_analysis)
18 |     schema <- sc::tm_get_schema("weather_download_and_import_rawdata")
19 |   }
20 | 
21 |   # special case that runs before everything
22 |   if (argset$first_analysis == TRUE) {
23 | 
24 |   }
25 | 
26 |   a <- data$data
27 | 
28 |   baz <- xml2::xml_find_all(a, ".//maxTemperature")
29 |   res <- vector("list", length = length(baz))
30 |   for (i in seq_along(baz)) {
31 |     parent <- xml2::xml_parent(baz[[i]])
32 |     grandparent <- xml2::xml_parent(parent)
33 |     time_from <- xml2::xml_attr(grandparent, "from")
34 |     time_to <- xml2::xml_attr(grandparent, "to")
35 |     x <- xml2::xml_find_all(parent, ".//minTemperature")
36 |     temp_min <- xml2::xml_attr(x, "value")
37 |     x <- xml2::xml_find_all(parent, ".//maxTemperature")
38 |     temp_max <- xml2::xml_attr(x, "value")
39 |     x <- xml2::xml_find_all(parent, ".//precipitation")
40 |     precip <- xml2::xml_attr(x, "value")
41 |     res[[i]] <- data.frame(
42 |       time_from = as.character(time_from),
43 |       time_to = as.character(time_to),
44 |       temp_max = as.numeric(temp_max),
45 |       temp_min = as.numeric(temp_min),
46 |       precip = as.numeric(precip)
47 |     )
48 |   }
49 |   res <- rbindlist(res)
50 |   res <- res[stringr::str_sub(time_from, 12, 13) %in% c("00", "06", "12", "18")]
51 |   res[, date := as.Date(stringr::str_sub(time_from, 1, 10))]
52 |   res[, N := .N, by = date]
53 |   res <- res[N == 4]
54 |   res <- res[
55 |     ,
56 |     .(
57 |       temp_max = max(temp_max),
58 |       temp_min = min(temp_min),
59 |       precip = sum(precip)
60 |     ),
61 |     keyby = .(date)
62 |   ]
63 | 
64 |   # we look at the downloaded data
65 |   # res
66 | 
67 |   # we now need to format it
68 |   res[, granularity_time := "day"]
69 |   res[, sex := "total"]
70 |   res[, age := "total"]
71 |   res[, location_code := argset$location_code]
72 | 
73 |   # fill in missing structural variables
74 |   sc::fill_in_missing_v8(res, border = 2020)
75 | 
76 |   # we look at the downloaded data
77 |   # res
78 | 
79 |   # put data in db table
80 |   schema$anon_example_weather_rawdata$insert_data(res)
81 | 
82 |   # special case that runs after everything
83 |   if (argset$last_analysis == TRUE) {
84 | 
85 |   }
86 | }

plnr::is_run_directly()

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_download_and_import_rawdata.r#L10-L19

 10 |   if (plnr::is_run_directly()) {
 11 |     # sc::tm_get_plans_argsets_as_dt("weather_download_and_import_rawdata")
 12 | 
 13 |     index_plan <- 1
 14 |     index_analysis <- 1
 15 | 
 16 |     data <- sc::tm_get_data("weather_download_and_import_rawdata", index_plan = index_plan)
 17 |     argset <- sc::tm_get_argset("weather_download_and_import_rawdata", index_plan = index_plan, index_analysis = index_analysis)
 18 |     schema <- sc::tm_get_schema("weather_download_and_import_rawdata")
 19 |   }

At the top of all action_fns you will see a section of code wrapped inside if (plnr::is_run_directly()) {. This code will only be run if it is manually highlighted inside RStudio and then “run”. This is extremely beneficial to the user, because it means that the user can easily write small pieces of code that are only used during development, which will not be run when the code is run “properly”.

Sykdomspulsen core uses these sections to let the user “jump” directly into the function. Look at the arguments for weather_download_and_import_rawdata_action and you will see that it needs data, argset and schema. The code inside if (plnr::is_run_directly()) { loads data, argset and schema for index_plan = 1 and index_analysis = 1. By running these lines, you can treat the inside of weather_download_and_import_rawdata_action as an interactive script!

This makes the development of the code extremely easy as “everything is an interactive script”.

argset$first_analysis

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_download_and_import_rawdata.r#L21-L24

21 |   # special case that runs before everything
22 |   if (argset$first_analysis == TRUE) {
23 | 
24 |   }

This code is only run if it is the first analysis. It is typically used to drop rows in a database, so that the following code may insert data (faster) instead of using upsert data (slower).
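A sketch of this pattern (drop_all_rows() is assumed here as the row-dropping counterpart of the drop_all_rows_and_then_insert_data() method used later in this tutorial):

# special case that runs before everything
if (argset$first_analysis == TRUE) {
  # drop the old rows once, so that every analysis can use the faster insert
  schema$anon_example_weather_rawdata$drop_all_rows()
}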

Doing things

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_download_and_import_rawdata.r#L26-L80

26 |   a <- data$data
27 | 
28 |   baz <- xml2::xml_find_all(a, ".//maxTemperature")
29 |   res <- vector("list", length = length(baz))
30 |   for (i in seq_along(baz)) {
31 |     parent <- xml2::xml_parent(baz[[i]])
32 |     grandparent <- xml2::xml_parent(parent)
33 |     time_from <- xml2::xml_attr(grandparent, "from")
34 |     time_to <- xml2::xml_attr(grandparent, "to")
35 |     x <- xml2::xml_find_all(parent, ".//minTemperature")
36 |     temp_min <- xml2::xml_attr(x, "value")
37 |     x <- xml2::xml_find_all(parent, ".//maxTemperature")
38 |     temp_max <- xml2::xml_attr(x, "value")
39 |     x <- xml2::xml_find_all(parent, ".//precipitation")
40 |     precip <- xml2::xml_attr(x, "value")
41 |     res[[i]] <- data.frame(
42 |       time_from = as.character(time_from),
43 |       time_to = as.character(time_to),
44 |       temp_max = as.numeric(temp_max),
45 |       temp_min = as.numeric(temp_min),
46 |       precip = as.numeric(precip)
47 |     )
48 |   }
49 |   res <- rbindlist(res)
50 |   res <- res[stringr::str_sub(time_from, 12, 13) %in% c("00", "06", "12", "18")]
51 |   res[, date := as.Date(stringr::str_sub(time_from, 1, 10))]
52 |   res[, N := .N, by = date]
53 |   res <- res[N == 4]
54 |   res <- res[
55 |     ,
56 |     .(
57 |       temp_max = max(temp_max),
58 |       temp_min = min(temp_min),
59 |       precip = sum(precip)
60 |     ),
61 |     keyby = .(date)
62 |   ]
63 | 
64 |   # we look at the downloaded data
65 |   # res
66 | 
67 |   # we now need to format it
68 |   res[, granularity_time := "day"]
69 |   res[, sex := "total"]
70 |   res[, age := "total"]
71 |   res[, location_code := argset$location_code]
72 | 
73 |   # fill in missing structural variables
74 |   sc::fill_in_missing_v8(res, border = 2020)
75 | 
76 |   # we look at the downloaded data
77 |   # res
78 | 
79 |   # put data in db table
80 |   schema$anon_example_weather_rawdata$insert_data(res)

Every analysis runs this code.

Accessing data from data_selector_fn

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_download_and_import_rawdata.r#L26-L26

26 |   a <- data$data

Here you see that we access the data that was passed to us from data_selector_fn.

Structural data/sc::fill_in_missing_v8

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_download_and_import_rawdata.r#L68-L74

68 |   res[, granularity_time := "day"]
69 |   res[, sex := "total"]
70 |   res[, age := "total"]
71 |   res[, location_code := argset$location_code]
72 | 
73 |   # fill in missing structural variables
74 |   sc::fill_in_missing_v8(res, border = 2020)

We have 16 structural data columns that we expect. These columns typically have a lot of redundancy (e.g. date, isoyear, isoyearweek). To make things easier, we provide a function called sc::fill_in_missing_v8 that uses the information present in the dataset to try and impute the missing structural data.
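A small illustration of what this does (a sketch; the imputed columns follow the 16 structural columns listed earlier):

res <- data.table::data.table(
  granularity_time = "day",
  location_code = "municip0301",
  date = as.Date("2021-07-15"),
  age = "total",
  sex = "total",
  temp_max = 25.1,
  temp_min = 14.3,
  precip = 0
)
sc::fill_in_missing_v8(res, border = 2020)
# res now also contains granularity_geo, country_iso3, border, isoyear, isoweek,
# isoyearweek, season, seasonweek, calyear, calmonth and calyearmonth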

Insert/upsert to databases

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_download_and_import_rawdata.r#L80-L80

80 |   schema$anon_example_weather_rawdata$insert_data(res)

Here we insert the data to the database table.

Remember that insert is an append (so the data cannot already exist in the database table), while upsert is “update (overwrite) if already exists, insert (append) if it doesn’t”.
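In code (insert_data is used above; upsert_data is assumed here as its upserting counterpart):

schema$anon_example_weather_rawdata$insert_data(res) # append: the rows must not already exist
schema$anon_example_weather_rawdata$upsert_data(res) # overwrite on key match, append otherwise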

argset$last_analysis

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_download_and_import_rawdata.r#L82-L85

82 |   # special case that runs after everything
83 |   if (argset$last_analysis == TRUE) {
84 | 
85 |   }

This code is only run if it is the last analysis. It is typically used to copy an internal database table (i.e. one that the public is not directly viewing) to an external database (i.e. one that the public is directly viewing).

By distinguishing between internal database tables (e.g. anon_webkhtint_test) and external database tables (e.g. anon_webkht_test) we can do whatever we want to anon_webkhtint_test while anon_webkht_test remains in place and untouched. This makes it less likely that any mistakes will affect any APIs or websites that the public uses.

Which plan/analysis is which?

Inside the if (plnr::is_run_directly()) { sections, you specify index_plan and index_analysis. However, these are just numbers. If you want to specifically look at the plan for Oslo municipality, how do you know which index_plan this corresponds to?

options(width = 150)
sc::tm_get_plans_argsets_as_dt("weather_download_and_import_rawdata")
     index_plan index_analysis **universal** **plan** location_code **analysis** **automatic** index      today  yesterday first_analysis
  1:          1              1             *        *   municip0301            *             *     1 2021-09-02 2021-09-01           TRUE
  2:          2              1             *        *   municip1101            *             *     2 2021-09-02 2021-09-01          FALSE
  3:          3              1             *        *   municip1103            *             *     3 2021-09-02 2021-09-01          FALSE
  4:          4              1             *        *   municip1106            *             *     4 2021-09-02 2021-09-01          FALSE
  5:          5              1             *        *   municip1108            *             *     5 2021-09-02 2021-09-01          FALSE
 ---                                                                                                                                     
352:        352              1             *        *   municip5440            *             *   352 2021-09-02 2021-09-01          FALSE
353:        353              1             *        *   municip5441            *             *   353 2021-09-02 2021-09-01          FALSE
354:        354              1             *        *   municip5442            *             *   354 2021-09-02 2021-09-01          FALSE
355:        355              1             *        *   municip5443            *             *   355 2021-09-02 2021-09-01          FALSE
356:        356              1             *        *   municip5444            *             *   356 2021-09-02 2021-09-01          FALSE
     first_argset last_analysis last_argset
  1:         TRUE         FALSE       FALSE
  2:        FALSE         FALSE       FALSE
  3:        FALSE         FALSE       FALSE
  4:        FALSE         FALSE       FALSE
  5:        FALSE         FALSE       FALSE
 ---                                       
352:        FALSE         FALSE       FALSE
353:        FALSE         FALSE       FALSE
354:        FALSE         FALSE       FALSE
355:        FALSE         FALSE       FALSE
356:        FALSE          TRUE        TRUE
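
Since the returned object is a data.table, you can filter it directly. For example, Oslo municipality is municip0301:

dt <- sc::tm_get_plans_argsets_as_dt("weather_download_and_import_rawdata")
dt[location_code == "municip0301"]$index_plan
[1] 1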

Developing weather_clean_data

The previous task (weather_download_and_import_rawdata) focused on downloading raw data from an API and inserting it into a database table.

The task weather_clean_data focuses on cleaning the raw data and inserting it in another database table. That is, the data source is a Sykdomspulsen Core database table, and the output is also a Sykdomspulsen Core database table.

We will walk you through the development of weather_clean_data; however, the description of this task will be less comprehensive than that of the previous task, focusing primarily on the parts that are novel.

1. Schemas

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/03_db_schemas.r

  1 | # ******************************************************************************
  2 | # ******************************************************************************
  3 | #
  4 | # 03_db_schemas.r
  5 | #
  6 | # PURPOSE 1:
  7 | #   Set db schemas that are used throughout the package.
  8 | #
  9 | #   These are basically all of the database tables that you will be writing to,
 10 | #   and reading from.
 11 | #
 12 | # ******************************************************************************
 13 | # ******************************************************************************
 14 | 
 15 | set_db_schemas <- function() {
 16 |   # __________ ----
 17 |   # Weather  ----
 18 |   ## > anon_example_weather_rawdata ----
 19 |   sc::add_schema_v8(
 20 |     name_access = c("anon"),
 21 |     name_grouping = "example_weather",
 22 |     name_variant = "rawdata",
 23 |     db_configs = sc::config$db_configs,
 24 |     field_types =  c(
 25 |       "granularity_time" = "TEXT",
 26 |       "granularity_geo" = "TEXT",
 27 |       "country_iso3" = "TEXT",
 28 |       "location_code" = "TEXT",
 29 |       "border" = "INTEGER",
 30 |       "age" = "TEXT",
 31 |       "sex" = "TEXT",
 32 | 
 33 |       "date" = "DATE",
 34 | 
 35 |       "isoyear" = "INTEGER",
 36 |       "isoweek" = "INTEGER",
 37 |       "isoyearweek" = "TEXT",
 38 |       "season" = "TEXT",
 39 |       "seasonweek" = "DOUBLE",
 40 | 
 41 |       "calyear" = "INTEGER",
 42 |       "calmonth" = "INTEGER",
 43 |       "calyearmonth" = "TEXT",
 44 | 
 45 |       "temp_max" = "DOUBLE",
 46 |       "temp_min" = "DOUBLE",
 47 |       "precip" = "DOUBLE"
 48 |     ),
 49 |     keys = c(
 50 |       "granularity_time",
 51 |       "location_code",
 52 |       "date",
 53 |       "age",
 54 |       "sex"
 55 |     ),
 56 |     censors = list(
 57 |       anon = list(
 58 | 
 59 |       )
 60 |     ),
 61 |     validator_field_types = sc::validator_field_types_sykdomspulsen,
 62 |     validator_field_contents = sc::validator_field_contents_sykdomspulsen,
 63 |     info = "This db table is used for..."
 64 |   )
 65 | 
 66 |   ## > anon_example_weather_data ----
 67 |   sc::add_schema_v8(
 68 |     name_access = c("anon"),
 69 |     name_grouping = "example_weather",
 70 |     name_variant = "data",
 71 |     db_configs = sc::config$db_configs,
 72 |     field_types =  c(
 73 |       "granularity_time" = "TEXT",
 74 |       "granularity_geo" = "TEXT",
 75 |       "country_iso3" = "TEXT",
 76 |       "location_code" = "TEXT",
 77 |       "border" = "INTEGER",
 78 |       "age" = "TEXT",
 79 |       "sex" = "TEXT",
 80 | 
 81 |       "date" = "DATE",
 82 | 
 83 |       "isoyear" = "INTEGER",
 84 |       "isoweek" = "INTEGER",
 85 |       "isoyearweek" = "TEXT",
 86 |       "season" = "TEXT",
 87 |       "seasonweek" = "DOUBLE",
 88 | 
 89 |       "calyear" = "INTEGER",
 90 |       "calmonth" = "INTEGER",
 91 |       "calyearmonth" = "TEXT",
 92 | 
 93 |       "temp_max" = "DOUBLE",
 94 |       "temp_min" = "DOUBLE",
 95 |       "precip" = "DOUBLE"
 96 |     ),
 97 |     keys = c(
 98 |       "granularity_time",
 99 |       "location_code",
100 |       "date",
101 |       "age",
102 |       "sex"
103 |     ),
104 |     censors = list(
105 |       anon = list(
106 | 
107 |       )
108 |     ),
109 |     validator_field_types = sc::validator_field_types_sykdomspulsen,
110 |     validator_field_contents = sc::validator_field_contents_sykdomspulsen,
111 |     info = "This db table is used for..."
112 |   )
113 | }

2. Task definition (task_from_config)

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L45-L70

45 |   ## > weather_clean_data ----
46 |   # tm_run_task("weather_clean_data")
47 |   sc::add_task_from_config_v8(
48 |     name_grouping = "weather",
49 |     name_action = "clean_data",
50 |     name_variant = NULL,
51 |     cores = 1,
52 |     plan_analysis_fn_name = NULL,
53 |     for_each_plan = plnr::expand_list(
54 |       x = 1
55 |     ),
56 |     for_each_analysis = NULL,
57 |     universal_argset = NULL,
58 |     upsert_at_end_of_each_plan = FALSE,
59 |     insert_at_end_of_each_plan = FALSE,
60 |     action_fn_name = "scskeleton::weather_clean_data_action",
61 |     data_selector_fn_name = "scskeleton::weather_clean_data_data_selector",
62 |     schema = list(
63 |       # input
64 |       "anon_example_weather_rawdata" = sc::config$schemas$anon_example_weather_rawdata,
65 | 
66 |       # output
67 |       "anon_example_weather_data" = sc::config$schemas$anon_example_weather_data
68 |     ),
69 |     info = "This task cleans the raw data and aggregates it to county and national level"
70 |   )

Plan/analysis structure

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L52-L56

52 |     plan_analysis_fn_name = NULL,
53 |     for_each_plan = plnr::expand_list(
54 |       x = 1
55 |     ),
56 |     for_each_analysis = NULL,

For this particular task, we have decided to only implement one plan containing one analysis, which will process all of the data at once.

If we were only aggregating municipality data to the county level, we could have implemented 11 plans (one for each county). However, because we are also aggregating to the national level, we need all the data available at once.

Schemas

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L62-L68

62 |     schema = list(
63 |       # input
64 |       "anon_example_weather_rawdata" = sc::config$schemas$anon_example_weather_rawdata,
65 | 
66 |       # output
67 |       "anon_example_weather_data" = sc::config$schemas$anon_example_weather_data
68 |     ),

We need to specify the schemas that are used for both input and output.

3. data_selector_fn

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_clean_data.r#L184-L251

184 | # **** data_selector **** ----
185 | #' weather_clean_data (data selector)
186 | #' @param argset Argset
187 | #' @param schema DB Schema
188 | #' @export
189 | weather_clean_data_data_selector <- function(argset, schema) {
190 |   if (plnr::is_run_directly()) {
191 |     # sc::tm_get_plans_argsets_as_dt("weather_clean_data")
192 | 
193 |     index_plan <- 1
194 | 
195 |     argset <- sc::tm_get_argset("weather_clean_data", index_plan = index_plan)
196 |     schema <- sc::tm_get_schema("weather_clean_data")
197 |   }
198 | 
199 |   # The database schemas can be accessed here
200 |   d <- schema$anon_example_weather_rawdata$tbl() %>%
201 |     sc::mandatory_db_filter(
202 |       granularity_time = "day",
203 |       granularity_time_not = NULL,
204 |       granularity_geo = "municip",
205 |       granularity_geo_not = NULL,
206 |       country_iso3 = NULL,
207 |       location_code = NULL,
208 |       age = "total",
209 |       age_not = NULL,
210 |       sex = "total",
211 |       sex_not = NULL
212 |     ) %>%
213 |     dplyr::select(
214 |       granularity_time,
215 |       # granularity_geo,
216 |       # country_iso3,
217 |       location_code,
218 |       # border,
219 |       # age,
220 |       # sex,
221 | 
222 |       date,
223 | 
224 |       # isoyear,
225 |       # isoweek,
226 |       # isoyearweek,
227 |       # season,
228 |       # seasonweek,
229 | 
230 |       # calyear,
231 |       # calmonth,
232 |       # calyearmonth,
233 | 
234 |       temp_max,
235 |       temp_min,
236 |       precip
237 |     ) %>%
238 |     dplyr::collect() %>%
239 |     as.data.table() %>%
240 |     setorder(
241 |       location_code,
242 |       date
243 |     )
244 | 
245 |   # The variable returned must be a named list
246 |   retval <- list(
247 |     "day_municip" = d
248 |   )
249 | 
250 |   retval
251 | }

Getting data (specify the schema)

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_clean_data.r#L200-L200

200 |   d <- schema$anon_example_weather_rawdata$tbl() %>%

We start by connecting to the database table linked to the schema.

Getting data (sc::mandatory_db_filter)

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_clean_data.r#L201-L212

201 |     sc::mandatory_db_filter(
202 |       granularity_time = "day",
203 |       granularity_time_not = NULL,
204 |       granularity_geo = "municip",
205 |       granularity_geo_not = NULL,
206 |       country_iso3 = NULL,
207 |       location_code = NULL,
208 |       age = "total",
209 |       age_not = NULL,
210 |       sex = "total",
211 |       sex_not = NULL
212 |     ) %>%

We then introduce the sc::mandatory_db_filter. This is a filter on the most common structural variables. We say this is “mandatory” because we want the user to always keep in mind exactly which granularity_time, granularity_geo, age, and sex their data pull should contain.

You will notice that we don't provide values for all of the arguments (many are left as NULL), but we fill in as many as we can.

Getting data (dplyr::select)

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_clean_data.r#L213-L237

213 |     dplyr::select(
214 |       granularity_time,
215 |       # granularity_geo,
216 |       # country_iso3,
217 |       location_code,
218 |       # border,
219 |       # age,
220 |       # sex,
221 | 
222 |       date,
223 | 
224 |       # isoyear,
225 |       # isoweek,
226 |       # isoyearweek,
227 |       # season,
228 |       # seasonweek,
229 | 
230 |       # calyear,
231 |       # calmonth,
232 |       # calyearmonth,
233 | 
234 |       temp_max,
235 |       temp_min,
236 |       precip
237 |     ) %>%

We always want to be as explicit as possible with what data is needed to do the job. To achieve this, we use dplyr::select to select the columns that we are interested in.

If you want to quickly generate a dplyr::select boilerplate for your schema that you can copy/paste, you can do this via the following:

schema$anon_example_weather_rawdata$print_dplyr_select()
dplyr::select(
  granularity_time,
  granularity_geo,
  country_iso3,
  location_code,
  border,
  age,
  sex,
  date,
  isoyear,
  isoweek,
  isoyearweek,
  season,
  seasonweek,
  calyear,
  calmonth,
  calyearmonth,
  temp_max,
  temp_min,
  precip
) %>%

Getting data (dplyr::collect)

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_clean_data.r#L238-L238

238 |     dplyr::collect() %>%

This executes the SQL query against the database and pulls the results into R as a data.frame.
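
Everything before this point is a lazy query: the filters and column selections are translated into SQL, and no rows are transferred until dplyr::collect() runs. A sketch for inspecting the query:

lazy <- schema$anon_example_weather_rawdata$tbl() %>%
  dplyr::filter(granularity_time == "day")
dplyr::show_query(lazy)   # print the generated SQL without running it
d <- dplyr::collect(lazy) # the query executes here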

Getting data (data.table and setorder)

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_clean_data.r#L239-L243

239 |     as.data.table() %>%
240 |     setorder(
241 |       location_code,
242 |       date
243 |     )

Firstly, as a general rule we prefer data.table, so we convert the returned data.frame to a data.table.

Secondly, we are not guaranteed to receive our data in any particular order. Because of this, it is very important that we sort our data on arrival (if this is relevant to the action_fn, e.g. if cumulative sums are created).
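
For example (a sketch), a cumulative precipitation column is only correct if the rows are sorted by date within each location first:

setorder(d, location_code, date)
d[, precip_cumulative := cumsum(precip), by = .(location_code)]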

4. action_fn

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_clean_data.r#L1-L182

  1 | # **** action **** ----
  2 | #' weather_clean_data (action)
  3 | #' @param data Data
  4 | #' @param argset Argset
  5 | #' @param schema DB Schema
  6 | #' @export
  7 | weather_clean_data_action <- function(data, argset, schema) {
  8 |   # tm_run_task("weather_clean_data")
  9 | 
 10 |   if (plnr::is_run_directly()) {
 11 |     # sc::tm_get_plans_argsets_as_dt("weather_clean_data")
 12 | 
 13 |     index_plan <- 1
 14 |     index_analysis <- 1
 15 | 
 16 |     data <- sc::tm_get_data("weather_clean_data", index_plan = index_plan)
 17 |     argset <- sc::tm_get_argset("weather_clean_data", index_plan = index_plan, index_analysis = index_analysis)
 18 |     schema <- sc::tm_get_schema("weather_clean_data")
 19 |   }
 20 | 
 21 |   # special case that runs before everything
 22 |   if (argset$first_analysis == TRUE) {
 23 | 
 24 |   }
 25 | 
 26 |   # make sure there's no missing data via the creation of a skeleton
 27 |   # https://folkehelseinstituttet.github.io/fhidata/articles/Skeletons.html
 28 | 
 29 |   # Create a variable (possibly a list) to hold the data
 30 |   d_agg <- list()
 31 |   d_agg$day_municip <- copy(data$day_municip)
 32 | 
 33 |   # Pull out important dates
 34 |   date_min <- min(d_agg$day_municip$date, na.rm = T)
 35 |   date_max <- max(d_agg$day_municip$date, na.rm = T)
 36 | 
 37 |   # Create `multiskeleton`
 38 |   # granularity_geo should have the following groups:
 39 |   # - nodata (when no data is available, and there is no "finer" data available to aggregate up)
 40 |   # - all levels of granularity_geo where you have data available
 41 |   # If you do not have data for a specific granularity_geo, but there is "finer" data available
 42 |   # then you should not include this granularity_geo in the multiskeleton, because you will create
 43 |   # it later when you aggregate up your data (baregion)
 44 |   multiskeleton_day <- fhidata::make_skeleton(
 45 |     date_min = date_min,
 46 |     date_max = date_max,
 47 |     granularity_geo = list(
 48 |       "nodata" = c(
 49 |         "wardoslo",
 50 |         "extrawardoslo",
 51 |         "missingwardoslo",
 52 |         "wardbergen",
 53 |         "missingwardbergen",
 54 |         "wardstavanger",
 55 |         "missingwardstavanger",
 56 |         "notmainlandmunicip",
 57 |         "missingmunicip",
 58 |         "notmainlandcounty",
 59 |         "missingcounty"
 60 |       ),
 61 |       "municip" = c(
 62 |         "municip"
 63 |       )
 64 |     )
 65 |   )
 66 | 
 67 |   # Merge in the information you have at different geographical granularities
 68 |   # one level at a time
 69 |   # municip
 70 |   multiskeleton_day$municip[
 71 |     d_agg$day_municip,
 72 |     on = c("location_code", "date"),
 73 |     c(
 74 |       "temp_max",
 75 |       "temp_min",
 76 |       "precip"
 77 |     ) := .(
 78 |       temp_max,
 79 |       temp_min,
 80 |       precip
 81 |     )
 82 |   ]
 83 | 
 84 |   multiskeleton_day$municip[]
 85 | 
 86 |   # Aggregate up to higher geographical granularities (county)
 87 |   multiskeleton_day$county <- multiskeleton_day$municip[
 88 |     fhidata::norway_locations_hierarchy(
 89 |       from = "municip",
 90 |       to = "county"
 91 |     ),
 92 |     on = c(
 93 |       "location_code==from_code"
 94 |     )
 95 |   ][,
 96 |     .(
 97 |       temp_max = mean(temp_max, na.rm = T),
 98 |       temp_min = mean(temp_min, na.rm = T),
 99 |       precip = mean(precip, na.rm = T),
100 |       granularity_geo = "county"
101 |     ),
102 |     by = .(
103 |       granularity_time,
104 |       date,
105 |       location_code = to_code
106 |     )
107 |   ]
108 | 
109 |   multiskeleton_day$county[]
110 | 
111 |   # Aggregate up to higher geographical granularities (nation)
112 |   multiskeleton_day$nation <- multiskeleton_day$municip[
113 |     ,
114 |     .(
115 |       temp_max = mean(temp_max, na.rm = T),
116 |       temp_min = mean(temp_min, na.rm = T),
117 |       precip = mean(precip, na.rm = T),
118 |       granularity_geo = "nation",
119 |       location_code = "norge"
120 |     ),
121 |     by = .(
122 |       granularity_time,
123 |       date
124 |     )
125 |   ]
126 | 
127 |   multiskeleton_day$nation[]
128 | 
129 |   # combine all the different granularity_geos
130 |   skeleton_day <- rbindlist(multiskeleton_day, fill = TRUE, use.names = TRUE)
131 | 
132 |   skeleton_day[]
133 | 
134 |   # 10. (If desirable) aggregate up to higher time granularities
135 |   # if necessary, it is now easy to aggregate up to weekly data from here
136 |   skeleton_isoweek <- copy(skeleton_day)
137 |   skeleton_isoweek[, isoyearweek := fhiplot::isoyearweek_c(date)]
138 |   skeleton_isoweek <- skeleton_isoweek[
139 |     ,
140 |     .(
141 |       temp_max = mean(temp_max, na.rm = T),
142 |       temp_min = mean(temp_min, na.rm = T),
143 |       precip = mean(precip, na.rm = T),
144 |       granularity_time = "isoweek"
145 |     ),
146 |     keyby = .(
147 |       isoyearweek,
148 |       granularity_geo,
149 |       location_code
150 |     )
151 |   ]
152 | 
153 |   skeleton_isoweek[]
154 | 
155 |   # we now need to format it and fill in missing structural variables
156 |   # day
157 |   skeleton_day[, sex := "total"]
158 |   skeleton_day[, age := "total"]
159 |   sc::fill_in_missing_v8(skeleton_day, border = config$border)
160 | 
161 |   # isoweek
162 |   skeleton_isoweek[, sex := "total"]
163 |   skeleton_isoweek[, age := "total"]
164 |   sc::fill_in_missing_v8(skeleton_isoweek, border = config$border)
165 |   skeleton_isoweek[, date := as.Date(date)]
166 | 
167 |   skeleton <- rbindlist(
168 |     list(
169 |       skeleton_day,
170 |       skeleton_isoweek
171 |     ),
172 |     use.names = T
173 |   )
174 | 
175 |   # put data in db table
176 |   schema$anon_example_weather_data$drop_all_rows_and_then_insert_data(skeleton)
177 | 
178 |   # special case that runs after everything
179 |   if (argset$last_analysis == TRUE) {
180 | 
181 |   }
182 | }

Skeleton

Read about the concept of skeletons at https://folkehelseinstituttet.github.io/fhidata/articles/Skeletons.html.

Developing weather_export_plots

The task weather_export_plots takes the cleaned data and plots 11 graphs (one for each county).

1. Schemas

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/03_db_schemas.r#L66-L112

 66 |   ## > anon_example_weather_data ----
 67 |   sc::add_schema_v8(
 68 |     name_access = c("anon"),
 69 |     name_grouping = "example_weather",
 70 |     name_variant = "data",
 71 |     db_configs = sc::config$db_configs,
 72 |     field_types =  c(
 73 |       "granularity_time" = "TEXT",
 74 |       "granularity_geo" = "TEXT",
 75 |       "country_iso3" = "TEXT",
 76 |       "location_code" = "TEXT",
 77 |       "border" = "INTEGER",
 78 |       "age" = "TEXT",
 79 |       "sex" = "TEXT",
 80 | 
 81 |       "date" = "DATE",
 82 | 
 83 |       "isoyear" = "INTEGER",
 84 |       "isoweek" = "INTEGER",
 85 |       "isoyearweek" = "TEXT",
 86 |       "season" = "TEXT",
 87 |       "seasonweek" = "DOUBLE",
 88 | 
 89 |       "calyear" = "INTEGER",
 90 |       "calmonth" = "INTEGER",
 91 |       "calyearmonth" = "TEXT",
 92 | 
 93 |       "temp_max" = "DOUBLE",
 94 |       "temp_min" = "DOUBLE",
 95 |       "precip" = "DOUBLE"
 96 |     ),
 97 |     keys = c(
 98 |       "granularity_time",
 99 |       "location_code",
100 |       "date",
101 |       "age",
102 |       "sex"
103 |     ),
104 |     censors = list(
105 |       anon = list(
106 | 
107 |       )
108 |     ),
109 |     validator_field_types = sc::validator_field_types_sykdomspulsen,
110 |     validator_field_contents = sc::validator_field_contents_sykdomspulsen,
111 |     info = "This db table is used for..."
112 |   )

This schema has already been created by the previous task weather_clean_data.

2. Task definition (task_from_config)

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L72-L100

 72 |   ## > weather_export_plots ----
 73 |   # tm_run_task("weather_export_plots")
 74 |   sc::add_task_from_config_v8(
 75 |     name_grouping = "weather",
 76 |     name_action = "export_plots",
 77 |     name_variant = NULL,
 78 |     cores = 1,
 79 |     plan_analysis_fn_name = NULL,
 80 |     for_each_plan = plnr::expand_list(
 81 |       location_code = fhidata::norway_locations_names()[granularity_geo %in% c("county")]$location_code
 82 |     ),
 83 |     for_each_analysis = NULL,
 84 |     universal_argset = list(
 85 |       output_dir = tempdir(),
 86 |       output_filename = "weather_{argset$location_code}.png",
 87 |       output_absolute_path = fs::path("{argset$output_dir}", "{argset$output_filename}")
 88 |     ),
 89 |     upsert_at_end_of_each_plan = FALSE,
 90 |     insert_at_end_of_each_plan = FALSE,
 91 |     action_fn_name = "scskeleton::weather_export_plots_action",
 92 |     data_selector_fn_name = "scskeleton::weather_export_plots_data_selector",
 93 |     schema = list(
 94 |       # input
 95 |       "anon_example_weather_data" = sc::config$schemas$anon_example_weather_data
 96 | 
 97 |       # output
 98 |     ),
 99 |     info = "This task produces plots"
100 |   )

Plan/analysis structure

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L79-L83

79 |     plan_analysis_fn_name = NULL,
80 |     for_each_plan = plnr::expand_list(
81 |       location_code = fhidata::norway_locations_names()[granularity_geo %in% c("county")]$location_code
82 |     ),
83 |     for_each_analysis = NULL,

Here we choose a plan-heavy approach (11 plans, 1 analysis per plan) to minimize the amount of data loaded into RAM at any point in time.

Universal argset

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/04_tasks.r#L84-L88

84 |     universal_argset = list(
85 |       output_dir = tempdir(),
86 |       output_filename = "weather_{argset$location_code}.png",
87 |       output_absolute_path = fs::path("{argset$output_dir}", "{argset$output_filename}")
88 |     ),

The benefits of placing the output directories and filenames in the task declaration are that the file locations are documented in one place (you can see where a task writes to without reading the action code), and that they can be changed without touching the action function, since they are available everywhere via argset.

3. data_selector_fn

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_export_plots.r#L45-L110

 45 | # **** data_selector **** ----
 46 | #' weather_export_plots (data selector)
 47 | #' @param argset Argset
 48 | #' @param schema DB Schema
 49 | #' @export
 50 | weather_export_plots_data_selector = function(argset, schema){
 51 |   if(plnr::is_run_directly()){
 52 |     # sc::tm_get_plans_argsets_as_dt("weather_export_plots")
 53 | 
 54 |     index_plan <- 1
 55 | 
 56 |     argset <- sc::tm_get_argset("weather_export_plots", index_plan = index_plan)
 57 |     schema <- sc::tm_get_schema("weather_export_plots")
 58 |   }
 59 | 
 60 |   # The database schemas can be accessed here
 61 |   d <- schema$anon_example_weather_data$tbl() %>%
 62 |     sc::mandatory_db_filter(
 63 |       granularity_time = NULL,
 64 |       granularity_time_not = NULL,
 65 |       granularity_geo = NULL,
 66 |       granularity_geo_not = NULL,
 67 |       country_iso3 = NULL,
 68 |       location_code = argset$location_code,
 69 |       age = NULL,
 70 |       age_not = NULL,
 71 |       sex = NULL,
 72 |       sex_not = NULL
 73 |     ) %>%
 74 |     dplyr::select(
 75 |       # granularity_time,
 76 |       # granularity_geo,
 77 |       # country_iso3,
 78 |       # location_code,
 79 |       # border,
 80 |       # age,
 81 |       # sex,
 82 | 
 83 |       date,
 84 | 
 85 |       # isoyear,
 86 |       # isoweek,
 87 |       # isoyearweek,
 88 |       # season,
 89 |       # seasonweek,
 90 |       #
 91 |       # calyear,
 92 |       # calmonth,
 93 |       # calyearmonth,
 94 | 
 95 |       temp_max,
 96 |       temp_min
 97 |     ) %>%
 98 |     dplyr::collect() %>%
 99 |     as.data.table() %>%
100 |     setorder(
101 |       # location_code,
102 |       date
103 |     )
104 | 
105 |   # The variable returned must be a named list
106 |   retval <- list(
107 |     "data" = d
108 |   )
109 |   retval
110 | }

4. action_fn

https://github.com/folkehelseinstituttet/scskeleton/blob/main/R/weather_export_plots.r#L1-L43

 1 | # **** action **** ----
 2 | #' weather_export_plots (action)
 3 | #' @param data Data
 4 | #' @param argset Argset
 5 | #' @param schema DB Schema
 6 | #' @export
 7 | weather_export_plots_action <- function(data, argset, schema) {
 8 |   # tm_run_task("weather_export_plots")
 9 | 
10 |   if(plnr::is_run_directly()){
11 |     # sc::tm_get_plans_argsets_as_dt("weather_export_plots")
12 | 
13 |     index_plan <- 1
14 |     index_analysis <- 1
15 | 
16 |     data <- sc::tm_get_data("weather_export_plots", index_plan = index_plan)
17 |     argset <- sc::tm_get_argset("weather_export_plots", index_plan = index_plan, index_analysis = index_analysis)
18 |     schema <- sc::tm_get_schema("weather_export_plots")
19 |   }
20 | 
21 |   # code goes here
22 |   # special case that runs before everything
23 |   if(argset$first_analysis == TRUE){
24 | 
25 |   }
26 | 
27 |   # create the output_dir (if it doesn't exist)
28 |   fs::dir_create(glue::glue(argset$output_dir))
29 | 
30 |   q <- ggplot(data$data, aes(x = date, ymin = temp_min, ymax = temp_max))
31 |   q <- q + geom_ribbon(alpha = 0.5)
32 | 
33 |   ggsave(
34 |     filename = glue::glue(argset$output_absolute_path),
35 |     plot = q
36 |   )
37 | 
38 |   # special case that runs after everything
39 |   # copy to anon_web?
40 |   if(argset$last_analysis == TRUE){
41 | 
42 |   }
43 | }

What now?

After Tutorial 1, we expect that you understand the four fundamental parts of developing a task:

  1. Schemas
  2. Task definition (task_from_config)
  3. data_selector_fn
  4. action_fn

We also expect that you can:

  1. Run a task using tm_run_task
  2. Use sc::tm_get_plans_argsets_as_dt to identify which index_plan and index_analysis correspond to the plan/analysis you are interested in (e.g. Oslo)
  3. Run the inside code of a data_selector_fn for different values of index_plan as if it were an interactive script
  4. Run the inside code of an action_fn for different values of index_plan and index_analysis as if it were an interactive script

Tutorial 2 will challenge you to start creating your own tasks to solve problems.

Changelog

2021-07-15: Draft created.

Corrections

If you see mistakes or want to suggest changes, please create an issue on the source repository.

Reuse

Text and figures are licensed under Creative Commons Attribution CC BY 4.0. Source code is available at https://github.com/folkehelseinstituttet/sykdomspulsen-dokumentasjon, unless otherwise noted. The figures that have been reused from other sources don't fall under this license and can be recognized by a note in their caption: "Figure from ...".