# A conformal LSTM classifier to detect anomalies. Here is the code; can somebody help me correct it? I am new to Julia.
# Install required packages
using Pkg
Pkg.add([
    "Flux",
    "ConformalPrediction",
    "DataFrames",
    "CSV",
    "Plots",
    "Statistics",
    "StatsPlots",
    "Random",
])
# Import libraries
using Flux, ConformalPrediction, DataFrames, CSV, Plots, Statistics, Random
# Load the dataset
df = CSV.read("C:/Users/unnat/OneDrive/Desktop/Individual project/Dataset/ec2_cpu_utilization_24ae8d.csv", DataFrame)
# Convert timestamps
using Dates
# Only parse when the column still holds text. NOTE(review): CSV.read may already
# return DateTime values for this column depending on its type detection — confirm.
if nonmissingtype(eltype(df[!, "timestamp"])) <: AbstractString
    df[!, "timestamp"] = DateTime.(df[!, "timestamp"], dateformat"yyyy-mm-dd HH:MM:SS")
end
# Define anomalies (based on dataset reference)
anomalies_timestamp = [
    "2014-02-26 22:05:00",
    "2014-02-27 17:15:00",
]
# Assign labels: 1 (anomaly), 0 (normal)
# The timestamp column holds DateTime values, so the reference strings are parsed
# first: comparing a DateTime with a String is never equal and would label nothing.
anomaly_times = DateTime.(anomalies_timestamp, dateformat"yyyy-mm-dd HH:MM:SS")
df[!, "is_anomaly"] = Int.(in.(df[!, "timestamp"], Ref(anomaly_times)))
# Normalize the data (z-score)
# NOTE(review): mean and std are taken over the whole series, so the calibration and
# test portions influence the scaling applied to the training portion.
X = df.value
X_normalized = (X .- mean(X)) ./ std(X)
y = df.is_anomaly
# Split dataset in row order: Train (80%), Calibration (10%), Test (remaining rows)
n = length(X)
n_train = floor(Int, 0.8n)
n_calib = floor(Int, 0.1n)
train_idx = 1:n_train
calib_idx = (n_train + 1):(n_train + n_calib)
test_idx = (n_train + n_calib + 1):n
X_train = X_normalized[train_idx]
y_train = y[train_idx]
X_calib = X_normalized[calib_idx]
y_calib = y[calib_idx]
X_test = X_normalized[test_idx]
y_test = y[test_idx]
# Prepare sequences for LSTM (window size = 10)
window_size = 10

"""
    create_sequences(data, window_size)

Return the sliding windows `data[i:i+window_size-1]` for `i = 1, ..., length(data) - window_size`.

The last possible window is deliberately left out so that every returned window is
followed by one more element of `data` (the element at index `i + window_size`).
Returns an empty vector when `data` has `window_size` elements or fewer.

# Throws
- `ArgumentError` if `window_size` is not positive.
"""
function create_sequences(data, window_size)
    window_size > 0 ||
        throw(ArgumentError("window_size must be positive, got $window_size"))
    last_start = length(data) - window_size
    return [data[i:(i + window_size - 1)] for i in 1:last_start]
end
X_train_seq = create_sequences(X_train, window_size)
X_calib_seq = create_sequences(X_calib, window_size)
X_test_seq = create_sequences(X_test, window_size)
# Convert to Flux format
# Values are cast to Float32 so the inputs match Flux's default Float32 parameters.
# NOTE(review): each window becomes a 1 x window_size x 1 array; which axis a recurrent
# layer treats as time depends on the installed Flux version — confirm the layout.
X_train_flux = [reshape(Float32.(x), 1, window_size, 1) for x in X_train_seq]
X_calib_flux = [reshape(Float32.(x), 1, window_size, 1) for x in X_calib_seq]
X_test_flux = [reshape(Float32.(x), 1, window_size, 1) for x in X_test_seq]
# Prepare labels (align with sequences): window i covers samples i..i+window_size-1,
# so it is paired with the label of the sample that follows it (i + window_size).
y_train_seq = y_train[(window_size + 1):end]
y_calib_seq = y_calib[(window_size + 1):end]
y_test_seq = y_test[(window_size + 1):end]
# Define LSTM model
# Two stacked LSTM layers with Glorot-uniform initialization and dropout in between,
# followed by a sigmoid output unit.
model = Chain(
    LSTM(1 => 32, init=Flux.glorot_uniform),
    Dropout(0.3),
    LSTM(32 => 16, init=Flux.glorot_uniform),
    Dense(16 => 1, sigmoid),
)
# Reduced learning rate
opt = Adam(0.0001)
# Loss function
# Callers can weight anomalies more than normal points, e.g. weights = (10, 1).
"""
    weighted_binary_crossentropy(y_pred, y_true, weights)

Return the mean class-weighted binary cross-entropy between predictions `y_pred`
(probabilities) and targets `y_true` (0 or 1).

# Arguments
- `y_pred`: predicted probabilities (scalar or array).
- `y_true`: targets, broadcast against `y_pred`.
- `weights`: indexable pair; `weights[1]` scales the `y_true == 1` term and
  `weights[2]` scales the `y_true == 0` term.

A small epsilon of the same floating-point type as `y_pred` is added inside the
logarithms, so Float32 predictions are not promoted to Float64.
"""
function weighted_binary_crossentropy(y_pred, y_true, weights)
    tiny = eps(float(eltype(y_pred)))
    pos_term = weights[1] .* y_true .* log.(y_pred .+ tiny)
    neg_term = weights[2] .* (1 .- y_true) .* log.(1 .- y_pred .+ tiny)
    return -mean(pos_term .+ neg_term)
