dubbo 服务代理机制原理

spi

Posted by CHuiL on October 20, 2020

java 的代理机制

两种实现方式

jdk动态代理

// The interface that callers program against; both the real object and its proxy expose it.
public interface Interface {
    // Operation without arguments.
    void doSomething();
    // Operation that receives a single String argument.
    void somethingElse(String arg);
}


// Concrete implementation of Interface; each method only prints a marker line to stdout.
public class RealObject implements Interface {

    // Prints the fixed text "doSomething".
    @Override
    public void doSomething() {
        final String message = "doSomething";
        System.out.println(message);
    }

    // Prints "somethingElse " followed by the supplied argument.
    @Override
    public void somethingElse(String arg) {
        final String message = "somethingElse " + arg;
        System.out.println(message);
    }
}

// Dynamic-proxy handler: implements InvocationHandler and forwards every call to the wrapped target.
class DynamicProxyHandler implements InvocationHandler{
    // The real object that ultimately services each intercepted call.
    private final Object proxied;

    // proxied: the target instance every intercepted call is delegated to.
    public DynamicProxyHandler(Object proxied){
        this.proxied = proxied;
    }


    // Logs the proxy class, the invoked method and its arguments, then delegates to the target.
    // Returns whatever the target method returns (null for void methods).
    // Throws the exception raised by the target method itself, not the reflective wrapper.
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        System.out.println("**** proxy : " + proxy.getClass() + ", method: " + method + ", args: " + args);
        if (args != null){
            for (Object arg : args){
                System.out.println(" " + arg);
            }
        }
        System.out.println("我是代理");
        try {
            return method.invoke(proxied,args);
        } catch (java.lang.reflect.InvocationTargetException e) {
            // Method.invoke wraps anything the target throws. Rethrow the original so callers
            // see the exception the interface method declares instead of the wrapper
            // (which the proxy could otherwise surface as UndeclaredThrowableException).
            Throwable cause = e.getCause();
            throw cause != null ? cause : e;
        }
    }
}


// Demo entry point: wraps a RealObject in a JDK dynamic proxy and calls it through the interface.
class SimpleProxyDemoProxy{
    // args: command-line arguments (unused).
    public static void main(String[] args){
        RealObject real = new RealObject();
        // Build a proxy that implements Interface and routes calls to DynamicProxyHandler.
        Interface proxy = (Interface) Proxy.newProxyInstance(
                Interface.class.getClassLoader(),
                new Class<?>[] {Interface.class},
                new DynamicProxyHandler(real)
        );
        proxy.doSomething(); // every call on the proxy is dispatched to DynamicProxyHandler.invoke
        proxy.somethingElse("ala");
    }
}

结果

**** proxy : class com.sun.proxy.$Proxy0, method: public abstract void interFace.Interface.doSomething(), args: null
我是代理
doSomething
**** proxy : class com.sun.proxy.$Proxy0, method: public abstract void interFace.Interface.somethingElse(java.lang.String), args: [Ljava.lang.Object;@4b67cf4d
 ala
我是代理
somethingElse ala

javassist字节码 动态代理

// Creates proxies with Javassist's ProxyFactory by subclassing the given type.
public class JavassistProxyToolHandler {

    // Returns a new instance of a generated subclass of `type`.
    // Methods named "doSomething" are excluded by the filter; every other handled method
    // prints a trace line and then runs the original implementation.
    // Throws IllegalAccessException / InstantiationException if the generated class
    // cannot be instantiated reflectively.
    public Object getProxy(Class<?> type) throws IllegalAccessException,InstantiationException{
        // Interceptor applied to every method the filter lets through.
        MethodHandler interceptor = new MethodHandler() {
            @Override
            public Object invoke(Object self, Method invoked, Method proceed, Object[] args) throws Throwable {
                System.out.println("*javassist proxy invoke* method name :" + invoked.getName()+" exec");
                return proceed.invoke(self,args);
            }
        };

        ProxyFactory factory = new ProxyFactory();
        factory.setSuperclass(type);
        factory.setFilter(new MethodFilter() {
            @Override
            public boolean isHandled(Method candidate) {
                boolean isDoSomething = "doSomething".equals(candidate.getName());
                return !isDoSomething;
            }
        });

        Class generated = factory.createClass();
        Object instance = generated.newInstance();
        ((Proxy)instance).setHandler(interceptor);
        return instance;
    }
}
....
    
    // Usage: start from a plain RealToll, then try to replace it with a Javassist-generated proxy.
    RealToll realToll = new RealToll();
    JavassistProxyToolHandler javassistProxyToolHandler = new JavassistProxyToolHandler();
    try {
        realToll = (RealToll)javassistProxyToolHandler.getProxy(RealToll.class);
    }catch (Exception e){
        // Proxy creation failed: print the exception and keep using the unproxied instance.
        System.out.println(e);
    }
    realToll.doSomething();
    realToll.somethingElse("arg");

结果

doSomething
*javassist proxy invoke* method name :somethingElse exec
somethingElse arg

区别

  • javassist是直接通过操作字节码来生成代理对象的。
  • jdk只能对接口进行代理,javassist可以对类进行代理。

dubbo中代理机制的使用

服务调用

在Dubbo服务被注入到其他类中时,Spring会第一时间调用getObject方法,并由该方法执行服务引用逻辑。
在进行具体工作之前,需要先进行配置检查与收集工作,来决定服务的调用方式。有三种:1.引用本地jvm服务,2.通过直连的方式调用远程服务,3.通过注册中心调用远程服务。
不管是哪种调用方式,最终都会得到一个invoker实例。该实例具备调用本地或者远程服务的能力,但并不能将此实例暴露给用户使用,这会对用户业务代码造成侵入。所以还需要代理工厂类(ProxyFactory)为服务接口生成代理类,并让代理类去调用Invoker逻辑。

调用过程

  1. 调用->获取调用对象实例->获取代理对象->生成invoker->生成代理对象
获取调用对象实例

获取各种基本配置信息-用于生成代理对象,配置信息(map);

// Excerpt of ReferenceConfig: lazily creates and caches the service reference (proxy) in `ref`.
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
    ...
    
    // Delegates to get().
    public Object getObject() {
        return get();
    }
    
    // Returns the cached reference, initializing it on first use.
    // Throws IllegalStateException if this config has already been destroyed.
    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }
    
    // Gathers the configuration into a map and builds the proxy from it.
    public synchronized void init() {
        Map<String, String> map = new HashMap<String, String>();
        .... // collect the configuration, e.g. interface name, interface methods, registry address, etc.
        ref = createProxy(map); // create the proxy
        ....
    }
    ...
}
生成代理对象

根据invoker来生成代理对象。

// Excerpt of ReferenceConfig: builds the proxy for the service reference from an invoker.
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {

    // map: the configuration collected by the caller.
    // Returns the proxy that PROXY_FACTORY creates for the invoker.
    private T createProxy(Map<String, String> map) {
        ....// Elided: everything before this point builds the invoker. Based on map and url it picks the invoker kind: local (in-JVM), remote point-to-point (direct connection), or multiple registries / multiple providers (or a mix of both); the Invoker is built from the url, and multiple Invokers are then merged through Cluster.
        
        // Use the resulting invoker to obtain the proxy.
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }
}
// Excerpt of AbstractProxyFactory: works out which interfaces to proxy, then delegates to the
// getProxy(invoker, interfaces) overload.
public abstract class AbstractProxyFactory implements ProxyFactory {
    ...
    public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
        // Collect the interfaces the proxy must implement.
        Set<Class<?>> interfaces = new HashSet<>();
        // The INTERFACES url parameter is split with COMMA_SPLIT_PATTERN into class names.
        String config = invoker.getUrl().getParameter(INTERFACES);
        if (config != null && config.length() > 0) {
            String[] types = COMMA_SPLIT_PATTERN.split(config);
            for (String type : types) {
                // TODO can we load successfully for a different classloader?.
                interfaces.add(ReflectUtils.forName(type));
            }
        }
        ...
        
