package JavaTest.lect11.a;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A bounded, blocking FIFO queue built on a singly linked list with a sentinel head node.
 *
 * <p>Enqueuers and dequeuers synchronize on separate locks ({@code enqLock} and
 * {@code deqLock}), so one {@link #enq} and one {@link #deq} can proceed at the same
 * time. The element count is kept in an {@link AtomicInteger} because both sides
 * update it while holding different locks.
 *
 * <p>Both operations block without a timeout. A thread that is interrupted while
 * blocked keeps waiting, but its interrupt status is still set when the call returns.
 *
 * @param <T> the element type; {@code null} elements are stored and returned as-is
 */
public class BoundedQueue<T> {
    /** Held while appending at {@code tail}; owns {@code notFullCondition}. */
    final ReentrantLock enqLock;
    /** Held while removing at {@code head}; owns {@code notEmptyCondition}. */
    final ReentrantLock deqLock;
    /** Dequeuers wait here while the queue is empty. */
    final Condition notEmptyCondition;
    /** Enqueuers wait here while the queue is full. */
    final Condition notFullCondition;
    /** Number of elements currently stored. */
    final AtomicInteger size;
    /** {@code head} is a sentinel whose successor holds the oldest element; {@code tail} is the last node. */
    volatile Node head, tail;
    /** Maximum number of elements the queue may hold. */
    final int capacity;

    /**
     * Creates an empty queue.
     *
     * @param _capacity maximum number of elements; must be positive
     * @throws IllegalArgumentException if {@code _capacity} is zero or negative
     */
    public BoundedQueue(int _capacity) {
        // A zero capacity would make every enq() block forever and a negative one
        // would never be reached by size, silently disabling the bound.
        if (_capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + _capacity);
        }
        capacity = _capacity;
        head = new Node(null);
        tail = head;
        size = new AtomicInteger(0);
        enqLock = new ReentrantLock();
        notFullCondition = enqLock.newCondition();
        deqLock = new ReentrantLock();
        notEmptyCondition = deqLock.newCondition();
    }

    /**
     * Appends {@code x} at the tail, blocking while the queue is full.
     *
     * @param x the element to append
     */
    public void enq(T x) {
        boolean mustWakeDequeuers = false;
        enqLock.lock();
        try {
            while (size.get() == capacity) {
                // Keeps waiting across interrupts but leaves the interrupt status set,
                // so the caller can still observe the request after enq() returns.
                notFullCondition.awaitUninterruptibly();
            }
            Node e = new Node(x);
            tail.next = e;
            tail = e;
            // Only the empty -> non-empty transition can have dequeuers waiting.
            if (size.getAndIncrement() == 0) {
                mustWakeDequeuers = true;
            }
            // Diagnostic trace; the printed size may already be stale because
            // dequeuers run concurrently under a different lock.
            System.out.println("size: " + size.get());
        } finally {
            enqLock.unlock();
        }
        if (mustWakeDequeuers) {
            // The condition belongs to deqLock, so that lock must be held to signal.
            // It is taken only after enqLock has been released, so no thread ever
            // holds both locks at once.
            deqLock.lock();
            try {
                notEmptyCondition.signalAll();
            } finally {
                deqLock.unlock();
            }
        }
    }

    /**
     * Removes and returns the oldest element, blocking while the queue is empty.
     *
     * @return the element that was at the front of the queue
     */
    public T deq() {
        T result;
        boolean mustWakeEnqueuers = false;
        deqLock.lock();
        try {
            while (size.get() == 0) {
                // Keeps waiting across interrupts but leaves the interrupt status set.
                notEmptyCondition.awaitUninterruptibly();
            }
            Node first = head.next;
            // Diagnostic trace of the element being removed.
            System.out.println(first.value);
            result = first.value;
            // 'first' becomes the new sentinel; drop its reference so the dequeued
            // element is not kept reachable by the queue.
            first.value = null;
            head = first;
            // Only the full -> not-full transition can have enqueuers waiting.
            if (size.getAndDecrement() == capacity) {
                mustWakeEnqueuers = true;
            }
            // Diagnostic trace; may be stale for the same reason as in enq().
            System.out.println("size: " + size.get());
        } finally {
            deqLock.unlock();
        }
        if (mustWakeEnqueuers) {
            // Signal under the lock that owns the condition, after deqLock is released.
            enqLock.lock();
            try {
                notFullCondition.signalAll();
            } finally {
                enqLock.unlock();
            }
        }
        return result;
    }

    /** Linked-list cell; {@code next} is volatile because it is written under enqLock and read under deqLock. */
    protected class Node {
        public T value;
        public volatile Node next;

        public Node(T x) {
            value = x;
            next = null;
        }
    }
}
package JavaTest.lect11.a;
/**
 * Producer thread for the bounded-queue demo: when run, it enqueues the two
 * strings {@code "A" + i} and {@code "B" + i} into the shared queue, in that order.
 */
public class TestThread extends Thread {
    /** Queue shared with the other producers and the consumer. */
    private final BoundedQueue<String> queue;
    /** Number appended to each enqueued string to identify this producer. */
    private final int id;

    /**
     * @param bq the queue to enqueue into
     * @param i  the number appended to every string this thread enqueues
     */
    public TestThread(BoundedQueue<String> bq, int i) {
        queue = bq;
        id = i;
    }

    /** Enqueues the "A" item followed by the "B" item for this producer. */
    @Override
    public void run() {
        for (String prefix : new String[] {"A", "B"}) {
            queue.enq(prefix + id);
        }
    }
}
package JavaTest.lect11.a;
/**
 * Demo driver: starts two {@link TestThread} producers on a shared queue of
 * capacity 4 and prints the first two elements dequeued by the main thread.
 */
public class MainDequeue {
    /**
     * Runs the demo.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BoundedQueue<String> queue = new BoundedQueue<>(4);

        // Both producers are constructed before either one is started.
        Thread[] producers = {
            new TestThread(queue, 1),
            new TestThread(queue, 2)
        };
        for (Thread producer : producers) {
            producer.start();
        }

        // Consume two elements; deq() blocks until a producer has supplied one.
        for (int taken = 0; taken < 2; taken++) {
            System.out.println(queue.deq());
        }
    }
}