fine tuning, client control commands etc.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@393912 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-04-13 20:15:35 +00:00
parent 3e69e735b0
commit ef0734bccf
31 changed files with 663 additions and 110 deletions

View File

@ -51,9 +51,11 @@ import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ControlCommand;
@ -74,6 +76,7 @@ import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMSExceptionSupport;
@ -163,6 +166,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected ActiveMQConnection(Transport transport, JMSStatsImpl factoryStats)
throws Exception {
this.info = new ConnectionInfo(new ConnectionId(connectionIdGenerator.generateId()));
this.info.setManageable(true);
this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
this.transport = transport;
@ -1206,8 +1210,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// broker without having to do an RPC to the broker.
ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1),consumerIdGenerator.getNextSequenceId());
advisoryConsumer = new AdvisoryConsumer(this, consumerId);
advisoryConsumer = new AdvisoryConsumer(this, consumerId);
}
/**
@ -1407,12 +1410,17 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} else if ( command.isBrokerInfo() ) {
this.brokerInfo = (BrokerInfo)command;
brokerInfoReceived.countDown();
this.optimizeAcknowledge &= !this.brokerInfo.isFaultTolerantConfiguration();
}
else if (command instanceof ControlCommand) {
onControlCommand((ControlCommand) command);
}
else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
onAsyncException(((ConnectionError)command).getException());
}else if (command instanceof ConnectionControl){
onConnectionControl((ConnectionControl) command);
}else if (command instanceof ConsumerControl){
onConsumerControl((ConsumerControl) command);
}
}
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
@ -1451,6 +1459,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
public void transportInterupted() {
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession) i.next();
s.clearMessagesInProgress();
}
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = (TransportListener) iter.next();
listener.transportInterupted();
@ -1462,6 +1474,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
TransportListener listener = (TransportListener) iter.next();
listener.transportResumed();
}
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession) i.next();
s.deliverAcks();
}
}
@ -1717,6 +1733,30 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
}
protected void onConnectionControl(ConnectionControl command){
if (command.isFaultTolerant()){
this.optimizeAcknowledge = false;
for(Iterator i=this.sessions.iterator();i.hasNext();){
ActiveMQSession s=(ActiveMQSession) i.next();
s.setOptimizeAcknowledge(false);
}
}
}
protected void onConsumerControl(ConsumerControl command){
if(command.isClose()){
for(Iterator i=this.sessions.iterator();i.hasNext();){
ActiveMQSession s=(ActiveMQSession) i.next();
s.close(command.getConsumerId());
}
}else{
for(Iterator i=this.sessions.iterator();i.hasNext();){
ActiveMQSession s=(ActiveMQSession) i.next();
s.setPrefetchSize(command.getConsumerId(),command.getPrefetch());
}
}
}
protected void transportFailed(IOException error){
transportFailed.set(true);
if (firstFailureError == null) {

View File

@ -111,8 +111,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private MessageAvailableListener availableListener;
private RedeliveryPolicy redeliveryPolicy;
private boolean optimizeAcknowledge;
private AtomicBoolean optimizeAcknowledge = new AtomicBoolean();
/**
* Create a MessageConsumer
*
@ -182,6 +182,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
this.optimizeAcknowledge.set(session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge()
&&!info.isBrowser());
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge.get());
try {
this.session.addConsumer(this);
this.session.syncSendPacket(info);
@ -189,8 +192,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.session.removeConsumer(this);
throw e;
}
this.optimizeAcknowledge=session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge()
&&!info.isDurable()&&!info.getDestination().isQueue();
if(session.connection.isStarted())
start();
}
@ -509,15 +511,34 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
public void clearMessagesInProgress(){
void clearMessagesInProgress(){
unconsumedMessages.clear();
}
void deliverAcks(){
synchronized(optimizeAcknowledge){
if(this.optimizeAcknowledge.get()){
if(!deliveredMessages.isEmpty()){
MessageDispatch md=(MessageDispatch) deliveredMessages.getFirst();
MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
try{
session.asyncSendPacket(ack);
}catch(JMSException e){
log.error("Failed to delivered acknowledgements",e);
}
deliveredMessages.clear();
ackCounter=0;
}
}
}
}
public void dispose() throws JMSException {
if (!unconsumedMessages.isClosed()) {
// Do we have any acks we need to send out before closing?
// Ack any delivered messages now. (session may still
// commit/rollback the acks).
deliverAcks();//only processes optimized acknowledgements
if ((session.isTransacted() || session.isDupsOkAcknowledge())) {
acknowledge();
}
@ -539,6 +560,18 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
protected void checkMessageListener() throws JMSException {
session.checkMessageListener();
}
protected void setOptimizeAcknowledge(boolean value){
synchronized(optimizeAcknowledge){
deliverAcks();
optimizeAcknowledge.set(value);
}
}
protected void setPrefetchSize(int prefetch){
deliverAcks();
this.info.setPrefetchSize(prefetch);
}
private void beforeMessageIsConsumed(MessageDispatch md) {
md.setDeliverySequenceId(session.getNextDeliveryId());
@ -557,18 +590,20 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
}else if(session.isAutoAcknowledge()){
if(!deliveredMessages.isEmpty()){
if(this.optimizeAcknowledge){
ackCounter++;
if(ackCounter>=(info.getPrefetchSize()*.75)){
MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,ackCounter);
synchronized(optimizeAcknowledge){
if(this.optimizeAcknowledge.get()){
ackCounter++;
if(ackCounter>=(info.getPrefetchSize()*.75)){
MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,ackCounter);
session.asyncSendPacket(ack);
ackCounter=0;
deliveredMessages.clear();
}
}else{
MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
ackCounter=0;
deliveredMessages.clear();
}
}else{
MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
deliveredMessages.clear();
}
}
}else if(session.isDupsOkAcknowledge()){
@ -662,11 +697,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void rollback() throws JMSException{
synchronized(unconsumedMessages.getMutex()){
if(optimizeAcknowledge){
// remove messages read but not acked at the broker yet through optimizeAcknowledge
for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
deliveredMessages.removeLast();
synchronized(optimizeAcknowledge){
if(optimizeAcknowledge.get()){
// remove messages read but not acked at the broker yet through optimizeAcknowledge
for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
deliveredMessages.removeLast();
}
}
}
if(deliveredMessages.isEmpty())

View File

@ -517,6 +517,21 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
connection.asyncSendPacket(info.createRemoveCommand());
}
}
void clearMessagesInProgress(){
executor.clearMessagesInProgress();
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next();
consumer.clearMessagesInProgress();
}
}
void deliverAcks(){
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next();
consumer.deliverAcks();
}
}
synchronized public void dispose() throws JMSException {
if (!closed) {
@ -1707,6 +1722,38 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
}
}
protected void setOptimizeAcknowledge(boolean value){
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
ActiveMQMessageConsumer c = (ActiveMQMessageConsumer) iter.next();
c.setOptimizeAcknowledge(value);
}
}
protected void setPrefetchSize(ConsumerId id,int prefetch){
for(Iterator iter=consumers.iterator();iter.hasNext();){
ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next();
if(c.getConsumerId().equals(id)){
c.setPrefetchSize(prefetch);
break;
}
}
}
protected void close(ConsumerId id){
for(Iterator iter=consumers.iterator();iter.hasNext();){
ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next();
if(c.getConsumerId().equals(id)){
try{
c.close();
}catch(JMSException e){
log.warn("Exception closing consumer",e);
}
log.warn("Closed consumer on Command");
break;
}
}
}

View File

@ -234,4 +234,8 @@ public interface Broker extends Region, Service {
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
/**
* @return true if fault tolerant
*/
public boolean isFaultTolerantConfiguration();
}

