""" CTC-Connectionist Temporal Classification Code provided by Mohammad Pezeshki - May. 2015 - Montreal Institute for Learning Algorithms Referece: Graves, Alex, et al. "Connectionist temporal classification: labelling unsegmented sequence data with recurrent neural networks." Proceedings of the 23rd international conference on Machine learning. ACM, 2006. Credits: Shawn Tan, Rakesh Var This code is distributed without any warranty, express or implied. """ import theano from theano import tensor floatX = theano.config.floatX # T: INPUT_SEQUENCE_LENGTH # B: BATCH_SIZE # L: OUTPUT_SEQUENCE_LENGTH # C: NUM_CLASSES class CTC(object): """Connectionist Temporal Classification y_hat : T x B x C+1 y : L x B y_hat_mask : T x B y_mask : L x B """ @staticmethod def add_blanks(y, blank_symbol, y_mask=None): """Add blanks to a matrix and updates mask Input shape: L x B Output shape: 2L+1 x B """ # for y y_extended = y.T.dimshuffle(0, 1, 'x') blanks = tensor.zeros_like(y_extended) + blank_symbol concat = tensor.concatenate([y_extended, blanks], axis=2) res = concat.reshape((concat.shape[0], concat.shape[1] * concat.shape[2])).T begining_blanks = tensor.zeros((1, res.shape[1])) + blank_symbol blanked_y = tensor.concatenate([begining_blanks, res], axis=0) # for y_mask if y_mask is not None: y_mask_extended = y_mask.T.dimshuffle(0, 1, 'x') concat = tensor.concatenate([y_mask_extended, y_mask_extended], axis=2) res = concat.reshape((concat.shape[0], concat.shape[1] * concat.shape[2])).T begining_blanks = tensor.ones((1, res.shape[1]), dtype=floatX) blanked_y_mask = tensor.concatenate([begining_blanks, res], axis=0) else: blanked_y_mask = None return blanked_y, blanked_y_mask @staticmethod def class_batch_to_labeling_batch(y, y_hat, y_hat_mask=None): y_hat = y_hat * y_hat_mask.dimshuffle(0, 'x', 1) batch_size = y_hat.shape[2] res = y_hat[:, y.astype('int32'), tensor.arange(batch_size)] return res @staticmethod def recurrence_relation(y, y_mask, blank_symbol): n_y = y.shape[0] blanks = tensor.zeros((2, y.shape[1])) + blank_symbol ybb = tensor.concatenate((y, blanks), axis=0).T sec_diag = (tensor.neq(ybb[:, :-2], ybb[:, 2:]) * tensor.eq(ybb[:, 1:-1], blank_symbol) * y_mask.T) # r1: LxL # r2: LxL # r3: LxLxB r2 = tensor.eye(n_y, k=1) r3 = (tensor.eye(n_y, k=2).dimshuffle(0, 1, 'x') * sec_diag.dimshuffle(1, 'x', 0)) return r2, r3 @classmethod def path_probabs(cls, y, y_hat, y_mask, y_hat_mask, blank_symbol): pred_y = cls.class_batch_to_labeling_batch(y, y_hat, y_hat_mask) r2, r3 = cls.recurrence_relation(y, y_mask, blank_symbol) def step(p_curr, p_prev): # instead of dot product, we * first # and then sum oven one dimension. 
            # objective: T.dot((p_prev) BxL, LxLxB)
            # solution:  Lx1xB * LxLxB --> LxLxB --> (sum over axis 0) LxB
            dotproduct = (p_prev + tensor.dot(p_prev, r2) +
                          (p_prev.dimshuffle(1, 'x', 0) * r3).sum(axis=0).T)
            return p_curr.T * dotproduct * y_mask.T  # B x L

        probabilities, _ = theano.scan(
            step,
            sequences=[pred_y],
            outputs_info=[tensor.eye(y.shape[0])[0] *
                          tensor.ones(y.T.shape)])
        return probabilities, probabilities.shape

    @classmethod
    def cost(cls, y, y_hat, y_mask, y_hat_mask, blank_symbol):
        y_hat_mask_len = tensor.sum(y_hat_mask, axis=0, dtype='int32')
        y_mask_len = tensor.sum(y_mask, axis=0, dtype='int32')
        probabilities, sth = cls.path_probabs(y, y_hat,
                                              y_mask, y_hat_mask,
                                              blank_symbol)
        batch_size = probabilities.shape[1]
        # The labeling probability is the sum over the two valid final
        # states: ending on the last label or on the trailing blank.
        labels_probab = (probabilities[y_hat_mask_len - 1,
                                       tensor.arange(batch_size),
                                       y_mask_len - 1] +
                         probabilities[y_hat_mask_len - 1,
                                       tensor.arange(batch_size),
                                       y_mask_len - 2])
        avg_cost = tensor.mean(-tensor.log(labels_probab))
        return avg_cost, sth

    @staticmethod
    def _epslog(x):
        return tensor.cast(tensor.log(tensor.clip(x, 1E-12, 1E12)),
                           theano.config.floatX)

    @staticmethod
    def log_add(a, b):
        # Numerically stable log(exp(a) + exp(b)): since max_ equals one
        # of a and b, a + b - 2 * max_ equals min(a, b) - max(a, b).
        max_ = tensor.maximum(a, b)
        return (max_ + tensor.log1p(tensor.exp(a + b - 2 * max_)))

    @staticmethod
    def log_dot_matrix(x, z):
        inf = 1E12
        log_dot = tensor.dot(x, z)
        # Columns of z with no transition (all zeros) must map to
        # probability zero, i.e. -inf in log space.
        zeros_to_minus_inf = (z.max(axis=0) - 1) * inf
        return log_dot + zeros_to_minus_inf

    @staticmethod
    def log_dot_tensor(x, z):
        inf = 1E12
        log_dot = (x.dimshuffle(1, 'x', 0) * z).sum(axis=0).T
        zeros_to_minus_inf = (z.max(axis=0) - 1) * inf
        return log_dot + zeros_to_minus_inf.T

    @classmethod
    def log_path_probabs(cls, y, y_hat, y_mask, y_hat_mask, blank_symbol):
        pred_y = cls.class_batch_to_labeling_batch(y, y_hat, y_hat_mask)
        r2, r3 = cls.recurrence_relation(y, y_mask, blank_symbol)

        def step(log_p_curr, log_p_prev):
            p1 = log_p_prev
            p2 = cls.log_dot_matrix(p1, r2)
            p3 = cls.log_dot_tensor(p1, r3)
            p123 = cls.log_add(p3, cls.log_add(p1, p2))

            return (log_p_curr.T +
                    p123 +
                    cls._epslog(y_mask.T))

        log_probabilities, _ = theano.scan(
            step,
            sequences=[cls._epslog(pred_y)],
            outputs_info=[cls._epslog(tensor.eye(y.shape[0])[0] *
                                      tensor.ones(y.T.shape))])
        return log_probabilities

    @classmethod
    def log_cost(cls, y, y_hat, y_mask, y_hat_mask, blank_symbol):
        y_hat_mask_len = tensor.sum(y_hat_mask, axis=0, dtype='int32')
        y_mask_len = tensor.sum(y_mask, axis=0, dtype='int32')
        log_probabs = cls.log_path_probabs(y, y_hat,
                                           y_mask, y_hat_mask,
                                           blank_symbol)
        batch_size = log_probabs.shape[1]
        labels_probab = cls.log_add(
            log_probabs[y_hat_mask_len - 1,
                        tensor.arange(batch_size),
                        y_mask_len - 1],
            log_probabs[y_hat_mask_len - 1,
                        tensor.arange(batch_size),
                        y_mask_len - 2])
        avg_cost = tensor.mean(-labels_probab)
        return avg_cost

    @classmethod
    def apply(cls, y, y_hat, y_mask, y_hat_mask, scale='log_scale'):
        y_hat = y_hat.dimshuffle(0, 2, 1)
        num_classes = y_hat.shape[1] - 1
        blanked_y, blanked_y_mask = cls.add_blanks(
            y=y,
            blank_symbol=num_classes.astype(floatX),
            y_mask=y_mask)
        if scale == 'log_scale':
            final_cost = cls.log_cost(blanked_y, y_hat,
                                      blanked_y_mask, y_hat_mask,
                                      num_classes)
        else:
            final_cost, sth = cls.cost(blanked_y, y_hat,
                                       blanked_y_mask, y_hat_mask,
                                       num_classes)
        return final_cost
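

# ---------------------------------------------------------------------------
# Minimal usage sketch (an illustrative addition, not part of the original
# module): it wires symbolic inputs with the shapes from the class docstring
# into CTC.apply and evaluates the cost on random data. The toy dimensions
# (T=5, B=2, C=3, L=2) and the variable names below are assumptions made for
# this example only.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import numpy as np

    # Symbolic inputs: y_hat is T x B x C+1 (softmax outputs incl. blank),
    # y is L x B (integer labels stored as floatX); masks mark valid steps.
    y_hat = tensor.tensor3('y_hat')
    y = tensor.matrix('y')
    y_hat_mask = tensor.matrix('y_hat_mask')
    y_mask = tensor.matrix('y_mask')

    cost = CTC.apply(y, y_hat, y_mask, y_hat_mask, scale='log_scale')
    f = theano.function([y, y_hat, y_mask, y_hat_mask], cost)

    # Toy batch: T=5 time steps, B=2 sequences, C=3 classes (+1 blank), L=2.
    T_, B_, C_, L_ = 5, 2, 3, 2
    rng = np.random.RandomState(0)
    probs = rng.uniform(size=(T_, B_, C_ + 1)).astype(floatX)
    probs /= probs.sum(axis=2, keepdims=True)  # each step a distribution
    labels = rng.randint(0, C_, size=(L_, B_)).astype(floatX)
    print(f(labels, probs,
            np.ones((L_, B_), dtype=floatX),
            np.ones((T_, B_), dtype=floatX)))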