HDFS-7645. Rolling upgrade is restoring blocks from trash multiple times (Contributed by Vinayakumar B and Keisuke Ogiwara)

This commit is contained in:
Arpit Agarwal 2015-03-30 15:25:16 -07:00
parent cc0a01c503
commit 1a495fbb48
16 changed files with 119 additions and 64 deletions

View File

@ -359,6 +359,9 @@ Release 2.8.0 - UNRELEASED
HDFS-7890. Improve information on Top users for metrics in HDFS-7890. Improve information on Top users for metrics in
RollingWindowsManager and lower log level (J.Andreina via vinayakumarb) RollingWindowsManager and lower log level (J.Andreina via vinayakumarb)
HDFS-7645. Rolling upgrade is restoring blocks from trash multiple times.
(Vinayakumar B and Keisuke Ogiwara via Arpit Agarwal)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -29,12 +29,12 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class RollingUpgradeInfo extends RollingUpgradeStatus { public class RollingUpgradeInfo extends RollingUpgradeStatus {
private final long startTime; private final long startTime;
private final long finalizeTime; private long finalizeTime;
private boolean createdRollbackImages; private boolean createdRollbackImages;
public RollingUpgradeInfo(String blockPoolId, boolean createdRollbackImages, public RollingUpgradeInfo(String blockPoolId, boolean createdRollbackImages,
long startTime, long finalizeTime) { long startTime, long finalizeTime) {
super(blockPoolId); super(blockPoolId, finalizeTime != 0);
this.createdRollbackImages = createdRollbackImages; this.createdRollbackImages = createdRollbackImages;
this.startTime = startTime; this.startTime = startTime;
this.finalizeTime = finalizeTime; this.finalizeTime = finalizeTime;
@ -57,10 +57,22 @@ public class RollingUpgradeInfo extends RollingUpgradeStatus {
return startTime; return startTime;
} }
@Override
public boolean isFinalized() { public boolean isFinalized() {
return finalizeTime != 0; return finalizeTime != 0;
} }
/**
* Finalize the upgrade if not already finalized.
* @param finalizeTime the time at which the upgrade was finalized; a value
*                     of zero is ignored and leaves the state unchanged
*/
public void finalize(long finalizeTime) {
if (finalizeTime != 0) {
this.finalizeTime = finalizeTime;
createdRollbackImages = false;
}
}
public long getFinalizeTime() { public long getFinalizeTime() {
return finalizeTime; return finalizeTime;
} }

View File

@ -27,15 +27,21 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class RollingUpgradeStatus { public class RollingUpgradeStatus {
private final String blockPoolId; private final String blockPoolId;
private final boolean finalized;
public RollingUpgradeStatus(String blockPoolId) { public RollingUpgradeStatus(String blockPoolId, boolean finalized) {
this.blockPoolId = blockPoolId; this.blockPoolId = blockPoolId;
this.finalized = finalized;
} }
public String getBlockPoolId() { public String getBlockPoolId() {
return blockPoolId; return blockPoolId;
} }
public boolean isFinalized() {
return finalized;
}
@Override @Override
public int hashCode() { public int hashCode() {
return blockPoolId.hashCode(); return blockPoolId.hashCode();
@ -49,7 +55,8 @@ public class RollingUpgradeStatus {
return false; return false;
} }
final RollingUpgradeStatus that = (RollingUpgradeStatus) obj; final RollingUpgradeStatus that = (RollingUpgradeStatus) obj;
return this.blockPoolId.equals(that.blockPoolId); return this.blockPoolId.equals(that.blockPoolId)
&& this.isFinalized() == that.isFinalized();
} }
@Override @Override

View File

@ -1686,11 +1686,13 @@ public class PBHelper {
RollingUpgradeStatus status) { RollingUpgradeStatus status) {
return RollingUpgradeStatusProto.newBuilder() return RollingUpgradeStatusProto.newBuilder()
.setBlockPoolId(status.getBlockPoolId()) .setBlockPoolId(status.getBlockPoolId())
.setFinalized(status.isFinalized())
.build(); .build();
} }
public static RollingUpgradeStatus convert(RollingUpgradeStatusProto proto) { public static RollingUpgradeStatus convert(RollingUpgradeStatusProto proto) {
return new RollingUpgradeStatus(proto.getBlockPoolId()); return new RollingUpgradeStatus(proto.getBlockPoolId(),
proto.getFinalized());
} }
public static RollingUpgradeInfoProto convert(RollingUpgradeInfo info) { public static RollingUpgradeInfoProto convert(RollingUpgradeInfo info) {

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.*; import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@ -470,15 +471,19 @@ class BPOfferService {
/** /**
* Signal the current rolling upgrade status as indicated by the NN. * Signal the current rolling upgrade status as indicated by the NN.
* @param inProgress true if a rolling upgrade is in progress * @param rollingUpgradeStatus rolling upgrade status
*/ */
void signalRollingUpgrade(boolean inProgress) throws IOException { void signalRollingUpgrade(RollingUpgradeStatus rollingUpgradeStatus)
throws IOException {
if (rollingUpgradeStatus == null) {
return;
}
String bpid = getBlockPoolId(); String bpid = getBlockPoolId();
if (inProgress) { if (!rollingUpgradeStatus.isFinalized()) {
dn.getFSDataset().enableTrash(bpid); dn.getFSDataset().enableTrash(bpid);
dn.getFSDataset().setRollingUpgradeMarker(bpid); dn.getFSDataset().setRollingUpgradeMarker(bpid);
} else { } else {
dn.getFSDataset().restoreTrash(bpid); dn.getFSDataset().clearTrash(bpid);
dn.getFSDataset().clearRollingUpgradeMarker(bpid); dn.getFSDataset().clearRollingUpgradeMarker(bpid);
} }
} }

View File

@ -662,7 +662,7 @@ class BPServiceActor implements Runnable {
" in HeartbeatResponse. Expected " + " in HeartbeatResponse. Expected " +
bpos.getBlockPoolId()); bpos.getBlockPoolId());
} else { } else {
bpos.signalRollingUpgrade(rollingUpgradeStatus != null); bpos.signalRollingUpgrade(rollingUpgradeStatus);
} }
} }

View File

@ -351,7 +351,8 @@ public class BlockPoolSliceStorage extends Storage {
sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " + sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
" both be present."); " both be present.");
doRollback(sd, nsInfo); // rollback if applicable doRollback(sd, nsInfo); // rollback if applicable
} else { } else if (startOpt == StartupOption.ROLLBACK &&
!sd.getPreviousDir().exists()) {
// Restore all the files in the trash. The restored files are retained // Restore all the files in the trash. The restored files are retained
// during rolling upgrade rollback. They are deleted during rolling // during rolling upgrade rollback. They are deleted during rolling
// upgrade downgrade. // upgrade downgrade.
@ -378,6 +379,12 @@ public class BlockPoolSliceStorage extends Storage {
&& this.cTime == nsInfo.getCTime()) { && this.cTime == nsInfo.getCTime()) {
return; // regular startup return; // regular startup
} }
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
LOG.info("Restored " + restored + " block files from trash " +
"before the layout upgrade. These blocks will be moved to " +
"the previous directory during the upgrade");
}
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) { || this.cTime < nsInfo.getCTime()) {
doUpgrade(datanode, sd, nsInfo); // upgrade doUpgrade(datanode, sd, nsInfo); // upgrade
@ -730,16 +737,12 @@ public class BlockPoolSliceStorage extends Storage {
/** /**
* Delete all files and directories in the trash directories. * Delete all files and directories in the trash directories.
*/ */
public void restoreTrash() { public void clearTrash() {
for (StorageDirectory sd : storageDirs) { for (StorageDirectory sd : storageDirs) {
File trashRoot = getTrashRootDir(sd); File trashRoot = getTrashRootDir(sd);
try {
Preconditions.checkState(!(trashRoot.exists() && sd.getPreviousDir().exists())); Preconditions.checkState(!(trashRoot.exists() && sd.getPreviousDir().exists()));
restoreBlockFilesFromTrash(trashRoot); FileUtil.fullyDelete(trashRoot);
FileUtil.fullyDelete(getTrashRootDir(sd)); LOG.info("Cleared trash for storage directory " + sd);
} catch (IOException ioe) {
LOG.warn("Restoring trash failed for storage directory " + sd);
}
} }
} }

View File

@ -168,11 +168,11 @@ public class DataStorage extends Storage {
} }
} }
public void restoreTrash(String bpid) { public void clearTrash(String bpid) {
if (trashEnabledBpids.contains(bpid)) { if (trashEnabledBpids.contains(bpid)) {
getBPStorage(bpid).restoreTrash(); getBPStorage(bpid).clearTrash();
trashEnabledBpids.remove(bpid); trashEnabledBpids.remove(bpid);
LOG.info("Restored trash for bpid " + bpid); LOG.info("Cleared trash for bpid " + bpid);
} }
} }

