[AMQ-6625] remove kahadbioexceptionhandler by pushing allowIOResumption into persistence adapter. This allows the lease locker to still be used with kahadb for stopStartConnectors support

This commit is contained in:
gtully 2017-06-15 17:27:47 +01:00
parent bfbdd3c5ad
commit b07821ab64
12 changed files with 47 additions and 44 deletions

View File

@ -206,4 +206,6 @@ public interface PersistenceAdapter extends Service {
* @return the last stored sequence id or -1 if no suppression needed
*/
long getLastProducerSequenceId(ProducerId id) throws IOException;
void allowIOResumption();
}

View File

@ -237,6 +237,9 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubs
return -1;
}
@Override
public void allowIOResumption() {}
@Override
public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
// We could eventuall implement an in memory scheduler.

View File

@ -166,6 +166,11 @@ import org.slf4j.LoggerFactory;
}
protected void allowIOResumption() {
try {
broker.getPersistenceAdapter().allowIOResumption();
} catch (IOException e) {
LOG.warn("Failed to allow IO resumption", e);
}
}
private void stopBroker(Exception exception) {

View File

@ -291,6 +291,9 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
}
@Override
public void allowIOResumption() {}
@Override
public void init() throws Exception {
getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());

View File

@ -799,6 +799,11 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
return -1;
}
@Override
public void allowIOResumption() {
longTermPersistence.allowIOResumption();
}
@Override
public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
return longTermPersistence.createJobSchedulerStore();

View File

@ -1,43 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* @org.apache.xbean.XBean
*/
public class KahaDBIOExceptionHandler extends DefaultIOExceptionHandler {
private static final Logger LOG = LoggerFactory
.getLogger(KahaDBIOExceptionHandler.class);
protected void allowIOResumption() {
try {
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
kahaDBPersistenceAdapter.getStore().allowIOResumption();
}
} catch (IOException e) {
LOG.warn("Failed to allow IO resumption", e);
}
}
}

View File

@ -163,6 +163,11 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
return this.letter.getLastProducerSequenceId(id);
}
@Override
public void allowIOResumption() {
this.letter.allowIOResumption();
}
/**
* @param destination
* @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)

View File

@ -288,6 +288,13 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
return maxId;
}
@Override
public void allowIOResumption() {
for (PersistenceAdapter persistenceAdapter : adapters) {
persistenceAdapter.allowIOResumption();
}
}
@Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
PersistenceAdapter adapter = null;

View File

@ -647,6 +647,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
return -1;
}
@Override
public void allowIOResumption() {
if (pageFile != null) {
pageFile.allowIOResumption();
}
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;

View File

@ -1147,4 +1147,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def rollbackTransaction(context: ConnectionContext): Unit = {}
def createClient = new LevelDBClient(this);
def allowIOResumption() = {}
}

View File

@ -132,4 +132,6 @@ abstract class ProxyLevelDBStore extends LockableServiceSupport with BrokerServi
def removePList(name: String): Boolean = {
return proxy_target.removePList(name)
}
def allowIOResumption() = {}
}

View File

@ -416,7 +416,12 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport {
public long getLastProducerSequenceId(ProducerId id) throws IOException {
return kahaDB.getLastProducerSequenceId(id);
}
@Override
public void allowIOResumption() {
kahaDB.allowIOResumption();
}
}
private class ProxyMessageStoreWithUpdateException extends ProxyMessageStore {