Better protocol error handling.

Fixed http://issues.apache.org/activemq/browse/AMQ-649



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@418602 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-07-02 15:00:13 +00:00
parent 1ac3421e36
commit 530884a2d9
7 changed files with 208 additions and 131 deletions

View File

@ -19,7 +19,6 @@ package org.apache.activemq.transport.stomp2;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.net.ProtocolException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
@ -87,13 +86,13 @@ public class ProtocolConverter {
} }
} }
protected ResponseHandler createResponseHandler(StompCommand command){ protected ResponseHandler createResponseHandler(StompFrame command){
final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
// A response may not be needed. // A response may not be needed.
if( receiptId != null ) { if( receiptId != null ) {
return new ResponseHandler() { return new ResponseHandler() {
public void onResponse(ProtocolConverter converter, Response response) throws IOException { public void onResponse(ProtocolConverter converter, Response response) throws IOException {
StompCommand sc = new StompCommand(); StompFrame sc = new StompFrame();
sc.setHeaders(new HashMap(5)); sc.setHeaders(new HashMap(5));
sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
transportFilter.sendToStomp(sc); transportFilter.sendToStomp(sc);
@ -112,7 +111,7 @@ public class ProtocolConverter {
transportFilter.sendToActiveMQ(command); transportFilter.sendToActiveMQ(command);
} }
protected void sendToStomp(StompCommand command) throws IOException { protected void sendToStomp(StompFrame command) throws IOException {
transportFilter.sendToStomp(command); transportFilter.sendToStomp(command);
} }
@ -120,9 +119,13 @@ public class ProtocolConverter {
* Convert a stomp command * Convert a stomp command
* @param command * @param command
*/ */
public void onStompCommad( StompCommand command ) throws IOException, JMSException { public void onStompCommad( StompFrame command ) throws IOException, JMSException {
try { try {
if( command.getClass() == StompFrameError.class ) {
throw ((StompFrameError)command).getException();
}
String action = command.getAction(); String action = command.getAction();
if (action.startsWith(Stomp.Commands.SEND)) if (action.startsWith(Stomp.Commands.SEND))
onStompSend(command); onStompSend(command);
@ -161,13 +164,15 @@ public class ProtocolConverter {
headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
} }
StompCommand errorMessage = new StompCommand(Stomp.Responses.ERROR,headers,baos.toByteArray()); StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR,headers,baos.toByteArray());
sendToStomp(errorMessage); sendToStomp(errorMessage);
if( e.isFatal() )
getTransportFilter().onException(e);
} }
} }
protected void onStompSend(StompCommand command) throws IOException, JMSException { protected void onStompSend(StompFrame command) throws IOException, JMSException {
checkConnected(); checkConnected();
Map headers = command.getHeaders(); Map headers = command.getHeaders();
@ -193,7 +198,7 @@ public class ProtocolConverter {
} }
protected void onStompAck(StompCommand command) throws ProtocolException { protected void onStompAck(StompFrame command) throws ProtocolException {
checkConnected(); checkConnected();
// TODO: acking with just a message id is very bogus // TODO: acking with just a message id is very bogus
@ -231,7 +236,7 @@ public class ProtocolConverter {
} }
protected void onStompBegin(StompCommand command) throws ProtocolException { protected void onStompBegin(StompFrame command) throws ProtocolException {
checkConnected(); checkConnected();
Map headers = command.getHeaders(); Map headers = command.getHeaders();
@ -258,7 +263,7 @@ public class ProtocolConverter {
} }
protected void onStompCommit(StompCommand command) throws ProtocolException { protected void onStompCommit(StompFrame command) throws ProtocolException {
checkConnected(); checkConnected();
Map headers = command.getHeaders(); Map headers = command.getHeaders();
@ -283,7 +288,7 @@ public class ProtocolConverter {
sendToActiveMQ(tx, createResponseHandler(command)); sendToActiveMQ(tx, createResponseHandler(command));
} }
protected void onStompAbort(StompCommand command) throws ProtocolException { protected void onStompAbort(StompFrame command) throws ProtocolException {
checkConnected(); checkConnected();
Map headers = command.getHeaders(); Map headers = command.getHeaders();
@ -308,7 +313,7 @@ public class ProtocolConverter {
} }
protected void onStompSubscribe(StompCommand command) throws ProtocolException { protected void onStompSubscribe(StompFrame command) throws ProtocolException {
checkConnected(); checkConnected();
Map headers = command.getHeaders(); Map headers = command.getHeaders();
@ -343,7 +348,7 @@ public class ProtocolConverter {
} }
protected void onStompUnsubscribe(StompCommand command) throws ProtocolException { protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
checkConnected(); checkConnected();
Map headers = command.getHeaders(); Map headers = command.getHeaders();
@ -375,7 +380,7 @@ public class ProtocolConverter {
throw new ProtocolException("No subscription matched."); throw new ProtocolException("No subscription matched.");
} }
protected void onStompConnect(StompCommand command) throws ProtocolException { protected void onStompConnect(StompFrame command) throws ProtocolException {
if(connected.get()) { if(connected.get()) {
throw new ProtocolException("Allready connected."); throw new ProtocolException("Allready connected.");
@ -422,7 +427,7 @@ public class ProtocolConverter {
responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId); responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
} }
StompCommand sc = new StompCommand(); StompFrame sc = new StompFrame();
sc.setAction(Stomp.Responses.CONNECTED); sc.setAction(Stomp.Responses.CONNECTED);
sc.setHeaders(responseHeaders); sc.setHeaders(responseHeaders);
sendToStomp(sc); sendToStomp(sc);
@ -434,7 +439,7 @@ public class ProtocolConverter {
} }
protected void onStompDisconnect(StompCommand command) throws ProtocolException { protected void onStompDisconnect(StompFrame command) throws ProtocolException {
checkConnected(); checkConnected();
sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
connected.set(false); connected.set(false);
@ -473,7 +478,7 @@ public class ProtocolConverter {
} }
public ActiveMQMessage convertMessage(StompCommand command) throws IOException, JMSException { public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
Map headers = command.getHeaders(); Map headers = command.getHeaders();
// now the body // now the body
@ -488,7 +493,7 @@ public class ProtocolConverter {
try { try {
text.setText(new String(command.getContent(), "UTF-8")); text.setText(new String(command.getContent(), "UTF-8"));
} catch (Throwable e) { } catch (Throwable e) {
throw (ProtocolException)new ProtocolException("Text could not bet set: "+e).initCause(e); throw new ProtocolException("Text could not bet set: "+e, false, e);
} }
msg = text; msg = text;
} }
@ -530,9 +535,9 @@ public class ProtocolConverter {
return msg; return msg;
} }
public StompCommand convertMessage(ActiveMQMessage message) throws IOException, JMSException { public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
StompCommand command = new StompCommand(); StompFrame command = new StompFrame();
command.setAction(Stomp.Responses.MESSAGE); command.setAction(Stomp.Responses.MESSAGE);
HashMap headers = new HashMap(); HashMap headers = new HashMap();
@ -620,8 +625,4 @@ public class ProtocolConverter {
this.transportFilter = transportFilter; this.transportFilter = transportFilter;
} }
public void onStompExcepton(IOException error) {
// TODO Auto-generated method stub
}
} }

View File

@ -0,0 +1,50 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp2;
import java.io.IOException;
/**
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
public class ProtocolException extends IOException {
private static final long serialVersionUID = -2869735532997332242L;
private final boolean fatal;
public ProtocolException() {
this(null);
}
public ProtocolException(String s) {
this(s, false);
}
public ProtocolException(String s, boolean fatal) {
this(s,fatal, null);
}
public ProtocolException(String s, boolean fatal, Throwable cause) {
super(s);
this.fatal = fatal;
initCause(cause);
}
public boolean isFatal() {
return fatal;
}
}

View File

@ -30,7 +30,7 @@ import org.apache.activemq.state.CommandVisitor;
* *
* @author <a href="http://hiramchirino.com">chirino</a> * @author <a href="http://hiramchirino.com">chirino</a>
*/ */
public class StompCommand implements Command { public class StompFrame implements Command {
private static final byte[] NO_DATA = new byte[]{}; private static final byte[] NO_DATA = new byte[]{};
@ -38,13 +38,13 @@ public class StompCommand implements Command {
private Map headers = Collections.EMPTY_MAP; private Map headers = Collections.EMPTY_MAP;
private byte[] content = NO_DATA; private byte[] content = NO_DATA;
public StompCommand(String command, HashMap headers, byte[] data) { public StompFrame(String command, HashMap headers, byte[] data) {
this.action = command; this.action = command;
this.headers = headers; this.headers = headers;
this.content = data; this.content = data;
} }
public StompCommand() { public StompFrame() {
} }
public String getAction() { public String getAction() {

View File

@ -0,0 +1,38 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp2;
/**
* Command indicating that an invalid Stomp Frame was received.
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
public class StompFrameError extends StompFrame {
private final ProtocolException exception;
public StompFrameError(ProtocolException exception) {
this.exception = exception;
}
public ProtocolException getException() {
return exception;
}
}

View File

@ -69,7 +69,7 @@ public class StompSubscription {
protocolConverter.getTransportFilter().sendToActiveMQ(ack); protocolConverter.getTransportFilter().sendToActiveMQ(ack);
} }
StompCommand command = protocolConverter.convertMessage(message); StompFrame command = protocolConverter.convertMessage(message);
command.setAction(Stomp.Responses.MESSAGE); command.setAction(Stomp.Responses.MESSAGE);
if (subscriptionId!=null) { if (subscriptionId!=null) {

View File

@ -46,14 +46,6 @@ public class StompTransportFilter extends TransportFilter {
protocolConverter.setTransportFilter(this); protocolConverter.setTransportFilter(this);
} }
public void start() throws Exception {
super.start();
}
public void stop() throws Exception {
super.stop();
}
public void oneway(Command command) throws IOException { public void oneway(Command command) throws IOException {
try { try {
protocolConverter.onActiveMQCommad(command); protocolConverter.onActiveMQCommad(command);
@ -64,7 +56,7 @@ public class StompTransportFilter extends TransportFilter {
public void onCommand(Command command) { public void onCommand(Command command) {
try { try {
protocolConverter.onStompCommad((StompCommand) command); protocolConverter.onStompCommad((StompFrame) command);
} catch (IOException e) { } catch (IOException e) {
onException(e); onException(e);
} catch (JMSException e) { } catch (JMSException e) {
@ -72,24 +64,16 @@ public class StompTransportFilter extends TransportFilter {
} }
} }
public void onException(IOException error) {
protocolConverter.onStompExcepton(error);
transportListener.onException(error);
}
public void sendToActiveMQ(Command command) { public void sendToActiveMQ(Command command) {
synchronized(sendToActiveMQMutex) { synchronized(sendToActiveMQMutex) {
transportListener.onCommand(command); transportListener.onCommand(command);
} }
} }
public void sendToStomp(StompCommand command) throws IOException { public void sendToStomp(StompFrame command) throws IOException {
synchronized(sendToStompMutex) { synchronized(sendToStompMutex) {
next.oneway(command); next.oneway(command);
} }
} }
} }

View File

@ -19,7 +19,6 @@ package org.apache.activemq.transport.stomp2;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.ProtocolException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
@ -62,7 +61,7 @@ public class StompWireFormat implements WireFormat {
} }
public void marshal(Object command, DataOutputStream os) throws IOException { public void marshal(Object command, DataOutputStream os) throws IOException {
StompCommand stomp = (org.apache.activemq.transport.stomp2.StompCommand) command; StompFrame stomp = (org.apache.activemq.transport.stomp2.StompFrame) command;
StringBuffer buffer = new StringBuffer(); StringBuffer buffer = new StringBuffer();
buffer.append(stomp.getAction()); buffer.append(stomp.getAction());
@ -88,92 +87,97 @@ public class StompWireFormat implements WireFormat {
public Object unmarshal(DataInputStream in) throws IOException { public Object unmarshal(DataInputStream in) throws IOException {
String action = null; try {
String action = null;
// skip white space to next real action line // skip white space to next real action line
while (true) { while (true) {
action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded"); action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
if (action == null) { if (action == null) {
throw new IOException("connection was closed"); throw new IOException("connection was closed");
} else { } else {
action = action.trim(); action = action.trim();
if (action.length() > 0) { if (action.length() > 0) {
break; break;
}
} }
} }
}
// Parse the headers // Parse the headers
HashMap headers = new HashMap(25); HashMap headers = new HashMap(25);
while (true) { while (true) {
String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded"); String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
if (line != null && line.trim().length() > 0) { if (line != null && line.trim().length() > 0) {
if( headers.size() > MAX_HEADERS ) if( headers.size() > MAX_HEADERS )
throw new ProtocolException("The maximum number of headers was exceeded"); throw new ProtocolException("The maximum number of headers was exceeded", true);
try { try {
int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR); int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
String name = line.substring(0, seperator_index).trim(); String name = line.substring(0, seperator_index).trim();
String value = line.substring(seperator_index + 1, line.length()).trim(); String value = line.substring(seperator_index + 1, line.length()).trim();
headers.put(name, value); headers.put(name, value);
} }
catch (Exception e) { catch (Exception e) {
throw new ProtocolException("Unable to parser header line [" + line + "]"); throw new ProtocolException("Unable to parser header line [" + line + "]", true);
} }
} }
else { else {
break; break;
} }
}
// Read in the data part.
byte[] data = NO_DATA;
String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
if (contentLength!=null) {
// Bless the client, he's telling us how much data to read in.
int length;
try {
length = Integer.parseInt(contentLength.trim());
} catch (NumberFormatException e) {
throw new ProtocolException("Specified content-length is not a valid integer");
} }
if( length > MAX_DATA_LENGTH ) // Read in the data part.
throw new ProtocolException("The maximum data length was exceeded"); byte[] data = NO_DATA;
String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
if (contentLength!=null) {
data = new byte[length]; // Bless the client, he's telling us how much data to read in.
in.readFully(data); int length;
try {
length = Integer.parseInt(contentLength.trim());
} catch (NumberFormatException e) {
throw new ProtocolException("Specified content-length is not a valid integer", true);
}
if (in.readByte() != 0) { if( length > MAX_DATA_LENGTH )
throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte"); throw new ProtocolException("The maximum data length was exceeded", true);
}
} else { data = new byte[length];
in.readFully(data);
// We don't know how much to read.. data ends when we hit a 0 if (in.readByte() != 0) {
byte b; throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte", true);
ByteArrayOutputStream baos=null; }
while ((b = in.readByte()) != 0) {
if( baos == null ) { } else {
baos = new ByteArrayOutputStream();
} else if( baos.size() > MAX_DATA_LENGTH ) {
throw new ProtocolException("The maximum data length was exceeded");
}
baos.write(b); // We don't know how much to read.. data ends when we hit a 0
} byte b;
ByteArrayOutputStream baos=null;
while ((b = in.readByte()) != 0) {
if( baos!=null ) { if( baos == null ) {
baos.close(); baos = new ByteArrayOutputStream();
data = baos.toByteArray(); } else if( baos.size() > MAX_DATA_LENGTH ) {
} throw new ProtocolException("The maximum data length was exceeded", true);
}
} baos.write(b);
}
return new StompCommand(action, headers, data); if( baos!=null ) {
baos.close();
data = baos.toByteArray();
}
}
return new StompFrame(action, headers, data);
} catch (ProtocolException e) {
return new StompFrameError(e);
}
} }
@ -182,7 +186,7 @@ public class StompWireFormat implements WireFormat {
ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength); ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength);
while ((b = in.readByte()) != '\n') { while ((b = in.readByte()) != '\n') {
if( baos.size() > maxLength ) if( baos.size() > maxLength )
throw new ProtocolException(errorMessage); throw new ProtocolException(errorMessage, true);
baos.write(b); baos.write(b);
} }
ByteSequence sequence = baos.toByteSequence(); ByteSequence sequence = baos.toByteSequence();