Moved AutoCloseInputStream ResponseConsumedWatcher DefaultResponseConsumedWatcher classes from HttpCore to HttpClient

git-svn-id: https://svn.apache.org/repos/asf/jakarta/httpcomponents/trunk/http-client@358363 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Oleg Kalnichevski 2005-12-21 19:50:25 +00:00
parent 45faf4c1f6
commit 0c1449688b
5 changed files with 511 additions and 0 deletions

View File

@ -0,0 +1,143 @@
/*
* $HeadURL$
* $Revision$
* $Date$
*
* ====================================================================
*
* Copyright 2002-2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.impl;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
/**
* Closes an underlying stream as soon as the end of the stream is reached, and
* notifies a client when it has done so.
*
* @author Ortwin Glück
* @author Eric Johnson
* @author <a href="mailto:mbowler@GargoyleSoftware.com">Mike Bowler</a>
*
* @since 2.0
*/
class AutoCloseInputStream extends FilterInputStream {
/**
* The watcher is notified when the contents of the stream have
* been exhausted
*/
private ResponseConsumedWatcher watcher = null;
private boolean watcherNotified = false;
/**
* Create a new auto closing stream for the provided connection
*
* @param in the input stream to read from
* @param watcher To be notified when the contents of the stream have been
* consumed.
*/
public AutoCloseInputStream(
final InputStream in, final ResponseConsumedWatcher watcher) {
super(in);
this.watcher = watcher;
}
/**
* Reads the next byte of data from the input stream.
*
* @throws IOException when there is an error reading
* @return the character read, or -1 for EOF
*/
public int read() throws IOException {
int l = super.read();
checkEndOfStream(l);
return l;
}
/**
* Reads up to <code>len</code> bytes of data from the stream.
*
* @param b a <code>byte</code> array to read data into
* @param off an offset within the array to store data
* @param len the maximum number of bytes to read
* @return the number of bytes read or -1 for EOF
* @throws IOException if there are errors reading
*/
public int read(byte[] b, int off, int len) throws IOException {
int l = super.read(b, off, len);
checkEndOfStream(l);
return l;
}
/**
* Reads some number of bytes from the input stream and stores them into the
* buffer array b.
*
* @param b a <code>byte</code> array to read data into
* @return the number of bytes read or -1 for EOF
* @throws IOException if there are errors reading
*/
public int read(byte[] b) throws IOException {
int l = super.read(b);
checkEndOfStream(l);
return l;
}
/**
* Close the stream, and also close the underlying stream if it is not
* already closed.
* @throws IOException If an IO problem occurs.
*/
public void close() throws IOException {
super.close();
ensureWatcherNotified();
}
/**
* Close the underlying stream should the end of the stream arrive.
*
* @param readResult The result of the read operation to check.
* @throws IOException If an IO problem occurs.
*/
private void checkEndOfStream(int readResult) throws IOException {
if (readResult == -1) {
close();
}
}
/**
* Notify the watcher that the contents have been consumed.
* @throws IOException If an IO problem occurs.
*/
private void ensureWatcherNotified() throws IOException {
if (!this.watcherNotified) {
this.watcherNotified = true;
if (watcher != null) {
watcher.responseConsumed();
}
}
}
}

View File

@ -0,0 +1,76 @@
/*
* $HeadURL$
* $Revision$
* $Date$
*
* ====================================================================
*
* Copyright 1999-2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.impl;
import java.io.IOException;
import org.apache.http.HttpConnection;
import org.apache.http.HttpResponse;
/**
* <p>
* </p>
* @author <a href="mailto:oleg at ural.ru">Oleg Kalnichevski</a>
*
* @version $Revision$
*
* @since 4.0
*/
public class DefaultResponseConsumedWatcher
implements ResponseConsumedWatcher {
private final HttpConnection conn;
private final HttpResponse response;
public DefaultResponseConsumedWatcher(
final HttpConnection conn, final HttpResponse response) {
super();
if (conn == null) {
throw new IllegalArgumentException("HTTP connection may not be null");
}
if (response == null) {
throw new IllegalArgumentException("HTTP response may not be null");
}
this.conn = conn;
this.response = response;
}
public void responseConsumed() {
ConnectionReuseStrategy s = new DefaultConnectionReuseStrategy();
if (!s.keepAlive(this.response)) {
try {
this.conn.close();
} catch (IOException ex) {
// log error
}
}
}
}

View File

@ -0,0 +1,52 @@
/*
* $HeadURL$
* $Revision$
* $Date$
*
* ====================================================================
*
* Copyright 1999-2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.impl;
/**
* When a response stream has been consumed, various parts of the HttpClient
* implementation need to respond appropriately.
*
* <p>When one of the three types of {@link java.io.InputStream}, one of
* AutoCloseInputStream (package), {@link ContentLengthInputStream}, or
* {@link ChunkedInputStream} finishes with its content, either because
* all content has been consumed, or because it was explicitly closed,
* it notifies its corresponding method via this interface.</p>
*
* @see ContentLengthInputStream
* @see ChunkedInputStream
* @author Eric Johnson
*/
interface ResponseConsumedWatcher {
/**
* A response has been consumed.
*/
void responseConsumed();
}

View File

