ensure member variables are always synchronized

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@551271 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-06-27 18:53:30 +00:00
parent 8a0157a506
commit a3e382199c
53 changed files with 299 additions and 240 deletions

View File

@ -591,7 +591,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge())); props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
props.setProperty("statsEnabled",Boolean.toString(isStatsEnabled())); props.setProperty("statsEnabled",Boolean.toString(isStatsEnabled()));
props.setProperty("alwaysSyncSend",Boolean.toString(isAlwaysSyncSend())); props.setProperty("alwaysSyncSend",Boolean.toString(isAlwaysSyncSend()));
props.setProperty("producerWindowSize", Integer.toString(producerWindowSize)); props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
} }
public boolean isUseCompression() { public boolean isUseCompression() {

View File

@ -136,7 +136,6 @@ public class BrokerService implements Service {
private transient Thread shutdownHook; private transient Thread shutdownHook;
private String[] transportConnectorURIs; private String[] transportConnectorURIs;
private String[] networkConnectorURIs; private String[] networkConnectorURIs;
private String[] proxyConnectorURIs;
private JmsConnector[] jmsBridgeConnectors; //these are Jms to Jms bridges to other jms messaging systems private JmsConnector[] jmsBridgeConnectors; //these are Jms to Jms bridges to other jms messaging systems
private boolean deleteAllMessagesOnStartup; private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true; private boolean advisorySupport = true;
@ -1126,12 +1125,6 @@ public class BrokerService implements Service {
addNetworkConnector(uri); addNetworkConnector(uri);
} }
} }
if (proxyConnectorURIs != null) {
for (int i = 0; i < proxyConnectorURIs.length; i++) {
String uri = proxyConnectorURIs[i];
addProxyConnector(uri);
}
}
if (jmsBridgeConnectors != null){ if (jmsBridgeConnectors != null){
for (int i = 0; i < jmsBridgeConnectors.length; i++){ for (int i = 0; i < jmsBridgeConnectors.length; i++){

View File

@ -349,7 +349,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
return null; return null;
} }
public Response processWireFormat(WireFormatInfo info) throws Exception{ public synchronized Response processWireFormat(WireFormatInfo info) throws Exception{
wireFormatInfo=info; wireFormatInfo=info;
protocolVersion.set(info.getVersion()); protocolVersion.set(info.getVersion());
return null; return null;
@ -370,6 +370,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(cs!=null){ if(cs!=null){
context=cs.getContext(); context=cs.getContext();
} }
if (cs == null) {
throw new NullPointerException("Context is null");
}
// Avoid replaying dup commands // Avoid replaying dup commands
if(cs.getTransactionState(info.getTransactionId())==null){ if(cs.getTransactionState(info.getTransactionId())==null){
cs.addTransactionState(info.getTransactionId()); cs.addTransactionState(info.getTransactionId());
@ -391,6 +394,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(cs!=null){ if(cs!=null){
context=cs.getContext(); context=cs.getContext();
} }
if (cs == null) {
throw new NullPointerException("Context is null");
}
TransactionState transactionState=cs.getTransactionState(info.getTransactionId()); TransactionState transactionState=cs.getTransactionState(info.getTransactionId());
if(transactionState==null) if(transactionState==null)
throw new IllegalStateException("Cannot prepare a transaction that had not been started: " throw new IllegalStateException("Cannot prepare a transaction that had not been started: "
@ -414,6 +420,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(cs!=null){ if(cs!=null){
context=cs.getContext(); context=cs.getContext();
} }
if (cs == null) {
throw new NullPointerException("Context is null");
}
cs.removeTransactionState(info.getTransactionId()); cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context,info.getTransactionId(),true); broker.commitTransaction(context,info.getTransactionId(),true);
return null; return null;
@ -425,6 +434,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(cs!=null){ if(cs!=null){
context=cs.getContext(); context=cs.getContext();
} }
if (cs == null) {
throw new NullPointerException("Context is null");
}
cs.removeTransactionState(info.getTransactionId()); cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context,info.getTransactionId(),false); broker.commitTransaction(context,info.getTransactionId(),false);
return null; return null;
@ -436,6 +448,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(cs!=null){ if(cs!=null){
context=cs.getContext(); context=cs.getContext();
} }
if (cs == null) {
throw new NullPointerException("Context is null");
}
cs.removeTransactionState(info.getTransactionId()); cs.removeTransactionState(info.getTransactionId());
broker.rollbackTransaction(context,info.getTransactionId()); broker.rollbackTransaction(context,info.getTransactionId());
return null; return null;
@ -869,10 +884,11 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
log.debug("Stopping connection: "+transport.getRemoteAddress()); log.debug("Stopping connection: "+transport.getRemoteAddress());
connector.onStopped(this); connector.onStopped(this);
try{ try{
synchronized(this){
if(masterBroker!=null){ if(masterBroker!=null){
masterBroker.stop(); masterBroker.stop();
} }
if (duplexBridge != null) { if(duplexBridge!=null){
duplexBridge.stop(); duplexBridge.stop();
} }
// If the transport has not failed yet, // If the transport has not failed yet,
@ -880,6 +896,8 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(transportException==null){ if(transportException==null){
transport.oneway(new ShutdownInfo()); transport.oneway(new ShutdownInfo());
} }
}
}catch(Exception ignore){ }catch(Exception ignore){
log.trace("Exception caught stopping",ignore); log.trace("Exception caught stopping",ignore);
} }
@ -1069,7 +1087,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
this.pendingStop=pendingStop; this.pendingStop=pendingStop;
} }
public Response processBrokerInfo(BrokerInfo info){ public synchronized Response processBrokerInfo(BrokerInfo info){
if(info.isSlaveBroker()){ if(info.isSlaveBroker()){
// stream messages from this broker (the master) to // stream messages from this broker (the master) to
// the slave // the slave
@ -1172,8 +1190,10 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized(consumerExchanges){ synchronized(consumerExchanges){
result=new ConsumerBrokerExchange(); result=new ConsumerBrokerExchange();
ConnectionState state=lookupConnectionState(id); ConnectionState state=lookupConnectionState(id);
synchronized(this){
context=state.getContext(); context=state.getContext();
result.setConnectionContext(context); result.setConnectionContext(context);
}
SessionState ss=state.getSessionState(id.getParentId()); SessionState ss=state.getSessionState(id.getParentId());
if(ss!=null){ if(ss!=null){
ConsumerState cs=ss.getConsumerState(id); ConsumerState cs=ss.getConsumerState(id);

View File

@ -54,7 +54,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
return active; return active;
} }
protected boolean isFull(){ protected synchronized boolean isFull(){
return !active||super.isFull(); return !active||super.isFull();
} }
@ -113,7 +113,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
topic.deactivate(context,this); topic.deactivate(context,this);
} }
} }
synchronized(dispatched){
for(Iterator iter=dispatched.iterator();iter.hasNext();){ for(Iterator iter=dispatched.iterator();iter.hasNext();){
// Mark the dispatched messages as redelivered for next time. // Mark the dispatched messages as redelivered for next time.
MessageReference node=(MessageReference)iter.next(); MessageReference node=(MessageReference)iter.next();
@ -132,7 +131,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
} }
iter.remove(); iter.remove();
} }
}
if(!keepDurableSubsActive){ if(!keepDurableSubsActive){
synchronized(pending){ synchronized(pending){
try{ try{
@ -167,11 +165,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
super.add(node); super.add(node);
} }
protected void doAddRecoveredMessage(MessageReference message) throws Exception{ protected synchronized void doAddRecoveredMessage(MessageReference message) throws Exception{
pending.addRecoveredMessage(message); pending.addRecoveredMessage(message);
} }
public int getPendingQueueSize(){ public synchronized int getPendingQueueSize(){
if(active||keepDurableSubsActive){ if(active||keepDurableSubsActive){
return super.getPendingQueueSize(); return super.getPendingQueueSize();
} }
@ -184,7 +182,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
"You cannot dynamically change the selector for durable topic subscriptions"); "You cannot dynamically change the selector for durable topic subscriptions");
} }
protected boolean canDispatch(MessageReference node){ protected synchronized boolean canDispatch(MessageReference node){
return active; return active;
} }
@ -198,7 +196,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
return subscriptionKey.getSubscriptionName(); return subscriptionKey.getSubscriptionName();
} }
public String toString(){ public synchronized String toString(){
return "DurableTopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size() return "DurableTopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
+", total="+enqueueCounter+", pending="+getPendingQueueSize()+", dispatched="+dispatchCounter +", total="+enqueueCounter+", pending="+getPendingQueueSize()+", dispatched="+dispatchCounter
+", inflight="+dispatched.size()+", prefetchExtension="+this.prefetchExtension; +", inflight="+dispatched.size()+", prefetchExtension="+this.prefetchExtension;
@ -215,7 +213,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
/** /**
* Release any references that we are holding. * Release any references that we are holding.
*/ */
public void destroy(){ public synchronized void destroy(){
try{ try{
synchronized(pending){ synchronized(pending){
pending.reset(); pending.reset();

View File

@ -76,7 +76,7 @@ public class IndirectMessageReference implements QueueMessageReference {
this.referenceCount=1; this.referenceCount=1;
message.incrementReferenceCount(); message.incrementReferenceCount();
this.cachedSize = message != null ? message.getSize() : 0; this.cachedSize = message.getSize();
} }
synchronized public Message getMessageHardRef() { synchronized public Message getMessageHardRef() {
@ -212,7 +212,7 @@ public class IndirectMessageReference implements QueueMessageReference {
return false; return false;
} }
public int getSize(){ public synchronized int getSize(){
Message msg = message; Message msg = message;
if (msg != null){ if (msg != null){
return msg.getSize(); return msg.getSize();

View File

@ -104,7 +104,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
* Occurs when a pull times out. If nothing has been dispatched since the timeout was setup, then send the NULL * Occurs when a pull times out. If nothing has been dispatched since the timeout was setup, then send the NULL
* message. * message.
*/ */
private synchronized void pullTimeout(long dispatchCounterBeforePull){ final synchronized void pullTimeout(long dispatchCounterBeforePull){
if(dispatchCounterBeforePull==dispatchCounter){ if(dispatchCounterBeforePull==dispatchCounter){
try{ try{
add(QueueMessageReference.NULL_MESSAGE); add(QueueMessageReference.NULL_MESSAGE);
@ -300,14 +300,14 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
/** /**
* @return true when 60% or more room is left for dispatching messages * @return true when 60% or more room is left for dispatching messages
*/ */
public boolean isLowWaterMark(){ public synchronized boolean isLowWaterMark(){
return (dispatched.size()-prefetchExtension)<=(info.getPrefetchSize()*.4); return (dispatched.size()-prefetchExtension)<=(info.getPrefetchSize()*.4);
} }
/** /**
* @return true when 10% or less room is left for dispatching messages * @return true when 10% or less room is left for dispatching messages
*/ */
public boolean isHighWaterMark(){ public synchronized boolean isHighWaterMark(){
return (dispatched.size()-prefetchExtension)>=(info.getPrefetchSize()*.9); return (dispatched.size()-prefetchExtension)>=(info.getPrefetchSize()*.9);
} }
@ -315,17 +315,13 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
return info.getPrefetchSize()+prefetchExtension-dispatched.size(); return info.getPrefetchSize()+prefetchExtension-dispatched.size();
} }
public int getPendingQueueSize(){ public synchronized int getPendingQueueSize(){
synchronized(pending){
return pending.size(); return pending.size();
} }
}
public int getDispatchedQueueSize(){ public synchronized int getDispatchedQueueSize(){
synchronized(dispatched){
return dispatched.size(); return dispatched.size();
} }
}
synchronized public long getDequeueCounter(){ synchronized public long getDequeueCounter(){
return dequeueCounter; return dequeueCounter;
@ -344,11 +340,11 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
public PendingMessageCursor getPending(){ public synchronized PendingMessageCursor getPending(){
return this.pending; return this.pending;
} }
public void setPending(PendingMessageCursor pending){ public synchronized void setPending(PendingMessageCursor pending){
this.pending=pending; this.pending=pending;
} }
@ -413,7 +409,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
} }
protected boolean dispatch(final MessageReference node) throws IOException{ protected synchronized boolean dispatch(final MessageReference node) throws IOException{
final Message message=node.getMessage(); final Message message=node.getMessage();
if(message==null){ if(message==null){
return false; return false;

View File

@ -47,7 +47,6 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
@ -421,7 +420,7 @@ public class Queue implements Destination, Task {
doMessageSend(producerExchange, message); doMessageSend(producerExchange, message);
} }
private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext(); final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this); message.setRegionDestination(this);
if(store!=null&&message.isPersistent()){ if(store!=null&&message.isPersistent()){
@ -979,7 +978,7 @@ public class Queue implements Destination, Task {
} }
private void sendMessage(final ConnectionContext context,Message msg) throws Exception{ final void sendMessage(final ConnectionContext context,Message msg) throws Exception{
synchronized(messages){ synchronized(messages){
messages.addMessageLast(msg); messages.addMessageLast(msg);
} }

View File

@ -39,7 +39,7 @@ public class QueueBrowserSubscription extends QueueSubscription {
return !((QueueMessageReference)node).isAcked(); return !((QueueMessageReference)node).isAcked();
} }
public String toString() { public synchronized String toString() {
return return
"QueueBrowserSubscription:" + "QueueBrowserSubscription:" +
" consumer="+info.getConsumerId()+ " consumer="+info.getConsumerId()+

View File

@ -139,7 +139,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
} }
} }
public String toString() { public synchronized String toString() {
return return
"QueueSubscription:" + "QueueSubscription:" +
" consumer="+info.getConsumerId()+ " consumer="+info.getConsumerId()+

View File

@ -341,7 +341,7 @@ public class Topic implements Destination {
doMessageSend(producerExchange, message); doMessageSend(producerExchange, message);
} }
private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext(); final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this); message.setRegionDestination(this);

View File

@ -166,7 +166,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
nonPersistent.addMessageLast(node); nonPersistent.addMessageLast(node);
} }
public void clear(){ public synchronized void clear(){
pendingCount=0; pendingCount=0;
nonPersistent.clear(); nonPersistent.clear();
for(PendingMessageCursor tsp: storePrefetches){ for(PendingMessageCursor tsp: storePrefetches){

View File

@ -90,7 +90,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
} }
} }
public void addMessageFirst(MessageReference node) throws Exception{ public synchronized void addMessageFirst(MessageReference node) throws Exception{
if(node!=null){ if(node!=null){
Message msg=node.getMessage(); Message msg=node.getMessage();
if(started){ if(started){
@ -105,7 +105,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
} }
} }
public void clear(){ public synchronized void clear(){
pendingCount=0; pendingCount=0;
} }
@ -150,7 +150,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
persistent.reset(); persistent.reset();
} }
public int size(){ public synchronized int size(){
return pendingCount; return pendingCount;
} }
@ -165,25 +165,25 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
* @see org.apache.activemq.region.cursors.PendingMessageCursor * @see org.apache.activemq.region.cursors.PendingMessageCursor
* @return true if recovery required * @return true if recovery required
*/ */
public boolean isRecoveryRequired(){ public synchronized boolean isRecoveryRequired(){
return false; return false;
} }
/** /**
* @return the nonPersistent Cursor * @return the nonPersistent Cursor
*/ */
public PendingMessageCursor getNonPersistent(){ public synchronized PendingMessageCursor getNonPersistent(){
return this.nonPersistent; return this.nonPersistent;
} }
/** /**
* @param nonPersistent cursor to set * @param nonPersistent cursor to set
*/ */
public void setNonPersistent(PendingMessageCursor nonPersistent){ public synchronized void setNonPersistent(PendingMessageCursor nonPersistent){
this.nonPersistent=nonPersistent; this.nonPersistent=nonPersistent;
} }
public void setMaxBatchSize(int maxBatchSize){ public synchronized void setMaxBatchSize(int maxBatchSize){
persistent.setMaxBatchSize(maxBatchSize); persistent.setMaxBatchSize(maxBatchSize);
if(nonPersistent!=null){ if(nonPersistent!=null){
nonPersistent.setMaxBatchSize(maxBatchSize); nonPersistent.setMaxBatchSize(maxBatchSize);
@ -191,7 +191,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
super.setMaxBatchSize(maxBatchSize); super.setMaxBatchSize(maxBatchSize);
} }
public void gc() { public synchronized void gc() {
if (persistent != null) { if (persistent != null) {
persistent.gc(); persistent.gc();
} }
@ -200,7 +200,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
} }
} }
public void setUsageManager(UsageManager usageManager){ public synchronized void setUsageManager(UsageManager usageManager){
super.setUsageManager(usageManager); super.setUsageManager(usageManager);
if (persistent != null) { if (persistent != null) {
persistent.setUsageManager(usageManager); persistent.setUsageManager(usageManager);

View File

@ -80,7 +80,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
/** /**
* @return true if there are no pendingCount messages * @return true if there are no pendingCount messages
*/ */
public boolean isEmpty(){ public synchronized boolean isEmpty(){
return pendingCount <= 0; return pendingCount <= 0;
} }
@ -99,7 +99,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
} }
} }
public void addMessageFirst(MessageReference node) throws Exception{ public synchronized void addMessageFirst(MessageReference node) throws Exception{
if(node!=null){ if(node!=null){
if(started){ if(started){
firstMessageId=node.getMessageId(); firstMessageId=node.getMessageId();

View File

@ -92,7 +92,7 @@ public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecover
this.maximumSize=maximumSize; this.maximumSize=maximumSize;
} }
public Message[] browse(ActiveMQDestination destination) throws Exception{ public synchronized Message[] browse(ActiveMQDestination destination) throws Exception{
List result=new ArrayList(); List result=new ArrayList();
DestinationFilter filter=DestinationFilter.parseFilter(destination); DestinationFilter filter=DestinationFilter.parseFilter(destination);
int t=tail; int t=tail;

View File

@ -90,7 +90,7 @@ public class SubscriptionInfo implements DataStructure {
return IntrospectionSupport.toString(this); return IntrospectionSupport.toString(this);
} }
public int hasCode() { public int hashCode() {
int h1 = clientId != null ? clientId.hashCode():-1; int h1 = clientId != null ? clientId.hashCode():-1;
int h2 = subcriptionName != null ? subcriptionName.hashCode():-1; int h2 = subcriptionName != null ? subcriptionName.hashCode():-1;
return h1 ^ h2; return h1 ^ h2;

View File

@ -389,6 +389,7 @@ public class ReadOnlyContext implements Context, Serializable {
} }
private class ListEnumeration extends LocalNamingEnumeration { private class ListEnumeration extends LocalNamingEnumeration {
ListEnumeration(){}
public Object next() throws NamingException { public Object next() throws NamingException {
return nextElement(); return nextElement();
} }
@ -400,6 +401,8 @@ public class ReadOnlyContext implements Context, Serializable {
} }
private class ListBindingEnumeration extends LocalNamingEnumeration { private class ListBindingEnumeration extends LocalNamingEnumeration {
ListBindingEnumeration(){
}
public Object next() throws NamingException { public Object next() throws NamingException {
return nextElement(); return nextElement();
} }

View File

@ -61,8 +61,8 @@ public final class AsyncDataManager {
public static final int ITEM_HEAD_FOOT_SPACE=ITEM_HEAD_SPACE+ITEM_FOOT_SPACE; public static final int ITEM_HEAD_FOOT_SPACE=ITEM_HEAD_SPACE+ITEM_FOOT_SPACE;
public static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; // static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; //
public static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; // static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; //
public static final byte DATA_ITEM_TYPE=1; public static final byte DATA_ITEM_TYPE=1;
public static final byte REDO_ITEM_TYPE=2; public static final byte REDO_ITEM_TYPE=2;
@ -71,8 +71,8 @@ public final class AsyncDataManager {
public static final String DEFAULT_FILE_PREFIX="data-"; public static final String DEFAULT_FILE_PREFIX="data-";
public static final int DEFAULT_MAX_FILE_LENGTH=1024*1024*32; public static final int DEFAULT_MAX_FILE_LENGTH=1024*1024*32;
private File directory = new File(DEFAULT_DIRECTORY); File directory = new File(DEFAULT_DIRECTORY);
private String filePrefix=DEFAULT_FILE_PREFIX; String filePrefix=DEFAULT_FILE_PREFIX;
private int maxFileLength = DEFAULT_MAX_FILE_LENGTH; private int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH-1024*512; private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH-1024*512;
@ -101,8 +101,10 @@ public final class AsyncDataManager {
started=true; started=true;
directory.mkdirs(); directory.mkdirs();
controlFile = new ControlFile(new File(directory, filePrefix+"control"), CONTROL_RECORD_MAX_LENGTH); synchronized(this){
controlFile=new ControlFile(new File(directory,filePrefix+"control"),CONTROL_RECORD_MAX_LENGTH);
controlFile.lock(); controlFile.lock();
}
ByteSequence sequence = controlFile.load(); ByteSequence sequence = controlFile.load();
if( sequence != null && sequence.getLength()>0 ) { if( sequence != null && sequence.getLength()>0 ) {
@ -116,7 +118,7 @@ public final class AsyncDataManager {
File[] files=directory.listFiles(new FilenameFilter(){ File[] files=directory.listFiles(new FilenameFilter(){
public boolean accept(File dir,String n){ public boolean accept(File dir,String n){
return dir.equals(dir)&&n.startsWith(filePrefix); return dir.equals(directory)&&n.startsWith(filePrefix);
} }
}); });
@ -252,8 +254,8 @@ public final class AsyncDataManager {
} }
DataFile getDataFile(Location item) throws IOException{ DataFile getDataFile(Location item) throws IOException{
Integer key=new Integer(item.getDataFileId()); Integer key= Integer.valueOf(item.getDataFileId());
DataFile dataFile=(DataFile) fileMap.get(key); DataFile dataFile=fileMap.get(key);
if(dataFile==null){ if(dataFile==null){
log.error("Looking for key " + key + " but not found in fileMap: " + fileMap); log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
throw new IOException("Could not locate data file "+filePrefix+"-"+item.getDataFileId()); throw new IOException("Could not locate data file "+filePrefix+"-"+item.getDataFileId());
@ -265,14 +267,12 @@ public final class AsyncDataManager {
return (DataFile) dataFile.getNext(); return (DataFile) dataFile.getNext();
} }
public void close() throws IOException{ public synchronized void close() throws IOException{
synchronized(this){
if(!started){ if(!started){
return; return;
} }
Scheduler.cancel(cleanupTask); Scheduler.cancel(cleanupTask);
accessorPool.close(); accessorPool.close();
}
storeState(false); storeState(false);
appender.close(); appender.close();
fileMap.clear(); fileMap.clear();
@ -281,7 +281,7 @@ public final class AsyncDataManager {
started=false; started=false;
} }
private synchronized void cleanup() { synchronized void cleanup() {
if( accessorPool!=null ) { if( accessorPool!=null ) {
accessorPool.disposeUnused(); accessorPool.disposeUnused();
} }
@ -375,7 +375,7 @@ public final class AsyncDataManager {
} }
} }
private void removeDataFile(DataFile dataFile) throws IOException{ private synchronized void removeDataFile(DataFile dataFile) throws IOException{
// Make sure we don't delete too much data. // Make sure we don't delete too much data.
if( dataFile==currentWriteFile || mark==null || dataFile.getDataFileId() >= mark.getDataFileId() ) { if( dataFile==currentWriteFile || mark==null || dataFile.getDataFileId() >= mark.getDataFileId() ) {
@ -414,7 +414,7 @@ public final class AsyncDataManager {
return mark; return mark;
} }
public Location getNextLocation(Location location) throws IOException, IllegalStateException { public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
Location cur = null; Location cur = null;
@ -492,17 +492,17 @@ public final class AsyncDataManager {
storeState(sync); storeState(sync);
} }
private void storeState(boolean sync) throws IOException { private synchronized void storeState(boolean sync) throws IOException{
ByteSequence state = marshallState(); ByteSequence state=marshallState();
appender.storeItem(state, Location.MARK_TYPE, sync); appender.storeItem(state,Location.MARK_TYPE,sync);
controlFile.store(state, sync); controlFile.store(state,sync);
} }
public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException { public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
return appender.storeItem(data, Location.USER_TYPE, sync); return appender.storeItem(data, Location.USER_TYPE, sync);
} }
public Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException { public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
return appender.storeItem(data, type, sync); return appender.storeItem(data, type, sync);
} }

View File

@ -27,7 +27,7 @@ import org.apache.activemq.util.LinkedNode;
* *
* @version $Revision: 1.1.1.1 $ * @version $Revision: 1.1.1.1 $
*/ */
class DataFile extends LinkedNode implements Comparable { class DataFile extends LinkedNode implements Comparable<DataFile> {
private final File file; private final File file;
private final Integer dataFileId; private final Integer dataFileId;
@ -39,7 +39,7 @@ class DataFile extends LinkedNode implements Comparable {
DataFile(File file, int number, int preferedSize){ DataFile(File file, int number, int preferedSize){
this.file=file; this.file=file;
this.preferedSize = preferedSize; this.preferedSize = preferedSize;
this.dataFileId=new Integer(number); this.dataFileId=Integer.valueOf(number);
length=(int)(file.exists()?file.length():0); length=(int)(file.exists()?file.length():0);
} }
@ -98,11 +98,18 @@ class DataFile extends LinkedNode implements Comparable {
return file.delete(); return file.delete();
} }
public int compareTo(Object o) { public int compareTo(DataFile df) {
DataFile df = (DataFile) o;
return dataFileId - df.dataFileId; return dataFileId - df.dataFileId;
} }
public boolean equals(Object o) {
boolean result = false;
if (o instanceof DataFile) {
result = compareTo((DataFile)o)==0;
}
return result;
}
} }

View File

@ -55,9 +55,12 @@ class DataFileAppender {
return hash; return hash;
} }
public boolean equals(Object obj) { public boolean equals(Object obj){
WriteKey di = (WriteKey)obj; if(obj instanceof WriteKey){
return di.file == file && di.offset == offset; WriteKey di=(WriteKey)obj;
return di.file==file&&di.offset==offset;
}
return false;
} }
} }

View File

@ -132,4 +132,17 @@ public final class Location implements Comparable<Location> {
return dataFileId - l.dataFileId; return dataFileId - l.dataFileId;
} }
public boolean equals(Object o) {
boolean result = false;
if (o instanceof Location) {
result = compareTo((Location)o)==0;
}
return result;
}
public int hashCode() {
return dataFileId ^ offset;
}
} }

View File

@ -188,12 +188,13 @@ public abstract class BaseContainerImpl{
} }
} }
protected final void delete(IndexItem key,IndexItem prev,IndexItem next){ protected final void delete(final IndexItem keyItem,final IndexItem prevItem,final IndexItem nextItem){
if(keyItem!=null){
try{ try{
dataManager.removeInterestInFile(key.getKeyFile()); IndexItem prev=prevItem==null?root:prevItem;
dataManager.removeInterestInFile(key.getValueFile()); IndexItem next=nextItem!=root?nextItem:null;
prev=prev==null?root:prev; dataManager.removeInterestInFile(keyItem.getKeyFile());
next=next!=root?next:null; dataManager.removeInterestInFile(keyItem.getValueFile());
if(next!=null){ if(next!=null){
prev.setNextItem(next.getOffset()); prev.setNextItem(next.getOffset());
next.setPreviousItem(prev.getOffset()); next.setPreviousItem(prev.getOffset());
@ -202,12 +203,13 @@ public abstract class BaseContainerImpl{
prev.setNextItem(Item.POSITION_NOT_SET); prev.setNextItem(Item.POSITION_NOT_SET);
} }
updateIndexes(prev); updateIndexes(prev);
indexManager.freeIndex(key); indexManager.freeIndex(keyItem);
}catch(IOException e){ }catch(IOException e){
log.error("Failed to delete "+key,e); log.error("Failed to delete "+keyItem,e);
throw new RuntimeStoreException(e); throw new RuntimeStoreException(e);
} }
} }
}
protected final void checkClosed(){ protected final void checkClosed(){
if(closed){ if(closed){

View File

@ -115,13 +115,14 @@ public class ContainerKeySet extends ContainerCollectionSupport implements Set{
} }
public String toString() { public String toString() {
String result ="ContainerKeySet["; StringBuffer result =new StringBuffer(32);
result.append("ContainerKeySet[");
IndexItem item = container.getInternalList().getRoot(); IndexItem item = container.getInternalList().getRoot();
while ((item = container.getInternalList().getNextEntry(item)) != null) { while ((item = container.getInternalList().getNextEntry(item)) != null) {
result += container.getKey(item); result.append(container.getKey(item));
result += ","; result.append(",");
} }
result +="]"; result.append("]");
return result; return result.toString();
} }
} }

View File

@ -122,6 +122,10 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
return result; return result;
} }
public int hashCode() {
return super.hashCode();
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* *
@ -158,13 +162,14 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
public synchronized Object removeFirst(){ public synchronized Object removeFirst(){
load(); load();
Object result=null; Object result=null;
IndexItem item=(IndexItem)indexList.getFirst(); IndexItem item=indexList.getFirst();
if(item!=null){ if(item!=null){
itemRemoved(0); itemRemoved(0);
result=getValue(item); result=getValue(item);
IndexItem prev=root; IndexItem prev=root;
IndexItem next=indexList.size()>1?(IndexItem)indexList.get(1):null; IndexItem next=indexList.size()>1?(IndexItem)indexList.get(1):null;
indexList.removeFirst(); indexList.removeFirst();
delete(item,prev,next); delete(item,prev,next);
item=null; item=null;
} }
@ -306,6 +311,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
IndexItem prev=indexList.getPrevEntry(item); IndexItem prev=indexList.getPrevEntry(item);
IndexItem next=indexList.getNextEntry(item); IndexItem next=indexList.getNextEntry(item);
indexList.remove(item); indexList.remove(item);
delete(item,prev,next); delete(item,prev,next);
} }
@ -591,7 +597,6 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*/ */
public synchronized ListIterator listIterator(){ public synchronized ListIterator listIterator(){
load(); load();
IndexItem start= indexList.getFirst();
return new ContainerListIterator(this,indexList,indexList.getRoot()); return new ContainerListIterator(this,indexList,indexList.getRoot());
} }

View File

@ -497,13 +497,12 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
protected synchronized IndexItem write(Object key,Object value){ protected synchronized IndexItem write(Object key,Object value){
IndexItem index=null; IndexItem index=null;
try{ try{
if(key!=null){
index=indexManager.createNewIndex(); index=indexManager.createNewIndex();
StoreLocation data=dataManager.storeDataItem(keyMarshaller,key); StoreLocation data=dataManager.storeDataItem(keyMarshaller,key);
index.setKeyData(data); index.setKeyData(data);
}
if(value!=null){ if(value!=null){
StoreLocation data=dataManager.storeDataItem(valueMarshaller,value); data=dataManager.storeDataItem(valueMarshaller,value);
index.setValueData(data); index.setValueData(data);
} }
IndexItem prev=indexList.getLast(); IndexItem prev=indexList.getLast();

View File

@ -38,7 +38,7 @@ class DataFile{
DataFile(File file,int number){ DataFile(File file,int number){
this.file=file; this.file=file;
this.number=new Integer(number); this.number=Integer.valueOf(number);
length=file.exists()?file.length():0; length=file.exists()?file.length():0;
} }

View File

@ -43,7 +43,7 @@ public final class DataManagerImpl implements DataManager {
private static final Log log=LogFactory.getLog(DataManagerImpl.class); private static final Log log=LogFactory.getLog(DataManagerImpl.class);
public static final long MAX_FILE_LENGTH=1024*1024*32; public static final long MAX_FILE_LENGTH=1024*1024*32;
private static final String NAME_PREFIX="data-"; private static final String NAME_PREFIX="data-";
private final File dir; private final File directory;
private final String name; private final String name;
private SyncDataFileReader reader; private SyncDataFileReader reader;
private SyncDataFileWriter writer; private SyncDataFileWriter writer;
@ -59,14 +59,14 @@ public final class DataManagerImpl implements DataManager {
private String dataFilePrefix; private String dataFilePrefix;
public DataManagerImpl(File dir, final String name){ public DataManagerImpl(File dir, final String name){
this.dir=dir; this.directory=dir;
this.name=name; this.name=name;
dataFilePrefix = NAME_PREFIX+name+"-"; dataFilePrefix = NAME_PREFIX+name+"-";
// build up list of current dataFiles // build up list of current dataFiles
File[] files=dir.listFiles(new FilenameFilter(){ File[] files=dir.listFiles(new FilenameFilter(){
public boolean accept(File dir,String n){ public boolean accept(File dir,String n){
return dir.equals(dir)&&n.startsWith(dataFilePrefix); return dir.equals(directory)&&n.startsWith(dataFilePrefix);
} }
}); });
if(files!=null){ if(files!=null){
@ -86,7 +86,7 @@ public final class DataManagerImpl implements DataManager {
private DataFile createAndAddDataFile(int num){ private DataFile createAndAddDataFile(int num){
String fileName=dataFilePrefix+num; String fileName=dataFilePrefix+num;
File file=new File(dir,fileName); File file=new File(directory,fileName);
DataFile result=new DataFile(file,num); DataFile result=new DataFile(file,num);
fileMap.put(result.getNumber(),result); fileMap.put(result.getNumber(),result);
return result; return result;
@ -114,7 +114,7 @@ public final class DataManagerImpl implements DataManager {
} }
DataFile getDataFile(StoreLocation item) throws IOException{ DataFile getDataFile(StoreLocation item) throws IOException{
Integer key=new Integer(item.getFile()); Integer key=Integer.valueOf(item.getFile());
DataFile dataFile=(DataFile) fileMap.get(key); DataFile dataFile=(DataFile) fileMap.get(key);
if(dataFile==null){ if(dataFile==null){
log.error("Looking for key " + key + " but not found in fileMap: " + fileMap); log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);

View File

@ -297,12 +297,8 @@ class HashBin {
} }
} }
private void doUnderFlow(int index) { private void doUnderFlow(@SuppressWarnings("unused") int index) {
int pageNo = index / maximumEntries; //does little
int nextPageNo = pageNo + 1;
if (nextPageNo < hashPages.size()) {
}
HashPageInfo info = hashPages.get(pageNo);
} }
private void end() throws IOException { private void end() throws IOException {

View File

@ -43,7 +43,7 @@ class HashEntry implements Comparable{
return compareTo(o)==0; return compareTo(o)==0;
} }
public int hasCode(){ public int hashCode(){
return key.hashCode(); return key.hashCode();
} }

View File

@ -260,7 +260,7 @@ public class HashIndex implements Index{
public synchronized void delete() throws IOException{ public synchronized void delete() throws IOException{
unload(); unload();
if(file.exists()){ if(file.exists()){
boolean result=file.delete(); file.delete();
} }
length=0; length=0;
} }

View File

@ -225,9 +225,12 @@ class HashPage {
void dump() { void dump() {
String str = this + ": "; StringBuffer str = new StringBuffer(32);
str.append(toString());
str.append(": ");
for (HashEntry entry : hashIndexEntries) { for (HashEntry entry : hashIndexEntries) {
str += entry + ","; str .append(entry);
str.append(",");
} }
log.info(str); log.info(str);
} }

View File

@ -45,7 +45,7 @@ class TreeEntry implements Comparable{
return compareTo(o)==0; return compareTo(o)==0;
} }
public int hasCode(){ public int hashCode(){
return key.hashCode(); return key.hashCode();
} }

View File

@ -50,7 +50,7 @@ public class CacheEvictionUsageListener implements UsageListener {
}, "Cache Evictor: "+System.identityHashCode(this)); }, "Cache Evictor: "+System.identityHashCode(this));
} }
private boolean evictMessages() { boolean evictMessages() {
// Try to take the memory usage down below the low mark. // Try to take the memory usage down below the low mark.
try { try {
log.debug("Evicting cache memory usage: "+usageManager.getPercentUsage()); log.debug("Evicting cache memory usage: "+usageManager.getPercentUsage());

View File

@ -125,7 +125,7 @@ public class ForwardingBridge implements Service{
/** /**
* @throws IOException * @throws IOException
*/ */
private void startBridge() throws IOException { final void startBridge() throws IOException {
connectionInfo = new ConnectionInfo(); connectionInfo = new ConnectionInfo();
connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
connectionInfo.setClientId(clientId); connectionInfo.setClientId(clientId);

View File

@ -196,7 +196,7 @@ public class ConnectionPool {
protected class Synchronization implements javax.transaction.Synchronization { protected class Synchronization implements javax.transaction.Synchronization {
private final PooledSession session; private final PooledSession session;
private Synchronization(PooledSession session) { protected Synchronization(PooledSession session) {
this.session = session; this.session = session;
} }

View File

@ -168,19 +168,23 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
} }
} }
public Response processAddDestination(DestinationInfo info) { public Response processAddDestination(DestinationInfo info){
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId()); if(info!=null){
if( cs != null && info != null && info.getDestination().isTemporary() ) { ConnectionState cs=(ConnectionState)connectionStates.get(info.getConnectionId());
if(cs!=null&&info.getDestination().isTemporary()){
cs.addTempDestination(info); cs.addTempDestination(info);
} }
}
return TRACKED_RESPONSE_MARKER; return TRACKED_RESPONSE_MARKER;
} }
public Response processRemoveDestination(DestinationInfo info) { public Response processRemoveDestination(DestinationInfo info){
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId()); if(info!=null){
if( cs != null && info != null && info.getDestination().isTemporary() ) { ConnectionState cs=(ConnectionState)connectionStates.get(info.getConnectionId());
if(cs!=null&&info.getDestination().isTemporary()){
cs.removeTempDestination(info.getDestination()); cs.removeTempDestination(info.getDestination());
} }
}
return TRACKED_RESPONSE_MARKER; return TRACKED_RESPONSE_MARKER;
} }

View File

@ -130,7 +130,7 @@ public class AMQMessageStore implements MessageStore{
} }
} }
private void addMessage(final Message message,final Location location) throws InterruptedIOException{ void addMessage(final Message message,final Location location) throws InterruptedIOException{
ReferenceData data=new ReferenceData(); ReferenceData data=new ReferenceData();
data.setExpiration(message.getExpiration()); data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId()); data.setFileId(location.getDataFileId());
@ -205,7 +205,7 @@ public class AMQMessageStore implements MessageStore{
} }
} }
private void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException{ final void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException{
ReferenceData data; ReferenceData data;
synchronized(this){ synchronized(this){
lastLocation=location; lastLocation=location;
@ -273,7 +273,7 @@ public class AMQMessageStore implements MessageStore{
* @return * @return
* @throws IOException * @throws IOException
*/ */
private void asyncWrite(){ void asyncWrite(){
try{ try{
CountDownLatch countDown; CountDownLatch countDown;
synchronized(this){ synchronized(this){

View File

@ -218,8 +218,10 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
if(!started.compareAndSet(true,false)) if(!started.compareAndSet(true,false))
return; return;
this.usageManager.removeUsageListener(this); this.usageManager.removeUsageListener(this);
synchronized(this){
Scheduler.cancel(periodicCheckpointTask); Scheduler.cancel(periodicCheckpointTask);
Scheduler.cancel(periodicCleanupTask); Scheduler.cancel(periodicCleanupTask);
}
Iterator<AMQMessageStore> iterator=queues.values().iterator(); Iterator<AMQMessageStore> iterator=queues.values().iterator();
while(iterator.hasNext()){ while(iterator.hasNext()){
AMQMessageStore ms=iterator.next(); AMQMessageStore ms=iterator.next();
@ -232,7 +234,9 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
} }
// Take one final checkpoint and stop checkpoint processing. // Take one final checkpoint and stop checkpoint processing.
checkpoint(true); checkpoint(true);
synchronized(this){
checkpointTask.shutdown(); checkpointTask.shutdown();
}
queues.clear(); queues.clear();
topics.clear(); topics.clear();
IOException firstException=null; IOException firstException=null;
@ -259,8 +263,8 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
CountDownLatch latch=null; CountDownLatch latch=null;
synchronized(this){ synchronized(this){
latch=nextCheckpointCountDownLatch; latch=nextCheckpointCountDownLatch;
}
checkpointTask.wakeup(); checkpointTask.wakeup();
}
if(sync){ if(sync){
if(log.isDebugEnabled()){ if(log.isDebugEnabled()){
log.debug("Waitng for checkpoint to complete."); log.debug("Waitng for checkpoint to complete.");
@ -585,7 +589,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
return transactionStore; return transactionStore;
} }
public void deleteAllMessages() throws IOException{ public synchronized void deleteAllMessages() throws IOException{
deleteAllMessages=true; deleteAllMessages=true;
} }
@ -669,11 +673,11 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
this.maxCheckpointWorkers=maxCheckpointWorkers; this.maxCheckpointWorkers=maxCheckpointWorkers;
} }
public File getDirectory(){ public synchronized File getDirectory(){
return directory; return directory;
} }
public void setDirectory(File directory){ public synchronized void setDirectory(File directory){
this.directory=directory; this.directory=directory;
} }

View File

@ -143,7 +143,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
* @param key * @param key
* @throws InterruptedIOException * @throws InterruptedIOException
*/ */
private void acknowledge(MessageId messageId,Location location,SubscriptionKey key) throws InterruptedIOException{ protected void acknowledge(MessageId messageId,Location location,SubscriptionKey key) throws InterruptedIOException{
synchronized(this){ synchronized(this){
lastLocation=location; lastLocation=location;
ackedLastAckLocations.put(key,messageId); ackedLastAckLocations.put(key,messageId);

View File

@ -125,7 +125,7 @@ public class JournalMessageStore implements MessageStore {
} }
} }
private void addMessage(final Message message, final RecordLocation location) { void addMessage(final Message message, final RecordLocation location) {
synchronized (this) { synchronized (this) {
lastLocation = location; lastLocation = location;
MessageId id = message.getMessageId(); MessageId id = message.getMessageId();
@ -187,7 +187,7 @@ public class JournalMessageStore implements MessageStore {
} }
} }
private void removeMessage(final MessageAck ack, final RecordLocation location) { final void removeMessage(final MessageAck ack, final RecordLocation location) {
synchronized (this) { synchronized (this) {
lastLocation = location; lastLocation = location;
MessageId id = ack.getLastMessageId(); MessageId id = ack.getLastMessageId();
@ -253,33 +253,31 @@ public class JournalMessageStore implements MessageStore {
ConnectionContext context = transactionTemplate.getContext(); ConnectionContext context = transactionTemplate.getContext();
// Checkpoint the added messages. // Checkpoint the added messages.
Iterator iterator = cpAddedMessageIds.values().iterator(); synchronized(JournalMessageStore.this){
while (iterator.hasNext()) { Iterator iterator=cpAddedMessageIds.values().iterator();
Message message = (Message) iterator.next(); while(iterator.hasNext()){
try { Message message=(Message)iterator.next();
longTermStore.addMessage(context, message); try{
} catch (Throwable e) { longTermStore.addMessage(context,message);
log.warn("Message could not be added to long term store: " + e.getMessage(), e); }catch(Throwable e){
log.warn("Message could not be added to long term store: "+e.getMessage(),e);
} }
size+=message.getSize();
size += message.getSize();
message.decrementReferenceCount(); message.decrementReferenceCount();
// Commit the batch if it's getting too big // Commit the batch if it's getting too big
if( size >= maxCheckpointMessageAddSize ) { if(size>=maxCheckpointMessageAddSize){
persitanceAdapter.commitTransaction(context); persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context); persitanceAdapter.beginTransaction(context);
size=0; size=0;
} }
}
} }
persitanceAdapter.commitTransaction(context); persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context); persitanceAdapter.beginTransaction(context);
// Checkpoint the removed messages. // Checkpoint the removed messages.
iterator = cpRemovedMessageLocations.iterator(); Iterator iterator = cpRemovedMessageLocations.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
try { try {
MessageAck ack = (MessageAck) iterator.next(); MessageAck ack = (MessageAck) iterator.next();
@ -303,7 +301,8 @@ public class JournalMessageStore implements MessageStore {
if( cpActiveJournalLocations.size() > 0 ) { if( cpActiveJournalLocations.size() > 0 ) {
Collections.sort(cpActiveJournalLocations); Collections.sort(cpActiveJournalLocations);
return (RecordLocation) cpActiveJournalLocations.get(0); return (RecordLocation) cpActiveJournalLocations.get(0);
} else { }
synchronized (this){
return lastLocation; return lastLocation;
} }
} }

View File

@ -92,8 +92,8 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
private final ConcurrentHashMap topics = new ConcurrentHashMap(); private final ConcurrentHashMap topics = new ConcurrentHashMap();
private UsageManager usageManager; private UsageManager usageManager;
private long checkpointInterval = 1000 * 60 * 5; long checkpointInterval = 1000 * 60 * 5;
private long lastCheckpointRequest = System.currentTimeMillis(); long lastCheckpointRequest = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis(); private long lastCleanup = System.currentTimeMillis();
private int maxCheckpointWorkers = 10; private int maxCheckpointWorkers = 10;
private int maxCheckpointMessageAddSize = 1024*1024; private int maxCheckpointMessageAddSize = 1024*1024;
@ -112,7 +112,11 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
final Runnable createPeriodicCheckpointTask() { final Runnable createPeriodicCheckpointTask() {
return new Runnable() { return new Runnable() {
public void run() { public void run() {
if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) { long lastTime = 0;
synchronized(this) {
lastTime = lastCheckpointRequest;
}
if( System.currentTimeMillis()>lastTime+checkpointInterval ) {
checkpoint(false, true); checkpoint(false, true);
} }
} }

View File

@ -142,7 +142,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
* @param location * @param location
* @param key * @param key
*/ */
private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) { protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
synchronized(this) { synchronized(this) {
lastLocation = location; lastLocation = location;
ackedLastAckLocations.put(key, messageId); ackedLastAckLocations.put(key, messageId);

View File

@ -174,7 +174,7 @@ public class KahaMessageStore implements MessageStore{
* @param nextToDispatch * @param nextToDispatch
* @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId) * @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId)
*/ */
public void resetBatching(){ public synchronized void resetBatching(){
batchEntry=null; batchEntry=null;
} }

View File

@ -149,7 +149,7 @@ public class KahaReferenceStore implements ReferenceStore{
messageContainer.clear(); messageContainer.clear();
} }
public void resetBatching(){ public synchronized void resetBatching(){
batchEntry=null; batchEntry=null;
lastBatchId=null; lastBatchId=null;
} }

View File

@ -212,7 +212,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
} }
@Override @Override
public void setDirectory(File directory){ public synchronized void setDirectory(File directory){
File file = new File(directory,"data"); File file = new File(directory,"data");
super.setDirectory(file); super.setDirectory(file);
this.stateStore=createStateStore(directory); this.stateStore=createStateStore(directory);

View File

@ -138,7 +138,7 @@ public class KahaTransactionStore implements TransactionStore{
* @param ack * @param ack
* @throws IOException * @throws IOException
*/ */
private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException{ final void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException{
if(ack.isInTransaction()){ if(ack.isInTransaction()){
KahaTransaction tx=getOrCreateTx(ack.getTransactionId()); KahaTransaction tx=getOrCreateTx(ack.getTransactionId());
tx.add((KahaMessageStore) destination,ack); tx.add((KahaMessageStore) destination,ack);

View File

@ -27,7 +27,6 @@ import java.util.Iterator;
public class TopicSubContainer { public class TopicSubContainer {
private transient ListContainer listContainer; private transient ListContainer listContainer;
private transient StoreEntry batchEntry; private transient StoreEntry batchEntry;
private transient String lastBatchId;
public TopicSubContainer(ListContainer container) { public TopicSubContainer(ListContainer container) {
this.listContainer = container; this.listContainer = container;
@ -45,12 +44,10 @@ public class TopicSubContainer {
* @param batchEntry the batchEntry to set * @param batchEntry the batchEntry to set
*/ */
public void setBatchEntry(String id,StoreEntry batchEntry) { public void setBatchEntry(String id,StoreEntry batchEntry) {
this.lastBatchId=id;
this.batchEntry = batchEntry; this.batchEntry = batchEntry;
} }
public void reset() { public void reset() {
lastBatchId=null;
batchEntry = null; batchEntry = null;
} }

View File

@ -229,7 +229,7 @@ public class MemoryTransactionStore implements TransactionStore {
* @param ack * @param ack
* @throws IOException * @throws IOException
*/ */
private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException { final void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException {
if( doingRecover ) if( doingRecover )
return; return;

View File

@ -82,7 +82,7 @@ class DedicatedTaskRunner implements TaskRunner {
shutdown(0); shutdown(0);
} }
private void runTask() { final void runTask() {
try { try {
while( true ) { while( true ) {

View File

@ -96,7 +96,7 @@ class PooledTaskRunner implements TaskRunner {
public void shutdown() throws InterruptedException { public void shutdown() throws InterruptedException {
shutdown(0); shutdown(0);
} }
private void runTask() { final void runTask() {
synchronized (runable) { synchronized (runable) {
queued = false; queued = false;

View File

@ -68,7 +68,7 @@ public class InactivityMonitor extends TransportFilter {
} }
private void writeCheck() { final void writeCheck() {
synchronized(writeChecker) { synchronized(writeChecker) {
if( inSend.get() ) { if( inSend.get() ) {
log.trace("A send is in progress"); log.trace("A send is in progress");
@ -90,7 +90,7 @@ public class InactivityMonitor extends TransportFilter {
} }
} }
private void readCheck() { final void readCheck() {
synchronized(readChecker) { synchronized(readChecker) {
if( inReceive.get() ) { if( inReceive.get() ) {
log.trace("A receive is in progress"); log.trace("A receive is in progress");

View File

@ -60,8 +60,6 @@ public class RendezvousDiscoveryAgent implements DiscoveryAgent, ServiceListener
private String group = "default"; private String group = "default";
private final CopyOnWriteArrayList serviceInfos = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList serviceInfos = new CopyOnWriteArrayList();
private String brokerName;
// DiscoveryAgent interface // DiscoveryAgent interface
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
public void start() throws Exception { public void start() throws Exception {
@ -232,11 +230,16 @@ public class RendezvousDiscoveryAgent implements DiscoveryAgent, ServiceListener
return "_" + group+"."+TYPE_SUFFIX; return "_" + group+"."+TYPE_SUFFIX;
} }
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
public void serviceFailed(DiscoveryEvent event) throws IOException { public void serviceFailed(DiscoveryEvent event) throws IOException {
// TODO: is there a way to notify the JmDNS that the service failed? // TODO: is there a way to notify the JmDNS that the service failed?
} }
/**
* @param brokerName
* @see org.apache.activemq.transport.discovery.DiscoveryAgent#setBrokerName(java.lang.String)
*/
public void setBrokerName(String brokerName){
// implementation of interface
}
} }

View File

@ -91,7 +91,7 @@ public class FailoverTransport implements CompositeTransport {
return; return;
} }
if (command.isResponse()) { if (command.isResponse()) {
Object object = requestMap.remove(new Integer(((Response) command).getCorrelationId())); Object object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
if( object!=null && object.getClass() == Tracked.class ) { if( object!=null && object.getClass() == Tracked.class ) {
((Tracked)object).onResponses(); ((Tracked)object).onResponses();
} }
@ -231,7 +231,7 @@ public class FailoverTransport implements CompositeTransport {
}, "ActiveMQ Failover Worker: "+System.identityHashCode(this)); }, "ActiveMQ Failover Worker: "+System.identityHashCode(this));
} }
private void handleTransportFailure(IOException e) throws InterruptedException { final void handleTransportFailure(IOException e) throws InterruptedException {
if (transportListener != null){ if (transportListener != null){
transportListener.transportInterupted(); transportListener.transportInterupted();
} }
@ -382,9 +382,9 @@ public class FailoverTransport implements CompositeTransport {
// it later. // it later.
Tracked tracked = stateTracker.track(command); Tracked tracked = stateTracker.track(command);
if( tracked!=null && tracked.isWaitingForResponse() ) { if( tracked!=null && tracked.isWaitingForResponse() ) {
requestMap.put(new Integer(command.getCommandId()), tracked); requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
} else if ( tracked==null && command.isResponseRequired()) { } else if ( tracked==null && command.isResponseRequired()) {
requestMap.put(new Integer(command.getCommandId()), command); requestMap.put(Integer.valueOf(command.getCommandId()), command);
} }
// Send the message. // Send the message.
@ -398,7 +398,7 @@ public class FailoverTransport implements CompositeTransport {
// since we will retry in this method.. take it out of the request // since we will retry in this method.. take it out of the request
// map so that it is not sent 2 times on recovery // map so that it is not sent 2 times on recovery
if( command.isResponseRequired() ) { if( command.isResponseRequired() ) {
requestMap.remove(new Integer(command.getCommandId())); requestMap.remove(Integer.valueOf(command.getCommandId()));
} }
// Rethrow the exception so it will handled by the outer catch // Rethrow the exception so it will handled by the outer catch

View File

@ -19,6 +19,7 @@ package org.apache.activemq.util;
import javax.jms.Message; import javax.jms.Message;
import java.io.Serializable;
import java.util.Comparator; import java.util.Comparator;
/** /**
@ -26,7 +27,7 @@ import java.util.Comparator;
* *
* @version $Revision$ * @version $Revision$
*/ */
public abstract class MessageComparatorSupport implements Comparator { public abstract class MessageComparatorSupport implements Comparator, Serializable {
public int compare(Object object1, Object object2) { public int compare(Object object1, Object object2) {
Message command1 = (Message) object1; Message command1 = (Message) object1;
@ -36,11 +37,20 @@ public abstract class MessageComparatorSupport implements Comparator {
protected abstract int compareMessages(Message message1, Message message2); protected abstract int compareMessages(Message message1, Message message2);
protected int compareComparators(Comparable comparable, Comparable comparable2) { protected int compareComparators(final Comparable comparable, final Comparable comparable2) {
if (comparable != null) { if (comparable == null && comparable2 == null) {
return 0;
}
else if (comparable != null) {
if (comparable2== null) {
return 1;
}
return comparable.compareTo(comparable2); return comparable.compareTo(comparable2);
} }
else if (comparable2 != null) { else if (comparable2 != null) {
if (comparable== null) {
return -11;
}
return comparable2.compareTo(comparable) * -1; return comparable2.compareTo(comparable) * -1;
} }
return 0; return 0;