HDDS-1339. Implement ratis snapshots on OM (#651)

This commit is contained in:
Hanisha Koneru 2019-04-03 22:50:28 -07:00 committed by GitHub
parent 7b5b783f66
commit f09a78f73f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 269 additions and 72 deletions

View File

@ -279,4 +279,7 @@ public final class OzoneConsts {
// Dummy OMNodeID for OM Clients to use for a non-HA OM setup // Dummy OMNodeID for OM Clients to use for a non-HA OM setup
public static final String OM_NODE_ID_DUMMY = "omNodeIdDummy"; public static final String OM_NODE_ID_DUMMY = "omNodeIdDummy";
// Name of the file in which the OM stores its Ratis snapshot index, i.e. the
// last applied index of the OM state machine at the time of the snapshot.
public static final String OM_RATIS_SNAPSHOT_INDEX = "ratisSnapshotIndex";
} }

View File

@ -145,7 +145,7 @@ public abstract class Storage {
* *
* @return the directory path * @return the directory path
*/ */
private File getCurrentDir() { public File getCurrentDir() {
return new File(storageDir, STORAGE_DIR_CURRENT); return new File(storageDir, STORAGE_DIR_CURRENT);
} }

View File

@ -65,6 +65,12 @@ public interface DBStore extends AutoCloseable {
*/ */
ArrayList<Table> listTables() throws IOException; ArrayList<Table> listTables() throws IOException;
/**
 * Flush the DB buffer onto persistent storage.
 *
 * @throws IOException if the underlying store fails to flush its data
 */
void flush() throws IOException;
/** /**
* Compact the entire database. * Compact the entire database.
* *

View File

@ -272,6 +272,17 @@ public class RDBStore implements DBStore {
return returnList; return returnList;
} }
/**
 * Flushes the RocksDB buffers to persistent storage, waiting for the flush
 * to complete before returning.
 *
 * @throws IOException if RocksDB reports an error while flushing
 */
@Override
public void flush() throws IOException {
  // FlushOptions wraps a native RocksDB handle; try-with-resources releases
  // it deterministically once the flush call returns or fails.
  try (FlushOptions flushOptions =
      new FlushOptions().setWaitForFlush(true)) {
    db.flush(flushOptions);
  } catch (RocksDBException e) {
    LOG.error("Unable to Flush RocksDB data", e);
    throw toIOException("Unable to Flush RocksDB data", e);
  }
}
@Override @Override
public DBCheckpoint getCheckpoint(boolean flush) { public DBCheckpoint getCheckpoint(boolean flush) {
final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(flush); final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(flush);

View File

@ -1603,18 +1603,27 @@
<property> <property>
<name>ozone.om.ratis.log.appender.queue.num-elements</name> <name>ozone.om.ratis.log.appender.queue.num-elements</name>
<value>1024</value> <value>1024</value>
<tag>OZONE, DEBUG, CONTAINER, RATIS</tag> <tag>OZONE, DEBUG, OM, RATIS</tag>
<description>Number of operation pending with Raft's Log Worker. <description>Number of operation pending with Raft's Log Worker.
</description> </description>
</property> </property>
<property> <property>
<name>ozone.om.ratis.log.appender.queue.byte-limit</name> <name>ozone.om.ratis.log.appender.queue.byte-limit</name>
<value>32MB</value> <value>32MB</value>
<tag>OZONE, DEBUG, CONTAINER, RATIS</tag> <tag>OZONE, DEBUG, OM, RATIS</tag>
<description>Byte limit for Raft's Log Worker queue. <description>Byte limit for Raft's Log Worker queue.
</description> </description>
</property> </property>
<property>
<name>ozone.om.ratis.snapshot.auto.trigger.threshold</name>
<value>400000</value>
<tag>OZONE, DEBUG, OM, RATIS</tag>
<description>The log index threshold after which Ratis will automatically
trigger a snapshot on the OM state machine.
</description>
</property>
<property> <property>
<name>ozone.om.ratis.server.request.timeout</name> <name>ozone.om.ratis.server.request.timeout</name>
<value>3s</value> <value>3s</value>

View File

@ -136,6 +136,12 @@ public final class OMConfigKeys {
public static final String public static final String
OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB"; OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
// OM Snapshot configurations
// Log index threshold handed to Ratis as the snapshot auto trigger
// threshold for the OM state machine.
public static final String OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY
= "ozone.om.ratis.snapshot.auto.trigger.threshold";
// Default threshold used when the key above is not configured.
public static final long
OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT
= 400000;
// OM Ratis server configurations // OM Ratis server configurations
public static final String OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY public static final String OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY

View File

@ -36,6 +36,14 @@ import java.io.IOException;
*/ */
public interface OzoneManagerHAProtocol { public interface OzoneManagerHAProtocol {
/**
 * Store the snapshot index i.e. the raft log index, corresponding to the
 * last transaction applied to the OM RocksDB, in OM metadata dir on disk.
 *
 * @return the snapshot index that was saved
 * @throws IOException if the OM state cannot be flushed or the snapshot
 * index cannot be written to disk
 */
long saveRatisSnapshot() throws IOException;
/** /**
* Add a allocate block, it is assumed that the client is having an open * Add a allocate block, it is assumed that the client is having an open
* key session going on. This block will be appended to this open key session. * key session going on. This block will be appended to this open key session.
@ -56,7 +64,6 @@ public interface OzoneManagerHAProtocol {
OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID, OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
KeyLocation keyLocation) throws IOException; KeyLocation keyLocation) throws IOException;
/** /**
* Add the openKey entry with given keyInfo and clientID in to openKeyTable. * Add the openKey entry with given keyInfo and clientID in to openKeyTable.
* This will be called only from applyTransaction, once after calling * This will be called only from applyTransaction, once after calling
@ -81,5 +88,4 @@ public interface OzoneManagerHAProtocol {
*/ */
OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs, OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs,
String multipartUploadID) throws IOException; String multipartUploadID) throws IOException;
} }

View File

@ -87,6 +87,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
return this.ozoneManagers.get(index); return this.ozoneManagers.get(index);
} }
/**
 * Looks up an OzoneManager of this cluster by its OM node ID.
 *
 * @param omNodeId the node ID of the OzoneManager to look up
 * @return the OzoneManager registered under {@code omNodeId}
 */
public OzoneManager getOzoneManager(String omNodeId) {
  final OzoneManager om = this.ozoneManagerMap.get(omNodeId);
  return om;
}
@Override @Override
public void restartOzoneManager() throws IOException { public void restartOzoneManager() throws IOException {
for (OzoneManager ozoneManager : ozoneManagers) { for (OzoneManager ozoneManager : ozoneManagers) {

View File

@ -76,6 +76,7 @@ public class TestOzoneManagerHA {
private String clusterId; private String clusterId;
private String scmId; private String scmId;
private int numOfOMs = 3; private int numOfOMs = 3;
private static final long SNAPSHOT_THRESHOLD = 50;
@Rule @Rule
public ExpectedException exception = ExpectedException.none(); public ExpectedException exception = ExpectedException.none();
@ -99,7 +100,9 @@ public class TestOzoneManagerHA {
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 10); conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 10);
conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 10); conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 10);
conf.setLong(
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
SNAPSHOT_THRESHOLD);
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf) cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId) .setClusterId(clusterId)
.setScmId(scmId) .setScmId(scmId)
@ -326,9 +329,8 @@ public class TestOzoneManagerHA {
throw e; throw e;
} }
} }
} }
/** /**
* Create a volume and test its attribute. * Create a volume and test its attribute.
*/ */
@ -370,8 +372,6 @@ public class TestOzoneManagerHA {
} }
} }
/** /**
* Test that OMFailoverProxyProvider creates an OM proxy for each OM in the * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
* cluster. * cluster.
@ -533,4 +533,84 @@ public class TestOzoneManagerHA {
proxyProvider.getCurrentProxyOMNodeId()); proxyProvider.getCurrentProxyOMNodeId());
} }
} }
/**
 * Verifies that Ratis takes snapshots on the OM once the applied log index
 * crosses the configured auto trigger threshold, and that a later snapshot
 * is saved with a larger index than an earlier one.
 */
