mirror of https://github.com/apache/activemq.git
Added a MemoryPropertyEditor that allows you to specify memory sizes in the xbean config like: limit="20 MB"
Upgraded the xbean maven plugin to 2.6 and the new qdox that it used did not like some of our valid inline initialization of variables, so I had to refactor to an equivalent form that qdox did like. http://issues.apache.org/activemq/browse/AMQ-827 http://issues.apache.org/activemq/browse/AMQ-909 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@439930 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
96356920b1
commit
ac66a09dab
|
@ -359,7 +359,7 @@
|
|||
<plugin>
|
||||
<groupId>org.apache.xbean</groupId>
|
||||
<artifactId>maven-xbean-plugin</artifactId>
|
||||
<version>2.5</version>
|
||||
<version>${xbean-version}</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<configuration>
|
||||
|
|
|
@ -172,6 +172,10 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
return memoryLimit;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
|
||||
*/
|
||||
public void setMemoryLimit(long memoryLimit) {
|
||||
this.memoryLimit = memoryLimit;
|
||||
}
|
||||
|
|
|
@ -151,7 +151,11 @@ public class UsageManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Sets the memory limit in bytes
|
||||
* Sets the memory limit in bytes.
|
||||
*
|
||||
* When set using XBean, you can use values such as: "20 mb", "1024 kb", or "1 gb"
|
||||
*
|
||||
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
|
||||
*/
|
||||
public void setLimit(long limit) {
|
||||
if(percentUsageMinDelta < 0 ) {
|
||||
|
@ -165,20 +169,6 @@ public class UsageManager {
|
|||
setPercentUsage(percentUsage);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the memory limit in megabytes
|
||||
*/
|
||||
public void setLimitMb(long limitMb) {
|
||||
setLimitKb(1024 * limitMb);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the memory limit in kilobytes
|
||||
*/
|
||||
public void setLimitKb(long limitKb) {
|
||||
setLimit(1024 * limitKb);
|
||||
}
|
||||
|
||||
/*
|
||||
* Sets the minimum number of percentage points the usage has to change before a UsageListener
|
||||
* event is fired by the manager.
|
||||
|
|
|
@ -63,30 +63,33 @@ public abstract class JmsConnector implements Service {
|
|||
protected String localPassword;
|
||||
private String name;
|
||||
|
||||
protected LRUCache replyToBridges = new LRUCache() {
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private static final long serialVersionUID = -7446792754185879286L;
|
||||
protected LRUCache replyToBridges = createLRUCache();
|
||||
|
||||
protected boolean removeEldestEntry(Map.Entry enty) {
|
||||
if (size() > maxCacheSize) {
|
||||
Iterator iter = entrySet().iterator();
|
||||
Map.Entry lru = (Map.Entry) iter.next();
|
||||
remove(lru.getKey());
|
||||
DestinationBridge bridge = (DestinationBridge) lru.getValue();
|
||||
try {
|
||||
bridge.stop();
|
||||
log.info("Expired bridge: " + bridge);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn("stopping expired bridge" + bridge + " caused an exception", e);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
static private LRUCache createLRUCache() {
|
||||
return new LRUCache() {
|
||||
private static final long serialVersionUID = -7446792754185879286L;
|
||||
|
||||
protected boolean removeEldestEntry(Map.Entry enty) {
|
||||
if (size() > maxCacheSize) {
|
||||
Iterator iter = entrySet().iterator();
|
||||
Map.Entry lru = (Map.Entry) iter.next();
|
||||
remove(lru.getKey());
|
||||
DestinationBridge bridge = (DestinationBridge) lru.getValue();
|
||||
try {
|
||||
bridge.stop();
|
||||
log.info("Expired bridge: " + bridge);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn("stopping expired bridge" + bridge + " caused an exception", e);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public boolean init() {
|
||||
boolean result = initialized.compareAndSet(false, true);
|
||||
if (result) {
|
||||
|
|
|
@ -82,6 +82,9 @@ public class DefaultPersistenceAdapterFactory extends DataSourceSupport implemen
|
|||
return journalLogFileSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
|
||||
*/
|
||||
public void setJournalLogFileSize(int journalLogFileSize) {
|
||||
this.journalLogFileSize = journalLogFileSize;
|
||||
}
|
||||
|
|
|
@ -107,13 +107,17 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
|
|||
|
||||
private AtomicBoolean started = new AtomicBoolean(false);
|
||||
|
||||
private final Runnable periodicCheckpointTask = new Runnable() {
|
||||
public void run() {
|
||||
if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
|
||||
checkpoint(false, true);
|
||||
}
|
||||
}
|
||||
};
|
||||
private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
|
||||
|
||||
final Runnable createPeriodicCheckpointTask() {
|
||||
return new Runnable() {
|
||||
public void run() {
|
||||
if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
|
||||
checkpoint(false, true);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
|
||||
|
||||
|
|
|
@ -106,13 +106,17 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ
|
|||
|
||||
private AtomicBoolean started = new AtomicBoolean(false);
|
||||
|
||||
private final Runnable periodicCheckpointTask = new Runnable() {
|
||||
public void run() {
|
||||
if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
|
||||
checkpoint(false, true);
|
||||
}
|
||||
}
|
||||
};
|
||||
private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
|
||||
|
||||
final Runnable createPeriodicCheckpointTask() {
|
||||
return new Runnable() {
|
||||
public void run() {
|
||||
if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
|
||||
checkpoint(false, true);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public QuickJournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
|
||||
|
||||
|
|
|
@ -188,6 +188,8 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
|
|||
|
||||
/**
|
||||
* @param maxDataFileLength the maxDataFileLength to set
|
||||
*
|
||||
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
|
||||
*/
|
||||
public void setMaxDataFileLength(long maxDataFileLength){
|
||||
this.maxDataFileLength=maxDataFileLength;
|
||||
|
|
|
@ -114,13 +114,17 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent
|
|||
private boolean useExternalMessageReferences;
|
||||
|
||||
|
||||
private final Runnable periodicCheckpointTask = new Runnable() {
|
||||
public void run() {
|
||||
if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
|
||||
checkpoint(false, true);
|
||||
}
|
||||
}
|
||||
};
|
||||
private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
|
||||
|
||||
final Runnable createPeriodicCheckpointTask() {
|
||||
return new Runnable() {
|
||||
public void run() {
|
||||
if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
|
||||
checkpoint(false, true);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public RapidPersistenceAdapter(Journal journal, TaskRunnerFactory taskRunnerFactory) throws IOException {
|
||||
|
||||
|
|
|
@ -80,54 +80,58 @@ public class FailoverTransport implements CompositeTransport {
|
|||
private long reconnectDelay = initialReconnectDelay;
|
||||
private Exception connectionFailure;
|
||||
|
||||
private final TransportListener myTransportListener = new TransportListener() {
|
||||
public void onCommand(Command command) {
|
||||
if (command == null) {
|
||||
return;
|
||||
}
|
||||
if (command.isResponse()) {
|
||||
requestMap.remove(new Integer(((Response) command).getCorrelationId()));
|
||||
}
|
||||
if (!initialized){
|
||||
if (command.isBrokerInfo()){
|
||||
BrokerInfo info = (BrokerInfo)command;
|
||||
BrokerInfo[] peers = info.getPeerBrokerInfos();
|
||||
if (peers!= null){
|
||||
for (int i =0; i < peers.length;i++){
|
||||
String brokerString = peers[i].getBrokerURL();
|
||||
add(brokerString);
|
||||
}
|
||||
}
|
||||
initialized = true;
|
||||
}
|
||||
private final TransportListener myTransportListener = createTransportListener();
|
||||
|
||||
}
|
||||
if (transportListener != null) {
|
||||
transportListener.onCommand(command);
|
||||
}
|
||||
}
|
||||
TransportListener createTransportListener() {
|
||||
return new TransportListener() {
|
||||
public void onCommand(Command command) {
|
||||
if (command == null) {
|
||||
return;
|
||||
}
|
||||
if (command.isResponse()) {
|
||||
requestMap.remove(new Integer(((Response) command).getCorrelationId()));
|
||||
}
|
||||
if (!initialized){
|
||||
if (command.isBrokerInfo()){
|
||||
BrokerInfo info = (BrokerInfo)command;
|
||||
BrokerInfo[] peers = info.getPeerBrokerInfos();
|
||||
if (peers!= null){
|
||||
for (int i =0; i < peers.length;i++){
|
||||
String brokerString = peers[i].getBrokerURL();
|
||||
add(brokerString);
|
||||
}
|
||||
}
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
public void onException(IOException error) {
|
||||
try {
|
||||
handleTransportFailure(error);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
transportListener.onException(new InterruptedIOException());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (transportListener != null) {
|
||||
transportListener.onCommand(command);
|
||||
}
|
||||
}
|
||||
|
||||
public void transportInterupted(){
|
||||
if (transportListener != null){
|
||||
transportListener.transportInterupted();
|
||||
}
|
||||
}
|
||||
public void onException(IOException error) {
|
||||
try {
|
||||
handleTransportFailure(error);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
transportListener.onException(new InterruptedIOException());
|
||||
}
|
||||
}
|
||||
|
||||
public void transportResumed(){
|
||||
if(transportListener != null){
|
||||
transportListener.transportResumed();
|
||||
}
|
||||
}
|
||||
};
|
||||
public void transportInterupted(){
|
||||
if (transportListener != null){
|
||||
transportListener.transportInterupted();
|
||||
}
|
||||
}
|
||||
|
||||
public void transportResumed(){
|
||||
if(transportListener != null){
|
||||
transportListener.transportResumed();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public FailoverTransport() throws InterruptedIOException {
|
||||
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
package org.apache.activemq.util;
|
||||
|
||||
import java.beans.PropertyEditorSupport;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class MemoryPropertyEditor extends PropertyEditorSupport {
|
||||
public void setAsText(String text) throws IllegalArgumentException {
|
||||
|
||||
Pattern p = Pattern.compile("^\\s*(\\d+)\\s*(b)?\\s*$",Pattern.CASE_INSENSITIVE);
|
||||
Matcher m = p.matcher(text);
|
||||
if (m.matches()) {
|
||||
setValue(new Long(Long.parseLong(m.group(1))));
|
||||
return;
|
||||
}
|
||||
|
||||
p = Pattern.compile("^\\s*(\\d+)\\s*k(b)?\\s*$",Pattern.CASE_INSENSITIVE);
|
||||
m = p.matcher(text);
|
||||
if (m.matches()) {
|
||||
setValue(new Long(Long.parseLong(m.group(1)) * 1024));
|
||||
return;
|
||||
}
|
||||
|
||||
p = Pattern.compile("^\\s*(\\d+)\\s*m(b)?\\s*$", Pattern.CASE_INSENSITIVE);
|
||||
m = p.matcher(text);
|
||||
if (m.matches()) {
|
||||
setValue(new Long(Long.parseLong(m.group(1)) * 1024 * 1024 ));
|
||||
return;
|
||||
}
|
||||
|
||||
p = Pattern.compile("^\\s*(\\d+)\\s*g(b)?\\s*$", Pattern.CASE_INSENSITIVE);
|
||||
m = p.matcher(text);
|
||||
if (m.matches()) {
|
||||
setValue(new Long(Long.parseLong(m.group(1)) * 1024 * 1024 * 1024 ));
|
||||
return;
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException(
|
||||
"Could convert not to a memory size: " + text);
|
||||
}
|
||||
|
||||
public String getAsText() {
|
||||
Long value = (Long) getValue();
|
||||
return (value != null ? value.toString() : "");
|
||||
}
|
||||
|
||||
}
|
|
@ -23,7 +23,7 @@
|
|||
<broker xmlns="http://activemq.org/config/1.0">
|
||||
|
||||
<memoryManager>
|
||||
<usageManager id="memory-manager" limitMb="50"/>
|
||||
<usageManager id="memory-manager" limit="50 MB"/>
|
||||
</memoryManager>
|
||||
|
||||
<!-- In ActiveMQ 4, you can setup destination policies -->
|
||||
|
|
|
@ -22,9 +22,9 @@
|
|||
|
||||
<broker useJmx="true" xmlns="http://activemq.org/config/1.0">
|
||||
|
||||
<!-- Use the following to set the broker memory limit (in bytes)
|
||||
<!-- Use the following to set the broker memory limit
|
||||
<memoryManager>
|
||||
<usageManager id="memory-manager" limit="1048576"/>
|
||||
<usageManager id="memory-manager" limit="20 MB"/>
|
||||
</memoryManager>
|
||||
-->
|
||||
|
||||
|
|
Loading…
Reference in New Issue