1、配景
最近在搜索Netty和Zookeeper方面的文章时,看到了这篇文章《轻量级漫衍式 RPC 框架》,作者用Zookeeper、Netty和Spring写了一个轻量级的漫衍式RPC框架。花了一些时间看了下他的代码,写的清洁简朴,写的RPC框架可以算是一个浅易版的dubbo。这个RPC框架虽小,可是麻雀虽小,五脏俱全,有乐趣的可以进修一下。
本人在这个浅易版的RPC上添加了如下特性:
项目地点:https://github.com/luxiaoxun/NettyRpc
2、简介
RPC,即 Remote Procedure Call(长途进程挪用),挪用长途计较机上的处事,就像挪用当地处事一样。RPC可以很好的解耦系统,如WebService就是一种基于Http协议的RPC。
这个RPC整体框架如下:
这个RPC框架利用的一些技能所办理的问题:
处事宣布与订阅:处事端利用Zookeeper注册处事地点,劳务派遣管理系统,客户端从Zookeeper获取可用的处事地点。
通信:利用Netty作为通信框架。
Spring:利用Spring设置处事,加载Bean,扫描注解。
动态署理:客户端利用署理模式透明化处事挪用。
动静编解码:利用Protostuff序列化和反序列化动静。
3、处事端宣布处事
利用注解标注要宣布的处事
处事注解
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Component public @interface RpcService { Class<?> value(); }
一个处事接口:
public interface HelloService { String hello(String name); String hello(Person person); }
一个处事实现:利用注解标注
@RpcService(HelloService.class) public class HelloServiceImpl implements HelloService { @Override public String hello(String name) { return "Hello! " + name; } @Override public String hello(Person person) { return "Hello! " + person.getFirstName() + " " + person.getLastName(); } }
处事在启动的时候扫描获得所有的处事接口及其实现:
@Override public void setApplicationContext(ApplicationContext ctx) throws BeansException { Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class); if (MapUtils.isNotEmpty(serviceBeanMap)) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName(); handlerMap.put(interfaceName, serviceBean); } } }
在Zookeeper集群上注册处事地点:
public class ServiceRegistry { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class); private CountDownLatch latch = new CountDownLatch(1); private String registryAddress; public ServiceRegistry(String registryAddress) { this.registryAddress = registryAddress; } public void register(String data) { if (data != null) { ZooKeeper zk = connectServer(); if (zk != null) { AddRootNode(zk); // Add root node if not exist createNode(zk, data); } } } private ZooKeeper connectServer() { ZooKeeper zk = null; try { zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); } } }); latch.await(); } catch (IOException e) { LOGGER.error("", e); } catch (InterruptedException ex){ LOGGER.error("", ex); } return zk; } private void AddRootNode(ZooKeeper zk){ try { Stat s = zk.exists(Constant.ZK_REGISTRY_PATH, false); if (s == null) { zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { LOGGER.error(e.toString()); } catch (InterruptedException e) { LOGGER.error(e.toString()); } } private void createNode(ZooKeeper zk, String data) { try { byte[] bytes = data.getBytes(); String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); LOGGER.debug("create zookeeper node ({} => {})", path, data); } catch (KeeperException e) { LOGGER.error("", e); } catch (InterruptedException ex){ LOGGER.error("", ex); } } } ServiceRegistry