- Changed the command id objects so that they do not use fields that have the same name as the class (for the .net folks)

- Added a ConnectionError command that can be used to notify a connection of async errors associated with the connection.
- 

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@367341 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-01-09 17:21:52 +00:00
parent 42ec94a247
commit c59246d5ed
36 changed files with 292 additions and 107 deletions

View File

@ -1210,7 +1210,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
waitForBrokerInfo();
if( brokerInfo==null )
throw new JMSException("Connection failed before Broker info was received.");
return brokerInfo.getBrokerId().getBrokerId();
return brokerInfo.getBrokerId().getValue();
}
/**

View File

@ -158,6 +158,6 @@ public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQD
}
public String toString() {
return "ActiveMQConnectionConsumer { consumerId=" +consumerInfo.getConsumerId()+" }";
return "ActiveMQConnectionConsumer { value=" +consumerInfo.getConsumerId()+" }";
}
}

View File

@ -69,7 +69,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
}
String connectionID = connection.getConnectionInfo().getConnectionId().getConnectionId();
String connectionID = connection.getConnectionInfo().getConnectionId().getValue();
if (physicalName.indexOf(connectionID) < 0) {
throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
@ -226,7 +226,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
}
public String toString() {
return "ActiveMQInputStream { consumerId="+info.getConsumerId()+", producerId=" +producerId+" }";
return "ActiveMQInputStream { value="+info.getConsumerId()+", producerId=" +producerId+" }";
}
}

View File

@ -116,7 +116,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* Create a MessageConsumer
*
* @param session
* @param consumerId
* @param value
* @param dest
* @param name
* @param selector
@ -140,7 +140,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
}
String connectionID = session.connection.getConnectionInfo().getConnectionId().getConnectionId();
String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
if (physicalName.indexOf(connectionID) < 0) {
throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
@ -211,7 +211,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
/**
* @return Returns the consumerId.
* @return Returns the value.
*/
protected ConsumerId getConsumerId() {
return info.getConsumerId();
@ -752,7 +752,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
public String toString() {
return "ActiveMQMessageConsumer { consumerId=" +info.getConsumerId()+", started=" +started.get()+" }";
return "ActiveMQMessageConsumer { value=" +info.getConsumerId()+", started=" +started.get()+" }";
}
}

View File

@ -499,7 +499,7 @@ public class ActiveMQMessageProducer implements MessageProducer, StatsCapable, C
}
public String toString() {
return "ActiveMQMessageProducer { consumerId=" +info.getProducerId()+" }";
return "ActiveMQMessageProducer { value=" +info.getProducerId()+" }";
}
}

View File

@ -242,7 +242,7 @@ public class ActiveMQQueueBrowser implements
}
public String toString() {
return "ActiveMQQueueBrowser { consumerId=" +consumerId+" }";
return "ActiveMQQueueBrowser { value=" +consumerId+" }";
}
}

View File

@ -59,7 +59,7 @@ public class ActiveMQQueueReceiver extends ActiveMQMessageConsumer implements
/**
* @param theSession
* @param consumerId
* @param value
* @param destination
* @param messageSelector
* @param prefetch

View File

@ -214,7 +214,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
this.acknowledgementMode = acknowledgeMode;
this.asyncDispatch=asyncDispatch;
this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getSessionId());
this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
setTransactionContext(new TransactionContext(connection));
connection.addSession(this);
stats = new JMSSessionStatsImpl(producers, consumers);
@ -1433,7 +1433,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
/**
* Returns the session id.
*
* @return sessionId - session id.
* @return value - session id.
*/
protected SessionId getSessionId() {
return info.getSessionId();
@ -1601,7 +1601,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException
*/
protected SessionInfo getSessionInfo() throws JMSException {
SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getSessionId());
SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
return info;
}

View File

