Use try-with-resources more

This commit is contained in:
Ville Skyttä 2016-02-20 10:33:20 +02:00
parent f9aca6c667
commit aa3f3bd6a7
34 changed files with 233 additions and 423 deletions

View File

@ -80,17 +80,11 @@ public class EncodeJournal extends LockAbstract {
final int minFiles, final int minFiles,
final int fileSize, final int fileSize,
final String fileName) throws Exception { final String fileName) throws Exception {
FileOutputStream fileOutputStream = new FileOutputStream(fileName); try (FileOutputStream fileOutputStream = new FileOutputStream(fileName);
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream); BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
PrintStream out = new PrintStream(bufferedOutputStream); PrintStream out = new PrintStream(bufferedOutputStream)) {
try {
exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out); exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
} }
finally {
out.close();
fileOutputStream.close();
}
} }
public static void exportJournal(final String directory, public static void exportJournal(final String directory,

View File

@ -351,9 +351,9 @@ public final class XmlDataImporter extends ActionAbstract {
ActiveMQServerLogger.LOGGER.debug(logMessage); ActiveMQServerLogger.LOGGER.debug(logMessage);
message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
ClientProducer producer = session.createProducer(destination); try (ClientProducer producer = session.createProducer(destination)) {
producer.send(message); producer.send(message);
producer.close(); }
if (tempFileName.length() > 0) { if (tempFileName.length() > 0) {
File tempFile = new File(tempFileName); File tempFile = new File(tempFileName);

View File

@ -182,8 +182,7 @@ public class ProducerThread extends Thread {
} }
private String readInputStream(InputStream is, int size, int messageNumber) throws IOException { private String readInputStream(InputStream is, int size, int messageNumber) throws IOException {
InputStreamReader reader = new InputStreamReader(is); try (InputStreamReader reader = new InputStreamReader(is)) {
try {
char[] buffer; char[] buffer;
if (size > 0) { if (size > 0) {
buffer = new char[size]; buffer = new char[size];
@ -203,9 +202,6 @@ public class ProducerThread extends Thread {
catch (IOException ioe) { catch (IOException ioe) {
return createDefaultMessage(messageNumber); return createDefaultMessage(messageNumber);
} }
finally {
reader.close();
}
} }
private String createDefaultMessage(int messageNumber) { private String createDefaultMessage(int messageNumber) {

View File

@ -83,10 +83,8 @@ public class ServerUtil {
public static void waitForServerToStart(String uri, long timeout) throws InterruptedException { public static void waitForServerToStart(String uri, long timeout) throws InterruptedException {
long realTimeout = System.currentTimeMillis() + timeout; long realTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < realTimeout) { while (System.currentTimeMillis() < realTimeout) {
try { try (ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(uri, null)) {
ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(uri, null);
cf.createConnection().close(); cf.createConnection().close();
cf.close();
System.out.println("server " + uri + " started"); System.out.println("server " + uri + " started");
} }
catch (Exception e) { catch (Exception e) {

View File

@ -41,14 +41,15 @@ public class CompressionUtilTest extends Assert {
ByteArrayInputStream inputStream = new ByteArrayInputStream(input); ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
AtomicLong counter = new AtomicLong(0); AtomicLong counter = new AtomicLong(0);
DeflaterReader reader = new DeflaterReader(inputStream, counter);
ArrayList<Integer> zipHolder = new ArrayList<>(); ArrayList<Integer> zipHolder = new ArrayList<>();
int b = reader.read();
while (b != -1) { try (DeflaterReader reader = new DeflaterReader(inputStream, counter)) {
zipHolder.add(b); int b = reader.read();
b = reader.read();
while (b != -1) {
zipHolder.add(b);
b = reader.read();
}
} }
assertEquals(input.length, counter.get()); assertEquals(input.length, counter.get());
@ -65,7 +66,6 @@ public class CompressionUtilTest extends Assert {
int compressedDataLength = compresser.deflate(output); int compressedDataLength = compresser.deflate(output);
compareByteArray(allCompressed, output, compressedDataLength); compareByteArray(allCompressed, output, compressedDataLength);
reader.close();
} }
@Test @Test
@ -76,17 +76,17 @@ public class CompressionUtilTest extends Assert {
ByteArrayInputStream inputStream = new ByteArrayInputStream(input); ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
AtomicLong counter = new AtomicLong(0); AtomicLong counter = new AtomicLong(0);
DeflaterReader reader = new DeflaterReader(inputStream, counter);
byte[] buffer = new byte[7]; byte[] buffer = new byte[7];
ArrayList<Integer> zipHolder = new ArrayList<>(); ArrayList<Integer> zipHolder = new ArrayList<>();
int n = reader.read(buffer); try (DeflaterReader reader = new DeflaterReader(inputStream, counter)) {
while (n != -1) { int n = reader.read(buffer);
for (int i = 0; i < n; i++) { while (n != -1) {
zipHolder.add((int) buffer[i]); for (int i = 0; i < n; i++) {
zipHolder.add((int) buffer[i]);
}
n = reader.read(buffer);
} }
n = reader.read(buffer);
} }
assertEquals(input.length, counter.get()); assertEquals(input.length, counter.get());
@ -103,7 +103,6 @@ public class CompressionUtilTest extends Assert {
int compressedDataLength = compresser.deflate(output); int compressedDataLength = compresser.deflate(output);
compareByteArray(allCompressed, output, compressedDataLength); compareByteArray(allCompressed, output, compressedDataLength);
reader.close();
} }
@Test @Test
@ -121,13 +120,14 @@ public class CompressionUtilTest extends Assert {
System.arraycopy(output, 0, zipBytes, 0, compressedDataLength); System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes); ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
InflaterReader inflater = new InflaterReader(byteInput);
ArrayList<Integer> holder = new ArrayList<>(); ArrayList<Integer> holder = new ArrayList<>();
int read = inflater.read(); try (InflaterReader inflater = new InflaterReader(byteInput)) {
int read = inflater.read();
while (read != -1) { while (read != -1) {
holder.add(read); holder.add(read);
read = inflater.read(); read = inflater.read();
}
} }
byte[] result = new byte[holder.size()]; byte[] result = new byte[holder.size()];
@ -139,7 +139,6 @@ public class CompressionUtilTest extends Assert {
String txt = new String(result); String txt = new String(result);
assertEquals(inputString, txt); assertEquals(inputString, txt);
inflater.close();
} }
@Test @Test
@ -158,18 +157,16 @@ public class CompressionUtilTest extends Assert {
ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes); ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(); ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
InflaterWriter writer = new InflaterWriter(byteOutput);
byte[] zipBuffer = new byte[12]; byte[] zipBuffer = new byte[12];
int n = byteInput.read(zipBuffer); try (InflaterWriter writer = new InflaterWriter(byteOutput)) {
while (n > 0) { int n = byteInput.read(zipBuffer);
writer.write(zipBuffer, 0, n); while (n > 0) {
n = byteInput.read(zipBuffer); writer.write(zipBuffer, 0, n);
n = byteInput.read(zipBuffer);
}
} }
writer.close();
byte[] outcome = byteOutput.toByteArray(); byte[] outcome = byteOutput.toByteArray();
String outStr = new String(outcome); String outStr = new String(outcome);

View File

@ -115,15 +115,16 @@ public class ConnectionFactoryURITest {
private void persistIP6(String ipv6, ActiveMQConnectionFactory factory) throws IOException, ClassNotFoundException { private void persistIP6(String ipv6, ActiveMQConnectionFactory factory) throws IOException, ClassNotFoundException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream outStream = new ObjectOutputStream(baos); try (ObjectOutputStream outStream = new ObjectOutputStream(baos)) {
outStream.writeObject(factory); outStream.writeObject(factory);
outStream.close(); }
baos.close(); finally {
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); baos.close();
ObjectInputStream in = new ObjectInputStream(bais); }
factory = (ActiveMQConnectionFactory) in.readObject(); try (ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
in.close(); ObjectInputStream in = new ObjectInputStream(bais)) {
bais.close(); factory = (ActiveMQConnectionFactory) in.readObject();
}
Assert.assertEquals("[" + ipv6 + "]", factory.getStaticConnectors()[0].getParams().get("host")); Assert.assertEquals("[" + ipv6 + "]", factory.getStaticConnectors()[0].getParams().get("host"));
} }

View File

@ -159,9 +159,8 @@ public class OpenWireMessageConverter implements MessageConverter {
if (messageCompressed) { if (messageCompressed) {
InputStream ois = new ByteArrayInputStream(contents); InputStream ois = new ByteArrayInputStream(contents);
ois = new InflaterInputStream(ois); ois = new InflaterInputStream(ois);
org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
try { try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
byte[] buf = new byte[1024]; byte[] buf = new byte[1024];
int n = ois.read(buf); int n = ois.read(buf);
while (n != -1) { while (n != -1) {
@ -171,9 +170,6 @@ public class OpenWireMessageConverter implements MessageConverter {
//read done //read done
contents = decompressed.toByteSequence(); contents = decompressed.toByteSequence();
} }
finally {
decompressed.close();
}
} }
body.writeInt(contents.length); body.writeInt(contents.length);
body.writeBytes(contents.data, contents.offset, contents.length); body.writeBytes(contents.data, contents.offset, contents.length);
@ -279,19 +275,14 @@ public class OpenWireMessageConverter implements MessageConverter {
break; break;
default: default:
if (messageCompressed) { if (messageCompressed) {
org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream(); try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
OutputStream os = new InflaterOutputStream(decompressed); OutputStream os = new InflaterOutputStream(decompressed)) {
try {
os.write(contents.data, contents.offset, contents.getLength()); os.write(contents.data, contents.offset, contents.getLength());
contents = decompressed.toByteSequence(); contents = decompressed.toByteSequence();
} }
catch (Exception e) { catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} }
finally {
os.close();
decompressed.close();
}
} }
body.writeBytes(contents.data, contents.offset, contents.length); body.writeBytes(contents.data, contents.offset, contents.length);
break; break;
@ -536,9 +527,9 @@ public class OpenWireMessageConverter implements MessageConverter {
buffer.readBytes(bytes); buffer.readBytes(bytes);
if (isCompressed) { if (isCompressed) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
DeflaterOutputStream out = new DeflaterOutputStream(bytesOut); try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut)) {
out.write(bytes); out.write(bytes);
out.close(); }
bytes = bytesOut.toByteArray(); bytes = bytesOut.toByteArray();
} }
} }
@ -637,16 +628,11 @@ public class OpenWireMessageConverter implements MessageConverter {
bytes = new byte[n]; bytes = new byte[n];
buffer.readBytes(bytes); buffer.readBytes(bytes);
if (isCompressed) { if (isCompressed) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
DeflaterOutputStream out = new DeflaterOutputStream(bytesOut); DeflaterOutputStream out = new DeflaterOutputStream(bytesOut)) {
try {
out.write(bytes); out.write(bytes);
bytes = bytesOut.toByteArray(); bytes = bytesOut.toByteArray();
} }
finally {
out.close();
bytesOut.close();
}
} }
} }

View File

@ -76,12 +76,12 @@ public class QueueServiceManager extends DestinationServiceManager {
throw new Exception("You must start() this class instance before deploying"); throw new Exception("You must start() this class instance before deploying");
} }
String queueName = queueDeployment.getName(); String queueName = queueDeployment.getName();
ClientSession session = sessionFactory.createSession(false, false, false); try (ClientSession session = sessionFactory.createSession(false, false, false)) {
ClientSession.QueueQuery query = session.queueQuery(new SimpleString(queueName)); ClientSession.QueueQuery query = session.queueQuery(new SimpleString(queueName));
if (!query.isExists()) { if (!query.isExists()) {
session.createQueue(queueName, queueName, queueDeployment.isDurableSend()); session.createQueue(queueName, queueName, queueDeployment.isDurableSend());
}
} }
session.close();
destination.createQueueResource(queueName, queueDeployment.isDurableSend(), queueDeployment.getConsumerSessionTimeoutSeconds(), queueDeployment.isDuplicatesAllowed()); destination.createQueueResource(queueName, queueDeployment.isDurableSend(), queueDeployment.getConsumerSessionTimeoutSeconds(), queueDeployment.isDuplicatesAllowed());

View File

@ -77,16 +77,17 @@ public class TopicServiceManager extends DestinationServiceManager {
throw new Exception("You must start() this class instance before deploying"); throw new Exception("You must start() this class instance before deploying");
} }
String queueName = topicDeployment.getName(); String queueName = topicDeployment.getName();
ClientSession session = sessionFactory.createSession(false, false, false); boolean defaultDurable;
ClientSession.QueueQuery query = session.queueQuery(new SimpleString(queueName)); try (ClientSession session = sessionFactory.createSession(false, false, false)) {
boolean defaultDurable = topicDeployment.isDurableSend(); ClientSession.QueueQuery query = session.queueQuery(new SimpleString(queueName));
if (query.isExists()) { defaultDurable = topicDeployment.isDurableSend();
defaultDurable = query.isDurable(); if (query.isExists()) {
defaultDurable = query.isDurable();
}
else {
session.createQueue(queueName, queueName, topicDeployment.isDurableSend());
}
} }
else {
session.createQueue(queueName, queueName, topicDeployment.isDurableSend());
}
session.close();
destination.createTopicResource(queueName, defaultDurable, topicDeployment.getConsumerSessionTimeoutSeconds(), topicDeployment.isDuplicatesAllowed()); destination.createTopicResource(queueName, defaultDurable, topicDeployment.getConsumerSessionTimeoutSeconds(), topicDeployment.isDuplicatesAllowed());
} }

View File

@ -1666,8 +1666,10 @@ public class ConfigurationImpl implements Configuration, Serializable {
ByteArrayOutputStream bos = new ByteArrayOutputStream(); ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream os = new ObjectOutputStream(bos); ObjectOutputStream os = new ObjectOutputStream(bos);
os.writeObject(ConfigurationImpl.this); os.writeObject(ConfigurationImpl.this);
ObjectInputStream ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(bos.toByteArray())); Configuration config;
Configuration config = (Configuration) ois.readObject(); try (ObjectInputStream ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(bos.toByteArray()))) {
config = (Configuration) ois.readObject();
}
// this is transient because of possible jgroups integration, we need to copy it manually // this is transient because of possible jgroups integration, we need to copy it manually
config.setBroadcastGroupConfigurations(ConfigurationImpl.this.getBroadcastGroupConfigurations()); config.setBroadcastGroupConfigurations(ConfigurationImpl.this.getBroadcastGroupConfigurations());

View File

@ -406,8 +406,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
Filter filter = FilterImpl.createFilter(filterStr); Filter filter = FilterImpl.createFilter(filterStr);
List<Map<String, Object>> messages = new ArrayList<>(); List<Map<String, Object>> messages = new ArrayList<>();
queue.flushExecutor(); queue.flushExecutor();
LinkedListIterator<MessageReference> iterator = queue.totalIterator(); try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
try {
while (iterator.hasNext()) { while (iterator.hasNext()) {
MessageReference ref = iterator.next(); MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) { if (filter == null || filter.match(ref.getMessage())) {
@ -417,9 +416,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
} }
return messages.toArray(new Map[messages.size()]); return messages.toArray(new Map[messages.size()]);
} }
finally {
iterator.close();
}
} }
catch (ActiveMQException e) { catch (ActiveMQException e) {
throw new IllegalStateException(e.getMessage()); throw new IllegalStateException(e.getMessage());
@ -449,8 +445,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
try { try {
List<Map<String, Object>> messages = new ArrayList<>(); List<Map<String, Object>> messages = new ArrayList<>();
queue.flushExecutor(); queue.flushExecutor();
LinkedListIterator<MessageReference> iterator = queue.totalIterator(); try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
try {
// returns just the first, as it's the first only // returns just the first, as it's the first only
if (iterator.hasNext()) { if (iterator.hasNext()) {
MessageReference ref = iterator.next(); MessageReference ref = iterator.next();
@ -459,9 +454,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
} }
return messages.toArray(new Map[1]); return messages.toArray(new Map[1]);
} }
finally {
iterator.close();
}
} }
finally { finally {
blockOnIO(); blockOnIO();
@ -508,8 +500,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
return getMessageCount(); return getMessageCount();
} }
else { else {
LinkedListIterator<MessageReference> iterator = queue.totalIterator(); try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
try {
int count = 0; int count = 0;
while (iterator.hasNext()) { while (iterator.hasNext()) {
MessageReference ref = iterator.next(); MessageReference ref = iterator.next();
@ -519,9 +510,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
} }
return count; return count;
} }
finally {
iterator.close();
}
} }
} }
finally { finally {
@ -919,8 +907,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
ArrayList<CompositeData> c = new ArrayList<>(); ArrayList<CompositeData> c = new ArrayList<>();
Filter filter = FilterImpl.createFilter(filterStr); Filter filter = FilterImpl.createFilter(filterStr);
queue.flushExecutor(); queue.flushExecutor();
LinkedListIterator<MessageReference> iterator = queue.totalIterator(); try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
try {
while (iterator.hasNext() && currentPageSize++ < pageSize) { while (iterator.hasNext() && currentPageSize++ < pageSize) {
MessageReference ref = iterator.next(); MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) { if (filter == null || filter.match(ref.getMessage())) {
@ -932,9 +919,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
c.toArray(rc); c.toArray(rc);
return rc; return rc;
} }
finally {
iterator.close();
}
} }
catch (ActiveMQException e) { catch (ActiveMQException e) {
throw new IllegalStateException(e.getMessage()); throw new IllegalStateException(e.getMessage());

View File

@ -110,15 +110,10 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
File.separatorChar + File.separatorChar +
PagingStoreFactoryNIO.ADDRESS_FILE); PagingStoreFactoryNIO.ADDRESS_FILE);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileWithID))); try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileWithID)))) {
try {
writer.write(address.toString()); writer.write(address.toString());
writer.newLine(); writer.newLine();
} }
finally {
writer.close();
}
return factory; return factory;
} }
@ -149,16 +144,11 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
continue; continue;
} }
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(addressFile)));
String addressString; String addressString;
try { try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(addressFile)))) {
addressString = reader.readLine(); addressString = reader.readLine();
} }
finally {
reader.close();
}
SimpleString address = new SimpleString(addressString); SimpleString address = new SimpleString(addressString);

