Clean up some test client code.
This commit is contained in:
Timothy Bish 2015-11-23 16:55:11 -05:00
parent bc9edf00d1
commit d9e22a9368
4 changed files with 102 additions and 128 deletions

View File

@ -19,9 +19,6 @@ package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Endpoint;
import org.apache.qpid.proton.engine.EndpointState;
import org.slf4j.Logger;
@ -142,10 +139,7 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
@Override
public void remotelyClosed(AmqpConnection connection) {
Exception error = getRemoteError();
if (error == null) {
error = new IOException("Remote has closed without error information");
}
Exception error = AmqpSupport.convertToException(getEndpoint().getRemoteCondition());
if (endpoint != null) {
// TODO: if this is a producer/consumer link then we may only be detached,
@ -192,40 +186,10 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
return getEndpoint().getRemoteState();
}
@Override
public boolean hasRemoteError() {
return getEndpoint().getRemoteCondition().getCondition() != null;
}
@Override
public Exception getRemoteError() {
String message = getRemoteErrorMessage();
Exception remoteError = null;
Symbol error = getEndpoint().getRemoteCondition().getCondition();
if (error != null) {
if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
remoteError = new SecurityException(message);
} else {
remoteError = new Exception(message);
}
}
return remoteError;
}
@Override
public String getRemoteErrorMessage() {
String message = "Received unkown error from remote peer";
if (getEndpoint().getRemoteCondition() != null) {
ErrorCondition error = getEndpoint().getRemoteCondition();
if (error.getDescription() != null && !error.getDescription().isEmpty()) {
message = error.getDescription();
}
}
return message;
}
@Override
public void processRemoteOpen(AmqpConnection connection) throws IOException {
doOpenInspection();
@ -254,7 +218,7 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
LOG.warn("Open of {} failed: ", this);
Exception openError;
if (hasRemoteError()) {
openError = getRemoteError();
openError = AmqpSupport.convertToException(getEndpoint().getRemoteCondition());
} else {
openError = getOpenAbortException();
}

View File

@ -594,43 +594,43 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
LOG.trace("New Proton Event: {}", protonEvent.getType());
}
AmqpResource amqpResource = null;
AmqpEventSink amqpEventSink = null;
switch (protonEvent.getType()) {
case CONNECTION_REMOTE_CLOSE:
amqpResource = (AmqpConnection) protonEvent.getConnection().getContext();
amqpResource.processRemoteClose(this);
amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext();
amqpEventSink.processRemoteClose(this);
break;
case CONNECTION_REMOTE_OPEN:
amqpResource = (AmqpConnection) protonEvent.getConnection().getContext();
amqpResource.processRemoteOpen(this);
amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext();
amqpEventSink.processRemoteOpen(this);
break;
case SESSION_REMOTE_CLOSE:
amqpResource = (AmqpSession) protonEvent.getSession().getContext();
amqpResource.processRemoteClose(this);
amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext();
amqpEventSink.processRemoteClose(this);
break;
case SESSION_REMOTE_OPEN:
amqpResource = (AmqpSession) protonEvent.getSession().getContext();
amqpResource.processRemoteOpen(this);
amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext();
amqpEventSink.processRemoteOpen(this);
break;
case LINK_REMOTE_CLOSE:
amqpResource = (AmqpResource) protonEvent.getLink().getContext();
amqpResource.processRemoteClose(this);
amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
amqpEventSink.processRemoteClose(this);
break;
case LINK_REMOTE_DETACH:
amqpResource = (AmqpResource) protonEvent.getLink().getContext();
amqpResource.processRemoteDetach(this);
amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
amqpEventSink.processRemoteDetach(this);
break;
case LINK_REMOTE_OPEN:
amqpResource = (AmqpResource) protonEvent.getLink().getContext();
amqpResource.processRemoteOpen(this);
amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
amqpEventSink.processRemoteOpen(this);
break;
case LINK_FLOW:
amqpResource = (AmqpResource) protonEvent.getLink().getContext();
amqpResource.processFlowUpdates(this);
amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
amqpEventSink.processFlowUpdates(this);
break;
case DELIVERY:
amqpResource = (AmqpResource) protonEvent.getLink().getContext();
amqpResource.processDeliveryUpdates(this);
amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
amqpEventSink.processDeliveryUpdates(this);
break;
default:
break;

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
/**
* Interface used by classes that want to process AMQP events sent from
* the transport layer.
*/
public interface AmqpEventSink {
/**
* Event handler for remote peer open of this resource.
*
* @param connection
* the AmqpConnection instance for easier access to fire events.
*
* @throws IOException if an error occurs while processing the update.
*/
void processRemoteOpen(AmqpConnection connection) throws IOException;
/**
* Event handler for remote peer detach of this resource.
*
* @param connection
* the AmqpConnection instance for easier access to fire events.
*
* @throws IOException if an error occurs while processing the update.
*/
void processRemoteDetach(AmqpConnection connection) throws IOException;
/**
* Event handler for remote peer close of this resource.
*
* @param connection
* the AmqpConnection instance for easier access to fire events.
*
* @throws IOException if an error occurs while processing the update.
*/
void processRemoteClose(AmqpConnection connection) throws IOException;
/**
* Called when the Proton Engine signals an Delivery related event has been triggered
* for the given endpoint.
*
* @param connection
* the AmqpConnection instance for easier access to fire events.
*
* @throws IOException if an error occurs while processing the update.
*/
void processDeliveryUpdates(AmqpConnection connection) throws IOException;
/**
* Called when the Proton Engine signals an Flow related event has been triggered
* for the given endpoint.
*
* @param connection
* the AmqpConnection instance for easier access to fire events.
*
* @throws IOException if an error occurs while processing the update.
*/
void processFlowUpdates(AmqpConnection connection) throws IOException;
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -16,8 +16,6 @@
*/
package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
/**
@ -26,7 +24,7 @@ import org.apache.activemq.transport.amqp.client.util.AsyncResult;
* All AMQP types should implement this interface to allow for control of state
* and configuration details.
*/
public interface AmqpResource {
public interface AmqpResource extends AmqpEventSink {
/**
* Perform all the work needed to open this resource and store the request
@ -102,71 +100,4 @@ public interface AmqpResource {
*/
void failed(Exception cause);
/**
* Event handler for remote peer open of this resource.
*
* @param connection
* The connection that owns this resource.
*
* @throws IOException if an error occurs while processing the update.
*/
void processRemoteOpen(AmqpConnection connection) throws IOException;
/**
* Event handler for remote peer detach of this resource.
*
* @param connection
* The connection that owns this resource.
*
* @throws IOException if an error occurs while processing the update.
*/
void processRemoteDetach(AmqpConnection connection) throws IOException;
/**
* Event handler for remote peer close of this resource.
*
* @param connection
* The connection that owns this resource.
*
* @throws IOException if an error occurs while processing the update.
*/
void processRemoteClose(AmqpConnection connection) throws IOException;
/**
* Called when the Proton Engine signals an Delivery related event has been triggered
* for the given endpoint.
*
* @param connection
* The connection that owns this resource.
*
* @throws IOException if an error occurs while processing the update.
*/
void processDeliveryUpdates(AmqpConnection connection) throws IOException;
/**
* Called when the Proton Engine signals an Flow related event has been triggered
* for the given endpoint.
*
* @param connection
* The connection that owns this resource.
*
* @throws IOException if an error occurs while processing the update.
*/
void processFlowUpdates(AmqpConnection connection) throws IOException;
/**
* @returns true if the remote end has sent an error
*/
boolean hasRemoteError();
/**
* @return an Exception derived from the error state of the endpoint's Remote Condition.
*/
Exception getRemoteError();
/**
* @return an Error message derived from the error state of the endpoint's Remote Condition.
*/
String getRemoteErrorMessage();
}