# Source code for fastNLP.modules.encoder.masked_rnn

__author__ = 'max'

import torch
import torch.nn as nn
import torch.nn.functional as F

from fastNLP.modules.utils import initial_parameter
def MaskedRecurrent(reverse=False):
    """Build a function that unrolls one recurrent cell over a masked sequence.

    :param reverse: if True, time steps are visited from last to first; the
        returned outputs are still ordered by time step.
    :return: a ``forward(input, hidden, cell, mask, train=True, dropout=0)`` callable.
    """

    def forward(input, hidden, cell, mask, train=True, dropout=0):
        """Run ``cell`` over every time step of ``input``.

        :param input: sequence tensor indexed by time step along dim 0.
        :param hidden: initial state; a tensor, or an ``(h, c)`` tuple for LSTM cells.
        :param cell: callable invoked as ``cell(input[i], hidden)``.
        :param mask: mask indexed by time step along dim 0, or None. Entries
            above 0.5 let the state update; the others keep the previous state.
            A per-step mask of shape ``(batch,)`` or ``(batch, 1)`` is accepted.
        :param train: accepted for interface compatibility; step dropout is
            currently disabled, so this value is not used.
        :param dropout: step-to-step dropout probability; currently not applied.
        :return: ``(hidden, output)`` where ``hidden`` is the state after the
            last processed step and ``output`` stacks the per-step outputs
            along a new leading time dimension.
        """
        if mask is not None:
            # Convert once so min/max and the blending arithmetic work for any mask dtype.
            mask = mask.float()

        output = []
        steps = range(input.size(0) - 1, -1, -1) if reverse else range(input.size(0))
        for i in steps:
            step_mask = None if mask is None else mask[i]
            if step_mask is None or step_mask.data.min() > 0.5:
                # No mask, or every sample is active at this step.
                hidden = cell(input[i], hidden)
            elif step_mask.data.max() > 0.5:
                # Some samples are masked: blend so that masked rows keep their old state.
                if step_mask.dim() == 1:
                    # (batch,) -> (batch, 1) so it broadcasts over the hidden dimension.
                    step_mask = step_mask.unsqueeze(-1)
                hidden_next = cell(input[i], hidden)
                if isinstance(hidden, tuple):
                    # LSTM cells return an (h, c) tuple.
                    hx, cx = hidden
                    hp1, cp1 = hidden_next
                    hidden = (hx + (hp1 - hx) * step_mask,
                              cx + (cp1 - cx) * step_mask)
                else:
                    hidden = hidden + (hidden_next - hidden) * step_mask
            # When every sample is masked the state is carried over unchanged.

            output.append(hidden[0] if isinstance(hidden, tuple) else hidden)

        if reverse:
            # Restore chronological order of the collected outputs.
            output.reverse()
        output = torch.stack(output, 0)

        return hidden, output

    return forward

def StackedRNN(inners, num_layers, lstm=False, train=True, step_dropout=0, layer_dropout=0):
    """Stack per-direction recurrent functions into a multi-layer RNN.

    :param inners: sequence of per-direction functions (one entry per
        direction), each called as
        ``inner(input, hidden, cell, mask, train=..., dropout=...)`` and
        returning ``(final_hidden, outputs)``.
    :param num_layers: number of stacked layers.
    :param lstm: if True, ``hidden`` is an ``(h, c)`` pair and so is the result.
    :param train: forwarded to ``inners`` and to ``F.dropout`` as ``training``.
    :param step_dropout: forwarded to ``inners`` as ``dropout``.
    :param layer_dropout: dropout probability applied to the output of every
        layer except the last one.
    :return: a ``forward(input, hidden, cells, mask)`` callable.
    """
    num_directions = len(inners)
    total_layers = num_layers * num_directions

    def forward(input, hidden, cells, mask):
        """Run all layers and directions.

        :param input: input of the first layer.
        :param hidden: initial states indexed by ``layer * num_directions + direction``
            along dim 0 (an ``(h, c)`` pair of such tensors when ``lstm``).
        :param cells: one cell per layer and direction, in the same order.
        :param mask: mask handed unchanged to every inner function.
        :return: ``(next_hidden, output)``: the stacked final states and the
            output of the last layer.
        """
        assert (len(cells) == total_layers)
        next_hidden = []

        if lstm:
            # (h, c) with a leading layer dim -> list of per-layer (h_l, c_l) pairs.
            hidden = list(zip(*hidden))

        for i in range(num_layers):
            all_output = []
            for j, inner in enumerate(inners):
                l = i * num_directions + j
                # Pass by keyword: the inner signature is (..., train, dropout),
                # so positional (step_dropout, train) would swap the two.
                hy, output = inner(input, hidden[l], cells[l], mask,
                                   train=train, dropout=step_dropout)
                next_hidden.append(hy)
                all_output.append(output)

            # Concatenate the directions on the feature axis: input of the next layer.
            input = torch.cat(all_output, input.dim() - 1)

            if layer_dropout != 0 and i < num_layers - 1:
                input = F.dropout(input, p=layer_dropout, training=train, inplace=False)

        if lstm:
            next_h, next_c = zip(*next_hidden)
            next_hidden = (
                torch.cat(next_h, 0).view(total_layers, *next_h[0].size()),
                torch.cat(next_c, 0).view(total_layers, *next_c[0].size())
            )
        else:
            next_hidden = torch.cat(next_hidden, 0).view(total_layers, *next_hidden[0].size())

        return next_hidden, input

    return forward

def AutogradMaskedRNN(num_layers=1, batch_first=False, train=True, layer_dropout=0, step_dropout=0,
                      bidirectional=False, lstm=False):
    """Assemble a (bi)directional, multi-layer masked RNN forward function.

    :param num_layers: number of stacked layers.
    :param batch_first: if True, ``input``, ``mask`` and the output use
        ``(batch, seq, ...)`` layout and are transposed internally.
    :param train: forwarded to ``StackedRNN``.
    :param layer_dropout: forwarded to ``StackedRNN``.
    :param step_dropout: forwarded to ``StackedRNN``.
    :param bidirectional: if True, each layer has a forward and a reversed pass.
    :param lstm: if True, hidden states are ``(h, c)`` pairs.
    :return: a ``forward(input, cells, hidden, mask)`` callable returning
        ``(output, next_hidden)``.
    """
    rec_factory = MaskedRecurrent

    # One recurrent function per direction; StackedRNN chains them across layers.
    if bidirectional:
        layer = (rec_factory(), rec_factory(reverse=True))
    else:
        layer = (rec_factory(),)

    func = StackedRNN(layer,
                      num_layers,
                      lstm=lstm,
                      layer_dropout=layer_dropout, step_dropout=step_dropout,
                      train=train)

    def forward(input, cells, hidden, mask):
        """Run the stacked RNN, handling the ``batch_first`` layout."""
        if batch_first:
            input = input.transpose(0, 1)
            if mask is not None:
                mask = mask.transpose(0, 1)

        nexth, output = func(input, hidden, cells, mask)

        if batch_first:
            output = output.transpose(0, 1)

        return output, nexth

    return forward

def MaskedStep():
    """Build a function that advances one recurrent cell by a single masked step.

    :return: a ``forward(input, hidden, cell, mask)`` callable returning
        ``(hidden, output)``.
    """

    def forward(input, hidden, cell, mask):
        """Apply ``cell`` once, keeping the previous state where ``mask`` is 0.

        :param input: input of this step.
        :param hidden: previous state; a tensor, or an ``(h, c)`` tuple for LSTM cells.
        :param cell: callable invoked as ``cell(input, hidden)``.
        :param mask: mask of this step with shape ``(batch,)`` or ``(batch, 1)``,
            or None. Entries above 0.5 let the state update.
        :return: ``(hidden, output)``; ``output`` is ``hidden`` itself, or its
            first element for an ``(h, c)`` tuple.
        """
        if mask is not None:
            mask = mask.float()

        if mask is None or mask.data.min() > 0.5:
            # No mask, or every sample is active.
            hidden = cell(input, hidden)
        elif mask.data.max() > 0.5:
            # Partially masked batch: blend so that masked rows keep their old state.
            if mask.dim() == 1:
                # (batch,) -> (batch, 1) so it broadcasts over the hidden dimension.
                mask = mask.unsqueeze(-1)
            hidden_next = cell(input, hidden)
            if isinstance(hidden, tuple):
                # LSTM cells return an (h, c) tuple.
                hx, cx = hidden
                hp1, cp1 = hidden_next
                hidden = (hx + (hp1 - hx) * mask, cx + (cp1 - cx) * mask)
            else:
                hidden = hidden + (hidden_next - hidden) * mask
        # When every sample is masked the state is returned unchanged.

        output = hidden[0] if isinstance(hidden, tuple) else hidden

        return hidden, output

    return forward

