[OLINGO-1343] Fix deadlock Piped_Stream (by Aleksandr)

This commit is contained in:
mibo 2019-04-26 20:48:08 +02:00
parent 482c99c9f0
commit b8ac17e4ae
8 changed files with 421 additions and 15 deletions

View File

@ -60,6 +60,8 @@ public class ConfigurationImpl implements Configuration {
private static final String CONTINUE_ON_ERROR = "continueOnError";
public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB
private final Map<String, Object> CONF = new HashMap<String, Object>();
private transient ExecutorService executor = createExecutor(10);

View File

@ -18,10 +18,7 @@
*/
package org.apache.olingo.client.core.communication.request;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -32,6 +29,9 @@ import org.apache.http.HttpResponse;
import org.apache.olingo.client.api.communication.request.ODataPayloadManager;
import org.apache.olingo.client.api.communication.response.ODataResponse;
import org.apache.olingo.client.api.http.HttpClientException;
import org.apache.olingo.client.core.ConfigurationImpl;
import org.apache.olingo.client.core.communication.util.PipedInputStream;
import org.apache.olingo.client.core.communication.util.PipedOutputStream;
/**
* OData request payload management abstract class.
@ -62,7 +62,7 @@ public abstract class AbstractODataStreamManager<T extends ODataResponse> extend
* @param futureWrap wrapper of the Future object of the HttpResponse.
*/
public AbstractODataStreamManager(final Wrapper<Future<HttpResponse>> futureWrap) {
this(futureWrap, new PipedOutputStream());
this(futureWrap, new PipedOutputStream(null, ConfigurationImpl.DEFAULT_BUFFER_SIZE));
}
/**
@ -76,8 +76,8 @@ public abstract class AbstractODataStreamManager<T extends ODataResponse> extend
this.futureWrap = futureWrap;
try {
this.body = new PipedInputStream(getBodyStreamWriter());
} catch (IOException e) {
this.body = new PipedInputStream(getBodyStreamWriter(), ConfigurationImpl.DEFAULT_BUFFER_SIZE);
} catch (Exception e) {
throw new IllegalStateException(e);
}
this.defaultBody = this.body;

View File

@ -20,10 +20,10 @@ package org.apache.olingo.client.core.communication.request;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedOutputStream;
import java.util.Arrays;
import org.apache.olingo.client.api.communication.request.ODataStreamer;
import org.apache.olingo.client.core.communication.util.PipedOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -18,7 +18,6 @@
*/
package org.apache.olingo.client.core.communication.request.batch;
import java.io.PipedOutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@ -30,6 +29,7 @@ import org.apache.olingo.client.api.communication.request.ODataPayloadManager;
import org.apache.olingo.client.api.communication.request.batch.ODataBatchRequest;
import org.apache.olingo.client.api.communication.request.batch.ODataBatchResponseItem;
import org.apache.olingo.client.api.communication.response.ODataResponse;
import org.apache.olingo.client.core.communication.util.PipedOutputStream;
import org.apache.olingo.client.core.communication.request.streamed.AbstractODataStreamedRequest;
import org.apache.olingo.commons.api.format.ContentType;
import org.apache.olingo.commons.api.http.HttpMethod;
@ -71,7 +71,7 @@ public abstract class AbstractODataBatchRequest<V extends ODataResponse, T exten
}
public PipedOutputStream getOutputStream() {
return getPayloadManager().getBodyStreamWriter();
return (PipedOutputStream) getPayloadManager().getBodyStreamWriter();
}
/**

View File

@ -24,6 +24,7 @@ import org.apache.olingo.client.api.ODataBatchConstants;
import org.apache.olingo.client.api.communication.request.ODataBatchableRequest;
import org.apache.olingo.client.api.communication.request.batch.ODataBatchRequest;
import org.apache.olingo.client.api.communication.request.batch.ODataBatchRequestItem;
import org.apache.olingo.client.core.communication.util.PipedOutputStream;
import org.apache.olingo.client.core.communication.request.AbstractODataStreamer;
/**
@ -54,7 +55,7 @@ public abstract class AbstractODataBatchRequestItem extends AbstractODataStreame
* @param req OData batch request.
*/
public AbstractODataBatchRequestItem(final ODataBatchRequest req) {
super(req.getOutputStream());
super((PipedOutputStream) req.getOutputStream());
this.open = true;
this.req = req;
}

View File

