---
title: "mirai - Plumber Integration"
vignette: >
  %\VignetteIndexEntry{mirai - Plumber Integration}
  %\VignetteEngine{litedown::vignette}
  %\VignetteEncoding{UTF-8}
---

### Plumber Integration

`mirai` may be used as an asynchronous backend for [`plumber`](https://www.rplumber.io/) pipelines.

Example usage is provided below for different types of endpoint.

#### Example GET Endpoint

The plumber router code is itself run in a daemon process so that it does not block the interactive process.

The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and returns the 'msg' request header together with a timestamp and the ID of the process it ran on.

``` r
library(mirai)

# supply SIGINT so the plumber server is interrupted and exits cleanly when finished
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # more efficient not to use dispatcher if all requests are similar length
  daemons(4L, dispatcher = FALSE) # handles 4 requests simultaneously

  pr() |>
    pr_get(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L)
            list(
              status = 200L,
              body = list(
                time = format(Sys.time()),
                msg = msg,
                pid = Sys.getpid()
              )
            )
          },
          msg = req[["HEADERS"]][["msg"]]
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8985)
})
```

The API can be queried using an async HTTP client such as `nanonext::ncurl_aio()`.

Here, all 8 requests are submitted at once, but the responses carry differing timestamps because only 4 can be processed at any one time (limited by the number of daemons set).

``` r
library(nanonext)

res <- lapply(
  1:8,
  function(i) ncurl_aio(
    "http://127.0.0.1:8985/echo",
    headers = c(msg = as.character(i))
  )
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-08-20 20:02:05\"],\"msg\":[\"1\"],\"pid\":[6735]}"
#>
#> [[2]]
#> [1] "{\"time\":[\"2024-08-20 20:02:05\"],\"msg\":[\"2\"],\"pid\":[6733]}"
#>
#> [[3]]
#> [1] "{\"time\":[\"2024-08-20 20:02:05\"],\"msg\":[\"3\"],\"pid\":[6738]}"
#>
#> [[4]]
#> [1] "{\"time\":[\"2024-08-20 20:02:05\"],\"msg\":[\"4\"],\"pid\":[6741]}"
#>
#> [[5]]
#> [1] "{\"time\":[\"2024-08-20 20:02:06\"],\"msg\":[\"5\"],\"pid\":[6733]}"
#>
#> [[6]]
#> [1] "{\"time\":[\"2024-08-20 20:02:06\"],\"msg\":[\"6\"],\"pid\":[6735]}"
#>
#> [[7]]
#> [1] "{\"time\":[\"2024-08-20 20:02:06\"],\"msg\":[\"7\"],\"pid\":[6738]}"
#>
#> [[8]]
#> [1] "{\"time\":[\"2024-08-20 20:02:06\"],\"msg\":[\"8\"],\"pid\":[6741]}"

daemons(0)
#> [1] 0
```

#### Example POST Endpoint

Below is a demonstration of the equivalent using a POST endpoint, accepting a JSON instruction sent as request data.

Note that `req$postBody` should always be accessed in the router process and passed in as an argument to the 'mirai', as this is retrieved using a connection that is not serializable.

``` r
library(mirai)

# supply SIGINT so the plumber server is interrupted and exits cleanly when finished
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # uses dispatcher - suitable when requests take differing times to complete
  daemons(4L, dispatcher = TRUE) # handles 4 requests simultaneously

  pr() |>
    pr_post(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L) # simulate expensive computation
            list(
              status = 200L,
              body = list(
                time = format(Sys.time()),
                msg = jsonlite::fromJSON(data)[["msg"]],
                pid = Sys.getpid()
              )
            )
          },
          data = req$postBody
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8986)
})
```

Querying the endpoint produces responses with the same fields as the previous example.

``` r
library(nanonext)

res <- lapply(
  1:8,
  function(i) ncurl_aio(
    "http://127.0.0.1:8986/echo",
    method = "POST",
    data = sprintf('{"msg":"%d"}', i)
  )
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-08-20 20:02:09\"],\"msg\":[\"1\"],\"pid\":[7002]}"
#>
#> [[2]]
#> [1] "{\"time\":[\"2024-08-20 20:02:09\"],\"msg\":[\"2\"],\"pid\":[7004]}"
#>
#> [[3]]
#> [1] "{\"time\":[\"2024-08-20 20:02:10\"],\"msg\":[\"3\"],\"pid\":[7004]}"
#>
#> [[4]]
#> [1] "{\"time\":[\"2024-08-20 20:02:09\"],\"msg\":[\"4\"],\"pid\":[7007]}"
#>
#> [[5]]
#> [1] "{\"time\":[\"2024-08-20 20:02:10\"],\"msg\":[\"5\"],\"pid\":[7010]}"
#>
#> [[6]]
#> [1] "{\"time\":[\"2024-08-20 20:02:09\"],\"msg\":[\"6\"],\"pid\":[7010]}"
#>
#> [[7]]
#> [1] "{\"time\":[\"2024-08-20 20:02:10\"],\"msg\":[\"7\"],\"pid\":[7002]}"
#>
#> [[8]]
#> [1] "{\"time\":[\"2024-08-20 20:02:10\"],\"msg\":[\"8\"],\"pid\":[7007]}"

daemons(0)
#> [1] 0
```
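
The concurrency limit can also be checked programmatically rather than by reading the timestamps by eye. The following is a minimal sketch that tabulates the response bodies from the GET example. The `responses` vector is copied from that example's output, and `extract_field()` is a hypothetical helper written for this illustration. It uses a base R regular expression only to keep the sketch dependency-free and is not a general JSON parser.

``` r
# Response bodies copied from the GET example output
responses <- c(
  '{"time":["2024-08-20 20:02:05"],"msg":["1"],"pid":[6735]}',
  '{"time":["2024-08-20 20:02:05"],"msg":["2"],"pid":[6733]}',
  '{"time":["2024-08-20 20:02:05"],"msg":["3"],"pid":[6738]}',
  '{"time":["2024-08-20 20:02:05"],"msg":["4"],"pid":[6741]}',
  '{"time":["2024-08-20 20:02:06"],"msg":["5"],"pid":[6733]}',
  '{"time":["2024-08-20 20:02:06"],"msg":["6"],"pid":[6735]}',
  '{"time":["2024-08-20 20:02:06"],"msg":["7"],"pid":[6738]}',
  '{"time":["2024-08-20 20:02:06"],"msg":["8"],"pid":[6741]}'
)

# Hypothetical helper: pull the single boxed value of `field` out of each
# body, e.g. "pid":[6735] -> "6735" (assumes the flat structure shown above)
extract_field <- function(x, field) {
  pattern <- sprintf('.*"%s":\\["?([^"\\]]*)"?\\].*', field)
  sub(pattern, "\\1", x, perl = TRUE)
}

parsed <- data.frame(
  time = extract_field(responses, "time"),
  msg = as.integer(extract_field(responses, "msg")),
  pid = as.integer(extract_field(responses, "pid"))
)

# Number of responses completed in each one-second window
per_second <- table(parsed$time)

# Neither value exceeds the 4 daemons set in the router process
max(per_second)
length(unique(parsed$pid))
```

With live results, the same tabulation could be applied to the character vector obtained by unlisting the output of `collect_aio(res)`.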