def StackedStep(layer, num_layers, lstm=False, dropout=0, train=True):
    """Stack a single-step recurrent function over several layers.

    :param layer: function called as ``layer(input, hidden, cell, mask)`` and
        returning ``(next_hidden, output)``.
    :param num_layers: number of stacked layers.
    :param lstm: if True, ``hidden`` is an ``(h, c)`` pair and so is the result.
    :param dropout: dropout probability applied to the output of every layer
        except the last one.
    :param train: forwarded to ``F.dropout`` as ``training``.
    :return: a ``forward(input, hidden, cells, mask)`` callable.
    """

    def forward(input, hidden, cells, mask):
        """Advance every layer by one step.

        :param input: input of the first layer for this step.
        :param hidden: previous states indexed by layer along dim 0
            (an ``(h, c)`` pair of such tensors when ``lstm``).
        :param cells: one cell per layer.
        :param mask: mask handed unchanged to every layer.
        :return: ``(next_hidden, output)``: the stacked new states and the
            output of the last layer.
        """
        assert (len(cells) == num_layers)
        next_hidden = []

        if lstm:
            # (h, c) with a leading layer dim -> list of per-layer (h_l, c_l) pairs.
            hidden = list(zip(*hidden))

        for l in range(num_layers):
            hy, output = layer(input, hidden[l], cells[l], mask)
            next_hidden.append(hy)
            # The output of this layer feeds the next one.
            input = output

            if dropout != 0 and l < num_layers - 1:
                input = F.dropout(input, p=dropout, training=train, inplace=False)

        if lstm:
            next_h, next_c = zip(*next_hidden)
            next_hidden = (
                torch.cat(next_h, 0).view(num_layers, *next_h[0].size()),
                torch.cat(next_c, 0).view(num_layers, *next_c[0].size())
            )
        else:
            next_hidden = torch.cat(next_hidden, 0).view(num_layers, *next_hidden[0].size())

        return next_hidden, input

    return forward

def AutogradMaskedStep(num_layers=1, dropout=0, train=True, lstm=False):
    """Assemble a multi-layer, single-step masked RNN function.

    :param num_layers: number of stacked layers.
    :param dropout: forwarded to ``StackedStep``.
    :param train: forwarded to ``StackedStep``.
    :param lstm: if True, hidden states are ``(h, c)`` pairs.
    :return: a ``forward(input, cells, hidden, mask)`` callable returning
        ``(output, next_hidden)``.
    """
    layer = MaskedStep()

    func = StackedStep(layer,
                       num_layers,
                       lstm=lstm,
                       dropout=dropout,
                       train=train)

    def forward(input, cells, hidden, mask):
        """Advance all layers by one step and return ``(output, next_hidden)``."""
        nexth, output = func(input, hidden, cells, mask)
        return output, nexth

    return forward

