From c47de061b77c71beafaf53227b920a31e967b9b3 Mon Sep 17 00:00:00 2001 From: Nathan Christopher Mittler Date: Sun, 23 Apr 2006 22:08:21 +0000 Subject: [PATCH] Fixed handling of bytes message for Jira issue AMQ-685 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@396329 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/activemq/io/BufferedInputStream.cpp | 48 ++++--- .../src/activemq/io/BufferedOutputStream.cpp | 32 +++-- .../src/activemq/io/BufferedOutputStream.h | 5 + .../src/activemq/io/SocketStream.cpp | 122 ++++++++++++++---- .../src/activemq/transport/stomp/StompIO.cpp | 29 ++++- 5 files changed, 163 insertions(+), 73 deletions(-) diff --git a/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp b/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp index 9e4fceab1c..407d96c2bf 100644 --- a/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp +++ b/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp @@ -86,41 +86,39 @@ char BufferedInputStream::read() throw (ActiveMQException){ //////////////////////////////////////////////////////////////////////////////// int BufferedInputStream::read( char* buffer, const int bufferSize ) throw (ActiveMQException){ - - int totalRead = 0; - // Get the number of bytes that can be copied directly from - // the buffer. - int bytesToCopy = min( tail-head, bufferSize ); - - // Copy the data to the output buffer. - memcpy( buffer, this->buffer+head, bytesToCopy ); - - // Increment the total bytes read. - totalRead += bytesToCopy; - - // Increment the head position. If the buffer is now empty, - // reset the positions. - head += bytesToCopy; - if( head == tail ){ - head = tail = 0; - } - - // If we still haven't filled the output buffer, read a buffer's + // If we still haven't filled the output buffer AND there is data + // on the input stream to be read, read a buffer's // worth from the stream. - if( bytesToCopy < bufferSize ){ - - // Buffer as much data as we can. - bufferData(); + int totalRead = 0; + while( totalRead < bufferSize ){ // Get the remaining bytes to copy. - bytesToCopy = min( tail-head, (bufferSize-bytesToCopy) ); + int bytesToCopy = min( tail-head, (bufferSize-totalRead) ); // Copy the data to the output buffer. memcpy( buffer+totalRead, this->buffer+head, bytesToCopy ); // Increment the total bytes read. totalRead += bytesToCopy; + + // Increment the head position. If the buffer is now empty, + // reset the positions and buffer more data. + head += bytesToCopy; + if( head == tail ){ + + // Reset the buffer indicies. + head = tail = 0; + + // If there is no more data currently available on the + // input stream, stop the loop. + if( stream->available() == 0 ){ + break; + } + + // Buffer as much data as we can. + bufferData(); + } } // Return the total number of bytes read. diff --git a/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp b/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp index 7dedee89e3..e3b9210551 100644 --- a/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp +++ b/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp @@ -66,21 +66,29 @@ void BufferedOutputStream::close() throw(cms::CMSException){ } //////////////////////////////////////////////////////////////////////////////// -void BufferedOutputStream::flush() throw (ActiveMQException){ +void BufferedOutputStream::emptyBuffer() throw (ActiveMQException){ if( head != tail ){ stream->write( buffer+head, tail-head ); } head = tail = 0; +} + +//////////////////////////////////////////////////////////////////////////////// +void BufferedOutputStream::flush() throw (ActiveMQException){ + // Empty the contents of the buffer to the output stream. + emptyBuffer(); + + // Flush the output stream. stream->flush(); } //////////////////////////////////////////////////////////////////////////////// void BufferedOutputStream::write( const char c ) throw (ActiveMQException){ - if( tail == bufferSize-1 ){ - flush(); + if( tail >= bufferSize ){ + emptyBuffer(); } buffer[tail++] = c; @@ -89,13 +97,14 @@ void BufferedOutputStream::write( const char c ) throw (ActiveMQException){ //////////////////////////////////////////////////////////////////////////////// void BufferedOutputStream::write( const char* buffer, const int len ) throw (ActiveMQException) -{ - - int pos = 0; - +{ // Iterate until all the data is written. - while( pos < len ){ + for( int pos=0; pos < len; ){ + if( tail >= bufferSize ){ + emptyBuffer(); + } + // Get the number of bytes left to write. int bytesToWrite = min( bufferSize-tail, len-pos ); @@ -106,12 +115,7 @@ void BufferedOutputStream::write( const char* buffer, const int len ) tail += bytesToWrite; // Decrease the number of bytes to write. - pos += bytesToWrite; - - // If we don't have enough space in the buffer, flush it. - if( bytesToWrite < len || tail >= bufferSize ){ - flush(); - } + pos += bytesToWrite; } } diff --git a/cms/activemqcms/src/activemq/io/BufferedOutputStream.h b/cms/activemqcms/src/activemq/io/BufferedOutputStream.h index fbca9b1faa..d40655984e 100644 --- a/cms/activemqcms/src/activemq/io/BufferedOutputStream.h +++ b/cms/activemqcms/src/activemq/io/BufferedOutputStream.h @@ -42,6 +42,11 @@ namespace io{ void init( OutputStream* stream, const int bufSize ); + /** + * Writes the contents of the buffer to the output stream. + */ + void emptyBuffer() throw (ActiveMQException); + private: OutputStream* stream; diff --git a/cms/activemqcms/src/activemq/io/SocketStream.cpp b/cms/activemqcms/src/activemq/io/SocketStream.cpp index 5cc9be222c..2ff5180637 100644 --- a/cms/activemqcms/src/activemq/io/SocketStream.cpp +++ b/cms/activemqcms/src/activemq/io/SocketStream.cpp @@ -74,30 +74,84 @@ char SocketStream::read() throw (ActiveMQException){ //////////////////////////////////////////////////////////////////////////////// int SocketStream::read( char* buffer, const int bufferSize ) throw (ActiveMQException){ - - int len = recv( socket->getHandle(), buffer, bufferSize, 0 ); - if( len < 0 ){ - socket->close(); - char buf[500]; - strerror_r( errno, buf, 500 ); - throw IOException( string("stomp::io::SocketStream::read(char*,int) - ") + buf ); - } - - /*printf("SocketStream:read():"); - for( int ix=0; ix 20 ) - printf("%c", buffer[ix] ); - else - printf("[%d]", buffer[ix] ); - } - printf("\n");*/ - return len; + int bytesAvailable = available(); + + while( true ){ + + int len = ::recv(socket->getHandle(), (char*)buffer, bufferSize, 0); + + // Check for typical error conditions. + if( len < 0 ){ + + #if defined(unix) && !defined(__CYGWIN__) + + // If the socket was temporarily unavailable - just try again. + if( errno == EAGAIN ){ + continue; + } + + // Create the error string. + char* errorString = ::strerror(errno); + + #else + + // If the socket was temporarily unavailable - just try again. + int errorCode = ::WSAGetLastError(); + if( errorCode == WSAEWOULDBLOCK ){ + continue; + } + + // Create the error string. + static const int errorStringSize = 512; + char errorString[errorStringSize]; + memset( errorString, 0, errorStringSize ); + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, + 0, + errorCode, + 0, + errorString, + errorStringSize - 1, + NULL); + + #endif + + // Otherwise, this was a bad error - throw an exception. + throw IOException( string("stomp::io::SocketStream::write(char) - ") + errorString ); + } + + // No error, but no data - check for a broken socket. + if( len == 0 ){ + + // If the poll showed data, but we failed to read any, + // the socket is broken. + if( bytesAvailable > 0 ){ + throw IOException( "activemq::io::SocketInputStream::read - The connection is broken" ); + } + + // Socket is not broken, just had no data. + return 0; + } + + #ifdef SOCKET_IO_DEBUG + printf("SocketStream:read(), numbytes:%d -", len); + for( int ix=0; ix 20 ) + printf("%c", buffer[ix] ); + else + printf("[%d]", buffer[ix] ); + } + printf("\n"); + #endif + + // Data was read successfully - return the bytes read. + return len; + } } //////////////////////////////////////////////////////////////////////////////// void SocketStream::write( const char c ) throw (ActiveMQException){ - + /*if( c > 20 ){ printf("%c", c ); } @@ -109,29 +163,41 @@ void SocketStream::write( const char c ) throw (ActiveMQException){ char buf[500]; strerror_r( errno, buf, 500 ); throw IOException( string("stomp::io::SocketStream::write(char) - ") + buf ); - } + } } //////////////////////////////////////////////////////////////////////////////// void SocketStream::write( const char* buffer, const int len ) throw (ActiveMQException) { - /*for( int ix=0; ix 20 ){ - printf("%c", c ); - } - else printf("[%d]", c ); - }*/ + #ifdef SOCKET_IO_DEBUG + printf("SocketStream:write(), numbytes:%d -", len); + for( int ix=0; ix 20 ){ + printf("%c", c ); + } + else printf("[%d]", c ); + } + printf("\n" ); + #endif int remaining = len; while( remaining > 0 ) { - int length = send( socket->getHandle(), buffer, remaining, MSG_NOSIGNAL ); + int flags = 0; + #if defined(OSX) + flags = SO_NOSIGPIPE; + #elif defined( unix ) + flags = MSG_NOSIGNAL; + #endif + + int length = send( socket->getHandle(), buffer, remaining, flags ); if( length < 0 ){ socket->close(); char buf[500]; strerror_r( errno, buf, 500 ); + printf("exception in write\n" ); throw IOException( string("stomp::io::SocketStream::write(char*,int) - ") + buf ); } diff --git a/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp b/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp index b8aa1772ae..afcbe4a19d 100644 --- a/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp +++ b/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp @@ -63,6 +63,22 @@ int StompIO::readStompHeaderLine( char* buf, const int bufLen ) throw (ActiveMQE //////////////////////////////////////////////////////////////////////////////// int StompIO::readStompBodyLine( char* buf, const int bufLen ) throw (ActiveMQException){ + int content_length = 0; + + // Check for the content-length header. This is optional - if not provided + // we stop when we encounter a \0\n. + const StompFrame::HeaderInfo* headerInfo = frame.getHeaderInfo(StompFrame::HEADER_CONTENTLENGTH); + if( headerInfo != NULL ) + { + const char* lengthProperty = headerInfo->value; + char* stopped_string = NULL; + + content_length = strtoul( + lengthProperty, + &stopped_string, + 10); + } + int pos = 0; while( pos < bufLen ){ @@ -72,12 +88,12 @@ int StompIO::readStompBodyLine( char* buf, const int bufLen ) throw (ActiveMQExc // Increment the position pointer. pos++; - - // If we've reached the end of the body - return. - if( (buf[pos-1]=='\0' && pos==1) || - (pos >= 2 && buf[pos-2]=='\0' && buf[pos-1] == '\n') ){ - return pos; - } + + // Are we at the end of the frame? The end frame pattern is \0\n + bool foundFrameEndPattern = (pos >= 2 && buf[pos-2]=='\0' && buf[pos-1] == '\n'); + if( (pos > content_length) && foundFrameEndPattern ){ + return pos; + } } // Reading is not complete. @@ -220,6 +236,7 @@ void StompIO::writeStompFrame( StompFrame& frame ) throw( ActiveMQException ){ write( body, frame.getBodyLength() ); } write( '\0' ); + write( '\n' ); // Flush the stream. flush();