生产者消费者模型

我第一次写的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include<iostream>
#include<queue>
#include<thread>
#include<mutex>
#include<condition_variable>

std::queue<int> g_queue;
std::mutex g_mutex;
std::condition_variable g_cv;

void producer() {
for (int i = 0; i <= 10000; i++) {
std::unique_lock<std::mutex> lk(g_mutex);
g_queue.push(i);
std::cout << "producer:" << i << std::endl;
lk.unlock();
}
}

void consumer() {
while (true) {
std::unique_lock<std::mutex> lk(g_mutex);
g_cv.wait(lk, []() {
if (g_queue.empty()) {
return false;
}
return true;
});
int a = g_queue.front();
std::cout << "consumer:" << a << std::endl;
g_queue.pop();
lk.unlock();
g_cv.notify_one();
}
}
int main() {
std::thread thread1(producer);
std::thread thread2(consumer);

thread1.join();
thread2.join();

return 0;
}

代码的问题

这段代码能跑通,但存在三个明显缺陷

  1. 生产者一次都没有通知消费者,线程完全靠“自旋+假唤醒”干活,CPU 空转。

  2. 消费者在队列空时会无限等待,程序无法正常结束。

  3. 通知粒度太细(每 pop 一次就 notify_one),反而容易惊群。

  4. 主要是生产者没有通知消费者,导致消费者可能一直等待;此外,消费者没有退出机制,即使生产者结束,消费者也会无限等待。

  5. 问题分析:

    1. 生产者没有通知消费者:在生产者中,您解锁了互斥锁,但没有调用 g_cv.notify_one() 来唤醒可能等待的消费者。因此,消费者可能永远无法被唤醒。
    2. 消费者没有退出条件:消费者循环是无限的,即使生产者结束且队列为空,消费者也会继续等待,导致程序无法正常退出。
    3. 条件变量的使用:在消费者的等待条件中,直接返回 truefalse 可能不够清晰,且缺少生产者结束的判断。

优化后

您的代码有一些问题,主要是生产者没有通知消费者,导致消费者可能一直等待;此外,消费者没有退出机制,即使生产者结束,消费者也会无限等待。以下是优化后的代码,并附上解释。

优化后的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

std::queue<int> g_queue;
std::mutex g_mutex;
std::condition_variable g_cv;
bool g_done = false; // 标志生产者是否完成

void producer() {
for (int i = 0; i <= 10000; i++) {
{
std::lock_guard<std::mutex> lk(g_mutex);
g_queue.push(i);
std::cout << "producer:" << i << std::endl;
} // lock_guard 自动解锁
g_cv.notify_one(); // 通知消费者
}
{
std::lock_guard<std::mutex> lk(g_mutex);
g_done = true; // 生产完成
}
g_cv.notify_one(); // 最后通知一次,确保消费者退出
}

void consumer() {
while (true) {
std::unique_lock<std::mutex> lk(g_mutex);
// 等待条件:队列非空或生产已完成
g_cv.wait(lk, []() {
return !g_queue.empty() || g_done;
});
// 如果队列为空且生产已完成,则退出
if (g_queue.empty() && g_done) {
break;
}
// 取出数据
if (!g_queue.empty()) {
int a = g_queue.front();
g_queue.pop();
std::cout << "consumer:" << a << std::endl;
}
lk.unlock(); // 解锁,允许其他操作
}
}

int main() {
std::thread thread1(producer);
std::thread thread2(consumer);

thread1.join();
thread2.join();

std::cout << "Main thread: all done!" << std::endl;
return 0;
}

优化说明:

  1. 添加生产者结束标志:引入 g_done 变量,生产者完成后将其设置为 true
  2. 生产者通知消费者:生产者在每次添加数据后调用 g_cv.notify_one() 来唤醒消费者。在生产完成后,再次通知以确保消费者能退出。
  3. 消费者退出机制:消费者在等待条件时,检查队列是否为空以及生产者是否完成。如果队列为空且生产已完成,则退出循环。
  4. 使用 std::lock_guard:在生产者中,使用 std::lock_guard 简化锁的管理,它会在作用域结束时自动解锁。
  5. 条件变量谓词:等待条件现在包括队列非空或生产已完成,这样消费者既能处理数据又能及时退出。

其他优化建议:

  • 如果生产者和消费者处理速度不匹配,可以考虑使用多个消费者线程或调整通知策略。
  • 对于高性能场景,可以考虑使用双缓冲区或无锁队列,但代码会更复杂。
  • 输出操作(如 std::cout)可能成为性能瓶颈,因为在多线程中输出需要同步。如果不需要实时输出,可以考虑将输出日志到文件或减少输出频率。

