diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml index 12460f1..0ef35fc 100644 --- a/.github/workflows/python-ci.yml +++ b/.github/workflows/python-ci.yml @@ -26,12 +26,12 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: ref: ${{ github.event.pull_request.head.ref }} # ${{ github.event.pull_request.head.sha }} - name: Setup Python 3.10 - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: "3.10" cache: 'pip' @@ -70,13 +70,13 @@ jobs: steps: - name: Checkout to latest changes - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: ref: ${{ needs.formatting.outputs.new_sha }} fetch-depth: 0 - name: Set up Python 3.10 - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: "3.10" cache: 'pip' @@ -94,13 +94,13 @@ jobs: steps: - name: Checkout to latest changes - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: ref: ${{ needs.formatting.outputs.new_sha }} fetch-depth: 0 - name: Set up Python 3.10 - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: "3.10" cache: 'pip' @@ -125,13 +125,13 @@ jobs: steps: - name: Checkout to latest changes - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: ref: ${{ needs.formatting.outputs.new_sha }} fetch-depth: 0 - name: Set up Python 3.10 - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: "3.10" cache: 'pip' diff --git a/config.yaml b/config.yaml index 4c561dc..9ee8e31 100644 --- a/config.yaml +++ b/config.yaml @@ -10,7 +10,7 @@ ################################################################################ ############## Dataset type to use -# Possible values: , kaggle_hf, mnist, dt4h_format +# Possible values: , kaggle_hf, diabetes, mnist, dt4h_format dataset: dt4h_format #custom #libsvm @@ -33,7 +33,7 @@ train_size: 0.7 # ****** * * * * * * * * * * * * * * * * * * * * ******************* 
############## Number of clients (data centers) to use for training -num_clients: 1 +num_clients: 3 ############## Model type # Possible values: logistic_regression, lsvc, elastic_net, random_forest, weighted_random_forest, xgb @@ -43,7 +43,7 @@ model: random_forest #random_forest ############## Training length -num_rounds: 50 +num_rounds: 5 ############## Metric to select the best model # Possible values: accuracy, balanced_accuracy, f1, precision, recall @@ -87,6 +87,8 @@ smoothWeights: linear_models: n_features: 9 +n_features: 9 + # Random Forest random_forest: balanced_rf: true @@ -101,7 +103,7 @@ xgb: batch_size: 32 num_iterations: 100 task_type: BINARY - tree_num: 500 + tree_num: 10 held_out_center_id: -1 diff --git a/flcore/client_selector.py b/flcore/client_selector.py index 76fa3d5..3f92915 100644 --- a/flcore/client_selector.py +++ b/flcore/client_selector.py @@ -11,7 +11,7 @@ def get_model_client(config, data, client_id): if model in ("logistic_regression", "elastic_net", "lsvc"): client = linear_models.client.get_client(config,data,client_id) - elif model == "random_forest": + elif model in ("random_forest", "balanced_random_forest"): client = random_forest.client.get_client(config,data,client_id) elif model == "weighted_random_forest": diff --git a/flcore/datasets.py b/flcore/datasets.py index 699c4a0..d7f4e84 100644 --- a/flcore/datasets.py +++ b/flcore/datasets.py @@ -12,10 +12,13 @@ import pandas as pd from sklearn.datasets import load_svmlight_file -from sklearn.preprocessing import OrdinalEncoder, MinMaxScaler,StandardScaler +from sklearn.preprocessing import OrdinalEncoder, LabelEncoder, MinMaxScaler, StandardScaler from sklearn.model_selection import KFold, StratifiedShuffleSplit, train_test_split from sklearn.utils import shuffle -from sklearn.feature_selection import SelectKBest, f_classif +from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif +from sklearn.ensemble import RandomForestClassifier + +from ucimlrepo 
import fetch_ucirepo from flcore.models.xgb.utils import TreeDataset, do_fl_partitioning, get_dataloader @@ -23,6 +26,273 @@ XY = Tuple[np.ndarray, np.ndarray] Dataset = Tuple[XY, XY] +def calculate_preprocessing_params(subset_data, subset_target, n_features=None, feature_selection_method='mutual_info'): + """ + Calculate preprocessing parameters based on a subset of data (reference center) + + Args: + subset_data: DataFrame containing the subset data + subset_target: Series containing the target variable + n_features: Number of features to select (None for all features) + feature_selection_method: Method for feature selection ('mutual_info', 'f_classif', 'random_forest') + + Returns: + dict: Preprocessing parameters (imputation values, mean, std, label_encoders, feature_selector) + """ + data_copy = subset_data.copy() + target_copy = subset_target.copy() + + # Calculate imputation parameters + imputation_params = {} + label_encoders = {} + + for column in data_copy.columns: + # Handle missing values + if data_copy[column].isna().any(): + if data_copy[column].dtype in ['float64', 'int64']: + imputation_params[column] = data_copy[column].median() + else: + imputation_params[column] = data_copy[column].mode()[0] if not data_copy[column].mode().empty else 0 + + # Store label encoders for categorical variables + if data_copy[column].dtype == 'object': + le = LabelEncoder() + # Fit on non-null values only + non_null_data = data_copy[column].dropna() + if len(non_null_data) > 0: + # Add 'unknown' category for unseen labels + classes = np.append(non_null_data.astype(str).unique(), 'unknown') + le.fit(classes) + label_encoders[column] = le + + # Calculate normalization parameters for ALL columns (after conversion to numerical) + numeric_data = data_copy.copy() + + # Temporarily convert categorical to numerical for normalization parameter calculation + for column in numeric_data.columns: + if numeric_data[column].dtype == 'object': + # Use simple integer encoding for 
parameter calculation + numeric_data[column] = pd.Categorical(numeric_data[column]).codes + # Handle missing values temporarily for parameter calculation + if column in imputation_params: + numeric_data[column].fillna(imputation_params[column], inplace=True) + + # Convert all to numeric + numeric_data = numeric_data.apply(pd.to_numeric, errors='coerce') + + # Calculate normalization parameters + normalization_params = { + 'mean': numeric_data.mean().to_dict(), + 'std': numeric_data.std().to_dict() + } + + # Handle zero standard deviation + for col, std_val in normalization_params['std'].items(): + if std_val == 0 or np.isnan(std_val): + normalization_params['std'][col] = 1.0 + + # Feature Selection + feature_selector = None + selected_features = None + feature_scores = None + + if n_features is not None and n_features < len(numeric_data.columns): + # Prepare data for feature selection + X_temp = numeric_data.fillna(numeric_data.median()) + y_temp = target_copy + + # Handle any remaining NaN values + X_temp = X_temp.fillna(0) + + if feature_selection_method == 'mutual_info': + selector = SelectKBest(score_func=mutual_info_classif, k=min(n_features, X_temp.shape[1])) + elif feature_selection_method == 'f_classif': + selector = SelectKBest(score_func=f_classif, k=min(n_features, X_temp.shape[1])) + elif feature_selection_method == 'random_forest': + # Use Random Forest feature importance + rf = RandomForestClassifier(n_estimators=100, random_state=42) + rf.fit(X_temp, y_temp) + importances = rf.feature_importances_ + indices = np.argsort(importances)[::-1] + selected_indices = indices[:min(n_features, len(indices))] + + # Create a custom selector object + class CustomSelector: + def __init__(self, selected_indices, feature_names): + self.selected_indices = selected_indices + self.feature_names = feature_names + self.scores_ = importances + + def transform(self, X): + if isinstance(X, pd.DataFrame): + return X.iloc[:, self.selected_indices] + else: + return X[:, 
self.selected_indices] + + def get_support(self, indices=False): + if indices: + return self.selected_indices + else: + mask = np.zeros(len(self.feature_names), dtype=bool) + mask[self.selected_indices] = True + return mask + + selector = CustomSelector(selected_indices, numeric_data.columns.tolist()) + feature_scores = importances + else: + raise ValueError("feature_selection_method must be 'mutual_info', 'f_classif', or 'random_forest'") + + if feature_selection_method != 'random_forest': + selector.fit(X_temp, y_temp) + feature_scores = selector.scores_ + + feature_selector = selector + selected_features = numeric_data.columns[selector.get_support()].tolist() + + print(f"Feature selection: Selected {len(selected_features)} most informative features") + if feature_scores is not None: + # Print top feature scores + feature_importance = pd.DataFrame({ + 'feature': numeric_data.columns, + 'score': feature_scores + }).sort_values('score', ascending=False) + print("Top 5 features:") + for i, (_, row) in enumerate(feature_importance.head().iterrows()): + print(f" {i+1}. 
{row['feature']}: {row['score']:.4f}") + + return { + 'imputation': imputation_params, + 'normalization': normalization_params, + 'label_encoders': label_encoders, + 'feature_selector': feature_selector, + 'selected_features': selected_features, + 'n_features': n_features + } + +def apply_preprocessing(subset_data, preprocessing_params): + """ + Apply preprocessing to a subset using pre-calculated parameters from reference center + + Args: + subset_data: DataFrame to preprocess + preprocessing_params: dict from calculate_preprocessing_params + + Returns: + tuple: (preprocessed_data, feature_names) + """ + data_copy = subset_data.copy() + + # Step 1: Handle missing values using reference center parameters + for column in data_copy.columns: + if column in preprocessing_params['imputation']: + missing_mask = data_copy[column].isna() + if missing_mask.any(): + data_copy.loc[missing_mask, column] = preprocessing_params['imputation'][column] + + # Step 2: Convert all features to numerical using reference center label encoders + for column in data_copy.columns: + if column in preprocessing_params['label_encoders']: + le = preprocessing_params['label_encoders'][column] + # Convert to string and handle unseen labels + encoded_values = [] + for val in data_copy[column]: + if pd.isna(val): + encoded_values.append(-1) # Special value for missing + else: + str_val = str(val) + if str_val in le.classes_: + encoded_values.append(le.transform([str_val])[0]) + else: + # Map unseen labels to 'unknown' class + encoded_values.append(le.transform(['unknown'])[0]) + data_copy[column] = encoded_values + elif data_copy[column].dtype == 'object': + # Fallback: use categorical codes for any remaining object columns + data_copy[column] = pd.Categorical(data_copy[column]).codes + + # Ensure all data is numerical + data_copy = data_copy.apply(pd.to_numeric, errors='coerce') + + # Step 3: Normalize ALL features using reference center parameters + normalization_params = 
preprocessing_params['normalization'] + for column in data_copy.columns: + if column in normalization_params['mean']: + mean_val = normalization_params['mean'][column] + std_val = normalization_params['std'][column] + data_copy[column] = (data_copy[column] - mean_val) / std_val + + # Step 4: Apply feature selection if enabled + if preprocessing_params['feature_selector'] is not None: + selector = preprocessing_params['feature_selector'] + data_copy = pd.DataFrame(selector.transform(data_copy), + columns=preprocessing_params['selected_features']) + + return data_copy, data_copy.columns.tolist() + +def partition_data_dirichlet(labels, num_centers, alpha=1.0): + """ + Partition data among centers using Dirichlet distribution + """ + unique_labels = np.unique(labels) + n_samples = len(labels) + n_classes = len(unique_labels) + + # Create assignment matrix + center_indices = [[] for _ in range(num_centers)] + + # For each class, distribute samples to centers using Dirichlet distribution + for class_idx in unique_labels: + class_mask = (labels == class_idx) + class_indices = np.where(class_mask)[0] + n_class_samples = len(class_indices) + + if n_class_samples > 0: + # Generate Dirichlet distribution for this class + proportions = np.random.dirichlet(np.repeat(alpha, num_centers)) + proportions = proportions / proportions.sum() + + # Calculate number of samples for each center + center_samples = (proportions * n_class_samples).astype(int) + + # Adjust for rounding errors + diff = n_class_samples - center_samples.sum() + if diff > 0: + center_samples[np.random.choice(num_centers, diff, replace=True)] += 1 + + # Shuffle and assign indices + np.random.shuffle(class_indices) + ptr = 0 + for center_id in range(num_centers): + if center_samples[center_id] > 0: + center_indices[center_id].extend( + class_indices[ptr:ptr + center_samples[center_id]] + ) + ptr += center_samples[center_id] + + # Shuffle indices within each center + for center_id in range(num_centers): + 
np.random.shuffle(center_indices[center_id]) + + return center_indices + +def select_reference_center(all_center_data, method='largest'): + """ + Select which center to use for calculating preprocessing parameters + """ + if method == 'largest': + center_sizes = [len(X) for X, y in all_center_data] + reference_center_id = np.argmax(center_sizes) + print(f"Selected largest center (ID: {reference_center_id}) with {center_sizes[reference_center_id]} samples") + + elif method == 'random': + reference_center_id = np.random.randint(0, len(all_center_data)) + print(f"Selected random center (ID: {reference_center_id})") + + else: + raise ValueError("Method must be 'largest' or 'random'") + + return reference_center_id + def load_mnist(center_id=None, num_splits=5): """Loads the MNIST dataset using OpenML. @@ -343,7 +613,7 @@ def get_preprocessing_params(data): for feature in transformers_dict: if feature == 'ST_Slope': # Change value of last row to 'Down' to avoid error as it is missing in some splits - X_train[feature].iloc[-1] = 'Down' + X_train.loc[X_train.index[-1], feature] = 'Down' transformers_dict[feature].fit(X_train[feature].values.reshape(-1, 1)) else: transformers_dict[feature].fit(X_train[feature].values.reshape(-1, 1)) @@ -358,7 +628,9 @@ def preprocess_data(data, column_transformer): target = df1['HeartDisease'] for feature in column_transformer: - features[feature] = column_transformer[feature].transform(features[feature].values.reshape(-1, 1)) + features.loc[:, feature] = column_transformer[feature].transform(features[feature].values.reshape(-1, 1)) + + features = features.infer_objects() X_train, X_test, y_train, y_test = train_test_split(features, target, test_size = 0.20, random_state = seed, stratify=target) @@ -369,39 +641,6 @@ def preprocess_data(data, column_transformer): (X_train, y_train), (X_test, y_test) = preprocess_data(data, preprocessing_params) - # n_females = len(X_train[X_train['Sex'] == 0]) - # print(f'n_females{n_females}') - # n_males 
= len(X_train[X_train['Sex'] == 1]) - # print(f'n_males{n_males}') - # print(len(X_train)) - # Get indexes of rows with men (Sex == 0) - n_females = len(X_train[X_train['Sex'] == 0]) - n_males = len(X_train[X_train['Sex'] == 1]) - print(f'Center {center_id} of size {len(X_train)} with n_females {n_females} and n_males {n_males} in training set') - - if center_id == 0: - men_indexes = X_train.index[X_train['Sex'] == 1] - female_indexes = X_train.index[X_train['Sex'] == 0] - # print(len(female_indexes)) - n_females_to_drop = int(len(female_indexes)*0.9) - female_indexes = female_indexes[:n_females_to_drop] - copy_male_indexes = men_indexes[:n_females_to_drop] - # print(len(female_indexes)) - X_train = X_train.drop(index=female_indexes) - y_train = y_train.drop(index=female_indexes) - # print(len(X_train)) - # print(f'Adding males {len(copy_male_indexes)}') - X_train = pd.concat([X_train, X_train.loc[copy_male_indexes]]) - y_train = pd.concat([y_train, y_train.loc[copy_male_indexes]]) - - if center_id == 2 or center_id == -1: - X_train = pd.concat([X_train, X_train, X_train, X_train]) - y_train = pd.concat([y_train, y_train, y_train, y_train]) - - n_females = len(X_train[X_train['Sex'] == 0]) - n_males = len(X_train[X_train['Sex'] == 1]) - print(f'Center {center_id} of size {len(X_train)} with n_females {n_females} and n_males {n_males} in training set') - # xx return (X_train, y_train), (X_test, y_test) @@ -639,6 +878,122 @@ def load_dt4h(config,id): y_test = data_target[int(dat_len*config["train_size"]):].iloc[:, 0] return (X_train, y_train), (X_test, y_test) +def load_diabetes(center_id, config): + """ + Load and preprocess diabetes dataset for federated learning with feature selection + + Args: + center_id: Identifier for the federated node + num_centers: Total number of federated centers + alpha: Dirichlet concentration parameter for data partitioning + reference_method: How to select reference center ('largest' or 'random') + global_preprocessing_params: 
Precomputed parameters (if None, will calculate)
+        n_features: Number of features to select (None for all features)
+        feature_selection_method: Method for feature selection
+
+    Returns:
+        tuple: ((X_train, y_train), (X_test, y_test), preprocessing_params)
+    """
+    num_centers = config.get("num_clients", 5)
+    alpha = config.get("dirichlet_alpha", 1.0)
+    reference_method = config.get("reference_center_method", "largest")
+    global_preprocessing_params = None
+    n_features = config.get("n_features", 20)
+    feature_selection_method = config.get("feature_selection_method", "mutual_info")
+
+    # Load the dataset
+    cdc_diabetes_health_indicators = fetch_ucirepo(id=891)
+
+    # Get features and target
+    X = cdc_diabetes_health_indicators.data.features
+    y = cdc_diabetes_health_indicators.data.targets
+
+    # convert y to a pandas Series for easier handling
+    y = pd.Series(y.values.flatten())
+
+    # Use fraction of data for faster testing (optional)
+    fraction = 0.02
+    X = X.sample(frac=fraction, random_state=42)  # keep original labels for y alignment
+    X, y = X.reset_index(drop=True), y.loc[X.index].reset_index(drop=True)
+
+    # Set random seed for reproducible partitioning
+    np.random.seed(42)
+
+    # Convert target to binary classification if needed
+    if y.nunique() > 2:
+        y_binary = (y > y.median()).astype(int)
+    else:
+        y_binary = y
+
+    # Partition data using Dirichlet distribution
+    all_center_indices = partition_data_dirichlet(y_binary.values, num_centers, alpha)
+
+    # Get all center data for reference selection
+    all_center_data = []
+    for i in range(num_centers):
+        if i < len(all_center_indices) and len(all_center_indices[i]) > 0:
+            X_center = X.iloc[all_center_indices[i]]
+            all_center_data.append((X_center, y_binary.iloc[all_center_indices[i]]))
+        else:
+            all_center_data.append((pd.DataFrame(), pd.Series()))
+
+    # Calculate or use global preprocessing parameters
+    if global_preprocessing_params is None:
+        # Select reference center and calculate parameters
+        reference_center_id = select_reference_center(all_center_data, 
reference_method)
+        X_reference = all_center_data[reference_center_id][0]
+        y_reference = all_center_data[reference_center_id][1]
+
+        if len(X_reference) == 0:
+            # Fallback: use full dataset if reference center is empty
+            X_reference = X
+            y_reference = y_binary
+            print("Warning: Reference center empty, using full dataset for preprocessing parameters")
+
+        global_preprocessing_params = calculate_preprocessing_params(
+            X_reference, y_reference, n_features, feature_selection_method
+        )
+        print("Calculated new global preprocessing parameters with feature selection")
+
+    if center_id is not None:
+        # Get indices for the requested center
+        if center_id >= len(all_center_indices) or len(all_center_indices[center_id]) == 0:
+            raise ValueError(f"Center ID {center_id} has no data assigned")
+
+        center_indices = all_center_indices[center_id]
+        X_center = X.iloc[center_indices].reset_index(drop=True)
+        y_center = y.iloc[center_indices].reset_index(drop=True)
+    else:
+        # Use full dataset if no center_id specified
+        X_center = X
+        y_center = y
+
+    # Split into train/test for this center
+    if len(X_center) > 1:
+        X_train, X_test, y_train, y_test = train_test_split(
+            X_center, y_center, test_size=0.2, random_state=42, stratify=y_center
+        )
+    else:
+        X_train, y_train = X_center, y_center
+        X_test, y_test = X_center.iloc[:0], y_center.iloc[:0]
+
+    # Apply GLOBAL preprocessing parameters to both train and test sets
+    X_train_processed, feature_names = apply_preprocessing(X_train, global_preprocessing_params)
+    X_test_processed, _ = apply_preprocessing(X_test, global_preprocessing_params)
+
+    # Convert targets to numpy arrays
+    # y_train_processed = y_train.values
+    # y_test_processed = y_test.values
+
+    # # Print center statistics
+    # print(f"Center {center_id}/{num_centers} (alpha={alpha}):")
+    # print(f"  Samples: {len(X_center)} (Train: {len(X_train_processed)}, Test: {len(X_test_processed)})")
+    # print(f"  Features: {X_train_processed.shape[1]}/{len(X.columns)} selected")
+    # print(f"  Data 
range: [{X_train_processed.min():.3f}, {X_train_processed.max():.3f}]") + # print(f" Normalized stats - Mean: {X_train_processed.mean():.4f}, Std: {X_train_processed.std():.4f}") + + return (X_train_processed, y_train), (X_test_processed, y_test) + def cvd_to_torch(config): pass @@ -695,6 +1050,8 @@ def load_dataset(config, id=None): return load_ukbb_cvd(config["data_path"], id, config) elif config["dataset"] == "kaggle_hf": return load_kaggle_hf(config["data_path"], id, config) + elif config["dataset"] == "diabetes": + return load_diabetes(id, config) elif config["dataset"] == "libsvm": return load_libsvm(config, id) elif config["dataset"] == "dt4h_format": diff --git a/flcore/models/random_forest/FedCustomAggregator.py b/flcore/models/random_forest/FedCustomAggregator.py index 0da2e6b..adb8842 100644 --- a/flcore/models/random_forest/FedCustomAggregator.py +++ b/flcore/models/random_forest/FedCustomAggregator.py @@ -153,14 +153,6 @@ def aggregate_fit( self.time_server_round = time.time() print(f"Elapsed time: {elapsed_time} for round {server_round}") metrics_aggregated['training_time [s]'] = self.accum_time - - filename = 'server_results.txt' - with open( - filename, - "a", - ) as f: - f.write(f"Accumulated Time: {self.accum_time} for round {server_round}\n") - return parameters_aggregated, metrics_aggregated @@ -194,15 +186,6 @@ def aggregate_evaluate( elif server_round == 1: # Only log this warning once log(WARNING, "No evaluate_metrics_aggregation_fn provided") - # filename = 'server_results.txt' - # with open( - # filename, - # "a", - # ) as f: - # f.write(f"Accuracy: {metrics_aggregated['accuracy']} \n") - # f.write(f"Sensitivity: {metrics_aggregated['sensitivity']} \n") - # f.write(f"Specificity: {metrics_aggregated['specificity']} \n") - return loss_aggregated, metrics_aggregated diff --git a/flcore/models/random_forest/client.py b/flcore/models/random_forest/client.py index 52e07cb..7c464f7 100644 --- a/flcore/models/random_forest/client.py +++ 
b/flcore/models/random_forest/client.py @@ -30,8 +30,9 @@ def __init__(self, data,client_id,config): # Load data (self.X_train, self.y_train), (self.X_test, self.y_test) = data self.splits_nested = datasets.split_partitions(n_folds_out,0.2, seed, self.X_train, self.y_train) - self.bal_RF = config['random_forest']['balanced_rf'] - self.model = utils.get_model(self.bal_RF) + self.bal_RF = True if config['model'] == 'balanced_random_forest' else False + self.model = utils.get_model(self.bal_RF) + self.round_time = 0 # Setting initial parameters, akin to model.compile for keras models utils.set_initial_params_client(self.model,self.X_train, self.y_train) def get_parameters(self, ins: GetParametersIns): # , config type: ignore @@ -64,6 +65,7 @@ def fit(self, ins: FitIns): # , parameters, config type: ignore #To implement the center dropout, we need the execution time start_time = time.time() self.model.fit(X_train_2, y_train_2) + elapsed_time = (time.time() - start_time) #accuracy = model.score( X_test, y_test ) # accuracy,specificity,sensitivity,balanced_accuracy, precision, F1_score = \ # measurements_metrics(self.model,X_val, y_val) @@ -76,8 +78,8 @@ def fit(self, ins: FitIns): # , parameters, config type: ignore # print(f"precision in fit: {precision}") # print(f"F1_score in fit: {F1_score}") - elapsed_time = (time.time() - start_time) metrics["running_time"] = elapsed_time + self.round_time = elapsed_time print(f"num_client {self.client_id} has an elapsed time {elapsed_time}") @@ -108,6 +110,8 @@ def evaluate(self, ins: EvaluateIns): # , parameters, config type: ignore # measurements_metrics(self.model,self.X_test, self.y_test) y_pred = self.model.predict(self.X_test) metrics = calculate_metrics(self.y_test, y_pred) + metrics["round_time [s]"] = self.round_time + metrics["client_id"] = self.client_id # print(f"Accuracy client in evaluate: {accuracy}") # print(f"Sensitivity client in evaluate: {sensitivity}") # print(f"Specificity client in evaluate: {specificity}") 
diff --git a/flcore/models/random_forest/server.py b/flcore/models/random_forest/server.py index acbfd1b..06b538c 100644 --- a/flcore/models/random_forest/server.py +++ b/flcore/models/random_forest/server.py @@ -33,7 +33,7 @@ def fit_round( server_round: int ) -> Dict: def get_server_and_strategy(config): - bal_RF = config['random_forest']['balanced_rf'] + bal_RF = True if config['model'] == 'balanced_random_forest' else False model = get_model(bal_RF) utils.set_initial_params_server( model) diff --git a/flcore/models/weighted_random_forest/client.py b/flcore/models/weighted_random_forest/client.py index 74fa60e..bd7b801 100644 --- a/flcore/models/weighted_random_forest/client.py +++ b/flcore/models/weighted_random_forest/client.py @@ -94,7 +94,7 @@ def __init__(self, data,client_id,config): # Load data (self.X_train, self.y_train), (self.X_test, self.y_test) = data self.splits_nested = datasets.split_partitions(n_folds_out,0.2, seed, self.X_train, self.y_train) - self.bal_RF = config['weighted_random_forest']['balanced_rf'] + self.bal_RF = True if config['model'] == 'balanced_random_forest' else False self.model = utils.get_model(self.bal_RF) # Setting initial parameters, akin to model.compile for keras models utils.set_initial_params_client(self.model,self.X_train, self.y_train) diff --git a/flcore/models/weighted_random_forest/server.py b/flcore/models/weighted_random_forest/server.py index 877b871..20539c2 100644 --- a/flcore/models/weighted_random_forest/server.py +++ b/flcore/models/weighted_random_forest/server.py @@ -32,7 +32,7 @@ def fit_round( server_round: int ) -> Dict: def get_server_and_strategy(config): - bal_RF = config['weighted_random_forest']['balanced_rf'] + bal_RF = True if config['model'] == 'balanced_random_forest' else False model = get_model(bal_RF) utils.set_initial_params_server( model) diff --git a/flcore/models/xgb/fed_custom_strategy.py b/flcore/models/xgb/fed_custom_strategy.py index 20dbe55..9f74f4d 100644 --- 
a/flcore/models/xgb/fed_custom_strategy.py +++ b/flcore/models/xgb/fed_custom_strategy.py @@ -143,4 +143,10 @@ def aggregate_fit( elif server_round == 1: # Only log this warning once log(WARNING, "No fit_metrics_aggregation_fn provided") + elapsed_time = (time.time() - self.time_server_round) + self.accum_time = self.accum_time+ elapsed_time + self.time_server_round = time.time() + print(f"Elapsed time: {elapsed_time} for round {server_round}") + metrics_aggregated['training_time [s]'] = self.accum_time + return [parameters_aggregated, trees_aggregated], metrics_aggregated \ No newline at end of file diff --git a/flcore/server_selector.py b/flcore/server_selector.py index 3ba5a06..8c5e010 100644 --- a/flcore/server_selector.py +++ b/flcore/server_selector.py @@ -13,7 +13,7 @@ def get_model_server_and_strategy(config, data=None): server, strategy = linear_models_server.get_server_and_strategy( config ) - elif model == "random_forest": + elif model in ("random_forest", "balanced_random_forest"): server, strategy = random_forest_server.get_server_and_strategy( config ) diff --git a/requirements.txt b/requirements.txt index 13078ec..fc7ee35 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,5 +11,6 @@ scikit_learn==1.2.2 torch==2.0.1 torchmetrics==0.11.4 tqdm==4.65.0 +ucimlrepo==0.0.7 xgboost==1.7.5 pdfkit==1.0.0 diff --git a/tests/test_models.py b/tests/test_models.py index 669f1d0..feb2cc2 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -12,12 +12,18 @@ LOGGING_LEVEL = logging.INFO # WARNING # logging.INFO model_names = [ -# "logistic_regression", -# "elastic_net", -# "lsvc", + "logistic_regression", + "elastic_net", + "lsvc", "random_forest", - # "weighted_random_forest", - # "xgb" + "balanced_random_forest", + # # "weighted_random_forest", + "xgb" + ] + +datasets = [ + "kaggle_hf", + "diabetes", ] def free_port(port): @@ -39,12 +45,18 @@ def setup_class(self): @pytest.mark.parametrize( "model_name", - model_names + model_names, + ) + 
@pytest.mark.parametrize( + "dataset_name", + datasets, ) def test_get_model_client( - self, model_name + self, model_name, dataset_name ): self.config["model"] = model_name + self.config['data_path'] = 'dataset/' + self.config["dataset"] = dataset_name from flcore.client_selector import get_model_client from flcore.datasets import load_dataset @@ -57,22 +69,27 @@ def test_get_model_client( @pytest.mark.parametrize( "model_name", - model_names + model_names, ) - def test_run(self, model_name): + @pytest.mark.parametrize( + "dataset_name", + datasets, + ) + def test_run(self, model_name, dataset_name): self.config["model"] = model_name + self.config["dataset"] = dataset_name with open("config.yaml", "r") as f: config = yaml.safe_load(f) config = self.config - with open("config.yaml", "w") as f: + with open("tmp_test_config.yaml", "w") as f: yaml.dump(config, f) free_port(config["local_port"]) run_log = open("run.log", "w") - run_process = subprocess.Popen("python run.py", shell=True, stdout=run_log, stderr=run_log) + run_process = subprocess.Popen("python run.py tmp_test_config.yaml", shell=True, stdout=run_log, stderr=run_log) timer = Timer(180, run_process.kill) try: @@ -85,5 +102,8 @@ def test_run(self, model_name): run_log.close() run_log = open("run.log", "r") print(run_log.read()) + + # Delete temporary config file + os.remove("tmp_test_config.yaml") assert run_process.returncode == 0