This closes #516

This commit is contained in:
Martyn Taylor 2016-05-16 10:43:24 +01:00
commit 9a9968c35c
15 changed files with 36 additions and 36 deletions

View File

@ -104,7 +104,7 @@ public class BrokerService implements Service {
private PolicyMap destinationPolicy; private PolicyMap destinationPolicy;
private SystemUsage systemUsage; private SystemUsage systemUsage;
private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>(); private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<>();
private TemporaryFolder tmpfolder; private TemporaryFolder tmpfolder;

View File

@ -70,7 +70,7 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
LOG.info("Sent 0: " + outbound[0]); LOG.info("Sent 0: " + outbound[0]);
LOG.info("Sent 1: " + outbound[1]); LOG.info("Sent 1: " + outbound[1]);
ArrayList<Message> messages = new ArrayList<Message>(); ArrayList<Message> messages = new ArrayList<>();
beginTx(); beginTx();
Message message = consumer.receive(2000); Message message = consumer.receive(2000);
assertEquals(outbound[0], message); assertEquals(outbound[0], message);

View File

@ -61,8 +61,8 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
protected BrokerService broker; protected BrokerService broker;
// for message listener test // for message listener test
private final List<Message> unackMessages = new ArrayList<Message>(MESSAGE_COUNT); private final List<Message> unackMessages = new ArrayList<>(MESSAGE_COUNT);
private final List<Message> ackMessages = new ArrayList<Message>(MESSAGE_COUNT); private final List<Message> ackMessages = new ArrayList<>(MESSAGE_COUNT);
private boolean resendPhase; private boolean resendPhase;
public JmsTransactionTestSupport() { public JmsTransactionTestSupport() {
@ -208,7 +208,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
// receives the first message // receives the first message
beginTx(); beginTx();
ArrayList<Message> messages = new ArrayList<Message>(); ArrayList<Message> messages = new ArrayList<>();
LOG.info("About to consume message 1"); LOG.info("About to consume message 1");
Message message = consumer.receive(1000); Message message = consumer.receive(1000);
messages.add(message); messages.add(message);
@ -243,7 +243,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
// receives the first message // receives the first message
beginTx(); beginTx();
ArrayList<Message> messages = new ArrayList<Message>(); ArrayList<Message> messages = new ArrayList<>();
LOG.info("About to consume message 1"); LOG.info("About to consume message 1");
Message message = consumer.receive(1000); Message message = consumer.receive(1000);
messages.add(message); messages.add(message);
@ -283,7 +283,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
commitTx(); commitTx();
// receives the first message // receives the first message
ArrayList<Message> messages = new ArrayList<Message>(); ArrayList<Message> messages = new ArrayList<>();
LOG.info("About to consume message 1"); LOG.info("About to consume message 1");
beginTx(); beginTx();
Message message = consumer.receive(1000); Message message = consumer.receive(1000);
@ -331,7 +331,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
commitTx(); commitTx();
// receives the first message // receives the first message
ArrayList<Message> messages = new ArrayList<Message>(); ArrayList<Message> messages = new ArrayList<>();
LOG.info("About to consume message 1"); LOG.info("About to consume message 1");
beginTx(); beginTx();
Message message = consumer.receive(1000); Message message = consumer.receive(1000);
@ -375,7 +375,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
LOG.info("Sent 0: " + outbound[0]); LOG.info("Sent 0: " + outbound[0]);
LOG.info("Sent 1: " + outbound[1]); LOG.info("Sent 1: " + outbound[1]);
ArrayList<Message> messages = new ArrayList<Message>(); ArrayList<Message> messages = new ArrayList<>();
beginTx(); beginTx();
Message message = consumer.receive(1000); Message message = consumer.receive(1000);
messages.add(message); messages.add(message);
@ -426,7 +426,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
LOG.info("Sent 0: " + outbound[0]); LOG.info("Sent 0: " + outbound[0]);
LOG.info("Sent 1: " + outbound[1]); LOG.info("Sent 1: " + outbound[1]);
ArrayList<Message> messages = new ArrayList<Message>(); ArrayList<Message> messages = new ArrayList<>();
beginTx(); beginTx();
Message message = consumer.receive(1000); Message message = consumer.receive(1000);
assertEquals(outbound[0], message); assertEquals(outbound[0], message);
@ -566,7 +566,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
} }
public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception { public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {
ArrayList<String> list = new ArrayList<String>(); ArrayList<String> list = new ArrayList<>();
list.add("First"); list.add("First");
Message outbound = session.createObjectMessage(list); Message outbound = session.createObjectMessage(list);
outbound.setStringProperty("foo", "abc"); outbound.setStringProperty("foo", "abc");

View File

@ -44,7 +44,7 @@ public class ConnectionHangOnStartupTest extends OpenwireArtemisBaseTest {
// connection sooner) // connection sooner)
protected String uriString = "failover://(tcp://localhost:62001?wireFormat.maxInactivityDurationInitalDelay=1,tcp://localhost:62002?wireFormat.maxInactivityDurationInitalDelay=1)?randomize=false&maxReconnectDelay=200"; protected String uriString = "failover://(tcp://localhost:62001?wireFormat.maxInactivityDurationInitalDelay=1,tcp://localhost:62002?wireFormat.maxInactivityDurationInitalDelay=1)?randomize=false&maxReconnectDelay=200";
protected EmbeddedJMS master = null; protected EmbeddedJMS master = null;
protected AtomicReference<EmbeddedJMS> slave = new AtomicReference<EmbeddedJMS>(); protected AtomicReference<EmbeddedJMS> slave = new AtomicReference<>();
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {

View File

@ -52,7 +52,7 @@ public class FailoverClusterTest extends OpenwireArtemisBaseTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
Map<String, String> params = new HashMap<String, String>(); Map<String, String> params = new HashMap<>();
params.put("rebalanceClusterClients", "true"); params.put("rebalanceClusterClients", "true");
params.put("updateClusterClients", "true"); params.put("updateClusterClients", "true");
@ -110,7 +110,7 @@ public class FailoverClusterTest extends OpenwireArtemisBaseTest {
Thread.sleep(3000); Thread.sleep(3000);
Set<String> set = new HashSet<String>(); Set<String> set = new HashSet<>();
for (ActiveMQConnection c : connections) { for (ActiveMQConnection c : connections) {
set.add(c.getTransportChannel().getRemoteAddress()); set.add(c.getTransportChannel().getRemoteAddress());
} }

View File

@ -56,7 +56,7 @@ public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest {
private EmbeddedJMS[] servers = new EmbeddedJMS[3]; private EmbeddedJMS[] servers = new EmbeddedJMS[3];
private static final int NUMBER_OF_CLIENTS = 30; private static final int NUMBER_OF_CLIENTS = 30;
private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>(); private final List<ActiveMQConnection> connections = new ArrayList<>();
@Before @Before
@ -65,7 +65,7 @@ public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest {
//default setup for most tests //default setup for most tests
private void commonSetup() throws Exception { private void commonSetup() throws Exception {
Map<String, String> params = new HashMap<String, String>(); Map<String, String> params = new HashMap<>();
params.put("rebalanceClusterClients", "true"); params.put("rebalanceClusterClients", "true");
params.put("updateClusterClients", "true"); params.put("updateClusterClients", "true");
@ -349,7 +349,7 @@ public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest {
} }
protected void assertClientsConnectedToThreeBrokers() { protected void assertClientsConnectedToThreeBrokers() {
Set<String> set = new HashSet<String>(); Set<String> set = new HashSet<>();
for (ActiveMQConnection c : connections) { for (ActiveMQConnection c : connections) {
if (c.getTransportChannel().getRemoteAddress() != null) { if (c.getTransportChannel().getRemoteAddress() != null) {
set.add(c.getTransportChannel().getRemoteAddress()); set.add(c.getTransportChannel().getRemoteAddress());
@ -359,7 +359,7 @@ public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest {
} }
protected void assertClientsConnectedToTwoBrokers() { protected void assertClientsConnectedToTwoBrokers() {
Set<String> set = new HashSet<String>(); Set<String> set = new HashSet<>();
for (ActiveMQConnection c : connections) { for (ActiveMQConnection c : connections) {
if (c.getTransportChannel().getRemoteAddress() != null) { if (c.getTransportChannel().getRemoteAddress() != null) {
set.add(c.getTransportChannel().getRemoteAddress()); set.add(c.getTransportChannel().getRemoteAddress());
@ -415,7 +415,7 @@ public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest {
} }
protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage, double maximumPercentage) { protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage, double maximumPercentage) {
Map<String, Double> clientConnectionCounts = new HashMap<String, Double>(); Map<String, Double> clientConnectionCounts = new HashMap<>();
int total = 0; int total = 0;
for (ActiveMQConnection c : connections) { for (ActiveMQConnection c : connections) {
String key = c.getTransportChannel().getRemoteAddress(); String key = c.getTransportChannel().getRemoteAddress();
@ -432,7 +432,7 @@ public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest {
} }
} }
Set<String> keys = clientConnectionCounts.keySet(); Set<String> keys = clientConnectionCounts.keySet();
List<String> errorMsgs = new ArrayList<String>(); List<String> errorMsgs = new ArrayList<>();
for (String key : keys) { for (String key : keys) {
double count = clientConnectionCounts.get(key); double count = clientConnectionCounts.get(key);
double percentage = count / total; double percentage = count / total;

View File

@ -228,7 +228,7 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
final CountDownLatch commitDoneLatch = new CountDownLatch(1); final CountDownLatch commitDoneLatch = new CountDownLatch(1);
final CountDownLatch messagesReceived = new CountDownLatch(3); final CountDownLatch messagesReceived = new CountDownLatch(3);
final AtomicBoolean gotCommitException = new AtomicBoolean(false); final AtomicBoolean gotCommitException = new AtomicBoolean(false);
final ArrayList<TextMessage> receivedMessages = new ArrayList<TextMessage>(); final ArrayList<TextMessage> receivedMessages = new ArrayList<>();
final MessageConsumer testConsumer = consumerSession.createConsumer(destination); final MessageConsumer testConsumer = consumerSession.createConsumer(destination);
doByteman.set(true); doByteman.set(true);
testConsumer.setMessageListener(new MessageListener() { testConsumer.setMessageListener(new MessageListener() {

View File

@ -149,7 +149,7 @@ public class FailoverConsumerUnconsumedTest extends OpenwireArtemisBaseTest {
doByteman.set(true); doByteman.set(true);
final Vector<TestConsumer> testConsumers = new Vector<TestConsumer>(); final Vector<TestConsumer> testConsumers = new Vector<>();
TestConsumer testConsumer = new TestConsumer(consumerSession, destination, connection); TestConsumer testConsumer = new TestConsumer(consumerSession, destination, connection);
testConsumer.setMessageListener(new MessageListener() { testConsumer.setMessageListener(new MessageListener() {
@Override @Override

View File

@ -47,10 +47,10 @@ public class FailoverPriorityTest extends OpenwireArtemisBaseTest {
private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618"; private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618";
private final HashMap<Integer, String> urls = new HashMap<>(); private final HashMap<Integer, String> urls = new HashMap<>();
private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>(); private final List<ActiveMQConnection> connections = new ArrayList<>();
private EmbeddedJMS[] servers = new EmbeddedJMS[3]; private EmbeddedJMS[] servers = new EmbeddedJMS[3];
private String clientUrl; private String clientUrl;
private Map<String, String> params = new HashMap<String, String>(); private Map<String, String> params = new HashMap<>();
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {

View File

@ -38,7 +38,7 @@ public class FailoverRandomTest extends OpenwireArtemisBaseTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
Map<String, String> params = new HashMap<String, String>(); Map<String, String> params = new HashMap<>();
params.put("rebalanceClusterClients", "true"); params.put("rebalanceClusterClients", "true");
params.put("updateClusterClients", "true"); params.put("updateClusterClients", "true");

View File

@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
public class FailoverTransportBrokerTest extends OpenwireArtemisBaseTest { public class FailoverTransportBrokerTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBrokerTest.class); private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBrokerTest.class);
protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>(); protected ArrayList<StubConnection> connections = new ArrayList<>();
protected long idGenerator; protected long idGenerator;
protected int msgIdGenerator; protected int msgIdGenerator;
protected int maxWait = 10000; protected int maxWait = 10000;

View File

@ -119,7 +119,7 @@ public class FailoverUpdateURIsTest extends OpenwireArtemisBaseTest {
@Test @Test
public void testAutoUpdateURIs() throws Exception { public void testAutoUpdateURIs() throws Exception {
Map<String, String> params = new HashMap<String, String>(); Map<String, String> params = new HashMap<>();
params.put("updateClusterClients", "true"); params.put("updateClusterClients", "true");
Configuration config0 = createConfig("localhost", 0, params); Configuration config0 = createConfig("localhost", 0, params);
deployClusterConfiguration(config0, 10); deployClusterConfiguration(config0, 10);

View File

@ -41,7 +41,7 @@ import java.util.concurrent.TimeUnit;
public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest { public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest {
private static final int NUMBER_OF_CLIENTS = 30; private static final int NUMBER_OF_CLIENTS = 30;
private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>(); private final List<ActiveMQConnection> connections = new ArrayList<>();
private EmbeddedJMS server0; private EmbeddedJMS server0;
private EmbeddedJMS server1; private EmbeddedJMS server1;
private String clientUrl; private String clientUrl;
@ -126,7 +126,7 @@ public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest {
} }
protected void assertClientsConnectedToTwoBrokers() { protected void assertClientsConnectedToTwoBrokers() {
Set<String> set = new HashSet<String>(); Set<String> set = new HashSet<>();
for (ActiveMQConnection c : connections) { for (ActiveMQConnection c : connections) {
if (c.getTransportChannel().getRemoteAddress() != null) { if (c.getTransportChannel().getRemoteAddress() != null) {
set.add(c.getTransportChannel().getRemoteAddress()); set.add(c.getTransportChannel().getRemoteAddress());
@ -136,7 +136,7 @@ public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest {
} }
protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage) { protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage) {
Map<String, Double> clientConnectionCounts = new HashMap<String, Double>(); Map<String, Double> clientConnectionCounts = new HashMap<>();
int total = 0; int total = 0;
for (ActiveMQConnection c : connections) { for (ActiveMQConnection c : connections) {
String key = c.getTransportChannel().getRemoteAddress(); String key = c.getTransportChannel().getRemoteAddress();

View File

@ -61,7 +61,7 @@ import org.slf4j.LoggerFactory;
public class FanoutTransportBrokerTest extends OpenwireArtemisBaseTest { public class FanoutTransportBrokerTest extends OpenwireArtemisBaseTest {
public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true"); public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true");
protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>(); protected ArrayList<StubConnection> connections = new ArrayList<>();
protected long idGenerator; protected long idGenerator;
protected int msgIdGenerator; protected int msgIdGenerator;
protected int maxWait = 10000; protected int maxWait = 10000;

View File

@ -52,7 +52,7 @@ public class SocketProxy {
private CountDownLatch closed = new CountDownLatch(1); private CountDownLatch closed = new CountDownLatch(1);
public final List<Bridge> connections = new LinkedList<Bridge>(); public final List<Bridge> connections = new LinkedList<>();
private int listenPort = 0; private int listenPort = 0;
@ -131,7 +131,7 @@ public class SocketProxy {
public void close() { public void close() {
List<Bridge> connections; List<Bridge> connections;
synchronized(this.connections) { synchronized(this.connections) {
connections = new ArrayList<Bridge>(this.connections); connections = new ArrayList<>(this.connections);
} }
LOG.info("close, numConnections=" + connections.size()); LOG.info("close, numConnections=" + connections.size());
for (Bridge con : connections) { for (Bridge con : connections) {
@ -148,7 +148,7 @@ public class SocketProxy {
public void halfClose() { public void halfClose() {
List<Bridge> connections; List<Bridge> connections;
synchronized(this.connections) { synchronized(this.connections) {
connections = new ArrayList<Bridge>(this.connections); connections = new ArrayList<>(this.connections);
} }
LOG.info("halfClose, numConnections=" + connections.size()); LOG.info("halfClose, numConnections=" + connections.size());
for (Bridge con : connections) { for (Bridge con : connections) {
@ -288,7 +288,7 @@ public class SocketProxy {
protected Socket src; protected Socket src;
private Socket destination; private Socket destination;
private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>(); private AtomicReference<CountDownLatch> pause = new AtomicReference<>();
public Pump(Socket source, Socket dest) { public Pump(Socket source, Socket dest) {
super("SocketProxy-DataTransfer-" + source.getPort() + ":" + dest.getPort()); super("SocketProxy-DataTransfer-" + source.getPort() + ":" + dest.getPort());
@ -339,7 +339,7 @@ public class SocketProxy {
private ServerSocket socket; private ServerSocket socket;
private URI target; private URI target;
private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>(); private AtomicReference<CountDownLatch> pause = new AtomicReference<>();
public Acceptor(ServerSocket serverSocket, URI uri) { public Acceptor(ServerSocket serverSocket, URI uri) {