import tensorflow as tf
from collections import namedtuple, OrderedDict
from copy import copy
from itertools import chain
from tensorflow.python.keras.initializers import RandomNormal, Zeros
from tensorflow.python.keras.layers import Input, Lambda
from .inputs import create_embedding_matrix, embedding_lookup, get_dense_input, varlen_embedding_lookup, \
get_varlen_pooling_list, mergeDict
from .layers import Linear
from .layers.utils import concat_func
DEFAULT_GROUP_NAME = "default_group"


class SparseFeat(namedtuple('SparseFeat',
                            ['name', 'vocabulary_size', 'embedding_dim', 'use_hash', 'vocabulary_path',
                             'dtype', 'embeddings_initializer', 'embedding_name', 'group_name',
                             'trainable'])):
    """Description of a single sparse (categorical) feature column.

    Immutable record: all per-feature metadata needed to build an embedding
    table for this feature (vocabulary size, embedding dim, initializer, ...).
    """
    __slots__ = ()

    def __new__(cls, name, vocabulary_size, embedding_dim=4, use_hash=False, vocabulary_path=None,
                dtype="int32", embeddings_initializer=None, embedding_name=None,
                group_name=DEFAULT_GROUP_NAME, trainable=True):
        # "auto" applies the common heuristic dim = 6 * vocabulary_size ** 0.25.
        if embedding_dim == "auto":
            embedding_dim = 6 * int(vocabulary_size ** 0.25)
        # Default to a small-stddev normal initializer with a fixed seed for
        # reproducibility; only built when the caller did not supply one.
        if embeddings_initializer is None:
            embeddings_initializer = RandomNormal(mean=0.0, stddev=0.0001, seed=2020)
        # By default the embedding table is named after the feature itself;
        # passing an explicit embedding_name allows weight sharing.
        embedding_name = name if embedding_name is None else embedding_name
        return super(SparseFeat, cls).__new__(cls, name, vocabulary_size, embedding_dim, use_hash,
                                              vocabulary_path, dtype, embeddings_initializer,
                                              embedding_name, group_name, trainable)

    def __hash__(self):
        # Hash on the feature name only, so features dedupe by name.
        return hash(self.name)
class VarLenSparseFeat(namedtuple('VarLenSparseFeat',
                                  ['sparsefeat', 'maxlen', 'combiner', 'length_name',
                                   'weight_name', 'weight_norm'])):
    """A variable-length (sequence) sparse feature.

    Wraps a ``SparseFeat`` and adds sequence-specific metadata: the padded
    max length, the pooling combiner, and optional length/weight companions.
    All per-feature embedding metadata is delegated to the wrapped sparsefeat.
    """
    __slots__ = ()

    def __new__(cls, sparsefeat, maxlen, combiner="mean", length_name=None,
                weight_name=None, weight_norm=True):
        return super(VarLenSparseFeat, cls).__new__(
            cls, sparsefeat, maxlen, combiner, length_name, weight_name, weight_norm)

    # Read-only delegation of every embedding-related attribute to the
    # wrapped SparseFeat, so a VarLenSparseFeat can be used wherever the
    # plain feature metadata is expected.
    name = property(lambda self: self.sparsefeat.name)
    vocabulary_size = property(lambda self: self.sparsefeat.vocabulary_size)
    embedding_dim = property(lambda self: self.sparsefeat.embedding_dim)
    use_hash = property(lambda self: self.sparsefeat.use_hash)
    vocabulary_path = property(lambda self: self.sparsefeat.vocabulary_path)
    dtype = property(lambda self: self.sparsefeat.dtype)
    embeddings_initializer = property(lambda self: self.sparsefeat.embeddings_initializer)
    embedding_name = property(lambda self: self.sparsefeat.embedding_name)
    group_name = property(lambda self: self.sparsefeat.group_name)
    trainable = property(lambda self: self.sparsefeat.trainable)

    def __hash__(self):
        # Same identity rule as SparseFeat: hash by feature name.
        return hash(self.name)
class DenseFeat(namedtuple('DenseFeat', ['name', 'dimension', 'dtype', 'transform_fn'])):
    """Dense (numeric) feature column.

    Args:
        name: feature name.
        dimension: dimension of the feature, default = 1.
        dtype: dtype of the feature, default="float32".
        transform_fn: If not ``None``, a function that can be used to transform
            values of the feature. The function takes the input Tensor as its
            argument, and returns the output Tensor.
            (e.g. lambda x: (x - 3.0) / 4.2).
    """
    __slots__ = ()

    def __new__(cls, name, dimension=1, dtype="float32", transform_fn=None):
        return super(DenseFeat, cls).__new__(cls, name, dimension, dtype, transform_fn)

    def __hash__(self):
        # Features are identified by name, matching SparseFeat's convention.
        return hash(self.name)
