diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 68d54140ee..7ffaca0969 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -591,7 +591,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge())); props.setProperty("statsEnabled",Boolean.toString(isStatsEnabled())); props.setProperty("alwaysSyncSend",Boolean.toString(isAlwaysSyncSend())); - props.setProperty("producerWindowSize", Integer.toString(producerWindowSize)); + props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize())); } public boolean isUseCompression() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index ca2e0b0fd8..44c049dfd4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -136,7 +136,6 @@ public class BrokerService implements Service { private transient Thread shutdownHook; private String[] transportConnectorURIs; private String[] networkConnectorURIs; - private String[] proxyConnectorURIs; private JmsConnector[] jmsBridgeConnectors; //these are Jms to Jms bridges to other jms messaging systems private boolean deleteAllMessagesOnStartup; private boolean advisorySupport = true; @@ -1126,13 +1125,7 @@ public class BrokerService implements Service { addNetworkConnector(uri); } } - if (proxyConnectorURIs != null) { - for (int i = 0; i < proxyConnectorURIs.length; i++) { - String uri = proxyConnectorURIs[i]; - addProxyConnector(uri); - } - } - + if (jmsBridgeConnectors != null){ for (int i = 0; i < jmsBridgeConnectors.length; i++){ addJmsConnector(jmsBridgeConnectors[i]); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 0899f1284e..959e0a648a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -349,7 +349,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit return null; } - public Response processWireFormat(WireFormatInfo info) throws Exception{ + public synchronized Response processWireFormat(WireFormatInfo info) throws Exception{ wireFormatInfo=info; protocolVersion.set(info.getVersion()); return null; @@ -370,6 +370,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit if(cs!=null){ context=cs.getContext(); } + if (cs == null) { + throw new NullPointerException("Context is null"); + } // Avoid replaying dup commands if(cs.getTransactionState(info.getTransactionId())==null){ cs.addTransactionState(info.getTransactionId()); @@ -391,6 +394,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit if(cs!=null){ context=cs.getContext(); } + if (cs == null) { + throw new NullPointerException("Context is null"); + } TransactionState transactionState=cs.getTransactionState(info.getTransactionId()); if(transactionState==null) throw new IllegalStateException("Cannot prepare a transaction that had not been started: " @@ -414,6 +420,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit if(cs!=null){ context=cs.getContext(); } + if (cs == null) { + throw new NullPointerException("Context is null"); + } cs.removeTransactionState(info.getTransactionId()); broker.commitTransaction(context,info.getTransactionId(),true); return null; @@ -425,6 +434,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit if(cs!=null){ context=cs.getContext(); } + if (cs == null) { + throw new NullPointerException("Context is null"); + } cs.removeTransactionState(info.getTransactionId()); broker.commitTransaction(context,info.getTransactionId(),false); return null; @@ -436,6 +448,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit if(cs!=null){ context=cs.getContext(); } + if (cs == null) { + throw new NullPointerException("Context is null"); + } cs.removeTransactionState(info.getTransactionId()); broker.rollbackTransaction(context,info.getTransactionId()); return null; @@ -869,17 +884,20 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit log.debug("Stopping connection: "+transport.getRemoteAddress()); connector.onStopped(this); try{ - if(masterBroker!=null){ - masterBroker.stop(); - } - if (duplexBridge != null) { - duplexBridge.stop(); - } - // If the transport has not failed yet, - // notify the peer that we are doing a normal shutdown. - if(transportException==null){ - transport.oneway(new ShutdownInfo()); + synchronized(this){ + if(masterBroker!=null){ + masterBroker.stop(); + } + if(duplexBridge!=null){ + duplexBridge.stop(); + } + // If the transport has not failed yet, + // notify the peer that we are doing a normal shutdown. + if(transportException==null){ + transport.oneway(new ShutdownInfo()); + } } + }catch(Exception ignore){ log.trace("Exception caught stopping",ignore); } @@ -1069,7 +1087,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit this.pendingStop=pendingStop; } - public Response processBrokerInfo(BrokerInfo info){ + public synchronized Response processBrokerInfo(BrokerInfo info){ if(info.isSlaveBroker()){ // stream messages from this broker (the master) to // the slave @@ -1172,8 +1190,10 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit synchronized(consumerExchanges){ result=new ConsumerBrokerExchange(); ConnectionState state=lookupConnectionState(id); - context=state.getContext(); - result.setConnectionContext(context); + synchronized(this){ + context=state.getContext(); + result.setConnectionContext(context); + } SessionState ss=state.getSessionState(id.getParentId()); if(ss!=null){ ConsumerState cs=ss.getConsumerState(id); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 644905cb38..0e43561cfa 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -54,7 +54,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us return active; } - protected boolean isFull(){ + protected synchronized boolean isFull(){ return !active||super.isFull(); } @@ -113,25 +113,23 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us topic.deactivate(context,this); } } - synchronized(dispatched){ - for(Iterator iter=dispatched.iterator();iter.hasNext();){ - // Mark the dispatched messages as redelivered for next time. - MessageReference node=(MessageReference)iter.next(); - Integer count=(Integer)redeliveredMessages.get(node.getMessageId()); - if(count!=null){ - redeliveredMessages.put(node.getMessageId(),Integer.valueOf(count.intValue()+1)); - }else{ - redeliveredMessages.put(node.getMessageId(),Integer.valueOf(1)); - } - if(keepDurableSubsActive){ - synchronized(pending){ - pending.addMessageFirst(node); - } - }else{ - node.decrementReferenceCount(); - } - iter.remove(); + for(Iterator iter=dispatched.iterator();iter.hasNext();){ + // Mark the dispatched messages as redelivered for next time. + MessageReference node=(MessageReference)iter.next(); + Integer count=(Integer)redeliveredMessages.get(node.getMessageId()); + if(count!=null){ + redeliveredMessages.put(node.getMessageId(),Integer.valueOf(count.intValue()+1)); + }else{ + redeliveredMessages.put(node.getMessageId(),Integer.valueOf(1)); } + if(keepDurableSubsActive){ + synchronized(pending){ + pending.addMessageFirst(node); + } + }else{ + node.decrementReferenceCount(); + } + iter.remove(); } if(!keepDurableSubsActive){ synchronized(pending){ @@ -167,11 +165,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us super.add(node); } - protected void doAddRecoveredMessage(MessageReference message) throws Exception{ + protected synchronized void doAddRecoveredMessage(MessageReference message) throws Exception{ pending.addRecoveredMessage(message); } - public int getPendingQueueSize(){ + public synchronized int getPendingQueueSize(){ if(active||keepDurableSubsActive){ return super.getPendingQueueSize(); } @@ -184,7 +182,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us "You cannot dynamically change the selector for durable topic subscriptions"); } - protected boolean canDispatch(MessageReference node){ + protected synchronized boolean canDispatch(MessageReference node){ return active; } @@ -198,7 +196,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us return subscriptionKey.getSubscriptionName(); } - public String toString(){ + public synchronized String toString(){ return "DurableTopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size() +", total="+enqueueCounter+", pending="+getPendingQueueSize()+", dispatched="+dispatchCounter +", inflight="+dispatched.size()+", prefetchExtension="+this.prefetchExtension; @@ -215,7 +213,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us /** * Release any references that we are holding. */ - public void destroy(){ + public synchronized void destroy(){ try{ synchronized(pending){ pending.reset(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index 8c06130cc9..3246235741 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -76,7 +76,7 @@ public class IndirectMessageReference implements QueueMessageReference { this.referenceCount=1; message.incrementReferenceCount(); - this.cachedSize = message != null ? message.getSize() : 0; + this.cachedSize = message.getSize(); } synchronized public Message getMessageHardRef() { @@ -212,7 +212,7 @@ public class IndirectMessageReference implements QueueMessageReference { return false; } - public int getSize(){ + public synchronized int getSize(){ Message msg = message; if (msg != null){ return msg.getSize(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 14ff553361..67057ce88a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -104,7 +104,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ * Occurs when a pull times out. If nothing has been dispatched since the timeout was setup, then send the NULL * message. */ - private synchronized void pullTimeout(long dispatchCounterBeforePull){ + final synchronized void pullTimeout(long dispatchCounterBeforePull){ if(dispatchCounterBeforePull==dispatchCounter){ try{ add(QueueMessageReference.NULL_MESSAGE); @@ -300,14 +300,14 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ /** * @return true when 60% or more room is left for dispatching messages */ - public boolean isLowWaterMark(){ + public synchronized boolean isLowWaterMark(){ return (dispatched.size()-prefetchExtension)<=(info.getPrefetchSize()*.4); } /** * @return true when 10% or less room is left for dispatching messages */ - public boolean isHighWaterMark(){ + public synchronized boolean isHighWaterMark(){ return (dispatched.size()-prefetchExtension)>=(info.getPrefetchSize()*.9); } @@ -315,16 +315,12 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ return info.getPrefetchSize()+prefetchExtension-dispatched.size(); } - public int getPendingQueueSize(){ - synchronized(pending){ - return pending.size(); - } + public synchronized int getPendingQueueSize(){ + return pending.size(); } - public int getDispatchedQueueSize(){ - synchronized(dispatched){ - return dispatched.size(); - } + public synchronized int getDispatchedQueueSize(){ + return dispatched.size(); } synchronized public long getDequeueCounter(){ @@ -344,11 +340,11 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } - public PendingMessageCursor getPending(){ + public synchronized PendingMessageCursor getPending(){ return this.pending; } - public void setPending(PendingMessageCursor pending){ + public synchronized void setPending(PendingMessageCursor pending){ this.pending=pending; } @@ -413,7 +409,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } } - protected boolean dispatch(final MessageReference node) throws IOException{ + protected synchronized boolean dispatch(final MessageReference node) throws IOException{ final Message message=node.getMessage(); if(message==null){ return false; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 1a95e5bb7f..453a79c5d7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -47,7 +47,6 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerAck; -import org.apache.activemq.command.Response; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.kaha.Store; @@ -421,7 +420,7 @@ public class Queue implements Destination, Task { doMessageSend(producerExchange, message); } - private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { + void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { final ConnectionContext context = producerExchange.getConnectionContext(); message.setRegionDestination(this); if(store!=null&&message.isPersistent()){ @@ -979,7 +978,7 @@ public class Queue implements Destination, Task { } - private void sendMessage(final ConnectionContext context,Message msg) throws Exception{ + final void sendMessage(final ConnectionContext context,Message msg) throws Exception{ synchronized(messages){ messages.addMessageLast(msg); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java index 85b8942aad..e476c887d1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java @@ -39,7 +39,7 @@ public class QueueBrowserSubscription extends QueueSubscription { return !((QueueMessageReference)node).isAcked(); } - public String toString() { + public synchronized String toString() { return "QueueBrowserSubscription:" + " consumer="+info.getConsumerId()+ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 051ef19cb6..350dd2f2e6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -139,7 +139,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner } } - public String toString() { + public synchronized String toString() { return "QueueSubscription:" + " consumer="+info.getConsumerId()+ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 4bb55983aa..881bcba932 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -341,7 +341,7 @@ public class Topic implements Destination { doMessageSend(producerExchange, message); } - private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { + void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { final ConnectionContext context = producerExchange.getConnectionContext(); message.setRegionDestination(this); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index eb9cd0e707..f5e23ca1fb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -166,7 +166,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ nonPersistent.addMessageLast(node); } - public void clear(){ + public synchronized void clear(){ pendingCount=0; nonPersistent.clear(); for(PendingMessageCursor tsp: storePrefetches){ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index bdf71b9cbf..800bfb2437 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -90,7 +90,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ } } - public void addMessageFirst(MessageReference node) throws Exception{ + public synchronized void addMessageFirst(MessageReference node) throws Exception{ if(node!=null){ Message msg=node.getMessage(); if(started){ @@ -105,7 +105,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ } } - public void clear(){ + public synchronized void clear(){ pendingCount=0; } @@ -150,7 +150,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ persistent.reset(); } - public int size(){ + public synchronized int size(){ return pendingCount; } @@ -165,25 +165,25 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ * @see org.apache.activemq.region.cursors.PendingMessageCursor * @return true if recovery required */ - public boolean isRecoveryRequired(){ + public synchronized boolean isRecoveryRequired(){ return false; } /** * @return the nonPersistent Cursor */ - public PendingMessageCursor getNonPersistent(){ + public synchronized PendingMessageCursor getNonPersistent(){ return this.nonPersistent; } /** * @param nonPersistent cursor to set */ - public void setNonPersistent(PendingMessageCursor nonPersistent){ + public synchronized void setNonPersistent(PendingMessageCursor nonPersistent){ this.nonPersistent=nonPersistent; } - public void setMaxBatchSize(int maxBatchSize){ + public synchronized void setMaxBatchSize(int maxBatchSize){ persistent.setMaxBatchSize(maxBatchSize); if(nonPersistent!=null){ nonPersistent.setMaxBatchSize(maxBatchSize); @@ -191,7 +191,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ super.setMaxBatchSize(maxBatchSize); } - public void gc() { + public synchronized void gc() { if (persistent != null) { persistent.gc(); } @@ -200,7 +200,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ } } - public void setUsageManager(UsageManager usageManager){ + public synchronized void setUsageManager(UsageManager usageManager){ super.setUsageManager(usageManager); if (persistent != null) { persistent.setUsageManager(usageManager); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index 5ca5c232d3..9f58308e1d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -80,7 +80,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message /** * @return true if there are no pendingCount messages */ - public boolean isEmpty(){ + public synchronized boolean isEmpty(){ return pendingCount <= 0; } @@ -99,7 +99,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message } } - public void addMessageFirst(MessageReference node) throws Exception{ + public synchronized void addMessageFirst(MessageReference node) throws Exception{ if(node!=null){ if(started){ firstMessageId=node.getMessageId(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java index e8594e37bf..5e1f4c620f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java @@ -92,7 +92,7 @@ public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecover this.maximumSize=maximumSize; } - public Message[] browse(ActiveMQDestination destination) throws Exception{ + public synchronized Message[] browse(ActiveMQDestination destination) throws Exception{ List result=new ArrayList(); DestinationFilter filter=DestinationFilter.parseFilter(destination); int t=tail; diff --git a/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java index bbe3acef29..7202c06dc6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java @@ -90,7 +90,7 @@ public class SubscriptionInfo implements DataStructure { return IntrospectionSupport.toString(this); } - public int hasCode() { + public int hashCode() { int h1 = clientId != null ? clientId.hashCode():-1; int h2 = subcriptionName != null ? subcriptionName.hashCode():-1; return h1 ^ h2; diff --git a/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java b/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java index 58b1f373f3..33b082bbf5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java @@ -389,6 +389,7 @@ public class ReadOnlyContext implements Context, Serializable { } private class ListEnumeration extends LocalNamingEnumeration { + ListEnumeration(){} public Object next() throws NamingException { return nextElement(); } @@ -400,6 +401,8 @@ public class ReadOnlyContext implements Context, Serializable { } private class ListBindingEnumeration extends LocalNamingEnumeration { + ListBindingEnumeration(){ + } public Object next() throws NamingException { return nextElement(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java index 0afd244950..4d0341b1d1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java @@ -61,8 +61,8 @@ public final class AsyncDataManager { public static final int ITEM_HEAD_FOOT_SPACE=ITEM_HEAD_SPACE+ITEM_FOOT_SPACE; - public static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; // - public static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; // + static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; // + static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; // public static final byte DATA_ITEM_TYPE=1; public static final byte REDO_ITEM_TYPE=2; @@ -71,8 +71,8 @@ public final class AsyncDataManager { public static final String DEFAULT_FILE_PREFIX="data-"; public static final int DEFAULT_MAX_FILE_LENGTH=1024*1024*32; - private File directory = new File(DEFAULT_DIRECTORY); - private String filePrefix=DEFAULT_FILE_PREFIX; + File directory = new File(DEFAULT_DIRECTORY); + String filePrefix=DEFAULT_FILE_PREFIX; private int maxFileLength = DEFAULT_MAX_FILE_LENGTH; private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH-1024*512; @@ -101,8 +101,10 @@ public final class AsyncDataManager { started=true; directory.mkdirs(); - controlFile = new ControlFile(new File(directory, filePrefix+"control"), CONTROL_RECORD_MAX_LENGTH); - controlFile.lock(); + synchronized(this){ + controlFile=new ControlFile(new File(directory,filePrefix+"control"),CONTROL_RECORD_MAX_LENGTH); + controlFile.lock(); + } ByteSequence sequence = controlFile.load(); if( sequence != null && sequence.getLength()>0 ) { @@ -116,7 +118,7 @@ public final class AsyncDataManager { File[] files=directory.listFiles(new FilenameFilter(){ public boolean accept(File dir,String n){ - return dir.equals(dir)&&n.startsWith(filePrefix); + return dir.equals(directory)&&n.startsWith(filePrefix); } }); @@ -252,8 +254,8 @@ public final class AsyncDataManager { } DataFile getDataFile(Location item) throws IOException{ - Integer key=new Integer(item.getDataFileId()); - DataFile dataFile=(DataFile) fileMap.get(key); + Integer key= Integer.valueOf(item.getDataFileId()); + DataFile dataFile=fileMap.get(key); if(dataFile==null){ log.error("Looking for key " + key + " but not found in fileMap: " + fileMap); throw new IOException("Could not locate data file "+filePrefix+"-"+item.getDataFileId()); @@ -265,14 +267,12 @@ public final class AsyncDataManager { return (DataFile) dataFile.getNext(); } - public void close() throws IOException{ - synchronized(this){ - if(!started){ - return; - } - Scheduler.cancel(cleanupTask); - accessorPool.close(); + public synchronized void close() throws IOException{ + if(!started){ + return; } + Scheduler.cancel(cleanupTask); + accessorPool.close(); storeState(false); appender.close(); fileMap.clear(); @@ -281,7 +281,7 @@ public final class AsyncDataManager { started=false; } - private synchronized void cleanup() { + synchronized void cleanup() { if( accessorPool!=null ) { accessorPool.disposeUnused(); } @@ -375,7 +375,7 @@ public final class AsyncDataManager { } } - private void removeDataFile(DataFile dataFile) throws IOException{ + private synchronized void removeDataFile(DataFile dataFile) throws IOException{ // Make sure we don't delete too much data. if( dataFile==currentWriteFile || mark==null || dataFile.getDataFileId() >= mark.getDataFileId() ) { @@ -414,7 +414,7 @@ public final class AsyncDataManager { return mark; } - public Location getNextLocation(Location location) throws IOException, IllegalStateException { + public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException { Location cur = null; @@ -492,17 +492,17 @@ public final class AsyncDataManager { storeState(sync); } - private void storeState(boolean sync) throws IOException { - ByteSequence state = marshallState(); - appender.storeItem(state, Location.MARK_TYPE, sync); - controlFile.store(state, sync); - } + private synchronized void storeState(boolean sync) throws IOException{ + ByteSequence state=marshallState(); + appender.storeItem(state,Location.MARK_TYPE,sync); + controlFile.store(state,sync); + } - public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException { + public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException { return appender.storeItem(data, Location.USER_TYPE, sync); } - public Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException { + public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException { return appender.storeItem(data, type, sync); } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java index bceaa9dfc4..f355c704e4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java @@ -27,7 +27,7 @@ import org.apache.activemq.util.LinkedNode; * * @version $Revision: 1.1.1.1 $ */ -class DataFile extends LinkedNode implements Comparable { +class DataFile extends LinkedNode implements Comparable { private final File file; private final Integer dataFileId; @@ -39,7 +39,7 @@ class DataFile extends LinkedNode implements Comparable { DataFile(File file, int number, int preferedSize){ this.file=file; this.preferedSize = preferedSize; - this.dataFileId=new Integer(number); + this.dataFileId=Integer.valueOf(number); length=(int)(file.exists()?file.length():0); } @@ -98,10 +98,17 @@ class DataFile extends LinkedNode implements Comparable { return file.delete(); } - public int compareTo(Object o) { - DataFile df = (DataFile) o; + public int compareTo(DataFile df) { return dataFileId - df.dataFileId; } + + public boolean equals(Object o) { + boolean result = false; + if (o instanceof DataFile) { + result = compareTo((DataFile)o)==0; + } + return result; + } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java index 99c851f80e..3c77075d36 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java @@ -55,10 +55,13 @@ class DataFileAppender { return hash; } - public boolean equals(Object obj) { - WriteKey di = (WriteKey)obj; - return di.file == file && di.offset == offset; - } + public boolean equals(Object obj){ + if(obj instanceof WriteKey){ + WriteKey di=(WriteKey)obj; + return di.file==file&&di.offset==offset; + } + return false; + } } public class WriteBatch { diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java index a9d8eb8fbf..0901dc7efb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java @@ -131,5 +131,18 @@ public final class Location implements Comparable { } return dataFileId - l.dataFileId; } + + public boolean equals(Object o) { + boolean result = false; + if (o instanceof Location) { + result = compareTo((Location)o)==0; + } + return result; + } + + public int hashCode() { + return dataFileId ^ offset; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java index a2e0765017..c3800f4682 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java @@ -188,24 +188,26 @@ public abstract class BaseContainerImpl{ } } - protected final void delete(IndexItem key,IndexItem prev,IndexItem next){ - try{ - dataManager.removeInterestInFile(key.getKeyFile()); - dataManager.removeInterestInFile(key.getValueFile()); - prev=prev==null?root:prev; - next=next!=root?next:null; - if(next!=null){ - prev.setNextItem(next.getOffset()); - next.setPreviousItem(prev.getOffset()); - updateIndexes(next); - }else{ - prev.setNextItem(Item.POSITION_NOT_SET); + protected final void delete(final IndexItem keyItem,final IndexItem prevItem,final IndexItem nextItem){ + if(keyItem!=null){ + try{ + IndexItem prev=prevItem==null?root:prevItem; + IndexItem next=nextItem!=root?nextItem:null; + dataManager.removeInterestInFile(keyItem.getKeyFile()); + dataManager.removeInterestInFile(keyItem.getValueFile()); + if(next!=null){ + prev.setNextItem(next.getOffset()); + next.setPreviousItem(prev.getOffset()); + updateIndexes(next); + }else{ + prev.setNextItem(Item.POSITION_NOT_SET); + } + updateIndexes(prev); + indexManager.freeIndex(keyItem); + }catch(IOException e){ + log.error("Failed to delete "+keyItem,e); + throw new RuntimeStoreException(e); } - updateIndexes(prev); - indexManager.freeIndex(key); - }catch(IOException e){ - log.error("Failed to delete "+key,e); - throw new RuntimeStoreException(e); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java index 19f74483c7..f1ba401a3c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java @@ -115,13 +115,14 @@ public class ContainerKeySet extends ContainerCollectionSupport implements Set{ } public String toString() { - String result ="ContainerKeySet["; + StringBuffer result =new StringBuffer(32); + result.append("ContainerKeySet["); IndexItem item = container.getInternalList().getRoot(); while ((item = container.getInternalList().getNextEntry(item)) != null) { - result += container.getKey(item); - result += ","; + result.append(container.getKey(item)); + result.append(","); } - result +="]"; - return result; + result.append("]"); + return result.toString(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java index 644d2b2956..465ebf24f5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java @@ -121,6 +121,10 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine } return result; } + + public int hashCode() { + return super.hashCode(); + } /* * (non-Javadoc) @@ -158,13 +162,14 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine public synchronized Object removeFirst(){ load(); Object result=null; - IndexItem item=(IndexItem)indexList.getFirst(); + IndexItem item=indexList.getFirst(); if(item!=null){ itemRemoved(0); result=getValue(item); IndexItem prev=root; IndexItem next=indexList.size()>1?(IndexItem)indexList.get(1):null; indexList.removeFirst(); + delete(item,prev,next); item=null; } @@ -306,6 +311,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine IndexItem prev=indexList.getPrevEntry(item); IndexItem next=indexList.getNextEntry(item); indexList.remove(item); + delete(item,prev,next); } @@ -591,7 +597,6 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine */ public synchronized ListIterator listIterator(){ load(); - IndexItem start= indexList.getFirst(); return new ContainerListIterator(this,indexList,indexList.getRoot()); } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java index 89a7b25228..d701593b58 100755 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java @@ -497,13 +497,12 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont protected synchronized IndexItem write(Object key,Object value){ IndexItem index=null; try{ - if(key!=null){ - index=indexManager.createNewIndex(); - StoreLocation data=dataManager.storeDataItem(keyMarshaller,key); - index.setKeyData(data); - } + index=indexManager.createNewIndex(); + StoreLocation data=dataManager.storeDataItem(keyMarshaller,key); + index.setKeyData(data); + if(value!=null){ - StoreLocation data=dataManager.storeDataItem(valueMarshaller,value); + data=dataManager.storeDataItem(valueMarshaller,value); index.setValueData(data); } IndexItem prev=indexList.getLast(); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java index 376678ce5e..2454c215f4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java @@ -38,7 +38,7 @@ class DataFile{ DataFile(File file,int number){ this.file=file; - this.number=new Integer(number); + this.number=Integer.valueOf(number); length=file.exists()?file.length():0; } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java index d95fb8ac07..d08ca0032e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java @@ -43,7 +43,7 @@ public final class DataManagerImpl implements DataManager { private static final Log log=LogFactory.getLog(DataManagerImpl.class); public static final long MAX_FILE_LENGTH=1024*1024*32; private static final String NAME_PREFIX="data-"; - private final File dir; + private final File directory; private final String name; private SyncDataFileReader reader; private SyncDataFileWriter writer; @@ -59,14 +59,14 @@ public final class DataManagerImpl implements DataManager { private String dataFilePrefix; public DataManagerImpl(File dir, final String name){ - this.dir=dir; + this.directory=dir; this.name=name; dataFilePrefix = NAME_PREFIX+name+"-"; // build up list of current dataFiles File[] files=dir.listFiles(new FilenameFilter(){ public boolean accept(File dir,String n){ - return dir.equals(dir)&&n.startsWith(dataFilePrefix); + return dir.equals(directory)&&n.startsWith(dataFilePrefix); } }); if(files!=null){ @@ -86,7 +86,7 @@ public final class DataManagerImpl implements DataManager { private DataFile createAndAddDataFile(int num){ String fileName=dataFilePrefix+num; - File file=new File(dir,fileName); + File file=new File(directory,fileName); DataFile result=new DataFile(file,num); fileMap.put(result.getNumber(),result); return result; @@ -114,7 +114,7 @@ public final class DataManagerImpl implements DataManager { } DataFile getDataFile(StoreLocation item) throws IOException{ - Integer key=new Integer(item.getFile()); + Integer key=Integer.valueOf(item.getFile()); DataFile dataFile=(DataFile) fileMap.get(key); if(dataFile==null){ log.error("Looking for key " + key + " but not found in fileMap: " + fileMap); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java index 497fa38704..a305b92aba 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java @@ -297,12 +297,8 @@ class HashBin { } } - private void doUnderFlow(int index) { - int pageNo = index / maximumEntries; - int nextPageNo = pageNo + 1; - if (nextPageNo < hashPages.size()) { - } - HashPageInfo info = hashPages.get(pageNo); + private void doUnderFlow(@SuppressWarnings("unused") int index) { + //does little } private void end() throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java index 44068d7750..e759389998 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java @@ -43,7 +43,7 @@ class HashEntry implements Comparable{ return compareTo(o)==0; } - public int hasCode(){ + public int hashCode(){ return key.hashCode(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java index 2d395c93bc..cb323ac7ae 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java @@ -260,7 +260,7 @@ public class HashIndex implements Index{ public synchronized void delete() throws IOException{ unload(); if(file.exists()){ - boolean result=file.delete(); + file.delete(); } length=0; } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java index e42d37ba8f..934c990acd 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java @@ -225,9 +225,12 @@ class HashPage { void dump() { - String str = this + ": "; + StringBuffer str = new StringBuffer(32); + str.append(toString()); + str.append(": "); for (HashEntry entry : hashIndexEntries) { - str += entry + ","; + str .append(entry); + str.append(","); } log.info(str); } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java index 237ff261fd..657e738962 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java @@ -45,7 +45,7 @@ class TreeEntry implements Comparable{ return compareTo(o)==0; } - public int hasCode(){ + public int hashCode(){ return key.hashCode(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java b/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java index 90493ba5fb..fa16cdb784 100755 --- a/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java +++ b/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java @@ -50,7 +50,7 @@ public class CacheEvictionUsageListener implements UsageListener { }, "Cache Evictor: "+System.identityHashCode(this)); } - private boolean evictMessages() { + boolean evictMessages() { // Try to take the memory usage down below the low mark. try { log.debug("Evicting cache memory usage: "+usageManager.getPercentUsage()); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java index 792f127d64..8b487120ff 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java @@ -125,7 +125,7 @@ public class ForwardingBridge implements Service{ /** * @throws IOException */ - private void startBridge() throws IOException { + final void startBridge() throws IOException { connectionInfo = new ConnectionInfo(); connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); connectionInfo.setClientId(clientId); diff --git a/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java b/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java index a36564a53d..edacf9c263 100644 --- a/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java +++ b/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java @@ -196,7 +196,7 @@ public class ConnectionPool { protected class Synchronization implements javax.transaction.Synchronization { private final PooledSession session; - private Synchronization(PooledSession session) { + protected Synchronization(PooledSession session) { this.session = session; } diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java index 672b692635..0f140935f0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java @@ -168,18 +168,22 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { } } - public Response processAddDestination(DestinationInfo info) { - ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId()); - if( cs != null && info != null && info.getDestination().isTemporary() ) { - cs.addTempDestination(info); + public Response processAddDestination(DestinationInfo info){ + if(info!=null){ + ConnectionState cs=(ConnectionState)connectionStates.get(info.getConnectionId()); + if(cs!=null&&info.getDestination().isTemporary()){ + cs.addTempDestination(info); + } } return TRACKED_RESPONSE_MARKER; } - public Response processRemoveDestination(DestinationInfo info) { - ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId()); - if( cs != null && info != null && info.getDestination().isTemporary() ) { - cs.removeTempDestination(info.getDestination()); + public Response processRemoveDestination(DestinationInfo info){ + if(info!=null){ + ConnectionState cs=(ConnectionState)connectionStates.get(info.getConnectionId()); + if(cs!=null&&info.getDestination().isTemporary()){ + cs.removeTempDestination(info.getDestination()); + } } return TRACKED_RESPONSE_MARKER; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java index 0d93fb2826..04a1bd49c1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java @@ -130,7 +130,7 @@ public class AMQMessageStore implements MessageStore{ } } - private void addMessage(final Message message,final Location location) throws InterruptedIOException{ + void addMessage(final Message message,final Location location) throws InterruptedIOException{ ReferenceData data=new ReferenceData(); data.setExpiration(message.getExpiration()); data.setFileId(location.getDataFileId()); @@ -205,7 +205,7 @@ public class AMQMessageStore implements MessageStore{ } } - private void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException{ + final void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException{ ReferenceData data; synchronized(this){ lastLocation=location; @@ -273,7 +273,7 @@ public class AMQMessageStore implements MessageStore{ * @return * @throws IOException */ - private void asyncWrite(){ + void asyncWrite(){ try{ CountDownLatch countDown; synchronized(this){ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index 54ae832f69..a55775ad86 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -218,8 +218,10 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, if(!started.compareAndSet(true,false)) return; this.usageManager.removeUsageListener(this); - Scheduler.cancel(periodicCheckpointTask); - Scheduler.cancel(periodicCleanupTask); + synchronized(this){ + Scheduler.cancel(periodicCheckpointTask); + Scheduler.cancel(periodicCleanupTask); + } Iterator iterator=queues.values().iterator(); while(iterator.hasNext()){ AMQMessageStore ms=iterator.next(); @@ -232,7 +234,9 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, } // Take one final checkpoint and stop checkpoint processing. checkpoint(true); - checkpointTask.shutdown(); + synchronized(this){ + checkpointTask.shutdown(); + } queues.clear(); topics.clear(); IOException firstException=null; @@ -259,8 +263,8 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, CountDownLatch latch=null; synchronized(this){ latch=nextCheckpointCountDownLatch; + checkpointTask.wakeup(); } - checkpointTask.wakeup(); if(sync){ if(log.isDebugEnabled()){ log.debug("Waitng for checkpoint to complete."); @@ -585,7 +589,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, return transactionStore; } - public void deleteAllMessages() throws IOException{ + public synchronized void deleteAllMessages() throws IOException{ deleteAllMessages=true; } @@ -669,11 +673,11 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, this.maxCheckpointWorkers=maxCheckpointWorkers; } - public File getDirectory(){ + public synchronized File getDirectory(){ return directory; } - public void setDirectory(File directory){ + public synchronized void setDirectory(File directory){ this.directory=directory; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java index 93db67b8d2..d4adfd9796 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java @@ -143,7 +143,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag * @param key * @throws InterruptedIOException */ - private void acknowledge(MessageId messageId,Location location,SubscriptionKey key) throws InterruptedIOException{ + protected void acknowledge(MessageId messageId,Location location,SubscriptionKey key) throws InterruptedIOException{ synchronized(this){ lastLocation=location; ackedLastAckLocations.put(key,messageId); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java index c156af137b..11376a51fd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java @@ -125,7 +125,7 @@ public class JournalMessageStore implements MessageStore { } } - private void addMessage(final Message message, final RecordLocation location) { + void addMessage(final Message message, final RecordLocation location) { synchronized (this) { lastLocation = location; MessageId id = message.getMessageId(); @@ -187,7 +187,7 @@ public class JournalMessageStore implements MessageStore { } } - private void removeMessage(final MessageAck ack, final RecordLocation location) { + final void removeMessage(final MessageAck ack, final RecordLocation location) { synchronized (this) { lastLocation = location; MessageId id = ack.getLastMessageId(); @@ -253,33 +253,31 @@ public class JournalMessageStore implements MessageStore { ConnectionContext context = transactionTemplate.getContext(); // Checkpoint the added messages. - Iterator iterator = cpAddedMessageIds.values().iterator(); - while (iterator.hasNext()) { - Message message = (Message) iterator.next(); - try { - longTermStore.addMessage(context, message); - } catch (Throwable e) { - log.warn("Message could not be added to long term store: " + e.getMessage(), e); + synchronized(JournalMessageStore.this){ + Iterator iterator=cpAddedMessageIds.values().iterator(); + while(iterator.hasNext()){ + Message message=(Message)iterator.next(); + try{ + longTermStore.addMessage(context,message); + }catch(Throwable e){ + log.warn("Message could not be added to long term store: "+e.getMessage(),e); + } + size+=message.getSize(); + message.decrementReferenceCount(); + // Commit the batch if it's getting too big + if(size>=maxCheckpointMessageAddSize){ + persitanceAdapter.commitTransaction(context); + persitanceAdapter.beginTransaction(context); + size=0; + } } - - size += message.getSize(); - - message.decrementReferenceCount(); - - // Commit the batch if it's getting too big - if( size >= maxCheckpointMessageAddSize ) { - persitanceAdapter.commitTransaction(context); - persitanceAdapter.beginTransaction(context); - size=0; - } - } persitanceAdapter.commitTransaction(context); persitanceAdapter.beginTransaction(context); // Checkpoint the removed messages. - iterator = cpRemovedMessageLocations.iterator(); + Iterator iterator = cpRemovedMessageLocations.iterator(); while (iterator.hasNext()) { try { MessageAck ack = (MessageAck) iterator.next(); @@ -303,7 +301,8 @@ public class JournalMessageStore implements MessageStore { if( cpActiveJournalLocations.size() > 0 ) { Collections.sort(cpActiveJournalLocations); return (RecordLocation) cpActiveJournalLocations.get(0); - } else { + } + synchronized (this){ return lastLocation; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index 2cefc36961..473179ea54 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -92,8 +92,8 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve private final ConcurrentHashMap topics = new ConcurrentHashMap(); private UsageManager usageManager; - private long checkpointInterval = 1000 * 60 * 5; - private long lastCheckpointRequest = System.currentTimeMillis(); + long checkpointInterval = 1000 * 60 * 5; + long lastCheckpointRequest = System.currentTimeMillis(); private long lastCleanup = System.currentTimeMillis(); private int maxCheckpointWorkers = 10; private int maxCheckpointMessageAddSize = 1024*1024; @@ -112,7 +112,11 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve final Runnable createPeriodicCheckpointTask() { return new Runnable() { public void run() { - if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) { + long lastTime = 0; + synchronized(this) { + lastTime = lastCheckpointRequest; + } + if( System.currentTimeMillis()>lastTime+checkpointInterval ) { checkpoint(false, true); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java index 6737bc9e8a..79ee6eac28 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java @@ -142,7 +142,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top * @param location * @param key */ - private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) { + protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) { synchronized(this) { lastLocation = location; ackedLastAckLocations.put(key, messageId); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java index 7717576099..c7eacc248b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java @@ -174,7 +174,7 @@ public class KahaMessageStore implements MessageStore{ * @param nextToDispatch * @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId) */ - public void resetBatching(){ + public synchronized void resetBatching(){ batchEntry=null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java index 85976bc72c..f6d0715138 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java @@ -149,7 +149,7 @@ public class KahaReferenceStore implements ReferenceStore{ messageContainer.clear(); } - public void resetBatching(){ + public synchronized void resetBatching(){ batchEntry=null; lastBatchId=null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java index 4d1c07123c..f0eb94a90e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java @@ -212,7 +212,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements } @Override - public void setDirectory(File directory){ + public synchronized void setDirectory(File directory){ File file = new File(directory,"data"); super.setDirectory(file); this.stateStore=createStateStore(directory); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java index bccc0c2279..8d2cf3905a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java @@ -138,7 +138,7 @@ public class KahaTransactionStore implements TransactionStore{ * @param ack * @throws IOException */ - private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException{ + final void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException{ if(ack.isInTransaction()){ KahaTransaction tx=getOrCreateTx(ack.getTransactionId()); tx.add((KahaMessageStore) destination,ack); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java index f134dcc9f3..d27d1c1111 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java @@ -27,8 +27,7 @@ import java.util.Iterator; public class TopicSubContainer { private transient ListContainer listContainer; private transient StoreEntry batchEntry; - private transient String lastBatchId; - + public TopicSubContainer(ListContainer container) { this.listContainer = container; } @@ -45,12 +44,10 @@ public class TopicSubContainer { * @param batchEntry the batchEntry to set */ public void setBatchEntry(String id,StoreEntry batchEntry) { - this.lastBatchId=id; this.batchEntry = batchEntry; } public void reset() { - lastBatchId=null; batchEntry = null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java index a439129786..5906ade6cc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java @@ -229,7 +229,7 @@ public class MemoryTransactionStore implements TransactionStore { * @param ack * @throws IOException */ - private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException { + final void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException { if( doingRecover ) return; diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java b/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java index c2acff39bd..e412101582 100644 --- a/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java @@ -82,7 +82,7 @@ class DedicatedTaskRunner implements TaskRunner { shutdown(0); } - private void runTask() { + final void runTask() { try { while( true ) { diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java b/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java index 531d4814d9..95339ebd6c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java @@ -96,7 +96,7 @@ class PooledTaskRunner implements TaskRunner { public void shutdown() throws InterruptedException { shutdown(0); } - private void runTask() { + final void runTask() { synchronized (runable) { queued = false; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 88aadd2989..8300cc9d05 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -68,7 +68,7 @@ public class InactivityMonitor extends TransportFilter { } - private void writeCheck() { + final void writeCheck() { synchronized(writeChecker) { if( inSend.get() ) { log.trace("A send is in progress"); @@ -90,7 +90,7 @@ public class InactivityMonitor extends TransportFilter { } } - private void readCheck() { + final void readCheck() { synchronized(readChecker) { if( inReceive.get() ) { log.trace("A receive is in progress"); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java index 57262a40f3..5a1526e8c1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java @@ -60,8 +60,6 @@ public class RendezvousDiscoveryAgent implements DiscoveryAgent, ServiceListener private String group = "default"; private final CopyOnWriteArrayList serviceInfos = new CopyOnWriteArrayList(); - private String brokerName; - // DiscoveryAgent interface //------------------------------------------------------------------------- public void start() throws Exception { @@ -232,11 +230,16 @@ public class RendezvousDiscoveryAgent implements DiscoveryAgent, ServiceListener return "_" + group+"."+TYPE_SUFFIX; } - public void setBrokerName(String brokerName) { - this.brokerName = brokerName; - } - public void serviceFailed(DiscoveryEvent event) throws IOException { // TODO: is there a way to notify the JmDNS that the service failed? } + + /** + * @param brokerName + * @see org.apache.activemq.transport.discovery.DiscoveryAgent#setBrokerName(java.lang.String) + */ + public void setBrokerName(String brokerName){ + // implementation of interface + + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index febd697109..289669bd80 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -91,7 +91,7 @@ public class FailoverTransport implements CompositeTransport { return; } if (command.isResponse()) { - Object object = requestMap.remove(new Integer(((Response) command).getCorrelationId())); + Object object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId())); if( object!=null && object.getClass() == Tracked.class ) { ((Tracked)object).onResponses(); } @@ -231,7 +231,7 @@ public class FailoverTransport implements CompositeTransport { }, "ActiveMQ Failover Worker: "+System.identityHashCode(this)); } - private void handleTransportFailure(IOException e) throws InterruptedException { + final void handleTransportFailure(IOException e) throws InterruptedException { if (transportListener != null){ transportListener.transportInterupted(); } @@ -382,9 +382,9 @@ public class FailoverTransport implements CompositeTransport { // it later. Tracked tracked = stateTracker.track(command); if( tracked!=null && tracked.isWaitingForResponse() ) { - requestMap.put(new Integer(command.getCommandId()), tracked); + requestMap.put(Integer.valueOf(command.getCommandId()), tracked); } else if ( tracked==null && command.isResponseRequired()) { - requestMap.put(new Integer(command.getCommandId()), command); + requestMap.put(Integer.valueOf(command.getCommandId()), command); } // Send the message. @@ -398,7 +398,7 @@ public class FailoverTransport implements CompositeTransport { // since we will retry in this method.. take it out of the request // map so that it is not sent 2 times on recovery if( command.isResponseRequired() ) { - requestMap.remove(new Integer(command.getCommandId())); + requestMap.remove(Integer.valueOf(command.getCommandId())); } // Rethrow the exception so it will handled by the outer catch diff --git a/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java b/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java index a43b36dbf2..0bbc242cae 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java @@ -19,6 +19,7 @@ package org.apache.activemq.util; import javax.jms.Message; +import java.io.Serializable; import java.util.Comparator; /** @@ -26,7 +27,7 @@ import java.util.Comparator; * * @version $Revision$ */ -public abstract class MessageComparatorSupport implements Comparator { +public abstract class MessageComparatorSupport implements Comparator, Serializable { public int compare(Object object1, Object object2) { Message command1 = (Message) object1; @@ -36,11 +37,20 @@ public abstract class MessageComparatorSupport implements Comparator { protected abstract int compareMessages(Message message1, Message message2); - protected int compareComparators(Comparable comparable, Comparable comparable2) { - if (comparable != null) { + protected int compareComparators(final Comparable comparable, final Comparable comparable2) { + if (comparable == null && comparable2 == null) { + return 0; + } + else if (comparable != null) { + if (comparable2== null) { + return 1; + } return comparable.compareTo(comparable2); } else if (comparable2 != null) { + if (comparable== null) { + return -11; + } return comparable2.compareTo(comparable) * -1; } return 0;