        return getProxy(invoker, interfaces.toArray(new Class<?>[0]));
    }
}
// Excerpt of JavassistProxyFactory: creates proxies through the bytecode Proxy class.
public class JavassistProxyFactory extends AbstractProxyFactory {

    // Obtains the Proxy for the given interfaces and instantiates it with an
    // InvokerInvocationHandler wrapping the invoker.
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
    ....
}
        
// Excerpt of the bytecode Proxy class: generates, caches and returns a Proxy subclass whose
// newInstance(handler) creates objects implementing the requested interfaces.
public abstract class Proxy {

    // Convenience overload: uses the class loader returned by ClassUtils.getClassLoader(Proxy.class).
    public static Proxy getProxy(Class<?>... ics) {
        return getProxy(ClassUtils.getClassLoader(Proxy.class), ics);
    }

    // Returns the Proxy for the given interfaces, generating it on first request.
    // cl:  class loader used to check interface visibility and to generate the classes.
    // ics: interfaces the generated class must implement.
    // Throws IllegalArgumentException when there are too many interfaces, an interface is not
    // visible from cl, or non-public interfaces come from different packages;
    // throws RuntimeException when an element is not an interface or generation fails.
    public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
        // The number of interfaces may not exceed MAX_PROXY_COUNT
        // (65535 according to the original note; the constant is declared outside this excerpt).
        if (ics.length > MAX_PROXY_COUNT) { 
    
            throw new IllegalArgumentException("interface limit exceeded");
        }
        
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < ics.length; i++) {
            // Every element must be an interface.
            String itf = ics[i].getName();
            if (!ics[i].isInterface()) { //
                throw new RuntimeException(itf + " is not a interface.");
            }

            Class<?> tmp = null;
            try {
                tmp = Class.forName(itf, false, cl);
            } catch (ClassNotFoundException e) {
                // Deliberately empty: tmp stays null and the identity check below rejects it.
            }

            // The class loader must resolve the name to this very Class object.
            if (tmp != ics[i]) {
                throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
            }

            sb.append(itf).append(';');
        }

        // The concatenated interface names serve as the cache key.
        String key = sb.toString();

        // Fetch (or create) the cache that belongs to this class loader.
        final Map<String, Object> cache;
        synchronized (PROXY_CACHE_MAP) {
            cache = PROXY_CACHE_MAP.computeIfAbsent(cl, k -> new HashMap<>());
        }

        Proxy proxy = null;
        // First try to obtain the proxy from the cache.
        synchronized (cache) {
            do {
                Object value = cache.get(key);
                if (value instanceof Reference<?>) {
                    proxy = (Proxy) ((Reference<?>) value).get();
                    if (proxy != null) {
                        return proxy;
                    }
                }

                if (value == PENDING_GENERATION_MARKER) {
                    // Another thread is generating this proxy: wait until it calls notifyAll.
                    try {
                        cache.wait();
                    } catch (InterruptedException e) {
                        // NOTE(review): the interrupt is swallowed and the interrupt flag is not
                        // restored; the loop just re-checks the cache.
                    }
                } else {
                    // Nobody is generating it yet: claim the slot and leave the loop to generate.
                    cache.put(key, PENDING_GENERATION_MARKER);
                    break;
                }
            }
            while (true);
        }

        long id = PROXY_CLASS_COUNTER.getAndIncrement(); // increasing suffix for the generated class names (proxy0, proxy1, ...)
        String pkg = null;
        // ccp generates the proxy class for the service interfaces; e.g. for a DemoService
        // interface, the class implementing it is produced by ccp.
        // ccm generates a subclass of the org.apache.dubbo.common.bytecode.Proxy abstract class,
        // mainly to implement Proxy's abstract method.
        // ccp is used first, to generate the concrete interface proxy class.
        ClassGenerator ccp = null, ccm = null;
        try {
            ccp = ClassGenerator.newInstance(cl); 

            // Method descriptors already emitted, so a method shared by several interfaces is added once.
            Set<String> worked = new HashSet<>();
            List<Method> methods = new ArrayList<>();

            for (int i = 0; i < ics.length; i++) {
                // Non-public interfaces must all live in one package; that package is then
                // used for the generated class.
                if (!Modifier.isPublic(ics[i].getModifiers())) {
                    String npkg = ics[i].getPackage().getName();
                    if (pkg == null) {
                        pkg = npkg;
                    } else {
                        if (!pkg.equals(npkg)) {
                            throw new IllegalArgumentException("non-public interfaces from different packages");
                        }
                    }
                }
                ccp.addInterface(ics[i]);

                for (Method method : ics[i].getMethods()) {
                    String desc = ReflectUtils.getDesc(method);
                    // Skip methods that were already generated and static methods.
                    if (worked.contains(desc) || Modifier.isStatic(method.getModifiers())) {
                        continue;
                    }
                    // NOTE(review): this check can never fire; static methods were already
                    // skipped by the condition above.
                    if (ics[i].isInterface() && Modifier.isStatic(method.getModifiers())) {
                        continue;
                    }
                    worked.add(desc);

                    // ix is the index this method will occupy in the generated static `methods` array.
                    int ix = methods.size();
                    Class<?> rt = method.getReturnType();
                    Class<?>[] pts = method.getParameterTypes();
                    
                    // Assemble the method body as source text: copy each parameter into an
                    // Object[] ("($w)$n" per parameter), call handler.invoke(this, methods[ix], args),
                    // and for non-void methods return the result converted via asArgument
                    // (a helper defined outside this excerpt).
                    StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                    for (int j = 0; j < pts.length; j++) {
                        code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                    }
                    code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);");
                    if (!Void.TYPE.equals(rt)) {
                        code.append(" return ").append(asArgument(rt, "ret")).append(";");
                    }

                    methods.add(method);
                    ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
                }
            }

            // No non-public interface dictated a package: fall back to PACKAGE_NAME.
            if (pkg == null) {
                pkg = PACKAGE_NAME;
            }

            // create ProxyInstance class.
            String pcn = pkg + ".proxy" + id;
            ccp.setClassName(pcn);
            ccp.addField("public static java.lang.reflect.Method[] methods;");
            ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
            ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
            ccp.addDefaultConstructor();
            // Generate the class that implements the interfaces.
            Class<?> clazz = ccp.toClass();
            // Fill the generated static `methods` field with the collected Method objects.
            clazz.getField("methods").set(null, methods.toArray(new Method[0]));

            // create Proxy class.
            String fcn = Proxy.class.getName() + id;
            ccm = ClassGenerator.newInstance(cl);
            ccm.setClassName(fcn);
            ccm.addDefaultConstructor();
            ccm.setSuperClass(Proxy.class);
            // Generate the subclass of the abstract Proxy class, implementing the abstract newInstance method.
            ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
            Class<?> pc = ccm.toClass();
            proxy = (Proxy) pc.newInstance();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            // Checked failures are rethrown unchecked with the original exception as cause.
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            // release ClassGenerator
            if (ccp != null) {
                ccp.release();
            }
            if (ccm != null) {
                ccm.release();
            }
            // Publish the outcome: drop the pending marker on failure, otherwise cache the proxy
            // behind a WeakReference; then wake every thread waiting on this cache.
            synchronized (cache) {
                if (proxy == null) {
                    cache.remove(key);
                } else {
                    cache.put(key, new WeakReference<Proxy>(proxy));
                }
                cache.notifyAll();
            }
        }
        return proxy; 
    }
}
    