@ -22,8 +22,6 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
@ -40,6 +38,9 @@ import org.apache.olingo.client.api.communication.request.ODataStreamer;
import org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator;
import org.apache.olingo.client.api.communication.response.ODataResponse;
import org.apache.olingo.client.api.http.NoContentException;
import org.apache.olingo.client.core.ConfigurationImpl;
import org.apache.olingo.client.core.communication.util.PipedInputStream;
import org.apache.olingo.client.core.communication.util.PipedOutputStream;
import org.apache.olingo.client.core.communication.request.batch.ODataBatchController;
import org.apache.olingo.client.core.communication.request.batch.ODataBatchLineIteratorImpl;
import org.apache.olingo.client.core.communication.request.batch.ODataBatchUtilities;
@ -264,10 +265,11 @@ public abstract class AbstractODataResponse implements ODataResponse {
if (payload == null && batchInfo != null && batchInfo.isValidBatch()) {
// get input stream till the end of item
payload = new PipedInputStream();
payload = new PipedInputStream(null);
try {
final PipedOutputStream os = new PipedOutputStream((PipedInputStream) payload);
final PipedOutputStream os = new PipedOutputStream((PipedInputStream) payload,
ConfigurationImpl.DEFAULT_BUFFER_SIZE);
new Thread(new Runnable() {
@Override
@ -281,7 +283,7 @@ public abstract class AbstractODataResponse implements ODataResponse {
}
}
}).start();
} catch (IOException e) {
} catch (Exception e) {
LOG.error("Error streaming payload response", e);
throw new IllegalStateException(e);
}

View File

@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.olingo.client.core.communication.util;
import org.apache.olingo.client.core.ConfigurationImpl;
import java.io.IOException;
/**
* This class is equivalent to <code>java.io.PipedInputStream</code>. In the
* interface it only adds a constructor which allows for specifying the buffer
* size. Its implementation, however, is much simpler and a lot more efficient
* than its equivalent. It doesn't rely on polling. Instead it uses proper
* synchronization with its counterpart <code>be.re.io.PipedOutputStream</code>.
*
* Multiple readers can read from this stream concurrently. The block asked for
* by a reader is delivered completely, or until the end of the stream if less
* is available. Other readers can't come in between.
*
* @author WD
*/
public class PipedInputStream extends java.io.PipedInputStream {
final Object sync = new Object();
byte[] buffer;
boolean closed = false;
int readLaps = 0;
int readPosition = 0;
PipedOutputStream source;
int writeLaps = 0;
int writePosition = 0;
/**
* Creates an unconnected PipedInputStream with a default buffer size.
*/
protected PipedInputStream() {
this(null);
}
/**
* Creates a PipedInputStream with a default buffer size and connects it to
* <code>source</code>.
*
* @exception IOException
* It was already connected.
*/
public PipedInputStream(PipedOutputStream source) {
this(source, ConfigurationImpl.DEFAULT_BUFFER_SIZE);
}
/**
* Creates a PipedInputStream with buffer size <code>bufferSize</code> and
* connects it to <code>source</code>.
*
*/
public PipedInputStream(PipedOutputStream source, int bufferSize) {
if (source != null) {
try {
connect(source);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
buffer = new byte[bufferSize];
}
public int available() {
/*
* The circular buffer is inspected to see where the reader and the
* writer are located.
*/
return writePosition > readPosition
/* The writer is in the same lap. */ ? writePosition - readPosition
: (writePosition < readPosition
/* The writer is in the next lap. */ ? buffer.length
- readPosition
+ 1
+ writePosition
:
/*
* The writer is at the same position or a complete lap
* ahead.
*/
(writeLaps > readLaps ? buffer.length : 0));
}
/**
* @exception IOException
* The pipe is not connected.
*/
public void close() throws IOException {
if (source == null) {
throw new IOException("Unconnected pipe");
}
synchronized (sync) {
closed = true;
// Release any pending writers.
sync.notifyAll();
}
}
/**
* @exception IOException
* The pipe is already connected.
*/
public void connect(PipedOutputStream source) throws IOException {
if (this.source != null) {
throw new IOException("Pipe already connected");
}
this.source = source;
source.sink = this;
}
public void mark(int readLimit) {
/* not supported */
}
public boolean markSupported() {
return false;
}
public int read() throws IOException {
byte[] b = new byte[0];
int result = read(b);
return result == -1 ? -1 : b[0];
}
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}
/**
* @exception IOException
* The pipe is not connected.
*/
public int read(byte[] b, int off, int len) throws IOException {
if (source == null) {
throw new IOException("Unconnected pipe");
}
synchronized (sync) {
if (writePosition == readPosition && writeLaps == readLaps) {
if (closed) {
return -1;
}
// Wait for any writer to put something in the circular buffer.
try {
sync.wait();
} catch (InterruptedException e) {
throw new IOException(e.getMessage());
}
// Try again.
return read(b, off, len);
}
// Don't read more than the capacity indicated by len or what's
// available
// in the circular buffer.
int amount = Math .min(len,
(writePosition > readPosition
? writePosition
: buffer.length)
-
readPosition);
System.arraycopy(
buffer,
readPosition,
b,
off,
amount);
readPosition += amount;
if (readPosition == buffer.length) {
// A lap was completed, so go // back.
readPosition = 0;
readLaps++;
}
// The buffer is only released when the complete desired block was
// obtained.
if (amount < len) {
int second = read(b, off + amount, len - amount);
return second == -1 ? amount : amount + second;
} else {
sync.notifyAll();
}
return amount;
}
}
}

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.olingo.client.core.communication.util;
import org.apache.olingo.client.core.ConfigurationImpl;
import java.io.IOException;
/**
* This class is equivalent to <code>java.io.PipedOutputStream</code>. In the
* interface it only adds a constructor which allows for specifying the buffer
* size. Its implementation, however, is much simpler and a lot more efficient
* than its equivalent. It doesn't rely on polling. Instead it uses proper
* synchronization with its counterpart <code>be.re.io.PipedInputStream</code>.
*
* Multiple writers can write in this stream concurrently. The block written by
* a writer is put in completely. Other writers can't come in between.
*
* @author WD
*/
public class PipedOutputStream extends java.io.PipedOutputStream {
PipedInputStream sink;
/**
* Creates an unconnected PipedOutputStream.
*/
protected PipedOutputStream() {
this(null);
}
/**
* Creates a PipedOutputStream with a default buffer size and connects it to
* <code>sink</code>.
*
*/
public PipedOutputStream(PipedInputStream sink) {
this(sink, ConfigurationImpl.DEFAULT_BUFFER_SIZE);
}
/**
* Creates a PipedOutputStream with buffer size <code>bufferSize</code> and
* connects it to <code>sink</code>.
*
*/
public PipedOutputStream(PipedInputStream sink, int bufferSize) {
if (sink != null) {
try {
connect(sink);
} catch (Exception e) {
throw new RuntimeException(e);
}
sink.buffer = new byte[bufferSize];
}
}
/**
* @exception IOException
* The pipe is not connected.
*/
public void close() throws IOException {
if (sink == null) {
throw new IOException("Unconnected pipe");
}
synchronized (sink.sync) {
sink.closed = true;
flush();
}
}
/**
* @exception IOException
* The pipe is already connected.
*/
public void connect(PipedInputStream sink) throws IOException {
if (this.sink != null) {
throw new IOException("Pipe already connected");
}
this.sink = sink;
sink.source = this;
}
public void flush() throws IOException {
synchronized (sink.sync) {
// Release all readers.
sink.sync.notifyAll();
}
}
public void write(int b) throws IOException {
write(new byte[] { (byte) b });
}
public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}
/**
* @exception IOException
* The pipe is not connected or a reader has closed it.
*/
public void write(byte[] b, int off, int len) throws IOException {
if (sink == null) {
throw new IOException("Unconnected pipe");
}
if (sink.closed) {
throw new IOException("Broken pipe");
}
synchronized (sink.sync) {
if (sink.writePosition == sink.readPosition
&& sink.writeLaps > sink.readLaps) {
// The circular buffer is full, so wait for some reader to
// consume something.
try {
sink.sync.wait();
} catch (InterruptedException e) {
throw new IOException(e.getMessage());
}
// Try again.
write(b, off, len);
return;
}
// Don't write more than the capacity indicated by len or the space
// available in the circular buffer.
int amount = Math.min(len,
(sink.writePosition < sink.readPosition
? sink.readPosition
: sink.buffer.length)
- sink.writePosition);
System.arraycopy(
b,
off,
sink.buffer,
sink.writePosition,
amount);
sink.writePosition += amount;
if (sink.writePosition == sink.buffer.length) {
sink.writePosition = 0;
++sink.writeLaps;
}
// The buffer is only released when the complete desired block was
// written.
if (amount < len) {
write(b, off + amount, len - amount);
} else {
sink.sync.notifyAll();
}
}
}
}