From a717c86c2f065cf18a4dd2c886963f3be1cb894e Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Mon, 23 Oct 2006 16:49:38 +0000 Subject: [PATCH] this module was moved to https://svn.apache.org/repos/asf/incubator/activemq/activecluster/ git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/branches/activemq-4.0@467022 13f79535-47bb-0310-9956-ffa450edef68 --- activecluster/README.txt | 23 -- activecluster/maven.xml | 80 ----- activecluster/pom.xml | 66 ---- activecluster/project.properties | 57 --- activecluster/project.xml | 165 --------- .../org/apache/activecluster/Cluster.java | 228 ------------ .../apache/activecluster/ClusterEvent.java | 150 -------- .../activecluster/ClusterException.java | 28 -- .../apache/activecluster/ClusterFactory.java | 94 ----- .../apache/activecluster/ClusterListener.java | 64 ---- .../activecluster/DestinationMarshaller.java | 46 --- .../org/apache/activecluster/LocalNode.java | 38 -- .../java/org/apache/activecluster/Node.java | 59 ---- .../org/apache/activecluster/Service.java | 42 --- .../election/ElectionStrategy.java | 40 --- .../election/impl/BullyElectionStrategy.java | 58 --- .../activecluster/group/BuddyGroupModel.java | 46 --- .../org/apache/activecluster/group/Group.java | 114 ------ .../group/GroupClusterListener.java | 60 ---- .../activecluster/group/GroupModel.java | 330 ------------------ .../activecluster/group/MasterZoneFilter.java | 41 --- .../activecluster/group/Membership.java | 64 ---- .../activecluster/group/NodeFilter.java | 34 -- .../activecluster/group/NodeMemberships.java | 77 ---- .../apache/activecluster/group/package.html | 26 -- .../impl/ActiveMQClusterFactory.java | 61 ---- .../activecluster/impl/DefaultCluster.java | 233 ------------- .../impl/DefaultClusterFactory.java | 196 ----------- .../impl/DefaultDestinationMarshaller.java | 100 ------ .../apache/activecluster/impl/NodeImpl.java | 150 -------- .../apache/activecluster/impl/NodeState.java | 153 -------- .../impl/NonReplicatedLocalNode.java | 58 --- .../impl/ReplicatedLocalNode.java | 84 ----- .../impl/SimpleDestinationMarshaller.java | 76 ---- .../activecluster/impl/StateConsumer.java | 78 ----- .../activecluster/impl/StateService.java | 42 --- .../activecluster/impl/StateServiceImpl.java | 302 ---------------- .../activecluster/impl/StateServiceStub.java | 80 ----- .../apache/activecluster/impl/package.html | 27 -- .../org/apache/activecluster/package.html | 31 -- .../org/apache/activecluster/ChatDemo.java | 146 -------- .../org/apache/activecluster/ClusterDemo.java | 112 ------ .../activecluster/ClusterFunctionTest.java | 209 ----------- .../org/apache/activecluster/ClusterTest.java | 122 ------- .../activecluster/ClusterTestSupport.java | 78 ----- .../activecluster/StubMessageListener.java | 77 ---- .../org/apache/activecluster/TestSupport.java | 63 ---- .../activecluster/TestingClusterListener.java | 58 --- .../group/BuddyGroupModelTest.java | 75 ---- .../activecluster/group/GroupModelTest.java | 62 ---- .../activecluster/group/GroupTestSupport.java | 84 ----- 51 files changed, 4787 deletions(-) delete mode 100644 activecluster/README.txt delete mode 100644 activecluster/maven.xml delete mode 100644 activecluster/pom.xml delete mode 100644 activecluster/project.properties delete mode 100644 activecluster/project.xml delete mode 100644 activecluster/src/java/org/apache/activecluster/Cluster.java delete mode 100644 activecluster/src/java/org/apache/activecluster/ClusterEvent.java delete mode 100644 activecluster/src/java/org/apache/activecluster/ClusterException.java delete mode 100644 activecluster/src/java/org/apache/activecluster/ClusterFactory.java delete mode 100644 activecluster/src/java/org/apache/activecluster/ClusterListener.java delete mode 100644 activecluster/src/java/org/apache/activecluster/DestinationMarshaller.java delete mode 100644 activecluster/src/java/org/apache/activecluster/LocalNode.java delete mode 100644 activecluster/src/java/org/apache/activecluster/Node.java delete mode 100644 activecluster/src/java/org/apache/activecluster/Service.java delete mode 100644 activecluster/src/java/org/apache/activecluster/election/ElectionStrategy.java delete mode 100644 activecluster/src/java/org/apache/activecluster/election/impl/BullyElectionStrategy.java delete mode 100644 activecluster/src/java/org/apache/activecluster/group/BuddyGroupModel.java delete mode 100644 activecluster/src/java/org/apache/activecluster/group/Group.java delete mode 100644 activecluster/src/java/org/apache/activecluster/group/GroupClusterListener.java delete mode 100644 activecluster/src/java/org/apache/activecluster/group/GroupModel.java delete mode 100644 activecluster/src/java/org/apache/activecluster/group/MasterZoneFilter.java delete mode 100644 activecluster/src/java/org/apache/activecluster/group/Membership.java delete mode 100644 activecluster/src/java/org/apache/activecluster/group/NodeFilter.java delete mode 100644 activecluster/src/java/org/apache/activecluster/group/NodeMemberships.java delete mode 100644 activecluster/src/java/org/apache/activecluster/group/package.html delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/ActiveMQClusterFactory.java delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/DefaultCluster.java delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/DefaultClusterFactory.java delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/DefaultDestinationMarshaller.java delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/NodeImpl.java delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/NodeState.java delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/NonReplicatedLocalNode.java delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/ReplicatedLocalNode.java delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/SimpleDestinationMarshaller.java delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/StateConsumer.java delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/StateService.java delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/StateServiceImpl.java delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/StateServiceStub.java delete mode 100644 activecluster/src/java/org/apache/activecluster/impl/package.html delete mode 100644 activecluster/src/java/org/apache/activecluster/package.html delete mode 100644 activecluster/src/test/org/apache/activecluster/ChatDemo.java delete mode 100644 activecluster/src/test/org/apache/activecluster/ClusterDemo.java delete mode 100644 activecluster/src/test/org/apache/activecluster/ClusterFunctionTest.java delete mode 100644 activecluster/src/test/org/apache/activecluster/ClusterTest.java delete mode 100644 activecluster/src/test/org/apache/activecluster/ClusterTestSupport.java delete mode 100644 activecluster/src/test/org/apache/activecluster/StubMessageListener.java delete mode 100644 activecluster/src/test/org/apache/activecluster/TestSupport.java delete mode 100644 activecluster/src/test/org/apache/activecluster/TestingClusterListener.java delete mode 100644 activecluster/src/test/org/apache/activecluster/group/BuddyGroupModelTest.java delete mode 100644 activecluster/src/test/org/apache/activecluster/group/GroupModelTest.java delete mode 100644 activecluster/src/test/org/apache/activecluster/group/GroupTestSupport.java diff --git a/activecluster/README.txt b/activecluster/README.txt deleted file mode 100644 index bcec715f5b..0000000000 --- a/activecluster/README.txt +++ /dev/null @@ -1,23 +0,0 @@ -Welcome to ActiveCluster! -========================= - -ActiveCluster is an Apache 2.0 licenced cluster communication toolkit. - -To help you get started, try surfing the following links... - -Building -http://activecluster.codehaus.org/Building - -Examples -http://activecluster.codehaus.org/Examples - -Refer to the website for details of finding the issue tracker, email lists, wiki or IRC channel -http://activecluster.codehaus.org/ - -Please help us make ActiveCluster better - we appreciate any feedback you may have. - -Enjoy! - - ----------------------- -The ActiveCluster team diff --git a/activecluster/maven.xml b/activecluster/maven.xml deleted file mode 100644 index b670b0ff39..0000000000 --- a/activecluster/maven.xml +++ /dev/null @@ -1,80 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - Running the ActiveCluster demo... - - - - - - - - - Running the ActiveCluster demo... - - - - - - - - - - diff --git a/activecluster/pom.xml b/activecluster/pom.xml deleted file mode 100644 index 3516d3be59..0000000000 --- a/activecluster/pom.xml +++ /dev/null @@ -1,66 +0,0 @@ - - - - - 4.0.0 - - incubator-activemq - activemq-parent - 4.0.2-SNAPSHOT - - activecluster - ActiveCluster - - - - ${pom.groupId} - activemq-core - - - org.apache.derby - derby - - - - junit - junit - test - - - - - src/java - src/test - - - org.apache.maven.plugins - maven-surefire-plugin - - - **/Testing*.java - **/TestSupport.java - **/ClusterTest.java - **/ClusterFunctionTest.java - - - - - - - diff --git a/activecluster/project.properties b/activecluster/project.properties deleted file mode 100644 index 7113d502a6..0000000000 --- a/activecluster/project.properties +++ /dev/null @@ -1,57 +0,0 @@ -## --------------------------------------------------------------------------- -## Licensed to the Apache Software Foundation (ASF) under one or more -## contributor license agreements. See the NOTICE file distributed with -## this work for additional information regarding copyright ownership. -## The ASF licenses this file to You under the Apache License, Version 2.0 -## (the "License"); you may not use this file except in compliance with -## the License. You may obtain a copy of the License at -## -## http://www.apache.org/licenses/LICENSE-2.0 -## -## Unless required by applicable law or agreed to in writing, software -## distributed under the License is distributed on an "AS IS" BASIS, -## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -## See the License for the specific language governing permissions and -## limitations under the License. -## --------------------------------------------------------------------------- - -maven.repo.remote=http://www.ibiblio.org/maven,http://dist.codehaus.org,http://people.apache.org/repository - -maven.compile.source=1.4 -maven.compile.target=1.4 -maven.test.source=1.4 -maven.compile.deprecation=true -maven.compile.debug=true -maven.compile.optimize=true - -maven.javadoc.links=http://java.sun.com/j2se/1.4.1/docs/api/,http://java.sun.com/j2ee/1.4/docs/api/ -maven.javadoc.source=1.4 - -maven.junit.fork = true - -# use Sun coding standards - -checkstyle.lcurly.type = eol -checkstyle.lcurly.method = eol -checkstyle.lcurly.other = eol -checkstyle.header.ignore.line = 1,2,3,4,5,6 -checkstyle.const.pattern = ^[a-z][a-zA-Z0-9]*$ - -# disable these non-critical errors to highlight -# more important ones line missing javadoc - -checkstyle.maxlinelen = 132 -checkstyle.ignore.whitespace = true -#maven.checkstyle.ignore.public.in.interface = true -checkstyle.paren.pad = ignore - -##################################################### -# codehaus theme -##################################################### -maven.xdoc.theme.url=http://codehaus.org/codehaus-style.css - -maven.repo.central = dist.codehaus.org -maven.repo.central.directory = /dist - -maven.remote.group = activecluster - diff --git a/activecluster/project.xml b/activecluster/project.xml deleted file mode 100644 index 3dace578b5..0000000000 --- a/activecluster/project.xml +++ /dev/null @@ -1,165 +0,0 @@ - - - - - 3 - ${basedir}/../etc/project.xml - - ActiveCluster - activecluster - - org.apache.activecluster - - - Core ActiveCluster API - org.apache.activecluster - - - Group organisation protocols - org.apache.activecluster.group - - - Election protocols - org.apache.activecluster.election - - - ActiveMQ specific implementation classes - org.apache.activecluster.activemq - - - Implementation classes - org.apache.activecluster.impl:org.apache.activecluster.election.impl - - - - ActiveCluster is a framework for building cluster-aware software - activecluster - - - ActiveCluster is a framework for building cluster-aware software - - - http://activecluster.codehaus.org/ - http://jira.codehaus.org/browse/ACL - - - - commons-logging - commons-logging - 1.0.4 - http://jakarta.apache.org/commons/logging/ - - - - org.apache.geronimo.specs - geronimo-jms_1.1_spec - 1.0 - - - - org.apache.geronimo.specs - geronimo-jta_1.0.1B_spec - 1.0 - - - - org.apache.geronimo.specs - geronimo-j2ee-management_1.0_spec - 1.0 - - - - - ${pom.groupId} - activemq-core - ${pom.currentVersion} - - - - ${pom.groupId} - activeio-core - ${activeio_version} - - - - - junit - 3.8.1 - - - - - backport-util-concurrent - 2.0_01_pd - - - - - sysunit - 1.0-beta-13 - - - - org.apache.derby - derby - ${derby_version} - - - - - - - scm@activecluster.codehaus.org - src/java - src/test - - - - - - - - src/java - - **/*.properties - - - - - **/*Test.* - - - - **/ClusterTest.* - **/ClusterFunctionTest.* - - - - - - - - diff --git a/activecluster/src/java/org/apache/activecluster/Cluster.java b/activecluster/src/java/org/apache/activecluster/Cluster.java deleted file mode 100644 index 85d4103821..0000000000 --- a/activecluster/src/java/org/apache/activecluster/Cluster.java +++ /dev/null @@ -1,228 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ - -package org.apache.activecluster; - -import java.io.Serializable; -import java.util.Map; -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.ObjectMessage; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; - -import org.apache.activecluster.election.ElectionStrategy; - -/** - * Represents a logical connection to a cluster. From this object you can - * obtain the destination to send messages to, view the members of the cluster, - * watch cluster events (nodes joining, leaving, updating their state) as well - * as viewing each members state. - *

