# /usr/bin/env python3.5
# -*- mode: python -*-
# =============================================================================
# @@-COPYRIGHT-START-@@
#
# Copyright (c) 2021, Qualcomm Innovation Center, Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# SPDX-License-Identifier: BSD-3-Clause
#
# @@-COPYRIGHT-END-@@
# =============================================================================
"""Cross Layer Equalization"""
import typing
import numpy as np
import tensorflow as tf
import aimet_common.libpymo as libpymo
from aimet_common.utils import AimetLogger
from aimet_tensorflow.keras.batch_norm_fold import fold_all_batch_norms
from aimet_tensorflow.keras.graphsearchtuils import GraphSearchUtils, ClsSet
from aimet_tensorflow.keras.utils import model_transform_utils
from aimet_tensorflow.keras.utils.weight_tensor_utils import WeightTensorUtils
_logger = AimetLogger.get_area_logger(AimetLogger.LogAreas.CrosslayerEqualization)
BatchNormFoldedPair = typing.Union[typing.Tuple[tf.keras.layers.Conv2D,
tf.keras.layers.BatchNormalization],
typing.Tuple[tf.keras.layers.Dense,
tf.keras.layers.BatchNormalization]]
ScaleFactor = typing.Union[np.ndarray, typing.Tuple[np.ndarray, np.ndarray]]
ReluFlag = typing.Union[bool, typing.Tuple[bool, bool]]
[docs]class ClsSetInfo:
"""
This class hold information about the layers in a CLS set, along with corresponding scaling factors
and other information like if there is a ReLU activation function between the CLS set layers
"""
[docs] class ClsSetLayerPairInfo:
"""
Models a pair of layers that were scaled using CLS. And related information.
"""
def __init__(self, layer1: tf.keras.layers.Conv2D, layer2: tf.keras.layers.Conv2D, scale_factor: np.ndarray,
relu_activation_between_layers: bool):
"""
:param layer1: Layer whose bias is folded
:param layer2: Layer to which bias of previous layer's bias is folded
:param scale_factor: Scale Factor found from Cross Layer Scaling to scale BN parameters
:param relu_activation_between_layers: If the activation between layer1 and layer2 is Relu
"""
self.layer1 = layer1
self.layer2 = layer2
self.scale_factor = scale_factor
self.relu_activation_between_layers = relu_activation_between_layers
def __eq__(self, other):
if isinstance(self, other.__class__):
return self.layer1 == other.layer1 and \
self.layer2 == other.layer2 and \
np.allclose(self.scale_factor, other.scale_factor) and \
self.relu_activation_between_layers == other.relu_activation_between_layers
return False
def __init__(self, cls_pair_1: ClsSetLayerPairInfo, cls_pair_2: ClsSetLayerPairInfo = None):
"""
Constructor takes 2 pairs if Depth-wise separable layer is being folded
:param cls_pair_1: Pair between two conv or conv and depth-wise conv
:param cls_pair_2: Pair between depth-wise conv and point-wise conv
"""
if cls_pair_2:
self.cls_pair_info_list = [cls_pair_1, cls_pair_2]
else:
self.cls_pair_info_list = [cls_pair_1]
def __eq__(self, other):
if isinstance(self, other.__class__):
return self.cls_pair_info_list == other.cls_pair_info_list
return False
class CrossLayerScaling:
"""
Code to apply the cross-layer-scaling technique to a model
"""
@staticmethod
def scale_cls_set_with_conv_layers(
cls_set: typing.Tuple[tf.keras.layers.Conv2D, tf.keras.layers.Conv2D]) -> np.ndarray:
"""
API to invoke equalize layer params (update for weights and bias is in place)
:param cls_set: Consecutive Conv layers Tuple whose weights and biases need to be equalized
:return: Scaling factor S_12 for each conv layer pair: numpy array
"""
for layer in cls_set:
# NOTE: DepthwiseConv2D and Conv2DTranspose is subclass of Conv2D
# The check below covers all of Conv2D, DepthwiseConv2D and Conv2DTranspose class
if not isinstance(layer, tf.keras.layers.Conv2D):
raise ValueError("Only Conv or Transposed Conv layers are supported for CLE")
scaling_factor, prev_layer_params, curr_layer_params = CrossLayerScaling.call_mo_scale(cls_set)
prev_layer, curr_layer = cls_set
weight_and_bias_0 = CrossLayerScaling._unpack_equalization_params(prev_layer, prev_layer_params,
unpack_bias=True)
prev_layer.set_weights(weight_and_bias_0)
weight_and_bias_1 = CrossLayerScaling._unpack_equalization_params(curr_layer, curr_layer_params,
unpack_bias=False)
curr_layer.set_weights(weight_and_bias_1)
return scaling_factor
@staticmethod
def call_mo_scale(cls_set: typing.Tuple[tf.keras.layers.Conv2D, tf.keras.layers.Conv2D]) \
-> typing.Tuple[np.ndarray, libpymo.EqualizationParams, libpymo.EqualizationParams]:
"""
Invokes scale API in model optimization library
:param cls_set: Consecutive Conv layers Tuple whose weights and biases need to be equalized
:return: Scaling factor, prev and current layer updated parameters
"""
prev_layer_params = CrossLayerScaling._pack_equalization_params(cls_set[0], pack_bias=True)
curr_layer_params = CrossLayerScaling._pack_equalization_params(cls_set[1], pack_bias=False)
scaling_factor = libpymo.scaleLayerParams(prev_layer_params, curr_layer_params)
return scaling_factor, prev_layer_params, curr_layer_params
@staticmethod
def scale_cls_set_with_depthwise_conv_layers(
cls_set: typing.Tuple[tf.keras.layers.Conv2D,
tf.keras.layers.DepthwiseConv2D,
tf.keras.layers.Conv2D]) -> typing.Tuple[np.ndarray, np.ndarray]:
"""
API to invoke equalize layer params (update for weights and bias is in place)
:param cls_set: Consecutive Conv layers whose weights and biases need to be equalized.
Second Conv layer is a depth-wise conv and third conv layer is point-wise conv
:return: Scaling factors S_12 and S_23 : numpy arrays
"""
for layer in cls_set:
# NOTE: DepthwiseConv2D and Conv2DTranspose is subclass of Conv2D
# The check below covers all of Conv2D, DepthwiseConv2D and Conv2DTranspose class
if not isinstance(layer, tf.keras.layers.Conv2D):
raise ValueError("Only Conv or Transposed Conv layers are supported for CLE")
scaling_params, prev_layer_params, curr_layer_params, next_layer_params = \
CrossLayerScaling.call_mo_scale_depthwise_separable_layer(cls_set)
prev_layer, curr_layer, next_layer = cls_set
weight_and_bias_0 = CrossLayerScaling._unpack_equalization_params(prev_layer,
prev_layer_params,
unpack_bias=True)
prev_layer.set_weights(weight_and_bias_0)
weight_and_bias_1 = CrossLayerScaling._unpack_equalization_params(curr_layer,
curr_layer_params,
unpack_bias=True)
curr_layer.set_weights(weight_and_bias_1)
weight_and_bias_2 = CrossLayerScaling._unpack_equalization_params(next_layer,
next_layer_params,
unpack_bias=False)
next_layer.set_weights(weight_and_bias_2)
return scaling_params.scalingMatrix12, scaling_params.scalingMatrix23
@staticmethod
def call_mo_scale_depthwise_separable_layer(
cls_set: typing.Tuple[tf.keras.layers.Conv2D,
tf.keras.layers.DepthwiseConv2D,
tf.keras.layers.Conv2D]) -> typing.Tuple[libpymo.RescalingParamsVectors,
libpymo.EqualizationParams,
libpymo.EqualizationParams,
libpymo.EqualizationParams]:
"""
Invokes scale API in model optimization library
:param cls_set: Consecutive Conv layers whose weights and biases need to be equalized
:return: Scaling factors, prev, current and next layer updated parameters
"""
prev_layer_params = CrossLayerScaling._pack_equalization_params(cls_set[0], pack_bias=True)
curr_layer_params = CrossLayerScaling._pack_equalization_params(cls_set[1], pack_bias=True)
next_layer_params = CrossLayerScaling._pack_equalization_params(cls_set[2], pack_bias=False)
scaling_params = libpymo.scaleDepthWiseSeparableLayer(prev_layer_params, curr_layer_params, next_layer_params)
return scaling_params, prev_layer_params, curr_layer_params, next_layer_params
@staticmethod
def _pack_equalization_params(layer: tf.keras.layers.Conv2D, pack_bias: bool) -> libpymo.EqualizationParams:
equalization_params = libpymo.EqualizationParams()
param_tensors = layer.get_weights()
weight_tensor = param_tensors[0]
weight_tensor = WeightTensorUtils.transpose_from_tf_to_libpymo_format(weight_tensor, layer)
equalization_params.weight = weight_tensor.reshape(-1)
equalization_params.weightShape = np.array(weight_tensor.shape)
if pack_bias:
if layer.use_bias:
equalization_params.bias = param_tensors[1]
else:
equalization_params.isBiasNone = True
return equalization_params
@staticmethod
def _unpack_equalization_params(layer: tf.keras.layers.Conv2D,
equalization_params: libpymo.EqualizationParams,
unpack_bias: bool) -> typing.List:
weight_tensor = np.reshape(equalization_params.weight, equalization_params.weightShape)
weight_tensor = WeightTensorUtils.transpose_from_libpymo_to_tf_format(weight_tensor, layer)
if layer.use_bias:
if unpack_bias:
bias_tensor = np.reshape(equalization_params.bias, equalization_params.weightShape[0])
else:
_, bias_tensor = layer.get_weights()
param_tensors = [weight_tensor, bias_tensor]
else:
param_tensors = [weight_tensor]
return param_tensors
@staticmethod
def scale_cls_sets(cls_sets: typing.List[ClsSet]) -> \
typing.List[typing.Union[np.ndarray, typing.Tuple[np.ndarray, np.ndarray]]]:
"""
Scale each cls set
:param cls_sets: Cls sets to scale
:return: List of scale factors corresponding to each scaled cls set
"""
scale_factor_list = []
for cls_set in cls_sets:
if len(cls_set) == 3:
scale_factor = CrossLayerScaling.scale_cls_set_with_depthwise_conv_layers(cls_set)
else:
scale_factor = CrossLayerScaling.scale_cls_set_with_conv_layers(cls_set)
scale_factor_list.append(scale_factor)
return scale_factor_list
@staticmethod
def create_cls_set_info_list(cls_sets: typing.List[ClsSet],
scale_factors: typing.List[ScaleFactor],
is_relu_activation_in_cls_sets: typing.List[ReluFlag]) -> typing.List[ClsSetInfo]:
"""
Binds information from there separate lists into one [ClsInfoSet] data structure
:param cls_sets: List of CLS sets
:param scale_factors: List of scale factors for each cls set
:param is_relu_activation_in_cls_sets: List of ReLU flag whether there is ReLU activation in each cls set
:return: List of ClsSetInfo
"""
assert len(cls_sets) == len(scale_factors) == len(is_relu_activation_in_cls_sets)
cls_set_info_list = []
for cls_set, scale_factor, has_relu_activation in zip(cls_sets,
scale_factors,
is_relu_activation_in_cls_sets):
# Depthwise separable convolution layer case (triplet of layers)
# Should have two scale factors and ReLU flags
if isinstance(scale_factor, tuple):
assert len(cls_set) == 3
assert len(scale_factor) == len(has_relu_activation) == 2
prev_layer, curr_layer, next_layer = cls_set
cls_pair_1 = ClsSetInfo.ClsSetLayerPairInfo(prev_layer, curr_layer,
scale_factor[0], has_relu_activation[0])
cls_pair_2 = ClsSetInfo.ClsSetLayerPairInfo(curr_layer, next_layer,
scale_factor[1], has_relu_activation[1])
cls_set_info = ClsSetInfo(cls_pair_1, cls_pair_2)
# Standard convolution layer case (tuple of layers)
# Should have one scale factor and ReLU flag
else:
prev_layer, curr_layer = cls_set
cls_pair = ClsSetInfo.ClsSetLayerPairInfo(prev_layer, curr_layer,
scale_factor, has_relu_activation)
cls_set_info = ClsSetInfo(cls_pair)
cls_set_info_list.append(cls_set_info)
return cls_set_info_list
@staticmethod
def scale_model(model: tf.keras.Model) -> typing.List[ClsSetInfo]:
"""
Uses cross-layer scaling to scale all applicable layers in the given model
:param model: tf.keras.Model
:return: CLS information for each CLS set
"""
# Find layer groups
graph_search_util = GraphSearchUtils(model)
layer_groups = graph_search_util.find_layer_groups_to_scale()
# Find cls sets from the layer groups
cls_sets = []
for layer_group in layer_groups:
cls_set = GraphSearchUtils.convert_layer_group_to_cls_sets(layer_group)
cls_sets += cls_set
# Scale the CLS sets
scale_factors = CrossLayerScaling.scale_cls_sets(cls_sets)
# Find if there were ReLU activations between layers of each cls set
is_relu_activation_in_cls_sets = graph_search_util.is_relu_activation_present_in_cls_sets(cls_sets)
# Convert to a list of cls set info elements
cls_set_info_list = CrossLayerScaling.create_cls_set_info_list(cls_sets,
scale_factors,
is_relu_activation_in_cls_sets)
return cls_set_info_list
class HighBiasFold:
"""
Code to apply the high-bias-fold technique to a model
"""
@staticmethod
def bias_fold(cls_set_info_list: typing.List[ClsSetInfo],
bn_layers: typing.Dict[tf.keras.layers.Conv2D, tf.keras.layers.BatchNormalization]):
"""
Folds bias values greater than 3 * sigma to next layer's bias
:param cls_set_info_list: List of info elements for each cls set
:param bn_layers: Key: Conv/Linear layer Value: Corresponding folded BN layer
"""
if not bn_layers:
_logger.info('High Bias folding is not supported for models without BatchNorm Layers')
return
for cls_set_info in cls_set_info_list:
for cls_pair_info in cls_set_info.cls_pair_info_list:
if (not cls_pair_info.layer1.use_bias) or (not cls_pair_info.layer2.use_bias) or \
(cls_pair_info.layer1 not in bn_layers):
continue
prev_layer_params, curr_layer_params = HighBiasFold.call_mo_high_bias_fold(cls_pair_info, bn_layers)
layer1 = cls_pair_info.layer1
layer1_weight_tensor, _ = layer1.get_weights()
layer1_bias_tensor = np.array(prev_layer_params.bias)
layer1.set_weights([layer1_weight_tensor, layer1_bias_tensor])
layer2 = cls_pair_info.layer2
layer2_weight_tensor, _ = layer2.get_weights()
layer2_bias_tensor = np.array(curr_layer_params.bias)
layer2.set_weights([layer2_weight_tensor, layer2_bias_tensor])
@staticmethod
def call_mo_high_bias_fold(cls_pair_info: ClsSetInfo.ClsSetLayerPairInfo,
bn_layers: typing.Dict[tf.keras.layers.Conv2D, tf.keras.layers.BatchNormalization]) \
-> typing.Tuple[libpymo.LayerParams, libpymo.LayerParams]:
"""
Invokes high bias fold MO API
:param cls_pair_info: Pair of layers that were scaled using CLS and related information
:param bn_layers: Key: Conv/Linear layer Value: Corresponding folded BN layer
:return: Updated layer params
"""
bn_layer = bn_layers[cls_pair_info.layer1]
prev_layer_bn_params = HighBiasFold._pack_bn_params_high_bias_fold(bn_layer, cls_pair_info.scale_factor)
prev_layer_params, curr_layer_params = HighBiasFold._pack_layer_params(cls_pair_info)
libpymo.updateBias(prev_layer_params, curr_layer_params, prev_layer_bn_params)
return prev_layer_params, curr_layer_params
@staticmethod
def _pack_bn_params_high_bias_fold(bn_layer: tf.keras.layers.BatchNormalization,
scaling_parameter: np.ndarray) -> libpymo.BNParamsHighBiasFold:
"""
Helper method to pack BatchNormalization parameter for high bias fold
:param bn_layer: Target batch normalization layer
:param scaling_parameter: Scaling parameters for each channel obtained from cross layer scaling
:return: Packed BNParamsHighBiasFold
"""
bn_params = libpymo.BNParamsHighBiasFold()
# Note: In BatchNormFold, we initialize gamma and beta to 1 and 0 respectively to work as Identity
# So if the original value was set, use it for High Bias Fold
if hasattr(bn_layer, "original_gamma") and hasattr(bn_layer, "original_beta"):
gamma, beta = bn_layer.original_gamma, bn_layer.original_beta
else:
gamma, beta, _, _ = bn_layer.get_weights()
if len(scaling_parameter) != len(gamma) or len(scaling_parameter) != len(beta):
raise ValueError("High Bias absorption is not supported for networks with fold-forward BatchNorms")
bn_params.gamma = np.divide(gamma, scaling_parameter)
bn_params.beta = np.divide(beta, scaling_parameter)
return bn_params
@staticmethod
def _pack_layer_params(cls_pair_info: ClsSetInfo.ClsSetLayerPairInfo) \
-> typing.Tuple[libpymo.LayerParams, libpymo.LayerParams]:
"""
Helper method to pack information of previous and current layer
:param cls_pair_info: Pair of layers that were scaled using CLS and related information
:return: Packed layer parameter tuple
"""
# Pack parameters for previous layer
prev_layer_params = libpymo.LayerParams()
prev_layer = cls_pair_info.layer1
prev_layer_params.activationIsRelu = cls_pair_info.relu_activation_between_layers
_, prev_layer_bias_tensor = prev_layer.get_weights()
prev_layer_params.bias = prev_layer_bias_tensor
# Pack parameters for current layer
curr_layer_params = libpymo.LayerParams()
curr_layer = cls_pair_info.layer2
curr_layer_weight_tensor, curr_layer_bias_tensor = curr_layer.get_weights()
curr_layer_weight_tensor = WeightTensorUtils.transpose_from_tf_to_libpymo_format(curr_layer_weight_tensor,
curr_layer)
curr_layer_params.bias = curr_layer_bias_tensor
curr_layer_params.weight = curr_layer_weight_tensor.reshape(-1)
curr_layer_params.weightShape = np.array(curr_layer_weight_tensor.shape)
return prev_layer_params, curr_layer_params
[docs]def equalize_model(model: tf.keras.Model) -> tf.keras.Model:
"""
High-level API to perform Cross-Layer Equalization (CLE) on the given model
:param model: tf.keras.Model
:return: CLE applied tf.keras.Model
"""
# replace any ReLU6 layers with ReLU
model_for_cle, _ = model_transform_utils.replace_relu6_with_relu(model)
folded_pairs, model_for_cle = fold_all_batch_norms(model_for_cle)
equalize_bn_folded_model(model_for_cle, folded_pairs)
return model_for_cle
def equalize_bn_folded_model(model: tf.keras.Model,
folded_pairs: typing.List[BatchNormFoldedPair]):
"""
Perform Cross-Layer Scaling (CLS) and High Bias Folding (HBF) on a batchnorm-folded model in-place
:param model: BatchNorm-folded model to equalize
:param folded_pairs: List of pairs of folded layers
"""
bn_dict = {}
for conv_or_linear, bn in folded_pairs:
bn_dict[conv_or_linear] = bn
# perform cross-layer scaling on applicable layer sets
cls_set_info_list = CrossLayerScaling.scale_model(model)
# high-bias fold
HighBiasFold.bias_fold(cls_set_info_list, bn_dict)