ARTEMIS-883 Fix OpenWire ProducerFlowControlTest

This commit is contained in:
Howard Gao 2016-12-20 21:13:38 +08:00 committed by Clebert Suconic
parent 0b131bddfb
commit ae90edfdb6
4 changed files with 255 additions and 90 deletions

View File

@ -183,6 +183,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();
private AtomicBoolean disableTtl = new AtomicBoolean(false);
// TODO-NOW: check on why there are two connections created for every createConnection on the client.
public OpenWireConnection(Connection connection,
ActiveMQServer server,
@ -776,6 +778,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.connectionEntry = connectionEntry;
}
@Override
public boolean checkDataReceived() {
if (disableTtl.get()) {
return true;
}
return super.checkDataReceived();
}
public void setUpTtl(final long inactivityDuration,
final long inactivityDurationInitialDelay,
final boolean useKeepAlive) {
@ -818,6 +828,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
public void disableTtl() {
disableTtl.set(true);
}
public void enableTtl() {
disableTtl.set(false);
}
class SlowConsumerDetection implements SlowConsumerDetectionListener {
@Override

View File

@ -40,9 +40,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@ -53,12 +51,12 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.wireformat.WireFormat;
public class AMQSession implements SessionCallback {
// ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
@ -303,108 +301,103 @@ public class AMQSession implements SessionCallback {
originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
}
Runnable runnable;
boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
if (sendProducerAck) {
runnable = new Runnable() {
@Override
public void run() {
try {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
connection.dispatchSync(ack);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
connection.sendException(e);
}
final AtomicInteger count = new AtomicInteger(actualDestinations.length);
}
};
} else {
final Connection transportConnection = connection.getTransportConnection();
final Exception[] anyException = new Exception[] {null};
if (transportConnection == null) {
// I don't think this could happen, but just in case, avoiding races
runnable = null;
} else {
runnable = new Runnable() {
@Override
public void run() {
transportConnection.setAutoRead(true);
}
};
}
if (shouldBlockProducer) {
connection.getContext().setDontSendReponse(true);
}
internalSend(actualDestinations, originalCoreMsg, runnable);
}
private void internalSend(ActiveMQDestination[] actualDestinations,
ServerMessage originalCoreMsg,
final Runnable onComplete) throws Exception {
Runnable runToUse;
if (actualDestinations.length <= 1 || onComplete == null) {
// if onComplete is null, this will be null ;)
runToUse = onComplete;
} else {
final AtomicInteger count = new AtomicInteger(actualDestinations.length);
runToUse = new Runnable() {
@Override
public void run() {
if (count.decrementAndGet() == 0) {
onComplete.run();
}
}
};
}
SimpleString[] addresses = new SimpleString[actualDestinations.length];
PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
// We fillup addresses, pagingStores and we will throw failure if that's the case
for (int i = 0; i < actualDestinations.length; i++) {
ActiveMQDestination dest = actualDestinations[i];
addresses[i] = new SimpleString(dest.getPhysicalName());
pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
throw new ResourceAllocationException("Queue is full");
}
}
for (int i = 0; i < actualDestinations.length; i++) {
SimpleString address = new SimpleString(dest.getPhysicalName());
ServerMessage coreMsg = originalCoreMsg.copy();
coreMsg.setAddress(addresses[i]);
PagingStore store = pagingStores[i];
if (store.isFull()) {
connection.getTransportConnection().setAutoRead(false);
}
coreMsg.setAddress(address);
if (actualDestinations[i].isQueue()) {
checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
}
if (actualDestinations[i].isQueue()) {
coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
} else {
coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
}
RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
PagingStore store = server.getPagingManager().getPageStore(address);
if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
}
if (runToUse != null) {
// if the timeout is >0, it will wait this much milliseconds
// before running the the runToUse
// this will eventually unblock blocked destinations
// playing flow control
store.checkMemory(runToUse);
this.connection.disableTtl();
if (shouldBlockProducer) {
if (!store.checkMemory(() -> {
try {
RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
}
} catch (Exception e) {
if (anyException[0] == null) {
anyException[0] = e;
}
}
connection.enableTtl();
if (count.decrementAndGet() == 0) {
if (anyException[0] != null) {
this.connection.getContext().setDontSendReponse(false);
ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
connection.sendException(anyException[0]);
} else {
if (sendProducerAck) {
try {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
connection.dispatchAsync(ack);
} catch (Exception e) {
this.connection.getContext().setDontSendReponse(false);
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
connection.sendException(e);
}
} else {
connection.getContext().setDontSendReponse(false);
try {
Response response = new Response();
response.setCorrelationId(messageSend.getCommandId());
connection.dispatchAsync(response);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
connection.sendException(e);
}
}
}
}
})) {
this.connection.getContext().setDontSendReponse(false);
connection.enableTtl();
throw new ResourceAllocationException("Queue is full " + address);
}
} else {
//non-persistent messages goes here, by default we stop reading from
//transport
connection.getTransportConnection().setAutoRead(false);
if (!store.checkMemory(() -> {
connection.getTransportConnection().setAutoRead(true);
connection.enableTtl();
})) {
connection.getTransportConnection().setAutoRead(true);
connection.enableTtl();
throw new ResourceAllocationException("Queue is full " + address);
}
RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
}
if (count.decrementAndGet() == 0) {
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
connection.dispatchAsync(ack);
}
}
}
}
}