def get_feature_names(feature_columns):
    """Return the input-tensor names for *feature_columns*.

    Builds the model inputs via ``build_input_features`` and returns the
    resulting mapping's keys as a list, preserving build order.
    """
    # Iterating the mapping yields its keys, same as .keys().
    return list(build_input_features(feature_columns))
def get_linear_logit(features, feature_columns, units=1, use_bias=False, seed=1024, prefix='linear',
                     l2_reg=0, sparse_feat_refine_weight=None):
    """Build the linear (first-order) logit term from the given feature columns.

    Args:
        features: mapping of input name -> Keras ``Input`` tensor
            (presumably the dict from ``build_input_features`` — confirm at call sites).
        feature_columns: list of SparseFeat / VarLenSparseFeat / DenseFeat.
        units: number of independent linear logits to build (each gets its
            own 1-dim embedding set).
        use_bias: whether the ``Linear`` layer adds a bias term.
        seed: random seed forwarded to embedding creation and ``Linear``.
        prefix: name prefix for the per-unit embedding tables.
        l2_reg: L2 regularization strength for the linear weights.
        sparse_feat_refine_weight: optional per-field refine weights; when
            given, the concatenated sparse embeddings are rescaled by it
            before entering ``Linear``.

    Returns:
        A tensor of the ``units`` linear logits concatenated together, or a
        constant ``[[0.0]]`` tensor when ``feature_columns`` is empty.
    """
    # Shallow copy so the caller's list is not mutated; _replace below
    # produces new namedtuple instances rather than editing in place.
    linear_feature_columns = copy(feature_columns)
    for i in range(len(linear_feature_columns)):
        # The linear part uses 1-dimensional, zero-initialized embeddings:
        # one scalar weight per vocabulary id.
        if isinstance(linear_feature_columns[i], SparseFeat):
            linear_feature_columns[i] = linear_feature_columns[i]._replace(embedding_dim=1,
                                                                           embeddings_initializer=Zeros())
        if isinstance(linear_feature_columns[i], VarLenSparseFeat):
            # For sequence features the wrapped sparsefeat carries the
            # embedding config, so rebuild the wrapper around a replaced one.
            linear_feature_columns[i] = linear_feature_columns[i]._replace(
                sparsefeat=linear_feature_columns[i].sparsefeat._replace(embedding_dim=1,
                                                                         embeddings_initializer=Zeros()))
    # One independent set of linear embeddings per output unit, distinguished
    # by the prefix suffix so the underlying tables are not shared.
    linear_emb_list = [input_from_feature_columns(features, linear_feature_columns, l2_reg, seed,
                                                  prefix=prefix + str(i))[0] for i in range(units)]
    # Dense inputs are unit-independent; fetch them once.
    _, dense_input_list = input_from_feature_columns(features, linear_feature_columns, l2_reg, seed, prefix=prefix)
    linear_logit_list = []
    for i in range(units):
        if len(linear_emb_list[i]) > 0 and len(dense_input_list) > 0:
            # Both sparse and dense features present.
            sparse_input = concat_func(linear_emb_list[i])
            dense_input = concat_func(dense_input_list)
            if sparse_feat_refine_weight is not None:
                # Rescale each sparse field embedding by its refine weight
                # (broadcast over the embedding axis).
                sparse_input = Lambda(lambda x: x[0] * tf.expand_dims(x[1], axis=1))(
                    [sparse_input, sparse_feat_refine_weight])
            # NOTE(review): Linear's `mode` appears to select the input kind
            # (2 = sparse+dense, 0 = sparse only, 1 = dense only) — inferred
            # from the branch structure here; confirm against the Linear layer.
            linear_logit = Linear(l2_reg, mode=2, use_bias=use_bias, seed=seed)([sparse_input, dense_input])
        elif len(linear_emb_list[i]) > 0:
            # Sparse features only.
            sparse_input = concat_func(linear_emb_list[i])
            if sparse_feat_refine_weight is not None:
                sparse_input = Lambda(lambda x: x[0] * tf.expand_dims(x[1], axis=1))(
                    [sparse_input, sparse_feat_refine_weight])
            linear_logit = Linear(l2_reg, mode=0, use_bias=use_bias, seed=seed)(sparse_input)
        elif len(dense_input_list) > 0:
            # Dense features only.
            dense_input = concat_func(dense_input_list)
            linear_logit = Linear(l2_reg, mode=1, use_bias=use_bias, seed=seed)(dense_input)
        else:  # empty feature_columns
            # No features at all: emit a constant zero logit wired to an
            # arbitrary existing input so the graph stays connected.
            return Lambda(lambda x: tf.constant([[0.0]]))(list(features.values())[0])
        linear_logit_list.append(linear_logit)
    return concat_func(linear_logit_list)