mirror of https://github.com/apache/activemq.git
Add properties to the AMQPersistenceAdaptorFactory
to set syncOnWrite(default=false) and persistentIndex(default=true) properties git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@582801 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1d487896b4
commit
d1eb18af70
|
@ -266,4 +266,8 @@ public interface Store {
|
||||||
* @return the amount of disk space the store is occupying
|
* @return the amount of disk space the store is occupying
|
||||||
*/
|
*/
|
||||||
long size();
|
long size();
|
||||||
|
|
||||||
|
public boolean isPersistentIndex();
|
||||||
|
|
||||||
|
public void setPersistentIndex(boolean persistentIndex);
|
||||||
}
|
}
|
||||||
|
|
|
@ -438,6 +438,14 @@ public class KahaStore implements Store {
|
||||||
persistentIndex = true;
|
persistentIndex = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isPersistentIndex() {
|
||||||
|
return persistentIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPersistentIndex(boolean persistentIndex) {
|
||||||
|
this.persistentIndex = persistentIndex;
|
||||||
|
}
|
||||||
|
|
||||||
public synchronized void initialize() throws IOException {
|
public synchronized void initialize() throws IOException {
|
||||||
if (closed) {
|
if (closed) {
|
||||||
|
|
|
@ -99,6 +99,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
private File directory;
|
private File directory;
|
||||||
private BrokerService brokerService;
|
private BrokerService brokerService;
|
||||||
private AtomicLong storeSize = new AtomicLong();
|
private AtomicLong storeSize = new AtomicLong();
|
||||||
|
private boolean persistentIndex=true;
|
||||||
|
|
||||||
public String getBrokerName() {
|
public String getBrokerName() {
|
||||||
return this.brokerName;
|
return this.brokerName;
|
||||||
|
@ -418,6 +419,14 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
public void rollbackTransaction(ConnectionContext context) throws IOException {
|
public void rollbackTransaction(ConnectionContext context) throws IOException {
|
||||||
referenceStoreAdapter.rollbackTransaction(context);
|
referenceStoreAdapter.rollbackTransaction(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isPersistentIndex() {
|
||||||
|
return persistentIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPersistentIndex(boolean persistentIndex) {
|
||||||
|
this.persistentIndex = persistentIndex;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param location
|
* @param location
|
||||||
|
@ -605,6 +614,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
|
|
||||||
protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
|
protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
|
||||||
KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
|
KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
|
||||||
|
adaptor.setPersistentIndex(isPersistentIndex());
|
||||||
return adaptor;
|
return adaptor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,8 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
|
||||||
private int journalThreadPriority = Thread.MAX_PRIORITY;
|
private int journalThreadPriority = Thread.MAX_PRIORITY;
|
||||||
private String brokerName = "localhost";
|
private String brokerName = "localhost";
|
||||||
private ReferenceStoreAdapter referenceStoreAdapter;
|
private ReferenceStoreAdapter referenceStoreAdapter;
|
||||||
|
private boolean syncOnWrite;
|
||||||
|
private boolean persistentIndex;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return a AMQPersistenceAdapter
|
* @return a AMQPersistenceAdapter
|
||||||
|
@ -47,6 +49,8 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
|
||||||
result.setDirectory(getDataDirectory());
|
result.setDirectory(getDataDirectory());
|
||||||
result.setTaskRunnerFactory(getTaskRunnerFactory());
|
result.setTaskRunnerFactory(getTaskRunnerFactory());
|
||||||
result.setBrokerName(getBrokerName());
|
result.setBrokerName(getBrokerName());
|
||||||
|
result.setSyncOnWrite(isSyncOnWrite());
|
||||||
|
result.setPersistentIndex(isPersistentIndex());
|
||||||
result.setReferenceStoreAdapter(getReferenceStoreAdapter());
|
result.setReferenceStoreAdapter(getReferenceStoreAdapter());
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -127,4 +131,20 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
|
||||||
public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
|
public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
|
||||||
this.referenceStoreAdapter = referenceStoreAdapter;
|
this.referenceStoreAdapter = referenceStoreAdapter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isPersistentIndex() {
|
||||||
|
return persistentIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPersistentIndex(boolean persistentIndex) {
|
||||||
|
this.persistentIndex = persistentIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isSyncOnWrite() {
|
||||||
|
return syncOnWrite;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSyncOnWrite(boolean syncOnWrite) {
|
||||||
|
this.syncOnWrite = syncOnWrite;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,6 +49,8 @@ import org.apache.activemq.util.IOHelper;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import sun.misc.Perf.GetPerfAction;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @org.apache.xbean.XBean
|
* @org.apache.xbean.XBean
|
||||||
* @version $Revision: 1.4 $
|
* @version $Revision: 1.4 $
|
||||||
|
@ -71,6 +73,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter {
|
||||||
private Store theStore;
|
private Store theStore;
|
||||||
private boolean initialized;
|
private boolean initialized;
|
||||||
private final AtomicLong storeSize;
|
private final AtomicLong storeSize;
|
||||||
|
private boolean persistentIndex = true;
|
||||||
|
|
||||||
|
|
||||||
public KahaPersistenceAdapter(AtomicLong size) {
|
public KahaPersistenceAdapter(AtomicLong size) {
|
||||||
|
@ -80,6 +83,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter {
|
||||||
public KahaPersistenceAdapter() {
|
public KahaPersistenceAdapter() {
|
||||||
this(new AtomicLong());
|
this(new AtomicLong());
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<ActiveMQDestination> getDestinations() {
|
public Set<ActiveMQDestination> getDestinations() {
|
||||||
Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
|
Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
|
||||||
try {
|
try {
|
||||||
|
@ -244,6 +248,14 @@ public class KahaPersistenceAdapter implements PersistenceAdapter {
|
||||||
public long getMaxDataFileLength() {
|
public long getMaxDataFileLength() {
|
||||||
return maxDataFileLength;
|
return maxDataFileLength;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isPersistentIndex() {
|
||||||
|
return persistentIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPersistentIndex(boolean persistentIndex) {
|
||||||
|
this.persistentIndex = persistentIndex;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param maxDataFileLength the maxDataFileLength to set
|
* @param maxDataFileLength the maxDataFileLength to set
|
||||||
|
@ -257,6 +269,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter {
|
||||||
if (theStore == null) {
|
if (theStore == null) {
|
||||||
theStore = StoreFactory.open(getStoreDirectory(), "rw",storeSize);
|
theStore = StoreFactory.open(getStoreDirectory(), "rw",storeSize);
|
||||||
theStore.setMaxDataFileLength(maxDataFileLength);
|
theStore.setMaxDataFileLength(maxDataFileLength);
|
||||||
|
theStore.setPersistentIndex(isPersistentIndex());
|
||||||
}
|
}
|
||||||
return theStore;
|
return theStore;
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,6 +61,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
|
||||||
private ListContainer<SubscriptionInfo> durableSubscribers;
|
private ListContainer<SubscriptionInfo> durableSubscribers;
|
||||||
private boolean storeValid;
|
private boolean storeValid;
|
||||||
private Store stateStore;
|
private Store stateStore;
|
||||||
|
private boolean persistentIndex = true;
|
||||||
|
|
||||||
public KahaReferenceStoreAdapter(AtomicLong size){
|
public KahaReferenceStoreAdapter(AtomicLong size){
|
||||||
super(size);
|
super(size);
|
||||||
|
@ -273,6 +274,14 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
|
||||||
StoreFactory.delete(stateDirectory);
|
StoreFactory.delete(stateDirectory);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isPersistentIndex() {
|
||||||
|
return persistentIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPersistentIndex(boolean persistentIndex) {
|
||||||
|
this.persistentIndex = persistentIndex;
|
||||||
|
}
|
||||||
|
|
||||||
private Store createStateStore(File directory) {
|
private Store createStateStore(File directory) {
|
||||||
File stateDirectory = new File(directory, "state");
|
File stateDirectory = new File(directory, "state");
|
||||||
|
@ -292,4 +301,6 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
|
||||||
protected void removeSubscriberState(SubscriptionInfo info) {
|
protected void removeSubscriberState(SubscriptionInfo info) {
|
||||||
durableSubscribers.remove(info);
|
durableSubscribers.remove(info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue