Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright (C) 2025 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

collaborators:
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Copyright (C) 2025 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

# collaborator_name,data_directory_path
one,1
Comment thread
tanwarsh marked this conversation as resolved.
Outdated
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
../../workspace/plan/defaults
Comment thread
tanwarsh marked this conversation as resolved.
Outdated

45 changes: 45 additions & 0 deletions openfl-workspace/federated_analytics/smokers_health/plan/plan.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Federated analytics plan: computes aggregated health statistics across
# collaborators for the Smokers Health dataset (no model training).

aggregator:
  defaults: plan/defaults/aggregator.yaml
  template: openfl.component.Aggregator
  settings:
    # Final aggregated analytics result is persisted here.
    last_state_path: save/result.json
    # One-shot analytics: a single round suffices.
    rounds_to_train: 1

collaborator:
  defaults: plan/defaults/collaborator.yaml
  template: openfl.component.Collaborator
  settings:
    # No model deltas are exchanged in analytics mode.
    use_delta_updates: false
    opt_treatment: RESET

data_loader:
  defaults: plan/defaults/data_loader.yaml
  template: src.dataloader.SmokersHealthDataLoader
  settings:
    # Total number of data shards; each collaborator loads one shard.
    collaborator_count: 2
    data_group_name: smokers_health
    batch_size: 150

task_runner:
  defaults: plan/defaults/task_runner.yaml
  template: src.taskrunner.SmokersHealthAnalytics

network:
  defaults: plan/defaults/network.yaml

assigner:
  template: openfl.component.RandomGroupedAssigner
  settings:
    task_groups:
      # Every collaborator (percentage 1.0) runs the analytics task.
      - name: analytics
        percentage: 1.0
        tasks:
          - analytics

tasks:
  analytics:
    function: analytics
    aggregation_type:
      # Averages per-group metric tensors across collaborators.
      template: src.aggregatehealth.AggregateHealthMetrics
    kwargs:
      # Columns queried from each collaborator's local data shard.
      columns: ['age', 'sex', 'current_smoker', 'heart_rate', 'blood_pressure', 'cigs_per_day', 'chol']
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import numpy as np
Comment thread
tanwarsh marked this conversation as resolved.
from openfl.interface.aggregation_functions.core import AggregationFunction


class AggregateHealthMetrics(AggregationFunction):
    """Aggregation logic for Smokers Health analytics.

    Averages the per-collaborator metric tensors produced by the
    ``analytics`` task (means of heart rate, cholesterol and blood
    pressure grouped by age, sex and smoking status).
    """

    def call(self, local_tensors, *_) -> np.ndarray:
        """Element-wise average of the collaborators' local metric tensors.

        Args:
            local_tensors (list): Objects each exposing a ``tensor``
                attribute (numpy array of local metric means). All tensors
                are assumed to share the same shape.
            *_: Additional positional arguments (unused; kept for the
                AggregationFunction interface).

        Returns:
            np.ndarray: The element-wise mean across all local tensors.
                (The original docstring claimed a dict was returned; the
                code has always produced an array.)

        Raises:
            ValueError: If ``local_tensors`` is empty, indicating that
                there are no metrics to aggregate.
        """
        if not local_tensors:
            raise ValueError("No local metrics to aggregate.")

        # Stack and average in one pass. This avoids the precision loss of
        # repeatedly adding `tensor / n`, and also works for integer-dtype
        # tensors (the original in-place `int_array += float_array` would
        # raise a casting error).
        return np.mean([lt.tensor for lt in local_tensors], axis=0)
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from openfl.federated.data.loader import DataLoader
Comment thread
tanwarsh marked this conversation as resolved.
import pandas as pd
import os
import subprocess


class SmokersHealthDataLoader(DataLoader):
    """Data Loader for the Smokers Health dataset.

    Downloads the raw CSV archive from Kaggle, then keeps only this
    collaborator's shard (selected by ``data_path``) in memory.
    """

    def __init__(self, batch_size, data_path, **kwargs):
        """Initialize the loader: download the dataset and slice out a shard.

        Args:
            batch_size (int): Accepted for plan compatibility; not used here.
            data_path (str): 1-based shard index for this collaborator.
            **kwargs: Additional plan settings (e.g. ``collaborator_count``),
                forwarded to the base class and to :meth:`load_data_shard`.
        """
        super().__init__(**kwargs)

        # Download and prepare data, then select this collaborator's shard.
        self._download_raw_data()
        self.data_shard = self.load_data_shard(
            shard_num=int(data_path), **kwargs
        )

    def _download_raw_data(self):
        """Download and extract the raw smokers' health dataset.

        Steps:
            1. Ensure the ``./data`` directory exists (``curl -o`` does not
               create missing parent directories).
            2. Download the dataset ZIP from Kaggle via ``curl``.
            3. Extract the archive into the ``data`` directory.
        """
        os.makedirs('data', exist_ok=True)
        download_path = os.path.expanduser('./data/smokers_health.zip')
        # List form (shell=False) avoids shell-injection concerns.
        subprocess.run(
            [
                'curl', '-L', '-o', download_path,
                'https://www.kaggle.com/api/v1/datasets/download/jaceprater/smokers-health-data'
            ],
            check=True
        )

        # Unzip the downloaded file into the data directory.
        subprocess.run(['unzip', '-o', download_path, '-d', 'data'], check=True)

    def load_data_shard(self, shard_num, collaborator_count=None, **kwargs):
        """Load this collaborator's slice of the CSV dataset.

        Fixes the original sharding bug where ``shard_num`` was used as both
        the shard index and the shard count, so shard 1 received the whole
        dataset and shards overlapped.

        Args:
            shard_num (int): 1-based index of this collaborator's shard.
            collaborator_count (int, optional): Total number of shards.
                Normally arrives via the plan's data_loader settings
                (forwarded through ``**kwargs`` from ``__init__``); falls
                back to ``shard_num`` when absent.
            **kwargs: Additional plan settings (unused here).

        Returns:
            pd.DataFrame: The rows belonging to this shard.
        """
        file_path = os.path.join('data', 'smoking_health_data_final.csv')
        df = pd.read_csv(file_path)

        # Split data into non-overlapping, contiguous shards.
        total_shards = collaborator_count or shard_num
        shard_size = len(df) // total_shards
        start_idx = shard_size * (shard_num - 1)
        # The last shard absorbs any remainder rows.
        end_idx = len(df) if shard_num == total_shards else start_idx + shard_size

        return df.iloc[start_idx:end_idx]

    def query(self, columns, **kwargs):
        """Query the data shard for the specified columns.

        Args:
            columns (list): Column names to select from the data shard.
            **kwargs: Additional keyword arguments (currently unused).

        Returns:
            pd.DataFrame: The shard data restricted to ``columns``.

        Raises:
            ValueError: If ``columns`` is not a list.
        """
        if not isinstance(columns, list):
            raise ValueError("Columns parameter must be a list")
        return self.data_shard[columns]

    def get_feature_shape(self):
        """Not required for analytics; kept for interface compatibility.

        Returns:
            None
        """
        pass
118 changes: 118 additions & 0 deletions openfl-workspace/federated_analytics/smokers_health/src/runner_fa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0


"""
Base classes for Federated Analytics.

This file can serve as a template for creating your own Federated Analytics experiments.
"""

from openfl.federated.task.runner import TaskRunner
from openfl.utilities import TensorKey
from openfl.utilities.split import split_tensor_dict_for_holdouts

import logging
import numpy as np

logger = logging.getLogger(__name__)


class FederatedAnalyticsTaskRunner(TaskRunner):
    """The base class for Federated Analytics Task Runners.

    Subclasses implement :meth:`analytics_task`; this class packages the
    results as TensorKey-keyed dictionaries for the OpenFL pipeline.
    """

    def __init__(self, **kwargs):
        """Initializes the FederatedAnalyticsTaskRunner instance.

        Args:
            **kwargs: Additional parameters passed to the base TaskRunner.
        """
        super().__init__(**kwargs)

        # Dummy model initialization. Dummy models and weights are used here
        # as placeholders to ensure compatibility with the core OpenFL
        # framework, which currently assumes the presence of a model for
        # federated learning tasks.
        #
        # This approach is necessary to support Federated Analytics use
        # cases, which do not involve traditional model training, until
        # OpenFL is refactored to accommodate broader use cases beyond
        # learning. For more details, refer to the discussion at:
        # https://github.com/securefederatedai/openfl/discussions/1385#discussioncomment-13009961.
        self.model = None

        self.model_tensor_names = []
        self.required_tensorkeys_for_function = {}

    def analytics(self, col_name, round_num, **kwargs):
        """Run the analytics task and return its results as tensors.

        Args:
            col_name (str): Collaborator name (used as the tensor origin).
            round_num (int): The current round number.
            **kwargs: Additional parameters forwarded to analytics_task.

        Returns:
            tuple(dict, dict): The same TensorKey->ndarray dictionary twice,
                matching the (global, local) output convention of the
                task-runner interface.
        """
        results = self.analytics_task(**kwargs)
        tags = ("analytics",)
        origin = col_name
        # np.asarray is a no-op for arrays and converts scalars/lists,
        # replacing the previous explicit isinstance check.
        output_metric_dict = {
            TensorKey(metric_name, origin, round_num, False, tags):
                np.asarray(metric_value)
            for metric_name, metric_value in results.items()
        }
        return output_metric_dict, output_metric_dict

    def analytics_task(self, **kwargs):
        """Perform analytics on the provided data.

        Must be implemented by subclasses to perform a specific analysis.

        Args:
            **kwargs: Arbitrary keyword arguments for the analysis.

        Raises:
            NotImplementedError: If not implemented by a subclass.
        """
        raise NotImplementedError

    def get_tensor_dict(self, with_opt_vars, suffix=""):
        """Get the (dummy) model weights as a tensor dictionary.

        Args:
            with_opt_vars (bool): Whether to include optimizer state
                (ignored; there is no real model).
            suffix (str): Tensor-name suffix (ignored).

        Returns:
            dict: A single placeholder tensor, required by the framework.
        """
        return {'dummy_tensor': np.float32(1)}

    def get_required_tensorkeys_for_function(self, func_name, **kwargs):
        """Get the required tensors for a function called as part of a task.

        Analytics tasks need no input tensors, so this is always empty.

        Args:
            func_name (str): The function name.
            **kwargs: Any function arguments.

        Returns:
            list: Always an empty list of TensorKey objects.
        """
        return []

    def initialize_tensorkeys_for_functions(self, with_opt_vars=False):
        """Not required for analytics; kept for interface compatibility.

        Returns:
            None
        """
        pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from src.runner_fa import FederatedAnalyticsTaskRunner
