https://issues.apache.org/jira/browse/AMQ-4682 - add support for mb/kb limit suffix and add karaf itest, fix up spring context in pid factory

This commit is contained in:
gtully 2013-10-07 19:58:57 +01:00
parent 21edf2b5bb
commit 77a9ade207
10 changed files with 326 additions and 17 deletions

View File

@ -41,6 +41,7 @@ public abstract class AbstractJmsFeatureTest extends AbstractFeatureTest {
} finally { } finally {
try { try {
in.close(); in.close();
out.force(true);
out.close(); out.close();
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.karaf.itest;
import java.io.File;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.MavenUtils;
import org.ops4j.pax.exam.Option;
import org.ops4j.pax.exam.junit.Configuration;
import org.ops4j.pax.exam.junit.JUnit4TestRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.openengsb.labs.paxexam.karaf.options.KarafDistributionOption.editConfigurationFilePut;
import static org.openengsb.labs.paxexam.karaf.options.KarafDistributionOption.replaceConfigurationFile;
@RunWith(JUnit4TestRunner.class)
public class ActiveMQBrokerRuntimeConfigTest extends AbstractJmsFeatureTest {
@Configuration
public static Option[] configure() {
return append(editConfigurationFilePut("etc/org.apache.activemq.server-default.cfg", "config.check", "false"),
configureBrokerStart(
append(replaceConfigurationFile("data/tmp/modified-config.xml",
new File(basedir + "/src/test/resources/org/apache/activemq/karaf/itest/activemq-runtime-config-mod.xml")),
configure("activemq")), "activemq-runtime-config"));
}
@Test
public void test() throws Throwable {
withinReason(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
assertEquals("brokerName = amq-broker", executeCommand("activemq:list").trim());
return true;
}
});
withinReason(new Callable<Boolean>(){
@Override
public Boolean call() throws Exception {
assertTrue("3MB limit", executeCommand("activemq:query").trim().contains("MemoryLimit = 3145728"));
return true;
}
});
// ensure update will be reflected in OS fs modified window
TimeUnit.SECONDS.sleep(4);
// increase from 3mb to 4mb and check
String karafDir = System.getProperty("karaf.base");
File target = new File(karafDir + "/etc/activemq.xml");
System.err.println("Modifying configuration at: " + target + "last mod: " + new Date(target.lastModified()));
copyFile(new File(karafDir + "/data/tmp/modified-config.xml"), target);
System.err.println("new mod at: " + new Date(target.lastModified()));
withinReason(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
assertTrue("4MB limit", executeCommand("activemq:query").trim().contains("MemoryLimit = 4194304"));
return true;
}
});
}
}

View File

@ -0,0 +1,93 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="${broker-name}"
dataDirectory="${data}"
start="false">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="4mb" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="${data}/kahadb"/>
</persistenceAdapter>
<plugins>
<jaasAuthenticationPlugin configuration="karaf" />
<authorizationPlugin>
<map>
<authorizationMap groupClass="org.apache.karaf.jaas.boot.principal.RolePrincipal">
<authorizationEntries>
<authorizationEntry queue=">" read="admin" write="admin" admin="admin"/>
<authorizationEntry topic=">" read="admin" write="admin" admin="admin"/>
<authorizationEntry topic="ActiveMQ.Advisory.>" read="admin" write="admin" admin="admin"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
<runtimeConfigurationPlugin checkPeriod="2000" />
</plugins>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="64 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000"/>
<transportConnector name="http" uri="http://0.0.0.0:61626"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:61636"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61646"/>
</transportConnectors>
</broker>
</beans>

View File

@ -0,0 +1,93 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="${broker-name}"
dataDirectory="${data}"
start="false">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="3mb" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="${data}/kahadb"/>
</persistenceAdapter>
<plugins>
<jaasAuthenticationPlugin configuration="karaf" />
<authorizationPlugin>
<map>
<authorizationMap groupClass="org.apache.karaf.jaas.boot.principal.RolePrincipal">
<authorizationEntries>
<authorizationEntry queue=">" read="admin" write="admin" admin="admin"/>
<authorizationEntry topic=">" read="admin" write="admin" admin="admin"/>
<authorizationEntry topic="ActiveMQ.Advisory.>" read="admin" write="admin" admin="admin"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
<runtimeConfigurationPlugin checkPeriod="2000" />
</plugins>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="64 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000"/>
<transportConnector name="http" uri="http://0.0.0.0:61626"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:61636"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61646"/>
</transportConnectors>
</broker>
</beans>

