Broadcasting over a Flux.DataLoader

Apologies in advance: I’ve had a lot of trouble producing an MWE, so I don’t have one.

I’m working on an RNN over text data (each observation is a sentence; each word in each sentence is mapped to a word embedding). The problem is classification into six classes. The issue I’m having, I think, relates to using the Flux.DataLoader type.

The basic setup is…
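using Flux

# Mock data with roughly the right shapes, since I don’t have an MWE
# (vocab size 733, sentence length 18, 357 samples, 6 classes; real sentences vary in length):
vocab_size = 733
train   = reshape([Flux.onehotbatch(rand(1:vocab_size, 18), 1:vocab_size) for _ in 1:357], 1, :)
tlabels = reshape([Flux.onehot(rand(1:6), 1:6) for _ in 1:357], 1, :)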

train_data = Flux.DataLoader((data=train, label=tlabels), batchsize=42, shuffle=true)

# train_data.data.data is 1x357, where each item in the vector is (vocab_size)x(sentence_length) and is a matrix of one-hot vectors
# train_data.data.label is 1x357, where each item in the vector is (6,) and is one-hot encoded

function two_layers(args)
    # Embed is a layer I defined to take one-hot vectors to word embeddings of length N=50
    scanner = Chain(Embed(args.vocab, args.embed_len, args.emb_table), LSTM(args.embed_len, args.N), LSTM(args.N, args.N))
    encoder = Dense(args.N, args.classes, identity)
    return scanner, encoder
end

function model(x, scanner, encoder)
    state = scanner(x)[:,end]
    reset!(scanner)
    encoder(state)
end

scanner, encoder = two_layers(args)
ps = params(scanner, encoder)

loss(x,y) = Flux.logitcrossentropy(model(x, scanner, encoder), y)

When I do:

	for (x,y) in train_data 
		gs = Flux.gradient(()->loss(x,y), ps)
		Flux.update!(opt, ps, gs)
	end 

I get the error:

ERROR: LoadError: MethodError: no method matching (::Flux.LSTMCell{Matrix{Float32}, Vector{Float32}, Tuple{Matrix{Float32}, Matrix{Float32}}})(::Tuple{Matrix{Float32}, Matrix{Float32}}, ::Matrix{Real})

I find this error confusing b/c I don’t know what the ::Matrix{Real} at the end of the error message refers to.

I expected (x,y) to iterate over batches of data and labels (which printing some type information about x, y inside the loop suggests it is doing).

for (x,y) in train_data
    @info "type of (x,y)" typeof((x,y))
    @info "type of x", typeof(x)
    @info "size x", size(x)
    @info "size x[1]", size(x[1])
    @info "typeof y", typeof(y)
    @info "size y", size(y)
    @info "size y[1]", size(y[1])
end

┌ Info: type of (x,y)
└   typeof((x, y)) = Tuple{Matrix{Flux.OneHotArray{UInt32, 733, 1, 2, Vector{UInt32}}}, Matrix{Flux.OneHotArray{UInt32, 6, 0, 1, UInt32}}}
[ Info: ("type of x", Matrix{Flux.OneHotArray{UInt32, 733, 1, 2, Vector{UInt32}}})
[ Info: ("size x", (1, 42)) # 42 is the batch size
[ Info: ("size x[1]", (733, 18)) # 18 is the length of the sentence
[ Info: ("typeof y", Matrix{Flux.OneHotArray{UInt32, 6, 0, 1, UInt32}})
[ Info: ("size y", (1, 42))
[ Info: ("size y[1]", (6,))

I can also do loss(x[1], y[1]) and get a number. So I think my problems might be solved by…

  1. Figuring out what that extra ::Matrix{Real} is and where it comes from?
  2. Broadcasting over the DataLoader type? (I have tried gradient(()->loss.(x,y), ps) but that also gives an error.)

I’m definitely missing something but I can’t figure out what! I appreciate any help and apologize for not having an MWE…

I’d recommend having a read through Model Reference · Flux because it describes what data formats are supported by Flux’s RNNs. In short, a DataLoader can give you a batch of sequences, but that must be converted to a sequence of batches in order to work with RNNs.
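For instance (a rough sketch, assuming every sequence in the batch has the same length), a batch of feature x length matrices can be re-sliced into a length-long sequence of feature x batch matrices:

xs = [rand(Float32, 50, 18) for _ in 1:42]            # a batch of 42 sentences, each 50 features x 18 timesteps
stacked = cat(xs...; dims=3)                          # 50 x 18 x 42
seq = [stacked[:, t, :] for t in 1:size(stacked, 2)]  # an 18-element Vector of 50 x 42 matrices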

Thanks very much for your reply.

I am still very puzzled after reading the docs again. I’m working on a simplified example but I cannot make it work. Maybe you can help me get the simple case working and then I can figure out my actual use case?

# Suppose I have four sentences, each 20 words long, 
# w/ embedding dim per word of 10

a1 = rand(Float32, 10,20)
a2 = rand(Float32, 10,20)
a3 = rand(Float32, 10,20)
a4 = rand(Float32, 10,20)
fake_train = cat(a1, a2, a3, a4; dims = 3)


fake_labels = Flux.onehotbatch([0,1,0,1], [0,1])
# First question: is this step necessary to give the labels a similar shape to fake_train??
fake_labels = reshape(fake_labels, (2,1,4)) 

DL = Flux.DataLoader((data = fake_train, label=fake_labels), batchsize=2)


l1 = LSTM(10, 5)
d1 = Dense(5,2)
function model(x, scanner, encoder)
    state = scanner(x)[:,end]
    reset!(scanner)
    encoder(state)
end 


ps = params(l1, d1)
opt = ADAM(1e-3)

loss(x,y)= Flux.logitbinarycrossentropy(model(x, l1, d1), y)
# returns a number, not an error, so the loss function is working, I believe
loss(fake_train[:,:,1], fake_labels[:,:,1]) 

for i = 1:3
    @info i 
    Flux.train!(loss, ps, DL, opt)
end 

ERROR: LoadError: MethodError: no method matching loss(::NamedTuple{(:data, :label), Tuple{Array{Float32, 3}, Array{Bool, 3}}})
# this error makes sense given the way I defined loss(x,y)

The docs say “If d is a tuple of arguments to loss call loss(d...), else call loss(d).” I tried to follow that advice literally (i.e., train!(x -> loss(x...), ...)), but that also generated an error:
ERROR: LoadError: BoundsError: attempt to access 5×20×2 Array{Float32, 3} at index [1:5, 20]

I have also tried defining loss(x,y) = logitbinarycrossentropy.(... (i.e., broadcasting) or loss((x,y)) = ... (i.e., defining the loss over tuples directly), but these generate errors of their own.

Maybe I am a little closer to an MWE now…?

Ah sorry, apparently I posted the wrong docs page! Strongly recommend reading through Recurrence · Flux. It’ll tell you pretty much all you need to know to get an RNN working. Then the remaining part is getting your data into a form which is RNN-friendly. Here are some tips for that:

  1. Get one batch working first, e.g. write functionality to take a batch of sequences and transform them into a (possibly padded) sequence of batched features (a rough sketch follows this list). This includes making sure the loss function works as expected.
  2. Once a single batch works, you can try using a dataloader or other functionality to sample batches from the training set.
  3. train! is both very high level and not very flexible. As a consequence, it’s basically a no-go for RNNs. Since you’re comfortable with a custom training loop already, that’s the way to go.
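A sketch of that first step (a hypothetical helper, not something from Flux; it assumes each sentence is a features x length matrix):

function batch_sequences(sentences)
    T      = eltype(first(sentences))
    nfeat  = size(first(sentences), 1)
    maxlen = maximum(size.(sentences, 2))
    # one (features x batch) matrix per timestep, zero-padded past each sentence's end
    seq = [zeros(T, nfeat, length(sentences)) for _ in 1:maxlen]
    for (j, s) in enumerate(sentences), t in 1:size(s, 2)
        seq[t][:, j] = s[:, t]
    end
    return seq
end

# batch_sequences([rand(Float32, 10, 20), rand(Float32, 10, 18)])
# returns a 20-element Vector of 10 x 2 matrices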

Thanks for commenting again.

I can compute the loss over a batch one element at a time, but not with the DataLoader type.

Example:

using Flux
using Flux: reset!, onehotbatch 

fake_train = rand(Float32,10,20, 4)
fake_labels = Flux.onehotbatch([0,1,0,1], [0,1])
 
l1 = LSTM(10, 5)
d1 = Dense(5,2)
function model(x, scanner, encoder)
    state = scanner(x)[:,end]
    reset!(scanner)
    encoder(state)
end 

ps = params(l1, d1)
opt = ADAM(1e-3)

loss(x,y) = Flux.logitbinarycrossentropy(model(x, l1, d1), y) 

# this works
for (x,y) in zip(eachslice(fake_train, dims=3), eachcol(fake_labels))
    grads = Flux.gradient(ps) do 
        loss(x,y)
    end
    Flux.Optimise.update!(opt, ps, grads)
end
 

But if I try to extend the loss function to handle a batch via the DataLoader (more or less following the docs you linked to) I get an error I can’t figure out:

DL = Flux.DataLoader((data = fake_train, label=fake_labels), batchsize=2)

function loss2(x,y)
    sum( Flux.logitbinarycrossentropy(model(dat, l1, d1), lab ) for (dat,lab) in zip(x,y))
end 

for (x,y) in DL
    loss2(x,y) 
end 

ERROR: LoadError: MethodError: no method matching (::Flux.LSTMCell{Matrix{Float32}, Vector{Float32}, Tuple{Matrix{Float32}, Matrix{Float32}}})(::Tuple{Matrix{Float32}, Matrix{Float32}}, ::Float32)

The part I don’t understand about that error is the single ::Float32 at the end of (::Tuple{Matrix{Float32}, Matrix{Float32}}, ::Float32). What Float32 is that?

I expect that x should be a Float32 array of size (10,20,2) and that y would be a one-hot array of size (2,2,2). If DataLoader treats the last dimension as the observation dimension (right?), it should split fake_labels like that.

Maybe fake_labels is getting mangled somehow? I do not know if that is the problem though.

Let’s work backwards to see what is going on, starting from the forward pass:

function model(x, scanner, encoder)
    state = scanner(x)[:,end]
    reset!(scanner)
    encoder(state)
end 
  1. Per the linked docs, what structure should x have? If it’s not a sequence of num features x batch size arrays, then the input format is incorrect.
  2. Given an input of the correct shape, how should scanner be applied over them? Here you’ll want to consult Recurrence · Flux to see what the correct usage pattern is.
  3. Once the RNN part is correct, how should the RNN output be fed into the dense encoder? Your code is already pretty close there, but I posit to you that this should be even easier once 2) is figured out.

That’s the entirety of the forward pass. Once that’s working as expected, you can move onto making sure the loss function inputs and outputs are what you expect. I recommend starting with a dummy batch of data first before trying to integrate data loading, as it’ll remove Flux.DataLoader as a variable.
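For example, a dummy batch in the RNN-friendly layout might look like this (made-up sizes):

using Flux

dummy_x = [rand(Float32, 10, 2) for _ in 1:20]   # a sequence of 20 timesteps, each a 10-feature x 2-sample matrix
dummy_y = Flux.onehotbatch([0, 1], [0, 1])       # 2 classes x 2 samples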

I thought I had it, but now I’m pretty sure I don’t! I appreciate your taking the time to look at this again.

This part (which you are highlighting again in your most recent comment) I think I’ve got:

A potential source of ambiguity with RNN in Flux can come from the different data layout compared to some common frameworks where data is typically a 3 dimensional array: (features, seq length, samples) . In Flux, those 3 dimensions are provided through a vector of seq length containing a matrix (features, samples) .

I have sentences of max length 80, an embedding dimension of 50, and (let’s say) 500 training samples. Then training_data is a Vector{Matrix{Float32}} where size(training_data) = (80,) and size(training_data[1]) = (50,500). train_label is (1,500), where each entry is a one-hot encoded label for the k classes in the problem (I think this size actually won’t work later).
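Concretely, with random placeholder data of those sizes:

k = 6   # number of classes
training_data = [rand(Float32, 50, 500) for _ in 1:80]                      # 80 timesteps, each 50 x 500
train_label = reshape([Flux.onehot(rand(1:k), 1:k) for _ in 1:500], 1, :)   # 1 x 500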

I’m sure there was a deliberate, careful decision behind this data layout, but it did not correspond well to the way my data was organized (as (sentence, label) rows in a spreadsheet).

Now suppose I do:

l1 = LSTM(50, 5) # 50 is the embedding dimension

Given the data organization above, the last feature for each observation is actually given by:

l1.(train_data)[end] # not [:, end]

So what I want is roughly:

d = Dense(5,k)
d(l1.(train_data)[end]) # broadcast l1, but not d
# gives correctly sized output k x (# sentences)

I think this gets it:

function model(x, scanner, encoder)
    state = scanner.(x)[end]      # the output at the last timestep
    reset!(scanner)               # will this be called N_sentences times?
    encoder(state)
end

I still need to understand how to make the loss work. My data is not shaped correctly. I will make another post when I can figure that out… Again I appreciate your help.

Two comments, only because you are a committer to the repository!

It’s hard to test the model(x) functions with data organized as a Vector{Matrix{Float32}} because it’s hard to select out “one observation”. I can’t do model(data[:][:,1]), for example, where data[:][:,1] would be all the embeddings for all of the words in the first sentence.

Another problem: isn’t it a little hard to batch training examples with this data layout? AFAICT, to batch this, you have to produce Vectors of length max_length (=80) where each element is a matrix w/ features rows (=50) and with batch_size columns. This is easier if the data is stored as a 3-D tensor but it is a little harder when it’s a Vector{ Matrix{Float32} }. Or is there an easier way?

Remember that input dimensions in Julia are generally reversed from many Python libraries. So if in PyTorch you’d pass something of size (80, 500, 50) to an RNN (see the PyTorch RNN documentation), in Flux that’d be (50, 500, 80) (or more accurately, ((50, 500), 80); more on that in a sec). The easiest way to think about this is that the sequence length gets added as an adjacent dimension to the batch size, i.e. batch x features becomes length x batch x features and features x batch becomes features x batch x length.

Yes, but note the warning under Recurrence · Flux :

Mapping and broadcasting operations with stateful layers are discouraged, since the Julia language doesn’t guarantee a specific execution order…

Since you’re only interested in the final output, there’s also no need to materialize any of the intermediate timesteps either:

function model(x, scanner, encoder)
    # this will be called once, but calling it in the middle of the forward pass may not work. Best to call it outside of the loss function completely, 
    # but if that's not possible then moving it here is better.
    reset!(scanner) 

    state = scanner(x[1])   # make sure the first timestep also goes through the RNN
    for i in 2:length(x)
        state = scanner(x[i])
    end
    encoder(state)
end 

foldl may be substituted for a more functional forward pass.
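For instance, something along these lines (a sketch; the scanner is applied to every timestep in order and only the last output is kept):

function model(x, scanner, encoder)
    reset!(scanner)
    state = mapfoldl(scanner, (prev, next) -> next, x)   # keep only the final timestep's output
    encoder(state)
end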

This is a sore point of the current RNN interface, and to your earlier point I’m not really sure why it was designed this way. We’ve been slowly moving towards supporting contiguous sequences of AbstractArray{T, 3} like other frameworks (see https://github.com/FluxML/Flux.jl/pull/1686, now in stable Flux, and the tracking issue at Recurrent network interface updates/design · Issue #1678 · FluxML/Flux.jl · GitHub). So your example would become rnn_model(data[:, 1, :]) (or rnn_model(@view data[:, 1, :]) to avoid the extra allocation).

If you can create a 3D array, then converting it to a Vector{Matrix} is a simple matter of collect(eachslice(x, dims=3)) 🙂

Good to know this may change! Thanks for continuing to look at this question.

So having changed the model to this:

function model(x, scanner, encoder)
	reset!(scanner)                  
	state = scanner(x[1]) 
	for i=2:length(x)                 # this is explicit about the order
		state = scanner(x[i])
	end 
	encoder(state)
end

The only way I could make the loss function work across a batch of data organized as above and labels was like this:

function overall_loss(data, labels, scanner, encoder)
	losses = zeros(Float32, length(labels))
	for i = 1:size(losses,1)
		losses[i] += Flux.logitcrossentropy(model([data[j][:,i] for j = 1:size(data,1)] , scanner, encoder), labels[i])
	end 
	return mean(losses)
end 

That’s very slow (about two and a half minutes for one evaluation on a batch of 350 sentences), I think due to the time it takes to collect [data[j][:,i] for j = 1:size(data,1)].

Slightly faster was to collect all of those vectors in advance:

function another_loss(data,labels,scanner,encoder)
	losses = zeros(Float32,length(labels))
	td = [[data[i][:,k] for i =1:length(data)] for k = 1:size(data[1],2)]
	for i = 1:size(td,1)
		losses[i] += Flux.logitcrossentropy(model(td[i], scanner, encoder) ,labels[i])
	end 
	return mean(losses)
end 

But both are very slow. Is there a smarter way to do that?

logitcrossentropy and all other Flux losses should already be vectorized, so if the following doesn’t work then I’d check the shape of the labels and model outputs to make sure they’re the correct shape (output features x batch size):

function overall_loss(data, labels, scanner, encoder)
  logits = model(data, scanner, encoder)
  Flux.Zygote.ignore() do  # Debugging purposes only. If this fails, check the shapes!
    @assert ndims(logits) == 2 && size(logits) == size(labels)
  end
  Flux.logitcrossentropy(logits, labels)
end

I swear model(data, scanner, encoder) was giving me an error yesterday evening, but it works as you propose here (after reshaping my labels).

Indeed your version is much faster: 35 seconds.

Maybe the entire thing actually works now…
