diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 192814041a..c93a419653 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -456,11 +456,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { // operations... but for now we must // externally synchronize... Location location; - indexLock.readLock().lock(); + indexLock.writeLock().lock(); try { location = findMessageLocation(key, dest); }finally { - indexLock.readLock().unlock(); + indexLock.writeLock().unlock(); } if (location == null) { return null; @@ -472,7 +472,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public int getMessageCount() throws IOException { try { lockAsyncJobQueue(); - indexLock.readLock().lock(); + indexLock.writeLock().lock(); try { return pageFile.tx().execute(new Transaction.CallableClosure() { public Integer execute(Transaction tx) throws IOException { @@ -490,7 +490,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } }); }finally { - indexLock.readLock().unlock(); + indexLock.writeLock().unlock(); } } finally { unlockAsyncJobQueue(); @@ -499,7 +499,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { @Override public boolean isEmpty() throws IOException { - indexLock.readLock().lock(); + indexLock.writeLock().lock(); try { return pageFile.tx().execute(new Transaction.CallableClosure() { public Boolean execute(Transaction tx) throws IOException { @@ -510,7 +510,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } }); }finally { - indexLock.readLock().unlock(); + indexLock.writeLock().unlock(); } } @@ -540,7 +540,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { - indexLock.readLock().lock(); + indexLock.writeLock().lock(); try { pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws Exception { @@ -564,12 +564,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } }); }finally { - indexLock.readLock().unlock(); + indexLock.writeLock().unlock(); } } public void resetBatching() { if (pageFile.isLoaded()) { + indexLock.writeLock().lock(); try { pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws Exception { @@ -580,6 +581,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { }); } catch (Exception e) { LOG.error("Failed to reset batching",e); + }finally { + indexLock.writeLock().unlock(); } } } @@ -736,7 +739,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public SubscriptionInfo[] getAllSubscriptions() throws IOException { final ArrayList subscriptions = new ArrayList(); - indexLock.readLock().lock(); + indexLock.writeLock().lock(); try { pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { @@ -752,7 +755,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } }); }finally { - indexLock.readLock().unlock(); + indexLock.writeLock().unlock(); } SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; @@ -762,7 +765,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - indexLock.readLock().lock(); + indexLock.writeLock().lock(); try { return pageFile.tx().execute(new Transaction.CallableClosure() { public SubscriptionInfo execute(Transaction tx) throws IOException { @@ -776,7 +779,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } }); }finally { - indexLock.readLock().unlock(); + indexLock.writeLock().unlock(); } } @@ -933,7 +936,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public Set getDestinations() { try { final HashSet rc = new HashSet(); - indexLock.readLock().lock(); + indexLock.writeLock().lock(); try { pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { @@ -960,7 +963,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } }); }finally { - indexLock.readLock().unlock(); + indexLock.writeLock().unlock(); } return rc; } catch (IOException e) {