class MaskedRNNBase(nn.Module):
    """Base class of the masked RNN encoders: a stack of recurrent cells driven step by step.

    One cell is created per layer and direction; ``forward`` unrolls them over
    a whole sequence and ``step`` advances them by a single time step.
    """

    def __init__(self, Cell, input_size, hidden_size,
                 num_layers=1, bias=True, batch_first=False,
                 layer_dropout=0, step_dropout=0, bidirectional=False,
                 initial_method=None, **kwargs):
        """
        :param Cell: cell class, instantiated as
            ``Cell(layer_input_size, hidden_size, bias, **kwargs)``.
        :param input_size: number of features of the input.
        :param hidden_size: number of features of the hidden state.
        :param num_layers: number of stacked layers.
        :param bias: passed to every cell.
        :param batch_first: if True, ``forward`` expects ``(batch, seq, feature)`` input.
        :param layer_dropout: dropout between layers, used by ``forward``.
        :param step_dropout: passed as step dropout by ``forward`` and as
            inter-layer dropout by ``step``.
        :param bidirectional: if True, every layer has two directions.
        :param initial_method: forwarded to ``initial_parameter``.
        :param kwargs: extra keyword arguments passed to every cell.
        """
        super(MaskedRNNBase, self).__init__()
        self.Cell = Cell
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bias = bias
        self.batch_first = batch_first
        self.layer_dropout = layer_dropout
        self.step_dropout = step_dropout
        self.bidirectional = bidirectional
        num_directions = 2 if bidirectional else 1

        # Create every cell, ordered by layer * num_directions + direction.
        self.all_cells = []
        for layer in range(num_layers):
            for direction in range(num_directions):
                # Layers above the first consume the concatenated directions of the layer below.
                layer_input_size = input_size if layer == 0 else hidden_size * num_directions

                cell = self.Cell(layer_input_size, hidden_size, self.bias, **kwargs)
                self.all_cells.append(cell)
                # Register the cell so that its parameters belong to this module.
                self.add_module('cell%d' % (layer * num_directions + direction), cell)
        initial_parameter(self, initial_method)

    def reset_parameters(self):
        """Reset the parameters of every cell."""
        for cell in self.all_cells:
            cell.reset_parameters()

    def forward(self, input, mask=None, hx=None):
        """Run the RNN over a whole sequence.

        :param input: ``(seq_len, batch, input_size)``, or
            ``(batch, seq_len, input_size)`` when ``batch_first``.
        :param mask: 0-1 mask with the first two dimensions of ``input``, or None.
        :param hx: initial hidden state (an ``(h, c)`` pair for LSTM cells);
            zeros when None.
        :return: ``(output, hidden)``.
        """
        batch_size = input.size(0) if self.batch_first else input.size(1)
        lstm = self.Cell is nn.LSTMCell
        if hx is None:
            num_directions = 2 if self.bidirectional else 1
            hx = torch.autograd.Variable(
                input.data.new(self.num_layers * num_directions, batch_size, self.hidden_size).zero_())
            if lstm:
                hx = (hx, hx)

        func = AutogradMaskedRNN(num_layers=self.num_layers,
                                 batch_first=self.batch_first,
                                 step_dropout=self.step_dropout,
                                 layer_dropout=self.layer_dropout,
                                 train=self.training,
                                 bidirectional=self.bidirectional,
                                 lstm=lstm)

        # Append a trailing singleton dim so the mask broadcasts over the hidden features.
        output, hidden = func(input, self.all_cells, hx,
                              None if mask is None else mask.unsqueeze(-1))
        return output, hidden

    def step(self, input, hx=None, mask=None):
        """Execute one step forward (only for one-directional RNN).

        Args:
            input (batch, input_size): input tensor of this step.
            hx (num_layers, batch, hidden_size): the hidden state of last step.
            mask (batch): the mask tensor of this step.
        Returns:
            output (batch, hidden_size): tensor containing the output of this step
                from the last layer of RNN.
            hn (num_layers, batch, hidden_size): tensor containing the hidden state
                of this step.
        """
        assert not self.bidirectional, "step cannot be applied to a bidirectional RNN."
        batch_size = input.size(0)
        lstm = self.Cell is nn.LSTMCell
        if hx is None:
            hx = torch.autograd.Variable(
                input.data.new(self.num_layers, batch_size, self.hidden_size).zero_())
            if lstm:
                hx = (hx, hx)

        if mask is not None and mask.dim() == 1:
            # (batch,) -> (batch, 1) so the mask broadcasts over the hidden features.
            mask = mask.unsqueeze(-1)

        func = AutogradMaskedStep(num_layers=self.num_layers,
                                  dropout=self.step_dropout,
                                  train=self.training,
                                  lstm=lstm)

        output, hidden = func(input, self.all_cells, hx, mask)
        return output, hidden
