This closes #3666
This commit is contained in:
commit
96f399ce21
|
@ -31,7 +31,7 @@ public enum QueueField {
|
||||||
DELIVERING_COUNT("deliveringCount"),
|
DELIVERING_COUNT("deliveringCount"),
|
||||||
MESSAGES_ADDED("messagesAdded"),
|
MESSAGES_ADDED("messagesAdded"),
|
||||||
MESSAGES_ACKED("messagesAcked"),
|
MESSAGES_ACKED("messagesAcked"),
|
||||||
RATE("rate"),
|
MESSAGES_EXPIRED("messagesExpired"),
|
||||||
ROUTING_TYPE("routingType"),
|
ROUTING_TYPE("routingType"),
|
||||||
USER("user"),
|
USER("user"),
|
||||||
AUTO_CREATED("autoCreated"),
|
AUTO_CREATED("autoCreated"),
|
||||||
|
|
|
@ -49,7 +49,6 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
|
||||||
.add(QueueField.NAME.getName(), toString(queue.getName()))
|
.add(QueueField.NAME.getName(), toString(queue.getName()))
|
||||||
.add(QueueField.ADDRESS.getName(), toString(queue.getAddress()))
|
.add(QueueField.ADDRESS.getName(), toString(queue.getAddress()))
|
||||||
.add(QueueField.FILTER.getName(), toString(queue.getFilter()))
|
.add(QueueField.FILTER.getName(), toString(queue.getFilter()))
|
||||||
.add(QueueField.RATE.getName(), toString(q.getRate()))
|
|
||||||
.add(QueueField.DURABLE.getName(), toString(queue.isDurable()))
|
.add(QueueField.DURABLE.getName(), toString(queue.isDurable()))
|
||||||
.add(QueueField.PAUSED.getName(), toString(q.isPaused()))
|
.add(QueueField.PAUSED.getName(), toString(q.isPaused()))
|
||||||
.add(QueueField.TEMPORARY.getName(), toString(queue.isTemporary()))
|
.add(QueueField.TEMPORARY.getName(), toString(queue.isTemporary()))
|
||||||
|
@ -62,6 +61,7 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
|
||||||
.add(QueueField.MESSAGES_ADDED.getName(), toString(queue.getMessagesAdded()))
|
.add(QueueField.MESSAGES_ADDED.getName(), toString(queue.getMessagesAdded()))
|
||||||
.add(QueueField.MESSAGE_COUNT.getName(), toString(queue.getMessageCount()))
|
.add(QueueField.MESSAGE_COUNT.getName(), toString(queue.getMessageCount()))
|
||||||
.add(QueueField.MESSAGES_ACKED.getName(), toString(queue.getMessagesAcknowledged()))
|
.add(QueueField.MESSAGES_ACKED.getName(), toString(queue.getMessagesAcknowledged()))
|
||||||
|
.add(QueueField.MESSAGES_EXPIRED.getName(), toString(queue.getMessagesExpired()))
|
||||||
.add(QueueField.DELIVERING_COUNT.getName(), toString(queue.getDeliveringCount()))
|
.add(QueueField.DELIVERING_COUNT.getName(), toString(queue.getDeliveringCount()))
|
||||||
.add(QueueField.MESSAGES_KILLED.getName(), toString(queue.getMessagesKilled()))
|
.add(QueueField.MESSAGES_KILLED.getName(), toString(queue.getMessagesKilled()))
|
||||||
.add(QueueField.DIRECT_DELIVER.getName(), toString(q.isDirectDeliver()))
|
.add(QueueField.DIRECT_DELIVER.getName(), toString(q.isDirectDeliver()))
|
||||||
|
@ -95,8 +95,6 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
|
||||||
return queue.getAddress();
|
return queue.getAddress();
|
||||||
case FILTER:
|
case FILTER:
|
||||||
return queue.getFilter();
|
return queue.getFilter();
|
||||||
case RATE:
|
|
||||||
return q.getRate();
|
|
||||||
case DURABLE:
|
case DURABLE:
|
||||||
return queue.isDurable();
|
return queue.isDurable();
|
||||||
case PAUSED:
|
case PAUSED:
|
||||||
|
@ -121,6 +119,8 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
|
||||||
return queue.getMessageCount();
|
return queue.getMessageCount();
|
||||||
case MESSAGES_ACKED:
|
case MESSAGES_ACKED:
|
||||||
return queue.getMessagesAcknowledged();
|
return queue.getMessagesAcknowledged();
|
||||||
|
case MESSAGES_EXPIRED:
|
||||||
|
return queue.getMessagesExpired();
|
||||||
case DELIVERING_COUNT:
|
case DELIVERING_COUNT:
|
||||||
return queue.getDeliveringCount();
|
return queue.getDeliveringCount();
|
||||||
case MESSAGES_KILLED:
|
case MESSAGES_KILLED:
|
||||||
|
|
|
@ -68,7 +68,7 @@ public class QueueFilterPredicate extends ActiveMQFilterPredicate<QueueControl>
|
||||||
return matches(queue.getMessagesAdded());
|
return matches(queue.getMessagesAdded());
|
||||||
case MESSAGES_ACKED:
|
case MESSAGES_ACKED:
|
||||||
return matches(queue.getMessagesAcknowledged());
|
return matches(queue.getMessagesAcknowledged());
|
||||||
case RATE:
|
case MESSAGES_EXPIRED:
|
||||||
return matches(queue.getMessagesExpired());
|
return matches(queue.getMessagesExpired());
|
||||||
case ROUTING_TYPE:
|
case ROUTING_TYPE:
|
||||||
return matches(queue.getRoutingType());
|
return matches(queue.getRoutingType());
|
||||||
|
|
|
@ -486,8 +486,6 @@ public interface Queue extends Bindable,CriticalComponent {
|
||||||
|
|
||||||
void postAcknowledge(MessageReference ref, AckReason reason);
|
void postAcknowledge(MessageReference ref, AckReason reason);
|
||||||
|
|
||||||
float getRate();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the user associated with this queue
|
* @return the user associated with this queue
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -4033,8 +4033,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
messagesKilled.set(0);
|
messagesKilled.set(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private float getRate() {
|
||||||
public float getRate() {
|
|
||||||
long locaMessageAdded = getMessagesAdded();
|
long locaMessageAdded = getMessagesAdded();
|
||||||
float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
|
float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
|
||||||
if (timeSlice == 0) {
|
if (timeSlice == 0) {
|
||||||
|
|
|
@ -1631,11 +1631,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public float getRate() {
|
|
||||||
return 0.0f;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SimpleString getUser() {
|
public SimpleString getUser() {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -962,11 +962,6 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
||||||
public void postAcknowledge(MessageReference ref, AckReason reason) {
|
public void postAcknowledge(MessageReference ref, AckReason reason) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public float getRate() {
|
|
||||||
return 0.0f;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SimpleString getUser() {
|
public SimpleString getUser() {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.unit.core.server.impl;
|
package org.apache.activemq.artemis.tests.unit.core.server.impl;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -210,7 +211,7 @@ public class QueueImplTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRate() throws InterruptedException {
|
public void testRate() throws Exception {
|
||||||
QueueImpl queue = getTemporaryQueue();
|
QueueImpl queue = getTemporaryQueue();
|
||||||
|
|
||||||
final int numMessages = 10;
|
final int numMessages = 10;
|
||||||
|
@ -223,7 +224,10 @@ public class QueueImplTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
|
||||||
float rate = queue.getRate();
|
Method getRate = QueueImpl.class.getDeclaredMethod("getRate", null);
|
||||||
|
getRate.setAccessible(true);
|
||||||
|
float rate = (float) getRate.invoke(queue, null);
|
||||||
|
|
||||||
Assert.assertTrue(rate <= 10.0f);
|
Assert.assertTrue(rate <= 10.0f);
|
||||||
log.debug("Rate: " + rate);
|
log.debug("Rate: " + rate);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue