resolve https://issues.apache.org/activemq/browse/AMQ-2910 - fix and test, index on temp plist store was not being protected

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@997849 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-09-16 17:09:22 +00:00
parent bc2a7f1ae0
commit cf7a935b06
3 changed files with 234 additions and 82 deletions

View File

@ -34,10 +34,11 @@ public class PList {
private long lastId = EntryLocation.NOT_SET;
private final AtomicBoolean loaded = new AtomicBoolean();
private int size = 0;
Object indexLock;
PList(PListStore store) {
this.store = store;
this.indexLock = store.getIndexLock();
}
public void setName(String name) {
@ -108,11 +109,13 @@ public class PList {
}
public synchronized void destroy() throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
destroy(tx);
}
});
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
destroy(tx);
}
});
}
}
void destroy(Transaction tx) throws IOException {
@ -158,15 +161,17 @@ public class PList {
}
synchronized public void addLast(final String id, final ByteSequence bs) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
addLast(tx, id, bs);
}
});
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
addLast(tx, id, bs, location);
}
});
}
}
void addLast(Transaction tx, String id, ByteSequence bs) throws IOException {
Location location = this.store.write(bs, false);
private void addLast(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
entry.setLocation(location);
storeEntry(tx, entry);
@ -180,15 +185,17 @@ public class PList {
}
synchronized public void addFirst(final String id, final ByteSequence bs) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
addFirst(tx, id, bs);
}
});
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
addFirst(tx, id, bs, location);
}
});
}
}
void addFirst(Transaction tx, String id, ByteSequence bs) throws IOException {
Location location = this.store.write(bs, false);
private void addFirst(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
EntryLocation entry = createEntry(tx, id, EntryLocation.NOT_SET, EntryLocation.NOT_SET);
entry.setLocation(location);
EntryLocation oldFirst = getFirst(tx);
@ -209,42 +216,50 @@ public class PList {
synchronized public boolean remove(final String id) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
result.set(remove(tx, id));
}
});
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
result.set(remove(tx, id));
}
});
}
return result.get();
}
synchronized public boolean remove(final int position) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
result.set(remove(tx, position));
}
});
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
result.set(remove(tx, position));
}
});
}
return result.get();
}
synchronized public boolean remove(final PListEntry entry) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
result.set(doRemove(tx, entry.getEntry()));
}
});
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
result.set(doRemove(tx, entry.getEntry()));
}
});
}
return result.get();
}
synchronized public PListEntry get(final int position) throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(get(tx, position));
}
});
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(get(tx, position));
}
});
}
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
@ -255,41 +270,10 @@ public class PList {
synchronized public PListEntry getFirst() throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(getFirst(tx));
}
});
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
}
return result;
}
synchronized public PListEntry getLast() throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(getLast(tx));
}
});
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
}
return result;
}
synchronized public PListEntry getNext(PListEntry entry) throws IOException {
PListEntry result = null;
final long nextId = entry != null ? entry.getEntry().getNext() : this.rootId;
if (nextId != EntryLocation.NOT_SET) {
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(getNext(tx, nextId));
ref.set(getFirst(tx));
}
});
if (ref.get() != null) {
@ -300,17 +284,55 @@ public class PList {
return result;
}
synchronized public PListEntry getLast() throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(getLast(tx));
}
});
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
}
}
return result;
}
synchronized public PListEntry getNext(PListEntry entry) throws IOException {
PListEntry result = null;
final long nextId = entry != null ? entry.getEntry().getNext() : this.rootId;
if (nextId != EntryLocation.NOT_SET) {
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(getNext(tx, nextId));
}
});
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
}
}
}
return result;
}
synchronized public PListEntry refresh(final PListEntry entry) throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
}
});
if (ref.get() != null) {
result = new PListEntry(ref.get(), entry.getByteSequence());
}
});
if (ref.get() != null) {
result = new PListEntry(ref.get(), entry.getByteSequence());
}
return result;
}
@ -390,7 +412,7 @@ public class PList {
return null;
}
boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
private boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
boolean result = false;
if (entry != null) {
@ -450,6 +472,7 @@ public class PList {
}
return entry;
}
private void storeEntry(Transaction tx, EntryLocation entry) throws IOException {
tx.store(entry.getPage(), EntryLocationMarshaller.INSTANCE, true);
}

View File

@ -45,6 +45,9 @@ import org.apache.kahadb.util.LockFile;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
/**
* @org.apache.xbean.XBean
*/
public class PListStore extends ServiceSupport {
static final Log LOG = LogFactory.getLog(PListStore.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@ -65,6 +68,11 @@ public class PListStore extends ServiceSupport {
MetaData metaData = new MetaData(this);
final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
Map<String, PList> persistentLists = new HashMap<String, PList>();
final Object indexLock = new Object();
public Object getIndexLock() {
return indexLock;
}
protected class MetaData {
protected MetaData(PListStore store) {
@ -188,7 +196,7 @@ public class PListStore extends ServiceSupport {
}
}
public PList getPList(final String name) throws Exception {
synchronized public PList getPList(final String name) throws Exception {
if (!isStarted()) {
throw new IllegalStateException("Not started");
}

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
public class AMQ2910Test extends JmsMultipleClientsTestSupport {
final int maxConcurrency = 60;
final int msgCount = 200;
final Vector<Throwable> exceptions = new Vector<Throwable>();
@Override
protected BrokerService createBroker() throws Exception {
//persistent = true;
BrokerService broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("tcp://localhost:0");
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
defaultEntry.setCursorMemoryHighWaterMark(50);
defaultEntry.setMemoryLimit(500*1024);
defaultEntry.setProducerFlowControl(false);
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
broker.getSystemUsage().getMemoryUsage().setLimit(1000 * 1024);
return broker;
}
public void testConcurrentSendToPendingCursor() throws Exception {
final ActiveMQConnectionFactory factory =
new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
factory.setCloseTimeout(30000);
ExecutorService executor = Executors.newCachedThreadPool();
for (int i=0; i<maxConcurrency; i++) {
final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
executor.execute(new Runnable() {
public void run() {
try {
sendMessages(factory.createConnection(), dest, msgCount);
} catch (Throwable t) {
exceptions.add(t);
}
}
});
}
executor.shutdown();
assertTrue("send completed", executor.awaitTermination(60, TimeUnit.SECONDS));
assertNoExceptions();
executor = Executors.newCachedThreadPool();
for (int i=0; i<maxConcurrency; i++) {
final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
executor.execute(new Runnable() {
public void run() {
try {
startConsumers(factory, dest);
} catch (Throwable t) {
exceptions.add(t);
}
}
});
}
executor.shutdown();
assertTrue("consumers completed", executor.awaitTermination(60, TimeUnit.SECONDS));
allMessagesList.setMaximumDuration(120*1000);
final int numExpected = maxConcurrency * msgCount;
allMessagesList.waitForMessagesToArrive(numExpected);
if (allMessagesList.getMessageCount() != numExpected) {
dumpAllThreads(getName());
}
allMessagesList.assertMessagesReceivedNoWait(numExpected);
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
}
private void assertNoExceptions() {
if (!exceptions.isEmpty()) {
for (Throwable t: exceptions) {
t.printStackTrace();
}
}
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
}
}