I’m looking into getting data from influxdb. I found this package but it seems abandoned. Is this functionality added somewhere else or this is all there is?
Would you consider using the Python client library (Use the InfluxDB Python client library | InfluxDB OSS 2.2 Documentation) with PythonCall (GitHub - cjdoris/PythonCall.jl: Python and Julia in harmony.)? Or you can use the R client library (GitHub - influxdata/influxdb-client-r: InfluxDB (v2+) Client R Package) with RCall (Getting Started · RCall.jl).
Since I only need to read data, I copy-pasted code from the package and tweaked it a bit and it works
So I guess the answer to my question is that this package is all there is for now, right?
Glad to hear. Looks like that’s all there is (maybe it’s all you need?); there is also this other personal repo GitHub - tallakt/InfluxFlux, a one-file module.
Are you able to share the minimal code you used to access an influxdb database? I’ll be looking to doing this after I’ve finished working through PostgreSQL databases.
Sure. But use it with caution!
using JSON, HTTP
using HTTP.URIs: URI
import Base: write
using MacroTools
using DataFrames
using Dates
struct InfluxServer
endpoint::URI
username::Union{String,Nothing}
password::Union{String,Nothing}
function InfluxServer(address::AbstractString; username=nothing, password=nothing)
if isnothing(match(r"^https?://", address))
address = "http://$address"
end
uri = URI(address)
if isempty(uri.port)
uri = Base.merge(uri, port=8086)
end
return new(uri, username, password)
end
end
struct Query
verb::String
fields::Vector{String}
tags::Vector{String}
measurements::Vector{String}
condition::Union{String,Nothing}
order::Union{String,Nothing}
limit::Union{Int,Nothing}
end
function authenticate!(server::InfluxServer, query::Dict)
if server.username != nothing && server.password != nothing
query["u"] = server.username
query["p"] = server.password
end
end
function query(server::InfluxServer, query_data::Dict; type::Symbol = :get)
authenticate!(server, query_data)
if type == :get
response = HTTP.get(string(server.endpoint, "/query"); query=query_data)
else
response = HTTP.post(string(server.endpoint, "/query"); query=query_data)
end
response_data = String(response.body)
if response.status != 200
error(response_data)
end
response = JSON.parse(response_data)
if !haskey(response, "results") || !haskey(response["results"][1], "series")
return
end
function series_df(series_dict)
df = DataFrame()
for name_idx in 1:length(series_dict["columns"])
col = Symbol(series_dict["columns"][name_idx])
df[!, col] = [x[name_idx] for x in series_dict["values"]]
end
return df
end
return series_df.(response["results"][1]["series"])
end
query(server::InfluxServer, data::AbstractString; kwargs...) = query(server, Dict("q" => data); kwargs...)
query(server::InfluxServer, db::AbstractString, q::Query) = query(server, Dict("q" => payload(q), "db" => db))
querify(e) = string(e)
querify(e::Symbol) = string("\"", e, "\"")
querify(e::AbstractString) = string("'", e, "'")
querify(e::QuoteNode) = querify(e.value)
function querify(e::Expr)
args = e.args
head = e.head
# Peel :call and :macrocall nodes first
if head == :call
head = args[1]
args = args[2:end]
elseif head == :macrocall && length(args) == 3 && args[2] == nothing
head = args[1]
args = args[3:end]
end
if head == :&&
head = "AND"
elseif head == :||
head = "OR"
elseif head == Symbol("==")
head = "="
elseif head == :~
head = "=~"
elseif head == Symbol("@r_str")
return string("/", args[1], "/")
else
head = string(head)
end
return string(querify(args[1]), " ", head, " ", querify(args[2]))
end
macro querify(ex)
return querify(MacroTools.striplines(ex))
end
function Query(verb::String; fields::Vector = String[],
tags::Vector = String[],
measurements::Vector = String[],
condition::Union{String,Nothing} = nothing,
order::Union{String,Nothing} = nothing,
limit::Union{Int,Nothing} = nothing)
if isempty(fields) && !isempty(tags)
@warn(
"InfluxDB has strange behavior when 'fields' is empty but 'tags' is not!",
"https://docs.influxdata.com/influxdb/v1.7/query_language/data_exploration/#selecting-tag-keys-in-the-select-clause",
)
end
return Query(
verb,
String.(fields),
String.(tags),
String.(measurements),
condition,
order,
limit,
)
end
SELECT(;kwargs...) = Query("SELECT"; kwargs...)
function join_typed(io, v::Vector, type_name, delim = ",")
if isempty(v)
return
end
write(io, "\"", v[1], "\"", "::", type_name)
for idx in 2:length(v)
write(io, delim, "\"", v[idx], "\"", "::", type_name)
end
end
function payload(q::Query)
buff = IOBuffer()
payload!(buff, q)
return String(take!(buff))
end
function payload!(io::IO, q::Query)
write(io, q.verb, " ")
if isempty(q.fields)
write(io, "*")
else
join_typed(io, q.fields, "field")
if !isempty(q.tags)
write(io, ",")
join_typed(io, q.tags, "tag")
end
end
if !isempty(q.measurements)
write(io, " FROM ")
join(io, [string("\"", m, "\"") for m in q.measurements], ",")
end
if q.condition != nothing
write(io, " WHERE ")
write(io, q.condition)
end
if q.order != nothing
write(io, " order by ")
write(io, q.order)
end
if q.limit != nothing
write(io, " LIMIT ", string(q.limit))
end
end
function list_measurements(server::InfluxServer, db::AbstractString)
return query(server, "SHOW MEASUREMENTS ON $db")
end
After that, I do:
q = Query("SELECT", measurements=["measurement_name"], order="time desc", limit=10000)
s = InfluxServer("localhost")
df = query(s, "db_name", q)[1]
Read through the definition of Query
to see how to construct more complicated queries.
I hope it helps!
I know I’m late replying but I did note this when posted. Your response is much appreciated! I’ll use this when I do get to working on influxdb use.
It looks like this is client code using the SQL-like query protocol used in InfluxDB v1.8, is that right?
InfluxDB v2.0+ uses a much different query protocol.
I drafted this pkg for Influxdb V2.
It is in a relatively rough state. I fulfills my current needs though.