HDFS-5115. Make StorageID a UUID.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1516666 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5e27288c1e
commit
6f699e8ea5
|
@ -23,13 +23,7 @@ import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.NavigableMap;
|
|
||||||
import java.util.TreeMap;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -54,15 +48,8 @@ import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet;
|
import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
|
import org.apache.hadoop.hdfs.server.protocol.*;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
|
||||||
import org.apache.hadoop.hdfs.util.CyclicIteration;
|
import org.apache.hadoop.hdfs.util.CyclicIteration;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
|
@ -657,24 +644,6 @@ public class DatanodeManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Generate new storage ID.
|
|
||||||
*
|
|
||||||
* @return unique storage ID
|
|
||||||
*
|
|
||||||
* Note: that collisions are still possible if somebody will try
|
|
||||||
* to bring in a data storage from a different cluster.
|
|
||||||
*/
|
|
||||||
private String newStorageID() {
|
|
||||||
String newID = null;
|
|
||||||
while(newID == null) {
|
|
||||||
newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
|
|
||||||
if (datanodeMap.get(newID) != null)
|
|
||||||
newID = null;
|
|
||||||
}
|
|
||||||
return newID;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register the given datanode with the namenode. NB: the given
|
* Register the given datanode with the namenode. NB: the given
|
||||||
* registration is mutated and given back to the datanode.
|
* registration is mutated and given back to the datanode.
|
||||||
|
@ -779,7 +748,7 @@ public class DatanodeManager {
|
||||||
if ("".equals(nodeReg.getStorageID())) {
|
if ("".equals(nodeReg.getStorageID())) {
|
||||||
// this data storage has never been registered
|
// this data storage has never been registered
|
||||||
// it is either empty or was created by pre-storageID version of DFS
|
// it is either empty or was created by pre-storageID version of DFS
|
||||||
nodeReg.setStorageID(newStorageID());
|
nodeReg.setStorageID(DatanodeStorage.newStorageID());
|
||||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug(
|
NameNode.stateChangeLog.debug(
|
||||||
"BLOCK* NameSystem.registerDatanode: "
|
"BLOCK* NameSystem.registerDatanode: "
|
||||||
|
|
|
@ -67,15 +67,7 @@ import java.net.UnknownHostException;
|
||||||
import java.nio.channels.ClosedByInterruptException;
|
import java.nio.channels.ClosedByInterruptException;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.AbstractList;
|
import java.util.*;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -138,12 +130,8 @@ import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
|
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
|
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.*;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
import org.apache.hadoop.hdfs.web.resources.Param;
|
||||||
import org.apache.hadoop.http.HttpServer;
|
import org.apache.hadoop.http.HttpServer;
|
||||||
|
@ -1041,30 +1029,7 @@ public class DataNode extends Configured
|
||||||
|
|
||||||
public static void setNewStorageID(DatanodeID dnId) {
|
public static void setNewStorageID(DatanodeID dnId) {
|
||||||
LOG.info("Datanode is " + dnId);
|
LOG.info("Datanode is " + dnId);
|
||||||
dnId.setStorageID(createNewStorageId(dnId.getXferPort()));
|
dnId.setStorageID(DatanodeStorage.newStorageID());
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return a unique storage ID of form "DS-randInt-ipaddr-port-timestamp"
|
|
||||||
*/
|
|
||||||
static String createNewStorageId(int port) {
|
|
||||||
// It is unlikely that we will create a non-unique storage ID
|
|
||||||
// for the following reasons:
|
|
||||||
// a) SecureRandom is a cryptographically strong random number generator
|
|
||||||
// b) IP addresses will likely differ on different hosts
|
|
||||||
// c) DataNode xfer ports will differ on the same host
|
|
||||||
// d) StorageIDs will likely be generated at different times (in ms)
|
|
||||||
// A conflict requires that all four conditions are violated.
|
|
||||||
// NB: The format of this string can be changed in the future without
|
|
||||||
// requiring that old SotrageIDs be updated.
|
|
||||||
String ip = "unknownIP";
|
|
||||||
try {
|
|
||||||
ip = DNS.getDefaultIP("default");
|
|
||||||
} catch (UnknownHostException ignored) {
|
|
||||||
LOG.warn("Could not find an IP address for the \"default\" inteface.");
|
|
||||||
}
|
|
||||||
int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE);
|
|
||||||
return "DS-" + rand + "-" + ip + "-" + port + "-" + Time.now();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Ensure the authentication method is kerberos */
|
/** Ensure the authentication method is kerberos */
|
||||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
|
@ -106,11 +107,11 @@ public class DataStorage extends Storage {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Create an ID for this storage. */
|
/** Create an ID for this storage. */
|
||||||
public synchronized void createStorageID(int datanodePort) {
|
public synchronized void createStorageID() {
|
||||||
if (storageID != null && !storageID.isEmpty()) {
|
if (storageID != null && !storageID.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
storageID = DataNode.createNewStorageId(datanodePort);
|
storageID = DatanodeStorage.newStorageID();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -194,7 +195,7 @@ public class DataStorage extends Storage {
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure we have storage id set - if not - generate new one
|
// make sure we have storage id set - if not - generate new one
|
||||||
createStorageID(datanode.getXferPort());
|
createStorageID();
|
||||||
|
|
||||||
// 3. Update all storages. Some of them might have just been formatted.
|
// 3. Update all storages. Some of them might have just been formatted.
|
||||||
this.writeAll();
|
this.writeAll();
|
||||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.protocol;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.StorageType;
|
import org.apache.hadoop.hdfs.StorageType;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class captures information of a storage in Datanode.
|
* Class captures information of a storage in Datanode.
|
||||||
*/
|
*/
|
||||||
|
@ -64,4 +66,14 @@ public class DatanodeStorage {
|
||||||
public StorageType getStorageType() {
|
public StorageType getStorageType() {
|
||||||
return storageType;
|
return storageType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate new storage ID. The format of this string can be changed
|
||||||
|
* in the future without requiring that old SotrageIDs be updated.
|
||||||
|
*
|
||||||
|
* @return unique storage ID
|
||||||
|
*/
|
||||||
|
public static String newStorageID() {
|
||||||
|
return "DS-" + UUID.randomUUID();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,11 +22,7 @@ import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Random;
|
|
||||||
|
|
||||||
import javax.management.NotCompliantMBeanException;
|
import javax.management.NotCompliantMBeanException;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
@ -74,7 +70,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
@Override
|
@Override
|
||||||
public SimulatedFSDataset newInstance(DataNode datanode,
|
public SimulatedFSDataset newInstance(DataNode datanode,
|
||||||
DataStorage storage, Configuration conf) throws IOException {
|
DataStorage storage, Configuration conf) throws IOException {
|
||||||
return new SimulatedFSDataset(datanode, storage, conf);
|
return new SimulatedFSDataset(storage, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -386,13 +382,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
private final SimulatedStorage storage;
|
private final SimulatedStorage storage;
|
||||||
private final String storageId;
|
private final String storageId;
|
||||||
|
|
||||||
public SimulatedFSDataset(DataNode datanode, DataStorage storage,
|
public SimulatedFSDataset(DataStorage storage, Configuration conf) {
|
||||||
Configuration conf) {
|
|
||||||
if (storage != null) {
|
if (storage != null) {
|
||||||
storage.createStorageID(datanode.getXferPort());
|
storage.createStorageID();
|
||||||
this.storageId = storage.getStorageID();
|
this.storageId = storage.getStorageID();
|
||||||
} else {
|
} else {
|
||||||
this.storageId = "unknownStorageId" + new Random().nextInt();
|
this.storageId = "unknownStorageId-" + UUID.randomUUID();
|
||||||
}
|
}
|
||||||
registerMBean(storageId);
|
registerMBean(storageId);
|
||||||
this.storage = new SimulatedStorage(
|
this.storage = new SimulatedStorage(
|
||||||
|
|
|
@ -102,7 +102,7 @@ public class TestBPOfferService {
|
||||||
.when(mockDn).getMetrics();
|
.when(mockDn).getMetrics();
|
||||||
|
|
||||||
// Set up a simulated dataset with our fake BP
|
// Set up a simulated dataset with our fake BP
|
||||||
mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, null, conf));
|
mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
|
||||||
mockFSDataset.addBlockPool(FAKE_BPID, conf);
|
mockFSDataset.addBlockPool(FAKE_BPID, conf);
|
||||||
|
|
||||||
// Wire the dataset to the DN.
|
// Wire the dataset to the DN.
|
||||||
|
|
|
@ -311,7 +311,7 @@ public class TestSimulatedFSDataset {
|
||||||
}
|
}
|
||||||
|
|
||||||
private SimulatedFSDataset getSimulatedFSDataset() {
|
private SimulatedFSDataset getSimulatedFSDataset() {
|
||||||
SimulatedFSDataset fsdataset = new SimulatedFSDataset(null, null, conf);
|
SimulatedFSDataset fsdataset = new SimulatedFSDataset(null, conf);
|
||||||
fsdataset.addBlockPool(bpid, conf);
|
fsdataset.addBlockPool(bpid, conf);
|
||||||
return fsdataset;
|
return fsdataset;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue