94 changes: 94 additions & 0 deletions openfl-workspace/federated_analytics/smokers_health/README.md
# Federated Analytics: Smokers Health Example

This workspace demonstrates how to use OpenFL for privacy-preserving analytics on the Smokers Health dataset. The setup enables distributed computation of health statistics (such as heart rate, cholesterol, and blood pressure) across multiple collaborators, without sharing raw data.

## Instantiating a Workspace from Smokers Health Template
To instantiate a workspace from the `federated_analytics/smokers_health` template, use the `fx workspace create` command. This will set up a new workspace with the required configuration and code.

1. **Install dependencies:**
```bash
pip install virtualenv
mkdir ~/openfl-smokers-health
virtualenv ~/openfl-smokers-health/venv
source ~/openfl-smokers-health/venv/bin/activate
pip install openfl
```

2. **Create the Workspace Folder:**
```bash
cd ~/openfl-smokers-health
fx workspace create --template federated_analytics/smokers_health --prefix fl_workspace
cd ~/openfl-smokers-health/fl_workspace
```

## Directory Structure
The workspace has the following structure:
```
smokers_health
├── requirements.txt
├── .workspace
├── plan
│ ├── plan.yaml
│ ├── cols.yaml
│ ├── data.yaml
│ └── defaults/
├── src
│ ├── __init__.py
│ ├── dataloader.py
│ ├── taskrunner.py
│ └── aggregate_health.py
├── data/
└── save/
```

### Directory Breakdown
- **requirements.txt**: Lists all Python dependencies for the workspace.
- **plan/**: Contains configuration files for the federation:
- `plan.yaml`: Main plan declaration.
- `cols.yaml`: List of authorized collaborators.
- `data.yaml`: Data path for each collaborator.
- `defaults/`: Default configuration values.
- **src/**: Python modules for federated analytics:
- `dataloader.py`: Downloads and shards the Smokers Health dataset and exposes a column-based `query` method.
- `taskrunner.py`: Groups data and computes mean health metrics by age, sex, and smoking status.
- `aggregate_health.py`: Aggregates results from all collaborators.
- **data/**: Place to store the downloaded and unzipped dataset.
- **save/**: Stores aggregated results and analytics outputs.

## Data Preparation
The data loader automatically downloads the Smokers Health dataset from Kaggle (via `curl`) and unzips it into the `data/` directory. Make sure `curl` and `unzip` are installed and that you can reach Kaggle; otherwise, download and extract the dataset into `data/` manually.

## Defining the Data Loader
The data loader reads the dataset from CSV, shards it evenly among collaborators, and exposes a `query` method that returns the requested columns from the local shard for use in analytics tasks.
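The shard-then-select behavior can be sketched in a few lines of pandas. This is an illustrative stand-in, not the workspace's `SmokersHealthDataLoader`; the `shard` helper and the tiny DataFrame are made up for the example.

```python
import pandas as pd


def shard(df: pd.DataFrame, shard_num: int, collaborator_count: int) -> pd.DataFrame:
    """Return the 1-indexed contiguous shard for one collaborator."""
    shard_size = len(df) // collaborator_count
    start = shard_size * (shard_num - 1)
    return df.iloc[start:start + shard_size]


df = pd.DataFrame({
    "age": [40, 52, 61, 35],
    "heart_rate": [72, 80, 77, 68],
})

# Collaborator 1 of 2 gets the first half of the rows
shard1 = shard(df, shard_num=1, collaborator_count=2)
# query() then simply selects the requested columns from the local shard
subset = shard1[["heart_rate"]]
```

Each collaborator sees only its own contiguous slice of the rows; the `query` call never touches another shard's data.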

## Defining the Task Runner
The task runner groups the data by `age`, `sex`, and `current_smoker`, and computes the mean of `heart_rate`, `chol`, and blood pressure (systolic/diastolic). The results are returned as NumPy arrays for aggregation.
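The grouping step amounts to a pandas `groupby` over the three demographic keys followed by a per-group mean. The column names below follow the plan's `columns` list; the small DataFrame is fabricated for illustration.

```python
import pandas as pd

df = pd.DataFrame({
    "age": [40, 40, 52],
    "sex": ["male", "male", "female"],
    "current_smoker": ["yes", "yes", "no"],
    "heart_rate": [70, 74, 80],
    "chol": [190, 210, 180],
})

# Mean of each health metric per (age, sex, current_smoker) group
means = df.groupby(["age", "sex", "current_smoker"])[["heart_rate", "chol"]].mean()
# means.loc[(40, "male", "yes"), "heart_rate"] == 72.0
```

Each collaborator computes these per-group means on its own shard; only the resulting arrays, never raw rows, are sent to the aggregator.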

## Running the Federation
1. **Initialize the plan:**
```bash
fx plan initialize
```
2. **Set up the aggregator and collaborators:**
```bash
fx workspace certify
fx aggregator generate-cert-request
fx aggregator certify --silent

fx collaborator create -n collaborator1 -d 1
fx collaborator generate-cert-request -n collaborator1
fx collaborator certify -n collaborator1 --silent

fx collaborator create -n collaborator2 -d 2
fx collaborator generate-cert-request -n collaborator2
fx collaborator certify -n collaborator2 --silent
```
3. **Start the federation:**
```bash
fx aggregator start &
fx collaborator start -n collaborator1 &
fx collaborator start -n collaborator2 &
```

## License
This project is licensed under the Apache License 2.0. See the LICENSE file for details.
4 changes: 4 additions & 0 deletions openfl-workspace/federated_analytics/smokers_health/plan/cols.yaml
# Copyright (C) 2025 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

collaborators:
4 changes: 4 additions & 0 deletions openfl-workspace/federated_analytics/smokers_health/plan/data.yaml
# Copyright (C) 2025 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

# collaborator_name,data_directory_path
45 changes: 45 additions & 0 deletions openfl-workspace/federated_analytics/smokers_health/plan/plan.yaml
aggregator:
defaults: plan/defaults/aggregator.yaml
template: openfl.component.Aggregator
settings:
last_state_path: save/result.json
rounds_to_train: 1

collaborator:
defaults: plan/defaults/collaborator.yaml
template: openfl.component.Collaborator
settings:
use_delta_updates: false
opt_treatment: RESET

data_loader:
defaults: plan/defaults/data_loader.yaml
template: src.dataloader.SmokersHealthDataLoader
settings:
collaborator_count: 2
data_group_name: smokers_health
batch_size: 150

task_runner:
defaults: plan/defaults/task_runner.yaml
template: src.taskrunner.SmokersHealthAnalytics

network:
defaults: plan/defaults/network.yaml

assigner:
template: openfl.component.RandomGroupedAssigner
settings:
task_groups:
- name: analytics
percentage: 1.0
tasks:
- analytics

tasks:
analytics:
function: analytics
aggregation_type:
template: src.aggregate_health.AggregateHealthMetrics
kwargs:
columns: ['age', 'sex', 'current_smoker', 'heart_rate', 'blood_pressure', 'cigs_per_day', 'chol']
Empty file: openfl-workspace/federated_analytics/smokers_health/src/__init__.py
35 changes: 35 additions & 0 deletions openfl-workspace/federated_analytics/smokers_health/src/aggregate_health.py
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import numpy as np
from openfl.interface.aggregation_functions.core import AggregationFunction


class AggregateHealthMetrics(AggregationFunction):
"""Aggregation logic for Smokers Health analytics."""

def call(self, local_tensors, *_) -> np.ndarray:
"""
Aggregates local tensors, each of which contains the means of local health
metrics (heart rate, cholesterol, and systolic/diastolic blood pressure)
grouped by age, sex, and smoking status.

Args:
local_tensors (list): A list of objects, each containing a `tensor` attribute
that represents local means for the health metrics.
*_: Additional arguments (unused).
Returns:
np.ndarray: The element-wise mean of the local metric tensors.
Raises:
ValueError: If the input list `local_tensors` is empty, indicating
that there are no metrics to aggregate.
"""

if not local_tensors:
raise ValueError("No local metrics to aggregate.")

# Element-wise average of the collaborators' local mean tensors
agg_metrics = np.zeros_like(local_tensors[0].tensor)
for local_tensor in local_tensors:
agg_metrics += local_tensor.tensor / len(local_tensors)
return agg_metrics
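The averaging in `call` can be exercised standalone. `SimpleNamespace` here is a hypothetical stand-in for OpenFL's local tensor objects; anything with a `.tensor` attribute works.

```python
import numpy as np
from types import SimpleNamespace

# Two collaborators' local means for (heart_rate, chol) — fabricated values
local_tensors = [
    SimpleNamespace(tensor=np.array([70.0, 200.0])),
    SimpleNamespace(tensor=np.array([74.0, 180.0])),
]

# Same element-wise averaging performed by AggregateHealthMetrics.call
agg = np.zeros_like(local_tensors[0].tensor)
for lt in local_tensors:
    agg += lt.tensor / len(local_tensors)
# agg now holds [72.0, 190.0]
```

Note this is an unweighted mean: each collaborator contributes equally regardless of its shard size.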
96 changes: 96 additions & 0 deletions openfl-workspace/federated_analytics/smokers_health/src/dataloader.py
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from openfl.federated.data.loader import DataLoader
import pandas as pd
import os
import subprocess


class SmokersHealthDataLoader(DataLoader):
"""Data Loader for Smokers Health Dataset."""

def __init__(self, batch_size, data_path, **kwargs):
super().__init__(**kwargs)

# If data_path is None, this is being used for model initialization only
if data_path is None:
return

# Load actual data if a data path is provided
try:
int(data_path)
except ValueError:
raise ValueError(
f"Expected '{data_path}' to be representable as `int`, "
"as it refers to the data shard number used by the collaborator."
)

# Download and prepare data
self._download_raw_data()
self.data_shard = self.load_data_shard(
shard_num=int(data_path), **kwargs
)

def _download_raw_data(self):
"""
Downloads and extracts the raw data for the smokers' health dataset.
This method performs the following steps:
1. Downloads the dataset from the specified Kaggle URL using the `curl` command.
2. Saves the downloaded file as a ZIP archive in the `./data` directory.
3. Extracts the contents of the ZIP archive into the `data` directory.
"""

download_path = os.path.join('data', 'smokers_health.zip')
os.makedirs('data', exist_ok=True)  # ensure the target directory exists before curl writes into it
subprocess.run(
[
'curl', '-L', '-o', download_path,
'https://www.kaggle.com/api/v1/datasets/download/jaceprater/smokers-health-data'
],
check=True
)

# Unzip the downloaded file into the data directory
subprocess.run(['unzip', '-o', download_path, '-d', 'data'], check=True)

def load_data_shard(self, shard_num, **kwargs):
"""
Loads one shard of the data from a CSV file.
Reads './data/smoking_health_data_final.csv' and returns the contiguous
slice of rows assigned to this collaborator.
Args:
shard_num (int): 1-indexed shard number for this collaborator.
**kwargs: May contain `collaborator_count` from the plan settings.
Returns:
pd.DataFrame: This collaborator's shard of the data.
"""
file_path = os.path.join('data', 'smoking_health_data_final.csv')
df = pd.read_csv(file_path)

# Split data into equal contiguous shards, one per collaborator.
# Dividing by shard_num instead of the collaborator count would give
# shard 1 the entire dataset, so read the count from the plan settings.
collaborator_count = int(kwargs.get('collaborator_count', shard_num))
shard_size = len(df) // collaborator_count
start_idx = shard_size * (shard_num - 1)
end_idx = start_idx + shard_size

return df.iloc[start_idx:end_idx]

def query(self, columns, **kwargs):
"""
Query the data shard for the specified columns.
Args:
columns (list): A list of column names to query from the data shard.
**kwargs: Additional keyword arguments (currently not used).
Returns:
DataFrame: A DataFrame containing the data for the specified columns.
Raises:
ValueError: If the columns parameter is not a list.
"""
if not isinstance(columns, list):
raise ValueError("Columns parameter must be a list")
return self.data_shard[columns]

def get_feature_shape(self):
"""
Not used for analytics workloads; kept for DataLoader interface compatibility.

Returns:
None
"""
return None