This closes #927

This commit is contained in:
Clebert Suconic 2016-12-15 17:53:50 -05:00
commit db6ee74a33
11 changed files with 223 additions and 25 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.config;
import org.apache.activemq.artemis.ArtemisConstants; import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.RoutingType;
/** /**
@ -361,7 +362,7 @@ public final class ActiveMQDefaultConfiguration {
private static boolean DEFAULT_DIVERT_EXCLUSIVE = false; private static boolean DEFAULT_DIVERT_EXCLUSIVE = false;
// how the divert should handle the message's routing type // how the divert should handle the message's routing type
private static String DEFAULT_DIVERT_ROUTING_TYPE = RoutingType.STRIP.toString(); private static String DEFAULT_DIVERT_ROUTING_TYPE = DivertConfigurationRoutingType.STRIP.toString();
// If true then the server will request a backup on another node // If true then the server will request a backup on another node
private static boolean DEFAULT_HAPOLICY_REQUEST_BACKUP = false; private static boolean DEFAULT_HAPOLICY_REQUEST_BACKUP = false;

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
/**
* This class essentially mirrors {@code RoutingType} except it has some additional members to support special
* configuration semantics for diverts. These additional members weren't put in {@code RoutingType} so as to not
* confuse users.
*/
public enum DivertConfigurationRoutingType {
MULTICAST, ANYCAST, STRIP, PASS;
public byte getType() {
switch (this) {
case MULTICAST:
return 0;
case ANYCAST:
return 1;
case STRIP:
return 2;
case PASS:
return 3;
default:
return -1;
}
}
public static DivertConfigurationRoutingType getType(byte type) {
switch (type) {
case 0:
return MULTICAST;
case 1:
return ANYCAST;
case 2:
return STRIP;
case 3:
return PASS;
default:
return null;
}
}
}

View File