class MaskedRNN(MaskedRNNBase):
    r"""Applies a multi-layer Elman RNN with customized non-linearity to an input sequence.

    For each element in the input sequence, each layer computes the following
    function:

    .. math::

        h_t = \tanh(w_{ih} * x_t + b_{ih} + w_{hh} * h_{(t-1)} + b_{hh})

    where :math:`h_t` is the hidden state at time `t`, and :math:`x_t` is
    the hidden state of the previous layer at time `t` or :math:`input_t`
    for the first layer. If nonlinearity='relu', then `ReLU` is used instead
    of `tanh`.

    Args:
        input_size: The number of expected features in the input x
        hidden_size: The number of features in the hidden state h
        num_layers: Number of recurrent layers.
        nonlinearity: The non-linearity to use ['tanh'|'relu']; passed through
            to ``nn.RNNCell``. Default: 'tanh'
        bias: If False, then the layer does not use bias weights b_ih and b_hh.
            Default: True
        batch_first: If True, then the input and output tensors are provided
            as (batch, seq, feature)
        layer_dropout: If non-zero, introduces a dropout layer on the outputs of
            each RNN layer except the last layer
        step_dropout: Dropout value passed to the per-step machinery
        bidirectional: If True, becomes a bidirectional RNN. Default: False
        initial_method: Forwarded to ``initial_parameter``

    Inputs: input, mask, h_0
        - **input** (seq_len, batch, input_size): tensor containing the features
          of the input sequence.
        - **mask** (seq_len, batch): 0-1 tensor containing the mask of the input
          sequence.
        - **h_0** (num_layers * num_directions, batch, hidden_size): tensor
          containing the initial hidden state for each element in the batch.

    Outputs: output, h_n
        - **output** (seq_len, batch, hidden_size * num_directions): tensor
          containing the output features (h_k) from the last layer of the RNN,
          for each k.
        - **h_n** (num_layers * num_directions, batch, hidden_size): tensor
          containing the hidden state for k=seq_len.
    """

    def __init__(self, *args, **kwargs):
        super(MaskedRNN, self).__init__(nn.RNNCell, *args, **kwargs)