最后 ccp生成代码如下

package org.apache.dubbo.common.bytecode;

// Listing of what ccp generates for DemoService. The method bodies are Javassist source text,
// not plain Java: $1 is Javassist notation for the first parameter and ($w) for a cast to the
// wrapper type (see the Javassist tutorial).
public class proxy0 implements org.apache.dubbo.demo.DemoService {

    // Filled by Proxy.getProxy after the class is generated; indexed by the methods[n] references below.
    public static java.lang.reflect.Method[] methods;

    // Receives every call made on this proxy.
    private java.lang.reflect.InvocationHandler handler;

    public proxy0() {
    }

    public proxy0(java.lang.reflect.InvocationHandler arg0) {
        handler = $1;
    }

    // Packs the argument into an Object[], delegates to the handler and casts the result back.
    public java.lang.String sayHello(java.lang.String arg0) {
        Object[] args = new Object[1];
        args[0] = ($w) $1;
        Object ret = handler.invoke(this, methods[0], args);
        return (java.lang.String) ret;
    }
}

ccm生成的代码如下

public abstract class Proxy {
    ....
    public Object newInstance(java.lang.reflect.InvocationHandler h) { 
         return new org.apache.dubbo.proxy0(h);
    }
    ...
生成invoker

根据map配置信息,判断是何种调用(3种),组成对应的url,生成对应的invoker;
Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。
Invoker 是由 Protocol 实现类构建而来。Protocol 实现类有很多,这里以DubboProtocol为例子

// Excerpt of AbstractProtocol: refer() wraps the protocol-specific invoker in an AsyncToSyncInvoker.
public abstract class AbstractProtocol implements Protocol {
    ...
    // Returns an AsyncToSyncInvoker around the invoker produced by protocolBindingRefer(type, url).
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }
    // Implemented by each concrete protocol to build its own invoker for the given type and url.
    protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
}
// Excerpt of DubboProtocol: builds the DubboInvoker used on the consumer side.
public class DubboProtocol extends AbstractProtocol {