View File

@ -226,14 +226,14 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
byte[] buffer = new byte[1 << 4]; byte[] buffer = new byte[1 << 4];
MessageDigest md = MessageDigest.getInstance("MD5"); MessageDigest md = MessageDigest.getInstance("MD5");
FileInputStream is = new FileInputStream(file); byte[] digest;
DigestInputStream is2 = new DigestInputStream(is, md); try (FileInputStream is = new FileInputStream(file);
while (is2.read(buffer) > 0) { DigestInputStream is2 = new DigestInputStream(is, md)) {
continue; while (is2.read(buffer) > 0) {
continue;
}
digest = md.digest();
} }
byte[] digest = md.digest();
is.close();
is2.close();
return Base64.encodeBytes(digest); return Base64.encodeBytes(digest);
} }
catch (Exception e) { catch (Exception e) {

View File

@ -508,42 +508,33 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
file.open(); file.open();
} }
try { try {
final FileInputStream fis = new FileInputStream(file.getJavaFile()); try (final FileInputStream fis = new FileInputStream(file.getJavaFile());
try { final FileChannel channel = fis.getChannel()) {
final FileChannel channel = fis.getChannel(); // We can afford having a single buffer here for this entire loop
try { // because sendReplicatePacket will encode the packet as a NettyBuffer
// We can afford having a single buffer here for this entire loop // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
// because sendReplicatePacket will encode the packet as a NettyBuffer final ByteBuffer buffer = ByteBuffer.allocate(1 << 17); // 1 << 17 == 131072 == 128 * 1024
// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy while (true) {
final ByteBuffer buffer = ByteBuffer.allocate(1 << 17); // 1 << 17 == 131072 == 128 * 1024 buffer.clear();
while (true) { final int bytesRead = channel.read(buffer);
buffer.clear(); int toSend = bytesRead;
final int bytesRead = channel.read(buffer); if (bytesRead > 0) {
int toSend = bytesRead; if (bytesRead >= maxBytesToSend) {
if (bytesRead > 0) { toSend = (int) maxBytesToSend;
if (bytesRead >= maxBytesToSend) { maxBytesToSend = 0;
toSend = (int) maxBytesToSend;
maxBytesToSend = 0;
}
else {
maxBytesToSend = maxBytesToSend - bytesRead;
}
buffer.limit(toSend);
} }
buffer.rewind(); else {
maxBytesToSend = maxBytesToSend - bytesRead;
// sending -1 or 0 bytes will close the file at the backup }
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer)); buffer.limit(toSend);
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
} }
buffer.rewind();
// sending -1 or 0 bytes will close the file at the backup
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer));
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
} }
finally {
channel.close();
}
}
finally {
fis.close();
} }
} }
finally { finally {

View File

@ -255,14 +255,13 @@ public class ClusterConnectionBridge extends BridgeImpl {
} }
ManagementHelper.putOperationInvocation(message, ResourceNames.CORE_SERVER, "sendQueueInfoToQueue", notifQueueName.toString(), flowRecord.getAddress()); ManagementHelper.putOperationInvocation(message, ResourceNames.CORE_SERVER, "sendQueueInfoToQueue", notifQueueName.toString(), flowRecord.getAddress());
ClientProducer prod = sessionConsumer.createProducer(managementAddress); try (ClientProducer prod = sessionConsumer.createProducer(managementAddress)) {
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
ActiveMQClientLogger.LOGGER.debug("Cluster connection bridge on " + clusterConnection + " requesting information on queues");
}
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) { prod.send(message);
ActiveMQClientLogger.LOGGER.debug("Cluster connection bridge on " + clusterConnection + " requesting information on queues");
} }
prod.send(message);
prod.close();
} }
} }