这个优化后的代码确保了生产者和消费者的正确同步,并能正常退出。

优化后的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

std::queue<int> g_queue;
std::mutex g_mutex;
std::condition_variable g_cv;
bool g_done = false; // 标志生产者是否完成

void producer() {
for (int i = 0; i <= 10000; i++) {
{
std::lock_guard<std::mutex> lk(g_mutex);
g_queue.push(i);
std::cout << "producer:" << i << std::endl;
} // lock_guard 自动解锁
g_cv.notify_one(); // 通知消费者
}
{
std::lock_guard<std::mutex> lk(g_mutex);
g_done = true; // 生产完成
}
g_cv.notify_one(); // 最后通知一次,确保消费者退出
}

void consumer() {
while (true) {
std::unique_lock<std::mutex> lk(g_mutex);
// 等待条件:队列非空或生产已完成
g_cv.wait(lk, []() {
return !g_queue.empty() || g_done;
});
// 如果队列为空且生产已完成,则退出
if (g_queue.empty() && g_done) {
break;
}
// 取出数据
if (!g_queue.empty()) {
int a = g_queue.front();
g_queue.pop();
std::cout << "consumer:" << a << std::endl;
}
lk.unlock(); // 解锁,允许其他操作
}
}

int main() {
std::thread thread1(producer);
std::thread thread2(consumer);

thread1.join();
thread2.join();

std::cout << "Main thread: all done!" << std::endl;
return 0;
}

为什么生产者消费者模型中要使用条件变量呢

非常好的问题!这是理解多线程编程核心思想的关键。

在生产者-消费者模型中必须使用条件变量,主要原因是为了解决「忙等待」问题,实现真正的高效线程同步。让我们通过对比来理解为什么。

没有条件变量会怎样?(「忙等待」方案)

如果不使用条件变量,消费者线程可能会这样写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 错误示例:忙等待 (Busy Waiting)
void consumer() {
while (true) {
std::unique_lock<std::mutex> lk(g_mutex);
if (!g_queue.empty()) { // 检查队列是否为空
int a = g_queue.front();
g_queue.pop();
lk.unlock();
std::cout << "consumer:" << a << std::endl;
} else {
lk.unlock(); // 解锁
// 空循环,不断检查队列状态 --> 这就是「忙等待」
std::this_thread::yield(); // 稍微好一点,但本质相同
}
}
}

这种方式的致命缺点:

  1. CPU资源浪费:在队列为空时,消费者线程仍在疯狂循环,不断检查队列状态。这会占用大量CPU时间,导致CPU使用率飙升(可能接近100%),而这些CPU周期本可以用来做有用工作或节能。

  2. 性能低下:频繁的加锁、解锁操作以及缓存失效会严重影响性能。

  3. 响应延迟:即使生产者生产了数据,消费者也需要等到下一次循环检查才能发现,无法立即响应。

条件变量如何优雅地解决这个问题?

条件变量提供了「等待-通知」机制,完美解决了上述问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 正确示例:使用条件变量
void consumer() {
while (true) {
std::unique_lock<std::mutex> lk(g_mutex);
// 优雅地等待:在条件不满足时主动休眠
g_cv.wait(lk, []() { return !g_queue.empty() || g_done; });

if (g_queue.empty() && g_done) break;

int a = g_queue.front();
g_queue.pop();
lk.unlock();
std::cout << "consumer:" << a << std::endl;
}
}

条件变量的核心优势:

  1. 零CPU消耗的等待:当条件不满足时(队列为空),g_cv.wait()自动释放互斥锁并将线程置于休眠状态。此时线程不占用任何CPU时间,操作系统可以调度其他线程运行。

  2. 即时响应:当生产者调用 g_cv.notify_one()g_cv.notify_all() 时,操作系统会立即唤醒等待的消费者线程。消费者被唤醒后会自动重新获取互斥锁并检查条件。

  3. 避免竞争条件wait 操作是原子性的——它同时完成了「解锁」和「进入等待状态」两个操作,这防止了生产者在这两个操作之间发出通知而导致通知丢失的竞态条件。

关键机制:防止「虚假唤醒」

您可能注意到等待条件写成了lambda表达式:

1
g_cv.wait(lk, []() { return !g_queue.empty(); });

而不仅仅是:

1
g_cv.wait(lk); // 不推荐这样使用

这是因为某些操作系统实现中,线程可能会被虚假唤醒(spurious wakeup)——即没有收到通知也被唤醒。使用带条件的wait可以确保即使发生虚假唤醒,线程也会重新检查条件是否真正满足,如果条件不满足,它会继续等待。