View File

@ -17,6 +17,7 @@
package org.apache.activemq.osgi; package org.apache.activemq.osgi;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.spring.SpringBrokerContext;
import org.apache.activemq.spring.Utils; import org.apache.activemq.spring.Utils;
import org.apache.xbean.spring.context.ResourceXmlApplicationContext; import org.apache.xbean.spring.context.ResourceXmlApplicationContext;
import org.osgi.framework.BundleContext; import org.osgi.framework.BundleContext;
@ -95,6 +96,10 @@ public class ActiveMQServiceFactory implements ManagedServiceFactory {
} }
//TODO deal with multiple brokers //TODO deal with multiple brokers
SpringBrokerContext brokerContext = new SpringBrokerContext();
brokerContext.setConfigurationUrl(resource.getURL().toExternalForm());
brokerContext.setApplicationContext(ctx);
broker.setBrokerContext(brokerContext);
broker.start(); broker.start();
broker.waitUntilStarted(); broker.waitUntilStarted();

View File

@ -17,7 +17,6 @@
package org.apache.activemq.plugin; package org.apache.activemq.plugin;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.util.ArrayList; import java.util.ArrayList;
@ -132,6 +131,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
BrokerContext brokerContext = next.getBrokerService().getBrokerContext(); BrokerContext brokerContext = next.getBrokerService().getBrokerContext();
if (brokerContext != null) { if (brokerContext != null) {
configToMonitor = Utils.resourceFromString(brokerContext.getConfigurationUrl()); configToMonitor = Utils.resourceFromString(brokerContext.getConfigurationUrl());
info("Configuration " + configToMonitor);
} else { } else {
LOG.error("Null BrokerContext; impossible to determine configuration url resource from broker, updates cannot be tracked"); LOG.error("Null BrokerContext; impossible to determine configuration url resource from broker, updates cannot be tracked");
} }
@ -683,8 +683,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
private void loadPropertiesPlaceHolderSupport(Document doc) { private void loadPropertiesPlaceHolderSupport(Document doc) {
BrokerContext brokerContext = getBrokerService().getBrokerContext(); BrokerContext brokerContext = getBrokerService().getBrokerContext();
if (brokerContext != null && !brokerContext.getBeansOfType(PropertyPlaceholderConfigurer.class).isEmpty()) { if (brokerContext != null) {
Properties initialProperties = new Properties(System.getProperties()); Properties initialProperties = new Properties(System.getProperties());
placeHolderUtil = new PropertiesPlaceHolderUtil(initialProperties); placeHolderUtil = new PropertiesPlaceHolderUtil(initialProperties);
mergeProperties(doc, initialProperties); mergeProperties(doc, initialProperties);
@ -728,7 +727,9 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
List<Resource> propResources = new LinkedList<Resource>(); List<Resource> propResources = new LinkedList<Resource>();
for (String value : resourcesString.split(",")) { for (String value : resourcesString.split(",")) {
try { try {
if (!value.isEmpty()) {
propResources.add(Utils.resourceFromString(replacePlaceHolders(value))); propResources.add(Utils.resourceFromString(replacePlaceHolders(value)));
}
} catch (MalformedURLException e) { } catch (MalformedURLException e) {
info("failed to resolve resource: " + value, e); info("failed to resolve resource: " + value, e);
} }
@ -762,18 +763,23 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
SchemaFactory schemaFactory = SchemaFactory.newInstance( SchemaFactory schemaFactory = SchemaFactory.newInstance(
XMLConstants.W3C_XML_SCHEMA_NS_URI); XMLConstants.W3C_XML_SCHEMA_NS_URI);
// avoid going to the net to pull down the spring schema ArrayList<StreamSource> schemas = new ArrayList<StreamSource>();
schemas.add(new StreamSource(getClass().getResource("/activemq.xsd").toExternalForm()));
// avoid going to the net to pull down the spring schema,
// REVISIT may need to be smarter in osgi
final PluggableSchemaResolver springResolver = final PluggableSchemaResolver springResolver =
new PluggableSchemaResolver(getClass().getClassLoader()); new PluggableSchemaResolver(getClass().getClassLoader());
final InputSource beanInputSource = final InputSource beanInputSource =
springResolver.resolveEntity( springResolver.resolveEntity(
"http://www.springframework.org/schema/beans", "http://www.springframework.org/schema/beans",
"http://www.springframework.org/schema/beans/spring-beans-2.0.xsd"); "http://www.springframework.org/schema/beans/spring-beans.xsd");
if (beanInputSource != null) {
schema = schemaFactory.newSchema(new Source[]{ schemas.add(new StreamSource(beanInputSource.getByteStream()));
new StreamSource(getClass().getResource("/activemq.xsd").toExternalForm()), } else {
new StreamSource(beanInputSource.getByteStream()) schemas.add(new StreamSource("http://www.springframework.org/schema/beans/spring-beans.xsd"));
}); }
schema = schemaFactory.newSchema(schemas.toArray(new Source[]{}));
} }
return schema; return schema;
} }
@ -828,6 +834,31 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
start = matcher.end(); start = matcher.end();
} }
} }
return replaceBytePostfix(str);
}
static Pattern[] byteMatchers = new Pattern[] {
Pattern.compile("^\\s*(\\d+)\\s*(b)?\\s*$", Pattern.CASE_INSENSITIVE),
Pattern.compile("^\\s*(\\d+)\\s*k(b)?\\s*$", Pattern.CASE_INSENSITIVE),
Pattern.compile("^\\s*(\\d+)\\s*m(b)?\\s*$", Pattern.CASE_INSENSITIVE),
Pattern.compile("^\\s*(\\d+)\\s*g(b)?\\s*$", Pattern.CASE_INSENSITIVE)};
// xbean can Xb, Xkb, Xmb, Xg etc
private String replaceBytePostfix(String str) {
try {
for (int i=0; i< byteMatchers.length; i++) {
Matcher matcher = byteMatchers[i].matcher(str);
if (matcher.matches()) {
long value = Long.parseLong(matcher.group(1));
for (int j=1; j<=i; j++) {
value *= 1024;
}
return String.valueOf(value);
}
}
} catch (NumberFormatException ignored) {
LOG.debug("nfe on: " + str, ignored);
}
return str; return str;
} }