@Test
public void testOMRatisSnapshot() throws Exception {
  String userName = "user" + RandomStringUtils.randomNumeric(5);
  String adminName = "admin" + RandomStringUtils.randomNumeric(5);
  String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
  String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);

  VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
      .setOwner(userName)
      .setAdmin(adminName)
      .build();

  objectStore.createVolume(volumeName, createVolumeArgs);
  OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);

  retVolumeinfo.createBucket(bucketName);
  OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);

  String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider()
      .getCurrentProxyOMNodeId();
  OzoneManager ozoneManager = cluster.getOzoneManager(leaderOMNodeId);

  // Send commands to ratis to increase the log index so that ratis
  // triggers a snapshot on the state machine.
  long appliedLogIndex = 0;
  while (appliedLogIndex <= SNAPSHOT_THRESHOLD) {
    createKey(ozoneBucket);
    appliedLogIndex = ozoneManager.getOmRatisServer()
        .getStateMachineLastAppliedIndex();
  }

  // Wait until the first snapshot index has been saved to disk.
  GenericTestUtils.waitFor(
      () -> ozoneManager.loadRatisSnapshotIndex() > 0, 1000, 100000);

  // The current lastAppliedLogIndex on the state machine should be greater
  // than or equal to the saved snapshot index. Read the snapshot index
  // first: a snapshot taken between the two reads could otherwise make the
  // saved index look larger than the applied index.
  long ratisSnapshotIndex = ozoneManager.loadRatisSnapshotIndex();
  long smLastAppliedIndex =
      ozoneManager.getOmRatisServer().getStateMachineLastAppliedIndex();
  Assert.assertTrue("LastAppliedIndex on OM State Machine ("
          + smLastAppliedIndex + ") is less than the saved snapshot index("
          + ratisSnapshotIndex + ").",
      smLastAppliedIndex >= ratisSnapshotIndex);

  // Add more transactions to Ratis to trigger another snapshot
  while (appliedLogIndex <= (smLastAppliedIndex + SNAPSHOT_THRESHOLD)) {
    createKey(ozoneBucket);
    appliedLogIndex = ozoneManager.getOmRatisServer()
        .getStateMachineLastAppliedIndex();
  }

  // Wait for a snapshot that is newer than the first one. Checking for
  // "> 0" here would succeed immediately because the first snapshot index
  // is already on disk, and the assertion below would then race with Ratis.
  GenericTestUtils.waitFor(
      () -> ozoneManager.loadRatisSnapshotIndex() > ratisSnapshotIndex,
      1000, 100000);

  // The new snapshot index must be greater than the previous snapshot index
  long ratisSnapshotIndexNew = ozoneManager.loadRatisSnapshotIndex();
  Assert.assertTrue("Latest snapshot index must be greater than previous " +
      "snapshot indices", ratisSnapshotIndexNew > ratisSnapshotIndex);
}
/**
 * Creates a key with a random name in the given bucket and writes a small
 * random payload to it.
 *
 * @param ozoneBucket bucket in which the key is created
 * @throws IOException if creating, writing or closing the key fails
 */