    // Creates a DubboInvoker for the service type and url, registers it in `invokers`
    // and returns it.
    @Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // create rpc invoker.
        // Author's note: following getClients(url) further down, the clients are Netty
        // clients built with the Netty API (not visible in this excerpt).
        DubboInvoker<T> rpcInvoker = new DubboInvoker<>(serviceType, url, getClients(url), invokers);
        invokers.add(rpcInvoker);
        return rpcInvoker;
    }
}
// InvocationHandler behind the generated proxies: turns a method call on the proxy into an
// RpcInvocation and hands it to the wrapped Invoker.
public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;
    private ConsumerModel consumerModel;

    // handler: the invoker that performs the actual call. When its url yields a service key,
    // the matching ConsumerModel is looked up once and kept for later invocations.
    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
        final String key = handler.getUrl().getServiceKey();
        if (key != null) {
            this.consumerModel = ApplicationModel.getConsumerModel(key);
        }
    }

    // Dispatches a proxy call.
    // Methods declared by Object are invoked reflectively on the invoker; toString, hashCode,
    // $destroy (no parameters) and equals (one parameter) are answered by the invoker directly;
    // everything else becomes an RpcInvocation whose recreated result is returned.
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (Object.class == method.getDeclaringClass()) {
            return method.invoke(invoker, args);
        }

        final String name = method.getName();
        final int arity = method.getParameterTypes().length;
        if (arity == 0) {
            switch (name) {
                case "toString":
                    return invoker.toString();
                case "$destroy":
                    invoker.destroy();
                    return null;
                case "hashCode":
                    return invoker.hashCode();
                default:
                    // Any other no-arg method is a regular remote call.
                    break;
            }
        } else if (arity == 1 && "equals".equals(name)) {
            return invoker.equals(args[0]);
        }

        // Regular service method: describe the call and tag it with the target service key.
        RpcInvocation invocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
        invocation.setTargetServiceUniqueName(invoker.getUrl().getServiceKey());

        if (consumerModel != null) {
            invocation.put(Constants.CONSUMER_MODEL, consumerModel);
            invocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

        return invoker.invoke(invocation).recreate();
    }
}