HDFS-2878. Merge r1242995 from trunk to 0.23
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1298236 13f79535-47bb-0310-9956-ffa450edef68
parent 985eb07407
commit 1a6236752c
@@ -113,6 +113,9 @@ Release 0.23.3 - UNRELEASED
     HDFS-2410. Further cleanup of hardcoded configuration keys and values.
     (suresh)
 
+    HDFS-2878. Fix TestBlockRecovery and move it back into main test directory.
+    (todd)
+
   OPTIMIZATIONS
 
     HDFS-2477. Optimize computing the diff between a block report and the
     namenode state. (Tomasz Nykiel via hairong)
@@ -229,8 +229,7 @@ class BPOfferService implements Runnable {
 
   private void connectToNNAndHandshake() throws IOException {
     // get NN proxy
-    bpNamenode = new DatanodeProtocolClientSideTranslatorPB(nnAddr,
-        dn.getConf());
+    bpNamenode = dn.connectToNN(nnAddr);
 
     // First phase of the handshake with NN - get the namespace
     // info.
@@ -988,6 +988,14 @@ public class DataNode extends Configured
         SocketChannel.open().socket() : new Socket();
   }
 
+  /**
+   * Connect to the NN. This is separated out for easier testing.
+   */
+  DatanodeProtocolClientSideTranslatorPB connectToNN(
+      InetSocketAddress nnAddr) throws IOException {
+    return new DatanodeProtocolClientSideTranslatorPB(nnAddr, conf);
+  }
+
   public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
       DatanodeID datanodeid, final Configuration conf, final int socketTimeout)
       throws IOException {
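Note: the new package-private DataNode#connectToNN(), together with the BPOfferService change above, is a test seam: production code still constructs the real DatanodeProtocolClientSideTranslatorPB, while a test can subclass DataNode and hand back a fake without touching the network. A minimal, self-contained sketch of the pattern — SeamSketch, NNProxy, and DataNodeLike are hypothetical stand-ins, not Hadoop classes:

import java.io.IOException;
import java.net.InetSocketAddress;

public class SeamSketch {
  // Hypothetical stand-in for DatanodeProtocolClientSideTranslatorPB.
  interface NNProxy {}

  static class DataNodeLike {
    // Package-private factory, like DataNode#connectToNN above: production
    // code opens a real connection; a test overrides this to inject a fake.
    NNProxy connectToNN(InetSocketAddress nnAddr) throws IOException {
      throw new IOException("would dial " + nnAddr + " in production");
    }
  }

  public static void main(String[] args) throws IOException {
    final NNProxy fake = new NNProxy() {};
    // Anonymous subclass, mirroring `new DataNode(conf, dirs, null) {...}`
    // in the TestBlockRecovery hunk further down.
    DataNodeLike dn = new DataNodeLike() {
      @Override
      NNProxy connectToNN(InetSocketAddress nnAddr) {
        return fake; // no network I/O in the test
      }
    };
    System.out.println(dn.connectToNN(
        new InetSocketAddress("localhost", 5020)) == fake); // prints true
  }
}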
@@ -1936,8 +1944,10 @@ public class DataNode extends Configured
   public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid)
       throws IOException {
     BPOfferService bpos = blockPoolManager.get(bpid);
-    if(bpos == null || bpos.bpNamenode == null) {
-      throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
+    if (bpos == null) {
+      throw new IOException("No block pool offer service for bpid=" + bpid);
+    } else if (bpos.bpNamenode == null) {
+      throw new IOException("cannot find a namenode proxy for bpid=" + bpid);
     }
     return bpos.bpNamenode;
   }
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
@@ -39,23 +41,30 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.*;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -72,6 +81,8 @@ public class TestBlockRecovery {
   private final static long RECOVERY_ID = 3000L;
   private final static String CLUSTER_ID = "testClusterID";
   private final static String POOL_ID = "BP-TEST";
+  private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
+      "localhost", 5020);
   private final static long BLOCK_ID = 1000L;
   private final static long GEN_STAMP = 2000L;
   private final static long BLOCK_LEN = 3000L;
@@ -80,9 +91,6 @@
   private final static ExtendedBlock block = new ExtendedBlock(POOL_ID,
       BLOCK_ID, BLOCK_LEN, GEN_STAMP);
 
-  private final NamespaceInfo nsifno =
-      new NamespaceInfo(1,CLUSTER_ID, POOL_ID, 2, 3);
-
   static {
     ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
@@ -99,21 +107,54 @@
     conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
-    FileSystem.setDefaultUri(conf, "hdfs://localhost:5020");
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    FileSystem.setDefaultUri(conf,
+        "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
     ArrayList<File> dirs = new ArrayList<File>();
     File dataDir = new File(DATA_DIR);
     FileUtil.fullyDelete(dataDir);
     dataDir.mkdirs();
     dirs.add(dataDir);
-    DatanodeProtocol namenode = mock(DatanodeProtocol.class);
+    final DatanodeProtocolClientSideTranslatorPB namenode =
+        mock(DatanodeProtocolClientSideTranslatorPB.class);
+
+    Mockito.doAnswer(new Answer<DatanodeRegistration>() {
+      @Override
+      public DatanodeRegistration answer(InvocationOnMock invocation)
+          throws Throwable {
+        return (DatanodeRegistration) invocation.getArguments()[0];
+      }
+    }).when(namenode).registerDatanode(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.any(DatanodeStorage[].class));
+
     when(namenode.versionRequest()).thenReturn(new NamespaceInfo
         (1, CLUSTER_ID, POOL_ID, 1L, 1));
-    when(namenode.sendHeartbeat(any(DatanodeRegistration.class), anyLong(),
-        anyLong(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt()))
-        .thenReturn(new DatanodeCommand[0]);
-    dn = new DataNode(conf, dirs, null);
-
-    DataNodeTestUtils.setBPNamenodeByIndex(dn, nsifno, POOL_ID, namenode);
+    when(namenode.sendHeartbeat(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.any(StorageReport[].class),
+        Mockito.anyInt(),
+        Mockito.anyInt(),
+        Mockito.anyInt()))
+        .thenReturn(new DatanodeCommand[0]);
+
+    dn = new DataNode(conf, dirs, null) {
+      @Override
+      DatanodeProtocolClientSideTranslatorPB connectToNN(
+          InetSocketAddress nnAddr) throws IOException {
+        Assert.assertEquals(NN_ADDR, nnAddr);
+        return namenode;
+      }
+    };
+    dn.runDatanodeDaemon();
+    while (!dn.isDatanodeFullyStarted()) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        fail("Interrupted starting DN");
+      }
+    }
   }
 
   /**
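Note: the setUp() rewrite above uses Mockito's Answer so the mocked NameNode echoes the DatanodeRegistration back, mirroring a NameNode acking a registration; it then overrides connectToNN in an anonymous DataNode subclass and polls isDatanodeFullyStarted() instead of relying on the removed DataNodeTestUtils.setBPNamenodeByIndex. A minimal sketch of the echo-Answer pattern — Registrar is a hypothetical stand-in for the datanode protocol, not a Hadoop type:

import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class EchoAnswerSketch {
  // Hypothetical stand-in for the registerDatanode call.
  interface Registrar {
    String registerDatanode(String registration);
  }

  public static void main(String[] args) {
    Registrar namenode = Mockito.mock(Registrar.class);
    // Same Answer pattern as the registerDatanode stub above: hand the
    // first argument straight back to the caller.
    Mockito.doAnswer(new Answer<String>() {
      @Override
      public String answer(InvocationOnMock invocation) throws Throwable {
        return (String) invocation.getArguments()[0];
      }
    }).when(namenode).registerDatanode(Mockito.anyString());

    System.out.println(namenode.registerDatanode("dn-1")); // prints dn-1
  }
}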
@@ -355,9 +396,11 @@
 
   private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException {
     Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1);
+    DatanodeInfo mockOtherDN = new DatanodeInfo(
+        new DatanodeID("127.0.0.1", "storage-1234", 0, 0));
     DatanodeInfo[] locs = new DatanodeInfo[] {
       new DatanodeInfo(dn.getDNRegistrationForBP(block.getBlockPoolId())),
-      mock(DatanodeInfo.class) };
+      mockOtherDN };
     RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID);
     blocks.add(rBlock);
     return blocks;
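Note: initRecoveringBlocks() now builds a concrete DatanodeInfo in place of mock(DatanodeInfo.class). An unstubbed Mockito mock returns defaults (null for objects, 0 for primitives), so code that reads the location's fields sees nulls; a real instance carries usable values. A small sketch of that default behavior — Node is a hypothetical stand-in, not a Hadoop class:

import org.mockito.Mockito;

public class MockDefaultsSketch {
  static class Node {
    private final String name;
    Node(String name) { this.name = name; }
    String getName() { return name; }
  }

  public static void main(String[] args) {
    Node mocked = Mockito.mock(Node.class);
    Node real = new Node("127.0.0.1:0");
    System.out.println(mocked.getName()); // null — unstubbed mock default
    System.out.println(real.getName());   // 127.0.0.1:0
  }
}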
@@ -495,7 +538,8 @@
     ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block);
     BlockWriteStreams streams = null;
     try {
-      streams = replicaInfo.createStreams(true, 0, 0);
+      streams = replicaInfo.createStreams(true,
+          DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
       streams.checksumOut.write('a');
       dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
       try {
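Note: createStreams now takes a DataChecksum instead of the old raw int arguments; the test requests CHECKSUM_CRC32 with 512 bytes per checksum. For illustration only — DataChecksum is Hadoop-specific — this sketch computes the same checksum family over one 512-byte chunk with the JDK's CRC32:

import java.util.zip.CRC32;

public class ChecksumSketch {
  public static void main(String[] args) {
    // One bytes-per-checksum chunk of the size used above (512 bytes).
    byte[] chunk = new byte[512];
    CRC32 crc = new CRC32();
    crc.update(chunk, 0, chunk.length);
    System.out.printf("CRC32 of a zeroed 512-byte chunk: 0x%08x%n",
        crc.getValue());
  }
}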