HDDS-964. Fix test failure in TestOmMetrics. Contributed by Ajay Kumar.

This commit is contained in:
Ajay Kumar 2019-01-04 19:31:10 -08:00 committed by Xiaoyu Yao
parent 8978466fca
commit 0faa5701d9
2 changed files with 100 additions and 34 deletions

View File

@ -246,7 +246,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
@Override
public void restartOzoneManager() throws IOException {
ozoneManager.stop();
ozoneManager.start();
ozoneManager.restart();
}
@Override

View File

@ -44,15 +44,14 @@ import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.security.OzoneSecurityException;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
@ -100,6 +99,7 @@ import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
import org.apache.hadoop.ozone.security.acl.RequestContext;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -220,6 +220,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private JvmPauseMonitor jvmPauseMonitor;
private final SecurityConfig secConfig;
private final S3SecretManager s3SecretManager;
private volatile boolean isOmRpcServerRunning = false;
private OzoneManager(OzoneConfiguration conf) throws IOException {
Preconditions.checkNotNull(conf);
@ -246,38 +247,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
scmContainerClient = null;
scmBlockClient = null;
}
InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
int handlerCount = configuration.getInt(OZONE_OM_HANDLER_COUNT_KEY,
OZONE_OM_HANDLER_COUNT_DEFAULT);
// This is a temporary check. Once fully implemented, all OM state change
// should go through Ratis - either standalone (for non-HA) or replicated
// (for HA).
boolean omRatisEnabled = configuration.getBoolean(
OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
if (omRatisEnabled) {
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(this, omId,
omNodeRpcAddr.getAddress(), configuration);
omRatisServer.start();
LOG.info("OzoneManager Ratis server started at port {}",
omRatisServer.getServerPort());
omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
omId, omRatisServer.getRaftGroup(), configuration);
omRatisClient.connect();
} else {
omRatisServer = null;
omRatisClient = null;
}
RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
BlockingService omService = newReflectiveBlockingService(
new OzoneManagerProtocolServerSideTranslatorPB(
this, omRatisClient, omRatisEnabled));
omRpcAddressTxt = new Text(OmUtils.getOmRpcAddress(configuration));
secConfig = new SecurityConfig(configuration);
if (secConfig.isGrpcBlockTokenEnabled()) {
@ -286,10 +261,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
if(secConfig.isSecurityEnabled()){
delegationTokenMgr = createDelegationTokenSecretManager(configuration);
}
omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
OzoneManagerProtocolPB.class, omService,
handlerCount);
InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
omRpcServer = getRpcServer(conf);
omRpcAddress = updateRPCListenAddress(configuration,
OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
metadataManager = new OmMetadataManagerImpl(configuration);
@ -848,6 +821,53 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
keyManager.start(configuration);
omRpcServer.start();
isOmRpcServerRunning = true;
try {
httpServer = new OzoneManagerHttpServer(configuration, this);
httpServer.start();
} catch (Exception ex) {
// Allow OM to start as Http Server failure is not fatal.
LOG.error("OM HttpServer failed to start.", ex);
}
registerMXBean();
setStartTime();
}
/**
* Restarts the service. This method re-initializes the rpc server.
*/
public void restart() throws IOException {
LOG.info(buildRpcServerStartMessage("OzoneManager RPC server",
omRpcAddress));
DefaultMetricsSystem.initialize("OzoneManager");
metadataManager.start(configuration);
startSecretManagerIfNecessary();
// Set metrics and start metrics back ground thread
metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
.getVolumeTable()));
metrics.setNumBuckets(metadataManager.countRowsInTable(metadataManager
.getBucketTable()));
if (getMetricsStorageFile().exists()) {
OmMetricsInfo metricsInfo = READER.readValue(getMetricsStorageFile());
metrics.setNumKeys(metricsInfo.getNumKeys());
}
// Schedule save metrics
long period = configuration.getTimeDuration(OZONE_OM_METRICS_SAVE_INTERVAL,
OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
scheduleOMMetricsWriteTask = new ScheduleOMMetricsWriteTask();
metricsTimer = new Timer();
metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period);
keyManager.start(configuration);
omRpcServer = getRpcServer(configuration);
omRpcServer.start();
isOmRpcServerRunning = true;
try {
httpServer = new OzoneManagerHttpServer(configuration, this);
httpServer.start();
@ -864,6 +884,51 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
setStartTime();
}
/**
* Creates a new instance of rpc server. If an earlier instance is already
* running then returns the same.
*/
private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException {
if (isOmRpcServerRunning) {
return omRpcServer;
}
InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
// This is a temporary check. Once fully implemented, all OM state change
// should go through Ratis - either standalone (for non-HA) or replicated
// (for HA).
boolean omRatisEnabled = configuration.getBoolean(
OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
if (omRatisEnabled) {
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(this, omId,
omNodeRpcAddr.getAddress(), configuration);
omRatisServer.start();
LOG.info("OzoneManager Ratis server started at port {}",
omRatisServer.getServerPort());
omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
omId, omRatisServer.getRaftGroup(), configuration);
omRatisClient.connect();
} else {
omRatisServer = null;
omRatisClient = null;
}
final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
OZONE_OM_HANDLER_COUNT_DEFAULT);
RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
BlockingService omService = newReflectiveBlockingService(
new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisClient,
omRatisEnabled));
return startRpcServer(configuration, omNodeRpcAddr,
OzoneManagerProtocolPB.class, omService,
handlerCount);
}
/**
* Stop service.
*/
@ -879,6 +944,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
if (omRatisServer != null) {
omRatisServer.stop();
}
isOmRpcServerRunning = false;
keyManager.stop();
stopSecretManager();
httpServer.stop();