git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/branches/activemq-4.0@467022 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-10-23 16:49:38 +00:00
parent 6429b1d01b
commit a717c86c2f
51 changed files with 0 additions and 4787 deletions

View File

@ -1,23 +0,0 @@
Welcome to ActiveCluster!
=========================
ActiveCluster is an Apache 2.0 licenced cluster communication toolkit.
To help you get started, try surfing the following links...
Building
http://activecluster.codehaus.org/Building
Examples
http://activecluster.codehaus.org/Examples
Refer to the website for details of finding the issue tracker, email lists, wiki or IRC channel
http://activecluster.codehaus.org/
Please help us make ActiveCluster better - we appreciate any feedback you may have.
Enjoy!
----------------------
The ActiveCluster team

View File

@ -1,80 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project default="default" xmlns:j="jelly:core">
<goal name="default" prereqs="jar:install"/>
<goal name="dc">
<attain>
<attainGoal name="clean"/>
<attainGoal name="site:deploy"/>
<j:set var="maven.test.skip" value="true"/>
<attainGoal name="jar:deploy"/>
<attainGoal name="dist:deploy"/>
</attain>
</goal>
<preGoal name="site:deploy">
<attainGoal name="javadoc"/>
<attainGoal name="java:compile"/>
<attainGoal name="clover"/>
</preGoal>
<goal name="setclasspath" prereqs="java:compile, test:compile">
<path id="test.classpath">
<pathelement path="${maven.build.dest}"/>
<pathelement path="target/classes"/>
<pathelement path="target/test-classes"/>
<path refid="maven.dependency.classpath"/>
</path>
</goal>
<!-- Sample programs -->
<goal name="demo:A" prereqs="setclasspath"
description="Runs the simple ActiveCluster Demo">
<echo>Running the ActiveCluster demo...</echo>
<java classname="org.apache.activecluster.ClusterDemo" fork="true">
<classpath refid="test.classpath"/>
<sysproperty key="org.apache.commons.logging.simplelog.defaultlog" value="debug"/>
<arg value="A"/>
</java>
</goal>
<goal name="demo:B" prereqs="setclasspath"
description="Runs the simple ActiveCluster Demo">
<echo>Running the ActiveCluster demo...</echo>
<java classname="org.apache.activecluster.ClusterDemo" fork="true">
<classpath refid="test.classpath"/>
<arg value="B"/>
</java>
</goal>
<!--
Uses keyboard input so can't be run in Maven
<goal name="chat" prereqs="setclasspath"
description="Runs the ActiveCluster Chat Demo">
<echo>Running the ActiveCluster Chat demo...</echo>
<java classname="org.apache.activecluster.ChatDemo" fork="true">
<classpath refid="test.classpath"/>
</java>
</goal>
-->
</project>

View File

@ -1,66 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>incubator-activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>4.0.2-SNAPSHOT</version>
</parent>
<artifactId>activecluster</artifactId>
<name>ActiveCluster</name>
<dependencies>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/java</sourceDirectory>
<testSourceDirectory>src/test</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude implementation="java.lang.String">**/Testing*.java</exclude>
<exclude implementation="java.lang.String">**/TestSupport.java</exclude>
<exclude implementation="java.lang.String">**/ClusterTest.java</exclude>
<exclude implementation="java.lang.String">**/ClusterFunctionTest.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -1,57 +0,0 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
maven.repo.remote=http://www.ibiblio.org/maven,http://dist.codehaus.org,http://people.apache.org/repository
maven.compile.source=1.4
maven.compile.target=1.4
maven.test.source=1.4
maven.compile.deprecation=true
maven.compile.debug=true
maven.compile.optimize=true
maven.javadoc.links=http://java.sun.com/j2se/1.4.1/docs/api/,http://java.sun.com/j2ee/1.4/docs/api/
maven.javadoc.source=1.4
maven.junit.fork = true
# use Sun coding standards
checkstyle.lcurly.type = eol
checkstyle.lcurly.method = eol
checkstyle.lcurly.other = eol
checkstyle.header.ignore.line = 1,2,3,4,5,6
checkstyle.const.pattern = ^[a-z][a-zA-Z0-9]*$
# disable these non-critical errors to highlight
# more important ones line missing javadoc
checkstyle.maxlinelen = 132
checkstyle.ignore.whitespace = true
#maven.checkstyle.ignore.public.in.interface = true
checkstyle.paren.pad = ignore
#####################################################
# codehaus theme
#####################################################
maven.xdoc.theme.url=http://codehaus.org/codehaus-style.css
maven.repo.central = dist.codehaus.org
maven.repo.central.directory = /dist
maven.remote.group = activecluster

View File

@ -1,165 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project>
<pomVersion>3</pomVersion>
<extend>${basedir}/../etc/project.xml</extend>
<name>ActiveCluster</name>
<artifactId>activecluster</artifactId>
<package>org.apache.activecluster</package>
<packageGroups>
<packageGroup>
<title>Core ActiveCluster API</title>
<packages>org.apache.activecluster</packages>
</packageGroup>
<packageGroup>
<title>Group organisation protocols</title>
<packages>org.apache.activecluster.group</packages>
</packageGroup>
<packageGroup>
<title>Election protocols</title>
<packages>org.apache.activecluster.election</packages>
</packageGroup>
<packageGroup>
<title>ActiveMQ specific implementation classes</title>
<packages>org.apache.activecluster.activemq</packages>
</packageGroup>
<packageGroup>
<title>Implementation classes</title>
<packages>org.apache.activecluster.impl:org.apache.activecluster.election.impl</packages>
</packageGroup>
</packageGroups>
<shortDescription>ActiveCluster is a framework for building cluster-aware software</shortDescription>
<gumpRepositoryId>activecluster</gumpRepositoryId>
<description>
ActiveCluster is a framework for building cluster-aware software
</description>
<url>http://activecluster.codehaus.org/</url>
<issueTrackingUrl>http://jira.codehaus.org/browse/ACL</issueTrackingUrl>
<dependencies>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.0.4</version>
<url>http://jakarta.apache.org/commons/logging/</url>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jta_1.0.1B_spec</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-j2ee-management_1.0_spec</artifactId>
<version>1.0</version>
</dependency>
<!-- default JMS provider used for implementation -->
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-core</artifactId>
<version>${pom.currentVersion}</version>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<version>${activeio_version}</version>
</dependency>
<!-- for unit test cases -->
<dependency>
<id>junit</id>
<version>3.8.1</version>
</dependency>
<!--concurrency -->
<dependency>
<id>backport-util-concurrent</id>
<version>2.0_01_pd</version>
</dependency>
<!-- for multi-threaded unit test cases -->
<dependency>
<id>sysunit</id>
<version>1.0-beta-13</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>${derby_version}</version>
</dependency>
</dependencies>
<build>
<nagEmailAddress>scm@activecluster.codehaus.org</nagEmailAddress>
<sourceDirectory>src/java</sourceDirectory>
<unitTestSourceDirectory>src/test</unitTestSourceDirectory>
<integrationUnitTestSourceDirectory/>
<aspectSourceDirectory/>
<unitTest>
<resources>
<resource>
<directory>src/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<includes>
<include>**/*Test.*</include>
</includes>
<excludes>
<!-- disable failing tests on some platforms -->
<exclude>**/ClusterTest.*</exclude>
<exclude>**/ClusterFunctionTest.*</exclude>
</excludes>
</unitTest>
<resources>
<!--
<resource>
<directory>src/jar</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
</resource>
-->
</resources>
</build>
</project>

View File

@ -1,228 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster;
import java.io.Serializable;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activecluster.election.ElectionStrategy;
/**
* Represents a logical connection to a cluster. From this object you can
* obtain the destination to send messages to, view the members of the cluster,
* watch cluster events (nodes joining, leaving, updating their state) as well
* as viewing each members state.
* <p/>
* You may also update the local node's state.
*
* @version $Revision: 1.5 $
*/
public interface Cluster extends Service {
/**
* Returns the destination used to send a message to all members of the cluster
*
* @return the destination to send messages to all members of the cluster
*/
public Destination getDestination();
/**
* A snapshot of the nodes in the cluster indexed by the Destination
* @return a Map containing all the nodes in the cluster, where key=node destination,value=node
*/
public Map getNodes();
/**
* Adds a new listener to cluster events
*
* @param listener
*/
public void addClusterListener(ClusterListener listener);
/**
* Removes a listener to cluster events
*
* @param listener
*/
public void removeClusterListener(ClusterListener listener);
/**
* The local Node which allows you to mutate the state or subscribe to the
* nodes temporary queue for inbound messages direct to the Node
* @return the Node representing this peer in the cluster
*/
public LocalNode getLocalNode();
/**
* Allows overriding of the default election strategy with a custom
* implementation.
* @param strategy
*/
public void setElectionStrategy(ElectionStrategy strategy);
// Messaging helper methods
//-------------------------------------------------------------------------
/**
* Sends a message to a destination, which could be to the entire group
* or could be a single Node's destination
*
* @param destination is either the group topic or a node's destination
* @param message the message to be sent
* @throws JMSException
*/
public void send(Destination destination, Message message) throws JMSException;
/**
* Creates a consumer of all the messags sent to the given destination,
* including messages sent via the send() messages
*
* @param destination
* @return a newly created message consumer
* @throws JMSException
*/
public MessageConsumer createConsumer(Destination destination) throws JMSException;
/**
* Creates a consumer of all message sent to the given destination,
* including messages sent via the send() message with an optional SQL 92 based selector to filter
* messages
*
* @param destination
* @param selector
* @return a newly created message consumer
* @throws JMSException
*/
public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException;
/**
* Creates a consumer of all message sent to the given destination,
* including messages sent via the send() message with an optional SQL 92 based selector to filter
* messages along with optionally ignoring local traffic - messages sent via the send()
* method on this object.
*
* @param destination the destination to consume from
* @param selector an optional SQL 92 filter of messages which could be null
* @param noLocal which if true messages sent via send() on this object will not be delivered to the consumer
* @return a newly created message consumer
* @throws JMSException
*/
public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException;
// Message factory methods
//-------------------------------------------------------------------------
/**
* Creates a new message without a body
* @return the create Message
*
* @throws JMSException
*/
public Message createMessage() throws JMSException;
/**
* Creates a new bytes message
* @return the create BytesMessage
*
* @throws JMSException
*/
public BytesMessage createBytesMessage() throws JMSException;
/**
* Creates a new {@link MapMessage}
* @return the created MapMessage
*
* @throws JMSException
*/
public MapMessage createMapMessage() throws JMSException;
/**
* Creates a new {@link ObjectMessage}
* @return the created ObjectMessage
*
* @throws JMSException
*/
public ObjectMessage createObjectMessage() throws JMSException;
/**
* Creates a new {@link ObjectMessage}
*
* @param object
* @return the createdObjectMessage
* @throws JMSException
*/
public ObjectMessage createObjectMessage(Serializable object) throws JMSException;
/**
* Creates a new {@link StreamMessage}
* @return the create StreamMessage
*
* @throws JMSException
*/
public StreamMessage createStreamMessage() throws JMSException;
/**
* Creates a new {@link TextMessage}
* @return the create TextMessage
*
* @throws JMSException
*/
public TextMessage createTextMessage() throws JMSException;
/**
* Creates a new {@link TextMessage}
*
* @param text
* @return the create TextMessage
* @throws JMSException
*/
public TextMessage createTextMessage(String text) throws JMSException;
/**
* Create a named Destination
* @param name
* @return the Destinatiion
* @throws JMSException
*/
public Destination createDestination(String name) throws JMSException;
/**
* wait until a the cardimality of the cluster is reaches the expected count. This method will return false if the
* cluster isn't started or stopped while waiting
*
* @param expectedCount the number of expected members of a cluster
* @param timeout timeout in milliseconds
* @return true if the cluster is fully connected
* @throws InterruptedException
*/
boolean waitForClusterToComplete(int expectedCount, long timeout) throws InterruptedException;
}

