From ac66a09dab2a3a756def13c35e54a8a2e11017b8 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Mon, 4 Sep 2006 04:56:25 +0000 Subject: [PATCH] Added a MemoryPropertyEditor that allows you to specify memory sizes in the xbean config like: limit="20 MB" Upgraded the xbean maven plugin to 2.6 and the new qdox that it used did not like some of our valid inline initialization of variables, so I had to refactor to an equivalent form that qdox did like. http://issues.apache.org/activemq/browse/AMQ-827 http://issues.apache.org/activemq/browse/AMQ-909 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@439930 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-core/pom.xml | 2 +- .../broker/region/policy/PolicyEntry.java | 4 + .../apache/activemq/memory/UsageManager.java | 20 +--- .../activemq/network/jms/JmsConnector.java | 49 +++++---- .../DefaultPersistenceAdapterFactory.java | 3 + .../journal/JournalPersistenceAdapter.java | 18 ++-- .../QuickJournalPersistenceAdapter.java | 18 ++-- .../kahadaptor/KahaPersistenceAdapter.java | 2 + .../store/rapid/RapidPersistenceAdapter.java | 18 ++-- .../transport/failover/FailoverTransport.java | 102 +++++++++--------- .../activemq/util/MemoryPropertyEditor.java | 47 ++++++++ .../src/release/conf/activemq-nojournal.xml | 2 +- assembly/src/release/conf/activemq.xml | 4 +- 13 files changed, 177 insertions(+), 112 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index 9f90f95317..159b26bcad 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -359,7 +359,7 @@ org.apache.xbean maven-xbean-plugin - 2.5 + ${xbean-version} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index cf98306f53..5f529d92e5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -172,6 +172,10 @@ public class PolicyEntry extends DestinationMapEntry { return memoryLimit; } + /** + * + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" + */ public void setMemoryLimit(long memoryLimit) { this.memoryLimit = memoryLimit; } diff --git a/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java b/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java index b4c6106b51..b1bb0b89de 100755 --- a/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java @@ -151,7 +151,11 @@ public class UsageManager { } /** - * Sets the memory limit in bytes + * Sets the memory limit in bytes. + * + * When set using XBean, you can use values such as: "20 mb", "1024 kb", or "1 gb" + * + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" */ public void setLimit(long limit) { if(percentUsageMinDelta < 0 ) { @@ -165,20 +169,6 @@ public class UsageManager { setPercentUsage(percentUsage); } - /** - * Sets the memory limit in megabytes - */ - public void setLimitMb(long limitMb) { - setLimitKb(1024 * limitMb); - } - - /** - * Sets the memory limit in kilobytes - */ - public void setLimitKb(long limitKb) { - setLimit(1024 * limitKb); - } - /* * Sets the minimum number of percentage points the usage has to change before a UsageListener * event is fired by the manager. diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java index 621536d575..87cdec9342 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java @@ -63,30 +63,33 @@ public abstract class JmsConnector implements Service { protected String localPassword; private String name; - protected LRUCache replyToBridges = new LRUCache() { - /** - * - */ - private static final long serialVersionUID = -7446792754185879286L; - - protected boolean removeEldestEntry(Map.Entry enty) { - if (size() > maxCacheSize) { - Iterator iter = entrySet().iterator(); - Map.Entry lru = (Map.Entry) iter.next(); - remove(lru.getKey()); - DestinationBridge bridge = (DestinationBridge) lru.getValue(); - try { - bridge.stop(); - log.info("Expired bridge: " + bridge); - } - catch (Exception e) { - log.warn("stopping expired bridge" + bridge + " caused an exception", e); - } - } - return false; - } - }; + protected LRUCache replyToBridges = createLRUCache(); + + static private LRUCache createLRUCache() { + return new LRUCache() { + private static final long serialVersionUID = -7446792754185879286L; + + protected boolean removeEldestEntry(Map.Entry enty) { + if (size() > maxCacheSize) { + Iterator iter = entrySet().iterator(); + Map.Entry lru = (Map.Entry) iter.next(); + remove(lru.getKey()); + DestinationBridge bridge = (DestinationBridge) lru.getValue(); + try { + bridge.stop(); + log.info("Expired bridge: " + bridge); + } + catch (Exception e) { + log.warn("stopping expired bridge" + bridge + " caused an exception", e); + } + } + return false; + } + }; + } + /** + */ public boolean init() { boolean result = initialized.compareAndSet(false, true); if (result) { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java b/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java index 0d61d4c695..1de45dc96a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java @@ -82,6 +82,9 @@ public class DefaultPersistenceAdapterFactory extends DataSourceSupport implemen return journalLogFileSize; } + /** + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" + */ public void setJournalLogFileSize(int journalLogFileSize) { this.journalLogFileSize = journalLogFileSize; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index d93449a8fb..cf3c92a3f2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -107,13 +107,17 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve private AtomicBoolean started = new AtomicBoolean(false); - private final Runnable periodicCheckpointTask = new Runnable() { - public void run() { - if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) { - checkpoint(false, true); - } - } - }; + private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); + + final Runnable createPeriodicCheckpointTask() { + return new Runnable() { + public void run() { + if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) { + checkpoint(false, true); + } + } + }; + } public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java index f4bd88f3cd..c764153b6c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java @@ -106,13 +106,17 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ private AtomicBoolean started = new AtomicBoolean(false); - private final Runnable periodicCheckpointTask = new Runnable() { - public void run() { - if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) { - checkpoint(false, true); - } - } - }; + private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); + + final Runnable createPeriodicCheckpointTask() { + return new Runnable() { + public void run() { + if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) { + checkpoint(false, true); + } + } + }; + } public QuickJournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java index da78468c9b..1614d84606 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java @@ -188,6 +188,8 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{ /** * @param maxDataFileLength the maxDataFileLength to set + * + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" */ public void setMaxDataFileLength(long maxDataFileLength){ this.maxDataFileLength=maxDataFileLength; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java index 2b12280934..51f6baedd5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java @@ -114,13 +114,17 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent private boolean useExternalMessageReferences; - private final Runnable periodicCheckpointTask = new Runnable() { - public void run() { - if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) { - checkpoint(false, true); - } - } - }; + private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); + + final Runnable createPeriodicCheckpointTask() { + return new Runnable() { + public void run() { + if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) { + checkpoint(false, true); + } + } + }; + } public RapidPersistenceAdapter(Journal journal, TaskRunnerFactory taskRunnerFactory) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 5b4c6eca54..737f143d1f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -80,55 +80,59 @@ public class FailoverTransport implements CompositeTransport { private long reconnectDelay = initialReconnectDelay; private Exception connectionFailure; - private final TransportListener myTransportListener = new TransportListener() { - public void onCommand(Command command) { - if (command == null) { - return; - } - if (command.isResponse()) { - requestMap.remove(new Integer(((Response) command).getCorrelationId())); - } - if (!initialized){ - if (command.isBrokerInfo()){ - BrokerInfo info = (BrokerInfo)command; - BrokerInfo[] peers = info.getPeerBrokerInfos(); - if (peers!= null){ - for (int i =0; i < peers.length;i++){ - String brokerString = peers[i].getBrokerURL(); - add(brokerString); - } - } - initialized = true; - } - - } - if (transportListener != null) { - transportListener.onCommand(command); - } - } - - public void onException(IOException error) { - try { - handleTransportFailure(error); - } - catch (InterruptedException e) { - transportListener.onException(new InterruptedIOException()); - } - } - - public void transportInterupted(){ - if (transportListener != null){ - transportListener.transportInterupted(); - } - } - - public void transportResumed(){ - if(transportListener != null){ - transportListener.transportResumed(); - } - } - }; - + private final TransportListener myTransportListener = createTransportListener(); + + TransportListener createTransportListener() { + return new TransportListener() { + public void onCommand(Command command) { + if (command == null) { + return; + } + if (command.isResponse()) { + requestMap.remove(new Integer(((Response) command).getCorrelationId())); + } + if (!initialized){ + if (command.isBrokerInfo()){ + BrokerInfo info = (BrokerInfo)command; + BrokerInfo[] peers = info.getPeerBrokerInfos(); + if (peers!= null){ + for (int i =0; i < peers.length;i++){ + String brokerString = peers[i].getBrokerURL(); + add(brokerString); + } + } + initialized = true; + } + + } + if (transportListener != null) { + transportListener.onCommand(command); + } + } + + public void onException(IOException error) { + try { + handleTransportFailure(error); + } + catch (InterruptedException e) { + transportListener.onException(new InterruptedIOException()); + } + } + + public void transportInterupted(){ + if (transportListener != null){ + transportListener.transportInterupted(); + } + } + + public void transportResumed(){ + if(transportListener != null){ + transportListener.transportResumed(); + } + } + }; + } + public FailoverTransport() throws InterruptedIOException { // Setup a task that is used to reconnect the a connection async. diff --git a/activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java b/activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java new file mode 100644 index 0000000000..e24786e6b9 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java @@ -0,0 +1,47 @@ +package org.apache.activemq.util; + +import java.beans.PropertyEditorSupport; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class MemoryPropertyEditor extends PropertyEditorSupport { + public void setAsText(String text) throws IllegalArgumentException { + + Pattern p = Pattern.compile("^\\s*(\\d+)\\s*(b)?\\s*$",Pattern.CASE_INSENSITIVE); + Matcher m = p.matcher(text); + if (m.matches()) { + setValue(new Long(Long.parseLong(m.group(1)))); + return; + } + + p = Pattern.compile("^\\s*(\\d+)\\s*k(b)?\\s*$",Pattern.CASE_INSENSITIVE); + m = p.matcher(text); + if (m.matches()) { + setValue(new Long(Long.parseLong(m.group(1)) * 1024)); + return; + } + + p = Pattern.compile("^\\s*(\\d+)\\s*m(b)?\\s*$", Pattern.CASE_INSENSITIVE); + m = p.matcher(text); + if (m.matches()) { + setValue(new Long(Long.parseLong(m.group(1)) * 1024 * 1024 )); + return; + } + + p = Pattern.compile("^\\s*(\\d+)\\s*g(b)?\\s*$", Pattern.CASE_INSENSITIVE); + m = p.matcher(text); + if (m.matches()) { + setValue(new Long(Long.parseLong(m.group(1)) * 1024 * 1024 * 1024 )); + return; + } + + throw new IllegalArgumentException( + "Could convert not to a memory size: " + text); + } + + public String getAsText() { + Long value = (Long) getValue(); + return (value != null ? value.toString() : ""); + } + +} diff --git a/assembly/src/release/conf/activemq-nojournal.xml b/assembly/src/release/conf/activemq-nojournal.xml index 6a3e6895a9..5715a0293a 100755 --- a/assembly/src/release/conf/activemq-nojournal.xml +++ b/assembly/src/release/conf/activemq-nojournal.xml @@ -23,7 +23,7 @@ - + diff --git a/assembly/src/release/conf/activemq.xml b/assembly/src/release/conf/activemq.xml index f634b006f9..b378163ddd 100755 --- a/assembly/src/release/conf/activemq.xml +++ b/assembly/src/release/conf/activemq.xml @@ -22,9 +22,9 @@ -