This closes #853

This commit is contained in:
Clebert Suconic 2016-10-21 09:33:52 -04:00
commit 0f9efa9223
9 changed files with 359 additions and 15 deletions

View File

@ -867,7 +867,10 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
if (newPeriod < MessageCounterManagerImpl.MIN_SAMPLE_PERIOD) {
throw ActiveMQMessageBundle.BUNDLE.invalidMessageCounterPeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD);
if (newPeriod <= 0) {
throw ActiveMQMessageBundle.BUNDLE.periodMustGreaterThanZero(newPeriod);
}
ActiveMQServerLogger.LOGGER.invalidMessageCounterPeriod(newPeriod);
}
if (messageCounterManager != null && newPeriod != messageCounterManager.getSamplePeriod()) {

View File

@ -21,6 +21,9 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
public interface PageSubscriptionCounter {
//incremental counter of messages added
long getValueAdded();
long getValue();
void increment(Transaction tx, int add) throws Exception;

View File

@ -61,6 +61,8 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
private final AtomicLong value = new AtomicLong(0);
private final AtomicLong added = new AtomicLong(0);
private final AtomicLong pendingValue = new AtomicLong(0);
private final LinkedList<Long> incrementRecords = new LinkedList<>();
@ -92,6 +94,11 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
this.subscription = subscription;
}
@Override
public long getValueAdded() {
return added.get() + pendingValue.get();
}
@Override
public long getValue() {
return value.get() + pendingValue.get();
@ -205,6 +212,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
this.subscription.notEmpty();
}
this.value.set(value1);
this.added.set(value1);
this.recordID = recordID1;
}
@ -243,6 +251,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
recordID = -1;
value.set(0);
added.set(0);
incrementRecords.clear();
}
} finally {
@ -269,6 +278,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
for (Pair<Long, Integer> incElement : loadList) {
value.addAndGet(incElement.getB());
added.addAndGet(incElement.getB());
incrementRecords.add(incElement.getA());
}
loadList.clear();
@ -279,7 +289,9 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
@Override
public synchronized void addInc(long id, int variance) {
value.addAndGet(variance);
if (variance > 0) {
added.addAndGet(variance);
}
if (id >= 0) {
incrementRecords.add(id);
}

View File

@ -876,7 +876,6 @@ public class PagingStoreImpl implements PagingStore {
}
for (org.apache.activemq.artemis.core.server.Queue q : nonDurableQueues) {
q.getPageSubscription().getCounter().increment(tx, 1);
q.getPageSubscription().notEmpty();
ids[i++] = q.getID();
}

View File

@ -194,8 +194,8 @@ public interface ActiveMQMessageBundle {
@Message(id = 119046, value = "invalid value: {0} count must be greater than 0", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException greaterThanZero(Integer count);
@Message(id = 119047, value = "Cannot set Message Counter Sample Period < {0}ms", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException invalidMessageCounterPeriod(Long period);
@Message(id = 119047, value = "invalid value: {0} sample period must be greater than 0", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException periodMustGreaterThanZero(Long newPeriod);
@Message(id = 119048, value = "invalid new Priority value: {0}. It must be between 0 and 9 (both included)", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException invalidNewPriority(Integer period);

View File

@ -1516,4 +1516,9 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224069, value = "Change detected in broker configuration file, but reload failed", format = Message.Format.MESSAGE_FORMAT)
void configurationReloadFailed(@Cause Throwable t);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format = Message.Format.MESSAGE_FORMAT)
void invalidMessageCounterPeriod(long value);
}

View File

@ -518,7 +518,9 @@ public class QueueImpl implements Queue {
directDeliver = false;
messagesAdded++;
if (!ref.isPaged()) {
messagesAdded++;
}
}
@Override
@ -573,7 +575,9 @@ public class QueueImpl implements Queue {
protected boolean scheduleIfPossible(MessageReference ref) {
if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) {
synchronized (this) {
messagesAdded++;
if (!ref.isPaged()) {
messagesAdded++;
}
}
return true;
@ -1165,7 +1169,7 @@ public class QueueImpl implements Queue {
@Override
public long getMessagesAdded() {
if (pageSubscription != null) {
return messagesAdded + pageSubscription.getCounter().getValue() - pagedReferences.get();
return messagesAdded + pageSubscription.getCounter().getValueAdded();
} else {
return messagesAdded;
}
@ -1819,7 +1823,10 @@ public class QueueImpl implements Queue {
while ((ref = intermediateMessageReferences.poll()) != null) {
internalAddTail(ref);
messagesAdded++;
if (!ref.isPaged()) {
messagesAdded++;
}
if (added++ > MAX_DELIVERIES_IN_LOOP) {
// if we just keep polling from the intermediate we could starve in case there's a sustained load
deliverAsync();

View File

@ -0,0 +1,318 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class ClusteredMessageCounterTest extends ClusterTestBase {
private AtomicInteger total = new AtomicInteger();
private AtomicBoolean stopFlag = new AtomicBoolean();
private Timer timer1 = new Timer();
private Timer timer2 = new Timer();
private int numMsg = 1000;
private List<MessageCounterInfo> results = new ArrayList<>();
@Override
@Before
public void setUp() throws Exception {
super.setUp();
setupServers();
setupClusters();
total.set(0);
stopFlag.set(false);
}
@Override
@After
public void tearDown() throws Exception {
timer1.cancel();
timer2.cancel();
super.tearDown();
}
protected void setupServers() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
}
protected void setupClusters() {
setupClusterConnection("cluster0", 0, 1, "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), false);
setupClusterConnection("cluster1", 1, 0, "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), false);
}
protected boolean isNetty() {
return true;
}
@Override
protected ConfigurationImpl createBasicConfig(final int serverID) {
ConfigurationImpl config = super.createBasicConfig(serverID);
Map<String, AddressSettings> addrSettingsMap = config.getAddressesSettings();
AddressSettings addrSettings = new AddressSettings();
addrSettings.setMaxSizeBytes(10 * 1024);
addrSettings.setPageSizeBytes(5 * 1024);
addrSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addrSettingsMap.put("queues", addrSettings);
if (serverID == 1) {
config.setMessageCounterEnabled(true);
}
return config;
}
@Test
public void testNonDurableMessageAddedWithPaging() throws Exception {
testMessageAddedWithPaging(false);
}
@Test
public void testDurableMessageAddedWithPaging() throws Exception {
testMessageAddedWithPaging(true);
}
//messages flow from one node to another, in paging mode
//check the messageAdded is correct.
private void testMessageAddedWithPaging(boolean durable) throws Exception {
startServers(0, 1);
numMsg = 100;
try {
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues", "queue0", null, false);
createQueue(1, "queues", "queue0", null, false);
waitForBindings(1, "queues", 1, 0, true);
waitForBindings(0, "queues", 1, 0, false);
addConsumer(1, 1, "queue0", null);
System.out.println("sending.....");
send(0, "queues", numMsg, durable, null);
verifyReceiveAllOnSingleConsumer(true, numMsg, 1);
QueueControl control = (QueueControl) servers[1].getManagementService().getResource(ResourceNames.CORE_QUEUE + "queue0");
//wait up to 30sec to allow the counter get updated
long timeout = 30000;
while (timeout > 0 && (numMsg != control.getMessagesAdded())) {
Thread.sleep(1000);
timeout -= 1000;
}
assertEquals(numMsg, control.getMessagesAdded());
} finally {
stopServers(0, 1);
}
}
@Test
public void testMessageCounterWithPaging() throws Exception {
startServers(0, 1);
try {
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues", "queue0", null, false);
createQueue(1, "queues", "queue0", null, false);
waitForBindings(1, "queues", 1, 0, true);
waitForBindings(0, "queues", 1, 0, false);
System.out.println("sending.....");
Thread sendThread = new Thread(new Runnable() {
@Override
public void run() {
try {
send(0, "queues", numMsg, true, null);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("messages sent.");
}
});
QueueControl control = (QueueControl) servers[1].getManagementService().getResource(ResourceNames.CORE_QUEUE + "queue0");
ActiveMQServerControl serverControl = (ActiveMQServerControl) servers[1].getManagementService().getResource(ResourceNames.CORE_SERVER);
serverControl.setMessageCounterSamplePeriod(300);
CountDownLatch resultLatch = new CountDownLatch(40);
MessageCounterCollector collector = new MessageCounterCollector(control, resultLatch);
timer1.schedule(collector, 0);
PeriodicalReceiver receiver = new PeriodicalReceiver(50, 1, 100);
timer2.schedule(receiver, 0);
sendThread.start();
try {
resultLatch.await(120, TimeUnit.SECONDS);
} finally {
stopFlag.set(true);
}
sendThread.join();
System.out.println("Results collected: " + results.size());
//checking
for (MessageCounterInfo info : results) {
assertTrue("countDelta should be positive " + info.getCountDelta() + dumpResults(results), info.getCountDelta() >= 0);
}
} finally {
timer1.cancel();
timer2.cancel();
stopServers(0, 1);
}
}
private String dumpResults(List<MessageCounterInfo> results) {
StringBuilder builder = new StringBuilder("\n");
for (int i = 0; i < results.size(); i++) {
builder.append("result[" + i + "]: " + results.get(i).getCountDelta() + " " + results.get(i).getCount() + "\n");
}
return builder.toString();
}
//Periodically read the counter
private class MessageCounterCollector extends TimerTask {
private QueueControl queueControl;
private CountDownLatch resultLatch;
MessageCounterCollector(QueueControl queueControl, CountDownLatch resultLatch) {
this.queueControl = queueControl;
this.resultLatch = resultLatch;
}
@Override
public void run() {
if (stopFlag.get()) {
return;
}
try {
String result = queueControl.listMessageCounter();
MessageCounterInfo info = MessageCounterInfo.fromJSON(result);
if (info.getCountDelta() != 0) {
System.out.println("non zero value got ---> " + info.getCountDelta());
}
results.add(info);
resultLatch.countDown();
if (info.getCountDelta() < 0) {
//stop and make the test finish quick
stopFlag.set(true);
while (resultLatch.getCount() > 0) {
resultLatch.countDown();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (!stopFlag.get()) {
timer1.schedule(new MessageCounterCollector(this.queueControl, resultLatch), 200);
}
}
}
}
//Peroidically receive a number of messages
private class PeriodicalReceiver extends TimerTask {
private int batchSize;
private int serverID;
private long period;
PeriodicalReceiver(int batchSize, int serverID, long period) {
this.batchSize = batchSize;
this.serverID = serverID;
this.period = period;
}
@Override
public void run() {
if (stopFlag.get()) {
return;
}
int num = 0;
ClientSessionFactory sf = sfs[serverID];
ClientSession session = null;
ClientConsumer consumer = null;
try {
session = sf.createSession(false, true, false);
consumer = session.createConsumer("queue0", null);
session.start();
for (; num < batchSize || stopFlag.get(); num++) {
ClientMessage message = consumer.receive(2000);
if (message == null) {
System.out.println("No more messages received!");
break;
}
message.acknowledge();
}
session.commit();
} catch (ActiveMQException e) {
e.printStackTrace();
} finally {
System.out.println("received messages: " + num);
if (consumer != null) {
try {
consumer.close();
} catch (ActiveMQException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (ActiveMQException e) {
e.printStackTrace();
}
}
//we only receive (numMsg - 200) to avoid the paging being cleaned up
//when all paged messages are consumed.
if (!stopFlag.get() && total.addAndGet(num) < numMsg - 200) {
System.out.println("go for another batch " + total.get());
timer2.schedule(new PeriodicalReceiver(this.batchSize, this.serverID, this.period), period);
}
}
}
}
}

View File

@ -414,13 +414,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
} catch (Exception e) {
}
try {
serverControl.setMessageCounterSamplePeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD - 1);
Assert.fail();
} catch (Exception e) {
}
//this only gets warning now and won't cause exception.
serverControl.setMessageCounterSamplePeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD - 1);
Assert.assertEquals(newSample, serverControl.getMessageCounterSamplePeriod());
Assert.assertEquals(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD - 1, serverControl.getMessageCounterSamplePeriod());
}
protected void restartServer() throws Exception {