生产者-消费者问题一般是,有一个缓冲区,它支持 put 和 take 方法。如 果试图在空的缓冲区上执行 take 操作,则在某一个项变得可用之前,线程将一 直阻塞;如果试图在满的缓冲区上执行 put 操作,则在有空间变得可用之前, 线程将一直阻塞。可以在单独的等待集合中保存 put 线程和 take 线程,这样就 可以在缓冲区中的项或空间变得可用时利用最佳规划,一次只通知一个线程。可 以使用两个 Condition 实例来做到这一点。
下面是缓冲区类 LockedBuffer,在这个类的 put 和 take 方法中使用了可重入 锁与条件变量:
package conditionlock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockedBuffer {
// 可重入锁
final Lock lock = new ReentrantLock();
// 两个条件对象
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
// 缓冲区
final Object[] items = new Object[10];
int putptr, takeptr, count;// 计数器
// 放数据操作,生产者调用该方法
public void put(Object x) throws InterruptedException {
lock.lock();
try {
// 如果缓冲区满了,则线程等待 while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length)
putptr = 0;
++count;
// 向消费者线程发送通知
notEmpty.signal();
} finally {
lock.unlock();
}
}
// 消费者线程调用该方法
public Object take() throws InterruptedException {
lock.lock();
try {
// 如果缓冲区空,则等待 while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length)
takeptr = 0;
--count;
// 通知其他生产者线程 notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
生产者:
package conditionlock;
//生产者
class Producer implements Runnable {
LockedBuffer buffer;
public Producer(LockedBuffer buf) {
buffer = buf;
}
public void run() {
char c;
for (int i = 0; i < 20; i++) {
c = (char) (Math.random() * 26 + 'A');
try {
// 向缓冲区放入数据
buffer.put(c);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("Produced: " + c);
try {
Thread.sleep((int) (Math.random() * 100));
} catch (InterruptedException e) {
}
}
}
}
消费者
package conditionlock;
//生产者
class Producer implements Runnable {
LockedBuffer buffer;
public Producer(LockedBuffer buf) {
buffer = buf;
}
public void run() {
char c;
for (int i = 0; i < 20; i++) {
c = (char) (Math.random() * 26 + 'A');
try {
// 向缓冲区放入数据
buffer.put(c);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("Produced: " + c);
try {
Thread.sleep((int) (Math.random() * 100));
} catch (InterruptedException e) {
}
}
}
}
测试类
package conditionlock;
public class LockConditionTest {
public static void main(String args[]) {
LockedBuffer stack = new LockedBuffer(); // 创建生产者,消费者
int count = 3;
Producer[] producers = new Producer[count];
Consumer[] consumers = new Consumer[count];
for (int i = 0; i < count; i++) {
producers[i] = new Producer(stack);
consumers[i] = new Consumer(stack);
}
for (int i = 0; i < count; i++) {
new Thread(producers[i]).start();
new Thread(consumers[i]).start();
}
}
}
程序运行结果如下:
Produced: Z
Consumed: Z
Produced: X
Consumed: X
…..
Produced: D
Produced: N
Produced: L
Produced: U
Produced: G
Produced: V
Consumed: Q
Produced: Q
Produced: U
Consumed: M
Produced: I
Consumed: D
….
Consumed: U
Produced: M
Consumed: G
Produced: P
Consumed: V
Produced: N
Consumed: Q
Produced: J
Consumed: U
Produced: L
……
Produced: Y
Consumed: O
Produced: E
Consumed: M
Produced: I
Consumed: P