Add support for aborting a slow AMQP consumer and some testing.
This commit is contained in:
Timothy Bish 2015-04-07 10:25:19 -04:00
parent 61da1faa4c
commit 25c99a6c36
11 changed files with 349 additions and 43 deletions

View File

@ -46,6 +46,7 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
@ -538,6 +539,12 @@ public class AmqpConnection implements AmqpProtocolConverter {
// Pass down any unexpected async errors. Should this close the connection?
Throwable exception = ((ConnectionError) command).getException();
handleException(exception);
} else if (command.isConsumerControl()) {
ConsumerControl control = (ConsumerControl) command;
AmqpSender sender = subscriptionsByConsumerId.get(control.getConsumerId());
if (sender != null) {
sender.onConsumerControl(control);
}
} else if (command.isBrokerInfo()) {
// ignore
} else {

View File

@ -46,7 +46,9 @@ import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
@ -309,6 +311,21 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
}
}
/**
* Called when the Broker sends a ConsumerControl command to the Consumer that
* this sender creates to obtain messages to dispatch via the sender for this
* end of the open link.
*
* @param control
* The ConsumerControl command to process.
*/
public void onConsumerControl(ConsumerControl control) {
if (control.isClose()) {
close(new ErrorCondition(AmqpError.INTERNAL_ERROR, "Receiver forcably closed"));
session.pumpProtonToSocket();
}
}
@Override
public String toString() {
return "AmqpSender {" + getConsumerId() + "}";

View File

@ -107,6 +107,8 @@ public class AmqpTestSupport {
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateConnector(false);
performAdditionalConfiguration(brokerService);
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
SSLContext.setDefault(ctx);
@ -132,6 +134,10 @@ public class AmqpTestSupport {
addTranportConnectors();
}
protected void performAdditionalConfiguration(BrokerService brokerService) throws Exception {
}
protected void addTranportConnectors() throws Exception {
TransportConnector connector = null;

View File

@ -339,7 +339,6 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
connection.close();
}
//@Ignore("Test fails currently due to improper implementation of drain.")
@Test(timeout = 60000)
public void testReceiverCanDrainMessages() throws Exception {
int MSG_COUNT = 20;

View File

@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.util.Wait;
import org.junit.Test;
/**
* Test the handling of consumer abort when the AbortSlowConsumerStrategy is used.
*/
public class AmqpSlowReceiverTest extends AmqpClientTestSupport {
private final long DEFAULT_CHECK_PERIOD = 1000;
private final long DEFAULT_MAX_SLOW_DURATION = 3000;
private AbortSlowConsumerStrategy strategy;
@Test(timeout = 60 * 1000)
public void testSlowConsumerIsAborted() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
final AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
receiver.flow(100);
assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
sendMessages(getTestName(), 100, false);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
assertTrue("Receiver should be closed", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return receiver.isClosed();
}
}));
assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
}
@Test
public void testSlowConsumerIsAbortedViaJmx() throws Exception {
strategy.setMaxSlowDuration(60*1000); // so jmx does the abort
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
final AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
receiver.flow(100);
sendMessages(getTestName(), 100, false);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
QueueViewMBean queue = getProxyToQueue(getTestName());
ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
assertNotNull(slowConsumerPolicyMBeanName);
AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
brokerService.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
TimeUnit.SECONDS.sleep(3);
TabularData slowOnes = abortPolicy.getSlowConsumers();
assertEquals("one slow consumers", 1, slowOnes.size());
LOG.info("slow ones:" + slowOnes);
CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next();
LOG.info("Slow one: " + slowOne);
assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName);
abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription"));
assertTrue("Receiver should be closed", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return receiver.isClosed();
}
}));
slowOnes = abortPolicy.getSlowConsumers();
assertEquals("no slow consumers left", 0, slowOnes.size());
// verify mbean gone with destination
brokerService.getAdminView().removeQueue(getTestName());
try {
abortPolicy.getSlowConsumers();
fail("expect not found post destination removal");
} catch(UndeclaredThrowableException expected) {
assertTrue("correct exception: " + expected.getCause(),
expected.getCause() instanceof InstanceNotFoundException);
}
}
@Override
protected boolean isUseOpenWireConnector() {
return true;
}
@Override
protected void performAdditionalConfiguration(BrokerService brokerService) throws Exception {
strategy = new AbortSlowConsumerStrategy();
strategy.setAbortConnection(false);
strategy.setCheckPeriod(DEFAULT_CHECK_PERIOD);
strategy.setMaxSlowDuration(DEFAULT_MAX_SLOW_DURATION);
PolicyEntry policy = new PolicyEntry();
policy.setSlowConsumerStrategy(strategy);
policy.setQueuePrefetch(10);
policy.setTopicPrefetch(10);
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
brokerService.setDestinationPolicy(pMap);
}
}

