Netty基于流的传输处理

TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。 这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节(有点悲剧)。 因此,不能保证读的是您在远程定入的行数据。 例如,假设操作系统的TCP/IP堆栈已收到三个数据包:

由于基于流的协议的这种通用属性,在应用程序中以下面的碎片形式(只是其中的一种)读取它们的机会很高:

因此,接收部分,无论是服务器侧还是客户端侧,都应该将接收到的数据碎片整理成逻辑可由应用容易地理解的一个或多个有意义的帧。 在上述示例的情况下,接收的数据应该如下成帧:

针对上面的问题,下面列出了两个解决方案。

第一个解决方案

现在我们回到TIME客户端示例。在这里有同样的问题。 32位整数可以算是非常少量的数据量了,并且不可能经常被分段。 然而,问题是它可以分割,并且碎片的可能性将随着流量增加而增加。

简单的解决方案是创建一个内部累积缓冲区,并等待所有4个字节被接收到内部缓冲区。 以下是修正的TimeClientHandler实现,它修复了问题:

package cn.netty.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();

        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}


  1. ChannelHandler有两个生命周期侦听器方法:handlerAdded()handlerRemoved()。 只要不会阻塞很长时间,您就可以执行任意初始化任务。

  2. 首先,所有接收到的数据应累加到buf中。

  3. 然后,处理程序必须检查buf是否有足够的数据(在此示例中为4个字节),当足够时就继续进行实际的业务逻辑。否则,在有更多数据到达时Netty将再次调用channelRead()方法,最终累积到达4个字节再执行实际的业务。

第二个解决方案

虽然第一个解决方案已经解决了TIME客户端的问题,但修改的处理程序看起来不那么干净。想象如果一个更复杂的协议,它由多个字段组成,例如:可变长度字段等。上面的ChannelInboundHandler实现很快就无法维护了。

可能已经注意到,可以向ChannelPipeline添加多个ChannelHandler,因此,可将一个单片的ChannelHandler拆分为多个模块,以降低应用程序的复杂性。 例如,可将TimeClientHandler拆分为两个处理程序:

  • TimeDecoder处理碎片问题

  • TimeClientHandler的初始简单版本

幸运的是,Netty提供了一个可扩展类,可以帮助我们方便地编写:

package cn.netty.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }

        out.add(in.readBytes(4)); // (4)
    }
}


  1. ByteToMessageDecoderChannelInboundHandler的一个实现,它使得处理碎片问题变得容易。

  2. ByteToMessageDecoder在接收到新数据时,使用内部维护的累积缓冲区调用decode()方法。

  3. decode()可以决定在累积缓冲区中没有足够数据的情况下不添加任何东西。 当接收到更多数据时,ByteToMessageDecoder将再次调用decode()

  4. 如果decode()将对象添加到out,则意味着解码器成功地解码了消息。 ByteToMessageDecoder将丢弃累积缓冲区的读取部分。要记住,不需要解码多个消息。 ByteToMessageDecoder将继续调用decode()方法,直到它没有再有任何东西添加。

现在我们有另一个处理程序插入ChannelPipeline,应该在TimeClient中修改ChannelInitializer实现:

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});


如果您喜欢折腾,也可以想尝试使用ReplayDecoder,这简化了解码器更多的工作。但需要参考API参考以获得更多信息。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }}


此外,Netty提供了现成的解码器,使我们能够非常容易地实现大多数的协议,并帮助您避免使用一个单一的不可维护的处理程序实现。有关更多详细示例,请参阅以下示例:

二进制协议实现: Netty实践-factorial服务器
基于文本行的协议实现: Netty实践-telnet服务器


2019-04-20 15:46

相关知识点

开源项目

知识点

相关教程

更多

[Netty 1] 初识Netty

1. 简介 最早接触netty是在阅读Zookeeper源码的时候,后来看到Storm的消息传输层也由ZMQ转为Netty,所以决心好好来研究和学习一下netty这个框架。 Netty项目地址:http://netty.io/index.html Github项目: https://github.com/netty/netty Netty是一个异步的、事件驱动的网络应用框架,基于它能够快速开发高性

关于流的问题!!

请大家给我说下这个sanner类怎么弄呀?听老师说过一回,后来又忘了。  能用具体例子说明更好,先谢了

Ajax 异步传输

function getData(){  var url = "Show.do?dh=1";  createXmlHttpReq();  xmlHttp.onreadystatechange = function(){   if(xmlHttp.readyState == 4 && xmlHttp.status == 200){    var resp = xm

Netty环境配置

netty是一个java事件驱动的网络通信框架,也就是一个jar包,只要在项目里引用即可。

Netty开发环境配置

最新版本的Netty 4.x和JDK 1.6及更高版本

Netty源码分析

Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序[官方定义],整体来看其包含了以下内容:1.提供了丰富的协议编解码支持,2.实现自有的buffer系统,减少复制所带来的消耗,3.整套channel的实现,4.基于事件的过程流转以及完整的网络事件响应与扩展,5.丰富的example。本文并不对Netty实际使用中可能出现的问题做分析,只是从

电子邮件的传输过程

本节在命令行窗口,通过用新浪邮箱给163邮箱发送邮件,模拟电子邮件的传输过程​。

storm学习之Netty代替ZMQ

整理自 http://www.csdn.net/article/2014-08-04/2821018/2 消息队列现在是模块之间通信的非常通用的解决方案了。消息队列使得进程间的通信可以跨越物理机,这对于分布式系统尤为重要,毕竟我们不能假定进程究竟是部署在同一台物理机上还是部署到不同的物理机上。RabbitMQ是应用比较广泛的MQ 提到MQ,不得不提的是ZeroMQ。ZeroMQ封装了Socket,

HDFS Datanode与Client之间的数据传输

在HDFS之中,Datanode与Namenode之间是通过RPC进行通信的;在Datanode和Client之间的通信通过两种方式来完成,一种是通过RPC(主要有三个方法recoverBlock、getBlockInfo、getBlockLocalPathInfo),另外一种方式是通过普通的socket。Client与Datanode之间的数据传输就是通过普通的socket来进行传输的。 在类D

工作流系统

科学工作流相比较社会商业流还是简单一些的,现在好多软件都在提供企业的管理流系统,比较不错。 松散耦合有好处,但是也有性能不好的,无用操作多的坏处。 kepler系统等跟storm又有什么区别,可重复性,著作权问题,难道商业流不需要这些? http://hi.baidu.com/qiubaiwei/blog/item/5c10b7d29d85f5d7a8ec9a19.html 科学计算流和商业业务流

HDFS的文件操作流

大家可能对本地文件系统中的文件I/O流已经是非常的熟悉了,那么,像HDFS这种分布式的文件I/O流——基于网络   I/O流的数据流,又是如何实现的呢?这就是本文的重点之一:HDFS的文件写入流。    熟悉HDFS的人可能知道,当我们调用DistributedFileSystem的create方法时,将会返回一个FSDataOutputStream对象,通过这个对象来对文件进行数据的写入。还是贴

关于java io流关闭的问题

RT,我想问的是,如果不关闭流,会出现什么样不同的情况?  希望大家能多多的给出不同的答案,很想知道!  我个人的分数真的很少,所以不好意思。但是我希望达人们 能一起交流交流  问题补充:  达人们很多啊。看到答案,真的很感谢!

在Twitter,Netty 4 GC开销降为五分之一

原文:http://www.infoq.com/cn/news/2013/11/netty4-twitter/  Netty项目在7月份发布了Netty 4的第一个版本,其性能的显著提升主要来源于垃圾收集开销的降低。在Twitter,Netty 4经过完善已经获得了5倍的性能提升,但也有一些代价。  Netty项目创始人和Twitter软件工程师Trustin Lee从2003年开始就一直编写网络

Netty入门实例-时间服务器

Netty中服务器和客户端之间最大的和唯一的区别是使用了不同的Bootstrap和Channel实现

Netty入门实例-使用POJO代替ByteBuf

使用TIME协议的客户端和服务器示例,让它们使用POJO来代替原来的ByteBuf。

最新教程

更多

java线程状态详解(6种)

java线程类为:java.lang.Thread,其实现java.lang.Runnable接口。 线程在运行过程中有6种状态,分别如下: NEW:初始状态,线程被构建,但是还没有调用start()方法 RUNNABLE:运行状态,Java线程将操作系统中的就绪和运行两种状态统称为“运行状态” BLOCK:阻塞状态,表示线程阻塞

redis从库只读设置-redis集群管理

默认情况下redis数据库充当slave角色时是只读的不能进行写操作,如果写入,会提示以下错误:READONLY You can't write against a read only slave.  127.0.0.1:6382> set k3 111  (error) READONLY You can't write against a read only slave. 如果你要开启从库

Netty环境配置

netty是一个java事件驱动的网络通信框架,也就是一个jar包,只要在项目里引用即可。

Netty入门实例-使用POJO代替ByteBuf

使用TIME协议的客户端和服务器示例,让它们使用POJO来代替原来的ByteBuf。

Netty入门实例-时间服务器

Netty中服务器和客户端之间最大的和唯一的区别是使用了不同的Bootstrap和Channel实现

Netty入门实例-编写服务器端程序

channelRead()处理程序方法实现如下

Netty开发环境配置

最新版本的Netty 4.x和JDK 1.6及更高版本

电商平台数据库设计

电商平台数据库表设计:商品分类表、商品信息表、品牌表、商品属性表、商品属性扩展表、规格表、规格扩展表

HttpClient 上传文件

我们使用MultipartEntityBuilder创建一个HttpEntity。 当创建构建器时,添加一个二进制体 - 包含将要上传的文件以及一个文本正文。 接下来,使用RequestBuilder创建一个HTTP请求,并分配先前创建的HttpEntity。

MongoDB常用命令

查看当前使用的数据库    > db    test  切换数据库   > use foobar    switched to db foobar  插入文档    > post={"title":"领悟书生","content":"这是一个分享教程的网站","date":new

快速了解MongoDB【基本概念与体系结构】

什么是MongoDB MongoDB is a general purpose, document-based, distributed database built for modern application developers and for the cloud era. MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。

windows系统安装MongoDB

安装 下载MongoDB的安装包:mongodb-win32-x86_64-2008plus-ssl-3.2.10-signed.msi,按照提示步骤安装即可。 安装完成后,软件会安装在C:\Program Files\MongoDB 目录中 我们要启动的服务程序就是C:\Program Files\MongoDB\Server\3.2\bin目录下的mongod.exe,为了方便我们每次启动,我

Spring boot整合MyBatis-Plus 之二:增删改查

基于上一篇springboot整合MyBatis-Plus之后,实现简单的增删改查 创建实体类 添加表注解TableName和主键注解TableId import com.baomidou.mybatisplus.annotations.TableId;
import com.baomidou.mybatisplus.annotations.TableName;
import com.baom

分布式ID生成器【snowflake雪花算法】

基于snowflake雪花算法分布式ID生成器 snowflake雪花算法分布式ID生成器几大特点: 41bit的时间戳可以支持该算法使用到2082年 10bit的工作机器id可以支持1024台机器 序列号支持1毫秒产生4096个自增序列id 整体上按照时间自增排序 整个分布式系统内不会产生ID碰撞 每秒能够产生26万ID左右 Twitter的 Snowflake分布式ID生成器的JAVA实现方案

Spring boot整合mybatis plus

快速了解mybatis plus 是对Mybatis框架的二次封装和扩展 纯正血统:完全继承原生 Mybatis 的所有特性 最少依赖:仅仅依赖Mybatis以及Mybatis-Spring 性能损耗小:启动即会自动注入基本CURD ,性能无损耗,直接面向对象操作 自动热加载:Mapper对应的xml可以热加载,大大减少重启Web服务器时间,提升开发效率 性能分析:自带Sql性能分析插件,开发测试