• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Netty处理器的小和Netty大文件传输方法思路

武飞扬头像
while_black
帮助1

  1. Netty处理器的小技巧

使用一个解析处理器,对上传的请求进行解析,对特定的请求进行解析,再在pipeline后面加上指定的handler理器

    1.  
      ((FullHttpRequest) msg).release();
    2.  
       
    3.  
      ctx.fireChannelRead(msg);
    4.  
       
    5.  
      ctx.channel().pipeline().remove(this);
    6.  
       
    7.  
      pipeline.addAfter(new Process());
    1.  
      @Sharable
    2.  
      public class ParseRequestHandler extends ChannelInboundHandlerAdapter {
  1. 大文件传输方法的思路

大文件传输我这里在原来的Netty文件传输之上进行了修改,本质原因是文件太多,不能一次传完,需要分多次进行传输。

  • 使用了ChunkedWriteHandler,对文件信息进行了分包处理,使得文件传输压力减小,其次保证了文件的安全性

  • 对大文件进行分片读取,读取生成一个指定的小文件,再发送小文件。(实现了大文件向小文件的过渡)最后再删除小文件即可

  • 客户端使用了序号进行区分信息,被一个小文件都带有一个序号,用于区分文件地址。客户端每次发送一个序号,服务器返回一个指定位置的指定长度的小文件,客户端再对序号加一,请求服务器,直到服务器完全传输完毕。

  1.  
    package ProcessRequest;
  2.  
     
  3.  
     
  4.  
    import java.io.IOException;
  5.  
    import java.io.RandomAccessFile;
  6.  
    import java.net.URLDecoder;
  7.  
    import java.net.URLEncoder;
  8.  
     
  9.  
    import org.apache.log4j.Logger;
  10.  
    import Utility.ExecutorGroup;
  11.  
    import Utility.FileHandlerFunction;
  12.  
    import Utility.HTTPDefineConstant;
  13.  
    import Utility.HTTPResponse;
  14.  
    import io.netty.channel.ChannelFuture;
  15.  
    import io.netty.channel.ChannelHandlerContext;
  16.  
    import io.netty.channel.ChannelInboundHandlerAdapter;
  17.  
    import io.netty.channel.ChannelProgressiveFuture;
  18.  
    import io.netty.channel.ChannelProgressiveFutureListener;
  19.  
    import io.netty.handler.codec.http.FullHttpRequest;
  20.  
    import io.netty.handler.codec.http.HttpChunkedInput;
  21.  
    import io.netty.handler.codec.http.HttpResponse;
  22.  
    import io.netty.handler.codec.http.HttpResponseStatus;
  23.  
    import io.netty.handler.stream.ChunkedFile;
  24.  
    import io.netty.handler.stream.ChunkedWriteHandler;
  25.  
    import io.netty.util.concurrent.EventExecutorGroup;
  26.  
     
  27.  
    public class DownloadOneHugeFileProcess extends ChannelInboundHandlerAdapter{
  28.  
    // 日志
  29.  
    private static final Logger logger = Logger.getLogger(DownloadOneHugeFileProcess.class);
  30.  
     
  31.  
    private String userID = "";
  32.  
     
  33.  
    private ChannelHandlerContext ctx = null;
  34.  
     
  35.  
    // 执行耗时操作的线程池
  36.  
    private EventExecutorGroup group = ExecutorGroup.getInstance().getExecutorGroup();
  37.  
     
  38.  
    private String fileName;
  39.  
     
  40.  
    //客户端发送的需要返回的文件里面序号段,这里序号段从0开始传输文件
  41.  
    private long sequenceNumber = 0;
  42.  
     
  43.  
    private int dataPackageNumber = 0;
  44.  
     
  45.  
    // 设置为每次传输5m大小的数据,实现单个文件速度的快递提高,让客户端能马上获得文件
  46.  
    // 让客户端马上能播放文件,实现一边播放一边下载
  47.  
    private final int chunkSize = 5*1024*1024;
  48.  
     
  49.  
    private RandomAccessFile downloadFile;
  50.  
     
  51.  
    @Override
  52.  
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  53.  
    this.ctx = ctx;
  54.  
    this.userID = "" ctx.channel().remoteAddress();
  55.  
    }
  56.  
     
  57.  
    @Override
  58.  
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
  59.  
    this.ctx = ctx;
  60.  
    this.userID = "" ctx.channel().remoteAddress();
  61.  
    }
  62.  
     
  63.  
    @Override
  64.  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  65.  
    if (msg instanceof FullHttpRequest) {
  66.  
    FullHttpRequest request = (FullHttpRequest) msg;
  67.  
    // 获取请求方法名
  68.  
    String method = request.method().toString();
  69.  
    // 下载文件(2022-03-25)
  70.  
    if ("DownloadOneFileRequest".equals(method)) {
  71.  
    System.out.println("DownloadOneHugeFileRequest Process");
  72.  
     
  73.  
    /*
  74.  
    * 服务器根据客户端发送的数据序号,从文件中截取指定长度的文件发送回客户端
  75.  
    * */
  76.  
    fileName =URLDecoder.decode(request.headers().get("oneFileName"), "UTF-8");
  77.  
    System.out.println("下载的文件名" fileName);
  78.  
    sequenceNumber = Long.parseLong(request.headers().get("sequenceNumber"));
  79.  
    startSend(System.getProperty("user.dir") "/AudioAndVideo/" fileName, "668");
  80.  
     
  81.  
    ((FullHttpRequest) msg).release();
  82.  
    } else {
  83.  
    // 激活下一个Handler
  84.  
    ctx.fireChannelRead(msg);
  85.  
    ctx.channel().pipeline().remove(this);
  86.  
    }
  87.  
     
  88.  
    } else {
  89.  
    ctx.fireChannelRead(msg);
  90.  
    }
  91.  
    }
  92.  
     
  93.  
     
  94.  
     
  95.  
    public void startSend(String path, String statusCode) {
  96.  
    // 提交耗时任务给线程池
  97.  
    group.submit(new Runnable() {
  98.  
    @Override
  99.  
    public void run() {
  100.  
    try {
  101.  
     
  102.  
    /*
  103.  
    *downloadFile的内置开始位置在getContent()中设置
  104.  
    *这个文件在等全部分包完成再关闭吧
  105.  
    * */
  106.  
    downloadFile = new RandomAccessFile(path, "r");
  107.  
     
  108.  
    long fileLen = downloadFile.length();
  109.  
     
  110.  
    // 文件不为空且文件长度小于0
  111.  
    if (fileLen < 0 && downloadFile != null) {
  112.  
    logger.error("读取待批改文件遇到异常!");
  113.  
    downloadFile.close();
  114.  
    }
  115.  
    else {
  116.  
     
  117.  
    dataPackageNumber = (int) (fileLen/(chunkSize));
  118.  
    int tem = (int) (fileLen%(chunkSize) );
  119.  
    if(tem != 0) {
  120.  
    //有余数
  121.  
    dataPackageNumber ;
  122.  
    }
  123.  
    }
  124.  
    //生成一个临时文件
  125.  
    sendHugeFile(statusCode,sequenceNumber,createOnePartSmallFile());
  126.  
     
  127.  
    } catch (IOException e) {
  128.  
    logger.error(e);
  129.  
    e.printStackTrace();
  130.  
    }
  131.  
    }
  132.  
    });
  133.  
    }
  134.  
     
  135.  
    private void sendHugeFile( String statusCode ,long number, RandomAccessFile temFile) throws IOException {
  136.  
     
  137.  
    HttpResponseStatus status = null;
  138.  
    switch (statusCode) {
  139.  
    case "668":
  140.  
    status = HTTPDefineConstant.DOWNLOAD_ONEFILE_SUCCESS;
  141.  
    break;
  142.  
    case "669":
  143.  
    status = HTTPDefineConstant.DOWNLOAD_ONEFILE_FAIL;
  144.  
    break;
  145.  
    default:
  146.  
    break;
  147.  
    }
  148.  
     
  149.  
    if (null == ctx.pipeline().get(ChunkedWriteHandler.class)) {
  150.  
    ctx.pipeline().addBefore("downloadOneHugeFile", "chunked writer", new ChunkedWriteHandler());
  151.  
    }
  152.  
     
  153.  
    ChannelFuture sendFileFuture;
  154.  
    HttpResponse response;
  155.  
    if(number != dataPackageNumber-1)
  156.  
    {
  157.  
    response = HTTPResponse.downloadOneHugeFileResponse(status, temFile.length() , URLEncoder.encode(fileName, "UTF-8") , String.valueOf(number) , "NO");
  158.  
    ctx.write(response);
  159.  
    }else {
  160.  
    logger.info("sequenceNumber 文件包序号为:" number " 为最后一个文件包");
  161.  
    response = HTTPResponse.downloadOneHugeFileResponse(status, temFile.length() , URLEncoder.encode(fileName, "UTF-8") , String.valueOf(number) , "YES");
  162.  
    ctx.write(response);
  163.  
    }
  164.  
    sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(temFile,0, (long)temFile.length(),(int)temFile.length()) ), ctx.newProgressivePromise());
  165.  
     
  166.  
    sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
  167.  
    @Override
  168.  
    public void operationComplete(ChannelProgressiveFuture future) throws Exception {
  169.  
    if (future.isSuccess()) {
  170.  
    try {
  171.  
    temFile.close();
  172.  
    logger.info("downloadFile1文件关闭成功");
  173.  
    } catch (IOException e) {
  174.  
    logger.error("downloadFile文件关闭出现异常:" e);
  175.  
    }
  176.  
    // try {
  177.  
    // temFile.write(new byte[] {1, 2, 3});
  178.  
    //
  179.  
    // } catch (IOException e) {
  180.  
    // e.printStackTrace(); //如果流已经关闭,这里会报
  181.  
    // }
  182.  
    //进行删除文件的操作
  183.  
    String temFilePath = System.getProperty("user.dir") "/AudioAndVideo/" number fileName;
  184.  
    if(FileHandlerFunction.delete(temFilePath))
  185.  
    {
  186.  
    System.out.println("成功删除:" temFilePath);
  187.  
    }else {
  188.  
    System.out.println("删除失败:" temFilePath);
  189.  
     
  190.  
    }
  191.  
     
  192.  
    if(number == dataPackageNumber-1)
  193.  
    {
  194.  
    try {
  195.  
    downloadFile.close();
  196.  
    logger.info("最后一个downloadFile文件关闭成功");
  197.  
    } catch (IOException e) {
  198.  
    logger.error("downloadFile文件关闭出现异常:" e);
  199.  
    }
  200.  
     
  201.  
    logger.info("下载文件 :" temFile.getFD() " 传输完毕\n接收用户:" userID);
  202.  
    ctx.pipeline().remove("downloadOneHugeFile");
  203.  
    ctx.pipeline().remove("chunked writer");
  204.  
    }
  205.  
    }
  206.  
    else {//发送失败的情况,重新发送
  207.  
    sendHugeFile(statusCode,number,temFile);
  208.  
    }
  209.  
    }
  210.  
    @Override
  211.  
    public void operationProgressed(ChannelProgressiveFuture future, long progress, long total)
  212.  
    throws Exception {
  213.  
    }
  214.  
    });
  215.  
     
  216.  
     
  217.  
    }
  218.  
     
  219.  
     
  220.  
    @Override
  221.  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  222.  
    logger.error(cause);
  223.  
    }
  224.  
     
  225.  
    private RandomAccessFile createOnePartSmallFile() throws IOException
  226.  
    {
  227.  
    long fileLen = downloadFile.length();
  228.  
    // logger.info("客户端请求传输大文件 fileLen:" fileLen);
  229.  
    // logger.info("客户端请求传输大文件 dataPackageNumber:" dataPackageNumber);
  230.  
     
  231.  
    String temFilePath = System.getProperty("user.dir") "/AudioAndVideo/" sequenceNumber fileName;
  232.  
    RandomAccessFile temFile = null;
  233.  
    try {
  234.  
    temFile = new RandomAccessFile(temFilePath, "rw");
  235.  
    } catch (IOException e) {
  236.  
    logger.error("文件打开出现异常:" e);
  237.  
    }
  238.  
    temFile.seek(0);
  239.  
     
  240.  
    if(dataPackageNumber-1 != sequenceNumber )
  241.  
    {
  242.  
    logger.info("生成下载文件,文件序号为:" sequenceNumber);
  243.  
    temFile.write(getContent(sequenceNumber*chunkSize,chunkSize, downloadFile));
  244.  
    }
  245.  
    else {
  246.  
     
  247.  
    int tem1 = (int) (fileLen%(chunkSize) );
  248.  
    if(tem1 != 0) {
  249.  
    //有余数
  250.  
    logger.info("生成最后一个文件,有余数,文件序号为:" sequenceNumber);
  251.  
    long temChunkSize = fileLen-(sequenceNumber*chunkSize);
  252.  
    temFile.write(getContent(sequenceNumber*temChunkSize,temChunkSize, downloadFile));
  253.  
     
  254.  
    }else {
  255.  
    //无余数
  256.  
    logger.info("生成最后一个文件,无余数,文件序号为:" sequenceNumber);
  257.  
    temFile.write(getContent(sequenceNumber*chunkSize,chunkSize, downloadFile));
  258.  
     
  259.  
    }
  260.  
    }
  261.  
     
  262.  
    return temFile;
  263.  
    }
  264.  
     
  265.  
     
  266.  
    private byte[] getContent(long off , long len, RandomAccessFile downloadFile) throws IOException {
  267.  
    downloadFile.seek(off);
  268.  
    byte[] bytes = new byte[(int) len];
  269.  
    downloadFile.read(bytes,(int)0, (int)len);
  270.  
    return bytes;
  271.  
    }
  272.  
     
  273.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfkfefc
系列文章
更多 icon
同类精品
更多 icon
继续加载