View File

@ -1,150 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/**
* A cluster event
*
* @version $Revision: 1.3 $
*/
public class ClusterEvent implements Externalizable {
private static final long serialVersionUID=-4103732679231950873L;
/**
* A node has joined the cluster
*/
public static final int ADD_NODE = 1;
/**
* existing node has updated it's state
*/
public static final int UPDATE_NODE = 2;
/**
* A node has left the Cluster
*/
public static final int REMOVE_NODE = 3;
/**
* A node has failed due to a system/network error
*/
public static final int FAILED_NODE = 4;
/**
* this node has been elected Coordinator
*/
public static final int ELECTED_COORDINATOR = 5;
private transient Cluster cluster;
private Node node;
private int type;
/**
* empty constructor
*/
public ClusterEvent() {
}
/**
* @param source
* @param node
* @param type
*/
public ClusterEvent(Cluster source, Node node, int type) {
this.cluster = source;
this.node = node;
this.type = type;
}
/**
* @return the Cluster
*/
public Cluster getCluster() {
return cluster;
}
/**
* set the cluster
* @param source
*/
public void setCluster(Cluster source){
this.cluster = source;
}
/**
* @return the node
*/
public Node getNode() {
return node;
}
/**
* @return the type of event
*/
public int getType() {
return type;
}
/**
* @return pretty type
*/
public String toString() {
return "ClusterEvent[" + getTypeAsString() + " : " + node + "]";
}
/**
* dump on to a stream
*
* @param out
* @throws IOException
*/
public void writeExternal(ObjectOutput out) throws IOException {
out.writeByte(type);
out.writeObject(node);
}
/**
* read from stream
*
* @param in
* @throws IOException
* @throws ClassNotFoundException
*/
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
type = in.readByte();
node = (Node) in.readObject();
}
private String getTypeAsString() {
String result = "unknown type";
if (type == ADD_NODE) {
result = "ADD_NODE";
}
else if (type == REMOVE_NODE) {
result = "REMOVE_NODE";
}
else if (type == UPDATE_NODE) {
result = "UPDATE_NODE";
}
else if (type == FAILED_NODE) {
result = "FAILED_NODE";
}
return result;
}
}

View File

@ -1,28 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster;
/**
* Represents a Cluster related exception
*
* @version $Revision: 1.2 $
*/
public class ClusterException extends Exception {
}

View File

@ -1,94 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster;
import javax.jms.Destination;
import javax.jms.JMSException;
/**
* A Factory of Cluster instances
*
* @version $Revision: 1.3 $
*/
public interface ClusterFactory {
/**
* Creates a new cluster connection using the given local name and destination name
* @param localName
* @param destinationName
*
* @return Cluster
* @throws JMSException
*/
public Cluster createCluster(String localName,String destinationName) throws JMSException;
/**
* Creates a new cluster connection using the given local name and destination name
* @param localName
* @param destinationName
* @param marshaller
*
* @return Cluster
* @throws JMSException
*/
public Cluster createCluster(String localName,String destinationName,DestinationMarshaller marshaller) throws JMSException;
/**
* Creates a new cluster connection - generating the localName automatically
* @param destinationName
* @return the Cluster
* @throws JMSException
*/
public Cluster createCluster(String destinationName) throws JMSException;
/**
* Creates a new cluster connection using the given local name and destination name
* @param localName
* @param destination
*
* @return Cluster
* @throws JMSException
*/
public Cluster createCluster(String localName,Destination destination) throws JMSException;
/**
* Creates a new cluster connection using the given local name and destination name
* @param localName
* @param destination
* @param marshaller
*
* @return Cluster
* @throws JMSException
*/
public Cluster createCluster(String localName,Destination destination, DestinationMarshaller marshaller) throws JMSException;
/**
* Creates a new cluster connection - generating the localName automatically
* @param destination
* @return the Cluster
* @throws JMSException
*/
public Cluster createCluster(Destination destination) throws JMSException;
}

View File

@ -1,64 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster;
import java.util.EventListener;
/**
* Listener to events occuring on the cluster
*
* @version $Revision: 1.2 $
*/
public interface ClusterListener extends EventListener {
/**
* A new node has been added
*
* @param event
*/
public void onNodeAdd(ClusterEvent event);
/**
* A node has updated its state
*
* @param event
*/
public void onNodeUpdate(ClusterEvent event);
/**
* A node has been removed (a clean shutdown)
*
* @param event
*/
public void onNodeRemoved(ClusterEvent event);
/**
* A node has failed due to process or network failure
*
* @param event
*/
public void onNodeFailed(ClusterEvent event);
/**
* An election has occurred and a new coordinator has been selected
* @param event
*/
public void onCoordinatorChanged(ClusterEvent event);
}

View File

@ -1,46 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster;
import javax.jms.Destination;
import javax.jms.JMSException;
/**
* A simple marshaller for Destinations
*
* @version $Revision: 1.5 $
*/
public interface DestinationMarshaller {
/**
* Builds a destination from a destinationName
* @param destinationName
*
* @return the destination to send messages to all members of the cluster
*/
public Destination getDestination(String destinationName) throws JMSException;
/**
* Gets a destination's physical name
* @param destination
* @return the destination's physical name
*/
public String getDestinationName(Destination destination);
}

View File

@ -1,38 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster;
import javax.jms.JMSException;
import java.util.Map;
/**
* Represents the local (in process) node
*
* @version $Revision: 1.2 $
*/
public interface LocalNode extends Node {
/**
* Allows the local state to be modified, which will
* be replicated asynchronously around the cluster
* @param state
* @throws JMSException
*/
public void setState(Map state) throws JMSException;
}

View File

@ -1,59 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster;
import java.util.Map;
import javax.jms.Destination;
/**
* Represents a node member in a cluster
*
* @version $Revision: 1.3 $
*/
public interface Node {
/**
* Access to the queue to send messages direct to this node.
*
* @return the destination to send messages to this node while its available
*/
public Destination getDestination();
/**
* @return an immutable map of the nodes state
*/
public Map getState();
/**
* @return the name of the node
*/
public String getName();
/**
* @return true if this node has been elected as coordinator
*/
public boolean isCoordinator();
/**
* Returns the Zone of this node - typically the DMZ zone or the subnet on which the
* node is on
*/
public Object getZone();
}

View File

@ -1,42 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster;
import javax.jms.JMSException;
/**
* <p><code>Service</code> represents some service of some kind with a simple start/stop lifecycle.</p>
*
* @version $Revision: 1.2 $
*/
public interface Service {
/**
* Called to start the service
* @throws JMSException
*/
public void start() throws JMSException;
/**
* Called to shutdown the service
* @throws JMSException
*/
public void stop() throws JMSException;
}

View File

@ -1,40 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.election;
import javax.jms.JMSException;
import org.apache.activecluster.Cluster;
import org.apache.activecluster.Node;
/**
* <p><code>Service</code> Used by the Cluster to elect a coordinator.</p>
*
* @version $Revision: 1.2 $
*/
public interface ElectionStrategy {
/**
* Elect a coordinator.
* @param cluster
* @return the elected Node
* @throws JMSException
*/
public Node doElection(Cluster cluster) throws JMSException;
}

View File

@ -1,58 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.election.impl;
import org.apache.activecluster.Cluster;
import org.apache.activecluster.Node;
import org.apache.activecluster.election.ElectionStrategy;
import javax.jms.JMSException;
import java.util.Iterator;
import java.util.Map;
/**
* <p><code>BullyElectionStrategy</code> Use a simple bully algorithm to elect a coordinator.
* the member with the lowest lexicographical name is choosen</p>
*
* @version $Revision: 1.2 $
*/
public class BullyElectionStrategy implements ElectionStrategy {
/**
* Elect a coordinator.
*
* @param cluster
* @return the elected Node
* @throws JMSException
*/
public Node doElection(Cluster cluster) throws JMSException {
Node elect = cluster.getLocalNode();
Map nodes = cluster.getNodes();
for (Iterator i = nodes.values().iterator(); i.hasNext();) {
Node node = (Node) i.next();
if (elect.getName().compareTo(node.getName()) < 0) {
elect = node;
}
}
return elect;
}
}

View File

@ -1,46 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.group;
import org.apache.activecluster.Node;
/**
* A kind of {@link GroupModel} in which every {@link Node} has its
* own {@link Group} and other nodes in the cluster act as buddies (slaves)
*
* @version $Revision: 1.2 $
*/
public class BuddyGroupModel extends GroupModel {
public synchronized void addNode(Node node) {
Group group = makeNewGroup(node);
if (group == null) {
if (!addToExistingGroup(node)) {
addToUnusedNodes(node);
}
}
else {
// now lets try choose some existing nodes to add as buddy's
tryToFillGroupWithBuddies(group);
// now that the group may well be filled, add it to the collections
addGroup(group);
}
}
}

View File

@ -1,114 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.group;
import org.apache.activecluster.Node;
import java.util.ArrayList;
import java.util.List;
/**
* Represents a logical group of nodes in a cluster,
* such as a Master and a number of Slaves which operate as a
* logical unit.
* <p/>
* A cluster can be divided into a single group, or many groups
* depending on the policy required.
* <p/>
* The number of groups could be application defined; created on demand
* or there could even be one group for each node, with other nodes acting
* as buddy nodes in each nodes' group (i.e. each node is a master with N
* buddies/slaves)
*
* @version $Revision: 1.2 $
*/
public class Group {
private int minimumMemberCount;
private int maximumMemberCount;
private List members = new ArrayList();
private int memberCount;
public Group() {
}
public Group(int minimumMemberCount, int maximumMemberCount) {
this.minimumMemberCount = minimumMemberCount;
this.maximumMemberCount = maximumMemberCount;
}
public synchronized List getMembers() {
return new ArrayList(members);
}
/**
* Adds a node to the given group
*
* @return the index of the node in the group (0 = master, 1..N = slave)
*/
public synchronized int addMember(Node node) {
int index = members.indexOf(node);
if (index >= 0) {
return index;
}
members.add(node);
return memberCount++;
}
public synchronized boolean removeMember(Node node) {
boolean answer = members.remove(node);
if (answer) {
memberCount--;
}
return answer;
}
/**
* Returns true if the group is usable, that it has enough members to be used.
*/
public boolean isUsable() {
return memberCount >= minimumMemberCount;
}
/**
* Returns true if the group cannot accept any more new members
*/
public boolean isFull() {
return memberCount >= maximumMemberCount;
}
public int getMemberCount() {
return memberCount;
}
public int getMaximumMemberCount() {
return maximumMemberCount;
}
public void setMaximumMemberCount(int maximumMemberCount) {
this.maximumMemberCount = maximumMemberCount;
}
public int getMinimumMemberCount() {
return minimumMemberCount;
}
public void setMinimumMemberCount(int minimumMemberCount) {
this.minimumMemberCount = minimumMemberCount;
}
}

