mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4682 - implement network connector modification, match, stop, apply restart
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1515461 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
67273afa5a
commit
25a1dd071a
|
@ -54,6 +54,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
|
|||
private DiscoveryAgent discoveryAgent;
|
||||
private Map<String, String> parameters;
|
||||
private final ConcurrentMap<URI, DiscoveryEvent> activeEvents = new ConcurrentHashMap<URI, DiscoveryEvent>();
|
||||
private URI discoveryUri;
|
||||
public DiscoveryNetworkConnector() {
|
||||
}
|
||||
|
||||
|
@ -62,6 +63,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
|
|||
}
|
||||
|
||||
public void setUri(URI discoveryURI) throws IOException {
|
||||
this.discoveryUri = discoveryURI;
|
||||
setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI));
|
||||
try {
|
||||
parameters = URISupport.parseParameters(discoveryURI);
|
||||
|
@ -72,6 +74,10 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
|
|||
}
|
||||
}
|
||||
|
||||
public URI getUri() {
|
||||
return discoveryUri;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onServiceAdd(DiscoveryEvent event) {
|
||||
// Ignore events once we start stopping.
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.util;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Date;
|
||||
|
@ -80,6 +81,8 @@ public final class TypeConversionSupport {
|
|||
CONVERSION_MAP.put(new ConversionKey(Float.class, String.class), toStringConverter);
|
||||
CONVERSION_MAP.put(new ConversionKey(Double.class, String.class), toStringConverter);
|
||||
CONVERSION_MAP.put(new ConversionKey(UTF8Buffer.class, String.class), toStringConverter);
|
||||
CONVERSION_MAP.put(new ConversionKey(URI.class, String.class), toStringConverter);
|
||||
CONVERSION_MAP.put(new ConversionKey(BigInteger.class, String.class), toStringConverter);
|
||||
|
||||
CONVERSION_MAP.put(new ConversionKey(String.class, Boolean.class), new Converter() {
|
||||
@Override
|
||||
|
|
|
@ -17,10 +17,12 @@
|
|||
package org.apache.activemq.plugin;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.TreeMap;
|
||||
import javax.xml.bind.JAXBContext;
|
||||
import javax.xml.bind.JAXBElement;
|
||||
import javax.xml.bind.JAXBException;
|
||||
|
@ -111,27 +113,89 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
|
|||
}
|
||||
|
||||
private void processNetworkConnectors(Broker currentConfiguration, Broker modifiedConfiguration) {
|
||||
List<Broker.NetworkConnectors> currentNc = filterElement(
|
||||
List<Broker.NetworkConnectors> currentNCsElems = filterElement(
|
||||
currentConfiguration.getContents(), Broker.NetworkConnectors.class);
|
||||
List<Broker.NetworkConnectors> modNc = filterElement(
|
||||
List<Broker.NetworkConnectors> modifiedNCsElems = filterElement(
|
||||
modifiedConfiguration.getContents(), Broker.NetworkConnectors.class);
|
||||
|
||||
int modIndex = 0, currentIndex = 0;
|
||||
for (; modIndex < modNc.size() && currentIndex < currentNc.size(); modIndex++, currentIndex++) {
|
||||
if (!modNc.get(modIndex).getContents().get(0).equals(
|
||||
currentNc.get(currentIndex).getContents().get(0))) {
|
||||
// change in order will fool this logic
|
||||
LOG.error("not supported: mod to existing network Connector, new: "
|
||||
+ modNc.get(modIndex).getContents().get(0));
|
||||
for (; modIndex < modifiedNCsElems.size() && currentIndex < currentNCsElems.size(); modIndex++, currentIndex++) {
|
||||
// walk the list of individual nc's...
|
||||
applyModifications(currentNCsElems.get(currentIndex).getContents(),
|
||||
modifiedNCsElems.get(modIndex).getContents());
|
||||
}
|
||||
|
||||
for (; modIndex < modifiedNCsElems.size(); modIndex++) {
|
||||
// new networkConnectors element; add all
|
||||
for (Object nc : modifiedNCsElems.get(modIndex).getContents()) {
|
||||
addNetworkConnector(nc);
|
||||
}
|
||||
}
|
||||
|
||||
for (; modIndex < modNc.size(); modIndex++) {
|
||||
// additions
|
||||
addNetworkConnector(modNc.get(modIndex).getContents().get(0));
|
||||
for (; currentIndex < currentNCsElems.size(); currentIndex++) {
|
||||
// removal of networkConnectors element; remove all
|
||||
for (Object nc : modifiedNCsElems.get(modIndex).getContents()) {
|
||||
removeNetworkConnector(nc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void applyModifications(List<Object> current, List<Object> modification) {
|
||||
int modIndex = 0, currentIndex = 0;
|
||||
for (; modIndex < modification.size() && currentIndex < current.size(); modIndex++, currentIndex++) {
|
||||
Object currentNc = current.get(currentIndex);
|
||||
Object candidateNc = modification.get(modIndex);
|
||||
if (! currentNc.equals(candidateNc)) {
|
||||
LOG.info("modification to:" + currentNc + " , with: " + candidateNc);
|
||||
removeNetworkConnector(currentNc);
|
||||
addNetworkConnector(candidateNc);
|
||||
}
|
||||
}
|
||||
|
||||
for (; modIndex < modification.size(); modIndex++) {
|
||||
addNetworkConnector(modification.get(modIndex));
|
||||
}
|
||||
|
||||
for (; currentIndex < current.size(); currentIndex++) {
|
||||
removeNetworkConnector(current.get(currentIndex));
|
||||
}
|
||||
}
|
||||
|
||||
private void removeNetworkConnector(Object o) {
|
||||
if (o instanceof NetworkConnector) {
|
||||
NetworkConnector toRemove = (NetworkConnector) o;
|
||||
for (org.apache.activemq.network.NetworkConnector existingCandidate :
|
||||
getBrokerService().getNetworkConnectors()) {
|
||||
if (configMatch(toRemove, existingCandidate)) {
|
||||
if (getBrokerService().removeNetworkConnector(existingCandidate)) {
|
||||
try {
|
||||
existingCandidate.stop();
|
||||
LOG.info("stopped and removed networkConnector: " + existingCandidate);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to stop removed network connector: " + existingCandidate);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean configMatch(NetworkConnector dto, org.apache.activemq.network.NetworkConnector candidate) {
|
||||
TreeMap<String, String> dtoProps = new TreeMap<String, String>();
|
||||
IntrospectionSupport.getProperties(dto, dtoProps, null);
|
||||
|
||||
TreeMap<String, String> candidateProps = new TreeMap<String, String>();
|
||||
IntrospectionSupport.getProperties(candidate, candidateProps, null);
|
||||
|
||||
// every dto prop must be present in the candidate
|
||||
for (String key : dtoProps.keySet()) {
|
||||
if (!candidateProps.containsKey(key) || !candidateProps.get(key).equals(dtoProps.get(key))) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void addNetworkConnector(Object o) {
|
||||
if (o instanceof NetworkConnector) {
|
||||
NetworkConnector networkConnector = (NetworkConnector) o;
|
||||
|
|
|
@ -74,9 +74,9 @@ public class NetworkConnectorTest {
|
|||
}
|
||||
|
||||
|
||||
@Ignore("not implemented yet!")
|
||||
@Test
|
||||
public void testModConnector() throws Exception {
|
||||
|
||||
final String brokerConfig = configurationSeed + "-one-nc-broker";
|
||||
applyNewConfig(brokerConfig, configurationSeed + "-one-nc");
|
||||
startBroker(brokerConfig);
|
||||
|
@ -100,6 +100,23 @@ public class NetworkConnectorTest {
|
|||
assertEquals("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveConnector() throws Exception {
|
||||
|
||||
final String brokerConfig = configurationSeed + "-two-nc-broker";
|
||||
applyNewConfig(brokerConfig, configurationSeed + "-two-nc");
|
||||
startBroker(brokerConfig);
|
||||
assertTrue("broker alive", brokerService.isStarted());
|
||||
assertEquals("correct network connectors", 2, brokerService.getNetworkConnectors().size());
|
||||
|
||||
applyNewConfig(brokerConfig, configurationSeed + "-one-nc", SLEEP);
|
||||
|
||||
assertEquals("one network connectors", 1, brokerService.getNetworkConnectors().size());
|
||||
|
||||
NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0);
|
||||
assertEquals("name match", "one", remainingNetworkConnector.getName());
|
||||
}
|
||||
|
||||
private void applyNewConfig(String configName, String newConfigName) throws Exception {
|
||||
applyNewConfig(configName, newConfigName, 0l);
|
||||
}
|
||||
|
@ -107,7 +124,7 @@ public class NetworkConnectorTest {
|
|||
private void applyNewConfig(String configName, String newConfigName, long sleep) throws Exception {
|
||||
Resource resource = Utils.resourceFromString("org/apache/activemq");
|
||||
FileOutputStream current = new FileOutputStream(new File(resource.getFile(), configName + ".xml"));
|
||||
FileInputStream modifications = new FileInputStream(new File(resource.getFile(), (newConfigName != null ? newConfigName : configName) + ".xml"));
|
||||
FileInputStream modifications = new FileInputStream(new File(resource.getFile(), newConfigName + ".xml"));
|
||||
modifications.getChannel().transferTo(0, Long.MAX_VALUE, current.getChannel());
|
||||
current.flush();
|
||||
LOG.info("Updated: " + current.getChannel());
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
</plugins>
|
||||
|
||||
<networkConnectors>
|
||||
<networkConnector uri="static:(tcp://localhost:5555)" networkTTL="2" />
|
||||
<networkConnector uri="static:(tcp://localhost:5555)" networkTTL="2" name="one" />
|
||||
</networkConnectors>
|
||||
</broker>
|
||||
</beans>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
</plugins>
|
||||
|
||||
<networkConnectors>
|
||||
<networkConnector uri="static:(tcp://localhost:5555)"/>
|
||||
<networkConnector uri="static:(tcp://localhost:5555)" networkTTL="1" name="one" />
|
||||
</networkConnectors>
|
||||
</broker>
|
||||
</beans>
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<beans
|
||||
xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:amq="http://activemq.apache.org/schema/core"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
|
||||
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
|
||||
|
||||
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false" >
|
||||
<plugins>
|
||||
<runtimeConfigurationPlugin checkPeriod="1000" />
|
||||
</plugins>
|
||||
|
||||
<networkConnectors>
|
||||
<networkConnector uri="static:(tcp://localhost:5555)" networkTTL="1" name="one"/>
|
||||
<networkConnector uri="static:(tcp://localhost:5555)" networkTTL="1" name="two"/>
|
||||
</networkConnectors>
|
||||
</broker>
|
||||
</beans>
|
Loading…
Reference in New Issue