TA_EC/FED.py
2025-03-09 22:36:22 +08:00

281 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
import numpy as np
import copy
from tqdm import tqdm
from model.repvit import repvit_m1_1
from model.mobilenetv3 import MobileNetV3
# Configuration parameters
NUM_CLIENTS: int = 4  # number of simulated clients (models and data partitions)
NUM_ROUNDS: int = 3  # number of federated rounds run by main()
CLIENT_EPOCHS: int = 5  # local epochs per selected client in each round
BATCH_SIZE: int = 32  # mini-batch size of each client's DataLoader
TEMP: float = 2.0  # distillation temperature
# Device configuration: use CUDA when it is available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Data preparation
def _split_indices_non_iid(labels, num_clients):
    """Assign sample indices to clients with a label-parity (non-IID) split.

    Clients with an even id receive only even digits and clients with an odd id
    receive only odd digits. Each digit's shuffled indices are divided as evenly
    as possible (``np.array_split``) among the clients of its parity group.

    Args:
        labels: Array of integer class labels in ``0..9``, one per sample.
        num_clients: Number of clients. Must be at least 2 so that both parity
            groups contain at least one client.

    Returns:
        Dict mapping each client id ``0..num_clients-1`` to a list of sample
        indices.
    """
    client_data = {i: [] for i in range(num_clients)}
    for label in range(10):
        label_idx = np.where(labels == label)[0]
        np.random.shuffle(label_idx)
        # Clients p, p+2, p+4, ... share the digits whose parity is p.
        # For an even num_clients this equals the mapping shard i -> 2*i + p.
        group = list(range(label % 2, num_clients, 2))
        for client_id, idx in zip(group, np.array_split(label_idx, len(group))):
            client_data[client_id].extend(idx)
    return client_data


def prepare_data(num_clients):
    """Download the MNIST training split and partition it into non-IID clients.

    Every client receives the five digits of a single parity: even digits for
    even client ids, odd digits for odd client ids. Each digit's samples are
    shuffled with ``np.random`` and spread evenly over the clients of that
    parity. With an odd ``num_clients`` the even-digit group has one extra
    client, so no client is left without data.

    Images are resized to 224x224 and replicated to 3 channels before
    ``ToTensor``.

    Args:
        num_clients: Number of client partitions to create (at least 2).

    Returns:
        List of ``num_clients`` ``Subset`` objects, indexed by client id.

    Raises:
        ValueError: If ``num_clients`` is smaller than 2.
    """
    if num_clients < 2:
        # Fail before downloading anything; a parity split needs both groups.
        raise ValueError(f"prepare_data() needs num_clients >= 2, got {num_clients}")
    transform = transforms.Compose([
        transforms.Resize((224, 224)),  # resize images to 224x224
        transforms.Grayscale(num_output_channels=3),  # 1 channel -> 3 identical channels
        transforms.ToTensor(),
    ])
    train_set = datasets.MNIST("./data", train=True, download=True, transform=transform)
    labels = train_set.targets.numpy()
    client_data = _split_indices_non_iid(labels, num_clients)
    return [Subset(train_set, ids) for ids in client_data.values()]
# Client training function
def client_train(client_model, server_model, dataset):
    """Train a client model locally with cross-entropy plus distillation.

    The server model acts as a frozen teacher. The client is trained on
    ``cross_entropy(client, target)`` plus the KL divergence between the
    temperature-softened (``TEMP``) client and server distributions, scaled by
    ``TEMP**2``. Optimisation uses SGD with lr=0.1 for ``CLIENT_EPOCHS`` epochs
    over mini-batches of ``BATCH_SIZE``.

    Args:
        client_model: Student network. Its parameters are updated in place.
        server_model: Teacher network. It is switched to eval mode and only used
            under ``torch.no_grad()``.
        dataset: Client-local dataset yielding ``(data, target)`` pairs.

    Returns:
        ``client_model.state_dict()`` after training.

    Raises:
        ValueError: If ``dataset`` produces no batches.
    """
    client_model.train()
    server_model.eval()
    optimizer = torch.optim.SGD(client_model.parameters(), lr=0.1)
    loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)
    num_batches = len(loader)
    if num_batches == 0:
        # Without this guard the per-epoch averages below divide by zero.
        raise ValueError("client_train() received an empty dataset; nothing to train on")
    # Training progress bar; the context manager closes it even if training raises.
    with tqdm(total=CLIENT_EPOCHS * num_batches,
              desc="Client Training",
              unit="batch") as progress_bar:
        for epoch in range(CLIENT_EPOCHS):
            epoch_loss = 0.0
            task_loss = 0.0
            distill_loss = 0.0
            correct = 0
            total = 0
            for batch_idx, (data, target) in enumerate(loader):
                data, target = data.to(device), target.to(device)
                optimizer.zero_grad()
                # Forward pass through the student.
                client_output = client_model(data)
                # Teacher outputs; no gradients flow into the server model.
                with torch.no_grad():
                    server_output = server_model(data)
                # Hard-label loss plus the standard TEMP**2-scaled distillation term
                # (the factor offsets the 1/TEMP**2 shrinkage of soft-target gradients).
                loss_task = F.cross_entropy(client_output, target)
                loss_distill = F.kl_div(
                    F.log_softmax(client_output / TEMP, dim=1),
                    F.softmax(server_output / TEMP, dim=1),
                    reduction="batchmean",
                ) * (TEMP ** 2)
                total_loss = loss_task + loss_distill
                # Backward pass and parameter update.
                total_loss.backward()
                optimizer.step()
                # Convert each loss to a Python float once per batch.
                batch_total = total_loss.item()
                batch_task = loss_task.item()
                batch_distill = loss_distill.item()
                epoch_loss += batch_total
                task_loss += batch_task
                distill_loss += batch_distill
                predicted = client_output.argmax(dim=1)
                correct += (predicted == target).sum().item()
                total += target.size(0)
                # correct/total accumulate over the whole epoch: this is running accuracy.
                running_acc = 100 * correct / total
                # The postfix must stay on one line; a newline breaks the bar rendering.
                progress_bar.set_postfix({
                    "Epoch": f"{epoch+1}/{CLIENT_EPOCHS}",
                    "Batch": f"{batch_idx+1}/{num_batches}",
                    "Loss": f"{batch_total:.4f}",
                    "Acc": f"{running_acc:.2f}%",
                })
                progress_bar.update(1)
                # Detailed log every 10 batches.
                if (batch_idx + 1) % 10 == 0:
                    progress_bar.write(f"\nEpoch {epoch+1} | Batch {batch_idx+1}")
                    progress_bar.write(f"Task Loss: {batch_task:.4f}")
                    progress_bar.write(f"Distill Loss: {batch_distill:.4f}")
                    progress_bar.write(f"Total Loss: {batch_total:.4f}")
                    progress_bar.write(f"Running Accuracy: {running_acc:.2f}%\n")
            # End-of-epoch summary, written through tqdm so the bar is not clobbered.
            avg_loss = epoch_loss / num_batches
            avg_task = task_loss / num_batches
            avg_distill = distill_loss / num_batches
            epoch_acc = 100 * correct / total
            progress_bar.write(f"\n{'='*40}")
            progress_bar.write(f"Epoch {epoch+1} Summary:")
            progress_bar.write(f"Average Loss: {avg_loss:.4f}")
            progress_bar.write(f"Task Loss: {avg_task:.4f}")
            progress_bar.write(f"Distill Loss: {avg_distill:.4f}")
            progress_bar.write(f"Training Accuracy: {epoch_acc:.2f}%")
            progress_bar.write(f"{'='*40}\n")
    return client_model.state_dict()
