HBASE-18884 Coprocessor Design Improvements follow up of HBASE-17732

- Change Service Coprocessor#getService() to List<Service> Coprocessor#getServices()
- Checkin the finalized design doc into repo
- Added example to javadoc of Coprocessor base interface on how to implement one in the new design
This commit is contained in:
Apekshit Sharma 2017-09-27 18:06:12 -07:00
parent ca2959824d
commit 74d0adce61
27 changed files with 117 additions and 79 deletions

View File

@ -20,14 +20,42 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.Optional;
import java.util.Collections;
import com.google.protobuf.Service;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
* Coprocessor interface.
* Base interface for the 4 coprocessors - MasterCoprocessor, RegionCoprocessor,
* RegionServerCoprocessor, and WALCoprocessor.
* Do NOT implement this interface directly. Unless an implementation implements one (or more) of
* the above mentioned 4 coprocessors, it'll fail to be loaded by any coprocessor host.
*
* Example:
* Building a coprocessor to observer Master operations.
* <pre>
* class MyMasterCoprocessor implements MasterCoprocessor {
* &#64;Override
* public Optional&lt;MasterObserver> getMasterObserver() {
* return new MyMasterObserver();
* }
* }
*
* class MyMasterObserver implements MasterObserver {
* ....
* }
* </pre>
*
* Building a Service which can be loaded by both Master and RegionServer
* <pre>
* class MyCoprocessorService implements MasterCoprocessor, RegionServerCoprocessor {
* &#64;Override
* public Optional&lt;Service> getServices() {
* return new ...;
* }
* }
* </pre>
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
@ -70,7 +98,7 @@ public interface Coprocessor {
/**
* Coprocessor endpoints providing protobuf services should implement this interface.
*/
default Optional<Service> getService() {
return Optional.empty();
default Iterable<Service> getServices() {
return Collections.EMPTY_SET;
}
}

View File

@ -29,9 +29,9 @@ import com.google.protobuf.Service;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -500,8 +500,8 @@ extends AggregateService implements RegionCoprocessor {
}
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
/**

View File

@ -22,10 +22,10 @@ import java.io.Closeable;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -313,8 +313,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
}
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
@Override

View File

@ -19,9 +19,9 @@
package org.apache.hadoop.hbase.security.access;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -175,7 +175,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
}
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
}

View File

@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -40,7 +40,6 @@ import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
* The aggregation implementation at a region.
*/
@ -50,8 +49,8 @@ implements RegionCoprocessor {
private RegionCoprocessorEnvironment env = null;
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
@Override

View File

@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -51,8 +51,8 @@ public class ColumnAggregationEndpointNullResponse
private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointNullResponse.class);
private RegionCoprocessorEnvironment env = null;
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
@Override

View File

@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -54,8 +54,8 @@ public class ColumnAggregationEndpointWithErrors
private RegionCoprocessorEnvironment env = null;
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
@Override

View File

@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.util.Threads;
import java.io.IOException;
import java.util.Optional;
import java.util.Collections;
/**
* Test implementation of a coprocessor endpoint exposing the
@ -45,8 +45,8 @@ public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobu
public ProtobufCoprocessorService() {}
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
@Override

View File

@ -23,7 +23,7 @@ import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Optional;
import java.util.Collections;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
@ -131,8 +131,8 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
public DummyRegionServerEndpoint() {}
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
@Override

View File

@ -21,11 +21,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Optional;
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos;
@ -106,8 +104,8 @@ public class TestRegionServerCoprocessorEndpoint {
implements RegionServerCoprocessor {
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
@Override

View File

@ -23,8 +23,8 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -124,8 +124,8 @@ public class TestServerCustomProtocol {
}
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
}

View File

@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.coprocessor.example;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
@ -102,8 +102,8 @@ public class BulkDeleteEndpoint extends BulkDeleteService implements RegionCopro
private RegionCoprocessorEnvironment env;
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
@Override

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos;
import org.apache.hadoop.hbase.regionserver.Store;
import java.io.IOException;
import java.util.Optional;
import java.util.Collections;
/**
* Coprocessor endpoint to refresh HFiles on replica.
@ -51,8 +51,8 @@ public class RefreshHFilesEndpoint extends RefreshHFilesProtos.RefreshHFilesServ
}
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
@Override

View File

@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.coprocessor.example;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@ -58,8 +58,8 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService implements R
* Just returns a reference to this object, which implements the RowCounterService interface.
*/
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
/**

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.rsgroup;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
@ -95,8 +96,8 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
}
@Override
public Optional<Service> getService() {
return Optional.of(groupAdminService);
public Iterable<Service> getServices() {
return Collections.singleton(groupAdminService);
}
@Override

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.Collections;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@ -83,8 +83,8 @@ extends RowProcessorService implements RegionCoprocessor {
}
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
/**

View File

@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.coprocessor;
import com.google.protobuf.Service;
import org.apache.yetus.audience.InterfaceAudience;
import java.util.Optional;
import java.util.Collections;
/**
* Classes to help maintain backward compatibility with now deprecated {@link CoprocessorService}
@ -50,8 +50,8 @@ public class CoprocessorServiceBackwardCompatiblity {
}
@Override
public Optional<Service> getService() {
return Optional.of(service.getService());
public Iterable<Service> getServices() {
return Collections.singleton(service.getService());
}
}
@ -64,8 +64,8 @@ public class CoprocessorServiceBackwardCompatiblity {
}
@Override
public Optional<Service> getService() {
return Optional.of(service.getService());
public Iterable<Service> getServices() {
return Collections.singleton(service.getService());
}
}
@ -78,8 +78,8 @@ public class CoprocessorServiceBackwardCompatiblity {
}
@Override
public Optional<Service> getService() {
return Optional.of(service.getService());
public Iterable<Service> getServices() {
return Collections.singleton(service.getService());
}
}
}

View File

@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
@ -120,8 +120,8 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements
}
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
/**

View File

@ -26,7 +26,7 @@ import org.apache.yetus.audience.InterfaceStability;
import java.util.Optional;
/**
* WALCoprocessor don't support loading services using {@link #getService()}.
* WALCoprocessor don't support loading services using {@link #getServices()}.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Set;
import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -126,7 +127,10 @@ public class MasterCoprocessorHost
@Override
public MasterEnvironment createEnvironment(final MasterCoprocessor instance, final int priority,
final int seq, final Configuration conf) {
instance.getService().ifPresent(masterServices::registerService);
// If coprocessor exposes any services, register them.
for (Service service : instance.getServices()) {
masterServices.registerService(service);
}
return new MasterEnvironment(instance, priority, seq, conf, masterServices);
}

View File

@ -393,9 +393,10 @@ public class RegionCoprocessorHost
@Override
public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
Configuration conf) {
// Due to current dynamic protocol design, Endpoint uses a different way to be registered and
// executed. It uses a visitor pattern to invoke registered Endpoint method.
instance.getService().ifPresent(region::registerService);
// If coprocessor exposes any services, register them.
for (Service service : instance.getServices()) {
region.registerService(service);
}
ConcurrentMap<String, Object> classData;
// make sure only one thread can add maps
synchronized (SHARED_DATA_MAP) {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -206,7 +207,10 @@ public class RegionServerCoprocessorHost extends
final int seq, final Configuration conf, final RegionServerServices services) {
super(impl, priority, seq, conf);
this.regionServerServices = services;
impl.getService().ifPresent(regionServerServices::registerService);
// If coprocessor exposes any services, register them.
for (Service service : impl.getServices()) {
regionServerServices.registerService(service);
}
this.metricRegistry =
MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName());
}

View File

@ -23,6 +23,7 @@ import java.net.InetAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -126,8 +127,6 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.MapMaker;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.Bytes;
@ -1019,8 +1018,9 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
}
@Override
public Optional<Service> getService() {
return Optional.of(AccessControlProtos.AccessControlService.newReflectiveService(this));
public Iterable<Service> getServices() {
return Collections.singleton(
AccessControlProtos.AccessControlService.newReflectiveService(this));
}
/*********************************** Observer implementations ***********************************/

View File

@ -19,7 +19,7 @@
package org.apache.hadoop.hbase.security.token;
import java.io.IOException;
import java.util.Optional;
import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -96,8 +96,9 @@ public class TokenProvider implements AuthenticationProtos.AuthenticationService
// AuthenticationService implementation
@Override
public Optional<Service> getService() {
return Optional.of(AuthenticationProtos.AuthenticationService.newReflectiveService(this));
public Iterable<Service> getServices() {
return Collections.singleton(
AuthenticationProtos.AuthenticationService.newReflectiveService(this));
}
@Override

View File

@ -26,6 +26,7 @@ import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LA
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -200,8 +201,9 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
}
@Override
public Optional<Service> getService() {
return Optional.of(VisibilityLabelsProtos.VisibilityLabelsService.newReflectiveService(this));
public Iterable<Service> getServices() {
return Collections.singleton(
VisibilityLabelsProtos.VisibilityLabelsService.newReflectiveService(this));
}
/********************************* Master related hooks **********************************/

View File

@ -35,8 +35,8 @@ import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -2564,8 +2564,8 @@ public class TestAccessController extends SecureTestUtil {
public void stop(CoprocessorEnvironment env) throws IOException { }
@Override
public Optional<Service> getService() {
return Optional.of(this);
public Iterable<Service> getServices() {
return Collections.singleton(this);
}
@Override