Fixed handling of bytes message for Jira issue AMQ-685

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@396329 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Nathan Christopher Mittler 2006-04-23 22:08:21 +00:00
parent 2acf76e923
commit c47de061b7
5 changed files with 163 additions and 73 deletions

View File

@ -86,41 +86,39 @@ char BufferedInputStream::read() throw (ActiveMQException){
////////////////////////////////////////////////////////////////////////////////
int BufferedInputStream::read( char* buffer,
const int bufferSize ) throw (ActiveMQException){
int totalRead = 0;
// Get the number of bytes that can be copied directly from
// the buffer.
int bytesToCopy = min( tail-head, bufferSize );
// Copy the data to the output buffer.
memcpy( buffer, this->buffer+head, bytesToCopy );
// Increment the total bytes read.
totalRead += bytesToCopy;
// Increment the head position. If the buffer is now empty,
// reset the positions.
head += bytesToCopy;
if( head == tail ){
head = tail = 0;
}
// If we still haven't filled the output buffer, read a buffer's
// If we still haven't filled the output buffer AND there is data
// on the input stream to be read, read a buffer's
// worth from the stream.
if( bytesToCopy < bufferSize ){
// Buffer as much data as we can.
bufferData();
int totalRead = 0;
while( totalRead < bufferSize ){
// Get the remaining bytes to copy.
bytesToCopy = min( tail-head, (bufferSize-bytesToCopy) );
int bytesToCopy = min( tail-head, (bufferSize-totalRead) );
// Copy the data to the output buffer.
memcpy( buffer+totalRead, this->buffer+head, bytesToCopy );
// Increment the total bytes read.
totalRead += bytesToCopy;
// Increment the head position. If the buffer is now empty,
// reset the positions and buffer more data.
head += bytesToCopy;
if( head == tail ){
// Reset the buffer indicies.
head = tail = 0;
// If there is no more data currently available on the
// input stream, stop the loop.
if( stream->available() == 0 ){
break;
}
// Buffer as much data as we can.
bufferData();
}
}
// Return the total number of bytes read.

View File

@ -66,21 +66,29 @@ void BufferedOutputStream::close() throw(cms::CMSException){
}
////////////////////////////////////////////////////////////////////////////////
void BufferedOutputStream::flush() throw (ActiveMQException){
void BufferedOutputStream::emptyBuffer() throw (ActiveMQException){
if( head != tail ){
stream->write( buffer+head, tail-head );
}
head = tail = 0;
}
////////////////////////////////////////////////////////////////////////////////
void BufferedOutputStream::flush() throw (ActiveMQException){
// Empty the contents of the buffer to the output stream.
emptyBuffer();
// Flush the output stream.
stream->flush();
}
////////////////////////////////////////////////////////////////////////////////
void BufferedOutputStream::write( const char c ) throw (ActiveMQException){
if( tail == bufferSize-1 ){
flush();
if( tail >= bufferSize ){
emptyBuffer();
}
buffer[tail++] = c;
@ -89,13 +97,14 @@ void BufferedOutputStream::write( const char c ) throw (ActiveMQException){
////////////////////////////////////////////////////////////////////////////////
void BufferedOutputStream::write( const char* buffer, const int len )
throw (ActiveMQException)
{
int pos = 0;
{
// Iterate until all the data is written.
while( pos < len ){
for( int pos=0; pos < len; ){
if( tail >= bufferSize ){
emptyBuffer();
}
// Get the number of bytes left to write.
int bytesToWrite = min( bufferSize-tail, len-pos );
@ -106,12 +115,7 @@ void BufferedOutputStream::write( const char* buffer, const int len )
tail += bytesToWrite;
// Decrease the number of bytes to write.
pos += bytesToWrite;
// If we don't have enough space in the buffer, flush it.
if( bytesToWrite < len || tail >= bufferSize ){
flush();
}
pos += bytesToWrite;
}
}

View File

@ -42,6 +42,11 @@ namespace io{
void init( OutputStream* stream, const int bufSize );
/**
* Writes the contents of the buffer to the output stream.
*/
void emptyBuffer() throw (ActiveMQException);
private:
OutputStream* stream;

View File

@ -74,30 +74,84 @@ char SocketStream::read() throw (ActiveMQException){
////////////////////////////////////////////////////////////////////////////////
int SocketStream::read( char* buffer, const int bufferSize ) throw (ActiveMQException){
int len = recv( socket->getHandle(), buffer, bufferSize, 0 );
if( len < 0 ){
socket->close();
char buf[500];
strerror_r( errno, buf, 500 );
throw IOException( string("stomp::io::SocketStream::read(char*,int) - ") + buf );
}
/*printf("SocketStream:read():");
for( int ix=0; ix<len; ++ix ){
if( buffer[ix] > 20 )
printf("%c", buffer[ix] );
else
printf("[%d]", buffer[ix] );
}
printf("\n");*/
return len;
int bytesAvailable = available();
while( true ){
int len = ::recv(socket->getHandle(), (char*)buffer, bufferSize, 0);
// Check for typical error conditions.
if( len < 0 ){
#if defined(unix) && !defined(__CYGWIN__)
// If the socket was temporarily unavailable - just try again.
if( errno == EAGAIN ){
continue;
}
// Create the error string.
char* errorString = ::strerror(errno);
#else
// If the socket was temporarily unavailable - just try again.
int errorCode = ::WSAGetLastError();
if( errorCode == WSAEWOULDBLOCK ){
continue;
}
// Create the error string.
static const int errorStringSize = 512;
char errorString[errorStringSize];
memset( errorString, 0, errorStringSize );
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
0,
errorCode,
0,
errorString,
errorStringSize - 1,
NULL);
#endif
// Otherwise, this was a bad error - throw an exception.
throw IOException( string("stomp::io::SocketStream::write(char) - ") + errorString );
}
// No error, but no data - check for a broken socket.
if( len == 0 ){
// If the poll showed data, but we failed to read any,
// the socket is broken.
if( bytesAvailable > 0 ){
throw IOException( "activemq::io::SocketInputStream::read - The connection is broken" );
}
// Socket is not broken, just had no data.
return 0;
}
#ifdef SOCKET_IO_DEBUG
printf("SocketStream:read(), numbytes:%d -", len);
for( int ix=0; ix<len; ++ix ){
if( buffer[ix] > 20 )
printf("%c", buffer[ix] );
else
printf("[%d]", buffer[ix] );
}
printf("\n");
#endif
// Data was read successfully - return the bytes read.
return len;
}
}
////////////////////////////////////////////////////////////////////////////////
void SocketStream::write( const char c ) throw (ActiveMQException){
/*if( c > 20 ){
printf("%c", c );
}
@ -109,29 +163,41 @@ void SocketStream::write( const char c ) throw (ActiveMQException){
char buf[500];
strerror_r( errno, buf, 500 );
throw IOException( string("stomp::io::SocketStream::write(char) - ") + buf );
}
}
}
////////////////////////////////////////////////////////////////////////////////
void SocketStream::write( const char* buffer, const int len )
throw (ActiveMQException)
{
/*for( int ix=0; ix<len; ++ix ){
char c = buffer[ix];
if( c > 20 ){
printf("%c", c );
}
else printf("[%d]", c );
}*/
#ifdef SOCKET_IO_DEBUG
printf("SocketStream:write(), numbytes:%d -", len);
for( int ix=0; ix<len; ++ix ){
char c = buffer[ix];
if( c > 20 ){
printf("%c", c );
}
else printf("[%d]", c );
}
printf("\n" );
#endif
int remaining = len;
while( remaining > 0 ) {
int length = send( socket->getHandle(), buffer, remaining, MSG_NOSIGNAL );
int flags = 0;
#if defined(OSX)
flags = SO_NOSIGPIPE;
#elif defined( unix )
flags = MSG_NOSIGNAL;
#endif
int length = send( socket->getHandle(), buffer, remaining, flags );
if( length < 0 ){
socket->close();
char buf[500];
strerror_r( errno, buf, 500 );
printf("exception in write\n" );
throw IOException( string("stomp::io::SocketStream::write(char*,int) - ") + buf );
}

View File

@ -63,6 +63,22 @@ int StompIO::readStompHeaderLine( char* buf, const int bufLen ) throw (ActiveMQE
////////////////////////////////////////////////////////////////////////////////
int StompIO::readStompBodyLine( char* buf, const int bufLen ) throw (ActiveMQException){
int content_length = 0;
// Check for the content-length header. This is optional - if not provided
// we stop when we encounter a \0\n.
const StompFrame::HeaderInfo* headerInfo = frame.getHeaderInfo(StompFrame::HEADER_CONTENTLENGTH);
if( headerInfo != NULL )
{
const char* lengthProperty = headerInfo->value;
char* stopped_string = NULL;
content_length = strtoul(
lengthProperty,
&stopped_string,
10);
}
int pos = 0;
while( pos < bufLen ){
@ -72,12 +88,12 @@ int StompIO::readStompBodyLine( char* buf, const int bufLen ) throw (ActiveMQExc
// Increment the position pointer.
pos++;
// If we've reached the end of the body - return.
if( (buf[pos-1]=='\0' && pos==1) ||
(pos >= 2 && buf[pos-2]=='\0' && buf[pos-1] == '\n') ){
return pos;
}
// Are we at the end of the frame? The end frame pattern is \0\n
bool foundFrameEndPattern = (pos >= 2 && buf[pos-2]=='\0' && buf[pos-1] == '\n');
if( (pos > content_length) && foundFrameEndPattern ){
return pos;
}
}
// Reading is not complete.
@ -220,6 +236,7 @@ void StompIO::writeStompFrame( StompFrame& frame ) throw( ActiveMQException ){
write( body, frame.getBodyLength() );
}
write( '\0' );
write( '\n' );
// Flush the stream.
flush();