private void createKey(OzoneBucket ozoneBucket) throws IOException {
  String keyName = "key" + RandomStringUtils.randomNumeric(5);
  // Encode once with an explicit charset so the declared key size and the
  // number of bytes written always agree, whatever the platform default is.
  byte[] data = ("data" + RandomStringUtils.randomNumeric(5))
      .getBytes(java.nio.charset.StandardCharsets.UTF_8);

  // try-with-resources closes the stream even when the write fails.
  try (OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName,
      data.length, ReplicationType.STAND_ALONE,
      ReplicationFactor.ONE, new HashMap<>())) {
    ozoneOutputStream.write(data, 0, data.length);
  }
}
} }

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRe
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.util.PersistentLongFile;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.Client;
@ -179,6 +180,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE; import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE;
import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE; import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE;
import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_INDEX;
import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT; import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys import static org.apache.hadoop.ozone.om.OMConfigKeys
@ -233,11 +235,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private RPC.Server omRpcServer; private RPC.Server omRpcServer;
private InetSocketAddress omRpcAddress; private InetSocketAddress omRpcAddress;
private String omId; private String omId;
private OMNodeDetails omNodeDetails;
private List<OMNodeDetails> peerNodes; private List<OMNodeDetails> peerNodes;
private boolean isRatisEnabled;
private OzoneManagerRatisServer omRatisServer;
private OzoneManagerRatisClient omRatisClient;
private final OMMetadataManager metadataManager; private final OMMetadataManager metadataManager;
private final VolumeManager volumeManager; private final VolumeManager volumeManager;
private final BucketManager bucketManager; private final BucketManager bucketManager;
@ -266,6 +264,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private volatile boolean isOmRpcServerRunning = false; private volatile boolean isOmRpcServerRunning = false;
private String omComponent; private String omComponent;
private boolean isRatisEnabled;
private OzoneManagerRatisServer omRatisServer;
private OzoneManagerRatisClient omRatisClient;
private OMNodeDetails omNodeDetails;
private final File ratisSnapshotFile;
private long snapshotIndex;
private KeyProviderCryptoExtension kmsProvider = null; private KeyProviderCryptoExtension kmsProvider = null;
private static String keyProviderUriKeyName = private static String keyProviderUriKeyName =
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH; CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
@ -306,6 +311,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
startRatisServer(); startRatisServer();
startRatisClient(); startRatisClient();
this.ratisSnapshotFile = new File(omStorage.getCurrentDir(),
OM_RATIS_SNAPSHOT_INDEX);
this.snapshotIndex = loadRatisSnapshotIndex();
InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress(); InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString()); omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
@ -1307,6 +1316,33 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
} }
} }
/**
 * Reads the Ratis snapshot index that was saved on disk.
 *
 * @return the value read from the snapshot file, or 0 when the file does
 * not exist or cannot be read
 */
@VisibleForTesting
public long loadRatisSnapshotIndex() {
  if (!ratisSnapshotFile.exists()) {
    return 0;
  }

  long savedIndex = 0;
  try {
    savedIndex = PersistentLongFile.readFile(ratisSnapshotFile, 0);
  } catch (IOException e) {
    // Best effort: report the failure and fall back to 0.
    LOG.error("Unable to read the ratis snapshot index (last applied " +
        "transaction log index)", e);
  }
  return savedIndex;
}
/**
 * Saves the Ratis snapshot index: records the state machine's last applied
 * index, flushes the OM metadata store and then writes the recorded index
 * to the snapshot file.
 *
 * @return the snapshot index that was written
 * @throws IOException if the flush or the write of the snapshot file fails
 */
@Override
public long saveRatisSnapshot() throws IOException {
// The index is captured before the flush.
// NOTE(review): presumably so the saved index never exceeds what the
// flushed DB contains - confirm before reordering these statements.
snapshotIndex = omRatisServer.getStateMachineLastAppliedIndex();
// Flush the OM state to disk
getMetadataManager().getStore().flush();
PersistentLongFile.writeFile(ratisSnapshotFile, snapshotIndex);
LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}",
snapshotIndex);
return snapshotIndex;
}
/** /**
* Stop service. * Stop service.
*/ */
@ -2103,7 +2139,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
} }
} }
@Override @Override
public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID, public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
KeyLocation keyLocation) throws IOException { KeyLocation keyLocation) throws IOException {

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMNodeDetails; import org.apache.hadoop.ozone.om.OMNodeDetails;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.client.RaftClientConfigKeys;
@ -60,7 +59,6 @@ import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.SizeInBytes;
@ -83,6 +81,7 @@ public final class OzoneManagerRatisServer {
private final RaftPeerId raftPeerId; private final RaftPeerId raftPeerId;
private final OzoneManagerServerProtocol ozoneManager; private final OzoneManagerServerProtocol ozoneManager;
private final OzoneManagerStateMachine omStateMachine;
private final ClientId clientId = ClientId.randomId(); private final ClientId clientId = ClientId.randomId();
private final ScheduledExecutorService scheduledRoleChecker; private final ScheduledExecutorService scheduledRoleChecker;
@ -130,11 +129,13 @@ public final class OzoneManagerRatisServer {
LOG.info("Instantiating OM Ratis server with GroupID: {} and " + LOG.info("Instantiating OM Ratis server with GroupID: {} and " +
"Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString().substring(2)); "Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString().substring(2));
this.omStateMachine = getStateMachine();
this.server = RaftServer.newBuilder() this.server = RaftServer.newBuilder()
.setServerId(this.raftPeerId) .setServerId(this.raftPeerId)
.setGroup(this.raftGroup) .setGroup(this.raftGroup)
.setProperties(serverProperties) .setProperties(serverProperties)
.setStateMachine(getStateMachine(this.raftGroupId)) .setStateMachine(omStateMachine)
.build(); .build();
// Run a scheduler to check and update the server role on the leader // Run a scheduler to check and update the server role on the leader
@ -156,7 +157,7 @@ public final class OzoneManagerRatisServer {
* Creates an instance of OzoneManagerRatisServer. * Creates an instance of OzoneManagerRatisServer.
*/ */
public static OzoneManagerRatisServer newOMRatisServer( public static OzoneManagerRatisServer newOMRatisServer(
Configuration ozoneConf, OzoneManager om, Configuration ozoneConf, OzoneManagerServerProtocol omProtocol,
OMNodeDetails omNodeDetails, List<OMNodeDetails> peerNodes) OMNodeDetails omNodeDetails, List<OMNodeDetails> peerNodes)
throws IOException { throws IOException {
@ -186,7 +187,7 @@ public final class OzoneManagerRatisServer {
raftPeers.add(raftPeer); raftPeers.add(raftPeer);
} }
return new OzoneManagerRatisServer(ozoneConf, om, omServiceId, return new OzoneManagerRatisServer(ozoneConf, omProtocol, omServiceId,
localRaftPeerId, ratisAddr, raftPeers); localRaftPeerId, ratisAddr, raftPeers);
} }
@ -197,7 +198,7 @@ public final class OzoneManagerRatisServer {
/** /**
* Returns OzoneManager StateMachine. * Returns OzoneManager StateMachine.
*/ */
private BaseStateMachine getStateMachine(RaftGroupId gid) { private OzoneManagerStateMachine getStateMachine() {
return new OzoneManagerStateMachine(this); return new OzoneManagerStateMachine(this);
} }
@ -382,10 +383,13 @@ public final class OzoneManagerRatisServer {
this.roleCheckInitialDelayMs = leaderElectionMinTimeout this.roleCheckInitialDelayMs = leaderElectionMinTimeout
.toLong(TimeUnit.MILLISECONDS); .toLong(TimeUnit.MILLISECONDS);
/** long snapshotAutoTriggerThreshold = conf.getLong(
* TODO: when ratis snapshots are implemented, set snapshot threshold and OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
* queue size. OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT);
*/ RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(
properties, true);
RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
properties, snapshotAutoTriggerThreshold);
return properties; return properties;
} }
@ -517,4 +521,8 @@ public final class OzoneManagerRatisServer {
private UUID getRaftGroupIdFromOmServiceId(String omServiceId) { private UUID getRaftGroupIdFromOmServiceId(String omServiceId) {
return UUID.nameUUIDFromBytes(omServiceId.getBytes(StandardCharsets.UTF_8)); return UUID.nameUUIDFromBytes(omServiceId.getBytes(StandardCharsets.UTF_8));
} }
/**
 * Returns the last applied index reported by the OM state machine.
 *
 * @return the state machine's last applied index
 */
public long getStateMachineLastAppliedIndex() {
  final long lastApplied = omStateMachine.getLastAppliedIndex();
  return lastApplied;
}
} }

View File

@ -69,6 +69,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
private final OzoneManagerServerProtocol ozoneManager; private final OzoneManagerServerProtocol ozoneManager;
private RequestHandler handler; private RequestHandler handler;
private RaftGroupId raftGroupId; private RaftGroupId raftGroupId;
// Written by the task that applies a transaction (run through
// CompletableFuture.supplyAsync) and read through getLastAppliedIndex(), so
// it is volatile: the latest value is visible across threads and the 64-bit
// write is atomic.
private volatile long lastAppliedIndex = 0;
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) { public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer; this.omRatisServer = ratisServer;
@ -95,6 +96,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
* should be rejected. * should be rejected.
* @throws IOException thrown by the state machine while validating * @throws IOException thrown by the state machine while validating
*/ */
@Override
public TransactionContext startTransaction( public TransactionContext startTransaction(
RaftClientRequest raftClientRequest) throws IOException { RaftClientRequest raftClientRequest) throws IOException {
ByteString messageContent = raftClientRequest.getMessage().getContent(); ByteString messageContent = raftClientRequest.getMessage().getContent();
@ -115,7 +117,63 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
return ctxt; return ctxt;
} }
return handleStartTransactionRequests(raftClientRequest, omRequest); return handleStartTransactionRequests(raftClientRequest, omRequest);
}
/**
 * Apply a committed log entry to the state machine.
 *
 * @param trx context of the committed transaction
 * @return a future supplying the OM response message, or a future completed
 * exceptionally when an IOException is raised while preparing the request
 */
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
  try {
    final OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(
        trx.getStateMachineLogEntry().getLogData());
    final long logIndex = trx.getLogEntry().getIndex();
    // Run the command asynchronously, remembering the log index it carries.
    return CompletableFuture.supplyAsync(
        () -> runCommand(omRequest, logIndex));
  } catch (IOException e) {
    return completeExceptionally(e);
  }
}
/**
 * Query the state machine. The request must be read-only.
 *
 * @param request message carrying the serialized OMRequest
 * @return an already completed future holding the OM response message, or
 * a future completed exceptionally when an IOException is raised
 */