View File

@ -1,60 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.group;
import org.apache.activecluster.ClusterEvent;
import org.apache.activecluster.ClusterListener;
/**
* A {@link ClusterListener} which maintains a {@link GroupModel} implementation
*
* @version $Revision: 1.2 $
*/
public class GroupClusterListener implements ClusterListener {
private GroupModel model;
public GroupClusterListener(GroupModel model) {
this.model = model;
}
// Properties
//-------------------------------------------------------------------------
public GroupModel getModel() {
return model;
}
// ClusterListener interface
//-------------------------------------------------------------------------
public void onNodeAdd(ClusterEvent event) {
model.addNode(event.getNode());
}
public void onNodeUpdate(ClusterEvent event) {
}
public void onNodeRemoved(ClusterEvent event) {
model.removeNode(event.getNode());
}
public void onNodeFailed(ClusterEvent event) {
model.removeNode(event.getNode());
}
public void onCoordinatorChanged(ClusterEvent event) {
}
}

View File

@ -1,330 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.group;
import org.apache.activecluster.Node;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* Represents a collection of zero or more groups in a cluster.
* The default implementation will create groups as nodes are added to the cluster; filling
* the groups with its required number of buddies / slaves until a new group can be created.
* <p/>
* Nodes which are not allowed to be master nodes will be kept around in a pool ready to be added
* as slaves when a new master arrives and forces the creation of a group.
*
* @version $Revision: 1.2 $
* @see Group
*/
public class GroupModel {
private int maximumGroups = -1;
private int minimumMemberCount = 2;
private int maximumMemberCount = 3;
private List groups = new ArrayList();
private LinkedList incompleteGroups = new LinkedList();
private LinkedList completeGroups = new LinkedList();
private LinkedList fullGroups = new LinkedList();
private LinkedList unusedNodes = new LinkedList();
private NodeFilter masterFilter;
private Map nodeMemberships = new HashMap();
// allow a node to be a master and 2 buddies
private int maximumWeighting = 10;
/**
* Adds the new node to this group model; we assume the node has not been added before.
*
* @param node
*/
public synchronized void addNode(Node node) {
if (!addToExistingGroup(node)) {
Group group = makeNewGroup(node);
if (group == null) {
addToUnusedNodes(node);
}
else {
addGroup(group);
}
}
}
/**
* Removes the node from the group model
*
* @param node
*/
public synchronized void removeNode(Node node) {
unusedNodes.remove(node);
// lets remove the node from each group
for (Iterator iter = groups.iterator(); iter.hasNext();) {
Group group = (Group) iter.next();
boolean wasFull = group.isFull();
boolean wasUsable = group.isUsable();
if (removeNodeFromGroup(group, node)) {
updateGroupCollections(group, wasFull, wasUsable);
}
}
}
// Properties
//-------------------------------------------------------------------------
/**
* Returns a snapshot of the groups currently available
*/
public synchronized List getGroups() {
return new ArrayList(groups);
}
public NodeFilter getMasterFilter() {
return masterFilter;
}
public void setMasterFilter(NodeFilter masterFilter) {
this.masterFilter = masterFilter;
}
public int getMaximumGroups() {
return maximumGroups;
}
public void setMaximumGroups(int maximumGroups) {
this.maximumGroups = maximumGroups;
}
public int getMaximumMemberCount() {
return maximumMemberCount;
}
public void setMaximumMemberCount(int maximumMemberCount) {
this.maximumMemberCount = maximumMemberCount;
}
public int getMinimumMemberCount() {
return minimumMemberCount;
}
public void setMinimumMemberCount(int minimumMemberCount) {
this.minimumMemberCount = minimumMemberCount;
}
public int getMaximumWeighting() {
return maximumWeighting;
}
public void setMaximumWeighting(int maximumWeighting) {
this.maximumWeighting = maximumWeighting;
}
// Implementation methods
//-------------------------------------------------------------------------
/**
* Attempt to make a new group with the current node as the master
* or if the node cannot be a master node
*
* @return the newly created group or false if none was created.
*/
protected Group makeNewGroup(Node node) {
// no pending groups available so lets try and create a new group
if (canCreateGroup(node)) {
Group group = createGroup(node);
addNodeToGroup(group, node);
return group;
}
else {
return null;
}
}
protected void tryToFillGroupWithBuddies(Group group) {
boolean continueFillingGroups = true;
while (!group.isUsable() && continueFillingGroups) {
continueFillingGroups = tryToAddBuddy(group);
}
if (continueFillingGroups) {
// lets try fill more unfilled nodes
for (Iterator iter = new ArrayList(incompleteGroups).iterator(); iter.hasNext() && continueFillingGroups;) {
group = (Group) iter.next();
boolean wasFull = group.isFull();
boolean wasUsable = group.isUsable();
while (!group.isUsable() && continueFillingGroups) {
continueFillingGroups = tryToAddBuddy(group);
}
if (group.isUsable()) {
updateGroupCollections(group, wasFull, wasUsable);
}
}
}
}
protected boolean tryToAddBuddy(Group group) {
boolean continueFillingGroups = true;
// TODO we could make this much faster using a weighting-sorted collection
NodeMemberships lowest = null;
int lowestWeight = 0;
for (Iterator iter = nodeMemberships.values().iterator(); iter.hasNext();) {
NodeMemberships memberships = (NodeMemberships) iter.next();
if (!memberships.isMember(group)) {
int weighting = memberships.getWeighting();
if ((lowest == null || weighting < lowestWeight) && weighting < maximumWeighting) {
lowest = memberships;
lowestWeight = weighting;
}
}
}
if (lowest == null) {
continueFillingGroups = false;
}
else {
addNodeToGroup(group, lowest.getNode());
}
return continueFillingGroups;
}
/**
* Lets move the group from its current state collection to the new collection if its
* state has changed
*/
protected void updateGroupCollections(Group group, boolean wasFull, boolean wasUsable) {
boolean full = group.isFull();
if (wasFull && !full) {
fullGroups.remove(group);
}
boolean usable = group.isUsable();
if (wasUsable && !usable) {
completeGroups.remove(group);
}
if ((!usable || !full) && (wasFull || wasUsable)) {
incompleteGroups.add(group);
}
}
protected void addToUnusedNodes(Node node) {
// lets add the node to the pool ready to be used if a node fails
unusedNodes.add(node);
}
/**
* Attempts to add the node to an incomplete group, or
* a not-full group and returns true if its possible - else returns false
*
* @return true if the node has been added to a groupu
*/
protected boolean addToExistingGroup(Node node) {
if (!addToIncompleteGroup(node)) {
if (!addToNotFullGroup(node)) {
return false;
}
}
return true;
}
protected boolean addToNotFullGroup(Node node) {
return addToPendingGroup(completeGroups, node);
}
protected boolean addToIncompleteGroup(Node node) {
return addToPendingGroup(incompleteGroups, node);
}
/**
* Adds the given node to the first pending group if possible
*
* @return true if the node was added to the first available group
*/
protected boolean addToPendingGroup(LinkedList list, Node node) {
if (!list.isEmpty()) {
Group group = (Group) list.getFirst();
addNodeToGroup(group, node);
if (group.isFull()) {
list.removeFirst();
fullGroups.add(group);
}
else if (group.isUsable()) {
list.removeFirst();
completeGroups.add(group);
}
return true;
}
return false;
}
protected void addNodeToGroup(Group group, Node node) {
NodeMemberships memberships = (NodeMemberships) nodeMemberships.get(node);
if (memberships == null) {
memberships = new NodeMemberships(node);
nodeMemberships.put(node, memberships);
}
memberships.addToGroup(group);
}
protected boolean removeNodeFromGroup(Group group, Node node) {
NodeMemberships memberships = (NodeMemberships) nodeMemberships.get(node);
if (memberships != null) {
return memberships.removeFromGroup(group);
}
return false;
}
protected void addGroup(Group group) {
groups.add(group);
if (group.isFull()) {
fullGroups.add(group);
}
else if (group.isUsable()) {
completeGroups.add(group);
}
else {
incompleteGroups.add(group);
}
}
protected Group createGroup(Node node) {
return new Group(minimumMemberCount, maximumMemberCount);
}
/**
* Returns true if we can add a new group to the cluster
*/
protected boolean canCreateGroup(Node node) {
return (maximumGroups < 0 || groups.size() < maximumGroups) && canBeMaster(node);
}
/**
* Returns true if the given node can be a master
*/
protected boolean canBeMaster(Node node) {
return masterFilter == null || masterFilter.evaluate(node);
}
}

View File

@ -1,41 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.group;
import org.apache.activecluster.Node;
import java.util.List;
/**
* A filter configured with a list of DMZ zones on which to restrict which nodes
* are allowed to be master nodes.
*
* @version $Revision: 1.2 $
*/
public class MasterZoneFilter implements NodeFilter {
private List zones;
public MasterZoneFilter(List zones) {
this.zones = zones;
}
public boolean evaluate(Node node) {
Object zone = node.getZone();
return zones.contains(zone);
}
}

View File

@ -1,64 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.group;
/**
* Represents the membership of a Group for a Node
*
* @version $Revision: 1.2 $
*/
public class Membership {
public static final int STATUS_REQUESTED = 1;
public static final int STATUS_SYNCHONIZING = 2;
public static final int STATUS_FAILED = 3;
public static final int STATUS_OK = 4;
private Group group;
private int index;
private int status = STATUS_REQUESTED;
public Membership(Group group, int index) {
this.group = group;
this.index = index;
}
public int getIndex() {
return index;
}
public void setIndex(int index) {
this.index = index;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
/**
* @return the weighting of this membership
*/
public int getWeighting() {
// lets make master heavy and the further from the end of the
// list of slaves, the lighter we become
return group.getMaximumMemberCount() - getIndex();
}
}

View File

@ -1,34 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.group;
import org.apache.activecluster.Node;
/**
* Represents a filter on a Node to allow a pluggable
* Strategy Pattern to decide which nodes can be master nodes etc.
*
* @version $Revision: 1.2 $
*/
public interface NodeFilter {
/**
* Returns true if the given node matches the filter
*/
public boolean evaluate(Node node);
}

View File

@ -1,77 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.group;
import org.apache.activecluster.Node;
import java.util.HashMap;
import java.util.Map;
/**
* Represents all of the memberhips of a node and can be used to act
* as a weighting to decide which is the least heavily loaded Node
* to be assigned to a buddy group.
*
* @version $Revision: 1.2 $
*/
public class NodeMemberships {
private Node node;
private Map memberships = new HashMap();
private int weighting;
public NodeMemberships(Node node) {
this.node = node;
}
public void addToGroup(Group group) {
if (!isMember(group)) {
int index = group.addMember(node);
Membership membership = new Membership(group, index);
memberships.put(group, membership);
weighting += membership.getWeighting();
}
}
public boolean removeFromGroup(Group group) {
// TODO when we remove a node from a group, we need to reweight the
// other nodes in the group
memberships.remove(group);
return group.removeMember(node);
}
public Node getNode() {
return node;
}
/**
* Returns the weighting of how heavily loaded the node is
* so that a decision can be made on which node to buddy group
* with
*/
public int getWeighting() {
return weighting;
}
/**
* Returns true if this node is a member of the given group
*/
public boolean isMember(Group group) {
return memberships.containsKey(group);
}
}

View File

@ -1,26 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
Contains Group Organsisation models and policies for arranging {@link org.apache.activecluster.Node} instances into
groups, such as buddy-groups (failover nodes) or master/slave groups for High Availability (HA) protocols.
</body>
</html>

View File

@ -1,61 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.impl;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activecluster.impl.DefaultClusterFactory;
/**
* An implementation of {@link org.apache.activecluster.ClusterFactory} using
* <a href="http://activemq.codehaus.org/">ActiveMQ</a>
*
* @version $Revision: 1.4 $
*/
public class ActiveMQClusterFactory extends DefaultClusterFactory {
public static String DEFAULT_CLUSTER_URL = "peer://org.apache.activecluster?persistent=false";
public ActiveMQClusterFactory() {
super(new ActiveMQConnectionFactory(DEFAULT_CLUSTER_URL));
}
public ActiveMQClusterFactory(String brokerURL) {
super(new ActiveMQConnectionFactory(brokerURL));
}
public ActiveMQClusterFactory(ActiveMQConnectionFactory connectionFactory) {
super(connectionFactory);
}
public ActiveMQClusterFactory(boolean transacted, int acknowledgeMode, String dataTopicPrefix, long inactiveTime) {
super(new ActiveMQConnectionFactory(DEFAULT_CLUSTER_URL), transacted, acknowledgeMode, dataTopicPrefix, inactiveTime);
}
public ActiveMQClusterFactory(ActiveMQConnectionFactory connectionFactory, boolean transacted, int acknowledgeMode, String dataTopicPrefix, long inactiveTime) {
super(connectionFactory, transacted, acknowledgeMode, dataTopicPrefix, inactiveTime);
}
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
return (ActiveMQConnectionFactory) getConnectionFactory();
}
public void setActiveMQConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
setConnectionFactory(connectionFactory);
}
}

View File

@ -1,233 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster.impl;
import java.io.Serializable;
import java.util.Map;
import java.util.Timer;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activecluster.Cluster;
import org.apache.activecluster.ClusterListener;
import org.apache.activecluster.DestinationMarshaller;
import org.apache.activecluster.LocalNode;
import org.apache.activecluster.Service;
import org.apache.activecluster.election.ElectionStrategy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.*;
/**
* A default implementation of ActiveCluster which uses standard JMS operations
*
* @version $Revision: 1.6 $
*/
public class DefaultCluster implements Cluster {
private final static Log log = LogFactory.getLog(DefaultCluster.class);
private StateServiceImpl stateService;
private LocalNode localNode;
private Destination destination;
private Connection connection;
private Session session;
private MessageProducer producer;
private MessageConsumer consumer;
private Timer timer;
private DestinationMarshaller marshaller;
private AtomicBoolean started = new AtomicBoolean(false);
private Object clusterLock = new Object();
/**
* Construct this beast
* @param localNode
* @param dataDestination
* @param destination
* @param marshaller
* @param connection
* @param session
* @param producer
* @param timer
* @param inactiveTime
* @throws JMSException
*/
public DefaultCluster(final LocalNode localNode,Destination dataDestination,Destination destination,
DestinationMarshaller marshaller,Connection connection,Session session,MessageProducer producer,
Timer timer,long inactiveTime) throws JMSException{
this.localNode=localNode;
this.destination=destination;
this.marshaller=marshaller;
this.connection=connection;
this.session=session;
this.producer=producer;
this.timer=timer;
if(producer==null){
throw new IllegalArgumentException("No producer specified!");
}
// now lets subscribe the service to the updates from the data topic
consumer=session.createConsumer(dataDestination,null,true);
log.info("Creating data consumer on topic: "+dataDestination);
this.stateService=new StateServiceImpl(this,clusterLock,new Runnable(){
public void run(){
if(localNode instanceof ReplicatedLocalNode){
((ReplicatedLocalNode) localNode).pingRemoteNodes();
}
}
},timer,inactiveTime);
consumer.setMessageListener(new StateConsumer(stateService,marshaller));
}
public void addClusterListener(ClusterListener listener) {
stateService.addClusterListener(listener);
}
public void removeClusterListener(ClusterListener listener) {
stateService.removeClusterListener(listener);
}
public Destination getDestination() {
return destination;
}
public LocalNode getLocalNode() {
return localNode;
}
public Map getNodes() {
return stateService.getNodes();
}
public void setElectionStrategy(ElectionStrategy strategy) {
stateService.setElectionStrategy(strategy);
}
public void send(Destination replyTo, Message message) throws JMSException{
producer.send(replyTo,message);
}
public MessageConsumer createConsumer(Destination destination) throws JMSException {
return getSession().createConsumer(destination);
}
public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
return getSession().createConsumer(destination, selector);
}
public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
return getSession().createConsumer(destination, selector, noLocal);
}
public Message createMessage() throws JMSException {
return getSession().createMessage();
}
public BytesMessage createBytesMessage() throws JMSException {
return getSession().createBytesMessage();
}
public MapMessage createMapMessage() throws JMSException {
return getSession().createMapMessage();
}
public ObjectMessage createObjectMessage() throws JMSException {
return getSession().createObjectMessage();
}
public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
return getSession().createObjectMessage(object);
}
public StreamMessage createStreamMessage() throws JMSException {
return getSession().createStreamMessage();
}
public TextMessage createTextMessage() throws JMSException {
return getSession().createTextMessage();
}
public TextMessage createTextMessage(String text) throws JMSException {
return getSession().createTextMessage(text);
}
public void start() throws JMSException {
if (started.compareAndSet(false, true)) {
connection.start();
}
}
public void stop() throws JMSException {
try {
if (localNode instanceof Service) {
((Service) localNode).stop();
}
timer.cancel();
session.close();
connection.stop();
connection.close();
}
finally {
connection = null;
session = null;
}
}
public boolean waitForClusterToComplete(int expectedCount, long timeout) throws InterruptedException {
timeout = timeout > 0 ? timeout : Long.MAX_VALUE;
long increment = 500;
increment = increment < timeout ? increment : timeout;
long waitTime = timeout;
long start = System.currentTimeMillis();
synchronized (clusterLock) {
while (stateService.getNodes().size() < expectedCount && started.get() && waitTime > 0) {
clusterLock.wait(increment);
waitTime = timeout - (System.currentTimeMillis() - start);
}
}
return stateService.getNodes().size() >= expectedCount;
}
protected Session getSession() throws JMSException {
if (session == null) {
throw new JMSException("Cannot perform operation, this cluster connection is now closed");
}
return session;
}
/**
* Create a named Destination
* @param name
* @return the Destinatiion
* @throws JMSException
*/
public Destination createDestination(String name) throws JMSException{
Destination result = getSession().createTopic(name);
return result;
}
}

View File

@ -1,196 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.impl;
import java.util.Timer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.util.IdGenerator;
import org.apache.activecluster.Cluster;
import org.apache.activecluster.ClusterException;
import org.apache.activecluster.ClusterFactory;
import org.apache.activecluster.DestinationMarshaller;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A Factory of DefaultCluster instances
*
* @version $Revision: 1.4 $
*/
public class DefaultClusterFactory implements ClusterFactory {
private final static Log log = LogFactory.getLog(DefaultClusterFactory.class);
private ConnectionFactory connectionFactory;
private boolean transacted;
private int acknowledgeMode;
private String dataTopicPrefix;
private long inactiveTime;
private boolean useQueueForInbox = false;
private int deliveryMode = DeliveryMode.NON_PERSISTENT;
private IdGenerator idGenerator = new IdGenerator();
public DefaultClusterFactory(ConnectionFactory connectionFactory, boolean transacted, int acknowledgeMode, String dataTopicPrefix, long inactiveTime) {
this.connectionFactory = connectionFactory;
this.transacted = transacted;
this.acknowledgeMode = acknowledgeMode;
this.dataTopicPrefix = dataTopicPrefix;
this.inactiveTime = inactiveTime;
}
public DefaultClusterFactory(ConnectionFactory connectionFactory) {
this(connectionFactory, false, Session.AUTO_ACKNOWLEDGE, "ACTIVECLUSTER.DATA.", 6000L);
}
public Cluster createCluster(Destination groupDestination) throws JMSException {
return createCluster(idGenerator.generateId(), groupDestination);
}
public Cluster createCluster(String name,Destination groupDestination) throws JMSException {
Connection connection = getConnectionFactory().createConnection();
Session session = createSession(connection);
return createCluster(connection, session, name,groupDestination,new DefaultDestinationMarshaller(session));
}
public Cluster createCluster(String name,Destination groupDestination,DestinationMarshaller marshaller) throws JMSException {
Connection connection = getConnectionFactory().createConnection();
Session session = createSession(connection);
return createCluster(connection, session, name,groupDestination,marshaller);
}
public Cluster createCluster(String name,String groupDestinationName) throws JMSException{
Connection connection = getConnectionFactory().createConnection();
Session session = createSession(connection);
return createCluster(connection, session, name,session.createTopic(groupDestinationName),new DefaultDestinationMarshaller(session));
}
public Cluster createCluster(String name,String groupDestinationName,DestinationMarshaller marshaller) throws JMSException{
Connection connection = getConnectionFactory().createConnection();
Session session = createSession(connection);
return createCluster(connection, session, name,session.createTopic(groupDestinationName),marshaller);
}
public Cluster createCluster(String groupDestinationName) throws JMSException{
return createCluster(idGenerator.generateId(), groupDestinationName);
}
// Properties
//-------------------------------------------------------------------------
public String getDataTopicPrefix() {
return dataTopicPrefix;
}
public void setDataTopicPrefix(String dataTopicPrefix) {
this.dataTopicPrefix = dataTopicPrefix;
}
public int getAcknowledgeMode() {
return acknowledgeMode;
}
public void setAcknowledgeMode(int acknowledgeMode) {
this.acknowledgeMode = acknowledgeMode;
}
public long getInactiveTime() {
return inactiveTime;
}
public void setInactiveTime(long inactiveTime) {
this.inactiveTime = inactiveTime;
}
public boolean isTransacted() {
return transacted;
}
public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
public boolean isUseQueueForInbox() {
return useQueueForInbox;
}
public void setUseQueueForInbox(boolean useQueueForInbox) {
this.useQueueForInbox = useQueueForInbox;
}
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public int getDeliveryMode() {
return deliveryMode;
}
/**
* Sets the delivery mode of the group based producer
*/
public void setDeliveryMode(int deliveryMode) {
this.deliveryMode = deliveryMode;
}
public Cluster createCluster(Connection connection,Session session,String name,Destination groupDestination,
DestinationMarshaller marshaller) throws JMSException{
String dataDestination = dataTopicPrefix + marshaller.getDestinationName(groupDestination);
log.info("Creating cluster group producer on topic: "+groupDestination);
MessageProducer producer=createProducer(session,null);
producer.setDeliveryMode(deliveryMode);
log.info("Creating cluster data producer on data destination: "+dataDestination);
Topic dataTopic=session.createTopic(dataDestination);
MessageProducer keepAliveProducer=session.createProducer(dataTopic);
keepAliveProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
StateService serviceStub=new StateServiceStub(session,keepAliveProducer,marshaller);
Destination localInboxDestination=session.createTopic(dataDestination+"."+name);
ReplicatedLocalNode localNode=new ReplicatedLocalNode(name,localInboxDestination,serviceStub);
Timer timer=new Timer();
DefaultCluster answer=new DefaultCluster(localNode,dataTopic,groupDestination,marshaller,connection,session,
producer,timer,inactiveTime);
return answer;
}
/*
* protected Cluster createInternalCluster(Session session, Topic dataDestination) { MessageProducer producer =
* createProducer(session); return new DefaultCluster(new NonReplicatedLocalNode(), dataDestination, connection,
* session, producer); }
*/
protected MessageProducer createProducer(Session session, Topic groupDestination) throws JMSException {
return session.createProducer(groupDestination);
}
protected Session createSession(Connection connection) throws JMSException {
return connection.createSession(transacted, acknowledgeMode);
}
}

View File

@ -1,100 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster.impl;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activecluster.DestinationMarshaller;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* A simple marshaller for Destinations
*
* @version $Revision: 1.5 $
*/
public class DefaultDestinationMarshaller implements DestinationMarshaller {
private final static Log log = LogFactory.getLog(DefaultDestinationMarshaller.class);
/**
* Keep a cache of name to destination mappings for fast lookup.
*/
private final Map destinations = new ConcurrentHashMap();
/**
* The active session used to create a new Destination from a name.
*/
private final Session session;
/**
* Create a marshaller for this specific session.
* @param session the session to use when mapping destinations.
*/
public DefaultDestinationMarshaller(Session session) {
this.session = session;
}
/**
* Builds a destination from a destinationName
* @param destinationName
*
* @return the destination to send messages to all members of the cluster
*/
public Destination getDestination(String destinationName) throws JMSException {
if (!destinations.containsKey(destinationName)) {
destinations.put(destinationName, session.createTopic(destinationName));
}
return (Destination) destinations.get(destinationName);
}
/**
* Gets a destination's physical name
* @param destination
* @return the destination's physical name
*/
public String getDestinationName(Destination destination){
String result = null;
if (destination != null){
if (destination instanceof Topic){
Topic topic = (Topic) destination;
try{
result = topic.getTopicName();
}catch(JMSException e){
log.error("Failed to get topic name for " + destination,e);
}
}else{
Queue queue = (Queue) destination;
try{
result = queue.getQueueName();
}catch(JMSException e){
log.error("Failed to get queue name for " + destination,e);
}
}
}
return result;
}
}

View File

@ -1,150 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster.impl;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activecluster.DestinationMarshaller;
import org.apache.activecluster.Node;
/**
* Default implementation of a remote Node
*
* @version $Revision: 1.3 $
*/
public class NodeImpl implements Node{
private static final long serialVersionUID=-3909792803360045064L;
private String name;
private Destination destination;
protected Map state;
protected boolean coordinator;
/**
* Construct an Node from a NodeState
* @param nodeState
* @param marshaller
* @throws JMSException
*/
public NodeImpl(NodeState nodeState,DestinationMarshaller marshaller) throws JMSException{
this(nodeState.getName(),marshaller.getDestination(nodeState.getDestinationName()),nodeState.getState());
}
/**
* Allow a node to be copied for sending it as a message
*
* @param node
*/
public NodeImpl(Node node) {
this(node.getName(),node.getDestination(), node.getState());
}
/**
* Create a Node
* @param name
* @param destination
*/
public NodeImpl(String name,Destination destination) {
this(name,destination, new HashMap());
}
/**
* Create A Node
* @param name
* @param destination
* @param state
*/
public NodeImpl(String name,Destination destination, Map state) {
this.name = name;
this.destination = destination;
this.state = state;
}
/**
* @return the name of the node
*/
public String getName() {
return name;
}
/**
* @return pretty print of the node
*/
public String toString() {
return "Node[<" + name + ">destination: " + destination + " state: " + state + "]";
}
/**
* @return the destination of the node
*/
public Destination getDestination() {
return destination;
}
/**
* Get the State
* @return the State of the Node
*/
public synchronized Map getState() {
return new HashMap(state);
}
/**
* @return true if this node has been elected as coordinator
*/
public boolean isCoordinator() {
return coordinator;
}
/**
* Get the zone
* @return the Zone
*/
public Object getZone() {
return state.get("zone");
}
// Implementation methods
//-------------------------------------------------------------------------
protected synchronized void setState(Map state) {
this.state = state;
}
protected void setCoordinator(boolean value) {
coordinator = value;
}
public void writeExternal(ObjectOutput out) throws IOException{
// TODO Auto-generated method stub
}
public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
// TODO Auto-generated method stub
}
}

View File

@ -1,153 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster.impl;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Map;
import org.apache.activecluster.DestinationMarshaller;
import org.apache.activecluster.Node;
/**
* Default implementation of a remote Node
*
* @version $Revision: 1.3 $
*/
public class NodeState implements Externalizable{
private static final long serialVersionUID=-3909792803360045064L;
private String name;
private String destinationName;
protected Map state;
protected boolean coordinator;
/**
* DefaultConstructor
*
*/
public NodeState(){
}
/**
* Construct a NodeState from a Node
* @param node
* @param marshaller
*/
public NodeState(Node node, DestinationMarshaller marshaller){
this.name = node.getName();
this.destinationName = marshaller.getDestinationName(node.getDestination());
this.state = node.getState();
this.coordinator = node.isCoordinator();
}
/**
* @return pretty print of the node
*/
public String toString(){
return "NodeState[<"+name+">destinationName: "+destinationName+" state: "+state+"]";
}
/**
* @return Returns the coordinator.
*/
public boolean isCoordinator(){
return coordinator;
}
/**
* @param coordinator
* The coordinator to set.
*/
public void setCoordinator(boolean coordinator){
this.coordinator=coordinator;
}
/**
* @return Returns the destinationName.
*/
public String getDestinationName(){
return destinationName;
}
/**
* @param destinationName
* The destinationName to set.
*/
public void setDestinationName(String destinationName){
this.destinationName=destinationName;
}
/**
* @return Returns the name.
*/
public String getName(){
return name;
}
/**
* @param name
* The name to set.
*/
public void setName(String name){
this.name=name;
}
/**
* @return Returns the state.
*/
public Map getState(){
return state;
}
/**
* @param state
* The state to set.
*/
public void setState(Map state){
this.state=state;
}
/**
* write to a stream
*
* @param out
* @throws IOException
*/
public void writeExternal(ObjectOutput out) throws IOException{
out.writeUTF((name!=null?name:""));
out.writeUTF((destinationName!=null?destinationName:""));
out.writeBoolean(coordinator);
out.writeObject(state);
}
/**
* read from a stream
*
* @param in
* @throws IOException
* @throws ClassNotFoundException
*/
public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
this.name=in.readUTF();
this.destinationName=in.readUTF();
this.coordinator=in.readBoolean();
this.state=(Map) in.readObject();
}
}

View File

@ -1,58 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster.impl;
import java.util.Map;
import javax.jms.Destination;
import org.apache.activecluster.LocalNode;
/**
* Default implementation of a local Node which doesn't
* have its state replicated
*
* @version $Revision: 1.4 $
*/
public class NonReplicatedLocalNode extends NodeImpl implements LocalNode {
private static final long serialVersionUID=2525565639637967143L;
/**
* Create a Non-replicated local node
* @param name
* @param destination
*/
public NonReplicatedLocalNode(String name, Destination destination) {
super(name,destination);
}
/**
* Set the local state
* @param state
*/
public void setState(Map state) {
super.setState(state);
}
/**
* Shouldn't be called for non-replicated local nodes
*/
public void pingRemoteNodes() {
throw new RuntimeException("Non-Replicated Local Node should not distribute it's state!");
}
}

View File

@ -1,84 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster.impl;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activecluster.LocalNode;
import org.apache.activecluster.Service;
/**
* Default implementation of a local Node which has its
* state replicated across the cluster
*
* @version $Revision: 1.3 $
*/
public class ReplicatedLocalNode extends NodeImpl implements LocalNode, Service {
/**
*
*/
private static final long serialVersionUID=4626381612145333540L;
private transient StateService serviceStub;
/**
* Create ReplicatedLocalNode
* @param name
* @param destination
* @param serviceStub
*/
public ReplicatedLocalNode(String name,Destination destination, StateService serviceStub) {
super(name,destination);
this.serviceStub = serviceStub;
}
/**
* Set the State of the local node
* @param state
*/
public void setState(Map state) {
super.setState(state);
serviceStub.keepAlive(this);
}
/**
* ping remote nodes
*
*/
public void pingRemoteNodes() {
serviceStub.keepAlive(this);
}
/**
* start (lifecycle)
* @throws JMSException
*/
public void start() throws JMSException {
}
/**
* stop (lifecycle)
* @throws JMSException
*/
public void stop() throws JMSException {
}
}

View File

@ -1,76 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster.impl;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activecluster.DestinationMarshaller;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A simple marshaller for Destinations
*
* @version $Revision: 1.5 $
*/
public class SimpleDestinationMarshaller implements DestinationMarshaller {
private final static Log log = LogFactory.getLog(SimpleDestinationMarshaller.class);
/**
* Builds a destination from a destinationName
* @param destinationName
*
* @return the destination to send messages to all members of the cluster
*/
public Destination getDestination(String destinationName){
return new ActiveMQTopic(destinationName);
}
/**
* Gets a destination's physical name
* @param destination
* @return the destination's physical name
*/
public String getDestinationName(Destination destination){
String result = null;
if (destination != null){
if (destination instanceof Topic){
Topic topic = (Topic) destination;
try{
result = topic.getTopicName();
}catch(JMSException e){
log.error("Failed to get topic name for " + destination,e);
}
}else{
Queue queue = (Queue) destination;
try{
result = queue.getQueueName();
}catch(JMSException e){
log.error("Failed to get queue name for " + destination,e);
}
}
}
return result;
}
}

View File

@ -1,78 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster.impl;
import org.apache.activecluster.DestinationMarshaller;
import org.apache.activecluster.Node;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
/**
* A JMS MessageListener which processes inbound messages and
* applies them to a StateService
*
* @version $Revision: 1.2 $
*/
public class StateConsumer implements MessageListener {
private final static Log log = LogFactory.getLog(StateConsumer.class);
private StateService stateService;
private DestinationMarshaller marshaller;
public StateConsumer(StateService stateService,DestinationMarshaller marshaller) {
if (stateService == null) {
throw new IllegalArgumentException("Must specify a valid StateService implementation");
}
this.stateService = stateService;
this.marshaller = marshaller;
}
public void onMessage(Message message) {
if (log.isDebugEnabled()) {
log.debug("Received cluster data message!: " + message);
}
if (message instanceof ObjectMessage) {
ObjectMessage objectMessage = (ObjectMessage) message;
try {
NodeState nodeState = (NodeState) objectMessage.getObject();
Node node = new NodeImpl(nodeState,marshaller);
String type = objectMessage.getJMSType();
if (type != null && type.equals("shutdown")) {
stateService.shutdown(node);
}
else {
stateService.keepAlive(node);
}
}
catch (Exception e) {
log.error("Could not extract node from message: " + e + ". Message: " + message, e);
}
}
else {
log.warn("Ignoring message: " + message);
}
}
}

View File

@ -1,42 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.impl;
import org.apache.activecluster.Node;
/**
* A client side proxy to the remove cluster
*
* @version $Revision: 1.3 $
*/
public interface StateService {
/**
* Sends a keep alive to the cluster
*
* @param node
*/
public void keepAlive(Node node);
/**
* Sends a shutdown message to the cluster
*/
public void shutdown(Node node);
}

View File

@ -1,302 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster.impl;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Map.Entry;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activecluster.Cluster;
import org.apache.activecluster.ClusterEvent;
import org.apache.activecluster.ClusterListener;
import org.apache.activecluster.Node;
import org.apache.activecluster.election.ElectionStrategy;
import org.apache.activecluster.election.impl.BullyElectionStrategy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
/**
* Represents a node list
*
* @version $Revision: 1.4 $
*/
public class StateServiceImpl implements StateService {
private final static Log log = LogFactory.getLog(StateServiceImpl.class);
private Cluster cluster;
private Object clusterLock;
private Map nodes = new ConcurrentHashMap();
private long inactiveTime;
private List listeners = new CopyOnWriteArrayList();
private Destination localDestination;
private Runnable localNodePing;
private NodeImpl coordinator;
private ElectionStrategy electionStrategy;
/**
* @param cluster
* @param clusterLock
* @param localNodePing
* @param timer
* @param inactiveTime
*/
/**
* Constructor StateServiceImpl
* @param cluster
* @param clusterLock
* @param localNodePing
* @param timer
* @param inactiveTime
*/
public StateServiceImpl(Cluster cluster, Object clusterLock, Runnable localNodePing, Timer timer, long inactiveTime) {
this.cluster = cluster;
this.clusterLock = clusterLock;
this.localDestination = cluster.getLocalNode().getDestination();
this.localNodePing = localNodePing;
this.inactiveTime = inactiveTime;
long delay = inactiveTime / 3;
timer.scheduleAtFixedRate(createTimerTask(), delay, delay);
(this.coordinator = (NodeImpl) cluster.getLocalNode()).setCoordinator(true);
this.electionStrategy = new BullyElectionStrategy();
}
/**
* @return the current election strategy
*/
public ElectionStrategy getElectionStrategy() {
return electionStrategy;
}
/**
* set the election strategy
*
* @param electionStrategy
*/
public void setElectionStrategy(ElectionStrategy electionStrategy) {
this.electionStrategy = electionStrategy;
}
/**
* Get time of since last communication
* @return length of time inactive
*/
public long getInactiveTime() {
return inactiveTime;
}
/**
* Set the time inactive
* @param inactiveTime
*/
public void setInactiveTime(long inactiveTime) {
this.inactiveTime = inactiveTime;
}
/**
* Get A Map of nodes - where key = destination, value = node
* @return map of destination/nodes
*/
public Map getNodes() {
HashMap answer = new HashMap(nodes.size());
for (Iterator iter = nodes.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next();
Object key = entry.getKey();
NodeEntry nodeEntry = (NodeEntry) entry.getValue();
answer.put(key, nodeEntry.node);
}
return answer;
}
/**
* Got a keepalive
* @param node
*/
public void keepAlive(Node node) {
Object key = node.getDestination();
if (key != null && !localDestination.equals(key)) {
NodeEntry entry = (NodeEntry) nodes.get(key);
if (entry == null) {
entry = new NodeEntry();
entry.node = node;
nodes.put(key, entry);
nodeAdded(node);
synchronized (clusterLock) {
clusterLock.notifyAll();
}
}
else {
// has the data changed
if (stateHasChanged(entry.node, node)) {
entry.node = node;
nodeUpdated(node);
}
}
// lets update the timer at which the node will be considered
// to be dead
entry.lastKeepAlive = getTimeMillis();
}
}
/**
* shutdown the node
*/
public void shutdown(Node node){
Object key=node.getDestination();
if(key!=null){
nodes.remove(key);
ClusterEvent event=new ClusterEvent(cluster,node,ClusterEvent.ADD_NODE);
for (Iterator i = listeners.iterator(); i.hasNext();){
ClusterListener listener=(ClusterListener) i.next();
listener.onNodeRemoved(event);
}
}
}
/**
* check nodes are alive
*
*/
public void checkForTimeouts() {
localNodePing.run();
long time = getTimeMillis();
for (Iterator iter = nodes.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Entry) iter.next();
NodeEntry nodeEntry = (NodeEntry) entry.getValue();
if (nodeEntry.lastKeepAlive + inactiveTime < time) {
iter.remove();
nodeFailed(nodeEntry.node);
}
}
}
public TimerTask createTimerTask() {
return new TimerTask() {
public void run() {
checkForTimeouts();
}
};
}
public void addClusterListener(ClusterListener listener) {
listeners.add(listener);
}
public void removeClusterListener(ClusterListener listener) {
listeners.remove(listener);
}
protected void nodeAdded(Node node) {
ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE);
// lets take a copy to make contention easier
Object[] array = listeners.toArray();
for (int i = 0, size = array.length; i < size; i++) {
ClusterListener listener = (ClusterListener) array[i];
listener.onNodeAdd(event);
}
doElection();
}
protected void nodeUpdated(Node node) {
ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.UPDATE_NODE);
// lets take a copy to make contention easier
Object[] array = listeners.toArray();
for (int i = 0, size = array.length; i < size; i++) {
ClusterListener listener = (ClusterListener) array[i];
listener.onNodeUpdate(event);
}
}
protected void nodeFailed(Node node) {
ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.REMOVE_NODE);
// lets take a copy to make contention easier
Object[] array = listeners.toArray();
for (int i = 0, size = array.length; i < size; i++) {
ClusterListener listener = (ClusterListener) array[i];
listener.onNodeFailed(event);
}
doElection();
}
protected void coordinatorChanged(Node node) {
ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ELECTED_COORDINATOR);
// lets take a copy to make contention easier
Object[] array = listeners.toArray();
for (int i = 0, size = array.length; i < size; i++) {
ClusterListener listener = (ClusterListener) array[i];
listener.onCoordinatorChanged(event);
}
}
protected void doElection() {
if (electionStrategy != null) {
try {
NodeImpl newElected = (NodeImpl) electionStrategy.doElection(cluster);
if (newElected != null && !newElected.equals(coordinator)) {
coordinator.setCoordinator(false);
coordinator = newElected;
coordinator.setCoordinator(true);
coordinatorChanged(coordinator);
}
}
catch (JMSException jmsEx) {
log.error("do election failed", jmsEx);
}
}
}
/**
* For performance we may wish to use a less granualar timing mechanism
* only updating the time every x millis since we're only using
* the time as a judge of when a node has not pinged for at least a few
* hundred millis etc.
*/
protected long getTimeMillis() {
return System.currentTimeMillis();
}
protected static class NodeEntry {
public Node node;
public long lastKeepAlive;
}
/**
* @return true if the node has changed state from the old in memory copy to the
* newly arrived copy
*/
protected boolean stateHasChanged(Node oldNode, Node newNode) {
Map oldState = oldNode.getState();
Map newState = newNode.getState();
if (oldState == newState) {
return false;
}
return oldState == null || newState == null || !oldState.equals(newState);
}
}

View File

@ -1,80 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster.impl;
import org.apache.activecluster.DestinationMarshaller;
import org.apache.activecluster.Node;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
/**
* A local stub for the state service which sends JMS messages
* to the cluster
*
* @version $Revision: 1.2 $
*/
public class StateServiceStub implements StateService {
private final Log log = LogFactory.getLog(getClass());
private Session session;
private MessageProducer producer;
private DestinationMarshaller marshaller;
public StateServiceStub(Session session, MessageProducer producer,DestinationMarshaller marshaller) {
this.session = session;
this.producer = producer;
this.marshaller = marshaller;
}
public void keepAlive(Node node) {
try {
if (log.isDebugEnabled()) {
log.debug("Sending cluster data message: " + node);
}
Message message = session.createObjectMessage(new NodeState(node,marshaller));
producer.send(message);
}
catch (JMSException e) {
log.error("Could not send JMS message: " + e, e);
}
}
public void shutdown(Node node) {
try {
if (log.isDebugEnabled()) {
log.debug("Sending shutdown message: " + node);
}
Message message = session.createObjectMessage(new NodeState(node,marshaller));
message.setJMSType("shutdown");
producer.send(message);
}
catch (JMSException e) {
log.error("Could not send JMS message: " + e, e);
}
}
}

View File

@ -1,27 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
<p>
Default implementation of ActiveCluster using standard JMS API to build the cluster.
</p>
</body>
</html>

View File

@ -1,31 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
<p>
ActiveCluster API for working with a simple cluster abstraction for building cluster algorithms
like buddy systems, voting, master/slave protocols, electing a controller and so forth.
</p>
Clusters communicate across a common destination (typically a Topic) but every member (node)of a
cluster has a unique name (the generation of unique names is left to the developer), and a
unique destination (which uses the unique name).
</body>
</html>

View File

