fixed wire logging

git-svn-id: http://jclouds.googlecode.com/svn/trunk@2671 3d8758e0-26b5-11de-8745-db77d3ebf521
This commit is contained in:
adrian.f.cole 2010-01-19 07:05:59 +00:00
parent 969bb8ea38
commit 49f14ca0e8
7 changed files with 40 additions and 110 deletions

View File

@ -30,13 +30,11 @@ import org.jclouds.http.HttpResponse;
import org.jclouds.logging.Logger; import org.jclouds.logging.Logger;
import org.jclouds.rest.InvocationContext; import org.jclouds.rest.InvocationContext;
import org.jclouds.rest.internal.GeneratedHttpRequest; import org.jclouds.rest.internal.GeneratedHttpRequest;
import org.xml.sax.ContentHandler;
import org.xml.sax.InputSource; import org.xml.sax.InputSource;
import org.xml.sax.XMLReader; import org.xml.sax.XMLReader;
import org.xml.sax.helpers.DefaultHandler; import org.xml.sax.helpers.DefaultHandler;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
/** /**
@ -51,6 +49,7 @@ public class ParseSax<T> implements Function<HttpResponse, T>, InvocationContext
private final HandlerWithResult<T> handler; private final HandlerWithResult<T> handler;
@Resource @Resource
protected Logger logger = Logger.NULL; protected Logger logger = Logger.NULL;
private GeneratedHttpRequest<?> request;
public static interface Factory { public static interface Factory {
<T> ParseSax<T> create(HandlerWithResult<T> handler); <T> ParseSax<T> create(HandlerWithResult<T> handler);
@ -63,39 +62,27 @@ public class ParseSax<T> implements Function<HttpResponse, T>, InvocationContext
} }
public T apply(HttpResponse from) throws HttpException { public T apply(HttpResponse from) throws HttpException {
InputStream input = null; return parse(from.getContent());
try {
input = from.getContent();
if (input != null) {
return parse(input);
} else {
throw new HttpException("No input to parse");
}
} catch (Exception e) {
Throwables.propagateIfPossible(e, HttpException.class);
throw new HttpException("Error parsing input for " + from, e);
}
} }
public T parse(InputStream xml) throws HttpException { public T parse(InputStream from) throws HttpException {
parseAndCloseStream(xml, getHandler()); if (from == null)
return getHandler().getResult(); throw new HttpException("No input to parse");
}
private void parseAndCloseStream(InputStream xml, ContentHandler handler) throws HttpException {
try { try {
parser.setContentHandler(handler); parser.setContentHandler(getHandler());
// This method should accept documents with a BOM (Byte-order mark) // This method should accept documents with a BOM (Byte-order mark)
InputSource input = new InputSource(xml); parser.parse(new InputSource(from));
parser.parse(input); return getHandler().getResult();
} catch (Exception e) { } catch (Exception e) {
StringBuilder message = new StringBuilder(); StringBuilder message = new StringBuilder();
message.append("Error parsing input for ").append(handler); if (request != null)
message.append("Error parsing input for ").append(request.getRequestLine())
.append(": ");
message.append(e.getMessage());
logger.error(e, message.toString()); logger.error(e, message.toString());
Throwables.propagateIfPossible(e, HttpException.class);
throw new HttpException(message.toString(), e); throw new HttpException(message.toString(), e);
} finally { } finally {
Closeables.closeQuietly(xml); Closeables.closeQuietly(from);
} }
} }
@ -111,9 +98,7 @@ public class ParseSax<T> implements Function<HttpResponse, T>, InvocationContext
public abstract static class HandlerWithResult<T> extends DefaultHandler implements public abstract static class HandlerWithResult<T> extends DefaultHandler implements
InvocationContext { InvocationContext {
protected GeneratedHttpRequest<?> request; protected GeneratedHttpRequest<?> request;
public abstract T getResult(); public abstract T getResult();
@Override @Override
public void setContext(GeneratedHttpRequest<?> request) { public void setContext(GeneratedHttpRequest<?> request) {
this.request = request; this.request = request;
@ -123,5 +108,6 @@ public class ParseSax<T> implements Function<HttpResponse, T>, InvocationContext
@Override @Override
public void setContext(GeneratedHttpRequest<?> request) { public void setContext(GeneratedHttpRequest<?> request) {
handler.setContext(request); handler.setContext(request);
this.request = request;
} }
} }

View File

@ -18,10 +18,7 @@
*/ */
package org.jclouds.http.internal; package org.jclouds.http.internal;
import java.util.concurrent.ExecutorService;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.inject.Inject;
import javax.inject.Named; import javax.inject.Named;
import org.jclouds.http.HttpConstants; import org.jclouds.http.HttpConstants;
@ -40,11 +37,6 @@ public class HttpWire extends Wire {
@Named(HttpConstants.LOGGER_HTTP_WIRE) @Named(HttpConstants.LOGGER_HTTP_WIRE)
Logger wireLog = Logger.NULL; Logger wireLog = Logger.NULL;
@Inject
public HttpWire(ExecutorService exec) {
super(exec);
}
public Logger getWireLog() { public Logger getWireLog() {
return wireLog; return wireLog;
} }

View File

@ -18,10 +18,7 @@
*/ */
package org.jclouds.http.internal; package org.jclouds.http.internal;
import java.util.concurrent.ExecutorService;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.inject.Inject;
import javax.inject.Named; import javax.inject.Named;
import org.jclouds.http.HttpConstants; import org.jclouds.http.HttpConstants;
@ -40,11 +37,6 @@ public class SignatureWire extends Wire {
@Named(HttpConstants.LOGGER_SIGNATURE) @Named(HttpConstants.LOGGER_SIGNATURE)
Logger signatureLog = Logger.NULL; Logger signatureLog = Logger.NULL;
@Inject
public SignatureWire(ExecutorService exec) {
super(exec);
}
public Logger getWireLog() { public Logger getWireLog() {
return signatureLog; return signatureLog;
} }

View File

@ -26,19 +26,15 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.inject.Inject;
import org.jclouds.concurrent.SingleThreaded;
import org.jclouds.io.TeeInputStream;
import org.jclouds.logging.Logger; import org.jclouds.logging.Logger;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.google.common.io.FileBackedOutputStream;
/** /**
* Logs data to the wire LOG. * Logs data to the wire LOG.
* *
@ -49,12 +45,7 @@ public abstract class Wire {
@Resource @Resource
protected Logger logger = Logger.NULL; protected Logger logger = Logger.NULL;
protected final ExecutorService exec;
@Inject
public Wire(ExecutorService exec) {
this.exec = checkNotNull(exec, "executor");
}
protected abstract Logger getWireLog(); protected abstract Logger getWireLog();
private void wire(String header, InputStream instream) { private void wire(String header, InputStream instream) {
@ -94,38 +85,23 @@ public abstract class Wire {
} }
public InputStream copy(final String header, InputStream instream) { public InputStream copy(final String header, InputStream instream) {
int limit = 256 * 1024;
FileBackedOutputStream out = null;
try { try {
byte[] data = ByteStreams.toByteArray(instream); out = new FileBackedOutputStream(limit);
wire(header, new ByteArrayInputStream(data)); long bytesRead = ByteStreams.copy(instream, out);
return new ByteArrayInputStream(data); if (bytesRead >= limit)
logger.warn("over limit %d/%d: wrote temp file", bytesRead, limit);
wire(header, out.getSupplier().getInput());
return out.getSupplier().getInput();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Error tapping line", e); throw new RuntimeException("Error tapping line", e);
} finally { } finally {
Closeables.closeQuietly(out);
Closeables.closeQuietly(instream); Closeables.closeQuietly(instream);
} }
} }
public InputStream tapAsynch(final String header, InputStream instream) {
PipedOutputStream out = new PipedOutputStream();
InputStream toReturn = new TeeInputStream(instream, out, true);
final InputStream line;
try {
line = new PipedInputStream(out);
exec.submit(new Runnable() {
public void run() {
try {
wire(header, line);
} finally {
Closeables.closeQuietly(line);
}
}
});
} catch (IOException e) {
logger.error(e, "Error tapping line");
}
return toReturn;
}
public InputStream input(InputStream instream) { public InputStream input(InputStream instream) {
return copy("<< ", checkNotNull(instream, "input")); return copy("<< ", checkNotNull(instream, "input"));
} }
@ -134,10 +110,7 @@ public abstract class Wire {
public <T> T output(T data) { public <T> T output(T data) {
checkNotNull(data, "data"); checkNotNull(data, "data");
if (data instanceof InputStream) { if (data instanceof InputStream) {
if (exec.getClass().isAnnotationPresent(SingleThreaded.class)) return (T) copy(">> ", (InputStream) data);
return (T) copy(">> ", (InputStream) data);
else
return (T) tapAsynch(">> ", (InputStream) data);
} else if (data instanceof byte[]) { } else if (data instanceof byte[]) {
output((byte[]) data); output((byte[]) data);
return data; return data;
@ -154,19 +127,15 @@ public abstract class Wire {
private void output(final File out) { private void output(final File out) {
checkNotNull(out, "output"); checkNotNull(out, "output");
exec.submit(new Runnable() { InputStream in = null;
public void run() { try {
InputStream in = null; in = new FileInputStream(out);
try { wire(">> ", in);
in = new FileInputStream(out); } catch (FileNotFoundException e) {
wire(">> ", in); logger.error(e, "Error tapping file: %s", out);
} catch (FileNotFoundException e) { } finally {
logger.error(e, "Error tapping file: %s", out); Closeables.closeQuietly(in);
} finally { }
Closeables.closeQuietly(in);
}
}
});
} }
private void output(byte[] b) { private void output(byte[] b) {

View File

@ -92,7 +92,7 @@ public class BackoffLimitedRetryHandlerTest {
ExecutorService execService = Executors.newCachedThreadPool(); ExecutorService execService = Executors.newCachedThreadPool();
JavaUrlHttpCommandExecutorService httpService = new JavaUrlHttpCommandExecutorService( JavaUrlHttpCommandExecutorService httpService = new JavaUrlHttpCommandExecutorService(
execService, new DelegatingRetryHandler(), new DelegatingErrorHandler(), execService, new DelegatingRetryHandler(), new DelegatingErrorHandler(),
new HttpWire(Executors.newCachedThreadPool())); new HttpWire());
executorService = new TransformingHttpCommandExecutorServiceImpl(httpService, execService); executorService = new TransformingHttpCommandExecutorServiceImpl(httpService, execService);
} }

View File

@ -19,7 +19,6 @@
package org.jclouds.http.internal; package org.jclouds.http.internal;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.Executors.sameThreadExecutor;
import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
@ -29,7 +28,6 @@ import java.io.InputStream;
import java.net.URL; import java.net.URL;
import java.net.URLConnection; import java.net.URLConnection;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.jclouds.encryption.EncryptionService; import org.jclouds.encryption.EncryptionService;
@ -126,17 +124,15 @@ public class WireLiveTest {
} }
public static HttpWire setUp() throws Exception { public static HttpWire setUp() throws Exception {
ExecutorService service = newCachedThreadPool();
BufferLogger bufferLogger = new BufferLogger(); BufferLogger bufferLogger = new BufferLogger();
HttpWire wire = new HttpWire(service); HttpWire wire = new HttpWire();
wire.wireLog = (bufferLogger); wire.wireLog = (bufferLogger);
return wire; return wire;
} }
public HttpWire setUpSynch() throws Exception { public HttpWire setUpSynch() throws Exception {
ExecutorService service = sameThreadExecutor();
BufferLogger bufferLogger = new BufferLogger(); BufferLogger bufferLogger = new BufferLogger();
HttpWire wire = new HttpWire(service); HttpWire wire = new HttpWire();
wire.wireLog = (bufferLogger); wire.wireLog = (bufferLogger);
return wire; return wire;
} }

View File

@ -18,13 +18,10 @@
*/ */
package org.jclouds.http.internal; package org.jclouds.http.internal;
import static com.google.common.util.concurrent.Executors.sameThreadExecutor;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.InputStream; import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jclouds.logging.Logger; import org.jclouds.logging.Logger;
import org.jclouds.util.Utils; import org.jclouds.util.Utils;
@ -89,17 +86,15 @@ public class WireTest {
} }
public HttpWire setUp() throws Exception { public HttpWire setUp() throws Exception {
ExecutorService service = Executors.newCachedThreadPool();
BufferLogger bufferLogger = new BufferLogger(); BufferLogger bufferLogger = new BufferLogger();
HttpWire wire = new HttpWire(service); HttpWire wire = new HttpWire();
wire.wireLog = (bufferLogger); wire.wireLog = (bufferLogger);
return wire; return wire;
} }
public HttpWire setUpSynch() throws Exception { public HttpWire setUpSynch() throws Exception {
ExecutorService service = sameThreadExecutor();
BufferLogger bufferLogger = new BufferLogger(); BufferLogger bufferLogger = new BufferLogger();
HttpWire wire = new HttpWire(service); HttpWire wire = new HttpWire();
wire.wireLog = (bufferLogger); wire.wireLog = (bufferLogger);
return wire; return wire;
} }