mirror of https://github.com/apache/activemq.git
[AMQ-6849] - fix sendFailIfNoSpaceAfterTimeout policy entry default value
This commit is contained in:
parent
6da08b245e
commit
8e576be1d9
|
@ -206,7 +206,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
|
if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
|
||||||
ExceptionResponse response = new ExceptionResponse(
|
ExceptionResponse response = new ExceptionResponse(
|
||||||
new ResourceAllocationException(
|
new ResourceAllocationException(
|
||||||
"Usage Manager Memory Limit reached. Stopping producer ("
|
"Usage Manager Memory Limit Wait Timeout. Stopping producer ("
|
||||||
+ timeout.message.getProducerId()
|
+ timeout.message.getProducerId()
|
||||||
+ ") to prevent flooding "
|
+ ") to prevent flooding "
|
||||||
+ getActiveMQDestination().getQualifiedName()
|
+ getActiveMQDestination().getQualifiedName()
|
||||||
|
|
|
@ -314,7 +314,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
if (sendFailIfNoSpace != -1) {
|
if (sendFailIfNoSpace != -1) {
|
||||||
destination.getSystemUsage().setSendFailIfNoSpace(isSendFailIfNoSpace());
|
destination.getSystemUsage().setSendFailIfNoSpace(isSendFailIfNoSpace());
|
||||||
}
|
}
|
||||||
if (sendFailIfNoSpaceAfterTimeout != 0) {
|
if (sendFailIfNoSpaceAfterTimeout != -1) {
|
||||||
destination.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(getSendFailIfNoSpaceAfterTimeout());
|
destination.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(getSendFailIfNoSpaceAfterTimeout());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,6 +66,8 @@ public class DuplicateFromStoreTest {
|
||||||
public static CountDownLatch consumersFinished = new CountDownLatch(NUM_CONSUMERS );
|
public static CountDownLatch consumersFinished = new CountDownLatch(NUM_CONSUMERS );
|
||||||
|
|
||||||
public AtomicInteger totalMessagesToSend = new AtomicInteger(NUM_MSGS);
|
public AtomicInteger totalMessagesToSend = new AtomicInteger(NUM_MSGS);
|
||||||
|
public AtomicInteger totalMessagesSent = new AtomicInteger(NUM_MSGS);
|
||||||
|
|
||||||
public AtomicInteger totalReceived = new AtomicInteger(0);
|
public AtomicInteger totalReceived = new AtomicInteger(0);
|
||||||
|
|
||||||
public int messageSize = 16*1000;
|
public int messageSize = 16*1000;
|
||||||
|
@ -92,7 +94,7 @@ public class DuplicateFromStoreTest {
|
||||||
|
|
||||||
// configure <systemUsage>
|
// configure <systemUsage>
|
||||||
MemoryUsage memoryUsage = new MemoryUsage();
|
MemoryUsage memoryUsage = new MemoryUsage();
|
||||||
memoryUsage.setPercentOfJvmHeap(70);
|
memoryUsage.setPercentOfJvmHeap(50);
|
||||||
|
|
||||||
StoreUsage storeUsage = new StoreUsage();
|
StoreUsage storeUsage = new StoreUsage();
|
||||||
storeUsage.setLimit(8 * 1024 * 1024 * 1024); // 8 gb
|
storeUsage.setLimit(8 * 1024 * 1024 * 1024); // 8 gb
|
||||||
|
@ -133,7 +135,7 @@ public class DuplicateFromStoreTest {
|
||||||
|
|
||||||
LOG.info("All producers and consumers got started. Awaiting their termination");
|
LOG.info("All producers and consumers got started. Awaiting their termination");
|
||||||
producersFinished.await(100, TimeUnit.MINUTES);
|
producersFinished.await(100, TimeUnit.MINUTES);
|
||||||
LOG.info("All producers have terminated.");
|
LOG.info("All producers have terminated. remaining to send: " + totalMessagesToSend.get() + ", sent:" + totalMessagesSent.get());
|
||||||
|
|
||||||
consumersFinished.await(100, TimeUnit.MINUTES);
|
consumersFinished.await(100, TimeUnit.MINUTES);
|
||||||
LOG.info("All consumers have terminated.");
|
LOG.info("All consumers have terminated.");
|
||||||
|
@ -232,6 +234,7 @@ public class DuplicateFromStoreTest {
|
||||||
// send message
|
// send message
|
||||||
while (totalMessagesToSend.decrementAndGet() >= 0) {
|
while (totalMessagesToSend.decrementAndGet() >= 0) {
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
totalMessagesSent.incrementAndGet();
|
||||||
log.debug("Sent message: " + counter);
|
log.debug("Sent message: " + counter);
|
||||||
counter++;
|
counter++;
|
||||||
|
|
||||||
|
@ -241,7 +244,7 @@ public class DuplicateFromStoreTest {
|
||||||
Thread.sleep(PRODUCER_SLEEP);
|
Thread.sleep(PRODUCER_SLEEP);
|
||||||
}
|
}
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
log.error(ex.getMessage());
|
log.error(ex.toString());
|
||||||
return;
|
return;
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
|
@ -313,10 +316,10 @@ public class DuplicateFromStoreTest {
|
||||||
TextMessage textMessage = (TextMessage) message2;
|
TextMessage textMessage = (TextMessage) message2;
|
||||||
String text = textMessage.getText();
|
String text = textMessage.getText();
|
||||||
log.debug("Received: " + text.substring(0, 50));
|
log.debug("Received: " + text.substring(0, 50));
|
||||||
|
} else if (totalReceived.get() < NUM_MSGS) {
|
||||||
|
log.error("Received message of unsupported type. Expecting TextMessage. count: " + totalReceived.get());
|
||||||
} else {
|
} else {
|
||||||
if (totalReceived.get() < NUM_MSGS) {
|
// all done
|
||||||
log.error("Received message of unsupported type. Expecting TextMessage. " + message2);
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (message2 != null) {
|
if (message2 != null) {
|
||||||
|
|
Loading…
Reference in New Issue