HBASE-27780 FileChangeWatcher improvements (#5164)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
251e498477
commit
e4b4cef80e
|
@ -27,7 +27,6 @@ import java.nio.file.WatchKey;
|
||||||
import java.nio.file.WatchService;
|
import java.nio.file.WatchService;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.zookeeper.server.ZooKeeperThread;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -72,7 +71,8 @@ public final class FileChangeWatcher {
|
||||||
* relative to <code>dirPath</code>.
|
* relative to <code>dirPath</code>.
|
||||||
* @throws IOException if there is an error creating the WatchService.
|
* @throws IOException if there is an error creating the WatchService.
|
||||||
*/
|
*/
|
||||||
public FileChangeWatcher(Path dirPath, Consumer<WatchEvent<?>> callback) throws IOException {
|
public FileChangeWatcher(Path dirPath, String threadNameSuffix, Consumer<WatchEvent<?>> callback)
|
||||||
|
throws IOException {
|
||||||
FileSystem fs = dirPath.getFileSystem();
|
FileSystem fs = dirPath.getFileSystem();
|
||||||
WatchService watchService = fs.newWatchService();
|
WatchService watchService = fs.newWatchService();
|
||||||
|
|
||||||
|
@ -83,7 +83,7 @@ public final class FileChangeWatcher {
|
||||||
StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY,
|
StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY,
|
||||||
StandardWatchEventKinds.OVERFLOW });
|
StandardWatchEventKinds.OVERFLOW });
|
||||||
state = State.NEW;
|
state = State.NEW;
|
||||||
this.watcherThread = new WatcherThread(watchService, callback);
|
this.watcherThread = new WatcherThread(threadNameSuffix, watchService, callback);
|
||||||
this.watcherThread.setDaemon(true);
|
this.watcherThread.setDaemon(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,20 +172,30 @@ public final class FileChangeWatcher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String getWatcherThreadName() {
|
||||||
|
return watcherThread.getName();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void handleException(Thread thread, Throwable e) {
|
||||||
|
LOG.warn("Exception occurred from thread {}", thread.getName(), e);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Inner class that implements the watcher thread logic.
|
* Inner class that implements the watcher thread logic.
|
||||||
*/
|
*/
|
||||||
private class WatcherThread extends ZooKeeperThread {
|
private class WatcherThread extends Thread {
|
||||||
|
|
||||||
private static final String THREAD_NAME = "FileChangeWatcher";
|
private static final String THREAD_NAME_PREFIX = "FileChangeWatcher-";
|
||||||
|
|
||||||
final WatchService watchService;
|
final WatchService watchService;
|
||||||
final Consumer<WatchEvent<?>> callback;
|
final Consumer<WatchEvent<?>> callback;
|
||||||
|
|
||||||
WatcherThread(WatchService watchService, Consumer<WatchEvent<?>> callback) {
|
WatcherThread(String threadNameSuffix, WatchService watchService,
|
||||||
super(THREAD_NAME);
|
Consumer<WatchEvent<?>> callback) {
|
||||||
|
super(THREAD_NAME_PREFIX + threadNameSuffix);
|
||||||
this.watchService = watchService;
|
this.watchService = watchService;
|
||||||
this.callback = callback;
|
this.callback = callback;
|
||||||
|
setUncaughtExceptionHandler(FileChangeWatcher::handleException);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -74,11 +74,11 @@ public final class X509Util {
|
||||||
static final String CONFIG_PREFIX = "hbase.rpc.tls.";
|
static final String CONFIG_PREFIX = "hbase.rpc.tls.";
|
||||||
public static final String TLS_CONFIG_PROTOCOL = CONFIG_PREFIX + "protocol";
|
public static final String TLS_CONFIG_PROTOCOL = CONFIG_PREFIX + "protocol";
|
||||||
public static final String TLS_CONFIG_KEYSTORE_LOCATION = CONFIG_PREFIX + "keystore.location";
|
public static final String TLS_CONFIG_KEYSTORE_LOCATION = CONFIG_PREFIX + "keystore.location";
|
||||||
static final String TLS_CONFIG_KEYSTORE_TYPE = CONFIG_PREFIX + "keystore.type";
|
public static final String TLS_CONFIG_KEYSTORE_TYPE = CONFIG_PREFIX + "keystore.type";
|
||||||
static final String TLS_CONFIG_KEYSTORE_PASSWORD = CONFIG_PREFIX + "keystore.password";
|
public static final String TLS_CONFIG_KEYSTORE_PASSWORD = CONFIG_PREFIX + "keystore.password";
|
||||||
static final String TLS_CONFIG_TRUSTSTORE_LOCATION = CONFIG_PREFIX + "truststore.location";
|
public static final String TLS_CONFIG_TRUSTSTORE_LOCATION = CONFIG_PREFIX + "truststore.location";
|
||||||
static final String TLS_CONFIG_TRUSTSTORE_TYPE = CONFIG_PREFIX + "truststore.type";
|
public static final String TLS_CONFIG_TRUSTSTORE_TYPE = CONFIG_PREFIX + "truststore.type";
|
||||||
static final String TLS_CONFIG_TRUSTSTORE_PASSWORD = CONFIG_PREFIX + "truststore.password";
|
public static final String TLS_CONFIG_TRUSTSTORE_PASSWORD = CONFIG_PREFIX + "truststore.password";
|
||||||
public static final String TLS_CONFIG_CLR = CONFIG_PREFIX + "clr";
|
public static final String TLS_CONFIG_CLR = CONFIG_PREFIX + "clr";
|
||||||
public static final String TLS_CONFIG_OCSP = CONFIG_PREFIX + "ocsp";
|
public static final String TLS_CONFIG_OCSP = CONFIG_PREFIX + "ocsp";
|
||||||
public static final String TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED =
|
public static final String TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED =
|
||||||
|
@ -417,8 +417,12 @@ public final class X509Util {
|
||||||
String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, "");
|
String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, "");
|
||||||
keystoreWatcher.set(newFileChangeWatcher(keyStoreLocation, resetContext));
|
keystoreWatcher.set(newFileChangeWatcher(keyStoreLocation, resetContext));
|
||||||
String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, "");
|
String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, "");
|
||||||
|
// we are using the same callback for both. there's no reason to kick off two
|
||||||
|
// threads if keystore/truststore are both at the same location
|
||||||
|
if (!keyStoreLocation.equals(trustStoreLocation)) {
|
||||||
trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext));
|
trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runnable resetContext)
|
private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runnable resetContext)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -430,7 +434,8 @@ public final class X509Util {
|
||||||
if (parentPath == null) {
|
if (parentPath == null) {
|
||||||
throw new IOException("Key/trust store path does not have a parent: " + filePath);
|
throw new IOException("Key/trust store path does not have a parent: " + filePath);
|
||||||
}
|
}
|
||||||
FileChangeWatcher fileChangeWatcher = new FileChangeWatcher(parentPath, watchEvent -> {
|
FileChangeWatcher fileChangeWatcher =
|
||||||
|
new FileChangeWatcher(parentPath, Objects.toString(filePath.getFileName()), watchEvent -> {
|
||||||
handleWatchEvent(filePath, watchEvent, resetContext);
|
handleWatchEvent(filePath, watchEvent, resetContext);
|
||||||
});
|
});
|
||||||
fileChangeWatcher.start();
|
fileChangeWatcher.start();
|
||||||
|
|
|
@ -17,8 +17,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.io;
|
package org.apache.hadoop.hbase.io;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -29,11 +32,15 @@ import java.nio.file.WatchEvent;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
|
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
|
||||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.hamcrest.Matchers;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
|
@ -76,12 +83,43 @@ public class TestFileChangeWatcher {
|
||||||
UTIL.cleanupTestDir();
|
UTIL.cleanupTestDir();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEnableCertFileReloading() throws IOException {
|
||||||
|
Configuration myConf = new Configuration();
|
||||||
|
String sharedPath = "/tmp/foo.jks";
|
||||||
|
myConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION, sharedPath);
|
||||||
|
myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, sharedPath);
|
||||||
|
AtomicReference<FileChangeWatcher> keystoreWatcher = new AtomicReference<>();
|
||||||
|
AtomicReference<FileChangeWatcher> truststoreWatcher = new AtomicReference<>();
|
||||||
|
X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
|
||||||
|
});
|
||||||
|
assertNotNull(keystoreWatcher.get());
|
||||||
|
assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-foo.jks"));
|
||||||
|
assertNull(truststoreWatcher.get());
|
||||||
|
|
||||||
|
keystoreWatcher.getAndSet(null).stop();
|
||||||
|
truststoreWatcher.set(null);
|
||||||
|
|
||||||
|
String truststorePath = "/tmp/bar.jks";
|
||||||
|
myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, truststorePath);
|
||||||
|
X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
|
||||||
|
});
|
||||||
|
|
||||||
|
assertNotNull(keystoreWatcher.get());
|
||||||
|
assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-foo.jks"));
|
||||||
|
assertNotNull(truststoreWatcher.get());
|
||||||
|
assertThat(truststoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-bar.jks"));
|
||||||
|
|
||||||
|
keystoreWatcher.getAndSet(null).stop();
|
||||||
|
truststoreWatcher.getAndSet(null).stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException {
|
public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException {
|
||||||
FileChangeWatcher watcher = null;
|
FileChangeWatcher watcher = null;
|
||||||
try {
|
try {
|
||||||
final List<WatchEvent<?>> events = new ArrayList<>();
|
final List<WatchEvent<?>> events = new ArrayList<>();
|
||||||
watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
|
watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
|
||||||
LOG.info("Got an update: {} {}", event.kind(), event.context());
|
LOG.info("Got an update: {} {}", event.kind(), event.context());
|
||||||
// Filter out the extra ENTRY_CREATE events that are
|
// Filter out the extra ENTRY_CREATE events that are
|
||||||
// sometimes seen at the start. Even though we create the watcher
|
// sometimes seen at the start. Even though we create the watcher
|
||||||
|
@ -124,7 +162,7 @@ public class TestFileChangeWatcher {
|
||||||
FileChangeWatcher watcher = null;
|
FileChangeWatcher watcher = null;
|
||||||
try {
|
try {
|
||||||
final List<WatchEvent<?>> events = new ArrayList<>();
|
final List<WatchEvent<?>> events = new ArrayList<>();
|
||||||
watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
|
watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
|
||||||
LOG.info("Got an update: {} {}", event.kind(), event.context());
|
LOG.info("Got an update: {} {}", event.kind(), event.context());
|
||||||
// Filter out the extra ENTRY_CREATE events that are
|
// Filter out the extra ENTRY_CREATE events that are
|
||||||
// sometimes seen at the start. Even though we create the watcher
|
// sometimes seen at the start. Even though we create the watcher
|
||||||
|
@ -164,7 +202,7 @@ public class TestFileChangeWatcher {
|
||||||
FileChangeWatcher watcher = null;
|
FileChangeWatcher watcher = null;
|
||||||
try {
|
try {
|
||||||
final List<WatchEvent<?>> events = new ArrayList<>();
|
final List<WatchEvent<?>> events = new ArrayList<>();
|
||||||
watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
|
watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
|
||||||
LOG.info("Got an update: {} {}", event.kind(), event.context());
|
LOG.info("Got an update: {} {}", event.kind(), event.context());
|
||||||
synchronized (events) {
|
synchronized (events) {
|
||||||
events.add(event);
|
events.add(event);
|
||||||
|
@ -198,7 +236,7 @@ public class TestFileChangeWatcher {
|
||||||
FileChangeWatcher watcher = null;
|
FileChangeWatcher watcher = null;
|
||||||
try {
|
try {
|
||||||
final List<WatchEvent<?>> events = new ArrayList<>();
|
final List<WatchEvent<?>> events = new ArrayList<>();
|
||||||
watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
|
watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
|
||||||
LOG.info("Got an update: {} {}", event.kind(), event.context());
|
LOG.info("Got an update: {} {}", event.kind(), event.context());
|
||||||
// Filter out the extra ENTRY_CREATE events that are
|
// Filter out the extra ENTRY_CREATE events that are
|
||||||
// sometimes seen at the start. Even though we create the watcher
|
// sometimes seen at the start. Even though we create the watcher
|
||||||
|
@ -238,7 +276,7 @@ public class TestFileChangeWatcher {
|
||||||
FileChangeWatcher watcher = null;
|
FileChangeWatcher watcher = null;
|
||||||
try {
|
try {
|
||||||
final AtomicInteger callCount = new AtomicInteger(0);
|
final AtomicInteger callCount = new AtomicInteger(0);
|
||||||
watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
|
watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
|
||||||
LOG.info("Got an update: {} {}", event.kind(), event.context());
|
LOG.info("Got an update: {} {}", event.kind(), event.context());
|
||||||
int oldValue;
|
int oldValue;
|
||||||
synchronized (callCount) {
|
synchronized (callCount) {
|
||||||
|
|
Loading…
Reference in New Issue