同步
Critical-Selection Problem
Example
total是共享变量, 内存中, 所以需要临时的寄存器

跟DB里的问题类似。
只能被至多一个用户占有的资源为临界资源。而程序中访问临界资源的代码段,我们称之为临界区段(critical section, CS)
- Mutual exclusion - 没有两个进程可以同时在运行 critical section。
- Progress - 系统整体上是在运行的,即要么有进程在运行它的 critical section,要么没有任何进程想要(将要,即在运行 critical section 之前的 section)进入 critical section,要么在有限时间内将有一个进程被选中进入它的 critical section。
- Bounded waiting - 任何一个进程等待进入 critical section 的时间是有限的。
Peterson's solution
算法
// `i` is 0 or 1, indicating current pid, while `j` is another pid.
process(i) {
j = 1 - i;
READY[i] = true; // ┐
TURN = j; // │
while (READY[j] && TURN == j) {} // ├ entry section
// i.e. wait until: // │
// (1) j exits, // │
// (2) j is slower, so it // │
// should run now. // ┘
/* operate critical resources */ // - critical section
READY[i] = false; // - exit section
/* other things */ // - remainder section
}
想法是利用TURN被竞争地去写的性质。严格证明
依赖于READY[i] = true;总在TURN=j 先执行的性质。但是编译器可能会重排这两个语句。
Memory barrier
Atomic Inst.
利用特殊的硬件指令。
test_and_set
把指令抽象为如下的代码:
返回target原来的值,并将target设为true利用test and set进行进程同步。如果lock是false,就说明没被占用,进入临界区域(同时lock被set为true)。完成后把lock设为false
while (true) {
while (test_and_set(&lock)) ; /* do nothing */
/* Critical Section */
/* Exit Section */
lock = false;
/* Remainder Section */
}
缺点:
- 可能饥饿: 因为等待进程中任意选择一个进入临界区。因此不满足bounded-waiting
- 可能死锁
while (true) {
/* Entry Section */
waiting[i] = true;
while (waiting[i] && test_and_set(&lock)) ; /* do nothing */
waiting[i] = false;
/* Critical Section */
/* Exit Section */
j = (i + 1) % n;
while ((j != i) && !waiting[j])) j = (j + 1) % n;
//找到第一个在等待的进程
if (j == i) lock = false; //如果没有在等的,就释放锁
else waiting[j] = false; //让这个等待状态的进程进入CS
/* Remainder Section */
}
每一个进程至多等待 n-1 个进程在其前面进入 CS,满足了 bounded waiting 条件。
- 会有busy-waiting的问题
mutex&spinlock
- 会有busy-waiting的问题 等待的时候会占用CPU
while (true) {
acquire();
/* critical section */
release();
/* remainder section */
}
/* ------- */
void acquire() {
while (!available)
; /* busy waiting */
avaliable = false;
}
void release() {
avaliable = true;
}
semaphore (信号量)
一个 semaphore S 是一个整型变量,它除了初始化外只能通过两个 atomic 操作 wait() 和 signal() (原称为 P() 和 V() )来访问。
- 初始值代表最多可用的资源数
- S>0,代表还有S个资源可用
-
S<0,代表有|S|个进程在排队
-
计数型信号量
- 二值信号量 0/1
Example
5个并发进程,信号量S初始值为3,问取值范围 答: [-2,3]
为了避免busywaiting。每个信号量添加一个waiting queue
wait(S){
S->value--;
if(S->value<0){
//把进程加入S->queue,进程进入阻塞
}
}
signal(S){
S->value++;
if(S->value<=0){
//从S->queue中取出一个进程P,重新启动P的执行
}
}
死锁:

10.31 下课前review questions
经典同步问题
Bounded buffer
给定两个进程:producer 和 consumer,它们共用大小为BUF_SIZE 的 buffer。Producer 生产数据放入 buffer,consumer 从 buffer 取出数据从而使用之。
三个信号量
- mutex 二值0/1 控制对缓冲区的访问
- full 代表缓冲区中空闲个数,<BUF_SIZE的时候才能放
- empty 代表缓冲区中非空闲位置个数, >0的时候才能取
因为信号量显然只能实现单方向的比较
semaphore mutex = 1; //互斥锁
semaphore nempty = BUFFER_SIZE; //buffer中空位的个数
semaphore nfull = 0; //buffer中数据的个数
producer(){
while(1){
wait(nempty);//如果空位个数<=0,就等待
//否则nempty-- 方一个
wait(mutex);
add_to_buffer(next_produced);
signal(mutex);
signal(nfull);//nfull++
}
}
consumer({
while(1){
wait(nfull);//如果nfull=0 说明buffer为空,不能取
//否则nfull-- 取1个
wait(mutex);
remove_from_buffer();
signal(mutex);
signal(nempty);//空闲位置多了1个
}
})
wait的顺序:
- full的 empty变为n-1, full变成1
Readers-Writers
多个reader可以同时读, 但是writer写的时候不能有其他writer和reader
同时被获取也同时被释放,因此其实我们可以合并为一个信号量,不妨叫做 write_lock
- write_lock 二值0/1 避免读写、写写冲突
- reader_count_lock 用来读写读者个数
- reader_count (不是信号量,是普通变量,读者个数)
semaphore write_lock = 1;
int reader_count = 0;
semaphore reader_count_lock = 1;
writer(){
while(1){
wait(write_lock);//没有人在读
write();
signal(write_lock);
}
}
reader() {
while (true) {
wait(reader_count_lock); //用来保护reader_count
reader_count++;
if (reader_count == 1) wait(write_lock);
//如果有读者,就lock,不能写了。=1防止重复lock
signal(reader_count_lock);
read();
wait(reader_count_lock);
reader_count--;
if (reader_count == 0) signal(write_lock);
//如果没有读者了,才能释放write_lock
signal(reader_count_lock);
}
}
Dining Philosopher
一个朴素的解法是这样的:
| 1 2 3 4 5 6 7 8 9 10 11 12 |
vector<semaphore> chopstick(5, 1); // initialize semaphores to all 1 philosopher(int index) { while (true) { wait(chopstick[i]); wait(chopstick[(i + 1) % 5]); eat(); signal(chopstick[i]); signal(chopstick[(i + 1) % 5]); think(); } } |
问题是,可能某时刻每个人同时拿起左边的筷子,这样会导致死锁。
解决方案之一是,只允许同时拿起两根筷子;这种方案的实现是,轮流询问每个人是否能够拿起两根筷子,如果能则拿起,如果不能则需要等待那些筷子放下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
vector<semaphore> chopstick(5, 1); // initialize semaphores to all 1 semaphore lock = 1; philosopher(int index) { while (true) { wait(lock); wait(chopstick[i]); wait(chopstick[(i + 1) % 5]); signal(lock); eat(); signal(chopstick[i]); signal(chopstick[(i + 1) % 5]); think(); } } |
另一种解决方案是,奇数号人先拿左边筷子,偶数号人先拿右边筷子,这样也能避免死锁。