git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@589314 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-10-28 09:39:12 +00:00
parent 9352c9fcdc
commit c8761c7e0f
7 changed files with 152 additions and 22 deletions

View File

@ -352,7 +352,6 @@ public final class AsyncDataManager {
synchronized void addInterestInFile(DataFile dataFile) {
if (dataFile != null) {
dataFile.increment();
System.err.println("ADD INTEREST: " + dataFile);
}
}
@ -370,24 +369,14 @@ public final class AsyncDataManager {
if (dataFile.decrement() <= 0) {
removeDataFile(dataFile);
}
System.err.println("REMOVE INTEREST: " + dataFile);
}
}
public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastDataFile) throws IOException {
public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer>inProgress) throws IOException {
Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
unUsed.removeAll(inUse);
// Don't purge any data files past lastDataFile
if( lastDataFile!=null ) {
for (Iterator<Integer> iterator = unUsed.iterator(); iterator.hasNext();) {
Integer i = iterator.next();
if( i >= lastDataFile ) {
iterator.remove();
}
}
}
unUsed.removeAll(inProgress);
List<DataFile> purgeList = new ArrayList<DataFile>();
for (Integer key : unUsed) {
DataFile dataFile = (DataFile)fileMap.get(key);

View File

@ -151,6 +151,7 @@ public class AMQMessageStore implements MessageStore {
synchronized (this) {
lastLocation = location;
messages.put(message.getMessageId(), data);
this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
}
try {
asyncWriteTask.wakeup();
@ -338,6 +339,7 @@ public class AMQMessageStore implements MessageStore {
Entry<MessageId, ReferenceData> entry = iterator.next();
try {
referenceStore.addMessageReference(context, entry.getKey(), entry.getValue());
AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this,entry.getValue().getFileId());
} catch (Throwable e) {
LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
}

View File

@ -21,12 +21,15 @@ import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
@ -55,9 +58,9 @@ import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
@ -65,6 +68,7 @@ import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
@ -102,6 +106,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private boolean persistentIndex=true;
private boolean useNio = true;
private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>> ();
public String getBrokerName() {
@ -353,9 +358,14 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
try {
// Capture the lastDataFile so that we don't delete any data files
// after this one.
Integer lastDataFile = asyncDataManager.getCurrentDataFileId();
Set<Integer>inProgress = new CopyOnWriteArraySet<Integer>();
for (Set<Integer> set: dataFilesInProgress.values()) {
inProgress.addAll(set);
}
Integer lastDataFile = asyncDataManager.getCurrentDataFileId();
inProgress.add(lastDataFile);
Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
asyncDataManager.consolidateDataFilesNotIn(inUse, lastDataFile);
asyncDataManager.consolidateDataFilesNotIn(inUse, inProgress);
} catch (IOException e) {
LOG.error("Could not cleanup data files: " + e, e);
}
@ -730,4 +740,20 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
public void setMaxFileLength(int maxFileLength) {
this.maxFileLength = maxFileLength;
}
protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
Set<Integer>set = dataFilesInProgress.get(store);
if (set == null) {
set = new CopyOnWriteArraySet<Integer>();
dataFilesInProgress.put(store, set);
}
set.add(dataFileId);
}
protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
Set<Integer>set = dataFilesInProgress.get(store);
if (set != null) {
set.remove(dataFileId);
}
}
}

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.Set;
/**
* A Simple LRU Set
*
* @version $Revision$
* @param <K>
* @param <V>
*/
public class LRUSet<E>
extends AbstractSet<E>
implements Set<E>, Cloneable, java.io.Serializable{
private static final Object IGNORE = new Object();
private final LRUCache cache;
/**
* Default constructor for an LRU Cache The default capacity is 10000
*/
public LRUSet() {
this(0,10000, 0.75f, true);
}
/**
* Constructs a LRUCache with a maximum capacity
*
* @param maximumCacheSize
*/
public LRUSet(int maximumCacheSize) {
this(0, maximumCacheSize, 0.75f, true);
}
/**
* Constructs an empty <tt>LRUCache</tt> instance with the specified
* initial capacity, maximumCacheSize,load factor and ordering mode.
*
* @param initialCapacity
* the initial capacity.
* @param maximumCacheSize
* @param loadFactor
* the load factor.
* @param accessOrder
* the ordering mode - <tt>true</tt> for access-order,
* <tt>false</tt> for insertion-order.
* @throws IllegalArgumentException
* if the initial capacity is negative or the load factor is
* non-positive.
*/
public LRUSet(int initialCapacity, int maximumCacheSize, float loadFactor, boolean accessOrder) {
this.cache = new LRUCache<E,Object>(initialCapacity,maximumCacheSize,loadFactor,accessOrder);
}
public Iterator<E> iterator() {
return cache.keySet().iterator();
}
public int size() {
return cache.size();
}
public boolean isEmpty() {
return cache.isEmpty();
}
public boolean contains(Object o) {
return cache.containsKey(o);
}
public boolean add(E o) {
return cache.put(o, IGNORE)==null;
}
public boolean remove(Object o) {
return cache.remove(o)==IGNORE;
}
public void clear() {
cache.clear();
}
}

View File

@ -27,10 +27,10 @@ public class AMQStoreDurableTopicTest extends SimpleDurableTopicTest {
protected void configureBroker(BrokerService answer) throws Exception {
File dataFileDir = new File("target/test-amq-data/perfTest/amqdb");
answer.setDeleteAllMessagesOnStartup(true);
AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
adaptor.setDirectory(dataFileDir);
answer.setPersistenceAdapter(adaptor);
answer.setDeleteAllMessagesOnStartup(true);
answer.addConnector(bindAddress);
}
}

View File

@ -35,7 +35,7 @@ public class AMQStoreQueueTest extends SimpleQueueTest {
answer.setPersistenceAdapter(adaptor);
answer.addConnector(bindAddress);
// answer.setDeleteAllMessagesOnStartup(true);
answer.setDeleteAllMessagesOnStartup(true);
}

View File

@ -37,14 +37,14 @@ public class SimpleTopicTest extends TestCase {
protected BrokerService broker;
// protected String
// bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false";
//protected String bindAddress="tcp://localhost:61616";
protected String bindAddress="tcp://localhost:61616";
//protected String bindAddress = "tcp://localhost:61616";
//protected String bindAddress="vm://localhost?marshal=true";
protected String bindAddress="vm://localhost";
//protected String bindAddress="vm://localhost";
protected PerfProducer[] producers;
protected PerfConsumer[] consumers;
protected String destinationName = getClass().getName();
protected int samepleCount = 10;
protected int samepleCount = 20;
protected long sampleInternal = 10000;
protected int numberOfConsumers = 1;
protected int numberofProducers = 2;