mirror of https://github.com/apache/activemq.git
Cleanup the leaky abstraction of the DiscoveryAgent. see:
https://issues.apache.org/activemq/browse/AMQ-1477 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@587927 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1c1ed4d953
commit
43b808d158
|
@ -229,7 +229,6 @@ public class TransportConnector implements Connector {
|
||||||
getServer().start();
|
getServer().start();
|
||||||
DiscoveryAgent da = getDiscoveryAgent();
|
DiscoveryAgent da = getDiscoveryAgent();
|
||||||
if (da != null) {
|
if (da != null) {
|
||||||
da.setBrokerName(getBrokerInfo().getBrokerName());
|
|
||||||
da.registerService(getConnectUri().toString());
|
da.registerService(getConnectUri().toString());
|
||||||
da.start();
|
da.start();
|
||||||
}
|
}
|
||||||
|
|
|
@ -141,7 +141,6 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
|
||||||
this.discoveryAgent = discoveryAgent;
|
this.discoveryAgent = discoveryAgent;
|
||||||
if (discoveryAgent != null) {
|
if (discoveryAgent != null) {
|
||||||
this.discoveryAgent.setDiscoveryListener(this);
|
this.discoveryAgent.setDiscoveryListener(this);
|
||||||
this.discoveryAgent.setBrokerName(getBrokerName());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,10 +53,4 @@ public interface DiscoveryAgent extends Service {
|
||||||
*/
|
*/
|
||||||
void serviceFailed(DiscoveryEvent event) throws IOException;
|
void serviceFailed(DiscoveryEvent event) throws IOException;
|
||||||
|
|
||||||
String getGroup();
|
|
||||||
|
|
||||||
void setGroup(String group);
|
|
||||||
|
|
||||||
void setBrokerName(String brokerName);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -160,7 +160,6 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
||||||
private boolean loopBackMode;
|
private boolean loopBackMode;
|
||||||
private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
|
private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
|
||||||
private String group = "default";
|
private String group = "default";
|
||||||
private String brokerName;
|
|
||||||
private URI discoveryURI;
|
private URI discoveryURI;
|
||||||
private InetAddress inetAddress;
|
private InetAddress inetAddress;
|
||||||
private SocketAddress sockAddress;
|
private SocketAddress sockAddress;
|
||||||
|
@ -200,43 +199,6 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the group used for discovery
|
|
||||||
*
|
|
||||||
* @return the group
|
|
||||||
*/
|
|
||||||
public String getGroup() {
|
|
||||||
return group;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the group for discovery
|
|
||||||
*
|
|
||||||
* @param group
|
|
||||||
*/
|
|
||||||
public void setGroup(String group) {
|
|
||||||
this.group = group;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return Returns the brokerName.
|
|
||||||
*/
|
|
||||||
public String getBrokerName() {
|
|
||||||
return brokerName;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param brokerName The brokerName to set.
|
|
||||||
*/
|
|
||||||
public void setBrokerName(String brokerName) {
|
|
||||||
if (brokerName != null) {
|
|
||||||
brokerName = brokerName.replace('.', '-');
|
|
||||||
brokerName = brokerName.replace(':', '-');
|
|
||||||
brokerName = brokerName.replace('%', '-');
|
|
||||||
this.brokerName = brokerName;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Returns the loopBackMode.
|
* @return Returns the loopBackMode.
|
||||||
*/
|
*/
|
||||||
|
@ -299,9 +261,6 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
||||||
if (group == null || group.length() == 0) {
|
if (group == null || group.length() == 0) {
|
||||||
throw new IOException("You must specify a group to discover");
|
throw new IOException("You must specify a group to discover");
|
||||||
}
|
}
|
||||||
if (brokerName == null || brokerName.length() == 0) {
|
|
||||||
LOG.warn("brokerName not set");
|
|
||||||
}
|
|
||||||
String type = getType();
|
String type = getType();
|
||||||
if (!type.endsWith(".")) {
|
if (!type.endsWith(".")) {
|
||||||
LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
|
LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
|
||||||
|
@ -369,15 +328,11 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
||||||
if (payload.startsWith(ALIVE)) {
|
if (payload.startsWith(ALIVE)) {
|
||||||
String brokerName = getBrokerName(payload.substring(ALIVE.length()));
|
String brokerName = getBrokerName(payload.substring(ALIVE.length()));
|
||||||
String service = payload.substring(ALIVE.length() + brokerName.length() + 2);
|
String service = payload.substring(ALIVE.length() + brokerName.length() + 2);
|
||||||
if (!brokerName.equals(this.brokerName)) {
|
|
||||||
processAlive(brokerName, service);
|
processAlive(brokerName, service);
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
String brokerName = getBrokerName(payload.substring(DEAD.length()));
|
String brokerName = getBrokerName(payload.substring(DEAD.length()));
|
||||||
String service = payload.substring(DEAD.length() + brokerName.length() + 2);
|
String service = payload.substring(DEAD.length() + brokerName.length() + 2);
|
||||||
if (!brokerName.equals(this.brokerName)) {
|
processDead(service);
|
||||||
processDead(brokerName, service);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -398,7 +353,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
||||||
if (selfService != null) {
|
if (selfService != null) {
|
||||||
String payload = getType();
|
String payload = getType();
|
||||||
payload += started.get() ? ALIVE : DEAD;
|
payload += started.get() ? ALIVE : DEAD;
|
||||||
payload += DELIMITER + brokerName + DELIMITER;
|
payload += DELIMITER + "localhost" + DELIMITER;
|
||||||
payload += selfService;
|
payload += selfService;
|
||||||
try {
|
try {
|
||||||
byte[] data = payload.getBytes();
|
byte[] data = payload.getBytes();
|
||||||
|
@ -439,7 +394,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processDead(String brokerName, String service) {
|
private void processDead(String service) {
|
||||||
if (!service.equals(selfService)) {
|
if (!service.equals(selfService)) {
|
||||||
RemoteBrokerData data = brokersByService.remove(service);
|
RemoteBrokerData data = brokersByService.remove(service);
|
||||||
if (data != null && !data.isFailed()) {
|
if (data != null && !data.isFailed()) {
|
||||||
|
@ -453,7 +408,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
||||||
for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
|
for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
|
||||||
RemoteBrokerData data = i.next();
|
RemoteBrokerData data = i.next();
|
||||||
if (data.getLastHeartBeat() < expireTime) {
|
if (data.getLastHeartBeat() < expireTime) {
|
||||||
processDead(brokerName, data.service);
|
processDead(data.service);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -552,4 +507,8 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
||||||
public void setUseExponentialBackOff(boolean useExponentialBackOff) {
|
public void setUseExponentialBackOff(boolean useExponentialBackOff) {
|
||||||
this.useExponentialBackOff = useExponentialBackOff;
|
this.useExponentialBackOff = useExponentialBackOff;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setGroup(String group) {
|
||||||
|
this.group = group;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -234,12 +234,4 @@ public class RendezvousDiscoveryAgent implements DiscoveryAgent, ServiceListener
|
||||||
// TODO: is there a way to notify the JmDNS that the service failed?
|
// TODO: is there a way to notify the JmDNS that the service failed?
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param brokerName
|
|
||||||
* @see org.apache.activemq.transport.discovery.DiscoveryAgent#setBrokerName(java.lang.String)
|
|
||||||
*/
|
|
||||||
public void setBrokerName(String brokerName) {
|
|
||||||
// implementation of interface
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,6 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
|
||||||
private long minConnectTime = 500;
|
private long minConnectTime = 500;
|
||||||
private DiscoveryListener listener;
|
private DiscoveryListener listener;
|
||||||
private String services[] = new String[] {};
|
private String services[] = new String[] {};
|
||||||
private String group = "DEFAULT";
|
|
||||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
|
|
||||||
class SimpleDiscoveryEvent extends DiscoveryEvent {
|
class SimpleDiscoveryEvent extends DiscoveryEvent {
|
||||||
|
@ -97,17 +96,6 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getGroup() {
|
|
||||||
return group;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setGroup(String group) {
|
|
||||||
this.group = group;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setBrokerName(String brokerName) {
|
|
||||||
}
|
|
||||||
|
|
||||||
public void serviceFailed(DiscoveryEvent devent) throws IOException {
|
public void serviceFailed(DiscoveryEvent devent) throws IOException {
|
||||||
|
|
||||||
final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
|
final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
|
||||||
|
|
|
@ -36,7 +36,6 @@ public class SimpleDiscoveryAgentFactory extends DiscoveryAgentFactory {
|
||||||
Map options = data.getParameters();
|
Map options = data.getParameters();
|
||||||
|
|
||||||
SimpleDiscoveryAgent rc = new SimpleDiscoveryAgent();
|
SimpleDiscoveryAgent rc = new SimpleDiscoveryAgent();
|
||||||
rc.setGroup(uri.getHost());
|
|
||||||
IntrospectionSupport.setProperties(rc, options);
|
IntrospectionSupport.setProperties(rc, options);
|
||||||
rc.setServices(data.getComponents());
|
rc.setServices(data.getComponents());
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue