Deprecate the misspelled subcription named getters and use the correctly
spelled versions where the older ones are in use.
This commit is contained in:
Timothy Bish 2013-11-11 11:40:07 -05:00
parent 8f078a3f4c
commit 38ab4b10a4
7 changed files with 121 additions and 47 deletions

View File

@ -51,21 +51,23 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
/**
* @return the id of the Subscription
*/
public long getSubcriptionId() {
@Override
public long getSubscriptionId() {
return -1;
}
/**
* @return the destination name
*/
@Override
public String getDestinationName() {
return subscriptionInfo.getDestination().getPhysicalName();
}
/**
* @return true if the destination is a Queue
*/
@Override
public boolean isDestinationQueue() {
return false;
}
@ -73,6 +75,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
/**
* @return true of the destination is a Topic
*/
@Override
public boolean isDestinationTopic() {
return true;
}
@ -80,6 +83,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
/**
* @return true if the destination is temporary
*/
@Override
public boolean isDestinationTemporary() {
return false;
}
@ -87,6 +91,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
/**
* @return name of the durable consumer
*/
@Override
public String getSubscriptionName() {
return subscriptionInfo.getSubscriptionName();
}
@ -94,6 +99,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
/**
* @return true if the subscriber is active
*/
@Override
public boolean isActive() {
return false;
}
@ -110,6 +116,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
* @return messages
* @throws OpenDataException
*/
@Override
public CompositeData[] browse() throws OpenDataException {
return broker.browse(this);
}
@ -120,6 +127,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
* @return messages
* @throws OpenDataException
*/
@Override
public TabularData browseAsTable() throws OpenDataException {
return broker.browseAsTable(this);
}
@ -128,6 +136,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
* Destroys the durable subscription so that messages will no longer be
* stored for this subscription
*/
@Override
public void destroy() throws Exception {
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(clientId);
@ -138,6 +147,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
brokerService.getBroker().removeSubscription(context, info);
}
@Override
public String toString() {
return "InactiveDurableSubscriptionView: " + getClientId() + ":" + getSubscriptionName();
}

View File

@ -123,8 +123,17 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return the id of the Subscription
*/
@Deprecated
@Override
public long getSubcriptionId() {
return getSubscriptionId();
}
/**
* @return the id of the Subscription
*/
@Override
public long getSubscriptionId() {
ConsumerInfo info = getConsumerInfo();
if (info != null) {
return info.getConsumerId().getValue();
@ -289,8 +298,18 @@ public class SubscriptionView implements SubscriptionViewMBean {
* @return the name of the consumer which is only used for durable
* consumers.
*/
@Deprecated
@Override
public String getSubcriptionName() {
return getSubscriptionName();
}
/**
* @return the name of the consumer which is only used for durable
* consumers.
*/
@Override
public String getSubscriptionName() {
ConsumerInfo info = getConsumerInfo();
return info != null ? info.getSubscriptionName() : null;
}

View File

@ -45,9 +45,16 @@ public interface SubscriptionViewMBean {
/**
* @return the id of the Subscription
*/
@Deprecated
@MBeanInfo("ID of the Subscription.")
long getSubcriptionId();
/**
* @return the id of the Subscription
*/
@MBeanInfo("ID of the Subscription.")
long getSubscriptionId();
/**
* @return the destination name
*/
@ -180,9 +187,17 @@ public interface SubscriptionViewMBean {
* @return the name of the consumer which is only used for durable
* consumers.
*/
@Deprecated
@MBeanInfo("The name of the subscription (durable subscriptions only).")
String getSubcriptionName();
/**
* @return the name of the consumer which is only used for durable
* consumers.
*/
@MBeanInfo("The name of the subscription (durable subscriptions only).")
String getSubscriptionName();
/**
* Returns true if this subscription (which may be using wildcards) matches the given queue name
*

View File

@ -16,6 +16,17 @@
*/
package org.apache.activemq.broker.view;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -28,18 +39,9 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.DestinationMapNode;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.management.ObjectName;
/**
*
*
*/
public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
@ -47,26 +49,28 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
private final boolean redrawOnRemove;
private boolean clearProducerCacheAfterRender;
private String domain = "org.apache.activemq";
private final String domain = "org.apache.activemq";
private BrokerViewMBean brokerView;
// until we have some MBeans for producers, lets do it all ourselves
private Map<ProducerId, ProducerInfo> producers = new HashMap<ProducerId, ProducerInfo>();
private Map<ProducerId, Set<ActiveMQDestination>> producerDestinations = new HashMap<ProducerId, Set<ActiveMQDestination>>();
private Object lock = new Object();
private final Map<ProducerId, ProducerInfo> producers = new HashMap<ProducerId, ProducerInfo>();
private final Map<ProducerId, Set<ActiveMQDestination>> producerDestinations = new HashMap<ProducerId, Set<ActiveMQDestination>>();
private final Object lock = new Object();
public ConnectionDotFileInterceptor(Broker next, String file, boolean redrawOnRemove) throws IOException {
super(next, file);
this.redrawOnRemove = redrawOnRemove;
}
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription answer = super.addConsumer(context, info);
generateFile();
return answer;
}
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.addProducer(context, info);
ProducerId producerId = info.getProducerId();
@ -76,6 +80,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
generateFile();
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
super.removeConsumer(context, info);
if (redrawOnRemove) {
@ -83,6 +88,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
}
}
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.removeProducer(context, info);
ProducerId producerId = info.getProducerId();
@ -95,6 +101,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
}
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
super.send(producerExchange, messageSend);
ProducerId producerId = messageSend.getProducerId();
@ -109,6 +116,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
}
}
@Override
protected void generateFile(PrintWriter writer) throws Exception {
writer.println("digraph \"ActiveMQ Connections\" {");
@ -213,7 +221,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
String selector = subscriber.getSelector();
// lets write out the links
String subscriberId = safeClientId + "_" + subscriber.getSessionId() + "_" + subscriber.getSubcriptionId();
String subscriberId = safeClientId + "_" + subscriber.getSessionId() + "_" + subscriber.getSubscriptionId();
writer.print(subscriberId);
writer.print(" -> ");
@ -228,7 +236,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
// now lets write out the label
writer.print(subscriberId);
writer.print(" [label = \"");
String label = "Subscription: " + subscriber.getSessionId() + "-" + subscriber.getSubcriptionId();
String label = "Subscription: " + subscriber.getSessionId() + "-" + subscriber.getSubscriptionId();
if (selector != null && selector.length() > 0) {
label = label + "\\nSelector: " + selector;
}
@ -322,7 +330,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
}
return path;
}
BrokerViewMBean getBrokerView() throws Exception {
if (this.brokerView == null) {
ObjectName brokerName = getBrokerService().getBrokerObjectName();

View File

@ -16,19 +16,6 @@
*/
package org.apache.activemq.console.command.store;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.*;
import org.apache.activemq.console.command.store.proto.MessagePB;
import org.apache.activemq.console.command.store.proto.QueueEntryPB;
import org.apache.activemq.console.command.store.proto.QueuePB;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.*;
import org.codehaus.jackson.map.ObjectMapper;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.DataByteArrayOutputStream;
import org.fusesource.hawtbuf.UTF8Buffer;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
@ -37,6 +24,30 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.console.command.store.proto.MessagePB;
import org.apache.activemq.console.command.store.proto.QueueEntryPB;
import org.apache.activemq.console.command.store.proto.QueuePB;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.codehaus.jackson.map.ObjectMapper;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.DataByteArrayOutputStream;
import org.fusesource.hawtbuf.UTF8Buffer;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
@ -48,7 +59,7 @@ public class StoreExporter {
URI config;
File file;
private ObjectMapper mapper = new ObjectMapper();
private final ObjectMapper mapper = new ObjectMapper();
private final AsciiBuffer ds_kind = new AsciiBuffer("ds");
private final AsciiBuffer ptp_kind = new AsciiBuffer("ptp");
private final AsciiBuffer codec_id = new AsciiBuffer("openwire");
@ -97,6 +108,7 @@ public class StoreExporter {
final int[] preparedTxs = new int[]{0};
store.createTransactionStore().recover(new TransactionRecoveryListener() {
@Override
public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
preparedTxs[0] += 1;
}
@ -127,18 +139,22 @@ public class StoreExporter {
manager.store_queue(destRecord);
queue.recover(new MessageRecoveryListener() {
@Override
public boolean hasSpace() {
return true;
}
@Override
public boolean recoverMessageReference(MessageId ref) throws Exception {
return true;
}
@Override
public boolean isDuplicate(MessageId ref) {
return false;
}
@Override
public boolean recoverMessage(Message message) throws IOException {
messageKeyCounter[0]++;
seqKeyCounter[0]++;
@ -166,7 +182,7 @@ public class StoreExporter {
// TODO: use a real JSON encoder like jackson.
HashMap<String, Object> jsonMap = new HashMap<String, Object>();
jsonMap.put("@class", "dsub_destination");
jsonMap.put("name", sub.getClientId() + ":" + sub.getSubcriptionName());
jsonMap.put("name", sub.getClientId() + ":" + sub.getSubscriptionName());
HashMap<String, Object> jsonTopic = new HashMap<String, Object>();
jsonTopic.put("name", dest.getTopicName());
jsonMap.put("topics", new Object[]{jsonTopic});
@ -180,19 +196,23 @@ public class StoreExporter {
manager.store_queue(destRecord);
final long seqKeyCounter[] = new long[]{0};
topic.recoverSubscription(sub.getClientId(), sub.getSubcriptionName(), new MessageRecoveryListener() {
topic.recoverSubscription(sub.getClientId(), sub.getSubscriptionName(), new MessageRecoveryListener() {
@Override
public boolean hasSpace() {
return true;
}
@Override
public boolean recoverMessageReference(MessageId ref) throws Exception {
return true;
}
@Override
public boolean isDuplicate(MessageId ref) {
return false;
}
@Override
public boolean recoverMessage(Message message) throws IOException {
messageKeyCounter[0]++;
seqKeyCounter[0]++;

View File

@ -195,7 +195,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
def size = (if(messageRecord!=null) messageRecord.data.length+20 else 0) + ((enqueues.size+dequeues.size)*50) + xaAcks.foldLeft(0L){ case (sum, entry) =>
sum + 100
}
def addToPendingStore() = {
var set = manager.pendingStores.get(id)
if(set==null) {
@ -214,7 +214,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
}
}
}
}
def completeAsap() = this.synchronized { disableDelay=true }
@ -443,7 +443,7 @@ class DBManager(val parent:LevelDBStore) {
var uowStoringCounter = 0L
var uowStoredCounter = 0L
val uow_complete_latency = TimeMetric()
val uow_complete_latency = TimeMetric()
// val closeSource = createSource(new ListEventAggregator[DelayableUOW](), dispatchQueue)
// closeSource.setEventHandler(^{
@ -454,7 +454,7 @@ class DBManager(val parent:LevelDBStore) {
// closeSource.resume
var pendingStores = new ConcurrentHashMap[MessageId, HashSet[DelayableUOW#MessageAction]]()
var cancelable_enqueue_actions = new HashMap[QueueEntryKey, DelayableUOW#MessageAction]()
val lastUowId = new AtomicInteger(1)
@ -479,7 +479,7 @@ class DBManager(val parent:LevelDBStore) {
// The UoW may have been canceled.
if( action.messageRecord!=null && action.enqueues.isEmpty ) {
action.removeFromPendingStore()
action.removeFromPendingStore()
action.messageRecord = null
uow.delayableActions -= 1
}
@ -701,7 +701,7 @@ class DBManager(val parent:LevelDBStore) {
def collectionIsEmpty(key:Long) = {
client.collectionIsEmpty(key)
}
def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, max:Long=Long.MaxValue) = {
var lastmsgid:MessageId = null
var count = 0L
@ -751,7 +751,7 @@ class DBManager(val parent:LevelDBStore) {
val record = new SubscriptionRecord.Bean
record.setTopicKey(topic_key)
record.setClientId(info.getClientId)
record.setSubscriptionName(info.getSubcriptionName)
record.setSubscriptionName(info.getSubscriptionName)
if( info.getSelector!=null ) {
record.setSelector(info.getSelector)
}
@ -821,7 +821,7 @@ class DBManager(val parent:LevelDBStore) {
val sr = SubscriptionRecord.FACTORY.parseUnframed(record.getMeta)
val info = new SubscriptionInfo
info.setClientId(sr.getClientId)
info.setSubcriptionName(sr.getSubscriptionName)
info.setSubscriptionName(sr.getSubscriptionName)
if( sr.hasSelector ) {
info.setSelector(sr.getSelector)
}

View File

@ -17,7 +17,9 @@
package org.apache.activemq.xbean;
import java.net.URI;
import junit.framework.TestCase;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
@ -36,7 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*
*/
public class XBeanConfigTest extends TestCase {
@ -71,8 +73,8 @@ public class XBeanConfigTest extends TestCase {
subscriptionRecoveryPolicy = topic.getSubscriptionRecoveryPolicy();
assertTrue("subscriptionRecoveryPolicy should be TimedSubscriptionRecoveryPolicy: " + subscriptionRecoveryPolicy,
subscriptionRecoveryPolicy instanceof TimedSubscriptionRecoveryPolicy);
TimedSubscriptionRecoveryPolicy timedSubcriptionPolicy = (TimedSubscriptionRecoveryPolicy)subscriptionRecoveryPolicy;
assertEquals("getRecoverDuration()", 60000, timedSubcriptionPolicy.getRecoverDuration());
TimedSubscriptionRecoveryPolicy timedSubscriptionPolicy = (TimedSubscriptionRecoveryPolicy)subscriptionRecoveryPolicy;
assertEquals("getRecoverDuration()", 60000, timedSubscriptionPolicy.getRecoverDuration());
LOG.info("destination: " + topic);
LOG.info("dispatchPolicy: " + dispatchPolicy);