Influxdb client package

I’m looking into getting data from influxdb. I found this package but it seems abandoned. Is this functionality added somewhere else or this is all there is?

Would you consider using the Python client library (Use the InfluxDB Python client library | InfluxDB OSS 2.2 Documentation) with PythonCall (GitHub - cjdoris/PythonCall.jl: Python and Julia in harmony.)? Or you can use the R client library (GitHub - influxdata/influxdb-client-r: InfluxDB (v2+) Client R Package) with RCall (Getting Started · RCall.jl).

Since I only need to read data, I copy-pasted code from the package and tweaked it a bit and it works :slight_smile:

So I guess the answer to my question is that this package is all there is for now, right?

Glad to hear. Looks like that’s all there is (maybe it’s all you need?); there is also this other personal repo GitHub - tallakt/InfluxFlux, a one-file module.

1 Like

Are you able to share the minimal code you used to access an influxdb database? I’ll be looking to doing this after I’ve finished working through PostgreSQL databases.

Sure. But use it with caution!

using JSON, HTTP
using HTTP.URIs: URI
import Base: write
using MacroTools
using DataFrames
using Dates

struct InfluxServer
    endpoint::URI
    username::Union{String,Nothing}
    password::Union{String,Nothing}

    function InfluxServer(address::AbstractString; username=nothing, password=nothing)
        if isnothing(match(r"^https?://", address))
            address = "http://$address"
        end
        uri = URI(address)

        if isempty(uri.port)
            uri = Base.merge(uri, port=8086)
        end

        return new(uri, username, password)
    end
end

struct Query
    verb::String
    fields::Vector{String}
    tags::Vector{String}
    measurements::Vector{String}
    condition::Union{String,Nothing}
    order::Union{String,Nothing}
    limit::Union{Int,Nothing}
end

function authenticate!(server::InfluxServer, query::Dict)
    if server.username != nothing && server.password != nothing
        query["u"] = server.username
        query["p"] = server.password
    end
end

function query(server::InfluxServer, query_data::Dict; type::Symbol = :get)
    authenticate!(server, query_data)

    if type == :get
        response = HTTP.get(string(server.endpoint, "/query"); query=query_data)
    else
        response = HTTP.post(string(server.endpoint, "/query"); query=query_data)
    end
    
    response_data = String(response.body)
    if response.status != 200
        error(response_data)
    end
        
    response = JSON.parse(response_data)

    if !haskey(response, "results") || !haskey(response["results"][1], "series")
        return
    end
        
    function series_df(series_dict)
        df = DataFrame()
        for name_idx in 1:length(series_dict["columns"])
            col = Symbol(series_dict["columns"][name_idx])
            df[!, col] = [x[name_idx] for x in series_dict["values"]]
        end
        return df
    end
        
    return series_df.(response["results"][1]["series"])
end
    
query(server::InfluxServer, data::AbstractString; kwargs...) = query(server, Dict("q" => data); kwargs...)

query(server::InfluxServer, db::AbstractString, q::Query) = query(server, Dict("q" => payload(q), "db" => db))

querify(e) = string(e)
querify(e::Symbol) = string("\"", e, "\"")
querify(e::AbstractString) = string("'", e, "'")
querify(e::QuoteNode) = querify(e.value)
function querify(e::Expr)
    args = e.args
    head = e.head

    # Peel :call and :macrocall nodes first
    if head == :call
        head = args[1]
        args = args[2:end]
    elseif head == :macrocall && length(args) == 3 && args[2] == nothing
        head = args[1]
        args = args[3:end]
    end

    if head == :&&
        head = "AND"
    elseif head == :||
        head = "OR"
    elseif head == Symbol("==")
        head = "="
    elseif head == :~
        head = "=~"
    elseif head == Symbol("@r_str")
        return string("/", args[1], "/")
    else
        head = string(head)
    end

    return string(querify(args[1]), " ", head, " ", querify(args[2]))
end

macro querify(ex)
    return querify(MacroTools.striplines(ex))
end

function Query(verb::String; fields::Vector = String[],
                             tags::Vector = String[],
                             measurements::Vector = String[],
                             condition::Union{String,Nothing} = nothing,
                             order::Union{String,Nothing} = nothing,
                             limit::Union{Int,Nothing} = nothing)
    if isempty(fields) && !isempty(tags)
        @warn(
            "InfluxDB has strange behavior when 'fields' is empty but 'tags' is not!",
            "https://docs.influxdata.com/influxdb/v1.7/query_language/data_exploration/#selecting-tag-keys-in-the-select-clause",
        )
    end
    
    return Query(
        verb,
        String.(fields),
        String.(tags),
        String.(measurements),
        condition,
        order,
        limit,
    )
end

SELECT(;kwargs...) = Query("SELECT"; kwargs...)

function join_typed(io, v::Vector, type_name, delim = ",")
    if isempty(v)
        return
    end
    
    write(io, "\"", v[1], "\"", "::", type_name)
    for idx in 2:length(v)
        write(io, delim, "\"", v[idx], "\"", "::", type_name)
    end
end

function payload(q::Query)
    buff = IOBuffer()
    payload!(buff, q)
    return String(take!(buff))
end

function payload!(io::IO, q::Query)
    write(io, q.verb, " ")
    
    if isempty(q.fields)
        write(io, "*")
    else
        join_typed(io, q.fields, "field")
        if !isempty(q.tags)
            write(io, ",")
            join_typed(io, q.tags, "tag")
        end
    end
    
    if !isempty(q.measurements)
        write(io, " FROM ")
        join(io, [string("\"", m, "\"") for m in q.measurements], ",")
    end

    if q.condition != nothing
        write(io, " WHERE ")
        write(io, q.condition)
    end

    if q.order != nothing
        write(io, " order by ")
        write(io, q.order)
    end

    if q.limit != nothing
        write(io, " LIMIT ", string(q.limit))
    end
end

function list_measurements(server::InfluxServer, db::AbstractString)
    return query(server, "SHOW MEASUREMENTS ON $db")
end

After that, I do:

q = Query("SELECT", measurements=["measurement_name"], order="time desc", limit=10000)
s = InfluxServer("localhost")
df = query(s, "db_name", q)[1]

Read through the definition of Query to see how to construct more complicated queries.

I hope it helps!

1 Like

I know I’m late replying but I did note this when posted. Your response is much appreciated! I’ll use this when I do get to working on influxdb use.

1 Like

It looks like this is client code using the SQL-like query protocol used in InfluxDB v1.8, is that right?
InfluxDB v2.0+ uses a much different query protocol.

I drafted this pkg for Influxdb V2.
It is in a relatively rough state. I fulfills my current needs though.

3 Likes