From 530884a2d99104a864531b27132e2a0ae825f996 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Sun, 2 Jul 2006 15:00:13 +0000 Subject: [PATCH] Better protocol error handling. Fixed http://issues.apache.org/activemq/browse/AMQ-649 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@418602 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/stomp2/ProtocolConverter.java | 49 ++--- .../transport/stomp2/ProtocolException.java | 50 +++++ .../{StompCommand.java => StompFrame.java} | 6 +- .../transport/stomp2/StompFrameError.java | 38 ++++ .../transport/stomp2/StompSubscription.java | 2 +- .../stomp2/StompTransportFilter.java | 20 +- .../transport/stomp2/StompWireFormat.java | 174 +++++++++--------- 7 files changed, 208 insertions(+), 131 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolException.java rename activemq-core/src/main/java/org/apache/activemq/transport/stomp2/{StompCommand.java => StompFrame.java} (95%) create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrameError.java diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java index a5b40f64b8..0a59287e2b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java @@ -19,7 +19,6 @@ package org.apache.activemq.transport.stomp2; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; -import java.net.ProtocolException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -87,13 +86,13 @@ public class ProtocolConverter { } } - protected ResponseHandler createResponseHandler(StompCommand command){ + protected ResponseHandler createResponseHandler(StompFrame command){ final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); // A response may not be needed. if( receiptId != null ) { return new ResponseHandler() { public void onResponse(ProtocolConverter converter, Response response) throws IOException { - StompCommand sc = new StompCommand(); + StompFrame sc = new StompFrame(); sc.setHeaders(new HashMap(5)); sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); transportFilter.sendToStomp(sc); @@ -112,7 +111,7 @@ public class ProtocolConverter { transportFilter.sendToActiveMQ(command); } - protected void sendToStomp(StompCommand command) throws IOException { + protected void sendToStomp(StompFrame command) throws IOException { transportFilter.sendToStomp(command); } @@ -120,9 +119,13 @@ public class ProtocolConverter { * Convert a stomp command * @param command */ - public void onStompCommad( StompCommand command ) throws IOException, JMSException { + public void onStompCommad( StompFrame command ) throws IOException, JMSException { try { + if( command.getClass() == StompFrameError.class ) { + throw ((StompFrameError)command).getException(); + } + String action = command.getAction(); if (action.startsWith(Stomp.Commands.SEND)) onStompSend(command); @@ -161,13 +164,15 @@ public class ProtocolConverter { headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); } - StompCommand errorMessage = new StompCommand(Stomp.Responses.ERROR,headers,baos.toByteArray()); + StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR,headers,baos.toByteArray()); sendToStomp(errorMessage); + if( e.isFatal() ) + getTransportFilter().onException(e); } } - protected void onStompSend(StompCommand command) throws IOException, JMSException { + protected void onStompSend(StompFrame command) throws IOException, JMSException { checkConnected(); Map headers = command.getHeaders(); @@ -193,7 +198,7 @@ public class ProtocolConverter { } - protected void onStompAck(StompCommand command) throws ProtocolException { + protected void onStompAck(StompFrame command) throws ProtocolException { checkConnected(); // TODO: acking with just a message id is very bogus @@ -231,7 +236,7 @@ public class ProtocolConverter { } - protected void onStompBegin(StompCommand command) throws ProtocolException { + protected void onStompBegin(StompFrame command) throws ProtocolException { checkConnected(); Map headers = command.getHeaders(); @@ -258,7 +263,7 @@ public class ProtocolConverter { } - protected void onStompCommit(StompCommand command) throws ProtocolException { + protected void onStompCommit(StompFrame command) throws ProtocolException { checkConnected(); Map headers = command.getHeaders(); @@ -283,7 +288,7 @@ public class ProtocolConverter { sendToActiveMQ(tx, createResponseHandler(command)); } - protected void onStompAbort(StompCommand command) throws ProtocolException { + protected void onStompAbort(StompFrame command) throws ProtocolException { checkConnected(); Map headers = command.getHeaders(); @@ -308,7 +313,7 @@ public class ProtocolConverter { } - protected void onStompSubscribe(StompCommand command) throws ProtocolException { + protected void onStompSubscribe(StompFrame command) throws ProtocolException { checkConnected(); Map headers = command.getHeaders(); @@ -343,7 +348,7 @@ public class ProtocolConverter { } - protected void onStompUnsubscribe(StompCommand command) throws ProtocolException { + protected void onStompUnsubscribe(StompFrame command) throws ProtocolException { checkConnected(); Map headers = command.getHeaders(); @@ -375,7 +380,7 @@ public class ProtocolConverter { throw new ProtocolException("No subscription matched."); } - protected void onStompConnect(StompCommand command) throws ProtocolException { + protected void onStompConnect(StompFrame command) throws ProtocolException { if(connected.get()) { throw new ProtocolException("Allready connected."); @@ -422,7 +427,7 @@ public class ProtocolConverter { responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId); } - StompCommand sc = new StompCommand(); + StompFrame sc = new StompFrame(); sc.setAction(Stomp.Responses.CONNECTED); sc.setHeaders(responseHeaders); sendToStomp(sc); @@ -434,7 +439,7 @@ public class ProtocolConverter { } - protected void onStompDisconnect(StompCommand command) throws ProtocolException { + protected void onStompDisconnect(StompFrame command) throws ProtocolException { checkConnected(); sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); connected.set(false); @@ -473,7 +478,7 @@ public class ProtocolConverter { } - public ActiveMQMessage convertMessage(StompCommand command) throws IOException, JMSException { + public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException { Map headers = command.getHeaders(); // now the body @@ -488,7 +493,7 @@ public class ProtocolConverter { try { text.setText(new String(command.getContent(), "UTF-8")); } catch (Throwable e) { - throw (ProtocolException)new ProtocolException("Text could not bet set: "+e).initCause(e); + throw new ProtocolException("Text could not bet set: "+e, false, e); } msg = text; } @@ -530,9 +535,9 @@ public class ProtocolConverter { return msg; } - public StompCommand convertMessage(ActiveMQMessage message) throws IOException, JMSException { + public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException { - StompCommand command = new StompCommand(); + StompFrame command = new StompFrame(); command.setAction(Stomp.Responses.MESSAGE); HashMap headers = new HashMap(); @@ -620,8 +625,4 @@ public class ProtocolConverter { this.transportFilter = transportFilter; } - public void onStompExcepton(IOException error) { - // TODO Auto-generated method stub - } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolException.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolException.java new file mode 100644 index 0000000000..ca1f470b90 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolException.java @@ -0,0 +1,50 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp2; + +import java.io.IOException; + +/** + * + * @author chirino + */ +public class ProtocolException extends IOException { + + private static final long serialVersionUID = -2869735532997332242L; + + private final boolean fatal; + + public ProtocolException() { + this(null); + } + public ProtocolException(String s) { + this(s, false); + } + public ProtocolException(String s, boolean fatal) { + this(s,fatal, null); + } + public ProtocolException(String s, boolean fatal, Throwable cause) { + super(s); + this.fatal = fatal; + initCause(cause); + } + + public boolean isFatal() { + return fatal; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrame.java similarity index 95% rename from activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java rename to activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrame.java index ece9415ac7..5d735c0a7b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrame.java @@ -30,7 +30,7 @@ import org.apache.activemq.state.CommandVisitor; * * @author chirino */ -public class StompCommand implements Command { +public class StompFrame implements Command { private static final byte[] NO_DATA = new byte[]{}; @@ -38,13 +38,13 @@ public class StompCommand implements Command { private Map headers = Collections.EMPTY_MAP; private byte[] content = NO_DATA; - public StompCommand(String command, HashMap headers, byte[] data) { + public StompFrame(String command, HashMap headers, byte[] data) { this.action = command; this.headers = headers; this.content = data; } - public StompCommand() { + public StompFrame() { } public String getAction() { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrameError.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrameError.java new file mode 100644 index 0000000000..9a49f69f18 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrameError.java @@ -0,0 +1,38 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp2; + +/** + * Command indicating that an invalid Stomp Frame was received. + * + * @author chirino + */ +public class StompFrameError extends StompFrame { + + + private final ProtocolException exception; + + public StompFrameError(ProtocolException exception) { + this.exception = exception; + } + + public ProtocolException getException() { + return exception; + } + + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java index a056fcaa56..0a5aa3e493 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java @@ -69,7 +69,7 @@ public class StompSubscription { protocolConverter.getTransportFilter().sendToActiveMQ(ack); } - StompCommand command = protocolConverter.convertMessage(message); + StompFrame command = protocolConverter.convertMessage(message); command.setAction(Stomp.Responses.MESSAGE); if (subscriptionId!=null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java index cfdae0c45f..8c8a33eed8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java @@ -46,14 +46,6 @@ public class StompTransportFilter extends TransportFilter { protocolConverter.setTransportFilter(this); } - public void start() throws Exception { - super.start(); - } - - public void stop() throws Exception { - super.stop(); - } - public void oneway(Command command) throws IOException { try { protocolConverter.onActiveMQCommad(command); @@ -64,7 +56,7 @@ public class StompTransportFilter extends TransportFilter { public void onCommand(Command command) { try { - protocolConverter.onStompCommad((StompCommand) command); + protocolConverter.onStompCommad((StompFrame) command); } catch (IOException e) { onException(e); } catch (JMSException e) { @@ -72,24 +64,16 @@ public class StompTransportFilter extends TransportFilter { } } - public void onException(IOException error) { - protocolConverter.onStompExcepton(error); - transportListener.onException(error); - } - - public void sendToActiveMQ(Command command) { synchronized(sendToActiveMQMutex) { transportListener.onCommand(command); } } - public void sendToStomp(StompCommand command) throws IOException { + public void sendToStomp(StompFrame command) throws IOException { synchronized(sendToStompMutex) { next.oneway(command); } } - - } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java index 735df72528..1317dee9a5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java @@ -19,7 +19,6 @@ package org.apache.activemq.transport.stomp2; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.net.ProtocolException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -62,7 +61,7 @@ public class StompWireFormat implements WireFormat { } public void marshal(Object command, DataOutputStream os) throws IOException { - StompCommand stomp = (org.apache.activemq.transport.stomp2.StompCommand) command; + StompFrame stomp = (org.apache.activemq.transport.stomp2.StompFrame) command; StringBuffer buffer = new StringBuffer(); buffer.append(stomp.getAction()); @@ -88,92 +87,97 @@ public class StompWireFormat implements WireFormat { public Object unmarshal(DataInputStream in) throws IOException { - String action = null; - - // skip white space to next real action line - while (true) { - action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded"); - if (action == null) { - throw new IOException("connection was closed"); - } else { - action = action.trim(); - if (action.length() > 0) { - break; + try { + String action = null; + + // skip white space to next real action line + while (true) { + action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded"); + if (action == null) { + throw new IOException("connection was closed"); + } else { + action = action.trim(); + if (action.length() > 0) { + break; + } } } - } - - // Parse the headers - HashMap headers = new HashMap(25); - while (true) { - String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded"); - if (line != null && line.trim().length() > 0) { - - if( headers.size() > MAX_HEADERS ) - throw new ProtocolException("The maximum number of headers was exceeded"); - - try { - int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR); - String name = line.substring(0, seperator_index).trim(); - String value = line.substring(seperator_index + 1, line.length()).trim(); - headers.put(name, value); - } - catch (Exception e) { - throw new ProtocolException("Unable to parser header line [" + line + "]"); - } - } - else { - break; - } - } - - // Read in the data part. - byte[] data = NO_DATA; - String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH); - if (contentLength!=null) { - - // Bless the client, he's telling us how much data to read in. - int length; - try { - length = Integer.parseInt(contentLength.trim()); - } catch (NumberFormatException e) { - throw new ProtocolException("Specified content-length is not a valid integer"); - } - - if( length > MAX_DATA_LENGTH ) - throw new ProtocolException("The maximum data length was exceeded"); - data = new byte[length]; - in.readFully(data); - - if (in.readByte() != 0) { - throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte"); - } - - } else { + // Parse the headers + HashMap headers = new HashMap(25); + while (true) { + String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded"); + if (line != null && line.trim().length() > 0) { + + if( headers.size() > MAX_HEADERS ) + throw new ProtocolException("The maximum number of headers was exceeded", true); + + try { + int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR); + String name = line.substring(0, seperator_index).trim(); + String value = line.substring(seperator_index + 1, line.length()).trim(); + headers.put(name, value); + } + catch (Exception e) { + throw new ProtocolException("Unable to parser header line [" + line + "]", true); + } + } + else { + break; + } + } + + // Read in the data part. + byte[] data = NO_DATA; + String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH); + if (contentLength!=null) { + + // Bless the client, he's telling us how much data to read in. + int length; + try { + length = Integer.parseInt(contentLength.trim()); + } catch (NumberFormatException e) { + throw new ProtocolException("Specified content-length is not a valid integer", true); + } - // We don't know how much to read.. data ends when we hit a 0 - byte b; - ByteArrayOutputStream baos=null; - while ((b = in.readByte()) != 0) { - - if( baos == null ) { - baos = new ByteArrayOutputStream(); - } else if( baos.size() > MAX_DATA_LENGTH ) { - throw new ProtocolException("The maximum data length was exceeded"); - } - - baos.write(b); - } - - if( baos!=null ) { - baos.close(); - data = baos.toByteArray(); - } - - } - - return new StompCommand(action, headers, data); + if( length > MAX_DATA_LENGTH ) + throw new ProtocolException("The maximum data length was exceeded", true); + + data = new byte[length]; + in.readFully(data); + + if (in.readByte() != 0) { + throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte", true); + } + + } else { + + // We don't know how much to read.. data ends when we hit a 0 + byte b; + ByteArrayOutputStream baos=null; + while ((b = in.readByte()) != 0) { + + if( baos == null ) { + baos = new ByteArrayOutputStream(); + } else if( baos.size() > MAX_DATA_LENGTH ) { + throw new ProtocolException("The maximum data length was exceeded", true); + } + + baos.write(b); + } + + if( baos!=null ) { + baos.close(); + data = baos.toByteArray(); + } + + } + + return new StompFrame(action, headers, data); + + } catch (ProtocolException e) { + return new StompFrameError(e); + } } @@ -182,7 +186,7 @@ public class StompWireFormat implements WireFormat { ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength); while ((b = in.readByte()) != '\n') { if( baos.size() > maxLength ) - throw new ProtocolException(errorMessage); + throw new ProtocolException(errorMessage, true); baos.write(b); } ByteSequence sequence = baos.toByteSequence();