Source code for xanfis.models.base_anfis

#!/usr/bin/env python
# Created by "Thieu" at 21:01, 10/04/2025 ----------%                                                                               
#       Email: nguyenthieu2102@gmail.com            %                                                    
#       Github: https://github.com/thieu1995        %                         
# --------------------------------------------------%

import numbers
from typing import TypeVar
import inspect
import pprint
import pickle
import numpy as np
import pandas as pd
from pathlib import Path
import torch
import torch.nn as nn
from sklearn.base import BaseEstimator
from mealpy import get_optimizer_by_class, Optimizer, get_all_optimizers, FloatVar
from permetrics import ClassificationMetric, RegressionMetric
from xanfis.helpers import membership_family as mfam
from xanfis.helpers.metric_util import get_all_regression_metrics, get_all_classification_metrics
from xanfis.helpers import validator


# Create a TypeVar for the base class
EstimatorType = TypeVar('EstimatorType', bound='BaseAnfis')


[docs]class EarlyStopper: """ A utility class for implementing early stopping in training processes to prevent overfitting. Attributes: - patience (int): Number of consecutive epochs to tolerate no improvement before stopping. - epsilon (float): Minimum loss improvement threshold to reset the patience counter. - counter (int): Tracks the number of epochs without sufficient improvement. - min_loss (float): Keeps track of the minimum observed loss. """ def __init__(self, patience=1, epsilon=0.01): """ Initialize the EarlyStopper with specified patience and epsilon. Parameters: - patience (int): Maximum number of epochs without improvement before stopping. - epsilon (float): Minimum loss reduction to reset the patience counter. """ self.patience = patience self.epsilon = epsilon self.counter = 0 self.min_loss = float('inf')
[docs] def early_stop(self, loss): """ Checks if training should be stopped based on the current loss. Parameters: - loss (float): The current loss value for the epoch. Returns: - bool: True if training should stop, False otherwise. """ if loss < self.min_loss: # Loss has improved; reset counter and update min_loss self.min_loss = loss self.counter = 0 elif loss > (self.min_loss + self.epsilon): # Loss did not improve sufficiently; increment counter self.counter += 1 if self.counter >= self.patience: return True return False
[docs]class CustomANFIS(nn.Module): """ A customizable Adaptive Neuro-Fuzzy Inference System (ANFIS) model implemented in PyTorch. This class implements a modular and flexible ANFIS architecture that supports different membership functions, activation functions, task types (classification, binary classification, regression), and rule strengths calculation strategies (to address vanishing gradient issues in fuzzy logic models). Parameters: input_dim (int): Number of input features. num_rules (int): Number of fuzzy rules in the ANFIS model. output_dim (int): Number of output units (e.g., number of classes for classification tasks). mf_class (str or mfam.BaseMembership, optional): The membership function class to use. Can be a string referring to a predefined membership function name or an instance of a custom membership class. task (str, optional): Type of learning task: 'classification', 'binary_classification', or 'regression'. Default is 'classification'. act_output (str, optional): Activation function to apply at the output layer. If not provided, a default activation is chosen based on the task (Softmax for classification, Sigmoid for binary classification, Identity for regression). vanishing_strategy (str, optional): Strategy for computing rule strength to mitigate vanishing gradient issues. Supported values: 'prod', 'mean', 'blend'. Default is 'prod'. reg_lambda (float, optional): Regularization strength for L2-regularized least squares when updating consequent parameters. Default is 0 (no regularization). seed (int, optional): Random seed for reproducibility. **kwargs: Additional arguments reserved for future compatibility. Attributes: memberships (nn.ModuleList): List of membership function modules for each fuzzy rule. coeffs (nn.Parameter): Learnable parameters (consequents) representing the linear coefficients per rule. act_output_ (nn.Module): Activation function module used in the output layer. mf_class_ (type): Class object of the selected membership function. _get_strength (Callable): Method used for computing rule strength (based on chosen strategy). Supported Membership Functions: - Gaussian - Trapezoidal - Triangular - Sigmoid - Bell - GBell - PiShaped - SShaped - ZShaped - Linear Supported Output Activations: Any activation function in torch.nn.modules.activation, including: ReLU, Sigmoid, Tanh, GELU, Softmax, Identity, etc. Supported Vanishing Strategies: - 'prod': Product of membership values (classical approach). - 'mean': Mean of membership values. - 'blend': A learned blend between product and mean based on log-strength scaling. Example: >>> model = CustomANFIS(input_dim=4, num_rules=5, output_dim=3, mf_class="Gaussian", >>> task="classification", act_output="Softmax", vanishing_strategy="blend") >>> output = model(torch.randn(32, 4)) """ SUPPORTED_ACTIVATIONS = [ "Threshold", "ReLU", "RReLU", "Hardtanh", "ReLU6", "Sigmoid", "Hardsigmoid", "Tanh", "SiLU", "Mish", "Hardswish", "ELU", "CELU", "SELU", "GLU", "GELU", "Hardshrink", "LeakyReLU", "LogSigmoid", "Softplus", "Softshrink", "MultiheadAttention", "PReLU", "Softsign", "Tanhshrink", "Softmin", "Softmax", "Softmax2d", "LogSoftmax", ] SUPPORT_MEMBERSHIP_CLASSES = { "Gaussian": "GaussianMembership", "Trapezoidal": "TrapezoidalMembership", "Triangular": "TriangularMembership", "Sigmoid": "SigmoidMembership", "Bell": "BellMembership", "PiShaped": "PiShapedMembership", "SShaped": "SShapedMembership", "GBell": "GBellMembership", "ZShaped": "ZShapedMembership", "Linear": "LinearMembership", } SUPPORTED_VANISHING_STRATEGIES = ["prod", "mean", "blend"] def __init__(self, input_dim=None, num_rules=None, output_dim=None, mf_class=None, task="classification", vanishing_strategy="prod", act_output=None, reg_lambda=None, seed=None, **kwargs): """ Initialize a customizable multi-layer perceptron (ANFIS) model. """ super(CustomANFIS, self).__init__() self.input_dim = input_dim self.output_dim = output_dim self.num_rules = num_rules self.task = task self.act_output = act_output self.seed = seed if reg_lambda is None: self.reg_lambda = 0. else: self.reg_lambda = reg_lambda self.kwargs = kwargs if seed is not None: torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed) if vanishing_strategy is None: self.vanishing_strategy = "prod" if validator.is_str_in_sequence(vanishing_strategy, self.SUPPORTED_VANISHING_STRATEGIES): self.vanishing_strategy = vanishing_strategy if self.vanishing_strategy == "prod": self._get_strength = self.__get_strength_by_prod elif self.vanishing_strategy == "mean": self._get_strength = self.__get_strength_by_mean else: self._get_strength = self.__get_strength_by_blend else: raise ValueError(f"Unsupported vanishing strategy: {vanishing_strategy}. Supported strategies are: {self.SUPPORTED_VANISHING_STRATEGIES}") # Ensure hidden_layers is a valid list, tuple, or numpy array if mf_class is None: self.mf_class = self.SUPPORT_MEMBERSHIP_CLASSES["Gaussian"] elif isinstance(mf_class, str): if validator.is_str_in_sequence(mf_class, list(self.SUPPORT_MEMBERSHIP_CLASSES.keys())): self.mf_class = self.SUPPORT_MEMBERSHIP_CLASSES[mf_class] else: raise ValueError(f"Unsupported membership function class: {mf_class}. Supported classes are: {list(self.SUPPORT_MEMBERSHIP_CLASSES.keys())}") elif isinstance(mf_class, mfam.BaseMembership): self.mf_class = mf_class().name() else: raise TypeError(f"Unsupported membership function class type: {type(mf_class)}. Expected str or BaseMembership instance.") self.mf_class_ = getattr(mfam, self.mf_class) # Get the class from the string name # Determine activation for the output layer based on the task if act_output is None: if task == 'classification': self.act_output_ = nn.Softmax(dim=1) elif task == 'binary_classification': self.act_output_ = nn.Sigmoid() else: # regression self.act_output_ = nn.Identity() else: self.act_output_ = self._get_act(act_output) # Initialize membership functions self.memberships = nn.ModuleList([self.mf_class_(self.input_dim) for _ in range(self.num_rules)]) # Initialize linear coefficients for each rule self.coeffs = nn.Parameter(torch.zeros(num_rules, input_dim + 1, output_dim)) # (num_rules, input_dim+1, output_dim) def __repr__(self, **kwargs): """Pretty-print parameters like scikit-learn's Estimator. """ param_order = list(inspect.signature(self.__init__).parameters.keys()) param_dict = {k: getattr(self, k) for k in param_order} param_str = ", ".join(f"{k}={repr(v)}" for k, v in param_dict.items()) if len(param_str) <= 80: return f"{self.__class__.__name__}({param_str})" else: formatted_params = ",\n ".join(f"{k}={pprint.pformat(v)}" for k, v in param_dict.items()) return f"{self.__class__.__name__}(\n {formatted_params}\n)" def _get_act(self, act_name): """ Retrieve the activation function by name. Parameters: - act_name (str): Name of the activation function. Returns: - nn.Module: The activation function module. """ if act_name == "Softmax": return nn.Softmax(dim=1) elif act_name == "None": return nn.Identity() else: return getattr(nn.modules.activation, act_name)() def __get_strength_by_mean(self, memberships): """ Calculate the strengths of the rules using mean method. Parameters: - memberships (torch.Tensor): Membership values for each rule. Returns: - torch.Tensor: Strengths of the rules. """ # Calculate strengths by taking the mean along the input dimension (dim=2) return torch.mean(memberships, dim=2) def __get_strength_by_prod(self, memberships): """ Calculate the strengths of the rules using product method. Parameters: - memberships (torch.Tensor): Membership values for each rule. Returns: - torch.Tensor: Strengths of the rules. """ # Calculate rule strengths by taking the product along the input dimension (dim=2) return torch.prod(memberships, dim=2) def __get_strength_by_blend(self, memberships): """ Calculate the strengths of the rules using blend method. Parameters: - memberships (torch.Tensor): Membership values for each rule. Returns: - torch.Tensor: Strengths of the rules. """ # Calculate strengths using a blend of product and mean prod_strengths = torch.prod(memberships, dim=2) mean_strengths = torch.mean(memberships, dim=2) # Compute blending factor alpha based on log of product strength log_strength = torch.log(prod_strengths + 1e-8) alpha = torch.sigmoid(-10 * (log_strength + 6)) return (1 - alpha) * prod_strengths + alpha * mean_strengths def _get_membership_strengths(self, X): # Layer 1: Calculate membership values for all rules (N x num_rules x input_dim) memberships = torch.stack([membership(X) for membership in self.memberships], dim=1) # Layer 2: Calculate rule strengths strengths = self._get_strength(memberships) # Layer 3: Normalize strengths (N x num_rules) strengths_sum = torch.sum(strengths, dim=1, keepdim=True) return strengths / (strengths_sum + 1e-8) # normalized_strengths
[docs] def forward(self, X): """ Forward pass through the Anfis model. Parameters: - x (torch.Tensor): The input tensor. Returns: - torch.Tensor: The output of the ANFIS model. """ # Layer 1: Calculate membership values for all rules (N x num_rules x input_dim) # Layer 2: Calculate rule strengths # Layer 3: Normalize strengths (N x num_rules) normalized_strengths = self._get_membership_strengths(X) # Layer 4: Prepare input for consequent layer ==> (N, num_rules, input_dim) weighted_inputs = torch.einsum("ni,rij->nrj", X, self.coeffs[:, :-1, :]) + self.coeffs[:, -1, :] # (N, num_rules, output_dim) # Layer 5: Apply normalized weights to rule outputs # Multiply with normalized strengths (broadcasting): (N, num_rules, 1) * (N, num_rules, output_dim) output = torch.sum(normalized_strengths.unsqueeze(-1) * weighted_inputs, dim=1) # (N, output_dim) # Layer 6: Apply activation function for the output layer output = self.act_output_(output) # (N, output_dim) return output
[docs] def update_output_weights_by_least_squares(self, X, y): with torch.no_grad(): device = X.device N = X.shape[0] ones = torch.ones(N, 1, device=device) X_bias = torch.cat([X, ones], dim=1) # (N, input_dim + 1) normalized_strengths = self._get_membership_strengths(X) # (N x num_rules) # Vectorized: Multiply strengths with X_bias X_expanded = X_bias.unsqueeze(1) # (N, 1, input_dim + 1) strengths_expanded = normalized_strengths.unsqueeze(2) # (N, num_rules, 1) F = strengths_expanded * X_expanded # (N, num_rules, input_dim + 1) F = F.reshape(N, -1) # (N, num_rules * (input_dim + 1)) if self.task =="classification" and (y.ndim == 1 or y.shape[1] == 1): y = torch.nn.functional.one_hot(y.squeeze().long(), num_classes=self.output_dim).float() else: y = y.to(dtype=F.dtype) y = y.to(device) if self.reg_lambda == 0: # No regularization # coeffs_flat = torch.linalg.lstsq(F, y, driver='gelsd').solution coeffs_flat = torch.linalg.pinv(F) @ y else: # Least Squares Estimation with L2 I = torch.eye(F.shape[1], device=device, dtype=F.dtype) coeffs_flat = torch.linalg.solve(F.T @ F + self.reg_lambda * I, F.T @ y) coeffs = coeffs_flat.view(self.num_rules, self.input_dim + 1, self.output_dim) self.coeffs.data.copy_(coeffs)
[docs] def set_weights(self, solution): """ Set only the premise (non-consequent) weights of the network based on a given solution vector. Parameters: - solution (np.ndarray): A flat array of weights to set in the model (excluding consequent weights). """ with torch.no_grad(): idx = 0 for name, param in self.named_parameters(): if 'coeffs' in name: continue # Skip consequent parameters param_size = param.numel() param.copy_(torch.tensor(solution[idx:idx + param_size], dtype=param.dtype, device=param.device).view(param.shape)) idx += param_size
[docs] def get_weights(self): """ Retrieve only the premise (non-consequent) weights as a flattened NumPy array. Returns: - np.ndarray: Flattened array of the model's premise weights. """ weights = [] for name, param in self.named_parameters(): if 'coeffs' in name: continue # Skip consequent parameters weights.append(param.data.cpu().numpy().flatten()) return np.concatenate(weights)
[docs] def get_weights_size(self): """ Calculate the number of trainable premise (non-consequent) parameters in the model. Returns: - int: Total number of premise parameters. """ return sum( param.numel() for name, param in self.named_parameters() if param.requires_grad and 'coeffs' not in name )
[docs]class BaseAnfis(BaseEstimator): """ BaseAnfis is a scikit-learn style base class for managing and training an Adaptive Neuro-Fuzzy Inference System (ANFIS) model. This class provides a high-level interface for building and evaluating ANFIS models using PyTorch while retaining compatibility with scikit-learn-style APIs (e.g., `fit`, `predict`, `score`). It includes functionality for saving/loading models, metrics evaluation, and logging training results. Parameters ---------- num_rules : int Number of fuzzy rules in the ANFIS model. mf_class : str or object The membership function class to use. Can be a string name of a predefined MF type or a custom MF instance. task : str, optional (default="classification") Type of supervised learning task. One of {"classification", "binary_classification", "regression"}. vanishing_strategy : str or None, optional Strategy to handle vanishing gradients when combining membership values. Can be one of {"prod", "mean", "blend"}. default='prod') act_output : str or None, optional Output activation function. If None, will be inferred based on task type. reg_lambda : float or None, optional (default=0.0) Regularization term used when solving for consequent parameters via least squares. seed : int or None, optional Random seed for reproducibility. Attributes ---------- network : torch.nn.Module or None The underlying ANFIS model instance. Must be assigned by the subclass or during training. loss_train : list or None A list to store training loss per epoch (if tracking is implemented). SUPPORTED_CLS_METRICS : list of str List of supported classification evaluation metrics. SUPPORTED_REG_METRICS : list of str List of supported regression evaluation metrics. Notes ----- - This class is designed to be inherited and extended. The core methods `fit`, `predict`, and `score` must be implemented in a subclass or concrete version. - Helper methods for saving/loading models and logging training history are included. - The `evaluate` method provides metric evaluation for classification and regression tasks. """ SUPPORTED_CLS_METRICS = get_all_classification_metrics() SUPPORTED_REG_METRICS = get_all_regression_metrics() def __init__(self, num_rules, mf_class, task="classification", vanishing_strategy="prod", act_output=None, reg_lambda=None, seed=None): self.num_rules = num_rules self.mf_class = mf_class self.task = task self.act_output = act_output self.vanishing_strategy = vanishing_strategy if reg_lambda is None: self.reg_lambda = 0. else: self.reg_lambda = reg_lambda self.seed = seed self.network = None self.loss_train = None @staticmethod def _check_method(method=None, list_supported_methods=None): """ Validates if the given method is supported. Parameters ---------- method : str The method to be checked. list_supported_methods : list of str A list of supported method names. Returns ------- bool True if the method is supported; otherwise, raises ValueError. """ if type(method) is str: return validator.check_str("method", method, list_supported_methods) else: raise ValueError(f"method should be a string and belong to {list_supported_methods}")
[docs] def set_seed(self, seed): """ Set the random seed for the model to ensure reproducibility. Parameters: seed (int, None): The seed value to use for random number generators within the model. Notes: - This method stores the seed value in the `self.seed` attribute. - Setting a seed helps achieve reproducible results, especially in training neural networks where randomness affects initialization and other stochastic operations. """ self.seed = seed
[docs] def fit(self, X, y): """ Train the ANFIS model on the given dataset. Parameters ---------- X : array-like or torch.Tensor Training features. y : array-like or torch.Tensor Target values. """ pass
[docs] def predict(self, X): """ Generate predictions for input data using the trained model. Parameters ---------- X : array-like or torch.Tensor Input features for prediction. Returns ------- array-like or torch.Tensor Model predictions for each input sample. """ pass
[docs] def score(self, X, y): """ Evaluate the model on the given dataset. Parameters ---------- X : array-like or torch.Tensor Evaluation features. y : array-like or torch.Tensor True values. Returns ------- float The accuracy or evaluation score. """ pass
def __evaluate_reg(self, y_true, y_pred, list_metrics=("MSE", "MAE")): """ Evaluate regression performance metrics. Parameters ---------- y_true : array-like True target values. y_pred : array-like Predicted values. list_metrics : tuple of str, list of str List of metrics for evaluation (e.g., "MSE" and "MAE"). Returns ------- dict Dictionary of calculated metric values. """ rm = RegressionMetric(y_true=y_true, y_pred=y_pred) return rm.get_metrics_by_list_names(list_metrics) def __evaluate_cls(self, y_true, y_pred, list_metrics=("AS", "RS")): """ Evaluate classification performance metrics. Parameters ---------- y_true : array-like True target values. y_pred : array-like Predicted labels. list_metrics : tuple of str, list of str List of metrics for evaluation (e.g., "AS" and "RS"). Returns ------- dict Dictionary of calculated metric values. """ cm = ClassificationMetric(y_true, y_pred) return cm.get_metrics_by_list_names(list_metrics)
[docs] def evaluate(self, y_true, y_pred, list_metrics=None): """ Evaluate the model using specified metrics. Parameters ---------- y_true : array-like True target values. y_pred : array-like Model's predicted values. list_metrics : list of str, optional Names of metrics for evaluation (e.g., "MSE", "MAE"). Returns ------- dict Evaluation metrics and their values. """ pass
[docs] def save_training_loss(self, save_path="history", filename="loss.csv"): """ Save training loss history to a CSV file. Parameters ---------- save_path : str, optional Path to save the file (default: "history"). filename : str, optional Filename for saving loss history (default: "loss.csv"). """ Path(save_path).mkdir(parents=True, exist_ok=True) if self.loss_train is None: print(f"{self.__class__.__name__} model doesn't have training loss!") else: data = {"epoch": list(range(1, len(self.loss_train) + 1)), "loss": self.loss_train} pd.DataFrame(data).to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_evaluation_metrics(self, y_true, y_pred, list_metrics=("RMSE", "MAE"), save_path="history", filename="metrics.csv"): """ Save evaluation metrics to a CSV file. Parameters ---------- y_true : array-like Ground truth values. y_pred : array-like Model predictions. list_metrics : list of str, optional Metrics for evaluation (default: ("RMSE", "MAE")). save_path : str, optional Path to save the file (default: "history"). filename : str, optional Filename for saving metrics (default: "metrics.csv"). """ Path(save_path).mkdir(parents=True, exist_ok=True) results = self.evaluate(y_true, y_pred, list_metrics) df = pd.DataFrame.from_dict(results, orient='index').T df.to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_y_predicted(self, X, y_true, save_path="history", filename="y_predicted.csv"): """ Save true and predicted values to a CSV file. Parameters ---------- X : array-like or torch.Tensor Input features. y_true : array-like True values. save_path : str, optional Path to save the file (default: "history"). filename : str, optional Filename for saving predicted values (default: "y_predicted.csv"). """ Path(save_path).mkdir(parents=True, exist_ok=True) y_pred = self.predict(X) data = {"y_true": np.squeeze(np.asarray(y_true)), "y_pred": np.squeeze(np.asarray(y_pred))} pd.DataFrame(data).to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_model(self, save_path="history", filename="model.pkl"): """ Save the trained model to a pickle file. Parameters ---------- save_path : str, optional Path to save the model (default: "history"). filename : str, optional Filename for saving model, with ".pkl" extension (default: "model.pkl"). """ Path(save_path).mkdir(parents=True, exist_ok=True) if filename[-4:] != ".pkl": filename += ".pkl" pickle.dump(self, open(f"{save_path}/{filename}", 'wb'))
[docs] @staticmethod def load_model(load_path="history", filename="model.pkl") -> EstimatorType: """ Load a model from a pickle file. Parameters ---------- load_path : str, optional Path to load the model from (default: "history"). filename : str, optional Filename of the saved model (default: "model.pkl"). Returns ------- BaseAnfis The loaded model. """ if filename[-4:] != ".pkl": filename += ".pkl" return pickle.load(open(f"{load_path}/{filename}", 'rb'))
[docs]class BaseClassicAnfis(BaseAnfis): """ A classical ANFIS (Adaptive Neuro-Fuzzy Inference System) model for classification tasks with hybrid learning: gradient descent for premise parameters and least squares estimation for consequent parameters. This implementation supports customizable optimizers, early stopping, L2 regularization, and validation split. Parameters ---------- num_rules : int, default=10 Number of fuzzy rules in the ANFIS model. mf_class : str, default="Gaussian" Type of membership function to use (e.g., "Gaussian", "Triangular"). vanishing_strategy : str or None, optional Strategy to handle vanishing gradients when combining membership values. Can be one of {"prod", "mean", "blend"}. default='prod') act_output : callable or None, default=None Activation function to apply to the output layer (e.g., softmax for classification). reg_lambda : float or None, optional L2 regularization strength. If None, regularization is disabled. epochs : int, default=1000 Number of training epochs. batch_size : int, default=16 Batch size used for training. optim : str, default="Adam" Name of the optimizer. Must be one of the supported optimizers. optim_params : dict or None, default=None Dictionary of optimizer hyperparameters (e.g., learning rate). early_stopping : bool, default=True Whether to apply early stopping during training. n_patience : int, default=10 Number of epochs to wait before early stopping if no improvement. epsilon : float, default=0.001 Minimum change in loss to qualify as improvement for early stopping. valid_rate : float, default=0.1 Percentage of data to use for validation split. seed : int, default=42 Random seed for reproducibility. verbose : bool, default=True Whether to print progress during training. device : str, optional (default = None) Device to run the model on (e.g., "cpu" or "gpu"). Attributes ---------- network : CustomANFIS The core ANFIS model with fuzzy rules and trainable layers. optimizer : torch.optim.Optimizer Optimizer instance based on the specified strategy. criterion : torch.nn.Module Loss function used (e.g., CrossEntropyLoss for classification). early_stopper : EarlyStopper or None Instance of early stopping monitor (if enabled). """ SUPPORTED_OPTIMIZERS = [ "Adafactor", "Adadelta", "Adagrad", "Adam", "Adamax", "AdamW", "ASGD", "LBFGS", "NAdam", "RAdam", "RMSprop", "Rprop", "SGD", "SparseAdam", ] def __init__(self, num_rules=10, mf_class="Gaussian", vanishing_strategy="prod", act_output=None, reg_lambda=None, epochs=1000, batch_size=16, optim="Adam", optim_params=None, early_stopping=True, n_patience=10, epsilon=0.001, valid_rate=0.1, seed=42, verbose=True, device=None): """ Initialize the ANFIS with user-defined architecture, training parameters, and optimization settings. """ super().__init__(num_rules, mf_class, "classification", vanishing_strategy=vanishing_strategy, act_output=act_output, reg_lambda=reg_lambda, seed=seed) self.epochs = epochs self.batch_size = batch_size self.optim = optim self.optim_params = optim_params if optim_params else {} self.early_stopping = early_stopping self.n_patience = n_patience self.epsilon = epsilon self.valid_rate = valid_rate self.verbose = verbose if device == 'gpu': if torch.cuda.is_available(): self.device = 'cuda' else: raise ValueError("GPU is not available. Please set device to 'cpu'.") else: self.device = "cpu" # Internal attributes for model, optimizer, and early stopping self.size_input = None self.size_output = None self.network = None self.optimizer = None self.criterion = None self.patience_count = None self.valid_mode = False self.early_stopper = None
[docs] def build_model(self): """ Construct the ANFIS model, optimizer, and loss criterion. This method: - Instantiates the ANFIS network based on current configuration. - Configures the optimizer for trainable (non-consequent) parameters. - Selects the loss function based on task type. - Initializes early stopping if enabled. """ if self.early_stopping: # Initialize early stopper if early stopping is enabled self.early_stopper = EarlyStopper(patience=self.n_patience, epsilon=self.epsilon) # Define model, optimizer, and loss criterion based on task self.network = CustomANFIS(self.size_input, self.num_rules, self.size_output, self.mf_class, self.task, self.vanishing_strategy, self.act_output, self.reg_lambda, self.seed).to(self.device) # Freeze consequent parameters during GD params = [p for name, p in self.network.named_parameters() if 'coeffs' not in name] self.optimizer = getattr(torch.optim, self.optim)(params, **self.optim_params) # Select loss function based on task type if self.task == "classification": self.criterion = nn.CrossEntropyLoss() elif self.task == "binary_classification": self.criterion = nn.BCEWithLogitsLoss() else: self.criterion = nn.MSELoss()
[docs] def process_data(self, X, y, **kwargs): """ Process and prepare data for training. Parameters ---------- X : array-like Feature data for training. y : array-like Target labels or values for training. **kwargs : additional keyword arguments Additional parameters for data processing, if needed. """ pass # Placeholder for data processing logic
def _fit(self, data, **kwargs): """ Train the ANFIS model using hybrid learning: gradient descent for the premise parameters and least squares estimation for consequent parameters. Parameters ---------- data : tuple A tuple containing (train_loader, X_valid_tensor, y_valid_tensor) for training and validation. **kwargs : additional keyword arguments Additional parameters for training, if needed. Notes ----- - Early stopping is applied if enabled. - Training loss (and validation loss, if applicable) is printed per epoch when verbose=True. - Least squares estimation is applied at each batch step to update the consequent parameters. """ # Unpack training and validation data train_loader, X_valid_tensor, y_valid_tensor = data # Start training self.loss_train = [] for epoch in range(self.epochs): self.network.train() # Set model to training mode # Initialize total loss for this epoch total_loss = 0.0 # # Update consequent parameters using LSE for all training data # X_all = torch.cat([x for x, _ in train_loader], dim=0) # y_all = torch.cat([y for _, y in train_loader], dim=0) # self.network.update_output_weights_by_least_squares(X_all.detach(), y_all.detach()) # Training step over batches for batch_X, batch_y in train_loader: self.optimizer.zero_grad() # Clear gradients # Update consequent parameters using LSE self.network.update_output_weights_by_least_squares(batch_X.detach(), batch_y.detach()) # Forward pass pred = self.network(batch_X) # Compute loss with L2 regularization (only for trainable parameters) l2_reg = sum(torch.sum(param ** 2) for name, param in self.network.named_parameters() if param.requires_grad and 'coeffs' not in name) loss = self.criterion(pred, batch_y) + self.reg_lambda * l2_reg # Compute loss # Backpropagation and optimization loss.backward() self.optimizer.step() total_loss += loss.item() # Accumulate batch loss # Calculate average training loss for this epoch avg_loss = total_loss / len(train_loader) self.loss_train.append(avg_loss) # Perform validation if validation mode is enabled if self.valid_mode: self.network.eval() # Set model to evaluation mode with torch.no_grad(): val_output = self.network(X_valid_tensor) val_loss = self.criterion(val_output, y_valid_tensor) # Early stopping based on validation loss if self.early_stopping and self.early_stopper.early_stop(val_loss): print(f"Early stopping at epoch {epoch + 1}") break if self.verbose: print(f"Epoch: {epoch + 1}, Train Loss: {avg_loss:.6f}, Validation Loss: {val_loss:.6f}") else: # Early stopping based on training loss if no validation is used if self.early_stopping and self.early_stopper.early_stop(avg_loss): print(f"Early stopping at epoch {epoch + 1}") break if self.verbose: print(f"Epoch: {epoch + 1}, Train Loss: {avg_loss:.6f}")
[docs]class BaseGdAnfis(BaseAnfis): """ A gradient-based Adaptive Neuro-Fuzzy Inference System (ANFIS) base class using PyTorch. This class supports training an ANFIS model using various gradient descent optimizers provided by PyTorch. It includes options for early stopping, regularization, and model validation. Attributes ---------- SUPPORTED_OPTIMIZERS : list of str List of supported PyTorch optimizer names. epochs : int Number of training epochs. batch_size : int Mini-batch size used for gradient-based optimization. optim : str Name of the optimizer to use (must be in SUPPORTED_OPTIMIZERS). optim_params : dict Parameters to initialize the optimizer. early_stopping : bool Whether to apply early stopping during training. n_patience : int Number of epochs with no improvement after which training is stopped. epsilon : float Minimum change in the monitored loss to qualify as an improvement. valid_rate : float Proportion of the training data to be used for validation. verbose : bool Whether to print logs during training. size_input : int Number of input features (set during data processing). size_output : int Number of output features (set during data processing). network : CustomANFIS The ANFIS model instance. optimizer : torch.optim.Optimizer The PyTorch optimizer instance. criterion : torch.nn.Module Loss function used for training. patience_count : int or None Counter for tracking early stopping patience. valid_mode : bool Whether validation mode is enabled. early_stopper : EarlyStopper or None Instance managing early stopping logic. Parameters ---------- num_rules : int, optional Number of fuzzy rules (default is 10). mf_class : str, optional Type of membership function to use (default is "Gaussian"). vanishing_strategy : str, optional Strategy to compute rule strengths (to avoid gradient vanishing too), e.g., "prod" or "min" (default is "prod"). act_output : callable or None, optional Activation function for the output layer (default is None). reg_lambda : float or None, optional Regularization parameter for L2 loss (default is None). epochs : int, optional Number of training epochs (default is 1000). batch_size : int, optional Batch size for training (default is 16). optim : str, optional Optimizer name from SUPPORTED_OPTIMIZERS (default is "Adam"). optim_params : dict or None, optional Parameters for optimizer initialization (default is None). early_stopping : bool, optional Enable or disable early stopping (default is True). n_patience : int, optional Patience threshold for early stopping (default is 10). epsilon : float, optional Minimum improvement threshold for early stopping (default is 0.001). valid_rate : float, optional Validation split ratio from training data (default is 0.1). seed : int, optional Random seed for reproducibility (default is 42). verbose : bool, optional Enable verbose output (default is True). device : str, optional (default = None) Device to run the model on (e.g., "cpu" or "gpu"). Methods ------- build_model(): Build and initialize the ANFIS network, optimizer, and loss function. process_data(X, y, **kwargs): Prepares input features and targets for training (to be implemented in subclass). _fit(data, **kwargs): Trains the ANFIS model using mini-batch gradient descent and early stopping. """ SUPPORTED_OPTIMIZERS = [ "Adafactor", "Adadelta", "Adagrad", "Adam", "Adamax", "AdamW", "ASGD", "LBFGS", "NAdam", "RAdam", "RMSprop", "Rprop", "SGD", "SparseAdam", ] def __init__(self, num_rules=10, mf_class="Gaussian", vanishing_strategy="prod", act_output=None, reg_lambda=None, epochs=1000, batch_size=16, optim="Adam", optim_params=None, early_stopping=True, n_patience=10, epsilon=0.001, valid_rate=0.1, seed=42, verbose=True, device=None): """ Initialize the ANFIS with user-defined architecture, training parameters, and optimization settings. """ super().__init__(num_rules, mf_class, "classification", vanishing_strategy=vanishing_strategy, act_output=act_output, reg_lambda=reg_lambda, seed=seed) self.epochs = epochs self.batch_size = batch_size self.optim = optim self.optim_params = optim_params if optim_params else {} self.early_stopping = early_stopping self.n_patience = n_patience self.epsilon = epsilon self.valid_rate = valid_rate self.verbose = verbose if device == 'gpu': if torch.cuda.is_available(): self.device = 'cuda' else: raise ValueError("GPU is not available. Please set device to 'cpu'.") else: self.device = "cpu" # Internal attributes for model, optimizer, and early stopping self.size_input = None self.size_output = None self.network = None self.optimizer = None self.criterion = None self.patience_count = None self.valid_mode = False self.early_stopper = None
[docs] def build_model(self): """ Build and initialize the ANFIS model, optimizer, and criterion based on user specifications. This function sets up the model structure, optimizer type and parameters, and loss criterion depending on the task type (classification or regression). """ if self.early_stopping: # Initialize early stopper if early stopping is enabled self.early_stopper = EarlyStopper(patience=self.n_patience, epsilon=self.epsilon) # Define model, optimizer, and loss criterion based on task self.network = CustomANFIS(self.size_input, self.num_rules, self.size_output, self.mf_class, self.task, self.vanishing_strategy, self.act_output, self.reg_lambda, self.seed).to(self.device) self.optimizer = getattr(torch.optim, self.optim)(self.network.parameters(), **self.optim_params) # Select loss function based on task type if self.task == "classification": self.criterion = nn.CrossEntropyLoss() elif self.task == "binary_classification": self.criterion = nn.BCEWithLogitsLoss() else: self.criterion = nn.MSELoss()
[docs] def process_data(self, X, y, **kwargs): """ Process and prepare data for training. Parameters ---------- X : array-like Feature data for training. y : array-like Target labels or values for training. **kwargs : additional keyword arguments Additional parameters for data processing, if needed. """ pass # Placeholder for data processing logic
def _fit(self, data, **kwargs): """ Train the ANFIS model on the provided data. Parameters ---------- data : tuple A tuple containing (train_loader, X_valid_tensor, y_valid_tensor) for training and validation. **kwargs : additional keyword arguments Additional parameters for training, if needed. """ # Unpack training and validation data train_loader, X_valid_tensor, y_valid_tensor = data # Start training self.loss_train = [] for epoch in range(self.epochs): self.network.train() # Set model to training mode # Initialize total loss for this epoch total_loss = 0.0 # Training step over batches for batch_X, batch_y in train_loader: self.optimizer.zero_grad() # Clear gradients # Step 1: forward pass pred = self.network(batch_X) # Step 2: compute loss with L2 regularization (only for trainable parameters) l2_reg = sum(torch.sum(param ** 2) for name, param in self.network.named_parameters()) loss = self.criterion(pred, batch_y) + self.reg_lambda * l2_reg # Compute loss # Step 3: Backpropagation and optimization loss.backward() self.optimizer.step() total_loss += loss.item() # Accumulate batch loss # Calculate average training loss for this epoch avg_loss = total_loss / len(train_loader) self.loss_train.append(avg_loss) # Perform validation if validation mode is enabled if self.valid_mode: self.network.eval() # Set model to evaluation mode with torch.no_grad(): val_output = self.network(X_valid_tensor) val_loss = self.criterion(val_output, y_valid_tensor) # Early stopping based on validation loss if self.early_stopping and self.early_stopper.early_stop(val_loss): print(f"Early stopping at epoch {epoch + 1}") break if self.verbose: print(f"Epoch: {epoch + 1}, Train Loss: {avg_loss:.6f}, Validation Loss: {val_loss:.6f}") else: # Early stopping based on training loss if no validation is used if self.early_stopping and self.early_stopper.early_stop(avg_loss): print(f"Early stopping at epoch {epoch + 1}") break if self.verbose: print(f"Epoch: {epoch + 1}, Train Loss: {avg_loss:.6f}")
[docs]class BaseBioAnfis(BaseAnfis): """ Base class for biologically-inspired ANFIS models using metaheuristic optimization. This class serves as a base for integrating Adaptive Neuro-Fuzzy Inference Systems (ANFIS) with metaheuristic algorithms (e.g., Genetic Algorithm, PSO, etc.) to optimize membership function parameters and rule weights. The consequent parameters are learned using Least Squares Estimation (LSE) in a hybrid-learning manner. Parameters ---------- num_rules : int, optional Number of fuzzy rules in the ANFIS model (default is 10). mf_class : str, optional Type of membership function to use (e.g., "Gaussian", "Triangular") (default is "Gaussian"). vanishing_strategy : str or None, optional Strategy to handle vanishing gradients when combining membership values. Can be one of {"prod", "mean", "blend"} (default is "prod"). act_output : any, optional Activation function to apply to the output layer (default is None). reg_lambda : float or None, optional L2 regularization strength for least squares estimation (default is None). optim : str, optional Name of the metaheuristic optimizer to use (default is "BaseGA"). optim_params : dict, optional Dictionary of hyperparameters for the optimizer (default is None). obj_name : str, optional Name of the objective function for optimization (default is None). seed : int, optional Random seed for reproducibility (default is 42). verbose : bool, optional Whether to print logs during training (default is True). lb : int, float, list, tuple, np.ndarray, optional. Lower bounds for optimization (default is (-1.0,)). ub : int, float, list, tuple, np.ndarray, optional. Upper bounds for optimization (default is (1.0,)). mode : str, optional Mode for optimization (default is 'single'). n_workers : int, optional Number of workers for parallel processing (default is None). termination : any, optional Termination criteria for optimization (default is None). Attributes ---------- SUPPORTED_OPTIMIZERS : list of str List of supported optimizer names from the Mealpy library. SUPPORTED_CLS_OBJECTIVES : dict Dictionary of supported classification metrics from the `permetrics` library. SUPPORTED_REG_OBJECTIVES : dict Dictionary of supported regression metrics from the `permetrics` library. optim : str Name of the metaheuristic optimizer used. optim_params : dict Hyperparameters for the selected optimizer. verbose : bool Whether to print logs during training. size_input : int Number of input features (set during model build). size_output : int Number of output neurons (set during model build). network : CustomANFIS The core ANFIS network module. optimizer : Optimizer Metaheuristic optimizer instance. obj_name : str Name of the optimization objective function. metric_class : permetrics object Metric computation class for evaluating performance. Methods ------- set_optim_and_paras(optim, optim_params) Sets the optimizer name and parameters. build_model() Constructs the ANFIS network and optimizer instance. get_name() Returns the name of the model based on optimizer settings. _set_optimizer(optim, optim_params) Internal method to initialize the optimizer. _set_lb_ub(lb, ub, n_dims) Validates and formats lower and upper bounds for optimization. objective_function(solution) Computes the loss/fitness value for a given solution vector. _fit(data, lb, ub, mode, n_workers, termination, save_population, **kwargs) Trains the ANFIS model using a metaheuristic-based optimization strategy. Notes ----- - This class supports hybrid learning by combining metaheuristics and LSE. - Intended to be extended by specific regressors/classifiers in the library. - Requires `CustomANFIS`, `Optimizer` from `mealpy`, and metrics from `permetrics`. """ SUPPORTED_OPTIMIZERS = list(get_all_optimizers(verbose=False).keys()) SUPPORTED_CLS_OBJECTIVES = get_all_classification_metrics() SUPPORTED_REG_OBJECTIVES = get_all_regression_metrics() def __init__(self, num_rules=10, mf_class="Gaussian", vanishing_strategy="prod", act_output=None, reg_lambda=None, optim="BaseGA", optim_params=None, obj_name=None, seed=42, verbose=True, lb=None, ub=None, mode='single', n_workers=None, termination=None): super().__init__(num_rules, mf_class, "classification", vanishing_strategy=vanishing_strategy, act_output=act_output, reg_lambda=reg_lambda, seed=seed) self.optim = optim self.optim_params = optim_params self.verbose = verbose self.lb = lb self.ub = ub self.mode = mode self.n_workers = n_workers self.termination = termination # Initialize model parameters self.size_input = None self.size_output = None self.network = None self.optimizer = None self.obj_name = obj_name self.metric_class = None
[docs] def set_optim_and_paras(self, optim=None, optim_params=None): """ Sets the `optim` and `optim_params` parameters for this class. Parameters ---------- optim : str The optimizer name to be set. optim_params : dict Parameters to configure the optimizer. """ self.optim = optim self.optim_params = optim_params
[docs] def _set_optimizer(self, optim=None, optim_params=None): """ Validates the real optimizer based on the provided `optim` and `optim_pras`. Parameters ---------- optim : str or Optimizer The optimizer name or instance to be set. optim_params : dict, optional Parameters to configure the optimizer. Returns ------- Optimizer An instance of the selected optimizer. Raises ------ TypeError If the provided optimizer is neither a string nor an instance of Optimizer. """ if isinstance(optim, str): opt_class = get_optimizer_by_class(optim) if isinstance(optim_params, dict): return opt_class(**optim_params) else: return opt_class(epoch=300, pop_size=30) elif isinstance(optim, Optimizer): if isinstance(optim_params, dict): if "name" in optim_params: # Check if key exists and remove it optim.name = optim_params.pop("name") optim.set_parameters(optim_params) return optim else: raise TypeError(f"optimizer needs to set as a string and supported by Mealpy library.")
[docs] def get_name(self): """ Generate a descriptive name for the ANFIS model based on the optimizer. Returns: str: A string representing the name of the model, including details about the optimizer used. If `self.optim` is a string, the name will be formatted as "<self.optim_params>-ANFIS". Otherwise, it will return "<self.optimizer.name>-ANFIS", assuming `self.optimizer` is an object with a `name` attribute. Notes: - This method relies on the presence of `self.optim`, `self.optim_params`, and `self.optimizer.name` attributes within the model instance. - It is intended to provide a consistent naming scheme for model instances based on the optimizer configuration. """ return f"{self.optimizer.name}-ANFIS-{self.optim_params}"
[docs] def build_model(self): """ Build and initialize the ANFIS model, optimizer, and loss criterion. Notes ----- - Initializes `CustomANFIS` with user settings. - Instantiates a PyTorch optimizer for trainable parameters. - Sets the appropriate loss function based on task type. - Prepares early stopping monitor if enabled. """ self.network = CustomANFIS(self.size_input, self.num_rules, self.size_output, self.mf_class, self.task, self.vanishing_strategy, self.act_output, self.reg_lambda, self.seed) self.optimizer = self._set_optimizer(self.optim, self.optim_params)
[docs] def _set_lb_ub(self, lb=None, ub=None, n_dims=None): """ Validates and sets the lower and upper bounds for optimization. Parameters ---------- lb : list, tuple, np.ndarray, int, or float, optional The lower bounds for weights and biases in network. ub : list, tuple, np.ndarray, int, or float, optional The upper bounds for weights and biases in network. n_dims : int The number of dimensions. Returns ------- tuple A tuple containing validated lower and upper bounds. Raises ------ ValueError If the bounds are not valid. """ if lb is None: lb = (-1.,) * n_dims elif isinstance(lb, numbers.Number): lb = (lb, ) * n_dims elif isinstance(lb, (list, tuple, np.ndarray)): if len(lb) == 1: lb = np.array(lb * n_dims, dtype=float) else: lb = np.array(lb, dtype=float).ravel() if ub is None: ub = (1.,) * n_dims elif isinstance(ub, numbers.Number): ub = (ub, ) * n_dims elif isinstance(ub, (list, tuple, np.ndarray)): if len(ub) == 1: ub = np.array(ub * n_dims, dtype=float) else: ub = np.array(ub, dtype=float).ravel() if len(lb) != len(ub): raise ValueError(f"Invalid lb and ub. Their length should be equal to 1 or {n_dims}.") return np.array(lb).ravel(), np.array(ub).ravel()
[docs] def objective_function(self, solution=None): """ Evaluates the fitness function for classification metrics based on the provided solution. Parameters ---------- solution : np.ndarray, default=None The proposed solution to evaluate. Returns ------- result : float The fitness value, representing the loss for the current solution. """ X_train, y_train = self.data self.network.set_weights(solution) self.network.update_output_weights_by_least_squares(X_train, y_train) y_pred = self.network(X_train).detach().cpu().numpy() y_train = y_train.detach().cpu().numpy() loss_train = self.metric_class(y_train, y_pred).get_metric_by_name(self.obj_name)[self.obj_name] return np.mean([loss_train])
[docs] def _fit(self, X, y): # Get data n_dims = self.network.get_weights_size() lb, ub = self._set_lb_ub(self.lb, self.ub, n_dims) self.data = (X, y) log_to = "console" if self.verbose else "None" if self.obj_name is None: raise ValueError("obj_name can't be None") else: if self.obj_name in self.SUPPORTED_REG_OBJECTIVES.keys(): minmax = self.SUPPORTED_REG_OBJECTIVES[self.obj_name] elif self.obj_name in self.SUPPORTED_CLS_OBJECTIVES.keys(): minmax = self.SUPPORTED_CLS_OBJECTIVES[self.obj_name] else: raise ValueError("obj_name is not supported. Please check the library: permetrics to see the supported objective function.") problem = { "obj_func": self.objective_function, "bounds": FloatVar(lb=lb, ub=ub), "minmax": minmax, "log_to": log_to, } self.optimizer.solve(problem, mode=self.mode, n_workers=self.n_workers, termination=self.termination, seed=self.seed) self.network.set_weights(self.optimizer.g_best.solution) self.network.update_output_weights_by_least_squares(X, y) self.loss_train = np.array(self.optimizer.history.list_global_best_fit) return self