HADOOP-2418 Fix assertion failures in TestTableMapReduce, TestTableIndex, and TestTableJoinMapReduce

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@604034 13f79535-47bb-0310-9956-ffa450edef68

commit 438b82450a (parent a24a76ca83)
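The common thread of this change: each test's verify() ran before the region server's cache had flushed the test's updates, so the scanner's snapshot came up short and the assertions failed. The fix, applied identically in all three tests below, is to sleep for one optional-cache-flush interval before verifying, and to stop threading an HBaseConfiguration parameter through scanTable()/verify() in favor of the configuration field the tests already inherit. A minimal sketch of that pattern, assuming (as the diff implies but does not show) that `conf` is the HBaseConfiguration field inherited from the test base class:

    // Sketch of the wait-before-verify pattern this commit applies in all
    // three tests; `conf` is assumed to be the inherited configuration field.
    private void waitForCacheFlush() {
      try {
        // Wait one optional-cache-flush interval so the scanner's snapshot
        // includes every update the test has written.
        Thread.sleep(conf.getLong("hbase.regionserver.optionalcacheflushinterval",
          60L * 1000L));
      } catch (InterruptedException e) {
        // ignore: a shortened wait only risks the original flaky assertion
      }
    }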
CHANGES.txt
@@ -78,6 +78,8 @@ Trunk (unreleased changes)
   HADOOP-2397 The only time that a meta scanner should try to recover a log is
               when the master is starting
   HADOOP-2417 Fix critical shutdown problem introduced by HADOOP-2338
+  HADOOP-2418 Fix assertion failures in TestTableMapReduce, TestTableIndex,
+              and TestTableJoinMapReduce
 
 IMPROVEMENTS
   HADOOP-2401 Add convenience put method that takes writable
TestTableIndex.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseAdmin;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HScannerInterface;
@@ -68,6 +67,11 @@ public class TestTableIndex extends MultiRegionTable {
   static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);
   static final String ROWKEY_NAME = "key";
   static final String INDEX_DIR = "testindex";
+  private static final Text[] columns = {
+    TEXT_INPUT_COLUMN,
+    TEXT_OUTPUT_COLUMN
+  };
+
 
   private HTableDescriptor desc;
 
@@ -146,7 +150,7 @@ public class TestTableIndex extends MultiRegionTable {
     if (printResults) {
       LOG.info("Print table contents before map/reduce");
     }
-    scanTable(conf, printResults);
+    scanTable(printResults);
 
     @SuppressWarnings("deprecation")
     MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
@@ -179,10 +183,10 @@ public class TestTableIndex extends MultiRegionTable {
     if (printResults) {
       LOG.info("Print table contents after map/reduce");
     }
-    scanTable(conf, printResults);
+    scanTable(printResults);
 
     // verify index results
-    verify(conf);
+    verify();
   }
 
   private String createIndexConfContent() {
@@ -218,10 +222,9 @@ public class TestTableIndex extends MultiRegionTable {
     return c.toString();
   }
 
-  private void scanTable(HBaseConfiguration c, boolean printResults)
+  private void scanTable(boolean printResults)
   throws IOException {
-    HTable table = new HTable(c, new Text(TABLE_NAME));
-    Text[] columns = { TEXT_INPUT_COLUMN, TEXT_OUTPUT_COLUMN };
+    HTable table = new HTable(conf, new Text(TABLE_NAME));
     HScannerInterface scanner = table.obtainScanner(columns,
       HConstants.EMPTY_START_ROW);
     try {
@@ -243,7 +246,16 @@ public class TestTableIndex extends MultiRegionTable {
     }
   }
 
-  private void verify(HBaseConfiguration c) throws IOException {
+  private void verify() throws IOException {
+    // Sleep before we start the verify to ensure that when the scanner takes
+    // its snapshot, all the updates have made it into the cache.
+    try {
+      Thread.sleep(conf.getLong("hbase.regionserver.optionalcacheflushinterval",
+        60L * 1000L));
+    } catch (InterruptedException e) {
+      // ignore
+    }
+
     Path localDir = new Path(this.testDir, "index_" +
       Integer.toString(new Random().nextInt()));
     this.fs.copyToLocalFile(new Path(INDEX_DIR), localDir);
@@ -265,15 +277,14 @@ public class TestTableIndex extends MultiRegionTable {
       throw new IOException("no index directory found");
     }
 
-    HTable table = new HTable(c, new Text(TABLE_NAME));
-    Text[] columns = { TEXT_INPUT_COLUMN, TEXT_OUTPUT_COLUMN };
+    HTable table = new HTable(conf, new Text(TABLE_NAME));
     scanner = table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
 
     HStoreKey key = new HStoreKey();
     TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
 
     IndexConfiguration indexConf = new IndexConfiguration();
-    String content = c.get("hbase.index.conf");
+    String content = conf.get("hbase.index.conf");
     if (content != null) {
       indexConf.addFromXML(content);
     }

TestTableMapReduce.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseAdmin;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HScannerInterface;
@@ -55,7 +54,7 @@ public class TestTableMapReduce extends MultiRegionTable {
   @SuppressWarnings("hiding")
   private static final Log LOG =
     LogFactory.getLog(TestTableMapReduce.class.getName());
 
   static final String SINGLE_REGION_TABLE_NAME = "srtest";
   static final String MULTI_REGION_TABLE_NAME = "mrtest";
   static final String INPUT_COLUMN = "contents:";
@@ -63,6 +62,11 @@ public class TestTableMapReduce extends MultiRegionTable {
   static final String OUTPUT_COLUMN = "text:";
   static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);
 
+  private static final Text[] columns = {
+    TEXT_INPUT_COLUMN,
+    TEXT_OUTPUT_COLUMN
+  };
+
   private MiniDFSCluster dfsCluster = null;
   private FileSystem fs;
   private Path dir;
@@ -232,7 +236,7 @@ public class TestTableMapReduce extends MultiRegionTable {
     }
 
     LOG.info("Print table contents before map/reduce");
-    scanTable(conf, SINGLE_REGION_TABLE_NAME, true);
+    scanTable(SINGLE_REGION_TABLE_NAME, true);
 
     @SuppressWarnings("deprecation")
     MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
@@ -256,10 +260,10 @@ public class TestTableMapReduce extends MultiRegionTable {
       }
 
       LOG.info("Print table contents after map/reduce");
-      scanTable(conf, SINGLE_REGION_TABLE_NAME, true);
+      scanTable(SINGLE_REGION_TABLE_NAME, true);
 
       // verify map-reduce results
-      verify(conf, SINGLE_REGION_TABLE_NAME);
+      verify(SINGLE_REGION_TABLE_NAME);
 
     } finally {
       table.close();
@@ -311,21 +315,17 @@ public class TestTableMapReduce extends MultiRegionTable {
       }
 
       // verify map-reduce results
-      verify(conf, MULTI_REGION_TABLE_NAME);
+      verify(MULTI_REGION_TABLE_NAME);
 
     } finally {
       table.close();
     }
   }
 
-  private void scanTable(HBaseConfiguration conf, String tableName,
-    boolean printValues) throws IOException {
+  private void scanTable(String tableName, boolean printValues)
+  throws IOException {
     HTable table = new HTable(conf, new Text(tableName));
 
-    Text[] columns = {
-      TEXT_INPUT_COLUMN,
-      TEXT_OUTPUT_COLUMN
-    };
     HScannerInterface scanner =
       table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
 
@@ -350,14 +350,17 @@ public class TestTableMapReduce extends MultiRegionTable {
   }
 
   @SuppressWarnings("null")
-  private void verify(HBaseConfiguration conf, String tableName)
-  throws IOException {
+  private void verify(String tableName) throws IOException {
+    // Sleep before we start the verify to ensure that when the scanner takes
+    // its snapshot, all the updates have made it into the cache.
+    try {
+      Thread.sleep(conf.getLong("hbase.regionserver.optionalcacheflushinterval",
+        60L * 1000L));
+    } catch (InterruptedException e) {
+      // ignore
+    }
     HTable table = new HTable(conf, new Text(tableName));
 
-    Text[] columns = {
-      TEXT_INPUT_COLUMN,
-      TEXT_OUTPUT_COLUMN
-    };
     HScannerInterface scanner =
       table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
 

TestTableJoinMapReduce.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseAdmin;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HScannerInterface;
@@ -94,7 +93,10 @@ public class TestTableJoinMapReduce extends MultiRegionTable {
     StaticTestEnvironment.shutdownDfs(dfsCluster);
   }
 
-  public void testTableJoinMapReduce() {
+  /**
+   * @throws Exception
+   */
+  public void testTableJoinMapReduce() throws Exception {
     HTable table = null;
     try {
       HTableDescriptor desc = new HTableDescriptor(FIRST_RELATION);
@@ -188,16 +190,20 @@ public class TestTableJoinMapReduce extends MultiRegionTable {
 
       JobClient.runJob(jobConf);
 
-    } catch (IOException e) {
+    } catch (Exception e) {
       e.printStackTrace();
+      throw e;
     } finally {
-      mrCluster.shutdown();
+      if (mrCluster != null) {
+        mrCluster.shutdown();
+      }
     }
 
     try {
-      verify(conf, OUTPUT_TABLE);
-    } catch (IOException e) {
+      verify(OUTPUT_TABLE);
+    } catch (Exception e) {
       e.printStackTrace();
+      throw e;
     }
   }
 
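Beyond the cache-flush sleep, the hunk above hardens the join test's error handling: exceptions are logged and rethrown instead of swallowed (so JUnit fails the test rather than letting it pass silently), and the MiniMRCluster is only shut down if it was actually constructed. A hedged restatement of that idiom, using only names already present in the diff:

    // Sketch of the hardened teardown pattern introduced above.
    MiniMRCluster mrCluster = null;
    try {
      mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
      JobClient.runJob(jobConf);
    } catch (Exception e) {
      e.printStackTrace();   // keep the stack trace in the test log...
      throw e;               // ...but still fail the test
    } finally {
      if (mrCluster != null) {  // guard: construction itself may have thrown
        mrCluster.shutdown();
      }
    }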
@@ -208,8 +214,16 @@ public class TestTableJoinMapReduce extends MultiRegionTable {
    * @param outputTable
    * @throws IOException
    */
-  private void verify(HBaseConfiguration conf, String outputTable)
-  throws IOException {
+  private void verify(String outputTable) throws IOException {
+    // Sleep before we start the verify to ensure that when the scanner takes
+    // its snapshot, all the updates have made it into the cache.
+    try {
+      Thread.sleep(conf.getLong("hbase.regionserver.optionalcacheflushinterval",
+        60L * 1000L));
+    } catch (InterruptedException e) {
+      // ignore
+    }
+
     HTable table = new HTable(conf, new Text(outputTable));
     Text[] columns = { new Text("a:"), new Text("b:"), new Text("c:"),
       new Text("d:"), new Text("e:") };
@@ -222,8 +236,8 @@ public class TestTableJoinMapReduce extends MultiRegionTable {
 
     int i = 0;
     while (scanner.next(key, results)) {
-      assertTrue(results.keySet().size() == 5);
       LOG.info("result_table.column.size: " + results.keySet().size());
+      assertEquals(5, results.keySet().size());
       i++;
     }
     assertTrue(i == 3);
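The final hunk also swaps the assertion style, which pays off when the test does fail. A small JUnit 3 comparison of the two forms used above (`size` is an illustrative local, not from the diff):

    // Why the hunk prefers assertEquals over assertTrue for the size check:
    int size = results.keySet().size();
    assertTrue(size == 5);    // failure reports only: AssertionFailedError
    assertEquals(5, size);    // failure reports: expected:<5> but was:<actual>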