How can this program leak memory (without low-level functions such as GC.xxx being used)?

Again and again, the memory usage slowly grows to 80% (about 12 GB) as my program runs.

My program is simple, as follows:

using Images
using HTTP
using libwebp_jll
using ImageView
using CSV
using ProgressMeter

function get_html(url)::Vector{UInt8}
    html = HTTP.get(url, 
        [("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 SE 2.X MetaSr 1.0")]; 
        connect_timeout = 30,
        connection_limit = 20,
        readtimeout = 3,
        ).body
end

function get_max_img(url)
    html = get_html(url) |> String
    imgs_t = [@async read_webimg(x.match) for x in eachmatch(r"(https?:[\S]*?.(jpg|jpeg|png|gif|bmp|bytes)(?=\ ))", html)]
    append!(imgs_t, [@async read_webimg(complete(url, x.captures[1])) for x in eachmatch(r"""<img [^>]*?src *?= *?"(.*?)(?=")""", html)])
    # @show typeof(imgs_t[1])
    imgs = map(fetch, imgs_t)
    if length(imgs)>0
        sort!(imgs; by=x->prod(size(x)), rev=true)
        imgs[1]
    else
        zeros(RGB{N0f8}, (0,0))
    end
end

function main(i, url)
    try
        img = get_max_img(String(url))

        if prod(size(img)) > 100
            save("target_max_img/$(i).jpg", img)
        end
    catch e
        @show e
    end
    i
end
     
data = [...]

p = Progress(length(data))  # progress bar used by next!(p) below

task_running = Dict{Int, Task}()

for (i, d) in data
    task_running[i] = @async main(i, d.url)
    next!(p)
    while length(task_running) > 100
        yield()
        filter!(x->!(istaskdone(x[2])||istaskfailed(x[2])), task_running)
        println(length(task_running))
    end
end

Can anyone tell me where the bug is?

Is it even possible to write a program that leaks memory in a language with a GC?

It looks like you're downloading lots of images from lots of pages and holding them all in memory concurrently. That's most likely the cause of the high memory usage.
You can try printing the size of imgs after the map in get_max_img - like Base.summarysize(imgs) - and see whether those sizes add up to the gigabytes of memory usage you see.
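
For example, a quick diagnostic (untested):

imgs = map(fetch, imgs_t)
@show length(imgs) Base.summarysize(imgs)  # total bytes retained by the fetched images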

Instead of getting all the images and sorting them, you can fetch them one at a time, compare them to the current max_img’s size (initializing max_img to zeros(RGB{N0f8}, (0,0)) first), and set max_img = current_img if the current image has a larger size. This way, you don’t need to have more than one image (per main call) in memory at a time. That should help reduce the memory usage a lot.

Something like

function get_max_img(url)
    html = get_html(url) |> String
    imgs_t = [@async read_webimg(x.match) for x in eachmatch(r"(https?:[\S]*?.(jpg|jpeg|png|gif|bmp|bytes)(?=\ ))", html)]
    append!(imgs_t, [@async read_webimg(complete(url, x.captures[1])) for x in eachmatch(r"""<img [^>]*?src *?= *?"(.*?)(?=")""", html)])
    max_img = zeros(RGB{N0f8}, (0,0))
    for img in Iterators.map(fetch, imgs_t)
        if length(img) > length(max_img)
            max_img = img
        end
    end
    max_img
end

Note: untested code.

length(x) is equivalent here to prod(size(x)) btw.
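
For example:

x = zeros(RGB{N0f8}, 200, 300)
length(x) == prod(size(x))  # true - both are 60000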

Thanks a lot!
That seems correct. There are too many images open in memory, and they are all in memory concurrently!

Thanks again.

I should get their sizes one by one to reduce memory consumption.

I removed the code that decodes images in memory, replaced prod(size(img)) with parse(Int64, Dict(reply.headers)["Content-Length"]), replaced sort!(...) with findmax(...), and also replaced each @async with Threads.@spawn.

It worked at the beginning, occupying about 3 GB of memory, but 11 GB is occupied again by now.

Note: I also reduced the concurrency limit in while length(task_running) > 100 from 100 to 30.

Suppose every page has 100 images and each image is 1 MB; with at most 30 pages in flight, that is 30 × 100 × 1 MB ≈ 3 GB, so the memory usage should not be larger than about 4 GB, right?

Could you show the current version of your code, including that of read_webimg?

# using Images
using HTTP
# using libwebp_jll
# using ImageView
using CSV
using ProgressMeter

function get_html(url)
    r = HTTP.get(url, 
        [("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 SE 2.X MetaSr 1.0")]; 
        connect_timeout = 5,
        connection_limit = 1000,
        readtimeout = 3,
        status_exception = false
        )
    @assert isa(r.body, Vector{UInt8}) "body is not Vector{UInt8}!!!\n"

    r
end

# needs `using Images` (commented out above) for RGB and N0f8
function webp_decode(bytes::Vector{UInt8})::Matrix{RGB{N0f8}}
    height = Cint[0]
    width = Cint[0]
    # query the dimensions, then decode to a malloc'd RGB buffer owned by libwebp
    # (the C API uses int*, so pass Cint storage and let ccall convert the arrays)
    ccall((:WebPGetInfo, "libwebp"), Cint, (Ptr{UInt8}, Csize_t, Ptr{Cint}, Ptr{Cint}),
        bytes, length(bytes), width, height)
    x = ccall((:WebPDecodeRGB, "libwebp"), Ptr{UInt8}, (Ptr{UInt8}, Csize_t, Ptr{Cint}, Ptr{Cint}),
        bytes, length(bytes), width, height)
    # copy the interleaved RGB bytes into a Julia matrix
    [RGB(reinterpret(N0f8, unsafe_load(x, 3*(width[1]*i + j) + 1)),
         reinterpret(N0f8, unsafe_load(x, 3*(width[1]*i + j) + 2)),
         reinterpret(N0f8, unsafe_load(x, 3*(width[1]*i + j) + 3)))
     for (i, j) in Iterators.product(0:height[1]-1, 0:width[1]-1)]
end
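
# Note: the RGB buffer returned by WebPDecodeRGB above is malloc'd by libwebp and never
# freed here, so every decode leaks width*height*3 bytes. libwebp exposes WebPFree for
# this; a sketch (untested): ccall((:WebPFree, "libwebp"), Cvoid, (Ptr{Cvoid},), x)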

# function open_webp(fpath::String)::Matrix{RGB{N0f8}}
#     bytes = read(fpath);
#     webp_decode(bytes)
# end

function read_webimg(url::AbstractString, io::IO, iolock) #::Matrix{RGB{N0f8}}
    r = get_html(url)
    headers = Dict(r.headers)
    type = headers["Content-Type"]
    content_len = haskey(headers, "Content-Length") ? parse(Int64, headers["Content-Length"]) : 0
    # @assert r.status == 200 "r.status != 200 $(url)"
    # @assert startswith(type, "image/") """startswith(type, "image/") is false"""

    if r.status == 200 && startswith(type, "image/")
        type = split(type, "/")[end]
        bts = r.body
        # img = type == "webp" ? webp_decode(bts) : load(bts |> IOBuffer)
        # @assert size(img)[1] in 200:1600 && size(img)[2] in 200:1600 && max(size(img)...) /min(size(img)...) < 3 "image size out of the accepted range $(size(img))"
        # if size(img)[1] in 200:1600 && size(img)[2] in 200:1600 && max(size(img)...) /min(size(img)...) < 3
            return content_len, type, bts
        # end
    end
    # busy-wait until the io lock is free, then log the skipped url
    while !trylock(iolock)
        sleep(0.001)
    end
    println(io, "\t", url)
    unlock(iolock)
    0, "", Vector{UInt8}()
end

complete(url_main, url) = startswith(url, "http") ? url : (startswith(url, "//") ? "https:"*url : match(r"https?:\/\/.*?(?=\/)", url_main).match * url)
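
# For reference, some examples of what `complete` yields (untested):
#   complete("https://site.com/page", "https://cdn.com/a.jpg") == "https://cdn.com/a.jpg"
#   complete("https://site.com/page", "//cdn.com/a.jpg") == "https://cdn.com/a.jpg"
#   complete("https://site.com/page", "/img/a.jpg") == "https://site.com/img/a.jpg"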

function main(i, url)
    global lk
    global io

    buflock = ReentrantLock()

    buf = IOBuffer()
    println(buf, url)
    try
        html = get_html(String(url))
        if html.status == 200
            html = html.body |> String

            imgs_t = [Threads.@spawn read_webimg(x.match, buf, buflock) for x in eachmatch(r"(https?:[\S]*?.(jpg|jpeg|png|gif|bmp|bytes)(?=\ ))", html)]
            append!(imgs_t, [Threads.@spawn read_webimg(complete(url, x.captures[1]), buf, buflock) for x in eachmatch(r"""<img [^>]*?src *?= *?"(.*?)(?=")""", html)])
            
            imgs = map(fetch, imgs_t)
            
            (sz, index) = length(imgs) > 0 ? findmax(x->x[1], imgs) : (0, 0)
        
            if sz>2000
                res = imgs[index]
                open("""target_max_img/$(i).$(res[2])""", "w") do fo
                    write(fo, res[3])
                end
            end
        end
    catch e
        println(buf, "\t", e)
    end
    # flush this task's buffered log lines to the shared io under the lock
    while !trylock(lk)
        sleep(0.01)
    end
    println(io, String(take!(buf)))  # take! is the supported way to read an IOBuffer out
    flush(io)
    unlock(lk)
end

io = open("log_down_webp", "w")

lk = ReentrantLock()
# save("test.jpg", read_webimg("https://www.thisisnotdietfood.com/wp-content/uploads/2019/06/baconcheeseburgergrilledcheesecasserole-13-min-min.jpg"))
csv_data = CSV.Rows("origin_data.csv")
urls_todo = enumerate(csv_data)

# main(1, urls_todo[1][2].targetUrl)
# CSV.Rows is lazy, so iterate once just to count the rows
len = 0
for (i,row) in urls_todo 
    global len = i 
end

p = Progress(len)

task_running = Dict{Int, Task}()

for (i, row) in urls_todo
    # main(i, row.targetUrl)
    task_running[i] = Threads.@spawn main(i, row.targetUrl)
    next!(p)
    while length(task_running) > 30
        for (k, t) in task_running
            if istaskfailed(t)
                @show fetch(t)
            end
        end
        # yield()
        sleep(1)
        filter!(x->!(istaskdone(x[2])||istaskfailed(x[2])), task_running)
        # println(length(task_running))
    end
end

close(io)

Copied above.

The program gets terminated by the OS while running.

The memory leak is still there!
I tried a Channel instead of an @async task for each URL, and got rid of downloading the images entirely; now I just request each URL and check whether the response status is 200.

So, the bug comes from HTTP.

using HTTP
using CSV
using ProgressMeter
using DataFrames

function can_open(cin::Channel, outputfile::String, df::DataFrame)
    # note: isopen + take! can race with close(cin); iterating `for (id, d) in cin` is safer
    while isopen(cin)
        id, d = take!(cin)
        try
            if length(df[!,"output"]) > 10
                CSV.write(outputfile, df; append=true)
                empty!(df)
            end


            r = HTTP.get(String(d.targetUrl), 
                [("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 SE 2.X MetaSr 1.0")]; 
                connect_timeout = 10,
                connection_limit = 100,
                readtimeout = 5,
                status_exception = false
                )
            if r.status == 200
                push!(df, [id, d.title1, d.title2, d.productName, d.targetUrl, d.output])
            end
            
        catch e
            @show e
        end
    end
end

ordered_data = enumerate(CSV.File("origin_data.csv"))
len = 0
for (i,row) in ordered_data 
    global len = i 
end
@show len

cin = Channel(1000)
df = DataFrame("idex"=>[], "title1"=>[], "title2"=>[], "productName"=>[], "targetUrl"=>[],"output"=>[])
CSV.write("available.csv", df; append=false)

for i in 1:100
    @async can_open(cin, "available.csv", df)
end

p = Progress(len)
for (i, d) in ordered_data
    put!(cin, (i,d))
    next!(p)
end

while !isempty(cin)
    sleep(1)
end

close(cin)

The program's memory usage is about 1.6 GB after replacing r = HTTP.get(...); if r.status == 200 with if true.

So, is it caused by HTTP? Or otherwise by the try/catch?

I think I've found the problem.
Version 1: it created a Task for each URL.
Version 2: it did not lock the operations on the global variables and io.
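
A minimal sketch combining both fixes: a fixed pool of worker tasks fed by a Channel instead of a Task per URL, and a lock around every write to the shared io. process and urls below are placeholders, and the code is untested:

jobs = Channel{Tuple{Int, String}}(1000)
iolock = ReentrantLock()
io = open("log", "w")

function worker(jobs, io, iolock)
    for (i, url) in jobs              # ends when the channel is closed and drained
        result = process(url)         # `process` stands in for the real work
        lock(iolock) do               # serialize all writes to the shared io
            println(io, i, '\t', result)
        end
    end
end

# a fixed pool of 30 workers instead of one Task per URL
workers = [Threads.@spawn worker(jobs, io, iolock) for _ in 1:30]

for (i, url) in enumerate(urls)       # `urls` stands in for the input collection
    put!(jobs, (i, url))
end
close(jobs)
foreach(wait, workers)
close(io)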