From 38ab4b10a47614a07bb2e3be6e98ee4b23074e7e Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 11 Nov 2013 11:40:07 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4855 Deprecate the misspelled subcription named getters and use the correctly spelled versions where the older ones are in use. --- .../jmx/InactiveDurableSubscriptionView.java | 14 ++++- .../activemq/broker/jmx/SubscriptionView.java | 19 +++++++ .../broker/jmx/SubscriptionViewMBean.java | 15 ++++++ .../view/ConnectionDotFileInterceptor.java | 44 +++++++++------- .../console/command/store/StoreExporter.java | 52 +++++++++++++------ .../apache/activemq/leveldb/DBManager.scala | 16 +++--- .../activemq/xbean/XBeanConfigTest.java | 8 +-- 7 files changed, 121 insertions(+), 47 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java index 35c0b92ab4..3602a3acfe 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java @@ -51,21 +51,23 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp /** * @return the id of the Subscription */ - public long getSubcriptionId() { + @Override + public long getSubscriptionId() { return -1; } /** * @return the destination name */ + @Override public String getDestinationName() { return subscriptionInfo.getDestination().getPhysicalName(); - } /** * @return true if the destination is a Queue */ + @Override public boolean isDestinationQueue() { return false; } @@ -73,6 +75,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp /** * @return true of the destination is a Topic */ + @Override public boolean isDestinationTopic() { return true; } @@ -80,6 +83,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp /** * @return true if the destination is temporary */ + @Override public boolean isDestinationTemporary() { return false; } @@ -87,6 +91,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp /** * @return name of the durable consumer */ + @Override public String getSubscriptionName() { return subscriptionInfo.getSubscriptionName(); } @@ -94,6 +99,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp /** * @return true if the subscriber is active */ + @Override public boolean isActive() { return false; } @@ -110,6 +116,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp * @return messages * @throws OpenDataException */ + @Override public CompositeData[] browse() throws OpenDataException { return broker.browse(this); } @@ -120,6 +127,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp * @return messages * @throws OpenDataException */ + @Override public TabularData browseAsTable() throws OpenDataException { return broker.browseAsTable(this); } @@ -128,6 +136,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp * Destroys the durable subscription so that messages will no longer be * stored for this subscription */ + @Override public void destroy() throws Exception { RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); info.setClientId(clientId); @@ -138,6 +147,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp brokerService.getBroker().removeSubscription(context, info); } + @Override public String toString() { return "InactiveDurableSubscriptionView: " + getClientId() + ":" + getSubscriptionName(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java index 5441b1975a..82017376ab 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java @@ -123,8 +123,17 @@ public class SubscriptionView implements SubscriptionViewMBean { /** * @return the id of the Subscription */ + @Deprecated @Override public long getSubcriptionId() { + return getSubscriptionId(); + } + + /** + * @return the id of the Subscription + */ + @Override + public long getSubscriptionId() { ConsumerInfo info = getConsumerInfo(); if (info != null) { return info.getConsumerId().getValue(); @@ -289,8 +298,18 @@ public class SubscriptionView implements SubscriptionViewMBean { * @return the name of the consumer which is only used for durable * consumers. */ + @Deprecated @Override public String getSubcriptionName() { + return getSubscriptionName(); + } + + /** + * @return the name of the consumer which is only used for durable + * consumers. + */ + @Override + public String getSubscriptionName() { ConsumerInfo info = getConsumerInfo(); return info != null ? info.getSubscriptionName() : null; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java index abeff4f587..9bbedc1e2e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java @@ -45,9 +45,16 @@ public interface SubscriptionViewMBean { /** * @return the id of the Subscription */ + @Deprecated @MBeanInfo("ID of the Subscription.") long getSubcriptionId(); + /** + * @return the id of the Subscription + */ + @MBeanInfo("ID of the Subscription.") + long getSubscriptionId(); + /** * @return the destination name */ @@ -180,9 +187,17 @@ public interface SubscriptionViewMBean { * @return the name of the consumer which is only used for durable * consumers. */ + @Deprecated @MBeanInfo("The name of the subscription (durable subscriptions only).") String getSubcriptionName(); + /** + * @return the name of the consumer which is only used for durable + * consumers. + */ + @MBeanInfo("The name of the subscription (durable subscriptions only).") + String getSubscriptionName(); + /** * Returns true if this subscription (which may be using wildcards) matches the given queue name * diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java index 75f1c1e36c..cacbee305b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java @@ -16,6 +16,17 @@ */ package org.apache.activemq.broker.view; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import javax.management.ObjectName; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -28,18 +39,9 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.filter.DestinationMapNode; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import javax.management.ObjectName; /** - * + * */ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { @@ -47,26 +49,28 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { private final boolean redrawOnRemove; private boolean clearProducerCacheAfterRender; - private String domain = "org.apache.activemq"; + private final String domain = "org.apache.activemq"; private BrokerViewMBean brokerView; // until we have some MBeans for producers, lets do it all ourselves - private Map producers = new HashMap(); - private Map> producerDestinations = new HashMap>(); - private Object lock = new Object(); + private final Map producers = new HashMap(); + private final Map> producerDestinations = new HashMap>(); + private final Object lock = new Object(); public ConnectionDotFileInterceptor(Broker next, String file, boolean redrawOnRemove) throws IOException { super(next, file); this.redrawOnRemove = redrawOnRemove; - + } + @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { Subscription answer = super.addConsumer(context, info); generateFile(); return answer; } + @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { super.addProducer(context, info); ProducerId producerId = info.getProducerId(); @@ -76,6 +80,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { generateFile(); } + @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { super.removeConsumer(context, info); if (redrawOnRemove) { @@ -83,6 +88,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { } } + @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { super.removeProducer(context, info); ProducerId producerId = info.getProducerId(); @@ -95,6 +101,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { } } + @Override public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { super.send(producerExchange, messageSend); ProducerId producerId = messageSend.getProducerId(); @@ -109,6 +116,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { } } + @Override protected void generateFile(PrintWriter writer) throws Exception { writer.println("digraph \"ActiveMQ Connections\" {"); @@ -213,7 +221,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { String selector = subscriber.getSelector(); // lets write out the links - String subscriberId = safeClientId + "_" + subscriber.getSessionId() + "_" + subscriber.getSubcriptionId(); + String subscriberId = safeClientId + "_" + subscriber.getSessionId() + "_" + subscriber.getSubscriptionId(); writer.print(subscriberId); writer.print(" -> "); @@ -228,7 +236,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { // now lets write out the label writer.print(subscriberId); writer.print(" [label = \""); - String label = "Subscription: " + subscriber.getSessionId() + "-" + subscriber.getSubcriptionId(); + String label = "Subscription: " + subscriber.getSessionId() + "-" + subscriber.getSubscriptionId(); if (selector != null && selector.length() > 0) { label = label + "\\nSelector: " + selector; } @@ -322,7 +330,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { } return path; } - + BrokerViewMBean getBrokerView() throws Exception { if (this.brokerView == null) { ObjectName brokerName = getBrokerService().getBrokerObjectName(); diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java b/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java index c6bc8bd6ff..e41a377c90 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java @@ -16,19 +16,6 @@ */ package org.apache.activemq.console.command.store; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.*; -import org.apache.activemq.console.command.store.proto.MessagePB; -import org.apache.activemq.console.command.store.proto.QueueEntryPB; -import org.apache.activemq.console.command.store.proto.QueuePB; -import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.store.*; -import org.codehaus.jackson.map.ObjectMapper; -import org.fusesource.hawtbuf.AsciiBuffer; -import org.fusesource.hawtbuf.DataByteArrayOutputStream; -import org.fusesource.hawtbuf.UTF8Buffer; - import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; @@ -37,6 +24,30 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.console.command.store.proto.MessagePB; +import org.apache.activemq.console.command.store.proto.QueueEntryPB; +import org.apache.activemq.console.command.store.proto.QueuePB; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TransactionRecoveryListener; +import org.codehaus.jackson.map.ObjectMapper; +import org.fusesource.hawtbuf.AsciiBuffer; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.fusesource.hawtbuf.UTF8Buffer; + /** * @author Hiram Chirino */ @@ -48,7 +59,7 @@ public class StoreExporter { URI config; File file; - private ObjectMapper mapper = new ObjectMapper(); + private final ObjectMapper mapper = new ObjectMapper(); private final AsciiBuffer ds_kind = new AsciiBuffer("ds"); private final AsciiBuffer ptp_kind = new AsciiBuffer("ptp"); private final AsciiBuffer codec_id = new AsciiBuffer("openwire"); @@ -97,6 +108,7 @@ public class StoreExporter { final int[] preparedTxs = new int[]{0}; store.createTransactionStore().recover(new TransactionRecoveryListener() { + @Override public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) { preparedTxs[0] += 1; } @@ -127,18 +139,22 @@ public class StoreExporter { manager.store_queue(destRecord); queue.recover(new MessageRecoveryListener() { + @Override public boolean hasSpace() { return true; } + @Override public boolean recoverMessageReference(MessageId ref) throws Exception { return true; } + @Override public boolean isDuplicate(MessageId ref) { return false; } + @Override public boolean recoverMessage(Message message) throws IOException { messageKeyCounter[0]++; seqKeyCounter[0]++; @@ -166,7 +182,7 @@ public class StoreExporter { // TODO: use a real JSON encoder like jackson. HashMap jsonMap = new HashMap(); jsonMap.put("@class", "dsub_destination"); - jsonMap.put("name", sub.getClientId() + ":" + sub.getSubcriptionName()); + jsonMap.put("name", sub.getClientId() + ":" + sub.getSubscriptionName()); HashMap jsonTopic = new HashMap(); jsonTopic.put("name", dest.getTopicName()); jsonMap.put("topics", new Object[]{jsonTopic}); @@ -180,19 +196,23 @@ public class StoreExporter { manager.store_queue(destRecord); final long seqKeyCounter[] = new long[]{0}; - topic.recoverSubscription(sub.getClientId(), sub.getSubcriptionName(), new MessageRecoveryListener() { + topic.recoverSubscription(sub.getClientId(), sub.getSubscriptionName(), new MessageRecoveryListener() { + @Override public boolean hasSpace() { return true; } + @Override public boolean recoverMessageReference(MessageId ref) throws Exception { return true; } + @Override public boolean isDuplicate(MessageId ref) { return false; } + @Override public boolean recoverMessage(Message message) throws IOException { messageKeyCounter[0]++; seqKeyCounter[0]++; diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index adee8fb58d..e46737931e 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -195,7 +195,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { def size = (if(messageRecord!=null) messageRecord.data.length+20 else 0) + ((enqueues.size+dequeues.size)*50) + xaAcks.foldLeft(0L){ case (sum, entry) => sum + 100 } - + def addToPendingStore() = { var set = manager.pendingStores.get(id) if(set==null) { @@ -214,7 +214,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { } } } - + } def completeAsap() = this.synchronized { disableDelay=true } @@ -443,7 +443,7 @@ class DBManager(val parent:LevelDBStore) { var uowStoringCounter = 0L var uowStoredCounter = 0L - val uow_complete_latency = TimeMetric() + val uow_complete_latency = TimeMetric() // val closeSource = createSource(new ListEventAggregator[DelayableUOW](), dispatchQueue) // closeSource.setEventHandler(^{ @@ -454,7 +454,7 @@ class DBManager(val parent:LevelDBStore) { // closeSource.resume var pendingStores = new ConcurrentHashMap[MessageId, HashSet[DelayableUOW#MessageAction]]() - + var cancelable_enqueue_actions = new HashMap[QueueEntryKey, DelayableUOW#MessageAction]() val lastUowId = new AtomicInteger(1) @@ -479,7 +479,7 @@ class DBManager(val parent:LevelDBStore) { // The UoW may have been canceled. if( action.messageRecord!=null && action.enqueues.isEmpty ) { - action.removeFromPendingStore() + action.removeFromPendingStore() action.messageRecord = null uow.delayableActions -= 1 } @@ -701,7 +701,7 @@ class DBManager(val parent:LevelDBStore) { def collectionIsEmpty(key:Long) = { client.collectionIsEmpty(key) } - + def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, max:Long=Long.MaxValue) = { var lastmsgid:MessageId = null var count = 0L @@ -751,7 +751,7 @@ class DBManager(val parent:LevelDBStore) { val record = new SubscriptionRecord.Bean record.setTopicKey(topic_key) record.setClientId(info.getClientId) - record.setSubscriptionName(info.getSubcriptionName) + record.setSubscriptionName(info.getSubscriptionName) if( info.getSelector!=null ) { record.setSelector(info.getSelector) } @@ -821,7 +821,7 @@ class DBManager(val parent:LevelDBStore) { val sr = SubscriptionRecord.FACTORY.parseUnframed(record.getMeta) val info = new SubscriptionInfo info.setClientId(sr.getClientId) - info.setSubcriptionName(sr.getSubscriptionName) + info.setSubscriptionName(sr.getSubscriptionName) if( sr.hasSelector ) { info.setSelector(sr.getSelector) } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/XBeanConfigTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/XBeanConfigTest.java index e1e66abfbe..5ac1bb4080 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/XBeanConfigTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/XBeanConfigTest.java @@ -17,7 +17,9 @@ package org.apache.activemq.xbean; import java.net.URI; + import junit.framework.TestCase; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; @@ -36,7 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * */ public class XBeanConfigTest extends TestCase { @@ -71,8 +73,8 @@ public class XBeanConfigTest extends TestCase { subscriptionRecoveryPolicy = topic.getSubscriptionRecoveryPolicy(); assertTrue("subscriptionRecoveryPolicy should be TimedSubscriptionRecoveryPolicy: " + subscriptionRecoveryPolicy, subscriptionRecoveryPolicy instanceof TimedSubscriptionRecoveryPolicy); - TimedSubscriptionRecoveryPolicy timedSubcriptionPolicy = (TimedSubscriptionRecoveryPolicy)subscriptionRecoveryPolicy; - assertEquals("getRecoverDuration()", 60000, timedSubcriptionPolicy.getRecoverDuration()); + TimedSubscriptionRecoveryPolicy timedSubscriptionPolicy = (TimedSubscriptionRecoveryPolicy)subscriptionRecoveryPolicy; + assertEquals("getRecoverDuration()", 60000, timedSubscriptionPolicy.getRecoverDuration()); LOG.info("destination: " + topic); LOG.info("dispatchPolicy: " + dispatchPolicy);