This closes #112 openwire fix
This commit is contained in:
commit
2cc4d399b2
|
@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
|
||||||
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
|
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.security.Role;
|
import org.apache.activemq.artemis.core.security.Role;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
|
@ -196,12 +197,12 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
|
||||||
private void translatePolicyMap(Configuration serverConfig, PolicyMap policyMap)
|
private void translatePolicyMap(Configuration serverConfig, PolicyMap policyMap)
|
||||||
{
|
{
|
||||||
List allEntries = policyMap.getAllEntries();
|
List allEntries = policyMap.getAllEntries();
|
||||||
|
Map<String, AddressSettings> settingsMap = serverConfig.getAddressesSettings();
|
||||||
for (Object o : allEntries)
|
for (Object o : allEntries)
|
||||||
{
|
{
|
||||||
PolicyEntry entry = (PolicyEntry)o;
|
PolicyEntry entry = (PolicyEntry)o;
|
||||||
org.apache.activemq.command.ActiveMQDestination targetDest = entry.getDestination();
|
org.apache.activemq.command.ActiveMQDestination targetDest = entry.getDestination();
|
||||||
String match = getCorePattern(targetDest);
|
String match = getCorePattern(targetDest);
|
||||||
Map<String, AddressSettings> settingsMap = serverConfig.getAddressesSettings();
|
|
||||||
AddressSettings settings = settingsMap.get(match);
|
AddressSettings settings = settingsMap.get(match);
|
||||||
if (settings == null)
|
if (settings == null)
|
||||||
{
|
{
|
||||||
|
@ -216,6 +217,25 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
|
||||||
settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
|
settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PolicyEntry defaultEntry = policyMap.getDefaultEntry();
|
||||||
|
if (defaultEntry != null)
|
||||||
|
{
|
||||||
|
AddressSettings defSettings = settingsMap.get("#");
|
||||||
|
if (defSettings == null)
|
||||||
|
{
|
||||||
|
defSettings = new AddressSettings();
|
||||||
|
settingsMap.put("#", defSettings);
|
||||||
|
}
|
||||||
|
if (defaultEntry.isProducerFlowControl())
|
||||||
|
{
|
||||||
|
defSettings.setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
|
||||||
|
if (bservice.getSystemUsage().isSendFailIfNoSpace())
|
||||||
|
{
|
||||||
|
defSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getCorePattern(org.apache.activemq.command.ActiveMQDestination dest)
|
private String getCorePattern(org.apache.activemq.command.ActiveMQDestination dest)
|
||||||
|
@ -237,9 +257,20 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
|
||||||
@Override
|
@Override
|
||||||
public void stop() throws Exception
|
public void stop() throws Exception
|
||||||
{
|
{
|
||||||
server.stop();
|
try
|
||||||
testQueues.clear();
|
{
|
||||||
stopped = true;
|
server.stop();
|
||||||
|
testQueues.clear();
|
||||||
|
stopped = true;
|
||||||
|
}
|
||||||
|
catch (Throwable t)
|
||||||
|
{
|
||||||
|
//ignore
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
server = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void makeSureQueueExists(String qname) throws Exception
|
public void makeSureQueueExists(String qname) throws Exception
|
||||||
|
|
Loading…
Reference in New Issue