总结:为什么必须使用条件变量?

方面 忙等待 (Busy Waiting) 条件变量 (Condition Variable)
CPU使用率 高(接近100%) 低(等待时接近0%)
能效 差(浪费电力) 好(节能)
响应速度 有延迟(取决于检查频率) 即时(由OS调度)
系统负载 高(增加系统负担) 低(与系统其他部分和谐共存)
代码质量 差(低效循环) 好(使用OS原生同步机制)

因此,条件变量是生产者-消费者模型的「完美搭档」,它实现了:

  • 高效同步:消费者在无数据时休眠,不浪费CPU
  • 即时响应:有数据时立即被唤醒处理
  • 线程安全:与互斥锁配合保证对共享资源的互斥访问

这就是为什么在任何正经的多线程编程中,条件变量都是实现线程间同步的首选机制,而不是低效的忙等待。

为什么生产者只使用 notify_one 而不使用 wait

这是一个很好的问题,它触及了生产者-消费者模型的核心设计原则。简单回答是:因为生产者和消费者面临的问题本质不同

根本原因:角色不对称性

在生产者和消费者模型中,两个线程的角色和面临的问题是不对称的:

消费者的困境:被动等待

消费者面临的问题是:”当没有数据时,我该怎么办?

  • 消费者不能凭空消费,必须等待数据可用
  • 如果使用忙等待(不断检查),会浪费CPU资源
  • 因此需要一种机制让消费者在无数据时休眠,有数据时被唤醒

这就是消费者需要 wait() 的原因。

生产者的优势:主动创造

生产者面临的问题是:”我生产了数据,如何通知消费者?

  • 生产者可以随时创造数据,不需要等待任何条件(除非缓冲区满)
  • 生产者的工作不是被动的,而是主动的
  • 生产者只需要在创建数据后通知可能的等待者

这就是生产者只需要 notify_one() 的原因。

深入分析

1. 生产者不需要等待(在简单模型中)

在您提供的简单模型中:

  • 队列是无界的(可以无限增长)
  • 生产者可以一直生产,不需要等待消费者消费
  • 生产者永远不会被阻塞(除非内存耗尽)

因此,生产者不需要调用 wait(),因为它永远不会需要等待什么条件。

2. 生产者也需要等待的情况

在实际应用中,如果队列有大小限制(有界缓冲区),那么生产者也需要等待:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

const int MAX_QUEUE_SIZE = 100; // 队列最大容量

std::queue<int> g_queue;
std::mutex g_mutex;
std::condition_variable g_cv_not_empty; // 消费者等待"非空"
std::condition_variable g_cv_not_full; // 生产者等待"非满"
bool g_done = false;

void producer() {
for (int i = 0; i <= 10000; i++) {
std::unique_lock<std::mutex> lk(g_mutex);

// 生产者需要等待:队列未满
g_cv_not_full.wait(lk, []() {
return g_queue.size() < MAX_QUEUE_SIZE;
});

g_queue.push(i);
std::cout << "producer:" << i << std::endl;
lk.unlock();

// 通知消费者:队列非空
g_cv_not_empty.notify_one();
}

{
std::lock_guard<std::mutex> lk(g_mutex);
g_done = true;
}
g_cv_not_empty.notify_one(); // 最后通知一次
}

void consumer() {
while (true) {
std::unique_lock<std::mutex> lk(g_mutex);

// 消费者等待:队列非空或生产完成
g_cv_not_empty.wait(lk, []() {
return !g_queue.empty() || g_done;
});

if (g_queue.empty() && g_done) {
break;
}

if (!g_queue.empty()) {
int a = g_queue.front();
g_queue.pop();
std::cout << "consumer:" << a << std::endl;
}
lk.unlock();

// 通知生产者:队列未满
g_cv_not_full.notify_one();
}
}

int main() {
std::thread thread1(producer);
std::thread thread2(consumer);

thread1.join();
thread2.join();

return 0;
}

在这个有界缓冲区版本中:

  • 生产者需要等待队列未满 (g_cv_not_full.wait())
  • 消费者需要等待队列非空 (g_cv_not_empty.wait())
  • 生产者放入数据后通知消费者 (g_cv_not_empty.notify_one())
  • 消费者取出数据后通知生产者 (g_cv_not_full.notify_one())

