mirror of https://github.com/apache/activemq.git
fix for https://issues.apache.org/activemq/browse/AMQ-1926 with test case
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@691621 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
874e935abb
commit
d261ffd9e9
|
@ -434,6 +434,9 @@ public class AsyncDataManager {
|
||||||
purgeList.add(dataFile);
|
purgeList.add(dataFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("lastFileId=" + lastFile + ", purgeList: (" + purgeList.size() + ") " + purgeList);
|
||||||
|
}
|
||||||
for (DataFile dataFile : purgeList) {
|
for (DataFile dataFile : purgeList) {
|
||||||
forceRemoveDataFile(dataFile);
|
forceRemoveDataFile(dataFile);
|
||||||
}
|
}
|
||||||
|
@ -465,17 +468,17 @@ public class AsyncDataManager {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
accessorPool.disposeDataFileAccessors(dataFile);
|
accessorPool.disposeDataFileAccessors(dataFile);
|
||||||
fileByFileMap.remove(dataFile.getFile());
|
fileByFileMap.remove(dataFile.getFile());
|
||||||
DataFile removed = fileMap.remove(dataFile.getDataFileId());
|
|
||||||
storeSize.addAndGet(-dataFile.getLength());
|
storeSize.addAndGet(-dataFile.getLength());
|
||||||
dataFile.unlink();
|
dataFile.unlink();
|
||||||
if (archiveDataLogs) {
|
if (archiveDataLogs) {
|
||||||
dataFile.move(getDirectoryArchive());
|
dataFile.move(getDirectoryArchive());
|
||||||
LOG.info("moved data file " + dataFile + " to "
|
LOG.debug("moved data file " + dataFile + " to "
|
||||||
+ getDirectoryArchive());
|
+ getDirectoryArchive());
|
||||||
} else {
|
} else {
|
||||||
boolean result = dataFile.delete();
|
boolean result = dataFile.delete();
|
||||||
LOG.info("discarding data file " + dataFile
|
if (!result) {
|
||||||
+ (result ? "successful " : "failed"));
|
LOG.info("Failed to discard data file " + dataFile);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -261,6 +261,9 @@ public class AMQMessageStore implements MessageStore {
|
||||||
data = messages.remove(id);
|
data = messages.remove(id);
|
||||||
if (data == null) {
|
if (data == null) {
|
||||||
messageAcks.add(ack);
|
messageAcks.add(ack);
|
||||||
|
} else {
|
||||||
|
// message never got written so datafileReference will still exist
|
||||||
|
AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId());
|
||||||
}
|
}
|
||||||
}finally {
|
}finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
|
@ -26,9 +26,9 @@ import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CopyOnWriteArraySet;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.activeio.journal.Journal;
|
import org.apache.activeio.journal.Journal;
|
||||||
|
@ -122,7 +122,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
|
private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
|
||||||
private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
|
private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
|
||||||
private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
|
private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
|
||||||
private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>> ();
|
private Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
|
||||||
private String directoryPath = "";
|
private String directoryPath = "";
|
||||||
private RandomAccessFile lockFile;
|
private RandomAccessFile lockFile;
|
||||||
private FileLock lock;
|
private FileLock lock;
|
||||||
|
@ -271,14 +271,14 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
checkpoint(false);
|
checkpoint(false);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
|
Scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
|
||||||
periodicCleanupTask = new Runnable() {
|
periodicCleanupTask = new Runnable() {
|
||||||
|
|
||||||
public void run() {
|
public void run() {
|
||||||
cleanup();
|
cleanup();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
|
Scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
|
||||||
|
|
||||||
if (lockAquired && lockLogged) {
|
if (lockAquired && lockLogged) {
|
||||||
LOG.info("Aquired lock for AMQ Store" + getDirectory());
|
LOG.info("Aquired lock for AMQ Store" + getDirectory());
|
||||||
|
@ -426,8 +426,11 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
public void cleanup() {
|
public void cleanup() {
|
||||||
try {
|
try {
|
||||||
Set<Integer>inProgress = new HashSet<Integer>();
|
Set<Integer>inProgress = new HashSet<Integer>();
|
||||||
for (Set<Integer> set: dataFilesInProgress.values()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
inProgress.addAll(set);
|
LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values());
|
||||||
|
}
|
||||||
|
for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) {
|
||||||
|
inProgress.addAll(set.keySet());
|
||||||
}
|
}
|
||||||
Integer lastDataFile = asyncDataManager.getCurrentDataFileId();
|
Integer lastDataFile = asyncDataManager.getCurrentDataFileId();
|
||||||
inProgress.add(lastDataFile);
|
inProgress.add(lastDataFile);
|
||||||
|
@ -437,6 +440,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
if (lastActiveTx != null) {
|
if (lastActiveTx != null) {
|
||||||
lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
|
lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
|
||||||
}
|
}
|
||||||
|
LOG.debug("lastDataFile: " + lastDataFile);
|
||||||
asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
|
asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Could not cleanup data files: " + e, e);
|
LOG.error("Could not cleanup data files: " + e, e);
|
||||||
|
@ -967,18 +971,32 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
|
|
||||||
|
|
||||||
protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
|
protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
|
||||||
Set<Integer>set = dataFilesInProgress.get(store);
|
Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
|
||||||
if (set == null) {
|
if (map == null) {
|
||||||
set = new CopyOnWriteArraySet<Integer>();
|
map = new ConcurrentHashMap<Integer, AtomicInteger>();
|
||||||
dataFilesInProgress.put(store, set);
|
dataFilesInProgress.put(store, map);
|
||||||
}
|
}
|
||||||
set.add(dataFileId);
|
AtomicInteger count = map.get(dataFileId);
|
||||||
|
if (count == null) {
|
||||||
|
count = new AtomicInteger(0);
|
||||||
|
map.put(dataFileId, count);
|
||||||
|
}
|
||||||
|
count.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
|
protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
|
||||||
Set<Integer>set = dataFilesInProgress.get(store);
|
Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
|
||||||
if (set != null) {
|
if (map != null) {
|
||||||
set.remove(dataFileId);
|
AtomicInteger count = map.get(dataFileId);
|
||||||
|
if (count != null) {
|
||||||
|
int newCount = count.decrementAndGet();
|
||||||
|
if (newCount <=0) {
|
||||||
|
map.remove(dataFileId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (map.isEmpty()) {
|
||||||
|
dataFilesInProgress.remove(store);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -72,6 +72,7 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
|
||||||
result.setUseNio(isUseNio());
|
result.setUseNio(isUseNio());
|
||||||
result.setMaxFileLength(getMaxFileLength());
|
result.setMaxFileLength(getMaxFileLength());
|
||||||
result.setCleanupInterval(getCleanupInterval());
|
result.setCleanupInterval(getCleanupInterval());
|
||||||
|
result.setCheckpointInterval(getCheckpointInterval());
|
||||||
result.setIndexBinSize(getIndexBinSize());
|
result.setIndexBinSize(getIndexBinSize());
|
||||||
result.setIndexKeySize(getIndexKeySize());
|
result.setIndexKeySize(getIndexKeySize());
|
||||||
result.setIndexPageSize(getIndexPageSize());
|
result.setIndexPageSize(getIndexPageSize());
|
||||||
|
|
|
@ -0,0 +1,158 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.bugs;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
|
import javax.jms.ObjectMessage;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
|
||||||
|
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
|
||||||
|
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* see https://issues.apache.org/activemq/browse/AMQ-1926
|
||||||
|
*/
|
||||||
|
public class DataFileNotDeletedTest extends TestCase {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(DataFileNotDeletedTest.class);
|
||||||
|
|
||||||
|
private final CountDownLatch latch = new CountDownLatch(max_messages);
|
||||||
|
private static int max_messages = 600;
|
||||||
|
private static int messageCounter;
|
||||||
|
private String destinationName = getName()+"_Queue";
|
||||||
|
private BrokerService broker;
|
||||||
|
private Connection receiverConnection;
|
||||||
|
private Connection producerConnection;
|
||||||
|
final boolean useTopic = false;
|
||||||
|
|
||||||
|
AMQPersistenceAdapter persistentAdapter;
|
||||||
|
protected static final String payload = new String(new byte[512]);
|
||||||
|
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
messageCounter = 0;
|
||||||
|
startBroker();
|
||||||
|
receiverConnection = createConnection();
|
||||||
|
receiverConnection.start();
|
||||||
|
producerConnection = createConnection();
|
||||||
|
producerConnection.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
receiverConnection.close();
|
||||||
|
producerConnection.close();
|
||||||
|
broker.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testForDataFileNotDeleted() throws Exception {
|
||||||
|
doTestForDataFileNotDeleted(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testForDataFileNotDeletedTransacted() throws Exception {
|
||||||
|
doTestForDataFileNotDeleted(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doTestForDataFileNotDeleted(boolean transacted) throws Exception {
|
||||||
|
|
||||||
|
Receiver receiver = new Receiver() {
|
||||||
|
public void receive(String s) throws Exception {
|
||||||
|
messageCounter++;
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
buildReceiver(receiverConnection, destinationName, transacted, receiver, useTopic);
|
||||||
|
|
||||||
|
final MessageSender producer = new MessageSender(destinationName, producerConnection, transacted, useTopic);
|
||||||
|
for (int i=0; i< max_messages; i++) {
|
||||||
|
producer.send(payload );
|
||||||
|
}
|
||||||
|
latch.await();
|
||||||
|
assertEquals(max_messages, messageCounter);
|
||||||
|
waitFordataFilesToBeCleanedUp(persistentAdapter.getAsyncDataManager(), 30000, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitFordataFilesToBeCleanedUp(
|
||||||
|
AsyncDataManager asyncDataManager, int timeout, int numExpected) throws InterruptedException {
|
||||||
|
long expiry = System.currentTimeMillis() + timeout;
|
||||||
|
while(expiry > System.currentTimeMillis()) {
|
||||||
|
if (asyncDataManager.getFiles().size() <= numExpected) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals("persistence adapter dataManager has correct number of files", 2, asyncDataManager.getFiles().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Connection createConnection() throws JMSException {
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||||
|
return factory.createConnection();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startBroker() throws Exception {
|
||||||
|
broker = new BrokerService();
|
||||||
|
broker.setDeleteAllMessagesOnStartup(true);
|
||||||
|
broker.setPersistent(true);
|
||||||
|
broker.setUseJmx(true);
|
||||||
|
broker.addConnector("tcp://localhost:61616").setName("Default");
|
||||||
|
|
||||||
|
AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
|
||||||
|
// ensure there are a bunch of data files but multiple entries in each
|
||||||
|
factory.setMaxFileLength(1024 * 20);
|
||||||
|
// speed up the test case, checkpoint an cleanup early and often
|
||||||
|
factory.setCheckpointInterval(500);
|
||||||
|
factory.setCleanupInterval(500);
|
||||||
|
factory.setSyncOnWrite(false);
|
||||||
|
|
||||||
|
persistentAdapter = (AMQPersistenceAdapter) broker.getPersistenceAdapter();
|
||||||
|
broker.start();
|
||||||
|
LOG.info("Starting broker..");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver, boolean isTopic) throws Exception {
|
||||||
|
final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer inputMessageConsumer = session.createConsumer(isTopic ? session.createTopic(queueName) : session.createQueue(queueName));
|
||||||
|
MessageListener messageListener = new MessageListener() {
|
||||||
|
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
try {
|
||||||
|
ObjectMessage objectMessage = (ObjectMessage)message;
|
||||||
|
String s = (String)objectMessage.getObject();
|
||||||
|
receiver.receive(s);
|
||||||
|
if (session.getTransacted()) {
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
inputMessageConsumer.setMessageListener(messageListener);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue