Tuesday, January 10, 2017

Theano LSTM

I'm just going to share a code snippet that shows the main forward step of a single-layer Long Short-Term Memory (LSTM) recurrent neural network (RNN). I'm not going to go through how it all works, since there are a ton of great resources on RNNs online: check out this post by Andrej Karpathy, this tutorial on implementing RNNs in NumPy and Theano, or this fantastic explanation of LSTMs. Note that I've gotten this to work on the MNIST dataset, resulting in some crazy low error rates. If you're interested in seeing that code, let me know!
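As a quick reference for the variable names in the snippet, here is (roughly in Graves' notation, and just as my own summary of what the code below computes) the forward pass, where σ is the logistic sigmoid, ⊙ is elementwise multiplication, s_t is the cell state, and b_t is the hidden output fed back into the next step:

$$
\begin{aligned}
\iota_t &= \sigma\big(x_t W_{x\iota} + b_{t-1} W_{h\iota} + s_{t-1} W_{c\iota} + \beta_\iota\big) &&\text{input gate (b\_L)}\\
\phi_t &= \sigma\big(x_t W_{x\phi} + b_{t-1} W_{h\phi} + s_{t-1} W_{c\phi} + \beta_\phi\big) &&\text{forget gate (b\_Phi)}\\
s_t &= \phi_t \odot s_{t-1} + \iota_t \odot \tanh\big(x_t W_{xc} + b_{t-1} W_{hc} + \beta_c\big) &&\text{cell state (s\_t)}\\
\omega_t &= \sigma\big(x_t W_{x\omega} + b_{t-1} W_{h\omega} + s_t W_{c\omega} + \beta_\omega\big) &&\text{output gate (b\_Om)}\\
b_t &= \omega_t \odot \tanh(s_t) &&\text{hidden output (b\_Cell)}\\
o_t &= \operatorname{softmax}(b_t W_y + \beta_y) &&\text{prediction (o\_t)}
\end{aligned}
$$

The W_c terms are the diagonal "peephole" connections from the cell state into the gates, which is why diag_constructor in the code only initializes the diagonals.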

import time
import pdb
import numpy as np
import theano
import theano.tensor as T
import h5py
class LSTMLayer(object):

    def __init__(self, X, dim, **kwargs):
        """
        Set up the weight matrices for a long short term memory (LSTM) unit.
        I use the notation from Graves.
        args:
            - X: A symbolic Theano tensor holding the input sequences, shaped
                 (sequence length, batch size, input dimension).
            - dim: A dictionary containing the dimensions of the units inside the LSTM
                   ('in_dim', 'hid_dim', and 'out_dim').
        kwargs:
            - bptt_truncate: Number of steps to truncate backpropagation through time
                             (-1, the default, means no truncation).
        """
        uni = np.random.uniform

        def diag_constructor(limit, size, n):
            """
            Build n square matrices that are zero except for a randomly
            initialized diagonal (used for the peephole connections).
            args:
                - limit: A list whose two elements correspond to the limits for the numpy uniform function.
                - size: (Int) one dimension of the square matrix.
                - n: The number of these matrices to create.
            """
            diag_ind = np.diag_indices(size)
            mat = np.zeros((n, size, size))
            for i in range(n):
                diag_val = uni(limit[0], limit[1], size)
                mat[i][diag_ind] = diag_val
            return mat.astype(theano.config.floatX)
        truncate = kwargs.get("bptt_truncate", -1)
        nin = dim.get('in_dim')
        nout = dim.get('out_dim')
        nhid = dim.get('hid_dim')
        self.nin = nin
        self.nout = nout
        self.nhid = nhid

        # I set up the weight matrices differently here. Instead of creating separate weight
        # matrices for each connection, I create them grouped by their size. This cleans up
        # the code and potentially makes things more efficient, though it does make the
        # recurrent step function harder to read.
        # Index convention for Wi, Wh, and b: 0 = input gate, 1 = forget gate,
        # 2 = cell candidate, 3 = output gate.
        self.Wi = theano.shared(uni(-np.sqrt(1.0 / (nin * nhid)), np.sqrt(1.0 / (nin * nhid)),
                                    (4, nin, nhid)).astype(theano.config.floatX), name='Wi')
        self.Wh = theano.shared(uni(-np.sqrt(1.0 / (nhid ** 2)), np.sqrt(1.0 / (nhid ** 2)),
                                    (4, nhid, nhid)).astype(theano.config.floatX), name='Wh')
        # Diagonal peephole weights: 0 = input gate, 1 = forget gate, 2 = output gate.
        self.Wc = theano.shared(diag_constructor([-np.sqrt(1.0 / (nhid ** 2)), np.sqrt(1.0 / (nhid ** 2))],
                                                 nhid, 3), name='Wc')
        self.b = theano.shared(np.zeros((4, nhid)).astype(theano.config.floatX), name='b')
        self.Wy = theano.shared(uni(-np.sqrt(1.0 / (nhid * nout)), np.sqrt(1.0 / (nhid * nout)),
                                    (nhid, nout)).astype(theano.config.floatX), name='Wy')
        self.by = theano.shared(np.zeros(nout).astype(theano.config.floatX), name='by')
        self.params = [self.Wi, self.Wh, self.Wc, self.b, self.Wy, self.by]
        def recurrent_step(x_t, b_tm1, s_tm1):
            """
            Define the recurrent step.
            args:
                - x_t: the current element of the input sequence
                - b_tm1: the previous b_t (b_{t minus 1}), i.e. the previous hidden output
                - s_tm1: the previous s_t (s_{t minus 1}); this is the state of the cell
            """
            # Input gate
            b_L = T.nnet.sigmoid(T.dot(x_t, self.Wi[0]) + T.dot(b_tm1, self.Wh[0]) + T.dot(s_tm1, self.Wc[0]) + self.b[0])
            # Forget gate
            b_Phi = T.nnet.sigmoid(T.dot(x_t, self.Wi[1]) + T.dot(b_tm1, self.Wh[1]) + T.dot(s_tm1, self.Wc[1]) + self.b[1])
            # Cell candidate and new cell state
            a_Cell = T.dot(x_t, self.Wi[2]) + T.dot(b_tm1, self.Wh[2]) + self.b[2]
            s_t = b_Phi * s_tm1 + b_L * T.tanh(a_Cell)
            # Output gate (its peephole uses the updated cell state s_t)
            b_Om = T.nnet.sigmoid(T.dot(x_t, self.Wi[3]) + T.dot(b_tm1, self.Wh[3]) + T.dot(s_t, self.Wc[2]) + self.b[3])
            # Final output (what gets sent to the next step in the recurrence)
            b_Cell = b_Om * T.tanh(s_t)
            # Sequence output
            o_t = T.nnet.softmax(T.dot(b_Cell, self.Wy) + self.by)
            return b_Cell, s_t, o_t
        # Scan over the sequence. The first two outputs (b_t and s_t) are fed back into the
        # next step; the softmax output o_t is not recurrent.
        out, _ = theano.scan(recurrent_step,
                             truncate_gradient=truncate,
                             sequences=X,
                             outputs_info=[
                                 {'initial': T.zeros((X.shape[1], nhid))},
                                 {'initial': T.zeros((X.shape[1], nhid))},
                                 {'initial': None}
                             ],
                             n_steps=X.shape[0])
        self.b_out = out[0]   # hidden outputs b_t for every time step
        self.pred = out[2]    # softmax predictions o_t for every time step
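Here's a minimal sketch of how the layer could be wired up and compiled into a prediction function. The input convention is (sequence length, batch size, input dimension), and the specific dimensions here are made-up values for illustration, not from my MNIST setup:

# Minimal usage sketch; the dimensions and names below are illustrative assumptions.
X = T.tensor3('X')  # (sequence length, batch size, input dim)
dim = {'in_dim': 28, 'hid_dim': 100, 'out_dim': 10}
layer = LSTMLayer(X, dim, bptt_truncate=-1)

# layer.pred holds the softmax output at every time step, shaped
# (sequence length, batch size, out_dim); take the last step as the prediction.
predict = theano.function([X], layer.pred[-1])

x = np.random.randn(28, 5, 28).astype(theano.config.floatX)
print(predict(x).shape)  # (5, 10)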