总结

  1. 在无界缓冲区模型中(您的原始代码):

    • 生产者不需要等待,只需要通知
    • 消费者需要等待,也需要被通知
  2. 在有界缓冲区模型中

    • 生产者和消费者都需要等待和通知
    • 但等待的条件不同:生产者等待”非满”,消费者等待”非空”
  3. 核心原则

    • 需要等待的一方使用 wait()
    • 能够改变条件的一方使用 notify()
    • 在简单生产者-消费者模型中,生产者创造条件(数据),消费者等待条件(数据可用)

所以,您的原始代码中生产者只使用 notify_one() 而不使用 wait() 是正确的,因为它反映了无界缓冲区的基本特性。只有在引入缓冲区大小限制后,生产者才需要等待。

为什么使用同一个条件变量可能有问题

您提供的代码确实使用了同一个条件变量 g_cv 来处理生产者和消费者的等待条件。这种设计在某些情况下可以工作,但存在一些潜在问题和效率问题。让我详细解释一下。

代码分析

在您的代码中:

  • 生产者等待的条件是:g_queue.size() < MAX_QUEUE_SIZE(队列未满)
  • 消费者等待的条件是:!g_queue.empty() || g_done(队列非空或生产完成)
  • 两者使用同一个条件变量 g_cv 进行等待和通知

潜在问题

1. 不必要的唤醒(效率问题)

当生产者调用 g_cv.notify_one() 时,它可能唤醒:

  • 一个等待的消费者(这是我们希望的)
  • 或者一个等待的生产者(这不是我们希望的)

同样,当消费者调用 g_cv.notify_one() 时,它可能唤醒:

  • 一个等待的生产者(这是我们希望的)
  • 或者一个等待的消费者(这不是我们希望的)

这种”错误”的唤醒会导致线程被唤醒后检查条件,发现条件不满足,然后再次进入等待状态。这造成了不必要的上下文切换,降低了程序效率。

2. 可能导致的死锁情况

在某些边缘情况下,使用单一条件变量可能导致死锁。例如:

  1. 队列已满,生产者等待
  2. 消费者消费一个项目后调用 g_cv.notify_one()
  3. 但通知唤醒了另一个消费者而不是生产者
  4. 另一个消费者发现队列为空(或几乎为空),也进入等待
  5. 现在所有线程都在等待,没有人来唤醒生产者

虽然这种情况不太常见,但在高负载或特定时序下可能发生。

3. 可读性和维护性

使用两个独立的条件变量(一个用于”队列未满”,一个用于”队列非空”)使代码更清晰,更容易理解和维护。每个条件变量对应一个明确的等待条件。

为什么使用两个条件变量更好

使用两个条件变量的解决方案更加清晰和高效:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
std::condition_variable g_cv_not_full;  // 生产者等待的条件变量
std::condition_variable g_cv_not_empty; // 消费者等待的条件变量

void producer() {
for (int i = 0; i <= 10000; i++) {
std::unique_lock<std::mutex> lk(g_mutex);
// 只等待"队列未满"条件
g_cv_not_full.wait(lk, []() {
return g_queue.size() < MAX_QUEUE_SIZE;
});

g_queue.push(i);
std::cout << "producer:" << i << std::endl;
lk.unlock();

// 只通知可能等待"队列非空"的消费者
g_cv_not_empty.notify_one();
}

{
std::lock_guard<std::mutex> lk(g_mutex);
g_done = true;
}
// 通知消费者生产已完成
g_cv_not_empty.notify_one();
}

void consumer() {
while (true) {
std::unique_lock<std::mutex> lk(g_mutex);
// 只等待"队列非空或生产完成"条件
g_cv_not_empty.wait(lk, []() {
return !g_queue.empty() || g_done;
});

if (g_queue.empty() && g_done) {
break;
}

if (!g_queue.empty()) {
int a = g_queue.front();
g_queue.pop();
std::cout << "consumer:" << a << std::endl;

// 消费后通知可能等待"队列未满"的生产者
g_cv_not_full.notify_one();
}
lk.unlock();
}
}

结论

您的代码使用单一条件变量在逻辑上是正确的,因为条件变量的等待操作包含了条件检查(通过lambda表达式),可以防止错误操作。

但是,这种设计效率较低,因为它可能导致不必要的线程唤醒和上下文切换。在生产环境中,特别是高性能要求的场景下,使用两个独立的条件变量是更好的选择。

简单来说:

  • 单一条件变量:能工作,但效率可能不高
  • 两个条件变量:更高效,代码更清晰,是推荐的做法

您的代码本身没有逻辑错误,但在实际应用中可能会遇到性能问题。如果您只是学习或小规模使用,单一条件变量是可以接受的;但对于生产环境或高性能需求,建议使用两个条件变量。