https://issues.apache.org/jira/browse/AMQ-3775 - ensure no concurrent kahadb transaction in topic store

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1310410 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-04-06 15:50:03 +00:00
parent 148455413e
commit a3f85e271a
1 changed files with 17 additions and 14 deletions

View File

@ -456,11 +456,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
// operations... but for now we must // operations... but for now we must
// externally synchronize... // externally synchronize...
Location location; Location location;
indexLock.readLock().lock(); indexLock.writeLock().lock();
try { try {
location = findMessageLocation(key, dest); location = findMessageLocation(key, dest);
}finally { }finally {
indexLock.readLock().unlock(); indexLock.writeLock().unlock();
} }
if (location == null) { if (location == null) {
return null; return null;
@ -472,7 +472,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public int getMessageCount() throws IOException { public int getMessageCount() throws IOException {
try { try {
lockAsyncJobQueue(); lockAsyncJobQueue();
indexLock.readLock().lock(); indexLock.writeLock().lock();
try { try {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException { public Integer execute(Transaction tx) throws IOException {
@ -490,7 +490,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
}); });
}finally { }finally {
indexLock.readLock().unlock(); indexLock.writeLock().unlock();
} }
} finally { } finally {
unlockAsyncJobQueue(); unlockAsyncJobQueue();
@ -499,7 +499,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override @Override
public boolean isEmpty() throws IOException { public boolean isEmpty() throws IOException {
indexLock.readLock().lock(); indexLock.writeLock().lock();
try { try {
return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
public Boolean execute(Transaction tx) throws IOException { public Boolean execute(Transaction tx) throws IOException {
@ -510,7 +510,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
}); });
}finally { }finally {
indexLock.readLock().unlock(); indexLock.writeLock().unlock();
} }
} }
@ -540,7 +540,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
indexLock.readLock().lock(); indexLock.writeLock().lock();
try { try {
pageFile.tx().execute(new Transaction.Closure<Exception>() { pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception { public void execute(Transaction tx) throws Exception {
@ -564,12 +564,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
}); });
}finally { }finally {
indexLock.readLock().unlock(); indexLock.writeLock().unlock();
} }
} }
public void resetBatching() { public void resetBatching() {
if (pageFile.isLoaded()) { if (pageFile.isLoaded()) {
indexLock.writeLock().lock();
try { try {
pageFile.tx().execute(new Transaction.Closure<Exception>() { pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception { public void execute(Transaction tx) throws Exception {
@ -580,6 +581,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}); });
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to reset batching",e); LOG.error("Failed to reset batching",e);
}finally {
indexLock.writeLock().unlock();
} }
} }
} }
@ -736,7 +739,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public SubscriptionInfo[] getAllSubscriptions() throws IOException { public SubscriptionInfo[] getAllSubscriptions() throws IOException {
final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
indexLock.readLock().lock(); indexLock.writeLock().lock();
try { try {
pageFile.tx().execute(new Transaction.Closure<IOException>() { pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException { public void execute(Transaction tx) throws IOException {
@ -752,7 +755,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
}); });
}finally { }finally {
indexLock.readLock().unlock(); indexLock.writeLock().unlock();
} }
SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
@ -762,7 +765,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName); final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
indexLock.readLock().lock(); indexLock.writeLock().lock();
try { try {
return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
public SubscriptionInfo execute(Transaction tx) throws IOException { public SubscriptionInfo execute(Transaction tx) throws IOException {
@ -776,7 +779,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
}); });
}finally { }finally {
indexLock.readLock().unlock(); indexLock.writeLock().unlock();
} }
} }
@ -933,7 +936,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public Set<ActiveMQDestination> getDestinations() { public Set<ActiveMQDestination> getDestinations() {
try { try {
final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
indexLock.readLock().lock(); indexLock.writeLock().lock();
try { try {
pageFile.tx().execute(new Transaction.Closure<IOException>() { pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException { public void execute(Transaction tx) throws IOException {
@ -960,7 +963,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
}); });
}finally { }finally {
indexLock.readLock().unlock(); indexLock.writeLock().unlock();
} }
return rc; return rc;
} catch (IOException e) { } catch (IOException e) {