# Model parameter aggregation (FedAvg)
def aggregate(client_params, weights=None):
    """Average client ``state_dict``s entry-by-entry (FedAvg).

    Args:
        client_params: Non-empty list of ``state_dict``-like mappings that share
            the same keys and tensor shapes.
        weights: Optional per-client weights, e.g. local dataset sizes. They are
            normalised to sum to 1. When omitted, every client counts equally
            (a plain mean).

    Returns:
        Dict with the keys of ``client_params[0]``, in the same order. Each
        value is the averaged tensor cast back to that entry's original dtype.
        Integer entries (e.g. BatchNorm counters) are averaged in float and then
        truncated toward zero by the cast.

    Raises:
        ValueError: If ``client_params`` is empty, or if ``weights`` is not a
            1-D sequence with one entry per client and a positive sum.
    """
    if not client_params:
        raise ValueError("aggregate() needs at least one client state_dict")
    coeffs = None
    if weights is not None:
        coeffs = torch.as_tensor(weights, dtype=torch.float32)
        if coeffs.dim() != 1 or coeffs.shape[0] != len(client_params):
            raise ValueError("aggregate() weights must have exactly one entry per client state_dict")
        weight_sum = coeffs.sum()
        if weight_sum <= 0:
            raise ValueError("aggregate() weights must have a positive sum")
        coeffs = coeffs / weight_sum
    global_params = {}
    for key, reference in client_params[0].items():
        # Stack in float so integer buffers can be averaged as well.
        stacked = torch.stack([params[key].float() for params in client_params])
        if coeffs is None:
            averaged = stacked.mean(dim=0)
        else:
            # Contract the client axis: sum_i coeffs[i] * stacked[i].
            averaged = torch.tensordot(coeffs.to(stacked.device), stacked, dims=1)
        global_params[key] = averaged.to(reference.dtype)
    return global_params
# Server knowledge update
def server_update(server_model, client_models, public_loader):
    """Distill the ensemble of client models into the server model.

    For every batch of ``public_loader`` (labels ignored), the client models'
    logits are averaged per sample across clients. The softmax of that average
    is the soft target, and the server is trained on the KL divergence to it
    (no temperature) with Adam (lr=0.001).

    Args:
        server_model: Student network, updated in place and left in train mode.
        client_models: Non-empty sequence of teacher networks. They run in eval
            mode during the update; their previous train/eval modes are restored
            afterwards.
        public_loader: Iterable of ``(data, label)`` batches; labels are unused.

    Returns:
        None. Prints the average distillation loss.

    Raises:
        ValueError: If ``client_models`` is empty.
    """
    if not client_models:
        raise ValueError("server_update() needs at least one client model")
    server_model.train()
    # Teachers must run in eval mode. In train mode, BatchNorm would normalise
    # with batch statistics (and still update its running stats under no_grad),
    # and dropout would be active.
    previous_modes = [model.training for model in client_models]
    for model in client_models:
        model.eval()
    optimizer = torch.optim.Adam(server_model.parameters(), lr=0.001)
    total_loss = 0.0
    num_batches = 0
    try:
        with tqdm(public_loader, desc="Server Updating", unit="batch") as progress_bar:
            for num_batches, (data, _) in enumerate(progress_bar, start=1):
                data = data.to(device)
                optimizer.zero_grad()
                # Per-sample ensemble target. Stack the clients' logits (assumed
                # (batch, classes), as the dim=1 softmax below implies) and average
                # over the client axis only, so each sample keeps its own target.
                with torch.no_grad():
                    soft_targets = torch.stack([model(data) for model in client_models]).mean(dim=0)
                # Distillation step.
                server_output = server_model(data)
                loss = F.kl_div(
                    F.log_softmax(server_output, dim=1),
                    F.softmax(soft_targets, dim=1),
                    reduction="batchmean",
                )
                loss.backward()
                optimizer.step()
                # Update running statistics.
                current_loss = loss.item()
                total_loss += current_loss
                progress_bar.set_postfix({
                    "Avg Loss": f"{total_loss/num_batches:.4f}",
                    "Current Loss": f"{current_loss:.4f}",
                })
    finally:
        for model, was_training in zip(client_models, previous_modes):
            model.train(was_training)
    # Guard against an empty loader instead of dividing by zero.
    average_loss = total_loss / num_batches if num_batches else 0.0
    print(f"\nServer Update Complete | Average Loss: {average_loss:.4f}\n")
def test_model(model, test_loader):
    """Return the top-1 accuracy of ``model`` on ``test_loader``, in percent.

    The model is switched to eval mode (and left there) and evaluated under
    ``torch.no_grad()``. Inputs and targets are moved to the module-level
    ``device``.

    NOTE(review): the ``test_`` prefix means pytest's default discovery would
    try to collect this function if it is imported into a test module.

    Args:
        model: Classifier returning one row of class scores per input.
        test_loader: Iterable of ``(data, target)`` batches.

    Returns:
        ``100 * correct / total`` as a float.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            predicted = model(data).argmax(dim=1)
            correct += (predicted == target).sum().item()
            total += target.size(0)
    if total == 0:
        # Dividing by zero here would surface as an unhelpful ZeroDivisionError.
        raise ValueError("test_model() received an empty test_loader")
    return 100 * correct / total
# Main training flow
def main():
    """Run federated training end to end, then save and reload-check the models.

    Each of ``NUM_ROUNDS`` rounds:
      1. samples 2 distinct clients at random;
      2. trains a deep copy of each selected client model with distillation from
         the server model (``client_train``);
      3. averages the trained weights (``aggregate``) and loads the result into
         every client model;
      4. distills the client models back into the server (``server_update``);
      5. prints server and client accuracy on the MNIST test split.

    Afterwards, both models are saved to ``server_model.pth`` and
    ``client_model.pth``. They are then reloaded into fresh instances and
    evaluated again as a save/load sanity check.

    NOTE(review): the unlabeled "public" distillation data and the evaluation
    data are both the MNIST test split. The server is therefore evaluated on
    images it was distilled on; consider a held-out split for an unbiased
    estimate.
    """
    # Initialise models: a RepViT server model and one MobileNetV3 per client.
    global_server_model = repvit_m1_1(num_classes=10).to(device)
    client_models = [MobileNetV3(n_class=10).to(device) for _ in range(NUM_CLIENTS)]

    # Prepare data.
    client_datasets = prepare_data(NUM_CLIENTS)
    # Resize to 224x224, replicate the grayscale channel to 3 channels, convert
    # to a tensor. This matches the preprocessing in prepare_data.
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.Grayscale(num_output_channels=3),
        transforms.ToTensor(),
    ])
    # One MNIST test-split dataset backs both the public and the test loader.
    mnist_test = datasets.MNIST("./data", train=False, download=True, transform=transform)
    public_loader = DataLoader(mnist_test, batch_size=100, shuffle=True)
    test_loader = DataLoader(mnist_test, batch_size=100, shuffle=False)

    with tqdm(total=NUM_ROUNDS, desc="Federated Rounds", unit="round") as round_progress:
        # `round_idx` rather than `round`, to avoid shadowing the builtin.
        for round_idx in range(NUM_ROUNDS):
            print(f"\n{'#'*50}")
            print(f"Federated Round {round_idx+1}/{NUM_ROUNDS}")
            print(f"{'#'*50}")
            # Client selection.
            selected_clients = np.random.choice(NUM_CLIENTS, 2, replace=False)
            print(f"Selected Clients: {selected_clients}")
            # Local training on deep copies. deepcopy already carries the weights,
            # so no extra load_state_dict is needed.
            client_params = []
            for cid in selected_clients:
                print(f"\nTraining Client {cid}")
                local_model = copy.deepcopy(client_models[cid])
                client_params.append(
                    client_train(local_model, global_server_model, client_datasets[cid])
                )
            # Model aggregation.
            global_client_params = aggregate(client_params)
            for model in client_models:
                model.load_state_dict(global_client_params)
            # Server knowledge update.
            print("\nServer Updating...")
            server_update(global_server_model, client_models, public_loader)
            # Evaluate. All client models were loaded with the same aggregated
            # state above, so client 0 stands in for them.
            server_acc = test_model(global_server_model, test_loader)
            client_acc = test_model(client_models[0], test_loader)
            print(f"\nRound {round_idx+1} Performance:")
            print(f"Global Model Accuracy: {server_acc:.2f}%")
            print(f"Client Model Accuracy: {client_acc:.2f}%")
            round_progress.update(1)
            print(f"Round {round_idx+1} completed")
    print("Training completed!")

    # Save the trained models.
    torch.save(global_server_model.state_dict(), "server_model.pth")
    torch.save(client_models[0].state_dict(), "client_model.pth")
    print("Models saved successfully.")

    # Reload into fresh instances and re-test (save/load sanity check).
    server_model = repvit_m1_1(num_classes=10).to(device)
    server_model.load_state_dict(torch.load("server_model.pth", map_location=device))
    server_acc = test_model(server_model, test_loader)
    print(f"Server Model Test Accuracy: {server_acc:.2f}%")
    client_model = MobileNetV3(n_class=10).to(device)
    client_model.load_state_dict(torch.load("client_model.pth", map_location=device))
    client_acc = test_model(client_model, test_loader)
    print(f"Client Model Test Accuracy: {client_acc:.2f}%")
# Script entry point: run training only when executed directly, not on import.
if __name__ == "__main__":
    main()