HDFS-4906. HDFS Output streams should not accept writes after being closed. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1494303 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2013-06-18 21:05:16 +00:00
parent 52fe4e6be9
commit 1c309f763b
7 changed files with 127 additions and 23 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import java.io.*; import java.io.*;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays; import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -368,6 +369,7 @@ private static class ChecksumFSOutputSummer extends FSOutputSummer {
private FSDataOutputStream datas; private FSDataOutputStream datas;
private FSDataOutputStream sums; private FSDataOutputStream sums;
private static final float CHKSUM_AS_FRACTION = 0.01f; private static final float CHKSUM_AS_FRACTION = 0.01f;
private boolean isClosed = false;
public ChecksumFSOutputSummer(ChecksumFileSystem fs, public ChecksumFSOutputSummer(ChecksumFileSystem fs,
Path file, Path file,
@ -391,9 +393,13 @@ public ChecksumFSOutputSummer(ChecksumFileSystem fs,
@Override @Override
public void close() throws IOException { public void close() throws IOException {
try {
flushBuffer(); flushBuffer();
sums.close(); sums.close();
datas.close(); datas.close();
} finally {
isClosed = true;
}
} }
@Override @Override
@ -402,6 +408,13 @@ protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
datas.write(b, offset, len); datas.write(b, offset, len);
sums.write(checksum); sums.write(checksum);
} }
@Override
protected void checkClosed() throws IOException {
if (isClosed) {
throw new ClosedChannelException();
}
}
} }
@Override @Override

View File

@ -20,6 +20,7 @@
import java.io.*; import java.io.*;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
@ -325,6 +326,7 @@ private static class ChecksumFSOutputSummer extends FSOutputSummer {
private FSDataOutputStream datas; private FSDataOutputStream datas;
private FSDataOutputStream sums; private FSDataOutputStream sums;
private static final float CHKSUM_AS_FRACTION = 0.01f; private static final float CHKSUM_AS_FRACTION = 0.01f;
private boolean isClosed = false;
public ChecksumFSOutputSummer(final ChecksumFs fs, final Path file, public ChecksumFSOutputSummer(final ChecksumFs fs, final Path file,
@ -356,9 +358,13 @@ public ChecksumFSOutputSummer(final ChecksumFs fs, final Path file,
@Override @Override
public void close() throws IOException { public void close() throws IOException {
try {
flushBuffer(); flushBuffer();
sums.close(); sums.close();
datas.close(); datas.close();
} finally {
isClosed = true;
}
} }
@Override @Override
@ -367,6 +373,13 @@ protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
datas.write(b, offset, len); datas.write(b, offset, len);
sums.write(checksum); sums.write(checksum);
} }
@Override
protected void checkClosed() throws IOException {
if (isClosed) {
throw new ClosedChannelException();
}
}
} }
@Override @Override

View File

