欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 宝鼎售后问题提交 | 后台管理


新闻资讯

MENU

软件开发知识

则触发建立 昆山软件开发 channel的逻辑

点击: 次  来源:宝鼎软件 时间:2017-09-25

原文出处: Sam哥哥

概述

最近有许多网友在咨询netty client中,netty的channel毗连池应该如何设计。这是个稍微有些巨大的主题,牵扯到蛮多技能点,要想在网上找到相关的又相对完整的参考文章,确实不太容易。

在本篇文章中,会给出个中一种办理方案,而且附带完整的可运行的代码。假如网友有更好的方案,可以回覆本文,我们一起接头接头,一起开阔思路和眼界。

阅读本文之前需要具备一些基本常识

1、知道netty的一些基本常识,好比ByteBuf类的相关api;
2、知道netty的执行流程;
3、 必需阅读过我之前写的netty实战-自界说解码器处理惩罚半包动静,因为本文部门代码来自这篇文章。

此刻微处事很是的热门,也有许多公司在用。微处事框架中,假如是利用thrift、grpc来作为数据序列化框架的话,凡是城市生成一个SDK给客户端用户利用。客户端只要利用这个SDK,就可以利便的挪用处事端的微处事接口。本文接头的就是利用SDK的netty客户端,它的netty channel毗连池的设计方案。至于netty http client的channel毗连池设计,基于http的,是别的一个主题了,需要别的写文章来接头的。

netty channel毗连池设计

DB毗连池中,当某个线程获取到一个db connection后,在读取数据可能写数据时,假如线程没有操纵完,这个db connection一直被该线程独有着,直到线程执行完任务。假如netty client的channel毗连池设计也是利用这种独有的方法的话,有几个问题。

1、netty中channel的writeAndFlush要领,挪用完后是不消期待返回功效的,writeAndFlush一被挪用,顿时返回。对付这种环境,是完全没须要让线程独有一个channel的。
2、利用雷同DB pool的方法,从池子里拿毗连,用完后返回,这里的一进一出,需要思量并发锁的问题。别的,假如请求量很大的时候,毗连会不足用,其他线程也只能期待其他线程释放毗连。

因此不太发起利用上面的方法来设计netty channel毗连池,channel独有的价钱太大了。可以利用Channel数组的形式, 复用netty的channel。当线程要需要Channel的时候,随机从数组选中一个Channel,假如Channel还未成立,则建设一个。假如线程选中的Channel已经成立了,则复用这个Channel。

则触发成立 昆山软件开拓 channel的逻辑

假设channel数组的长度为4

private Channel[] channels = new Channel[4];

当外部系统请求client的时候,client从channels数组中随机挑选一个channel,假如该channel尚未成立,则触发成立channel的逻辑。无论有几多请求,都是复用这4个channel。假设有10个线程,那么部门线程大概会利用沟通的channel来发送数据和吸收数据。因为是随机选择一个channel的,多个线程掷中同一个channel的机率照旧很大的。如下图

则触发成立 昆山软件开拓 channel的逻辑

10个线程中,大概有3个线程都是利用channel2来发送数据的。这个会引入别的一个问题。thread1通过channel2发送一条动静msg1随处事端,thread2也通过channel2发送一条动静msg2随处事端,当处事端处理惩罚完数据,通过channel2返回数据给客户端的时候,如何区分哪条动静是哪个线程的呢?假如不做区分,万一thread1拿到的功效其实是thread2要的功效,怎么办?

那么如何做到让thread1和thread2拿到它们本身想要的功效呢?

之前我在netty实战-自界说解码器处理惩罚半包动静一文中提到,自界说动静的时候,凡是会在动静中插手一个序列号,用来独一标识动静的。当thread1发送动静时,往动静中插入一个独一的动静序列号,同时为thread1成立一个callback回调措施,当处事端返回动静的时候,按照动静中的序列号从对应的callback措施获取功效。这样就可以办理上面说到的问题。

动静名目

则触发成立 昆山软件开拓 channel的逻辑

动静、动静seq以及callback对应干系

则触发成立 昆山软件开拓 channel的逻辑

则触发成立 昆山软件开拓 channel的逻辑

OK,下面就基于上面的设计来举办编码。

代码

先来实现netty客户端,配置10个线程并发获取channel,为了到达真正的并发,操作CountDownLatch来做开关,同时channel毗连池配置4个channel。

package nettyinaction.nettyclient.channelpool.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import nettyinaction.nettyclient.channelpool.ChannelUtils;
import nettyinaction.nettyclient.channelpool.IntegerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

public class SocketClient {
    public static void main(String[] args) throws InterruptedException {
        //当所有线程都筹备后,开闸,让所有线程并发的去获取netty的channel
        final CountDownLatch countDownLatchBegin = new CountDownLatch(1);

        //当所有线程都执行完任务后,释放主线程,让主线程继承执行下去
        final CountDownLatch countDownLatchEnd = new CountDownLatch(10);

        //netty channel池
        final NettyChannelPool nettyChannelPool = new NettyChannelPool();

        final Map<String, String> resultsMap = new HashMap<>();
        //利用10个线程,并发的去获取netty channel
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //先让线程block住
                        countDownLatchBegin.await();

                        Channel channel = null;
                        try {
                            channel = nettyChannelPool.syncGetChannel();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                        //为每个线程成立一个callback,当动静返回的时候,在callback中获取功效
                        CallbackService callbackService = new CallbackService();
                        //给动静分派一个独一的动静序列号
                        int seq = IntegerFactory.getInstance().incrementAndGet();
                        //操作Channel的attr要领,成立动静与callback的对应干系
                        ChannelUtils.putCallback2DataMap(channel,seq,callbackService);

                        synchronized (callbackService) {
                            UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
                            ByteBuf buffer = allocator.buffer(20);
                            buffer.writeInt(ChannelUtils.MESSAGE_LENGTH);

                            buffer.writeInt(seq);
                            String threadName = Thread.currentThread().getName();
                            buffer.writeBytes(threadName.getBytes());
                            buffer.writeBytes("body".getBytes());

                            //给netty 处事端发送动静,异步的,该要了解立即返回
                            channel.writeAndFlush(buffer);

                            //期待返回功效
                            callbackService.wait();

                            //理会功效,这个result在callback中已经理会到了。
                            ByteBuf result = callbackService.result;
                            int length = result.readInt();
                            int seqFromServer = result.readInt();

                            byte[] head = new byte[8];
                            result.readBytes(head);
                            String headString = new String(head);

                            byte[] body = new byte[4];
                            result.readBytes(body);
                            String bodyString = new String(body);
                            resultsMap.put(threadName, seqFromServer + headString + bodyString);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    finally {
                        countDownLatchEnd.countDown();
                    }
                }
            }).start();
        }

        //开闸,让10个线程并发获取netty channel
        countDownLatchBegin.countDown();

        //等10个线程执行完后,打印最终功效
        countDownLatchEnd.await();
        System.out.println("resultMap="+resultsMap);
    }

    public static class CallbackService{
        public volatile ByteBuf result;
        public void receiveMessage(ByteBuf receiveBuf) throws Exception {
            synchronized (this) {
                result = receiveBuf;
                this.notify();
            }
        }
    }
}