JCLOUDS-532: Properly close HTTP streams

This commit is contained in:
Ignasi Barrera 2015-03-09 12:03:18 +01:00
parent 9f93941525
commit ec63b55a04
5 changed files with 301 additions and 37 deletions

View File

@ -17,7 +17,6 @@
package org.jclouds.http.handlers; package org.jclouds.http.handlers;
import static java.lang.Math.max; import static java.lang.Math.max;
import static org.jclouds.http.HttpUtils.releasePayload;
import java.io.IOException; import java.io.IOException;
import java.util.Random; import java.util.Random;
@ -95,7 +94,6 @@ public class BackoffLimitedRetryHandler implements HttpRetryHandler, IOException
} }
public boolean shouldRetryRequest(HttpCommand command, HttpResponse response) { public boolean shouldRetryRequest(HttpCommand command, HttpResponse response) {
releasePayload(response);
return ifReplayableBackoffAndReturnTrue(command); return ifReplayableBackoffAndReturnTrue(command);
} }

View File

@ -19,6 +19,7 @@ package org.jclouds.http.internal;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate; import static com.google.common.base.Throwables.propagate;
import static org.jclouds.http.HttpUtils.checkRequestHasContentLengthOrChunkedEncoding; import static org.jclouds.http.HttpUtils.checkRequestHasContentLengthOrChunkedEncoding;
import static org.jclouds.http.HttpUtils.releasePayload;
import static org.jclouds.http.HttpUtils.wirePayloadIfEnabled; import static org.jclouds.http.HttpUtils.wirePayloadIfEnabled;
import static org.jclouds.util.Throwables2.getFirstThrowableOfType; import static org.jclouds.util.Throwables2.getFirstThrowableOfType;
@ -42,6 +43,8 @@ import org.jclouds.http.handlers.DelegatingRetryHandler;
import org.jclouds.io.ContentMetadataCodec; import org.jclouds.io.ContentMetadataCodec;
import org.jclouds.logging.Logger; import org.jclouds.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandExecutorService { public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandExecutorService {
protected final HttpUtils utils; protected final HttpUtils utils;
protected final ContentMetadataCodec contentMetadataCodec; protected final ContentMetadataCodec contentMetadataCodec;
@ -120,13 +123,17 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
return response; return response;
} }
private boolean shouldContinue(HttpCommand command, HttpResponse response) { @VisibleForTesting
boolean shouldContinue(HttpCommand command, HttpResponse response) {
boolean shouldContinue = false; boolean shouldContinue = false;
if (retryHandler.shouldRetryRequest(command, response)) { if (retryHandler.shouldRetryRequest(command, response)) {
shouldContinue = true; shouldContinue = true;
} else { } else {
errorHandler.handleError(command, response); errorHandler.handleError(command, response);
} }
// At this point we are going to send a new request or we have just handled the error, so
// we should make sure that any open stream is closed.
releasePayload(response);
return shouldContinue; return shouldContinue;
} }

View File

