HDFS-4725. Merge r1470771 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1486144 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2013-05-24 18:15:42 +00:00
parent 57cf1e8fbf
commit 4a16d5b759
8 changed files with 146 additions and 112 deletions

View File

@ -276,6 +276,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4693. Some test cases in TestCheckpoint do not clean up after
themselves. (Arpit Agarwal, suresh via suresh)
HDFS-4725. Fix HDFS file handle leaks in FSEditLog, NameNode,
OfflineEditsBinaryLoader and some tests. (Chris Nauroth via szetszwo)
BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.

View File

@ -297,21 +297,23 @@ public class FSEditLog implements LogsPurgeable {
LOG.debug("Closing log when already closed");
return;
}
if (state == State.IN_SEGMENT) {
assert editLogStream != null;
waitForSyncToFinish();
endCurrentLogSegment(true);
}
if (journalSet != null && !journalSet.isEmpty()) {
try {
journalSet.close();
} catch (IOException ioe) {
LOG.warn("Error closing journalSet", ioe);
}
}
state = State.CLOSED;
try {
if (state == State.IN_SEGMENT) {
assert editLogStream != null;
waitForSyncToFinish();
endCurrentLogSegment(true);
}
} finally {
if (journalSet != null && !journalSet.isEmpty()) {
try {
journalSet.close();
} catch (IOException ioe) {
LOG.warn("Error closing journalSet", ioe);
}
}
state = State.CLOSED;
}
}
@ -563,6 +565,7 @@ public class FSEditLog implements LogsPurgeable {
"due to " + e.getMessage() + ". " +
"Unsynced transactions: " + (txid - synctxid);
LOG.fatal(msg, new Exception());
IOUtils.cleanup(LOG, journalSet);
terminate(1, msg);
}
} finally {
@ -586,6 +589,7 @@ public class FSEditLog implements LogsPurgeable {
"Could not sync enough journals to persistent storage. "
+ "Unsynced transactions: " + (txid - synctxid);
LOG.fatal(msg, new Exception());
IOUtils.cleanup(LOG, journalSet);
terminate(1, msg);
}
}

View File

