This closes #397
This commit is contained in:
commit
51d033adec
|
@ -80,17 +80,11 @@ public class EncodeJournal extends LockAbstract {
|
|||
final int minFiles,
|
||||
final int fileSize,
|
||||
final String fileName) throws Exception {
|
||||
FileOutputStream fileOutputStream = new FileOutputStream(fileName);
|
||||
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
|
||||
PrintStream out = new PrintStream(bufferedOutputStream);
|
||||
try {
|
||||
try (FileOutputStream fileOutputStream = new FileOutputStream(fileName);
|
||||
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
|
||||
PrintStream out = new PrintStream(bufferedOutputStream)) {
|
||||
exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
|
||||
}
|
||||
finally {
|
||||
out.close();
|
||||
fileOutputStream.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static void exportJournal(final String directory,
|
||||
|
|
|
@ -351,9 +351,9 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
ActiveMQServerLogger.LOGGER.debug(logMessage);
|
||||
|
||||
message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
|
||||
ClientProducer producer = session.createProducer(destination);
|
||||
producer.send(message);
|
||||
producer.close();
|
||||
try (ClientProducer producer = session.createProducer(destination)) {
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
if (tempFileName.length() > 0) {
|
||||
File tempFile = new File(tempFileName);
|
||||
|
|
|
@ -182,8 +182,7 @@ public class ProducerThread extends Thread {
|
|||
}
|
||||
|
||||
private String readInputStream(InputStream is, int size, int messageNumber) throws IOException {
|
||||
InputStreamReader reader = new InputStreamReader(is);
|
||||
try {
|
||||
try (InputStreamReader reader = new InputStreamReader(is)) {
|
||||
char[] buffer;
|
||||
if (size > 0) {
|
||||
buffer = new char[size];
|
||||
|
@ -203,9 +202,6 @@ public class ProducerThread extends Thread {
|
|||
catch (IOException ioe) {
|
||||
return createDefaultMessage(messageNumber);
|
||||
}
|
||||
finally {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
|
||||
private String createDefaultMessage(int messageNumber) {
|
||||
|
|
|
@ -83,10 +83,8 @@ public class ServerUtil {
|
|||
public static void waitForServerToStart(String uri, long timeout) throws InterruptedException {
|
||||
long realTimeout = System.currentTimeMillis() + timeout;
|
||||
while (System.currentTimeMillis() < realTimeout) {
|
||||
try {
|
||||
ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(uri, null);
|
||||
try (ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(uri, null)) {
|
||||
cf.createConnection().close();
|
||||
cf.close();
|
||||
System.out.println("server " + uri + " started");
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
|
|
@ -41,14 +41,15 @@ public class CompressionUtilTest extends Assert {
|
|||
ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
|
||||
|
||||
AtomicLong counter = new AtomicLong(0);
|
||||
DeflaterReader reader = new DeflaterReader(inputStream, counter);
|
||||
|
||||
ArrayList<Integer> zipHolder = new ArrayList<>();
|
||||
int b = reader.read();
|
||||
|
||||
while (b != -1) {
|
||||
zipHolder.add(b);
|
||||
b = reader.read();
|
||||
try (DeflaterReader reader = new DeflaterReader(inputStream, counter)) {
|
||||
int b = reader.read();
|
||||
|
||||
while (b != -1) {
|
||||
zipHolder.add(b);
|
||||
b = reader.read();
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals(input.length, counter.get());
|
||||
|
@ -65,7 +66,6 @@ public class CompressionUtilTest extends Assert {
|
|||
int compressedDataLength = compresser.deflate(output);
|
||||
|
||||
compareByteArray(allCompressed, output, compressedDataLength);
|
||||
reader.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -76,17 +76,17 @@ public class CompressionUtilTest extends Assert {
|
|||
ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
|
||||
AtomicLong counter = new AtomicLong(0);
|
||||
|
||||
DeflaterReader reader = new DeflaterReader(inputStream, counter);
|
||||
|
||||
byte[] buffer = new byte[7];
|
||||
ArrayList<Integer> zipHolder = new ArrayList<>();
|
||||
|
||||
int n = reader.read(buffer);
|
||||
while (n != -1) {
|
||||
for (int i = 0; i < n; i++) {
|
||||
zipHolder.add((int) buffer[i]);
|
||||
try (DeflaterReader reader = new DeflaterReader(inputStream, counter)) {
|
||||
int n = reader.read(buffer);
|
||||
while (n != -1) {
|
||||
for (int i = 0; i < n; i++) {
|
||||
zipHolder.add((int) buffer[i]);
|
||||
}
|
||||
n = reader.read(buffer);
|
||||
}
|
||||
n = reader.read(buffer);
|
||||
}
|
||||
|
||||
assertEquals(input.length, counter.get());
|
||||
|
@ -103,7 +103,6 @@ public class CompressionUtilTest extends Assert {
|
|||
int compressedDataLength = compresser.deflate(output);
|
||||
|
||||
compareByteArray(allCompressed, output, compressedDataLength);
|
||||
reader.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -121,13 +120,14 @@ public class CompressionUtilTest extends Assert {
|
|||
System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
|
||||
ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
|
||||
|
||||
InflaterReader inflater = new InflaterReader(byteInput);
|
||||
ArrayList<Integer> holder = new ArrayList<>();
|
||||
int read = inflater.read();
|
||||
try (InflaterReader inflater = new InflaterReader(byteInput)) {
|
||||
int read = inflater.read();
|
||||
|
||||
while (read != -1) {
|
||||
holder.add(read);
|
||||
read = inflater.read();
|
||||
while (read != -1) {
|
||||
holder.add(read);
|
||||
read = inflater.read();
|
||||
}
|
||||
}
|
||||
|
||||
byte[] result = new byte[holder.size()];
|
||||
|
@ -139,7 +139,6 @@ public class CompressionUtilTest extends Assert {
|
|||
String txt = new String(result);
|
||||
|
||||
assertEquals(inputString, txt);
|
||||
inflater.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -158,18 +157,16 @@ public class CompressionUtilTest extends Assert {
|
|||
ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
|
||||
|
||||
ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
|
||||
InflaterWriter writer = new InflaterWriter(byteOutput);
|
||||
|
||||
byte[] zipBuffer = new byte[12];
|
||||
|
||||
int n = byteInput.read(zipBuffer);
|
||||
while (n > 0) {
|
||||
writer.write(zipBuffer, 0, n);
|
||||
n = byteInput.read(zipBuffer);
|
||||
try (InflaterWriter writer = new InflaterWriter(byteOutput)) {
|
||||
int n = byteInput.read(zipBuffer);
|
||||
while (n > 0) {
|
||||
writer.write(zipBuffer, 0, n);
|
||||
n = byteInput.read(zipBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
writer.close();
|
||||
|
||||
byte[] outcome = byteOutput.toByteArray();
|
||||
String outStr = new String(outcome);
|
||||
|
||||
|
|
|
@ -115,15 +115,16 @@ public class ConnectionFactoryURITest {
|
|||
|
||||
private void persistIP6(String ipv6, ActiveMQConnectionFactory factory) throws IOException, ClassNotFoundException {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
ObjectOutputStream outStream = new ObjectOutputStream(baos);
|
||||
outStream.writeObject(factory);
|
||||
outStream.close();
|
||||
baos.close();
|
||||
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
|
||||
ObjectInputStream in = new ObjectInputStream(bais);
|
||||
factory = (ActiveMQConnectionFactory) in.readObject();
|
||||
in.close();
|
||||
bais.close();
|
||||
try (ObjectOutputStream outStream = new ObjectOutputStream(baos)) {
|
||||
outStream.writeObject(factory);
|
||||
}
|
||||
finally {
|
||||
baos.close();
|
||||
}
|
||||
try (ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
|
||||
ObjectInputStream in = new ObjectInputStream(bais)) {
|
||||
factory = (ActiveMQConnectionFactory) in.readObject();
|
||||
}
|
||||
Assert.assertEquals("[" + ipv6 + "]", factory.getStaticConnectors()[0].getParams().get("host"));
|
||||
}
|
||||
|
||||
|
|
|
@ -159,9 +159,8 @@ public class OpenWireMessageConverter implements MessageConverter {
|
|||
if (messageCompressed) {
|
||||
InputStream ois = new ByteArrayInputStream(contents);
|
||||
ois = new InflaterInputStream(ois);
|
||||
org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
|
||||
|
||||
try {
|
||||
try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
|
||||
byte[] buf = new byte[1024];
|
||||
int n = ois.read(buf);
|
||||
while (n != -1) {
|
||||
|
@ -171,9 +170,6 @@ public class OpenWireMessageConverter implements MessageConverter {
|
|||
//read done
|
||||
contents = decompressed.toByteSequence();
|
||||
}
|
||||
finally {
|
||||
decompressed.close();
|
||||
}
|
||||
}
|
||||
body.writeInt(contents.length);
|
||||
body.writeBytes(contents.data, contents.offset, contents.length);
|
||||
|
@ -279,19 +275,14 @@ public class OpenWireMessageConverter implements MessageConverter {
|
|||
break;
|
||||
default:
|
||||
if (messageCompressed) {
|
||||
org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
|
||||
OutputStream os = new InflaterOutputStream(decompressed);
|
||||
try {
|
||||
try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
|
||||
OutputStream os = new InflaterOutputStream(decompressed)) {
|
||||
os.write(contents.data, contents.offset, contents.getLength());
|
||||
contents = decompressed.toByteSequence();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
finally {
|
||||
os.close();
|
||||
decompressed.close();
|
||||
}
|
||||
}
|
||||
body.writeBytes(contents.data, contents.offset, contents.length);
|
||||
break;
|
||||
|
@ -536,9 +527,9 @@ public class OpenWireMessageConverter implements MessageConverter {
|
|||
buffer.readBytes(bytes);
|
||||
if (isCompressed) {
|
||||
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
|
||||
DeflaterOutputStream out = new DeflaterOutputStream(bytesOut);
|
||||
out.write(bytes);
|
||||
out.close();
|
||||
try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut)) {
|
||||
out.write(bytes);
|
||||
}
|
||||
bytes = bytesOut.toByteArray();
|
||||
}
|
||||
}
|
||||
|
@ -637,16 +628,11 @@ public class OpenWireMessageConverter implements MessageConverter {
|
|||
bytes = new byte[n];
|
||||
buffer.readBytes(bytes);
|
||||
if (isCompressed) {
|
||||
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
|
||||
DeflaterOutputStream out = new DeflaterOutputStream(bytesOut);
|
||||
try {
|
||||
try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
|
||||
DeflaterOutputStream out = new DeflaterOutputStream(bytesOut)) {
|
||||
out.write(bytes);
|
||||
bytes = bytesOut.toByteArray();
|
||||
}
|
||||
finally {
|
||||
out.close();
|
||||
bytesOut.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -76,12 +76,12 @@ public class QueueServiceManager extends DestinationServiceManager {
|
|||
throw new Exception("You must start() this class instance before deploying");
|
||||
}
|
||||
String queueName = queueDeployment.getName();
|
||||
ClientSession session = sessionFactory.createSession(false, false, false);
|
||||
ClientSession.QueueQuery query = session.queueQuery(new SimpleString(queueName));
|
||||
if (!query.isExists()) {
|
||||
session.createQueue(queueName, queueName, queueDeployment.isDurableSend());
|
||||
try (ClientSession session = sessionFactory.createSession(false, false, false)) {
|
||||
ClientSession.QueueQuery query = session.queueQuery(new SimpleString(queueName));
|
||||
if (!query.isExists()) {
|
||||
session.createQueue(queueName, queueName, queueDeployment.isDurableSend());
|
||||
}
|
||||
}
|
||||
session.close();
|
||||
|
||||
destination.createQueueResource(queueName, queueDeployment.isDurableSend(), queueDeployment.getConsumerSessionTimeoutSeconds(), queueDeployment.isDuplicatesAllowed());
|
||||
|
||||
|
|
|
@ -77,16 +77,17 @@ public class TopicServiceManager extends DestinationServiceManager {
|
|||
throw new Exception("You must start() this class instance before deploying");
|
||||
}
|
||||
String queueName = topicDeployment.getName();
|
||||
ClientSession session = sessionFactory.createSession(false, false, false);
|
||||
ClientSession.QueueQuery query = session.queueQuery(new SimpleString(queueName));
|
||||
boolean defaultDurable = topicDeployment.isDurableSend();
|
||||
if (query.isExists()) {
|
||||
defaultDurable = query.isDurable();
|
||||
boolean defaultDurable;
|
||||
try (ClientSession session = sessionFactory.createSession(false, false, false)) {
|
||||
ClientSession.QueueQuery query = session.queueQuery(new SimpleString(queueName));
|
||||
defaultDurable = topicDeployment.isDurableSend();
|
||||
if (query.isExists()) {
|
||||
defaultDurable = query.isDurable();
|
||||
}
|
||||
else {
|
||||
session.createQueue(queueName, queueName, topicDeployment.isDurableSend());
|
||||
}
|
||||
}
|
||||
else {
|
||||
session.createQueue(queueName, queueName, topicDeployment.isDurableSend());
|
||||
}
|
||||
session.close();
|
||||
|
||||
destination.createTopicResource(queueName, defaultDurable, topicDeployment.getConsumerSessionTimeoutSeconds(), topicDeployment.isDuplicatesAllowed());
|
||||
}
|
||||
|
|
|
@ -1666,8 +1666,10 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
ObjectOutputStream os = new ObjectOutputStream(bos);
|
||||
os.writeObject(ConfigurationImpl.this);
|
||||
ObjectInputStream ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(bos.toByteArray()));
|
||||
Configuration config = (Configuration) ois.readObject();
|
||||
Configuration config;
|
||||
try (ObjectInputStream ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(bos.toByteArray()))) {
|
||||
config = (Configuration) ois.readObject();
|
||||
}
|
||||
|
||||
// this is transient because of possible jgroups integration, we need to copy it manually
|
||||
config.setBroadcastGroupConfigurations(ConfigurationImpl.this.getBroadcastGroupConfigurations());
|
||||
|
|
|
@ -406,8 +406,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
Filter filter = FilterImpl.createFilter(filterStr);
|
||||
List<Map<String, Object>> messages = new ArrayList<>();
|
||||
queue.flushExecutor();
|
||||
LinkedListIterator<MessageReference> iterator = queue.totalIterator();
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
|
||||
while (iterator.hasNext()) {
|
||||
MessageReference ref = iterator.next();
|
||||
if (filter == null || filter.match(ref.getMessage())) {
|
||||
|
@ -417,9 +416,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
}
|
||||
return messages.toArray(new Map[messages.size()]);
|
||||
}
|
||||
finally {
|
||||
iterator.close();
|
||||
}
|
||||
}
|
||||
catch (ActiveMQException e) {
|
||||
throw new IllegalStateException(e.getMessage());
|
||||
|
@ -449,8 +445,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
try {
|
||||
List<Map<String, Object>> messages = new ArrayList<>();
|
||||
queue.flushExecutor();
|
||||
LinkedListIterator<MessageReference> iterator = queue.totalIterator();
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
|
||||
// returns just the first, as it's the first only
|
||||
if (iterator.hasNext()) {
|
||||
MessageReference ref = iterator.next();
|
||||
|
@ -459,9 +454,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
}
|
||||
return messages.toArray(new Map[1]);
|
||||
}
|
||||
finally {
|
||||
iterator.close();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
blockOnIO();
|
||||
|
@ -508,8 +500,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
return getMessageCount();
|
||||
}
|
||||
else {
|
||||
LinkedListIterator<MessageReference> iterator = queue.totalIterator();
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
|
||||
int count = 0;
|
||||
while (iterator.hasNext()) {
|
||||
MessageReference ref = iterator.next();
|
||||
|
@ -519,9 +510,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
}
|
||||
return count;
|
||||
}
|
||||
finally {
|
||||
iterator.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
|
@ -919,8 +907,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
ArrayList<CompositeData> c = new ArrayList<>();
|
||||
Filter filter = FilterImpl.createFilter(filterStr);
|
||||
queue.flushExecutor();
|
||||
LinkedListIterator<MessageReference> iterator = queue.totalIterator();
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
|
||||
while (iterator.hasNext() && currentPageSize++ < pageSize) {
|
||||
MessageReference ref = iterator.next();
|
||||
if (filter == null || filter.match(ref.getMessage())) {
|
||||
|
@ -932,9 +919,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
c.toArray(rc);
|
||||
return rc;
|
||||
}
|
||||
finally {
|
||||
iterator.close();
|
||||
}
|
||||
}
|
||||
catch (ActiveMQException e) {
|
||||
throw new IllegalStateException(e.getMessage());
|
||||
|
|
|
@ -110,15 +110,10 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
|
|||
File.separatorChar +
|
||||
PagingStoreFactoryNIO.ADDRESS_FILE);
|
||||
|
||||
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileWithID)));
|
||||
|
||||
try {
|
||||
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileWithID)))) {
|
||||
writer.write(address.toString());
|
||||
writer.newLine();
|
||||
}
|
||||
finally {
|
||||
writer.close();
|
||||
}
|
||||
|
||||
return factory;
|
||||
}
|
||||
|
@ -149,16 +144,11 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
|
|||
continue;
|
||||
}
|
||||
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(addressFile)));
|
||||
|
||||
String addressString;
|
||||
|
||||
try {
|
||||
try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(addressFile)))) {
|
||||
addressString = reader.readLine();
|
||||
}
|
||||
finally {
|
||||
reader.close();
|
||||
}
|
||||
|
||||
SimpleString address = new SimpleString(addressString);
|
||||
|
||||
|
|
|
@ -226,14 +226,14 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
|
|||
byte[] buffer = new byte[1 << 4];
|
||||
MessageDigest md = MessageDigest.getInstance("MD5");
|
||||
|
||||
FileInputStream is = new FileInputStream(file);
|
||||
DigestInputStream is2 = new DigestInputStream(is, md);
|
||||
while (is2.read(buffer) > 0) {
|
||||
continue;
|
||||
byte[] digest;
|
||||
try (FileInputStream is = new FileInputStream(file);
|
||||
DigestInputStream is2 = new DigestInputStream(is, md)) {
|
||||
while (is2.read(buffer) > 0) {
|
||||
continue;
|
||||
}
|
||||
digest = md.digest();
|
||||
}
|
||||
byte[] digest = md.digest();
|
||||
is.close();
|
||||
is2.close();
|
||||
return Base64.encodeBytes(digest);
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
|
|
@ -508,42 +508,33 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
|
|||
file.open();
|
||||
}
|
||||
try {
|
||||
final FileInputStream fis = new FileInputStream(file.getJavaFile());
|
||||
try {
|
||||
final FileChannel channel = fis.getChannel();
|
||||
try {
|
||||
// We can afford having a single buffer here for this entire loop
|
||||
// because sendReplicatePacket will encode the packet as a NettyBuffer
|
||||
// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(1 << 17); // 1 << 17 == 131072 == 128 * 1024
|
||||
while (true) {
|
||||
buffer.clear();
|
||||
final int bytesRead = channel.read(buffer);
|
||||
int toSend = bytesRead;
|
||||
if (bytesRead > 0) {
|
||||
if (bytesRead >= maxBytesToSend) {
|
||||
toSend = (int) maxBytesToSend;
|
||||
maxBytesToSend = 0;
|
||||
}
|
||||
else {
|
||||
maxBytesToSend = maxBytesToSend - bytesRead;
|
||||
}
|
||||
buffer.limit(toSend);
|
||||
try (final FileInputStream fis = new FileInputStream(file.getJavaFile());
|
||||
final FileChannel channel = fis.getChannel()) {
|
||||
// We can afford having a single buffer here for this entire loop
|
||||
// because sendReplicatePacket will encode the packet as a NettyBuffer
|
||||
// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(1 << 17); // 1 << 17 == 131072 == 128 * 1024
|
||||
while (true) {
|
||||
buffer.clear();
|
||||
final int bytesRead = channel.read(buffer);
|
||||
int toSend = bytesRead;
|
||||
if (bytesRead > 0) {
|
||||
if (bytesRead >= maxBytesToSend) {
|
||||
toSend = (int) maxBytesToSend;
|
||||
maxBytesToSend = 0;
|
||||
}
|
||||
buffer.rewind();
|
||||
|
||||
// sending -1 or 0 bytes will close the file at the backup
|
||||
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer));
|
||||
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
|
||||
break;
|
||||
else {
|
||||
maxBytesToSend = maxBytesToSend - bytesRead;
|
||||
}
|
||||
buffer.limit(toSend);
|
||||
}
|
||||
buffer.rewind();
|
||||
|
||||
// sending -1 or 0 bytes will close the file at the backup
|
||||
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer));
|
||||
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
|
||||
break;
|
||||
}
|
||||
finally {
|
||||
channel.close();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
fis.close();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
|
|
|
@ -255,14 +255,13 @@ public class ClusterConnectionBridge extends BridgeImpl {
|
|||
}
|
||||
ManagementHelper.putOperationInvocation(message, ResourceNames.CORE_SERVER, "sendQueueInfoToQueue", notifQueueName.toString(), flowRecord.getAddress());
|
||||
|
||||
ClientProducer prod = sessionConsumer.createProducer(managementAddress);
|
||||
try (ClientProducer prod = sessionConsumer.createProducer(managementAddress)) {
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Cluster connection bridge on " + clusterConnection + " requesting information on queues");
|
||||
}
|
||||
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQClientLogger.LOGGER.debug("Cluster connection bridge on " + clusterConnection + " requesting information on queues");
|
||||
prod.send(message);
|
||||
}
|
||||
|
||||
prod.send(message);
|
||||
prod.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -869,9 +869,7 @@ public class QueueImpl implements Queue {
|
|||
|
||||
@Override
|
||||
public synchronized MessageReference removeReferenceWithID(final long id1) throws Exception {
|
||||
LinkedListIterator<MessageReference> iterator = iterator();
|
||||
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iterator = iterator()) {
|
||||
|
||||
MessageReference removed = null;
|
||||
|
||||
|
@ -895,16 +893,11 @@ public class QueueImpl implements Queue {
|
|||
|
||||
return removed;
|
||||
}
|
||||
finally {
|
||||
iterator.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized MessageReference getReference(final long id1) throws ActiveMQException {
|
||||
LinkedListIterator<MessageReference> iterator = iterator();
|
||||
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iterator = iterator()) {
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
MessageReference ref = iterator.next();
|
||||
|
@ -916,9 +909,6 @@ public class QueueImpl implements Queue {
|
|||
|
||||
return null;
|
||||
}
|
||||
finally {
|
||||
iterator.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1181,8 +1171,7 @@ public class QueueImpl implements Queue {
|
|||
|
||||
Transaction tx = new TransactionImpl(storageManager);
|
||||
|
||||
LinkedListIterator<MessageReference> iter = iterator();
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iter = iterator()) {
|
||||
|
||||
while (iter.hasNext()) {
|
||||
MessageReference ref = iter.next();
|
||||
|
@ -1256,9 +1245,6 @@ public class QueueImpl implements Queue {
|
|||
|
||||
return count;
|
||||
}
|
||||
finally {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1276,8 +1262,7 @@ public class QueueImpl implements Queue {
|
|||
|
||||
Transaction tx = new TransactionImpl(storageManager);
|
||||
|
||||
LinkedListIterator<MessageReference> iter = iterator();
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iter = iterator()) {
|
||||
|
||||
while (iter.hasNext()) {
|
||||
MessageReference ref = iter.next();
|
||||
|
@ -1300,9 +1285,6 @@ public class QueueImpl implements Queue {
|
|||
|
||||
return deleted;
|
||||
}
|
||||
finally {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1358,8 +1340,7 @@ public class QueueImpl implements Queue {
|
|||
return false;
|
||||
}
|
||||
|
||||
LinkedListIterator<MessageReference> iter = iterator();
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iter = iterator()) {
|
||||
|
||||
while (iter.hasNext()) {
|
||||
MessageReference ref = iter.next();
|
||||
|
@ -1373,9 +1354,6 @@ public class QueueImpl implements Queue {
|
|||
}
|
||||
return false;
|
||||
}
|
||||
finally {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1390,9 +1368,8 @@ public class QueueImpl implements Queue {
|
|||
Transaction tx = new TransactionImpl(storageManager);
|
||||
|
||||
int count = 0;
|
||||
LinkedListIterator<MessageReference> iter = iterator();
|
||||
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iter = iterator()) {
|
||||
|
||||
while (iter.hasNext()) {
|
||||
MessageReference ref = iter.next();
|
||||
|
@ -1409,9 +1386,6 @@ public class QueueImpl implements Queue {
|
|||
|
||||
return count;
|
||||
}
|
||||
finally {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1482,9 +1456,7 @@ public class QueueImpl implements Queue {
|
|||
|
||||
@Override
|
||||
public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
|
||||
LinkedListIterator<MessageReference> iter = iterator();
|
||||
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iter = iterator()) {
|
||||
while (iter.hasNext()) {
|
||||
MessageReference ref = iter.next();
|
||||
if (ref.getMessage().getMessageID() == messageID) {
|
||||
|
@ -1497,17 +1469,13 @@ public class QueueImpl implements Queue {
|
|||
}
|
||||
return false;
|
||||
}
|
||||
finally {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
|
||||
int count = 0;
|
||||
LinkedListIterator<MessageReference> iter = iterator();
|
||||
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iter = iterator()) {
|
||||
while (iter.hasNext()) {
|
||||
MessageReference ref = iter.next();
|
||||
if (filter == null || filter.match(ref.getMessage())) {
|
||||
|
@ -1520,9 +1488,6 @@ public class QueueImpl implements Queue {
|
|||
}
|
||||
return count;
|
||||
}
|
||||
finally {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1534,9 +1499,7 @@ public class QueueImpl implements Queue {
|
|||
public synchronized boolean moveReference(final long messageID,
|
||||
final SimpleString toAddress,
|
||||
final boolean rejectDuplicate) throws Exception {
|
||||
LinkedListIterator<MessageReference> iter = iterator();
|
||||
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iter = iterator()) {
|
||||
while (iter.hasNext()) {
|
||||
MessageReference ref = iter.next();
|
||||
if (ref.getMessage().getMessageID() == messageID) {
|
||||
|
@ -1555,9 +1518,6 @@ public class QueueImpl implements Queue {
|
|||
}
|
||||
return false;
|
||||
}
|
||||
finally {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1651,9 +1611,7 @@ public class QueueImpl implements Queue {
|
|||
|
||||
@Override
|
||||
public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
|
||||
LinkedListIterator<MessageReference> iter = iterator();
|
||||
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iter = iterator()) {
|
||||
|
||||
while (iter.hasNext()) {
|
||||
MessageReference ref = iter.next();
|
||||
|
@ -1668,16 +1626,11 @@ public class QueueImpl implements Queue {
|
|||
|
||||
return false;
|
||||
}
|
||||
finally {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception {
|
||||
LinkedListIterator<MessageReference> iter = iterator();
|
||||
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iter = iterator()) {
|
||||
int count = 0;
|
||||
while (iter.hasNext()) {
|
||||
MessageReference ref = iter.next();
|
||||
|
@ -1691,9 +1644,6 @@ public class QueueImpl implements Queue {
|
|||
}
|
||||
return count;
|
||||
}
|
||||
finally {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -403,21 +403,21 @@ public class ScaleDownHandler {
|
|||
SimpleString managementAddress,
|
||||
String user,
|
||||
String password) throws Exception {
|
||||
ClientSession session = sessionFactory.createSession(user, password, true, false, false, false, 0);
|
||||
ClientProducer producer = session.createProducer(managementAddress);
|
||||
//todo - https://issues.jboss.org/browse/HORNETQ-1336
|
||||
for (SimpleString address : duplicateIDMap.keySet()) {
|
||||
ClientMessage message = session.createMessage(false);
|
||||
List<Pair<byte[], Long>> list = duplicateIDMap.get(address);
|
||||
String[] array = new String[list.size()];
|
||||
for (int i = 0; i < list.size(); i++) {
|
||||
Pair<byte[], Long> pair = list.get(i);
|
||||
array[i] = new String(pair.getA());
|
||||
try (ClientSession session = sessionFactory.createSession(user, password, true, false, false, false, 0);
|
||||
ClientProducer producer = session.createProducer(managementAddress)) {
|
||||
//todo - https://issues.jboss.org/browse/HORNETQ-1336
|
||||
for (SimpleString address : duplicateIDMap.keySet()) {
|
||||
ClientMessage message = session.createMessage(false);
|
||||
List<Pair<byte[], Long>> list = duplicateIDMap.get(address);
|
||||
String[] array = new String[list.size()];
|
||||
for (int i = 0; i < list.size(); i++) {
|
||||
Pair<byte[], Long> pair = list.get(i);
|
||||
array[i] = new String(pair.getA());
|
||||
}
|
||||
ManagementHelper.putOperationInvocation(message, ResourceNames.CORE_SERVER, "updateDuplicateIdCache", address.toString(), array);
|
||||
producer.send(message);
|
||||
}
|
||||
ManagementHelper.putOperationInvocation(message, ResourceNames.CORE_SERVER, "updateDuplicateIdCache", address.toString(), array);
|
||||
producer.send(message);
|
||||
}
|
||||
session.close();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -274,22 +274,15 @@ public class SharedNothingLiveActivation extends LiveActivation {
|
|||
nodeId0 = null;
|
||||
}
|
||||
|
||||
ServerLocatorInternal locator;
|
||||
|
||||
ClusterConnectionConfiguration config = ConfigurationUtils.getReplicationClusterConfiguration(activeMQServer.getConfiguration(), replicatedPolicy.getClusterName());
|
||||
|
||||
locator = getLocator(config);
|
||||
|
||||
ClientSessionFactoryInternal factory = null;
|
||||
|
||||
NodeIdListener listener = new NodeIdListener(nodeId0);
|
||||
|
||||
locator.addClusterTopologyListener(listener);
|
||||
try {
|
||||
try (ServerLocatorInternal locator = getLocator(config)) {
|
||||
locator.addClusterTopologyListener(listener);
|
||||
locator.setReconnectAttempts(0);
|
||||
try {
|
||||
locator.addClusterTopologyListener(listener);
|
||||
factory = locator.connectNoWarnings();
|
||||
try (ClientSessionFactoryInternal factory = locator.connectNoWarnings()) {
|
||||
// Just try connecting
|
||||
}
|
||||
catch (Exception notConnected) {
|
||||
return false;
|
||||
|
@ -299,12 +292,6 @@ public class SharedNothingLiveActivation extends LiveActivation {
|
|||
|
||||
return listener.isNodePresent;
|
||||
}
|
||||
finally {
|
||||
if (factory != null)
|
||||
factory.close();
|
||||
if (locator != null)
|
||||
locator.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -73,8 +73,7 @@ public class ReloadableProperties {
|
|||
|
||||
private void load(final File source,
|
||||
Properties props) throws IOException {
|
||||
FileInputStream in = new FileInputStream(source);
|
||||
try {
|
||||
try (FileInputStream in = new FileInputStream(source)) {
|
||||
props.load(in);
|
||||
// if (key.isDecrypt()) {
|
||||
// try {
|
||||
|
@ -87,9 +86,6 @@ public class ReloadableProperties {
|
|||
// }
|
||||
|
||||
}
|
||||
finally {
|
||||
in.close();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean hasModificationAfter(long reloadTime) {
|
||||
|
|
|
@ -85,25 +85,25 @@ public class MessagePropertyTest extends ActiveMQTestBase {
|
|||
private void receiveMessages() throws Exception {
|
||||
ClientSession session = sf.createSession(true, true);
|
||||
session.start();
|
||||
ClientConsumer consumer = session.createConsumer(ADDRESS);
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = consumer.receive(100);
|
||||
assertNotNull("Expecting a message " + i, message);
|
||||
assertMessageBody(i, message);
|
||||
assertEquals(i, message.getIntProperty("int").intValue());
|
||||
assertEquals((short) i, message.getShortProperty("short").shortValue());
|
||||
assertEquals((byte) i, message.getByteProperty("byte").byteValue());
|
||||
assertEquals(floatValue(i), message.getFloatProperty("float").floatValue(), 0.001);
|
||||
assertEquals(new SimpleString(Integer.toString(i)), message.getSimpleStringProperty(SIMPLE_STRING_KEY.toString()));
|
||||
assertEqualsByteArrays(byteArray(i), message.getBytesProperty("byte[]"));
|
||||
try (ClientConsumer consumer = session.createConsumer(ADDRESS)) {
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = consumer.receive(100);
|
||||
assertNotNull("Expecting a message " + i, message);
|
||||
assertMessageBody(i, message);
|
||||
assertEquals(i, message.getIntProperty("int").intValue());
|
||||
assertEquals((short) i, message.getShortProperty("short").shortValue());
|
||||
assertEquals((byte) i, message.getByteProperty("byte").byteValue());
|
||||
assertEquals(floatValue(i), message.getFloatProperty("float").floatValue(), 0.001);
|
||||
assertEquals(new SimpleString(Integer.toString(i)), message.getSimpleStringProperty(SIMPLE_STRING_KEY.toString()));
|
||||
assertEqualsByteArrays(byteArray(i), message.getBytesProperty("byte[]"));
|
||||
|
||||
assertTrue(message.containsProperty("null-value"));
|
||||
assertEquals(message.getObjectProperty("null-value"), null);
|
||||
assertTrue(message.containsProperty("null-value"));
|
||||
assertEquals(message.getObjectProperty("null-value"), null);
|
||||
|
||||
message.acknowledge();
|
||||
message.acknowledge();
|
||||
}
|
||||
assertNull("no more messages", consumer.receive(50));
|
||||
}
|
||||
assertNull("no more messages", consumer.receive(50));
|
||||
consumer.close();
|
||||
session.commit();
|
||||
}
|
||||
|
||||
|
|
|
@ -169,13 +169,9 @@ public class PropertiesLoginModuleRaceConditionTest {
|
|||
}
|
||||
|
||||
private void store(Properties from, File to) throws FileNotFoundException, IOException {
|
||||
FileOutputStream output = new FileOutputStream(to);
|
||||
try {
|
||||
try (FileOutputStream output = new FileOutputStream(to)) {
|
||||
from.store(output, "Generated by " + name.getMethodName());
|
||||
}
|
||||
finally {
|
||||
output.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void createGroups() throws FileNotFoundException, IOException {
|
||||
|
|
|
@ -56,13 +56,13 @@ public class ClientSideLoadBalancingExample {
|
|||
|
||||
// Step 4. We create 3 JMS connections from the same connection factory. Since we are using round-robin
|
||||
// load-balancing this should result in each sessions being connected to a different node of the cluster
|
||||
Connection conn = connectionFactory.createConnection();
|
||||
// Wait a little while to make sure broadcasts from all nodes have reached the client
|
||||
Thread.sleep(5000);
|
||||
connectionA = connectionFactory.createConnection();
|
||||
connectionB = connectionFactory.createConnection();
|
||||
connectionC = connectionFactory.createConnection();
|
||||
conn.close();
|
||||
try (Connection conn = connectionFactory.createConnection()) {
|
||||
// Wait a little while to make sure broadcasts from all nodes have reached the client
|
||||
Thread.sleep(5000);
|
||||
connectionA = connectionFactory.createConnection();
|
||||
connectionB = connectionFactory.createConnection();
|
||||
connectionC = connectionFactory.createConnection();
|
||||
}
|
||||
|
||||
// Step 5. We create JMS Sessions
|
||||
Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
|
|
@ -73,20 +73,11 @@ public abstract class PerfBase {
|
|||
protected static PerfParams getParams(final String fileName) throws Exception {
|
||||
Properties props = null;
|
||||
|
||||
InputStream is = null;
|
||||
|
||||
try {
|
||||
is = new FileInputStream(fileName);
|
||||
|
||||
try (InputStream is = new FileInputStream(fileName)) {
|
||||
props = new Properties();
|
||||
|
||||
props.load(is);
|
||||
}
|
||||
finally {
|
||||
if (is != null) {
|
||||
is.close();
|
||||
}
|
||||
}
|
||||
|
||||
int noOfMessages = Integer.valueOf(props.getProperty("num-messages"));
|
||||
int noOfWarmupMessages = Integer.valueOf(props.getProperty("num-warmup-messages"));
|
||||
|
|
|
@ -53,20 +53,11 @@ public class SoakBase {
|
|||
protected static SoakParams getParams(final String fileName) throws Exception {
|
||||
Properties props = null;
|
||||
|
||||
InputStream is = null;
|
||||
|
||||
try {
|
||||
is = new FileInputStream(fileName);
|
||||
|
||||
try (InputStream is = new FileInputStream(fileName)) {
|
||||
props = new Properties();
|
||||
|
||||
props.load(is);
|
||||
}
|
||||
finally {
|
||||
if (is != null) {
|
||||
is.close();
|
||||
}
|
||||
}
|
||||
|
||||
int durationInMinutes = Integer.valueOf(props.getProperty("duration-in-minutes"));
|
||||
int noOfWarmupMessages = Integer.valueOf(props.getProperty("num-warmup-messages"));
|
||||
|
|
|
@ -62,9 +62,7 @@ public class EmbeddedExample {
|
|||
Queue queue = (Queue) jmsServer.lookup("queue/exampleQueue");
|
||||
|
||||
// Step 10. Send and receive a message using JMS API
|
||||
Connection connection = null;
|
||||
try {
|
||||
connection = cf.createConnection();
|
||||
try (Connection connection = cf.createConnection()) {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
TextMessage message = session.createTextMessage("Hello sent at " + new Date());
|
||||
|
@ -76,10 +74,6 @@ public class EmbeddedExample {
|
|||
System.out.println("Received message:" + messageReceived.getText());
|
||||
}
|
||||
finally {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
|
||||
// Step 11. Stop the JMS server
|
||||
jmsServer.stop();
|
||||
System.out.println("Stopped the JMS Server");
|
||||
|
|
|
@ -32,11 +32,8 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
|||
public class InterceptorExample {
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
Connection connection = null;
|
||||
try {
|
||||
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?incomingInterceptorList=" + SimpleInterceptor.class.getName());
|
||||
connection = cf.createConnection();
|
||||
|
||||
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?incomingInterceptorList=" + SimpleInterceptor.class.getName());
|
||||
try (Connection connection = cf.createConnection()) {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
Queue queue = session.createQueue("exampleQueue");
|
||||
|
@ -65,10 +62,5 @@ public class InterceptorExample {
|
|||
throw new IllegalStateException("Check your configuration as the example interceptor wasn't actually called!");
|
||||
}
|
||||
}
|
||||
finally {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -155,14 +155,12 @@ public class LargeMessageExample {
|
|||
|
||||
File outputFile = new File("huge_message_received.dat");
|
||||
|
||||
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
|
||||
try (FileOutputStream fileOutputStream = new FileOutputStream(outputFile)) {
|
||||
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
|
||||
|
||||
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
|
||||
|
||||
// Step 14. This will save the stream and wait until the entire message is written before continuing.
|
||||
messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);
|
||||
|
||||
fileOutputStream.close();
|
||||
// Step 14. This will save the stream and wait until the entire message is written before continuing.
|
||||
messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);
|
||||
}
|
||||
|
||||
System.out.println("File streamed to disk. Size of received file on disk is " + outputFile.length());
|
||||
}
|
||||
|
@ -182,12 +180,12 @@ public class LargeMessageExample {
|
|||
|
||||
private static void createFile(final File file, final long fileSize) throws IOException {
|
||||
FileOutputStream fileOut = new FileOutputStream(file);
|
||||
BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
|
||||
byte[] outBuffer = new byte[1024 * 1024];
|
||||
for (long i = 0; i < fileSize; i += outBuffer.length) {
|
||||
buffOut.write(outBuffer);
|
||||
try (BufferedOutputStream buffOut = new BufferedOutputStream(fileOut)) {
|
||||
byte[] outBuffer = new byte[1024 * 1024];
|
||||
for (long i = 0; i < fileSize; i += outBuffer.length) {
|
||||
buffOut.write(outBuffer);
|
||||
}
|
||||
}
|
||||
buffOut.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -67,13 +67,13 @@ public class LastValueQueueExample {
|
|||
System.out.format("Sent message: %s%n", message.getText());
|
||||
|
||||
// Step 8. Browse the queue. There is only 1 message in it, the last sent
|
||||
QueueBrowser browser = session.createBrowser(queue);
|
||||
Enumeration enumeration = browser.getEnumeration();
|
||||
while (enumeration.hasMoreElements()) {
|
||||
TextMessage messageInTheQueue = (TextMessage) enumeration.nextElement();
|
||||
System.out.format("Message in the queue: %s%n", messageInTheQueue.getText());
|
||||
try (QueueBrowser browser = session.createBrowser(queue)) {
|
||||
Enumeration enumeration = browser.getEnumeration();
|
||||
while (enumeration.hasMoreElements()) {
|
||||
TextMessage messageInTheQueue = (TextMessage) enumeration.nextElement();
|
||||
System.out.format("Message in the queue: %s%n", messageInTheQueue.getText());
|
||||
}
|
||||
}
|
||||
browser.close();
|
||||
|
||||
// Step 9. Create a JMS Message Consumer for the queue
|
||||
MessageConsumer messageConsumer = session.createConsumer(queue);
|
||||
|
|
|
@ -78,11 +78,10 @@ public class TextReverserService implements MessageListener {
|
|||
// retrieve the destination to reply to
|
||||
Destination replyTo = request.getJMSReplyTo();
|
||||
// create a producer to send the reply
|
||||
MessageProducer producer = session.createProducer(replyTo);
|
||||
// send the reply
|
||||
producer.send(reply);
|
||||
// close the producer
|
||||
producer.close();
|
||||
try (MessageProducer producer = session.createProducer(replyTo)) {
|
||||
// send the reply
|
||||
producer.send(reply);
|
||||
}
|
||||
}
|
||||
catch (JMSException e) {
|
||||
e.printStackTrace();
|
||||
|
|
|
@ -33,8 +33,7 @@ public class JmsReceive {
|
|||
ConnectionFactory factory = JmsHelper.createConnectionFactory("activemq-client.xml");
|
||||
Destination destination = ActiveMQDestination.fromAddress("jms.queue.orders");
|
||||
|
||||
Connection conn = factory.createConnection();
|
||||
try {
|
||||
try (Connection conn = factory.createConnection()) {
|
||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
|
@ -48,8 +47,5 @@ public class JmsReceive {
|
|||
conn.start();
|
||||
Thread.sleep(1000000);
|
||||
}
|
||||
finally {
|
||||
conn.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,8 +30,7 @@ public class JmsSend {
|
|||
ConnectionFactory factory = JmsHelper.createConnectionFactory("activemq-client.xml");
|
||||
Destination destination = ActiveMQDestination.fromAddress("jms.queue.orders");
|
||||
|
||||
Connection conn = factory.createConnection();
|
||||
try {
|
||||
try (Connection conn = factory.createConnection()) {
|
||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
ObjectMessage message = session.createObjectMessage();
|
||||
|
@ -40,8 +39,5 @@ public class JmsSend {
|
|||
message.setObject(order);
|
||||
producer.send(message);
|
||||
}
|
||||
finally {
|
||||
conn.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,8 +30,7 @@ public class PostOrder {
|
|||
ConnectionFactory factory = JmsHelper.createConnectionFactory("activemq-client.xml");
|
||||
Destination destination = ActiveMQDestination.fromAddress("jms.queue.orders");
|
||||
|
||||
Connection conn = factory.createConnection();
|
||||
try {
|
||||
try (Connection conn = factory.createConnection()) {
|
||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
ObjectMessage message = session.createObjectMessage();
|
||||
|
@ -40,8 +39,5 @@ public class PostOrder {
|
|||
message.setObject(order);
|
||||
producer.send(message);
|
||||
}
|
||||
finally {
|
||||
conn.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,8 +32,7 @@ public class ReceiveShipping {
|
|||
ConnectionFactory factory = JmsHelper.createConnectionFactory("activemq-client.xml");
|
||||
Destination destination = ActiveMQDestination.fromAddress("jms.queue.shipping");
|
||||
|
||||
Connection conn = factory.createConnection();
|
||||
try {
|
||||
try (Connection conn = factory.createConnection()) {
|
||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
|
@ -47,8 +46,5 @@ public class ReceiveShipping {
|
|||
conn.start();
|
||||
Thread.sleep(1000000);
|
||||
}
|
||||
finally {
|
||||
conn.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -35,61 +35,54 @@ public class StompEmbeddedWithInterceptorExample {
|
|||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
// Step 1. Create a TCP socket to connect to the Stomp port
|
||||
Socket socket = new Socket("localhost", 61616);
|
||||
try (Socket socket = new Socket("localhost", 61616)) {
|
||||
|
||||
// Step 2. Send a CONNECT frame to connect to the server
|
||||
String connectFrame = "CONNECT\n" +
|
||||
"accept-version:1.2\n" +
|
||||
"host:localhost\n" +
|
||||
"login:guest\n" +
|
||||
"passcode:guest\n" +
|
||||
"request-id:1\n" +
|
||||
"\n" +
|
||||
END_OF_FRAME;
|
||||
sendFrame(socket, connectFrame);
|
||||
// Step 2. Send a CONNECT frame to connect to the server
|
||||
String connectFrame = "CONNECT\n" +
|
||||
"accept-version:1.2\n" +
|
||||
"host:localhost\n" +
|
||||
"login:guest\n" +
|
||||
"passcode:guest\n" +
|
||||
"request-id:1\n" +
|
||||
"\n" +
|
||||
END_OF_FRAME;
|
||||
sendFrame(socket, connectFrame);
|
||||
|
||||
// Step 3. Send a SEND frame (a Stomp message) to the
|
||||
// jms.queue.exampleQueue address with a text body
|
||||
String text = "Hello World from Stomp 1.2 !";
|
||||
String message = "SEND\n" +
|
||||
"destination:jms.queue.exampleQueue" +
|
||||
"\n" +
|
||||
text +
|
||||
END_OF_FRAME;
|
||||
sendFrame(socket, message);
|
||||
System.out.println("Sent Stomp message: " + text);
|
||||
|
||||
// Step 4. Send a DISCONNECT frame to disconnect from the server
|
||||
String disconnectFrame = "DISCONNECT\n" +
|
||||
"\n" +
|
||||
END_OF_FRAME;
|
||||
sendFrame(socket, disconnectFrame);
|
||||
|
||||
// Step 5. Slose the TCP socket
|
||||
socket.close();
|
||||
// Step 3. Send a SEND frame (a Stomp message) to the
|
||||
// jms.queue.exampleQueue address with a text body
|
||||
String text = "Hello World from Stomp 1.2 !";
|
||||
String message = "SEND\n" +
|
||||
"destination:jms.queue.exampleQueue" +
|
||||
"\n" +
|
||||
text +
|
||||
END_OF_FRAME;
|
||||
sendFrame(socket, message);
|
||||
System.out.println("Sent Stomp message: " + text);
|
||||
|
||||
// Step 4. Send a DISCONNECT frame to disconnect from the server
|
||||
String disconnectFrame = "DISCONNECT\n" +
|
||||
"\n" +
|
||||
END_OF_FRAME;
|
||||
sendFrame(socket, disconnectFrame);
|
||||
}
|
||||
|
||||
// It will use a regular JMS connection to show how the injected data will appear at the final message
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession();
|
||||
connection.start();
|
||||
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession()) {
|
||||
connection.start();
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(session.createQueue("exampleQueue"));
|
||||
Message messageReceived = consumer.receive(5000);
|
||||
MessageConsumer consumer = session.createConsumer(session.createQueue("exampleQueue"));
|
||||
Message messageReceived = consumer.receive(5000);
|
||||
|
||||
String propStomp = messageReceived.getStringProperty("stompIntercepted");
|
||||
String propStomp = messageReceived.getStringProperty("stompIntercepted");
|
||||
|
||||
String propRegular = messageReceived.getStringProperty("regularIntercepted");
|
||||
String propRegular = messageReceived.getStringProperty("regularIntercepted");
|
||||
|
||||
System.out.println("propStomp is Hello!! - " + propStomp.equals("Hello"));
|
||||
System.out.println("propRegular is HelloAgain!! - " + propRegular.equals("HelloAgain"));
|
||||
|
||||
session.close();
|
||||
connection.close();
|
||||
|
||||
factory.close();
|
||||
System.out.println("propStomp is Hello!! - " + propStomp.equals("Hello"));
|
||||
System.out.println("propRegular is HelloAgain!! - " + propRegular.equals("HelloAgain"));
|
||||
}
|
||||
}
|
||||
|
||||
private static void sendFrame(Socket socket, String data) throws Exception {
|
||||
|
|
Loading…
Reference in New Issue