Java从零开启手写一 Reflect 反射实现通用调用之客户端
发布时间:2021-11-03 17:33:26 所属栏目:语言 来源:互联网
导读:上一篇我们介绍了,如何实现基于反射的通用服务端。 这一节我们来一起学习下如何实现通用客户端。 因为内容较多,所以拆分为 2 个部分。 基本思路 所有的方法调用,基于反射进行相关处理实现。 java 从零开始手写 RPC (06) reflect 反射实现通用调用之客户端
上一篇我们介绍了,如何实现基于反射的通用服务端。
这一节我们来一起学习下如何实现通用客户端。
因为内容较多,所以拆分为 2 个部分。
基本思路
所有的方法调用,基于反射进行相关处理实现。
java 从零开始手写 RPC (06) reflect 反射实现通用调用之客户端
核心类
为了便于拓展,我们把核心类调整如下:
package com.github.houbb.rpc.client.core;
import com.github.houbb.heaven.annotation.ThreadSafe;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.context.RpcClientContext;
import com.github.houbb.rpc.client.handler.RpcClientHandler;
import com.github.houbb.rpc.common.constant.RpcConstant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* <p> rpc 客户端 </p>
*
* <pre> Created: 2019/10/16 11:21 下午 </pre>
* <pre> Project: rpc </pre>
*
* @author houbinbin
* @since 0.0.2
*/
@ThreadSafe
public class RpcClient {
private static final Log log = LogFactory.getLog(RpcClient.class);
/**
* 地址信息
* @since 0.0.6
*/
private final String address;
/**
* 监听端口号
* @since 0.0.6
*/
private final int port;
/**
* 客户端处理 handler
* 作用:用于获取请求信息
* @since 0.0.4
*/
private final ChannelHandler channelHandler;
public RpcClient(final RpcClientContext clientContext) {
this.address = clientContext.address();
this.port = clientContext.port();
this.channelHandler = clientContext.channelHandler();
}
/**
* 进行连接
* @since 0.0.6
*/
public ChannelFuture connect() {
// 启动服务端
log.info("RPC 服务开始启动客户端");
EventLoopGroup workerGroup = new NioEventLoopGroup();
/**
* channel future 信息
* 作用:用于写入请求信息
* @since 0.0.6
*/
ChannelFuture channelFuture;
try {
Bootstrap bootstrap = new Bootstrap();
channelFuture = bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
// 解码 bytes=>resp
.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
// request=>bytes
.addLast(new ObjectEncoder())
// 日志输出
.addLast(new LoggingHandler(LogLevel.INFO))
.addLast(channelHandler);
}
})
.connect(address, port)
.syncUninterruptibly();
log.info("RPC 服务启动客户端完成,监听地址 {}:{}", address, port);
} catch (Exception e) {
log.error("RPC 客户端遇到异常", e);
throw new RuntimeException(e);
}
// 不要关闭线程池!!!
return channelFuture;
}
}
可以灵活指定对应的服务端地址、端口信息。
ChannelHandler 作为处理参数传入。
ObjectDecoder、ObjectEncoder、LoggingHandler 都和服务端类似,是 netty 的内置实现。
RpcClientHandler
客户端的 handler 实现如下:
/*
* Copyright (c) 2019. houbinbin Inc.
* rpc All rights reserved.
*/
package com.github.houbb.rpc.client.handler;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* <p> 客户端处理类 </p>
*
* <pre> Created: 2019/10/16 11:30 下午 </pre>
* <pre> Project: rpc </pre>
*
* @author houbinbin
* @since 0.0.2
*/
public class RpcClientHandler extends SimpleChannelInboundHandler {
private static final Log log = LogFactory.getLog(RpcClient.class);
/**
* 调用服务管理类
*
* @since 0.0.6
*/
private final InvokeService invokeService;
public RpcClientHandler(InvokeService invokeService) {
this.invokeService = invokeService;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
RpcResponse rpcResponse = (RpcResponse)msg;
invokeService.addResponse(rpcResponse.seqId(), rpcResponse);
log.info("[Client] response is :{}", rpcResponse);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 每次用完要关闭,不然拿不到response,我也不知道为啥(目测得了解netty才行)
// 个人理解:如果不关闭,则永远会被阻塞。
ctx.flush();
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
只有 channelRead0 做了调整,基于 InvokeService 对结果进行处理。
InvokeService
接口
package com.github.houbb.rpc.client.invoke;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
/**
* 调用服务接口
* @author binbin.hou
* @since 0.0.6
*/
public interface InvokeService {
/**
* 添加请求信息
* @param seqId 序列号
* @return this
* @since 0.0.6
*/
InvokeService addRequest(final String seqId);
/**
* 放入结果
* @param seqId 唯一标识
* @param rpcResponse 响应结果
* @return this
* @since 0.0.6
*/
InvokeService addResponse(final String seqId, final RpcResponse rpcResponse);
/**
* 获取标志信息对应的结果
* @param seqId 序列号
* @return 结果
* @since 0.0.6
*/
RpcResponse getResponse(final String seqId);
}
主要是对入参、出参的设置,以及出参的获取。
实现
package com.github.houbb.rpc.client.invoke.impl;
import com.github.houbb.heaven.util.guava.Guavas;
import com.github.houbb.heaven.util.lang.ObjectUtil;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.common.exception.RpcRuntimeException;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* 调用服务接口
* @author binbin.hou
* @since 0.0.6
*/
public class DefaultInvokeService implements InvokeService {
private static final Log LOG = LogFactory.getLog(DefaultInvokeService.class);
/**
* 请求序列号集合
* (1)这里后期如果要添加超时检测,可以添加对应的超时时间。
* 可以把这里调整为 map
* @since 0.0.6
*/
private final Set<String> requestSet;
/**
* 响应结果
* @since 0.0.6
*/
private final ConcurrentHashMap<String, RpcResponse> responseMap;
public DefaultInvokeService() {
requestSet = Guavas.newHashSet();
responseMap = new ConcurrentHashMap<>();
}
@Override
public InvokeService addRequest(String seqId) {
LOG.info("[Client] start add request for seqId: {}", seqId);
requestSet.add(seqId);
return this;
}
@Override
public InvokeService addResponse(String seqId, RpcResponse rpcResponse) {
// 这里放入之前,可以添加判断。
// 如果 seqId 必须处理请求集合中,才允许放入。或者直接忽略丢弃。
LOG.info("[Client] 获取结果信息,seq: {}, rpcResponse: {}", seqId, rpcResponse);
responseMap.putIfAbsent(seqId, rpcResponse);
// 通知所有等待方
LOG.info("[Client] seq 信息已经放入,通知所有等待方", seqId);
synchronized (this) {
this.notifyAll();
}
return this;
}
@Override
public RpcResponse getResponse(String seqId) {
try {
RpcResponse rpcResponse = this.responseMap.get(seqId);
if(ObjectUtil.isNotNull(rpcResponse)) {
LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse);
return rpcResponse;
}
// 进入等待
while (rpcResponse == null) {
LOG.info("[Client] seq {} 对应结果为空,进入等待", seqId);
// 同步等待锁
synchronized (this) {
this.wait();
}
rpcResponse = this.responseMap.get(seqId);
LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse);
}
return rpcResponse;
} catch (InterruptedException e) {
throw new RpcRuntimeException(e);
}
}
}
使用 requestSet 存储对应的请求入参。
使用 responseMap 存储对应的请求出参,在获取的时候通过同步 while 循环等待,获取结果。
此处,通过 notifyAll() 和 wait() 进行等待和唤醒。
ReferenceConfig-服务端配置
说明
我们想调用服务端,首先肯定要定义好要调用的对象。
ReferenceConfig 就是要告诉 rpc 框架,调用的服务端信息。
接口
package com.github.houbb.rpc.client.config.reference;
import com.github.houbb.rpc.common.config.component.RpcAddress;
import java.util.List;
/**
* 引用配置类
*
* 后期配置:
* (1)timeout 调用超时时间
* (2)version 服务版本处理
* (3)callType 调用方式 oneWay/sync/async
* (4)check 是否必须要求服务启动。
*
* spi:
* (1)codec 序列化方式
* (2)netty 网络通讯架构
* (3)load-balance 负载均衡
* (4)失败策略 fail-over/fail-fast
*
* filter:
* (1)路由
* (2)耗时统计 monitor 服务治理
*
* 优化思考:
* (1)对于唯一的 serviceId,其实其 interface 是固定的,是否可以省去?
* @author binbin.hou
* @since 0.0.6
* @param <T> 接口泛型
*/
public interface ReferenceConfig<T> {
/**
* 设置服务标识
* @param serviceId 服务标识
* @return this
* @since 0.0.6
*/
ReferenceConfig<T> serviceId(final String serviceId);
/**
* 服务唯一标识
* @since 0.0.6
*/
String serviceId();
/**
* 服务接口
* @since 0.0.6
* @return 接口信息
*/
Class<T> serviceInterface();
/**
* 设置服务接口信息
* @param serviceInterface 服务接口信息
* @return this
* @since 0.0.6
*/
ReferenceConfig<T> serviceInterface(final Class<T> serviceInterface);
/**
* 设置服务地址信息
* (1)单个写法:ip:port:weight
* (2)集群写法:ip1:port1:weight1,ip2:port2:weight2
*
* 其中 weight 权重可以不写,默认为1.
*
* @param addresses 地址列表信息
* @return this
* @since 0.0.6
*/
ReferenceConfig<T> addresses(final String addresses);
/**
* 获取对应的引用实现
* @return 引用代理类
* @since 0.0.6
*/
T reference();
}
实现
package com.github.houbb.rpc.client.config.reference.impl;
import com.github.houbb.heaven.constant.PunctuationConst;
import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.heaven.util.guava.Guavas;
import com.github.houbb.heaven.util.lang.NumUtil;
import com.github.houbb.rpc.client.config.reference.ReferenceConfig;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.client.core.context.impl.DefaultRpcClientContext;
import com.github.houbb.rpc.client.handler.RpcClientHandler;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService;
import com.github.houbb.rpc.client.proxy.ReferenceProxy;
import com.github.houbb.rpc.client.proxy.context.ProxyContext;
import com.github.houbb.rpc.client.proxy.context.impl.DefaultProxyContext;
import com.github.houbb.rpc.common.config.component.RpcAddress;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import java.util.List;
/**
* 引用配置类默认实现
*
* @author binbin.hou
* @since 0.0.6
* @param <T> 接口泛型
*/
public class DefaultReferenceConfig<T> implements ReferenceConfig<T> {
/**
* 服务唯一标识
* @since 0.0.6
*/
private String serviceId;
/**
* 服务接口
* @since 0.0.6
*/
private Class<T> serviceInterface;
/**
* 服务地址信息
* (1)如果不为空,则直接根据地址获取
* (2)如果为空,则采用自动发现的方式
*
* TODO: 这里调整为 set 更加合理。
*
* 如果为 subscribe 可以自动发现,然后填充这个字段信息。
* @since 0.0.6
*/
private List<RpcAddress> rpcAddresses;
/**
* 用于写入信息
* (1)client 连接 server 端的 channel future
* (2)后期进行 Load-balance 路由等操作。可以放在这里执行。
* @since 0.0.6
*/
private List<ChannelFuture> channelFutures;
/**
* 客户端处理信息
* @since 0.0.6
*/
@Deprecated
private RpcClientHandler channelHandler;
/**
* 调用服务管理类
* @since 0.0.6
*/
private InvokeService invokeService;
public DefaultReferenceConfig() {
// 初始化信息
this.rpcAddresses = Guavas.newArrayList();
this.channelFutures = Guavas.newArrayList();
this.invokeService = new DefaultInvokeService();
}
@Override
public String serviceId() {
return serviceId;
}
@Override
public DefaultReferenceConfig<T> serviceId(String serviceId) {
this.serviceId = serviceId;
return this;
}
@Override
public Class<T> serviceInterface() {
return serviceInterface;
}
@Override
public DefaultReferenceConfig<T> serviceInterface(Class<T> serviceInterface) {
this.serviceInterface = serviceInterface;
return this;
}
@Override
public ReferenceConfig<T> addresses(String addresses) {
ArgUtil.notEmpty(addresses, "addresses");
String[] addressArray = addresses.split(PunctuationConst.COMMA);
ArgUtil.notEmpty(addressArray, "addresses");
for(String address : addressArray) {
String[] addressSplits = address.split(PunctuationConst.COLON);
if(addressSplits.length < 2) {
throw new IllegalArgumentException("Address must be has ip and port, like 127.0.0.1:9527");
}
String ip = addressSplits[0];
int port = NumUtil.toIntegerThrows(addressSplits[1]);
// 包含权重信息
int weight = 1;
if(addressSplits.length >= 3) {
weight = NumUtil.toInteger(addressSplits[2], 1);
}
RpcAddress rpcAddress = new RpcAddress(ip, port, weight);
this.rpcAddresses.add(rpcAddress);
}
return this;
}
/**
* 获取对应的引用实现
* (1)处理所有的反射代理信息-方法可以抽离,启动各自独立即可。
* (2)启动对应的长连接
* @return 引用代理类
* @since 0.0.6
*/
@Override
public T reference() {
// 1. 启动 client 端到 server 端的连接信息
// 1.1 为了提升性能,可以将所有的 client=>server 的连接都调整为一个 thread。
// 1.2 初期为了简单,直接使用同步循环的方式。
// 创建 handler
// 循环连接
for(RpcAddress rpcAddress : rpcAddresses) {
final ChannelHandler channelHandler = new RpcClientHandler(invokeService);
final DefaultRpcClientContext context = new DefaultRpcClientContext();
context.address(rpcAddress.address()).port(rpcAddress.port()).channelHandler(channelHandler);
ChannelFuture channelFuture = new RpcClient(context).connect();
// 循环同步等待
// 如果出现异常,直接中断?捕获异常继续进行??
channelFutures.add(channelFuture);
}
// 2. 接口动态代理
ProxyContext<T> proxyContext = buildReferenceProxyContext();
return ReferenceProxy.newProxyInstance(proxyContext);
}
/**
* 构建调用上下文
* @return 引用代理上下文
* @since 0.0.6
*/
private ProxyContext<T> buildReferenceProxyContext() {
DefaultProxyContext<T> proxyContext = new DefaultProxyContext<>();
proxyContext.serviceId(this.serviceId);
proxyContext.serviceInterface(this.serviceInterface);
proxyContext.channelFutures(this.channelFutures);
proxyContext.invokeService(this.invokeService);
return proxyContext;
}
}
这里主要根据指定的服务端信息,初始化对应的代理实现。
这里还可以拓展指定权重,便于后期负载均衡拓展,本期暂时不做实现。
ReferenceProxy
说明
所有的 rpc 调用,客户端只有服务端的接口。
那么,怎么才能和调用本地方法一样调用远程方法呢?
答案就是动态代理。 (编辑:威海站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |