diff --git a/core/src/main/java/org/jclouds/http/functions/ParseSax.java b/core/src/main/java/org/jclouds/http/functions/ParseSax.java index 59e91ceeda..726cdae4c0 100644 --- a/core/src/main/java/org/jclouds/http/functions/ParseSax.java +++ b/core/src/main/java/org/jclouds/http/functions/ParseSax.java @@ -30,13 +30,11 @@ import org.jclouds.http.HttpResponse; import org.jclouds.logging.Logger; import org.jclouds.rest.InvocationContext; import org.jclouds.rest.internal.GeneratedHttpRequest; -import org.xml.sax.ContentHandler; import org.xml.sax.InputSource; import org.xml.sax.XMLReader; import org.xml.sax.helpers.DefaultHandler; import com.google.common.base.Function; -import com.google.common.base.Throwables; import com.google.common.io.Closeables; /** @@ -51,6 +49,7 @@ public class ParseSax implements Function, InvocationContext private final HandlerWithResult handler; @Resource protected Logger logger = Logger.NULL; + private GeneratedHttpRequest request; public static interface Factory { ParseSax create(HandlerWithResult handler); @@ -63,39 +62,27 @@ public class ParseSax implements Function, InvocationContext } public T apply(HttpResponse from) throws HttpException { - InputStream input = null; - try { - input = from.getContent(); - if (input != null) { - return parse(input); - } else { - throw new HttpException("No input to parse"); - } - } catch (Exception e) { - Throwables.propagateIfPossible(e, HttpException.class); - throw new HttpException("Error parsing input for " + from, e); - } + return parse(from.getContent()); } - public T parse(InputStream xml) throws HttpException { - parseAndCloseStream(xml, getHandler()); - return getHandler().getResult(); - } - - private void parseAndCloseStream(InputStream xml, ContentHandler handler) throws HttpException { + public T parse(InputStream from) throws HttpException { + if (from == null) + throw new HttpException("No input to parse"); try { - parser.setContentHandler(handler); + parser.setContentHandler(getHandler()); // This method should accept documents with a BOM (Byte-order mark) - InputSource input = new InputSource(xml); - parser.parse(input); + parser.parse(new InputSource(from)); + return getHandler().getResult(); } catch (Exception e) { StringBuilder message = new StringBuilder(); - message.append("Error parsing input for ").append(handler); + if (request != null) + message.append("Error parsing input for ").append(request.getRequestLine()) + .append(": "); + message.append(e.getMessage()); logger.error(e, message.toString()); - Throwables.propagateIfPossible(e, HttpException.class); throw new HttpException(message.toString(), e); } finally { - Closeables.closeQuietly(xml); + Closeables.closeQuietly(from); } } @@ -111,9 +98,7 @@ public class ParseSax implements Function, InvocationContext public abstract static class HandlerWithResult extends DefaultHandler implements InvocationContext { protected GeneratedHttpRequest request; - public abstract T getResult(); - @Override public void setContext(GeneratedHttpRequest request) { this.request = request; @@ -123,5 +108,6 @@ public class ParseSax implements Function, InvocationContext @Override public void setContext(GeneratedHttpRequest request) { handler.setContext(request); + this.request = request; } } diff --git a/core/src/main/java/org/jclouds/http/internal/HttpWire.java b/core/src/main/java/org/jclouds/http/internal/HttpWire.java index 37bb8b15fa..9bbddd8645 100644 --- a/core/src/main/java/org/jclouds/http/internal/HttpWire.java +++ b/core/src/main/java/org/jclouds/http/internal/HttpWire.java @@ -18,10 +18,7 @@ */ package org.jclouds.http.internal; -import java.util.concurrent.ExecutorService; - import javax.annotation.Resource; -import javax.inject.Inject; import javax.inject.Named; import org.jclouds.http.HttpConstants; @@ -39,11 +36,6 @@ public class HttpWire extends Wire { @Resource @Named(HttpConstants.LOGGER_HTTP_WIRE) Logger wireLog = Logger.NULL; - - @Inject - public HttpWire(ExecutorService exec) { - super(exec); - } public Logger getWireLog() { return wireLog; diff --git a/core/src/main/java/org/jclouds/http/internal/SignatureWire.java b/core/src/main/java/org/jclouds/http/internal/SignatureWire.java index bb0ef0fc70..78e86cdf3c 100644 --- a/core/src/main/java/org/jclouds/http/internal/SignatureWire.java +++ b/core/src/main/java/org/jclouds/http/internal/SignatureWire.java @@ -18,10 +18,7 @@ */ package org.jclouds.http.internal; -import java.util.concurrent.ExecutorService; - import javax.annotation.Resource; -import javax.inject.Inject; import javax.inject.Named; import org.jclouds.http.HttpConstants; @@ -39,11 +36,6 @@ public class SignatureWire extends Wire { @Resource @Named(HttpConstants.LOGGER_SIGNATURE) Logger signatureLog = Logger.NULL; - - @Inject - public SignatureWire(ExecutorService exec) { - super(exec); - } public Logger getWireLog() { return signatureLog; diff --git a/core/src/main/java/org/jclouds/logging/internal/Wire.java b/core/src/main/java/org/jclouds/logging/internal/Wire.java index 2d4a97a3e9..6c5ade98a8 100644 --- a/core/src/main/java/org/jclouds/logging/internal/Wire.java +++ b/core/src/main/java/org/jclouds/logging/internal/Wire.java @@ -26,19 +26,15 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.util.concurrent.ExecutorService; import javax.annotation.Resource; -import javax.inject.Inject; -import org.jclouds.concurrent.SingleThreaded; -import org.jclouds.io.TeeInputStream; import org.jclouds.logging.Logger; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; +import com.google.common.io.FileBackedOutputStream; + /** * Logs data to the wire LOG. * @@ -49,12 +45,7 @@ public abstract class Wire { @Resource protected Logger logger = Logger.NULL; - protected final ExecutorService exec; - @Inject - public Wire(ExecutorService exec) { - this.exec = checkNotNull(exec, "executor"); - } protected abstract Logger getWireLog(); private void wire(String header, InputStream instream) { @@ -94,38 +85,23 @@ public abstract class Wire { } public InputStream copy(final String header, InputStream instream) { + int limit = 256 * 1024; + FileBackedOutputStream out = null; try { - byte[] data = ByteStreams.toByteArray(instream); - wire(header, new ByteArrayInputStream(data)); - return new ByteArrayInputStream(data); + out = new FileBackedOutputStream(limit); + long bytesRead = ByteStreams.copy(instream, out); + if (bytesRead >= limit) + logger.warn("over limit %d/%d: wrote temp file", bytesRead, limit); + wire(header, out.getSupplier().getInput()); + return out.getSupplier().getInput(); } catch (IOException e) { throw new RuntimeException("Error tapping line", e); } finally { + Closeables.closeQuietly(out); Closeables.closeQuietly(instream); } } - public InputStream tapAsynch(final String header, InputStream instream) { - PipedOutputStream out = new PipedOutputStream(); - InputStream toReturn = new TeeInputStream(instream, out, true); - final InputStream line; - try { - line = new PipedInputStream(out); - exec.submit(new Runnable() { - public void run() { - try { - wire(header, line); - } finally { - Closeables.closeQuietly(line); - } - } - }); - } catch (IOException e) { - logger.error(e, "Error tapping line"); - } - return toReturn; - } - public InputStream input(InputStream instream) { return copy("<< ", checkNotNull(instream, "input")); } @@ -134,10 +110,7 @@ public abstract class Wire { public T output(T data) { checkNotNull(data, "data"); if (data instanceof InputStream) { - if (exec.getClass().isAnnotationPresent(SingleThreaded.class)) - return (T) copy(">> ", (InputStream) data); - else - return (T) tapAsynch(">> ", (InputStream) data); + return (T) copy(">> ", (InputStream) data); } else if (data instanceof byte[]) { output((byte[]) data); return data; @@ -154,19 +127,15 @@ public abstract class Wire { private void output(final File out) { checkNotNull(out, "output"); - exec.submit(new Runnable() { - public void run() { - InputStream in = null; - try { - in = new FileInputStream(out); - wire(">> ", in); - } catch (FileNotFoundException e) { - logger.error(e, "Error tapping file: %s", out); - } finally { - Closeables.closeQuietly(in); - } - } - }); + InputStream in = null; + try { + in = new FileInputStream(out); + wire(">> ", in); + } catch (FileNotFoundException e) { + logger.error(e, "Error tapping file: %s", out); + } finally { + Closeables.closeQuietly(in); + } } private void output(byte[] b) { diff --git a/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java b/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java index bcda844e88..16dc954bb0 100644 --- a/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java +++ b/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java @@ -92,7 +92,7 @@ public class BackoffLimitedRetryHandlerTest { ExecutorService execService = Executors.newCachedThreadPool(); JavaUrlHttpCommandExecutorService httpService = new JavaUrlHttpCommandExecutorService( execService, new DelegatingRetryHandler(), new DelegatingErrorHandler(), - new HttpWire(Executors.newCachedThreadPool())); + new HttpWire()); executorService = new TransformingHttpCommandExecutorServiceImpl(httpService, execService); } diff --git a/core/src/test/java/org/jclouds/http/internal/WireLiveTest.java b/core/src/test/java/org/jclouds/http/internal/WireLiveTest.java index 7082ca6a99..4638a6997f 100644 --- a/core/src/test/java/org/jclouds/http/internal/WireLiveTest.java +++ b/core/src/test/java/org/jclouds/http/internal/WireLiveTest.java @@ -19,7 +19,6 @@ package org.jclouds.http.internal; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.util.concurrent.Executors.sameThreadExecutor; import static java.util.concurrent.Executors.newCachedThreadPool; import static org.testng.Assert.assertEquals; @@ -29,7 +28,6 @@ import java.io.InputStream; import java.net.URL; import java.net.URLConnection; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.jclouds.encryption.EncryptionService; @@ -126,17 +124,15 @@ public class WireLiveTest { } public static HttpWire setUp() throws Exception { - ExecutorService service = newCachedThreadPool(); BufferLogger bufferLogger = new BufferLogger(); - HttpWire wire = new HttpWire(service); + HttpWire wire = new HttpWire(); wire.wireLog = (bufferLogger); return wire; } public HttpWire setUpSynch() throws Exception { - ExecutorService service = sameThreadExecutor(); BufferLogger bufferLogger = new BufferLogger(); - HttpWire wire = new HttpWire(service); + HttpWire wire = new HttpWire(); wire.wireLog = (bufferLogger); return wire; } diff --git a/core/src/test/java/org/jclouds/http/internal/WireTest.java b/core/src/test/java/org/jclouds/http/internal/WireTest.java index 1e961c6129..0338a207d5 100644 --- a/core/src/test/java/org/jclouds/http/internal/WireTest.java +++ b/core/src/test/java/org/jclouds/http/internal/WireTest.java @@ -18,13 +18,10 @@ */ package org.jclouds.http.internal; -import static com.google.common.util.concurrent.Executors.sameThreadExecutor; import static org.testng.Assert.assertEquals; import java.io.ByteArrayInputStream; import java.io.InputStream; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.jclouds.logging.Logger; import org.jclouds.util.Utils; @@ -89,17 +86,15 @@ public class WireTest { } public HttpWire setUp() throws Exception { - ExecutorService service = Executors.newCachedThreadPool(); BufferLogger bufferLogger = new BufferLogger(); - HttpWire wire = new HttpWire(service); + HttpWire wire = new HttpWire(); wire.wireLog = (bufferLogger); return wire; } public HttpWire setUpSynch() throws Exception { - ExecutorService service = sameThreadExecutor(); BufferLogger bufferLogger = new BufferLogger(); - HttpWire wire = new HttpWire(service); + HttpWire wire = new HttpWire(); wire.wireLog = (bufferLogger); return wire; }