@ -653,13 +653,14 @@ public class NameNode {
}
} catch (ServiceFailedException e) {
LOG.warn("Encountered exception while exiting state ", e);
}
stopCommonServices();
if (metrics != null) {
metrics.shutdown();
}
if (namesystem != null) {
namesystem.shutdown();
} finally {
stopCommonServices();
if (metrics != null) {
metrics.shutdown();
}
if (namesystem != null) {
namesystem.shutdown();
}
}
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.io.IOUtils;
/**
* OfflineEditsBinaryLoader loads edits from a binary edits file
@ -59,43 +60,49 @@ class OfflineEditsBinaryLoader implements OfflineEditsLoader {
*/
@Override
public void loadEdits() throws IOException {
visitor.start(inputStream.getVersion());
while (true) {
try {
FSEditLogOp op = inputStream.readOp();
if (op == null)
break;
if (fixTxIds) {
if (nextTxId <= 0) {
nextTxId = op.getTransactionId();
try {
visitor.start(inputStream.getVersion());
while (true) {
try {
FSEditLogOp op = inputStream.readOp();
if (op == null)
break;
if (fixTxIds) {
if (nextTxId <= 0) {
nextTxId = 1;
nextTxId = op.getTransactionId();
if (nextTxId <= 0) {
nextTxId = 1;
}
}
op.setTransactionId(nextTxId);
nextTxId++;
}
op.setTransactionId(nextTxId);
nextTxId++;
visitor.visitOp(op);
} catch (IOException e) {
if (!recoveryMode) {
// Tell the visitor to clean up, then re-throw the exception
LOG.error("Got IOException at position " +
inputStream.getPosition());
visitor.close(e);
throw e;
}
LOG.error("Got IOException while reading stream! Resyncing.", e);
inputStream.resync();
} catch (RuntimeException e) {
if (!recoveryMode) {
// Tell the visitor to clean up, then re-throw the exception
LOG.error("Got RuntimeException at position " +
inputStream.getPosition());
visitor.close(e);
throw e;
}
LOG.error("Got RuntimeException while reading stream! Resyncing.", e);
inputStream.resync();
}
visitor.visitOp(op);
} catch (IOException e) {
if (!recoveryMode) {
// Tell the visitor to clean up, then re-throw the exception
LOG.error("Got IOException at position " + inputStream.getPosition());
visitor.close(e);
throw e;
}
LOG.error("Got IOException while reading stream! Resyncing.", e);
inputStream.resync();
} catch (RuntimeException e) {
if (!recoveryMode) {
// Tell the visitor to clean up, then re-throw the exception
LOG.error("Got RuntimeException at position " + inputStream.getPosition());
visitor.close(e);
throw e;
}
LOG.error("Got RuntimeException while reading stream! Resyncing.", e);
inputStream.resync();
}
visitor.close(null);
} finally {
IOUtils.cleanup(LOG, inputStream);
}
visitor.close(null);
}
}
}

View File

@ -48,6 +48,8 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -91,6 +93,8 @@ import com.google.common.base.Joiner;
/** Utilities for HDFS tests */
public class DFSTestUtil {
private static final Log LOG = LogFactory.getLog(DFSTestUtil.class);
private static Random gen = new Random();
private static String[] dirNames = {
@ -710,7 +714,11 @@ public class DFSTestUtil {
File file = new File(filename);
DataInputStream in = new DataInputStream(new FileInputStream(file));
byte[] content = new byte[(int)file.length()];
in.readFully(content);
try {
in.readFully(content);
} finally {
IOUtils.cleanup(LOG, in);
}
return content;
}

View File

@ -634,44 +634,48 @@ public class TestDistributedFileSystem {
true);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build();
DistributedFileSystem fs = cluster.getFileSystem();
// Create two files
Path tmpFile1 = new Path("/tmpfile1.dat");
Path tmpFile2 = new Path("/tmpfile2.dat");
DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl);
DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl);
// Get locations of blocks of both files and concat together
BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024);
BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024);
BlockLocation[] blockLocs = (BlockLocation[]) ArrayUtils.addAll(blockLocs1,
blockLocs2);
// Fetch VolumeBlockLocations in batch
BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
.asList(blockLocs));
int counter = 0;
// Print out the list of ids received for each block
for (BlockStorageLocation l : locs) {
for (int i = 0; i < l.getVolumeIds().length; i++) {
VolumeId id = l.getVolumeIds()[i];
String name = l.getNames()[i];
if (id != null) {
System.out.println("Datanode " + name + " has block " + counter
+ " on volume id " + id.toString());
try {
DistributedFileSystem fs = cluster.getFileSystem();
// Create two files
Path tmpFile1 = new Path("/tmpfile1.dat");
Path tmpFile2 = new Path("/tmpfile2.dat");
DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl);
DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl);
// Get locations of blocks of both files and concat together
BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024);
BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024);
BlockLocation[] blockLocs = (BlockLocation[]) ArrayUtils.addAll(blockLocs1,
blockLocs2);
// Fetch VolumeBlockLocations in batch
BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
.asList(blockLocs));
int counter = 0;
// Print out the list of ids received for each block
for (BlockStorageLocation l : locs) {
for (int i = 0; i < l.getVolumeIds().length; i++) {
VolumeId id = l.getVolumeIds()[i];
String name = l.getNames()[i];
if (id != null) {
System.out.println("Datanode " + name + " has block " + counter
+ " on volume id " + id.toString());
}
}
counter++;
}
assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2,
locs.length);
for (BlockStorageLocation l : locs) {
assertEquals("Expected two replicas for each block", 2,
l.getVolumeIds().length);
for (int i = 0; i < l.getVolumeIds().length; i++) {
VolumeId id = l.getVolumeIds()[i];
String name = l.getNames()[i];
assertTrue("Expected block to be valid on datanode " + name,
id.isValid());
}
}
counter++;
}
assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2,
locs.length);
for (BlockStorageLocation l : locs) {
assertEquals("Expected two replicas for each block", 2,
l.getVolumeIds().length);
for (int i = 0; i < l.getVolumeIds().length; i++) {
VolumeId id = l.getVolumeIds()[i];
String name = l.getNames()[i];
assertTrue("Expected block to be valid on datanode " + name,
id.isValid());
}
} finally {
cluster.shutdown();
}
}
@ -686,27 +690,31 @@ public class TestDistributedFileSystem {
true);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build();
cluster.getDataNodes();
DistributedFileSystem fs = cluster.getFileSystem();
// Create a file
Path tmpFile = new Path("/tmpfile1.dat");
DFSTestUtil.createFile(fs, tmpFile, 1024, (short) 2, 0xDEADDEADl);
// Get locations of blocks of the file
BlockLocation[] blockLocs = fs.getFileBlockLocations(tmpFile, 0, 1024);
// Stop a datanode to simulate a failure
cluster.stopDataNode(0);
// Fetch VolumeBlockLocations
BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
.asList(blockLocs));
try {
cluster.getDataNodes();
DistributedFileSystem fs = cluster.getFileSystem();
// Create a file
Path tmpFile = new Path("/tmpfile1.dat");
DFSTestUtil.createFile(fs, tmpFile, 1024, (short) 2, 0xDEADDEADl);
// Get locations of blocks of the file
BlockLocation[] blockLocs = fs.getFileBlockLocations(tmpFile, 0, 1024);
// Stop a datanode to simulate a failure
cluster.stopDataNode(0);
// Fetch VolumeBlockLocations
BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
.asList(blockLocs));
assertEquals("Expected one HdfsBlockLocation for one 1-block file", 1,
locs.length);
assertEquals("Expected one HdfsBlockLocation for one 1-block file", 1,
locs.length);
for (BlockStorageLocation l : locs) {
assertEquals("Expected two replicas for each block", 2,
l.getVolumeIds().length);
assertTrue("Expected one valid and one invalid replica",
(l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid()));
for (BlockStorageLocation l : locs) {
assertEquals("Expected two replicas for each block", 2,
l.getVolumeIds().length);
assertTrue("Expected one valid and one invalid replica",
(l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid()));
}
} finally {
cluster.shutdown();
}
}

View File

@ -255,7 +255,6 @@ public class TestEditLogJournalFailures {
doThrow(new IOException("fail on setReadyToFlush()")).when(spyElos)
.setReadyToFlush();
}
doNothing().when(spyElos).abort();
}
private EditLogFileOutputStream spyOnStream(JournalAndStream jas) {

View File

@ -545,7 +545,11 @@ public class TestStartup {
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.build();
cluster.waitActive();
try {
cluster.waitActive();
} finally {
cluster.shutdown();
}
}
/**