HBASE-21503 Replication normal source can get stuck due to potential race conditions between the source WAL reader and WAL provider initialization threads.
Found and analysed by Wellington Chevreuil
This commit is contained in:
parent
d590d6e472
commit
248b8a6f56
|
@ -24,8 +24,9 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -33,16 +34,18 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* Base class of a WAL Provider that returns a single thread safe WAL that writes to Hadoop FS. By
|
||||
|
@ -85,9 +88,10 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
|||
protected String logPrefix;
|
||||
|
||||
/**
|
||||
* we synchronized on walCreateLock to prevent wal recreation in different threads
|
||||
* We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs
|
||||
* missing the newly created WAL, see HBASE-21503 for more details.
|
||||
*/
|
||||
private final Object walCreateLock = new Object();
|
||||
private final ReadWriteLock walCreateLock = new ReentrantReadWriteLock();
|
||||
|
||||
/**
|
||||
* @param factory factory that made us, identity used for FS layout. may not be null
|
||||
|
@ -118,29 +122,39 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
|||
|
||||
@Override
|
||||
public List<WAL> getWALs() {
|
||||
if (wal == null) {
|
||||
return Collections.emptyList();
|
||||
if (wal != null) {
|
||||
return Lists.newArrayList(wal);
|
||||
}
|
||||
walCreateLock.readLock().lock();
|
||||
try {
|
||||
if (wal == null) {
|
||||
return Collections.emptyList();
|
||||
} else {
|
||||
return Lists.newArrayList(wal);
|
||||
}
|
||||
} finally {
|
||||
walCreateLock.readLock().unlock();
|
||||
}
|
||||
List<WAL> wals = new ArrayList<>(1);
|
||||
wals.add(wal);
|
||||
return wals;
|
||||
}
|
||||
|
||||
@Override
|
||||
public T getWAL(RegionInfo region) throws IOException {
|
||||
T walCopy = wal;
|
||||
if (walCopy == null) {
|
||||
// only lock when need to create wal, and need to lock since
|
||||
// creating hlog on fs is time consuming
|
||||
synchronized (walCreateLock) {
|
||||
walCopy = wal;
|
||||
if (walCopy == null) {
|
||||
walCopy = createWAL();
|
||||
wal = walCopy;
|
||||
}
|
||||
}
|
||||
if (walCopy != null) {
|
||||
return walCopy;
|
||||
}
|
||||
walCreateLock.writeLock().lock();
|
||||
try {
|
||||
walCopy = wal;
|
||||
if (walCopy != null) {
|
||||
return walCopy;
|
||||
}
|
||||
walCopy = createWAL();
|
||||
wal = walCopy;
|
||||
return walCopy;
|
||||
} finally {
|
||||
walCreateLock.writeLock().unlock();
|
||||
}
|
||||
return walCopy;
|
||||
}
|
||||
|
||||
/**
 * Creates the WAL instance for this provider.
 * <p>
 * Called by {@link #getWAL(RegionInfo)} while it holds {@code walCreateLock} and only when no
 * WAL has been created yet; the returned instance is then stored and reused.
 *
 * @return the newly created WAL
 * @throws IOException if the WAL cannot be created
 */
protected abstract T createWAL() throws IOException;
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.Future;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||
|
||||
/**
|
||||
* Testcase for HBASE-21503.
|
||||
*/
|
||||
@Category({ RegionServerTests.class, SmallTests.class })
|
||||
public class TestRaceBetweenGetWALAndGetWALs {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRaceBetweenGetWALAndGetWALs.class);
|
||||
|
||||
private static Future<List<WAL>> GET_WALS_FUTURE;
|
||||
|
||||
private static final class FSWALProvider extends AbstractFSWALProvider<AbstractFSWAL<?>> {
|
||||
|
||||
@Override
|
||||
protected AbstractFSWAL<?> createWAL() throws IOException {
|
||||
// just like what may do in the WALListeners, schedule an asynchronous task to call the
|
||||
// getWALs method.
|
||||
GET_WALS_FUTURE = ForkJoinPool.commonPool().submit(this::getWALs);
|
||||
// sleep a while to make the getWALs arrive before we return
|
||||
Threads.sleep(2000);
|
||||
return Mockito.mock(AbstractFSWAL.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doInit(Configuration conf) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRace() throws IOException, InterruptedException, ExecutionException {
|
||||
FSWALProvider p = new FSWALProvider();
|
||||
WAL wal = p.getWAL(null);
|
||||
assertNotNull(GET_WALS_FUTURE);
|
||||
List<WAL> wals = GET_WALS_FUTURE.get();
|
||||
assertSame(wal, Iterables.getOnlyElement(wals));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue