This commit is contained in:
Martyn Taylor 2015-11-25 11:26:58 +00:00
commit 686e645c3a
6 changed files with 167 additions and 19 deletions

View File

@ -160,6 +160,19 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
this.refCount = 1;
this.channelName = channelName;
this.channel = channel;
//we always add this for the first ref count
channel.setReceiver(new ReceiverAdapter() {
@Override
public void receive(org.jgroups.Message msg) {
synchronized (receivers) {
for (JGroupsReceiver r : receivers) {
r.receive(msg);
}
}
}
});
}
public synchronized void close(boolean closeWrappedChannel) {
@ -171,6 +184,8 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
else {
JChannelManager.removeChannel(this.channelName);
}
//we always remove the receiver as its no longer needed
channel.setReceiver(null);
}
}
@ -183,17 +198,6 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
public synchronized void connect() throws Exception {
if (channel.isConnected())
return;
channel.setReceiver(new ReceiverAdapter() {
@Override
public void receive(org.jgroups.Message msg) {
synchronized (receivers) {
for (JGroupsReceiver r : receivers) {
r.receive(msg);
}
}
}
});
channel.connect(channelName);
}

View File

@ -81,22 +81,36 @@ public final class ConfigurationUtils {
case COLOCATED: {
ColocatedPolicyConfiguration pc = (ColocatedPolicyConfiguration) conf;
HAPolicyConfiguration backupConf = pc.getBackupConfig();
BackupPolicy backupPolicy;
if (backupConf == null) {
backupPolicy = new ReplicaPolicy();
}
else {
backupPolicy = (BackupPolicy) getHAPolicy(backupConf);
}
HAPolicyConfiguration liveConf = pc.getLiveConfig();
HAPolicy livePolicy;
//if null default to colocated
if (liveConf == null) {
livePolicy = new ReplicatedPolicy();
}
else {
livePolicy = getHAPolicy(liveConf);
}
HAPolicyConfiguration backupConf = pc.getBackupConfig();
BackupPolicy backupPolicy;
if (backupConf == null) {
if (livePolicy instanceof ReplicatedPolicy) {
backupPolicy = new ReplicaPolicy();
}
else if (livePolicy instanceof SharedStoreMasterPolicy) {
backupPolicy = new SharedStoreSlavePolicy();
}
else {
throw ActiveMQMessageBundle.BUNDLE.liveBackupMismatch();
}
}
else {
backupPolicy = (BackupPolicy) getHAPolicy(backupConf);
}
if ((livePolicy instanceof ReplicatedPolicy && !(backupPolicy instanceof ReplicaPolicy)) ||
(livePolicy instanceof SharedStoreMasterPolicy && !(backupPolicy instanceof SharedStoreSlavePolicy))) {
throw ActiveMQMessageBundle.BUNDLE.liveBackupMismatch();
}
return new ColocatedPolicy(pc.isRequestBackup(), pc.getBackupRequestRetries(), pc.getBackupRequestRetryInterval(), pc.getMaxBackups(), pc.getBackupPortOffset(), pc.getExcludedConnectors(), livePolicy, backupPolicy);
}

View File

@ -362,4 +362,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 119114, value = "Replication synchronization process timed out after waiting {0} milliseconds", format = Message.Format.MESSAGE_FORMAT)
IllegalStateException replicationSynchronizationTimeout(long timeout);
@Message(id = 119115, value = "Colocated Policy hasn't different type live and backup", format = Message.Format.MESSAGE_FORMAT)
ActiveMQIllegalStateException liveBackupMismatch();
}

View File

@ -333,6 +333,31 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
}
}
@Test
public void colocatedTestNullBackup() throws Exception {
Configuration configuration = createConfiguration("colocated-hapolicy-config-null-backup.xml");
ActiveMQServerImpl server = new ActiveMQServerImpl(configuration);
try {
server.start();
Activation activation = server.getActivation();
assertTrue(activation instanceof ColocatedActivation);
HAPolicy haPolicy = server.getHAPolicy();
assertTrue(haPolicy instanceof ColocatedPolicy);
ColocatedPolicy colocatedPolicy = (ColocatedPolicy) haPolicy;
ReplicatedPolicy livePolicy = (ReplicatedPolicy) colocatedPolicy.getLivePolicy();
assertNotNull(livePolicy);
assertEquals(livePolicy.getGroupName(), "purple");
assertTrue(livePolicy.isCheckForLiveServer());
assertEquals(livePolicy.getClusterName(), "abcdefg");
ReplicaPolicy backupPolicy = (ReplicaPolicy) colocatedPolicy.getBackupPolicy();
assertNotNull(backupPolicy);
}
finally {
server.stop();
}
}
@Test
public void colocatedTest2() throws Exception {
Configuration configuration = createConfiguration("colocated-hapolicy-config2.xml");
@ -358,6 +383,29 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
}
}
@Test
public void colocatedTest2nullbackup() throws Exception {
Configuration configuration = createConfiguration("colocated-hapolicy-config2-null-backup.xml");
ActiveMQServerImpl server = new ActiveMQServerImpl(configuration);
try {
server.start();
Activation activation = server.getActivation();
assertTrue(activation instanceof ColocatedActivation);
HAPolicy haPolicy = server.getHAPolicy();
assertTrue(haPolicy instanceof ColocatedPolicy);
ColocatedPolicy colocatedPolicy = (ColocatedPolicy) haPolicy;
SharedStoreMasterPolicy livePolicy = (SharedStoreMasterPolicy) colocatedPolicy.getLivePolicy();
assertNotNull(livePolicy);
assertFalse(livePolicy.isFailoverOnServerShutdown());
SharedStoreSlavePolicy backupPolicy = (SharedStoreSlavePolicy) colocatedPolicy.getBackupPolicy();
assertNotNull(backupPolicy);
}
finally {
server.stop();
}
}
private void liveOnlyTest(String file) throws Exception {
Configuration configuration = createConfiguration(file);
ActiveMQServerImpl server = new ActiveMQServerImpl(configuration);

View File

@ -0,0 +1,42 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration
xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq ../../../../activemq-server/src/main/resources/schema/artemis-server.xsd">
<core xmlns="urn:activemq:core">
<discovery-groups>
<discovery-group name="wahey"/>
</discovery-groups>
<ha-policy>
<replication>
<colocated>
<backup-request-retries>44</backup-request-retries>
<backup-request-retry-interval>33</backup-request-retry-interval>
<max-backups>3</max-backups>
<request-backup>false</request-backup>
<backup-port-offset>33</backup-port-offset>
<master>
<group-name>purple</group-name>
<check-for-live-server>true</check-for-live-server>
<cluster-name>abcdefg</cluster-name>
</master>
</colocated>
</replication>
</ha-policy>
</core>
</configuration>

View File

@ -0,0 +1,37 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration
xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq ../../../../activemq-server/src/main/resources/schema/artemis-server.xsd">
<core xmlns="urn:activemq:core">
<ha-policy>
<shared-store>
<colocated>
<backup-request-retries>44</backup-request-retries>
<backup-request-retry-interval>33</backup-request-retry-interval>
<max-backups>3</max-backups>
<request-backup>false</request-backup>
<backup-port-offset>33</backup-port-offset>
<master>
<failover-on-shutdown>false</failover-on-shutdown>
</master>
</colocated>
</shared-store>
</ha-policy>
</core>
</configuration>