Source code for fastNLP.core.fieldarray

import numpy as np


[docs]class PadderBase: """ 所有padder都需要继承这个类,并覆盖__call__()方法。 用于对batch进行padding操作。传入的element是inplace的,即直接修改element可能导致数据变化,建议inplace修改之前deepcopy一份。 """ def __init__(self, pad_val=0, **kwargs): self.pad_val = pad_val def set_pad_val(self, pad_val): self.pad_val = pad_val def __call__(self, contents, field_name, field_ele_dtype): """ 传入的是List内容。假设有以下的DataSet。 from fastNLP import DataSet from fastNLP import Instance dataset = DataSet() dataset.append(Instance(word='this is a demo', length=4, chars=[['t', 'h', 'i', 's'], ['i', 's'], ['a'], ['d', 'e', 'm', 'o']])) dataset.append(Instance(word='another one', length=2, chars=[['a', 'n', 'o', 't', 'h', 'e', 'r'], ['o', 'n', 'e']])) # 如果batch_size=2, 下面只是用str的方式看起来更直观一点,但实际上可能word和chars在pad时都已经为index了。 word这个field的pad_func会接收到的内容会是 [ 'this is a demo', 'another one' ] length这个field的pad_func会接收到的内容会是 [4, 2] chars这个field的pad_func会接收到的内容会是 [ [['t', 'h', 'i', 's'], ['i', 's'], ['a'], ['d', 'e', 'm', 'o']], [['a', 'n', 'o', 't', 'h', 'e', 'r'], ['o', 'n', 'e']] ] 即把每个instance中某个field的内容合成一个List传入 :param contents: List[element]。传入的element是inplace的,即直接修改element可能导致数据变化,建议inplace修改之前 deepcopy一份。 :param field_name: str, field的名称,帮助定位错误 :param field_ele_dtype: np.int64, np.float64, np.str. 该field的内层list元素的类型。辅助判断是否pad,大多数情况用不上 :return: List[padded_element]或np.array([padded_element]) """ raise NotImplementedError
[docs]class AutoPadder(PadderBase): """ 根据contents的数据自动判定是否需要做padding。 (1) 如果元素类型(元素类型是指field中最里层List的元素的数据类型, 可以通过FieldArray.dtype查看,比如['This', 'is', ...]的元素类 型为np.str, [[1,2], ...]的元素类型为np.int64)的数据不为(np.int64, np.float64)则不会进行padding (2) 如果元素类型为(np.int64, np.float64), (2.1) 如果该field的内容只有一个,比如为sequence_length, 则不进行padding (2.2) 如果该field的内容为List, 那么会将Batch中的List pad为一样长。若该List下还有里层的List需要padding,请使用其它padder。 如果某个instance中field为[1, 2, 3],则可以pad; 若为[[1,2], [3,4, ...]]则不能进行pad """ def __init__(self, pad_val=0): """ :param pad_val: int, padding的位置使用该index """ super().__init__(pad_val=pad_val) def _is_two_dimension(self, contents): """ 判断contents是不是只有两个维度。[[1,2], [3]]是两个维度. [[[1,2], [3, 4, 5]], [[4,5]]]有三个维度 :param contents: :return: """ value = contents[0] if isinstance(value , (np.ndarray, list)): value = value[0] if isinstance(value, (np.ndarray, list)): return False return True return False def __call__(self, contents, field_name, field_ele_dtype): if not is_iterable(contents[0]): array = np.array([content for content in contents], dtype=field_ele_dtype) elif field_ele_dtype in (np.int64, np.float64) and self._is_two_dimension(contents): max_len = max([len(content) for content in contents]) array = np.full((len(contents), max_len), self.pad_val, dtype=field_ele_dtype) for i, content in enumerate(contents): array[i][:len(content)] = content else: # should only be str array = np.array([content for content in contents]) return array
[docs]class FieldArray(object): """``FieldArray`` is the collection of ``Instance``s of the same field. It is the basic element of ``DataSet`` class. :param str name: the name of the FieldArray :param list content: a list of int, float, str or np.ndarray, or a list of list of one, or a np.ndarray. :param bool is_target: If True, this FieldArray is used to compute loss. :param bool is_input: If True, this FieldArray is used to the model input. :param padder: PadderBase类型。大多数情况下都不需要设置该值,除非需要在多个维度上进行padding(比如英文中对character进行padding) """ def __init__(self, name, content, is_target=None, is_input=None, padder=AutoPadder(pad_val=0)): """DataSet在初始化时会有两类方法对FieldArray操作: 1) 如果DataSet使用dict初始化,那么在add_field中会构造FieldArray: 1.1) 二维list DataSet({"x": [[1, 2], [3, 4]]}) 1.2) 二维array DataSet({"x": np.array([[1, 2], [3, 4]])}) 1.3) 三维list DataSet({"x": [[[1, 2], [3, 4]], [[1, 2], [3, 4]]]}) 1.4) list of array: DataSet({"x": [np.array([1,2,3]), np.array([1,2,3])]}) 2) 如果DataSet使用list of Instance 初始化,那么在append中会先对第一个样本初始化FieldArray; 然后后面的样本使用FieldArray.append进行添加。 2.1) 一维list DataSet([Instance(x=[1, 2, 3, 4])]) 2.2) 一维array DataSet([Instance(x=np.array([1, 2, 3, 4]))]) 2.3) 二维list DataSet([Instance(x=[[1, 2], [3, 4]])]) 2.4) 二维array DataSet([Instance(x=np.array([[1, 2], [3, 4]]))]) 类型检查(dtype check)发生在当该field被设置为is_input或者is_target时。 """ self.name = name if isinstance(content, list): # 如果DataSet使用dict初始化, content 可能是二维list/二维array/三维list # 如果DataSet使用list of Instance 初始化, content可能是 [list]/[array]/[2D list] for idx, item in enumerate(content): # 这是使用list of Instance 初始化时第一个样本:FieldArray(name, [field]) # 将[np.array] 转化为 list of list # 也可以支持[array, array, array]的情况 if isinstance(item, np.ndarray): content[idx] = content[idx].tolist() elif isinstance(content, np.ndarray): content = content.tolist() # convert np.ndarray into 2-D list else: raise TypeError("content in FieldArray can only be list or numpy.ndarray, got {}.".format(type(content))) if len(content) == 0: raise RuntimeError("Cannot initialize FieldArray with empty list.") self.content = content # 1维 或 2维 或 3维 list, 形状可能不对齐 self.content_dim = None # 表示content是多少维的list self.set_padder(padder) self.BASIC_TYPES = (int, float, str) # content中可接受的Python基本类型,这里没有np.array self.pytype = None self.dtype = None self._is_input = None self._is_target = None if is_input is not None or is_target is not None: self.is_input = is_input self.is_target = is_target def _set_dtype(self): self.pytype = self._type_detection(self.content) self.dtype = self._map_to_np_type(self.pytype) @property def is_input(self): return self._is_input @is_input.setter def is_input(self, value): """ 当 field_array.is_input = True / False 时被调用 """ if value is True: self._set_dtype() self._is_input = value @property def is_target(self): return self._is_target @is_target.setter def is_target(self, value): """ 当 field_array.is_target = True / False 时被调用 """ if value is True: self._set_dtype() self._is_target = value def _type_detection(self, content): """当该field被设置为is_input或者is_target时被调用 """ if len(content) == 0: raise RuntimeError("Empty list in Field {}.".format(self.name)) type_set = set([type(item) for item in content]) if list in type_set: if len(type_set) > 1: # list 跟 非list 混在一起 raise RuntimeError("Mixed data types in Field {}: {}".format(self.name, type_set)) # >1维list inner_type_set = set() for l in content: [inner_type_set.add(type(obj)) for obj in l] if list not in inner_type_set: # 二维list self.content_dim = 2 return self._basic_type_detection(inner_type_set) else: if len(inner_type_set) == 1: # >2维list inner_inner_type_set = set() for _2d_list in content: for _1d_list in _2d_list: [inner_inner_type_set.add(type(obj)) for obj in _1d_list] if list in inner_inner_type_set: raise RuntimeError("FieldArray cannot handle 4-D or more-D list.") # 3维list self.content_dim = 3 return self._basic_type_detection(inner_inner_type_set) else: # list 跟 非list 混在一起 raise RuntimeError("Mixed data types in Field {}: {}".format(self.name, inner_type_set)) else: # 一维list for content_type in type_set: if content_type not in self.BASIC_TYPES: raise RuntimeError("Unexpected data type in Field '{}'. Expect one of {}. Got {}.".format( self.name, self.BASIC_TYPES, content_type)) self.content_dim = 1 return self._basic_type_detection(type_set) def _basic_type_detection(self, type_set): """ :param type_set: a set of Python types :return: one of self.BASIC_TYPES """ if len(type_set) == 1: return type_set.pop() elif len(type_set) == 2: # 有多个basic type; 可能需要up-cast if float in type_set and int in type_set: # up-cast int to float return float else: # str 跟 int 或者 float 混在一起 raise RuntimeError("Mixed data types in Field {}: {}".format(self.name, type_set)) else: # str, int, float混在一起 raise RuntimeError("Mixed data types in Field {}: {}".format(self.name, type_set)) def _1d_list_check(self, val): """如果不是1D list就报错 """ type_set = set((type(obj) for obj in val)) if any(obj not in self.BASIC_TYPES for obj in type_set): raise ValueError("Mixed data types in Field {}: {}".format(self.name, type_set)) self._basic_type_detection(type_set) # otherwise: _basic_type_detection will raise error return True def _2d_list_check(self, val): """如果不是2D list 就报错 """ type_set = set(type(obj) for obj in val) if list(type_set) != [list]: raise ValueError("Mixed data types in Field {}: {}".format(self.name, type_set)) inner_type_set = set() for l in val: for obj in l: inner_type_set.add(type(obj)) self._basic_type_detection(inner_type_set) return True @staticmethod def _map_to_np_type(basic_type): type_mapping = {int: np.int64, float: np.float64, str: np.str, np.ndarray: np.ndarray} return type_mapping[basic_type] def __repr__(self): return "FieldArray {}: {}".format(self.name, self.content.__repr__())
[docs] def append(self, val): """Add a new item to the tail of FieldArray. :param val: int, float, str, or a list of one. """ if isinstance(val, list): pass elif isinstance(val, tuple): # 确保最外层是list val = list(val) elif isinstance(val, np.ndarray): val = val.tolist() elif any((isinstance(val, t) for t in self.BASIC_TYPES)): pass else: raise RuntimeError( "Unexpected data type {}. Should be list, np.array, or {}".format(type(val), self.BASIC_TYPES)) if self.is_input is True or self.is_target is True: if type(val) == list: if len(val) == 0: raise ValueError("Cannot append an empty list.") if self.content_dim == 2 and self._1d_list_check(val): # 1维list检查 pass elif self.content_dim == 3 and self._2d_list_check(val): # 2维list检查 pass else: raise RuntimeError( "Dimension not matched: expect dim={}, got {}.".format(self.content_dim - 1, val)) elif type(val) in self.BASIC_TYPES and self.content_dim == 1: # scalar检查 if type(val) == float and self.pytype == int: self.pytype = float self.dtype = self._map_to_np_type(self.pytype) else: raise RuntimeError( "Unexpected data type {}. Should be list, np.array, or {}".format(type(val), self.BASIC_TYPES)) self.content.append(val)
def __getitem__(self, indices): return self.get(indices) def __setitem__(self, idx, val): assert isinstance(idx, int) self.content[idx] = val
[docs] def get(self, indices, pad=True): """Fetch instances based on indices. :param indices: an int, or a list of int. :param pad: bool, 是否对返回的结果进行padding。 :return: """ if isinstance(indices, int): return self.content[indices] if self.is_input is False and self.is_target is False: raise RuntimeError("Please specify either is_input or is_target is True for {}".format(self.name)) contents = [self.content[i] for i in indices] if self.padder is None or pad is False: return np.array(contents) else: return self.padder(contents, field_name=self.name, field_ele_dtype=self.dtype)
[docs] def set_padder(self, padder): """ 设置padding方式 :param padder: PadderBase类型或None. 设置为None即删除padder. :return: """ if padder is not None: assert isinstance(padder, PadderBase), "padder must be of type PadderBase." self.padder = padder
[docs] def set_pad_val(self, pad_val): """ 修改padder的pad_val. :param pad_val: int。 :return: """ if self.padder is not None: self.padder.set_pad_val(pad_val)
def __len__(self): """Returns the size of FieldArray. :return int length: """ return len(self.content)
def is_iterable(content): try: _ = (e for e in content) except TypeError: return False return True
[docs]class EngChar2DPadder(PadderBase): """ 用于为英语执行character级别的2D padding操作。对应的field内容应该为[['T', 'h', 'i', 's'], ['a'], ['d', 'e', 'm', 'o']](这里为 了更直观,把它们写为str,但实际使用时它们应该是character的index)。 padded过后的batch内容,形状为(batch_size, max_sentence_length, max_word_length). max_sentence_length最大句子长度。 max_word_length最长的word的长度 """ def __init__(self, pad_val=0, pad_length=0): """ :param pad_val: int, padding的位置使用该index :param pad_length: int, 如果为0则取一个batch中最大的单词长度作为padding长度。如果为大于0的数,则将所有单词的长度都pad或截 取到该长度. """ super().__init__(pad_val=pad_val) self.pad_length = pad_length def _exactly_three_dims(self, contents, field_name): """ 检查传入的contents是否刚好是3维,如果不是3维就报错。理论上,第一个维度是batch,第二个维度是word,第三个维度是character :param contents: :param field_name: str :return: """ if not isinstance(contents, list): raise TypeError("contents should be a list, not {}.".format(type(contents))) value = contents[0] try: value = value[0] except: raise ValueError("Field:{} only has one dimension.".format(field_name)) try: value = value[0] except: raise ValueError("Field:{} only has two dimensions.".format(field_name)) if is_iterable(value): raise ValueError("Field:{} has more than 3 dimension.".format(field_name)) def __call__(self, contents, field_name, field_ele_dtype): """ 期望输入类似于 [ [[0, 2], [2, 3, 4], ..], [[9, 8, 2, 4], [1, 2,], ...], .... ] :param contents: :param field_name: :param field_ele_dtype :return: """ if field_ele_dtype not in (np.int64, np.float64): raise TypeError('dtype of Field:{} should be np.int64 or np.float64 to do 2D padding, get {}.'.format( field_name, field_ele_dtype )) self._exactly_three_dims(contents, field_name) if self.pad_length < 1: max_char_length = max(max([[len(char_lst) for char_lst in word_lst] for word_lst in contents])) else: max_char_length = self.pad_length max_sent_length = max(len(word_lst) for word_lst in contents) batch_size = len(contents) dtype = type(contents[0][0][0]) padded_array = np.full((batch_size, max_sent_length, max_char_length), fill_value=self.pad_val, dtype=dtype) for b_idx, word_lst in enumerate(contents): for c_idx, char_lst in enumerate(word_lst): chars = char_lst[:max_char_length] padded_array[b_idx, c_idx, :len(chars)] = chars return padded_array