class MaskedLSTM(MaskedRNNBase):
    r"""Applies a multi-layer long short-term memory (LSTM) RNN to an input sequence.

    For each element in the input sequence, each layer computes the following
    function:

    .. math::

        \begin{array}{ll}
        i_t = \mathrm{sigmoid}(W_{ii} x_t + b_{ii} + W_{hi} h_{(t-1)} + b_{hi}) \\
        f_t = \mathrm{sigmoid}(W_{if} x_t + b_{if} + W_{hf} h_{(t-1)} + b_{hf}) \\
        g_t = \tanh(W_{ig} x_t + b_{ig} + W_{hg} h_{(t-1)} + b_{hg}) \\
        o_t = \mathrm{sigmoid}(W_{io} x_t + b_{io} + W_{ho} h_{(t-1)} + b_{ho}) \\
        c_t = f_t * c_{(t-1)} + i_t * g_t \\
        h_t = o_t * \tanh(c_t)
        \end{array}

    where :math:`h_t` is the hidden state at time `t`, :math:`c_t` is the cell
    state at time `t`, :math:`x_t` is the hidden state of the previous layer at
    time `t` or :math:`input_t` for the first layer, and :math:`i_t`,
    :math:`f_t`, :math:`g_t`, :math:`o_t` are the input, forget, cell,
    and out gates, respectively.

    Args:
        input_size: The number of expected features in the input x
        hidden_size: The number of features in the hidden state h
        num_layers: Number of recurrent layers.
        bias: If False, then the layer does not use bias weights b_ih and b_hh.
            Default: True
        batch_first: If True, then the input and output tensors are provided
            as (batch, seq, feature)
        layer_dropout: If non-zero, introduces a dropout layer on the outputs of
            each RNN layer except the last layer
        step_dropout: Dropout value passed to the per-step machinery
        bidirectional: If True, becomes a bidirectional RNN. Default: False
        initial_method: Forwarded to ``initial_parameter``

    Inputs: input, mask, (h_0, c_0)
        - **input** (seq_len, batch, input_size): tensor containing the features
          of the input sequence.
        - **mask** (seq_len, batch): 0-1 tensor containing the mask of the input
          sequence.
        - **h_0** (num_layers \* num_directions, batch, hidden_size): tensor
          containing the initial hidden state for each element in the batch.
        - **c_0** (num_layers \* num_directions, batch, hidden_size): tensor
          containing the initial cell state for each element in the batch.

    Outputs: output, (h_n, c_n)
        - **output** (seq_len, batch, hidden_size * num_directions): tensor
          containing the output features `(h_t)` from the last layer of the RNN,
          for each t.
        - **h_n** (num_layers * num_directions, batch, hidden_size): tensor
          containing the hidden state for t=seq_len
        - **c_n** (num_layers * num_directions, batch, hidden_size): tensor
          containing the cell state for t=seq_len
    """

    def __init__(self, *args, **kwargs):
        super(MaskedLSTM, self).__init__(nn.LSTMCell, *args, **kwargs)
class MaskedGRU(MaskedRNNBase):
    r"""Applies a multi-layer gated recurrent unit (GRU) RNN to an input sequence.

    For each element in the input sequence, each layer computes the following
    function:

    .. math::

        \begin{array}{ll}
        r_t = \mathrm{sigmoid}(W_{ir} x_t + b_{ir} + W_{hr} h_{(t-1)} + b_{hr}) \\
        z_t = \mathrm{sigmoid}(W_{iz} x_t + b_{iz} + W_{hz} h_{(t-1)} + b_{hz}) \\
        n_t = \tanh(W_{in} x_t + b_{in} + r_t * (W_{hn} h_{(t-1)}+ b_{hn})) \\
        h_t = (1 - z_t) * n_t + z_t * h_{(t-1)} \\
        \end{array}

    where :math:`h_t` is the hidden state at time `t`, :math:`x_t` is the hidden
    state of the previous layer at time `t` or :math:`input_t` for the first
    layer, and :math:`r_t`, :math:`z_t`, :math:`n_t` are the reset, input,
    and new gates, respectively.

    Args:
        input_size: The number of expected features in the input x
        hidden_size: The number of features in the hidden state h
        num_layers: Number of recurrent layers.
        bias: If False, then the layer does not use bias weights b_ih and b_hh.
            Default: True
        batch_first: If True, then the input and output tensors are provided
            as (batch, seq, feature)
        layer_dropout: If non-zero, introduces a dropout layer on the outputs of
            each RNN layer except the last layer
        step_dropout: Dropout value passed to the per-step machinery
        bidirectional: If True, becomes a bidirectional RNN. Default: False
        initial_method: Forwarded to ``initial_parameter``

    Inputs: input, mask, h_0
        - **input** (seq_len, batch, input_size): tensor containing the features
          of the input sequence.
        - **mask** (seq_len, batch): 0-1 tensor containing the mask of the input
          sequence.
        - **h_0** (num_layers * num_directions, batch, hidden_size): tensor
          containing the initial hidden state for each element in the batch.

    Outputs: output, h_n
        - **output** (seq_len, batch, hidden_size * num_directions): tensor
          containing the output features (h_k) from the last layer of the RNN,
          for each k.
        - **h_n** (num_layers * num_directions, batch, hidden_size): tensor
          containing the hidden state for k=seq_len.
    """

    def __init__(self, *args, **kwargs):
        super(MaskedGRU, self).__init__(nn.GRUCell, *args, **kwargs)