View File

@ -205,4 +205,9 @@ public class BrokerFilter implements Broker {
}
public boolean isFaultTolerantConfiguration(){
return next.isFaultTolerantConfiguration();
}
}

View File

@ -90,5 +90,10 @@ public interface Connection extends Service {
* Returns the statistics for this connection
*/
public ConnectionStatistics getStatistics();
/**
* @return true if the Connection will process control commands
*/
public boolean isManageable();
}

View File

@ -198,6 +198,10 @@ public class EmptyBroker implements Broker{
public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
}
public boolean isFaultTolerantConfiguration(){
return false;
}
}

View File

@ -200,6 +200,10 @@ public class ErrorBroker implements Broker {
throw new IllegalStateException(this.message);
}
public boolean isFaultTolerantConfiguration(){
throw new IllegalStateException(this.message);
}
}

View File

@ -215,4 +215,8 @@ public class MutableBrokerFilter implements Broker {
}
public boolean isFaultTolerantConfiguration(){
return getNext().isFaultTolerantConfiguration();
}
}

View File

@ -18,11 +18,13 @@ package org.apache.activemq.broker.ft;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.InsertableMutableBrokerFilter;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
@ -67,11 +69,26 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
*/
public void startProcessing(){
started.set(true);
try{
Connection[] connections=getClients();
ConnectionControl command=new ConnectionControl();
command.setFaultTolerant(true);
if(connections!=null){
for(int i=0;i<connections.length;i++){
if(connections[i].isActive()&&connections[i].isManageable()){
connections[i].dispatchAsync(command);
}
}
}
}catch(Exception e){
log.error("Failed to get Connections",e);
}
}
/**
* stop the broker
* @throws Exception
*
* @throws Exception
*/
public void stop() throws Exception{
super.stop();
@ -303,6 +320,10 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
}
public boolean isFaultTolerantConfiguration(){
return true;
}
protected void sendToSlave(Message message){

View File

@ -131,6 +131,9 @@ public class MasterConnector implements Service{
connectionInfo.setUserName(userName);
connectionInfo.setPassword(password);
localBroker.oneway(connectionInfo);
ConnectionInfo remoteInfo=new ConnectionInfo();
connectionInfo.copy(remoteInfo);
remoteInfo.setBrokerMasterConnector(true);
remoteBroker.oneway(connectionInfo);
sessionInfo=new SessionInfo(connectionInfo,1);

View File

@ -470,6 +470,10 @@ public class RegionBroker implements Broker {
public Set getDurableDestinations(){
return adaptor != null ? adaptor.getDestinations() : Collections.EMPTY_SET;
}
public boolean isFaultTolerantConfiguration(){
return false;
}
protected void doStop(ServiceStopper ss) {
@ -487,5 +491,7 @@ public class RegionBroker implements Broker {
this.keepDurableSubsActive = keepDurableSubsActive;
}
}

View File

@ -1,105 +1,132 @@
/**
*
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.command;
import org.apache.activemq.state.CommandVisitor;
/**
* When a client connects to a broker, the broker send the client a BrokerInfo
* so that the client knows which broker node he's talking to and also any peers
* that the node has in his cluster. This is the broker helping the client out
* When a client connects to a broker, the broker send the client a BrokerInfo so that the client knows which broker
* node he's talking to and also any peers that the node has in his cluster. This is the broker helping the client out
* in discovering other nodes in the cluster.
*
* @openwire:marshaller code="2"
* @version $Revision: 1.7 $
*/
public class BrokerInfo extends BaseCommand {
public class BrokerInfo extends BaseCommand{
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.BROKER_INFO;
BrokerId brokerId;
String brokerURL;
boolean slaveBroker;
boolean masterBroker;
boolean faultTolerantConfiguration;
BrokerInfo peerBrokerInfos[];
String brokerName;
public boolean isBrokerInfo() {
public boolean isBrokerInfo(){
return true;
}
public byte getDataStructureType() {
public byte getDataStructureType(){
return DATA_STRUCTURE_TYPE;
}
/**
* @openwire:property version=1 cache=true
*/
public BrokerId getBrokerId() {
public BrokerId getBrokerId(){
return brokerId;
}
public void setBrokerId(BrokerId brokerId) {
this.brokerId = brokerId;
public void setBrokerId(BrokerId brokerId){
this.brokerId=brokerId;
}
/**
* @openwire:property version=1
*/
public String getBrokerURL() {
public String getBrokerURL(){
return brokerURL;
}
public void setBrokerURL(String brokerURL) {
this.brokerURL = brokerURL;
public void setBrokerURL(String brokerURL){
this.brokerURL=brokerURL;
}
/**
* @openwire:property version=1 testSize=0
*/
public BrokerInfo[] getPeerBrokerInfos() {
public BrokerInfo[] getPeerBrokerInfos(){
return peerBrokerInfos;
}
public void setPeerBrokerInfos(BrokerInfo[] peerBrokerInfos) {
this.peerBrokerInfos = peerBrokerInfos;
public void setPeerBrokerInfos(BrokerInfo[] peerBrokerInfos){
this.peerBrokerInfos=peerBrokerInfos;
}
/**
* @openwire:property version=1
*/
public String getBrokerName() {
public String getBrokerName(){
return brokerName;
}
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
public void setBrokerName(String brokerName){
this.brokerName=brokerName;
}
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processBrokerInfo( this );
public Response visit(CommandVisitor visitor) throws Exception{
return visitor.processBrokerInfo(this);
}
/**
* @openwire:property version=1 cache=true
* @openwire:property version=1
*/
public boolean isSlaveBroker(){
return slaveBroker;
}
public void setSlaveBroker(boolean slaveBroker){
this.slaveBroker=slaveBroker;
}
/**
* @openwire:property version=1
*/
public boolean isMasterBroker(){
return masterBroker;
}
/**
* @param masterBroker
* The masterBroker to set.
*/
public void setMasterBroker(boolean masterBroker){
this.masterBroker=masterBroker;
}
/**
* @openwire:property version=1
* @return Returns the faultTolerantConfiguration.
*/
public boolean isFaultTolerantConfiguration(){
return faultTolerantConfiguration;
}
/**
* @param faultTolerantConfiguration
* The faultTolerantConfiguration to set.
*/
public void setFaultTolerantConfiguration(boolean faultTolerantConfiguration){
this.faultTolerantConfiguration=faultTolerantConfiguration;
}
}

View File

@ -47,6 +47,8 @@ public interface CommandTypes {
byte CONTROL_COMMAND = 14;
byte FLUSH_COMMAND = 15;
byte CONNECTION_ERROR = 16;
byte CONSUMER_CONTROL = 17;
byte CONNECTION_CONTROL = 18;
///////////////////////////////////////////////////
//
@ -124,6 +126,11 @@ public interface CommandTypes {
byte BOOLEAN_TYPE = 78;
byte BYTE_ARRAY_TYPE = 79;
///////////////////////////////////////////////////
//
// Broker to Broker command objects
@ -133,6 +140,7 @@ public interface CommandTypes {
byte MESSAGE_DISPATCH_NOTIFICATION = 90;
byte NETWORK_BRIDGE_FILTER = 91;
///////////////////////////////////////////////////
//
// Data structures contained in the command objects.
@ -153,6 +161,9 @@ public interface CommandTypes {
byte PRODUCER_ID = 123;
byte BROKER_ID = 124;

View File

@ -0,0 +1,119 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.command;
import org.apache.activemq.state.CommandVisitor;
/**
* Used to start and stop transports as well as terminating clients.
*
* @openwire:marshaller code="18"
*
* @version $Revision: 1.1 $
*/
public class ConnectionControl extends BaseCommand{
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.CONNECTION_CONTROL;
protected boolean suspend;
protected boolean resume;
protected boolean close;
protected boolean exit;
protected boolean faultTolerant;
public byte getDataStructureType(){
return DATA_STRUCTURE_TYPE;
}
public Response visit(CommandVisitor visitor) throws Exception{
return null;
}
/**
* @openwire:property version=1
* @return Returns the close.
*/
public boolean isClose(){
return close;
}
/**
* @param close
* The close to set.
*/
public void setClose(boolean close){
this.close=close;
}
/**
* @openwire:property version=1
* @return Returns the exit.
*/
public boolean isExit(){
return exit;
}
/**
* @param exit
* The exit to set.
*/
public void setExit(boolean exit){
this.exit=exit;
}
/**
* @openwire:property version=1
* @return Returns the faultTolerant.
*/
public boolean isFaultTolerant(){
return faultTolerant;
}
/**
* @param faultTolerant
* The faultTolerant to set.
*/
public void setFaultTolerant(boolean faultTolerant){
this.faultTolerant=faultTolerant;
}
/**
* @openwire:property version=1
* @return Returns the resume.
*/
public boolean isResume(){
return resume;
}
/**
* @param resume
* The resume to set.
*/
public void setResume(boolean resume){
this.resume=resume;
}
/**
* @openwire:property version=1
* @return Returns the suspend.
*/
public boolean isSuspend(){
return suspend;
}
/**
* @param suspend
* The suspend to set.
*/
public void setSuspend(boolean suspend){
this.suspend=suspend;
}
}

View File

@ -33,6 +33,8 @@ public class ConnectionInfo extends BaseCommand {
protected String userName;
protected String password;
protected BrokerId[] brokerPath;
protected boolean brokerMasterConnector;
protected boolean manageable;
public ConnectionInfo() {
}
@ -43,6 +45,16 @@ public class ConnectionInfo extends BaseCommand {
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
public void copy(ConnectionInfo copy) {
super.copy(copy);
copy.clientId = clientId;
copy.userName = userName;
copy.password = password;
copy.brokerPath = brokerPath;
copy.brokerMasterConnector = brokerMasterConnector;
copy.manageable = manageable;
}
/**
* @openwire:property version=1 cache=true
@ -105,5 +117,29 @@ public class ConnectionInfo extends BaseCommand {
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processAddConnection( this );
}
/**
* @openwire:property version=1
*/
public boolean isBrokerMasterConnector(){
return brokerMasterConnector;
}
/**
* @param brokerMasterConnector The brokerMasterConnector to set.
*/
public void setBrokerMasterConnector(boolean slaveBroker){
this.brokerMasterConnector=slaveBroker;
}
/**
* @openwire:property version=1
*/
public boolean isManageable(){
return manageable;
}
/**
* @param manageable The manageable to set.
*/
public void setManageable(boolean manageable){
this.manageable=manageable;
}
}

View File

@ -0,0 +1,115 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.command;
import org.apache.activemq.state.CommandVisitor;
/**
* Used to start and stop transports as well as terminating clients.
*
* @openwire:marshaller code="17"
*
* @version $Revision: 1.1 $
*/
public class ConsumerControl extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_CONTROL;
protected ConsumerId consumerId;
protected boolean close;
protected int prefetch;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
public Response visit(CommandVisitor visitor) throws Exception {
return null;
}
/**
* @openwire:property version=1
* @return Returns the close.
*/
public boolean isClose(){
return close;
}
/**
* @param close The close to set.
*/
public void setClose(boolean close){
this.close=close;
}
/**
* @openwire:property version=1
* @return Returns the consumerId.
*/
public ConsumerId getConsumerId(){
return consumerId;
}
/**
* @param consumerId The consumerId to set.
*/
public void setConsumerId(ConsumerId consumerId){
this.consumerId=consumerId;
}
/**
* @openwire:property version=1
* @return Returns the prefetch.
*/
public int getPrefetch(){
return prefetch;
}
/**
* @param prefetch The prefetch to set.
*/
public void setPrefetch(int prefetch){
this.prefetch=prefetch;
}
}

View File

@ -48,6 +48,7 @@ public class ConsumerInfo extends BaseCommand {
protected boolean retroactive;
protected byte priority;
protected BrokerId[] brokerPath;
protected boolean optimizedAcknowledge;
protected BooleanExpression additionalPredicate;
protected transient boolean networkSubscription; //this subscription originated from a network connection
@ -306,4 +307,19 @@ public class ConsumerInfo extends BaseCommand {
this.networkSubscription=networkSubscription;
}
/**
* @openwire:property version=1
* @return Returns the optimizedAcknowledge.
*/
public boolean isOptimizedAcknowledge(){
return optimizedAcknowledge;
}
/**
* @param optimizedAcknowledge The optimizedAcknowledge to set.
*/
public void setOptimizedAcknowledge(boolean optimizedAcknowledge){
this.optimizedAcknowledge=optimizedAcknowledge;
}
}

View File

@ -81,6 +81,8 @@ public class BrokerInfoMarshaller extends BaseCommandMarshaller {
}
info.setBrokerName(tightUnmarshalString(dataIn, bs));
info.setSlaveBroker(bs.readBoolean());
info.setMasterBroker(bs.readBoolean());
info.setFaultTolerantConfiguration(bs.readBoolean());
}
@ -98,6 +100,8 @@ public class BrokerInfoMarshaller extends BaseCommandMarshaller {
rc += tightMarshalObjectArray1(wireFormat, info.getPeerBrokerInfos(), bs);
rc += tightMarshalString1(info.getBrokerName(), bs);
bs.writeBoolean(info.isSlaveBroker());
bs.writeBoolean(info.isMasterBroker());
bs.writeBoolean(info.isFaultTolerantConfiguration());
return rc + 0;
}
@ -118,6 +122,8 @@ public class BrokerInfoMarshaller extends BaseCommandMarshaller {
tightMarshalObjectArray2(wireFormat, info.getPeerBrokerInfos(), dataOut, bs);
tightMarshalString2(info.getBrokerName(), dataOut, bs);
bs.readBoolean();
bs.readBoolean();
bs.readBoolean();
}
@ -148,6 +154,8 @@ public class BrokerInfoMarshaller extends BaseCommandMarshaller {
}
info.setBrokerName(looseUnmarshalString(dataIn));
info.setSlaveBroker(dataIn.readBoolean());
info.setMasterBroker(dataIn.readBoolean());
info.setFaultTolerantConfiguration(dataIn.readBoolean());
}
@ -165,6 +173,8 @@ public class BrokerInfoMarshaller extends BaseCommandMarshaller {
looseMarshalObjectArray(wireFormat, info.getPeerBrokerInfos(), dataOut);
looseMarshalString(info.getBrokerName(), dataOut);
dataOut.writeBoolean(info.isSlaveBroker());
dataOut.writeBoolean(info.isMasterBroker());
dataOut.writeBoolean(info.isFaultTolerantConfiguration());
}
}

View File

@ -0,0 +1 @@
/** * * Copyright 2005-2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v1; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Marshalling code for Open Wire Format for ConnectionControlMarshaller * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision$ */ public class ConnectionControlMarshaller extends BaseCommandMarshaller { /** * Return the type of Data Structure we marshal * @return short representation of the type data structure */ public byte getDataStructureType() { return ConnectionControl.DATA_STRUCTURE_TYPE; } /** * @return a new object instance */ public DataStructure createObject() { return new ConnectionControl(); } /** * Un-marshal an object instance from the data input stream * * @param o the object to un-marshal * @param dataIn the data input stream to build the object from * @throws IOException */ public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn, BooleanStream bs) throws IOException { super.tightUnmarshal(wireFormat, o, dataIn, bs); ConnectionControl info = (ConnectionControl)o; info.setClose(bs.readBoolean()); info.setExit(bs.readBoolean()); info.setFaultTolerant(bs.readBoolean()); info.setResume(bs.readBoolean()); info.setSuspend(bs.readBoolean()); } /** * Write the booleans that this object uses to a BooleanStream */ public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException { ConnectionControl info = (ConnectionControl)o; int rc = super.tightMarshal1(wireFormat, o, bs); bs.writeBoolean(info.isClose()); bs.writeBoolean(info.isExit()); bs.writeBoolean(info.isFaultTolerant()); bs.writeBoolean(info.isResume()); bs.writeBoolean(info.isSuspend()); return rc + 0; } /** * Write a object instance to data output stream * * @param o the instance to be marshaled * @param dataOut the output stream * @throws IOException thrown if an error occurs */ public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut, BooleanStream bs) throws IOException { super.tightMarshal2(wireFormat, o, dataOut, bs); ConnectionControl info = (ConnectionControl)o; bs.readBoolean(); bs.readBoolean(); bs.readBoolean(); bs.readBoolean(); bs.readBoolean(); } /** * Un-marshal an object instance from the data input stream * * @param o the object to un-marshal * @param dataIn the data input stream to build the object from * @throws IOException */ public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn) throws IOException { super.looseUnmarshal(wireFormat, o, dataIn); ConnectionControl info = (ConnectionControl)o; info.setClose(dataIn.readBoolean()); info.setExit(dataIn.readBoolean()); info.setFaultTolerant(dataIn.readBoolean()); info.setResume(dataIn.readBoolean()); info.setSuspend(dataIn.readBoolean()); } /** * Write the booleans that this object uses to a BooleanStream */ public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut) throws IOException { ConnectionControl info = (ConnectionControl)o; super.looseMarshal(wireFormat, o, dataOut); dataOut.writeBoolean(info.isClose()); dataOut.writeBoolean(info.isExit()); dataOut.writeBoolean(info.isFaultTolerant()); dataOut.writeBoolean(info.isResume()); dataOut.writeBoolean(info.isSuspend()); } }

View File

@ -81,6 +81,8 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
else {
info.setBrokerPath(null);
}
info.setBrokerMasterConnector(bs.readBoolean());
info.setManageable(bs.readBoolean());
}
@ -98,6 +100,8 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
rc += tightMarshalString1(info.getPassword(), bs);
rc += tightMarshalString1(info.getUserName(), bs);
rc += tightMarshalObjectArray1(wireFormat, info.getBrokerPath(), bs);
bs.writeBoolean(info.isBrokerMasterConnector());
bs.writeBoolean(info.isManageable());
return rc + 0;
}
@ -118,6 +122,8 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
tightMarshalString2(info.getPassword(), dataOut, bs);
tightMarshalString2(info.getUserName(), dataOut, bs);
tightMarshalObjectArray2(wireFormat, info.getBrokerPath(), dataOut, bs);
bs.readBoolean();
bs.readBoolean();
}
@ -148,6 +154,8 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
else {
info.setBrokerPath(null);
}
info.setBrokerMasterConnector(dataIn.readBoolean());
info.setManageable(dataIn.readBoolean());
}
@ -165,6 +173,8 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
looseMarshalString(info.getPassword(), dataOut);
looseMarshalString(info.getUserName(), dataOut);
looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut);
dataOut.writeBoolean(info.isBrokerMasterConnector());
dataOut.writeBoolean(info.isManageable());
}
}

View File

@ -0,0 +1 @@
/** * * Copyright 2005-2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v1; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Marshalling code for Open Wire Format for ConsumerControlMarshaller * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision$ */ public class ConsumerControlMarshaller extends BaseCommandMarshaller { /** * Return the type of Data Structure we marshal * @return short representation of the type data structure */ public byte getDataStructureType() { return ConsumerControl.DATA_STRUCTURE_TYPE; } /** * @return a new object instance */ public DataStructure createObject() { return new ConsumerControl(); } /** * Un-marshal an object instance from the data input stream * * @param o the object to un-marshal * @param dataIn the data input stream to build the object from * @throws IOException */ public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn, BooleanStream bs) throws IOException { super.tightUnmarshal(wireFormat, o, dataIn, bs); ConsumerControl info = (ConsumerControl)o; info.setClose(bs.readBoolean()); info.setConsumerId((org.apache.activemq.command.ConsumerId) tightUnmarsalNestedObject(wireFormat, dataIn, bs)); info.setPrefetch(dataIn.readInt()); } /** * Write the booleans that this object uses to a BooleanStream */ public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException { ConsumerControl info = (ConsumerControl)o; int rc = super.tightMarshal1(wireFormat, o, bs); bs.writeBoolean(info.isClose()); rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getConsumerId(), bs); return rc + 4; } /** * Write a object instance to data output stream * * @param o the instance to be marshaled * @param dataOut the output stream * @throws IOException thrown if an error occurs */ public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut, BooleanStream bs) throws IOException { super.tightMarshal2(wireFormat, o, dataOut, bs); ConsumerControl info = (ConsumerControl)o; bs.readBoolean(); tightMarshalNestedObject2(wireFormat, (DataStructure)info.getConsumerId(), dataOut, bs); dataOut.writeInt(info.getPrefetch()); } /** * Un-marshal an object instance from the data input stream * * @param o the object to un-marshal * @param dataIn the data input stream to build the object from * @throws IOException */ public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn) throws IOException { super.looseUnmarshal(wireFormat, o, dataIn); ConsumerControl info = (ConsumerControl)o; info.setClose(dataIn.readBoolean()); info.setConsumerId((org.apache.activemq.command.ConsumerId) looseUnmarsalNestedObject(wireFormat, dataIn)); info.setPrefetch(dataIn.readInt()); } /** * Write the booleans that this object uses to a BooleanStream */ public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut) throws IOException { ConsumerControl info = (ConsumerControl)o; super.looseMarshal(wireFormat, o, dataOut); dataOut.writeBoolean(info.isClose()); looseMarshalNestedObject(wireFormat, (DataStructure)info.getConsumerId(), dataOut); dataOut.writeInt(info.getPrefetch()); } }

