HBASE-19083 Introduce a new log writer which can write to two HDFSes

This commit is contained in:
zhangduo 2018-01-11 21:08:02 +08:00
parent 65d84df005
commit b3dea0378e
9 changed files with 533 additions and 143 deletions

View File

@ -607,10 +607,14 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
eventLoopGroup, channelClass);
}
@Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false,
this.blocksize, eventLoopGroup, channelClass);
return createAsyncWriter(fs, path);
}
private void waitForSafePoint() {
@ -632,13 +636,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
private long closeWriter() {
AsyncWriter oldWriter = this.writer;
if (oldWriter != null) {
long fileLength = oldWriter.getLength();
protected final long closeWriter(AsyncWriter writer) {
if (writer != null) {
long fileLength = writer.getLength();
closeExecutor.execute(() -> {
try {
oldWriter.close();
writer.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
}
@ -654,7 +657,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
throws IOException {
Preconditions.checkNotNull(nextWriter);
waitForSafePoint();
long oldFileLen = closeWriter();
long oldFileLen = closeWriter(this.writer);
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter;
if (nextWriter instanceof AsyncProtobufLogWriter) {
@ -679,7 +682,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
@Override
protected void doShutdown() throws IOException {
waitForSafePoint();
closeWriter();
closeWriter(this.writer);
closeExecutor.shutdown();
try {
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {

View File

@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
/**
* An {@link AsyncWriter} wrapper which writes data to a set of {@link AsyncWriter} instances.
*/
@InterfaceAudience.Private
public abstract class CombinedAsyncWriter implements AsyncWriter {
private static final Logger LOG = LoggerFactory.getLogger(CombinedAsyncWriter.class);
protected final ImmutableList<AsyncWriter> writers;
protected CombinedAsyncWriter(ImmutableList<AsyncWriter> writers) {
this.writers = writers;
}
@Override
public long getLength() {
return writers.get(0).getLength();
}
@Override
public void close() throws IOException {
Exception error = null;
for (AsyncWriter writer : writers) {
try {
writer.close();
} catch (Exception e) {
LOG.warn("close writer failed", e);
if (error == null) {
error = e;
}
}
}
if (error != null) {
throw new IOException("Failed to close at least one writer, please see the warn log above. " +
"The cause is the first exception occured", error);
}
}
protected abstract void doSync(CompletableFuture<Long> future);
@Override
public CompletableFuture<Long> sync() {
CompletableFuture<Long> future = new CompletableFuture<>();
doSync(future);
return future;
}
@Override
public void append(Entry entry) {
writers.forEach(w -> w.append(entry));
}
public enum Mode {
SEQUENTIAL, PARALLEL
}
public static CombinedAsyncWriter create(Mode mode, AsyncWriter writer, AsyncWriter... writers) {
ImmutableList<AsyncWriter> ws =
ImmutableList.<AsyncWriter> builder().add(writer).add(writers).build();
switch (mode) {
case SEQUENTIAL:
return new CombinedAsyncWriter(ws) {
private void doSync(CompletableFuture<Long> future, Long length, int index) {
if (index == writers.size()) {
future.complete(length);
return;
}
writers.get(index).sync().whenComplete((len, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
doSync(future, len, index + 1);
});
}
@Override
protected void doSync(CompletableFuture<Long> future) {
doSync(future, null, 0);
}
};
case PARALLEL:
return new CombinedAsyncWriter(ws) {
@Override
protected void doSync(CompletableFuture<Long> future) {
AtomicInteger remaining = new AtomicInteger(writers.size());
writers.forEach(w -> w.sync().whenComplete((length, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
if (remaining.decrementAndGet() == 0) {
future.complete(length);
}
}));
}
};
default:
throw new IllegalArgumentException("Unknown mode: " + mode);
}
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
/**
* An AsyncFSWAL which writes data to two filesystems.
*/
@InterfaceAudience.Private
public class DualAsyncFSWAL extends AsyncFSWAL {
private final FileSystem remoteFs;
private final Path remoteWalDir;
public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteRootDir,
String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
eventLoopGroup, channelClass);
this.remoteFs = remoteFs;
this.remoteWalDir = new Path(remoteRootDir, logDir);
}
@Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
AsyncWriter localWriter = super.createWriterInstance(path);
AsyncWriter remoteWriter;
boolean succ = false;
try {
remoteWriter = createAsyncWriter(remoteFs, new Path(remoteWalDir, path.getName()));
succ = true;
} finally {
if (!succ) {
closeWriter(localWriter);
}
}
return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter,
localWriter);
}
}

View File

@ -18,33 +18,15 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -56,8 +38,8 @@ import org.junit.rules.TestName;
/**
* WAL tests that can be reused across providers.
*/
public abstract class AbstractTestProtobufLog<W extends Closeable> {
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
public abstract class AbstractTestProtobufLog {
protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected FileSystem fs;
protected Path dir;
@ -93,14 +75,7 @@ public abstract class AbstractTestProtobufLog<W extends Closeable> {
TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
// faster failover with cluster.shutdown();fs.close() idiom
TEST_UTIL.getConfiguration()
.setInt("hbase.ipc.client.connect.max.retries", 1);
TEST_UTIL.getConfiguration().setInt(
"dfs.client.block.recovery.retries", 1);
TEST_UTIL.getConfiguration().setInt(
"hbase.ipc.client.connection.maxidletime", 500);
TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
SampleRegionWALCoprocessor.class.getName());
TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
TEST_UTIL.startMiniDFSCluster(3);
}
@ -131,77 +106,24 @@ public abstract class AbstractTestProtobufLog<W extends Closeable> {
* @throws IOException
*/
private void doRead(boolean withTrailer) throws IOException {
final int columnCount = 5;
final int recordCount = 5;
final TableName tableName =
TableName.valueOf("tablename");
final byte[] row = Bytes.toBytes("row");
int columnCount = 5;
int recordCount = 5;
TableName tableName = TableName.valueOf("tablename");
byte[] row = Bytes.toBytes("row");
long timestamp = System.currentTimeMillis();
Path path = new Path(dir, "tempwal");
// delete the log if already exists, for test only
fs.delete(path, true);
W writer = null;
ProtobufLogReader reader = null;
try {
HRegionInfo hri = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
HTableDescriptor htd = new HTableDescriptor(tableName);
fs.mkdirs(dir);
// Write log in pb format.
writer = createWriter(path);
for (int i = 0; i < recordCount; ++i) {
WALKeyImpl key = new WALKeyImpl(
hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
WALEdit edit = new WALEdit();
for (int j = 0; j < columnCount; ++j) {
if (i == 0) {
htd.addFamily(new HColumnDescriptor("column" + j));
}
String value = i + "" + j;
edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
}
append(writer, new WAL.Entry(key, edit));
}
sync(writer);
if (withTrailer) writer.close();
// Now read the log using standard means.
reader = (ProtobufLogReader) wals.createReader(fs, path);
if (withTrailer) {
assertNotNull(reader.trailer);
} else {
assertNull(reader.trailer);
}
for (int i = 0; i < recordCount; ++i) {
WAL.Entry entry = reader.next();
assertNotNull(entry);
assertEquals(columnCount, entry.getEdit().size());
assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
assertEquals(tableName, entry.getKey().getTableName());
int idx = 0;
for (Cell val : entry.getEdit().getCells()) {
assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
val.getRowLength()));
String value = i + "" + idx;
assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val));
idx++;
}
}
WAL.Entry entry = reader.next();
assertNull(entry);
} finally {
if (writer != null) {
writer.close();
}
if (reader != null) {
reader.close();
fs.mkdirs(dir);
try (WALProvider.Writer writer = createWriter(path)) {
ProtobufLogTestHelper.doWrite(writer, withTrailer, tableName, columnCount, recordCount, row,
timestamp);
try (ProtobufLogReader reader = (ProtobufLogReader) wals.createReader(fs, path)) {
ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount, row,
timestamp);
}
}
}
protected abstract W createWriter(Path path) throws IOException;
protected abstract void append(W writer, WAL.Entry entry) throws IOException;
protected abstract void sync(W writer) throws IOException;
protected abstract WALProvider.Writer createWriter(Path path) throws IOException;
}

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
/**
* Helper class for testing protobuf log.
*/
final class ProtobufLogTestHelper {
private ProtobufLogTestHelper() {
}
private static byte[] toValue(int prefix, int suffix) {
return Bytes.toBytes(prefix + "-" + suffix);
}
private static RegionInfo toRegionInfo(TableName tableName) {
return RegionInfoBuilder.newBuilder(tableName).setRegionId(1024).build();
}
public static void doWrite(WALProvider.Writer writer, boolean withTrailer, TableName tableName,
int columnCount, int recordCount, byte[] row, long timestamp) throws IOException {
RegionInfo hri = toRegionInfo(tableName);
for (int i = 0; i < recordCount; i++) {
WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, i, timestamp,
HConstants.DEFAULT_CLUSTER_ID);
WALEdit edit = new WALEdit();
int prefix = i;
IntStream.range(0, columnCount).mapToObj(j -> toValue(prefix, j))
.map(value -> new KeyValue(row, row, row, timestamp, value)).forEachOrdered(edit::add);
writer.append(new WAL.Entry(key, edit));
}
writer.sync(false);
if (withTrailer) {
writer.close();
}
}
public static void doRead(ProtobufLogReader reader, boolean withTrailer, TableName tableName,
int columnCount, int recordCount, byte[] row, long timestamp) throws IOException {
if (withTrailer) {
assertNotNull(reader.trailer);
} else {
assertNull(reader.trailer);
}
RegionInfo hri = toRegionInfo(tableName);
for (int i = 0; i < recordCount; ++i) {
WAL.Entry entry = reader.next();
assertNotNull(entry);
assertEquals(columnCount, entry.getEdit().size());
assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
assertEquals(tableName, entry.getKey().getTableName());
int idx = 0;
for (Cell val : entry.getEdit().getCells()) {
assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
val.getRowLength()));
assertArrayEquals(toValue(i, idx), CellUtil.cloneValue(val));
idx++;
}
}
assertNull(reader.next());
}
}

View File

@ -18,29 +18,24 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.AsyncWriter> {
public class TestAsyncProtobufLog extends AbstractTestProtobufLog {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@ -64,25 +59,8 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As
}
@Override
protected AsyncWriter createWriter(Path path) throws IOException {
return AsyncFSWALProvider.createAsyncWriter(TEST_UTIL.getConfiguration(), fs, path, false,
EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
}
@Override
protected void append(AsyncWriter writer, Entry entry) throws IOException {
writer.append(entry);
}
@Override
protected void sync(AsyncWriter writer) throws IOException {
try {
writer.sync().get();
} catch (InterruptedException e) {
throw new InterruptedIOException();
} catch (ExecutionException e) {
Throwables.propagateIfPossible(e.getCause());
throw new IOException(e.getCause());
}
protected Writer createWriter(Path path) throws IOException {
return new WriterOverAsyncWriter(AsyncFSWALProvider.createAsyncWriter(
TEST_UTIL.getConfiguration(), fs, path, false, EVENT_LOOP_GROUP.next(), CHANNEL_CLASS));
}
}

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCombinedAsyncWriter {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static EventLoopGroup EVENT_LOOP_GROUP;
private static Class<? extends Channel> CHANNEL_CLASS;
private static WALFactory WALS;
@Rule
public final TestName name = new TestName();
@Parameter
public CombinedAsyncWriter.Mode mode;
@Parameters(name = "{index}: mode={0}")
public static List<Object[]> params() {
return Arrays.asList(new Object[] { CombinedAsyncWriter.Mode.SEQUENTIAL },
new Object[] { CombinedAsyncWriter.Mode.PARALLEL });
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
EVENT_LOOP_GROUP = new NioEventLoopGroup();
CHANNEL_CLASS = NioSocketChannel.class;
UTIL.startMiniDFSCluster(3);
UTIL.getTestFileSystem().mkdirs(UTIL.getDataTestDirOnTestFS());
WALS =
new WALFactory(UTIL.getConfiguration(), TestCombinedAsyncWriter.class.getSimpleName());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
if (WALS != null) {
WALS.close();
}
EVENT_LOOP_GROUP.shutdownGracefully().syncUninterruptibly();
UTIL.shutdownMiniDFSCluster();
}
@Test
public void testWithTrailer() throws IOException {
doTest(true);
}
@Test
public void testWithoutTrailer() throws IOException {
doTest(false);
}
private Path getPath(int index) throws IOException {
String methodName = name.getMethodName().replaceAll("[^A-Za-z0-9_-]", "_");
return new Path(UTIL.getDataTestDirOnTestFS(), methodName + "-" + index);
}
private void doTest(boolean withTrailer) throws IOException {
int columnCount = 5;
int recordCount = 5;
TableName tableName = TableName.valueOf("tablename");
byte[] row = Bytes.toBytes("row");
long timestamp = System.currentTimeMillis();
Path path1 = getPath(1);
Path path2 = getPath(2);
FileSystem fs = UTIL.getTestFileSystem();
Configuration conf = UTIL.getConfiguration();
try (
AsyncWriter writer1 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path1, false,
EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, false,
EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
CombinedAsyncWriter writer = CombinedAsyncWriter.create(mode, writer1, writer2)) {
ProtobufLogTestHelper.doWrite(new WriterOverAsyncWriter(writer), withTrailer, tableName,
columnCount, recordCount, row, timestamp);
try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path1)) {
ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount, row,
timestamp);
}
try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path2)) {
ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount, row,
timestamp);
}
}
}
}

View File

@ -23,14 +23,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestProtobufLog extends AbstractTestProtobufLog<WALProvider.Writer> {
public class TestProtobufLog extends AbstractTestProtobufLog {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@ -40,14 +38,4 @@ public class TestProtobufLog extends AbstractTestProtobufLog<WALProvider.Writer>
protected Writer createWriter(Path path) throws IOException {
return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false);
}
@Override
protected void append(Writer writer, Entry entry) throws IOException {
writer.append(entry);
}
@Override
protected void sync(Writer writer) throws IOException {
writer.sync(false);
}
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
class WriterOverAsyncWriter implements WALProvider.Writer {
private final WALProvider.AsyncWriter asyncWriter;
public WriterOverAsyncWriter(AsyncWriter asyncWriter) {
this.asyncWriter = asyncWriter;
}
@Override
public void close() throws IOException {
asyncWriter.close();
}
@Override
public long getLength() {
return asyncWriter.getLength();
}
@Override
public void append(Entry entry) throws IOException {
asyncWriter.append(entry);
}
@Override
public void sync(boolean forceSync) throws IOException {
try {
asyncWriter.sync().get();
} catch (InterruptedException e) {
throw new InterruptedIOException();
} catch (ExecutionException e) {
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new IOException(e.getCause());
}
}
}