@ -1,146 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import org.apache.activecluster.Cluster;
import org.apache.activecluster.ClusterEvent;
import org.apache.activecluster.ClusterException;
import org.apache.activecluster.ClusterFactory;
import org.apache.activecluster.ClusterListener;
import org.apache.activecluster.impl.ActiveMQClusterFactory;
/**
* @version $Revision: 1.2 $
*/
public class ChatDemo implements ClusterListener {
private Cluster cluster;
private String name = "unknown";
public static void main(String[] args) {
try {
ChatDemo test = new ChatDemo();
test.run();
}
catch (JMSException e) {
System.out.println("Caught: " + e);
e.printStackTrace();
Exception c = e.getLinkedException();
if (c != null) {
System.out.println("Cause: " + c);
c.printStackTrace();
}
}
catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public void run() throws Exception {
cluster = createCluster();
cluster.addClusterListener(this);
cluster.start();
System.out.println();
System.out.println();
System.out.println("Welcome to the ActiveCluster Chat Demo!");
System.out.println();
System.out.println("Enter text to talk or type");
System.out.println(" /quit to terminate the application");
System.out.println(" /name foo to change your name to be 'foo'");
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
boolean running = true;
while (running) {
String line = reader.readLine();
if (line == null || line.trim().equalsIgnoreCase("quit")) {
break;
}
else {
running = processCommand(line.trim());
}
}
stop();
}
protected boolean processCommand(String text) throws JMSException {
if (text.equals("/quit")) {
return false;
}
else {
if (text.startsWith("/name")) {
name = text.substring(5).trim();
System.out.println("* name now changed to: " + name);
}
else {
// lets talk
Map map = new HashMap();
map.put("text", text);
map.put("name", name);
cluster.getLocalNode().setState(map);
}
return true;
}
}
public void onNodeAdd(ClusterEvent event) {
System.out.println("* " + getName(event) + " has joined the room");
}
public void onNodeUpdate(ClusterEvent event) {
System.out.println(getName(event) + "> " + getText(event));
}
public void onNodeRemoved(ClusterEvent event) {
System.out.println("* " + getName(event) + " has left the room");
}
public void onNodeFailed(ClusterEvent event) {
System.out.println("* " + getName(event) + " has failed unexpectedly");
}
public void onCoordinatorChanged(ClusterEvent event){
}
protected Object getName(ClusterEvent event) {
return event.getNode().getState().get("name");
}
protected Object getText(ClusterEvent event) {
return event.getNode().getState().get("text");
}
protected void stop() throws JMSException {
cluster.stop();
}
protected Cluster createCluster() throws JMSException, ClusterException {
ClusterFactory factory = new ActiveMQClusterFactory();
return factory.createCluster("ORG.CODEHAUS.ACTIVEMQ.TEST.CLUSTER");
}
}

View File

@ -1,112 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster;
import org.apache.activecluster.Cluster;
import org.apache.activecluster.ClusterException;
import org.apache.activecluster.ClusterFactory;
import org.apache.activecluster.election.ElectionStrategy;
import org.apache.activecluster.election.impl.BullyElectionStrategy;
import org.apache.activecluster.impl.ActiveMQClusterFactory;
import org.apache.activecluster.impl.DefaultClusterFactory;
import javax.jms.JMSException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
/**
* @version $Revision: 1.2 $
*/
public class ClusterDemo {
protected Cluster cluster;
private String name;
private ElectionStrategy electionStrategy;
public static void main(String[] args) {
try {
ClusterDemo test = new ClusterDemo();
if (args.length > 0) {
test.name = args[0];
}
test.demo();
}
catch (JMSException e) {
System.out.println("Caught: " + e);
e.printStackTrace();
Exception c = e.getLinkedException();
if (c != null) {
System.out.println("Cause: " + c);
c.printStackTrace();
}
}
catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public void demo() throws Exception {
start();
cluster.addClusterListener(new TestingClusterListener(cluster));
System.out.println("Enter 'quit' to terminate");
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String line = reader.readLine();
if (line == null || line.trim().equalsIgnoreCase("quit")) {
break;
}
else {
Map map = new HashMap();
map.put("text", line);
cluster.getLocalNode().setState(map);
}
}
stop();
}
protected void start() throws JMSException, ClusterException {
cluster = createCluster();
if (name != null) {
System.out.println("Starting node: " + name);
// TODO could we do cluster.setName() ?
Map state = new HashMap();
state.put("name", name);
cluster.getLocalNode().setState(state);
}
cluster.start();
if (electionStrategy == null) {
electionStrategy = new BullyElectionStrategy();
}
electionStrategy.doElection(cluster);
}
protected void stop() throws JMSException {
cluster.stop();
}
protected Cluster createCluster() throws JMSException, ClusterException {
ClusterFactory factory = new ActiveMQClusterFactory();
return factory.createCluster("ORG.CODEHAUS.ACTIVEMQ.TEST.CLUSTER");
}
}

View File

@ -1,209 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import junit.framework.TestCase;
import org.apache.activecluster.Cluster;
import org.apache.activecluster.ClusterEvent;
import org.apache.activecluster.ClusterListener;
import org.apache.activecluster.impl.DefaultClusterFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* Test ActiveCluster, ActiveMQ, with an eye to putting WADI on top of them.
*
* @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell </a>
* @version $Revision: 1.4 $
*/
public class ClusterFunctionTest extends TestCase {
protected Log _log = LogFactory.getLog(ClusterFunctionTest.class);
public ClusterFunctionTest(String name) {
super(name);
}
protected ActiveMQConnectionFactory _connectionFactory;
protected Connection _connection;
protected DefaultClusterFactory _clusterFactory;
protected Cluster _cluster0;
protected Cluster _cluster1;
protected void setUp() throws Exception {
testResponsePassed = false;
_connectionFactory = new ActiveMQConnectionFactory("peer://cluster?persistent=false");
_clusterFactory = new DefaultClusterFactory(_connectionFactory);
_cluster0 = _clusterFactory.createCluster("ORG.CODEHAUS.WADI.TEST.CLUSTER");
_cluster1 = _clusterFactory.createCluster("ORG.CODEHAUS.WADI.TEST.CLUSTER");
_cluster0.start();
_log.info("started node0: " + _cluster0.getLocalNode().getDestination());
_cluster1.start();
_log.info("started node1: " + _cluster1.getLocalNode().getDestination());
}
protected void tearDown() throws JMSException {
// _cluster1.stop();
_cluster1 = null;
// _cluster0.stop();
_cluster0 = null;
_clusterFactory = null;
// _connection.stop();
_connection = null;
// _connectionFactory.stop();
}
//----------------------------------------
class MyClusterListener implements ClusterListener {
public void onNodeAdd(ClusterEvent ce) {
_log.info("node added: " + ce.getNode());
}
public void onNodeFailed(ClusterEvent ce) {
_log.info("node failed: " + ce.getNode());
}
public void onNodeRemoved(ClusterEvent ce) {
_log.info("node removed: " + ce.getNode());
}
public void onNodeUpdate(ClusterEvent ce) {
_log.info("node updated: " + ce.getNode());
}
public void onCoordinatorChanged(ClusterEvent ce) {
_log.info("coordinator changed: " + ce.getNode());
}
}
public void testCluster() throws Exception {
_cluster0.addClusterListener(new MyClusterListener());
Map map = new HashMap();
map.put("text", "testing123");
_cluster0.getLocalNode().setState(map);
_log.info("nodes: " + _cluster0.getNodes());
Thread.sleep(10000);
assertTrue(true);
}
/**
* An invokable piece of work.
*/
static interface Invocation extends java.io.Serializable {
public void invoke(Cluster cluster, ObjectMessage om);
}
/**
* Listen for messages, if they contain Invocations, invoke() them.
*/
class InvocationListener implements MessageListener {
protected Cluster _cluster;
public InvocationListener(Cluster cluster) {
_cluster = cluster;
}
public void onMessage(Message message) {
_log.info("message received: " + message);
ObjectMessage om = null;
Object tmp = null;
Invocation invocation = null;
try {
if (message instanceof ObjectMessage && (om = (ObjectMessage) message) != null
&& (tmp = om.getObject()) != null && tmp instanceof Invocation
&& (invocation = (Invocation) tmp) != null) {
_log.info("invoking message on: " + _cluster.getLocalNode());
invocation.invoke(_cluster, om);
_log.info("message successfully invoked on: " + _cluster.getLocalNode());
}
else {
_log.warn("bad message: " + message);
}
}
catch (JMSException e) {
_log.warn("unexpected problem", e);
}
}
}
/**
* A request for a piece of work which involves sending a response back to the original requester.
*/
static class Request implements Invocation {
public void invoke(Cluster cluster, ObjectMessage om2) {
try {
System.out.println("request received");
ObjectMessage om = cluster.createObjectMessage();
om.setJMSReplyTo(cluster.getLocalNode().getDestination());
om.setObject(new Response());
System.out.println("sending response");
cluster.send(om2.getJMSReplyTo(), om);
System.out.println("request processed");
}
catch (JMSException e) {
System.err.println("problem sending response");
e.printStackTrace();
}
}
}
static boolean testResponsePassed = false;
/**
* A response containing a piece of work.
*/
static class Response implements Invocation {
public void invoke(Cluster cluster, ObjectMessage om) {
try {
System.out.println("response arrived from: " + om.getJMSReplyTo());
// set a flag to test later
ClusterFunctionTest.testResponsePassed = true;
System.out.println("response processed on: " + cluster.getLocalNode().getDestination());
}
catch (JMSException e) {
System.err.println("problem processing response");
}
}
}
public void testResponse() throws Exception {
MessageListener listener0 = new InvocationListener(_cluster0);
MessageListener listener1 = new InvocationListener(_cluster1);
// 1->(n-1) messages (excludes self)
_cluster0.createConsumer(_cluster0.getDestination(), null, true).setMessageListener(listener0);
// 1->1 messages
_cluster0.createConsumer(_cluster0.getLocalNode().getDestination()).setMessageListener(listener0);
// 1->(n-1) messages (excludes self)
_cluster1.createConsumer(_cluster1.getDestination(), null, true).setMessageListener(listener1);
// 1->1 messages
_cluster1.createConsumer(_cluster1.getLocalNode().getDestination()).setMessageListener(listener1);
ObjectMessage om = _cluster0.createObjectMessage();
om.setJMSReplyTo(_cluster0.getLocalNode().getDestination());
om.setObject(new Request());
testResponsePassed = false;
_cluster0.send(_cluster0.getLocalNode().getDestination(), om);
Thread.sleep(3000);
assertTrue(testResponsePassed);
_log.info("request/response between same node OK");
testResponsePassed = false;
_cluster0.send(_cluster1.getLocalNode().getDestination(), om);
Thread.sleep(3000);
assertTrue(testResponsePassed);
_log.info("request/response between two different nodes OK");
}
}

View File

@ -1,122 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster;
import java.util.List;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.Message;
import org.apache.activecluster.Cluster;
import org.apache.activecluster.LocalNode;
import org.apache.activecluster.Node;
/**
* @version $Revision: 1.4 $
*/
public class ClusterTest extends ClusterTestSupport {
protected int count = 3;
public void testCluster() throws Exception {
cluster = createCluster();
subscribeToCluster();
cluster.start();
Destination destination = cluster.getDestination();
Message message = cluster.createTextMessage("abcdef");
cluster.send(destination, message);
//clusterListener.waitForMessageToArrive();
Thread.sleep(5000);
List list = clusterListener.flushMessages();
assertEquals("Should have received a message: " + list, 1, list.size());
System.out.println("Received message: " + list.get(0));
}
public void testMembershipCluster() throws Exception {
Cluster[] clusters = new Cluster[count];
for (int i = 0; i < count; i++) {
Cluster cluster = createCluster("node:" + i);
clusters[i] = cluster;
if (i==0){
cluster.addClusterListener(new TestingClusterListener(cluster));
}
cluster.start();
System.out.println("started " + clusters[i].getLocalNode().getName());
}
System.out.println("waiting to complete ...");
for (int i = count - 1; i >= 0; i--) {
Cluster cluster = clusters[i];
String localName = cluster.getLocalNode().getName();
boolean completed = cluster.waitForClusterToComplete(count - 1, 5000);
assertTrue("Node: " + i + " with contents: " + dumpConnectedNodes(cluster.getNodes()), completed);
System.out.println(localName + " completed = " + completed + " nodes = "
+ dumpConnectedNodes(cluster.getNodes()));
}
assertClusterMembership(clusters);
assertClusterMembership(clusters);
Cluster testCluster = clusters[0];
LocalNode testNode = testCluster.getLocalNode();
String key = "key";
String value = "value";
Map map = testNode.getState();
map.put(key, value);
testNode.setState(map);
Thread.sleep(500);
for (int i = 1; i < count; i++) {
Node node = (Node) clusters[i].getNodes().get(testNode.getDestination());
assertTrue("The current test node should be in the cluster: " + i, node != null);
assertTrue(node.getState().get(key).equals(value));
}
for (int i = 0; i < count; i++) {
System.out.println(clusters[i].getLocalNode().getName() + " Is coordinator = " + clusters[i].getLocalNode().isCoordinator());
clusters[i].stop();
Thread.sleep(250);
}
}
protected void assertClusterMembership(Cluster[] clusters) {
for (int i = 0; i < count; i++) {
System.out.println("Cluster: " + i + " = " + clusters[i].getNodes());
assertEquals("Size of clusters for cluster: " + i, count - 1, clusters[i].getNodes().size());
System.out.println(clusters[i].getLocalNode().getName() + " Is coordinator = " + clusters[i].getLocalNode().isCoordinator());
}
}
}

View File

@ -1,78 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster;
import org.apache.activecluster.Cluster;
import org.apache.activecluster.Node;
import org.apache.activecluster.impl.ActiveMQClusterFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Topic;
/**
* @version $Revision: 1.3 $
*/
public abstract class ClusterTestSupport extends TestSupport {
protected Cluster cluster;
protected StubMessageListener clusterListener = new StubMessageListener();
protected StubMessageListener inboxListener = new StubMessageListener();
private MessageConsumer clusterConsumer;
private MessageConsumer inboxConsumer;
protected void sendMessageToNode(Node node, String text) throws Exception {
Message message = cluster.createTextMessage(text);
cluster.send(node.getDestination(), message);
}
protected void sendMessageToCluster(String text) throws Exception {
Message message = cluster.createTextMessage(text);
cluster.send(cluster.getDestination(), message);
}
protected void subscribeToCluster() throws Exception {
// listen to cluster messages
Destination clusterDestination = cluster.getDestination();
assertTrue("Local destination must not be null", clusterDestination != null);
clusterConsumer = cluster.createConsumer(clusterDestination);
clusterConsumer.setMessageListener(clusterListener);
// listen to inbox messages (individual messages)
Destination localDestination = cluster.getLocalNode().getDestination();
assertTrue("Local destination must not be null", localDestination != null);
System.out.println("Consuming from local destination: " + localDestination);
inboxConsumer = cluster.createConsumer(localDestination);
inboxConsumer.setMessageListener(inboxListener);
}
protected void setUp() throws Exception {
}
protected void tearDown() throws Exception {
if (cluster != null) {
cluster.stop();
}
}
}

View File

@ -1,77 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.ArrayList;
import java.util.List;
/**
* A mock message listener for testing
*
* @version $Revision: 1.2 $
*/
public class StubMessageListener implements MessageListener {
private List messages = new ArrayList();
private Object semaphore;
public StubMessageListener() {
this(new Object());
}
public StubMessageListener(Object semaphore) {
this.semaphore = semaphore;
}
/**
* @return all the messages on the list so far, clearing the buffer
*/
public synchronized List flushMessages() {
List answer = new ArrayList(messages);
messages.clear();
return answer;
}
public synchronized void onMessage(Message message) {
messages.add(message);
synchronized (semaphore) {
semaphore.notifyAll();
}
}
public void waitForMessageToArrive() {
System.out.println("Waiting for message to arrive");
long start = System.currentTimeMillis();
try {
if (messages.isEmpty()) {
synchronized (semaphore) {
semaphore.wait(4000);
}
}
}
catch (InterruptedException e) {
System.out.println("Caught: " + e);
}
long end = System.currentTimeMillis() - start;
System.out.println("End of wait for " + end + " millis");
}
}

View File

@ -1,63 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster;
import junit.framework.TestCase;
import javax.jms.JMSException;
import java.util.Iterator;
import java.util.Map;
import org.apache.activecluster.Cluster;
import org.apache.activecluster.ClusterException;
import org.apache.activecluster.ClusterFactory;
import org.apache.activecluster.Node;
import org.apache.activecluster.impl.ActiveMQClusterFactory;
/**
* @version $Revision: 1.2 $
*/
public class TestSupport extends TestCase {
protected String dumpConnectedNodes(Map nodes) {
String result = "";
for (Iterator i = nodes.values().iterator(); i.hasNext();) {
Object value = i.next();
if (value instanceof Node) {
Node node = (Node) value;
result += node.getName() + ",";
}
else {
System.out.println("Got node of type: " + value.getClass());
result += value + ",";
}
}
return result;
}
protected Cluster createCluster() throws JMSException, ClusterException {
ClusterFactory factory = new ActiveMQClusterFactory();
return factory.createCluster("ORG.CODEHAUS.ACTIVEMQ.TEST.CLUSTER");
}
protected Cluster createCluster(String name) throws JMSException, ClusterException {
ClusterFactory factory = new ActiveMQClusterFactory();
return factory.createCluster(name,"ORG.CODEHAUS.ACTIVEMQ.TEST.CLUSTER");
}
}

View File

@ -1,58 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster;
import org.apache.activecluster.Cluster;
import org.apache.activecluster.ClusterEvent;
import org.apache.activecluster.ClusterListener;
import org.apache.activecluster.impl.DefaultCluster;
/**
* @version $Revision: 1.2 $
*/
public class TestingClusterListener implements ClusterListener {
private Cluster cluster;
public TestingClusterListener(Cluster cluster){
this.cluster = cluster;
}
public void onNodeAdd(ClusterEvent event) {
printEvent("ADDED: ", event);
}
public void onNodeUpdate(ClusterEvent event) {
printEvent("UPDATED: ", event);
}
public void onNodeRemoved(ClusterEvent event) {
printEvent("REMOVED: ", event);
}
public void onNodeFailed(ClusterEvent event) {
printEvent("FAILED: ", event);
}
public void onCoordinatorChanged(ClusterEvent event) {
printEvent("COORDINATOR: ", event);
}
protected void printEvent(String text, ClusterEvent event) {
System.out.println(text + event.getNode());
System.out.println("Current cluster is now: " + cluster.getNodes().keySet());
}
}

View File

@ -1,75 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.group;
import java.util.List;
import javax.jms.JMSException;
import org.apache.activecluster.group.BuddyGroupModel;
import org.apache.activecluster.group.Group;
import org.apache.activecluster.group.GroupModel;
/**
* @version $Revision: 1.4 $
*/
public class BuddyGroupModelTest extends GroupTestSupport {
public void testGroups() throws Exception {
addNode("a");
// lets check how many groups have been created
List groups = model.getGroups();
assertEquals("number of groups: " + groups, 1, model.getGroups().size());
Group group = (Group) model.getGroups().get(0);
assertIncomplete(group);
addNode("b");
assertEquals("number of groups: " + groups, 2, model.getGroups().size());
// lets see if the first node is now complete
assertUsable(group);
group = (Group) model.getGroups().get(1);
assertUsable(group);
addNode("c");
assertEquals("number of groups: " + groups, 3, model.getGroups().size());
group = (Group) model.getGroups().get(2);
assertUsable(group);
addNode("d");
assertEquals("number of groups: " + groups, 4, model.getGroups().size());
group = (Group) model.getGroups().get(3);
assertUsable(group);
}
public void testRemoveGroups() throws JMSException {
String[] nodeNames = {"a", "b", "c"};
addNodes(nodeNames);
// TODO now lets remove the nodes and check group states..
}
protected GroupModel createGroupModel() {
return new BuddyGroupModel();
}
}

View File

@ -1,62 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activecluster.group;
import java.util.List;
import javax.jms.JMSException;
import org.apache.activecluster.group.Group;
/**
* @version $Revision: 1.4 $
*/
public class GroupModelTest extends GroupTestSupport {
public void testGroups() throws Exception {
addNode("a");
// lets check how many groups have been created
List groups = model.getGroups();
assertEquals("number of groups: " + groups, 1, model.getGroups().size());
Group group = (Group) model.getGroups().get(0);
assertIncomplete(group);
addNode("b");
assertNotFullButUsable(group);
assertEquals("number of groups: " + groups, 1, model.getGroups().size());
addNode("c");
assertFull(group);
assertEquals("number of groups: " + groups, 1, model.getGroups().size());
addNode("d");
assertEquals("number of groups: " + groups, 2, model.getGroups().size());
group = (Group) model.getGroups().get(1);
assertIncomplete(group);
}
public void testRemoveGroups() throws JMSException {
String[] nodeNames = {"a", "b", "c"};
addNodes(nodeNames);
// TODO now lets remove the nodes and check group states..
}
}

View File

@ -1,84 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activecluster.group;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import junit.framework.TestCase;
import org.apache.activecluster.Cluster;
import org.apache.activecluster.ClusterEvent;
import org.apache.activecluster.ClusterListener;
import org.apache.activecluster.DestinationMarshaller;
import org.apache.activecluster.Node;
import org.apache.activecluster.impl.NodeImpl;
import org.apache.activecluster.impl.SimpleDestinationMarshaller;
/**
* A base class for Group model testing
*
* @version $Revision: 1.5 $
*/
public abstract class GroupTestSupport extends TestCase {
protected GroupModel model;
private ClusterListener listener;
private Cluster cluster;
private Map nodes = new HashMap();
private DestinationMarshaller marshaller = new SimpleDestinationMarshaller();
protected void addNodes(String[] nodeNames) throws JMSException {
for (int i = 0; i < nodeNames.length; i++) {
String nodeName = nodeNames[i];
addNode(nodeName);
}
}
protected void addNode(String nodeName) throws JMSException {
Node node = new NodeImpl(nodeName,marshaller.getDestination(nodeName));
nodes.put(nodeName, node);
listener.onNodeAdd(new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE));
}
protected void assertFull(Group group) {
assertTrue("Group is not full and usable. Members: " + group.getMembers(), group.isFull() && group.isUsable());
}
protected void assertNotFullButUsable(Group group) {
assertTrue("Group is not not full but usable. Members: " + group.getMembers(), !group.isFull() && group.isUsable());
}
protected void assertIncomplete(Group group) {
assertTrue("Group is not not full or usable. Members: " + group.getMembers(), !group.isFull() && !group.isUsable());
}
protected void assertUsable(Group group) {
assertTrue("Group is not usable. Members: " + group.getMembers(), group.isUsable());
}
protected void setUp() throws Exception {
model = createGroupModel();
listener = new GroupClusterListener(model);
}
protected GroupModel createGroupModel() {
return new GroupModel();
}
}