mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5938 - add remove(messageId) op to offline durable subs jmx view
This commit is contained in:
parent
2e4c907f2d
commit
11579bb918
|
@ -105,6 +105,10 @@ public class DurableSubscriptionView extends SubscriptionView implements Durable
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeMessage(@MBeanInfo("messageId") String messageId) throws Exception {
|
||||
throw new IllegalStateException("Subscription must be inactive");
|
||||
}
|
||||
|
||||
public boolean doesCursorHaveMessagesBuffered() {
|
||||
if (durableSub != null && durableSub.getPending() != null) {
|
||||
|
|
|
@ -27,6 +27,7 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean {
|
|||
/**
|
||||
* @return name of the durable subscription name
|
||||
*/
|
||||
@MBeanInfo("The subscription name.")
|
||||
String getSubscriptionName();
|
||||
|
||||
/**
|
||||
|
@ -35,6 +36,7 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean {
|
|||
* @return messages
|
||||
* @throws OpenDataException
|
||||
*/
|
||||
@MBeanInfo("Browse the composite data array of pending messages in this subscription")
|
||||
CompositeData[] browse() throws OpenDataException;
|
||||
|
||||
/**
|
||||
|
@ -43,44 +45,61 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean {
|
|||
* @return messages
|
||||
* @throws OpenDataException
|
||||
*/
|
||||
@MBeanInfo("Browse the tabular data of pending messages in this subscription")
|
||||
TabularData browseAsTable() throws OpenDataException;
|
||||
|
||||
/**
|
||||
* Destroys the durable subscription so that messages will no longer be
|
||||
* stored for this subscription
|
||||
*/
|
||||
@MBeanInfo("Destroy or delete this subscription")
|
||||
void destroy() throws Exception;
|
||||
|
||||
/**
|
||||
* @return true if the message cursor has memory space available
|
||||
* to page in more messages
|
||||
*/
|
||||
@MBeanInfo("The subscription has space for more messages in memory")
|
||||
public boolean doesCursorHaveSpace();
|
||||
|
||||
/**
|
||||
* @return true if the cursor has reached its memory limit for
|
||||
* paged in messages
|
||||
*/
|
||||
@MBeanInfo("The subscription cursor is full")
|
||||
public boolean isCursorFull();
|
||||
|
||||
/**
|
||||
* @return true if the cursor has messages buffered to deliver
|
||||
*/
|
||||
@MBeanInfo("The subscription cursor has messages in memory")
|
||||
public boolean doesCursorHaveMessagesBuffered();
|
||||
|
||||
/**
|
||||
* @return the cursor memory usage in bytes
|
||||
*/
|
||||
@MBeanInfo("The subscription cursor memory usage bytes")
|
||||
public long getCursorMemoryUsage();
|
||||
|
||||
/**
|
||||
* @return the cursor memory usage as a percentage
|
||||
*/
|
||||
@MBeanInfo("The subscription cursor memory usage %")
|
||||
public int getCursorPercentUsage();
|
||||
|
||||
/**
|
||||
* @return the number of messages available to be paged in
|
||||
* by the cursor
|
||||
*/
|
||||
@MBeanInfo("The subscription cursor size or message count")
|
||||
public int cursorSize();
|
||||
|
||||
/**
|
||||
* Removes a message from the durable subscription.
|
||||
*
|
||||
* @param messageId
|
||||
* @throws Exception
|
||||
*/
|
||||
@MBeanInfo("Remove a message from the subscription by JMS message ID.")
|
||||
public void removeMessage(@MBeanInfo("messageId") String messageId) throws Exception;
|
||||
}
|
||||
|
|
|
@ -157,4 +157,10 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
|
|||
public String getSelector() {
|
||||
return subscriptionInfo.getSelector();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeMessage(@MBeanInfo("messageId") String messageId) throws Exception {
|
||||
broker.remove(this, messageId);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,10 +17,8 @@
|
|||
package org.apache.activemq.broker.jmx;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
@ -29,6 +27,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import javax.jms.IllegalStateException;
|
||||
import javax.management.InstanceNotFoundException;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
|
@ -47,8 +46,10 @@ import org.apache.activemq.broker.ProducerBrokerExchange;
|
|||
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.DestinationFactory;
|
||||
import org.apache.activemq.broker.region.DestinationFactoryImpl;
|
||||
import org.apache.activemq.broker.region.DestinationInterceptor;
|
||||
import org.apache.activemq.broker.region.DurableTopicSubscription;
|
||||
import org.apache.activemq.broker.region.MessageReference;
|
||||
import org.apache.activemq.broker.region.NullMessageReference;
|
||||
import org.apache.activemq.broker.region.Queue;
|
||||
import org.apache.activemq.broker.region.Region;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
|
@ -64,12 +65,10 @@ import org.apache.activemq.command.ActiveMQTopic;
|
|||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.command.ProducerInfo;
|
||||
import org.apache.activemq.command.SubscriptionInfo;
|
||||
import org.apache.activemq.store.MessageRecoveryListener;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.TopicMessageStore;
|
||||
import org.apache.activemq.thread.Scheduler;
|
||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||
import org.apache.activemq.transaction.XATransaction;
|
||||
|
@ -539,11 +538,11 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
}
|
||||
|
||||
public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
|
||||
List<Message> messages = getSubscriberMessages(view);
|
||||
CompositeData c[] = new CompositeData[messages.size()];
|
||||
Message[] messages = getSubscriberMessages(view);
|
||||
CompositeData c[] = new CompositeData[messages.length];
|
||||
for (int i = 0; i < c.length; i++) {
|
||||
try {
|
||||
c[i] = OpenTypeSupport.convert(messages.get(i));
|
||||
c[i] = OpenTypeSupport.convert(messages[i]);
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Failed to browse: {}", view, e);
|
||||
}
|
||||
|
@ -553,53 +552,59 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
|
||||
public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
|
||||
OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
|
||||
List<Message> messages = getSubscriberMessages(view);
|
||||
Message[] messages = getSubscriberMessages(view);
|
||||
CompositeType ct = factory.getCompositeType();
|
||||
TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
|
||||
TabularDataSupport rc = new TabularDataSupport(tt);
|
||||
for (int i = 0; i < messages.size(); i++) {
|
||||
rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
|
||||
for (int i = 0; i < messages.length; i++) {
|
||||
rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
protected List<Message> getSubscriberMessages(SubscriptionView view) {
|
||||
// TODO It is very dangerous operation for big backlogs
|
||||
if (!(destinationFactory instanceof DestinationFactoryImpl)) {
|
||||
throw new RuntimeException("unsupported by " + destinationFactory);
|
||||
public void remove(SubscriptionView view, String messageId) throws Exception {
|
||||
ActiveMQDestination destination = getTopicDestination(view);
|
||||
if (destination != null) {
|
||||
final Topic topic = (Topic) getTopicRegion().getDestinationMap().get(destination);
|
||||
final MessageAck messageAck = new MessageAck();
|
||||
messageAck.setMessageID(new MessageId(messageId));
|
||||
messageAck.setDestination(destination);
|
||||
|
||||
topic.getMessageStore().removeMessage(brokerService.getAdminConnectionContext(), messageAck);
|
||||
|
||||
// if sub is active, remove from cursor
|
||||
if (view.subscription instanceof DurableTopicSubscription) {
|
||||
final DurableTopicSubscription durableTopicSubscription = (DurableTopicSubscription) view.subscription;
|
||||
final MessageReference messageReference = new NullMessageReference();
|
||||
messageReference.getMessage().setMessageId(messageAck.getFirstMessageId());
|
||||
durableTopicSubscription.getPending().remove(messageReference);
|
||||
}
|
||||
|
||||
} else {
|
||||
throw new IllegalStateException("can't determine topic for sub:" + view);
|
||||
}
|
||||
PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
|
||||
final List<Message> result = new ArrayList<Message>();
|
||||
try {
|
||||
ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
|
||||
TopicMessageStore store = adapter.createTopicMessageStore(topic);
|
||||
store.recover(new MessageRecoveryListener() {
|
||||
@Override
|
||||
public boolean recoverMessage(Message message) throws Exception {
|
||||
result.add(message);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
|
||||
throw new RuntimeException("Should not be called.");
|
||||
}
|
||||
protected Message[] getSubscriberMessages(SubscriptionView view) {
|
||||
ActiveMQDestination destination = getTopicDestination(view);
|
||||
if (destination != null) {
|
||||
Topic topic = (Topic) getTopicRegion().getDestinationMap().get(destination);
|
||||
return topic.browse();
|
||||
|
||||
@Override
|
||||
public boolean hasSpace() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDuplicate(MessageId id) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Failed to browse messages for Subscription {}", view, e);
|
||||
} else {
|
||||
LOG.warn("can't determine topic to browse for sub:" + view);
|
||||
return new Message[]{};
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private ActiveMQDestination getTopicDestination(SubscriptionView view) {
|
||||
ActiveMQDestination destination = null;
|
||||
if (view.subscription instanceof DurableTopicSubscription) {
|
||||
destination = new ActiveMQTopic(view.getDestinationName());
|
||||
} else if (view instanceof InactiveDurableSubscriptionView) {
|
||||
destination = ((InactiveDurableSubscriptionView)view).subscriptionInfo.getDestination();
|
||||
}
|
||||
return destination;
|
||||
}
|
||||
|
||||
protected ObjectName[] getTopics() {
|
||||
|
|
|
@ -58,7 +58,7 @@ public interface SubscriptionViewMBean {
|
|||
/**
|
||||
* @return the destination name
|
||||
*/
|
||||
@MBeanInfo("The name of the destionation the subscription is on.")
|
||||
@MBeanInfo("The name of the destination the subscription is on.")
|
||||
String getDestinationName();
|
||||
|
||||
/**
|
||||
|
@ -158,13 +158,13 @@ public interface SubscriptionViewMBean {
|
|||
/**
|
||||
* @return whether or not the subscriber is durable (persistent)
|
||||
*/
|
||||
@MBeanInfo("The subsription is persistent.")
|
||||
@MBeanInfo("The subscription is persistent.")
|
||||
boolean isDurable();
|
||||
|
||||
/**
|
||||
* @return whether or not the subscriber ignores local messages
|
||||
*/
|
||||
@MBeanInfo("The subsription ignores local messages.")
|
||||
@MBeanInfo("The subscription ignores local messages.")
|
||||
boolean isNoLocal();
|
||||
|
||||
/**
|
||||
|
|
|
@ -22,9 +22,9 @@ import org.apache.activemq.command.Message;
|
|||
import org.apache.activemq.command.MessageId;
|
||||
|
||||
/**
|
||||
* Only used by the {@link QueueMessageReference#NULL_MESSAGE}
|
||||
* Used by the {@link QueueMessageReference#NULL_MESSAGE}
|
||||
*/
|
||||
final class NullMessageReference implements QueueMessageReference {
|
||||
public final class NullMessageReference implements QueueMessageReference {
|
||||
|
||||
private final ActiveMQMessage message = new ActiveMQMessage();
|
||||
private volatile int references;
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.TestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class DurableSubscriptionOfflineBrowseRemoveTest extends DurableSubscriptionOfflineTestBase {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineBrowseRemoveTest.class);
|
||||
|
||||
public boolean keepDurableSubsActive;
|
||||
|
||||
@Parameterized.Parameters(name = "PA-{0}.KeepSubsActive-{1}")
|
||||
public static Collection<Object[]> getTestParameters() {
|
||||
List<Object[]> testParameters = new ArrayList<Object[]>();
|
||||
testParameters.add(new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, Boolean.TRUE});
|
||||
testParameters.add(new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, Boolean.FALSE});
|
||||
|
||||
testParameters.add(new Object[]{TestSupport.PersistenceAdapterChoice.JDBC, Boolean.TRUE});
|
||||
testParameters.add(new Object[]{TestSupport.PersistenceAdapterChoice.JDBC, Boolean.FALSE});
|
||||
|
||||
// leveldb needs some work on finding index from green messageId
|
||||
return testParameters;
|
||||
}
|
||||
|
||||
public DurableSubscriptionOfflineBrowseRemoveTest(TestSupport.PersistenceAdapterChoice adapter, boolean keepDurableSubsActive) {
|
||||
this.defaultPersistenceAdapter = adapter;
|
||||
this.usePrioritySupport = true;
|
||||
this.keepDurableSubsActive = keepDurableSubsActive;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
|
||||
broker.setKeepDurableSubsActive(keepDurableSubsActive);
|
||||
return super.setPersistenceAdapter(broker, defaultPersistenceAdapter);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
|
||||
connectionFactory.setWatchTopicAdvisories(false);
|
||||
return connectionFactory;
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testBrowseRemoveBrowseOfflineSub() throws Exception {
|
||||
// create durable subscription
|
||||
Connection con = createConnection();
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, "SubsId");
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
// send messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Message message = session.createMessage();
|
||||
message.setStringProperty("filter", "true");
|
||||
producer.send(topic, message);
|
||||
}
|
||||
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
// browse the durable sub
|
||||
ObjectName[] subs = broker.getAdminView().getInactiveDurableTopicSubscribers();
|
||||
assertEquals(1, subs.length);
|
||||
ObjectName subName = subs[0];
|
||||
DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
|
||||
broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
|
||||
CompositeData[] data = sub.browse();
|
||||
assertNotNull(data);
|
||||
assertEquals(10, data.length);
|
||||
|
||||
LinkedList<String> idToRemove = new LinkedList<>();
|
||||
idToRemove.add((String)data[5].get("JMSMessageID"));
|
||||
idToRemove.add((String)data[9].get("JMSMessageID"));
|
||||
idToRemove.add((String)data[0].get("JMSMessageID"));
|
||||
|
||||
LOG.info("Removing: " + idToRemove);
|
||||
for (String id: idToRemove) {
|
||||
sub.removeMessage(id);
|
||||
}
|
||||
|
||||
if (defaultPersistenceAdapter.compareTo(TestSupport.PersistenceAdapterChoice.JDBC) == 0) {
|
||||
for (int i=0; i<10; i++) {
|
||||
// each iteration does one priority
|
||||
((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
data = sub.browse();
|
||||
assertNotNull(data);
|
||||
assertEquals(7, data.length);
|
||||
|
||||
for (CompositeData c: data) {
|
||||
String id = (String)c.get("JMSMessageID");
|
||||
for (String removedId : idToRemove) {
|
||||
assertNotEquals(id, removedId);
|
||||
}
|
||||
}
|
||||
|
||||
// remove non existent
|
||||
LOG.info("Repeat remove: " + idToRemove.getFirst());
|
||||
sub.removeMessage(idToRemove.getFirst());
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue