HDFS-7645. Rolling upgrade is restoring blocks from trash multiple times (Contributed by Vinayakumar B and Keisuke Ogiwara)
(cherry picked from commit abf3ad988d)
This commit is contained in:
parent
fe874e9f69
commit
3a5b9f49df
|
@ -20,6 +20,9 @@ Release 2.7.2 - UNRELEASED
|
|||
|
||||
HDFS-8722. Optimize datanode writes for small writes and flushes (kihwal)
|
||||
|
||||
HDFS-7645. Rolling upgrade is restoring blocks from trash multiple times.
|
||||
(Vinayakumar B and Keisuke Ogiwara via Arpit Agarwal)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HDFS-6945. BlockManager should remove a block from excessReplicateMap and
|
||||
|
|
|
@ -29,12 +29,12 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
@InterfaceStability.Evolving
|
||||
public class RollingUpgradeInfo extends RollingUpgradeStatus {
|
||||
private final long startTime;
|
||||
private final long finalizeTime;
|
||||
private long finalizeTime;
|
||||
private boolean createdRollbackImages;
|
||||
|
||||
public RollingUpgradeInfo(String blockPoolId, boolean createdRollbackImages,
|
||||
long startTime, long finalizeTime) {
|
||||
super(blockPoolId);
|
||||
super(blockPoolId, finalizeTime != 0);
|
||||
this.createdRollbackImages = createdRollbackImages;
|
||||
this.startTime = startTime;
|
||||
this.finalizeTime = finalizeTime;
|
||||
|
@ -57,10 +57,22 @@ public class RollingUpgradeInfo extends RollingUpgradeStatus {
|
|||
return startTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFinalized() {
|
||||
return finalizeTime != 0;
|
||||
}
|
||||
|
||||
/**
|
||||
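* Finalize the upgrade: record the given finalize time and reset the createdRollbackImages flag. A finalizeTime of 0 is ignored.
|
||||
* @param finalizeTime the finalize time to record; 0 leaves the state unchanged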
|
||||
*/
|
||||
public void finalize(long finalizeTime) {
|
||||
if (finalizeTime != 0) {
|
||||
this.finalizeTime = finalizeTime;
|
||||
createdRollbackImages = false;
|
||||
}
|
||||
}
|
||||
|
||||
public long getFinalizeTime() {
|
||||
return finalizeTime;
|
||||
}
|
||||
|
|
|
@ -27,15 +27,21 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
@InterfaceStability.Evolving
|
||||
public class RollingUpgradeStatus {
|
||||
private final String blockPoolId;
|
||||
private final boolean finalized;
|
||||
|
||||
public RollingUpgradeStatus(String blockPoolId) {
|
||||
public RollingUpgradeStatus(String blockPoolId, boolean finalized) {
|
||||
this.blockPoolId = blockPoolId;
|
||||
this.finalized = finalized;
|
||||
}
|
||||
|
||||
public String getBlockPoolId() {
|
||||
return blockPoolId;
|
||||
}
|
||||
|
||||
public boolean isFinalized() {
|
||||
return finalized;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return blockPoolId.hashCode();
|
||||
|
@ -49,7 +55,8 @@ public class RollingUpgradeStatus {
|
|||
return false;
|
||||
}
|
||||
final RollingUpgradeStatus that = (RollingUpgradeStatus) obj;
|
||||
return this.blockPoolId.equals(that.blockPoolId);
|
||||
return this.blockPoolId.equals(that.blockPoolId)
|
||||
&& this.isFinalized() == that.isFinalized();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1687,11 +1687,13 @@ public class PBHelper {
|
|||
RollingUpgradeStatus status) {
|
||||
return RollingUpgradeStatusProto.newBuilder()
|
||||
.setBlockPoolId(status.getBlockPoolId())
|
||||
.setFinalized(status.isFinalized())
|
||||
.build();
|
||||
}
|
||||
|
||||
public static RollingUpgradeStatus convert(RollingUpgradeStatusProto proto) {
|
||||
return new RollingUpgradeStatus(proto.getBlockPoolId());
|
||||
return new RollingUpgradeStatus(proto.getBlockPoolId(),
|
||||
proto.getFinalized());
|
||||
}
|
||||
|
||||
public static RollingUpgradeInfoProto convert(RollingUpgradeInfo info) {
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
|||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
|
||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.protocol.*;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
|
||||
|
@ -470,15 +471,19 @@ class BPOfferService {
|
|||
|
||||
/**
|
||||
* Signal the current rolling upgrade status as indicated by the NN.
|
||||
* @param inProgress true if a rolling upgrade is in progress
|
||||
* @param rollingUpgradeStatus rolling upgrade status indicated by the NN; ignored if null
|
||||
*/
|
||||
void signalRollingUpgrade(boolean inProgress) throws IOException {
|
||||
void signalRollingUpgrade(RollingUpgradeStatus rollingUpgradeStatus)
|
||||
throws IOException {
|
||||
if (rollingUpgradeStatus == null) {
|
||||
return;
|
||||
}
|
||||
String bpid = getBlockPoolId();
|
||||
if (inProgress) {
|
||||
if (!rollingUpgradeStatus.isFinalized()) {
|
||||
dn.getFSDataset().enableTrash(bpid);
|
||||
dn.getFSDataset().setRollingUpgradeMarker(bpid);
|
||||
} else {
|
||||
dn.getFSDataset().restoreTrash(bpid);
|
||||
dn.getFSDataset().clearTrash(bpid);
|
||||
dn.getFSDataset().clearRollingUpgradeMarker(bpid);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -613,7 +613,7 @@ class BPServiceActor implements Runnable {
|
|||
" in HeartbeatResponse. Expected " +
|
||||
bpos.getBlockPoolId());
|
||||
} else {
|
||||
bpos.signalRollingUpgrade(rollingUpgradeStatus != null);
|
||||
bpos.signalRollingUpgrade(rollingUpgradeStatus);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -351,7 +351,8 @@ public class BlockPoolSliceStorage extends Storage {
|
|||
sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
|
||||
" both be present.");
|
||||
doRollback(sd, nsInfo); // rollback if applicable
|
||||
} else {
|
||||
} else if (startOpt == StartupOption.ROLLBACK &&
|
||||
!sd.getPreviousDir().exists()) {
|
||||
// Restore all the files in the trash. The restored files are retained
|
||||
// during rolling upgrade rollback. They are deleted during rolling
|
||||
// upgrade downgrade.
|
||||
|
@ -378,6 +379,12 @@ public class BlockPoolSliceStorage extends Storage {
|
|||
&& this.cTime == nsInfo.getCTime()) {
|
||||
return; // regular startup
|
||||
}
|
||||
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
|
||||
int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
|
||||
LOG.info("Restored " + restored + " block files from trash " +
|
||||
"before the layout upgrade. These blocks will be moved to " +
|
||||
"the previous directory during the upgrade");
|
||||
}
|
||||
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION
|
||||
|| this.cTime < nsInfo.getCTime()) {
|
||||
doUpgrade(datanode, sd, nsInfo); // upgrade
|
||||
|
@ -730,16 +737,12 @@ public class BlockPoolSliceStorage extends Storage {
|
|||
/**
|
||||
* Delete all files and directories in the trash directories.
|
||||
*/
|
||||
public void restoreTrash() {
|
||||
public void clearTrash() {
|
||||
for (StorageDirectory sd : storageDirs) {
|
||||
File trashRoot = getTrashRootDir(sd);
|
||||
try {
|
||||
Preconditions.checkState(!(trashRoot.exists() && sd.getPreviousDir().exists()));
|
||||
restoreBlockFilesFromTrash(trashRoot);
|
||||
FileUtil.fullyDelete(getTrashRootDir(sd));
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Restoring trash failed for storage directory " + sd);
|
||||
}
|
||||
FileUtil.fullyDelete(trashRoot);
|
||||
LOG.info("Cleared trash for storage directory " + sd);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -168,11 +168,11 @@ public class DataStorage extends Storage {
|
|||
}
|
||||
}
|
||||
|
||||
public void restoreTrash(String bpid) {
|
||||
public void clearTrash(String bpid) {
|
||||
if (trashEnabledBpids.contains(bpid)) {
|
||||
getBPStorage(bpid).restoreTrash();
|
||||
getBPStorage(bpid).clearTrash();
|
||||
trashEnabledBpids.remove(bpid);
|
||||
LOG.info("Restored trash for bpid " + bpid);
|
||||
LOG.info("Cleared trash for bpid " + bpid);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -490,9 +490,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
public void enableTrash(String bpid);
|
||||
|
||||
/**
|
||||
* Restore trash
|
||||
* Clear trash
|
||||
*/
|
||||
public void restoreTrash(String bpid);
|
||||
public void clearTrash(String bpid);
|
||||
|
||||
/**
|
||||
* @return true when trash is enabled
|
||||
|
|
|
@ -2637,8 +2637,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void restoreTrash(String bpid) {
|
||||
dataStorage.restoreTrash(bpid);
|
||||
public void clearTrash(String bpid) {
|
||||
dataStorage.clearTrash(bpid);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -7615,7 +7615,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
|
||||
/** Is rolling upgrade in progress? */
|
||||
public boolean isRollingUpgrade() {
|
||||
return rollingUpgradeInfo != null;
|
||||
return rollingUpgradeInfo != null && !rollingUpgradeInfo.isFinalized();
|
||||
}
|
||||
|
||||
void checkRollingUpgrade(String action) throws RollingUpgradeException {
|
||||
|
@ -7630,7 +7630,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
checkSuperuserPrivilege();
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
writeLock();
|
||||
final RollingUpgradeInfo returnInfo;
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
if (!isRollingUpgrade()) {
|
||||
|
@ -7638,8 +7637,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
}
|
||||
checkNameNodeSafeMode("Failed to finalize rolling upgrade");
|
||||
|
||||
returnInfo = finalizeRollingUpgradeInternal(now());
|
||||
getEditLog().logFinalizeRollingUpgrade(returnInfo.getFinalizeTime());
|
||||
finalizeRollingUpgradeInternal(now());
|
||||
getEditLog().logFinalizeRollingUpgrade(rollingUpgradeInfo.getFinalizeTime());
|
||||
if (haEnabled) {
|
||||
// roll the edit log to make sure the standby NameNode can tail
|
||||
getFSImage().rollEditLog();
|
||||
|
@ -7659,14 +7658,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
|
||||
logAuditEvent(true, "finalizeRollingUpgrade", null, null, null);
|
||||
}
|
||||
return returnInfo;
|
||||
return rollingUpgradeInfo;
|
||||
}
|
||||
|
||||
RollingUpgradeInfo finalizeRollingUpgradeInternal(long finalizeTime)
|
||||
throws RollingUpgradeException {
|
||||
final long startTime = rollingUpgradeInfo.getStartTime();
|
||||
rollingUpgradeInfo = null;
|
||||
return new RollingUpgradeInfo(blockPoolId, false, startTime, finalizeTime);
|
||||
void finalizeRollingUpgradeInternal(long finalizeTime) {
|
||||
// Set the finalize time
|
||||
rollingUpgradeInfo.finalize(finalizeTime);
|
||||
}
|
||||
|
||||
long addCacheDirective(CacheDirectiveInfo directive,
|
||||
|
|
|
@ -607,4 +607,5 @@ message SnapshotInfoProto {
|
|||
*/
|
||||
message RollingUpgradeStatusProto {
|
||||
required string blockPoolId = 1;
|
||||
optional bool finalized = 2 [default = false];
|
||||
}
|
||||
|
|
|
@ -79,6 +79,9 @@
|
|||
<button type="button" class="close" data-dismiss="alert" aria-hidden="true">×</button>
|
||||
|
||||
{#RollingUpgradeStatus}
|
||||
{@if cond="{finalizeTime} > 0"}
|
||||
<p>Rolling upgrade finalized at {#helper_date_tostring value="{finalizeTime}"/}. </p>
|
||||
{:else}
|
||||
<p>Rolling upgrade started at {#helper_date_tostring value="{startTime}"/}. </br>
|
||||
{#createdRollbackImages}
|
||||
Rollback image has been created. Proceed to upgrade daemons.
|
||||
|
@ -86,6 +89,7 @@
|
|||
Rollback image has not been created.
|
||||
{/createdRollbackImages}
|
||||
</p>
|
||||
{/if}
|
||||
{/RollingUpgradeStatus}
|
||||
|
||||
{@if cond="{DistinctVersionCount} > 1"}
|
||||
|
|
|
@ -1219,7 +1219,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void restoreTrash(String bpid) {
|
||||
public void clearTrash(String bpid) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,12 +19,7 @@
|
|||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -43,7 +38,9 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster.Builder;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||
import org.apache.hadoop.hdfs.TestRollingUpgrade;
|
||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
|
@ -208,7 +205,34 @@ public class TestDataNodeRollingUpgrade {
|
|||
public void testDatanodeRollingUpgradeWithFinalize() throws Exception {
|
||||
try {
|
||||
startCluster();
|
||||
rollingUpgradeAndFinalize();
|
||||
// Do it again
|
||||
rollingUpgradeAndFinalize();
|
||||
} finally {
|
||||
shutdownCluster();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 600000)
|
||||
public void testDatanodeRUwithRegularUpgrade() throws Exception {
|
||||
try {
|
||||
startCluster();
|
||||
rollingUpgradeAndFinalize();
|
||||
DataNodeProperties dn = cluster.stopDataNode(0);
|
||||
cluster.restartNameNode(0, true, "-upgrade");
|
||||
cluster.restartDataNode(dn, true);
|
||||
cluster.waitActive();
|
||||
fs = cluster.getFileSystem(0);
|
||||
Path testFile3 = new Path("/" + GenericTestUtils.getMethodName()
|
||||
+ ".03.dat");
|
||||
DFSTestUtil.createFile(fs, testFile3, FILE_SIZE, REPL_FACTOR, SEED);
|
||||
cluster.getFileSystem().finalizeUpgrade();
|
||||
} finally {
|
||||
shutdownCluster();
|
||||
}
|
||||
}
|
||||
|
||||
private void rollingUpgradeAndFinalize() throws IOException, Exception {
|
||||
// Create files in DFS.
|
||||
Path testFile1 = new Path("/" + GenericTestUtils.getMethodName() + ".01.dat");
|
||||
Path testFile2 = new Path("/" + GenericTestUtils.getMethodName() + ".02.dat");
|
||||
|
@ -218,6 +242,7 @@ public class TestDataNodeRollingUpgrade {
|
|||
startRollingUpgrade();
|
||||
File blockFile = getBlockForFile(testFile2, true);
|
||||
File trashFile = getTrashFileForBlock(blockFile, false);
|
||||
cluster.triggerBlockReports();
|
||||
deleteAndEnsureInTrash(testFile2, blockFile, trashFile);
|
||||
finalizeRollingUpgrade();
|
||||
|
||||
|
@ -225,10 +250,6 @@ public class TestDataNodeRollingUpgrade {
|
|||
assertFalse(isTrashRootPresent());
|
||||
assert(!fs.exists(testFile2));
|
||||
assert(fs.exists(testFile1));
|
||||
|
||||
} finally {
|
||||
shutdownCluster();
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout=600000)
|
||||
|
|
|
@ -306,7 +306,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void restoreTrash(String bpid) {
|
||||
public void clearTrash(String bpid) {
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue