from collections import namedtuple, OrderedDict
from copy import copy
from itertools import chain
from tensorflow.python.keras.initializers import RandomNormal, Zeros
from tensorflow.python.keras.layers import Input
from .inputs import create_embedding_matrix, embedding_lookup, get_dense_input, varlen_embedding_lookup, \
get_varlen_pooling_list, mergeDict
from .layers import Linear
from .layers.utils import concat_func, add_func
DEFAULT_GROUP_NAME = "default_group"


class SparseFeat(namedtuple('SparseFeat',
                            ['name', 'vocabulary_size', 'embedding_dim', 'use_hash', 'dtype',
                             'embeddings_initializer', 'embedding_name', 'group_name', 'trainable'])):
    """Metadata for a single-valued categorical (sparse) feature.

    Args:
        name: feature name; also the model input name for this feature.
        vocabulary_size: number of distinct categories (embedding table rows).
        embedding_dim: embedding output dimension, default 4. The string
            "auto" selects ``6 * floor(vocabulary_size ** 0.25)``.
        use_hash: whether raw values should be hashed into the vocabulary.
            # NOTE(review): hashing itself happens downstream (embedding
            # lookup helpers), not in this class — confirm against inputs.py.
        dtype: dtype of the input tensor, default "int32".
        embeddings_initializer: initializer for the embedding matrix;
            defaults to RandomNormal(mean=0.0, stddev=1e-4, seed=2020).
        embedding_name: name of the embedding table; defaults to ``name``
            (distinct features may share a table by sharing this name).
        group_name: feature-group tag, default ``DEFAULT_GROUP_NAME``.
        trainable: whether the embedding matrix is trainable, default True.
    """
    __slots__ = ()

    def __new__(cls, name, vocabulary_size, embedding_dim=4, use_hash=False, dtype="int32",
                embeddings_initializer=None, embedding_name=None,
                group_name=DEFAULT_GROUP_NAME, trainable=True):
        if embedding_dim == "auto":
            # rule-of-thumb embedding size: 6 * floor(vocab_size ** 0.25)
            embedding_dim = 6 * int(pow(vocabulary_size, 0.25))
        if embeddings_initializer is None:
            embeddings_initializer = RandomNormal(mean=0.0, stddev=0.0001, seed=2020)
        if embedding_name is None:
            embedding_name = name
        return super(SparseFeat, cls).__new__(cls, name, vocabulary_size, embedding_dim, use_hash, dtype,
                                              embeddings_initializer, embedding_name, group_name, trainable)

    def __hash__(self):
        # Identity is determined by the feature name alone.
        return self.name.__hash__()
class VarLenSparseFeat(namedtuple('VarLenSparseFeat',
                                  ['sparsefeat', 'maxlen', 'combiner', 'length_name', 'weight_name',
                                   'weight_norm'])):
    """Metadata for a variable-length (sequence) sparse feature.

    Wraps a SparseFeat describing the sequence items and adds the
    sequence-specific fields; all embedding-related attributes are
    delegated to the wrapped ``sparsefeat``.

    Args:
        sparsefeat: the underlying SparseFeat for the sequence elements.
        maxlen: maximum sequence length (pad/truncate target).
        combiner: pooling method over the sequence, default "mean".
        length_name: name of the feature carrying the true sequence
            length, or None.
        weight_name: name of the per-position weight feature, or None.
        weight_norm: whether to normalize the weights, default True.
    """
    __slots__ = ()

    def __new__(cls, sparsefeat, maxlen, combiner="mean", length_name=None, weight_name=None, weight_norm=True):
        return super(VarLenSparseFeat, cls).__new__(cls, sparsefeat, maxlen, combiner, length_name, weight_name,
                                                    weight_norm)

    # --- read-only delegation to the wrapped sparsefeat ---

    @property
    def name(self):
        return self.sparsefeat.name

    @property
    def vocabulary_size(self):
        return self.sparsefeat.vocabulary_size

    @property
    def embedding_dim(self):
        return self.sparsefeat.embedding_dim

    @property
    def use_hash(self):
        return self.sparsefeat.use_hash

    @property
    def dtype(self):
        return self.sparsefeat.dtype

    @property
    def embeddings_initializer(self):
        return self.sparsefeat.embeddings_initializer

    @property
    def embedding_name(self):
        return self.sparsefeat.embedding_name

    @property
    def group_name(self):
        return self.sparsefeat.group_name

    @property
    def trainable(self):
        return self.sparsefeat.trainable

    def __hash__(self):
        # Identity is determined by the (delegated) feature name alone.
        return self.name.__hash__()
