HBASE-25091 Move LogComparator from ReplicationSource to AbstractFSWALProvider#.WALsStartTimeComparator (#2449)

Give the comparator a more descriptive name, a better location,
and make it work even when passed hbase:meta WAL files.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
Michael Stack 2020-10-01 10:04:58 -07:00 committed by GitHub
parent 11a336a74a
commit 3b91a15183
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 98 additions and 33 deletions

View File

@ -64,8 +64,8 @@ public class RecoveredReplicationSource extends ReplicationSource {
public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
boolean hasPathChanged = false;
PriorityBlockingQueue<Path> newPaths =
new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
PriorityBlockingQueue<Path> newPaths = new PriorityBlockingQueue<Path>(queueSizePerGroup,
new AbstractFSWALProvider.WALStartTimeComparator());
pathsLoop: for (Path path : queue) {
if (fs.exists(path)) { // still in same location, don't need to do anything
newPaths.add(path);

View File

@ -24,7 +24,6 @@ import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -253,7 +252,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
PriorityBlockingQueue<Path> queue = queues.get(walPrefix);
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
queue = new PriorityBlockingQueue<>(queueSizePerGroup,
new AbstractFSWALProvider.WALStartTimeComparator());
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
// the shipper may quit immediately
queue.put(wal);
@ -759,31 +759,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
return !this.server.isStopped() && this.sourceRunning;
}
/**
* Comparator used to compare logs together based on their start time
*/
public static class LogsComparator implements Comparator<Path> {
@Override
public int compare(Path o1, Path o2) {
return Long.compare(getTS(o1), getTS(o2));
}
/**
* <p>
* Split a path to get the start time
* </p>
* <p>
* For example: 10.20.20.171%3A60020.1277499063250
* </p>
* @param p path to split
* @return start time
*/
private static long getTS(Path p) {
return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName());
}
}
public ReplicationQueueInfo getReplicationQueueInfo() {
return replicationQueueInfo;
}

View File

@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hbase.wal;
import static org.apache.commons.lang3.StringUtils.isNumeric;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
@ -418,6 +420,36 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
return p != null && p.endsWith(META_WAL_PROVIDER_ID);
}
/**
* Comparator used to compare WAL files together based on their start time.
* Just compares start times and nothing else.
*/
public static class WALStartTimeComparator implements Comparator<Path> {
@Override
public int compare(Path o1, Path o2) {
return Long.compare(getTS(o1), getTS(o2));
}
/**
* Split a path to get the start time
* For example: 10.20.20.171%3A60020.1277499063250
* Could also be a meta WAL which adds a '.meta' suffix or a synchronous replication WAL
* which adds a '.syncrep' suffix. Check.
* @param p path to split
* @return start time
*/
private static long getTS(Path p) {
String name = p.getName();
String [] splits = name.split("\\.");
String ts = splits[splits.length - 1];
if (!isNumeric(ts)) {
// Its a '.meta' or a '.syncrep' suffix.
ts = splits[splits.length - 2];
}
return Long.parseLong(ts);
}
}
public static boolean isArchivedLogFile(Path p) {
String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR;
return p.toString().contains(oldLog);
@ -545,8 +577,4 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
public static String getWALPrefixFromWALName(String name) {
return getWALNameGroupFromWALName(name, 1);
}
public static long getWALStartTimeFromWALName(String name) {
return Long.parseLong(getWALNameGroupFromWALName(name, 2));
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Comparator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, SmallTests.class})
public class TestWALProvider {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALProvider.class);
/**
* Test start time comparator.
*/
@Test
public void testWALStartTimeComparator() throws IOException {
Path metaPath1 = new Path("hdfs://localhost:59875/user/stack/test-data/" +
"f4cb8ffa-6ff7-59a6-f167-6cc00f24899a/WALs/localhost,59908,1600304600425/" +
"localhost%2C59908%2C1600304600425.meta.1600304604319.meta");
Path metaPath2 = new Path("hdfs://localhost:59875/user/stack/test-data/" +
"f4cb8ffa-6ff7-59a6-f167-6cc00f24899a/WALs/localhost,59908,1600304600425/" +
"localhost%2C59908%2C1600304600425.meta.1600304604320.meta");
Path path3 = new Path("hdfs://localhost:59875/user/stack/test-data/" +
"f4cb8ffa-6ff7-59a6-f167-6cc00f24899a/WALs/localhost,59908,1600304600425/" +
"localhost%2C59908%2C1600304600425.1600304604321");
Path metaPath4 = new Path("hdfs://localhost:59875/user/stack/test-data/" +
"f4cb8ffa-6ff7-59a6-f167-6cc00f24899a/WALs/localhost,59908,1600304600425/" +
"localhost%2C59908%2C1600304600425.meta.1600304604321.meta");
Comparator c = new AbstractFSWALProvider.WALStartTimeComparator();
assertTrue(c.compare(metaPath1, metaPath1) == 0);
assertTrue(c.compare(metaPath2, metaPath2) == 0);
assertTrue(c.compare(metaPath1, metaPath2) < 0);
assertTrue(c.compare(metaPath2, metaPath1) > 0);
assertTrue(c.compare(metaPath2, path3) < 0);
assertTrue(c.compare(path3, metaPath4) == 0);
}
}