DDPM

Important: The generative model presented in this article comes from an assignment in USC EE660, a course taught by Professor Stephen Lyle Tu. Because I skipped some prerequisite ML courses, my grasp of the fundamentals is not especially solid, and I am not particularly interested in ML theory, so my grade in this course was relatively low. Even so, Professor Stephen put a great deal of effort into the course and taught the theoretical foundations of many algorithms. Theory can be dry, but it is a crucial part of understanding algorithms! I highly recommend taking this course. Don't worry about your GPA; just learn what you want to learn.

Note that the generative model presented in this article is very simple. You don't even need to implement it yourself; using an open-source library is more convenient. This article aims to show some of the finer details of generative algorithms. A simple prompt to GPT can also produce similar code.
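
For reference, here is a short summary of the equations that the code in this article appears to implement, written in the usual DDPM notation. This is a sketch read off from the code (`q_sample`, the training loss, and `p_sample`), not a full derivation. The code indexes time steps from 0 to T-1, while the formulas use the conventional 1 to T; the choice of sigma_t as the square root of beta_t is the one used in `p_sample`.

```latex
\begin{align}
\alpha_t &= 1 - \beta_t, \qquad \bar{\alpha}_t = \prod_{s=1}^{t} \alpha_s \\
x_t &= \sqrt{\bar{\alpha}_t}\, x_0 + \sqrt{1 - \bar{\alpha}_t}\, \epsilon,
  \qquad \epsilon \sim \mathcal{N}(0, I) \\
\mathcal{L}(\theta) &= \mathbb{E}_{x_0,\, t,\, \epsilon}
  \left\| \epsilon - \epsilon_\theta(x_t, t) \right\|^2 \\
x_{t-1} &= \frac{1}{\sqrt{\alpha_t}}
  \left( x_t - \frac{1 - \alpha_t}{\sqrt{1 - \bar{\alpha}_t}}\, \epsilon_\theta(x_t, t) \right)
  + \sigma_t z, \qquad \sigma_t = \sqrt{\beta_t},\; z \sim \mathcal{N}(0, I)
\end{align}
```

At the final reverse step no noise is added (z is set to zero), which is what `nonzero_mask` does in the code.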



import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# NOTE: X_train is not defined in this excerpt. It is assumed to be a
# float32 tensor of shape (N, 2) holding the training points.

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# define the MLP denoiser
class MLPDenoiser(nn.Module):
    def __init__(self, input_dim=2, hidden_dim=128):
        super().__init__()
        # the extra input feature is the time step
        self.net = nn.Sequential(
            nn.Linear(input_dim + 1, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        )

    def forward(self, x, t):
        t_embed = t.view(-1, 1)  # time step as an additional feature
        x_t = torch.cat([x, t_embed], dim=1)
        return self.net(x_t)


# DDPM parameters
T = 1000  # total time steps
# keep the schedule on the same device as the time step indices used to look it up
beta = torch.linspace(0.0001, 0.02, T, device=device)
alpha = 1 - beta
alpha_bar = torch.cumprod(alpha, dim=0)


def q_sample(x0, t, noise):
    # forward process: draw x_t directly from x_0 for integer time steps t
    sqrt_alpha_bar = torch.sqrt(alpha_bar[t]).view(-1, 1)
    sqrt_one_minus_alpha_bar = torch.sqrt(1 - alpha_bar[t]).view(-1, 1)
    return sqrt_alpha_bar * x0 + sqrt_one_minus_alpha_bar * noise


# initialize the data loader, model, optimizer and loss function
batch_size = 128
dataset = TensorDataset(X_train)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

model = MLPDenoiser().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()

# train the model
num_epochs = 5000
for epoch in range(num_epochs):
    for (x0_batch,) in dataloader:
        x0_batch = x0_batch.to(device)
        # one random time step per sample
        t = torch.randint(0, T, (x0_batch.size(0),), device=device)
        noise = torch.randn_like(x0_batch)
        x_t = q_sample(x0_batch, t, noise)

        noise_pred = model(x_t, t.float())
        loss = loss_fn(noise_pred, noise)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 20 == 0:
        # this is the loss of the last batch in the epoch
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}")


# one reverse (denoising) step: compute x_{t-1} from x_t
def p_sample(x, t):
    with torch.no_grad():
        t = t.long()  # integer indices into the schedule
        noise_pred = model(x, t.float())  # predict the noise

        # calculate the coefficients
        coef1 = (1 / torch.sqrt(alpha[t])).view(-1, 1).expand(-1, x.shape[1])
        coef2 = ((1 - alpha[t]) / torch.sqrt(1 - alpha_bar[t])).view(-1, 1).expand(-1, x.shape[1])

        x_prev = coef1 * (x - coef2 * noise_pred)  # mean of x_{t-1}

        # no noise is added at the final step (t == 0)
        nonzero_mask = (t > 0).float().view(-1, 1)  # shape (batch, 1) for broadcasting
        z = torch.randn_like(x)  # generate noise
        sigma_t = torch.sqrt(beta[t]).view(-1, 1).expand(-1, x.shape[1])

        x_prev = x_prev + nonzero_mask * sigma_t * z  # add noise
        return x_prev


# generate new data, starting from pure Gaussian noise
num_samples = 1000
x_T = torch.randn((num_samples, 2), device=device)
x_t = x_T
for t in reversed(range(T)):
    t_tensor = torch.full((num_samples,), t, device=device, dtype=torch.long)
    x_t = p_sample(x_t, t_tensor)

x_gen = x_t.cpu().numpy()

# show data
plt.figure(figsize=(6, 6))
plt.scatter(X_train[:, 0], X_train[:, 1], alpha=0.3, label="Real Data")
plt.scatter(x_gen[:, 0], x_gen[:, 1], alpha=0.3, label="Generated Data")
plt.legend()
plt.show()
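
One detail worth checking is why sampling can start from `torch.randn`: with the linear schedule above, almost none of the original signal should be left at the last time step. The sketch below recomputes the schedule in plain Python (no PyTorch needed) and prints the signal and noise coefficients used by `q_sample` at a few time steps. The helper names `linear_betas` and `cumulative_alpha_bar` are hypothetical and only for illustration; the default values are the ones from the code above.

```python
import math


def linear_betas(num_steps=1000, beta_start=1e-4, beta_end=0.02):
    """Evenly spaced betas with both endpoints included (like torch.linspace)."""
    step = (beta_end - beta_start) / (num_steps - 1)
    return [beta_start + i * step for i in range(num_steps)]


def cumulative_alpha_bar(betas):
    """alpha_bar[t] is the running product of (1 - beta[s]) for s <= t."""
    out = []
    running = 1.0
    for b in betas:
        running *= 1.0 - b
        out.append(running)
    return out


betas = linear_betas()
alpha_bar = cumulative_alpha_bar(betas)

# coefficients of x_0 and of the noise in q_sample at selected time steps
for t in (0, 250, 500, 999):
    signal = math.sqrt(alpha_bar[t])
    noise = math.sqrt(1.0 - alpha_bar[t])
    print(f"t={t:4d}  signal coef={signal:.4f}  noise coef={noise:.4f}")
```

If the signal coefficient at the last step is close to zero, x_T is nearly pure Gaussian noise, so starting the reverse process from a standard normal sample is a reasonable approximation.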