HDFS-6005. Simplify Datanode rollback and downgrade. (Contributed by Suresh Srinivas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1571431 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 2014-02-24 20:46:35 +00:00
parent 8e7a2b8d5d
commit dbf14320c0
11 changed files with 302 additions and 337 deletions

View File

@ -75,3 +75,7 @@ HDFS-5535 subtasks:
HDFS-5994. Fix TestDataNodeRollingUpgrade. (Arpit Agarwal via szetszwo)
HDFS-5999. Do not create rollback fsimage when it already exists. (jing9)
HDFS-6005. Simplify Datanode rollback and downgrade. (Suresh Srinivas via
Arpit Agarwal)

View File

@ -17,13 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@ -32,24 +29,15 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* One instance per block-pool/namespace on the DN, which handles the
@ -419,9 +407,9 @@ class BPOfferService {
*/
void signalRollingUpgrade(boolean inProgress) {
if (inProgress) {
dn.getFSDataset().enableDeleteToTrash(getBlockPoolId());
dn.getFSDataset().enableTrash(getBlockPoolId());
} else {
dn.getFSDataset().disableAndPurgeTrashStorage(getBlockPoolId());
dn.getFSDataset().restoreTrash(getBlockPoolId());
}
}
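
For orientation, the sketch below restates the decision made in signalRollingUpgrade() against the FsDatasetSpi methods this patch introduces. The helper class and method are hypothetical; only enableTrash() and restoreTrash() (replacing enableDeleteToTrash() and disableAndPurgeTrashStorage()) come from this change.
// Illustrative sketch only; the class and method are invented, the two
// dataset calls are the FsDatasetSpi methods introduced by this patch.
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
class RollingUpgradeSignalSketch {
  static void applyRollingUpgradeSignal(FsDatasetSpi<?> dataset, String bpid,
      boolean inProgress) {
    if (inProgress) {
      // Upgrade started: block deletions for this pool are diverted to a
      // per-block-pool trash directory so a later rollback can restore them.
      dataset.enableTrash(bpid);
    } else {
      // Upgrade no longer in progress: restore whatever is still in trash and
      // remove the trash directories (the old code purged trash here instead).
      dataset.restoreTrash(bpid);
    }
  }
}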

View File

@ -18,15 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
@ -40,7 +32,14 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.util.Daemon;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Manages storage for the set of BlockPoolSlices which share a particular
@ -174,7 +173,7 @@ public class BlockPoolSliceStorage extends Storage {
/**
* Format a block pool slice storage.
* @param sd the block pool storage
* @param bpSdir the block pool storage
* @param nsInfo the name space info
* @throws IOException Signals that an I/O exception has occurred.
*/
@ -212,7 +211,7 @@ public class BlockPoolSliceStorage extends Storage {
if (!blockpoolID.equals("") && !blockpoolID.equals(bpid)) {
throw new InconsistentFSStateException(storage,
"Unexepcted blockpoolID " + bpid + " . Expected " + blockpoolID);
"Unexpected blockpoolID " + bpid + ". Expected " + blockpoolID);
}
blockpoolID = bpid;
}
@ -236,7 +235,6 @@ public class BlockPoolSliceStorage extends Storage {
* Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime Regular
* startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
*
* @param dn DataNode to which this storage belongs to
* @param sd storage directory <SD>/current/<bpid>
* @param nsInfo namespace info
* @param startOpt startup option
@ -246,13 +244,13 @@ public class BlockPoolSliceStorage extends Storage {
NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
if (startOpt == StartupOption.ROLLBACK) {
doRollback(sd, nsInfo); // rollback if applicable
} else if (StartupOption.isRollingUpgradeRollback(startOpt)) {
File trashRoot = getTrashRootDir(sd);
int filesRestored =
trashRoot.exists() ? restoreBlockFilesFromTrash(trashRoot) : 0;
LOG.info("Restored " + filesRestored + " block files from trash.");
} else {
// Restore all the files in the trash. The restored files are retained
// during rolling upgrade rollback. They are deleted during rolling
// upgrade downgrade.
int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
LOG.info("Restored " + restored + " block files from trash.");
}
readProperties(sd);
checkVersionUpgradable(this.layoutVersion);
assert this.layoutVersion >= HdfsConstants.DATANODE_LAYOUT_VERSION
@ -335,7 +333,8 @@ public class BlockPoolSliceStorage extends Storage {
File bpTmpDir = bpSd.getPreviousTmp();
assert !bpTmpDir.exists() : "previous.tmp directory must not exist.";
// 2. Rename <SD>/curernt/<bpid>/current to <SD>/curernt/<bpid>/previous.tmp
// 2. Rename <SD>/current/<bpid>/current to
// <SD>/current/<bpid>/previous.tmp
rename(bpCurDir, bpTmpDir);
// 3. Create new <SD>/current with block files hardlinks and VERSION
@ -346,7 +345,8 @@ public class BlockPoolSliceStorage extends Storage {
this.cTime = nsInfo.getCTime();
writeProperties(bpSd);
// 4.rename <SD>/curernt/<bpid>/previous.tmp to <SD>/curernt/<bpid>/previous
// 4.rename <SD>/current/<bpid>/previous.tmp to
// <SD>/current/<bpid>/previous
rename(bpTmpDir, bpPrevDir);
LOG.info("Upgrade of block pool " + blockpoolID + " at " + bpSd.getRoot()
+ " is complete");
@ -380,15 +380,17 @@ public class BlockPoolSliceStorage extends Storage {
/**
* Restore all files from the trash directory to their corresponding
* locations under current/
*
* @param trashRoot
* @throws IOException
*/
private int restoreBlockFilesFromTrash(File trashRoot) throws IOException {
private int restoreBlockFilesFromTrash(File trashRoot)
throws IOException {
int filesRestored = 0;
File restoreDirectory = null;
File[] children = trashRoot.exists() ? trashRoot.listFiles() : null;
if (children == null) {
return 0;
}
for (File child : trashRoot.listFiles()) {
File restoreDirectory = null;
for (File child : children) {
if (child.isDirectory()) {
// Recurse to process subdirectories.
filesRestored += restoreBlockFilesFromTrash(child);
@ -408,7 +410,7 @@ public class BlockPoolSliceStorage extends Storage {
}
++filesRestored;
}
FileUtil.fullyDelete(trashRoot);
return filesRestored;
}
@ -527,9 +529,6 @@ public class BlockPoolSliceStorage extends Storage {
/**
* gets the data node storage directory based on block pool storage
*
* @param bpRoot
* @return
*/
private static String getDataNodeStorageRoot(String bpRoot) {
Matcher matcher = BLOCK_POOL_PATH_PATTERN.matcher(bpRoot);
@ -571,7 +570,6 @@ public class BlockPoolSliceStorage extends Storage {
* The subdirectory structure under trash/ mirrors that under current/ to keep
* implicit memory of where the files are to be restored (if necessary).
*
* @param blockFile
* @return the trash directory for a given block file that is being deleted.
*/
public String getTrashDirectory(File blockFile) {
@ -587,7 +585,6 @@ public class BlockPoolSliceStorage extends Storage {
* The subdirectory structure under trash/ mirrors that under current/ to keep
* implicit memory of where the files are to be restored.
*
* @param blockFile
* @return the target directory to restore a previously deleted block file.
*/
@VisibleForTesting
@ -601,9 +598,26 @@ public class BlockPoolSliceStorage extends Storage {
/**
* Restore files from the trash directories, then delete the trash directories.
*/
public void emptyTrash() {
public void restoreTrash() {
for (StorageDirectory sd : storageDirs) {
FileUtil.fullyDelete(getTrashRootDir(sd));
File trashRoot = getTrashRootDir(sd);
try {
restoreBlockFilesFromTrash(trashRoot);
FileUtil.fullyDelete(getTrashRootDir(sd));
} catch (IOException ioe) {
LOG.warn("Restoring trash failed for storage directory " + sd);
}
}
}
/** Trash is enabled if at least one storage directory contains a trash root. */
@VisibleForTesting
public boolean trashEnabled() {
for (StorageDirectory sd : storageDirs) {
if (getTrashRootDir(sd).exists()) {
return true;
}
}
return false;
}
}
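
The javadoc above notes that the layout under trash/ mirrors the layout under current/ so that files can be restored to their original locations. A small self-contained sketch of that mapping follows; the helper, class name and sample paths are invented, while the directory names "current" and "trash" match the ones BlockPoolSliceStorage uses.
// Illustrative only: the path mapping implied by getTrashDirectory() and
// getRestoreDirectory() above. The helper and sample paths are invented.
public class TrashPathExample {
  static String toTrashPath(String bpRoot, String blockFilePath) {
    // <bpRoot>/current/finalized/... -> <bpRoot>/trash/finalized/...
    return blockFilePath.replaceFirst(
        java.util.regex.Pattern.quote(bpRoot + "/current/"),
        java.util.regex.Matcher.quoteReplacement(bpRoot + "/trash/"));
  }

  public static void main(String[] args) {
    String bpRoot = "/data/dfs/dn/current/BP-1-127.0.0.1-1394000000000";
    String blk = bpRoot + "/current/finalized/subdir0/blk_1001";
    // Prints .../BP-1-127.0.0.1-1394000000000/trash/finalized/subdir0/blk_1001
    System.out.println(toTrashPath(bpRoot, blk));
    // restoreBlockFilesFromTrash() effectively reverses this mapping, which is
    // why the subdirectory structure must be preserved when files are trashed.
  }
}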

View File

@ -17,42 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -69,41 +37,17 @@ import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.net.DomainPeerServer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.*;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.protocolPB.*;
import org.apache.hadoop.hdfs.security.token.block.*;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
@ -115,11 +59,7 @@ import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMetho
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpConfig;
@ -142,21 +82,24 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
import javax.management.ObjectName;
import java.io.*;
import java.lang.management.ManagementFactory;
import java.net.*;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.util.ExitUtil.terminate;
/**********************************************************
* DataNode is a class (and program) that stores a set of
@ -1770,7 +1713,6 @@ public class DataNode extends Configured
}
if (!parseArguments(args, conf)) {
LOG.error("Bad command line arguments");
printUsage(System.err);
return null;
}
@ -1940,18 +1882,6 @@ public class DataNode extends Configured
startOpt = StartupOption.ROLLBACK;
} else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.REGULAR;
} else if (StartupOption.ROLLINGUPGRADE.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.ROLLINGUPGRADE;
if ((i < args.length ) &&
(args[i].equalsIgnoreCase(RollingUpgradeStartupOption.ROLLBACK.toString()))) {
startOpt.setRollingUpgradeStartupOption(args[i++]);
} else {
LOG.error("Missing or unrecognized option to " + StartupOption.ROLLINGUPGRADE);
return false;
}
LOG.info("Rolling upgrade rollback requested via startup option");
} else {
return false;
}
@ -2579,4 +2509,9 @@ public class DataNode extends Configured
boolean shouldRun() {
return shouldRun;
}
@VisibleForTesting
DataStorage getStorage() {
return storage;
}
}

View File

@ -18,22 +18,10 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -50,6 +38,11 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import java.io.*;
import java.nio.channels.FileLock;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* Data storage information file.
* <p>
@ -95,7 +88,7 @@ public class DataStorage extends Storage {
new ConcurrentHashMap<String, Boolean>());
}
public StorageInfo getBPStorage(String bpid) {
public BlockPoolSliceStorage getBPStorage(String bpid) {
return bpStorageMap.get(bpid);
}
@ -120,9 +113,6 @@ public class DataStorage extends Storage {
/**
* Enable trash for the specified block pool storage.
*
* @param bpid
* @param inProgress
*/
public void enableTrash(String bpid) {
if (trashEnabledBpids.add(bpid)) {
@ -130,18 +120,16 @@ public class DataStorage extends Storage {
}
}
/**
* Disable trash for the specified block pool storage.
* Existing files in trash are purged i.e. permanently deleted.
*
* @param bpid
* @param inProgress
*/
public void disableAndPurgeTrash(String bpid) {
if (trashEnabledBpids.remove(bpid)) {
LOG.info("Disabled trash for bpid " + bpid);
public void restoreTrash(String bpid) {
if (trashEnabledBpids.contains(bpid)) {
getBPStorage(bpid).restoreTrash();
trashEnabledBpids.remove(bpid);
LOG.info("Restored trash for bpid " + bpid);
}
((BlockPoolSliceStorage) getBPStorage(bpid)).emptyTrash();
}
public boolean trashEnabled(String bpid) {
return trashEnabledBpids.contains(bpid);
}
/**
@ -150,7 +138,6 @@ public class DataStorage extends Storage {
* 'trash' directory. If there is a subsequent rollback, then the block
* files will be restored from trash.
*
* @param blockFile
* @return trash directory if rolling upgrade is in progress, null
* otherwise.
*/
@ -242,7 +229,7 @@ public class DataStorage extends Storage {
// 3. Update all storages. Some of them might have just been formatted.
this.writeAll();
// 4. mark DN storage is initilized
// 4. mark DN storage is initialized
this.initialized = true;
}
@ -724,9 +711,11 @@ public class DataStorage extends Storage {
/*
* Finalize the upgrade for a block pool
* This also empties trash created during rolling upgrade and disables
* trash functionality.
*/
void finalizeUpgrade(String bpID) throws IOException {
// To handle finalizing a snapshot taken at datanode level while
// To handle finalizing a snapshot taken at datanode level while
// upgrading to federation, if datanode level snapshot previous exists,
// then finalize it. Else finalize the corresponding BP.
for (StorageDirectory sd : storageDirs) {
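
To show how the trash directory computed above is meant to be consumed, here is a hedged sketch of a block-deletion path. Only DataStorage#getTrashDirectoryForBlockFile(bpid, blockFile) is part of this patch (per the javadoc above it returns a trash directory while a rolling upgrade is in progress and null otherwise); the helper class and its placement are invented for illustration.
// Hypothetical sketch; assumes it lives in the same package as DataStorage.
// Only the getTrashDirectoryForBlockFile() call is real.
package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

class TrashAwareDeleteExample {
  static void deleteOrMoveToTrash(DataStorage storage, String bpid,
      File blockFile) throws IOException {
    String trashDir = storage.getTrashDirectoryForBlockFile(bpid, blockFile);
    if (trashDir != null) {
      // Rolling upgrade in progress: park the file so a rollback can restore it.
      File target = new File(trashDir, blockFile.getName());
      Files.createDirectories(target.getParentFile().toPath());
      Files.move(blockFile.toPath(), target.toPath());
    } else {
      // No rolling upgrade: delete immediately, as before this change.
      Files.deleteIfExists(blockFile.toPath());
    }
  }
}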

View File

@ -417,11 +417,16 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* moved to a separate trash directory instead of being deleted immediately.
* This can be useful for example during rolling upgrades.
*/
public void enableDeleteToTrash(String bpid);
public void enableTrash(String bpid);
/**
* Disable 'trash' for the given dataset and purge existing files in 'trash'.
* Restore files that were moved to 'trash' and remove the 'trash' directory.
*/
public void disableAndPurgeTrashStorage(String bpid);
public void restoreTrash(String bpid);
/**
* @return true when trash is enabled
*/
public boolean trashEnabled(String bpid);
}
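
A brief usage sketch of the contract declared above, roughly as a test or caller might drive it; the helper class and method are invented, the three interface methods are the ones added or renamed here.
// Illustrative lifecycle of the trash API; the sketch class itself is invented.
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
class TrashLifecycleSketch {
  static void exerciseTrash(FsDatasetSpi<?> dataset, String bpid) {
    dataset.enableTrash(bpid);            // rolling upgrade started
    assert dataset.trashEnabled(bpid);    // deletes for bpid now go to trash

    // ... block files deleted here are moved aside rather than removed ...

    dataset.restoreTrash(bpid);           // upgrade over: restore and clear trash
    assert !dataset.trashEnabled(bpid);   // trash is disabled again
  }
}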

View File

@ -17,27 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -45,37 +24,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -89,6 +43,15 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.Executor;
/**************************************************
* FSDataset manages a set of data blocks. Each block
* has a unique name and an extent on disk.
@ -1894,13 +1857,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override
public void enableDeleteToTrash(String bpid) {
public void enableTrash(String bpid) {
dataStorage.enableTrash(bpid);
}
@Override
public void disableAndPurgeTrashStorage(String bpid) {
dataStorage.disableAndPurgeTrash(bpid);
public void restoreTrash(String bpid) {
dataStorage.restoreTrash(bpid);
}
@Override
public boolean trashEnabled(String bpid) {
return dataStorage.trashEnabled(bpid);
}
@Override

View File

@ -17,10 +17,6 @@
*/
package org.apache.hadoop.hdfs;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -39,6 +35,10 @@ import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
/**
* This class tests rolling upgrade.
@ -46,8 +46,13 @@ import org.junit.Test;
public class TestRollingUpgrade {
private static final Log LOG = LogFactory.getLog(TestRollingUpgrade.class);
private void runCmd(DFSAdmin dfsadmin, String... args) throws Exception {
Assert.assertEquals(0, dfsadmin.run(args));
public static void runCmd(DFSAdmin dfsadmin, boolean success,
String... args) throws Exception {
if (success) {
Assert.assertEquals(0, dfsadmin.run(args));
} else {
Assert.assertTrue(dfsadmin.run(args) != 0);
}
}
/**
@ -71,30 +76,29 @@ public class TestRollingUpgrade {
final DFSAdmin dfsadmin = new DFSAdmin(conf);
dfs.mkdirs(foo);
{
//illegal argument
final String[] args = {"-rollingUpgrade", "abc"};
Assert.assertTrue(dfsadmin.run(args) != 0);
}
//illegal argument "abc" to rollingUpgrade option
runCmd(dfsadmin, false, "-rollingUpgrade", "abc");
//query rolling upgrade
runCmd(dfsadmin, "-rollingUpgrade");
runCmd(dfsadmin, true, "-rollingUpgrade");
//start rolling upgrade
runCmd(dfsadmin, "-rollingUpgrade", "start");
runCmd(dfsadmin, true, "-rollingUpgrade", "start");
//query rolling upgrade
runCmd(dfsadmin, "-rollingUpgrade", "query");
runCmd(dfsadmin, true, "-rollingUpgrade", "query");
dfs.mkdirs(bar);
//finalize rolling upgrade
runCmd(dfsadmin, "-rollingUpgrade", "finalize");
runCmd(dfsadmin, true, "-rollingUpgrade", "finalize");
dfs.mkdirs(baz);
runCmd(dfsadmin, "-rollingUpgrade");
runCmd(dfsadmin, true, "-rollingUpgrade");
// All directories created before the upgrade, while it was in progress, and
// after it was finalized should still exist
Assert.assertTrue(dfs.exists(foo));
Assert.assertTrue(dfs.exists(bar));
Assert.assertTrue(dfs.exists(baz));
@ -104,6 +108,7 @@ public class TestRollingUpgrade {
dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
}
// Ensure directories exist after restart
cluster.restartNameNode();
{
final DistributedFileSystem dfs = cluster.getFileSystem();

View File

@ -1055,13 +1055,17 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
public void enableDeleteToTrash(String bpid) {
public void enableTrash(String bpid) {
throw new UnsupportedOperationException();
}
@Override
public void disableAndPurgeTrashStorage(String bpid) {
// do nothing
public void restoreTrash(String bpid) {
}
@Override
public boolean trashEnabled(String bpid) {
return false;
}
@Override

View File

@ -20,23 +20,22 @@ package org.apache.hadoop.hdfs.server.datanode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.log4j.Level;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import static org.apache.hadoop.hdfs.MiniDFSCluster.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;
/**
* Ensure that the DataNode correctly handles rolling upgrade
@ -47,132 +46,190 @@ public class TestDataNodeRollingUpgrade {
private static final short REPL_FACTOR = 1;
private static final int BLOCK_SIZE = 1024 * 1024;
private static final long FILE_SIZE = BLOCK_SIZE * 4;
private static final long FILE_SIZE = BLOCK_SIZE;
private static final long SEED = 0x1BADF00DL;
Configuration conf;
MiniDFSCluster cluster = null;
DistributedFileSystem fs;
DistributedFileSystem fs = null;
DataNode dn = null;
NameNode nn = null;
String blockPoolId = null;
private void runCmd(DFSAdmin dfsadmin, String... args) throws Exception {
assertThat(dfsadmin.run(args), is(0));
private void startCluster() throws IOException {
conf = new HdfsConfiguration();
cluster = new Builder(conf).numDataNodes(REPL_FACTOR).build();
cluster.waitActive();
fs = cluster.getFileSystem();
nn = cluster.getNameNode(0);
assertNotNull(nn);
dn = cluster.getDataNodes().get(0);
assertNotNull(dn);
blockPoolId = cluster.getNameNode(0).getNamesystem().getBlockPoolId();
}
private void shutdownCluster() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
fs = null;
nn = null;
dn = null;
blockPoolId = null;
}
private void triggerHeartBeats() throws Exception {
// Sleep briefly so that DN learns of the rolling upgrade
// state and other states from heartbeats.
cluster.triggerHeartbeats();
Thread.sleep(5000);
}
/** Test assumes that the file has a single block */
private File getBlockForFile(Path path, boolean exists) throws IOException {
LocatedBlocks blocks = nn.getRpcServer().getBlockLocations(path.toString(),
0, Long.MAX_VALUE);
assertEquals(1, blocks.getLocatedBlocks().size());
ExtendedBlock block = blocks.getLocatedBlocks().get(0).getBlock();
BlockLocalPathInfo bInfo = dn.getFSDataset().getBlockLocalPathInfo(block);
File blockFile = new File(bInfo.getBlockPath());
assertEquals(exists, blockFile.exists());
return blockFile;
}
private File getTrashFileForBlock(File blockFile, boolean exists) {
File trashFile = new File(
dn.getStorage().getTrashDirectoryForBlockFile(blockPoolId, blockFile));
assertEquals(exists, trashFile.exists());
return trashFile;
}
/**
* Ensures that the blocks belonging to the deleted file are in trash
*/
private void deleteAndEnsureInTrash(Path pathToDelete,
File blockFile, File trashFile) throws Exception {
assertTrue(blockFile.exists());
assertFalse(trashFile.exists());
// Now delete the file and ensure the corresponding block lands in trash
LOG.info("Deleting file " + pathToDelete + " during rolling upgrade");
fs.delete(pathToDelete, false);
assert(!fs.exists(pathToDelete));
triggerHeartBeats();
assertTrue(trashFile.exists());
assertFalse(blockFile.exists());
}
private void ensureTrashDisabled() {
// Trash is disabled; trash root does not exist
assertFalse(dn.getFSDataset().trashEnabled(blockPoolId));
BlockPoolSliceStorage bps = dn.getStorage().getBPStorage(blockPoolId);
assertFalse(bps.trashEnabled());
}
/**
* Ensures that the blocks from trash are restored
*/
private void ensureTrashRestored(File blockFile, File trashFile)
throws Exception {
assertTrue(blockFile.exists());
assertFalse(trashFile.exists());
ensureTrashDisabled();
}
private void startRollingUpgrade() throws Exception {
LOG.info("Starting rolling upgrade");
final DFSAdmin dfsadmin = new DFSAdmin(conf);
runCmd(dfsadmin, "-rollingUpgrade", "start");
TestRollingUpgrade.runCmd(dfsadmin, true, "-rollingUpgrade", "start");
triggerHeartBeats();
// Ensure datanode rolling upgrade is started
assertTrue(dn.getFSDataset().trashEnabled(blockPoolId));
}
private void finalizeRollingUpgrade() throws Exception {
LOG.info("Finalizing rolling upgrade");
final DFSAdmin dfsadmin = new DFSAdmin(conf);
runCmd(dfsadmin, "-rollingUpgrade", "finalize");
TestRollingUpgrade.runCmd(dfsadmin, true, "-rollingUpgrade", "finalize");
triggerHeartBeats();
// Ensure datanode rolling upgrade is finalized and trash is disabled
assertFalse(dn.getFSDataset().trashEnabled(blockPoolId));
BlockPoolSliceStorage bps = dn.getStorage().getBPStorage(blockPoolId);
assertFalse(bps.trashEnabled());
}
private void rollbackRollingUpgrade() throws Exception {
// Shutdown datanodes and namenodes
// Restart the namenode with rolling upgrade rollback
LOG.info("Starting rollback of the rolling upgrade");
// Shutdown the DN and the NN in preparation for rollback.
DataNodeProperties dnprop = cluster.stopDataNode(0);
MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(0);
cluster.shutdownNameNodes();
// Restart the daemons with rollback flags.
cluster.restartNameNode("-rollingupgrade", "rollback");
dnprop.setDnArgs("-rollingupgrade", "rollback");
cluster.restartDataNode(dnprop);
cluster.waitActive();
nn = cluster.getNameNode(0);
dn = cluster.getDataNodes().get(0);
triggerHeartBeats();
}
@Test (timeout=600000)
public void testDatanodeRollingUpgradeWithFinalize() throws Exception {
// start a cluster
try {
// Start a cluster.
conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
cluster.waitActive();
fs = cluster.getFileSystem();
Path testFile1 = new Path("/TestDataNodeRollingUpgrade1.dat");
Path testFile2 = new Path("/TestDataNodeRollingUpgrade2.dat");
startCluster();
// Create files in DFS.
Path testFile1 = new Path("/TestDataNodeRollingUpgrade1.dat");
Path testFile2 = new Path("/TestDataNodeRollingUpgrade2.dat");
DFSTestUtil.createFile(fs, testFile1, FILE_SIZE, REPL_FACTOR, SEED);
DFSTestUtil.createFile(fs, testFile2, FILE_SIZE, REPL_FACTOR, SEED);
startRollingUpgrade();
// Sleep briefly so that DN learns of the rolling upgrade
// from heartbeats.
cluster.triggerHeartbeats();
Thread.sleep(5000);
fs.delete(testFile2, false);
// Sleep briefly so that block files can be moved to trash
// (this is scheduled for asynchronous execution).
cluster.triggerBlockReports();
Thread.sleep(5000);
File blockFile = getBlockForFile(testFile2, true);
File trashFile = getTrashFileForBlock(blockFile, false);
deleteAndEnsureInTrash(testFile2, blockFile, trashFile);
finalizeRollingUpgrade();
// Ensure that testFile2 stays deleted.
// Ensure that the deleted file testFile2 stays deleted after finalize
ensureTrashDisabled();
assert(!fs.exists(testFile2));
assert(fs.exists(testFile1));
} finally {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
shutdownCluster();
}
}
@Test (timeout=600000)
public void testDatanodeRollingUpgradeWithRollback() throws Exception {
// start a cluster
try {
// Start a cluster.
conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
cluster.waitActive();
fs = cluster.getFileSystem();
Path testFile1 = new Path("/TestDataNodeRollingUpgrade1.dat");
startCluster();
// Create files in DFS.
DFSTestUtil.createFile(fs, testFile1, BLOCK_SIZE, BLOCK_SIZE, FILE_SIZE, REPL_FACTOR, SEED);
Path testFile1 = new Path("/TestDataNodeRollingUpgrade1.dat");
DFSTestUtil.createFile(fs, testFile1, FILE_SIZE, REPL_FACTOR, SEED);
String fileContents1 = DFSTestUtil.readFile(fs, testFile1);
startRollingUpgrade();
// Sleep briefly so that DN learns of the rolling upgrade
// from heartbeats.
cluster.triggerHeartbeats();
Thread.sleep(5000);
LOG.info("Deleting file during rolling upgrade");
fs.delete(testFile1, false);
// Sleep briefly so that block files can be moved to trash
// (this is scheduled for asynchronous execution).
cluster.triggerBlockReports();
Thread.sleep(5000);
assert(!fs.exists(testFile1));
File blockFile = getBlockForFile(testFile1, true);
File trashFile = getTrashFileForBlock(blockFile, false);
deleteAndEnsureInTrash(testFile1, blockFile, trashFile);
// Now perform a rollback to restore DFS to the pre-upgrade state.
rollbackRollingUpgrade();
// Ensure that testFile1 was restored after the rollback.
// Ensure that block was restored from trash
ensureTrashRestored(blockFile, trashFile);
// Ensure that files exist and restored file contents are the same.
assert(fs.exists(testFile1));
String fileContents2 = DFSTestUtil.readFile(fs, testFile1);
// Ensure that file contents are the same.
assertThat(fileContents1, is(fileContents2));
} finally {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
shutdownCluster();
}
}
}

View File

@ -83,8 +83,6 @@ public class TestDatanodeStartupOptions {
checkExpected(true, StartupOption.REGULAR, conf, "-regular");
checkExpected(true, StartupOption.REGULAR, conf, "-REGULAR");
checkExpected(true, StartupOption.ROLLBACK, conf, "-rollback");
checkExpected(true, StartupOption.ROLLINGUPGRADE, conf, "-rollingupgrade", "rollback");
checkExpected(true, StartupOption.ROLLINGUPGRADE, conf, "-rollingupgraDE", "ROLLBack");
}
/**
@ -94,7 +92,5 @@ public class TestDatanodeStartupOptions {
public void testStartupFailure() {
checkExpected(false, StartupOption.REGULAR, conf, "unknownoption");
checkExpected(false, StartupOption.REGULAR, conf, "-regular -rollback");
checkExpected(false, StartupOption.REGULAR, conf, "-rollingupgrade", "downgrade");
checkExpected(false, StartupOption.REGULAR, conf, "-rollingupgrade", "unknownoption");
}
}