@ -18,7 +18,6 @@ package org.jclouds.http.internal;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate; import static com.google.common.base.Throwables.propagate;
import static com.google.common.io.ByteStreams.toByteArray;
import static com.google.common.net.HttpHeaders.CONTENT_LENGTH; import static com.google.common.net.HttpHeaders.CONTENT_LENGTH;
import static com.google.common.net.HttpHeaders.HOST; import static com.google.common.net.HttpHeaders.HOST;
import static com.google.common.net.HttpHeaders.USER_AGENT; import static com.google.common.net.HttpHeaders.USER_AGENT;
@ -26,7 +25,6 @@ import static org.jclouds.http.HttpUtils.filterOutContentHeaders;
import static org.jclouds.io.Payloads.newInputStreamPayload; import static org.jclouds.io.Payloads.newInputStreamPayload;
import static org.jclouds.util.Closeables2.closeQuietly; import static org.jclouds.util.Closeables2.closeQuietly;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.lang.reflect.Field; import java.lang.reflect.Field;
@ -96,7 +94,7 @@ public class JavaUrlHttpCommandExecutorService extends BaseHttpCommandExecutorSe
try { try {
in = connection.getInputStream(); in = connection.getInputStream();
} catch (IOException e) { } catch (IOException e) {
in = bufferAndCloseStream(connection.getErrorStream()); in = connection.getErrorStream();
} catch (RuntimeException e) { } catch (RuntimeException e) {
closeQuietly(in); closeQuietly(in);
throw e; throw e;
@ -127,18 +125,6 @@ public class JavaUrlHttpCommandExecutorService extends BaseHttpCommandExecutorSe
return builder.build(); return builder.build();
} }
private InputStream bufferAndCloseStream(InputStream inputStream) throws IOException {
InputStream in = null;
try {
if (inputStream != null) {
in = new ByteArrayInputStream(toByteArray(inputStream));
}
} finally {
closeQuietly(inputStream);
}
return in;
}
@Override @Override
protected HttpURLConnection convert(HttpRequest request) throws IOException, InterruptedException { protected HttpURLConnection convert(HttpRequest request) throws IOException, InterruptedException {
boolean chunked = "chunked".equals(request.getFirstHeaderOrNull("Transfer-Encoding")); boolean chunked = "chunked".equals(request.getFirstHeaderOrNull("Transfer-Encoding"));

View File

@ -19,6 +19,7 @@ package org.jclouds.http.handlers;
import static org.jclouds.reflect.Reflection2.method; import static org.jclouds.reflect.Reflection2.method;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -117,46 +118,38 @@ public class BackoffLimitedRetryHandlerTest {
} }
@Test @Test
void testClosesInputStream() throws InterruptedException, IOException, SecurityException, NoSuchMethodException { void testInputStreamIsNotClosed() throws SecurityException, NoSuchMethodException, IOException {
HttpCommand command = createCommand(); HttpCommand command = createCommand();
HttpResponse response = HttpResponse.builder().statusCode(500).build();
HttpResponse response = HttpResponse.builder().statusCode(400).build();
InputStream inputStream = new InputStream() { InputStream inputStream = new InputStream() {
boolean isOpen = true; int count = 2;
@Override @Override
public void close() { public void close() {
this.isOpen = false; fail("The retry handler should not close the response stream");
} }
int count = 1;
@Override @Override
public int read() throws IOException { public int read() throws IOException {
if (this.isOpen) return count < 0 ? -1 : --count;
return (count > -1) ? count-- : -1;
else
return -1;
} }
@Override @Override
public int available() throws IOException { public int available() throws IOException {
if (this.isOpen) return count < 0 ? 0 : count;
return count;
else
return 0;
} }
}; };
response.setPayload(Payloads.newInputStreamPayload(inputStream)); response.setPayload(Payloads.newInputStreamPayload(inputStream));
response.getPayload().getContentMetadata().setContentLength(1l); response.getPayload().getContentMetadata().setContentLength(1l);
assertEquals(response.getPayload().getInput().available(), 1); assertEquals(response.getPayload().openStream().available(), 2);
assertEquals(response.getPayload().getInput().read(), 1); assertEquals(response.getPayload().openStream().read(), 1);
handler.shouldRetryRequest(command, response); handler.shouldRetryRequest(command, response);
assertEquals(response.getPayload().getInput().available(), 0); assertEquals(response.getPayload().openStream().available(), 1);
assertEquals(response.getPayload().getInput().read(), -1); assertEquals(response.getPayload().openStream().read(), 0);
} }
private final Function<Invocation, HttpRequest> processor = ContextBuilder private final Function<Invocation, HttpRequest> processor = ContextBuilder

View File

@ -0,0 +1,280 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.http.internal;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.getCurrentArguments;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.jclouds.http.HttpUtils.closeClientButKeepContentStream;
import static org.jclouds.http.HttpUtils.releasePayload;
import static org.jclouds.io.Payloads.newInputStreamPayload;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.io.InputStream;
import javax.inject.Inject;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.jclouds.http.HttpCommand;
import org.jclouds.http.HttpRequest;
import org.jclouds.http.HttpResponse;
import org.jclouds.http.HttpUtils;
import org.jclouds.http.IOExceptionRetryHandler;
import org.jclouds.http.handlers.DelegatingErrorHandler;
import org.jclouds.http.handlers.DelegatingRetryHandler;
import org.jclouds.io.ContentMetadataCodec;
import org.jclouds.rest.internal.BaseHttpApiMetadata;
import org.testng.annotations.Test;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.name.Names;
@Test(groups = "unit", testName = "BaseHttpCommandExecutorServiceTest")
public class BaseHttpCommandExecutorServiceTest {
public void testStreamIsClosedWhenRetrying() throws IOException {
MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes
HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build();
response.getPayload().getContentMetadata().setContentLength(1l);
HttpCommand command = mockHttpCommand();
DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class);
DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class);
expect(retryHandler.shouldRetryRequest(command, response)).andReturn(true);
replay(retryHandler, errorHandler);
// Verify the stream is open. This consumes one byte.
assertEquals(response.getPayload().openStream().available(), 2);
assertEquals(response.getPayload().openStream().read(), 1);
BaseHttpCommandExecutorService<?> service = mockHttpCommandExecutorService(retryHandler, errorHandler);
assertTrue(service.shouldContinue(command, response));
verify(retryHandler, errorHandler);
// Verify that the response stream is closed and consumed
assertFalse(in.isOpen);
assertTrue(response.getPayload().openStream() == in); // The service shouldn't have changed it
assertEquals(response.getPayload().openStream().available(), 0);
assertEquals(response.getPayload().openStream().read(), -1);
}
public void testStreamIsClosedWhenNotRetrying() throws IOException {
MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes
HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build();
response.getPayload().getContentMetadata().setContentLength(1l);
HttpCommand command = mockHttpCommand();
DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class);
DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class);
errorHandler.handleError(command, response);
expectLastCall();
expect(retryHandler.shouldRetryRequest(command, response)).andReturn(false);
replay(retryHandler, errorHandler);
// Verify the stream is open. This consumes one byte.
assertEquals(response.getPayload().openStream().available(), 2);
assertEquals(response.getPayload().openStream().read(), 1);
BaseHttpCommandExecutorService<?> service = mockHttpCommandExecutorService(retryHandler, errorHandler);
assertFalse(service.shouldContinue(command, response));
verify(retryHandler, errorHandler);
// Verify that the response stream is closed
assertFalse(in.isOpen);
assertTrue(response.getPayload().openStream() == in);
assertEquals(response.getPayload().openStream().available(), 0);
assertEquals(response.getPayload().openStream().read(), -1);
}
public void testStreamIsClosedAndBufferedInTheErrorHandlerWhenNotRetrying() throws IOException {
MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes
HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build();
response.getPayload().getContentMetadata().setContentLength(1l);
HttpCommand command = mockHttpCommand();
DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class);
DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class);
errorHandler.handleError(command, response);
expectLastCall().andAnswer(new IAnswer<Void>() {
@Override
public Void answer() throws Throwable {
// This error handler will close the original stream and buffer it into memory
HttpResponse response = (HttpResponse) getCurrentArguments()[1];
closeClientButKeepContentStream(response);
return null;
}
});
expect(retryHandler.shouldRetryRequest(command, response)).andReturn(false);
replay(retryHandler, errorHandler);
// Verify the stream is open. This consumes one byte.
assertEquals(response.getPayload().openStream().available(), 2);
assertEquals(response.getPayload().openStream().read(), 1);
BaseHttpCommandExecutorService<?> service = mockHttpCommandExecutorService(retryHandler, errorHandler);
assertFalse(service.shouldContinue(command, response));
verify(retryHandler, errorHandler);
// Verify that the original response stream is closed and consumed
assertFalse(in.isOpen);
assertEquals(in.available(), 0);
assertEquals(in.read(), -1);
// Verify that the buffered stream is now repeatable and we can read the bytes that still have not
// been consumed from the original stream
assertTrue(response.getPayload().isRepeatable());
assertEquals(response.getPayload().openStream().available(), 1);
assertEquals(response.getPayload().openStream().read(), 0);
}
public void testCloseStreamCanBeCalledMoreThanOnce() throws IOException {
MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes
HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build();
response.getPayload().getContentMetadata().setContentLength(1l);
HttpCommand command = mockHttpCommand();
DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class);
DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class);
errorHandler.handleError(command, response);
expectLastCall().andAnswer(new IAnswer<Void>() {
@Override
public Void answer() throws Throwable {
// This error handler will close the original stream
HttpResponse response = (HttpResponse) getCurrentArguments()[1];
releasePayload(response);
return null;
}
});
expect(retryHandler.shouldRetryRequest(command, response)).andReturn(false);
replay(retryHandler, errorHandler);
// Verify the stream is open. This consumes one byte.
assertEquals(response.getPayload().openStream().available(), 2);
assertEquals(response.getPayload().openStream().read(), 1);
BaseHttpCommandExecutorService<?> service = mockHttpCommandExecutorService(retryHandler, errorHandler);
assertFalse(service.shouldContinue(command, response));
verify(retryHandler, errorHandler);
// Verify that the response stream is closed and consumed
assertFalse(in.isOpen);
assertEquals(in.closeCount, 2); // The stream has been closed twice, but the IOException should not propagated
assertTrue(response.getPayload().openStream() == in); // The service shouldn't have changed it
assertEquals(response.getPayload().openStream().available(), 0);
assertEquals(response.getPayload().openStream().read(), -1);
}
private HttpCommand mockHttpCommand() {
return new HttpCommand(HttpRequest.builder().endpoint("http://localhost").method("mock").build());
}
private BaseHttpCommandExecutorService<?> mockHttpCommandExecutorService(final DelegatingRetryHandler retryHandler,
final DelegatingErrorHandler errorHandler) {
Injector injector = Guice.createInjector(new AbstractModule() {
@Override
protected void configure() {
Names.bindProperties(binder(), BaseHttpApiMetadata.defaultProperties());
bind(DelegatingRetryHandler.class).toInstance(retryHandler);
bind(DelegatingErrorHandler.class).toInstance(errorHandler);
bind(BaseHttpCommandExecutorService.class).to(MockHttpCommandExecutorService.class);
}
});
return injector.getInstance(BaseHttpCommandExecutorService.class);
}
private static class MockInputStream extends InputStream {
boolean isOpen = true;
int count;
int closeCount = 0;
public MockInputStream(int count) {
this.count = count;
}
@Override
public void close() throws IOException {
this.closeCount++;
if (!this.isOpen) {
throw new IOException("The stream is already closed");
}
this.isOpen = false;
}
@Override
public int read() throws IOException {
if (this.isOpen)
return (count > 0) ? --count : -1;
else
return -1;
}
@Override
public int available() throws IOException {
if (this.isOpen)
return count;
else
return 0;
}
}
private static class MockHttpCommandExecutorService extends BaseHttpCommandExecutorService<Object> {
@Inject
MockHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec,
DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
DelegatingErrorHandler errorHandler, HttpWire wire) {
super(utils, contentMetadataCodec, retryHandler, ioRetryHandler, errorHandler, wire);
}
@Override
protected Object convert(HttpRequest request) throws IOException, InterruptedException {
return null;
}
@Override
protected HttpResponse invoke(Object nativeRequest) throws IOException, InterruptedException {
return null;
}
@Override
protected void cleanup(Object nativeRequest) {
}
}
}