mirror of https://github.com/apache/activemq.git
applied patch from John Heitmann, many thanks - which fixes AMQ-515
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@436748 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d21381d21b
commit
aa3b9acd5a
|
@ -184,6 +184,7 @@ public class Queue implements Destination {
|
|||
// while
|
||||
// removing up a subscription.
|
||||
dispatchValve.turnOff();
|
||||
|
||||
try {
|
||||
|
||||
synchronized (consumers) {
|
||||
|
@ -246,8 +247,13 @@ public class Queue implements Destination {
|
|||
|
||||
public void send(final ConnectionContext context, final Message message) throws Exception {
|
||||
|
||||
if (context.isProducerFlowControl())
|
||||
if (context.isProducerFlowControl()) {
|
||||
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
|
||||
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
|
||||
} else {
|
||||
usageManager.waitForSpace();
|
||||
}
|
||||
}
|
||||
|
||||
message.setRegionDestination(this);
|
||||
|
||||
|
@ -607,6 +613,4 @@ public class Queue implements Destination {
|
|||
return false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -232,8 +232,13 @@ public class Topic implements Destination {
|
|||
|
||||
public void send(final ConnectionContext context, final Message message) throws Exception {
|
||||
|
||||
if (context.isProducerFlowControl())
|
||||
usageManager.waitForSpace();
|
||||
if (context.isProducerFlowControl()) {
|
||||
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
|
||||
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
|
||||
} else {
|
||||
usageManager.waitForSpace();
|
||||
}
|
||||
}
|
||||
|
||||
message.setRegionDestination(this);
|
||||
|
||||
|
|
|
@ -50,6 +50,11 @@ public class UsageManager {
|
|||
|
||||
private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
|
||||
|
||||
private boolean sendFailIfNoSpace;
|
||||
|
||||
/** True if someone called setSendFailIfNoSpace() on this particular usage manager */
|
||||
private boolean sendFailIfNoSpaceExplicitySet;
|
||||
|
||||
public UsageManager() {
|
||||
this(null);
|
||||
}
|
||||
|
@ -206,6 +211,22 @@ public class UsageManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets whether or not a send() should fail if there is no space free. The default
|
||||
* value is false which means to block the send() method until space becomes available
|
||||
*/
|
||||
public void setSendFailIfNoSpace(boolean failProducerIfNoSpace) {
|
||||
sendFailIfNoSpaceExplicitySet = true;
|
||||
this.sendFailIfNoSpace = failProducerIfNoSpace;
|
||||
}
|
||||
|
||||
public boolean isSendFailIfNoSpace() {
|
||||
if (sendFailIfNoSpaceExplicitySet || parent == null) {
|
||||
return sendFailIfNoSpace;
|
||||
} else {
|
||||
return parent.isSendFailIfNoSpace();
|
||||
}
|
||||
}
|
||||
|
||||
private void setPercentUsage(int value) {
|
||||
int oldValue = percentUsage;
|
||||
|
|
Loading…
Reference in New Issue