This closes #116 openwire fixes
This commit is contained in:
commit
53fe8ec9f7
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -633,6 +634,22 @@ public class BrokerService implements Service {
|
||||||
map.clear();
|
map.clear();
|
||||||
return runningBrokers;
|
return runningBrokers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public URI getConnectURI() {
|
||||||
|
URI uri = null;
|
||||||
|
try {
|
||||||
|
if (this.extraConnectors.size() > 0) {
|
||||||
|
Integer port = extraConnectors.iterator().next();
|
||||||
|
uri = new URI("tcp://localhost:" + port);
|
||||||
|
} else {
|
||||||
|
uri = new URI(this.getDefaultUri());
|
||||||
|
}
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
//ignore
|
||||||
|
}
|
||||||
|
return uri;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -199,6 +199,12 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
|
||||||
settings.setSlowConsumerCheckPeriod(1);
|
settings.setSlowConsumerCheckPeriod(1);
|
||||||
settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
|
settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
|
||||||
}
|
}
|
||||||
|
if (entry.isProducerFlowControl()) {
|
||||||
|
settings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
|
||||||
|
if (bservice.getSystemUsage().isSendFailIfNoSpace()) {
|
||||||
|
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
PolicyEntry defaultEntry = policyMap.getDefaultEntry();
|
PolicyEntry defaultEntry = policyMap.getDefaultEntry();
|
||||||
|
@ -209,7 +215,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
|
||||||
settingsMap.put("#", defSettings);
|
settingsMap.put("#", defSettings);
|
||||||
}
|
}
|
||||||
if (defaultEntry.isProducerFlowControl()) {
|
if (defaultEntry.isProducerFlowControl()) {
|
||||||
defSettings.setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
|
defSettings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
|
||||||
if (bservice.getSystemUsage().isSendFailIfNoSpace()) {
|
if (bservice.getSystemUsage().isSendFailIfNoSpace()) {
|
||||||
defSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
|
defSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1653,10 +1653,6 @@ public class BrokerTest extends BrokerTestSupport {
|
||||||
connection.request(createAck(consumerInfo, m3, 1, MessageAck.DELIVERED_ACK_TYPE));
|
connection.request(createAck(consumerInfo, m3, 1, MessageAck.DELIVERED_ACK_TYPE));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testGetServices() throws Exception {
|
|
||||||
assertTrue(broker.getServices().length != 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Test suite() {
|
public static Test suite() {
|
||||||
return suite(BrokerTest.class);
|
return suite(BrokerTest.class);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.BrokerTest;
|
import org.apache.activemq.broker.BrokerTest;
|
||||||
import org.apache.activemq.broker.StubConnection;
|
import org.apache.activemq.broker.StubConnection;
|
||||||
import org.apache.activemq.broker.TransportConnector;
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper;
|
||||||
|
|
||||||
public abstract class TransportBrokerTestSupport extends BrokerTest {
|
public abstract class TransportBrokerTestSupport extends BrokerTest {
|
||||||
|
|
||||||
|
@ -65,7 +66,7 @@ public abstract class TransportBrokerTestSupport extends BrokerTest {
|
||||||
// Note: on platforms like OS X we cannot bind to the actual hostname, so we
|
// Note: on platforms like OS X we cannot bind to the actual hostname, so we
|
||||||
// instead use the original host name (typically localhost) to bind to
|
// instead use the original host name (typically localhost) to bind to
|
||||||
|
|
||||||
URI actualURI = connector.getServer().getConnectURI();
|
URI actualURI = this.broker.getConnectURI();
|
||||||
URI connectURI = new URI(actualURI.getScheme(), actualURI.getUserInfo(), bindURI.getHost(), actualURI.getPort(), actualURI.getPath(), bindURI.getQuery(), bindURI.getFragment());
|
URI connectURI = new URI(actualURI.getScheme(), actualURI.getUserInfo(), bindURI.getHost(), actualURI.getPort(), actualURI.getPath(), bindURI.getQuery(), bindURI.getFragment());
|
||||||
|
|
||||||
Transport transport = TransportFactory.connect(connectURI);
|
Transport transport = TransportFactory.connect(connectURI);
|
||||||
|
|
Loading…
Reference in New Issue