HBASE-25216 The client zk syncer should deal with meta replica count change (#2614)

Signed-off-by: Yu Li <liyu@apache.org>
This commit is contained in:
Duo Zhang 2020-11-04 17:54:18 +08:00 committed by GitHub
parent f37cd05c32
commit 49774c7e18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 245 additions and 103 deletions

View File

@ -322,8 +322,9 @@ public class HMaster extends HRegionServer implements MasterServices {
// Tracker for load balancer state
LoadBalancerTracker loadBalancerTracker;
// Tracker for meta location, if any client ZK quorum specified
MetaLocationSyncer metaLocationSyncer;
private MetaLocationSyncer metaLocationSyncer;
// Tracker for active master location, if any client ZK quorum specified
@VisibleForTesting
MasterAddressSyncer masterAddressSyncer;
// Tracker for auto snapshot cleanup state
SnapshotCleanupTracker snapshotCleanupTracker;
@ -3852,4 +3853,9 @@ public class HMaster extends HRegionServer implements MasterServices {
}
return compactionState;
}
@Override
public MetaLocationSyncer getMetaLocationSyncer() {
return metaLocationSyncer;
}
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.LockedResource;
import org.apache.hadoop.hbase.procedure2.Procedure;
@ -570,4 +571,11 @@ public interface MasterServices extends Server {
*/
boolean normalizeRegions(
final NormalizeTableFilterParams ntfp, final boolean isHighPriority) throws IOException;
/**
* Get the meta location syncer.
* <p/>
* We need to get this in MTP to tell the syncer the new meta replica count.
*/
MetaLocationSyncer getMetaLocationSyncer();
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
@ -157,6 +158,12 @@ public class ModifyTableProcedure
break;
case MODIFY_TABLE_ASSIGN_NEW_REPLICAS:
assignNewReplicasIfNeeded(env);
if (TableName.isMetaTableName(getTableName())) {
MetaLocationSyncer syncer = env.getMasterServices().getMetaLocationSyncer();
if (syncer != null) {
syncer.setMetaReplicaCount(modifiedTableDescriptor.getRegionReplication());
}
}
if (deleteColumnFamilyInModify) {
setNextState(ModifyTableState.MODIFY_TABLE_DELETE_FS_LAYOUT);
} else {

View File

@ -19,12 +19,11 @@
package org.apache.hadoop.hbase.master.zksyncer;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.util.Threads;
@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,22 +40,68 @@ import org.slf4j.LoggerFactory;
* Tracks the target znode(s) on server ZK cluster and synchronize them to client ZK cluster if
* changed
* <p/>
* The target znode(s) is given through {@link #getNodesToWatch()} method
* The target znode(s) is given through {@link #getPathsToWatch()} method
*/
@InterfaceAudience.Private
public abstract class ClientZKSyncer extends ZKListener {
private static final Logger LOG = LoggerFactory.getLogger(ClientZKSyncer.class);
private final Server server;
private final ZKWatcher clientZkWatcher;
/**
* Used to store the newest data which we want to sync to client zk.
* <p/>
* For meta location, since we may reduce the replica number, so here we add a {@code delete} flag
* to tell the updater delete the znode on client zk and quit.
*/
private static final class ZKData {
byte[] data;
boolean delete = false;
synchronized void set(byte[] data) {
this.data = data;
notifyAll();
}
synchronized byte[] get() throws InterruptedException {
while (!delete && data == null) {
wait();
}
byte[] d = data;
data = null;
return d;
}
synchronized void delete() {
this.delete = true;
notifyAll();
}
synchronized boolean isDeleted() {
return delete;
}
}
// We use queues and daemon threads to synchronize the data to client ZK cluster
// to avoid blocking the single event thread for watchers
private final Map<String, BlockingQueue<byte[]>> queues;
private final ConcurrentMap<String, ZKData> queues;
public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
super(watcher);
this.server = server;
this.clientZkWatcher = clientZkWatcher;
this.queues = new HashMap<>();
this.queues = new ConcurrentHashMap<>();
}
private void startNewSyncThread(String path) {
ZKData zkData = new ZKData();
queues.put(path, zkData);
Thread updater = new ClientZkUpdater(path, zkData);
updater.setDaemon(true);
updater.start();
watchAndCheckExists(path);
}
/**
@ -69,17 +113,12 @@ public abstract class ClientZKSyncer extends ZKListener {
this.watcher.registerListener(this);
// create base znode on remote ZK
ZKUtil.createWithParents(clientZkWatcher, watcher.getZNodePaths().baseZNode);
// set meta znodes for client ZK
Collection<String> nodes = getNodesToWatch();
LOG.debug("Znodes to watch: " + nodes);
// set znodes for client ZK
Set<String> paths = getPathsToWatch();
LOG.debug("ZNodes to watch: {}", paths);
// initialize queues and threads
for (String node : nodes) {
BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1);
queues.put(node, queue);
Thread updater = new ClientZkUpdater(node, queue);
updater.setDaemon(true);
updater.start();
watchAndCheckExists(node);
for (String path : paths) {
startNewSyncThread(path);
}
}
@ -112,10 +151,9 @@ public abstract class ClientZKSyncer extends ZKListener {
* @param data the data to write to queue
*/
private void upsertQueue(String node, byte[] data) {
BlockingQueue<byte[]> queue = queues.get(node);
synchronized (queue) {
queue.poll();
queue.offer(data);
ZKData zkData = queues.get(node);
if (zkData != null) {
zkData.set(data);
}
}
@ -126,35 +164,49 @@ public abstract class ClientZKSyncer extends ZKListener {
* @param data the data to set to client ZK
* @throws InterruptedException if the thread is interrupted during process
*/
private final void setDataForClientZkUntilSuccess(String node, byte[] data)
private void setDataForClientZkUntilSuccess(String node, byte[] data)
throws InterruptedException {
boolean create = false;
while (!server.isStopped()) {
try {
LOG.debug("Set data for remote " + node + ", client zk wather: " + clientZkWatcher);
ZKUtil.setData(clientZkWatcher, node, data);
break;
} catch (KeeperException.NoNodeException nne) {
// Node doesn't exist, create it and set value
try {
if (create) {
ZKUtil.createNodeIfNotExistsNoWatch(clientZkWatcher, node, data, CreateMode.PERSISTENT);
break;
} catch (KeeperException.ConnectionLossException
| KeeperException.SessionExpiredException ee) {
reconnectAfterExpiration();
} catch (KeeperException e) {
LOG.warn(
"Failed to create znode " + node + " due to: " + e.getMessage() + ", will retry later");
} else {
ZKUtil.setData(clientZkWatcher, node, data);
}
} catch (KeeperException.ConnectionLossException
| KeeperException.SessionExpiredException ee) {
reconnectAfterExpiration();
break;
} catch (KeeperException e) {
LOG.debug("Failed to set data to client ZK, will retry later", e);
LOG.debug("Failed to set data for {} to client ZK, will retry later", node, e);
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
reconnectAfterExpiration();
}
if (e.code() == KeeperException.Code.NONODE) {
create = true;
}
if (e.code() == KeeperException.Code.NODEEXISTS) {
create = false;
}
}
Threads.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
}
}
private void deleteDataForClientZkUntilSuccess(String node) throws InterruptedException {
while (!server.isStopped()) {
LOG.debug("Delete remote " + node + ", client zk wather: " + clientZkWatcher);
try {
ZKUtil.deleteNode(clientZkWatcher, node);
} catch (KeeperException e) {
LOG.debug("Failed to delete node from client ZK, will retry later", e);
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
reconnectAfterExpiration();
}
}
}
}
private final void reconnectAfterExpiration() throws InterruptedException {
LOG.warn("ZK session expired or lost. Retry a new connection...");
try {
@ -164,11 +216,7 @@ public abstract class ClientZKSyncer extends ZKListener {
}
}
@Override
public void nodeCreated(String path) {
if (!validate(path)) {
return;
}
private void getDataAndWatch(String path) {
try {
byte[] data = ZKUtil.getDataAndWatch(watcher, path);
upsertQueue(path, data);
@ -177,23 +225,39 @@ public abstract class ClientZKSyncer extends ZKListener {
}
}
private void removeQueue(String path) {
ZKData zkData = queues.remove(path);
if (zkData != null) {
zkData.delete();
}
}
@Override
public void nodeCreated(String path) {
if (validate(path)) {
getDataAndWatch(path);
} else {
removeQueue(path);
}
}
@Override
public void nodeDataChanged(String path) {
if (validate(path)) {
nodeCreated(path);
}
}
@Override
public synchronized void nodeDeleted(String path) {
if (validate(path)) {
try {
if (ZKUtil.watchAndCheckExists(watcher, path)) {
nodeCreated(path);
getDataAndWatch(path);
}
} catch (KeeperException e) {
LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, e);
}
} else {
removeQueue(path);
}
}
@ -202,41 +266,67 @@ public abstract class ClientZKSyncer extends ZKListener {
* @param path the path to validate
* @return true if the znode is watched by us
*/
abstract boolean validate(String path);
protected abstract boolean validate(String path);
/**
* @return the znode(s) to watch
* @return the zk path(s) to watch
*/
abstract Collection<String> getNodesToWatch() throws KeeperException;
protected abstract Set<String> getPathsToWatch();
protected final void refreshWatchingList() {
Set<String> newPaths = getPathsToWatch();
LOG.debug("New ZNodes to watch: {}", newPaths);
Iterator<Map.Entry<String, ZKData>> iter = queues.entrySet().iterator();
// stop unused syncers
while (iter.hasNext()) {
Map.Entry<String, ZKData> entry = iter.next();
if (!newPaths.contains(entry.getKey())) {
iter.remove();
entry.getValue().delete();
}
}
// start new syncers
for (String newPath : newPaths) {
if (!queues.containsKey(newPath)) {
startNewSyncThread(newPath);
}
}
}
/**
* Thread to synchronize znode data to client ZK cluster
*/
class ClientZkUpdater extends Thread {
final String znode;
final BlockingQueue<byte[]> queue;
private final class ClientZkUpdater extends Thread {
private final String znode;
private final ZKData zkData;
public ClientZkUpdater(String znode, BlockingQueue<byte[]> queue) {
public ClientZkUpdater(String znode, ZKData zkData) {
this.znode = znode;
this.queue = queue;
this.zkData = zkData;
setName("ClientZKUpdater-" + znode);
}
@Override
public void run() {
LOG.debug("Client zk updater for znode {} started", znode);
while (!server.isStopped()) {
try {
byte[] data = queue.take();
byte[] data = zkData.get();
if (data != null) {
setDataForClientZkUntilSuccess(znode, data);
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Interrupted while checking whether need to update meta location to client zk");
} else {
if (zkData.isDeleted()) {
deleteDataForClientZkUntilSuccess(znode);
break;
}
}
} catch (InterruptedException e) {
LOG.debug("Interrupted while checking whether need to update meta location to client zk");
Thread.currentThread().interrupt();
break;
}
}
LOG.debug("Client zk updater for znode {} stopped", znode);
}
}
}

View File

@ -18,9 +18,8 @@
*/
package org.apache.hadoop.hbase.master.zksyncer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@ -39,14 +38,12 @@ public class MasterAddressSyncer extends ClientZKSyncer {
}
@Override
boolean validate(String path) {
protected boolean validate(String path) {
return path.equals(masterAddressZNode);
}
@Override
Collection<String> getNodesToWatch() {
ArrayList<String> toReturn = new ArrayList<>();
toReturn.add(masterAddressZNode);
return toReturn;
protected Set<String> getPathsToWatch() {
return Collections.singleton(masterAddressZNode);
}
}

View File

@ -18,13 +18,12 @@
*/
package org.apache.hadoop.hbase.master.zksyncer;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
/**
* Tracks the meta region locations on server ZK cluster and synchronize them to client ZK cluster
@ -32,19 +31,28 @@ import org.apache.zookeeper.KeeperException;
*/
@InterfaceAudience.Private
public class MetaLocationSyncer extends ClientZKSyncer {
private volatile int metaReplicaCount = 1;
public MetaLocationSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
super(watcher, clientZkWatcher, server);
}
@Override
boolean validate(String path) {
protected boolean validate(String path) {
return watcher.getZNodePaths().isMetaZNodePath(path);
}
@Override
Collection<String> getNodesToWatch() throws KeeperException {
return watcher.getMetaReplicaNodes().stream()
.map(znode -> ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode))
.collect(Collectors.toList());
protected Set<String> getPathsToWatch() {
return IntStream.range(0, metaReplicaCount)
.mapToObj(watcher.getZNodePaths()::getZNodeForReplica).collect(Collectors.toSet());
}
public void setMetaReplicaCount(int replicaCount) {
if (replicaCount != metaReplicaCount) {
metaReplicaCount = replicaCount;
refreshWatchingList();
}
}
}

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -26,6 +30,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@ -35,13 +40,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,7 +64,7 @@ public class TestSeparateClientZKCluster {
private final byte[] newVal = Bytes.toBytes("v2");
@Rule
public TestName name = new TestName();
public TableNameTestRule name = new TableNameTestRule();
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@ -78,13 +81,15 @@ public class TestSeparateClientZKCluster {
TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
// core settings for testing client ZK cluster
TEST_UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
ZKConnectionRegistry.class, ConnectionRegistry.class);
TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
// reduce zk session timeout to easier trigger session expiration
TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
// Start a cluster with 2 masters and 3 regionservers.
StartMiniClusterOption option = StartMiniClusterOption.builder()
.numMasters(2).numRegionServers(3).numDataNodes(3).build();
StartMiniClusterOption option =
StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
TEST_UTIL.startMiniCluster(option);
}
@ -97,7 +102,7 @@ public class TestSeparateClientZKCluster {
@Test
public void testBasicOperation() throws Exception {
TableName tn = TableName.valueOf(name.getMethodName());
TableName tn = name.getTableName();
// create table
Connection conn = TEST_UTIL.getConnection();
try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
@ -113,7 +118,7 @@ public class TestSeparateClientZKCluster {
Get get = new Get(row);
Result result = table.get(get);
LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
Assert.assertArrayEquals(value, result.getValue(family, qualifier));
assertArrayEquals(value, result.getValue(family, qualifier));
}
}
@ -133,19 +138,19 @@ public class TestSeparateClientZKCluster {
}
LOG.info("Shutdown master {}", master.getServerName());
while (cluster.getMaster() == null || !cluster.getMaster().isInitialized()) {
LOG.info("Get master {}", cluster.getMaster() == null? "null":
cluster.getMaster().getServerName());
LOG.info("Get master {}",
cluster.getMaster() == null ? "null" : cluster.getMaster().getServerName());
Thread.sleep(200);
}
LOG.info("Got master {}", cluster.getMaster().getServerName());
// confirm client access still works
Assert.assertTrue(admin.balance(false));
assertTrue(admin.balance(false));
}
}
@Test
public void testMetaRegionMove() throws Exception {
TableName tn = TableName.valueOf(name.getMethodName());
TableName tn = name.getTableName();
// create table
Connection conn = TEST_UTIL.getConnection();
try (Admin admin = conn.getAdmin();
@ -191,13 +196,13 @@ public class TestSeparateClientZKCluster {
table.put(put);
result = table.get(get);
LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
Assert.assertArrayEquals(newVal, result.getValue(family, qualifier));
assertArrayEquals(newVal, result.getValue(family, qualifier));
}
}
@Test
public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
TableName tn = TableName.valueOf(name.getMethodName());
TableName tn = name.getTableName();
// create table
Connection conn = TEST_UTIL.getConnection();
try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
@ -233,13 +238,13 @@ public class TestSeparateClientZKCluster {
Get get = new Get(row);
Result result = table.get(get);
LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
Assert.assertArrayEquals(value, result.getValue(family, qualifier));
assertArrayEquals(value, result.getValue(family, qualifier));
}
}
@Test
public void testAsyncTable() throws Exception {
TableName tn = TableName.valueOf(name.getMethodName());
TableName tn = name.getTableName();
ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
TableDescriptorBuilder tableDescBuilder =
TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
@ -255,7 +260,22 @@ public class TestSeparateClientZKCluster {
Get get = new Get(row);
Result result = table.get(get).get();
LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
Assert.assertArrayEquals(value, result.getValue(family, qualifier));
assertArrayEquals(value, result.getValue(family, qualifier));
}
}
@Test
public void testChangeMetaReplicaCount() throws Exception {
Admin admin = TEST_UTIL.getAdmin();
try (RegionLocator locator =
TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
assertEquals(1, locator.getAllRegionLocations().size());
HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 3);
TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 3);
HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 2);
TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 2);
HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 1);
TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 1);
}
}
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.LockedResource;
import org.apache.hadoop.hbase.procedure2.Procedure;
@ -514,4 +515,9 @@ public class MockNoopMasterServices implements MasterServices {
public boolean normalizeRegions(NormalizeTableFilterParams ntfp, boolean isHighPriority) {
return false;
}
@Override
public MetaLocationSyncer getMetaLocationSyncer() {
return null;
}
}

View File

@ -160,7 +160,7 @@ public class TestMasterNoCluster {
while (!master.isInitialized()) {
Threads.sleep(200);
}
Assert.assertNull(master.metaLocationSyncer);
Assert.assertNull(master.getMetaLocationSyncer());
Assert.assertNull(master.masterAddressSyncer);
master.stopMaster();
master.join();