AMQP: Clean up old commented code, pick up the trace setting from the transport filter.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1418089 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-12-06 21:22:21 +00:00
parent 67252fe60d
commit 8c5c33bfc2
2 changed files with 6 additions and 215 deletions

View File

@ -65,39 +65,16 @@ class AmqpProtocolConverter {
private static final UnsignedInteger DURABLE = new UnsignedInteger(2);
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext) {
this.amqpTransport = amqpTransport;
}
int prefetch = 100;
ReentrantLock lock = new ReentrantLock();
//
// private static final Buffer PING_RESP_FRAME = new PINGRESP().encode();
//
//
// private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
// private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
//
// private final ConcurrentHashMap<ConsumerId, AmqpSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSubscription>();
// private final ConcurrentHashMap<UTF8Buffer, AmqpSubscription> amqpSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, AmqpSubscription>();
// private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>();
// private final Map<Destination, UTF8Buffer> amqpTopicMap = new LRUCache<Destination, UTF8Buffer>();
// private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>();
// private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>();
//
// private final AtomicBoolean connected = new AtomicBoolean(false);
// private CONNECT connect;
// private String clientId;
// private final String QOS_PROPERTY_NAME = "QoSPropertyName";
int prefetch = 100;
boolean trace = true;
TransportImpl protonTransport = new TransportImpl();
ConnectionImpl protonConnection = new ConnectionImpl();
{
public AmqpProtocolConverter(AmqpTransport transport, BrokerContext brokerContext) {
this.amqpTransport = transport;
this.protonTransport.bind(this.protonConnection);
if( trace ) {
if( transport.isTrace() ) {
this.protonTransport.setProtocolTracer(new ProtocolTracer() {
@Override
public void receivedFrame(TransportFrame transportFrame) {
@ -351,18 +328,6 @@ class AmqpProtocolConverter {
connectionInfo.setClientId(clientId);
}
// String userName = "";
// if (connect.userName() != null) {
// userName = connect.userName().toString();
// }
// String passswd = "";
// if (connect.password() != null) {
// passswd = connect.password().toString();
// }
// connectionInfo.setUserName(userName);
// connectionInfo.setPassword(passswd);
connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
sendToActiveMQ(connectionInfo, new ResponseHandler() {
@ -1020,182 +985,6 @@ class AmqpProtocolConverter {
return rc;
}
// void onUnSubscribe(UNSUBSCRIBE command) {
// UTF8Buffer[] topics = command.topics();
// if (topics != null) {
// for (int i = 0; i < topics.length; i++) {
// onUnSubscribe(topics[i]);
// }
// }
// UNSUBACK ack = new UNSUBACK();
// ack.messageId(command.messageId());
// pumpOut(ack.encode());
//
// }
//
// void onUnSubscribe(UTF8Buffer topicName) {
// AmqpSubscription subs = amqpSubscriptionByTopic.remove(topicName);
// if (subs != null) {
// ConsumerInfo info = subs.getConsumerInfo();
// if (info != null) {
// subscriptionsByConsumerId.remove(info.getConsumerId());
// }
// RemoveInfo removeInfo = info.createRemoveCommand();
// sendToActiveMQ(removeInfo, null);
// }
// }
//
//
// /**
// * Dispatch a ActiveMQ command
// */
//
//
//
// void onAMQPPublish(PUBLISH command) throws IOException, JMSException {
// checkConnected();
// }
//
// void onAMQPPubAck(PUBACK command) {
// short messageId = command.messageId();
// MessageAck ack;
// synchronized (consumerAcks) {
// ack = consumerAcks.remove(messageId);
// }
// if (ack != null) {
// amqpTransport.sendToActiveMQ(ack);
// }
// }
//
// void onAMQPPubRec(PUBREC commnand) {
// //from a subscriber - send a PUBREL in response
// PUBREL pubrel = new PUBREL();
// pubrel.messageId(commnand.messageId());
// pumpOut(pubrel.encode());
// }
//
// void onAMQPPubRel(PUBREL command) {
// PUBREC ack;
// synchronized (publisherRecs) {
// ack = publisherRecs.remove(command.messageId());
// }
// if (ack == null) {
// LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
// }
// PUBCOMP pubcomp = new PUBCOMP();
// pubcomp.messageId(command.messageId());
// pumpOut(pubcomp.encode());
// }
//
// void onAMQPPubComp(PUBCOMP command) {
// short messageId = command.messageId();
// MessageAck ack;
// synchronized (consumerAcks) {
// ack = consumerAcks.remove(messageId);
// }
// if (ack != null) {
// amqpTransport.sendToActiveMQ(ack);
// }
// }
//
//
//
//
// public AmqpTransport amqpTransport {
// return amqpTransport;
// }
//
//
//
// void configureInactivityMonitor(short heartBeat) {
// try {
//
// int heartBeatMS = heartBeat * 1000;
// AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
// monitor.setProtocolConverter(this);
// monitor.setReadCheckTime(heartBeatMS);
// monitor.setInitialDelayTime(heartBeatMS);
// monitor.startMonitorThread();
//
// } catch (Exception ex) {
// LOG.warn("Failed to start AMQP InactivityMonitor ", ex);
// }
//
// LOG.debug(getClientId() + " AMQP Connection using heart beat of " + heartBeat + " secs");
// }
//
//
//
// void checkConnected() throws AmqpProtocolException {
// if (!connected.get()) {
// throw new AmqpProtocolException("Not connected.");
// }
// }
//
// private String getClientId() {
// if (clientId == null) {
// if (connect != null && connect.clientId() != null) {
// clientId = connect.clientId().toString();
// }
// } else {
// clientId = "";
// }
// return clientId;
// }
//
// private void stopTransport() {
// try {
// amqpTransport.stop();
// } catch (Throwable e) {
// LOG.debug("Failed to stop AMQP transport ", e);
// }
// }
//
// ResponseHandler createResponseHandler(final PUBLISH command) {
//
// if (command != null) {
// switch (command.qos()) {
// case AT_LEAST_ONCE:
// return new ResponseHandler() {
// public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
// if (response.isException()) {
// LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException());
// } else {
// PUBACK ack = new PUBACK();
// ack.messageId(command.messageId());
// converter.amqpTransport.sendToAmqp(ack.encode());
// }
// }
// };
// case EXACTLY_ONCE:
// return new ResponseHandler() {
// public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
// if (response.isException()) {
// LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException());
// } else {
// PUBREC ack = new PUBREC();
// ack.messageId(command.messageId());
// synchronized (publisherRecs) {
// publisherRecs.put(command.messageId(), ack);
// }
// converter.amqpTransport.sendToAmqp(ack.encode());
// }
// }
// };
// case AT_MOST_ONCE:
// break;
// }
// }
// return null;
// }
//
// private String convertAMQPToActiveMQ(String name) {
// String result = name.replace('#', '>');
// result = result.replace('+', '*');
// result = result.replace('/', '.');
// return result;
// }
////////////////////////////////////////////////////////////////////////////
//
// Implementation methods

View File

@ -45,4 +45,6 @@ public interface AmqpTransport {
public String getRemoteAddress();
public boolean isTrace();
}