@Override
public CompletableFuture<Message> query(Message request) {
  try {
    final OMRequest readRequest =
        OMRatisHelper.convertByteStringToOMRequest(request.getContent());
    final Message reply = queryCommand(readRequest);
    return CompletableFuture.completedFuture(reply);
  } catch (IOException e) {
    return completeExceptionally(e);
  }
}
/**
 * Take OM Ratis snapshot by asking the OzoneManager to save its snapshot
 * index, i.e. the log index of the last transaction applied on the OM
 * State Machine.
 *
 * @return the index returned by the OzoneManager after saving the snapshot,
 * or 0 when no OzoneManager is attached to this state machine
 * @throws IOException if the OzoneManager fails to save the snapshot
 */
@Override
public long takeSnapshot() throws IOException {
  LOG.info("Saving Ratis snapshot on the OM.");
  if (ozoneManager == null) {
    return 0;
  }
  return ozoneManager.saveRatisSnapshot();
}
/**
* Notifies the state machine that the raft peer is no longer leader.
*/
@Override
public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
throws IOException {
omRatisServer.updateServerRole();
} }
/** /**
@ -142,10 +200,8 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
.setLogData(raftClientRequest.getMessage().getContent()) .setLogData(raftClientRequest.getMessage().getContent())
.build(); .build();
} }
} }
private TransactionContext handleInitiateMultipartUpload( private TransactionContext handleInitiateMultipartUpload(
RaftClientRequest raftClientRequest, OMRequest omRequest) { RaftClientRequest raftClientRequest, OMRequest omRequest) {
@ -237,7 +293,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
.build(); .build();
} }
/** /**
* Handle AllocateBlock Request, which needs a special handling. This * Handle AllocateBlock Request, which needs a special handling. This
* request needs to be executed on the leader, where it connects to SCM and * request needs to be executed on the leader, where it connects to SCM and
@ -250,7 +305,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
RaftClientRequest raftClientRequest, OMRequest omRequest) { RaftClientRequest raftClientRequest, OMRequest omRequest) {
OMResponse omResponse = handler.handle(omRequest); OMResponse omResponse = handler.handle(omRequest);
// If request is failed, no need to proceed further. // If request is failed, no need to proceed further.
// Setting the exception with omResponse message and code. // Setting the exception with omResponse message and code.
@ -270,7 +324,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
return transactionContext; return transactionContext;
} }
// Get original request // Get original request
OzoneManagerProtocolProtos.AllocateBlockRequest allocateBlockRequest = OzoneManagerProtocolProtos.AllocateBlockRequest allocateBlockRequest =
omRequest.getAllocateBlockRequest(); omRequest.getAllocateBlockRequest();
@ -294,7 +347,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
.setServerRole(RaftProtos.RaftPeerRole.LEADER) .setServerRole(RaftProtos.RaftPeerRole.LEADER)
.setLogData(messageContent) .setLogData(messageContent)
.build(); .build();
} }
/** /**
@ -308,56 +360,33 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
STATUS_CODE + omResponse.getStatus()); STATUS_CODE + omResponse.getStatus());
} }
/*
* Apply a committed log entry to the state machine.
*/
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
try {
OMRequest request = OMRatisHelper.convertByteStringToOMRequest(
trx.getStateMachineLogEntry().getLogData());
CompletableFuture<Message> future = CompletableFuture
.supplyAsync(() -> runCommand(request));
return future;
} catch (IOException e) {
return completeExceptionally(e);
}
}
/** /**
* Query the state machine. The request must be read-only. * Submits write request to OM and returns the response Message.
*/
@Override
public CompletableFuture<Message> query(Message request) {
try {
OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(
request.getContent());
return CompletableFuture.completedFuture(runCommand(omRequest));
} catch (IOException e) {
return completeExceptionally(e);
}
}
/**
* Notifies the state machine that the raft peer is no longer leader.
*/
@Override
public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
throws IOException {
omRatisServer.updateServerRole();
}
/**
* Submits request to OM and returns the response Message.
* @param request OMRequest * @param request OMRequest
* @return response from OM * @return response from OM
* @throws ServiceException * @throws ServiceException
*/ */
private Message runCommand(OMRequest request) { private Message runCommand(OMRequest request, long trxLogIndex) {
OMResponse response = handler.handle(request);
lastAppliedIndex = trxLogIndex;
return OMRatisHelper.convertResponseToMessage(response);
}
/**
* Submits read request to OM and returns the response Message.
* @param request OMRequest
* @return response from OM
* @throws ServiceException
*/
private Message queryCommand(OMRequest request) {
OMResponse response = handler.handle(request); OMResponse response = handler.handle(request);
return OMRatisHelper.convertResponseToMessage(response); return OMRatisHelper.convertResponseToMessage(response);
} }
/**
 * Returns the log index most recently recorded by a write command run on
 * this state machine.
 *
 * @return the last applied log index, 0 if none has been recorded yet
 */
public long getLastAppliedIndex() {
  final long appliedIndex = lastAppliedIndex;
  return appliedIndex;
}
private static <T> CompletableFuture<T> completeExceptionally(Exception e) { private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
final CompletableFuture<T> future = new CompletableFuture<>(); final CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(e); future.completeExceptionally(e);