Streaming Data From Database to Shiny
First, a script that simulate data streaming. It rewrites the table every p
second. Here we set it to 1.
library(config)
library(DBI)
library(dplyr)
library(purrr)
library(tryCatchLog)
db <- config::get(config = "some-mysql")
# local Docker container
conn <- dbConnect(
drv = RMySQL::MySQL(),
username = db$username,
password = db$password,
host = db$host,
port = db$port,
dbname = db$dbname
)
stopifnot(DBI::dbRemoveTable(conn, "int_stream"))
generate <- function() {
data.frame(ind = LETTERS,
int = round(rnorm(
length(LETTERS), mean = 10, sd = 2
)))
}
insert_to_db <- function(p = 1) {
stopifnot(dbWriteTable(conn, "int_stream", generate(), overwrite = TRUE))
message(paste("Success:", Sys.time()))
Sys.sleep(p)
}
replicate(60, insert_to_db())
DBI::dbDisconnect(conn)
Here we create a Shiny app. It consists of a reactive
object that schedules to be invalid after a given time and then auto updates itself.
library(shiny)
library(dplyr)
library(ggplot2)
library(pool)
library(config)
# Setup ----
db <- config::get(config = "some-mysql")
# pooling object
pool <- dbPool(
drv = RMySQL::MySQL(),
username = db$username,
password = db$password,
host = db$host,
port = db$port,
dbname = db$dbname
)
# close connection upon shutting down app
onStop(function() {
poolClose(pool)
})
# App ----
ui <- fluidPage(
column(4, tableOutput("tbl")),
column(8, plotOutput("plot", width = "100%", height = "800px"))
)
server <- function(input, output, session) {
tbl_data <- reactive({
# in miliseconds
invalidateLater(1000)
pool %>%
tbl("int_stream") %>%
group_by(group) %>%
summarise(max = max(int, na.rm = TRUE)) %>%
collect()
})
output$tbl <- renderTable({
tbl_data()
})
output$plot <- renderPlot({
tbl_data() %>%
ggplot(aes(group, max)) +
geom_bar(stat = "identity") +
theme_bw()
})
}
shinyApp(ui, server)