mirror of https://github.com/apache/activemq.git
An improvement of AMQ-1075 so that the BLOB upload policy can be configured more easily together with a simple provider of the BlobUploadStrategy (with the actual implementation detail stubbed out). Also added brokerUploadUrl to BrokerInfo so that the broker can inform the clients where to upload out-of-band messages to. Finally added a JMSXMimeType helper method to the ActiveMQ extension API for doing selectors on MIME types
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@511598 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
26198c2094
commit
05a8cba7d0
|
@ -82,6 +82,7 @@ import org.apache.activemq.util.IntrospectionSupport;
|
|||
import org.apache.activemq.util.JMSExceptionSupport;
|
||||
import org.apache.activemq.util.LongSequenceGenerator;
|
||||
import org.apache.activemq.util.ServiceSupport;
|
||||
import org.apache.activemq.blob.BlobTransferPolicy;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
@ -116,6 +117,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
|
||||
// Configuration options variables
|
||||
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
|
||||
private BlobTransferPolicy blobTransferPolicy;
|
||||
private RedeliveryPolicy redeliveryPolicy;
|
||||
private MessageTransformer transformer;
|
||||
|
||||
|
@ -1405,7 +1407,19 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
|
||||
this.redeliveryPolicy = redeliveryPolicy;
|
||||
}
|
||||
|
||||
|
||||
public BlobTransferPolicy getBlobTransferPolicy() {
|
||||
return blobTransferPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the policy used to describe how out-of-band BLOBs (Binary Large OBjects)
|
||||
* are transferred from producers to brokers to consumers
|
||||
*/
|
||||
public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
|
||||
this.blobTransferPolicy = blobTransferPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns the alwaysSessionAsync.
|
||||
*/
|
||||
|
@ -1490,6 +1504,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
this.brokerInfo = (BrokerInfo)command;
|
||||
brokerInfoReceived.countDown();
|
||||
this.optimizeAcknowledge &= !this.brokerInfo.isFaultTolerantConfiguration();
|
||||
getBlobTransferPolicy().setBrokerUploadUrl(brokerInfo.getBrokerUploadUrl());
|
||||
}
|
||||
else if (command instanceof ControlCommand) {
|
||||
onControlCommand((ControlCommand) command);
|
||||
|
|
|
@ -17,20 +17,6 @@
|
|||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.QueueConnection;
|
||||
import javax.jms.QueueConnectionFactory;
|
||||
import javax.jms.TopicConnection;
|
||||
import javax.jms.TopicConnectionFactory;
|
||||
import javax.naming.Context;
|
||||
|
||||
import org.apache.activemq.jndi.JNDIBaseStorable;
|
||||
import org.apache.activemq.management.JMSStatsImpl;
|
||||
import org.apache.activemq.management.StatsCapable;
|
||||
|
@ -42,7 +28,20 @@ import org.apache.activemq.util.IntrospectionSupport;
|
|||
import org.apache.activemq.util.JMSExceptionSupport;
|
||||
import org.apache.activemq.util.URISupport;
|
||||
import org.apache.activemq.util.URISupport.CompositeData;
|
||||
import org.apache.activemq.blob.BlobTransferPolicy;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.QueueConnection;
|
||||
import javax.jms.QueueConnectionFactory;
|
||||
import javax.jms.TopicConnection;
|
||||
import javax.jms.TopicConnectionFactory;
|
||||
import javax.naming.Context;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
@ -69,9 +68,10 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
protected String password;
|
||||
protected String clientID;
|
||||
|
||||
// optimization flags
|
||||
// client policies
|
||||
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
|
||||
private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
|
||||
private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
|
||||
private MessageTransformer transformer;
|
||||
|
||||
private boolean disableTimeStampsByDefault = false;
|
||||
|
@ -258,6 +258,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
|
||||
connection.setRedeliveryPolicy(getRedeliveryPolicy());
|
||||
connection.setTransformer(getTransformer());
|
||||
connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
|
||||
|
||||
transport.start();
|
||||
|
||||
|
@ -402,6 +403,18 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
return useAsyncSend;
|
||||
}
|
||||
|
||||
public BlobTransferPolicy getBlobTransferPolicy() {
|
||||
return blobTransferPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the policy used to describe how out-of-band BLOBs (Binary Large OBjects)
|
||||
* are transferred from producers to brokers to consumers
|
||||
*/
|
||||
public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
|
||||
this.blobTransferPolicy = blobTransferPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Forces the use of <a
|
||||
* href="http://incubator.apache.org/activemq/async-sends.html">Async Sends</a>
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.apache.activemq.transaction.Synchronization;
|
|||
import org.apache.activemq.util.Callback;
|
||||
import org.apache.activemq.util.LongSequenceGenerator;
|
||||
import org.apache.activemq.blob.BlobUploader;
|
||||
import org.apache.activemq.blob.BlobUploadStrategy;
|
||||
import org.apache.activemq.blob.BlobTransferPolicy;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
@ -130,7 +130,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
* @see javax.jms.XASession
|
||||
*/
|
||||
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
|
||||
private BlobUploadStrategy blobUploadStrategy;
|
||||
|
||||
public static interface DeliveryListener {
|
||||
public void beforeDelivery(ActiveMQSession session, Message msg);
|
||||
|
@ -146,6 +145,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
|||
private TransactionContext transactionContext;
|
||||
private DeliveryListener deliveryListener;
|
||||
private MessageTransformer transformer;
|
||||
private BlobTransferPolicy blobTransferPolicy;
|
||||
|
||||
protected final ActiveMQConnection connection;
|
||||
protected final SessionInfo info;
|
||||
|
@ -190,6 +190,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
|||
stats = new JMSSessionStatsImpl(producers, consumers);
|
||||
this.connection.asyncSendPacket(info);
|
||||
setTransformer(connection.getTransformer());
|
||||
setBlobTransferPolicy(connection.getBlobTransferPolicy());
|
||||
|
||||
if( connection.isStarted() )
|
||||
start();
|
||||
|
@ -408,8 +409,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
|||
public BlobMessage createBlobMessage(File file) throws JMSException {
|
||||
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||
configureMessage(message);
|
||||
message.setBlobUploader(new BlobUploader(blobUploadStrategy, file));
|
||||
message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
|
||||
message.setDeletedByBroker(true);
|
||||
message.setName(file.getName());
|
||||
return message;
|
||||
}
|
||||
|
||||
|
@ -431,7 +433,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
|||
public BlobMessage createBlobMessage(InputStream in) throws JMSException {
|
||||
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
|
||||
configureMessage(message);
|
||||
message.setBlobUploader(new BlobUploader(blobUploadStrategy, in));
|
||||
message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
|
||||
message.setDeletedByBroker(true);
|
||||
return message;
|
||||
}
|
||||
|
@ -1743,23 +1745,23 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
|||
this.transformer = transformer;
|
||||
}
|
||||
|
||||
public BlobTransferPolicy getBlobTransferPolicy() {
|
||||
return blobTransferPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the policy used to describe how out-of-band BLOBs (Binary Large OBjects)
|
||||
* are transferred from producers to brokers to consumers
|
||||
*/
|
||||
public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
|
||||
this.blobTransferPolicy = blobTransferPolicy;
|
||||
}
|
||||
|
||||
public List getUnconsumedMessages() {
|
||||
return executor.getUnconsumedMessages();
|
||||
}
|
||||
|
||||
|
||||
public BlobUploadStrategy getBlobUploadStrategy() {
|
||||
return blobUploadStrategy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the upload strategy for BLOBs which are sent out-of-band by uploading them
|
||||
* to some remote repository or the broker
|
||||
*/
|
||||
public void setBlobUploadStrategy(BlobUploadStrategy blobUploadStrategy) {
|
||||
this.blobUploadStrategy = blobUploadStrategy;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "ActiveMQSession {id="+info.getSessionId()+",started="+started.get()+"}";
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import javax.jms.Message;
|
||||
import javax.jms.JMSException;
|
||||
import java.net.URL;
|
||||
import java.net.MalformedURLException;
|
||||
|
@ -54,4 +53,11 @@ public interface BlobMessage extends Message {
|
|||
*/
|
||||
void setMimeType(String mimeType);
|
||||
|
||||
|
||||
String getName();
|
||||
|
||||
/**
|
||||
* The name of the attachment which can be useful information if transmitting files over ActiveMQ
|
||||
*/
|
||||
void setName(String name);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
/**
|
||||
* Represents the JMS extension methods in Apache ActiveMQ
|
||||
*
|
||||
* @version $Revision: $
|
||||
*/
|
||||
public interface Message extends javax.jms.Message {
|
||||
|
||||
/**
|
||||
* Returns the MIME type of this mesage. This can be used in selectors to filter on
|
||||
* the MIME types of the different JMS messages, or in the case of {@link org.apache.activemq.BlobMessage}
|
||||
* it allows you to create a selector on the MIME type of the BLOB body
|
||||
*/
|
||||
public String getJMSXMimeType();
|
||||
|
||||
}
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.blob;
|
||||
|
||||
/**
|
||||
* The policy for configuring how BLOBs (Binary Large OBjects) are transferred
|
||||
* out of band between producers, brokers and consumers.
|
||||
*
|
||||
* @version $Revision: $
|
||||
*/
|
||||
public class BlobTransferPolicy {
|
||||
private String defaultUploadUrl = "http://localhost:8080";
|
||||
private String brokerUploadUrl;
|
||||
private String uploadUrl;
|
||||
private BlobUploadStrategy uploadStrategy;
|
||||
|
||||
/**
|
||||
* Returns a copy of this policy object
|
||||
*/
|
||||
public BlobTransferPolicy copy() {
|
||||
BlobTransferPolicy that = new BlobTransferPolicy();
|
||||
that.defaultUploadUrl = this.defaultUploadUrl;
|
||||
that.brokerUploadUrl = this.brokerUploadUrl;
|
||||
that.uploadUrl = this.uploadUrl;
|
||||
that.uploadStrategy = this.uploadStrategy;
|
||||
return that;
|
||||
}
|
||||
|
||||
public String getUploadUrl() {
|
||||
if (uploadUrl == null) {
|
||||
uploadUrl = getBrokerUploadUrl();
|
||||
if (uploadUrl == null) {
|
||||
uploadUrl = getDefaultUploadUrl();
|
||||
}
|
||||
}
|
||||
return uploadUrl;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the upload URL to use explicitly on the client which will
|
||||
* overload the default or the broker's URL. This allows the client to decide
|
||||
* where to upload files to irrespective of the brokers configuration.
|
||||
*/
|
||||
public void setUploadUrl(String uploadUrl) {
|
||||
this.uploadUrl = uploadUrl;
|
||||
}
|
||||
|
||||
public String getBrokerUploadUrl() {
|
||||
return brokerUploadUrl;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the JMS client when a broker advertises its upload URL
|
||||
*/
|
||||
public void setBrokerUploadUrl(String brokerUploadUrl) {
|
||||
this.brokerUploadUrl = brokerUploadUrl;
|
||||
}
|
||||
|
||||
public String getDefaultUploadUrl() {
|
||||
return defaultUploadUrl;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the default upload URL to use if the broker does not
|
||||
* have a configured upload URL
|
||||
*/
|
||||
public void setDefaultUploadUrl(String defaultUploadUrl) {
|
||||
this.defaultUploadUrl = defaultUploadUrl;
|
||||
}
|
||||
|
||||
public BlobUploadStrategy getUploadStrategy() {
|
||||
if (uploadStrategy == null) {
|
||||
uploadStrategy = createUploadStrategy();
|
||||
}
|
||||
return uploadStrategy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the upload strategy to use for uploading BLOBs to some URL
|
||||
*/
|
||||
public void setUploadStrategy(BlobUploadStrategy uploadStrategy) {
|
||||
this.uploadStrategy = uploadStrategy;
|
||||
}
|
||||
|
||||
protected BlobUploadStrategy createUploadStrategy() {
|
||||
return new DefaultBlobUploadStrategy(this);
|
||||
}
|
||||
}
|
|
@ -20,8 +20,8 @@ import org.apache.activemq.command.ActiveMQBlobMessage;
|
|||
|
||||
import javax.jms.JMSException;
|
||||
import java.io.File;
|
||||
import java.io.*;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URL;
|
||||
|
||||
/**
|
||||
|
@ -31,27 +31,36 @@ import java.net.URL;
|
|||
*/
|
||||
public class BlobUploader {
|
||||
|
||||
private BlobUploadStrategy strategy;
|
||||
private BlobTransferPolicy blobTransferPolicy;
|
||||
private File file;
|
||||
private InputStream in;
|
||||
|
||||
|
||||
public BlobUploader(BlobUploadStrategy strategy, File file) {
|
||||
this.strategy = strategy;
|
||||
this.file = file;
|
||||
public BlobUploader(BlobTransferPolicy blobTransferPolicy, InputStream in) {
|
||||
this.blobTransferPolicy = blobTransferPolicy;
|
||||
this.in = in;
|
||||
}
|
||||
|
||||
public BlobUploader(BlobUploadStrategy strategy, InputStream in) {
|
||||
this.strategy = strategy;
|
||||
this.in = in;
|
||||
public BlobUploader(BlobTransferPolicy blobTransferPolicy, File file) {
|
||||
this.blobTransferPolicy = blobTransferPolicy;
|
||||
this.file = file;
|
||||
}
|
||||
|
||||
public URL upload(ActiveMQBlobMessage message) throws JMSException, IOException {
|
||||
if (file != null) {
|
||||
return strategy.uploadFile(message, file);
|
||||
return getStrategy().uploadFile(message, file);
|
||||
}
|
||||
else {
|
||||
return strategy.uploadStream(message, in);
|
||||
return getStrategy().uploadStream(message, in);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public BlobTransferPolicy getBlobTransferPolicy() {
|
||||
return blobTransferPolicy;
|
||||
}
|
||||
|
||||
public BlobUploadStrategy getStrategy() {
|
||||
return getBlobTransferPolicy().getUploadStrategy();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.blob;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
|
||||
/**
|
||||
* A default implementation of {@link BlobUploadStrategy} which uses the URL class to upload
|
||||
* files or streams to a remote URL
|
||||
*/
|
||||
public class DefaultBlobUploadStrategy implements BlobUploadStrategy {
|
||||
private BlobTransferPolicy transferPolicy;
|
||||
|
||||
public DefaultBlobUploadStrategy(BlobTransferPolicy transferPolicy) {
|
||||
this.transferPolicy = transferPolicy;
|
||||
}
|
||||
|
||||
public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
|
||||
URL url = createUploadURL(message);
|
||||
// TODO upload to URL
|
||||
// return url;
|
||||
throw new JMSException("Not implemented yet!");
|
||||
}
|
||||
|
||||
public URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws JMSException, IOException {
|
||||
URL url = createUploadURL(message);
|
||||
// TODO upload to URL
|
||||
// return url;
|
||||
throw new JMSException("Not implemented yet!");
|
||||
}
|
||||
|
||||
protected URL createUploadURL(ActiveMQBlobMessage message) throws JMSException, MalformedURLException {
|
||||
return new URL(transferPolicy.getUploadUrl() + message.getMessageId().toString());
|
||||
}
|
||||
}
|
|
@ -39,6 +39,7 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage
|
|||
|
||||
private String remoteBlobUrl;
|
||||
private String mimeType;
|
||||
private String name;
|
||||
private boolean deletedByBroker;
|
||||
|
||||
private transient BlobUploader blobUploader;
|
||||
|
@ -90,6 +91,19 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage
|
|||
this.mimeType = mimeType;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* The name of the attachment which can be useful information if transmitting files over ActiveMQ
|
||||
*
|
||||
* @openwire:property version=3 cache=false
|
||||
*/
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
/**
|
||||
* @openwire:property version=3 cache=false
|
||||
*/
|
||||
|
@ -101,6 +115,10 @@ public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage
|
|||
this.deletedByBroker = deletedByBroker;
|
||||
}
|
||||
|
||||
public String getJMSXMimeType() {
|
||||
return getMimeType();
|
||||
}
|
||||
|
||||
public InputStream getInputStream() throws IOException, JMSException {
|
||||
URL value = getURL();
|
||||
if (value == null) {
|
||||
|
|
|
@ -107,6 +107,11 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
public byte getDataStructureType() {
|
||||
return DATA_STRUCTURE_TYPE;
|
||||
}
|
||||
|
||||
public String getJMSXMimeType() {
|
||||
return "jms/bytes-message";
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Clears out the message body. Clearing a message's body does not clear its header values or property entries. <P>
|
||||
|
|
|
@ -147,6 +147,11 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
|
|||
public byte getDataStructureType() {
|
||||
return DATA_STRUCTURE_TYPE;
|
||||
}
|
||||
|
||||
public String getJMSXMimeType() {
|
||||
return "jms/map-message";
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Clears out the message body. Clearing a message's body does not clear its header values or property entries. <P>
|
||||
|
|
|
@ -44,7 +44,7 @@ import org.apache.activemq.util.TypeConversionSupport;
|
|||
*
|
||||
* @version $Revision:$
|
||||
*/
|
||||
public class ActiveMQMessage extends Message implements javax.jms.Message {
|
||||
public class ActiveMQMessage extends Message implements org.apache.activemq.Message {
|
||||
|
||||
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE;
|
||||
|
||||
|
@ -180,7 +180,11 @@ public class ActiveMQMessage extends Message implements javax.jms.Message {
|
|||
public void setJMSCorrelationIDAsBytes(byte[] correlationId) throws JMSException {
|
||||
this.setCorrelationId(decodeString(correlationId));
|
||||
}
|
||||
|
||||
|
||||
public String getJMSXMimeType() {
|
||||
return "jms/message";
|
||||
}
|
||||
|
||||
static protected String decodeString(byte[] data) throws JMSException {
|
||||
try {
|
||||
if (data == null) {
|
||||
|
|
|
@ -21,6 +21,15 @@ package org.apache.activemq.command;
|
|||
|
||||
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.util.ByteArrayInputStream;
|
||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
|
||||
import org.apache.activemq.util.JMSExceptionSupport;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.ObjectMessage;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
|
@ -31,16 +40,6 @@ import java.io.Serializable;
|
|||
import java.util.zip.DeflaterOutputStream;
|
||||
import java.util.zip.InflaterInputStream;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.ObjectMessage;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.util.ByteArrayInputStream;
|
||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
|
||||
import org.apache.activemq.util.JMSExceptionSupport;
|
||||
|
||||
/**
|
||||
* An <CODE>ObjectMessage</CODE> object is used to send a message that contains a serializable object in the Java
|
||||
* programming language ("Java object"). It inherits from the <CODE>Message</CODE> interface and adds a body containing
|
||||
|
@ -108,6 +107,11 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess
|
|||
return DATA_STRUCTURE_TYPE;
|
||||
}
|
||||
|
||||
public String getJMSXMimeType() {
|
||||
return "jms/object-message";
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Clears out the message body. Clearing a message's body does not clear its header values or property entries.
|
||||
* <p/>
|
||||
|
|
|
@ -151,6 +151,11 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
|
|||
return DATA_STRUCTURE_TYPE;
|
||||
}
|
||||
|
||||
public String getJMSXMimeType() {
|
||||
return "jms/stream-message";
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Clears out the message body. Clearing a message's body does not clear its
|
||||
* header values or property entries. <p/>
|
||||
|
|
|
@ -17,18 +17,6 @@
|
|||
*/
|
||||
package org.apache.activemq.command;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.zip.DeflaterOutputStream;
|
||||
import java.util.zip.InflaterInputStream;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageNotWriteableException;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.util.ByteArrayInputStream;
|
||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||
|
@ -37,6 +25,17 @@ import org.apache.activemq.util.JMSExceptionSupport;
|
|||
import org.apache.activemq.util.MarshallingSupport;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageNotWriteableException;
|
||||
import javax.jms.TextMessage;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.zip.DeflaterOutputStream;
|
||||
import java.util.zip.InflaterInputStream;
|
||||
|
||||
/**
|
||||
*
|
||||
* @openwire:marshaller code="28"
|
||||
|
@ -62,6 +61,10 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
|
|||
public byte getDataStructureType() {
|
||||
return DATA_STRUCTURE_TYPE;
|
||||
}
|
||||
|
||||
public String getJMSXMimeType() {
|
||||
return "jms/text-message";
|
||||
}
|
||||
|
||||
public void setText(String text) throws MessageNotWriteableException {
|
||||
checkReadOnlyBody();
|
||||
|
|
|
@ -38,6 +38,8 @@ public class BrokerInfo extends BaseCommand{
|
|||
BrokerInfo peerBrokerInfos[];
|
||||
String brokerName;
|
||||
long connectionId;
|
||||
String brokerUploadUrl;
|
||||
|
||||
|
||||
public boolean isBrokerInfo(){
|
||||
return true;
|
||||
|
@ -184,5 +186,17 @@ public class BrokerInfo extends BaseCommand{
|
|||
this.connectionId = connectionId;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* The URL to use when uploading BLOBs to the broker or some other external file/http server
|
||||
*
|
||||
* @openwire:property version=3
|
||||
*/
|
||||
public String getBrokerUploadUrl() {
|
||||
return brokerUploadUrl;
|
||||
}
|
||||
|
||||
public void setBrokerUploadUrl(String brokerUploadUrl) {
|
||||
this.brokerUploadUrl = brokerUploadUrl;
|
||||
}
|
||||
}
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -20,7 +20,7 @@
|
|||
#
|
||||
log4j.rootLogger=WARN, out
|
||||
|
||||
log4j.logger.org.apache.activemq=INFO
|
||||
log4j.logger.org.apache.activemq=DEBUG
|
||||
|
||||
# CONSOLE appender not used by default
|
||||
log4j.appender.out=org.apache.log4j.ConsoleAppender
|
||||
|
|
|
@ -17,13 +17,22 @@
|
|||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import javax.jms.*;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.InvalidDestinationException;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TextMessage;
|
||||
import java.util.ArrayList;
|
||||
|
||||
/**
|
||||
* @version
|
||||
*/
|
||||
|
|
|
@ -21,7 +21,15 @@ import org.apache.activemq.spring.SpringConsumer;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import javax.jms.*;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
|
@ -1 +1 @@
|
|||
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v3;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Test case for the OpenWire marshalling for BrokerInfo
*
*
* NOTE!: This file is auto generated - do not modify!
* if you need to make a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision: $
*/
public class BrokerInfoTest extends BaseCommandTestSupport {
public static BrokerInfoTest SINGLETON = new BrokerInfoTest();
public Object createObject() throws Exception {
BrokerInfo info = new BrokerInfo();
populateObject(info);
return info;
}
protected void populateObject(Object object) throws Exception {
super.populateObject(object);
BrokerInfo info = (BrokerInfo) object;
info.setBrokerId(createBrokerId("BrokerId:1"));
info.setBrokerURL("BrokerURL:2");
{
BrokerInfo value[] = new BrokerInfo[0];
for( int i=0; i < 0; i++ ) {
value[i] = createBrokerInfo("PeerBrokerInfos:3");
}
info.setPeerBrokerInfos(value);
}
info.setBrokerName("BrokerName:4");
info.setSlaveBroker(true);
info.setMasterBroker(false);
info.setFaultTolerantConfiguration(true);
info.setDuplexConnection(false);
info.setNetworkConnection(true);
info.setConnectionId(1);
}
}
|
||||
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v3;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Test case for the OpenWire marshalling for BrokerInfo
*
*
* NOTE!: This file is auto generated - do not modify!
* if you need to make a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision: $
*/
public class BrokerInfoTest extends BaseCommandTestSupport {
public static BrokerInfoTest SINGLETON = new BrokerInfoTest();
public Object createObject() throws Exception {
BrokerInfo info = new BrokerInfo();
populateObject(info);
return info;
}
protected void populateObject(Object object) throws Exception {
super.populateObject(object);
BrokerInfo info = (BrokerInfo) object;
info.setBrokerId(createBrokerId("BrokerId:1"));
info.setBrokerURL("BrokerURL:2");
{
BrokerInfo value[] = new BrokerInfo[0];
for( int i=0; i < 0; i++ ) {
value[i] = createBrokerInfo("PeerBrokerInfos:3");
}
info.setPeerBrokerInfos(value);
}
info.setBrokerName("BrokerName:4");
info.setSlaveBroker(true);
info.setMasterBroker(false);
info.setFaultTolerantConfiguration(true);
info.setDuplexConnection(false);
info.setNetworkConnection(true);
info.setConnectionId(1);
info.setBrokerUploadUrl("BrokerUploadUrl:5");
}
}
|
Loading…
Reference in New Issue