View File

@ -17,34 +17,36 @@
package org.apache.activemq.command;
import java.util.Map;
import org.apache.activemq.util.IntrospectionSupport;
/**
*
*
* @openwire:marshaller
*
*
*/
public abstract class BaseCommand implements Command {
protected int commandId;
protected boolean responseRequired;
private transient Endpoint from;
private transient Endpoint to;
public void copy(BaseCommand copy) {
copy.commandId = commandId;
copy.responseRequired = responseRequired;
}
}
/**
* @openwire:property version=1
*/
@Override
public int getCommandId() {
return commandId;
}
@Override
public void setCommandId(int commandId) {
this.commandId = commandId;
}
@ -52,10 +54,12 @@ public abstract class BaseCommand implements Command {
/**
* @openwire:property version=1
*/
@Override
public boolean isResponseRequired() {
return responseRequired;
}
@Override
public void setResponseRequired(boolean responseRequired) {
this.responseRequired = responseRequired;
}
@ -64,72 +68,90 @@ public abstract class BaseCommand implements Command {
public String toString() {
return toString(null);
}
public String toString(Map<String, Object>overrideFields) {
return IntrospectionSupport.toString(this, BaseCommand.class, overrideFields);
public String toString(Map<String, Object> overrideFields) {
return IntrospectionSupport.toString(this, BaseCommand.class, overrideFields);
}
@Override
public boolean isWireFormatInfo() {
return false;
}
@Override
public boolean isBrokerInfo() {
return false;
}
@Override
public boolean isResponse() {
return false;
}
@Override
public boolean isMessageDispatch() {
return false;
}
@Override
public boolean isMessage() {
return false;
}
@Override
public boolean isMarshallAware() {
return false;
}
@Override
public boolean isMessageAck() {
return false;
}
@Override
public boolean isMessageDispatchNotification() {
return false;
}
@Override
public boolean isShutdownInfo() {
return false;
}
@Override
public boolean isConnectionControl() {
return false;
}
@Override
public boolean isConsumerControl() {
return false;
}
/**
* The endpoint within the transport where this message came from.
*/
@Override
public Endpoint getFrom() {
return from;
}
@Override
public void setFrom(Endpoint from) {
this.from = from;
}
/**
* The endpoint within the transport where this message is going to - null means all endpoints.
* The endpoint within the transport where this message is going to - null
* means all endpoints.
*/
@Override
public Endpoint getTo() {
return to;
}
@Override
public void setTo(Endpoint to) {
this.to = to;
}
}

View File

@ -21,8 +21,6 @@ import org.apache.activemq.state.CommandVisitor;
/**
* The Command Pattern so that we can send and receive commands on the different
* transports
*
*
*/
public interface Command extends DataStructure {
@ -52,9 +50,11 @@ public interface Command extends DataStructure {
boolean isMessageDispatchNotification();
boolean isShutdownInfo();
boolean isConnectionControl();
boolean isConsumerControl();
Response visit(CommandVisitor visitor) throws Exception;
/**

View File

@ -20,9 +20,9 @@ import org.apache.activemq.state.CommandVisitor;
/**
* Used to start and stop transports as well as terminating clients.
*
*
* @openwire:marshaller code="17"
*
*
*/
public class ConsumerControl extends BaseCommand {
@ -48,14 +48,21 @@ public class ConsumerControl extends BaseCommand {
this.destination = destination;
}
@Override
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
@Override
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processConsumerControl(this);
}
@Override
public boolean isConsumerControl() {
return true;
}
/**
* @openwire:property version=1
* @return Returns the close.
@ -65,7 +72,8 @@ public class ConsumerControl extends BaseCommand {
}
/**
* @param close The close to set.
* @param close
* The new value to assign the close state flag.
*/
public void setClose(boolean close) {
this.close = close;
@ -80,7 +88,8 @@ public class ConsumerControl extends BaseCommand {
}
/**
* @param consumerId The consumerId to set.
* @param consumerId
* The consumerId to set.
*/
public void setConsumerId(ConsumerId consumerId) {
this.consumerId = consumerId;
@ -95,7 +104,8 @@ public class ConsumerControl extends BaseCommand {
}
/**
* @param prefetch The prefetch to set.
* @param prefetch
* The prefetch to set.
*/
public void setPrefetch(int prefetch) {
this.prefetch = prefetch;
@ -110,7 +120,8 @@ public class ConsumerControl extends BaseCommand {
}
/**
* @param flush the flush to set
* @param flush
* The flush value to set on this command.
*/
public void setFlush(boolean flush) {
this.flush = flush;
@ -125,7 +136,8 @@ public class ConsumerControl extends BaseCommand {
}
/**
* @param start the start to set
* @param start
* The start value to set on this command.
*/
public void setStart(boolean start) {
this.start = start;
@ -140,7 +152,8 @@ public class ConsumerControl extends BaseCommand {
}
/**
* @param stop the stop to set
* @param stop
* the stop value to set on this Command.
*/
public void setStop(boolean stop) {
this.stop = stop;

View File

@ -21,9 +21,9 @@ import org.apache.activemq.state.CommandVisitor;
/**
* Represents a partial command; a large command that has been split up into
* pieces.
*
*
* @openwire:marshaller code="60"
*
*
*/
public class PartialCommand implements Command {
@ -38,6 +38,7 @@ public class PartialCommand implements Command {
public PartialCommand() {
}
@Override
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
@ -45,17 +46,19 @@ public class PartialCommand implements Command {
/**
* @openwire:property version=1
*/
@Override
public int getCommandId() {
return commandId;
}
@Override
public void setCommandId(int commandId) {
this.commandId = commandId;
}
/**
* The data for this part of the command
*
*
* @openwire:property version=1 mandatory=true
*/
public byte[] getData() {
@ -66,79 +69,102 @@ public class PartialCommand implements Command {
this.data = data;
}
@Override
public Endpoint getFrom() {
return from;
}
@Override
public void setFrom(Endpoint from) {
this.from = from;
}
@Override
public Endpoint getTo() {
return to;
}
@Override
public void setTo(Endpoint to) {
this.to = to;
}
@Override
public Response visit(CommandVisitor visitor) throws Exception {
throw new IllegalStateException("The transport layer should filter out PartialCommand instances but received: " + this);
}
@Override
public boolean isResponseRequired() {
return false;
}
@Override
public boolean isResponse() {
return false;
}
@Override
public boolean isBrokerInfo() {
return false;
}
@Override
public boolean isMessageDispatch() {
return false;
}
@Override
public boolean isMessage() {
return false;
}
@Override
public boolean isMessageAck() {
return false;
}
@Override
public boolean isMessageDispatchNotification() {
return false;
}
@Override
public boolean isShutdownInfo() {
return false;
}
@Override
public boolean isConnectionControl() {
return false;
}
@Override
public boolean isConsumerControl() {
return false;
}
@Override
public void setResponseRequired(boolean responseRequired) {
}
@Override
public boolean isWireFormatInfo() {
return false;
}
@Override
public boolean isMarshallAware() {
return false;
}
@Override
public String toString() {
int size = 0;
if (data != null) {
size = data.length;
}
return "PartialCommand[id: " + commandId + " data: " + size + " byte(s)]";
}
}
}

View File

@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
@ -33,7 +34,7 @@ import org.fusesource.hawtbuf.UTF8Buffer;
/**
* @openwire:marshaller code="1"
*
*
*/
public class WireFormatInfo implements Command, MarshallAware {
@ -49,14 +50,17 @@ public class WireFormatInfo implements Command, MarshallAware {
private transient Endpoint from;
private transient Endpoint to;
@Override
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
@Override
public boolean isWireFormatInfo() {
return true;
}
@Override
public boolean isMarshallAware() {
return true;
}
@ -97,10 +101,12 @@ public class WireFormatInfo implements Command, MarshallAware {
/**
* The endpoint within the transport where this message came from.
*/
@Override
public Endpoint getFrom() {
return from;
}
@Override
public void setFrom(Endpoint from) {
this.from = from;
}
@ -109,16 +115,18 @@ public class WireFormatInfo implements Command, MarshallAware {
* The endpoint within the transport where this message is going to - null
* means all endpoints.
*/
@Override
public Endpoint getTo() {
return to;
}
@Override
public void setTo(Endpoint to) {
this.to = to;
}
// ////////////////////
//
//
// Implementation Methods.
//
// ////////////////////
@ -169,6 +177,7 @@ public class WireFormatInfo implements Command, MarshallAware {
return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)), MAX_PROPERTY_SIZE);
}
@Override
public void beforeMarshall(WireFormat wireFormat) throws IOException {
// Need to marshal the properties.
if (marshalledProperties == null && properties != null) {
@ -180,12 +189,15 @@ public class WireFormatInfo implements Command, MarshallAware {
}
}
@Override
public void afterMarshall(WireFormat wireFormat) throws IOException {
}
@Override
public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
}
@Override
public void afterUnmarshall(WireFormat wireFormat) throws IOException {
}
@ -193,6 +205,7 @@ public class WireFormatInfo implements Command, MarshallAware {
return magic != null && Arrays.equals(magic, MAGIC);
}
@Override
public void setResponseRequired(boolean responseRequired) {
}
@ -256,7 +269,7 @@ public class WireFormatInfo implements Command, MarshallAware {
if( buff == null ) {
return null;
}
return (String) buff.toString();
return buff.toString();
}
public void setHost(String hostname) throws IOException {
@ -274,7 +287,7 @@ public class WireFormatInfo implements Command, MarshallAware {
public void setMaxInactivityDuration(long maxInactivityDuration) throws IOException {
setProperty("MaxInactivityDuration", new Long(maxInactivityDuration));
}
public long getMaxInactivityDurationInitalDelay() throws IOException {
Long l = (Long)getProperty("MaxInactivityDurationInitalDelay");
return l == null ? 0 : l.longValue();
@ -292,8 +305,6 @@ public class WireFormatInfo implements Command, MarshallAware {
public void setMaxFrameSize(long maxFrameSize) throws IOException {
setProperty("MaxFrameSize", new Long(maxFrameSize));
}
/**
* @throws IOException
@ -307,6 +318,7 @@ public class WireFormatInfo implements Command, MarshallAware {
setProperty("CacheSize", new Integer(cacheSize));
}
@Override
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processWireFormat(this);
}
@ -340,54 +352,69 @@ public class WireFormatInfo implements Command, MarshallAware {
//
// /////////////////////////////////////////////////////////////
@Override
public void setCommandId(int value) {
}
@Override
public int getCommandId() {
return 0;
}
@Override
public boolean isResponseRequired() {
return false;
}
@Override
public boolean isResponse() {
return false;
}
@Override
public boolean isBrokerInfo() {
return false;
}
@Override
public boolean isMessageDispatch() {
return false;
}
@Override
public boolean isMessage() {
return false;
}
@Override
public boolean isMessageAck() {
return false;
}
@Override
public boolean isMessageDispatchNotification() {
return false;
}
@Override
public boolean isShutdownInfo() {
return false;
}
@Override
public boolean isConnectionControl() {
return false;
}
@Override
public boolean isConsumerControl() {
return false;
}
public void setCachedMarshalledForm(WireFormat wireFormat, ByteSequence data) {
}
public ByteSequence getCachedMarshalledForm(WireFormat wireFormat) {
return null;
}
}

View File

@ -94,85 +94,108 @@ public class StompFrame implements Command {
this.headers = headers;
}
//
// Methods in the Command interface
//
@Override
public int getCommandId() {
return 0;
}
@Override
public Endpoint getFrom() {
return null;
}
@Override
public Endpoint getTo() {
return null;
}
@Override
public boolean isBrokerInfo() {
return false;
}
@Override
public boolean isMessage() {
return false;
}
@Override
public boolean isMessageAck() {
return false;
}
@Override
public boolean isMessageDispatch() {
return false;
}
@Override
public boolean isMessageDispatchNotification() {
return false;
}
@Override
public boolean isResponse() {
return false;
}
@Override
public boolean isResponseRequired() {
return false;
}
@Override
public boolean isShutdownInfo() {
return false;
}
@Override
public boolean isConnectionControl() {
return false;
}
@Override
public boolean isConsumerControl() {
return false;
}
@Override
public boolean isWireFormatInfo() {
return false;
}
@Override
public void setCommandId(int value) {
}
@Override
public void setFrom(Endpoint from) {
}
@Override
public void setResponseRequired(boolean responseRequired) {
}
@Override
public void setTo(Endpoint to) {
}
@Override
public Response visit(CommandVisitor visitor) throws Exception {
return null;
}
@Override
public byte getDataStructureType() {
return 0;
}
@Override
public boolean isMarshallAware() {
return false;
}
@Override
public String toString() {
return format(true);
}