I'm just going to show a code snippet that shows the main forward step of a single-layer Long Short-Term Memory (LSTM) recurrent neural network (RNN). I'm not going to go through how it all works, as there are a ton of great resources online for RNNs. Check out this, by Andrej Karpathy, this tutorial on implementing RNNs in NumPy and Theano, or this fantastic explanation of LSTMs. Note that I've gotten this to work with the MNIST dataset, resulting in some crazy low error rates. If you're interested in seeing that code, let me know!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import pdb | |
import numpy as np | |
import theano | |
import theano.tensor as T | |
import h5py | |
class LSTMLayer(object):
    def __init__(self, X, dim, **kwargs):
        """
        Set up the weight matrices for a long short term memory (LSTM) unit
        and build its forward (scan) graph. I use the notation from Graves.

        args:
            - X: symbolic Theano input; indexed as (time, batch, features)
              by the scan below (X.shape[0] steps, X.shape[1] batch size).
            - dim: A dictionary containing the dimensions of the units
              inside the LSTM: 'in_dim', 'out_dim', 'hid_dim'.
        kwargs:
            - bptt_truncate: truncate_gradient value passed to theano.scan
              (-1, the default, means full backpropagation through time).
        """
        uni = np.random.uniform

        def diag_constructor(limit, size, n):
            """
            Build n square matrices with uniform-random diagonals (used for
            the peephole connections, which Graves defines as diagonal).

            args:
                - limit: A list whose two elements correspond to the limit
                  for the numpy uniform function.
                - size: (Int) one dimension of the square matrix.
                - n: The number of these matrices to create.
            """
            diag_ind = np.diag_indices(size)
            mat = np.zeros((n, size, size))
            # range (not Python-2-only xrange) works identically here.
            for i in range(n):
                diag_val = uni(limit[0], limit[1], size)
                # Index the i-th matrix first, then its diagonal; the old
                # mat[i, diag_ind] nested-tuple fancy indexing is deprecated
                # in NumPy.
                mat[i][diag_ind] = diag_val
            return mat.astype(theano.config.floatX)

        truncate = kwargs.get("bptt_truncate", -1)
        nin = dim.get('in_dim')
        nout = dim.get('out_dim')
        nhid = dim.get('hid_dim')
        self.nin = nin
        self.nout = nout
        self.nhid = nhid
        # I can cast weight matrices differently. Instead of creating separate
        # weight matrices for each connection, I create them based on their
        # size: slice 0 = input gate, 1 = forget gate, 2 = cell, 3 = output
        # gate (Wc holds only the three diagonal peephole matrices). This
        # cleans up the code and potentially makes things more efficient. I
        # will say that it makes the recurrent step function harder to read.
        self.Wi = theano.shared(uni(-np.sqrt(1.0 / (nin * nhid)), np.sqrt(1.0 / (nin * nhid)), (4, nin, nhid)).astype(theano.config.floatX), name='Wi')
        self.Wh = theano.shared(uni(-np.sqrt(1.0 / (nhid ** 2)), np.sqrt(1.0 / (nhid ** 2)), (4, nhid, nhid)).astype(theano.config.floatX), name='Wh')
        self.Wc = theano.shared(diag_constructor([-np.sqrt(1.0 / (nhid ** 2)), np.sqrt(1.0 / (nhid ** 2))], nhid, 3), name='Wc')
        # BUG FIX: the biases must be cast to floatX like every weight above;
        # bare np.zeros is float64, which silently upcasts the graph and
        # prevents GPU execution when floatX is float32.
        self.b = theano.shared(np.zeros((4, nhid)).astype(theano.config.floatX), name='b')
        self.Wy = theano.shared(uni(-np.sqrt(1.0 / (nhid * nout)), np.sqrt(1.0 / (nhid * nout)), (nhid, nout)).astype(theano.config.floatX), name='Wy')
        self.by = theano.shared(np.zeros(nout).astype(theano.config.floatX), name='by')
        self.params = [self.Wi, self.Wh, self.Wc, self.b, self.Wy, self.by]

        def recurrent_step(x_t, b_tm1, s_tm1):
            """
            Define the recurrent step.

            args:
                - x_t: the current sequence element
                - b_tm1: the previous b_t (b_{t minus 1})
                - s_tm1: the previous s_t (s_{t minus 1}); the cell state
            """
            # Input gate (with peephole from the previous cell state).
            b_L = T.nnet.sigmoid(T.dot(x_t, self.Wi[0]) + T.dot(b_tm1, self.Wh[0]) + T.dot(s_tm1, self.Wc[0]) + self.b[0])
            # Forget gate.
            b_Phi = T.nnet.sigmoid(T.dot(x_t, self.Wi[1]) + T.dot(b_tm1, self.Wh[1]) + T.dot(s_tm1, self.Wc[1]) + self.b[1])
            # Cell candidate (no peephole on the candidate itself).
            a_Cell = T.dot(x_t, self.Wi[2]) + T.dot(b_tm1, self.Wh[2]) + self.b[2]
            s_t = b_Phi * s_tm1 + b_L * T.tanh(a_Cell)
            # Output gate (peephole from the *new* cell state s_t).
            b_Om = T.nnet.sigmoid(T.dot(x_t, self.Wi[3]) + T.dot(b_tm1, self.Wh[3]) + T.dot(s_t, self.Wc[2]) + self.b[3])
            # Final output (what gets sent to the next step in the recurrence).
            b_Cell = b_Om * T.tanh(s_t)
            # Sequence output.
            o_t = T.nnet.softmax(T.dot(b_Cell, self.Wy) + self.by)
            return b_Cell, s_t, o_t

        out, _ = theano.scan(recurrent_step,
                             truncate_gradient=truncate,
                             sequences=X,
                             outputs_info=[
                                 {'initial': T.zeros((X.shape[1], nhid))},
                                 {'initial': T.zeros((X.shape[1], nhid))},
                                 # Per the scan docs, an output that is not
                                 # fed back into the recurrence is marked
                                 # with plain None, not {'initial': None}.
                                 None
                             ],
                             n_steps=X.shape[0])
        self.b_out = out[0]  # hidden-output sequence b_Cell over time
        self.pred = out[2]   # softmax predictions o_t over time