Source code for typhon.retrieval.qrnn.models.keras

"""
typhon.retrieval.qrnn.models.keras
==================================

This module provides Keras neural network models that can be used as backend
models with the :py:class:`typhon.retrieval.qrnn.QRNN` class.
"""
import logging
import numpy as np
import keras
from keras.models import Sequential
from keras.layers import Dense, Activation, deserialize
from keras.optimizers import SGD
import keras.backend as K


def save_model(f, model):
    """
    Save keras model.

    Args:
        f(:code:`str` or binary stream): Either a path or a binary stream
            to store the data to.
        model(:code:`keras.models.Models`): The Keras model to save
    """
    keras.models.save_model(model, f)


def load_model(f, quantiles):
    """
    Load keras model.

    Args:
        f(:code:`str` or binary stream): Either a path or a binary stream
            to read the model from
        quantiles(:code:`np.ndarray`): Array containing the quantiles
            that the model predicts.

    Returns:
        The loaded keras model.
    """
    #
    # This is a bit hacky but seems required to handle
    # the custom model classes.
    #
    def make_fully_connected(layers=None, **kwargs):
        layers = list(map(deserialize, layers))
        input_dimensions = layers[0].batch_input_shape[1]
        return FullyConnected(input_dimensions, quantiles, (), layers)

    custom_objects = {
        "FullyConnected": make_fully_connected,
        "QuantileLoss": QuantileLoss,
    }
    model = keras.models.load_model(f, custom_objects=custom_objects)
    return model


################################################################################
# Quantile loss
################################################################################

LOGGER = logging.getLogger(__name__)


def skewed_absolute_error(y_true, y_pred, tau):
    """
    The quantile loss function for a given quantile tau:

    L(y_true, y_pred) = (tau - I(y_pred < y_true)) * (y_pred - y_true)

    Where I is the indicator function.
    """
    dy = y_pred - y_true
    return K.mean((1.0 - tau) * K.relu(dy) + tau * K.relu(-dy), axis=-1)


def quantile_loss(y_true, y_pred, taus):
    """
    The quantiles loss for a list of quantiles. Sums up the error contribution
    from the each of the quantile loss functions.
    """
    e = skewed_absolute_error(K.flatten(y_true), K.flatten(y_pred[:, 0]), taus[0])
    for i, tau in enumerate(taus[1:]):
        e += skewed_absolute_error(K.flatten(y_true), K.flatten(y_pred[:, i + 1]), tau)
    return e


class QuantileLoss:
    """
    Wrapper class for the quantile error loss function. A class is used here
    to allow the implementation of a custom `__repr` function, so that the
    loss function object can be easily loaded using `keras.model.load`.

    Attributes:

        quantiles: List of quantiles that should be estimated with
                   this loss function.

    """

    def __init__(self, quantiles):
        self.__name__ = "QuantileLoss"
        self.quantiles = quantiles

    def __call__(self, y_true, y_pred):
        return quantile_loss(y_true, y_pred, self.quantiles)

    def __repr__(self):
        return "QuantileLoss(" + repr(self.quantiles) + ")"


class KerasModel:
    def __init__(self):
        pass

    def train(self):
        pass


################################################################################
# Keras data generators
################################################################################


