```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt

# define the MLP denoiser
class MLPDenoiser(nn.Module):
    def __init__(self, input_dim=2, hidden_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim + 1, hidden_dim),  # +1 for the time step feature
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        )

    def forward(self, x, t):
        t_embed = t.view(-1, 1)  # time step as an additional feature
        x_t = torch.cat([x, t_embed], dim=1)
        return self.net(x_t)
```
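As a quick sanity check, the denoiser should map a batch of 2-D points plus per-sample time steps back to 2-D noise predictions. The sketch below (the names `m`, `x`, and `t` are throwaway, just for illustration) verifies that the shapes line up:

```python
# minimal shape check: 4 two-dimensional points, 4 random time steps
m = MLPDenoiser()
x = torch.randn(4, 2)
t = torch.randint(0, 1000, (4,)).float()
print(m(x, t).shape)  # torch.Size([4, 2])
```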
```python
# DDPM parameters: a linear beta schedule, placed on the same device as the
# model so that indexing with GPU tensors works later on
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

T = 1000  # total time steps
beta = torch.linspace(0.0001, 0.02, T, device=device)
alpha = 1 - beta
alpha_bar = torch.cumprod(alpha, dim=0)
```
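In the standard DDPM notation, this schedule defines

$$
\alpha_t = 1 - \beta_t, \qquad \bar{\alpha}_t = \prod_{s=1}^{t} \alpha_s,
$$

where $\beta_t$ grows linearly from $10^{-4}$ to $0.02$ over the $T = 1000$ steps.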
```python
def q_sample(x0, t, noise):
    sqrt_alpha_bar = torch.sqrt(alpha_bar[t]).view(-1, 1)
    sqrt_one_minus_alpha_bar = torch.sqrt(1 - alpha_bar[t]).view(-1, 1)
    return sqrt_alpha_bar * x0 + sqrt_one_minus_alpha_bar * noise
```
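This is the closed-form forward process: instead of applying $t$ small noising steps one by one, $x_t$ is sampled in a single shot from

$$
x_t = \sqrt{\bar{\alpha}_t}\, x_0 + \sqrt{1 - \bar{\alpha}_t}\, \epsilon, \qquad \epsilon \sim \mathcal{N}(0, I),
$$

which is exactly what `q_sample` computes.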
```python
# build the training dataloader
batch_size = 128
dataset = TensorDataset(X_train)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
```
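One assumption worth flagging: `TensorDataset` expects a `torch.Tensor`. If `X_train` from the earlier data-generation step is a NumPy array (e.g. from `sklearn.datasets.make_moons`), a conversion along these lines is needed before the `TensorDataset` call above:

```python
# convert X_train to a float32 tensor if it is a NumPy array
# (torch.as_tensor is a cheap no-op if it is already a float32 tensor)
X_train = torch.as_tensor(X_train, dtype=torch.float32)
```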
```python
# initialize the model, optimizer and loss function
model = MLPDenoiser().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()
```
```python
# train the model
num_epochs = 5000
for epoch in range(num_epochs):
    for (x0_batch,) in dataloader:
        x0_batch = x0_batch.to(device)

        # sample a random time step per example and noise the batch
        t = torch.randint(0, T, (x0_batch.size(0),), device=device)
        noise = torch.randn_like(x0_batch)
        x_t = q_sample(x0_batch, t, noise)

        # predict the injected noise and regress it with MSE
        noise_pred = model(x_t, t.float())
        loss = loss_fn(noise_pred, noise)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 20 == 0:
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}")
```
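The loop trains with the simplified DDPM objective: pick a random step $t$, noise $x_0$ up to $x_t$, and regress the injected noise,

$$
L_{\text{simple}} = \mathbb{E}_{x_0,\, \epsilon,\, t}\left[ \left\lVert \epsilon - \epsilon_\theta(x_t, t) \right\rVert^2 \right],
$$

which is the MSE between `noise` and `noise_pred` above.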
```python
# reverse diffusion step: sample x_{t-1} given x_t
def p_sample(x, t):
    with torch.no_grad():
        t = t.long()  # convert to integer indices into the schedules
        noise_pred = model(x, t.float())  # predict the noise

        # coefficients of the posterior mean; shape (-1, 1) broadcasts
        # over the feature dimension
        coef1 = (1 / torch.sqrt(alpha[t])).view(-1, 1)
        coef2 = ((1 - alpha[t]) / torch.sqrt(1 - alpha_bar[t])).view(-1, 1)

        x_prev = coef1 * (x - coef2 * noise_pred)  # mean of x_{t-1}

        # add noise only for t > 0; the final step (t = 0) is deterministic
        nonzero_mask = (t > 0).float().view(-1, 1)
        z = torch.randn_like(x)
        sigma_t = torch.sqrt(beta[t]).view(-1, 1)

        x_prev = x_prev + nonzero_mask * sigma_t * z
        return x_prev
```
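Written out, `p_sample` implements the DDPM reverse update

$$
x_{t-1} = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \frac{1 - \alpha_t}{\sqrt{1 - \bar{\alpha}_t}}\, \epsilon_\theta(x_t, t) \right) + \sigma_t z,
$$

with $\sigma_t = \sqrt{\beta_t}$ and $z \sim \mathcal{N}(0, I)$; the `nonzero_mask` drops the noise term at $t = 0$ so the last step returns the mean directly.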
```python
# generate new data: start from pure Gaussian noise and denoise step by step
num_samples = 1000
x_T = torch.randn((num_samples, 2), device=device)

x_t = x_T
for t in reversed(range(T)):
    t_tensor = torch.full((num_samples,), t, device=device, dtype=torch.float32)
    x_t = p_sample(x_t, t_tensor)

x_gen = x_t.cpu().numpy()
```
```python
# show real vs. generated data
plt.figure(figsize=(6, 6))
plt.scatter(X_train[:, 0], X_train[:, 1], alpha=0.3, label="Real Data")
plt.scatter(x_gen[:, 0], x_gen[:, 1], alpha=0.3, label="Generated Data")
plt.legend()
plt.show()
```
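Beyond the eyeball test, comparing the first two moments of the two point clouds can flag gross mismatches. The sketch below (assuming `X_train` is a CPU tensor or NumPy array) is a cheap check, not a proper distributional metric:

```python
import numpy as np

# compare means and covariances of real vs. generated points
X_real = np.asarray(X_train)
print("real mean:", X_real.mean(axis=0), " gen mean:", x_gen.mean(axis=0))
print("real cov:\n", np.cov(X_real.T))
print("gen cov:\n", np.cov(x_gen.T))
```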