From ec63b55a04afdd5a970d8b05674deb4c3802c59c Mon Sep 17 00:00:00 2001 From: Ignasi Barrera Date: Mon, 9 Mar 2015 12:03:18 +0100 Subject: [PATCH] JCLOUDS-532: Properly close HTTP streams --- .../handlers/BackoffLimitedRetryHandler.java | 2 - .../BaseHttpCommandExecutorService.java | 9 +- .../JavaUrlHttpCommandExecutorService.java | 16 +- .../BackoffLimitedRetryHandlerTest.java | 31 +- .../BaseHttpCommandExecutorServiceTest.java | 280 ++++++++++++++++++ 5 files changed, 301 insertions(+), 37 deletions(-) create mode 100644 core/src/test/java/org/jclouds/http/internal/BaseHttpCommandExecutorServiceTest.java diff --git a/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java b/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java index 481cde21a1..978406ca09 100644 --- a/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java +++ b/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java @@ -17,7 +17,6 @@ package org.jclouds.http.handlers; import static java.lang.Math.max; -import static org.jclouds.http.HttpUtils.releasePayload; import java.io.IOException; import java.util.Random; @@ -95,7 +94,6 @@ public class BackoffLimitedRetryHandler implements HttpRetryHandler, IOException } public boolean shouldRetryRequest(HttpCommand command, HttpResponse response) { - releasePayload(response); return ifReplayableBackoffAndReturnTrue(command); } diff --git a/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java b/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java index 1eda6a9cee..e77065877f 100644 --- a/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java +++ b/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java @@ -19,6 +19,7 @@ package org.jclouds.http.internal; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Throwables.propagate; import static org.jclouds.http.HttpUtils.checkRequestHasContentLengthOrChunkedEncoding; +import static org.jclouds.http.HttpUtils.releasePayload; import static org.jclouds.http.HttpUtils.wirePayloadIfEnabled; import static org.jclouds.util.Throwables2.getFirstThrowableOfType; @@ -42,6 +43,8 @@ import org.jclouds.http.handlers.DelegatingRetryHandler; import org.jclouds.io.ContentMetadataCodec; import org.jclouds.logging.Logger; +import com.google.common.annotations.VisibleForTesting; + public abstract class BaseHttpCommandExecutorService implements HttpCommandExecutorService { protected final HttpUtils utils; protected final ContentMetadataCodec contentMetadataCodec; @@ -120,13 +123,17 @@ public abstract class BaseHttpCommandExecutorService implements HttpCommandEx return response; } - private boolean shouldContinue(HttpCommand command, HttpResponse response) { + @VisibleForTesting + boolean shouldContinue(HttpCommand command, HttpResponse response) { boolean shouldContinue = false; if (retryHandler.shouldRetryRequest(command, response)) { shouldContinue = true; } else { errorHandler.handleError(command, response); } + // At this point we are going to send a new request or we have just handled the error, so + // we should make sure that any open stream is closed. + releasePayload(response); return shouldContinue; } diff --git a/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java b/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java index bafe7f377e..ce36149884 100644 --- a/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java +++ b/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java @@ -18,7 +18,6 @@ package org.jclouds.http.internal; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Throwables.propagate; -import static com.google.common.io.ByteStreams.toByteArray; import static com.google.common.net.HttpHeaders.CONTENT_LENGTH; import static com.google.common.net.HttpHeaders.HOST; import static com.google.common.net.HttpHeaders.USER_AGENT; @@ -26,7 +25,6 @@ import static org.jclouds.http.HttpUtils.filterOutContentHeaders; import static org.jclouds.io.Payloads.newInputStreamPayload; import static org.jclouds.util.Closeables2.closeQuietly; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; @@ -96,7 +94,7 @@ public class JavaUrlHttpCommandExecutorService extends BaseHttpCommandExecutorSe try { in = connection.getInputStream(); } catch (IOException e) { - in = bufferAndCloseStream(connection.getErrorStream()); + in = connection.getErrorStream(); } catch (RuntimeException e) { closeQuietly(in); throw e; @@ -127,18 +125,6 @@ public class JavaUrlHttpCommandExecutorService extends BaseHttpCommandExecutorSe return builder.build(); } - private InputStream bufferAndCloseStream(InputStream inputStream) throws IOException { - InputStream in = null; - try { - if (inputStream != null) { - in = new ByteArrayInputStream(toByteArray(inputStream)); - } - } finally { - closeQuietly(inputStream); - } - return in; - } - @Override protected HttpURLConnection convert(HttpRequest request) throws IOException, InterruptedException { boolean chunked = "chunked".equals(request.getFirstHeaderOrNull("Transfer-Encoding")); diff --git a/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java b/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java index 4e2bfe640a..cc25c27207 100644 --- a/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java +++ b/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java @@ -19,6 +19,7 @@ package org.jclouds.http.handlers; import static org.jclouds.reflect.Reflection2.method; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.io.IOException; import java.io.InputStream; @@ -117,46 +118,38 @@ public class BackoffLimitedRetryHandlerTest { } @Test - void testClosesInputStream() throws InterruptedException, IOException, SecurityException, NoSuchMethodException { + void testInputStreamIsNotClosed() throws SecurityException, NoSuchMethodException, IOException { HttpCommand command = createCommand(); - - HttpResponse response = HttpResponse.builder().statusCode(400).build(); + HttpResponse response = HttpResponse.builder().statusCode(500).build(); InputStream inputStream = new InputStream() { - boolean isOpen = true; + int count = 2; @Override public void close() { - this.isOpen = false; + fail("The retry handler should not close the response stream"); } - int count = 1; - @Override public int read() throws IOException { - if (this.isOpen) - return (count > -1) ? count-- : -1; - else - return -1; + return count < 0 ? -1 : --count; } @Override public int available() throws IOException { - if (this.isOpen) - return count; - else - return 0; + return count < 0 ? 0 : count; } }; + response.setPayload(Payloads.newInputStreamPayload(inputStream)); response.getPayload().getContentMetadata().setContentLength(1l); - assertEquals(response.getPayload().getInput().available(), 1); - assertEquals(response.getPayload().getInput().read(), 1); + assertEquals(response.getPayload().openStream().available(), 2); + assertEquals(response.getPayload().openStream().read(), 1); handler.shouldRetryRequest(command, response); - assertEquals(response.getPayload().getInput().available(), 0); - assertEquals(response.getPayload().getInput().read(), -1); + assertEquals(response.getPayload().openStream().available(), 1); + assertEquals(response.getPayload().openStream().read(), 0); } private final Function processor = ContextBuilder diff --git a/core/src/test/java/org/jclouds/http/internal/BaseHttpCommandExecutorServiceTest.java b/core/src/test/java/org/jclouds/http/internal/BaseHttpCommandExecutorServiceTest.java new file mode 100644 index 0000000000..8fa399ec24 --- /dev/null +++ b/core/src/test/java/org/jclouds/http/internal/BaseHttpCommandExecutorServiceTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jclouds.http.internal; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.getCurrentArguments; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.jclouds.http.HttpUtils.closeClientButKeepContentStream; +import static org.jclouds.http.HttpUtils.releasePayload; +import static org.jclouds.io.Payloads.newInputStreamPayload; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import java.io.IOException; +import java.io.InputStream; + +import javax.inject.Inject; + +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.jclouds.http.HttpCommand; +import org.jclouds.http.HttpRequest; +import org.jclouds.http.HttpResponse; +import org.jclouds.http.HttpUtils; +import org.jclouds.http.IOExceptionRetryHandler; +import org.jclouds.http.handlers.DelegatingErrorHandler; +import org.jclouds.http.handlers.DelegatingRetryHandler; +import org.jclouds.io.ContentMetadataCodec; +import org.jclouds.rest.internal.BaseHttpApiMetadata; +import org.testng.annotations.Test; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.name.Names; + +@Test(groups = "unit", testName = "BaseHttpCommandExecutorServiceTest") +public class BaseHttpCommandExecutorServiceTest { + + public void testStreamIsClosedWhenRetrying() throws IOException { + MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes + HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build(); + response.getPayload().getContentMetadata().setContentLength(1l); + HttpCommand command = mockHttpCommand(); + + DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class); + DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class); + + expect(retryHandler.shouldRetryRequest(command, response)).andReturn(true); + replay(retryHandler, errorHandler); + + // Verify the stream is open. This consumes one byte. + assertEquals(response.getPayload().openStream().available(), 2); + assertEquals(response.getPayload().openStream().read(), 1); + + BaseHttpCommandExecutorService service = mockHttpCommandExecutorService(retryHandler, errorHandler); + assertTrue(service.shouldContinue(command, response)); + + verify(retryHandler, errorHandler); + + // Verify that the response stream is closed and consumed + assertFalse(in.isOpen); + assertTrue(response.getPayload().openStream() == in); // The service shouldn't have changed it + assertEquals(response.getPayload().openStream().available(), 0); + assertEquals(response.getPayload().openStream().read(), -1); + } + + public void testStreamIsClosedWhenNotRetrying() throws IOException { + MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes + HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build(); + response.getPayload().getContentMetadata().setContentLength(1l); + HttpCommand command = mockHttpCommand(); + + DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class); + DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class); + + errorHandler.handleError(command, response); + expectLastCall(); + expect(retryHandler.shouldRetryRequest(command, response)).andReturn(false); + replay(retryHandler, errorHandler); + + // Verify the stream is open. This consumes one byte. + assertEquals(response.getPayload().openStream().available(), 2); + assertEquals(response.getPayload().openStream().read(), 1); + + BaseHttpCommandExecutorService service = mockHttpCommandExecutorService(retryHandler, errorHandler); + assertFalse(service.shouldContinue(command, response)); + + verify(retryHandler, errorHandler); + + // Verify that the response stream is closed + assertFalse(in.isOpen); + assertTrue(response.getPayload().openStream() == in); + assertEquals(response.getPayload().openStream().available(), 0); + assertEquals(response.getPayload().openStream().read(), -1); + } + + public void testStreamIsClosedAndBufferedInTheErrorHandlerWhenNotRetrying() throws IOException { + MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes + HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build(); + response.getPayload().getContentMetadata().setContentLength(1l); + HttpCommand command = mockHttpCommand(); + + DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class); + DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class); + + errorHandler.handleError(command, response); + expectLastCall().andAnswer(new IAnswer() { + @Override + public Void answer() throws Throwable { + // This error handler will close the original stream and buffer it into memory + HttpResponse response = (HttpResponse) getCurrentArguments()[1]; + closeClientButKeepContentStream(response); + return null; + } + }); + + expect(retryHandler.shouldRetryRequest(command, response)).andReturn(false); + replay(retryHandler, errorHandler); + + // Verify the stream is open. This consumes one byte. + assertEquals(response.getPayload().openStream().available(), 2); + assertEquals(response.getPayload().openStream().read(), 1); + + BaseHttpCommandExecutorService service = mockHttpCommandExecutorService(retryHandler, errorHandler); + assertFalse(service.shouldContinue(command, response)); + + verify(retryHandler, errorHandler); + + // Verify that the original response stream is closed and consumed + assertFalse(in.isOpen); + assertEquals(in.available(), 0); + assertEquals(in.read(), -1); + + // Verify that the buffered stream is now repeatable and we can read the bytes that still have not + // been consumed from the original stream + assertTrue(response.getPayload().isRepeatable()); + assertEquals(response.getPayload().openStream().available(), 1); + assertEquals(response.getPayload().openStream().read(), 0); + } + + public void testCloseStreamCanBeCalledMoreThanOnce() throws IOException { + MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes + HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build(); + response.getPayload().getContentMetadata().setContentLength(1l); + HttpCommand command = mockHttpCommand(); + + DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class); + DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class); + + errorHandler.handleError(command, response); + expectLastCall().andAnswer(new IAnswer() { + @Override + public Void answer() throws Throwable { + // This error handler will close the original stream + HttpResponse response = (HttpResponse) getCurrentArguments()[1]; + releasePayload(response); + return null; + } + }); + + expect(retryHandler.shouldRetryRequest(command, response)).andReturn(false); + replay(retryHandler, errorHandler); + + // Verify the stream is open. This consumes one byte. + assertEquals(response.getPayload().openStream().available(), 2); + assertEquals(response.getPayload().openStream().read(), 1); + + BaseHttpCommandExecutorService service = mockHttpCommandExecutorService(retryHandler, errorHandler); + assertFalse(service.shouldContinue(command, response)); + + verify(retryHandler, errorHandler); + + // Verify that the response stream is closed and consumed + assertFalse(in.isOpen); + assertEquals(in.closeCount, 2); // The stream has been closed twice, but the IOException should not propagated + assertTrue(response.getPayload().openStream() == in); // The service shouldn't have changed it + assertEquals(response.getPayload().openStream().available(), 0); + assertEquals(response.getPayload().openStream().read(), -1); + } + + private HttpCommand mockHttpCommand() { + return new HttpCommand(HttpRequest.builder().endpoint("http://localhost").method("mock").build()); + } + + private BaseHttpCommandExecutorService mockHttpCommandExecutorService(final DelegatingRetryHandler retryHandler, + final DelegatingErrorHandler errorHandler) { + Injector injector = Guice.createInjector(new AbstractModule() { + @Override + protected void configure() { + Names.bindProperties(binder(), BaseHttpApiMetadata.defaultProperties()); + bind(DelegatingRetryHandler.class).toInstance(retryHandler); + bind(DelegatingErrorHandler.class).toInstance(errorHandler); + bind(BaseHttpCommandExecutorService.class).to(MockHttpCommandExecutorService.class); + } + }); + + return injector.getInstance(BaseHttpCommandExecutorService.class); + } + + private static class MockInputStream extends InputStream { + boolean isOpen = true; + int count; + int closeCount = 0; + + public MockInputStream(int count) { + this.count = count; + } + + @Override + public void close() throws IOException { + this.closeCount++; + if (!this.isOpen) { + throw new IOException("The stream is already closed"); + } + this.isOpen = false; + } + + @Override + public int read() throws IOException { + if (this.isOpen) + return (count > 0) ? --count : -1; + else + return -1; + } + + @Override + public int available() throws IOException { + if (this.isOpen) + return count; + else + return 0; + } + + } + + private static class MockHttpCommandExecutorService extends BaseHttpCommandExecutorService { + + @Inject + MockHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec, + DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler, + DelegatingErrorHandler errorHandler, HttpWire wire) { + super(utils, contentMetadataCodec, retryHandler, ioRetryHandler, errorHandler, wire); + } + + @Override + protected Object convert(HttpRequest request) throws IOException, InterruptedException { + return null; + } + + @Override + protected HttpResponse invoke(Object nativeRequest) throws IOException, InterruptedException { + return null; + } + + @Override + protected void cleanup(Object nativeRequest) { + + } + + } + +}