mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4226 - ensure potential error condition (starved rar endpoint listener) is flagged in log
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1422928 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f65857ae97
commit
757a2f1d32
|
@ -2337,6 +2337,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
for (ActiveMQSession session : this.sessions) {
|
||||
session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
|
||||
}
|
||||
for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) {
|
||||
ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
|
||||
if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
|
||||
consumerInfo.setPrefetchSize(command.getPrefetch());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ import javax.jms.ServerSession;
|
|||
import javax.jms.ServerSessionPool;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.MessageDispatch;
|
||||
|
||||
|
@ -76,7 +77,7 @@ public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQD
|
|||
|
||||
this.connection.addConnectionConsumer(this);
|
||||
this.connection.addDispatcher(consumerInfo.getConsumerId(), this);
|
||||
this.connection.asyncSendPacket(this.consumerInfo);
|
||||
this.connection.syncSendPacket(this.consumerInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -160,4 +161,8 @@ public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQD
|
|||
// Till there is a need, lets immediately allow dispatch
|
||||
this.connection.transportInterruptionProcessingComplete();
|
||||
}
|
||||
|
||||
public ConsumerInfo getConsumerInfo() {
|
||||
return consumerInfo;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ import javax.resource.spi.work.WorkException;
|
|||
import javax.resource.spi.work.WorkManager;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionConsumer;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
|
@ -74,7 +75,7 @@ public class ActiveMQEndpointWorker {
|
|||
private final Object shutdownMutex = new String("shutdownMutex");
|
||||
|
||||
private ActiveMQConnection connection;
|
||||
private ConnectionConsumer consumer;
|
||||
private ActiveMQConnectionConsumer consumer;
|
||||
private ServerSessionPoolImpl serverSessionPool;
|
||||
private boolean running;
|
||||
|
||||
|
@ -127,7 +128,7 @@ public class ActiveMQEndpointWorker {
|
|||
connection.start();
|
||||
|
||||
if (activationSpec.isDurableSubscription()) {
|
||||
consumer = connection.createDurableConnectionConsumer(
|
||||
consumer = (ActiveMQConnectionConsumer) connection.createDurableConnectionConsumer(
|
||||
(Topic) dest,
|
||||
activationSpec.getSubscriptionName(),
|
||||
emptyToNull(activationSpec.getMessageSelector()),
|
||||
|
@ -135,7 +136,7 @@ public class ActiveMQEndpointWorker {
|
|||
connection.getPrefetchPolicy().getDurableTopicPrefetch(),
|
||||
activationSpec.getNoLocalBooleanValue());
|
||||
} else {
|
||||
consumer = connection.createConnectionConsumer(
|
||||
consumer = (ActiveMQConnectionConsumer) connection.createConnectionConsumer(
|
||||
dest,
|
||||
emptyToNull(activationSpec.getMessageSelector()),
|
||||
serverSessionPool,
|
||||
|
@ -151,6 +152,11 @@ public class ActiveMQEndpointWorker {
|
|||
} else {
|
||||
LOG.error("Could not release connection lock");
|
||||
}
|
||||
|
||||
if (consumer.getConsumerInfo().getCurrentPrefetchSize() == 0) {
|
||||
LOG.error("Endpoint " + endpointActivationKey.getActivationSpec() + " will not receive any messages due to broker 'zero prefetch' configuration for: " + dest);
|
||||
}
|
||||
|
||||
} catch (JMSException error) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Failed to connect: " + error.getMessage(), error);
|
||||
|
|
|
@ -181,7 +181,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
|
|||
}
|
||||
|
||||
if (!(activationSpec instanceof MessageActivationSpec)) {
|
||||
throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
|
||||
throw new NotSupportedException("That type of ActivationSpec not supported: " + activationSpec.getClass());
|
||||
}
|
||||
|
||||
ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.lang.reflect.Method;
|
|||
import java.util.Timer;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
|
@ -49,9 +50,19 @@ import javax.transaction.xa.Xid;
|
|||
import junit.framework.TestCase;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.log4j.Appender;
|
||||
import org.apache.log4j.Layout;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.spi.ErrorHandler;
|
||||
import org.apache.log4j.spi.Filter;
|
||||
import org.apache.log4j.spi.LoggingEvent;
|
||||
|
||||
public class MDBTest extends TestCase {
|
||||
|
||||
|
@ -197,6 +208,147 @@ public class MDBTest extends TestCase {
|
|||
|
||||
}
|
||||
|
||||
public void testErrorOnNoMessageDeliveryBrokerZeroPrefetchConfig() throws Exception {
|
||||
|
||||
final BrokerService brokerService = new BrokerService();
|
||||
final String brokerUrl = "vm://zeroPrefetch?create=false";
|
||||
brokerService.setBrokerName("zeroPrefetch");
|
||||
brokerService.setPersistent(false);
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry zeroPrefetchPolicy = new PolicyEntry();
|
||||
zeroPrefetchPolicy.setQueuePrefetch(0);
|
||||
policyMap.setDefaultEntry(zeroPrefetchPolicy);
|
||||
brokerService.setDestinationPolicy(policyMap);
|
||||
brokerService.start();
|
||||
|
||||
final AtomicReference<String> errorMessage = new AtomicReference<String>();
|
||||
final Appender testAppender = new Appender() {
|
||||
|
||||
@Override
|
||||
public void addFilter(Filter filter) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Filter getFilter() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearFilters() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doAppend(LoggingEvent event) {
|
||||
if (event.getLevel().isGreaterOrEqual(Level.ERROR)) {
|
||||
errorMessage.set(event.getRenderedMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setErrorHandler(ErrorHandler errorHandler) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ErrorHandler getErrorHandler() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLayout(Layout layout) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Layout getLayout() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setName(String s) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean requiresLayout() {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
LogManager.getRootLogger().addAppender(testAppender);
|
||||
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
|
||||
Connection connection = factory.createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
MessageConsumer advisory = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(new ActiveMQQueue("TEST")));
|
||||
|
||||
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
|
||||
adapter.setServerUrl(brokerUrl);
|
||||
adapter.start(new StubBootstrapContext());
|
||||
|
||||
final CountDownLatch messageDelivered = new CountDownLatch(1);
|
||||
|
||||
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
|
||||
public void onMessage(Message message) {
|
||||
super.onMessage(message);
|
||||
messageDelivered.countDown();
|
||||
};
|
||||
};
|
||||
|
||||
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
|
||||
activationSpec.setDestinationType(Queue.class.getName());
|
||||
activationSpec.setDestination("TEST");
|
||||
activationSpec.setResourceAdapter(adapter);
|
||||
activationSpec.validate();
|
||||
|
||||
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
|
||||
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
|
||||
endpoint.xaresource = resource;
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
// Activate an Endpoint
|
||||
adapter.endpointActivation(messageEndpointFactory, activationSpec);
|
||||
|
||||
ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(1000);
|
||||
if (msg != null) {
|
||||
assertEquals("Prefetch size hasn't been set", 0, ((ConsumerInfo)msg.getDataStructure()).getPrefetchSize());
|
||||
} else {
|
||||
fail("Consumer hasn't been created");
|
||||
}
|
||||
|
||||
// Send the broker a message to that endpoint
|
||||
MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
|
||||
producer.send(session.createTextMessage("Hello!"));
|
||||
|
||||
connection.close();
|
||||
|
||||
// Wait for the message to be delivered.
|
||||
assertFalse(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
|
||||
|
||||
// Shut the Endpoint down.
|
||||
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
|
||||
adapter.stop();
|
||||
|
||||
assertNotNull("We got an error message", errorMessage.get());
|
||||
assertTrue("correct message", errorMessage.get().contains("zero"));
|
||||
|
||||
LogManager.getRootLogger().removeAppender(testAppender);
|
||||
brokerService.stop();
|
||||
}
|
||||
|
||||
public void testMessageExceptionReDelivery() throws Exception {
|
||||
|
||||
|
|
|
@ -88,6 +88,8 @@ public class ManagedConnectionFactoryTest extends TestCase {
|
|||
assertTrue(connection != null);
|
||||
assertTrue(connection instanceof ManagedConnectionProxy);
|
||||
|
||||
connection.close();
|
||||
|
||||
}
|
||||
|
||||
public void testConnectionFactoryConnectionMatching() throws ResourceException, JMSException {
|
||||
|
@ -123,6 +125,9 @@ public class ManagedConnectionFactoryTest extends TestCase {
|
|||
test = managedConnectionFactory.matchManagedConnections(set, null, ri2);
|
||||
assertTrue(connection2 == test);
|
||||
|
||||
for (ManagedConnection managedConnection : set) {
|
||||
managedConnection.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
public void testConnectionFactoryIsSerializableAndReferenceable() throws ResourceException, JMSException {
|
||||
|
|
Loading…
Reference in New Issue