class DenseFeat(namedtuple('DenseFeat', ['name', 'dimension', 'dtype', 'transform_fn'])):
    """Dense (numeric) feature.

    Args:
        name: feature name; also the model input name for this feature.
        dimension: dimension of the feature, default 1.
        dtype: dtype of the feature, default "float32".
        transform_fn: if not None, a function used to transform values of
            the feature; it takes the input tensor and returns the
            transformed tensor, e.g. ``lambda x: (x - 3.0) / 4.2``.
    """
    __slots__ = ()

    def __new__(cls, name, dimension=1, dtype="float32", transform_fn=None):
        return super(DenseFeat, cls).__new__(cls, name, dimension, dtype, transform_fn)

    def __hash__(self):
        # Identity is determined by the feature name alone.
        return self.name.__hash__()
def get_feature_names(feature_columns):
    """Return the ordered list of input-tensor names for ``feature_columns``.

    Delegates to ``build_input_features`` (defined elsewhere in this module),
    which maps each feature column to a named Keras Input, and returns the
    mapping's keys in order.
    """
    features = build_input_features(feature_columns)
    return list(features.keys())
def get_linear_logit(features, feature_columns, units=1, use_bias=False, seed=1024, prefix='linear',
                     l2_reg=0):
    """Build ``units`` parallel linear (order-1) logits over the features.

    Sparse and varlen columns are re-declared with ``embedding_dim=1`` and a
    Zeros initializer, so each category contributes one learned scalar
    weight — i.e. the linear term. Dense inputs are fed to ``Linear``
    directly.

    Args:
        features: mapping of feature name -> Keras Input tensor.
        feature_columns: list of SparseFeat / VarLenSparseFeat / DenseFeat.
        units: number of independent linear logits to build.
        use_bias: whether the Linear layer adds a bias term.
        seed: seed forwarded to embedding creation and Linear.
        prefix: name prefix for the per-unit embedding tables.
        l2_reg: L2 regularization strength for the linear weights.

    Returns:
        The concatenation of the per-unit logit tensors, or ``add_func([])``
        when there are neither sparse embeddings nor dense inputs.
    """
    linear_feature_columns = copy(feature_columns)
    # Rewrite sparse columns to 1-d, zero-initialized embeddings: each
    # category then owns a single scalar weight (the linear coefficient).
    for i in range(len(linear_feature_columns)):
        if isinstance(linear_feature_columns[i], SparseFeat):
            linear_feature_columns[i] = linear_feature_columns[i]._replace(embedding_dim=1,
                                                                           embeddings_initializer=Zeros())
        if isinstance(linear_feature_columns[i], VarLenSparseFeat):
            linear_feature_columns[i] = linear_feature_columns[i]._replace(
                sparsefeat=linear_feature_columns[i].sparsefeat._replace(embedding_dim=1,
                                                                         embeddings_initializer=Zeros()))
    # One embedding lookup per unit — the distinct prefix yields a distinct
    # weight table per unit; dense inputs are the same for every unit.
    linear_emb_list = [input_from_feature_columns(features, linear_feature_columns, l2_reg, seed,
                                                  prefix=prefix + str(i))[0] for i in range(units)]
    _, dense_input_list = input_from_feature_columns(features, linear_feature_columns, l2_reg, seed, prefix=prefix)
    linear_logit_list = []
    for i in range(units):
        if len(linear_emb_list[i]) > 0 and len(dense_input_list) > 0:
            # mode=2: Linear consumes both sparse scalar embeddings and dense inputs.
            sparse_input = concat_func(linear_emb_list[i])
            dense_input = concat_func(dense_input_list)
            linear_logit = Linear(l2_reg, mode=2, use_bias=use_bias, seed=seed)([sparse_input, dense_input])
        elif len(linear_emb_list[i]) > 0:
            # mode=0: sparse-only linear term.
            sparse_input = concat_func(linear_emb_list[i])
            linear_logit = Linear(l2_reg, mode=0, use_bias=use_bias, seed=seed)(sparse_input)
        elif len(dense_input_list) > 0:
            # mode=1: dense-only linear term.
            dense_input = concat_func(dense_input_list)
            linear_logit = Linear(l2_reg, mode=1, use_bias=use_bias, seed=seed)(dense_input)
        else:
            # No usable inputs at all: fall back instead of raising.
            # NOTE(review): presumably add_func([]) yields a zero/neutral
            # logit — confirm against layers.utils.add_func.
            # raise NotImplementedError
            return add_func([])
        linear_logit_list.append(linear_logit)
    return concat_func(linear_logit_list)