HDFS-12612. DFSStripedOutputStream.close will throw if called a second time with a failed streamer. (Lei (Eddy) Xu)

This commit is contained in:
Lei Xu 2017-10-17 15:52:09 -07:00
parent 75323394fb
commit f27a4ad032
4 changed files with 184 additions and 38 deletions

View File

@ -82,6 +82,12 @@ public class DFSStripedOutputStream extends DFSOutputStream
implements StreamCapabilities {
private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
/**
* OutputStream level last exception, will be used to indicate the fatal
* exception of this stream, i.e., being aborted.
*/
private final ExceptionLastSeen exceptionLastSeen = new ExceptionLastSeen();
static class MultipleBlockingQueue<T> {
private final List<BlockingQueue<T>> queues;
@ -971,12 +977,9 @@ public class DFSStripedOutputStream extends DFSOutputStream
if (isClosed()) {
return;
}
for (StripedDataStreamer streamer : streamers) {
streamer.getLastException().set(
new IOException("Lease timeout of "
exceptionLastSeen.set(new IOException("Lease timeout of "
+ (dfsClient.getConf().getHdfsTimeout() / 1000)
+ " seconds expired."));
}
try {
closeThreads(true);
@ -1133,19 +1136,27 @@ public class DFSStripedOutputStream extends DFSOutputStream
@Override
protected synchronized void closeImpl() throws IOException {
if (isClosed()) {
exceptionLastSeen.check(true);
// Writing to at least {dataUnits} replicas can be considered as success,
// and the rest of data can be recovered.
final int minReplication = ecPolicy.getNumDataUnits();
int goodStreamers = 0;
final MultipleIOException.Builder b = new MultipleIOException.Builder();
for(int i = 0; i < streamers.size(); i++) {
final StripedDataStreamer si = getStripedDataStreamer(i);
for (final StripedDataStreamer si : streamers) {
try {
si.getLastException().check(true);
goodStreamers++;
} catch (IOException e) {
b.add(e);
}
}
if (goodStreamers < minReplication) {
final IOException ioe = b.build();
if (ioe != null) {
throw ioe;
}
}
return;
}
@ -1183,9 +1194,10 @@ public class DFSStripedOutputStream extends DFSOutputStream
}
} finally {
// Failures may happen when flushing data/parity data out. Exceptions
// may be thrown if more than 3 streamers fail, or updatePipeline RPC
// fails. Streamers may keep waiting for the new block/GS information.
// Thus need to force closing these threads.
// may be thrown if the number of failed streamers is more than the
// number of parity blocks, or updatePipeline RPC fails. Streamers may
// keep waiting for the new block/GS information. Thus need to force
// closing these threads.
closeThreads(true);
}

View File

@ -285,38 +285,21 @@ class DataStreamer extends Daemon {
packets.clear();
}
class LastExceptionInStreamer {
private IOException thrown;
synchronized void set(Throwable t) {
assert t != null;
this.thrown = t instanceof IOException ?
(IOException) t : new IOException(t);
}
synchronized void clear() {
thrown = null;
}
/** Check if there already is an exception. */
class LastExceptionInStreamer extends ExceptionLastSeen {
/**
* Check if there already is an exception.
*/
@Override
synchronized void check(boolean resetToNull) throws IOException {
final IOException thrown = get();
if (thrown != null) {
if (LOG.isTraceEnabled()) {
// wrap and print the exception to know when the check is called
LOG.trace("Got Exception while checking, " + DataStreamer.this,
new Throwable(thrown));
}
final IOException e = thrown;
if (resetToNull) {
thrown = null;
super.check(resetToNull);
}
throw e;
}
}
synchronized void throwException4Close() throws IOException {
check(false);
throw new ClosedChannelException();
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
/**
* The exception last seen by the {@link DataStreamer} or
* {@link DFSOutputStream}.
*/
@InterfaceAudience.Private
class ExceptionLastSeen {
private IOException thrown;
/** Get the last seen exception. */
synchronized protected IOException get() {
return thrown;
}
/**
* Set the last seen exception.
* @param t the exception.
*/
synchronized void set(Throwable t) {
assert t != null;
this.thrown = t instanceof IOException ?
(IOException) t : new IOException(t);
}
/** Clear the last seen exception. */
synchronized void clear() {
thrown = null;
}
/**
* Check if there already is an exception. Throw the exception if exist.
*
* @param resetToNull set to true to reset exception to null after calling
* this function.
* @throws IOException on existing IOException.
*/
synchronized void check(boolean resetToNull) throws IOException {
if (thrown != null) {
final IOException e = thrown;
if (resetToNull) {
thrown = null;
}
throw e;
}
}
synchronized void throwException4Close() throws IOException {
check(false);
throw new ClosedChannelException();
}
}

View File

@ -319,6 +319,82 @@ public class TestDFSStripedOutputStreamWithFailure {
}
}
private void testCloseWithExceptionsInStreamer(
int numFailures, boolean shouldFail) throws Exception {
assertTrue(numFailures <=
ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
final Path dirFile = new Path(dir, "ecfile-" + numFailures);
try (FSDataOutputStream out = dfs.create(dirFile, true)) {
out.write("idempotent close".getBytes());
// Expect to raise IOE on the first close call, but any following
// close() should be no-op.
LambdaTestUtils.intercept(IOException.class,
out::close);
assertTrue(out.getWrappedStream() instanceof DFSStripedOutputStream);
DFSStripedOutputStream stripedOut =
(DFSStripedOutputStream) out.getWrappedStream();
for (int i = 0; i < numFailures; i++) {
// Only inject 1 stream failure.
stripedOut.getStripedDataStreamer(i).getLastException().set(
new IOException("injected failure")
);
}
if (shouldFail) {
LambdaTestUtils.intercept(IOException.class, out::close);
}
// Close multiple times. All the following close() should have no
// side-effect.
out.close();
}
}
// HDFS-12612
@Test
public void testIdempotentCloseWithFailedStreams() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
try {
setup(conf);
// shutdown few datanodes to avoid getting sufficient data blocks number
// of datanodes.
while (cluster.getDataNodes().size() >= dataBlocks) {
cluster.stopDataNode(0);
}
cluster.restartNameNodes();
cluster.triggerHeartbeats();
testCloseWithExceptionsInStreamer(1, false);
testCloseWithExceptionsInStreamer(ecPolicy.getNumParityUnits(), false);
testCloseWithExceptionsInStreamer(ecPolicy.getNumParityUnits() + 1, true);
testCloseWithExceptionsInStreamer(ecPolicy.getNumDataUnits(), true);
} finally {
tearDown();
}
}
@Test
public void testCloseAfterAbort() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
try {
setup(conf);
final Path dirFile = new Path(dir, "ecfile");
FSDataOutputStream out = dfs.create(dirFile, true);
assertTrue(out.getWrappedStream() instanceof DFSStripedOutputStream);
DFSStripedOutputStream stripedOut =
(DFSStripedOutputStream) out.getWrappedStream();
stripedOut.abort();
LambdaTestUtils.intercept(IOException.class,
"Lease timeout", stripedOut::close);
} finally {
tearDown();
}
}
@Test(timeout = 90000)
public void testAddBlockWhenNoSufficientParityNumOfNodes()
throws IOException {