HBASE-17480 Remove split region code from Region Server (Stephen Yuan Jiang)

This commit is contained in:
parent cb9ce2ceaf
commit bff7c4f1fd
SplitTransaction.java
@@ -1,265 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.PairOfSameType;

/**
 * Executes a region split as a "transaction". Call {@link #prepare()} to set up
 * the transaction, {@link #execute(Server, RegionServerServices)} to run the
 * transaction, and {@link #rollback(Server, RegionServerServices)} to clean up if execute fails.
 *
 * <p>Here is an example of how you would use this interface:
 * <pre>
 *  SplitTransactionFactory factory = new SplitTransactionFactory(conf);
 *  SplitTransaction st = factory.create(parent, midKey)
 *    .registerTransactionListener(new TransactionListener() {
 *       public void transition(SplitTransaction transaction, SplitTransactionPhase from,
 *           SplitTransactionPhase to) throws IOException {
 *         // ...
 *       }
 *       public void rollback(SplitTransaction transaction, SplitTransactionPhase from,
 *           SplitTransactionPhase to) {
 *         // ...
 *       }
 *    });
 *  if (!st.prepare()) return;
 *  try {
 *    st.execute(server, services);
 *  } catch (IOException e) {
 *    try {
 *      st.rollback(server, services);
 *      return;
 *    } catch (RuntimeException e) {
 *      // abort the server
 *    }
 *  }
 * </pre>
 * <p>A split transaction is not thread safe. Callers must ensure a split is run by
 * one thread only.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public interface SplitTransaction {

  /**
   * Each enum is a step in the split transaction.
   */
  public enum SplitTransactionPhase {
    /**
     * Started
     */
    STARTED,
    /**
     * Prepared
     */
    PREPARED,
    /**
     * Before the preSplit coprocessor hook
     */
    BEFORE_PRE_SPLIT_HOOK,
    /**
     * After the preSplit coprocessor hook
     */
    AFTER_PRE_SPLIT_HOOK,
    /**
     * Set the region as in transition; set it into SPLITTING state.
     */
    SET_SPLITTING,
    /**
     * We created the temporary split data directory.
     */
    CREATE_SPLIT_DIR,
    /**
     * Closed the parent region.
     */
    CLOSED_PARENT_REGION,
    /**
     * The parent has been taken out of the server's online regions list.
     */
    OFFLINED_PARENT,
    /**
     * Started creation of the first daughter region.
     */
    STARTED_REGION_A_CREATION,
    /**
     * Started creation of the second daughter region.
     */
    STARTED_REGION_B_CREATION,
    /**
     * Opened the first daughter region
     */
    OPENED_REGION_A,
    /**
     * Opened the second daughter region
     */
    OPENED_REGION_B,
    /**
     * Point of no return.
     * If we got here, then the transaction is not recoverable other than by
     * crashing out the regionserver.
     */
    PONR,
    /**
     * Before the postSplit coprocessor hook
     */
    BEFORE_POST_SPLIT_HOOK,
    /**
     * After the postSplit coprocessor hook
     */
    AFTER_POST_SPLIT_HOOK,
    /**
     * Completed
     */
    COMPLETED
  }

  /**
   * Split transaction journal entry
   */
  public interface JournalEntry {

    /** @return the completed phase marked by this journal entry */
    SplitTransactionPhase getPhase();

    /** @return the time of phase completion */
    long getTimeStamp();
  }

  /**
   * Split transaction listener
   */
  public interface TransactionListener {

    /**
     * Invoked when transitioning forward from one transaction phase to another
     * @param transaction the transaction
     * @param from the current phase
     * @param to the next phase
     * @throws IOException listener can throw this to abort the transaction
     */
    void transition(SplitTransaction transaction, SplitTransactionPhase from,
        SplitTransactionPhase to) throws IOException;

    /**
     * Invoked when rolling back a transaction from one transaction phase to the
     * previous one
     * @param transaction the transaction
     * @param from the current phase
     * @param to the previous phase
     */
    void rollback(SplitTransaction transaction, SplitTransactionPhase from,
        SplitTransactionPhase to);
  }

  /**
   * Check split inputs and prepare the transaction.
   * @return <code>true</code> if the region is splittable, else
   *   <code>false</code> if it is not (e.g. it is already closed).
   * @throws IOException
   */
  boolean prepare() throws IOException;

  /**
   * Run the transaction.
   * @param server Hosting server instance. Can be null when testing.
   * @param services Used to online/offline regions.
   * @return Regions created
   * @throws IOException If thrown, transaction failed.
   *   Call {@link #rollback(Server, RegionServerServices)}
   * @see #rollback(Server, RegionServerServices)
   * @deprecated use #execute(Server, RegionServerServices, User); as of 1.0.2, remove in 3.0
   */
  @Deprecated
  PairOfSameType<Region> execute(Server server, RegionServerServices services) throws IOException;

  /**
   * Run the transaction.
   * @param server Hosting server instance. Can be null when testing.
   * @param services Used to online/offline regions.
   * @param user the effective user
   * @return Regions created
   * @throws IOException If thrown, transaction failed.
   *   Call {@link #rollback(Server, RegionServerServices)}
   * @see #rollback(Server, RegionServerServices)
   */
  PairOfSameType<Region> execute(Server server, RegionServerServices services, User user)
      throws IOException;

  /**
   * Roll back a failed transaction
   * @param server Hosting server instance (May be null when testing).
   * @param services Used to online/offline regions.
   * @return True if we successfully rolled back; false if we got to the point
   *   of no return and so now need to abort the server to minimize damage.
   * @throws IOException If thrown, rollback failed. Take drastic action.
   * @deprecated use #rollback(Server, RegionServerServices, User); as of 1.0.2, remove in 3.0
   */
  @Deprecated
  boolean rollback(Server server, RegionServerServices services) throws IOException;

  /**
   * Roll back a failed transaction
   * @param server Hosting server instance (May be null when testing).
   * @param services Used to online/offline regions.
   * @param user the effective user
   * @return True if we successfully rolled back; false if we got to the point
   *   of no return and so now need to abort the server to minimize damage.
   * @throws IOException If thrown, rollback failed. Take drastic action.
   */
  boolean rollback(Server server, RegionServerServices services, User user) throws IOException;

  /**
   * Register a listener for transaction preparation, execution, and possibly
   * rollback phases.
   * <p>A listener can abort a transaction by throwing an exception.
   * @param listener the listener
   * @return 'this' for chaining
   */
  SplitTransaction registerTransactionListener(TransactionListener listener);

  /**
   * Get the journal for the transaction.
   * <p>Journal entries are an opaque type represented as JournalEntry. They can
   * also provide useful debugging information via their toString method.
   * @return the transaction journal
   */
  List<JournalEntry> getJournal();

  /**
   * Get the Server running the transaction or rollback
   * @return server instance
   */
  Server getServer();

  /**
   * Get the RegionServerServices of the server running the transaction or rollback
   * @return region server services
   */
  RegionServerServices getRegionServerServices();
}
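Note: the usage example in the javadoc above is pseudocode. As a compilable sketch (not part of this commit), this is how a caller would register a listener and drive the now-removed transaction; it assumes conf, parent, midKey, server, and services are in scope and that a commons-logging LOG is available:

    // Sketch only: a listener that logs every phase transition, wired up the
    // way the interface javadoc describes.
    SplitTransaction st = new SplitTransactionFactory(conf)
        .create(parent, midKey)
        .registerTransactionListener(new SplitTransaction.TransactionListener() {
          @Override
          public void transition(SplitTransaction transaction,
              SplitTransaction.SplitTransactionPhase from,
              SplitTransaction.SplitTransactionPhase to) throws IOException {
            LOG.info("split phase " + from + " -> " + to);
            // Throwing an IOException here would abort the transaction.
          }
          @Override
          public void rollback(SplitTransaction transaction,
              SplitTransaction.SplitTransactionPhase from,
              SplitTransaction.SplitTransactionPhase to) {
            LOG.info("rolling back phase " + from + " -> " + to);
          }
        });
    if (st.prepare()) {
      try {
        st.execute(server, services);
      } catch (IOException e) {
        try {
          st.rollback(server, services);
        } catch (RuntimeException re) {
          // Rollback itself failed: abort the server, per the javadoc contract.
          server.abort("split rollback failed", re);
        }
      }
    }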
SplitTransactionFactory.java
@@ -1,74 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.ReflectionUtils;

/**
 * A factory for creating SplitTransactions, which execute region split as a "transaction".
 * See {@link SplitTransaction}
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class SplitTransactionFactory implements Configurable {

  public static final String SPLIT_TRANSACTION_IMPL_KEY =
      "hbase.regionserver.split.transaction.impl";

  private Configuration conf;

  public SplitTransactionFactory(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Create a split transaction
   * @param r the region to split
   * @param splitrow the split point in the keyspace
   * @return transaction instance
   */
  public SplitTransaction create(final Region r, final byte [] splitrow) {
    return ReflectionUtils.instantiateWithCustomCtor(
      // The implementation class must extend SplitTransactionImpl, not only
      // implement the SplitTransaction interface like you might expect,
      // because various places such as AssignmentManager use static methods
      // from SplitTransactionImpl. Whatever we use for implementation must
      // be compatible, so it's safest to require ? extends SplitTransactionImpl.
      // If not compatible we will throw a runtime exception from here.
      conf.getClass(SPLIT_TRANSACTION_IMPL_KEY, SplitTransactionImpl.class,
        SplitTransactionImpl.class).getName(),
      new Class[] { Region.class, byte[].class },
      new Object[] { r, splitrow });
  }

}
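Because the factory resolves the implementation class from configuration, a deployment could swap in a custom transaction. A minimal sketch, where MySplitTransaction is a hypothetical subclass (it must extend SplitTransactionImpl, not merely implement the interface, per the comment in create() above):

    Configuration conf = HBaseConfiguration.create();
    // MySplitTransaction is hypothetical; it must extend SplitTransactionImpl
    // and keep the (Region, byte[]) constructor the factory reflects on.
    conf.set(SplitTransactionFactory.SPLIT_TRANSACTION_IMPL_KEY,
        MySplitTransaction.class.getName());
    SplitTransaction st = new SplitTransactionFactory(conf).create(region, splitRow);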
SplitTransactionImpl.java
@@ -1,832 +0,0 @@
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.zookeeper.KeeperException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

@InterfaceAudience.Private
public class SplitTransactionImpl implements SplitTransaction {
  private static final Log LOG = LogFactory.getLog(SplitTransactionImpl.class);

  /*
   * Region to split
   */
  private final HRegion parent;
  private HRegionInfo hri_a;
  private HRegionInfo hri_b;
  private long fileSplitTimeout = 30000;

  /*
   * Row to split around
   */
  private final byte [] splitrow;

  /*
   * Transaction state for listener, only valid during execute and
   * rollback
   */
  private SplitTransactionPhase currentPhase = SplitTransactionPhase.STARTED;
  private Server server;
  private RegionServerServices rsServices;

  public static class JournalEntryImpl implements JournalEntry {
    private SplitTransactionPhase type;
    private long timestamp;

    public JournalEntryImpl(SplitTransactionPhase type) {
      this(type, EnvironmentEdgeManager.currentTime());
    }

    public JournalEntryImpl(SplitTransactionPhase type, long timestamp) {
      this.type = type;
      this.timestamp = timestamp;
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append(type);
      sb.append(" at ");
      sb.append(timestamp);
      return sb.toString();
    }

    @Override
    public SplitTransactionPhase getPhase() {
      return type;
    }

    @Override
    public long getTimeStamp() {
      return timestamp;
    }
  }

  /*
   * Journal of how far the split transaction has progressed.
   */
  private final ArrayList<JournalEntry> journal = new ArrayList<JournalEntry>();

  /**
   * Listeners
   */
  private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();

  /**
   * Constructor
   * @param r Region to split
   * @param splitrow Row to split around
   */
  public SplitTransactionImpl(final Region r, final byte [] splitrow) {
    this.parent = (HRegion)r;
    this.splitrow = splitrow;
    this.journal.add(new JournalEntryImpl(SplitTransactionPhase.STARTED));
  }

  private void transition(SplitTransactionPhase nextPhase) throws IOException {
    transition(nextPhase, false);
  }

  private void transition(SplitTransactionPhase nextPhase, boolean isRollback)
      throws IOException {
    if (!isRollback) {
      // Add to the journal first, because if the listener throws an exception
      // we need to roll back starting at 'nextPhase'
      this.journal.add(new JournalEntryImpl(nextPhase));
    }
    for (int i = 0; i < listeners.size(); i++) {
      TransactionListener listener = listeners.get(i);
      if (!isRollback) {
        listener.transition(this, currentPhase, nextPhase);
      } else {
        listener.rollback(this, currentPhase, nextPhase);
      }
    }
    currentPhase = nextPhase;
  }

  @Override
  public boolean prepare() throws IOException {
    if (!this.parent.isSplittable()) return false;
    // Split key can be null if this region is unsplittable; i.e. has refs.
    if (this.splitrow == null) return false;
    HRegionInfo hri = this.parent.getRegionInfo();
    parent.prepareToSplit();
    // Check splitrow.
    byte [] startKey = hri.getStartKey();
    byte [] endKey = hri.getEndKey();
    if (Bytes.equals(startKey, splitrow) ||
        !this.parent.getRegionInfo().containsRow(splitrow)) {
      LOG.info("Split row is not inside region key range or is equal to " +
          "startkey: " + Bytes.toStringBinary(this.splitrow));
      return false;
    }
    long rid = getDaughterRegionIdTimestamp(hri);
    this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
    this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);

    transition(SplitTransactionPhase.PREPARED);

    return true;
  }

  /**
   * Calculate the daughter region id to use.
   * @param hri Parent {@link HRegionInfo}
   * @return Daughter region id (timestamp) to use.
   */
  private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
    long rid = EnvironmentEdgeManager.currentTime();
    // The region id is a timestamp. It can't be less than that of the parent,
    // else it will insert at the wrong location in hbase:meta (see HBASE-710).
    if (rid < hri.getRegionId()) {
      LOG.warn("Clock skew; parent region's id is " + hri.getRegionId() +
          " but current time here is " + rid);
      rid = hri.getRegionId() + 1;
    }
    return rid;
  }

  private static IOException closedByOtherException = new IOException(
      "Failed to close region: already closed by another thread");

  /**
   * Prepare the regions and region files.
   * @param server Hosting server instance. Can be null when testing (won't try
   *   to update zk if server is null).
   * @param services Used to online/offline regions.
   * @param user the effective user
   * @return Regions created
   * @throws IOException If thrown, transaction failed.
   *   Call {@link #rollback(Server, RegionServerServices)}
   */
  @VisibleForTesting
  PairOfSameType<Region> createDaughters(final Server server,
      final RegionServerServices services, User user) throws IOException {
    LOG.info("Starting split of region " + this.parent);
    if ((server != null && server.isStopped()) ||
        (services != null && services.isStopping())) {
      throw new IOException("Server is stopped or stopping");
    }
    assert !this.parent.lock.writeLock().isHeldByCurrentThread():
      "Unsafe to hold write lock while performing RPCs";

    transition(SplitTransactionPhase.BEFORE_PRE_SPLIT_HOOK);

    // Coprocessor callback
    if (this.parent.getCoprocessorHost() != null) {
      // TODO: Remove one of these
      parent.getCoprocessorHost().preSplit(user);
      parent.getCoprocessorHost().preSplit(splitrow, user);
    }

    transition(SplitTransactionPhase.AFTER_PRE_SPLIT_HOOK);

    // If true, no cluster to write meta edits to or to update znodes in.
    boolean testing = server == null? true:
        server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
    this.fileSplitTimeout = testing ? this.fileSplitTimeout :
        server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
            this.fileSplitTimeout);

    PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);

    final List<Mutation> metaEntries = new ArrayList<Mutation>();
    boolean ret = false;
    if (this.parent.getCoprocessorHost() != null) {
      ret = parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries, user);
      if (ret) {
        throw new IOException("Coprocessor bypassing region "
            + parent.getRegionInfo().getRegionNameAsString() + " split.");
      }
      try {
        for (Mutation p : metaEntries) {
          HRegionInfo.parseRegionName(p.getRow());
        }
      } catch (IOException e) {
        LOG.error("Row key of mutation from coprocessor is not parsable as region name. "
            + "Mutations from a coprocessor should only be for the hbase:meta table.");
        throw e;
      }
    }

    // This is the point of no return. Adding subsequent edits to .META. as we
    // do below when we do the daughter opens adding each to .META. can fail in
    // various interesting ways, the most interesting of which is a timeout
    // BUT the edits all go through (see HBASE-3872). IF we reach the PONR,
    // then subsequent failures need to crash out this regionserver; the
    // server shutdown processing should be able to fix up the incomplete split.
    // The offlined parent will have the daughters as extra columns. If
    // we leave the daughter regions in place and do not remove them when we
    // crash out, then they will still have their references to the parent in
    // place and the server shutdown fixup of .META. will point to these
    // regions.
    // We should add the PONR JournalEntry before offlineParentInMeta, so that
    // even if offlineParentInMeta times out, the regionserver will exit and the
    // master's ServerShutdownHandler will fix up the daughters and avoid data
    // loss (see HBASE-4562).

    transition(SplitTransactionPhase.PONR);

    // Edit parent in meta. Offlines the parent region and adds splita and splitb
    // as an atomic update (see HBASE-7721). This update to hbase:meta is what
    // determines whether the region is considered split or not in case of
    // failures. If it is successful, the master will roll forward; if not, the
    // master will roll back and assign the parent region.
    if (services != null && !services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
        parent.getRegionInfo(), hri_a, hri_b)) {
      // Passed PONR, let SSH clean it up
      throw new IOException("Failed to notify master that split passed PONR: "
          + parent.getRegionInfo().getRegionNameAsString());
    }
    return daughterRegions;
  }

  @VisibleForTesting
  Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
    p.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
        .toBytes(sn.getHostAndPort()));
    p.addColumn(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn
        .getStartcode()));
    p.addColumn(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum));
    return p;
  }

  @VisibleForTesting
  public PairOfSameType<Region> stepsBeforePONR(final Server server,
      final RegionServerServices services, boolean testing) throws IOException {
    if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
        parent.getRegionInfo(), hri_a, hri_b)) {
      throw new IOException("Failed to get ok from master to split "
          + parent.getRegionInfo().getRegionNameAsString());
    }

    transition(SplitTransactionPhase.SET_SPLITTING);

    this.parent.getRegionFileSystem().createSplitsDir();

    transition(SplitTransactionPhase.CREATE_SPLIT_DIR);

    Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
    Exception exceptionToThrow = null;
    try {
      hstoreFilesToSplit = this.parent.close(false);
    } catch (Exception e) {
      exceptionToThrow = e;
    }
    if (exceptionToThrow == null && hstoreFilesToSplit == null) {
      // The region was closed by a concurrent thread. We can't continue
      // with the split, instead we must just abandon the split. If we
      // reopen or split this could cause problems because the region has
      // probably already been moved to a different server, or is in the
      // process of moving to a different server.
      exceptionToThrow = closedByOtherException;
    }
    if (exceptionToThrow != closedByOtherException) {
      transition(SplitTransactionPhase.CLOSED_PARENT_REGION);
    }
    if (exceptionToThrow != null) {
      if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
      throw new IOException(exceptionToThrow);
    }
    if (!testing) {
      services.removeFromOnlineRegions(this.parent, null);
    }

    transition(SplitTransactionPhase.OFFLINED_PARENT);

    // TODO: If splitStoreFiles were multithreaded would we complete steps in
    // less elapsed time? St.Ack 20100920
    //
    // splitStoreFiles creates daughter region dirs under the parent splits dir.
    // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
    // clean this up.
    Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);

    // Log to the journal that we are creating region A, the first daughter
    // region. We could fail halfway through. If we do, we could have left
    // stuff in fs that needs cleanup -- a storefile or two. That's why we
    // add the entry to the journal BEFORE rather than AFTER the change.

    transition(SplitTransactionPhase.STARTED_REGION_A_CREATION);

    assertReferenceFileCount(expectedReferences.getFirst(),
        this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
    HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
    assertReferenceFileCount(expectedReferences.getFirst(),
        new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));

    // Ditto

    transition(SplitTransactionPhase.STARTED_REGION_B_CREATION);

    assertReferenceFileCount(expectedReferences.getSecond(),
        this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
    HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
    assertReferenceFileCount(expectedReferences.getSecond(),
        new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));

    return new PairOfSameType<Region>(a, b);
  }

  @VisibleForTesting
  void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
      throws IOException {
    if (expectedReferenceFileCount != 0 &&
        expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(parent.getFilesystem(),
          dir)) {
      throw new IOException("Failing split. Expected reference file count isn't equal.");
    }
  }

  /**
   * Perform the time-consuming opening of the daughter regions.
   * @param server Hosting server instance. Can be null when testing.
   * @param services Used to online/offline regions.
   * @param a first daughter region
   * @param b second daughter region
   * @throws IOException If thrown, transaction failed.
   *   Call {@link #rollback(Server, RegionServerServices)}
   */
  @VisibleForTesting
  void openDaughters(final Server server, final RegionServerServices services, Region a,
      Region b) throws IOException {
    boolean stopped = server != null && server.isStopped();
    boolean stopping = services != null && services.isStopping();
    // TODO: Is this check needed here?
    if (stopped || stopping) {
      LOG.info("Not opening daughters " +
          b.getRegionInfo().getRegionNameAsString() +
          " and " +
          a.getRegionInfo().getRegionNameAsString() +
          " because stopping=" + stopping + ", stopped=" + stopped);
    } else {
      // Open daughters in parallel.
      DaughterOpener aOpener = new DaughterOpener(server, a);
      DaughterOpener bOpener = new DaughterOpener(server, b);
      aOpener.start();
      bOpener.start();
      try {
        aOpener.join();
        if (aOpener.getException() == null) {
          transition(SplitTransactionPhase.OPENED_REGION_A);
        }
        bOpener.join();
        if (bOpener.getException() == null) {
          transition(SplitTransactionPhase.OPENED_REGION_B);
        }
      } catch (InterruptedException e) {
        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
      }
      if (aOpener.getException() != null) {
        throw new IOException("Failed " +
            aOpener.getName(), aOpener.getException());
      }
      if (bOpener.getException() != null) {
        throw new IOException("Failed " +
            bOpener.getName(), bOpener.getException());
      }
      if (services != null) {
        if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
            parent.getRegionInfo(), hri_a, hri_b)) {
          throw new IOException("Failed to report split region to master: "
              + parent.getRegionInfo().getShortNameToLog());
        }
        // Should add it to OnlineRegions
        services.addToOnlineRegions(b);
        services.addToOnlineRegions(a);
      }
    }
  }

  @Override
  public PairOfSameType<Region> execute(final Server server,
      final RegionServerServices services)
      throws IOException {
    if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
      LOG.warn("Should use execute(Server, RegionServerServices, User)");
    }
    return execute(server, services, null);
  }

  @Override
  public PairOfSameType<Region> execute(final Server server, final RegionServerServices services,
      User user) throws IOException {
    this.server = server;
    this.rsServices = services;
    PairOfSameType<Region> regions = createDaughters(server, services, user);
    stepsAfterPONR(server, services, regions, user);
    transition(SplitTransactionPhase.COMPLETED);
    return regions;
  }

  @VisibleForTesting
  void stepsAfterPONR(final Server server,
      final RegionServerServices services, final PairOfSameType<Region> regions, User user)
      throws IOException {
    if (this.parent.getCoprocessorHost() != null) {
      parent.getCoprocessorHost().preSplitAfterPONR(user);
    }

    openDaughters(server, services, regions.getFirst(), regions.getSecond());

    transition(SplitTransactionPhase.BEFORE_POST_SPLIT_HOOK);

    // Coprocessor callback
    if (parent.getCoprocessorHost() != null) {
      this.parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond(), user);
    }

    transition(SplitTransactionPhase.AFTER_POST_SPLIT_HOOK);
  }

  /*
   * Open daughter region in its own thread.
   * If we fail, abort this hosting server.
   */
  private class DaughterOpener extends HasThread {
    private final Server server;
    private final Region r;
    private Throwable t = null;

    DaughterOpener(final Server s, final Region r) {
      super((s == null? "null-services": s.getServerName()) +
          "-daughterOpener=" + r.getRegionInfo().getEncodedName());
      setDaemon(true);
      this.server = s;
      this.r = r;
    }

    /**
     * @return Null if the open succeeded, else the exception that caused the
     *   open to fail. Call it after this thread exits, else you may get a
     *   wrong view of the result.
     */
    Throwable getException() {
      return this.t;
    }

    @Override
    public void run() {
      try {
        openDaughterRegion(this.server, r);
      } catch (Throwable t) {
        this.t = t;
      }
    }
  }

  /**
   * Open daughter regions, add them to the online list and update meta.
   * @param server the hosting server
   * @param daughter the daughter region to open
   * @throws IOException
   * @throws KeeperException
   */
  @VisibleForTesting
  void openDaughterRegion(final Server server, final Region daughter)
      throws IOException, KeeperException {
    HRegionInfo hri = daughter.getRegionInfo();
    LoggingProgressable reporter = server == null ? null
        : new LoggingProgressable(hri, server.getConfiguration().getLong(
            "hbase.regionserver.split.daughter.open.log.interval", 10000));
    ((HRegion)daughter).openHRegion(reporter);
  }

  static class LoggingProgressable implements CancelableProgressable {
    private final HRegionInfo hri;
    private long lastLog = -1;
    private final long interval;

    LoggingProgressable(final HRegionInfo hri, final long interval) {
      this.hri = hri;
      this.interval = interval;
    }

    @Override
    public boolean progress() {
      long now = EnvironmentEdgeManager.currentTime();
      if (now - lastLog > this.interval) {
        LOG.info("Opening " + this.hri.getRegionNameAsString());
        this.lastLog = now;
      }
      return true;
    }
  }

  /**
   * Creates reference files for the top and bottom halves of each store file.
   * @param hstoreFilesToSplit map of store files to create half file references for.
   * @return a pair with the number of reference files created for the first
   *   and second daughter regions.
   * @throws IOException
   */
  private Pair<Integer, Integer> splitStoreFiles(
      final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
      throws IOException {
    if (hstoreFilesToSplit == null) {
      // Could be null because close didn't succeed -- for now consider it fatal
      throw new IOException("Close returned empty list of StoreFiles");
    }
    // The following code sets up a thread pool executor with as many slots as
    // there are files to split. It then fires up everything, waits for
    // completion and finally checks for any exception
    int nbFiles = 0;
    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
      nbFiles += entry.getValue().size();
    }
    if (nbFiles == 0) {
      // no files need to be split
      return new Pair<Integer, Integer>(0, 0);
    }
    // Default max #threads to use is the smaller of table's configured number of blocking store
    // files or the available number of logical cores.
    int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
        HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
      Runtime.getRuntime().availableProcessors());
    // Max #threads is the smaller of the number of storefiles or the default max determined above.
    int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
        defMaxThreads), nbFiles);
    LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent +
        " using " + maxThreads + " threads");
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setNameFormat("StoreFileSplitter-%1$d");
    ThreadFactory factory = builder.build();
    ThreadPoolExecutor threadPool =
        (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory);
    List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>>(nbFiles);

    // Split each store file.
    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
      for (StoreFile sf: entry.getValue()) {
        StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
        futures.add(threadPool.submit(sfs));
      }
    }
    // Shutdown the pool
    threadPool.shutdown();

    // Wait for all the tasks to finish
    try {
      boolean stillRunning = !threadPool.awaitTermination(
          this.fileSplitTimeout, TimeUnit.MILLISECONDS);
      if (stillRunning) {
        threadPool.shutdownNow();
        // wait for the thread to shutdown completely.
        while (!threadPool.isTerminated()) {
          Thread.sleep(50);
        }
        throw new IOException("Took too long to split the" +
            " files and create the references, aborting split");
      }
    } catch (InterruptedException e) {
      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
    }

    int created_a = 0;
    int created_b = 0;
    // Look for any exception
    for (Future<Pair<Path, Path>> future : futures) {
      try {
        Pair<Path, Path> p = future.get();
        created_a += p.getFirst() != null ? 1 : 0;
        created_b += p.getSecond() != null ? 1 : 0;
      } catch (InterruptedException e) {
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      } catch (ExecutionException e) {
        throw new IOException(e);
      }
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Split storefiles for region " + this.parent + " Daughter A: " + created_a
          + " storefiles, Daughter B: " + created_b + " storefiles.");
    }
    return new Pair<Integer, Integer>(created_a, created_b);
  }

  private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
      throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " +
          this.parent);
    }
    HRegionFileSystem fs = this.parent.getRegionFileSystem();
    String familyName = Bytes.toString(family);
    Path path_a =
        fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
          this.parent.getSplitPolicy());
    Path path_b =
        fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
          this.parent.getSplitPolicy());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " +
          this.parent);
    }
    return new Pair<Path,Path>(path_a, path_b);
  }

  /**
   * Utility class used to do the file splitting / reference writing
   * in parallel instead of sequentially.
   */
  private class StoreFileSplitter implements Callable<Pair<Path,Path>> {
    private final byte[] family;
    private final StoreFile sf;

    /**
     * Constructor that takes what it needs to split
     * @param family Family that contains the store file
     * @param sf which file
     */
    public StoreFileSplitter(final byte[] family, final StoreFile sf) {
      this.sf = sf;
      this.family = family;
    }

    public Pair<Path,Path> call() throws IOException {
      return splitStoreFile(family, sf);
    }
  }

  @Override
  public boolean rollback(final Server server, final RegionServerServices services)
      throws IOException {
    if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
      LOG.warn("Should use rollback(Server, RegionServerServices, User)");
    }
    return rollback(server, services, null);
  }

  @Override
  public boolean rollback(final Server server, final RegionServerServices services, User user)
      throws IOException {
    this.server = server;
    this.rsServices = services;
    // Coprocessor callback
    if (this.parent.getCoprocessorHost() != null) {
      this.parent.getCoprocessorHost().preRollBackSplit(user);
    }

    boolean result = true;
    ListIterator<JournalEntry> iterator =
        this.journal.listIterator(this.journal.size());
    // Iterate in reverse.
    while (iterator.hasPrevious()) {
      JournalEntry je = iterator.previous();

      transition(je.getPhase(), true);

      switch (je.getPhase()) {

        case SET_SPLITTING:
          if (services != null
              && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
                  parent.getRegionInfo(), hri_a, hri_b)) {
            return false;
          }
          break;

        case CREATE_SPLIT_DIR:
          this.parent.writestate.writesEnabled = true;
          this.parent.getRegionFileSystem().cleanupSplitsDir();
          break;

        case CLOSED_PARENT_REGION:
          try {
            // So, this returns a seqid, but if we just closed and then reopened,
            // we should be ok. On close, we flushed using a sequence id obtained
            // from the hosting regionserver, so there is no need to propagate
            // the sequence id returned out of initialize below up into the
            // regionserver as we normally do.
            // TODO: Verify.
            this.parent.initialize();
          } catch (IOException e) {
            LOG.error("Failed rolling back CLOSED_PARENT_REGION of region " +
                parent.getRegionInfo().getRegionNameAsString(), e);
            throw new RuntimeException(e);
          }
          break;

        case STARTED_REGION_A_CREATION:
          this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
          break;

        case STARTED_REGION_B_CREATION:
          this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
          break;

        case OFFLINED_PARENT:
          if (services != null) services.addToOnlineRegions(this.parent);
          break;

        case PONR:
          // We got to the point-of-no-return so we need to just abort. Return
          // immediately. Do not clean up created daughter regions. They need
          // to be in place so we don't delete the parent region mistakenly.
          // See HBASE-3872.
          return false;

        // Informational-only cases
        case STARTED:
        case PREPARED:
        case BEFORE_PRE_SPLIT_HOOK:
        case AFTER_PRE_SPLIT_HOOK:
        case BEFORE_POST_SPLIT_HOOK:
        case AFTER_POST_SPLIT_HOOK:
        case OPENED_REGION_A:
        case OPENED_REGION_B:
        case COMPLETED:
          break;

        default:
          throw new RuntimeException("Unhandled journal entry: " + je);
      }
    }
    // Coprocessor callback
    if (this.parent.getCoprocessorHost() != null) {
      this.parent.getCoprocessorHost().postRollBackSplit(user);
    }
    return result;
  }

  /* package */ HRegionInfo getFirstDaughter() {
    return hri_a;
  }

  /* package */ HRegionInfo getSecondDaughter() {
    return hri_b;
  }

  @Override
  public List<JournalEntry> getJournal() {
    return journal;
  }

  @Override
  public SplitTransaction registerTransactionListener(TransactionListener listener) {
    listeners.add(listener);
    return this;
  }

  @Override
  public Server getServer() {
    return server;
  }

  @Override
  public RegionServerServices getRegionServerServices() {
    return rsServices;
  }

}
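Two behaviors of the implementation above are easy to miss: rollback() returns false (rather than throwing) once a PONR journal entry exists, and the journal doubles as a debugging trace. A caller-side sketch (not from this commit), assuming st, server, services, user, and a LOG are in scope:

    try {
      st.execute(server, services, user);
    } catch (IOException ioe) {
      // Dump the journal first; JournalEntryImpl.toString() prints "<phase> at <timestamp>".
      for (SplitTransaction.JournalEntry je : st.getJournal()) {
        LOG.debug("split journal: " + je);
      }
      try {
        if (!st.rollback(server, services, user)) {
          // Past the point of no return: abort so that server shutdown
          // processing can repair the half-finished split from hbase:meta
          // (see HBASE-3872).
          server.abort("split passed PONR; rollback is impossible", ioe);
        }
      } catch (RuntimeException re) {
        server.abort("split rollback failed", re);
      }
    }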
TestCoprocessorInterface.java
@@ -27,8 +27,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.when;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -48,7 +46,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
@@ -59,18 +56,14 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.SplitTransaction;
-import org.apache.hadoop.hbase.regionserver.SplitTransactionFactory;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-import org.mockito.Mockito;

 @Category({CoprocessorTests.class, SmallTests.class})
 public class TestCoprocessorInterface {
@@ -163,7 +156,6 @@ public class TestCoprocessorInterface {
     private boolean postCompactCalled;
     private boolean preFlushCalled;
     private boolean postFlushCalled;
-    private boolean postSplitCalled;
     private ConcurrentMap<String, Object> sharedData;

     @Override
@@ -215,10 +207,6 @@
     public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
       postFlushCalled = true;
     }
-    @Override
-    public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, Region l, Region r) {
-      postSplitCalled = true;
-    }

     @Override
     public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
@@ -244,9 +232,6 @@
     boolean wasCompacted() {
       return (preCompactCalled && postCompactCalled);
     }
-    boolean wasSplit() {
-      return postSplitCalled;
-    }
     Map<String, Object> getSharedData() {
       return sharedData;
     }
@@ -281,7 +266,7 @@
     TableName tableName = TableName.valueOf(name.getMethodName());
     byte [][] families = { fam1, fam2, fam3 };

-    Configuration hc = initSplit();
+    Configuration hc = initConfig();
     Region region = initHRegion(tableName, name.getMethodName(), hc,
         new Class<?>[]{}, families);

@@ -292,15 +277,11 @@

     region.compact(false);

-    byte [] splitRow = ((HRegion)region).checkSplit();
-    assertNotNull(splitRow);
-    Region [] regions = split(region, splitRow);
-    for (int i = 0; i < regions.length; i++) {
-      regions[i] = reopenRegion(regions[i], CoprocessorImpl.class, CoprocessorII.class);
-    }
-    Coprocessor c = regions[0].getCoprocessorHost().
+    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
+    Coprocessor c = region.getCoprocessorHost().
         findCoprocessor(CoprocessorImpl.class.getName());
-    Coprocessor c2 = regions[0].getCoprocessorHost().
+    Coprocessor c2 = region.getCoprocessorHost().
         findCoprocessor(CoprocessorII.class.getName());
     Object o = ((CoprocessorImpl)c).getSharedData().get("test1");
     Object o2 = ((CoprocessorII)c2).getSharedData().get("test2");
@@ -308,39 +289,33 @@
     assertNotNull(o2);
     // to coprocessors get different sharedDatas
     assertFalse(((CoprocessorImpl)c).getSharedData() == ((CoprocessorII)c2).getSharedData());
-    for (int i = 1; i < regions.length; i++) {
-      c = regions[i].getCoprocessorHost().
-          findCoprocessor(CoprocessorImpl.class.getName());
-      c2 = regions[i].getCoprocessorHost().
-          findCoprocessor(CoprocessorII.class.getName());
-      // make sure that all coprocessor of a class have identical sharedDatas
-      assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
-      assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2);
-    }
+    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class.getName());
+    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class.getName());
+    // make sure that all coprocessor of a class have identical sharedDatas
+    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
+    assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2);

     // now have all Environments fail
-    for (int i = 0; i < regions.length; i++) {
-      try {
-        byte [] r = regions[i].getRegionInfo().getStartKey();
-        if (r == null || r.length <= 0) {
-          // Its the start row. Can't ask for null. Ask for minimal key instead.
-          r = new byte [] {0};
-        }
-        Get g = new Get(r);
-        regions[i].get(g);
-        fail();
-      } catch (org.apache.hadoop.hbase.DoNotRetryIOException xc) {
-      }
-      assertNull(regions[i].getCoprocessorHost().
-          findCoprocessor(CoprocessorII.class.getName()));
-    }
+    try {
+      byte [] r = region.getRegionInfo().getStartKey();
+      if (r == null || r.length <= 0) {
+        // Its the start row. Can't ask for null. Ask for minimal key instead.
+        r = new byte [] {0};
+      }
+      Get g = new Get(r);
+      region.get(g);
+      fail();
+    } catch (org.apache.hadoop.hbase.DoNotRetryIOException xc) {
+    }
+    assertNull(region.getCoprocessorHost().findCoprocessor(CoprocessorII.class.getName()));
-    c = regions[0].getCoprocessorHost().
+    c = region.getCoprocessorHost().
         findCoprocessor(CoprocessorImpl.class.getName());
     assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
     c = c2 = null;
     // perform a GC
     System.gc();
     // reopen the region
-    region = reopenRegion(regions[0], CoprocessorImpl.class, CoprocessorII.class);
+    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
     c = region.getCoprocessorHost().
         findCoprocessor(CoprocessorImpl.class.getName());
     // CPimpl is unaffected, still the same reference
@@ -359,7 +334,7 @@
|
||||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||||
byte [][] families = { fam1, fam2, fam3 };
|
byte [][] families = { fam1, fam2, fam3 };
|
||||||
|
|
||||||
Configuration hc = initSplit();
|
Configuration hc = initConfig();
|
||||||
Region region = initHRegion(tableName, name.getMethodName(), hc,
|
Region region = initHRegion(tableName, name.getMethodName(), hc,
|
||||||
new Class<?>[]{CoprocessorImpl.class}, families);
|
new Class<?>[]{CoprocessorImpl.class}, families);
|
||||||
for (int i = 0; i < 3; i++) {
|
for (int i = 0; i < 3; i++) {
|
||||||
|
@ -369,42 +344,23 @@ public class TestCoprocessorInterface {
|
||||||
|
|
||||||
region.compact(false);
|
region.compact(false);
|
||||||
|
|
||||||
byte [] splitRow = ((HRegion)region).checkSplit();
|
|
||||||
|
|
||||||
assertNotNull(splitRow);
|
|
||||||
Region [] regions = split(region, splitRow);
|
|
||||||
for (int i = 0; i < regions.length; i++) {
|
|
||||||
regions[i] = reopenRegion(regions[i], CoprocessorImpl.class);
|
|
||||||
}
|
|
||||||
HBaseTestingUtility.closeRegionAndWAL(region);
|
|
||||||
Coprocessor c = region.getCoprocessorHost().
|
|
||||||
findCoprocessor(CoprocessorImpl.class.getName());
|
|
||||||
|
|
||||||
// HBASE-4197
|
// HBASE-4197
|
||||||
Scan s = new Scan();
|
Scan s = new Scan();
|
||||||
RegionScanner scanner = regions[0].getCoprocessorHost().postScannerOpen(s, regions[0].getScanner(s));
|
RegionScanner scanner = region.getCoprocessorHost().postScannerOpen(s, region.getScanner(s));
|
||||||
assertTrue(scanner instanceof CustomScanner);
|
assertTrue(scanner instanceof CustomScanner);
|
||||||
// this would throw an exception before HBASE-4197
|
// this would throw an exception before HBASE-4197
|
||||||
scanner.next(new ArrayList<Cell>());
|
scanner.next(new ArrayList<Cell>());
|
||||||
|
|
||||||
|
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||||
|
Coprocessor c = region.getCoprocessorHost().
|
||||||
|
findCoprocessor(CoprocessorImpl.class.getName());
|
||||||
|
|
||||||
assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
|
assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
|
||||||
assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
|
assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
|
||||||
assertTrue(((CoprocessorImpl)c).wasOpened());
|
assertTrue(((CoprocessorImpl)c).wasOpened());
|
||||||
assertTrue(((CoprocessorImpl)c).wasClosed());
|
assertTrue(((CoprocessorImpl)c).wasClosed());
|
||||||
assertTrue(((CoprocessorImpl)c).wasFlushed());
|
assertTrue(((CoprocessorImpl)c).wasFlushed());
|
||||||
assertTrue(((CoprocessorImpl)c).wasCompacted());
|
assertTrue(((CoprocessorImpl)c).wasCompacted());
|
||||||
assertTrue(((CoprocessorImpl)c).wasSplit());
|
|
||||||
|
|
||||||
for (int i = 0; i < regions.length; i++) {
|
|
||||||
HBaseTestingUtility.closeRegionAndWAL(regions[i]);
|
|
||||||
c = region.getCoprocessorHost()
|
|
||||||
.findCoprocessor(CoprocessorImpl.class.getName());
|
|
||||||
assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
|
|
||||||
assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
|
|
||||||
assertTrue(((CoprocessorImpl)c).wasOpened());
|
|
||||||
assertTrue(((CoprocessorImpl)c).wasClosed());
|
|
||||||
assertTrue(((CoprocessorImpl)c).wasCompacted());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Region reopenRegion(final Region closedRegion, Class<?> ... implClasses)
|
Region reopenRegion(final Region closedRegion, Class<?> ... implClasses)
|
||||||
|
@ -461,7 +417,7 @@ public class TestCoprocessorInterface {
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
Configuration initSplit() {
|
private Configuration initConfig() {
|
||||||
// Always compact if there is more than one store file.
|
// Always compact if there is more than one store file.
|
||||||
TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
|
TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
|
||||||
// Make lease timeout longer, lease checks less frequent
|
// Make lease timeout longer, lease checks less frequent
|
||||||
|
@ -480,37 +436,4 @@ public class TestCoprocessorInterface {
|
||||||
|
|
||||||
return TEST_UTIL.getConfiguration();
|
return TEST_UTIL.getConfiguration();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Region [] split(final Region r, final byte [] splitRow) throws IOException {
|
|
||||||
Region[] regions = new Region[2];
|
|
||||||
|
|
||||||
SplitTransaction st = new SplitTransactionFactory(TEST_UTIL.getConfiguration())
|
|
||||||
.create(r, splitRow);
|
|
||||||
int i = 0;
|
|
||||||
|
|
||||||
if (!st.prepare()) {
|
|
||||||
// test fails.
|
|
||||||
assertTrue(false);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Server mockServer = Mockito.mock(Server.class);
|
|
||||||
when(mockServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
|
|
||||||
PairOfSameType<Region> daughters = st.execute(mockServer, null);
|
|
||||||
for (Region each_daughter: daughters) {
|
|
||||||
regions[i] = each_daughter;
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
LOG.info("Split transaction of " + r.getRegionInfo().getRegionNameAsString() +
|
|
||||||
" failed:" + ioe.getMessage());
|
|
||||||
assertTrue(false);
|
|
||||||
} catch (RuntimeException e) {
|
|
||||||
LOG.info("Failed rollback of failed split of " +
|
|
||||||
r.getRegionInfo().getRegionNameAsString() + e.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
assertTrue(i == 2);
|
|
||||||
return regions;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
|
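Note: the hunks above convert TestCoprocessorInterface from split-based to single-region checks. The test now reopens one region where it used to split the parent and iterate over the daughters, and the postSplit/wasSplit plumbing plus the local split() helper built on SplitTransactionFactory are gone. Since HBASE-17480 removes the split machinery from the region server, a test that still needs a real split has to ask the master. A minimal sketch of that approach, assuming a mini cluster and the standard Admin API — the table name, family, split key, and poll loop below are illustrative, not part of this commit:

  import org.apache.hadoop.hbase.HBaseTestingUtility;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.util.Bytes;

  public class MasterDrivenSplitSketch {
    public static void main(String[] args) throws Exception {
      HBaseTestingUtility util = new HBaseTestingUtility();
      util.startMiniCluster();
      try {
        TableName tn = TableName.valueOf("splitSketch");
        util.createTable(tn, Bytes.toBytes("cf"));
        Admin admin = util.getConnection().getAdmin();
        // The split request is handed to the master and runs asynchronously there.
        admin.split(tn, Bytes.toBytes("row-0500"));
        // Poll until both daughter regions are online.
        while (util.getConnection().getRegionLocator(tn)
            .getAllRegionLocations().size() < 2) {
          Thread.sleep(100);
        }
      } finally {
        util.shutdownMiniCluster();
      }
    }
  }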
@@ -19,9 +19,6 @@
 package org.apache.hadoop.hbase.regionserver;

 import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
@@ -43,7 +40,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;

@@ -85,7 +81,6 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -159,7 +154,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
-import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.FaultyFSLog;
@@ -2646,45 +2640,6 @@ public class TestHRegion {
     }
   }

-  /**
-   * @param parent
-   *          Region to split.
-   * @param midkey
-   *          Key to split around.
-   * @return The Regions we created.
-   * @throws IOException
-   */
-  HRegion[] splitRegion(final HRegion parent, final byte[] midkey) throws IOException {
-    PairOfSameType<Region> result = null;
-    SplitTransactionImpl st = new SplitTransactionImpl(parent, midkey);
-    // If prepare does not return true, for some reason -- logged inside in
-    // the prepare call -- we are not ready to split just now. Just return.
-    if (!st.prepare()) {
-      parent.clearSplit();
-      return null;
-    }
-    try {
-      result = st.execute(null, null);
-    } catch (IOException ioe) {
-      try {
-        LOG.info("Running rollback of failed split of " +
-            parent.getRegionInfo().getRegionNameAsString() + "; " + ioe.getMessage());
-        st.rollback(null, null);
-        LOG.info("Successful rollback of failed split of " +
-            parent.getRegionInfo().getRegionNameAsString());
-        return null;
-      } catch (RuntimeException e) {
-        // If failed rollback, kill this server to avoid having a hole in table.
-        LOG.info("Failed rollback of failed split of " +
-            parent.getRegionInfo().getRegionNameAsString() + " -- aborting server", e);
-      }
-    }
-    finally {
-      parent.clearSplit();
-    }
-    return new HRegion[] { (HRegion)result.getFirst(), (HRegion)result.getSecond() };
-  }
-
   // ////////////////////////////////////////////////////////////////////////////
   // Scanner tests
   // ////////////////////////////////////////////////////////////////////////////
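Note: the splitRegion(parent, midkey) helper deleted here was the one place TestHRegion drove SplitTransactionImpl directly — prepare(), execute(), rollback() on failure, clearSplit() in a finally. With this change removing the split transaction from the region server entirely (splits become master-driven, per the commit summary), the helper has nothing left to exercise, and the tests built on it are removed in the following hunk.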
@@ -3517,204 +3472,6 @@ public class TestHRegion {
     HBaseTestingUtility.closeRegionAndWAL(this.region);
     this.region = null;
   }
-  // ////////////////////////////////////////////////////////////////////////////
-  // Split test
-  // ////////////////////////////////////////////////////////////////////////////
-  /**
-   * Splits twice and verifies getting from each of the split regions.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testBasicSplit() throws Exception {
-    byte[][] families = { fam1, fam2, fam3 };
-
-    Configuration hc = initSplit();
-    // Setting up region
-    this.region = initHRegion(tableName, method, hc, families);
-
-    try {
-      LOG.info("" + HBaseTestCase.addContent(region, fam3));
-      region.flush(true);
-      region.compactStores();
-      byte[] splitRow = region.checkSplit();
-      assertNotNull(splitRow);
-      LOG.info("SplitRow: " + Bytes.toString(splitRow));
-      HRegion[] regions = splitRegion(region, splitRow);
-      try {
-        // Need to open the regions.
-        // TODO: Add an 'open' to HRegion... don't do open by constructing
-        // instance.
-        for (int i = 0; i < regions.length; i++) {
-          regions[i] = HRegion.openHRegion(regions[i], null);
-        }
-        // Assert can get rows out of new regions. Should be able to get first
-        // row from first region and the midkey from second region.
-        assertGet(regions[0], fam3, Bytes.toBytes(START_KEY));
-        assertGet(regions[1], fam3, splitRow);
-        // Test I can get scanner and that it starts at right place.
-        assertScan(regions[0], fam3, Bytes.toBytes(START_KEY));
-        assertScan(regions[1], fam3, splitRow);
-        // Now prove can't split regions that have references.
-        for (int i = 0; i < regions.length; i++) {
-          // Add so much data to this region, we create a store file that is >
-          // than one of our unsplitable references. it will.
-          for (int j = 0; j < 2; j++) {
-            HBaseTestCase.addContent(regions[i], fam3);
-          }
-          HBaseTestCase.addContent(regions[i], fam2);
-          HBaseTestCase.addContent(regions[i], fam1);
-          regions[i].flush(true);
-        }
-
-        byte[][] midkeys = new byte[regions.length][];
-        // To make regions splitable force compaction.
-        for (int i = 0; i < regions.length; i++) {
-          regions[i].compactStores();
-          midkeys[i] = regions[i].checkSplit();
-        }
-
-        TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>();
-        // Split these two daughter regions so then I'll have 4 regions. Will
-        // split because added data above.
-        for (int i = 0; i < regions.length; i++) {
-          HRegion[] rs = null;
-          if (midkeys[i] != null) {
-            rs = splitRegion(regions[i], midkeys[i]);
-            for (int j = 0; j < rs.length; j++) {
-              sortedMap.put(Bytes.toString(rs[j].getRegionInfo().getRegionName()),
-                HRegion.openHRegion(rs[j], null));
-            }
-          }
-        }
-        LOG.info("Made 4 regions");
-        // The splits should have been even. Test I can get some arbitrary row
-        // out of each.
-        int interval = (LAST_CHAR - FIRST_CHAR) / 3;
-        byte[] b = Bytes.toBytes(START_KEY);
-        for (HRegion r : sortedMap.values()) {
-          assertGet(r, fam3, b);
-          b[0] += interval;
-        }
-      } finally {
-        for (int i = 0; i < regions.length; i++) {
-          try {
-            regions[i].close();
-          } catch (IOException e) {
-            // Ignore.
-          }
-        }
-      }
-    } finally {
-      HBaseTestingUtility.closeRegionAndWAL(this.region);
-      this.region = null;
-    }
-  }
-
-  @Test
-  public void testSplitRegion() throws IOException {
-    byte[] qualifier = Bytes.toBytes("qualifier");
-    Configuration hc = initSplit();
-    int numRows = 10;
-    byte[][] families = { fam1, fam3 };
-
-    // Setting up region
-    this.region = initHRegion(tableName, method, hc, families);
-
-    // Put data in region
-    int startRow = 100;
-    putData(startRow, numRows, qualifier, families);
-    int splitRow = startRow + numRows;
-    putData(splitRow, numRows, qualifier, families);
-    region.flush(true);
-
-    HRegion[] regions = null;
-    try {
-      regions = splitRegion(region, Bytes.toBytes("" + splitRow));
-      // Opening the regions returned.
-      for (int i = 0; i < regions.length; i++) {
-        regions[i] = HRegion.openHRegion(regions[i], null);
-      }
-      // Verifying that the region has been split
-      assertEquals(2, regions.length);
-
-      // Verifying that all data is still there and that data is in the right
-      // place
-      verifyData(regions[0], startRow, numRows, qualifier, families);
-      verifyData(regions[1], splitRow, numRows, qualifier, families);
-
-    } finally {
-      HBaseTestingUtility.closeRegionAndWAL(this.region);
-      this.region = null;
-    }
-  }
-
-  @Test
-  public void testClearForceSplit() throws IOException {
-    byte[] qualifier = Bytes.toBytes("qualifier");
-    Configuration hc = initSplit();
-    int numRows = 10;
-    byte[][] families = { fam1, fam3 };
-
-    // Setting up region
-    this.region = initHRegion(tableName, method, hc, families);
-
-    // Put data in region
-    int startRow = 100;
-    putData(startRow, numRows, qualifier, families);
-    int splitRow = startRow + numRows;
-    byte[] splitRowBytes = Bytes.toBytes("" + splitRow);
-    putData(splitRow, numRows, qualifier, families);
-    region.flush(true);
-
-    HRegion[] regions = null;
-    try {
-      // Set force split
-      region.forceSplit(splitRowBytes);
-      assertTrue(region.shouldForceSplit());
-      // Split point should be the force split row
-      assertTrue(Bytes.equals(splitRowBytes, region.checkSplit()));
-
-      // Add a store that has references.
-      HStore storeMock = Mockito.mock(HStore.class);
-      when(storeMock.hasReferences()).thenReturn(true);
-      when(storeMock.getFamily()).thenReturn(new HColumnDescriptor("cf"));
-      when(storeMock.close()).thenReturn(ImmutableList.<StoreFile>of());
-      when(storeMock.getColumnFamilyName()).thenReturn("cf");
-      region.stores.put(Bytes.toBytes(storeMock.getColumnFamilyName()), storeMock);
-      assertTrue(region.hasReferences());
-
-      // Will not split since the store has references.
-      regions = splitRegion(region, splitRowBytes);
-      assertNull(regions);
-
-      // Region force split should be cleared after the split try.
-      assertFalse(region.shouldForceSplit());
-
-      // Remove the store that has references.
-      region.stores.remove(Bytes.toBytes(storeMock.getColumnFamilyName()));
-      assertFalse(region.hasReferences());
-
-      // Now we can split.
-      regions = splitRegion(region, splitRowBytes);
-
-      // Opening the regions returned.
-      for (int i = 0; i < regions.length; i++) {
-        regions[i] = HRegion.openHRegion(regions[i], null);
-      }
-      // Verifying that the region has been split
-      assertEquals(2, regions.length);
-
-      // Verifying that all data is still there and that data is in the right
-      // place
-      verifyData(regions[0], startRow, numRows, qualifier, families);
-      verifyData(regions[1], splitRow, numRows, qualifier, families);
-
-    } finally {
-      HBaseTestingUtility.closeRegionAndWAL(this.region);
-      this.region = null;
-    }
-  }
-
   /**
    * Flushes the cache in a thread while scanning. The tests verify that the
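Note: testBasicSplit, testSplitRegion, and testClearForceSplit all ran through the splitRegion helper removed above, so they leave with it. Of the three, testClearForceSplit also covered the forceSplit/shouldForceSplit bookkeeping and the guard that a region holding store-file references refuses to split; this commit adds no in-place replacement, so equivalent coverage would presumably live with the master-side split tests.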
@@ -5906,103 +5663,6 @@ public class TestHRegion {
     }
   }

-  @Test
-  public void testSplitRegionWithReverseScan() throws IOException {
-    TableName tableName = TableName.valueOf("testSplitRegionWithReverseScan");
-    byte [] qualifier = Bytes.toBytes("qualifier");
-    Configuration hc = initSplit();
-    int numRows = 3;
-    byte [][] families = {fam1};
-
-    //Setting up region
-    this.region = initHRegion(tableName, method, hc, families);
-
-    //Put data in region
-    int startRow = 100;
-    putData(startRow, numRows, qualifier, families);
-    int splitRow = startRow + numRows;
-    putData(splitRow, numRows, qualifier, families);
-    region.flush(true);
-
-    HRegion [] regions = null;
-    try {
-      regions = splitRegion(region, Bytes.toBytes("" + splitRow));
-      //Opening the regions returned.
-      for (int i = 0; i < regions.length; i++) {
-        regions[i] = HRegion.openHRegion(regions[i], null);
-      }
-      //Verifying that the region has been split
-      assertEquals(2, regions.length);
-
-      //Verifying that all data is still there and that data is in the right
-      //place
-      verifyData(regions[0], startRow, numRows, qualifier, families);
-      verifyData(regions[1], splitRow, numRows, qualifier, families);
-
-      //fire the reverse scan1: top range, and larger than the last row
-      Scan scan = new Scan(Bytes.toBytes(String.valueOf(startRow + 10 * numRows)));
-      scan.setReversed(true);
-      InternalScanner scanner = regions[1].getScanner(scan);
-      List<Cell> currRow = new ArrayList<Cell>();
-      boolean more = false;
-      int verify = startRow + 2 * numRows - 1;
-      do {
-        more = scanner.next(currRow);
-        assertEquals(Bytes.toString(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
-          currRow.get(0).getRowLength()), verify + "");
-        verify--;
-        currRow.clear();
-      } while(more);
-      assertEquals(verify, startRow + numRows - 1);
-      scanner.close();
-      //fire the reverse scan2: top range, and equals to the last row
-      scan = new Scan(Bytes.toBytes(String.valueOf(startRow + 2 * numRows - 1)));
-      scan.setReversed(true);
-      scanner = regions[1].getScanner(scan);
-      verify = startRow + 2 * numRows - 1;
-      do {
-        more = scanner.next(currRow);
-        assertEquals(Bytes.toString(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
-          currRow.get(0).getRowLength()), verify + "");
-        verify--;
-        currRow.clear();
-      } while(more);
-      assertEquals(verify, startRow + numRows - 1);
-      scanner.close();
-      //fire the reverse scan3: bottom range, and larger than the last row
-      scan = new Scan(Bytes.toBytes(String.valueOf(startRow + numRows)));
-      scan.setReversed(true);
-      scanner = regions[0].getScanner(scan);
-      verify = startRow + numRows - 1;
-      do {
-        more = scanner.next(currRow);
-        assertEquals(Bytes.toString(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
-          currRow.get(0).getRowLength()), verify + "");
-        verify--;
-        currRow.clear();
-      } while(more);
-      assertEquals(verify, 99);
-      scanner.close();
-      //fire the reverse scan4: bottom range, and equals to the last row
-      scan = new Scan(Bytes.toBytes(String.valueOf(startRow + numRows - 1)));
-      scan.setReversed(true);
-      scanner = regions[0].getScanner(scan);
-      verify = startRow + numRows - 1;
-      do {
-        more = scanner.next(currRow);
-        assertEquals(Bytes.toString(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
-          currRow.get(0).getRowLength()), verify + "");
-        verify--;
-        currRow.clear();
-      } while(more);
-      assertEquals(verify, startRow - 1);
-      scanner.close();
-    } finally {
-      this.region.close();
-      this.region = null;
-    }
-  }
-
   @Test
   public void testWriteRequestsCounter() throws IOException {
     byte[] fam = Bytes.toBytes("info");
@@ -19,30 +19,19 @@
 package org.apache.hadoop.hbase.regionserver;

 import java.io.IOException;
-import java.util.TreeMap;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CategoryBasedTimeout;
-import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;

-import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
-import static org.junit.Assert.assertNotNull;
-
 /**
  * A test similar to TestHRegion, but with in-memory flush families.
  * Also checks wal truncation after in-memory compaction.
@@ -71,88 +60,5 @@ public class TestHRegionWithInMemoryFlush extends TestHRegion{
     return TEST_UTIL.createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey,
       isReadOnly, durability, wal, inMemory, families);
   }
-
-  /**
-   * Splits twice and verifies getting from each of the split regions.
-   *
-   * @throws Exception
-   */
-  @Override
-  public void testBasicSplit() throws Exception {
-    byte[][] families = { fam1, fam2, fam3 };
-
-    Configuration hc = initSplit();
-    // Setting up region
-    this.region = initHRegion(tableName, method, hc, families);
-
-    try {
-      LOG.info("" + HBaseTestCase.addContent(region, fam3));
-      region.flush(true);
-      region.compactStores();
-      byte[] splitRow = region.checkSplit();
-      assertNotNull(splitRow);
-      LOG.info("SplitRow: " + Bytes.toString(splitRow));
-      HRegion[] regions = splitRegion(region, splitRow);
-      try {
-        // Need to open the regions.
-        // TODO: Add an 'open' to HRegion... don't do open by constructing
-        // instance.
-        for (int i = 0; i < regions.length; i++) {
-          regions[i] = HRegion.openHRegion(regions[i], null);
-        }
-        // Assert can get rows out of new regions. Should be able to get first
-        // row from first region and the midkey from second region.
-        assertGet(regions[0], fam3, Bytes.toBytes(START_KEY));
-        assertGet(regions[1], fam3, splitRow);
-        // Test I can get scanner and that it starts at right place.
-        assertScan(regions[0], fam3, Bytes.toBytes(START_KEY));
-        assertScan(regions[1], fam3, splitRow);
-        // Now prove can't split regions that have references.
-        for (int i = 0; i < regions.length; i++) {
-          // Add so much data to this region, we create a store file that is >
-          // than one of our unsplitable references. it will.
-          for (int j = 0; j < 2; j++) {
-            HBaseTestCase.addContent(regions[i], fam3);
-          }
-          HBaseTestCase.addContent(regions[i], fam2);
-          HBaseTestCase.addContent(regions[i], fam1);
-          regions[i].flush(true);
-        }
-
-        byte[][] midkeys = new byte[regions.length][];
-        // To make regions splitable force compaction.
-        for (int i = 0; i < regions.length; i++) {
-          regions[i].compactStores();
-          midkeys[i] = regions[i].checkSplit();
-        }
-
-        TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>();
-        // Split these two daughter regions so then I'll have 4 regions. Will
-        // split because added data above.
-        for (int i = 0; i < regions.length; i++) {
-          HRegion[] rs = null;
-          if (midkeys[i] != null) {
-            rs = splitRegion(regions[i], midkeys[i]);
-            for (int j = 0; j < rs.length; j++) {
-              sortedMap.put(Bytes.toString(rs[j].getRegionInfo().getRegionName()),
-                HRegion.openHRegion(rs[j], null));
-            }
-          }
-        }
-        LOG.info("Made 4 regions");
-      } finally {
-        for (int i = 0; i < regions.length; i++) {
-          try {
-            regions[i].close();
-          } catch (IOException e) {
-            // Ignore.
-          }
-        }
-      }
-    } finally {
-      HBaseTestingUtility.closeRegionAndWAL(this.region);
-      this.region = null;
-    }
-  }
 }
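Note: this override existed only so the in-memory-flush variant could re-run testBasicSplit against CompactingMemStore-backed families. With testBasicSplit gone from TestHRegion there is nothing left to override, and the imports dropped in the previous hunk (TreeMap, Configuration, HBaseTestCase, Bytes, and the static START_KEY/fam*/assertNotNull helpers) were used only by this method.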
@@ -1,402 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
-
-import org.mockito.Mockito;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.PairOfSameType;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Test the {@link SplitTransactionImpl} class against an HRegion (as opposed to
- * running cluster).
- */
-@Category({RegionServerTests.class, SmallTests.class})
-public class TestSplitTransaction {
-  private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private final Path testdir =
-    TEST_UTIL.getDataTestDir(this.getClass().getName());
-  private HRegion parent;
-  private WALFactory wals;
-  private FileSystem fs;
-  private static final byte [] STARTROW = new byte [] {'a', 'a', 'a'};
-  // '{' is next ascii after 'z'.
-  private static final byte [] ENDROW = new byte [] {'{', '{', '{'};
-  private static final byte [] GOOD_SPLIT_ROW = new byte [] {'d', 'd', 'd'};
-  private static final byte [] CF = HConstants.CATALOG_FAMILY;
-
-  private static boolean preRollBackCalled = false;
-  private static boolean postRollBackCalled = false;
-
-  @Before public void setup() throws IOException {
-    this.fs = FileSystem.get(TEST_UTIL.getConfiguration());
-    TEST_UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, CustomObserver.class.getName());
-    this.fs.delete(this.testdir, true);
-    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
-    FSUtils.setRootDir(walConf, this.testdir);
-    this.wals = new WALFactory(walConf, null, this.getClass().getName());
-
-    this.parent = createRegion(this.testdir, this.wals);
-    RegionCoprocessorHost host = new RegionCoprocessorHost(this.parent, null, TEST_UTIL.getConfiguration());
-    this.parent.setCoprocessorHost(host);
-    TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster", true);
-  }
-
-  @After public void teardown() throws IOException {
-    if (this.parent != null && !this.parent.isClosed()) this.parent.close();
-    Path regionDir = this.parent.getRegionFileSystem().getRegionDir();
-    if (this.fs.exists(regionDir) && !this.fs.delete(regionDir, true)) {
-      throw new IOException("Failed delete of " + regionDir);
-    }
-    if (this.wals != null) {
-      this.wals.close();
-    }
-    this.fs.delete(this.testdir, true);
-  }
-
-  @Test public void testFailAfterPONR() throws IOException, KeeperException {
-    final int rowcount = TEST_UTIL.loadRegion(this.parent, CF);
-    assertTrue(rowcount > 0);
-    int parentRowCount = TEST_UTIL.countRows(this.parent);
-    assertEquals(rowcount, parentRowCount);
-
-    // Start transaction.
-    SplitTransactionImpl st = prepareGOOD_SPLIT_ROW();
-    SplitTransactionImpl spiedUponSt = spy(st);
-    Mockito
-      .doThrow(new MockedFailedDaughterOpen())
-      .when(spiedUponSt)
-      .openDaughterRegion((Server) Mockito.anyObject(),
-        (HRegion) Mockito.anyObject());
-
-    // Run the execute.  Look at what it returns.
-    boolean expectedException = false;
-    Server mockServer = Mockito.mock(Server.class);
-    when(mockServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
-    try {
-      spiedUponSt.execute(mockServer, null);
-    } catch (IOException e) {
-      if (e.getCause() != null &&
-        e.getCause() instanceof MockedFailedDaughterOpen) {
-        expectedException = true;
-      }
-    }
-    assertTrue(expectedException);
-    // Run rollback returns that we should restart.
-    assertFalse(spiedUponSt.rollback(null, null));
-    // Make sure that region a and region b are still in the filesystem, that
-    // they have not been removed; this is supposed to be the case if we go
-    // past point of no return.
-    Path tableDir = this.parent.getRegionFileSystem().getTableDir();
-    Path daughterADir = new Path(tableDir, spiedUponSt.getFirstDaughter().getEncodedName());
-    Path daughterBDir = new Path(tableDir, spiedUponSt.getSecondDaughter().getEncodedName());
-    assertTrue(TEST_UTIL.getTestFileSystem().exists(daughterADir));
-    assertTrue(TEST_UTIL.getTestFileSystem().exists(daughterBDir));
-  }
-
-  /**
-   * Test straight prepare works.  Tries to split on {@link #GOOD_SPLIT_ROW}
-   * @throws IOException
-   */
-  @Test public void testPrepare() throws IOException {
-    prepareGOOD_SPLIT_ROW();
-  }
-
-  private SplitTransactionImpl prepareGOOD_SPLIT_ROW() throws IOException {
-    return prepareGOOD_SPLIT_ROW(this.parent);
-  }
-
-  private SplitTransactionImpl prepareGOOD_SPLIT_ROW(final HRegion parentRegion)
-      throws IOException {
-    SplitTransactionImpl st = new SplitTransactionImpl(parentRegion, GOOD_SPLIT_ROW);
-    assertTrue(st.prepare());
-    return st;
-  }
-
-  /**
-   * Pass a reference store
-   */
-  @Test public void testPrepareWithRegionsWithReference() throws IOException {
-    HStore storeMock = Mockito.mock(HStore.class);
-    when(storeMock.hasReferences()).thenReturn(true);
-    when(storeMock.getFamily()).thenReturn(new HColumnDescriptor("cf"));
-    when(storeMock.getSizeToFlush()).thenReturn(new MemstoreSize());
-    when(storeMock.close()).thenReturn(ImmutableList.<StoreFile>of());
-    this.parent.stores.put(Bytes.toBytes(""), storeMock);
-
-    SplitTransactionImpl st = new SplitTransactionImpl(this.parent, GOOD_SPLIT_ROW);
-
-    assertFalse("a region should not be splittable if it has instances of store file references",
-      st.prepare());
-  }
-
-  /**
-   * Test SplitTransactionListener
-   */
-  @Test public void testSplitTransactionListener() throws IOException {
-    SplitTransactionImpl st = new SplitTransactionImpl(this.parent, GOOD_SPLIT_ROW);
-    SplitTransaction.TransactionListener listener =
-      Mockito.mock(SplitTransaction.TransactionListener.class);
-    st.registerTransactionListener(listener);
-    st.prepare();
-    Server mockServer = Mockito.mock(Server.class);
-    when(mockServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
-    PairOfSameType<Region> daughters = st.execute(mockServer, null);
-    verify(listener).transition(st, SplitTransaction.SplitTransactionPhase.STARTED,
-      SplitTransaction.SplitTransactionPhase.PREPARED);
-    verify(listener, times(15)).transition(any(SplitTransaction.class),
-      any(SplitTransaction.SplitTransactionPhase.class),
-      any(SplitTransaction.SplitTransactionPhase.class));
-    verifyNoMoreInteractions(listener);
-  }
-
-  /**
-   * Pass an unreasonable split row.
-   */
-  @Test public void testPrepareWithBadSplitRow() throws IOException {
-    // Pass start row as split key.
-    SplitTransactionImpl st = new SplitTransactionImpl(this.parent, STARTROW);
-    assertFalse(st.prepare());
-    st = new SplitTransactionImpl(this.parent, HConstants.EMPTY_BYTE_ARRAY);
-    assertFalse(st.prepare());
-    st = new SplitTransactionImpl(this.parent, new byte [] {'A', 'A', 'A'});
-    assertFalse(st.prepare());
-    st = new SplitTransactionImpl(this.parent, ENDROW);
-    assertFalse(st.prepare());
-  }
-
-  @Test public void testPrepareWithClosedRegion() throws IOException {
-    this.parent.close();
-    SplitTransactionImpl st = new SplitTransactionImpl(this.parent, GOOD_SPLIT_ROW);
-    assertFalse(st.prepare());
-  }
-
-  @Test public void testWholesomeSplit() throws IOException {
-    final int rowcount = TEST_UTIL.loadRegion(this.parent, CF, true);
-    assertTrue(rowcount > 0);
-    int parentRowCount = TEST_UTIL.countRows(this.parent);
-    assertEquals(rowcount, parentRowCount);
-
-    // Pretend region's blocks are not in the cache, used for
-    // testWholesomeSplitWithHFileV1
-    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
-    ((LruBlockCache) cacheConf.getBlockCache()).clearCache();
-
-    // Start transaction.
-    SplitTransactionImpl st = prepareGOOD_SPLIT_ROW();
-
-    // Run the execute.  Look at what it returns.
-    Server mockServer = Mockito.mock(Server.class);
-    when(mockServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
-    PairOfSameType<Region> daughters = st.execute(mockServer, null);
-    // Do some assertions about execution.
-    assertTrue(this.fs.exists(this.parent.getRegionFileSystem().getSplitsDir()));
-    // Assert the parent region is closed.
-    assertTrue(this.parent.isClosed());
-
-    // Assert splitdir is empty -- because its content will have been moved out
-    // to be under the daughter region dirs.
-    assertEquals(0, this.fs.listStatus(this.parent.getRegionFileSystem().getSplitsDir()).length);
-    // Check daughters have correct key span.
-    assertTrue(Bytes.equals(parent.getRegionInfo().getStartKey(),
-      daughters.getFirst().getRegionInfo().getStartKey()));
-    assertTrue(Bytes.equals(GOOD_SPLIT_ROW, daughters.getFirst().getRegionInfo().getEndKey()));
-    assertTrue(Bytes.equals(daughters.getSecond().getRegionInfo().getStartKey(), GOOD_SPLIT_ROW));
-    assertTrue(Bytes.equals(parent.getRegionInfo().getEndKey(),
-      daughters.getSecond().getRegionInfo().getEndKey()));
-    // Count rows. daughters are already open
-    int daughtersRowCount = 0;
-    for (Region openRegion: daughters) {
-      try {
-        int count = TEST_UTIL.countRows(openRegion);
-        assertTrue(count > 0 && count != rowcount);
-        daughtersRowCount += count;
-      } finally {
-        HBaseTestingUtility.closeRegionAndWAL(openRegion);
-      }
-    }
-    assertEquals(rowcount, daughtersRowCount);
-    // Assert the write lock is no longer held on parent
-    assertTrue(!this.parent.lock.writeLock().isHeldByCurrentThread());
-  }
-
-  @Test
-  public void testCountReferencesFailsSplit() throws IOException {
-    final int rowcount = TEST_UTIL.loadRegion(this.parent, CF);
-    assertTrue(rowcount > 0);
-    int parentRowCount = TEST_UTIL.countRows(this.parent);
-    assertEquals(rowcount, parentRowCount);
-
-    // Start transaction.
-    HRegion spiedRegion = spy(this.parent);
-    SplitTransactionImpl st = prepareGOOD_SPLIT_ROW(spiedRegion);
-    SplitTransactionImpl spiedUponSt = spy(st);
-    doThrow(new IOException("Failing split. Expected reference file count isn't equal."))
-      .when(spiedUponSt).assertReferenceFileCount(anyInt(),
-        eq(new Path(this.parent.getRegionFileSystem().getTableDir(),
-          st.getSecondDaughter().getEncodedName())));
-
-    // Run the execute.  Look at what it returns.
-    boolean expectedException = false;
-    Server mockServer = Mockito.mock(Server.class);
-    when(mockServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
-    try {
-      spiedUponSt.execute(mockServer, null);
-    } catch (IOException e) {
-      expectedException = true;
-    }
-    assertTrue(expectedException);
-  }
-
-
-  @Test public void testRollback() throws IOException {
-    final int rowcount = TEST_UTIL.loadRegion(this.parent, CF);
-    assertTrue(rowcount > 0);
-    int parentRowCount = TEST_UTIL.countRows(this.parent);
-    assertEquals(rowcount, parentRowCount);
-
-    // Start transaction.
-    HRegion spiedRegion = spy(this.parent);
-    SplitTransactionImpl st = prepareGOOD_SPLIT_ROW(spiedRegion);
-    SplitTransactionImpl spiedUponSt = spy(st);
-    doNothing().when(spiedUponSt).assertReferenceFileCount(anyInt(),
-      eq(parent.getRegionFileSystem().getSplitsDir(st.getFirstDaughter())));
-    when(spiedRegion.createDaughterRegionFromSplits(spiedUponSt.getSecondDaughter())).
-      thenThrow(new MockedFailedDaughterCreation());
-    // Run the execute.  Look at what it returns.
-    boolean expectedException = false;
-    Server mockServer = Mockito.mock(Server.class);
-    when(mockServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
-    try {
-      spiedUponSt.execute(mockServer, null);
-    } catch (MockedFailedDaughterCreation e) {
-      expectedException = true;
-    }
-    assertTrue(expectedException);
-    // Run rollback
-    assertTrue(spiedUponSt.rollback(null, null));
-
-    // Assert I can scan parent.
-    int parentRowCount2 = TEST_UTIL.countRows(this.parent);
-    assertEquals(parentRowCount, parentRowCount2);
-
-    // Assert rollback cleaned up stuff in fs
-    assertTrue(!this.fs.exists(HRegion.getRegionDir(this.testdir, st.getFirstDaughter())));
-    assertTrue(!this.fs.exists(HRegion.getRegionDir(this.testdir, st.getSecondDaughter())));
-    assertTrue(!this.parent.lock.writeLock().isHeldByCurrentThread());
-
-    // Now retry the split but do not throw an exception this time.
-    assertTrue(st.prepare());
-    PairOfSameType<Region> daughters = st.execute(mockServer, null);
-    // Count rows. daughters are already open
-    int daughtersRowCount = 0;
-    for (Region openRegion: daughters) {
-      try {
-        int count = TEST_UTIL.countRows(openRegion);
-        assertTrue(count > 0 && count != rowcount);
-        daughtersRowCount += count;
-      } finally {
-        HBaseTestingUtility.closeRegionAndWAL(openRegion);
-      }
-    }
-    assertEquals(rowcount, daughtersRowCount);
-    // Assert the write lock is no longer held on parent
-    assertTrue(!this.parent.lock.writeLock().isHeldByCurrentThread());
-    assertTrue("Rollback hooks should be called.", wasRollBackHookCalled());
-  }
-
-  private boolean wasRollBackHookCalled(){
-    return (preRollBackCalled && postRollBackCalled);
-  }
-
-  /**
-   * Exception used in this class only.
-   */
-  @SuppressWarnings("serial")
-  private class MockedFailedDaughterCreation extends IOException {}
-  private class MockedFailedDaughterOpen extends IOException {}
-
-  HRegion createRegion(final Path testdir, final WALFactory wals)
-      throws IOException {
-    // Make a region with start and end keys. Use 'aaa', to 'AAA'.  The load
-    // region utility will add rows between 'aaa' and 'zzz'.
-    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("table"));
-    HColumnDescriptor hcd = new HColumnDescriptor(CF);
-    htd.addFamily(hcd);
-    HRegionInfo hri = new HRegionInfo(htd.getTableName(), STARTROW, ENDROW);
-    HRegion r = HBaseTestingUtility.createRegionAndWAL(hri, testdir, TEST_UTIL.getConfiguration(),
-      htd);
-    HBaseTestingUtility.closeRegionAndWAL(r);
-    return HRegion.openHRegion(testdir, hri, htd,
-      wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()),
-      TEST_UTIL.getConfiguration());
-  }
-
-  public static class CustomObserver extends BaseRegionObserver{
-    @Override
-    public void preRollBackSplit(
-        ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
-      preRollBackCalled = true;
-    }
-
-    @Override
-    public void postRollBackSplit(
-        ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
-      postRollBackCalled = true;
-    }
-  }
-
-}
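Note: the final hunk deletes TestSplitTransaction outright, since SplitTransactionImpl — the class it unit-tested — leaves the region server with this commit, taking the preRollBackSplit/postRollBackSplit observer exercise with it. The file's central testing technique is still worth recording: spy the real transaction (or the parent HRegion), force one internal step to throw, then assert that rollback() leaves a scannable parent and a released write lock. A self-contained sketch of that spy-to-force-failure pattern, with every name invented for illustration:

  import static org.mockito.Mockito.doThrow;
  import static org.mockito.Mockito.spy;

  import java.io.IOException;

  public class RollbackPatternSketch {

    // Stand-in for SplitTransactionImpl: one risky step, one state flag, a rollback.
    static class Txn {
      boolean executed;
      void riskyStep() throws IOException { /* real work elided */ }
      void execute() throws IOException {
        riskyStep();
        executed = true;
      }
      boolean rollback() {
        executed = false;
        return true; // true == clean rollback, safe to retry
      }
    }

    public static void main(String[] args) throws IOException {
      Txn txn = spy(new Txn());
      // Force the risky step to fail, as the deleted test did with
      // createDaughterRegionFromSplits(...). The stubbing call does not
      // actually run riskyStep().
      doThrow(new IOException("induced failure")).when(txn).riskyStep();

      boolean failed = false;
      try {
        txn.execute();
      } catch (IOException expected) {
        failed = true;
      }
      if (!failed) throw new AssertionError("execute() should have failed");
      if (!txn.rollback()) throw new AssertionError("rollback() should succeed");
      if (txn.executed) throw new AssertionError("state should be rolled back");
    }
  }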