View File

@ -37,10 +37,10 @@ public class PolicyEntryTest extends RuntimeConfigTestSupport {
verifyQueueLimit("Before", 1024); verifyQueueLimit("Before", 1024);
applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-mod", SLEEP); applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-mod", SLEEP);
verifyQueueLimit("After", 2048); verifyQueueLimit("After", 4194304);
// change to existing dest // change to existing dest
verifyQueueLimit("Before", 2048); verifyQueueLimit("Before", 4194304);
} }
@Test @Test

View File

@ -28,8 +28,8 @@
<destinationPolicy> <destinationPolicy>
<policyMap> <policyMap>
<policyEntries> <policyEntries>
<policyEntry queue=">" memoryLimit="2048"/> <policyEntry queue=">" memoryLimit="2048b"/>
<policyEntry topic=">" memoryLimit="2048"/> <policyEntry topic=">" memoryLimit="2kb"/>
</policyEntries> </policyEntries>
</policyMap> </policyMap>
</destinationPolicy> </destinationPolicy>

View File

@ -28,7 +28,7 @@
<destinationPolicy> <destinationPolicy>
<policyMap> <policyMap>
<policyEntries> <policyEntries>
<policyEntry queue=">" memoryLimit="2048"/> <policyEntry queue=">" memoryLimit="4 mb"/>
</policyEntries> </policyEntries>
</policyMap> </policyMap>
</destinationPolicy> </destinationPolicy>

View File

@ -28,7 +28,7 @@
<destinationPolicy> <destinationPolicy>
<policyMap> <policyMap>
<policyEntries> <policyEntries>
<policyEntry queue=">" memoryLimit="1024"/> <policyEntry queue=">" memoryLimit="1024b"/>
</policyEntries> </policyEntries>
</policyMap> </policyMap>
</destinationPolicy> </destinationPolicy>