From 9621bafab63046d9261848428d9b45d820a59c28 Mon Sep 17 00:00:00 2001 From: BukrosSzabolcs Date: Wed, 15 Jan 2020 21:59:58 +0100 Subject: [PATCH] HBASE-23601 OutputSink.WriterThread exception gets stuck and repeated indefinietly (#1028) --- .../RegionReplicaReplicationEndpoint.java | 1 + .../apache/hadoop/hbase/wal/OutputSink.java | 20 ++- .../apache/hadoop/hbase/wal/WALSplitter.java | 1 + .../hbase/wal/TestOutputSinkWriter.java | 130 ++++++++++++++++++ 4 files changed, 151 insertions(+), 1 deletion(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestOutputSinkWriter.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index 60f693ae279..cead808e124 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -249,6 +249,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { } catch (IOException e) { LOG.warn("Received IOException while trying to replicate" + StringUtils.stringifyException(e)); + outputSink.restartWriterThreadsIfNeeded(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java index b58913489b8..de62c4d5b0d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java @@ -89,6 +89,19 @@ public abstract class OutputSink { } } + public synchronized void restartWriterThreadsIfNeeded() { + for(int i = 0; i< writerThreads.size(); i++){ + WriterThread t = writerThreads.get(i); + if (!t.isAlive()){ + String threadName 
= t.getName(); + LOG.debug("Replacing dead thread: " + threadName); + WriterThread newThread = new WriterThread(controller, entryBuffers, this, threadName); + newThread.start(); + writerThreads.set(i, newThread); + } + } + } + /** * Wait for writer threads to dump all info to the sink * @@ -164,7 +177,12 @@ public abstract class OutputSink { WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i) { - super(Thread.currentThread().getName() + "-Writer-" + i); + this(controller, entryBuffers, sink, Thread.currentThread().getName() + "-Writer-" + i); + } + + WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, + OutputSink sink, String threadName) { + super(threadName); this.controller = controller; this.entryBuffers = entryBuffers; outputSink = sink; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index d7bbd072c37..6b75f1d9185 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -475,6 +475,7 @@ public class WALSplitter { if (thrown == null) { return; } + this.thrown.set(null); if (thrown instanceof IOException) { throw new IOException(thrown); } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestOutputSinkWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestOutputSinkWriter.java new file mode 100644 index 00000000000..f72623767c4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestOutputSinkWriter.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestOutputSinkWriter { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass( + TestOutputSinkWriter.class); + + @Test + public void testExeptionHandling() throws IOException, InterruptedException { + WALSplitter.PipelineController controller = new WALSplitter.PipelineController(); + BrokenEntryBuffers entryBuffers = new BrokenEntryBuffers(controller, 2000); + OutputSink sink = new OutputSink(controller, entryBuffers, 1) { + + @Override protected int getNumOpenWriters() { + return 0; + } + + @Override protected void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException { + + } + + @Override protected List close() throws IOException { + return null; + } + + @Override public Map getOutputCounts() { + return null; + } + + @Override public int getNumberOfRecoveredRegions() { + return 0; + } + + 
@Override public boolean keepRegionEvent(WAL.Entry entry) { + return false; + } + }; + + //start the Writer thread and give it time to throw the exception + sink.startWriterThreads(); + Thread.sleep(1000L); + + //make sure the exception is stored + try { + controller.checkForErrors(); + Assert.fail(); + } + catch (RuntimeException re){ + Assert.assertTrue(true); + } + + sink.restartWriterThreadsIfNeeded(); + + //after the check the stored exception should be gone + try { + controller.checkForErrors(); + } + catch (RuntimeException re){ + Assert.fail(); + } + + //prep another exception and wait for it to be thrown + entryBuffers.setThrowError(true); + Thread.sleep(1000L); + + //make sure the exception is stored + try { + controller.checkForErrors(); + Assert.fail(); + } + catch (RuntimeException re){ + Assert.assertTrue(true); + } + } + + static class BrokenEntryBuffers extends EntryBuffers{ + boolean throwError = true; + + public BrokenEntryBuffers(WALSplitter.PipelineController controller, long maxHeapUsage) { + super(controller, maxHeapUsage); + } + + @Override + synchronized EntryBuffers.RegionEntryBuffer getChunkToWrite() { + //This just emulates something going wrong within the Writer + if(throwError){ + throwError = false; + throw new RuntimeException("testing"); + } + return null; + } + + public void setThrowError(boolean newValue){ + throwError = newValue; + } + }; +}