View File

@ -91,6 +91,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller {
}
info.setAdditionalPredicate((org.apache.activemq.filter.BooleanExpression) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
info.setNetworkSubscription(bs.readBoolean());
info.setOptimizedAcknowledge(bs.readBoolean());
}
@ -115,6 +116,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller {
rc += tightMarshalObjectArray1(wireFormat, info.getBrokerPath(), bs);
rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getAdditionalPredicate(), bs);
bs.writeBoolean(info.isNetworkSubscription());
bs.writeBoolean(info.isOptimizedAcknowledge());
return rc + 9;
}
@ -145,6 +147,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller {
tightMarshalObjectArray2(wireFormat, info.getBrokerPath(), dataOut, bs);
tightMarshalNestedObject2(wireFormat, (DataStructure)info.getAdditionalPredicate(), dataOut, bs);
bs.readBoolean();
bs.readBoolean();
}
@ -185,6 +188,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller {
}
info.setAdditionalPredicate((org.apache.activemq.filter.BooleanExpression) looseUnmarsalNestedObject(wireFormat, dataIn));
info.setNetworkSubscription(dataIn.readBoolean());
info.setOptimizedAcknowledge(dataIn.readBoolean());
}
@ -212,6 +216,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller {
looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut);
looseMarshalNestedObject(wireFormat, (DataStructure)info.getAdditionalPredicate(), dataOut);
dataOut.writeBoolean(info.isNetworkSubscription());
dataOut.writeBoolean(info.isOptimizedAcknowledge());
}
}

