AMQ-2045 Add consistency checks to Valve and balance usage in VMTransport

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@729835 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
David Jencks 2008-12-29 07:56:09 +00:00
parent 3b374365c1
commit e585aaec2f
2 changed files with 18 additions and 4 deletions

View File

@ -36,7 +36,7 @@ public final class Valve {
/** /**
* Turns the valve on. This method blocks until the valve is off. * Turns the valve on. This method blocks until the valve is off.
* *
* @throws InterruptedException * @throws InterruptedException if wait is interrupted
*/ */
public void turnOn() throws InterruptedException { public void turnOn() throws InterruptedException {
synchronized (mutex) { synchronized (mutex) {
@ -58,10 +58,13 @@ public final class Valve {
* Turns the valve off. This method blocks until the valve is on and the * Turns the valve off. This method blocks until the valve is on and the
* valve is not in use. * valve is not in use.
* *
* @throws InterruptedException * @throws InterruptedException if wait is interrupted
*/ */
public void turnOff() throws InterruptedException { public void turnOff() throws InterruptedException {
synchronized (mutex) { synchronized (mutex) {
if (turningOff < 0) {
throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
}
try { try {
++turningOff; ++turningOff;
while (usage > 0 || !on) { while (usage > 0 || !on) {
@ -79,10 +82,16 @@ public final class Valve {
* Increments the use counter of the valve. This method blocks if the valve * Increments the use counter of the valve. This method blocks if the valve
* is off, or is being turned off. * is off, or is being turned off.
* *
* @throws InterruptedException * @throws InterruptedException if wait is interrupted
*/ */
public void increment() throws InterruptedException { public void increment() throws InterruptedException {
synchronized (mutex) { synchronized (mutex) {
if (turningOff < 0) {
throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
}
if (usage < 0) {
throw new IllegalStateException("Unbalanced usage: " + usage);
}
// Do we have to wait for the value to be on? // Do we have to wait for the value to be on?
while (turningOff > 0 || !on) { while (turningOff > 0 || !on) {
mutex.wait(); mutex.wait();
@ -97,6 +106,12 @@ public final class Valve {
public void decrement() { public void decrement() {
synchronized (mutex) { synchronized (mutex) {
usage--; usage--;
if (turningOff < 0) {
throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
}
if (usage < 0) {
throw new IllegalStateException("Unbalanced usage: " + usage);
}
if (turningOff > 0 && usage < 1) { if (turningOff > 0 && usage < 1) {
mutex.notifyAll(); mutex.notifyAll();
} }

View File

@ -94,7 +94,6 @@ public class VMTransport implements Transport, Task {
} else { } else {
transportListener = peer.transportListener; transportListener = peer.transportListener;
} }
enqueueValve.decrement();
} else { } else {
peer.getMessageQueue().put(command); peer.getMessageQueue().put(command);
} }