@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.server;
public enum RoutingType { public enum RoutingType {
MULTICAST, ANYCAST, STRIP, PASS; MULTICAST, ANYCAST;
public byte getType() { public byte getType() {
switch (this) { switch (this) {
@ -26,10 +26,6 @@ public enum RoutingType {
return 0; return 0;
case ANYCAST: case ANYCAST:
return 1; return 1;
case STRIP:
return 2;
case PASS:
return 3;
default: default:
return -1; return -1;
} }
@ -41,10 +37,6 @@ public enum RoutingType {
return MULTICAST; return MULTICAST;
case 1: case 1:
return ANYCAST; return ANYCAST;
case 2:
return STRIP;
case 3:
return PASS;
default: default:
return null; return null;
} }

View File

@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.config;
import java.io.Serializable; import java.io.Serializable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
public class DivertConfiguration implements Serializable { public class DivertConfiguration implements Serializable {
@ -40,7 +40,7 @@ public class DivertConfiguration implements Serializable {
private String transformerClassName = null; private String transformerClassName = null;
private RoutingType routingType = RoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType()); private DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
public DivertConfiguration() { public DivertConfiguration() {
} }
@ -73,7 +73,7 @@ public class DivertConfiguration implements Serializable {
return transformerClassName; return transformerClassName;
} }
public RoutingType getRoutingType() { public DivertConfigurationRoutingType getRoutingType() {
return routingType; return routingType;
} }
@ -140,7 +140,7 @@ public class DivertConfiguration implements Serializable {
/** /**
* @param routingType the routingType to set * @param routingType the routingType to set
*/ */
public DivertConfiguration setRoutingType(final RoutingType routingType) { public DivertConfiguration setRoutingType(final DivertConfigurationRoutingType routingType) {
this.routingType = routingType; this.routingType = routingType;
return this; return this;
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.config.impl; package org.apache.activemq.artemis.core.config.impl;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@ -180,10 +181,10 @@ public final class Validators {
@Override @Override
public void validate(final String name, final Object value) { public void validate(final String name, final Object value) {
String val = (String) value; String val = (String) value;
if (val == null || !val.equals(RoutingType.ANYCAST.toString()) && if (val == null || !val.equals(DivertConfigurationRoutingType.ANYCAST.toString()) &&
!val.equals(RoutingType.MULTICAST.toString()) && !val.equals(DivertConfigurationRoutingType.MULTICAST.toString()) &&
!val.equals(RoutingType.PASS.toString()) && !val.equals(DivertConfigurationRoutingType.PASS.toString()) &&
!val.equals(RoutingType.STRIP.toString())) { !val.equals(DivertConfigurationRoutingType.STRIP.toString())) {
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingType(val); throw ActiveMQMessageBundle.BUNDLE.invalidRoutingType(val);
} }
} }

View File

@ -60,6 +60,7 @@ import org.apache.activemq.artemis.core.config.storage.FileStorageConfiguration;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
@ -1592,7 +1593,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
String transformerClassName = getString(e, "transformer-class-name", null, Validators.NO_CHECK); String transformerClassName = getString(e, "transformer-class-name", null, Validators.NO_CHECK);
RoutingType routingType = RoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.DIVERT_ROUTING_TYPE)); DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.DIVERT_ROUTING_TYPE));
String filterString = null; String filterString = null;

View File

@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory; import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.RoutingType;
@ -1996,7 +1997,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO(); clearIO();
try { try {
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(RoutingType.valueOf(routingType)); DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(DivertConfigurationRoutingType.valueOf(routingType));
server.deployDivert(config); server.deployDivert(config);
} finally { } finally {
blockOnIO(); blockOnIO();

View File

@ -1561,9 +1561,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean temporary, final boolean temporary,
final boolean autoCreated) throws Exception { final boolean autoCreated) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(address.toString()); AddressSettings as = getAddressSettingsRepository().getMatch(address.toString());
return createQueue(address, routingType, queueName, filterString, user, durable, temporary, return createQueue(address, routingType, queueName, filterString, user, durable, temporary, autoCreated,
as.getDefaultMaxConsumers(), as.getDefaultMaxConsumers(),
as.isDefaultDeleteOnNoConsumers(), autoCreated); as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
} }
@Override @Override
@ -1725,7 +1725,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final SecurityAuth session, final SecurityAuth session,
final boolean checkConsumerCount, final boolean checkConsumerCount,
final boolean removeConsumers) throws Exception { final boolean removeConsumers) throws Exception {
destroyQueue(queueName, session, checkConsumerCount, removeConsumers, true); if (postOffice == null) {
return;
}
Binding binding = postOffice.getBinding(queueName);
if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
}
String address = binding.getAddress().toString();
destroyQueue(queueName, session, checkConsumerCount, removeConsumers, addressSettingsRepository.getMatch(address).isAutoDeleteAddresses());
} }
@Override @Override

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
@ -51,7 +52,7 @@ public class DivertImpl implements Divert {
private final StorageManager storageManager; private final StorageManager storageManager;
private final RoutingType routingType; private final DivertConfigurationRoutingType routingType;
public DivertImpl(final SimpleString forwardAddress, public DivertImpl(final SimpleString forwardAddress,
final SimpleString uniqueName, final SimpleString uniqueName,
@ -61,7 +62,7 @@ public class DivertImpl implements Divert {
final Transformer transformer, final Transformer transformer,
final PostOffice postOffice, final PostOffice postOffice,
final StorageManager storageManager, final StorageManager storageManager,
final RoutingType routingType) { final DivertConfigurationRoutingType routingType) {
this.forwardAddress = forwardAddress; this.forwardAddress = forwardAddress;
this.uniqueName = uniqueName; this.uniqueName = uniqueName;

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
public class AutoDeleteAddressTest extends ActiveMQTestBase {
public final SimpleString addressA = new SimpleString("addressA");
public final SimpleString queueA = new SimpleString("queueA");
private ServerLocator locator;
private ActiveMQServer server;
private ClientSessionFactory cf;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
locator = createInVMNonHALocator();
server = createServer(false);
server.start();
cf = createSessionFactory(locator);
}
@Test
public void testAutoDeleteAutoCreatedAddress() throws Exception {
// auto-delete-addresses defaults to true
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, true);
assertNotNull(server.getAddressInfo(addressA));
cf.createSession().createConsumer(queueA).close();
assertNull(server.getAddressInfo(addressA));
}
@Test
public void testNegativeAutoDeleteAutoCreatedAddress() throws Exception {
server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteAddresses(false));
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, true);
assertNotNull(server.getAddressInfo(addressA));
cf.createSession().createConsumer(queueA).close();
assertNotNull(server.getAddressInfo(addressA));
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
public class AutoDeleteQueueTest extends ActiveMQTestBase {
public final SimpleString addressA = new SimpleString("addressA");
public final SimpleString queueA = new SimpleString("queueA");
private ServerLocator locator;
private ActiveMQServer server;
private ClientSessionFactory cf;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
locator = createInVMNonHALocator();
server = createServer(false);
server.start();
cf = createSessionFactory(locator);
}
@Test
public void testAutoDeleteAutoCreatedQueue() throws Exception {
// auto-delete-queues defaults to true
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, true);
assertNotNull(server.locateQueue(queueA));
cf.createSession().createConsumer(queueA).close();
assertNull(server.locateQueue(queueA));
}
@Test
public void testNegativeAutoDeleteAutoCreatedQueue() throws Exception {
server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteQueues(false));
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, true);
assertNotNull(server.locateQueue(queueA));
cf.createSession().createConsumer(queueA).close();
assertNotNull(server.locateQueue(queueA));
}
}