View File

@ -45,8 +45,8 @@ public class BasicOpenWireTest extends OpenWireTestBase {
public TestName name = new TestName();
protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
protected ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(urlString);
protected ActiveMQConnectionFactory factory;
protected ActiveMQXAConnectionFactory xaFactory;
protected ActiveMQConnection connection;
protected String topicName = "amqTestTopic1";
@ -64,6 +64,9 @@ public class BasicOpenWireTest extends OpenWireTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", "5");
factory = new ActiveMQConnectionFactory(getConnectionUrl());
xaFactory = new ActiveMQXAConnectionFactory(getConnectionUrl());
SimpleString coreQueue = new SimpleString(queueName);
this.server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false, -1, false, true);
testQueues.put(queueName, coreQueue);
@ -81,6 +84,10 @@ public class BasicOpenWireTest extends OpenWireTestBase {
}
}
protected String getConnectionUrl() {
return urlString;
}
@Override
@After
public void tearDown() throws Exception {

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.openwire.amq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
public class ProducerBlockingTtlTest extends BasicOpenWireTest {
ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
protected ActiveMQConnection flowControlConnection;
@Override
protected void extraServerConfig(Configuration serverConfig) {
String match = "#";
Map<String, AddressSettings> asMap = serverConfig.getAddressesSettings();
asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
this.makeSureCoreQueueExist("QUEUE.A");
}
@Override
@After
public void tearDown() throws Exception {
try {
if (flowControlConnection != null) {
TcpTransport t = flowControlConnection.getTransport().narrow(TcpTransport.class);
try {
flowControlConnection.getTransport().stop();
flowControlConnection.close();
} catch (Throwable ignored) {
}
}
} finally {
super.tearDown();
}
}
//set ttl to 1000
@Override
protected String getConnectionUrl() {
return urlString + "&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000";
}
@Test
public void testProducerBlockWontGetTimeout() throws Exception {
flowControlConnection = (ActiveMQConnection) factory.createConnection();
Connection consumerConnection = factory.createConnection();
Thread fillThread = null;
AtomicBoolean keepGoing = new AtomicBoolean(true);
try {
flowControlConnection.start();
final Session session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(queueA);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
final String text = "Hello World";
final int num = 10;
fillThread = new Thread("Fill thread.") {
@Override
public void run() {
try {
for (int i = 0; i < num && keepGoing.get(); i++) {
producer.send(session.createTextMessage(text + i));
}
} catch (JMSException e) {
}
}
};
fillThread.start();
//longer enough than TTL (1000)
Thread.sleep(4000);
//receive messages and unblock the producer
consumerConnection.start();
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(queueA);
for (int i = 0; i < num; i++) {
TextMessage m = (TextMessage) consumer.receive(5000);
assertNotNull(m);
assertEquals("Hello World" + i, m.getText());
}
assertNull(consumer.receive(3));
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fillThread != null) {
keepGoing.set(false);
fillThread.interrupt();
fillThread.join();
}
try {
flowControlConnection.close();
flowControlConnection = null;
} catch (Throwable t) {
}
try {
consumerConnection.close();
} catch (Throwable t) {
}
}
}
}