- * You may also update the local node's state. - * - * @version $Revision: 1.5 $ - */ -public interface Cluster extends Service { - - /** - * Returns the destination used to send a message to all members of the cluster - * - * @return the destination to send messages to all members of the cluster - */ - public Destination getDestination(); - - /** - * A snapshot of the nodes in the cluster indexed by the Destination - * @return a Map containing all the nodes in the cluster, where key=node destination,value=node - */ - public Map getNodes(); - - /** - * Adds a new listener to cluster events - * - * @param listener - */ - public void addClusterListener(ClusterListener listener); - - /** - * Removes a listener to cluster events - * - * @param listener - */ - public void removeClusterListener(ClusterListener listener); - - /** - * The local Node which allows you to mutate the state or subscribe to the - * nodes temporary queue for inbound messages direct to the Node - * @return the Node representing this peer in the cluster - */ - public LocalNode getLocalNode(); - - /** - * Allows overriding of the default election strategy with a custom - * implementation. - * @param strategy - */ - public void setElectionStrategy(ElectionStrategy strategy); - - - // Messaging helper methods - //------------------------------------------------------------------------- - - /** - * Sends a message to a destination, which could be to the entire group - * or could be a single Node's destination - * - * @param destination is either the group topic or a node's destination - * @param message the message to be sent - * @throws JMSException - */ - public void send(Destination destination, Message message) throws JMSException; - - - /** - * Creates a consumer of all the messags sent to the given destination, - * including messages sent via the send() messages - * - * @param destination - * @return a newly created message consumer - * @throws JMSException - */ - public MessageConsumer createConsumer(Destination destination) throws JMSException; - - /** - * Creates a consumer of all message sent to the given destination, - * including messages sent via the send() message with an optional SQL 92 based selector to filter - * messages - * - * @param destination - * @param selector - * @return a newly created message consumer - * @throws JMSException - */ - public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException; - - /** - * Creates a consumer of all message sent to the given destination, - * including messages sent via the send() message with an optional SQL 92 based selector to filter - * messages along with optionally ignoring local traffic - messages sent via the send() - * method on this object. - * - * @param destination the destination to consume from - * @param selector an optional SQL 92 filter of messages which could be null - * @param noLocal which if true messages sent via send() on this object will not be delivered to the consumer - * @return a newly created message consumer - * @throws JMSException - */ - public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException; - - - // Message factory methods - //------------------------------------------------------------------------- - - /** - * Creates a new message without a body - * @return the create Message - * - * @throws JMSException - */ - public Message createMessage() throws JMSException; - - /** - * Creates a new bytes message - * @return the create BytesMessage - * - * @throws JMSException - */ - public BytesMessage createBytesMessage() throws JMSException; - - /** - * Creates a new {@link MapMessage} - * @return the created MapMessage - * - * @throws JMSException - */ - public MapMessage createMapMessage() throws JMSException; - - /** - * Creates a new {@link ObjectMessage} - * @return the created ObjectMessage - * - * @throws JMSException - */ - public ObjectMessage createObjectMessage() throws JMSException; - - /** - * Creates a new {@link ObjectMessage} - * - * @param object - * @return the createdObjectMessage - * @throws JMSException - */ - public ObjectMessage createObjectMessage(Serializable object) throws JMSException; - - /** - * Creates a new {@link StreamMessage} - * @return the create StreamMessage - * - * @throws JMSException - */ - public StreamMessage createStreamMessage() throws JMSException; - - /** - * Creates a new {@link TextMessage} - * @return the create TextMessage - * - * @throws JMSException - */ - public TextMessage createTextMessage() throws JMSException; - - /** - * Creates a new {@link TextMessage} - * - * @param text - * @return the create TextMessage - * @throws JMSException - */ - public TextMessage createTextMessage(String text) throws JMSException; - - /** - * Create a named Destination - * @param name - * @return the Destinatiion - * @throws JMSException - */ - public Destination createDestination(String name) throws JMSException; - - /** - * wait until a the cardimality of the cluster is reaches the expected count. This method will return false if the - * cluster isn't started or stopped while waiting - * - * @param expectedCount the number of expected members of a cluster - * @param timeout timeout in milliseconds - * @return true if the cluster is fully connected - * @throws InterruptedException - */ - boolean waitForClusterToComplete(int expectedCount, long timeout) throws InterruptedException; -} diff --git a/activecluster/src/java/org/apache/activecluster/ClusterEvent.java b/activecluster/src/java/org/apache/activecluster/ClusterEvent.java deleted file mode 100644 index 6aa05a1d66..0000000000 --- a/activecluster/src/java/org/apache/activecluster/ClusterEvent.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activecluster; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; - -/** - * A cluster event - * - * @version $Revision: 1.3 $ - */ -public class ClusterEvent implements Externalizable { - - private static final long serialVersionUID=-4103732679231950873L; - /** - * A node has joined the cluster - */ - public static final int ADD_NODE = 1; - /** - * existing node has updated it's state - */ - public static final int UPDATE_NODE = 2; - /** - * A node has left the Cluster - */ - public static final int REMOVE_NODE = 3; - /** - * A node has failed due to a system/network error - */ - public static final int FAILED_NODE = 4; - - /** - * this node has been elected Coordinator - */ - public static final int ELECTED_COORDINATOR = 5; - - private transient Cluster cluster; - private Node node; - private int type; - - /** - * empty constructor - */ - public ClusterEvent() { - } - - /** - * @param source - * @param node - * @param type - */ - public ClusterEvent(Cluster source, Node node, int type) { - this.cluster = source; - this.node = node; - this.type = type; - } - - /** - * @return the Cluster - */ - public Cluster getCluster() { - return cluster; - } - - /** - * set the cluster - * @param source - */ - public void setCluster(Cluster source){ - this.cluster = source; - } - /** - * @return the node - */ - public Node getNode() { - return node; - } - - /** - * @return the type of event - */ - public int getType() { - return type; - } - - /** - * @return pretty type - */ - public String toString() { - return "ClusterEvent[" + getTypeAsString() + " : " + node + "]"; - } - - /** - * dump on to a stream - * - * @param out - * @throws IOException - */ - public void writeExternal(ObjectOutput out) throws IOException { - out.writeByte(type); - out.writeObject(node); - } - - /** - * read from stream - * - * @param in - * @throws IOException - * @throws ClassNotFoundException - */ - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - type = in.readByte(); - node = (Node) in.readObject(); - } - - private String getTypeAsString() { - String result = "unknown type"; - if (type == ADD_NODE) { - result = "ADD_NODE"; - } - else if (type == REMOVE_NODE) { - result = "REMOVE_NODE"; - } - else if (type == UPDATE_NODE) { - result = "UPDATE_NODE"; - } - else if (type == FAILED_NODE) { - result = "FAILED_NODE"; - } - return result; - } -} \ No newline at end of file diff --git a/activecluster/src/java/org/apache/activecluster/ClusterException.java b/activecluster/src/java/org/apache/activecluster/ClusterException.java deleted file mode 100644 index 05f59b3df0..0000000000 --- a/activecluster/src/java/org/apache/activecluster/ClusterException.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activecluster; - - -/** - * Represents a Cluster related exception - * - * @version $Revision: 1.2 $ - */ -public class ClusterException extends Exception { -} diff --git a/activecluster/src/java/org/apache/activecluster/ClusterFactory.java b/activecluster/src/java/org/apache/activecluster/ClusterFactory.java deleted file mode 100644 index 947a336f1f..0000000000 --- a/activecluster/src/java/org/apache/activecluster/ClusterFactory.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ - -package org.apache.activecluster; - -import javax.jms.Destination; -import javax.jms.JMSException; - - -/** - * A Factory of Cluster instances - * - * @version $Revision: 1.3 $ - */ -public interface ClusterFactory { - - /** - * Creates a new cluster connection using the given local name and destination name - * @param localName - * @param destinationName - * - * @return Cluster - * @throws JMSException - */ - public Cluster createCluster(String localName,String destinationName) throws JMSException; - - /** - * Creates a new cluster connection using the given local name and destination name - * @param localName - * @param destinationName - * @param marshaller - * - * @return Cluster - * @throws JMSException - */ - public Cluster createCluster(String localName,String destinationName,DestinationMarshaller marshaller) throws JMSException; - - - - /** - * Creates a new cluster connection - generating the localName automatically - * @param destinationName - * @return the Cluster - * @throws JMSException - */ - public Cluster createCluster(String destinationName) throws JMSException; - - /** - * Creates a new cluster connection using the given local name and destination name - * @param localName - * @param destination - * - * @return Cluster - * @throws JMSException - */ - public Cluster createCluster(String localName,Destination destination) throws JMSException; - - /** - * Creates a new cluster connection using the given local name and destination name - * @param localName - * @param destination - * @param marshaller - * - * @return Cluster - * @throws JMSException - */ - public Cluster createCluster(String localName,Destination destination, DestinationMarshaller marshaller) throws JMSException; - - - - /** - * Creates a new cluster connection - generating the localName automatically - * @param destination - * @return the Cluster - * @throws JMSException - */ - public Cluster createCluster(Destination destination) throws JMSException; -} diff --git a/activecluster/src/java/org/apache/activecluster/ClusterListener.java b/activecluster/src/java/org/apache/activecluster/ClusterListener.java deleted file mode 100644 index 67cc9169bc..0000000000 --- a/activecluster/src/java/org/apache/activecluster/ClusterListener.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activecluster; - -import java.util.EventListener; - - -/** - * Listener to events occuring on the cluster - * - * @version $Revision: 1.2 $ - */ -public interface ClusterListener extends EventListener { - - /** - * A new node has been added - * - * @param event - */ - public void onNodeAdd(ClusterEvent event); - - /** - * A node has updated its state - * - * @param event - */ - public void onNodeUpdate(ClusterEvent event); - - /** - * A node has been removed (a clean shutdown) - * - * @param event - */ - public void onNodeRemoved(ClusterEvent event); - - /** - * A node has failed due to process or network failure - * - * @param event - */ - public void onNodeFailed(ClusterEvent event); - - /** - * An election has occurred and a new coordinator has been selected - * @param event - */ - public void onCoordinatorChanged(ClusterEvent event); -} diff --git a/activecluster/src/java/org/apache/activecluster/DestinationMarshaller.java b/activecluster/src/java/org/apache/activecluster/DestinationMarshaller.java deleted file mode 100644 index e1c6ccf70d..0000000000 --- a/activecluster/src/java/org/apache/activecluster/DestinationMarshaller.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ - -package org.apache.activecluster; - -import javax.jms.Destination; -import javax.jms.JMSException; - -/** - * A simple marshaller for Destinations - * - * @version $Revision: 1.5 $ - */ -public interface DestinationMarshaller { - - /** - * Builds a destination from a destinationName - * @param destinationName - * - * @return the destination to send messages to all members of the cluster - */ - public Destination getDestination(String destinationName) throws JMSException; - - /** - * Gets a destination's physical name - * @param destination - * @return the destination's physical name - */ - public String getDestinationName(Destination destination); -} diff --git a/activecluster/src/java/org/apache/activecluster/LocalNode.java b/activecluster/src/java/org/apache/activecluster/LocalNode.java deleted file mode 100644 index a8d04a106d..0000000000 --- a/activecluster/src/java/org/apache/activecluster/LocalNode.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster; - -import javax.jms.JMSException; -import java.util.Map; - -/** - * Represents the local (in process) node - * - * @version $Revision: 1.2 $ - */ -public interface LocalNode extends Node { - - /** - * Allows the local state to be modified, which will - * be replicated asynchronously around the cluster - * @param state - * @throws JMSException - */ - public void setState(Map state) throws JMSException; - -} diff --git a/activecluster/src/java/org/apache/activecluster/Node.java b/activecluster/src/java/org/apache/activecluster/Node.java deleted file mode 100644 index 4c93e1134a..0000000000 --- a/activecluster/src/java/org/apache/activecluster/Node.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ -package org.apache.activecluster; - -import java.util.Map; -import javax.jms.Destination; - - -/** - * Represents a node member in a cluster - * - * @version $Revision: 1.3 $ - */ -public interface Node { - - /** - * Access to the queue to send messages direct to this node. - * - * @return the destination to send messages to this node while its available - */ - public Destination getDestination(); - - /** - * @return an immutable map of the nodes state - */ - public Map getState(); - - /** - * @return the name of the node - */ - public String getName(); - - /** - * @return true if this node has been elected as coordinator - */ - public boolean isCoordinator(); - - /** - * Returns the Zone of this node - typically the DMZ zone or the subnet on which the - * node is on - */ - public Object getZone(); -} diff --git a/activecluster/src/java/org/apache/activecluster/Service.java b/activecluster/src/java/org/apache/activecluster/Service.java deleted file mode 100644 index 51af48e1ae..0000000000 --- a/activecluster/src/java/org/apache/activecluster/Service.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activecluster; - -import javax.jms.JMSException; - -/** - *

Service represents some service of some kind with a simple start/stop lifecycle.

- * - * @version $Revision: 1.2 $ - */ -public interface Service { - - /** - * Called to start the service - * @throws JMSException - */ - public void start() throws JMSException; - - /** - * Called to shutdown the service - * @throws JMSException - */ - public void stop() throws JMSException; - -} diff --git a/activecluster/src/java/org/apache/activecluster/election/ElectionStrategy.java b/activecluster/src/java/org/apache/activecluster/election/ElectionStrategy.java deleted file mode 100644 index c148a20272..0000000000 --- a/activecluster/src/java/org/apache/activecluster/election/ElectionStrategy.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.election; - -import javax.jms.JMSException; - -import org.apache.activecluster.Cluster; -import org.apache.activecluster.Node; - -/** - *

Service Used by the Cluster to elect a coordinator.

- * - * @version $Revision: 1.2 $ - */ -public interface ElectionStrategy { - - /** - * Elect a coordinator. - * @param cluster - * @return the elected Node - * @throws JMSException - */ - public Node doElection(Cluster cluster) throws JMSException; - -} diff --git a/activecluster/src/java/org/apache/activecluster/election/impl/BullyElectionStrategy.java b/activecluster/src/java/org/apache/activecluster/election/impl/BullyElectionStrategy.java deleted file mode 100644 index f6a7d17d8c..0000000000 --- a/activecluster/src/java/org/apache/activecluster/election/impl/BullyElectionStrategy.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activecluster.election.impl; - -import org.apache.activecluster.Cluster; -import org.apache.activecluster.Node; -import org.apache.activecluster.election.ElectionStrategy; - -import javax.jms.JMSException; -import java.util.Iterator; -import java.util.Map; - -/** - *

BullyElectionStrategy Use a simple bully algorithm to elect a coordinator. - * the member with the lowest lexicographical name is choosen

- * - * @version $Revision: 1.2 $ - */ -public class BullyElectionStrategy implements ElectionStrategy { - - /** - * Elect a coordinator. - * - * @param cluster - * @return the elected Node - * @throws JMSException - */ - public Node doElection(Cluster cluster) throws JMSException { - Node elect = cluster.getLocalNode(); - - Map nodes = cluster.getNodes(); - for (Iterator i = nodes.values().iterator(); i.hasNext();) { - Node node = (Node) i.next(); - if (elect.getName().compareTo(node.getName()) < 0) { - elect = node; - } - } - - return elect; - } - -} diff --git a/activecluster/src/java/org/apache/activecluster/group/BuddyGroupModel.java b/activecluster/src/java/org/apache/activecluster/group/BuddyGroupModel.java deleted file mode 100644 index 938a13e5c0..0000000000 --- a/activecluster/src/java/org/apache/activecluster/group/BuddyGroupModel.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.group; - -import org.apache.activecluster.Node; - -/** - * A kind of {@link GroupModel} in which every {@link Node} has its - * own {@link Group} and other nodes in the cluster act as buddies (slaves) - * - * @version $Revision: 1.2 $ - */ -public class BuddyGroupModel extends GroupModel { - - public synchronized void addNode(Node node) { - Group group = makeNewGroup(node); - if (group == null) { - if (!addToExistingGroup(node)) { - addToUnusedNodes(node); - } - } - else { - // now lets try choose some existing nodes to add as buddy's - tryToFillGroupWithBuddies(group); - - // now that the group may well be filled, add it to the collections - addGroup(group); - } - } - -} diff --git a/activecluster/src/java/org/apache/activecluster/group/Group.java b/activecluster/src/java/org/apache/activecluster/group/Group.java deleted file mode 100644 index 8a1fcf118e..0000000000 --- a/activecluster/src/java/org/apache/activecluster/group/Group.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.group; - -import org.apache.activecluster.Node; - -import java.util.ArrayList; -import java.util.List; - -/** - * Represents a logical group of nodes in a cluster, - * such as a Master and a number of Slaves which operate as a - * logical unit. - *

- * A cluster can be divided into a single group, or many groups - * depending on the policy required. - *

- * The number of groups could be application defined; created on demand - * or there could even be one group for each node, with other nodes acting - * as buddy nodes in each nodes' group (i.e. each node is a master with N - * buddies/slaves) - * - * @version $Revision: 1.2 $ - */ -public class Group { - private int minimumMemberCount; - private int maximumMemberCount; - private List members = new ArrayList(); - private int memberCount; - - public Group() { - } - - public Group(int minimumMemberCount, int maximumMemberCount) { - this.minimumMemberCount = minimumMemberCount; - this.maximumMemberCount = maximumMemberCount; - } - - public synchronized List getMembers() { - return new ArrayList(members); - } - - /** - * Adds a node to the given group - * - * @return the index of the node in the group (0 = master, 1..N = slave) - */ - public synchronized int addMember(Node node) { - int index = members.indexOf(node); - if (index >= 0) { - return index; - } - members.add(node); - return memberCount++; - } - - public synchronized boolean removeMember(Node node) { - boolean answer = members.remove(node); - if (answer) { - memberCount--; - } - return answer; - } - - - /** - * Returns true if the group is usable, that it has enough members to be used. - */ - public boolean isUsable() { - return memberCount >= minimumMemberCount; - } - - /** - * Returns true if the group cannot accept any more new members - */ - public boolean isFull() { - return memberCount >= maximumMemberCount; - } - - public int getMemberCount() { - return memberCount; - } - - public int getMaximumMemberCount() { - return maximumMemberCount; - } - - public void setMaximumMemberCount(int maximumMemberCount) { - this.maximumMemberCount = maximumMemberCount; - } - - public int getMinimumMemberCount() { - return minimumMemberCount; - } - - public void setMinimumMemberCount(int minimumMemberCount) { - this.minimumMemberCount = minimumMemberCount; - } -} diff --git a/activecluster/src/java/org/apache/activecluster/group/GroupClusterListener.java b/activecluster/src/java/org/apache/activecluster/group/GroupClusterListener.java deleted file mode 100644 index 4e174b1bbe..0000000000 --- a/activecluster/src/java/org/apache/activecluster/group/GroupClusterListener.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.group; - -import org.apache.activecluster.ClusterEvent; -import org.apache.activecluster.ClusterListener; - -/** - * A {@link ClusterListener} which maintains a {@link GroupModel} implementation - * - * @version $Revision: 1.2 $ - */ -public class GroupClusterListener implements ClusterListener { - private GroupModel model; - - public GroupClusterListener(GroupModel model) { - this.model = model; - } - - // Properties - //------------------------------------------------------------------------- - public GroupModel getModel() { - return model; - } - - // ClusterListener interface - //------------------------------------------------------------------------- - public void onNodeAdd(ClusterEvent event) { - model.addNode(event.getNode()); - } - - public void onNodeUpdate(ClusterEvent event) { - } - - public void onNodeRemoved(ClusterEvent event) { - model.removeNode(event.getNode()); - } - - public void onNodeFailed(ClusterEvent event) { - model.removeNode(event.getNode()); - } - - public void onCoordinatorChanged(ClusterEvent event) { - } -} diff --git a/activecluster/src/java/org/apache/activecluster/group/GroupModel.java b/activecluster/src/java/org/apache/activecluster/group/GroupModel.java deleted file mode 100644 index 2ef677ef5d..0000000000 --- a/activecluster/src/java/org/apache/activecluster/group/GroupModel.java +++ /dev/null @@ -1,330 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.group; - -import org.apache.activecluster.Node; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -/** - * Represents a collection of zero or more groups in a cluster. - * The default implementation will create groups as nodes are added to the cluster; filling - * the groups with its required number of buddies / slaves until a new group can be created. - *

- * Nodes which are not allowed to be master nodes will be kept around in a pool ready to be added - * as slaves when a new master arrives and forces the creation of a group. - * - * @version $Revision: 1.2 $ - * @see Group - */ -public class GroupModel { - private int maximumGroups = -1; - private int minimumMemberCount = 2; - private int maximumMemberCount = 3; - private List groups = new ArrayList(); - private LinkedList incompleteGroups = new LinkedList(); - private LinkedList completeGroups = new LinkedList(); - private LinkedList fullGroups = new LinkedList(); - private LinkedList unusedNodes = new LinkedList(); - private NodeFilter masterFilter; - private Map nodeMemberships = new HashMap(); - - // allow a node to be a master and 2 buddies - private int maximumWeighting = 10; - - /** - * Adds the new node to this group model; we assume the node has not been added before. - * - * @param node - */ - public synchronized void addNode(Node node) { - if (!addToExistingGroup(node)) { - Group group = makeNewGroup(node); - if (group == null) { - addToUnusedNodes(node); - } - else { - addGroup(group); - } - } - } - - /** - * Removes the node from the group model - * - * @param node - */ - public synchronized void removeNode(Node node) { - unusedNodes.remove(node); - - // lets remove the node from each group - for (Iterator iter = groups.iterator(); iter.hasNext();) { - Group group = (Group) iter.next(); - boolean wasFull = group.isFull(); - boolean wasUsable = group.isUsable(); - - if (removeNodeFromGroup(group, node)) { - updateGroupCollections(group, wasFull, wasUsable); - } - } - } - - // Properties - //------------------------------------------------------------------------- - - /** - * Returns a snapshot of the groups currently available - */ - public synchronized List getGroups() { - return new ArrayList(groups); - } - - public NodeFilter getMasterFilter() { - return masterFilter; - } - - public void setMasterFilter(NodeFilter masterFilter) { - this.masterFilter = masterFilter; - } - - public int getMaximumGroups() { - return maximumGroups; - } - - public void setMaximumGroups(int maximumGroups) { - this.maximumGroups = maximumGroups; - } - - public int getMaximumMemberCount() { - return maximumMemberCount; - } - - public void setMaximumMemberCount(int maximumMemberCount) { - this.maximumMemberCount = maximumMemberCount; - } - - public int getMinimumMemberCount() { - return minimumMemberCount; - } - - public void setMinimumMemberCount(int minimumMemberCount) { - this.minimumMemberCount = minimumMemberCount; - } - - public int getMaximumWeighting() { - return maximumWeighting; - } - - public void setMaximumWeighting(int maximumWeighting) { - this.maximumWeighting = maximumWeighting; - } - - - // Implementation methods - //------------------------------------------------------------------------- - - - /** - * Attempt to make a new group with the current node as the master - * or if the node cannot be a master node - * - * @return the newly created group or false if none was created. - */ - protected Group makeNewGroup(Node node) { - // no pending groups available so lets try and create a new group - if (canCreateGroup(node)) { - Group group = createGroup(node); - addNodeToGroup(group, node); - return group; - } - else { - return null; - } - } - - protected void tryToFillGroupWithBuddies(Group group) { - boolean continueFillingGroups = true; - while (!group.isUsable() && continueFillingGroups) { - continueFillingGroups = tryToAddBuddy(group); - } - - if (continueFillingGroups) { - // lets try fill more unfilled nodes - for (Iterator iter = new ArrayList(incompleteGroups).iterator(); iter.hasNext() && continueFillingGroups;) { - group = (Group) iter.next(); - - boolean wasFull = group.isFull(); - boolean wasUsable = group.isUsable(); - - while (!group.isUsable() && continueFillingGroups) { - continueFillingGroups = tryToAddBuddy(group); - } - - if (group.isUsable()) { - updateGroupCollections(group, wasFull, wasUsable); - } - } - } - } - - protected boolean tryToAddBuddy(Group group) { - boolean continueFillingGroups = true; - // TODO we could make this much faster using a weighting-sorted collection - NodeMemberships lowest = null; - int lowestWeight = 0; - for (Iterator iter = nodeMemberships.values().iterator(); iter.hasNext();) { - NodeMemberships memberships = (NodeMemberships) iter.next(); - if (!memberships.isMember(group)) { - int weighting = memberships.getWeighting(); - if ((lowest == null || weighting < lowestWeight) && weighting < maximumWeighting) { - lowest = memberships; - lowestWeight = weighting; - } - } - } - if (lowest == null) { - continueFillingGroups = false; - } - else { - addNodeToGroup(group, lowest.getNode()); - } - return continueFillingGroups; - } - - /** - * Lets move the group from its current state collection to the new collection if its - * state has changed - */ - protected void updateGroupCollections(Group group, boolean wasFull, boolean wasUsable) { - boolean full = group.isFull(); - if (wasFull && !full) { - fullGroups.remove(group); - } - boolean usable = group.isUsable(); - if (wasUsable && !usable) { - completeGroups.remove(group); - } - if ((!usable || !full) && (wasFull || wasUsable)) { - incompleteGroups.add(group); - } - } - - protected void addToUnusedNodes(Node node) { - // lets add the node to the pool ready to be used if a node fails - unusedNodes.add(node); - } - - /** - * Attempts to add the node to an incomplete group, or - * a not-full group and returns true if its possible - else returns false - * - * @return true if the node has been added to a groupu - */ - protected boolean addToExistingGroup(Node node) { - if (!addToIncompleteGroup(node)) { - if (!addToNotFullGroup(node)) { - return false; - } - } - return true; - } - - protected boolean addToNotFullGroup(Node node) { - return addToPendingGroup(completeGroups, node); - } - - protected boolean addToIncompleteGroup(Node node) { - return addToPendingGroup(incompleteGroups, node); - } - - /** - * Adds the given node to the first pending group if possible - * - * @return true if the node was added to the first available group - */ - protected boolean addToPendingGroup(LinkedList list, Node node) { - if (!list.isEmpty()) { - Group group = (Group) list.getFirst(); - addNodeToGroup(group, node); - if (group.isFull()) { - list.removeFirst(); - fullGroups.add(group); - } - else if (group.isUsable()) { - list.removeFirst(); - completeGroups.add(group); - } - return true; - } - return false; - } - - protected void addNodeToGroup(Group group, Node node) { - NodeMemberships memberships = (NodeMemberships) nodeMemberships.get(node); - if (memberships == null) { - memberships = new NodeMemberships(node); - nodeMemberships.put(node, memberships); - } - memberships.addToGroup(group); - } - - protected boolean removeNodeFromGroup(Group group, Node node) { - NodeMemberships memberships = (NodeMemberships) nodeMemberships.get(node); - if (memberships != null) { - return memberships.removeFromGroup(group); - } - return false; - } - - - protected void addGroup(Group group) { - groups.add(group); - if (group.isFull()) { - fullGroups.add(group); - } - else if (group.isUsable()) { - completeGroups.add(group); - } - else { - incompleteGroups.add(group); - } - } - - protected Group createGroup(Node node) { - return new Group(minimumMemberCount, maximumMemberCount); - } - - /** - * Returns true if we can add a new group to the cluster - */ - protected boolean canCreateGroup(Node node) { - return (maximumGroups < 0 || groups.size() < maximumGroups) && canBeMaster(node); - } - - /** - * Returns true if the given node can be a master - */ - protected boolean canBeMaster(Node node) { - return masterFilter == null || masterFilter.evaluate(node); - } -} diff --git a/activecluster/src/java/org/apache/activecluster/group/MasterZoneFilter.java b/activecluster/src/java/org/apache/activecluster/group/MasterZoneFilter.java deleted file mode 100644 index 31fd64f443..0000000000 --- a/activecluster/src/java/org/apache/activecluster/group/MasterZoneFilter.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.group; - -import org.apache.activecluster.Node; - -import java.util.List; - -/** - * A filter configured with a list of DMZ zones on which to restrict which nodes - * are allowed to be master nodes. - * - * @version $Revision: 1.2 $ - */ -public class MasterZoneFilter implements NodeFilter { - private List zones; - - public MasterZoneFilter(List zones) { - this.zones = zones; - } - - public boolean evaluate(Node node) { - Object zone = node.getZone(); - return zones.contains(zone); - } -} diff --git a/activecluster/src/java/org/apache/activecluster/group/Membership.java b/activecluster/src/java/org/apache/activecluster/group/Membership.java deleted file mode 100644 index 9b9f7ee7ce..0000000000 --- a/activecluster/src/java/org/apache/activecluster/group/Membership.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.group; - -/** - * Represents the membership of a Group for a Node - * - * @version $Revision: 1.2 $ - */ -public class Membership { - public static final int STATUS_REQUESTED = 1; - public static final int STATUS_SYNCHONIZING = 2; - public static final int STATUS_FAILED = 3; - public static final int STATUS_OK = 4; - - private Group group; - private int index; - private int status = STATUS_REQUESTED; - - public Membership(Group group, int index) { - this.group = group; - this.index = index; - } - - public int getIndex() { - return index; - } - - public void setIndex(int index) { - this.index = index; - } - - public int getStatus() { - return status; - } - - public void setStatus(int status) { - this.status = status; - } - - /** - * @return the weighting of this membership - */ - public int getWeighting() { - // lets make master heavy and the further from the end of the - // list of slaves, the lighter we become - return group.getMaximumMemberCount() - getIndex(); - } -} diff --git a/activecluster/src/java/org/apache/activecluster/group/NodeFilter.java b/activecluster/src/java/org/apache/activecluster/group/NodeFilter.java deleted file mode 100644 index 1a90c2b29e..0000000000 --- a/activecluster/src/java/org/apache/activecluster/group/NodeFilter.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.group; - -import org.apache.activecluster.Node; - -/** - * Represents a filter on a Node to allow a pluggable - * Strategy Pattern to decide which nodes can be master nodes etc. - * - * @version $Revision: 1.2 $ - */ -public interface NodeFilter { - - /** - * Returns true if the given node matches the filter - */ - public boolean evaluate(Node node); -} diff --git a/activecluster/src/java/org/apache/activecluster/group/NodeMemberships.java b/activecluster/src/java/org/apache/activecluster/group/NodeMemberships.java deleted file mode 100644 index 2a64620b6e..0000000000 --- a/activecluster/src/java/org/apache/activecluster/group/NodeMemberships.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.group; - -import org.apache.activecluster.Node; - -import java.util.HashMap; -import java.util.Map; - -/** - * Represents all of the memberhips of a node and can be used to act - * as a weighting to decide which is the least heavily loaded Node - * to be assigned to a buddy group. - * - * @version $Revision: 1.2 $ - */ -public class NodeMemberships { - private Node node; - private Map memberships = new HashMap(); - private int weighting; - - public NodeMemberships(Node node) { - this.node = node; - } - - public void addToGroup(Group group) { - if (!isMember(group)) { - int index = group.addMember(node); - Membership membership = new Membership(group, index); - memberships.put(group, membership); - weighting += membership.getWeighting(); - } - } - - public boolean removeFromGroup(Group group) { - // TODO when we remove a node from a group, we need to reweight the - // other nodes in the group - - memberships.remove(group); - return group.removeMember(node); - } - - public Node getNode() { - return node; - } - - /** - * Returns the weighting of how heavily loaded the node is - * so that a decision can be made on which node to buddy group - * with - */ - public int getWeighting() { - return weighting; - } - - /** - * Returns true if this node is a member of the given group - */ - public boolean isMember(Group group) { - return memberships.containsKey(group); - } -} diff --git a/activecluster/src/java/org/apache/activecluster/group/package.html b/activecluster/src/java/org/apache/activecluster/group/package.html deleted file mode 100644 index a346b463a5..0000000000 --- a/activecluster/src/java/org/apache/activecluster/group/package.html +++ /dev/null @@ -1,26 +0,0 @@ - - - - - - -Contains Group Organsisation models and policies for arranging {@link org.apache.activecluster.Node} instances into -groups, such as buddy-groups (failover nodes) or master/slave groups for High Availability (HA) protocols. - - - diff --git a/activecluster/src/java/org/apache/activecluster/impl/ActiveMQClusterFactory.java b/activecluster/src/java/org/apache/activecluster/impl/ActiveMQClusterFactory.java deleted file mode 100644 index ab4fa67a1f..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/ActiveMQClusterFactory.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.impl; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activecluster.impl.DefaultClusterFactory; - -/** - * An implementation of {@link org.apache.activecluster.ClusterFactory} using - * ActiveMQ - * - * @version $Revision: 1.4 $ - */ -public class ActiveMQClusterFactory extends DefaultClusterFactory { - - public static String DEFAULT_CLUSTER_URL = "peer://org.apache.activecluster?persistent=false"; - - public ActiveMQClusterFactory() { - super(new ActiveMQConnectionFactory(DEFAULT_CLUSTER_URL)); - } - - public ActiveMQClusterFactory(String brokerURL) { - super(new ActiveMQConnectionFactory(brokerURL)); - } - - public ActiveMQClusterFactory(ActiveMQConnectionFactory connectionFactory) { - super(connectionFactory); - } - - public ActiveMQClusterFactory(boolean transacted, int acknowledgeMode, String dataTopicPrefix, long inactiveTime) { - super(new ActiveMQConnectionFactory(DEFAULT_CLUSTER_URL), transacted, acknowledgeMode, dataTopicPrefix, inactiveTime); - } - - public ActiveMQClusterFactory(ActiveMQConnectionFactory connectionFactory, boolean transacted, int acknowledgeMode, String dataTopicPrefix, long inactiveTime) { - super(connectionFactory, transacted, acknowledgeMode, dataTopicPrefix, inactiveTime); - } - - public ActiveMQConnectionFactory getActiveMQConnectionFactory() { - return (ActiveMQConnectionFactory) getConnectionFactory(); - } - - public void setActiveMQConnectionFactory(ActiveMQConnectionFactory connectionFactory) { - setConnectionFactory(connectionFactory); - } - -} diff --git a/activecluster/src/java/org/apache/activecluster/impl/DefaultCluster.java b/activecluster/src/java/org/apache/activecluster/impl/DefaultCluster.java deleted file mode 100644 index e3c0810d44..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/DefaultCluster.java +++ /dev/null @@ -1,233 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ -package org.apache.activecluster.impl; - -import java.io.Serializable; -import java.util.Map; -import java.util.Timer; -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; - -import org.apache.activecluster.Cluster; -import org.apache.activecluster.ClusterListener; -import org.apache.activecluster.DestinationMarshaller; -import org.apache.activecluster.LocalNode; -import org.apache.activecluster.Service; -import org.apache.activecluster.election.ElectionStrategy; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import edu.emory.mathcs.backport.java.util.concurrent.atomic.*; -/** - * A default implementation of ActiveCluster which uses standard JMS operations - * - * @version $Revision: 1.6 $ - */ -public class DefaultCluster implements Cluster { - - private final static Log log = LogFactory.getLog(DefaultCluster.class); - - private StateServiceImpl stateService; - private LocalNode localNode; - private Destination destination; - private Connection connection; - private Session session; - private MessageProducer producer; - private MessageConsumer consumer; - private Timer timer; - private DestinationMarshaller marshaller; - private AtomicBoolean started = new AtomicBoolean(false); - private Object clusterLock = new Object(); - - /** - * Construct this beast - * @param localNode - * @param dataDestination - * @param destination - * @param marshaller - * @param connection - * @param session - * @param producer - * @param timer - * @param inactiveTime - * @throws JMSException - */ - public DefaultCluster(final LocalNode localNode,Destination dataDestination,Destination destination, - DestinationMarshaller marshaller,Connection connection,Session session,MessageProducer producer, - Timer timer,long inactiveTime) throws JMSException{ - this.localNode=localNode; - this.destination=destination; - this.marshaller=marshaller; - this.connection=connection; - this.session=session; - this.producer=producer; - this.timer=timer; - if(producer==null){ - throw new IllegalArgumentException("No producer specified!"); - } - // now lets subscribe the service to the updates from the data topic - consumer=session.createConsumer(dataDestination,null,true); - log.info("Creating data consumer on topic: "+dataDestination); - this.stateService=new StateServiceImpl(this,clusterLock,new Runnable(){ - public void run(){ - if(localNode instanceof ReplicatedLocalNode){ - ((ReplicatedLocalNode) localNode).pingRemoteNodes(); - } - } - },timer,inactiveTime); - consumer.setMessageListener(new StateConsumer(stateService,marshaller)); - } - - public void addClusterListener(ClusterListener listener) { - stateService.addClusterListener(listener); - } - - public void removeClusterListener(ClusterListener listener) { - stateService.removeClusterListener(listener); - } - - public Destination getDestination() { - return destination; - } - - public LocalNode getLocalNode() { - return localNode; - } - - public Map getNodes() { - return stateService.getNodes(); - } - - public void setElectionStrategy(ElectionStrategy strategy) { - stateService.setElectionStrategy(strategy); - } - - - public void send(Destination replyTo, Message message) throws JMSException{ - producer.send(replyTo,message); - } - - public MessageConsumer createConsumer(Destination destination) throws JMSException { - return getSession().createConsumer(destination); - } - - public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { - return getSession().createConsumer(destination, selector); - } - - public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException { - return getSession().createConsumer(destination, selector, noLocal); - } - - public Message createMessage() throws JMSException { - return getSession().createMessage(); - } - - public BytesMessage createBytesMessage() throws JMSException { - return getSession().createBytesMessage(); - } - - public MapMessage createMapMessage() throws JMSException { - return getSession().createMapMessage(); - } - - public ObjectMessage createObjectMessage() throws JMSException { - return getSession().createObjectMessage(); - } - - public ObjectMessage createObjectMessage(Serializable object) throws JMSException { - return getSession().createObjectMessage(object); - } - - public StreamMessage createStreamMessage() throws JMSException { - return getSession().createStreamMessage(); - } - - public TextMessage createTextMessage() throws JMSException { - return getSession().createTextMessage(); - } - - public TextMessage createTextMessage(String text) throws JMSException { - return getSession().createTextMessage(text); - } - - public void start() throws JMSException { - if (started.compareAndSet(false, true)) { - connection.start(); - } - } - - public void stop() throws JMSException { - try { - if (localNode instanceof Service) { - ((Service) localNode).stop(); - } - timer.cancel(); - session.close(); - connection.stop(); - connection.close(); - } - finally { - connection = null; - session = null; - } - } - - public boolean waitForClusterToComplete(int expectedCount, long timeout) throws InterruptedException { - timeout = timeout > 0 ? timeout : Long.MAX_VALUE; - long increment = 500; - increment = increment < timeout ? increment : timeout; - long waitTime = timeout; - long start = System.currentTimeMillis(); - synchronized (clusterLock) { - while (stateService.getNodes().size() < expectedCount && started.get() && waitTime > 0) { - clusterLock.wait(increment); - waitTime = timeout - (System.currentTimeMillis() - start); - } - } - return stateService.getNodes().size() >= expectedCount; - } - - protected Session getSession() throws JMSException { - if (session == null) { - throw new JMSException("Cannot perform operation, this cluster connection is now closed"); - } - return session; - } - - /** - * Create a named Destination - * @param name - * @return the Destinatiion - * @throws JMSException - */ - public Destination createDestination(String name) throws JMSException{ - Destination result = getSession().createTopic(name); - return result; - } -} diff --git a/activecluster/src/java/org/apache/activecluster/impl/DefaultClusterFactory.java b/activecluster/src/java/org/apache/activecluster/impl/DefaultClusterFactory.java deleted file mode 100644 index 50d33d74ad..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/DefaultClusterFactory.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.impl; - -import java.util.Timer; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import org.apache.activemq.util.IdGenerator; -import org.apache.activecluster.Cluster; -import org.apache.activecluster.ClusterException; -import org.apache.activecluster.ClusterFactory; -import org.apache.activecluster.DestinationMarshaller; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * A Factory of DefaultCluster instances - * - * @version $Revision: 1.4 $ - */ -public class DefaultClusterFactory implements ClusterFactory { - - private final static Log log = LogFactory.getLog(DefaultClusterFactory.class); - - private ConnectionFactory connectionFactory; - private boolean transacted; - private int acknowledgeMode; - private String dataTopicPrefix; - private long inactiveTime; - private boolean useQueueForInbox = false; - private int deliveryMode = DeliveryMode.NON_PERSISTENT; - private IdGenerator idGenerator = new IdGenerator(); - - public DefaultClusterFactory(ConnectionFactory connectionFactory, boolean transacted, int acknowledgeMode, String dataTopicPrefix, long inactiveTime) { - this.connectionFactory = connectionFactory; - this.transacted = transacted; - this.acknowledgeMode = acknowledgeMode; - this.dataTopicPrefix = dataTopicPrefix; - this.inactiveTime = inactiveTime; - } - - public DefaultClusterFactory(ConnectionFactory connectionFactory) { - this(connectionFactory, false, Session.AUTO_ACKNOWLEDGE, "ACTIVECLUSTER.DATA.", 6000L); - } - - public Cluster createCluster(Destination groupDestination) throws JMSException { - return createCluster(idGenerator.generateId(), groupDestination); - } - - public Cluster createCluster(String name,Destination groupDestination) throws JMSException { - Connection connection = getConnectionFactory().createConnection(); - Session session = createSession(connection); - return createCluster(connection, session, name,groupDestination,new DefaultDestinationMarshaller(session)); - } - - public Cluster createCluster(String name,Destination groupDestination,DestinationMarshaller marshaller) throws JMSException { - Connection connection = getConnectionFactory().createConnection(); - Session session = createSession(connection); - return createCluster(connection, session, name,groupDestination,marshaller); - } - - - public Cluster createCluster(String name,String groupDestinationName) throws JMSException{ - Connection connection = getConnectionFactory().createConnection(); - Session session = createSession(connection); - return createCluster(connection, session, name,session.createTopic(groupDestinationName),new DefaultDestinationMarshaller(session)); - } - - public Cluster createCluster(String name,String groupDestinationName,DestinationMarshaller marshaller) throws JMSException{ - Connection connection = getConnectionFactory().createConnection(); - Session session = createSession(connection); - return createCluster(connection, session, name,session.createTopic(groupDestinationName),marshaller); - } - - - - public Cluster createCluster(String groupDestinationName) throws JMSException{ - return createCluster(idGenerator.generateId(), groupDestinationName); - } - - - // Properties - //------------------------------------------------------------------------- - public String getDataTopicPrefix() { - return dataTopicPrefix; - } - - public void setDataTopicPrefix(String dataTopicPrefix) { - this.dataTopicPrefix = dataTopicPrefix; - } - - public int getAcknowledgeMode() { - return acknowledgeMode; - } - - public void setAcknowledgeMode(int acknowledgeMode) { - this.acknowledgeMode = acknowledgeMode; - } - - public long getInactiveTime() { - return inactiveTime; - } - - public void setInactiveTime(long inactiveTime) { - this.inactiveTime = inactiveTime; - } - - public boolean isTransacted() { - return transacted; - } - - public void setTransacted(boolean transacted) { - this.transacted = transacted; - } - - public boolean isUseQueueForInbox() { - return useQueueForInbox; - } - - public void setUseQueueForInbox(boolean useQueueForInbox) { - this.useQueueForInbox = useQueueForInbox; - } - - public ConnectionFactory getConnectionFactory() { - return connectionFactory; - } - - public void setConnectionFactory(ConnectionFactory connectionFactory) { - this.connectionFactory = connectionFactory; - } - - public int getDeliveryMode() { - return deliveryMode; - } - - /** - * Sets the delivery mode of the group based producer - */ - public void setDeliveryMode(int deliveryMode) { - this.deliveryMode = deliveryMode; - } - - public Cluster createCluster(Connection connection,Session session,String name,Destination groupDestination, - DestinationMarshaller marshaller) throws JMSException{ - String dataDestination = dataTopicPrefix + marshaller.getDestinationName(groupDestination); - log.info("Creating cluster group producer on topic: "+groupDestination); - MessageProducer producer=createProducer(session,null); - producer.setDeliveryMode(deliveryMode); - log.info("Creating cluster data producer on data destination: "+dataDestination); - Topic dataTopic=session.createTopic(dataDestination); - MessageProducer keepAliveProducer=session.createProducer(dataTopic); - keepAliveProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - StateService serviceStub=new StateServiceStub(session,keepAliveProducer,marshaller); - Destination localInboxDestination=session.createTopic(dataDestination+"."+name); - ReplicatedLocalNode localNode=new ReplicatedLocalNode(name,localInboxDestination,serviceStub); - Timer timer=new Timer(); - DefaultCluster answer=new DefaultCluster(localNode,dataTopic,groupDestination,marshaller,connection,session, - producer,timer,inactiveTime); - return answer; - } - - /* - * protected Cluster createInternalCluster(Session session, Topic dataDestination) { MessageProducer producer = - * createProducer(session); return new DefaultCluster(new NonReplicatedLocalNode(), dataDestination, connection, - * session, producer); } - */ - - protected MessageProducer createProducer(Session session, Topic groupDestination) throws JMSException { - return session.createProducer(groupDestination); - } - - protected Session createSession(Connection connection) throws JMSException { - return connection.createSession(transacted, acknowledgeMode); - } -} \ No newline at end of file diff --git a/activecluster/src/java/org/apache/activecluster/impl/DefaultDestinationMarshaller.java b/activecluster/src/java/org/apache/activecluster/impl/DefaultDestinationMarshaller.java deleted file mode 100644 index 7c45e0c84d..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/DefaultDestinationMarshaller.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ - -package org.apache.activecluster.impl; - - - -import java.util.Map; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.Topic; - -import org.apache.activecluster.DestinationMarshaller; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; - -/** - * A simple marshaller for Destinations - * - * @version $Revision: 1.5 $ - */ -public class DefaultDestinationMarshaller implements DestinationMarshaller { - private final static Log log = LogFactory.getLog(DefaultDestinationMarshaller.class); - - /** - * Keep a cache of name to destination mappings for fast lookup. - */ - private final Map destinations = new ConcurrentHashMap(); - /** - * The active session used to create a new Destination from a name. - */ - private final Session session; - - /** - * Create a marshaller for this specific session. - * @param session the session to use when mapping destinations. - */ - public DefaultDestinationMarshaller(Session session) { - this.session = session; - } - - /** - * Builds a destination from a destinationName - * @param destinationName - * - * @return the destination to send messages to all members of the cluster - */ - public Destination getDestination(String destinationName) throws JMSException { - if (!destinations.containsKey(destinationName)) { - destinations.put(destinationName, session.createTopic(destinationName)); - } - return (Destination) destinations.get(destinationName); - } - - /** - * Gets a destination's physical name - * @param destination - * @return the destination's physical name - */ - public String getDestinationName(Destination destination){ - String result = null; - if (destination != null){ - if (destination instanceof Topic){ - Topic topic = (Topic) destination; - try{ - result = topic.getTopicName(); - }catch(JMSException e){ - log.error("Failed to get topic name for " + destination,e); - } - }else{ - Queue queue = (Queue) destination; - try{ - result = queue.getQueueName(); - }catch(JMSException e){ - log.error("Failed to get queue name for " + destination,e); - } - } - } - return result; - } -} \ No newline at end of file diff --git a/activecluster/src/java/org/apache/activecluster/impl/NodeImpl.java b/activecluster/src/java/org/apache/activecluster/impl/NodeImpl.java deleted file mode 100644 index 10fa43a7c3..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/NodeImpl.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ -package org.apache.activecluster.impl; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.HashMap; -import java.util.Map; -import javax.jms.Destination; -import javax.jms.JMSException; - -import org.apache.activecluster.DestinationMarshaller; -import org.apache.activecluster.Node; - - -/** - * Default implementation of a remote Node - * - * @version $Revision: 1.3 $ - */ -public class NodeImpl implements Node{ - private static final long serialVersionUID=-3909792803360045064L; - private String name; - private Destination destination; - protected Map state; - protected boolean coordinator; - - - /** - * Construct an Node from a NodeState - * @param nodeState - * @param marshaller - * @throws JMSException - */ - public NodeImpl(NodeState nodeState,DestinationMarshaller marshaller) throws JMSException{ - this(nodeState.getName(),marshaller.getDestination(nodeState.getDestinationName()),nodeState.getState()); - } - /** - * Allow a node to be copied for sending it as a message - * - * @param node - */ - public NodeImpl(Node node) { - this(node.getName(),node.getDestination(), node.getState()); - } - - /** - * Create a Node - * @param name - * @param destination - */ - public NodeImpl(String name,Destination destination) { - this(name,destination, new HashMap()); - } - - /** - * Create A Node - * @param name - * @param destination - * @param state - */ - public NodeImpl(String name,Destination destination, Map state) { - this.name = name; - this.destination = destination; - this.state = state; - } - - /** - * @return the name of the node - */ - public String getName() { - return name; - } - - /** - * @return pretty print of the node - */ - public String toString() { - return "Node[<" + name + ">destination: " + destination + " state: " + state + "]"; - } - - /** - * @return the destination of the node - */ - public Destination getDestination() { - return destination; - } - - /** - * Get the State - * @return the State of the Node - */ - public synchronized Map getState() { - return new HashMap(state); - } - - - /** - * @return true if this node has been elected as coordinator - */ - public boolean isCoordinator() { - return coordinator; - } - - /** - * Get the zone - * @return the Zone - */ - public Object getZone() { - return state.get("zone"); - } - - // Implementation methods - //------------------------------------------------------------------------- - - protected synchronized void setState(Map state) { - this.state = state; - } - - protected void setCoordinator(boolean value) { - coordinator = value; - } - - public void writeExternal(ObjectOutput out) throws IOException{ - // TODO Auto-generated method stub - - } - - public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{ - // TODO Auto-generated method stub - - } -} diff --git a/activecluster/src/java/org/apache/activecluster/impl/NodeState.java b/activecluster/src/java/org/apache/activecluster/impl/NodeState.java deleted file mode 100644 index 5d3f0b9f69..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/NodeState.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ -package org.apache.activecluster.impl; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Map; - -import org.apache.activecluster.DestinationMarshaller; -import org.apache.activecluster.Node; -/** - * Default implementation of a remote Node - * - * @version $Revision: 1.3 $ - */ -public class NodeState implements Externalizable{ - private static final long serialVersionUID=-3909792803360045064L; - private String name; - private String destinationName; - protected Map state; - protected boolean coordinator; - - /** - * DefaultConstructor - * - */ - public NodeState(){ - } - - /** - * Construct a NodeState from a Node - * @param node - * @param marshaller - */ - public NodeState(Node node, DestinationMarshaller marshaller){ - this.name = node.getName(); - this.destinationName = marshaller.getDestinationName(node.getDestination()); - this.state = node.getState(); - this.coordinator = node.isCoordinator(); - } - - /** - * @return pretty print of the node - */ - public String toString(){ - return "NodeState[<"+name+">destinationName: "+destinationName+" state: "+state+"]"; - } - - /** - * @return Returns the coordinator. - */ - public boolean isCoordinator(){ - return coordinator; - } - - /** - * @param coordinator - * The coordinator to set. - */ - public void setCoordinator(boolean coordinator){ - this.coordinator=coordinator; - } - - /** - * @return Returns the destinationName. - */ - public String getDestinationName(){ - return destinationName; - } - - /** - * @param destinationName - * The destinationName to set. - */ - public void setDestinationName(String destinationName){ - this.destinationName=destinationName; - } - - /** - * @return Returns the name. - */ - public String getName(){ - return name; - } - - /** - * @param name - * The name to set. - */ - public void setName(String name){ - this.name=name; - } - - /** - * @return Returns the state. - */ - public Map getState(){ - return state; - } - - /** - * @param state - * The state to set. - */ - public void setState(Map state){ - this.state=state; - } - - /** - * write to a stream - * - * @param out - * @throws IOException - */ - public void writeExternal(ObjectOutput out) throws IOException{ - out.writeUTF((name!=null?name:"")); - out.writeUTF((destinationName!=null?destinationName:"")); - out.writeBoolean(coordinator); - out.writeObject(state); - } - - /** - * read from a stream - * - * @param in - * @throws IOException - * @throws ClassNotFoundException - */ - public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{ - this.name=in.readUTF(); - this.destinationName=in.readUTF(); - this.coordinator=in.readBoolean(); - this.state=(Map) in.readObject(); - } -} diff --git a/activecluster/src/java/org/apache/activecluster/impl/NonReplicatedLocalNode.java b/activecluster/src/java/org/apache/activecluster/impl/NonReplicatedLocalNode.java deleted file mode 100644 index cace6a104c..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/NonReplicatedLocalNode.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ -package org.apache.activecluster.impl; - -import java.util.Map; -import javax.jms.Destination; - -import org.apache.activecluster.LocalNode; - -/** - * Default implementation of a local Node which doesn't - * have its state replicated - * - * @version $Revision: 1.4 $ - */ -public class NonReplicatedLocalNode extends NodeImpl implements LocalNode { - private static final long serialVersionUID=2525565639637967143L; - - /** - * Create a Non-replicated local node - * @param name - * @param destination - */ - public NonReplicatedLocalNode(String name, Destination destination) { - super(name,destination); - } - - /** - * Set the local state - * @param state - */ - public void setState(Map state) { - super.setState(state); - } - - /** - * Shouldn't be called for non-replicated local nodes - */ - public void pingRemoteNodes() { - throw new RuntimeException("Non-Replicated Local Node should not distribute it's state!"); - } -} \ No newline at end of file diff --git a/activecluster/src/java/org/apache/activecluster/impl/ReplicatedLocalNode.java b/activecluster/src/java/org/apache/activecluster/impl/ReplicatedLocalNode.java deleted file mode 100644 index 15c76a5311..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/ReplicatedLocalNode.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ -package org.apache.activecluster.impl; - -import java.util.Map; -import javax.jms.Destination; -import javax.jms.JMSException; - -import org.apache.activecluster.LocalNode; -import org.apache.activecluster.Service; - - -/** - * Default implementation of a local Node which has its - * state replicated across the cluster - * - * @version $Revision: 1.3 $ - */ -public class ReplicatedLocalNode extends NodeImpl implements LocalNode, Service { - - /** - * - */ - private static final long serialVersionUID=4626381612145333540L; - private transient StateService serviceStub; - - /** - * Create ReplicatedLocalNode - * @param name - * @param destination - * @param serviceStub - */ - public ReplicatedLocalNode(String name,Destination destination, StateService serviceStub) { - super(name,destination); - this.serviceStub = serviceStub; - } - - /** - * Set the State of the local node - * @param state - */ - public void setState(Map state) { - super.setState(state); - serviceStub.keepAlive(this); - } - - /** - * ping remote nodes - * - */ - public void pingRemoteNodes() { - serviceStub.keepAlive(this); - } - - /** - * start (lifecycle) - * @throws JMSException - */ - public void start() throws JMSException { - } - - /** - * stop (lifecycle) - * @throws JMSException - */ - public void stop() throws JMSException { - } -} diff --git a/activecluster/src/java/org/apache/activecluster/impl/SimpleDestinationMarshaller.java b/activecluster/src/java/org/apache/activecluster/impl/SimpleDestinationMarshaller.java deleted file mode 100644 index e9e36e0967..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/SimpleDestinationMarshaller.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ - -package org.apache.activecluster.impl; - - - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.Topic; -import org.apache.activecluster.DestinationMarshaller; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * A simple marshaller for Destinations - * - * @version $Revision: 1.5 $ - */ -public class SimpleDestinationMarshaller implements DestinationMarshaller { - private final static Log log = LogFactory.getLog(SimpleDestinationMarshaller.class); - /** - * Builds a destination from a destinationName - * @param destinationName - * - * @return the destination to send messages to all members of the cluster - */ - public Destination getDestination(String destinationName){ - return new ActiveMQTopic(destinationName); - } - - /** - * Gets a destination's physical name - * @param destination - * @return the destination's physical name - */ - public String getDestinationName(Destination destination){ - String result = null; - if (destination != null){ - if (destination instanceof Topic){ - Topic topic = (Topic) destination; - try{ - result = topic.getTopicName(); - }catch(JMSException e){ - log.error("Failed to get topic name for " + destination,e); - } - }else{ - Queue queue = (Queue) destination; - try{ - result = queue.getQueueName(); - }catch(JMSException e){ - log.error("Failed to get queue name for " + destination,e); - } - } - } - return result; - } -} \ No newline at end of file diff --git a/activecluster/src/java/org/apache/activecluster/impl/StateConsumer.java b/activecluster/src/java/org/apache/activecluster/impl/StateConsumer.java deleted file mode 100644 index 87de809056..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/StateConsumer.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ -package org.apache.activecluster.impl; - -import org.apache.activecluster.DestinationMarshaller; -import org.apache.activecluster.Node; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.ObjectMessage; - - -/** - * A JMS MessageListener which processes inbound messages and - * applies them to a StateService - * - * @version $Revision: 1.2 $ - */ -public class StateConsumer implements MessageListener { - - private final static Log log = LogFactory.getLog(StateConsumer.class); - - private StateService stateService; - private DestinationMarshaller marshaller; - - public StateConsumer(StateService stateService,DestinationMarshaller marshaller) { - if (stateService == null) { - throw new IllegalArgumentException("Must specify a valid StateService implementation"); - } - this.stateService = stateService; - this.marshaller = marshaller; - } - - public void onMessage(Message message) { - if (log.isDebugEnabled()) { - log.debug("Received cluster data message!: " + message); - } - - if (message instanceof ObjectMessage) { - ObjectMessage objectMessage = (ObjectMessage) message; - try { - NodeState nodeState = (NodeState) objectMessage.getObject(); - Node node = new NodeImpl(nodeState,marshaller); - String type = objectMessage.getJMSType(); - if (type != null && type.equals("shutdown")) { - stateService.shutdown(node); - } - else { - stateService.keepAlive(node); - } - } - catch (Exception e) { - log.error("Could not extract node from message: " + e + ". Message: " + message, e); - } - } - else { - log.warn("Ignoring message: " + message); - } - } -} diff --git a/activecluster/src/java/org/apache/activecluster/impl/StateService.java b/activecluster/src/java/org/apache/activecluster/impl/StateService.java deleted file mode 100644 index 313159ec1b..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/StateService.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.impl; - -import org.apache.activecluster.Node; - - -/** - * A client side proxy to the remove cluster - * - * @version $Revision: 1.3 $ - */ -public interface StateService { - - /** - * Sends a keep alive to the cluster - * - * @param node - */ - public void keepAlive(Node node); - - /** - * Sends a shutdown message to the cluster - */ - public void shutdown(Node node); - -} diff --git a/activecluster/src/java/org/apache/activecluster/impl/StateServiceImpl.java b/activecluster/src/java/org/apache/activecluster/impl/StateServiceImpl.java deleted file mode 100644 index e3873e78d1..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/StateServiceImpl.java +++ /dev/null @@ -1,302 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ -package org.apache.activecluster.impl; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.Map.Entry; -import javax.jms.Destination; -import javax.jms.JMSException; - -import org.apache.activecluster.Cluster; -import org.apache.activecluster.ClusterEvent; -import org.apache.activecluster.ClusterListener; -import org.apache.activecluster.Node; -import org.apache.activecluster.election.ElectionStrategy; -import org.apache.activecluster.election.impl.BullyElectionStrategy; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; - - -/** - * Represents a node list - * - * @version $Revision: 1.4 $ - */ -public class StateServiceImpl implements StateService { - - private final static Log log = LogFactory.getLog(StateServiceImpl.class); - private Cluster cluster; - private Object clusterLock; - private Map nodes = new ConcurrentHashMap(); - private long inactiveTime; - private List listeners = new CopyOnWriteArrayList(); - private Destination localDestination; - private Runnable localNodePing; - private NodeImpl coordinator; - private ElectionStrategy electionStrategy; - - /** - * @param cluster - * @param clusterLock - * @param localNodePing - * @param timer - * @param inactiveTime - */ - /** - * Constructor StateServiceImpl - * @param cluster - * @param clusterLock - * @param localNodePing - * @param timer - * @param inactiveTime - */ - public StateServiceImpl(Cluster cluster, Object clusterLock, Runnable localNodePing, Timer timer, long inactiveTime) { - this.cluster = cluster; - this.clusterLock = clusterLock; - this.localDestination = cluster.getLocalNode().getDestination(); - this.localNodePing = localNodePing; - this.inactiveTime = inactiveTime; - long delay = inactiveTime / 3; - timer.scheduleAtFixedRate(createTimerTask(), delay, delay); - (this.coordinator = (NodeImpl) cluster.getLocalNode()).setCoordinator(true); - this.electionStrategy = new BullyElectionStrategy(); - } - - /** - * @return the current election strategy - */ - public ElectionStrategy getElectionStrategy() { - return electionStrategy; - } - - /** - * set the election strategy - * - * @param electionStrategy - */ - public void setElectionStrategy(ElectionStrategy electionStrategy) { - this.electionStrategy = electionStrategy; - } - - /** - * Get time of since last communication - * @return length of time inactive - */ - public long getInactiveTime() { - return inactiveTime; - } - - /** - * Set the time inactive - * @param inactiveTime - */ - public void setInactiveTime(long inactiveTime) { - this.inactiveTime = inactiveTime; - } - - /** - * Get A Map of nodes - where key = destination, value = node - * @return map of destination/nodes - */ - public Map getNodes() { - HashMap answer = new HashMap(nodes.size()); - for (Iterator iter = nodes.entrySet().iterator(); iter.hasNext();) { - Map.Entry entry = (Map.Entry) iter.next(); - Object key = entry.getKey(); - NodeEntry nodeEntry = (NodeEntry) entry.getValue(); - answer.put(key, nodeEntry.node); - } - return answer; - } - - /** - * Got a keepalive - * @param node - */ - public void keepAlive(Node node) { - Object key = node.getDestination(); - if (key != null && !localDestination.equals(key)) { - NodeEntry entry = (NodeEntry) nodes.get(key); - if (entry == null) { - entry = new NodeEntry(); - entry.node = node; - nodes.put(key, entry); - nodeAdded(node); - synchronized (clusterLock) { - clusterLock.notifyAll(); - } - } - else { - // has the data changed - if (stateHasChanged(entry.node, node)) { - entry.node = node; - nodeUpdated(node); - } - } - - // lets update the timer at which the node will be considered - // to be dead - entry.lastKeepAlive = getTimeMillis(); - } - } - - /** - * shutdown the node - */ - public void shutdown(Node node){ - Object key=node.getDestination(); - if(key!=null){ - nodes.remove(key); - ClusterEvent event=new ClusterEvent(cluster,node,ClusterEvent.ADD_NODE); - for (Iterator i = listeners.iterator(); i.hasNext();){ - ClusterListener listener=(ClusterListener) i.next(); - listener.onNodeRemoved(event); - } - } - } - - /** - * check nodes are alive - * - */ - - public void checkForTimeouts() { - localNodePing.run(); - long time = getTimeMillis(); - for (Iterator iter = nodes.entrySet().iterator(); iter.hasNext();) { - Map.Entry entry = (Entry) iter.next(); - NodeEntry nodeEntry = (NodeEntry) entry.getValue(); - if (nodeEntry.lastKeepAlive + inactiveTime < time) { - iter.remove(); - nodeFailed(nodeEntry.node); - } - } - } - - public TimerTask createTimerTask() { - return new TimerTask() { - public void run() { - checkForTimeouts(); - } - }; - } - - public void addClusterListener(ClusterListener listener) { - listeners.add(listener); - } - - public void removeClusterListener(ClusterListener listener) { - listeners.remove(listener); - } - - protected void nodeAdded(Node node) { - ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE); - // lets take a copy to make contention easier - Object[] array = listeners.toArray(); - for (int i = 0, size = array.length; i < size; i++) { - ClusterListener listener = (ClusterListener) array[i]; - listener.onNodeAdd(event); - } - doElection(); - } - - protected void nodeUpdated(Node node) { - ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.UPDATE_NODE); - // lets take a copy to make contention easier - Object[] array = listeners.toArray(); - for (int i = 0, size = array.length; i < size; i++) { - ClusterListener listener = (ClusterListener) array[i]; - listener.onNodeUpdate(event); - } - } - - protected void nodeFailed(Node node) { - ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.REMOVE_NODE); - // lets take a copy to make contention easier - Object[] array = listeners.toArray(); - for (int i = 0, size = array.length; i < size; i++) { - ClusterListener listener = (ClusterListener) array[i]; - listener.onNodeFailed(event); - } - doElection(); - } - - protected void coordinatorChanged(Node node) { - ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ELECTED_COORDINATOR); - // lets take a copy to make contention easier - Object[] array = listeners.toArray(); - for (int i = 0, size = array.length; i < size; i++) { - ClusterListener listener = (ClusterListener) array[i]; - listener.onCoordinatorChanged(event); - } - } - - protected void doElection() { - if (electionStrategy != null) { - try { - NodeImpl newElected = (NodeImpl) electionStrategy.doElection(cluster); - if (newElected != null && !newElected.equals(coordinator)) { - coordinator.setCoordinator(false); - coordinator = newElected; - coordinator.setCoordinator(true); - coordinatorChanged(coordinator); - } - } - catch (JMSException jmsEx) { - log.error("do election failed", jmsEx); - } - } - } - - /** - * For performance we may wish to use a less granualar timing mechanism - * only updating the time every x millis since we're only using - * the time as a judge of when a node has not pinged for at least a few - * hundred millis etc. - */ - protected long getTimeMillis() { - return System.currentTimeMillis(); - } - - protected static class NodeEntry { - public Node node; - public long lastKeepAlive; - } - - - /** - * @return true if the node has changed state from the old in memory copy to the - * newly arrived copy - */ - protected boolean stateHasChanged(Node oldNode, Node newNode) { - Map oldState = oldNode.getState(); - Map newState = newNode.getState(); - if (oldState == newState) { - return false; - } - return oldState == null || newState == null || !oldState.equals(newState); - } -} diff --git a/activecluster/src/java/org/apache/activecluster/impl/StateServiceStub.java b/activecluster/src/java/org/apache/activecluster/impl/StateServiceStub.java deleted file mode 100644 index 35bcf696e8..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/StateServiceStub.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ -package org.apache.activecluster.impl; - -import org.apache.activecluster.DestinationMarshaller; -import org.apache.activecluster.Node; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; - - -/** - * A local stub for the state service which sends JMS messages - * to the cluster - * - * @version $Revision: 1.2 $ - */ -public class StateServiceStub implements StateService { - - private final Log log = LogFactory.getLog(getClass()); - - private Session session; - private MessageProducer producer; - private DestinationMarshaller marshaller; - - public StateServiceStub(Session session, MessageProducer producer,DestinationMarshaller marshaller) { - this.session = session; - this.producer = producer; - this.marshaller = marshaller; - } - - public void keepAlive(Node node) { - try { - if (log.isDebugEnabled()) { - log.debug("Sending cluster data message: " + node); - } - - Message message = session.createObjectMessage(new NodeState(node,marshaller)); - producer.send(message); - } - catch (JMSException e) { - log.error("Could not send JMS message: " + e, e); - } - } - - public void shutdown(Node node) { - try { - if (log.isDebugEnabled()) { - log.debug("Sending shutdown message: " + node); - } - - Message message = session.createObjectMessage(new NodeState(node,marshaller)); - message.setJMSType("shutdown"); - producer.send(message); - } - catch (JMSException e) { - log.error("Could not send JMS message: " + e, e); - } - } -} diff --git a/activecluster/src/java/org/apache/activecluster/impl/package.html b/activecluster/src/java/org/apache/activecluster/impl/package.html deleted file mode 100644 index a3a9b7985c..0000000000 --- a/activecluster/src/java/org/apache/activecluster/impl/package.html +++ /dev/null @@ -1,27 +0,0 @@ - - - - - - -

- Default implementation of ActiveCluster using standard JMS API to build the cluster. -

- - - diff --git a/activecluster/src/java/org/apache/activecluster/package.html b/activecluster/src/java/org/apache/activecluster/package.html deleted file mode 100644 index 2d428ac6bb..0000000000 --- a/activecluster/src/java/org/apache/activecluster/package.html +++ /dev/null @@ -1,31 +0,0 @@ - - - - - - -

- ActiveCluster API for working with a simple cluster abstraction for building cluster algorithms - like buddy systems, voting, master/slave protocols, electing a controller and so forth. -

- Clusters communicate across a common destination (typically a Topic) but every member (node)of a - cluster has a unique name (the generation of unique names is left to the developer), and a - unique destination (which uses the unique name). - - - diff --git a/activecluster/src/test/org/apache/activecluster/ChatDemo.java b/activecluster/src/test/org/apache/activecluster/ChatDemo.java deleted file mode 100644 index 97dbb875ef..0000000000 --- a/activecluster/src/test/org/apache/activecluster/ChatDemo.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster; - -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.util.HashMap; -import java.util.Map; -import javax.jms.JMSException; - -import org.apache.activecluster.Cluster; -import org.apache.activecluster.ClusterEvent; -import org.apache.activecluster.ClusterException; -import org.apache.activecluster.ClusterFactory; -import org.apache.activecluster.ClusterListener; -import org.apache.activecluster.impl.ActiveMQClusterFactory; - -/** - * @version $Revision: 1.2 $ - */ -public class ChatDemo implements ClusterListener { - private Cluster cluster; - private String name = "unknown"; - - public static void main(String[] args) { - try { - ChatDemo test = new ChatDemo(); - test.run(); - } - catch (JMSException e) { - System.out.println("Caught: " + e); - e.printStackTrace(); - Exception c = e.getLinkedException(); - if (c != null) { - System.out.println("Cause: " + c); - c.printStackTrace(); - } - } - catch (Exception e) { - System.out.println("Caught: " + e); - e.printStackTrace(); - } - } - - public void run() throws Exception { - cluster = createCluster(); - cluster.addClusterListener(this); - cluster.start(); - - - System.out.println(); - System.out.println(); - System.out.println("Welcome to the ActiveCluster Chat Demo!"); - System.out.println(); - System.out.println("Enter text to talk or type"); - System.out.println(" /quit to terminate the application"); - System.out.println(" /name foo to change your name to be 'foo'"); - - BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); - boolean running = true; - while (running) { - String line = reader.readLine(); - if (line == null || line.trim().equalsIgnoreCase("quit")) { - break; - } - else { - running = processCommand(line.trim()); - } - } - stop(); - } - - protected boolean processCommand(String text) throws JMSException { - if (text.equals("/quit")) { - return false; - } - else { - if (text.startsWith("/name")) { - name = text.substring(5).trim(); - System.out.println("* name now changed to: " + name); - } - else { - // lets talk - Map map = new HashMap(); - map.put("text", text); - map.put("name", name); - cluster.getLocalNode().setState(map); - } - return true; - } - } - - - public void onNodeAdd(ClusterEvent event) { - System.out.println("* " + getName(event) + " has joined the room"); - } - - public void onNodeUpdate(ClusterEvent event) { - System.out.println(getName(event) + "> " + getText(event)); - } - - public void onNodeRemoved(ClusterEvent event) { - System.out.println("* " + getName(event) + " has left the room"); - } - - public void onNodeFailed(ClusterEvent event) { - System.out.println("* " + getName(event) + " has failed unexpectedly"); - } - - public void onCoordinatorChanged(ClusterEvent event){ - - } - - protected Object getName(ClusterEvent event) { - return event.getNode().getState().get("name"); - } - - protected Object getText(ClusterEvent event) { - return event.getNode().getState().get("text"); - } - - protected void stop() throws JMSException { - cluster.stop(); - } - - protected Cluster createCluster() throws JMSException, ClusterException { - ClusterFactory factory = new ActiveMQClusterFactory(); - return factory.createCluster("ORG.CODEHAUS.ACTIVEMQ.TEST.CLUSTER"); - } - -} diff --git a/activecluster/src/test/org/apache/activecluster/ClusterDemo.java b/activecluster/src/test/org/apache/activecluster/ClusterDemo.java deleted file mode 100644 index ddcc16c0bc..0000000000 --- a/activecluster/src/test/org/apache/activecluster/ClusterDemo.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster; - -import org.apache.activecluster.Cluster; -import org.apache.activecluster.ClusterException; -import org.apache.activecluster.ClusterFactory; -import org.apache.activecluster.election.ElectionStrategy; -import org.apache.activecluster.election.impl.BullyElectionStrategy; -import org.apache.activecluster.impl.ActiveMQClusterFactory; -import org.apache.activecluster.impl.DefaultClusterFactory; - -import javax.jms.JMSException; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.util.HashMap; -import java.util.Map; - -/** - * @version $Revision: 1.2 $ - */ -public class ClusterDemo { - protected Cluster cluster; - private String name; - private ElectionStrategy electionStrategy; - - public static void main(String[] args) { - try { - ClusterDemo test = new ClusterDemo(); - if (args.length > 0) { - test.name = args[0]; - } - test.demo(); - } - catch (JMSException e) { - System.out.println("Caught: " + e); - e.printStackTrace(); - Exception c = e.getLinkedException(); - if (c != null) { - System.out.println("Cause: " + c); - c.printStackTrace(); - } - } - catch (Exception e) { - System.out.println("Caught: " + e); - e.printStackTrace(); - } - } - - public void demo() throws Exception { - start(); - - cluster.addClusterListener(new TestingClusterListener(cluster)); - - System.out.println("Enter 'quit' to terminate"); - BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); - while (true) { - String line = reader.readLine(); - if (line == null || line.trim().equalsIgnoreCase("quit")) { - break; - } - else { - Map map = new HashMap(); - map.put("text", line); - cluster.getLocalNode().setState(map); - } - } - stop(); - } - - - protected void start() throws JMSException, ClusterException { - cluster = createCluster(); - if (name != null) { - System.out.println("Starting node: " + name); - - // TODO could we do cluster.setName() ? - Map state = new HashMap(); - state.put("name", name); - cluster.getLocalNode().setState(state); - } - cluster.start(); - if (electionStrategy == null) { - electionStrategy = new BullyElectionStrategy(); - } - electionStrategy.doElection(cluster); - } - - protected void stop() throws JMSException { - cluster.stop(); - } - - protected Cluster createCluster() throws JMSException, ClusterException { - ClusterFactory factory = new ActiveMQClusterFactory(); - return factory.createCluster("ORG.CODEHAUS.ACTIVEMQ.TEST.CLUSTER"); - } -} diff --git a/activecluster/src/test/org/apache/activecluster/ClusterFunctionTest.java b/activecluster/src/test/org/apache/activecluster/ClusterFunctionTest.java deleted file mode 100644 index 70e08cc3ea..0000000000 --- a/activecluster/src/test/org/apache/activecluster/ClusterFunctionTest.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster; -import java.util.HashMap; -import java.util.Map; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.ObjectMessage; -import junit.framework.TestCase; - -import org.apache.activecluster.Cluster; -import org.apache.activecluster.ClusterEvent; -import org.apache.activecluster.ClusterListener; -import org.apache.activecluster.impl.DefaultClusterFactory; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.activemq.ActiveMQConnectionFactory; - -/** - * Test ActiveCluster, ActiveMQ, with an eye to putting WADI on top of them. - * - * @author Jules Gosnell - * @version $Revision: 1.4 $ - */ -public class ClusterFunctionTest extends TestCase { - protected Log _log = LogFactory.getLog(ClusterFunctionTest.class); - - public ClusterFunctionTest(String name) { - super(name); - } - protected ActiveMQConnectionFactory _connectionFactory; - protected Connection _connection; - protected DefaultClusterFactory _clusterFactory; - protected Cluster _cluster0; - protected Cluster _cluster1; - - protected void setUp() throws Exception { - testResponsePassed = false; - _connectionFactory = new ActiveMQConnectionFactory("peer://cluster?persistent=false"); - _clusterFactory = new DefaultClusterFactory(_connectionFactory); - _cluster0 = _clusterFactory.createCluster("ORG.CODEHAUS.WADI.TEST.CLUSTER"); - _cluster1 = _clusterFactory.createCluster("ORG.CODEHAUS.WADI.TEST.CLUSTER"); - _cluster0.start(); - _log.info("started node0: " + _cluster0.getLocalNode().getDestination()); - _cluster1.start(); - _log.info("started node1: " + _cluster1.getLocalNode().getDestination()); - } - - protected void tearDown() throws JMSException { - // _cluster1.stop(); - _cluster1 = null; - // _cluster0.stop(); - _cluster0 = null; - _clusterFactory = null; - // _connection.stop(); - _connection = null; - // _connectionFactory.stop(); - } - //---------------------------------------- - class MyClusterListener implements ClusterListener { - public void onNodeAdd(ClusterEvent ce) { - _log.info("node added: " + ce.getNode()); - } - - public void onNodeFailed(ClusterEvent ce) { - _log.info("node failed: " + ce.getNode()); - } - - public void onNodeRemoved(ClusterEvent ce) { - _log.info("node removed: " + ce.getNode()); - } - - public void onNodeUpdate(ClusterEvent ce) { - _log.info("node updated: " + ce.getNode()); - } - - public void onCoordinatorChanged(ClusterEvent ce) { - _log.info("coordinator changed: " + ce.getNode()); - } - } - - public void testCluster() throws Exception { - _cluster0.addClusterListener(new MyClusterListener()); - Map map = new HashMap(); - map.put("text", "testing123"); - _cluster0.getLocalNode().setState(map); - _log.info("nodes: " + _cluster0.getNodes()); - Thread.sleep(10000); - assertTrue(true); - } - /** - * An invokable piece of work. - */ - static interface Invocation extends java.io.Serializable { - public void invoke(Cluster cluster, ObjectMessage om); - } - /** - * Listen for messages, if they contain Invocations, invoke() them. - */ - class InvocationListener implements MessageListener { - protected Cluster _cluster; - - public InvocationListener(Cluster cluster) { - _cluster = cluster; - } - - public void onMessage(Message message) { - _log.info("message received: " + message); - ObjectMessage om = null; - Object tmp = null; - Invocation invocation = null; - try { - if (message instanceof ObjectMessage && (om = (ObjectMessage) message) != null - && (tmp = om.getObject()) != null && tmp instanceof Invocation - && (invocation = (Invocation) tmp) != null) { - _log.info("invoking message on: " + _cluster.getLocalNode()); - invocation.invoke(_cluster, om); - _log.info("message successfully invoked on: " + _cluster.getLocalNode()); - } - else { - _log.warn("bad message: " + message); - } - } - catch (JMSException e) { - _log.warn("unexpected problem", e); - } - } - } - /** - * A request for a piece of work which involves sending a response back to the original requester. - */ - static class Request implements Invocation { - public void invoke(Cluster cluster, ObjectMessage om2) { - try { - System.out.println("request received"); - ObjectMessage om = cluster.createObjectMessage(); - om.setJMSReplyTo(cluster.getLocalNode().getDestination()); - om.setObject(new Response()); - System.out.println("sending response"); - cluster.send(om2.getJMSReplyTo(), om); - System.out.println("request processed"); - } - catch (JMSException e) { - System.err.println("problem sending response"); - e.printStackTrace(); - } - } - } - static boolean testResponsePassed = false; - /** - * A response containing a piece of work. - */ - static class Response implements Invocation { - public void invoke(Cluster cluster, ObjectMessage om) { - try { - System.out.println("response arrived from: " + om.getJMSReplyTo()); - // set a flag to test later - ClusterFunctionTest.testResponsePassed = true; - System.out.println("response processed on: " + cluster.getLocalNode().getDestination()); - } - catch (JMSException e) { - System.err.println("problem processing response"); - } - } - } - - public void testResponse() throws Exception { - MessageListener listener0 = new InvocationListener(_cluster0); - MessageListener listener1 = new InvocationListener(_cluster1); - // 1->(n-1) messages (excludes self) - _cluster0.createConsumer(_cluster0.getDestination(), null, true).setMessageListener(listener0); - // 1->1 messages - _cluster0.createConsumer(_cluster0.getLocalNode().getDestination()).setMessageListener(listener0); - // 1->(n-1) messages (excludes self) - _cluster1.createConsumer(_cluster1.getDestination(), null, true).setMessageListener(listener1); - // 1->1 messages - _cluster1.createConsumer(_cluster1.getLocalNode().getDestination()).setMessageListener(listener1); - ObjectMessage om = _cluster0.createObjectMessage(); - om.setJMSReplyTo(_cluster0.getLocalNode().getDestination()); - om.setObject(new Request()); - testResponsePassed = false; - _cluster0.send(_cluster0.getLocalNode().getDestination(), om); - Thread.sleep(3000); - assertTrue(testResponsePassed); - _log.info("request/response between same node OK"); - testResponsePassed = false; - _cluster0.send(_cluster1.getLocalNode().getDestination(), om); - Thread.sleep(3000); - assertTrue(testResponsePassed); - _log.info("request/response between two different nodes OK"); - } -} diff --git a/activecluster/src/test/org/apache/activecluster/ClusterTest.java b/activecluster/src/test/org/apache/activecluster/ClusterTest.java deleted file mode 100644 index a2ece198be..0000000000 --- a/activecluster/src/test/org/apache/activecluster/ClusterTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ -package org.apache.activecluster; - -import java.util.List; -import java.util.Map; -import javax.jms.Destination; -import javax.jms.Message; - -import org.apache.activecluster.Cluster; -import org.apache.activecluster.LocalNode; -import org.apache.activecluster.Node; - -/** - * @version $Revision: 1.4 $ - */ -public class ClusterTest extends ClusterTestSupport { - - protected int count = 3; - - public void testCluster() throws Exception { - cluster = createCluster(); - - subscribeToCluster(); - - cluster.start(); - - Destination destination = cluster.getDestination(); - Message message = cluster.createTextMessage("abcdef"); - cluster.send(destination, message); - - //clusterListener.waitForMessageToArrive(); - Thread.sleep(5000); - - List list = clusterListener.flushMessages(); - assertEquals("Should have received a message: " + list, 1, list.size()); - - System.out.println("Received message: " + list.get(0)); - } - - - public void testMembershipCluster() throws Exception { - Cluster[] clusters = new Cluster[count]; - for (int i = 0; i < count; i++) { - Cluster cluster = createCluster("node:" + i); - clusters[i] = cluster; - if (i==0){ - cluster.addClusterListener(new TestingClusterListener(cluster)); - } - cluster.start(); - System.out.println("started " + clusters[i].getLocalNode().getName()); - - } - - System.out.println("waiting to complete ..."); - for (int i = count - 1; i >= 0; i--) { - Cluster cluster = clusters[i]; - String localName = cluster.getLocalNode().getName(); - boolean completed = cluster.waitForClusterToComplete(count - 1, 5000); - assertTrue("Node: " + i + " with contents: " + dumpConnectedNodes(cluster.getNodes()), completed); - - System.out.println(localName + " completed = " + completed + " nodes = " - + dumpConnectedNodes(cluster.getNodes())); - } - - assertClusterMembership(clusters); - - - assertClusterMembership(clusters); - - Cluster testCluster = clusters[0]; - LocalNode testNode = testCluster.getLocalNode(); - String key = "key"; - String value = "value"; - - Map map = testNode.getState(); - map.put(key, value); - testNode.setState(map); - - - Thread.sleep(500); - for (int i = 1; i < count; i++) { - Node node = (Node) clusters[i].getNodes().get(testNode.getDestination()); - - assertTrue("The current test node should be in the cluster: " + i, node != null); - assertTrue(node.getState().get(key).equals(value)); - } - - for (int i = 0; i < count; i++) { - System.out.println(clusters[i].getLocalNode().getName() + " Is coordinator = " + clusters[i].getLocalNode().isCoordinator()); - clusters[i].stop(); - Thread.sleep(250); - - } - } - - protected void assertClusterMembership(Cluster[] clusters) { - for (int i = 0; i < count; i++) { - System.out.println("Cluster: " + i + " = " + clusters[i].getNodes()); - - assertEquals("Size of clusters for cluster: " + i, count - 1, clusters[i].getNodes().size()); - System.out.println(clusters[i].getLocalNode().getName() + " Is coordinator = " + clusters[i].getLocalNode().isCoordinator()); - } - } - -} diff --git a/activecluster/src/test/org/apache/activecluster/ClusterTestSupport.java b/activecluster/src/test/org/apache/activecluster/ClusterTestSupport.java deleted file mode 100644 index f3b12b6c6f..0000000000 --- a/activecluster/src/test/org/apache/activecluster/ClusterTestSupport.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ -package org.apache.activecluster; - -import org.apache.activecluster.Cluster; -import org.apache.activecluster.Node; -import org.apache.activecluster.impl.ActiveMQClusterFactory; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Topic; - -/** - * @version $Revision: 1.3 $ - */ -public abstract class ClusterTestSupport extends TestSupport { - - protected Cluster cluster; - protected StubMessageListener clusterListener = new StubMessageListener(); - protected StubMessageListener inboxListener = new StubMessageListener(); - private MessageConsumer clusterConsumer; - private MessageConsumer inboxConsumer; - - - protected void sendMessageToNode(Node node, String text) throws Exception { - Message message = cluster.createTextMessage(text); - cluster.send(node.getDestination(), message); - } - - protected void sendMessageToCluster(String text) throws Exception { - Message message = cluster.createTextMessage(text); - cluster.send(cluster.getDestination(), message); - } - - protected void subscribeToCluster() throws Exception { - - // listen to cluster messages - Destination clusterDestination = cluster.getDestination(); - assertTrue("Local destination must not be null", clusterDestination != null); - clusterConsumer = cluster.createConsumer(clusterDestination); - clusterConsumer.setMessageListener(clusterListener); - - // listen to inbox messages (individual messages) - Destination localDestination = cluster.getLocalNode().getDestination(); - assertTrue("Local destination must not be null", localDestination != null); - - System.out.println("Consuming from local destination: " + localDestination); - inboxConsumer = cluster.createConsumer(localDestination); - inboxConsumer.setMessageListener(inboxListener); - } - - protected void setUp() throws Exception { - } - - protected void tearDown() throws Exception { - if (cluster != null) { - cluster.stop(); - } - } -} diff --git a/activecluster/src/test/org/apache/activecluster/StubMessageListener.java b/activecluster/src/test/org/apache/activecluster/StubMessageListener.java deleted file mode 100644 index fa03f7b835..0000000000 --- a/activecluster/src/test/org/apache/activecluster/StubMessageListener.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster; - -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.ArrayList; -import java.util.List; - -/** - * A mock message listener for testing - * - * @version $Revision: 1.2 $ - */ -public class StubMessageListener implements MessageListener { - private List messages = new ArrayList(); - private Object semaphore; - - public StubMessageListener() { - this(new Object()); - } - - public StubMessageListener(Object semaphore) { - this.semaphore = semaphore; - } - - /** - * @return all the messages on the list so far, clearing the buffer - */ - public synchronized List flushMessages() { - List answer = new ArrayList(messages); - messages.clear(); - return answer; - } - - public synchronized void onMessage(Message message) { - messages.add(message); - synchronized (semaphore) { - semaphore.notifyAll(); - } - } - - public void waitForMessageToArrive() { - System.out.println("Waiting for message to arrive"); - - long start = System.currentTimeMillis(); - - try { - if (messages.isEmpty()) { - synchronized (semaphore) { - semaphore.wait(4000); - } - } - } - catch (InterruptedException e) { - System.out.println("Caught: " + e); - } - long end = System.currentTimeMillis() - start; - - System.out.println("End of wait for " + end + " millis"); - } -} diff --git a/activecluster/src/test/org/apache/activecluster/TestSupport.java b/activecluster/src/test/org/apache/activecluster/TestSupport.java deleted file mode 100644 index c364c81425..0000000000 --- a/activecluster/src/test/org/apache/activecluster/TestSupport.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ -package org.apache.activecluster; - -import junit.framework.TestCase; - -import javax.jms.JMSException; -import java.util.Iterator; -import java.util.Map; - -import org.apache.activecluster.Cluster; -import org.apache.activecluster.ClusterException; -import org.apache.activecluster.ClusterFactory; -import org.apache.activecluster.Node; -import org.apache.activecluster.impl.ActiveMQClusterFactory; - -/** - * @version $Revision: 1.2 $ - */ -public class TestSupport extends TestCase { - protected String dumpConnectedNodes(Map nodes) { - String result = ""; - for (Iterator i = nodes.values().iterator(); i.hasNext();) { - - Object value = i.next(); - if (value instanceof Node) { - Node node = (Node) value; - result += node.getName() + ","; - } - else { - System.out.println("Got node of type: " + value.getClass()); - result += value + ","; - } - } - return result; - } - - protected Cluster createCluster() throws JMSException, ClusterException { - ClusterFactory factory = new ActiveMQClusterFactory(); - return factory.createCluster("ORG.CODEHAUS.ACTIVEMQ.TEST.CLUSTER"); - } - - protected Cluster createCluster(String name) throws JMSException, ClusterException { - ClusterFactory factory = new ActiveMQClusterFactory(); - return factory.createCluster(name,"ORG.CODEHAUS.ACTIVEMQ.TEST.CLUSTER"); - } -} diff --git a/activecluster/src/test/org/apache/activecluster/TestingClusterListener.java b/activecluster/src/test/org/apache/activecluster/TestingClusterListener.java deleted file mode 100644 index 1d41c088a3..0000000000 --- a/activecluster/src/test/org/apache/activecluster/TestingClusterListener.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster; - -import org.apache.activecluster.Cluster; -import org.apache.activecluster.ClusterEvent; -import org.apache.activecluster.ClusterListener; -import org.apache.activecluster.impl.DefaultCluster; - -/** - * @version $Revision: 1.2 $ - */ -public class TestingClusterListener implements ClusterListener { - private Cluster cluster; - - public TestingClusterListener(Cluster cluster){ - this.cluster = cluster; - } - public void onNodeAdd(ClusterEvent event) { - printEvent("ADDED: ", event); - } - - public void onNodeUpdate(ClusterEvent event) { - printEvent("UPDATED: ", event); - } - - public void onNodeRemoved(ClusterEvent event) { - printEvent("REMOVED: ", event); - } - - public void onNodeFailed(ClusterEvent event) { - printEvent("FAILED: ", event); - } - - public void onCoordinatorChanged(ClusterEvent event) { - printEvent("COORDINATOR: ", event); - } - - protected void printEvent(String text, ClusterEvent event) { - System.out.println(text + event.getNode()); - System.out.println("Current cluster is now: " + cluster.getNodes().keySet()); - } - } diff --git a/activecluster/src/test/org/apache/activecluster/group/BuddyGroupModelTest.java b/activecluster/src/test/org/apache/activecluster/group/BuddyGroupModelTest.java deleted file mode 100644 index a0df440614..0000000000 --- a/activecluster/src/test/org/apache/activecluster/group/BuddyGroupModelTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.group; - -import java.util.List; -import javax.jms.JMSException; - -import org.apache.activecluster.group.BuddyGroupModel; -import org.apache.activecluster.group.Group; -import org.apache.activecluster.group.GroupModel; - -/** - * @version $Revision: 1.4 $ - */ -public class BuddyGroupModelTest extends GroupTestSupport { - - public void testGroups() throws Exception { - addNode("a"); - - // lets check how many groups have been created - List groups = model.getGroups(); - assertEquals("number of groups: " + groups, 1, model.getGroups().size()); - - Group group = (Group) model.getGroups().get(0); - assertIncomplete(group); - - addNode("b"); - assertEquals("number of groups: " + groups, 2, model.getGroups().size()); - - // lets see if the first node is now complete - assertUsable(group); - - group = (Group) model.getGroups().get(1); - assertUsable(group); - - - addNode("c"); - assertEquals("number of groups: " + groups, 3, model.getGroups().size()); - group = (Group) model.getGroups().get(2); - assertUsable(group); - - - addNode("d"); - assertEquals("number of groups: " + groups, 4, model.getGroups().size()); - group = (Group) model.getGroups().get(3); - assertUsable(group); - - } - - public void testRemoveGroups() throws JMSException { - String[] nodeNames = {"a", "b", "c"}; - addNodes(nodeNames); - - // TODO now lets remove the nodes and check group states.. - } - - protected GroupModel createGroupModel() { - return new BuddyGroupModel(); - } -} diff --git a/activecluster/src/test/org/apache/activecluster/group/GroupModelTest.java b/activecluster/src/test/org/apache/activecluster/group/GroupModelTest.java deleted file mode 100644 index 13f9474992..0000000000 --- a/activecluster/src/test/org/apache/activecluster/group/GroupModelTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activecluster.group; - -import java.util.List; -import javax.jms.JMSException; - -import org.apache.activecluster.group.Group; - -/** - * @version $Revision: 1.4 $ - */ -public class GroupModelTest extends GroupTestSupport { - - public void testGroups() throws Exception { - addNode("a"); - - // lets check how many groups have been created - List groups = model.getGroups(); - assertEquals("number of groups: " + groups, 1, model.getGroups().size()); - - Group group = (Group) model.getGroups().get(0); - assertIncomplete(group); - - addNode("b"); - assertNotFullButUsable(group); - assertEquals("number of groups: " + groups, 1, model.getGroups().size()); - - addNode("c"); - assertFull(group); - assertEquals("number of groups: " + groups, 1, model.getGroups().size()); - - - addNode("d"); - assertEquals("number of groups: " + groups, 2, model.getGroups().size()); - group = (Group) model.getGroups().get(1); - assertIncomplete(group); - } - - public void testRemoveGroups() throws JMSException { - String[] nodeNames = {"a", "b", "c"}; - addNodes(nodeNames); - - // TODO now lets remove the nodes and check group states.. - } - -} diff --git a/activecluster/src/test/org/apache/activecluster/group/GroupTestSupport.java b/activecluster/src/test/org/apache/activecluster/group/GroupTestSupport.java deleted file mode 100644 index 45db862023..0000000000 --- a/activecluster/src/test/org/apache/activecluster/group/GroupTestSupport.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ -package org.apache.activecluster.group; - -import java.util.HashMap; -import java.util.Map; -import javax.jms.JMSException; -import junit.framework.TestCase; -import org.apache.activecluster.Cluster; -import org.apache.activecluster.ClusterEvent; -import org.apache.activecluster.ClusterListener; -import org.apache.activecluster.DestinationMarshaller; -import org.apache.activecluster.Node; -import org.apache.activecluster.impl.NodeImpl; -import org.apache.activecluster.impl.SimpleDestinationMarshaller; - -/** - * A base class for Group model testing - * - * @version $Revision: 1.5 $ - */ -public abstract class GroupTestSupport extends TestCase { - - protected GroupModel model; - private ClusterListener listener; - private Cluster cluster; - private Map nodes = new HashMap(); - private DestinationMarshaller marshaller = new SimpleDestinationMarshaller(); - - protected void addNodes(String[] nodeNames) throws JMSException { - for (int i = 0; i < nodeNames.length; i++) { - String nodeName = nodeNames[i]; - addNode(nodeName); - } - } - - protected void addNode(String nodeName) throws JMSException { - - Node node = new NodeImpl(nodeName,marshaller.getDestination(nodeName)); - nodes.put(nodeName, node); - listener.onNodeAdd(new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE)); - } - - protected void assertFull(Group group) { - assertTrue("Group is not full and usable. Members: " + group.getMembers(), group.isFull() && group.isUsable()); - } - - protected void assertNotFullButUsable(Group group) { - assertTrue("Group is not not full but usable. Members: " + group.getMembers(), !group.isFull() && group.isUsable()); - } - - protected void assertIncomplete(Group group) { - assertTrue("Group is not not full or usable. Members: " + group.getMembers(), !group.isFull() && !group.isUsable()); - } - - protected void assertUsable(Group group) { - assertTrue("Group is not usable. Members: " + group.getMembers(), group.isUsable()); - } - - protected void setUp() throws Exception { - model = createGroupModel(); - listener = new GroupClusterListener(model); - } - - protected GroupModel createGroupModel() { - return new GroupModel(); - } -}