概述
最近有许多网友在咨询netty client中,netty的channel毗连池应该如何设计。这是个稍微有些巨大的主题,牵扯到蛮多技能点,要想在网上找到相关的又相对完整的参考文章,确实不太容易。
在本篇文章中,会给出个中一种办理方案,而且附带完整的可运行的代码。假如网友有更好的方案,可以回覆本文,我们一起接头接头,一起开阔思路和眼界。
阅读本文之前需要具备一些基本常识
1、知道netty的一些基本常识,好比ByteBuf类的相关api;
2、知道netty的执行流程;
3、 必需阅读过我之前写的netty实战-自界说解码器处理惩罚半包动静,因为本文部门代码来自这篇文章。
此刻微处事很是的热门,也有许多公司在用。微处事框架中,假如是利用thrift、grpc来作为数据序列化框架的话,凡是城市生成一个SDK给客户端用户利用。客户端只要利用这个SDK,就可以利便的挪用处事端的微处事接口。本文接头的就是利用SDK的netty客户端,它的netty channel毗连池的设计方案。至于netty http client的channel毗连池设计,基于http的,是别的一个主题了,需要别的写文章来接头的。
netty channel毗连池设计
DB毗连池中,当某个线程获取到一个db connection后,在读取数据可能写数据时,假如线程没有操纵完,这个db connection一直被该线程独有着,直到线程执行完任务。假如netty client的channel毗连池设计也是利用这种独有的方法的话,有几个问题。
1、netty中channel的writeAndFlush要领,挪用完后是不消期待返回功效的,writeAndFlush一被挪用,顿时返回。对付这种环境,是完全没须要让线程独有一个channel的。
2、利用雷同DB pool的方法,从池子里拿毗连,用完后返回,这里的一进一出,需要思量并发锁的问题。别的,假如请求量很大的时候,毗连会不足用,其他线程也只能期待其他线程释放毗连。
因此不太发起利用上面的方法来设计netty channel毗连池,channel独有的价钱太大了。可以利用Channel数组的形式, 复用netty的channel。当线程要需要Channel的时候,随机从数组选中一个Channel,假如Channel还未成立,则建设一个。假如线程选中的Channel已经成立了,则复用这个Channel。
假设channel数组的长度为4
private Channel[] channels = new Channel[4];
当外部系统请求client的时候,client从channels数组中随机挑选一个channel,假如该channel尚未成立,则触发成立channel的逻辑。无论有几多请求,都是复用这4个channel。假设有10个线程,那么部门线程大概会利用沟通的channel来发送数据和吸收数据。因为是随机选择一个channel的,多个线程掷中同一个channel的机率照旧很大的。如下图
10个线程中,大概有3个线程都是利用channel2来发送数据的。这个会引入别的一个问题。thread1通过channel2发送一条动静msg1随处事端,thread2也通过channel2发送一条动静msg2随处事端,当处事端处理惩罚完数据,通过channel2返回数据给客户端的时候,如何区分哪条动静是哪个线程的呢?假如不做区分,万一thread1拿到的功效其实是thread2要的功效,怎么办?
那么如何做到让thread1和thread2拿到它们本身想要的功效呢?
之前我在netty实战-自界说解码器处理惩罚半包动静一文中提到,自界说动静的时候,凡是会在动静中插手一个序列号,用来独一标识动静的。当thread1发送动静时,往动静中插入一个独一的动静序列号,同时为thread1成立一个callback回调措施,当处事端返回动静的时候,按照动静中的序列号从对应的callback措施获取功效。这样就可以办理上面说到的问题。
动静名目
动静、动静seq以及callback对应干系
OK,下面就基于上面的设计来举办编码。
代码
先来实现netty客户端,配置10个线程并发获取channel,为了到达真正的并发,操作CountDownLatch来做开关,同时channel毗连池配置4个channel。
package nettyinaction.nettyclient.channelpool.client; import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import nettyinaction.nettyclient.channelpool.ChannelUtils; import nettyinaction.nettyclient.channelpool.IntegerFactory; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; public class SocketClient { public static void main(String[] args) throws InterruptedException { //当所有线程都筹备后,开闸,让所有线程并发的去获取netty的channel final CountDownLatch countDownLatchBegin = new CountDownLatch(1); //当所有线程都执行完任务后,释放主线程,让主线程继承执行下去 final CountDownLatch countDownLatchEnd = new CountDownLatch(10); //netty channel池 final NettyChannelPool nettyChannelPool = new NettyChannelPool(); final Map<String, String> resultsMap = new HashMap<>(); //利用10个线程,并发的去获取netty channel for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { try { //先让线程block住 countDownLatchBegin.await(); Channel channel = null; try { channel = nettyChannelPool.syncGetChannel(); } catch (InterruptedException e) { e.printStackTrace(); } //为每个线程成立一个callback,当动静返回的时候,在callback中获取功效 CallbackService callbackService = new CallbackService(); //给动静分派一个独一的动静序列号 int seq = IntegerFactory.getInstance().incrementAndGet(); //操作Channel的attr要领,成立动静与callback的对应干系 ChannelUtils.putCallback2DataMap(channel,seq,callbackService); synchronized (callbackService) { UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false); ByteBuf buffer = allocator.buffer(20); buffer.writeInt(ChannelUtils.MESSAGE_LENGTH); buffer.writeInt(seq); String threadName = Thread.currentThread().getName(); buffer.writeBytes(threadName.getBytes()); buffer.writeBytes("body".getBytes()); //给netty 处事端发送动静,异步的,该要了解立即返回 channel.writeAndFlush(buffer); //期待返回功效 callbackService.wait(); //理会功效,这个result在callback中已经理会到了。 ByteBuf result = callbackService.result; int length = result.readInt(); int seqFromServer = result.readInt(); byte[] head = new byte[8]; result.readBytes(head); String headString = new String(head); byte[] body = new byte[4]; result.readBytes(body); String bodyString = new String(body); resultsMap.put(threadName, seqFromServer + headString + bodyString); } } catch (Exception e) { e.printStackTrace(); } finally { countDownLatchEnd.countDown(); } } }).start(); } //开闸,让10个线程并发获取netty channel countDownLatchBegin.countDown(); //等10个线程执行完后,打印最终功效 countDownLatchEnd.await(); System.out.println("resultMap="+resultsMap); } public static class CallbackService{ public volatile ByteBuf result; public void receiveMessage(ByteBuf receiveBuf) throws Exception { synchronized (this) { result = receiveBuf; this.notify(); } } } }