@ -0,0 +1,113 @@
/*
* $HeadURL$
* $Revision$
* $Date$
* ====================================================================
*
* Copyright 2002-2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.impl;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
public class TestAutoCloseInputStream extends TestCase {
public TestAutoCloseInputStream(String testName) {
super(testName);
}
// ------------------------------------------------------- TestCase Methods
public static Test suite() {
return new TestSuite(TestAutoCloseInputStream.class);
}
// ------------------------------------------------------------------- Main
public static void main(String args[]) {
String[] testCaseName = { TestAutoCloseInputStream.class.getName() };
junit.textui.TestRunner.main(testCaseName);
}
class TestResponseConsumedWatcher implements ResponseConsumedWatcher {
private int count;
public TestResponseConsumedWatcher() {
super();
count = 0;
}
public void responseConsumed() {
count++;
}
public int getCount() {
return this.count;
}
};
public void testConstructors() throws Exception {
InputStream instream = new ByteArrayInputStream(new byte[] {});
ResponseConsumedWatcher watcher = new TestResponseConsumedWatcher();
new AutoCloseInputStream(instream, watcher);
new AutoCloseInputStream(instream, null);
}
public void testBasics() throws IOException {
byte[] input = "0123456789ABCDEF".getBytes("US-ASCII");
InputStream instream = new ByteArrayInputStream(input);
TestResponseConsumedWatcher watcher = new TestResponseConsumedWatcher();
instream = new AutoCloseInputStream(instream, watcher);
byte[] tmp = new byte[input.length];
int ch = instream.read();
assertTrue(ch != -1);
tmp[0] = (byte)ch;
assertTrue(instream.read(tmp, 1, tmp.length - 1) != -1);
assertTrue(instream.read(tmp) == -1);
for (int i = 0; i < input.length; i++) {
assertEquals(input[i], tmp[i]);
}
assertTrue(instream.read() == -1);
instream.close();
instream.close();
// Has been triggered once only
assertEquals(1, watcher.getCount());
}
public void testNullWatcher() throws IOException {
byte[] input = "0".getBytes("US-ASCII");
InputStream instream = new ByteArrayInputStream(input);
instream = new AutoCloseInputStream(instream, null);
assertTrue(instream.read() != -1);
assertTrue(instream.read() == -1);
assertTrue(instream.read() == -1);
}
}

View File

@ -0,0 +1,127 @@
/*
* $HeadURL$
* $Revision$
* $Date$
* ====================================================================
*
* Copyright 2002-2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.impl;
import java.io.ByteArrayInputStream;
import org.apache.http.Header;
import org.apache.http.HttpConnection;
import org.apache.http.HttpMutableEntity;
import org.apache.http.HttpMutableResponse;
import org.apache.http.HttpVersion;
import org.apache.http.StatusLine;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.mockup.HttpConnectionMockup;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
public class TestDefaultResponseConsumedWatcher extends TestCase {
public TestDefaultResponseConsumedWatcher(String testName) {
super(testName);
}
// ------------------------------------------------------- TestCase Methods
public static Test suite() {
return new TestSuite(TestDefaultResponseConsumedWatcher.class);
}
// ------------------------------------------------------------------- Main
public static void main(String args[]) {
String[] testCaseName = { TestDefaultResponseConsumedWatcher.class.getName() };
junit.textui.TestRunner.main(testCaseName);
}
public void testIllegalResponseArg() throws Exception {
try {
new DefaultResponseConsumedWatcher(null, null);
fail("IllegalArgumentException should have been thrown");
} catch (IllegalArgumentException ex) {
// expected
}
try {
new DefaultResponseConsumedWatcher(
new HttpConnectionMockup(), null);
fail("IllegalArgumentException should have been thrown");
} catch (IllegalArgumentException ex) {
// expected
}
}
public void testConnectionAutoClose() throws Exception {
byte[] data = new byte[] {'1', '2', '3'};
HttpConnection conn = new HttpConnectionMockup();
HttpMutableEntity entity = new BasicHttpEntity();
entity.setChunked(false);
entity.setContentLength(data.length);
entity.setContent(new ByteArrayInputStream(data));
StatusLine statusline = new StatusLine(HttpVersion.HTTP_1_0, 200, "OK");
HttpMutableResponse response = new BasicHttpResponse(statusline);
response.setHeader(new Header("Connection", "Close"));
response.setParams(new DefaultHttpParams(null));
response.setEntity(entity);
// Wrap the entity input stream
ResponseConsumedWatcher watcher = new DefaultResponseConsumedWatcher(conn, response);
entity.setContent(new AutoCloseInputStream(entity.getContent(), watcher));
assertTrue(conn.isOpen());
while (entity.getContent().read() != -1) {}
assertFalse(conn.isOpen());
}
public void testConnectionKeepAlive() throws Exception {
byte[] data = new byte[] {'1', '2', '3'};
HttpConnection conn = new HttpConnectionMockup();
HttpMutableEntity entity = new BasicHttpEntity();
entity.setChunked(false);
entity.setContentLength(data.length);
entity.setContent(new ByteArrayInputStream(data));
StatusLine statusline = new StatusLine(HttpVersion.HTTP_1_1, 200, "OK");
HttpMutableResponse response = new BasicHttpResponse(statusline);
response.setHeader(new Header("Connection", "Keep-alive"));
response.setParams(new DefaultHttpParams(null));
response.setEntity(entity);
// Wrap the entity input stream
ResponseConsumedWatcher watcher = new DefaultResponseConsumedWatcher(conn, response);
entity.setContent(new AutoCloseInputStream(entity.getContent(), watcher));
assertTrue(conn.isOpen());
while (entity.getContent().read() != -1) {}
assertTrue(conn.isOpen());
}
}