HDDS-1649. On installSnapshot notification from OM leader, download checkpoint and reload OM state (#948)

Hanisha Koneru 2019-07-22 12:06:55 -07:00 committed by Arpit Agarwal
parent 340bbaf8bf
commit cdc36fe286
16 changed files with 641 additions and 106 deletions

View File

@ -119,6 +119,7 @@ public final class OzoneConsts {
public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
public static final String DELETED_BLOCK_DB = "deletedBlock.db";
public static final String OM_DB_NAME = "om.db";
public static final String OM_DB_BACKUP_PREFIX = "om.db.backup.";
public static final String OM_DB_CHECKPOINTS_DIR_NAME = "om.db.checkpoints";
public static final String OZONE_MANAGER_TOKEN_DB_NAME = "om-token.db";
public static final String SCM_DB_NAME = "scm.db";

View File

@ -1630,6 +1630,14 @@
<description>Byte limit for Raft's Log Worker queue.
</description>
</property>
<property>
<name>ozone.om.ratis.log.purge.gap</name>
<value>1000000</value>
<tag>OZONE, OM, RATIS</tag>
<description>The minimum gap between log indices for the Raft server to purge
its log segments after taking a snapshot.
</description>
</property>
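For context, a minimal sketch of setting these values programmatically (the key constants exist in this change; the numeric values below are arbitrary and only illustrative):

// Hedged sketch: assumes OzoneConfiguration and OMConfigKeys from this code base.
OzoneConfiguration conf = new OzoneConfiguration();
// Take a Ratis snapshot automatically after roughly this many applied transactions.
conf.setLong(
    OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, 400000L);
// Let Ratis purge log segments once they trail the snapshot by this many indices.
conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, 1000000);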
<property>
<name>ozone.om.ratis.snapshot.auto.trigger.threshold</name>

View File

@ -123,6 +123,9 @@ private OMConfigKeys() {
"ozone.om.ratis.log.appender.queue.byte-limit";
public static final String
OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
public static final String OZONE_OM_RATIS_LOG_PURGE_GAP =
"ozone.om.ratis.log.purge.gap";
public static final int OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT = 1000000;
// OM Snapshot configurations
public static final String OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY

View File

@ -203,7 +203,8 @@ public enum ResultCodes {
PREFIX_NOT_FOUND,
S3_BUCKET_INVALID_LENGTH
S3_BUCKET_INVALID_LENGTH,
RATIS_ERROR // Error in Ratis server
}
}

View File

@ -29,9 +29,10 @@ public interface OzoneManagerHAProtocol {
/**
* Store the snapshot index i.e. the raft log index, corresponding to the
* last transaction applied to the OM RocksDB, in OM metadata dir on disk.
* @param flush flush the OM DB to disk if true
* @return the snapshot index
* @throws IOException
*/
long saveRatisSnapshot() throws IOException;
long saveRatisSnapshot(boolean flush) throws IOException;
}
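A caller-side sketch of the new flush flag (the helper below is hypothetical; OzoneManager is the implementation of this interface in this change): pass true when the persisted snapshot index must be consistent with the on-disk DB, e.g. before handing out a DB checkpoint, and false when the DB has just been flushed or replaced, as reloadOMState does later in this commit.

// Hypothetical helper illustrating the intent of the flush parameter.
static long snapshotForCheckpoint(OzoneManagerHAProtocol om)
    throws IOException {
  // flush = true: flush the OM DB first so that the saved snapshot index
  // matches what a checkpoint of the DB will actually contain.
  return om.saveRatisSnapshot(true);
}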

View File

@ -278,6 +278,8 @@ enum Status {
PREFIX_NOT_FOUND=50;
S3_BUCKET_INVALID_LENGTH = 51; // s3 bucket invalid length.
RATIS_ERROR = 52;
}

View File

@ -241,6 +241,7 @@ abstract class Builder {
protected String clusterId;
protected String omServiceId;
protected int numOfOMs;
protected int numOfActiveOMs;
protected Optional<Boolean> enableTrace = Optional.of(false);
protected Optional<Integer> hbInterval = Optional.empty();
@ -440,6 +441,11 @@ public Builder setNumOfOzoneManagers(int numOMs) {
return this;
}
public Builder setNumOfActiveOMs(int numActiveOMs) {
this.numOfActiveOMs = numActiveOMs;
return this;
}
public Builder setStreamBufferSizeUnit(StorageUnit unit) {
this.streamBufferSizeUnit = Optional.of(unit);
return this;

View File

@ -53,6 +53,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
private Map<String, OzoneManager> ozoneManagerMap;
private List<OzoneManager> ozoneManagers;
// Active OMs denote OMs which are up and running
private List<OzoneManager> activeOMs;
private List<OzoneManager> inactiveOMs;
private static final Random RANDOM = new Random();
private static final int RATIS_LEADER_ELECTION_TIMEOUT = 1000; // 1 second
@ -67,11 +71,15 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
private MiniOzoneHAClusterImpl(
OzoneConfiguration conf,
Map<String, OzoneManager> omMap,
List<OzoneManager> activeOMList,
List<OzoneManager> inactiveOMList,
StorageContainerManager scm,
List<HddsDatanodeService> hddsDatanodes) {
super(conf, scm, hddsDatanodes);
this.ozoneManagerMap = omMap;
this.ozoneManagers = new ArrayList<>(omMap.values());
this.activeOMs = activeOMList;
this.inactiveOMs = inactiveOMList;
}
/**
@ -83,6 +91,10 @@ public OzoneManager getOzoneManager() {
return this.ozoneManagers.get(0);
}
public boolean isOMActive(String omNodeId) {
return activeOMs.contains(ozoneManagerMap.get(omNodeId));
}
public OzoneManager getOzoneManager(int index) {
return this.ozoneManagers.get(index);
}
@ -91,6 +103,20 @@ public OzoneManager getOzoneManager(String omNodeId) {
return this.ozoneManagerMap.get(omNodeId);
}
/**
* Start a previously inactive OM.
*/
public void startInactiveOM(String omNodeID) throws IOException {
OzoneManager ozoneManager = ozoneManagerMap.get(omNodeID);
if (!inactiveOMs.contains(ozoneManager)) {
throw new IOException("OM is already active.");
} else {
ozoneManager.start();
activeOMs.add(ozoneManager);
inactiveOMs.remove(ozoneManager);
}
}
@Override
public void restartOzoneManager() throws IOException {
for (OzoneManager ozoneManager : ozoneManagers) {
@ -125,6 +151,8 @@ public void stopOzoneManager(String omNodeId) {
public static class Builder extends MiniOzoneClusterImpl.Builder {
private final String nodeIdBaseStr = "omNode-";
private List<OzoneManager> activeOMs = new ArrayList<>();
private List<OzoneManager> inactiveOMs = new ArrayList<>();
/**
* Creates a new Builder.
@ -137,6 +165,10 @@ public Builder(OzoneConfiguration conf) {
@Override
public MiniOzoneCluster build() throws IOException {
if (numOfActiveOMs > numOfOMs) {
throw new IllegalArgumentException("Number of active OMs cannot be " +
"more than the total number of OMs");
}
DefaultMetricsSystem.setMiniClusterMode(true);
initializeConfiguration();
StorageContainerManager scm;
@ -150,8 +182,8 @@ public MiniOzoneCluster build() throws IOException {
}
final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omMap,
scm, hddsDatanodes);
MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(
conf, omMap, activeOMs, inactiveOMs, scm, hddsDatanodes);
if (startDataNodes) {
cluster.startHddsDatanodes();
}
@ -215,9 +247,16 @@ private Map<String, OzoneManager> createOMService() throws IOException,
om.setCertClient(certClient);
omMap.put(nodeId, om);
om.start();
LOG.info("Started OzoneManager RPC server at " +
om.getOmRpcServerAddr());
if (i <= numOfActiveOMs) {
om.start();
activeOMs.add(om);
LOG.info("Started OzoneManager RPC server at " +
om.getOmRpcServerAddr());
} else {
inactiveOMs.add(om);
LOG.info("Intialized OzoneManager at " + om.getOmRpcServerAddr()
+ ". This OM is currently inactive (not running).");
}
}
// Set default OM address to point to the first OM. Clients would

View File

@ -0,0 +1,189 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.utils.db.DBCheckpoint;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import static org.apache.hadoop.ozone.om.TestOzoneManagerHA.createKey;
/**
* Tests the Ratis snapshots feature in OM.
*/
public class TestOMRatisSnapshots {
private MiniOzoneHAClusterImpl cluster = null;
private ObjectStore objectStore;
private OzoneConfiguration conf;
private String clusterId;
private String scmId;
private int numOfOMs = 3;
private static final long SNAPSHOT_THRESHOLD = 50;
private static final int LOG_PURGE_GAP = 50;
@Rule
public ExpectedException exception = ExpectedException.none();
@Rule
public Timeout timeout = new Timeout(500_000);
/**
* Create a MiniOzoneCluster for testing. The cluster initially has one
* inactive OM. So at the start of the cluster, there will be 2 active and 1
* inactive OM.
*
* @throws IOException
*/
@Before
public void init() throws Exception {
conf = new OzoneConfiguration();
clusterId = UUID.randomUUID().toString();
scmId = UUID.randomUUID().toString();
conf.setLong(
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
SNAPSHOT_THRESHOLD);
conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
.setScmId(scmId)
.setOMServiceId("om-service-test1")
.setNumOfOzoneManagers(numOfOMs)
.setNumOfActiveOMs(2)
.build();
cluster.waitForClusterToBeReady();
objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
}
/**
* Shutdown the MiniOzoneCluster.
*/
@After
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testInstallSnapshot() throws Exception {
// Get the leader OM
String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider()
.getCurrentProxyOMNodeId();
OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
// Find the inactive OM
String followerNodeId = leaderOM.getPeerNodes().get(0).getOMNodeId();
if (cluster.isOMActive(followerNodeId)) {
followerNodeId = leaderOM.getPeerNodes().get(1).getOMNodeId();
}
OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
// Do some transactions so that the log index increases
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
.setOwner(userName)
.setAdmin(adminName)
.build();
objectStore.createVolume(volumeName, createVolumeArgs);
OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
retVolumeinfo.createBucket(bucketName);
OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
long leaderOMappliedLogIndex =
leaderRatisServer.getStateMachineLastAppliedIndex();
List<String> keys = new ArrayList<>();
while (leaderOMappliedLogIndex < 2000) {
keys.add(createKey(ozoneBucket));
leaderOMappliedLogIndex =
leaderRatisServer.getStateMachineLastAppliedIndex();
}
// Get the latest db checkpoint from the leader OM.
long leaderOMSnaphsotIndex = leaderOM.saveRatisSnapshot(true);
DBCheckpoint leaderDbCheckpoint =
leaderOM.getMetadataManager().getStore().getCheckpoint(false);
// Start the inactive OM
cluster.startInactiveOM(followerNodeId);
// The recently started OM should be lagging behind the leader OM.
long followerOMLastAppliedIndex =
followerOM.getOmRatisServer().getStateMachineLastAppliedIndex();
Assert.assertTrue(
followerOMLastAppliedIndex < leaderOMSnaphsotIndex);
// Install leader OM's db checkpoint on the lagging OM.
followerOM.getOmRatisServer().getOmStateMachine().pause();
followerOM.getMetadataManager().getStore().close();
followerOM.replaceOMDBWithCheckpoint(
leaderOMSnaphsotIndex, leaderDbCheckpoint.getCheckpointLocation());
// Reload the follower OM with new DB checkpoint from the leader OM.
followerOM.reloadOMState(leaderOMSnaphsotIndex);
followerOM.getOmRatisServer().getOmStateMachine().unpause(
leaderOMSnaphsotIndex);
// After the new checkpoint is loaded and state machine is unpaused, the
// follower OM lastAppliedIndex must match the snapshot index of the
// checkpoint.
followerOMLastAppliedIndex = followerOM.getOmRatisServer()
.getStateMachineLastAppliedIndex();
Assert.assertEquals(leaderOMSnaphsotIndex, followerOMLastAppliedIndex);
// Verify that the follower OM's DB contains the transactions which were
// made while it was inactive.
OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
Assert.assertNotNull(followerOMMetaMngr.getVolumeTable().get(
followerOMMetaMngr.getVolumeKey(volumeName)));
Assert.assertNotNull(followerOMMetaMngr.getBucketTable().get(
followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
for (String key : keys) {
Assert.assertNotNull(followerOMMetaMngr.getKeyTable().get(
followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
}
}
}

View File

@ -829,7 +829,11 @@ public void testOMRatisSnapshot() throws Exception {
}
private void createKey(OzoneBucket ozoneBucket) throws IOException {
/**
* Create a key in the bucket.
* @return the key name.
*/
static String createKey(OzoneBucket ozoneBucket) throws IOException {
String keyName = "key" + RandomStringUtils.randomNumeric(5);
String data = "data" + RandomStringUtils.randomNumeric(5);
OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName,
@ -837,5 +841,6 @@ private void createKey(OzoneBucket ozoneBucket) throws IOException {
ReplicationFactor.ONE, new HashMap<>());
ozoneOutputStream.write(data.getBytes(), 0, data.length());
ozoneOutputStream.close();
return keyName;
}
}

View File

@ -126,7 +126,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) {
// ratis snapshot first. This step also included flushing the OM DB.
// Hence, we can set flush to false.
flush = false;
ratisSnapshotIndex = om.saveRatisSnapshot();
ratisSnapshotIndex = om.saveRatisSnapshot(true);
} else {
ratisSnapshotIndex = om.loadRatisSnapshotIndex();
}

View File

@ -196,15 +196,18 @@ public void decNumKeys() {
}
public void setNumVolumes(long val) {
this.numVolumes.incr(val);
long oldVal = this.numVolumes.value();
this.numVolumes.incr(val - oldVal);
}
public void setNumBuckets(long val) {
this.numBuckets.incr(val);
long oldVal = this.numBuckets.value();
this.numBuckets.incr(val - oldVal);
}
public void setNumKeys(long val) {
this.numKeys.incr(val);
long oldVal = this.numKeys.value();
this.numKeys.incr(val - oldVal);
}
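The setters above emulate an absolute set on metrics counters that only expose incr(); a standalone toy sketch of the same pattern (MutableCounterLong is the real type here; the class below is purely illustrative and not part of the code base):

// Toy illustration of the "set via delta" trick used by setNumVolumes,
// setNumBuckets and setNumKeys above.
final class DeltaSettableCounter {
  private long value;                        // stands in for MutableCounterLong
  long value() { return value; }
  void incr(long delta) { value += delta; }
  void set(long newValue) {
    // Incrementing by (newValue - current value) leaves the counter at newValue.
    incr(newValue - value());
  }
}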
public long getNumVolumes() {

View File

@ -25,6 +25,7 @@
import com.google.protobuf.BlockingService;
import java.net.InetAddress;
import java.nio.file.Path;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.KeyPair;
@ -143,6 +144,10 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.utils.RetriableTask;
import org.apache.hadoop.utils.db.DBCheckpoint;
import org.apache.hadoop.utils.db.DBStore;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.LifeCycle;
import org.bouncycastle.pkcs.PKCS10CertificationRequest;
import org.slf4j.Logger;
@ -236,18 +241,20 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private RPC.Server omRpcServer;
private InetSocketAddress omRpcAddress;
private String omId;
private final OMMetadataManager metadataManager;
private final VolumeManager volumeManager;
private final BucketManager bucketManager;
private final KeyManager keyManager;
private final PrefixManagerImpl prefixManager;
private OMMetadataManager metadataManager;
private VolumeManager volumeManager;
private BucketManager bucketManager;
private KeyManager keyManager;
private PrefixManagerImpl prefixManager;
private S3BucketManager s3BucketManager;
private final OMMetrics metrics;
private OzoneManagerHttpServer httpServer;
private final OMStorage omStorage;
private final ScmBlockLocationProtocol scmBlockClient;
private final StorageContainerLocationProtocol scmContainerClient;
private ObjectName omInfoBeanName;
private final S3BucketManager s3BucketManager;
private Timer metricsTimer;
private ScheduleOMMetricsWriteTask scheduleOMMetricsWriteTask;
private static final ObjectWriter WRITER =
@ -258,7 +265,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private final Runnable shutdownHook;
private final File omMetaDir;
private final boolean isAclEnabled;
private final IAccessAuthorizer accessAuthorizer;
private IAccessAuthorizer accessAuthorizer;
private JvmPauseMonitor jvmPauseMonitor;
private final SecurityConfig secConfig;
private S3SecretManager s3SecretManager;
@ -308,12 +315,37 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
throw new OMException("OM not initialized.",
ResultCodes.OM_NOT_INITIALIZED);
}
// Read configuration and set values.
ozAdmins = conf.getTrimmedStringCollection(OZONE_ADMINISTRATORS);
omMetaDir = OmUtils.getOmDbDir(configuration);
this.isAclEnabled = conf.getBoolean(OZONE_ACL_ENABLED,
OZONE_ACL_ENABLED_DEFAULT);
this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
this.preallocateBlocksMax = conf.getInt(
OZONE_KEY_PREALLOCATION_BLOCKS_MAX,
OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT);
this.grpcBlockTokenEnabled = conf.getBoolean(HDDS_BLOCK_TOKEN_ENABLED,
HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
this.useRatisForReplication = conf.getBoolean(
DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
// TODO: This is a temporary check. Once fully implemented, all OM state
// change should go through Ratis - be it standalone (for non-HA) or
// replicated (for HA).
isRatisEnabled = configuration.getBoolean(
OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
// Load HA related configurations
loadOMHAConfigs(configuration);
InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
scmContainerClient = getScmContainerClient(configuration);
// verifies that the SCM info in the OM Version file is correct.
scmBlockClient = getScmBlockClient(configuration);
this.scmClient = new ScmClient(scmBlockClient, scmContainerClient);
// For testing purpose only, not hit scm from om as Hadoop UGI can't login
// two principals in the same JVM.
@ -329,16 +361,32 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
metadataManager = new OmMetadataManagerImpl(configuration);
secConfig = new SecurityConfig(configuration);
// Create the KMS Key Provider
try {
kmsProvider = createKeyProviderExt(configuration);
} catch (IOException ioe) {
kmsProvider = null;
LOG.error("Fail to create Key Provider");
}
if (secConfig.isSecurityEnabled()) {
omComponent = OM_DAEMON + "-" + omId;
if(omStorage.getOmCertSerialId() == null) {
throw new RuntimeException("OzoneManager started in secure mode but " +
"doesn't have SCM signed certificate.");
}
certClient = new OMCertificateClient(new SecurityConfig(conf),
omStorage.getOmCertSerialId());
}
if (secConfig.isBlockTokenEnabled()) {
blockTokenMgr = createBlockTokenSecretManager(configuration);
}
instantiateServices();
initializeRatisServer();
initializeRatisClient();
// This is a temporary check. Once fully implemented, all OM state change
// should go through Ratis - be it standalone (for non-HA) or replicated
// (for HA).
isRatisEnabled = configuration.getBoolean(
OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
startRatisServer();
startRatisClient();
if (isRatisEnabled) {
// Create Ratis storage dir
String omRatisDirectory = OmUtils.getOMRatisDirectory(configuration);
@ -361,59 +409,44 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
OM_RATIS_SNAPSHOT_INDEX);
this.snapshotIndex = loadRatisSnapshotIndex();
InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
secConfig = new SecurityConfig(configuration);
volumeManager = new VolumeManagerImpl(metadataManager, configuration);
// Create the KMS Key Provider
try {
kmsProvider = createKeyProviderExt(configuration);
} catch (IOException ioe) {
kmsProvider = null;
LOG.error("Fail to create Key Provider");
}
bucketManager = new BucketManagerImpl(metadataManager, getKmsProvider(),
isRatisEnabled);
metrics = OMMetrics.create();
s3BucketManager = new S3BucketManagerImpl(configuration, metadataManager,
volumeManager, bucketManager);
if (secConfig.isSecurityEnabled()) {
omComponent = OM_DAEMON + "-" + omId;
if(omStorage.getOmCertSerialId() == null) {
throw new RuntimeException("OzoneManager started in secure mode but " +
"doesn't have SCM signed certificate.");
}
certClient = new OMCertificateClient(new SecurityConfig(conf),
omStorage.getOmCertSerialId());
s3SecretManager = new S3SecretManagerImpl(configuration, metadataManager);
delegationTokenMgr = createDelegationTokenSecretManager(configuration);
}
if (secConfig.isBlockTokenEnabled()) {
blockTokenMgr = createBlockTokenSecretManager(configuration);
}
// Start Om Rpc Server.
omRpcServer = getRpcServer(conf);
omRpcAddress = updateRPCListenAddress(configuration,
OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
this.scmClient = new ScmClient(scmBlockClient, scmContainerClient);
prefixManager = new PrefixManagerImpl(metadataManager);
keyManager = new KeyManagerImpl(this, scmClient, configuration,
omStorage.getOmId());
shutdownHook = () -> {
saveOmMetrics();
};
ShutdownHookManager.get().addShutdownHook(shutdownHook,
SHUTDOWN_HOOK_PRIORITY);
isAclEnabled = conf.getBoolean(OZONE_ACL_ENABLED,
OZONE_ACL_ENABLED_DEFAULT);
}
/**
* Instantiate services which are dependent on the OM DB state.
* When OM state is reloaded, these services are re-initialized with the
* new OM state.
*/
private void instantiateServices() throws IOException {
metadataManager = new OmMetadataManagerImpl(configuration);
volumeManager = new VolumeManagerImpl(metadataManager, configuration);
bucketManager = new BucketManagerImpl(metadataManager, getKmsProvider(),
isRatisEnabled);
s3BucketManager = new S3BucketManagerImpl(configuration, metadataManager,
volumeManager, bucketManager);
if (secConfig.isSecurityEnabled()) {
s3SecretManager = new S3SecretManagerImpl(configuration, metadataManager);
delegationTokenMgr = createDelegationTokenSecretManager(configuration);
}
prefixManager = new PrefixManagerImpl(metadataManager);
keyManager = new KeyManagerImpl(this, scmClient, configuration,
omStorage.getOmId());
if (isAclEnabled) {
accessAuthorizer = getACLAuthorizerInstance(conf);
accessAuthorizer = getACLAuthorizerInstance(configuration);
if (accessAuthorizer instanceof OzoneNativeAuthorizer) {
OzoneNativeAuthorizer authorizer =
(OzoneNativeAuthorizer) accessAuthorizer;
@ -425,17 +458,6 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
} else {
accessAuthorizer = null;
}
ozAdmins = conf.getTrimmedStringCollection(OZONE_ADMINISTRATORS);
omMetaDir = OmUtils.getOmDbDir(configuration);
this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
this.preallocateBlocksMax = conf.getInt(
OZONE_KEY_PREALLOCATION_BLOCKS_MAX,
OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT);
this.grpcBlockTokenEnabled = conf.getBoolean(HDDS_BLOCK_TOKEN_ENABLED,
HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
this.useRatisForReplication = conf.getBoolean(
DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
}
/**
@ -1235,6 +1257,14 @@ public void start() throws IOException {
DefaultMetricsSystem.initialize("OzoneManager");
// Start Ratis services
if (omRatisServer != null) {
omRatisServer.start();
}
if (omRatisClient != null) {
omRatisClient.connect();
}
metadataManager.start(configuration);
startSecretManagerIfNecessary();
@ -1305,8 +1335,14 @@ public void restart() throws IOException {
omRpcServer.start();
isOmRpcServerRunning = true;
startRatisServer();
startRatisClient();
initializeRatisServer();
if (omRatisServer != null) {
omRatisServer.start();
}
initializeRatisClient();
if (omRatisClient != null) {
omRatisClient.connect();
}
try {
httpServer = new OzoneManagerHttpServer(configuration, this);
@ -1353,15 +1389,13 @@ private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException {
/**
* Creates an instance of ratis server.
*/
private void startRatisServer() throws IOException {
private void initializeRatisServer() throws IOException {
if (isRatisEnabled) {
if (omRatisServer == null) {
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(
configuration, this, omNodeDetails, peerNodes);
}
omRatisServer.start();
LOG.info("OzoneManager Ratis server started at port {}",
LOG.info("OzoneManager Ratis server initialized at port {}",
omRatisServer.getServerPort());
} else {
omRatisServer = null;
@ -1371,14 +1405,13 @@ private void startRatisServer() throws IOException {
/**
* Creates an instance of ratis client.
*/
private void startRatisClient() throws IOException {
private void initializeRatisClient() throws IOException {
if (isRatisEnabled) {
if (omRatisClient == null) {
omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
omNodeDetails.getOMNodeId(), omRatisServer.getRaftGroup(),
configuration);
}
omRatisClient.connect();
} else {
omRatisClient = null;
}
@ -1398,11 +1431,13 @@ public long loadRatisSnapshotIndex() {
}
@Override
public long saveRatisSnapshot() throws IOException {
public long saveRatisSnapshot(boolean flush) throws IOException {
snapshotIndex = omRatisServer.getStateMachineLastAppliedIndex();
// Flush the OM state to disk
getMetadataManager().getStore().flush();
if (flush) {
// Flush the OM state to disk
metadataManager.getStore().flush();
}
PersistentLongFile.writeFile(ratisSnapshotFile, snapshotIndex);
LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}",
@ -2697,7 +2732,6 @@ public List<OmBucketInfo> listS3Buckets(String userName, String startKey,
}
}
}
@Override
public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
IOException {
@ -3069,6 +3103,179 @@ public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
}
}
/**
* Download and install latest checkpoint from leader OM.
* If the downloaded checkpoint's snapshot index is greater than this OM's
* last applied transaction index, then re-initialize the OM state via this
* checkpoint. Before re-initializing OM state, the OM Ratis server should
* be stopped so that no new transactions can be applied.
* @param leaderId peerNodeID of the leader OM
* @return If checkpoint is installed, return the corresponding termIndex.
* Otherwise, return null.
*/
public TermIndex installSnapshot(String leaderId) {
if (omSnapshotProvider == null) {
LOG.error("OM Snapshot Provider is not configured as there are no peer " +
"nodes.");
return null;
}
DBCheckpoint omDBcheckpoint = getDBCheckpointFromLeader(leaderId);
Path newDBlocation = omDBcheckpoint.getCheckpointLocation();
// Check if current ratis log index is smaller than the downloaded
// snapshot index. If yes, proceed by stopping the ratis server so that
// the OM state can be re-initialized. If no, then do not proceed with
// installSnapshot.
long lastAppliedIndex = omRatisServer.getStateMachineLastAppliedIndex();
long checkpointSnapshotIndex = omDBcheckpoint.getRatisSnapshotIndex();
if (checkpointSnapshotIndex <= lastAppliedIndex) {
LOG.error("Failed to install checkpoint from OM leader: {}. The last " +
"applied index: {} is greater than or equal to the checkpoint's " +
"snapshot index: {}. Deleting the downloaded checkpoint {}", leaderId,
lastAppliedIndex, checkpointSnapshotIndex,
newDBlocation);
try {
FileUtils.deleteFully(newDBlocation);
} catch (IOException e) {
LOG.error("Failed to fully delete the downloaded DB checkpoint {} " +
"from OM leader {}.", newDBlocation,
leaderId, e);
}
return null;
}
// Pause the State Machine so that no new transactions can be applied.
// This action also clears the OM Double Buffer so that if there are any
// pending transactions in the buffer, they are discarded.
// TODO: The Ratis server should also be paused here. This is required
// because a leader election might happen while the snapshot
// installation is in progress and the new leader might start sending
// append log entries to the ratis server.
omRatisServer.getOmStateMachine().pause();
File dbBackup;
try {
dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex, newDBlocation);
} catch (Exception e) {
LOG.error("OM DB checkpoint replacement with new downloaded checkpoint " +
"failed.", e);
return null;
}
// Reload the OM DB store with the new checkpoint.
// Restart (unpause) the state machine and update its last applied index
// to the installed checkpoint's snapshot index.
try {
reloadOMState(checkpointSnapshotIndex);
omRatisServer.getOmStateMachine().unpause(checkpointSnapshotIndex);
} catch (IOException e) {
LOG.error("Failed to reload OM state with new DB checkpoint.", e);
return null;
}
// Delete the backup DB
try {
FileUtils.deleteFully(dbBackup);
} catch (IOException e) {
LOG.error("Failed to delete the backup of the original DB {}", dbBackup);
}
// TODO: We should only return the snapshotIndex to the leader.
// Should be fixed after RATIS-586
TermIndex newTermIndex = TermIndex.newTermIndex(0,
checkpointSnapshotIndex);
return newTermIndex;
}
/**
* Download the latest OM DB checkpoint from the leader OM.
* @param leaderId OMNodeID of the leader OM node.
* @return latest DB checkpoint from leader OM.
*/
private DBCheckpoint getDBCheckpointFromLeader(String leaderId) {
LOG.info("Downloading checkpoint from leader OM {} and reloading state " +
"from the checkpoint.", leaderId);
try {
return omSnapshotProvider.getOzoneManagerDBSnapshot(leaderId);
} catch (IOException e) {
LOG.error("Failed to download checkpoint from OM leader {}", leaderId, e);
}
return null;
}
/**
* Replace the current OM DB with the new DB checkpoint.
* @param lastAppliedIndex the last applied index in the current OM DB.
* @param checkpointPath path to the new DB checkpoint
* @return location of the backup of the original DB
* @throws Exception
*/
File replaceOMDBWithCheckpoint(long lastAppliedIndex, Path checkpointPath)
throws Exception {
// Stop the DB first
DBStore store = metadataManager.getStore();
store.close();
// Take a backup of the current DB
File db = store.getDbLocation();
String dbBackupName = OzoneConsts.OM_DB_BACKUP_PREFIX +
lastAppliedIndex + "_" + System.currentTimeMillis();
File dbBackup = new File(db.getParentFile(), dbBackupName);
try {
Files.move(db.toPath(), dbBackup.toPath());
} catch (IOException e) {
LOG.error("Failed to create a backup of the current DB. Aborting " +
"snapshot installation.");
throw e;
}
// Move the new DB checkpoint into the om metadata dir
try {
Files.move(checkpointPath, db.toPath());
} catch (IOException e) {
LOG.error("Failed to move downloaded DB checkpoint {} to metadata " +
"directory {}. Resetting to original DB.", checkpointPath,
db.toPath());
Files.move(dbBackup.toPath(), db.toPath());
throw e;
}
return dbBackup;
}
/**
* Re-instantiate MetadataManager with new DB checkpoint.
* All the classes which use/store MetadataManager should also be updated
* with the new MetadataManager instance.
*/
void reloadOMState(long newSnapshotIndex) throws IOException {
instantiateServices();
// Restart required services
metadataManager.start(configuration);
keyManager.start(configuration);
// Set metrics and start the metrics background thread
metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
.getVolumeTable()));
metrics.setNumBuckets(metadataManager.countRowsInTable(metadataManager
.getBucketTable()));
// Delete the omMetrics file if it exists and save a new metrics file
// with new data
Files.deleteIfExists(getMetricsStorageFile().toPath());
saveOmMetrics();
// Update OM snapshot index with the new snapshot index (from the new OM
// DB state) and save the snapshot index to disk
this.snapshotIndex = newSnapshotIndex;
saveRatisSnapshot(false);
}
public static Logger getLogger() {
return LOG;
}

View File

@ -308,10 +308,15 @@ public RaftGroup getRaftGroup() {
}
/**
* Returns OzoneManager StateMachine.
* Initializes and returns OzoneManager StateMachine.
*/
private OzoneManagerStateMachine getStateMachine() {
return new OzoneManagerStateMachine(this);
}
@VisibleForTesting
public OzoneManagerStateMachine getOmStateMachine() {
return omStateMachine;
}
public OzoneManager getOzoneManager() {
@ -387,6 +392,12 @@ private RaftProperties newRaftProperties(Configuration conf) {
SizeInBytes.valueOf(logAppenderQueueByteLimit));
RaftServerConfigKeys.Log.setPreallocatedSize(properties,
SizeInBytes.valueOf(raftSegmentPreallocatedSize));
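// Disable Ratis-driven snapshot installation on the log appender. When a
// follower falls behind purged log segments, the leader then notifies the
// follower's state machine (notifyInstallSnapshotFromLeader) and the OM
// downloads the leader's DB checkpoint itself.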
RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
false);
final int logPurgeGap = conf.getInt(
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP,
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT);
RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap);
// For grpc set the maximum message size
// TODO: calculate the optimal max message size

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.ContainerStateMachine;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
@ -43,12 +44,15 @@
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -68,8 +72,9 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
private OzoneManagerHARequestHandler handler;
private RaftGroupId raftGroupId;
private long lastAppliedIndex = 0;
private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
private OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
private final ExecutorService executorService;
private final ExecutorService installSnapshotExecutor;
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer;
@ -82,19 +87,20 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build();
this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor();
}
/**
* Initializes the State Machine with the given server, group and storage.
* TODO: Load the latest snapshot from the file system.
*/
@Override
public void initialize(
RaftServer server, RaftGroupId id, RaftStorage raftStorage)
throws IOException {
super.initialize(server, id, raftStorage);
this.raftGroupId = id;
storage.init(raftStorage);
public void initialize(RaftServer server, RaftGroupId id,
RaftStorage raftStorage) throws IOException {
lifeCycle.startAndTransition(() -> {
super.initialize(server, id, raftStorage);
this.raftGroupId = id;
storage.init(raftStorage);
});
}
/**
@ -185,6 +191,27 @@ public CompletableFuture<Message> query(Message request) {
}
}
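/**
 * Pause the StateMachine so that no new transactions are applied. Stopping
 * the DoubleBuffer also discards any transactions still pending in it.
 */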
@Override
public void pause() {
lifeCycle.transition(LifeCycle.State.PAUSING);
lifeCycle.transition(LifeCycle.State.PAUSED);
ozoneManagerDoubleBuffer.stop();
}
/**
* Unpause the StateMachine, re-initialize the DoubleBuffer and update the
* lastAppliedIndex. This should be done after loading the new state into the
* StateMachine.
*/
public void unpause(long newLastAppliedSnaphsotIndex) {
lifeCycle.startAndTransition(() -> {
this.ozoneManagerDoubleBuffer =
new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(),
this::updateLastAppliedIndex);
this.updateLastAppliedIndex(newLastAppliedSnaphsotIndex);
});
}
/**
* Take OM Ratis snapshot. Write the snapshot index to file. Snapshot index
* is the log index corresponding to the last applied transaction on the OM
@ -197,11 +224,44 @@ public CompletableFuture<Message> query(Message request) {
public long takeSnapshot() throws IOException {
LOG.info("Saving Ratis snapshot on the OM.");
if (ozoneManager != null) {
return ozoneManager.saveRatisSnapshot();
return ozoneManager.saveRatisSnapshot(true);
}
return 0;
}
/**
* Leader OM has purged entries from its log. To catch up, OM must download
* the latest checkpoint from the leader OM and install it.
* @param roleInfoProto the leader node information
* @param firstTermIndexInLog TermIndex of the first append entry available
* in the Leader's log.
* @return the last term index included in the installed snapshot.
*/
@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
String leaderNodeId = RaftPeerId.valueOf(roleInfoProto.getSelf().getId())
.toString();
LOG.info("Received install snapshot notificaiton form OM leader: {} with " +
"term index: {}", leaderNodeId, firstTermIndexInLog);
if (!roleInfoProto.getRole().equals(RaftProtos.RaftPeerRole.LEADER)) {
// A non-leader Ratis server should not send this notification.
LOG.error("Received Install Snapshot notification from non-leader OM " +
"node: {}. Ignoring the notification.", leaderNodeId);
return completeExceptionally(new OMException("Received notification to " +
"install snaphost from non-leader OM node",
OMException.ResultCodes.RATIS_ERROR));
}
CompletableFuture<TermIndex> future = CompletableFuture.supplyAsync(
() -> ozoneManager.installSnapshot(leaderNodeId),
installSnapshotExecutor);
return future;
}
/**
* Notifies the state machine that the raft peer is no longer leader.
*/
@ -276,10 +336,9 @@ public void setRaftGroupId(RaftGroupId raftGroupId) {
this.raftGroupId = raftGroupId;
}
public void stop() {
ozoneManagerDoubleBuffer.stop();
HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
}
}

View File

@ -149,7 +149,7 @@ private void closeHttpClient() throws IOException {
* @param leaderOMNodeID leader OM Node ID.
* @return the DB checkpoint (including the ratis snapshot index)
*/
protected DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID)
public DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID)
throws IOException {
String snapshotFileName = OM_SNAPSHOT_DB + "_" + System.currentTimeMillis();
File targetFile = new File(omSnapshotDir, snapshotFileName + ".tar.gz");