View File

@ -869,9 +869,7 @@ public class QueueImpl implements Queue {
@Override @Override
public synchronized MessageReference removeReferenceWithID(final long id1) throws Exception { public synchronized MessageReference removeReferenceWithID(final long id1) throws Exception {
LinkedListIterator<MessageReference> iterator = iterator(); try (LinkedListIterator<MessageReference> iterator = iterator()) {
try {
MessageReference removed = null; MessageReference removed = null;
@ -895,16 +893,11 @@ public class QueueImpl implements Queue {
return removed; return removed;
} }
finally {
iterator.close();
}
} }
@Override @Override
public synchronized MessageReference getReference(final long id1) throws ActiveMQException { public synchronized MessageReference getReference(final long id1) throws ActiveMQException {
LinkedListIterator<MessageReference> iterator = iterator(); try (LinkedListIterator<MessageReference> iterator = iterator()) {
try {
while (iterator.hasNext()) { while (iterator.hasNext()) {
MessageReference ref = iterator.next(); MessageReference ref = iterator.next();
@ -916,9 +909,6 @@ public class QueueImpl implements Queue {
return null; return null;
} }
finally {
iterator.close();
}
} }
@Override @Override
@ -1181,8 +1171,7 @@ public class QueueImpl implements Queue {
Transaction tx = new TransactionImpl(storageManager); Transaction tx = new TransactionImpl(storageManager);
LinkedListIterator<MessageReference> iter = iterator(); try (LinkedListIterator<MessageReference> iter = iterator()) {
try {
while (iter.hasNext()) { while (iter.hasNext()) {
MessageReference ref = iter.next(); MessageReference ref = iter.next();
@ -1256,9 +1245,6 @@ public class QueueImpl implements Queue {
return count; return count;
} }
finally {
iter.close();
}
} }
@Override @Override
@ -1276,8 +1262,7 @@ public class QueueImpl implements Queue {
Transaction tx = new TransactionImpl(storageManager); Transaction tx = new TransactionImpl(storageManager);
LinkedListIterator<MessageReference> iter = iterator(); try (LinkedListIterator<MessageReference> iter = iterator()) {
try {
while (iter.hasNext()) { while (iter.hasNext()) {
MessageReference ref = iter.next(); MessageReference ref = iter.next();
@ -1300,9 +1285,6 @@ public class QueueImpl implements Queue {
return deleted; return deleted;
} }
finally {
iter.close();
}
} }
@Override @Override
@ -1358,8 +1340,7 @@ public class QueueImpl implements Queue {
return false; return false;
} }
LinkedListIterator<MessageReference> iter = iterator(); try (LinkedListIterator<MessageReference> iter = iterator()) {
try {
while (iter.hasNext()) { while (iter.hasNext()) {
MessageReference ref = iter.next(); MessageReference ref = iter.next();
@ -1373,9 +1354,6 @@ public class QueueImpl implements Queue {
} }
return false; return false;
} }
finally {
iter.close();
}
} }
@Override @Override
@ -1390,9 +1368,8 @@ public class QueueImpl implements Queue {
Transaction tx = new TransactionImpl(storageManager); Transaction tx = new TransactionImpl(storageManager);
int count = 0; int count = 0;
LinkedListIterator<MessageReference> iter = iterator();
try { try (LinkedListIterator<MessageReference> iter = iterator()) {
while (iter.hasNext()) { while (iter.hasNext()) {
MessageReference ref = iter.next(); MessageReference ref = iter.next();
@ -1409,9 +1386,6 @@ public class QueueImpl implements Queue {
return count; return count;
} }
finally {
iter.close();
}
} }
@Override @Override
@ -1482,9 +1456,7 @@ public class QueueImpl implements Queue {
@Override @Override
public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception { public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
LinkedListIterator<MessageReference> iter = iterator(); try (LinkedListIterator<MessageReference> iter = iterator()) {
try {
while (iter.hasNext()) { while (iter.hasNext()) {
MessageReference ref = iter.next(); MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) { if (ref.getMessage().getMessageID() == messageID) {
@ -1497,17 +1469,13 @@ public class QueueImpl implements Queue {
} }
return false; return false;
} }
finally {
iter.close();
}
} }
@Override @Override
public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception { public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
int count = 0; int count = 0;
LinkedListIterator<MessageReference> iter = iterator();
try { try (LinkedListIterator<MessageReference> iter = iterator()) {
while (iter.hasNext()) { while (iter.hasNext()) {
MessageReference ref = iter.next(); MessageReference ref = iter.next();
if (filter == null || filter.match(ref.getMessage())) { if (filter == null || filter.match(ref.getMessage())) {
@ -1520,9 +1488,6 @@ public class QueueImpl implements Queue {
} }
return count; return count;
} }
finally {
iter.close();
}
} }
@Override @Override
@ -1534,9 +1499,7 @@ public class QueueImpl implements Queue {
public synchronized boolean moveReference(final long messageID, public synchronized boolean moveReference(final long messageID,
final SimpleString toAddress, final SimpleString toAddress,
final boolean rejectDuplicate) throws Exception { final boolean rejectDuplicate) throws Exception {
LinkedListIterator<MessageReference> iter = iterator(); try (LinkedListIterator<MessageReference> iter = iterator()) {
try {
while (iter.hasNext()) { while (iter.hasNext()) {
MessageReference ref = iter.next(); MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) { if (ref.getMessage().getMessageID() == messageID) {
@ -1555,9 +1518,6 @@ public class QueueImpl implements Queue {
} }
return false; return false;
} }
finally {
iter.close();
}
} }
@Override @Override
@ -1651,9 +1611,7 @@ public class QueueImpl implements Queue {
@Override @Override
public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception { public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
LinkedListIterator<MessageReference> iter = iterator(); try (LinkedListIterator<MessageReference> iter = iterator()) {
try {
while (iter.hasNext()) { while (iter.hasNext()) {
MessageReference ref = iter.next(); MessageReference ref = iter.next();
@ -1668,16 +1626,11 @@ public class QueueImpl implements Queue {
return false; return false;
} }
finally {
iter.close();
}
} }
@Override @Override
public synchronized int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception { public synchronized int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception {
LinkedListIterator<MessageReference> iter = iterator(); try (LinkedListIterator<MessageReference> iter = iterator()) {
try {
int count = 0; int count = 0;
while (iter.hasNext()) { while (iter.hasNext()) {
MessageReference ref = iter.next(); MessageReference ref = iter.next();
@ -1691,9 +1644,6 @@ public class QueueImpl implements Queue {
} }
return count; return count;
} }
finally {
iter.close();
}
} }
@Override @Override

View File

@ -403,21 +403,21 @@ public class ScaleDownHandler {
SimpleString managementAddress, SimpleString managementAddress,
String user, String user,
String password) throws Exception { String password) throws Exception {
ClientSession session = sessionFactory.createSession(user, password, true, false, false, false, 0); try (ClientSession session = sessionFactory.createSession(user, password, true, false, false, false, 0);
ClientProducer producer = session.createProducer(managementAddress); ClientProducer producer = session.createProducer(managementAddress)) {
//todo - https://issues.jboss.org/browse/HORNETQ-1336 //todo - https://issues.jboss.org/browse/HORNETQ-1336
for (SimpleString address : duplicateIDMap.keySet()) { for (SimpleString address : duplicateIDMap.keySet()) {
ClientMessage message = session.createMessage(false); ClientMessage message = session.createMessage(false);
List<Pair<byte[], Long>> list = duplicateIDMap.get(address); List<Pair<byte[], Long>> list = duplicateIDMap.get(address);
String[] array = new String[list.size()]; String[] array = new String[list.size()];
for (int i = 0; i < list.size(); i++) { for (int i = 0; i < list.size(); i++) {
Pair<byte[], Long> pair = list.get(i); Pair<byte[], Long> pair = list.get(i);
array[i] = new String(pair.getA()); array[i] = new String(pair.getA());
}
ManagementHelper.putOperationInvocation(message, ResourceNames.CORE_SERVER, "updateDuplicateIdCache", address.toString(), array);
producer.send(message);
} }
ManagementHelper.putOperationInvocation(message, ResourceNames.CORE_SERVER, "updateDuplicateIdCache", address.toString(), array);
producer.send(message);
} }
session.close();
} }
/** /**

View File

@ -274,22 +274,15 @@ public class SharedNothingLiveActivation extends LiveActivation {
nodeId0 = null; nodeId0 = null;
} }
ServerLocatorInternal locator;
ClusterConnectionConfiguration config = ConfigurationUtils.getReplicationClusterConfiguration(activeMQServer.getConfiguration(), replicatedPolicy.getClusterName()); ClusterConnectionConfiguration config = ConfigurationUtils.getReplicationClusterConfiguration(activeMQServer.getConfiguration(), replicatedPolicy.getClusterName());
locator = getLocator(config);
ClientSessionFactoryInternal factory = null;
NodeIdListener listener = new NodeIdListener(nodeId0); NodeIdListener listener = new NodeIdListener(nodeId0);
locator.addClusterTopologyListener(listener); try (ServerLocatorInternal locator = getLocator(config)) {
try { locator.addClusterTopologyListener(listener);
locator.setReconnectAttempts(0); locator.setReconnectAttempts(0);
try { try (ClientSessionFactoryInternal factory = locator.connectNoWarnings()) {
locator.addClusterTopologyListener(listener); // Just try connecting
factory = locator.connectNoWarnings();
} }
catch (Exception notConnected) { catch (Exception notConnected) {
return false; return false;
@ -299,12 +292,6 @@ public class SharedNothingLiveActivation extends LiveActivation {
return listener.isNodePresent; return listener.isNodePresent;
} }
finally {
if (factory != null)
factory.close();
if (locator != null)
locator.close();
}
} }
@Override @Override

View File

@ -73,8 +73,7 @@ public class ReloadableProperties {
private void load(final File source, private void load(final File source,
Properties props) throws IOException { Properties props) throws IOException {
FileInputStream in = new FileInputStream(source); try (FileInputStream in = new FileInputStream(source)) {
try {
props.load(in); props.load(in);
// if (key.isDecrypt()) { // if (key.isDecrypt()) {
// try { // try {
@ -87,9 +86,6 @@ public class ReloadableProperties {
// } // }
} }
finally {
in.close();
}
} }
private boolean hasModificationAfter(long reloadTime) { private boolean hasModificationAfter(long reloadTime) {

View File

@ -85,25 +85,25 @@ public class MessagePropertyTest extends ActiveMQTestBase {
private void receiveMessages() throws Exception { private void receiveMessages() throws Exception {
ClientSession session = sf.createSession(true, true); ClientSession session = sf.createSession(true, true);
session.start(); session.start();
ClientConsumer consumer = session.createConsumer(ADDRESS); try (ClientConsumer consumer = session.createConsumer(ADDRESS)) {
for (int i = 0; i < numMessages; i++) { for (int i = 0; i < numMessages; i++) {
ClientMessage message = consumer.receive(100); ClientMessage message = consumer.receive(100);
assertNotNull("Expecting a message " + i, message); assertNotNull("Expecting a message " + i, message);
assertMessageBody(i, message); assertMessageBody(i, message);
assertEquals(i, message.getIntProperty("int").intValue()); assertEquals(i, message.getIntProperty("int").intValue());
assertEquals((short) i, message.getShortProperty("short").shortValue()); assertEquals((short) i, message.getShortProperty("short").shortValue());
assertEquals((byte) i, message.getByteProperty("byte").byteValue()); assertEquals((byte) i, message.getByteProperty("byte").byteValue());
assertEquals(floatValue(i), message.getFloatProperty("float").floatValue(), 0.001); assertEquals(floatValue(i), message.getFloatProperty("float").floatValue(), 0.001);
assertEquals(new SimpleString(Integer.toString(i)), message.getSimpleStringProperty(SIMPLE_STRING_KEY.toString())); assertEquals(new SimpleString(Integer.toString(i)), message.getSimpleStringProperty(SIMPLE_STRING_KEY.toString()));
assertEqualsByteArrays(byteArray(i), message.getBytesProperty("byte[]")); assertEqualsByteArrays(byteArray(i), message.getBytesProperty("byte[]"));
assertTrue(message.containsProperty("null-value")); assertTrue(message.containsProperty("null-value"));
assertEquals(message.getObjectProperty("null-value"), null); assertEquals(message.getObjectProperty("null-value"), null);
message.acknowledge(); message.acknowledge();
}
assertNull("no more messages", consumer.receive(50));
} }
assertNull("no more messages", consumer.receive(50));
consumer.close();
session.commit(); session.commit();
} }

View File

@ -169,13 +169,9 @@ public class PropertiesLoginModuleRaceConditionTest {
} }
private void store(Properties from, File to) throws FileNotFoundException, IOException { private void store(Properties from, File to) throws FileNotFoundException, IOException {
FileOutputStream output = new FileOutputStream(to); try (FileOutputStream output = new FileOutputStream(to)) {
try {
from.store(output, "Generated by " + name.getMethodName()); from.store(output, "Generated by " + name.getMethodName());
} }
finally {
output.close();
}
} }
private void createGroups() throws FileNotFoundException, IOException { private void createGroups() throws FileNotFoundException, IOException {

View File

@ -56,13 +56,13 @@ public class ClientSideLoadBalancingExample {
// Step 4. We create 3 JMS connections from the same connection factory. Since we are using round-robin // Step 4. We create 3 JMS connections from the same connection factory. Since we are using round-robin
// load-balancing this should result in each sessions being connected to a different node of the cluster // load-balancing this should result in each sessions being connected to a different node of the cluster
Connection conn = connectionFactory.createConnection(); try (Connection conn = connectionFactory.createConnection()) {
// Wait a little while to make sure broadcasts from all nodes have reached the client // Wait a little while to make sure broadcasts from all nodes have reached the client
Thread.sleep(5000); Thread.sleep(5000);
connectionA = connectionFactory.createConnection(); connectionA = connectionFactory.createConnection();
connectionB = connectionFactory.createConnection(); connectionB = connectionFactory.createConnection();
connectionC = connectionFactory.createConnection(); connectionC = connectionFactory.createConnection();
conn.close(); }
// Step 5. We create JMS Sessions // Step 5. We create JMS Sessions
Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -73,20 +73,11 @@ public abstract class PerfBase {
protected static PerfParams getParams(final String fileName) throws Exception { protected static PerfParams getParams(final String fileName) throws Exception {
Properties props = null; Properties props = null;
InputStream is = null; try (InputStream is = new FileInputStream(fileName)) {
try {
is = new FileInputStream(fileName);
props = new Properties(); props = new Properties();
props.load(is); props.load(is);
} }
finally {
if (is != null) {
is.close();
}
}
int noOfMessages = Integer.valueOf(props.getProperty("num-messages")); int noOfMessages = Integer.valueOf(props.getProperty("num-messages"));
int noOfWarmupMessages = Integer.valueOf(props.getProperty("num-warmup-messages")); int noOfWarmupMessages = Integer.valueOf(props.getProperty("num-warmup-messages"));

View File

@ -53,20 +53,11 @@ public class SoakBase {
protected static SoakParams getParams(final String fileName) throws Exception { protected static SoakParams getParams(final String fileName) throws Exception {
Properties props = null; Properties props = null;
InputStream is = null; try (InputStream is = new FileInputStream(fileName)) {
try {
is = new FileInputStream(fileName);
props = new Properties(); props = new Properties();
props.load(is); props.load(is);
} }
finally {
if (is != null) {
is.close();
}
}
int durationInMinutes = Integer.valueOf(props.getProperty("duration-in-minutes")); int durationInMinutes = Integer.valueOf(props.getProperty("duration-in-minutes"));
int noOfWarmupMessages = Integer.valueOf(props.getProperty("num-warmup-messages")); int noOfWarmupMessages = Integer.valueOf(props.getProperty("num-warmup-messages"));

View File

@ -62,9 +62,7 @@ public class EmbeddedExample {
Queue queue = (Queue) jmsServer.lookup("queue/exampleQueue"); Queue queue = (Queue) jmsServer.lookup("queue/exampleQueue");
// Step 10. Send and receive a message using JMS API // Step 10. Send and receive a message using JMS API
Connection connection = null; try (Connection connection = cf.createConnection()) {
try {
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Hello sent at " + new Date()); TextMessage message = session.createTextMessage("Hello sent at " + new Date());
@ -76,10 +74,6 @@ public class EmbeddedExample {
System.out.println("Received message:" + messageReceived.getText()); System.out.println("Received message:" + messageReceived.getText());
} }
finally { finally {
if (connection != null) {
connection.close();
}
// Step 11. Stop the JMS server // Step 11. Stop the JMS server
jmsServer.stop(); jmsServer.stop();
System.out.println("Stopped the JMS Server"); System.out.println("Stopped the JMS Server");

View File

@ -32,11 +32,8 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
public class InterceptorExample { public class InterceptorExample {
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
Connection connection = null; ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?incomingInterceptorList=" + SimpleInterceptor.class.getName());
try { try (Connection connection = cf.createConnection()) {
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?incomingInterceptorList=" + SimpleInterceptor.class.getName());
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("exampleQueue"); Queue queue = session.createQueue("exampleQueue");
@ -65,10 +62,5 @@ public class InterceptorExample {
throw new IllegalStateException("Check your configuration as the example interceptor wasn't actually called!"); throw new IllegalStateException("Check your configuration as the example interceptor wasn't actually called!");
} }
} }
finally {
if (connection != null) {
connection.close();
}
}
} }
} }

View File

@ -155,14 +155,12 @@ public class LargeMessageExample {
File outputFile = new File("huge_message_received.dat"); File outputFile = new File("huge_message_received.dat");
FileOutputStream fileOutputStream = new FileOutputStream(outputFile); try (FileOutputStream fileOutputStream = new FileOutputStream(outputFile)) {
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream); // Step 14. This will save the stream and wait until the entire message is written before continuing.
messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);
// Step 14. This will save the stream and wait until the entire message is written before continuing. }
messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);
fileOutputStream.close();
System.out.println("File streamed to disk. Size of received file on disk is " + outputFile.length()); System.out.println("File streamed to disk. Size of received file on disk is " + outputFile.length());
} }
@ -182,12 +180,12 @@ public class LargeMessageExample {
private static void createFile(final File file, final long fileSize) throws IOException { private static void createFile(final File file, final long fileSize) throws IOException {
FileOutputStream fileOut = new FileOutputStream(file); FileOutputStream fileOut = new FileOutputStream(file);
BufferedOutputStream buffOut = new BufferedOutputStream(fileOut); try (BufferedOutputStream buffOut = new BufferedOutputStream(fileOut)) {
byte[] outBuffer = new byte[1024 * 1024]; byte[] outBuffer = new byte[1024 * 1024];
for (long i = 0; i < fileSize; i += outBuffer.length) { for (long i = 0; i < fileSize; i += outBuffer.length) {
buffOut.write(outBuffer); buffOut.write(outBuffer);
}
} }
buffOut.close();
} }
} }

View File

@ -67,13 +67,13 @@ public class LastValueQueueExample {
System.out.format("Sent message: %s%n", message.getText()); System.out.format("Sent message: %s%n", message.getText());
// Step 8. Browse the queue. There is only 1 message in it, the last sent // Step 8. Browse the queue. There is only 1 message in it, the last sent
QueueBrowser browser = session.createBrowser(queue); try (QueueBrowser browser = session.createBrowser(queue)) {
Enumeration enumeration = browser.getEnumeration(); Enumeration enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) { while (enumeration.hasMoreElements()) {
TextMessage messageInTheQueue = (TextMessage) enumeration.nextElement(); TextMessage messageInTheQueue = (TextMessage) enumeration.nextElement();
System.out.format("Message in the queue: %s%n", messageInTheQueue.getText()); System.out.format("Message in the queue: %s%n", messageInTheQueue.getText());
}
} }
browser.close();
// Step 9. Create a JMS Message Consumer for the queue // Step 9. Create a JMS Message Consumer for the queue
MessageConsumer messageConsumer = session.createConsumer(queue); MessageConsumer messageConsumer = session.createConsumer(queue);

View File

@ -78,11 +78,10 @@ public class TextReverserService implements MessageListener {
// retrieve the destination to reply to // retrieve the destination to reply to
Destination replyTo = request.getJMSReplyTo(); Destination replyTo = request.getJMSReplyTo();
// create a producer to send the reply // create a producer to send the reply
MessageProducer producer = session.createProducer(replyTo); try (MessageProducer producer = session.createProducer(replyTo)) {
// send the reply // send the reply
producer.send(reply); producer.send(reply);
// close the producer }
producer.close();
} }
catch (JMSException e) { catch (JMSException e) {
e.printStackTrace(); e.printStackTrace();

View File

@ -33,8 +33,7 @@ public class JmsReceive {
ConnectionFactory factory = JmsHelper.createConnectionFactory("activemq-client.xml"); ConnectionFactory factory = JmsHelper.createConnectionFactory("activemq-client.xml");
Destination destination = ActiveMQDestination.fromAddress("jms.queue.orders"); Destination destination = ActiveMQDestination.fromAddress("jms.queue.orders");
Connection conn = factory.createConnection(); try (Connection conn = factory.createConnection()) {
try {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination); MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() { consumer.setMessageListener(new MessageListener() {
@ -48,8 +47,5 @@ public class JmsReceive {
conn.start(); conn.start();
Thread.sleep(1000000); Thread.sleep(1000000);
} }
finally {
conn.close();
}
} }
} }

View File

@ -30,8 +30,7 @@ public class JmsSend {
ConnectionFactory factory = JmsHelper.createConnectionFactory("activemq-client.xml"); ConnectionFactory factory = JmsHelper.createConnectionFactory("activemq-client.xml");
Destination destination = ActiveMQDestination.fromAddress("jms.queue.orders"); Destination destination = ActiveMQDestination.fromAddress("jms.queue.orders");
Connection conn = factory.createConnection(); try (Connection conn = factory.createConnection()) {
try {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination); MessageProducer producer = session.createProducer(destination);
ObjectMessage message = session.createObjectMessage(); ObjectMessage message = session.createObjectMessage();
@ -40,8 +39,5 @@ public class JmsSend {
message.setObject(order); message.setObject(order);
producer.send(message); producer.send(message);
} }
finally {
conn.close();
}
} }
} }

View File

@ -30,8 +30,7 @@ public class PostOrder {
ConnectionFactory factory = JmsHelper.createConnectionFactory("activemq-client.xml"); ConnectionFactory factory = JmsHelper.createConnectionFactory("activemq-client.xml");
Destination destination = ActiveMQDestination.fromAddress("jms.queue.orders"); Destination destination = ActiveMQDestination.fromAddress("jms.queue.orders");
Connection conn = factory.createConnection(); try (Connection conn = factory.createConnection()) {
try {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination); MessageProducer producer = session.createProducer(destination);
ObjectMessage message = session.createObjectMessage(); ObjectMessage message = session.createObjectMessage();
@ -40,8 +39,5 @@ public class PostOrder {
message.setObject(order); message.setObject(order);
producer.send(message); producer.send(message);
} }
finally {
conn.close();
}
} }
} }

View File

@ -32,8 +32,7 @@ public class ReceiveShipping {
ConnectionFactory factory = JmsHelper.createConnectionFactory("activemq-client.xml"); ConnectionFactory factory = JmsHelper.createConnectionFactory("activemq-client.xml");
Destination destination = ActiveMQDestination.fromAddress("jms.queue.shipping"); Destination destination = ActiveMQDestination.fromAddress("jms.queue.shipping");
Connection conn = factory.createConnection(); try (Connection conn = factory.createConnection()) {
try {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination); MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() { consumer.setMessageListener(new MessageListener() {
@ -47,8 +46,5 @@ public class ReceiveShipping {
conn.start(); conn.start();
Thread.sleep(1000000); Thread.sleep(1000000);
} }
finally {
conn.close();
}
} }
} }

View File

@ -35,61 +35,54 @@ public class StompEmbeddedWithInterceptorExample {
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
// Step 1. Create a TCP socket to connect to the Stomp port // Step 1. Create a TCP socket to connect to the Stomp port
Socket socket = new Socket("localhost", 61616); try (Socket socket = new Socket("localhost", 61616)) {
// Step 2. Send a CONNECT frame to connect to the server // Step 2. Send a CONNECT frame to connect to the server
String connectFrame = "CONNECT\n" + String connectFrame = "CONNECT\n" +
"accept-version:1.2\n" + "accept-version:1.2\n" +
"host:localhost\n" + "host:localhost\n" +
"login:guest\n" + "login:guest\n" +
"passcode:guest\n" + "passcode:guest\n" +
"request-id:1\n" + "request-id:1\n" +
"\n" + "\n" +
END_OF_FRAME; END_OF_FRAME;
sendFrame(socket, connectFrame); sendFrame(socket, connectFrame);
// Step 3. Send a SEND frame (a Stomp message) to the // Step 3. Send a SEND frame (a Stomp message) to the
// jms.queue.exampleQueue address with a text body // jms.queue.exampleQueue address with a text body
String text = "Hello World from Stomp 1.2 !"; String text = "Hello World from Stomp 1.2 !";
String message = "SEND\n" + String message = "SEND\n" +
"destination:jms.queue.exampleQueue" + "destination:jms.queue.exampleQueue" +
"\n" + "\n" +
text + text +
END_OF_FRAME; END_OF_FRAME;
sendFrame(socket, message); sendFrame(socket, message);
System.out.println("Sent Stomp message: " + text); System.out.println("Sent Stomp message: " + text);
// Step 4. Send a DISCONNECT frame to disconnect from the server
String disconnectFrame = "DISCONNECT\n" +
"\n" +
END_OF_FRAME;
sendFrame(socket, disconnectFrame);
// Step 5. Slose the TCP socket
socket.close();
// Step 4. Send a DISCONNECT frame to disconnect from the server
String disconnectFrame = "DISCONNECT\n" +
"\n" +
END_OF_FRAME;
sendFrame(socket, disconnectFrame);
}
// It will use a regular JMS connection to show how the injected data will appear at the final message // It will use a regular JMS connection to show how the injected data will appear at the final message
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection(); Connection connection = factory.createConnection();
Session session = connection.createSession(); Session session = connection.createSession()) {
connection.start(); connection.start();
MessageConsumer consumer = session.createConsumer(session.createQueue("exampleQueue")); MessageConsumer consumer = session.createConsumer(session.createQueue("exampleQueue"));
Message messageReceived = consumer.receive(5000); Message messageReceived = consumer.receive(5000);
String propStomp = messageReceived.getStringProperty("stompIntercepted"); String propStomp = messageReceived.getStringProperty("stompIntercepted");
String propRegular = messageReceived.getStringProperty("regularIntercepted"); String propRegular = messageReceived.getStringProperty("regularIntercepted");
System.out.println("propStomp is Hello!! - " + propStomp.equals("Hello")); System.out.println("propStomp is Hello!! - " + propStomp.equals("Hello"));
System.out.println("propRegular is HelloAgain!! - " + propRegular.equals("HelloAgain")); System.out.println("propRegular is HelloAgain!! - " + propRegular.equals("HelloAgain"));
}
session.close();
connection.close();
factory.close();
} }
private static void sendFrame(Socket socket, String data) throws Exception { private static void sendFrame(Socket socket, String data) throws Exception {