View File

@ -490,9 +490,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
public void enableTrash(String bpid); public void enableTrash(String bpid);
/** /**
* Restore trash * Clear trash
*/ */
public void restoreTrash(String bpid); public void clearTrash(String bpid);
/** /**
* @return true when trash is enabled * @return true when trash is enabled

View File

@ -2619,8 +2619,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
@Override @Override
public void restoreTrash(String bpid) { public void clearTrash(String bpid) {
dataStorage.restoreTrash(bpid); dataStorage.clearTrash(bpid);
} }
@Override @Override

View File

@ -7568,7 +7568,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/** Is rolling upgrade in progress? */ /** Is rolling upgrade in progress? */
public boolean isRollingUpgrade() { public boolean isRollingUpgrade() {
return rollingUpgradeInfo != null; return rollingUpgradeInfo != null && !rollingUpgradeInfo.isFinalized();
} }
void checkRollingUpgrade(String action) throws RollingUpgradeException { void checkRollingUpgrade(String action) throws RollingUpgradeException {
@ -7583,7 +7583,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkSuperuserPrivilege(); checkSuperuserPrivilege();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
writeLock(); writeLock();
final RollingUpgradeInfo returnInfo;
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
if (!isRollingUpgrade()) { if (!isRollingUpgrade()) {
@ -7591,8 +7590,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} }
checkNameNodeSafeMode("Failed to finalize rolling upgrade"); checkNameNodeSafeMode("Failed to finalize rolling upgrade");
returnInfo = finalizeRollingUpgradeInternal(now()); finalizeRollingUpgradeInternal(now());
getEditLog().logFinalizeRollingUpgrade(returnInfo.getFinalizeTime()); getEditLog().logFinalizeRollingUpgrade(rollingUpgradeInfo.getFinalizeTime());
if (haEnabled) { if (haEnabled) {
// roll the edit log to make sure the standby NameNode can tail // roll the edit log to make sure the standby NameNode can tail
getFSImage().rollEditLog(); getFSImage().rollEditLog();
@ -7612,14 +7611,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (auditLog.isInfoEnabled() && isExternalInvocation()) { if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(true, "finalizeRollingUpgrade", null, null, null); logAuditEvent(true, "finalizeRollingUpgrade", null, null, null);
} }
return returnInfo; return rollingUpgradeInfo;
} }
RollingUpgradeInfo finalizeRollingUpgradeInternal(long finalizeTime) void finalizeRollingUpgradeInternal(long finalizeTime) {
throws RollingUpgradeException { // Set the finalize time
final long startTime = rollingUpgradeInfo.getStartTime(); rollingUpgradeInfo.finalize(finalizeTime);
rollingUpgradeInfo = null;
return new RollingUpgradeInfo(blockPoolId, false, startTime, finalizeTime);
} }
long addCacheDirective(CacheDirectiveInfo directive, long addCacheDirective(CacheDirectiveInfo directive,

View File

@ -607,4 +607,5 @@ message SnapshotInfoProto {
*/ */
message RollingUpgradeStatusProto { message RollingUpgradeStatusProto {
required string blockPoolId = 1; required string blockPoolId = 1;
optional bool finalized = 2 [default = false];
} }

View File

@ -79,6 +79,9 @@
<button type="button" class="close" data-dismiss="alert" aria-hidden="true">&times;</button> <button type="button" class="close" data-dismiss="alert" aria-hidden="true">&times;</button>
{#RollingUpgradeStatus} {#RollingUpgradeStatus}
{@if cond="{finalizeTime} > 0"}
<p>Rolling upgrade finalized at {#helper_date_tostring value="{finalizeTime}"/}. </p>
{:else}
<p>Rolling upgrade started at {#helper_date_tostring value="{startTime}"/}. </br> <p>Rolling upgrade started at {#helper_date_tostring value="{startTime}"/}. </br>
{#createdRollbackImages} {#createdRollbackImages}
Rollback image has been created. Proceed to upgrade daemons. Rollback image has been created. Proceed to upgrade daemons.
@ -86,6 +89,7 @@
Rollback image has not been created. Rollback image has not been created.
{/createdRollbackImages} {/createdRollbackImages}
</p> </p>
{/if}
{/RollingUpgradeStatus} {/RollingUpgradeStatus}
{@if cond="{DistinctVersionCount} > 1"} {@if cond="{DistinctVersionCount} > 1"}

View File

@ -1226,7 +1226,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
@Override @Override
public void restoreTrash(String bpid) { public void clearTrash(String bpid) {
} }
@Override @Override

View File

@ -19,12 +19,7 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -43,7 +38,9 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.Builder; import org.apache.hadoop.hdfs.MiniDFSCluster.Builder;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.TestRollingUpgrade; import org.apache.hadoop.hdfs.TestRollingUpgrade;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -208,7 +205,34 @@ public class TestDataNodeRollingUpgrade {
public void testDatanodeRollingUpgradeWithFinalize() throws Exception { public void testDatanodeRollingUpgradeWithFinalize() throws Exception {
try { try {
startCluster(); startCluster();
rollingUpgradeAndFinalize();
// Do it again
rollingUpgradeAndFinalize();
} finally {
shutdownCluster();
}
}
@Test(timeout = 600000)
public void testDatanodeRUwithRegularUpgrade() throws Exception {
try {
startCluster();
rollingUpgradeAndFinalize();
DataNodeProperties dn = cluster.stopDataNode(0);
cluster.restartNameNode(0, true, "-upgrade");
cluster.restartDataNode(dn, true);
cluster.waitActive();
fs = cluster.getFileSystem(0);
Path testFile3 = new Path("/" + GenericTestUtils.getMethodName()
+ ".03.dat");
DFSTestUtil.createFile(fs, testFile3, FILE_SIZE, REPL_FACTOR, SEED);
cluster.getFileSystem().finalizeUpgrade();
} finally {
shutdownCluster();
}
}
private void rollingUpgradeAndFinalize() throws IOException, Exception {
// Create files in DFS. // Create files in DFS.
Path testFile1 = new Path("/" + GenericTestUtils.getMethodName() + ".01.dat"); Path testFile1 = new Path("/" + GenericTestUtils.getMethodName() + ".01.dat");
Path testFile2 = new Path("/" + GenericTestUtils.getMethodName() + ".02.dat"); Path testFile2 = new Path("/" + GenericTestUtils.getMethodName() + ".02.dat");
@ -218,6 +242,7 @@ public class TestDataNodeRollingUpgrade {
startRollingUpgrade(); startRollingUpgrade();
File blockFile = getBlockForFile(testFile2, true); File blockFile = getBlockForFile(testFile2, true);
File trashFile = getTrashFileForBlock(blockFile, false); File trashFile = getTrashFileForBlock(blockFile, false);
cluster.triggerBlockReports();
deleteAndEnsureInTrash(testFile2, blockFile, trashFile); deleteAndEnsureInTrash(testFile2, blockFile, trashFile);
finalizeRollingUpgrade(); finalizeRollingUpgrade();
@ -225,10 +250,6 @@ public class TestDataNodeRollingUpgrade {
assertFalse(isTrashRootPresent()); assertFalse(isTrashRootPresent());
assert(!fs.exists(testFile2)); assert(!fs.exists(testFile2));
assert(fs.exists(testFile1)); assert(fs.exists(testFile1));
} finally {
shutdownCluster();
}
} }
@Test (timeout=600000) @Test (timeout=600000)

View File

@ -306,7 +306,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
} }
@Override @Override
public void restoreTrash(String bpid) { public void clearTrash(String bpid) {
} }