import pandas as pd
import numpy as np


class SmokersHealthAnalytics(FederatedAnalyticsTaskRunner):
    """Task runner performing federated analytics on the Smokers Health dataset.

    Groups the local data shard by age, sex and smoking status and computes
    mean heart rate, cholesterol and (systolic, diastolic) blood pressure
    per group.
    """

    def analytics_task(self, columns, **kwargs):
        """Compute per-group mean health metrics for the local shard.

        Args:
            columns (list): Column names to query from the data loader; must
                include the group-by keys ('age', 'sex', 'current_smoker')
                and the metric columns used below.
            **kwargs: Additional keyword arguments (unused).

        Returns:
            dict: Maps a group-identifier string to a numpy array of
                ``[heart_rate_mean, chol_mean, systolic_mean, diastolic_mean]``.
        """
        # Query the local data shard for the requested columns.
        data = self.data_loader.query(columns)

        grouped = data.groupby(['age', 'sex', 'current_smoker'])

        # 'blood_pressure' holds "systolic/diastolic" strings; the lambda
        # returns a one-row DataFrame and .iloc[0] extracts it as a Series
        # of (systolic_mean, diastolic_mean).
        result = grouped.agg({
            'heart_rate': 'mean',
            'chol': 'mean',
            'blood_pressure': lambda x: self.process_blood_pressure(x).iloc[0]
        })

        # Convert the aggregation result into the metric-name -> array format
        # expected by the aggregation function.
        formatted_result = {}
        keys = ', heart_rate_mean, chol_mean, systolic_blood_pressure_mean, diastolic_blood_pressure_mean'
        for index, row in result.iterrows():
            age, sex, current_smoker = index
            heart_rate_mean = row['heart_rate']
            chol_mean = row['chol']
            systolic_mean = row['blood_pressure'][0]
            diastolic_mean = row['blood_pressure'][1]
            combined_key = f"age_{age}_sex_{sex}_current_smoker_{current_smoker} {keys}"
            formatted_result[combined_key] = np.array([
                heart_rate_mean, chol_mean, systolic_mean, diastolic_mean
            ])
        return formatted_result

    def process_blood_pressure(self, bp_series):
        """Compute mean systolic/diastolic values from "sys/dia" readings.

        More robust than the original implementation: a reading without
        exactly one '/' no longer raises IndexError, and numeric parsing
        uses float() (accepting signed or whitespace-padded values) rather
        than str.isdigit(). Systolic and diastolic values are still
        collected independently, as before.

        Args:
            bp_series (pd.Series): Blood pressure readings in the format
                "systolic/diastolic" (e.g. "120/80").

        Returns:
            pd.DataFrame: One row with columns:
                - 'systolic_mean': mean of valid systolic values, or None.
                - 'diastolic_mean': mean of valid diastolic values, or None.
        """
        systolic = []
        diastolic = []
        for reading in bp_series:
            parts = str(reading).split('/')
            if len(parts) != 2:
                continue  # Malformed reading: skip instead of crashing.
            sys_val = self._to_float(parts[0])
            dia_val = self._to_float(parts[1])
            if sys_val is not None:
                systolic.append(sys_val)
            if dia_val is not None:
                diastolic.append(dia_val)
        return pd.DataFrame({
            'systolic_mean': [sum(systolic) / len(systolic) if systolic else None],
            'diastolic_mean': [sum(diastolic) / len(diastolic) if diastolic else None]
        })

    @staticmethod
    def _to_float(token):
        """Parse a numeric token; return None for non-numeric or NaN input."""
        try:
            value = float(token)
        except (TypeError, ValueError):
            return None
        # NaN != NaN: reject NaN so it cannot poison the running means.
        return value if value == value else None
Loading