Compare commits
5 Commits
master...trunk_on_h
Author | SHA1 | Date
---|---|---
Joe Schaefer | 6826da4264 |
Jim Kellerman | ec216bb98e |
Jim Kellerman | 0c24711331 |
Jim Kellerman | 69bbf6b099 |
Jim Kellerman | e527c8794e |
Binary file not shown.
Binary file not shown.
@@ -770,11 +770,13 @@ public class SequenceFile {
return true;
}

@Override
public int hashCode() {
assert false : "hashCode not designed";
return 42; // any arbitrary constant will do
}

@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("size: ").append(this.theMetadata.size()).append("\n");
@@ -952,6 +954,13 @@ public class SequenceFile {
}
}

/** flush all currently written data to the file system */
public void syncFs() throws IOException {
if (out != null) {
out.sync(); // flush contents to file system
}
}

/** Returns the configuration of this file. */
Configuration getConf() { return conf; }

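For context, syncFs() is the hook the HLog changes below rely on: it lets a writer push everything appended so far down to the file system without closing the file. A minimal caller-side sketch, assuming a hypothetical path, IntWritable/BytesWritable records, and a batch size of 100 purely for illustration (the new Writer test at the bottom of this compare does essentially the same thing):

// Illustrative sketch only -- not part of this diff. Assumes the usual imports
// from org.apache.hadoop.conf, org.apache.hadoop.fs and org.apache.hadoop.io.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
byte[] payload = new byte[1020];                      // arbitrary example payload
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
    new Path("/tmp/example-log"),                     // hypothetical path
    IntWritable.class, BytesWritable.class);
for (int i = 1; i <= 1000; i++) {
  writer.append(new IntWritable(i), new BytesWritable(payload));
  if (i % 100 == 0) {
    writer.syncFs();  // force the records appended so far out to the file system
  }
}
writer.close();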
@@ -1115,10 +1124,13 @@ public class SequenceFile {

}

@Override
boolean isCompressed() { return true; }
@Override
boolean isBlockCompressed() { return false; }

/** Append a key/value pair. */
@Override
@SuppressWarnings("unchecked")
public synchronized void append(Object key, Object val)
throws IOException {
@@ -1151,6 +1163,7 @@ public class SequenceFile {
}

/** Append a key/value pair. */
@Override
public synchronized void appendRaw(byte[] keyData, int keyOffset,
int keyLength, ValueBytes val) throws IOException {

@@ -1240,7 +1253,9 @@ public class SequenceFile {
finalizeFileHeader();
}

@Override
boolean isCompressed() { return true; }
@Override
boolean isBlockCompressed() { return true; }

/** Initialize */
@@ -1268,6 +1283,7 @@ public class SequenceFile {
}

/** Compress and flush contents to dfs */
@Override
public synchronized void sync() throws IOException {
if (noBufferedRecords > 0) {
super.sync();
@@ -1297,6 +1313,7 @@ public class SequenceFile {
}

/** Close the file. */
@Override
public synchronized void close() throws IOException {
if (out != null) {
sync();
@@ -1305,6 +1322,7 @@ public class SequenceFile {
}

/** Append a key/value pair. */
@Override
@SuppressWarnings("unchecked")
public synchronized void append(Object key, Object val)
throws IOException {
@@ -1337,6 +1355,7 @@ public class SequenceFile {
}

/** Append a key/value pair. */
@Override
public synchronized void appendRaw(byte[] keyData, int keyOffset,
int keyLength, ValueBytes val) throws IOException {

@@ -1929,6 +1948,7 @@ public class SequenceFile {
* of the value may be computed by calling buffer.getLength() before and
* after calls to this method. */
/** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
@Deprecated
public synchronized int next(DataOutputBuffer buffer) throws IOException {
// Unsupported for block-compressed sequence files
if (blockCompressed) {
@@ -2210,6 +2230,7 @@ public class SequenceFile {
}

/** Returns the name of the file. */
@Override
public String toString() {
return file.toString();
}
@@ -2798,6 +2819,7 @@ public class SequenceFile {
this.tmpDir = tmpDir;
this.progress = progress;
}
@Override
protected boolean lessThan(Object a, Object b) {
// indicate we're making progress
if (progress != null) {
@@ -2933,7 +2955,7 @@ public class SequenceFile {
totalBytes += segmentsToMerge.get(i).segmentLength;
}
if (totalBytes != 0) //being paranoid
progPerByte = 1.0f / (float)totalBytes;
progPerByte = 1.0f / totalBytes;
//reset factor to what it originally was
factor = origFactor;
return this;
@@ -3055,6 +3077,7 @@ public class SequenceFile {
compareTo(that.segmentPathName.toString());
}

@Override
public boolean equals(Object o) {
if (!(o instanceof SegmentDescriptor)) {
return false;
@@ -3069,6 +3092,7 @@ public class SequenceFile {
return false;
}

@Override
public int hashCode() {
return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
}
@@ -3159,6 +3183,7 @@ public class SequenceFile {
/** The default cleanup. Subclasses can override this with a custom
* cleanup
*/
@Override
public void cleanup() throws IOException {
super.close();
if (super.shouldPreserveInput()) return;
@@ -24,6 +24,8 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Collections;
import java.util.Map;
import java.util.SortedMap;
@@ -35,6 +37,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -48,10 +51,10 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.hbase.io.SequenceFile;
import org.apache.hadoop.hbase.io.SequenceFile.CompressionType;
import org.apache.hadoop.hbase.io.SequenceFile.Metadata;
import org.apache.hadoop.hbase.io.SequenceFile.Reader;
import org.apache.hadoop.io.compress.DefaultCodec;

/**
@@ -91,6 +94,8 @@ import org.apache.hadoop.io.compress.DefaultCodec;
public class HLog implements HConstants, Syncable {
private static final Log LOG = LogFactory.getLog(HLog.class);
private static final String HLOG_DATFILE = "hlog.dat.";
private static final SimpleDateFormat DATE_FORMAT =
new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
static final byte [] METACOLUMN = Bytes.toBytes("METACOLUMN:");
static final byte [] METAROW = Bytes.toBytes("METAROW");
final FileSystem fs;
@@ -173,7 +178,7 @@ public class HLog implements HConstants, Syncable {
this.flushlogentries =
conf.getInt("hbase.regionserver.flushlogentries", 100);
this.blocksize =
conf.getLong("hbase.regionserver.hlog.blocksize", 1024L * 1024L);
conf.getLong("hbase.regionserver.hlog.blocksize", 64L * 1024L * 1024L);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
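Aside, not part of the diff: the default HLog block size moves here from 1 MB to 64 MB. A deployment that wants a different value can still override the same key, for example programmatically; the 32 MB value below is an arbitrary illustration.

// Hedged example only -- the key and its defaults come from the hunk above.
Configuration conf = new Configuration();
conf.setLong("hbase.regionserver.hlog.blocksize", 32L * 1024L * 1024L);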
@@ -492,9 +497,14 @@ public class HLog implements HConstants, Syncable {
}
}

// This is public only because it implements a method in Syncable.
public void sync() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Sync-ing " + unflushedEntries + ". Last flush time was: " +
DATE_FORMAT.format(new Date(lastLogFlushTime)));
}
lastLogFlushTime = System.currentTimeMillis();
this.writer.sync();
this.writer.syncFs();
unflushedEntries = 0;
}

@@ -737,16 +747,46 @@ public class HLog implements HConstants, Syncable {
throws IOException {
Map<byte [], SequenceFile.Writer> logWriters =
new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);

long leaseRecoveryPeriod =
conf.getLong("hbase.regionserver.hlog.leaserecoveryperiod", 10000);

try {
for (int i = 0; i < logfiles.length; i++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length + ": " +
logfiles[i].getPath());
}
// Check for possibly empty file. With appends, currently Hadoop reports
// a zero length even if the file has been sync'd. Revisit if
// HADOOP-4751 is committed.
boolean possiblyEmpty = logfiles[i].getLen() <= 0;
// Recover the file's lease if necessary
try {
while (true) {
try {
FSDataOutputStream out = fs.append(logfiles[i].getPath());
out.close();
break;
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
if (e instanceof EOFException) {
throw e;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Triggering lease recovery.");
}
}
try {
Thread.sleep(leaseRecoveryPeriod);
} catch (InterruptedException ex) {
// ignore it and try again
}
}
} catch (EOFException e) {
// file is empty, skip it
continue;
}
if (logfiles[i].getLen() <= 0) {
// File is empty, skip it.
continue;
}
HLogKey key = new HLogKey();
HLogEdit val = new HLogEdit();
try {
@@ -815,6 +855,15 @@ public class HLog implements HConstants, Syncable {
} catch (IOException e) {
LOG.warn("Close in finally threw exception -- continuing", e);
}
}
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
if (e instanceof EOFException) {
// No recoverable data in file. Skip it.
continue;
}

} finally {
// Delete the input file now so we do not replay edits. We could
// have gotten here because of an exception. If so, probably
// nothing we can do about it. Replaying it, it could work but we
@@ -822,12 +871,6 @@ public class HLog implements HConstants, Syncable {
// could have lost some edits.
fs.delete(logfiles[i].getPath(), true);
}
} catch (IOException e) {
if (possiblyEmpty) {
continue;
}
throw e;
}
}
} finally {
for (SequenceFile.Writer w : logWriters.values()) {
@@ -0,0 +1,114 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.io;

import java.io.IOException;
import java.io.EOFException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.hbase.io.SequenceFile;

/** Tries to read the file created by Writer */
public class Reader {

/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
if (args.length < 1 || args.length > 2) {
System.err.println("usage: Reader expected-number-of-records [ -n ]");
System.err.println(" where -n = do not try to recover lease");
return;
}
int expected = Integer.valueOf(args[0]);
boolean recover = true;
if (args.length == 2 && args[1].compareTo("-n") == 0) {
recover = false;
}

Configuration conf = new Configuration();
Path dir = new Path(conf.get("fs.default.name"), "log");
FileSystem fs = FileSystem.get(conf);
if (!(fs instanceof DistributedFileSystem)) {
throw new IOException("Wrong file system: " + fs.getClass().getName());
}

if (recover) {
waitForLeaseRecovery(fs, new Path(dir, "log"));
}

SequenceFile.Reader in = null;
try {
in = new SequenceFile.Reader(fs, new Path(dir, "log"), conf);
} catch (EOFException e) {
if (expected != 0) {
e.printStackTrace();
}
return;
}

IntWritable key = new IntWritable();
BytesWritable value = new BytesWritable();
int count = 0;
IOException ex = null;
try {
while (in.next(key, value)) {
count++;
}
} catch (IOException e) {
ex = e;
}
if (expected != count) {
System.err.println("Read " + count + " lines, expected " + expected +
" lines");
}
in.close();
if (ex != null) {
ex.printStackTrace();
}
}

static void waitForLeaseRecovery(FileSystem fs, Path file) {
boolean done = false;
while (!done) {
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
System.out.println("Sleep interrupted.");
}
try {
FSDataOutputStream out = fs.append(file);
out.close();
done = true;
} catch (IOException e) {
System.out.println("Triggering lease recovery if needed.");
}
}
System.out.println("Lease Recovery Successful");
}
}
@@ -0,0 +1,93 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.io;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.hbase.io.SequenceFile;

/** Writes to a Sequence file and then commits suicide */
public class Writer {
private static byte[] bytes = new byte[1020];
private static BytesWritable value;

/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
if (args.length != 3) {
System.err.println("usage: Writer total-writes writes-per-sync block-size-mb");
return;
}
long blocksize = Long.valueOf(args[2]);
if (blocksize != 1L && blocksize != 64L) {
System.err.println("Only 1MB and 64MB blocksizes are allowed");
return;
}
blocksize *= 1024L * 1024L;

for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte)(i % 64);
}
value = new BytesWritable(bytes);

Configuration conf = new Configuration();
Path dir = new Path(conf.get("fs.default.name"), "log");
conf.set("fs.default.name", dir.toString());
FileSystem fs = FileSystem.get(conf);
if (!(fs instanceof DistributedFileSystem)) {
throw new IOException("Wrong file system: " + fs.getClass().getName());
}

fs.mkdirs(dir);
Path file = new Path(dir, "log");
if (fs.exists(file)) {
fs.delete(file, false);
}

SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, file,
IntWritable.class, BytesWritable.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
fs.getDefaultReplication(), blocksize,
SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
new SequenceFile.Metadata());

int totalWrites = Integer.valueOf(args[0]);
int writesPerSync = Integer.valueOf(args[1]);
for (int i = 1; i <= totalWrites; i++) {
writer.append(new IntWritable(i), value);
if (i % writesPerSync == 0) {
writer.syncFs();
}
}

// The following *should* prevent hdfs shutdown hook from running
Runtime.getRuntime().halt(-1);
}
}
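Taken together, the two new classes form a small crash-durability test for the syncFs() support: Writer appends IntWritable/BytesWritable records, calls syncFs() every writes-per-sync appends, and then halts the JVM without closing the file; Reader first recovers the file's lease (by re-opening the file for append and closing it) and then counts how many records actually survived. A plausible way to drive them, assuming the classes are on the Hadoop classpath (the exact invocation is not part of this diff), is "hadoop org.apache.hadoop.hbase.io.Writer 1000 100 64" followed by "hadoop org.apache.hadoop.hbase.io.Reader 1000", adjusting the expected count to however many records should have reached the file system before the halt.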