This closes #884
This commit is contained in:
commit
efc576c0c8
|
@ -64,9 +64,9 @@ ${connector-config.settings}
|
|||
<global-max-size>104857600</global-max-size>
|
||||
|
||||
<acceptors>
|
||||
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
||||
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
||||
<acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
||||
<!-- Acceptor for every supported protocol -->
|
||||
<acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE</acceptor>
|
||||
|
||||
${amqp-acceptor}${stomp-acceptor}${hornetq-acceptor}${mqtt-acceptor}
|
||||
</acceptors>
|
||||
|
||||
|
|
|
@ -681,7 +681,11 @@ public class PagingStoreImpl implements PagingStore {
|
|||
}
|
||||
|
||||
if (!blocking.get()) {
|
||||
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize);
|
||||
if (pagingManager.isDiskFull()) {
|
||||
ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
|
||||
} else {
|
||||
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize);
|
||||
}
|
||||
blocking.set(true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1261,6 +1261,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void diskCapacityRestored();
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222212, value = "Disk Full! Blocking message production on address ''{0}''. Clients will report blocked.", format = Message.Format.MESSAGE_FORMAT)
|
||||
void blockingDiskFull(SimpleString addressName);
|
||||
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
||||
void initializationError(@Cause Throwable e);
|
||||
|
|
|
@ -1053,59 +1053,51 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
logger.debug("stopping bridge " + BridgeImpl.this);
|
||||
queue.removeConsumer(BridgeImpl.this);
|
||||
logger.debug("stopping bridge " + BridgeImpl.this);
|
||||
queue.removeConsumer(BridgeImpl.this);
|
||||
|
||||
if (!pendingAcks.await(10, TimeUnit.SECONDS)) {
|
||||
ActiveMQServerLogger.LOGGER.timedOutWaitingCompletions(BridgeImpl.this.toString(), pendingAcks.getCount());
|
||||
}
|
||||
synchronized (BridgeImpl.this) {
|
||||
logger.debug("Closing Session for bridge " + BridgeImpl.this.name);
|
||||
|
||||
synchronized (BridgeImpl.this) {
|
||||
logger.debug("Closing Session for bridge " + BridgeImpl.this.name);
|
||||
started = false;
|
||||
|
||||
started = false;
|
||||
active = false;
|
||||
|
||||
active = false;
|
||||
|
||||
}
|
||||
|
||||
if (session != null) {
|
||||
logger.debug("Cleaning up session " + session);
|
||||
session.removeFailureListener(BridgeImpl.this);
|
||||
try {
|
||||
session.close();
|
||||
session = null;
|
||||
} catch (ActiveMQException dontcare) {
|
||||
}
|
||||
}
|
||||
|
||||
if (sessionConsumer != null) {
|
||||
logger.debug("Cleaning up session " + session);
|
||||
try {
|
||||
sessionConsumer.close();
|
||||
sessionConsumer = null;
|
||||
} catch (ActiveMQException dontcare) {
|
||||
}
|
||||
}
|
||||
|
||||
internalCancelReferences();
|
||||
|
||||
if (csf != null) {
|
||||
csf.cleanup();
|
||||
}
|
||||
|
||||
synchronized (connectionGuard) {
|
||||
keepConnecting = true;
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Removing consumer on stopRunnable " + this + " from queue " + queue);
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.bridgeStopped(name);
|
||||
} catch (InterruptedException | RuntimeException e) {
|
||||
ActiveMQServerLogger.LOGGER.error("Failed to stop bridge", e);
|
||||
}
|
||||
|
||||
if (session != null) {
|
||||
logger.debug("Cleaning up session " + session);
|
||||
session.removeFailureListener(BridgeImpl.this);
|
||||
try {
|
||||
session.close();
|
||||
session = null;
|
||||
} catch (ActiveMQException dontcare) {
|
||||
}
|
||||
}
|
||||
|
||||
if (sessionConsumer != null) {
|
||||
logger.debug("Cleaning up session " + session);
|
||||
try {
|
||||
sessionConsumer.close();
|
||||
sessionConsumer = null;
|
||||
} catch (ActiveMQException dontcare) {
|
||||
}
|
||||
}
|
||||
|
||||
internalCancelReferences();
|
||||
|
||||
if (csf != null) {
|
||||
csf.cleanup();
|
||||
}
|
||||
|
||||
synchronized (connectionGuard) {
|
||||
keepConnecting = true;
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Removing consumer on stopRunnable " + this + " from queue " + queue);
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.bridgeStopped(name);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1616,7 +1616,7 @@ public class JMSBridgeTest extends BridgeTestBase {
|
|||
}
|
||||
|
||||
try {
|
||||
final long MAX_BATCH_TIME = 3000;
|
||||
final long MAX_BATCH_TIME = 300;
|
||||
|
||||
final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
|
||||
|
||||
|
@ -1655,7 +1655,7 @@ public class JMSBridgeTest extends BridgeTestBase {
|
|||
}
|
||||
|
||||
try {
|
||||
final long MAX_BATCH_TIME = 3000;
|
||||
final long MAX_BATCH_TIME = 300;
|
||||
|
||||
final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
|
||||
|
||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
|||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -144,6 +146,15 @@ public class ReceiveTest extends ActiveMQTestBase {
|
|||
cp.send(sendSession.createMessage(false));
|
||||
sendSession.commit();
|
||||
|
||||
final Queue queue = server.locateQueue(queueA);
|
||||
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return queue.getMessageCount() == 3;
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertNotNull(cc2.receive(5000));
|
||||
Assert.assertNotNull(cc.receive(5000));
|
||||
if (cc.receiveImmediate() == null) {
|
||||
|
|
|
@ -166,7 +166,7 @@ public class BridgeTest extends ActiveMQTestBase {
|
|||
connectors.put(server1tc.getName(), server1tc);
|
||||
server0.getConfiguration().setConnectorConfigurations(connectors);
|
||||
|
||||
final int messageSize = 1024 * 1024 * 5;
|
||||
final int messageSize = 1024 * 200;
|
||||
|
||||
final int numMessages = 10;
|
||||
|
||||
|
@ -220,7 +220,7 @@ public class BridgeTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = consumer1.receive(500000);
|
||||
ClientMessage message = consumer1.receive(5000);
|
||||
|
||||
Assert.assertNotNull(message);
|
||||
|
||||
|
|
|
@ -1175,6 +1175,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
ClientSession session = addClientSession(factory.createSession());
|
||||
server.createQueue(queueName, queueName, null, false, false);
|
||||
addClientConsumer(session.createConsumer(queueName));
|
||||
Thread.sleep(100); // We check the timestamp for the creation time. We need to make sure it's different
|
||||
addClientConsumer(session.createConsumer(queueName, SimpleString.toSimpleString(filter), true));
|
||||
|
||||
String jsonString = serverControl.listConsumersAsJSON(factory.getConnection().getID().toString());
|
||||
|
|
|
@ -1616,7 +1616,7 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
|
||||
ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer);
|
||||
serverControl.enableMessageCounters();
|
||||
serverControl.setMessageCounterSamplePeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD);
|
||||
serverControl.setMessageCounterSamplePeriod(100);
|
||||
|
||||
String jsonString = queueControl.listMessageCounter();
|
||||
MessageCounterInfo info = MessageCounterInfo.fromJSON(jsonString);
|
||||
|
@ -1627,7 +1627,7 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
ClientProducer producer = session.createProducer(address);
|
||||
producer.send(session.createMessage(false));
|
||||
|
||||
Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
|
||||
Thread.sleep(200);
|
||||
jsonString = queueControl.listMessageCounter();
|
||||
info = MessageCounterInfo.fromJSON(jsonString);
|
||||
Assert.assertEquals(1, info.getDepth());
|
||||
|
@ -1637,7 +1637,7 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
|
||||
producer.send(session.createMessage(false));
|
||||
|
||||
Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
|
||||
Thread.sleep(200);
|
||||
jsonString = queueControl.listMessageCounter();
|
||||
info = MessageCounterInfo.fromJSON(jsonString);
|
||||
Assert.assertEquals(2, info.getDepth());
|
||||
|
@ -1647,7 +1647,7 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
|
||||
consumeMessages(2, session, queue);
|
||||
|
||||
Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
|
||||
Thread.sleep(200);
|
||||
jsonString = queueControl.listMessageCounter();
|
||||
info = MessageCounterInfo.fromJSON(jsonString);
|
||||
Assert.assertEquals(0, info.getDepth());
|
||||
|
|
|
@ -279,7 +279,7 @@ public class PagingWithFailoverAndCountersTest extends ActiveMQTestBase {
|
|||
public void testValidateDeliveryAndCounters() throws Exception {
|
||||
startLive();
|
||||
|
||||
ServerLocator locator = SpawnedServerSupport.createLocator(PORT1).setInitialConnectAttempts(100).setReconnectAttempts(-1).setRetryInterval(100);
|
||||
ServerLocator locator = SpawnedServerSupport.createLocator(PORT1).setInitialConnectAttempts(-1).setReconnectAttempts(-1).setRetryInterval(100);
|
||||
|
||||
ClientSessionFactory factory = locator.createSessionFactory();
|
||||
|
||||
|
|
Loading…
Reference in New Issue