@ -54,6 +54,15 @@ protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) {
protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum) protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum)
throws IOException; throws IOException;
/**
* Check if the implementing OutputStream is closed and should no longer
* accept writes. Implementations should do nothing if this stream is not
* closed, and should throw an {@link IOException} if it is closed.
*
* @throws IOException if this stream is already closed.
*/
protected abstract void checkClosed() throws IOException;
/** Write one byte */ /** Write one byte */
@Override @Override
public synchronized void write(int b) throws IOException { public synchronized void write(int b) throws IOException {
@ -85,6 +94,9 @@ public synchronized void write(int b) throws IOException {
@Override @Override
public synchronized void write(byte b[], int off, int len) public synchronized void write(byte b[], int off, int len)
throws IOException { throws IOException {
checkClosed();
if (off < 0 || len < 0 || off > b.length - len) { if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException(); throw new ArrayIndexOutOfBoundsException();
} }

View File

@ -582,6 +582,9 @@ Release 2.1.0-beta - UNRELEASED
HDFS-4845. FSNamesystem.deleteInternal should acquire write-lock before HDFS-4845. FSNamesystem.deleteInternal should acquire write-lock before
changing the inode map. (Arpit Agarwal via szetszwo) changing the inode map. (Arpit Agarwal via szetszwo)
HDFS-4906. HDFS Output streams should not accept writes after being
closed. (atm)
BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes. HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.

View File

@ -30,6 +30,7 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.nio.BufferOverflowException; import java.nio.BufferOverflowException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
@ -1299,10 +1300,10 @@ static Socket createSocketForPipeline(final DatanodeInfo first,
return sock; return sock;
} }
private void isClosed() throws IOException { protected void checkClosed() throws IOException {
if (closed) { if (closed) {
IOException e = lastException; IOException e = lastException;
throw e != null ? e : new IOException("DFSOutputStream is closed"); throw e != null ? e : new ClosedChannelException();
} }
} }
@ -1471,7 +1472,7 @@ private void waitAndQueueCurrentPacket() throws IOException {
break; break;
} }
} }
isClosed(); checkClosed();
queueCurrentPacket(); queueCurrentPacket();
} }
} }
@ -1481,7 +1482,7 @@ private void waitAndQueueCurrentPacket() throws IOException {
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum) protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
throws IOException { throws IOException {
dfsClient.checkOpen(); dfsClient.checkOpen();
isClosed(); checkClosed();
int cklen = checksum.length; int cklen = checksum.length;
int bytesPerChecksum = this.checksum.getBytesPerChecksum(); int bytesPerChecksum = this.checksum.getBytesPerChecksum();
@ -1608,7 +1609,7 @@ public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags) private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
throws IOException { throws IOException {
dfsClient.checkOpen(); dfsClient.checkOpen();
isClosed(); checkClosed();
try { try {
long toWaitFor; long toWaitFor;
long lastBlockLength = -1L; long lastBlockLength = -1L;
@ -1695,7 +1696,7 @@ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
// If we got an error here, it might be because some other thread called // If we got an error here, it might be because some other thread called
// close before our hflush completed. In that case, we should throw an // close before our hflush completed. In that case, we should throw an
// exception that the stream is closed. // exception that the stream is closed.
isClosed(); checkClosed();
// If we aren't closed but failed to sync, we should expose that to the // If we aren't closed but failed to sync, we should expose that to the
// caller. // caller.
throw ioe; throw ioe;
@ -1740,7 +1741,7 @@ public synchronized int getNumCurrentReplicas() throws IOException {
*/ */
public synchronized int getCurrentBlockReplication() throws IOException { public synchronized int getCurrentBlockReplication() throws IOException {
dfsClient.checkOpen(); dfsClient.checkOpen();
isClosed(); checkClosed();
if (streamer == null) { if (streamer == null) {
return blockReplication; // no pipeline, return repl factor of file return blockReplication; // no pipeline, return repl factor of file
} }
@ -1759,7 +1760,7 @@ private void flushInternal() throws IOException {
long toWaitFor; long toWaitFor;
synchronized (this) { synchronized (this) {
dfsClient.checkOpen(); dfsClient.checkOpen();
isClosed(); checkClosed();
// //
// If there is data in the current buffer, send it across // If there is data in the current buffer, send it across
// //
@ -1776,7 +1777,7 @@ private void waitForAckedSeqno(long seqno) throws IOException {
} }
synchronized (dataQueue) { synchronized (dataQueue) {
while (!closed) { while (!closed) {
isClosed(); checkClosed();
if (lastAckedSeqno >= seqno) { if (lastAckedSeqno >= seqno) {
break; break;
} }
@ -1788,7 +1789,7 @@ private void waitForAckedSeqno(long seqno) throws IOException {
} }
} }
} }
isClosed(); checkClosed();
} }
private synchronized void start() { private synchronized void start() {

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
public class TestClose {
@Test
public void testWriteAfterClose() throws IOException {
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.build();
try {
final byte[] data = "foo".getBytes();
FileSystem fs = FileSystem.get(conf);
OutputStream out = fs.create(new Path("/test"));
out.write(data);
out.close();
try {
// Should fail.
out.write(data);
fail("Should not have been able to write more data after file is closed.");
} catch (ClosedChannelException cce) {
// We got the correct exception. Ignoring.
}
// Should succeed. Double closes are OK.
out.close();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -169,14 +170,11 @@ public void run() {
while (true) { while (true) {
try { try {
stm.hflush(); stm.hflush();
} catch (IOException ioe) { } catch (ClosedChannelException ioe) {
if (!ioe.toString().contains("DFSOutputStream is closed")) { // Expected exception caught. Ignoring.
throw ioe;
} else {
return; return;
} }
} }
}
} catch (Throwable t) { } catch (Throwable t) {
thrown.set(t); thrown.set(t);
} }