View File

@ -81,8 +81,10 @@ public class MarshallerFactory {
add(new DestinationInfoMarshaller());
add(new ShutdownInfoMarshaller());
add(new DataResponseMarshaller());
add(new ConnectionControlMarshaller());
add(new KeepAliveInfoMarshaller());
add(new FlushCommandMarshaller());
add(new ConsumerControlMarshaller());
add(new JournalTopicAckMarshaller());
add(new BrokerIdMarshaller());
add(new MessageDispatchMarshaller());

View File

@ -1,60 +1,53 @@
/**
*
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.transport;
import java.io.IOException;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
/**
* @version $Revision: 1.5 $
*/
public class TransportFilter extends DefaultTransportListener implements Transport {
public class TransportFilter implements TransportListener,Transport{
final protected Transport next;
private TransportListener transportListener;
public TransportFilter(Transport next) {
this.next = next;
public TransportFilter(Transport next){
this.next=next;
}
public TransportListener getTransportListener() {
public TransportListener getTransportListener(){
return transportListener;
}
public void setTransportListener(TransportListener channelListener) {
this.transportListener = channelListener;
if (channelListener == null)
public void setTransportListener(TransportListener channelListener){
this.transportListener=channelListener;
if(channelListener==null)
next.setTransportListener(null);
else
next.setTransportListener(this);
}
/**
* @see org.apache.activemq.Service#start()
* @throws IOException if the next channel has not been set.
* @throws IOException
* if the next channel has not been set.
*/
public void start() throws Exception {
if( next == null )
public void start() throws Exception{
if(next==null)
throw new IOException("The next channel has not been set.");
if( transportListener == null )
if(transportListener==null)
throw new IOException("The command listener has not been set.");
next.start();
}
@ -62,51 +55,57 @@ public class TransportFilter extends DefaultTransportListener implements Transpo
/**
* @see org.apache.activemq.Service#stop()
*/
public void stop() throws Exception {
public void stop() throws Exception{
next.stop();
}
}
public void onCommand(Command command) {
public void onCommand(Command command){
transportListener.onCommand(command);
}
/**
* @return Returns the next.
*/
public Transport getNext() {
public Transport getNext(){
return next;
}
public String toString() {
public String toString(){
return next.toString();
}
public void oneway(Command command) throws IOException {
public void oneway(Command command) throws IOException{
next.oneway(command);
}
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
return next.asyncRequest(command, null);
public FutureResponse asyncRequest(Command command,ResponseCallback responseCallback) throws IOException{
return next.asyncRequest(command,null);
}
public Response request(Command command) throws IOException {
public Response request(Command command) throws IOException{
return next.request(command);
}
public Response request(Command command,int timeout) throws IOException {
public Response request(Command command,int timeout) throws IOException{
return next.request(command,timeout);
}
public void onException(IOException error) {
public void onException(IOException error){
transportListener.onException(error);
}
public Object narrow(Class target) {
if( target.isAssignableFrom(getClass()) ) {
public void transportInterupted(){
transportListener.transportInterupted();
}
public void transportResumed(){
transportListener.transportResumed();
}
public Object narrow(Class target){
if(target.isAssignableFrom(getClass())){
return this;
}
return next.narrow(target);
}
}
}

View File

@ -78,7 +78,7 @@ public class FailoverTransport implements CompositeTransport {
private long reconnectDelay = initialReconnectDelay;
private Exception connectionFailure;
private final TransportListener myTransportListener = new DefaultTransportListener() {
private final TransportListener myTransportListener = new TransportListener() {
public void onCommand(Command command) {
if (command == null) {
return;
@ -113,6 +113,18 @@ public class FailoverTransport implements CompositeTransport {
transportListener.onException(new InterruptedIOException());
}
}
public void transportInterupted(){
if (transportListener != null){
transportListener.transportInterupted();
}
}
public void transportResumed(){
if(transportListener != null){
transportListener.transportResumed();
}
}
};
public FailoverTransport() throws InterruptedIOException {
@ -147,9 +159,11 @@ public class FailoverTransport implements CompositeTransport {
Transport t = TransportFactory.compositeConnect(uri);
t.setTransportListener(myTransportListener);
t.start();
if (started) {
restoreTransport(t);
}
log.debug("Connection established");
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
@ -159,6 +173,7 @@ public class FailoverTransport implements CompositeTransport {
if (transportListener != null){
transportListener.transportResumed();
}
return false;
}
catch (Exception e) {

View File

@ -62,6 +62,8 @@ public class BrokerInfoTest extends BaseCommandTestSupport {
}
info.setBrokerName("BrokerName:4");
info.setSlaveBroker(true);
info.setMasterBroker(false);
info.setFaultTolerantConfiguration(true);
}
}

View File

@ -0,0 +1 @@
/** * * Copyright 2005-2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v1; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Test case for the OpenWire marshalling for ConnectionControl * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision: $ */ public class ConnectionControlTest extends BaseCommandTestSupport { public static ConnectionControlTest SINGLETON = new ConnectionControlTest(); public Object createObject() throws Exception { ConnectionControl info = new ConnectionControl(); populateObject(info); return info; } protected void populateObject(Object object) throws Exception { super.populateObject(object); ConnectionControl info = (ConnectionControl) object; info.setClose(true); info.setExit(false); info.setFaultTolerant(true); info.setResume(false); info.setSuspend(true); } }

View File

@ -62,6 +62,8 @@ public class ConnectionInfoTest extends BaseCommandTestSupport {
}
info.setBrokerPath(value);
}
info.setBrokerMasterConnector(true);
info.setManageable(false);
}
}

View File

@ -0,0 +1 @@
/** * * Copyright 2005-2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v1; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Test case for the OpenWire marshalling for ConsumerControl * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision: $ */ public class ConsumerControlTest extends BaseCommandTestSupport { public static ConsumerControlTest SINGLETON = new ConsumerControlTest(); public Object createObject() throws Exception { ConsumerControl info = new ConsumerControl(); populateObject(info); return info; } protected void populateObject(Object object) throws Exception { super.populateObject(object); ConsumerControl info = (ConsumerControl) object; info.setClose(true); info.setConsumerId(createConsumerId("ConsumerId:1")); info.setPrefetch(1); } }

View File

@ -72,6 +72,7 @@ public class ConsumerInfoTest extends BaseCommandTestSupport {
}
info.setAdditionalPredicate(createBooleanExpression("AdditionalPredicate:6"));
info.setNetworkSubscription(false);
info.setOptimizedAcknowledge(true);
}
}