class BatchedDataset:
    """
    Keras data loader that batches a given dataset of numpy arryas.
    """

    def __init__(self, training_data, batch_size):
        """
        Create batched dataset.

        Args:
            training_data: Tuple :code:`(x, y)` containing the input
               and output data as arrays.
            batch_size(:code:`int`): The batch size
        """
        x, y = training_data
        self.x = x
        self.y = y
        self.bs = batch_size
        self.indices = np.random.permutation(x.shape[0])
        self.i = 0

    def __iter__(self):
        LOGGER.info("iter...")
        return self

    def __len__(self):
        return self.x.shape[0] // self.bs

    def __next__(self):
        inds = self.indices[
            np.arange(self.i * self.bs, (self.i + 1) * self.bs) % self.indices.size
        ]
        x_batch = np.copy(self.x[inds, :])
        y_batch = self.y[inds]
        self.i = self.i + 1
        # Shuffle training set after each epoch.
        if self.i % (self.x.shape[0] // self.bs) == 0:
            self.indices = np.random.permutation(self.x.shape[0])

        return (x_batch, y_batch)


class TrainingGenerator:
    """
    This Keras sample generator takes a generator for noise-free training data
    and adds independent Gaussian noise to each of the components of the input.

    Attributes:
        training_data: Data generator providing the data
        sigma_noise: A vector containing the standard deviation of each
                 component.
    """

    def __init__(self, training_data, sigma_noise=None):
        """
        Args:
            training_data: Data generator providing the original (noise-free)
                training data.
            sigma_noise: Vector the length of the input dimensions specifying
                the standard deviation of the noise.
        """
        self.training_data = training_data
        self.sigma_noise = sigma_noise

    def __iter__(self):
        LOGGER.info("iter...")
        return self

    def __len__(self):
        return len(self.training_data)

    def __next__(self):
        x_batch, y_batch = next(self.training_data)
        if not self.sigma_noise is None:
            x_batch += np.random.randn(*x_batch.shape) * self.sigma_noise
        return (x_batch, y_batch)


class AdversarialTrainingGenerator:
    """
    This Keras sample generator takes the noise-free training data
    and adds independent Gaussian noise to each of the components
    of the input.

    Attributes:
        training_data: Training generator to use to generate the input
            data
        input_gradients: Keras function to compute the gradients of the
            network
        eps: The perturbation factor.
    """

    def __init__(self, training_data, input_gradients, eps):
        """
        Args:
            training_data: Training generator to use to generate the input
                data
            input_gradients: Keras function to compute the gradients of the
                network
            eps: The perturbation factor.
        """
        self.training_data = training_data
        self.input_gradients = input_gradients
        self.eps = eps

    def __iter__(self):
        LOGGER.info("iter...")
        return self

    def __len__(self):
        return len(self.training_data)

    def __next__(self):
        if self.i % 2 == 0:
            x_batch, y_batch = next(self.training_data)
            self.x_batch = x_batch
            self.y_batch = y_batch
        else:
            x_batch = self.x_batch
            y_batch = self.y_batch
            grads = self.input_gradients([x_batch, y_batch, 1.0])
            x_batch += self.eps * np.sign(grads)

        self.i = self.i + 1
        return x_batch, y_batch


class ValidationGenerator:
    """
    This Keras sample generator is similar to the training generator
    only that it returns the whole validation set and doesn't perform
    any randomization.

    Attributes:

        x_val: The validation input, i.e. the brightness temperatures
                 measured by the satellite.
        y_val: The validation output, i.e. the value of the retrieval
                 quantity.
        x_mean: A vector containing the mean of each input component.
        x_sigma: A vector containing the standard deviation of each
                 component.
    """

    def __init__(self, validation_data, sigma_noise):
        self.validation_data = validation_data
        self.sigma_noise = sigma_noise

    def __iter__(self):
        return self

    def __next__(self):
        x_val, y_val = next(self.validation_data)
        if not self.sigma_noise is None:
            x_val += np.random.randn(*self.x_val.shape) * self.sigma_noise
        return (x_val, self.y_val)


################################################################################
# LRDecay
################################################################################


class LRDecay(keras.callbacks.Callback):
    """
    The LRDecay class implements the Keras callback interface and reduces
    the learning rate according to validation loss reduction.

    Attributes:

        lr_decay: The factor c > 1.0 by which the learning rate is
                  reduced.
        lr_minimum: The training is stopped when this learning rate
                    is reached.
        convergence_steps: The number of epochs without validation loss
                           reduction required to reduce the learning rate.

    """

    def __init__(self, model, lr_decay, lr_minimum, convergence_steps):
        self.model = model
        self.lr_decay = lr_decay
        self.lr_minimum = lr_minimum
        self.convergence_steps = convergence_steps
        self.steps = 0

    def on_train_begin(self, logs={}):
        self.losses = []
        self.steps = 0
        self.min_loss = 1e30

    def on_epoch_end(self, epoch, logs={}):
        loss = logs.get("val_loss")
        if loss is None:
            loss = logs.get("loss")
        self.losses += [loss]
        if not self.losses[-1] < self.min_loss:
            self.steps = self.steps + 1
        else:
            self.steps = 0
        if self.steps > self.convergence_steps:
            lr = keras.backend.get_value(self.model.optimizer.lr)
            keras.backend.set_value(self.model.optimizer.lr, lr / self.lr_decay)
            self.steps = 0
            LOGGER.info("\n Reduced learning rate to " + str(lr))

            if lr < self.lr_minimum:
                self.model.stop_training = True

        self.min_loss = min(self.min_loss, self.losses[-1])


################################################################################
# QRNN
################################################################################


class KerasModel:
    r"""
    Base class for Keras models.

    This base class provides generic utility function for the training, saving
    and evaluation of Keras models.

    Attributes:
        input_dimensions (int): The input dimension of the neural network, i.e.
            the dimension of the measurement vector.
        quantiles (numpy.array): The 1D-array containing the quantiles
            :math:`\tau \in [0, 1]` that the network learns to predict.

        depth (int):
            The number layers in the network excluding the input layer.

        width (int):
            The width of the hidden layers in the network.

        activation (str):
            The name of the activation functions to use in the hidden layers
            of the network.

        models (list of keras.models.Sequential):
            The ensemble of Keras neural networks used for the quantile regression
            neural network.
    """

    def __init__(self, input_dimension, quantiles):
        """
        Create a QRNN model.

        Arguments:
            input_dimension(int): The dimension of the measurement space, i.e. the number
                            of elements in a single measurement vector y

            quantiles(np.array): 1D-array containing the quantiles  to estimate of
                                 the posterior distribution. Given as fractions
                                 within the range [0, 1].
        """
        self.input_dimension = input_dimension
        self.quantiles = np.array(quantiles)

    def reset(self):
        """
        Reinitialize the state of the model.
        """
        self.reset_states()

    def train(
        self,
        training_data,
        validation_data=None,
        batch_size=256,
        sigma_noise=None,
        adversarial_training=False,
        delta_at=0.01,
        initial_learning_rate=1e-2,
        momentum=0.0,
        convergence_epochs=5,
        learning_rate_decay=2.0,
        learning_rate_minimum=1e-6,
        maximum_epochs=200,
        training_split=0.9,
        gpu=False,
    ):

        if type(training_data) == tuple:
            if not type(training_data[0]) == np.ndarray:
                raise ValueError(
                    "When training data is provided as tuple"
                    " (x, y) it must contain numpy arrays."
                )
            training_data = BatchedDataset(training_data, batch_size)

        if type(validation_data) is tuple:
            validation_data = BatchedDataset(validation_data, batch_size)

        loss = QuantileLoss(self.quantiles)

        # Compile model
        self.custom_objects = {loss.__name__: loss}
        optimizer = SGD(lr=initial_learning_rate)
        self.compile(loss=loss, optimizer=optimizer)

        #
        # Setup training generator
        #
        training_generator = TrainingGenerator(training_data, sigma_noise)
        if adversarial_training:
            inputs = [self.input, self.targets[0], self.sample_weights[0]]
            input_gradients = K.function(
                inputs, K.gradients(self.total_loss, self.input)
            )
            training_generator = AdversarialTrainingGenerator(
                training_generator, input_gradients, delta_at
            )

        if validation_data is None:
            validation_generator = None
        else:
            validation_generator = ValidationGenerator(validation_data, sigma_noise)
        lr_callback = LRDecay(
            self, learning_rate_decay, learning_rate_minimum, convergence_epochs
        )
        self.fit_generator(
            training_generator,
            steps_per_epoch=len(training_generator),
            epochs=maximum_epochs,
            validation_data=validation_generator,
            validation_steps=1,
            callbacks=[lr_callback],
        )


################################################################################
# Fully-connected network
################################################################################


[docs] class FullyConnected(KerasModel, Sequential): """ Keras implementation of fully-connected networks. """
[docs] def __init__(self, input_dimension, quantiles, arch, layers=None): """ Create a fully-connected neural network. Args: input_dimension(:code:`int`): Number of input features quantiles(:code:`array`): The quantiles to predict given as fractions within [0, 1]. arch(tuple): Tuple :code:`(d, w, a)` containing :code:`d`, the number of hidden layers in the network, :code:`w`, the width of the network and :code:`a`, the type of activation functions to be used as string. """ quantiles = np.array(quantiles) output_dimension = quantiles.size if layers is None: if len(arch) == 0: layers = [Dense(output_dimension, input_shape=(input_dimension))] else: d, w, a = arch layers = [Dense(w, input_shape=(input_dimension,))] for _ in range(d - 1): layers.append(Dense(w, input_shape=(w,))) if a is not None: layers.append(Activation(a)) layers.append(Dense(output_dimension, input_shape=(w,))) KerasModel.__init__(self, input_dimension, quantiles) Sequential.__init__(self, layers)