This closes #685

This commit is contained in:
Clebert Suconic 2016-07-29 15:32:57 -04:00
commit 5fe10d044a
6 changed files with 78 additions and 106 deletions

View File

@ -84,9 +84,9 @@ public abstract class AbstractJDBCDriver {
public void destroy() throws Exception { public void destroy() throws Exception {
try { try {
connection.setAutoCommit(false); connection.setAutoCommit(false);
Statement statement = connection.createStatement(); try (Statement statement = connection.createStatement()) {
statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName()); statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName());
statement.close(); }
connection.commit(); connection.commit();
} }
catch (SQLException e) { catch (SQLException e) {

View File

@ -135,9 +135,7 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess
return null; return null;
} }
try { try (ObjectInputStream ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(data))) {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
ObjectInputStream ois = new ObjectInputStreamWithClassLoader(bais);
Serializable object = (Serializable) ois.readObject(); Serializable object = (Serializable) ois.readObject();
return object; return object;
} }

View File

@ -164,10 +164,8 @@ public class OpenWireMessageConverter implements MessageConverter {
break; break;
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE: case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
if (messageCompressed) { if (messageCompressed) {
InputStream ois = new ByteArrayInputStream(contents); try (InputStream ois = new InflaterInputStream(new ByteArrayInputStream(contents));
ois = new InflaterInputStream(ois); org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
byte[] buf = new byte[1024]; byte[] buf = new byte[1024];
int n = ois.read(buf); int n = ois.read(buf);
while (n != -1) { while (n != -1) {
@ -258,8 +256,7 @@ public class OpenWireMessageConverter implements MessageConverter {
case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE: case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
if (messageCompressed) { if (messageCompressed) {
Inflater inflater = new Inflater(); Inflater inflater = new Inflater();
org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream(); try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
try {
int length = ByteSequenceData.readIntBig(contents); int length = ByteSequenceData.readIntBig(contents);
contents.offset = 0; contents.offset = 0;
byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength()); byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength());
@ -275,7 +272,6 @@ public class OpenWireMessageConverter implements MessageConverter {
} }
finally { finally {
inflater.end(); inflater.end();
decompressed.close();
} }
} }
body.writeBytes(contents.data, contents.offset, contents.length); body.writeBytes(contents.data, contents.offset, contents.length);
@ -506,10 +502,10 @@ public class OpenWireMessageConverter implements MessageConverter {
if (isCompressed) { if (isCompressed) {
out = new DeflaterOutputStream(out); out = new DeflaterOutputStream(out);
} }
DataOutputStream dataOut = new DataOutputStream(out); try (DataOutputStream dataOut = new DataOutputStream(out)) {
MarshallingSupport.writeUTF8(dataOut, text.toString()); MarshallingSupport.writeUTF8(dataOut, text.toString());
bytes = bytesOut.toByteArray(); bytes = bytesOut.toByteArray();
out.close(); }
} }
} }
else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) { else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
@ -522,9 +518,9 @@ public class OpenWireMessageConverter implements MessageConverter {
if (isCompressed) { if (isCompressed) {
os = new DeflaterOutputStream(os); os = new DeflaterOutputStream(os);
} }
DataOutputStream dataOut = new DataOutputStream(os); try (DataOutputStream dataOut = new DataOutputStream(os)) {
MarshallingSupport.marshalPrimitiveMap(map, dataOut); MarshallingSupport.marshalPrimitiveMap(map, dataOut);
dataOut.close(); }
bytes = out.toByteArray(); bytes = out.toByteArray();
} }
else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
@ -545,7 +541,7 @@ public class OpenWireMessageConverter implements MessageConverter {
if (isCompressed) { if (isCompressed) {
out = new DeflaterOutputStream(bytesOut); out = new DeflaterOutputStream(bytesOut);
} }
DataOutputStream dataOut = new DataOutputStream(out); try (DataOutputStream dataOut = new DataOutputStream(out)) {
boolean stop = false; boolean stop = false;
while (!stop && buffer.readable()) { while (!stop && buffer.readable()) {
@ -599,7 +595,7 @@ public class OpenWireMessageConverter implements MessageConverter {
break; break;
} }
} }
dataOut.close(); }
bytes = bytesOut.toByteArray(); bytes = bytesOut.toByteArray();
} }
else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) { else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
@ -608,10 +604,9 @@ public class OpenWireMessageConverter implements MessageConverter {
buffer.readBytes(bytes); buffer.readBytes(bytes);
if (isCompressed) { if (isCompressed) {
int length = bytes.length; int length = bytes.length;
org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream();
compressed.write(new byte[4]);
Deflater deflater = new Deflater(); Deflater deflater = new Deflater();
try { try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
compressed.write(new byte[4]);
deflater.setInput(bytes); deflater.setInput(bytes);
deflater.finish(); deflater.finish();
byte[] bytesBuf = new byte[1024]; byte[] bytesBuf = new byte[1024];
@ -625,7 +620,6 @@ public class OpenWireMessageConverter implements MessageConverter {
} }
finally { finally {
deflater.end(); deflater.end();
compressed.close();
} }
} }
} }

View File

@ -88,12 +88,11 @@ public class HornetQProtocolManagerTest extends ActiveMQTestBase {
manager.register(connectionFactory2, null, null, new ConcurrentHashMap<String, String>()); manager.register(connectionFactory2, null, null, new ConcurrentHashMap<String, String>());
for (XARecoveryConfig resource :manager.getResources()) { for (XARecoveryConfig resource :manager.getResources()) {
ServerLocator locator = resource.createServerLocator(); try (ServerLocator locator = resource.createServerLocator();
ClientSessionFactory factory = locator.createSessionFactory(); ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(); ClientSession session = factory.createSession()) {
session.close(); // Nothing
factory.close(); }
locator.close();
} }
} }

View File

@ -142,8 +142,7 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase {
@Test @Test
public void testReceiveImmediateWithZeroWindow2() throws Exception { public void testReceiveImmediateWithZeroWindow2() throws Exception {
ActiveMQServer server = createServer(true); ActiveMQServer server = createServer(true);
ServerLocator locator = createInVMNonHALocator(); try (ServerLocator locator = createInVMNonHALocator()) {
try {
server.start(); server.start();
locator.setConsumerWindowSize(0); locator.setConsumerWindowSize(0);
@ -183,9 +182,6 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase {
session1.close(); session1.close();
sessionProd.close(); sessionProd.close();
} }
finally {
locator.close();
}
} }
// https://jira.jboss.org/jira/browse/HORNETQ-385 // https://jira.jboss.org/jira/browse/HORNETQ-385

View File

@ -641,30 +641,15 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
private ClientMessage receiveFromQueue(String queueName) throws Exception { private ClientMessage receiveFromQueue(String queueName) throws Exception {
ClientMessage msg = null; ClientMessage msg = null;
ServerLocator locator = null; try (ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = null; ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = null; ClientSession session = sf.createSession(false, true, true)) {
try {
locator = createInVMNonHALocator();
sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
ClientConsumer consumer = session.createConsumer(queueName); ClientConsumer consumer = session.createConsumer(queueName);
session.start(); session.start();
msg = consumer.receive(60 * 1000); msg = consumer.receive(60 * 1000);
msg.acknowledge(); msg.acknowledge();
}
finally {
if (session != null) {
session.commit(); session.commit();
session.close();
}
if (sf != null)
sf.close();
if (locator != null)
locator.close();
} }
return msg; return msg;
} }