end
# Per-sample loss: the anomaly (label 1) term is weighted 10x the normal (label 0) term.
loss(x, y) = weighted_binary_crossentropy(model(x), y, (10.0f0, 1.0f0))
# Optimizer
using Flux: Adam, params
# Use the reduced learning rate; Adam(0.001) here would override the Adam(0.0001)
# chosen when the model was defined.
opt = Adam(0.0001)
# Training function
"""
    train!(model, data, labels, opt)

Run one pass over `zip(data, labels)`, taking one optimizer step per `(x, y)` pair.

# Arguments
- `model`: model whose parameters (`params(model)`) are updated in place.
- `data`, `labels`: iterables consumed in lockstep.
- `opt`: optimizer passed to `Flux.update!`.

NOTE(review): the gradient is taken of the global `loss`, which evaluates the global
`model`; the `model` argument should be that same object.
NOTE(review): recurrent state is not reset between samples here — check whether the
installed Flux version requires `Flux.reset!` between independent windows.
"""
function train!(model, data, labels, opt)
    ps = params(model)  # collected once; the parameter arrays are mutated in place
    for (x, y) in zip(data, labels)
        gs = gradient(() -> loss(x, y), ps)
        Flux.update!(opt, ps, gs)
    end
end
# Train the model
epochs = 50
for epoch in 1:epochs
    Flux.train!(loss, params(model), zip(X_train_flux, y_train_seq), opt)
    # Report the mean loss over the training windows rather than the loss of the
    # first window only, which does not reflect overall progress.
    train_loss = mean(loss(x, y) for (x, y) in zip(X_train_flux, y_train_seq))
    println("Epoch $epoch - Loss: ", train_loss)
end
# Compute nonconformity scores (absolute residual between label and model output)
# Calibration scores use the true calibration labels: 1 - p for label 1, p otherwise.
# NOTE(review): only the first element of the model output is used — confirm that this
# is the intended prediction for the window.
calib_scores = map(zip(X_calib_flux, y_calib_seq)) do (x, y)
    p = model(x)[1]
    return y == 1 ? 1 - p : p
end
# Test scores must not depend on the test labels. Each test window is scored under the
# hypothesis "normal" (label 0), for which the residual is the model output itself;
# a small p-value then rejects "normal".
test_scores = [model(x)[1] for x in X_test_flux]
# Compute p-values for test set
"""
    compute_p_value(score, calib_scores)

Return the conformal p-value of `score` against `calib_scores`:
`(#{calibration scores >= score} + 1) / (length(calib_scores) + 1)`.

The `+ 1` terms count the test point itself, so the result is never zero and an
empty calibration set yields `1.0` instead of `NaN`.
"""
function compute_p_value(score, calib_scores)
    n_at_least = count(>=(score), calib_scores)
    return (n_at_least + 1) / (length(calib_scores) + 1)
end
p_values = [compute_p_value(s, calib_scores) for s in test_scores]
# Set significance level (alpha = 0.05 for 95% confidence)
alpha = 0.05
# Score level at the (1 - alpha) quantile of the calibration scores
threshold = quantile(calib_scores, 1 - alpha)
# Predict anomalies (p-value < alpha): -1 = anomaly, 1 = normal
y_pred = [p < alpha ? -1 : 1 for p in p_values]
# Confusion matrix (label convention: -1 = anomaly, 1 = normal)
"""
    confusion_matrix(y_true, y_pred)

Tally paired true/predicted labels, treating `-1` as the positive class and `1` as
the negative class.

Returns a named tuple `(tp, fp, fn, tn)`. Pairs are formed with `zip`, so extra
elements of the longer input are ignored. Any pair that is not a true positive,
false positive, or false negative is counted as a true negative.
"""
function confusion_matrix(y_true, y_pred)
    outcomes = collect(zip(y_true, y_pred))
    hits = count(((actual, guess),) -> actual == -1 && guess == -1, outcomes)
    false_alarms = count(((actual, guess),) -> actual == 1 && guess == -1, outcomes)
    misses = count(((actual, guess),) -> actual == -1 && guess == 1, outcomes)
    # The three categories above are mutually exclusive; everything else is "tn".
    remainder = length(outcomes) - hits - false_alarms - misses
    return (tp=hits, fp=false_alarms, fn=misses, tn=remainder)
end
# The stored labels are 1 (anomaly) / 0 (normal), whereas confusion_matrix and y_pred
# use -1 (anomaly) / 1 (normal); convert before comparing.
y_test_signed = [label == 1 ? -1 : 1 for label in y_test_seq]
cm = confusion_matrix(y_test_signed, y_pred)
# Print Confusion Matrix
println("""
Confusion Matrix:
True Positives (Anomalies): $(cm.tp)
False Positives: $(cm.fp)
False Negatives: $(cm.fn)
True Negatives: $(cm.tn)
""")
# Calculate Metrics
tp, fp, fn, tn = cm.tp, cm.fp, cm.fn, cm.tn
# eps() in each denominator keeps a metric at 0 instead of NaN when its counts are zero.
prec = tp / (tp + fp + eps())
rec = tp / (tp + fn + eps())
f1 = 2 * (prec * rec) / (prec + rec + eps())
fpr = fp / (fp + tn + eps())
println("\nAnomaly Detection Metrics:")
println("Precision: ", round(prec, digits=4))
println("Recall: ", round(rec, digits=4))
println("F1-Score: ", round(f1, digits=4))
println("FPR: ", round(fpr, digits=4))
# Plot test scores distribution with the calibration-score threshold overlaid
using StatsPlots
histogram(test_scores, bins=20, label="Model Scores", color=:blue)
vline!([threshold], label="Conformal Threshold", color=:red)