Revert back to javax.jms.Destination instead of Strings and added support

for a DestinationMarshaller

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@358551 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2005-12-22 10:28:04 +00:00
parent de06fc6e9e
commit 84077a3663
16 changed files with 320 additions and 196 deletions

View File

@ -13,7 +13,9 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster; package org.activecluster;
import java.io.Serializable; import java.io.Serializable;
@ -46,7 +48,7 @@ public interface Cluster extends Service {
* *
* @return the destination to send messages to all members of the cluster * @return the destination to send messages to all members of the cluster
*/ */
public String getDestination(); public Destination getDestination();
/** /**
* A snapshot of the nodes in the cluster indexed by the Destination * A snapshot of the nodes in the cluster indexed by the Destination
@ -94,17 +96,9 @@ public interface Cluster extends Service {
* @param message the message to be sent * @param message the message to be sent
* @throws JMSException * @throws JMSException
*/ */
public void send(String destination, Message message) throws JMSException; public void send(Destination destination, Message message) throws JMSException;
/**
* Utility method for sending back replies in message exchanges
*
* @param replyTo the replyTo JMS Destination on a Message
* @param message the message to be sent
* @throws JMSException
*/
public void send(Destination replyTo, Message message) throws JMSException;
/** /**
* Creates a consumer of all the messags sent to the given destination, * Creates a consumer of all the messags sent to the given destination,
* including messages sent via the send() messages * including messages sent via the send() messages
@ -113,7 +107,7 @@ public interface Cluster extends Service {
* @return a newly created message consumer * @return a newly created message consumer
* @throws JMSException * @throws JMSException
*/ */
public MessageConsumer createConsumer(String destination) throws JMSException; public MessageConsumer createConsumer(Destination destination) throws JMSException;
/** /**
* Creates a consumer of all message sent to the given destination, * Creates a consumer of all message sent to the given destination,
@ -125,7 +119,7 @@ public interface Cluster extends Service {
* @return a newly created message consumer * @return a newly created message consumer
* @throws JMSException * @throws JMSException
*/ */
public MessageConsumer createConsumer(String destination, String selector) throws JMSException; public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException;
/** /**
* Creates a consumer of all message sent to the given destination, * Creates a consumer of all message sent to the given destination,
@ -139,7 +133,7 @@ public interface Cluster extends Service {
* @return a newly created message consumer * @return a newly created message consumer
* @throws JMSException * @throws JMSException
*/ */
public MessageConsumer createConsumer(String destination, String selector, boolean noLocal) throws JMSException; public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException;
// Message factory methods // Message factory methods

View File

@ -13,10 +13,12 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster; package org.activecluster;
import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -27,6 +29,37 @@ import javax.jms.JMSException;
*/ */
public interface ClusterFactory { public interface ClusterFactory {
/**
* Creates a new cluster connection using the given local name and destination name
* @param localName
* @param destinationName
*
* @return Cluster
* @throws JMSException
*/
public Cluster createCluster(String localName,String destinationName) throws JMSException;
/**
* Creates a new cluster connection using the given local name and destination name
* @param localName
* @param destinationName
* @param marshaller
*
* @return Cluster
* @throws JMSException
*/
public Cluster createCluster(String localName,String destinationName,DestinationMarshaller marshaller) throws JMSException;
/**
* Creates a new cluster connection - generating the localName automatically
* @param destinationName
* @return the Cluster
* @throws JMSException
*/
public Cluster createCluster(String destinationName) throws JMSException;
/** /**
* Creates a new cluster connection using the given local name and destination name * Creates a new cluster connection using the given local name and destination name
* @param localName * @param localName
@ -35,14 +68,26 @@ public interface ClusterFactory {
* @return Cluster * @return Cluster
* @throws JMSException * @throws JMSException
*/ */
public Cluster createCluster(String localName,String destination) throws JMSException; public Cluster createCluster(String localName,Destination destination) throws JMSException;
/**
* Creates a new cluster connection using the given local name and destination name
* @param localName
* @param destination
* @param marshaller
*
* @return Cluster
* @throws JMSException
*/
public Cluster createCluster(String localName,Destination destination, DestinationMarshaller marshaller) throws JMSException;
/** /**
* Creates a new cluster connection - generating the localName automatically * Creates a new cluster connection - generating the localName automatically
* @param destination * @param destination
* @return * @return the Cluster
* @throws JMSException * @throws JMSException
*/ */
public Cluster createCluster(String destination) throws JMSException; public Cluster createCluster(Destination destination) throws JMSException;
} }

View File

@ -13,11 +13,12 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster; package org.activecluster;
import java.io.Serializable;
import java.util.Map; import java.util.Map;
import javax.jms.Destination;
/** /**
@ -25,14 +26,14 @@ import java.util.Map;
* *
* @version $Revision: 1.3 $ * @version $Revision: 1.3 $
*/ */
public interface Node extends Serializable { public interface Node {
/** /**
* Access to the queue to send messages direct to this node. * Access to the queue to send messages direct to this node.
* *
* @return the destination to send messages to this node while its available * @return the destination to send messages to this node while its available
*/ */
public String getDestination(); public Destination getDestination();
/** /**
* @return an immutable map of the nodes state * @return an immutable map of the nodes state

View File

@ -13,7 +13,8 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster.impl; package org.activecluster.impl;
import java.io.Serializable; import java.io.Serializable;
@ -33,6 +34,7 @@ import javax.jms.StreamMessage;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import org.activecluster.Cluster; import org.activecluster.Cluster;
import org.activecluster.ClusterListener; import org.activecluster.ClusterListener;
import org.activecluster.DestinationMarshaller;
import org.activecluster.LocalNode; import org.activecluster.LocalNode;
import org.activecluster.Service; import org.activecluster.Service;
import org.activecluster.election.ElectionStrategy; import org.activecluster.election.ElectionStrategy;
@ -50,41 +52,53 @@ public class DefaultCluster implements Cluster {
private StateServiceImpl stateService; private StateServiceImpl stateService;
private LocalNode localNode; private LocalNode localNode;
private String destination; private Destination destination;
private Connection connection; private Connection connection;
private Session session; private Session session;
private MessageProducer producer; private MessageProducer producer;
private MessageConsumer consumer; private MessageConsumer consumer;
private Timer timer; private Timer timer;
private DestinationMarshaller marshaller;
private AtomicBoolean started = new AtomicBoolean(false); private AtomicBoolean started = new AtomicBoolean(false);
private Object clusterLock = new Object(); private Object clusterLock = new Object();
public DefaultCluster(final LocalNode localNode,String dataDestination, String destination, Connection connection, Session session, /**
MessageProducer producer, Timer timer, long inactiveTime) throws JMSException { * Construct this beast
this.localNode = localNode; * @param localNode
this.destination = destination; * @param dataDestination
this.connection = connection; * @param destination
this.session = session; * @param marshaller
this.producer = producer; * @param connection
this.timer = timer; * @param session
* @param producer
if (producer == null) { * @param timer
* @param inactiveTime
* @throws JMSException
*/
public DefaultCluster(final LocalNode localNode,Destination dataDestination,Destination destination,
DestinationMarshaller marshaller,Connection connection,Session session,MessageProducer producer,
Timer timer,long inactiveTime) throws JMSException{
this.localNode=localNode;
this.destination=destination;
this.marshaller=marshaller;
this.connection=connection;
this.session=session;
this.producer=producer;
this.timer=timer;
if(producer==null){
throw new IllegalArgumentException("No producer specified!"); throw new IllegalArgumentException("No producer specified!");
} }
// now lets subscribe the service to the updates from the data topic // now lets subscribe the service to the updates from the data topic
consumer = session.createConsumer(createDestination(dataDestination), null, true); consumer=session.createConsumer(dataDestination,null,true);
log.info("Creating data consumer on topic: "+dataDestination);
log.info("Creating data consumer on topic: " + dataDestination); this.stateService=new StateServiceImpl(this,clusterLock,new Runnable(){
public void run(){
this.stateService = new StateServiceImpl(this, clusterLock, new Runnable() { if(localNode instanceof ReplicatedLocalNode){
public void run() {
if (localNode instanceof ReplicatedLocalNode) {
((ReplicatedLocalNode) localNode).pingRemoteNodes(); ((ReplicatedLocalNode) localNode).pingRemoteNodes();
} }
} }
}, timer, inactiveTime); },timer,inactiveTime);
consumer.setMessageListener(new StateConsumer(stateService)); consumer.setMessageListener(new StateConsumer(stateService,marshaller));
} }
public void addClusterListener(ClusterListener listener) { public void addClusterListener(ClusterListener listener) {
@ -95,7 +109,7 @@ public class DefaultCluster implements Cluster {
stateService.removeClusterListener(listener); stateService.removeClusterListener(listener);
} }
public String getDestination() { public Destination getDestination() {
return destination; return destination;
} }
@ -111,24 +125,21 @@ public class DefaultCluster implements Cluster {
stateService.setElectionStrategy(strategy); stateService.setElectionStrategy(strategy);
} }
public void send(String destination,Message message) throws JMSException {
producer.send(createDestination(destination), message);
}
public void send(Destination replyTo, Message message) throws JMSException{ public void send(Destination replyTo, Message message) throws JMSException{
producer.send(replyTo,message); producer.send(replyTo,message);
} }
public MessageConsumer createConsumer(String destination) throws JMSException { public MessageConsumer createConsumer(Destination destination) throws JMSException {
return getSession().createConsumer(createDestination(destination)); return getSession().createConsumer(destination);
} }
public MessageConsumer createConsumer(String destination, String selector) throws JMSException { public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
return getSession().createConsumer(createDestination(destination), selector); return getSession().createConsumer(destination, selector);
} }
public MessageConsumer createConsumer(String destination, String selector, boolean noLocal) throws JMSException { public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
return getSession().createConsumer(createDestination(destination), selector, noLocal); return getSession().createConsumer(destination, selector, noLocal);
} }
public Message createMessage() throws JMSException { public Message createMessage() throws JMSException {

View File

@ -20,6 +20,7 @@ import java.util.Timer;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
@ -27,6 +28,7 @@ import javax.jms.Topic;
import org.activecluster.Cluster; import org.activecluster.Cluster;
import org.activecluster.ClusterException; import org.activecluster.ClusterException;
import org.activecluster.ClusterFactory; import org.activecluster.ClusterFactory;
import org.activecluster.DestinationMarshaller;
import org.activemq.util.IdGenerator; import org.activemq.util.IdGenerator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -61,16 +63,42 @@ public class DefaultClusterFactory implements ClusterFactory {
this(connectionFactory, false, Session.AUTO_ACKNOWLEDGE, "ACTIVECLUSTER.DATA.", 6000L); this(connectionFactory, false, Session.AUTO_ACKNOWLEDGE, "ACTIVECLUSTER.DATA.", 6000L);
} }
public Cluster createCluster(String groupDestination) throws JMSException { public Cluster createCluster(Destination groupDestination) throws JMSException {
return createCluster(idGenerator.generateId(), groupDestination); return createCluster(idGenerator.generateId(), groupDestination);
} }
public Cluster createCluster(String name,String groupDestination) throws JMSException { public Cluster createCluster(String name,Destination groupDestination) throws JMSException {
Connection connection = getConnectionFactory().createConnection(); Connection connection = getConnectionFactory().createConnection();
Session session = createSession(connection); Session session = createSession(connection);
return createCluster(connection, session, name,groupDestination); return createCluster(connection, session, name,groupDestination,new DefaultDestinationMarshaller());
} }
public Cluster createCluster(String name,Destination groupDestination,DestinationMarshaller marshaller) throws JMSException {
Connection connection = getConnectionFactory().createConnection();
Session session = createSession(connection);
return createCluster(connection, session, name,groupDestination,marshaller);
}
public Cluster createCluster(String name,String groupDestinationName) throws JMSException{
Connection connection = getConnectionFactory().createConnection();
Session session = createSession(connection);
return createCluster(connection, session, name,session.createTopic(groupDestinationName),new DefaultDestinationMarshaller());
}
public Cluster createCluster(String name,String groupDestinationName,DestinationMarshaller marshaller) throws JMSException{
Connection connection = getConnectionFactory().createConnection();
Session session = createSession(connection);
return createCluster(connection, session, name,session.createTopic(groupDestinationName),marshaller);
}
public Cluster createCluster(String groupDestinationName) throws JMSException{
return createCluster(idGenerator.generateId(), groupDestinationName);
}
// Properties // Properties
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
public String getDataTopicPrefix() { public String getDataTopicPrefix() {
@ -134,34 +162,29 @@ public class DefaultClusterFactory implements ClusterFactory {
// Implementation methods // Implementation methods
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
protected Cluster createCluster(Connection connection, Session session, String name,String groupDestination) throws JMSException { protected Cluster createCluster(Connection connection,Session session,String name,Destination groupDestination,
String dataDestination = dataTopicPrefix + groupDestination; DestinationMarshaller marshaller) throws JMSException{
String dataDestination=dataTopicPrefix+groupDestination;
log.info("Creating cluster group producer on topic: " + groupDestination); log.info("Creating cluster group producer on topic: "+groupDestination);
MessageProducer producer=createProducer(session,null);
MessageProducer producer = createProducer(session, null);
producer.setDeliveryMode(deliveryMode); producer.setDeliveryMode(deliveryMode);
log.info("Creating cluster data producer on data destination: "+dataDestination);
log.info("Creating cluster data producer on data destination: " + dataDestination); Topic dataTopic=session.createTopic(dataDestination);
MessageProducer keepAliveProducer=session.createProducer(dataTopic);
Topic dataTopic = session.createTopic(dataDestination);
MessageProducer keepAliveProducer = session.createProducer(dataTopic);
keepAliveProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); keepAliveProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
StateService serviceStub = new StateServiceStub(session, keepAliveProducer); StateService serviceStub=new StateServiceStub(session,keepAliveProducer,marshaller);
Destination localInboxDestination=session.createTopic(dataDestination+"."+name);
String localInboxDestination = dataDestination + "." + name; ReplicatedLocalNode localNode=new ReplicatedLocalNode(name,localInboxDestination,serviceStub);
Timer timer=new Timer();
ReplicatedLocalNode localNode = new ReplicatedLocalNode(name,localInboxDestination, serviceStub); DefaultCluster answer=new DefaultCluster(localNode,dataTopic,groupDestination,marshaller,connection,session,
Timer timer = new Timer(); producer,timer,inactiveTime);
DefaultCluster answer = new DefaultCluster(localNode, dataDestination, groupDestination, connection, session, producer, timer, inactiveTime);
return answer; return answer;
} }
/* /*
protected Cluster createInternalCluster(Session session, Topic dataDestination) { * protected Cluster createInternalCluster(Session session, Topic dataDestination) { MessageProducer producer =
MessageProducer producer = createProducer(session); * createProducer(session); return new DefaultCluster(new NonReplicatedLocalNode(), dataDestination, connection,
return new DefaultCluster(new NonReplicatedLocalNode(), dataDestination, connection, session, producer); * session, producer); }
}
*/ */
protected MessageProducer createProducer(Session session, Topic groupDestination) throws JMSException { protected MessageProducer createProducer(Session session, Topic groupDestination) throws JMSException {

View File

@ -13,11 +13,17 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster.impl; package org.activecluster.impl;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import javax.jms.Destination;
import org.activecluster.DestinationMarshaller;
import org.activecluster.Node; import org.activecluster.Node;
@ -26,13 +32,22 @@ import org.activecluster.Node;
* *
* @version $Revision: 1.3 $ * @version $Revision: 1.3 $
*/ */
public class NodeImpl implements Node { public class NodeImpl implements Node{
private static final long serialVersionUID=-3909792803360045064L; private static final long serialVersionUID=-3909792803360045064L;
private String name; private String name;
private String destination; private Destination destination;
protected Map state; protected Map state;
protected boolean coordinator; protected boolean coordinator;
/**
* Construct an Node from a NodeState
* @param nodeState
* @param marshaller
*/
public NodeImpl(NodeState nodeState,DestinationMarshaller marshaller){
this(nodeState.getName(),marshaller.getDestination(nodeState.getDestinationName()),nodeState.getState());
}
/** /**
* Allow a node to be copied for sending it as a message * Allow a node to be copied for sending it as a message
* *
@ -47,7 +62,7 @@ public class NodeImpl implements Node {
* @param name * @param name
* @param destination * @param destination
*/ */
public NodeImpl(String name,String destination) { public NodeImpl(String name,Destination destination) {
this(name,destination, new HashMap()); this(name,destination, new HashMap());
} }
@ -57,7 +72,7 @@ public class NodeImpl implements Node {
* @param destination * @param destination
* @param state * @param state
*/ */
public NodeImpl(String name,String destination, Map state) { public NodeImpl(String name,Destination destination, Map state) {
this.name = name; this.name = name;
this.destination = destination; this.destination = destination;
this.state = state; this.state = state;
@ -80,7 +95,7 @@ public class NodeImpl implements Node {
/** /**
* @return the destination of the node * @return the destination of the node
*/ */
public String getDestination() { public Destination getDestination() {
return destination; return destination;
} }
@ -118,4 +133,14 @@ public class NodeImpl implements Node {
protected void setCoordinator(boolean value) { protected void setCoordinator(boolean value) {
coordinator = value; coordinator = value;
} }
public void writeExternal(ObjectOutput out) throws IOException{
// TODO Auto-generated method stub
}
public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
// TODO Auto-generated method stub
}
} }

View File

@ -13,10 +13,12 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster.impl; package org.activecluster.impl;
import java.util.Map; import java.util.Map;
import javax.jms.Destination;
import org.activecluster.LocalNode; import org.activecluster.LocalNode;
/** /**
@ -33,7 +35,7 @@ public class NonReplicatedLocalNode extends NodeImpl implements LocalNode {
* @param name * @param name
* @param destination * @param destination
*/ */
public NonReplicatedLocalNode(String name, String destination) { public NonReplicatedLocalNode(String name, Destination destination) {
super(name,destination); super(name,destination);
} }

View File

@ -13,10 +13,12 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster.impl; package org.activecluster.impl;
import java.util.Map; import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.activecluster.LocalNode; import org.activecluster.LocalNode;
import org.activecluster.Service; import org.activecluster.Service;
@ -34,7 +36,7 @@ public class ReplicatedLocalNode extends NodeImpl implements LocalNode, Service
* *
*/ */
private static final long serialVersionUID=4626381612145333540L; private static final long serialVersionUID=4626381612145333540L;
private StateService serviceStub; private transient StateService serviceStub;
/** /**
* Create ReplicatedLocalNode * Create ReplicatedLocalNode
@ -42,7 +44,7 @@ public class ReplicatedLocalNode extends NodeImpl implements LocalNode, Service
* @param destination * @param destination
* @param serviceStub * @param serviceStub
*/ */
public ReplicatedLocalNode(String name,String destination, StateService serviceStub) { public ReplicatedLocalNode(String name,Destination destination, StateService serviceStub) {
super(name,destination); super(name,destination);
this.serviceStub = serviceStub; this.serviceStub = serviceStub;
} }

View File

@ -13,11 +13,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster.impl; package org.activecluster.impl;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.activecluster.DestinationMarshaller;
import org.activecluster.Node; import org.activecluster.Node;
import javax.jms.Message; import javax.jms.Message;
@ -36,12 +38,14 @@ public class StateConsumer implements MessageListener {
private final static Log log = LogFactory.getLog(StateConsumer.class); private final static Log log = LogFactory.getLog(StateConsumer.class);
private StateService stateService; private StateService stateService;
private DestinationMarshaller marshaller;
public StateConsumer(StateService stateService) { public StateConsumer(StateService stateService,DestinationMarshaller marshaller) {
if (stateService == null) { if (stateService == null) {
throw new IllegalArgumentException("Must specify a valid StateService implementation"); throw new IllegalArgumentException("Must specify a valid StateService implementation");
} }
this.stateService = stateService; this.stateService = stateService;
this.marshaller = marshaller;
} }
public void onMessage(Message message) { public void onMessage(Message message) {
@ -52,7 +56,8 @@ public class StateConsumer implements MessageListener {
if (message instanceof ObjectMessage) { if (message instanceof ObjectMessage) {
ObjectMessage objectMessage = (ObjectMessage) message; ObjectMessage objectMessage = (ObjectMessage) message;
try { try {
Node node = (Node) objectMessage.getObject(); NodeState nodeState = (NodeState) objectMessage.getObject();
Node node = new NodeImpl(nodeState,marshaller);
String type = objectMessage.getJMSType(); String type = objectMessage.getJMSType();
if (type != null && type.equals("shutdown")) { if (type != null && type.equals("shutdown")) {
stateService.shutdown(node); stateService.shutdown(node);

View File

@ -13,7 +13,8 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster.impl; package org.activecluster.impl;
import java.util.HashMap; import java.util.HashMap;
@ -23,6 +24,7 @@ import java.util.Map;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.Map.Entry; import java.util.Map.Entry;
import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.activecluster.Cluster; import org.activecluster.Cluster;
import org.activecluster.ClusterEvent; import org.activecluster.ClusterEvent;
@ -49,7 +51,7 @@ public class StateServiceImpl implements StateService {
private Map nodes = new ConcurrentHashMap(); private Map nodes = new ConcurrentHashMap();
private long inactiveTime; private long inactiveTime;
private List listeners = new CopyOnWriteArrayList(); private List listeners = new CopyOnWriteArrayList();
private String localDestination; private Destination localDestination;
private Runnable localNodePing; private Runnable localNodePing;
private NodeImpl coordinator; private NodeImpl coordinator;
private ElectionStrategy electionStrategy; private ElectionStrategy electionStrategy;
@ -133,7 +135,7 @@ public class StateServiceImpl implements StateService {
* @param node * @param node
*/ */
public void keepAlive(Node node) { public void keepAlive(Node node) {
String key = node.getDestination(); Object key = node.getDestination();
if (key != null && !localDestination.equals(key)) { if (key != null && !localDestination.equals(key)) {
NodeEntry entry = (NodeEntry) nodes.get(key); NodeEntry entry = (NodeEntry) nodes.get(key);
if (entry == null) { if (entry == null) {
@ -163,7 +165,7 @@ public class StateServiceImpl implements StateService {
* shutdown the node * shutdown the node
*/ */
public void shutdown(Node node){ public void shutdown(Node node){
String key=node.getDestination(); Object key=node.getDestination();
if(key!=null){ if(key!=null){
nodes.remove(key); nodes.remove(key);
ClusterEvent event=new ClusterEvent(cluster,node,ClusterEvent.ADD_NODE); ClusterEvent event=new ClusterEvent(cluster,node,ClusterEvent.ADD_NODE);

View File

@ -13,11 +13,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster.impl; package org.activecluster.impl;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.activecluster.DestinationMarshaller;
import org.activecluster.Node; import org.activecluster.Node;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -38,10 +40,12 @@ public class StateServiceStub implements StateService {
private Session session; private Session session;
private MessageProducer producer; private MessageProducer producer;
private DestinationMarshaller marshaller;
public StateServiceStub(Session session, MessageProducer producer) { public StateServiceStub(Session session, MessageProducer producer,DestinationMarshaller marshaller) {
this.session = session; this.session = session;
this.producer = producer; this.producer = producer;
this.marshaller = marshaller;
} }
public void keepAlive(Node node) { public void keepAlive(Node node) {
@ -50,7 +54,7 @@ public class StateServiceStub implements StateService {
log.debug("Sending cluster data message: " + node); log.debug("Sending cluster data message: " + node);
} }
Message message = session.createObjectMessage(new NodeImpl(node)); Message message = session.createObjectMessage(new NodeState(node,marshaller));
producer.send(message); producer.send(message);
} }
catch (JMSException e) { catch (JMSException e) {
@ -64,7 +68,7 @@ public class StateServiceStub implements StateService {
log.debug("Sending shutdown message: " + node); log.debug("Sending shutdown message: " + node);
} }
Message message = session.createObjectMessage(new NodeImpl(node)); Message message = session.createObjectMessage(new NodeState(node,marshaller));
message.setJMSType("shutdown"); message.setJMSType("shutdown");
producer.send(message); producer.send(message);
} }

View File

@ -2,17 +2,18 @@
* *
* Copyright 2004 The Apache Software Foundation * Copyright 2004 The Apache Software Foundation
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* See the License for the specific language governing permissions and implied.
* limitations under the License. * See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.activecluster; package org.activecluster;
@ -150,7 +151,7 @@ public class ClusterFunctionTest extends TestCase {
try { try {
System.out.println("request received"); System.out.println("request received");
ObjectMessage om = cluster.createObjectMessage(); ObjectMessage om = cluster.createObjectMessage();
om.setJMSReplyTo(cluster.createDestination(cluster.getLocalNode().getDestination())); om.setJMSReplyTo(cluster.getLocalNode().getDestination());
om.setObject(new Response()); om.setObject(new Response());
System.out.println("sending response"); System.out.println("sending response");
cluster.send(om2.getJMSReplyTo(), om); cluster.send(om2.getJMSReplyTo(), om);
@ -192,7 +193,7 @@ public class ClusterFunctionTest extends TestCase {
// 1->1 messages // 1->1 messages
_cluster1.createConsumer(_cluster1.getLocalNode().getDestination()).setMessageListener(listener1); _cluster1.createConsumer(_cluster1.getLocalNode().getDestination()).setMessageListener(listener1);
ObjectMessage om = _cluster0.createObjectMessage(); ObjectMessage om = _cluster0.createObjectMessage();
om.setJMSReplyTo(_cluster0.createDestination(_cluster0.getLocalNode().getDestination())); om.setJMSReplyTo(_cluster0.getLocalNode().getDestination());
om.setObject(new Request()); om.setObject(new Request());
testResponsePassed = false; testResponsePassed = false;
_cluster0.send(_cluster0.getLocalNode().getDestination(), om); _cluster0.send(_cluster0.getLocalNode().getDestination(), om);

View File

@ -13,11 +13,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster; package org.activecluster;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import javax.jms.Destination;
import javax.jms.Message; import javax.jms.Message;
/** /**
@ -27,14 +29,14 @@ public class ClusterTest extends ClusterTestSupport {
protected int count = 2; protected int count = 2;
public void xtestCluster() throws Exception { public void testCluster() throws Exception {
cluster = createCluster(); cluster = createCluster();
subscribeToCluster(); subscribeToCluster();
cluster.start(); cluster.start();
String destination = cluster.getDestination(); Destination destination = cluster.getDestination();
Message message = cluster.createTextMessage("abcdef"); Message message = cluster.createTextMessage("abcdef");
cluster.send(destination, message); cluster.send(destination, message);

View File

@ -13,7 +13,8 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster; package org.activecluster;
import org.activecluster.impl.ActiveMQClusterFactory; import org.activecluster.impl.ActiveMQClusterFactory;
@ -49,13 +50,13 @@ public abstract class ClusterTestSupport extends TestSupport {
protected void subscribeToCluster() throws Exception { protected void subscribeToCluster() throws Exception {
// listen to cluster messages // listen to cluster messages
String clusterDestination = cluster.getDestination(); Destination clusterDestination = cluster.getDestination();
assertTrue("Local destination must not be null", clusterDestination != null); assertTrue("Local destination must not be null", clusterDestination != null);
clusterConsumer = cluster.createConsumer(clusterDestination); clusterConsumer = cluster.createConsumer(clusterDestination);
clusterConsumer.setMessageListener(clusterListener); clusterConsumer.setMessageListener(clusterListener);
// listen to inbox messages (individual messages) // listen to inbox messages (individual messages)
String localDestination = cluster.getLocalNode().getDestination(); Destination localDestination = cluster.getLocalNode().getDestination();
assertTrue("Local destination must not be null", localDestination != null); assertTrue("Local destination must not be null", localDestination != null);
System.out.println("Consuming from local destination: " + localDestination); System.out.println("Consuming from local destination: " + localDestination);

View File

@ -13,7 +13,8 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster; package org.activecluster;
import junit.framework.TestCase; import junit.framework.TestCase;
@ -51,7 +52,7 @@ public class TestSupport extends TestCase {
} }
protected Cluster createCluster(String name) throws JMSException, ClusterException { protected Cluster createCluster(String name) throws JMSException, ClusterException {
Cluster cluster = createCluster(); ClusterFactory factory = new ActiveMQClusterFactory();
return cluster; return factory.createCluster(name,"ORG.CODEHAUS.ACTIVEMQ.TEST.CLUSTER");
} }
} }

View File

@ -13,7 +13,8 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ *
**/
package org.activecluster.group; package org.activecluster.group;
import java.util.HashMap; import java.util.HashMap;
@ -22,7 +23,9 @@ import junit.framework.TestCase;
import org.activecluster.Cluster; import org.activecluster.Cluster;
import org.activecluster.ClusterEvent; import org.activecluster.ClusterEvent;
import org.activecluster.ClusterListener; import org.activecluster.ClusterListener;
import org.activecluster.DestinationMarshaller;
import org.activecluster.Node; import org.activecluster.Node;
import org.activecluster.impl.DefaultDestinationMarshaller;
import org.activecluster.impl.NodeImpl; import org.activecluster.impl.NodeImpl;
/** /**
@ -36,6 +39,7 @@ public abstract class GroupTestSupport extends TestCase {
private ClusterListener listener; private ClusterListener listener;
private Cluster cluster; private Cluster cluster;
private Map nodes = new HashMap(); private Map nodes = new HashMap();
private DestinationMarshaller marshaller = new DefaultDestinationMarshaller();
protected void addNodes(String[] nodeNames) { protected void addNodes(String[] nodeNames) {
for (int i = 0; i < nodeNames.length; i++) { for (int i = 0; i < nodeNames.length; i++) {
@ -45,7 +49,8 @@ public abstract class GroupTestSupport extends TestCase {
} }
protected void addNode(String nodeName) { protected void addNode(String nodeName) {
Node node = new NodeImpl(nodeName,nodeName);
Node node = new NodeImpl(nodeName,marshaller.getDestination(nodeName));
nodes.put(nodeName, node); nodes.put(nodeName, node);
listener.onNodeAdd(new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE)); listener.onNodeAdd(new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE));
} }