@ -101,7 +101,7 @@ public class ActiveMQTopicSubscriber extends ActiveMQMessageConsumer implements
/**
* @param theSession
* @param consumerId
* @param value
* @param dest
* @param name
* @param selector

View File

@ -20,7 +20,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.*;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.Service;
@ -28,6 +28,7 @@ import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
@ -61,8 +62,6 @@ import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.InvalidClientIDException;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
@ -84,6 +83,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
private WireFormatInfo wireFormatInfo;
protected boolean disposed=false;
protected boolean shuttingDown=false;
static class ConnectionState extends org.apache.activemq.state.ConnectionState {
@ -138,7 +138,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( disposed)
return;
disposed=true;
disposed=true;
//
// Remove all logical connection associated with this connection
// from the broker.
@ -150,6 +150,8 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
} catch (Throwable ignore) {
}
}
shuttingDown=false;
}
public void serviceTransportException(IOException e) {
@ -163,12 +165,17 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public void serviceException(Throwable e) {
if( !disposed ) {
if( !disposed && !shuttingDown ) {
shuttingDown=true;
if( log.isDebugEnabled() )
log.debug("Async error occurred: "+e,e);
// TODO: think about how to handle this. Should we send the error down to the client
// so that he can report it to a registered error listener?
// Should we terminate the connection?
ConnectionError ce = new ConnectionError();
ce.setException(e);
dispatchAsync(ce);
try {
stop();
} catch (Exception ignore) {
}
}
}
@ -238,6 +245,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public Response processShutdown(ShutdownInfo info) throws Throwable {
shuttingDown=true;
stop();
return null;
}

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
@ -70,6 +71,12 @@ public class TransportConnection extends AbstractConnection {
}
public void stop() throws Exception {
try {
transport.oneway(new ShutdownInfo());
} catch (IOException ignore) {
}
transport.stop();
active = false;
super.stop();

View File

@ -233,7 +233,7 @@ public class TransportConnector implements Connector {
if (broker == null) {
throw new IllegalArgumentException("You must specify the broker property. Maybe this connector should be added to a broker?");
}
return TransportFactory.bind(broker.getBrokerId().getBrokerId(),uri);
return TransportFactory.bind(broker.getBrokerId().getValue(),uri);
}
public DiscoveryAgent getDiscoveryAgent() throws IOException {

View File

@ -36,7 +36,7 @@ public class ActiveMQTempQueue extends ActiveMQTempDestination implements Tempor
}
public ActiveMQTempQueue(ConnectionId connectionId, long sequenceId) {
super(connectionId.getConnectionId(), sequenceId);
super(connectionId.getValue(), sequenceId);
}
public byte getDataStructureType() {

View File

@ -36,7 +36,7 @@ public class ActiveMQTempTopic extends ActiveMQTempDestination implements Tempor
}
public ActiveMQTempTopic(ConnectionId connectionId, long sequenceId) {
super(connectionId.getConnectionId(), sequenceId);
super(connectionId.getValue(), sequenceId);
}

View File

@ -25,17 +25,17 @@ package org.apache.activemq.command;
public class BrokerId implements DataStructure {
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.BROKER_ID;
protected String brokerId;
protected String value;
public BrokerId() {
}
public BrokerId(String brokerId) {
this.brokerId = brokerId;
this.value = brokerId;
}
public int hashCode() {
return brokerId.hashCode();
return value.hashCode();
}
public boolean equals(Object o) {
@ -44,7 +44,7 @@ public class BrokerId implements DataStructure {
if( o == null || o.getClass()!=BrokerId.class )
return false;
BrokerId id = (BrokerId) o;
return brokerId.equals(id.brokerId);
return value.equals(id.value);
}
public byte getDataStructureType() {
@ -52,17 +52,17 @@ public class BrokerId implements DataStructure {
}
public String toString() {
return brokerId;
return value;
}
/**
* @openwire:property version=1
*/
public String getBrokerId() {
return brokerId;
public String getValue() {
return value;
}
public void setBrokerId(String brokerId) {
this.brokerId = brokerId;
public void setValue(String brokerId) {
this.value = brokerId;
}
public boolean isMarshallAware() {

View File

@ -46,6 +46,7 @@ public interface CommandTypes {
byte REMOVE_INFO = 12;
byte CONTROL_COMMAND = 14;
byte FLUSH_COMMAND = 15;
byte CONNECTION_ERROR = 16;
///////////////////////////////////////////////////
//

View File

@ -0,0 +1,63 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.command;
import org.apache.activemq.state.CommandVisitor;
/**
*
* @openwire:marshaller code="16"
* @version $Revision$
*/
public class ConnectionError extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.CONNECTION_ERROR;
protected ConnectionId connectionId;
Throwable exception;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
public Response visit(CommandVisitor visitor) throws Throwable {
return null;
}
/**
* @openwire:property version=1
*/
public Throwable getException() {
return exception;
}
public void setException(Throwable exception) {
this.exception = exception;
}
/**
* @openwire:property version=1
*/
public ConnectionId getConnectionId() {
return connectionId;
}
public void setConnectionId(ConnectionId connectionId) {
this.connectionId = connectionId;
}
}

View File

@ -26,33 +26,33 @@ public class ConnectionId implements DataStructure {
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.CONNECTION_ID;
protected String connectionId;
protected String value;
public ConnectionId() {
}
public ConnectionId(String connectionId) {
this.connectionId = connectionId;
this.value = connectionId;
}
public ConnectionId(ConnectionId id) {
this.connectionId = id.getConnectionId();
this.value = id.getValue();
}
public ConnectionId(SessionId id) {
this.connectionId = id.getConnectionId();
this.value = id.getConnectionId();
}
public ConnectionId(ProducerId id) {
this.connectionId = id.getConnectionId();
this.value = id.getConnectionId();
}
public ConnectionId(ConsumerId id) {
this.connectionId = id.getConnectionId();
this.value = id.getConnectionId();
}
public int hashCode() {
return connectionId.hashCode();
return value.hashCode();
}
public boolean equals(Object o) {
@ -61,7 +61,7 @@ public class ConnectionId implements DataStructure {
if( o == null || o.getClass()!=ConnectionId.class )
return false;
ConnectionId id = (ConnectionId) o;
return connectionId.equals(id.connectionId);
return value.equals(id.value);
}
public byte getDataStructureType() {
@ -69,17 +69,17 @@ public class ConnectionId implements DataStructure {
}
public String toString() {
return connectionId;
return value;
}
/**
* @openwire:property version=1
*/
public String getConnectionId() {
return connectionId;
public String getValue() {
return value;
}
public void setConnectionId(String connectionId) {
this.connectionId = connectionId;
public void setValue(String connectionId) {
this.value = connectionId;
}
public boolean isMarshallAware() {

View File

@ -28,7 +28,7 @@ public class ConsumerId implements DataStructure {
protected String connectionId;
protected long sessionId;
protected long consumerId;
protected long value;
protected transient int hashCode;
protected transient String key;
@ -39,14 +39,14 @@ public class ConsumerId implements DataStructure {
public ConsumerId(SessionId sessionId, long consumerId) {
this.connectionId = sessionId.getConnectionId();
this.sessionId = sessionId.getSessionId();
this.consumerId=consumerId;
this.sessionId = sessionId.getValue();
this.value=consumerId;
}
public ConsumerId(ConsumerId id) {
this.connectionId = id.getConnectionId();
this.sessionId = id.getSessionId();
this.consumerId=id.getConsumerId();
this.value=id.getValue();
}
public SessionId getParentId() {
@ -58,7 +58,7 @@ public class ConsumerId implements DataStructure {
public int hashCode() {
if( hashCode == 0 ) {
hashCode = connectionId.hashCode() ^ (int)sessionId ^ (int)consumerId;
hashCode = connectionId.hashCode() ^ (int)sessionId ^ (int)value;
}
return hashCode;
}
@ -70,7 +70,7 @@ public class ConsumerId implements DataStructure {
return false;
ConsumerId id = (ConsumerId) o;
return sessionId==id.sessionId
&& consumerId==id.consumerId
&& value==id.value
&& connectionId.equals(id.connectionId);
}
@ -80,7 +80,7 @@ public class ConsumerId implements DataStructure {
public String toString() {
if( key==null ) {
key = connectionId+":"+sessionId+":"+consumerId;
key = connectionId+":"+sessionId+":"+value;
}
return key;
}
@ -109,11 +109,11 @@ public class ConsumerId implements DataStructure {
/**
* @openwire:property version=1
*/
public long getConsumerId() {
return consumerId;
public long getValue() {
return value;
}
public void setConsumerId(long consumerId) {
this.consumerId = consumerId;
public void setValue(long consumerId) {
this.value = consumerId;
}
public boolean isMarshallAware() {

View File

@ -49,7 +49,7 @@ public class ConsumerInfo extends BaseCommand {
protected BrokerId[] brokerPath;
protected transient BooleanExpression additionalPredicate;
protected transient boolean networkSubscription; //this subscription orginated from a network connection
protected transient boolean networkSubscription; //this subscription originated from a network connection
public ConsumerInfo() {
}

View File

@ -34,6 +34,9 @@ public class DiscoveryEvent implements DataStructure {
protected String serviceName;
protected String brokerName;
public DiscoveryEvent() {
}
public DiscoveryEvent(String serviceName) {
this.serviceName = serviceName;
}

View File

@ -25,7 +25,7 @@ public class LocalTransactionId extends TransactionId {
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.ACTIVEMQ_LOCAL_TRANSACTION_ID;
protected ConnectionId connectionId;
protected long transactionId;
protected long value;
private transient String transactionKey;
private transient int hashCode;
@ -35,7 +35,7 @@ public class LocalTransactionId extends TransactionId {
public LocalTransactionId(ConnectionId connectionId, long transactionId) {
this.connectionId=connectionId;
this.transactionId=transactionId;
this.value=transactionId;
}
public byte getDataStructureType() {
@ -52,7 +52,7 @@ public class LocalTransactionId extends TransactionId {
public String getTransactionKey() {
if( transactionKey==null ) {
transactionKey = "TX:"+connectionId+":"+transactionId;
transactionKey = "TX:"+connectionId+":"+value;
}
return transactionKey;
}
@ -63,7 +63,7 @@ public class LocalTransactionId extends TransactionId {
public int hashCode() {
if( hashCode == 0 ) {
hashCode = connectionId.hashCode() ^ (int)transactionId;
hashCode = connectionId.hashCode() ^ (int)value;
}
return hashCode;
}
@ -74,18 +74,18 @@ public class LocalTransactionId extends TransactionId {
if( o == null || o.getClass()!=LocalTransactionId.class )
return false;
LocalTransactionId tx = (LocalTransactionId) o;
return transactionId==tx.transactionId
return value==tx.value
&& connectionId.equals(tx.connectionId);
}
/**
* @openwire:property version=1
*/
public long getTransactionId() {
return transactionId;
public long getValue() {
return value;
}
public void setTransactionId(long transactionId) {
this.transactionId = transactionId;
public void setValue(long transactionId) {
this.value = transactionId;
}
/**

View File

@ -38,7 +38,7 @@ public class ProducerId implements DataStructure {
public ProducerId(SessionId sessionId, long producerId) {
this.connectionId = sessionId.getConnectionId();
this.sessionId = sessionId.getSessionId();
this.sessionId = sessionId.getValue();
this.producerId=producerId;
}
@ -88,13 +88,13 @@ public class ProducerId implements DataStructure {
* @param sessionKey
*/
private void setProducerSessionKey(String sessionKey) {
// Parse off the sessionId
// Parse off the value
int p = sessionKey.lastIndexOf(":");
if( p >= 0 ) {
sessionId = Long.parseLong(sessionKey.substring(p+1));
sessionKey = sessionKey.substring(0,p);
}
// The rest is the connectionId
// The rest is the value
connectionId = sessionKey;
}

View File

@ -26,7 +26,7 @@ public class SessionId implements DataStructure {
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.SESSION_ID;
protected String connectionId;
protected long sessionId;
protected long value;
protected transient int hashCode;
protected transient String key;
@ -36,23 +36,23 @@ public class SessionId implements DataStructure {
}
public SessionId(ConnectionId connectionId, long sessionId) {
this.connectionId = connectionId.getConnectionId();
this.sessionId=sessionId;
this.connectionId = connectionId.getValue();
this.value=sessionId;
}
public SessionId(SessionId id) {
this.connectionId = id.getConnectionId();
this.sessionId=id.getSessionId();
this.value=id.getValue();
}
public SessionId(ProducerId id) {
this.connectionId = id.getConnectionId();
this.sessionId=id.getSessionId();
this.value=id.getSessionId();
}
public SessionId(ConsumerId id) {
this.connectionId = id.getConnectionId();
this.sessionId=id.getSessionId();
this.value=id.getSessionId();
}
public ConnectionId getParentId() {
@ -64,7 +64,7 @@ public class SessionId implements DataStructure {
public int hashCode() {
if( hashCode == 0 ) {
hashCode = connectionId.hashCode() ^ (int)sessionId;
hashCode = connectionId.hashCode() ^ (int)value;
}
return hashCode;
}
@ -75,7 +75,7 @@ public class SessionId implements DataStructure {
if( o == null || o.getClass()!=SessionId.class )
return false;
SessionId id = (SessionId) o;
return sessionId==id.sessionId
return value==id.value
&& connectionId.equals(id.connectionId);
}
@ -96,16 +96,16 @@ public class SessionId implements DataStructure {
/**
* @openwire:property version=1
*/
public long getSessionId() {
return sessionId;
public long getValue() {
return value;
}
public void setSessionId(long sessionId) {
this.sessionId = sessionId;
public void setValue(long sessionId) {
this.value = sessionId;
}
public String toString() {
if( key==null ) {
key = connectionId+":"+sessionId;
key = connectionId+":"+value;
}
return key;
}

View File

@ -64,7 +64,7 @@ public class BrokerIdMarshaller extends org.apache.activemq.openwire.DataStreamM
super.unmarshal(wireFormat, o, dataIn, bs);
BrokerId info = (BrokerId)o;
info.setBrokerId(readString(dataIn, bs));
info.setValue(readString(dataIn, bs));
}
@ -77,7 +77,7 @@ public class BrokerIdMarshaller extends org.apache.activemq.openwire.DataStreamM
BrokerId info = (BrokerId)o;
int rc = super.marshal1(wireFormat, o, bs);
rc += writeString(info.getBrokerId(), bs);
rc += writeString(info.getValue(), bs);
return rc+0;
}
@ -93,7 +93,7 @@ public class BrokerIdMarshaller extends org.apache.activemq.openwire.DataStreamM
super.marshal2(wireFormat, o, dataOut, bs);
BrokerId info = (BrokerId)o;
writeString(info.getBrokerId(), dataOut, bs);
writeString(info.getValue(), dataOut, bs);
}
}

View File

@ -0,0 +1,102 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v1;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Marshalling code for Open Wire Format for ConnectionError
*
*
* NOTE!: This file is auto generated - do not modify!
* if you need to make a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision$
*/
public class ConnectionErrorMarshaller extends BaseCommandMarshaller {
/**
* Return the type of Data Structure we marshal
* @return short representation of the type data structure
*/
public byte getDataStructureType() {
return ConnectionError.DATA_STRUCTURE_TYPE;
}
/**
* @return a new object instance
*/
public DataStructure createObject() {
return new ConnectionError();
}
/**
* Un-marshal an object instance from the data input stream
*
* @param o the object to un-marshal
* @param dataIn the data input stream to build the object from
* @throws IOException
*/
public void unmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn, BooleanStream bs) throws IOException {
super.unmarshal(wireFormat, o, dataIn, bs);
ConnectionError info = (ConnectionError)o;
info.setException((java.lang.Throwable) unmarsalThrowable(wireFormat, dataIn, bs));
info.setConnectionId((org.apache.activemq.command.ConnectionId) unmarsalNestedObject(wireFormat, dataIn, bs));
}
/**
* Write the booleans that this object uses to a BooleanStream
*/
public int marshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {
ConnectionError info = (ConnectionError)o;
int rc = super.marshal1(wireFormat, o, bs);
rc += marshalThrowable(wireFormat, info.getException(), bs);
rc += marshal1NestedObject(wireFormat, info.getConnectionId(), bs);
return rc+0;
}
/**
* Write a object instance to data output stream
*
* @param o the instance to be marshaled
* @param dataOut the output stream
* @throws IOException thrown if an error occurs
*/
public void marshal2(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut, BooleanStream bs) throws IOException {
super.marshal2(wireFormat, o, dataOut, bs);
ConnectionError info = (ConnectionError)o;
marshalThrowable(wireFormat, info.getException(), dataOut, bs);
marshal2NestedObject(wireFormat, info.getConnectionId(), dataOut, bs);
}
}

View File

@ -64,7 +64,7 @@ public class ConnectionIdMarshaller extends org.apache.activemq.openwire.DataStr
super.unmarshal(wireFormat, o, dataIn, bs);
ConnectionId info = (ConnectionId)o;
info.setConnectionId(readString(dataIn, bs));
info.setValue(readString(dataIn, bs));
}
@ -77,7 +77,7 @@ public class ConnectionIdMarshaller extends org.apache.activemq.openwire.DataStr
ConnectionId info = (ConnectionId)o;
int rc = super.marshal1(wireFormat, o, bs);
rc += writeString(info.getConnectionId(), bs);
rc += writeString(info.getValue(), bs);
return rc+0;
}
@ -93,7 +93,7 @@ public class ConnectionIdMarshaller extends org.apache.activemq.openwire.DataStr
super.marshal2(wireFormat, o, dataOut, bs);
ConnectionId info = (ConnectionId)o;
writeString(info.getConnectionId(), dataOut, bs);
writeString(info.getValue(), dataOut, bs);
}
}

View File

@ -66,7 +66,7 @@ public class ConsumerIdMarshaller extends org.apache.activemq.openwire.DataStrea
ConsumerId info = (ConsumerId)o;
info.setConnectionId(readString(dataIn, bs));
info.setSessionId(unmarshalLong(wireFormat, dataIn, bs));
info.setConsumerId(unmarshalLong(wireFormat, dataIn, bs));
info.setValue(unmarshalLong(wireFormat, dataIn, bs));
}
@ -81,7 +81,7 @@ public class ConsumerIdMarshaller extends org.apache.activemq.openwire.DataStrea
int rc = super.marshal1(wireFormat, o, bs);
rc += writeString(info.getConnectionId(), bs);
rc+=marshal1Long(wireFormat, info.getSessionId(), bs);
rc+=marshal1Long(wireFormat, info.getConsumerId(), bs);
rc+=marshal1Long(wireFormat, info.getValue(), bs);
return rc+0;
}
@ -99,7 +99,7 @@ public class ConsumerIdMarshaller extends org.apache.activemq.openwire.DataStrea
ConsumerId info = (ConsumerId)o;
writeString(info.getConnectionId(), dataOut, bs);
marshal2Long(wireFormat, info.getSessionId(), dataOut, bs);
marshal2Long(wireFormat, info.getConsumerId(), dataOut, bs);
marshal2Long(wireFormat, info.getValue(), dataOut, bs);
}
}

View File

@ -64,7 +64,7 @@ public class LocalTransactionIdMarshaller extends TransactionIdMarshaller {
super.unmarshal(wireFormat, o, dataIn, bs);
LocalTransactionId info = (LocalTransactionId)o;
info.setTransactionId(unmarshalLong(wireFormat, dataIn, bs));
info.setValue(unmarshalLong(wireFormat, dataIn, bs));
info.setConnectionId((org.apache.activemq.command.ConnectionId) unmarsalCachedObject(wireFormat, dataIn, bs));
}
@ -78,7 +78,7 @@ public class LocalTransactionIdMarshaller extends TransactionIdMarshaller {
LocalTransactionId info = (LocalTransactionId)o;
int rc = super.marshal1(wireFormat, o, bs);
rc+=marshal1Long(wireFormat, info.getTransactionId(), bs);
rc+=marshal1Long(wireFormat, info.getValue(), bs);
rc += marshal1CachedObject(wireFormat, info.getConnectionId(), bs);
return rc+0;
@ -95,7 +95,7 @@ public class LocalTransactionIdMarshaller extends TransactionIdMarshaller {
super.marshal2(wireFormat, o, dataOut, bs);
LocalTransactionId info = (LocalTransactionId)o;
marshal2Long(wireFormat, info.getTransactionId(), dataOut, bs);
marshal2Long(wireFormat, info.getValue(), dataOut, bs);
marshal2CachedObject(wireFormat, info.getConnectionId(), dataOut, bs);
}

View File

@ -55,6 +55,7 @@ public class MarshallerFactory {
add(new JournalQueueAckMarshaller());
add(new WireFormatInfoMarshaller());
add(new ResponseMarshaller());
add(new ConnectionErrorMarshaller());
add(new ActiveMQObjectMessageMarshaller());
add(new ConsumerInfoMarshaller());
add(new ConnectionIdMarshaller());

View File

@ -65,7 +65,7 @@ public class SessionIdMarshaller extends org.apache.activemq.openwire.DataStream
SessionId info = (SessionId)o;
info.setConnectionId(readString(dataIn, bs));
info.setSessionId(unmarshalLong(wireFormat, dataIn, bs));
info.setValue(unmarshalLong(wireFormat, dataIn, bs));
}
@ -79,7 +79,7 @@ public class SessionIdMarshaller extends org.apache.activemq.openwire.DataStream
int rc = super.marshal1(wireFormat, o, bs);
rc += writeString(info.getConnectionId(), bs);
rc+=marshal1Long(wireFormat, info.getSessionId(), bs);
rc+=marshal1Long(wireFormat, info.getValue(), bs);
return rc+0;
}
@ -96,7 +96,7 @@ public class SessionIdMarshaller extends org.apache.activemq.openwire.DataStream
SessionId info = (SessionId)o;
writeString(info.getConnectionId(), dataOut, bs);
marshal2Long(wireFormat, info.getSessionId(), dataOut, bs);
marshal2Long(wireFormat, info.getValue(), dataOut, bs);
}
}

View File

@ -62,7 +62,7 @@ public class SimpleAuthorizationBroker extends BrokerFilter implements SecurityA
// You don't need to be an admin to create temp destinations.
if( !destination.isTemporary()
|| !((ActiveMQTempDestination)destination).getConnectionId().equals(context.getConnectionId().getConnectionId()) ) {
|| !((ActiveMQTempDestination)destination).getConnectionId().equals(context.getConnectionId().getValue()) ) {
Set allowedACLs = adminACLs.get(destination);
if(allowedACLs!=null && !securityContext.isInOneOf(allowedACLs))
@ -80,7 +80,7 @@ public class SimpleAuthorizationBroker extends BrokerFilter implements SecurityA
// You don't need to be an admin to remove temp destinations.
if( !destination.isTemporary()
|| !((ActiveMQTempDestination)destination).getConnectionId().equals(context.getConnectionId().getConnectionId()) ) {
|| !((ActiveMQTempDestination)destination).getConnectionId().equals(context.getConnectionId().getValue()) ) {
Set allowedACLs = adminACLs.get(destination);
if(allowedACLs!=null && !securityContext.isInOneOf(allowedACLs))

View File

@ -54,7 +54,7 @@ public class DiscoveryTransportFactory extends FailoverTransportFactory {
// +components.length);
// }
// Map parameters=new HashMap(compositData.getParameters());
// DiscoveryTransportServer server=new DiscoveryTransportServer(TransportFactory.bind(brokerId,components[0]));
// DiscoveryTransportServer server=new DiscoveryTransportServer(TransportFactory.bind(value,components[0]));
// IntrospectionSupport.setProperties(server,parameters,"discovery");
// DiscoveryAgent discoveryAgent=DiscoveryAgentFactory.createDiscoveryAgent(server.getDiscovery());
// // Use the host name to configure the group of the discovery agent.
@ -64,7 +64,7 @@ public class DiscoveryTransportFactory extends FailoverTransportFactory {
// }
// }
// if(!parameters.containsKey("discovery.brokerName")){
// parameters.put("discovery.brokerName",brokerId);
// parameters.put("discovery.brokerName",value);
// }
// IntrospectionSupport.setProperties(discoveryAgent,parameters,"discovery.");
// server.setDiscoveryAgent(discoveryAgent);

View File

@ -104,7 +104,7 @@ public class ClientTestSupport extends TestCase {
protected ConnectionInfo createConnectionInfo() throws Throwable {
ConnectionInfo info = new ConnectionInfo();
info.setConnectionId(new ConnectionId("connection:"+(++idGenerator)));
info.setClientId( info.getConnectionId().getConnectionId() );
info.setClientId( info.getConnectionId().getValue() );
return info;
}

View File

@ -117,7 +117,7 @@ public class BrokerTestSupport extends CombinationTestSupport {
protected ConnectionInfo createConnectionInfo() throws Throwable {
ConnectionInfo info = new ConnectionInfo();
info.setConnectionId(new ConnectionId("connection:"+(++idGenerator)));
info.setClientId( info.getConnectionId().getConnectionId() );
info.setClientId( info.getConnectionId().getValue() );
return info;
}

View File

@ -73,7 +73,7 @@ public class NetworkTestSupport extends BrokerTestSupport {
}
/**
* @param brokerId
* @param value
* @return
* @throws Exception
* @throws IOException