分類: 3C資訊

  • Netty學習篇⑤–編、解碼源碼分析

    Netty學習篇⑤–編、解碼源碼分析

    前言

    學習Netty也有一段時間了,Netty作為一個高性能的異步框架,很多RPC框架也運用到了Netty中的知識,在rpc框架中豐富的數據協議及編解碼可以讓使用者更加青睞;
    Netty支持豐富的編解碼框架,其本身內部提供的編解碼也可以應對各種業務場景;
    今天主要就是學習下Netty中提供的編、解碼類,之前只是簡單的使用了下Netty提供的解碼類,今天更加深入的研究下Netty中編、解碼的源碼及部分使用。

    編、解碼的概念

    • 編碼(Encoder)

      編碼就是將我們發送的數據編碼成字節數組方便在網絡中進行傳輸,類似Java中的序列化,將對象序列化成字節傳輸
    • 解碼(Decoder)

      解碼和編碼相反,將傳輸過來的字節數組轉化為各種對象來進行展示等,類似Java中的反序列化
      如:
      // 將字節數組轉化為字符串
      new String(byte bytes[], Charset charset)

    編、解碼超類

    ByteToMessageDecoder: 解碼超類,將字節轉換成消息

    解碼解碼一般用於將獲取到的消息解碼成系統可識別且自己需要的數據結構;因此ByteToMessageDecoder需要繼承ChannelInboundHandlerAdapter入站適配器來獲取到入站的數據,在handler使用之前通過channelRead獲取入站數據進行一波解碼;
    ByteToMessageDecoder類圖

    源碼分析

    通過channelRead獲取入站數據,將數據緩存至cumulation數據緩衝區,最後在傳給decode進行解碼,在read完成之後清空緩存的數據

    1. 獲取入站數據

    /**
    *  通過重寫channelRead方法來獲取入站數據
    */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 檢測是否是byteBuf對象格式數據
        if (msg instanceof ByteBuf) {
            // 實例化字節解碼成功輸出集合 即List<Object> out
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                // 獲取到的請求的數據
                ByteBuf data = (ByteBuf) msg;
                // 如果緩衝數據區為空則代表是首次觸發read方法
                first = cumulation == null;
                if (first) {
                    // 如果是第一次read則當前msg數據為緩衝數據
                    cumulation = data;
                } else {
                    // 如果不是則觸發累加,將緩衝區的舊數據和新獲取到的數據通過        expandCumulation 方法累加在一起存入緩衝區cumulation
                    // cumulator 累加類,將緩衝池中數據和新數據進行組合在一起
                    // private Cumulator cumulator = MERGE_CUMULATOR;
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 將緩衝區數據cumulation進行解碼
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                // 在解碼完畢后釋放引用和清空全局字節緩衝區
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                    // discardAfterReads為netty中設置的讀取多少次后開始丟棄字節 默認值16
                    // 可通過setDiscardAfterReads(int n)來設置值不設置默認16次
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // 在我們讀取了足夠的數據可以嘗試丟棄一些字節已保證不出現內存溢出的異常
                    // 
                    // See https://github.com/netty/netty/issues/4275
                    // 讀取次數重置為0
                    numReads = 0;
                    // 重置讀寫指針或丟棄部分已讀取的字節
                    discardSomeReadBytes();
                }
                // out為解碼成功的傳遞給下一個handler
                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                // 結束當前read傳遞到下個ChannelHandler
                fireChannelRead(ctx, out, size);
                // 回收響應集合 將insertSinceRecycled設置為false;
                // insertSinceRecycled用於channelReadComplete判斷使用
                out.recycle();
            }
        } else {
            // 不是的話直接fire傳遞給下一個handler
            ctx.fireChannelRead(msg);
        }
    }
    
    2. 初始化字節緩衝區計算器: Cumulator主要用於全局字節緩衝區和新讀取的字節緩衝區組合在一起擴容
    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        
        /**
        * alloc ChannelHandlerContext分配的字節緩衝區
        * cumulation 當前ByteToMessageDecoder類全局的字節緩衝區
        * in 入站的字節緩衝區
        **/
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            final ByteBuf buffer;
            // 如果全局ByteBuf寫入的字節+當前入站的字節數據大於全局緩衝區最大的容量或者全局緩衝區的引用數大於1個或全局緩衝區只讀
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
                // Expand cumulation (by replace it) when either there is not more room in the buffer
                // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                // duplicate().retain() or if its read-only.
                //
                // See:
                // - https://github.com/netty/netty/issues/2327
                // - https://github.com/netty/netty/issues/1764
                // 進行擴展全局字節緩衝區(容量大小 = 新數據追加到舊數據末尾組成新的全局字節緩衝區)
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            // 將新數據寫入緩衝區
            buffer.writeBytes(in);
            // 釋放當前的字節緩衝區的引用
            in.release();
            
            return buffer;
        }
    };
    
    
    /**
    * alloc 字節緩衝區操作類
    * cumulation 全局累加字節緩衝區
    * readable 讀取到的字節數長度
    */
    // 字節緩衝區擴容方法
    static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
        // 舊數據
        ByteBuf oldCumulation = cumulation;
        // 通過ByteBufAllocator將緩衝區擴大到oldCumulation + readable大小
        cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
        // 將舊數據重新寫入到新的字節緩衝區
        cumulation.writeBytes(oldCumulation);
        // 舊字節緩衝區引用-1
        oldCumulation.release();
        return cumulation;
    }
    3. ByteBuf釋放當前字節緩衝區的引用: 通過調用ReferenceCounted接口中的release方法來釋放
    @Override
    public boolean release() {
        return release0(1);
    }
    
    @Override
    public boolean release(int decrement) {
        return release0(checkPositive(decrement, "decrement"));
    }
    
    /**
    * decrement 減量
    */
    private boolean release0(int decrement) {
        for (;;) {
            int refCnt = this.refCnt;
            // 當前引用小於減量
            if (refCnt < decrement) {
                throw new IllegalReferenceCountException(refCnt, -decrement);
            }
            // 這裏就利用里線程併發中的知識CAS,線程安全的設置refCnt的值
            if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
                // 如果減量和引用量相等
                if (refCnt == decrement) {
                    // 全部釋放
                    deallocate();
                    return true;
                }
                return false;
            }
        }
    }

    4. 將全局字節緩衝區進行解碼

    /**
    * ctx ChannelHandler的上下文,用於傳輸數據與下一個handler來交互
    * in 入站數據
    * out 解析之後的出站集合 (此出站不是返回給客戶端的而是傳遞給下個handler的)
    */
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            // 如果入站數據還有沒解析的
            while (in.isReadable()) {
                // 解析成功的出站集合長度
                int outSize = out.size();
                // 如果大於0則說明解析成功的數據還沒被消費完,直接fire掉給通道中的後續handler繼續                消費
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
    
                    // Check if this handler was removed before continuing with decoding.
                    // 在這個handler刪除之前檢查是否還在繼續解碼
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    // 如果移除了,它繼續操作緩衝區是不安全的
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }
                // 入站數據字節長度
                int oldInputLength = in.readableBytes();
                // 開始解碼數據
                decodeRemovalReentryProtection(ctx, in, out);
    
                // Check if this handler was removed before continuing the loop.
                // 
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }
    
                // 解析完畢跳出循環
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }
    
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                        ".decode() did not read anything but decoded a message.");
                }
    
                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }
    
    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // 設置解碼狀態為正在解碼  STATE_INIT = 0; STATE_CALLING_CHILD_DECODE = 1;             STATE_HANDLER_REMOVED_PENDING = 2; 分別為初始化; 解碼; 解碼完畢移除
            decodeState = STATE_CALLING_CHILD_DECODE;
            try {
                // 具體的解碼邏輯(netty提供的解碼器或自定義解碼器中重寫的decode方法)
                decode(ctx, in, out);
            } finally {
                // 此時decodeState為正在解碼中 值為1,返回false
                boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
                // 在設置為初始化等待解碼
                decodeState = STATE_INIT;
                // 解碼完成移除當前ChannelHandler標記為不處理
                // 可以看看handlerRemoved源碼。如果緩衝區還有數據直接傳遞給下一個handler
                if (removePending) {
                    handlerRemoved(ctx);
                }
            }
        }
    5. 執行channelReadComplete
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 讀取次數重置
        numReads = 0;
        // 重置讀寫index
        discardSomeReadBytes();
        // 在channelRead meth中定義賦值 decodeWasNull = !out.insertSinceRecycled();
        // out指的是解碼集合List<Object> out; 咱們可以點進
        if (decodeWasNull) {
            decodeWasNull = false;
            if (!ctx.channel().config().isAutoRead()) {
                ctx.read();
            }
        }
        // fire掉readComplete傳遞到下一個handler的readComplete
        ctx.fireChannelReadComplete();
    }
    
    /**
    *  然後我們可以搜索下insertSinceRecucled在什麼地方被賦值了
    * Returns {@code true} if any elements where added or set. This will be reset once {@link #recycle()} was called.
    */
    boolean insertSinceRecycled() {
        return insertSinceRecycled;
    }
    
    
    // 搜索下insert的調用我們可以看到是CodecOutputList類即為channelRead中的out集合,眾所周知在    decode完之後,解碼數據就會被調用add方法,此時insertSinceRecycled被設置為true
    private void insert(int index, Object element) {
        array[index] = element;
        insertSinceRecycled = true;
    }
    
    
    /**
    * 清空回收數組內部的所有元素和存儲空間
    * Recycle the array which will clear it and null out all entries in the internal storage.
    */
    // 搜索recycle的調用我么可以知道在channelRead的finally邏輯中 調用了out.recycle();此時        insertSinceRecycled被設置為false
    void recycle() {
        for (int i = 0 ; i < size; i ++) {
            array[i] = null;
        }
        clear();
        insertSinceRecycled = false;
        handle.recycle(this);
    }
    

    至此ByteToMessageDecoder解碼類應該差不多比較清晰了!!!

    MessageToByteEncoder: 編碼超類,將消息轉成字節進行編碼發出

    何謂編碼,就是將發送數據轉化為客戶端和服務端約束好的數據結構和格式進行傳輸,我們可以在編碼過程中將消息體body的長度和一些頭部信息有序的設置到ByteBuf字節緩衝區中;方便解碼方靈活的運用來判斷(是否完整的包等)和處理業務;解碼是繼承入站數據,反之編碼應該繼承出站的數據;接下來我們看看編碼類是怎麼進行編碼的;
    MessageToByteEncoder類圖如下

    源碼分析

    既然是繼承出站類,我們直接看看write方法是怎麼樣的

    /**
    * 通過write方法獲取到出站的數據即要發送出去的數據
    * ctx channelHandler上下文
    * msg 發送的數據 Object可以通過繼承類指定的泛型來指定
    * promise channelPromise異步監聽,類似ChannelFuture,只不過promise可以設置監聽的結果,future只能通過獲取監聽的成功失敗結果;可以去了解下promise和future的區別
    */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            // 檢測發送數據的類型 通過TypeParameterMatcher類型匹配器
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                // 分配字節緩衝區 preferDirect默認為true
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    // 進行編碼
                    encode(ctx, cast, buf);
                } finally {
                    // 完成編碼后釋放對象的引用
                    ReferenceCountUtil.release(cast);
                }
                // 如果緩衝區有數據則通過ctx發送出去,promise可以監聽數據傳輸並設置是否完成
                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    // 如果沒有數據則釋放字節緩衝區的引用併發送一個empty的空包
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                // 非TypeParameterMatcher類型匹配器匹配的類型直接發送出去
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }
    
    // 初始化設置preferDirect為true
    protected MessageToByteEncoder() {
        this(true);
    }
    protected MessageToByteEncoder(boolean preferDirect) {
        matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
        this.preferDirect = preferDirect;
    }

    編碼: 重寫encode方法,根據實際業務來進行數據編碼

    // 此處就是我們需要重寫的編碼方法了,我們和根據約束好的或者自己定義好想要的數據格式發送給對方
    
    // 下面是我自己寫的demo的編碼方法;頭部設置好body的長度,服務端可以根據長度來判斷是否是完整的包,僅僅自學寫的簡單的demo非正常線上運營項目的邏輯
    public class MyClientEncode extends MessageToByteEncoder<String> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
            if (null != msg) {
                byte[] request = msg.getBytes(Charset.forName("UTF-8"));
                out.writeInt(request.length);
                out.writeBytes(request);
            }
        }
    }

    編碼類相對要簡單很多,因為只需要將發送的數據序列化,按照一定的格式進行發送數據!!!

    項目實戰

    項目主要簡單的實現下自定義編解碼器的運用及LengthFieldBasedFrameDecoder的使用

    • 項目結構如下
      │  hetangyuese-netty-06.iml
      │  pom.xml
      │
      ├─src
      │  ├─main
      │  │  ├─java
      │  │  │  └─com
      │  │  │      └─hetangyuese
      │  │  │          └─netty
      │  │  │              ├─client
      │  │  │              │      MyClient06.java
      │  │  │              │      MyClientChannelInitializer.java
      │  │  │              │      MyClientDecoder.java
      │  │  │              │      MyClientEncode.java
      │  │  │              │      MyClientHandler.java
      │  │  │              │      MyMessage.java
      │  │  │              │
      │  │  │              └─server
      │  │  │                      MyChannelInitializer.java
      │  │  │                      MyServer06.java
      │  │  │                      MyServerDecoder.java
      │  │  │                      MyServerDecoderLength.java
      │  │  │                      MyServerEncoder.java
      │  │  │                      MyServerHandler.java
      │  │  │
      │  │  └─resources
      │  └─test
      │      └─java
      
    • 服務端

      Serverhandler: 只是簡單的將解碼的內容輸出

      public class MyServerHandler extends ChannelInboundHandlerAdapter {
      
          @Override
          public void channelActive(ChannelHandlerContext ctx) throws Exception {
              System.out.println("客戶端連接成功 time: " + new Date().toLocaleString());
          }
      
          @Override
          public void channelInactive(ChannelHandlerContext ctx) throws Exception {
              System.out.println("客戶端斷開連接 time: " + new Date().toLocaleString());
          }
      
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              String body = (String) msg;
              System.out.println("content:" + body);
          }
      
          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
              // 出現異常關閉通道
              cause.printStackTrace();
              ctx.close();
          }
      }

      解碼器

      public class MyServerDecoder extends ByteToMessageDecoder {
      
          // 此處我頭部只塞了長度字段佔4個字節,別問為啥我知道,這是要客戶端和服務端約束好的
          private static int min_head_length = 4;
      
          @Override
          protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
              // 解碼的字節長度
              int size = in.readableBytes();
              if(size < min_head_length) {
                  System.out.println("解析的數據長度小於頭部長度字段的長度");
                  return ;
              }
              // 讀取的時候指針已經移位到長度字段的尾端
              int length = in.readInt();
              if (size < length) {
                  System.out.println("解析的數據長度與長度不符合");
                  return ;
              }
      
              // 上面已經讀取到了長度字段,後面的長度就是body
              ByteBuf decoderArr = in.readBytes(length);
              byte[] request = new byte[decoderArr.readableBytes()];
              // 將數據寫入空數組
              decoderArr.readBytes(request);
              String body = new String(request, Charset.forName("UTF-8"));
              out.add(body);
          }
      }

      將解碼器加入到channelHandler中:記得加到業務handler的前面否則無效

      public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
      
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
              ch.pipeline()
      //                .addLast(new MyServerDecoderLength(10240, 0, 4, 0, 0))
      //                .addLast(new LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 0))
                      .addLast(new MyServerDecoder())
                      .addLast(new MyServerHandler())
              ;
          }
      }
    • 客戶端

      ClientHandler

      public class MyClientHandler extends ChannelInboundHandlerAdapter {
      
          @Override
          public void channelActive(ChannelHandlerContext ctx) throws Exception {
              System.out.println("與服務端連接成功");
              for (int i = 0; i<10; i++) {
                  ctx.writeAndFlush("hhhhh" + i);
              }
          }
      
          @Override
          public void channelInactive(ChannelHandlerContext ctx) throws Exception {
              System.out.println("與服務端斷開連接");
          }
      
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              System.out.println("收到服務端消息:" +msg+ " time: " + new Date().toLocaleString());
          }
      
          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
              cause.printStackTrace();
              ctx.close();
          }
      }

      編碼器

      public class MyClientEncode extends MessageToByteEncoder<String> {
      
          @Override
          protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
              if (null != msg) {
                  byte[] request = msg.getBytes(Charset.forName("UTF-8"));
                  out.writeInt(request.length);
                  out.writeBytes(request);
              }
          }
      }

      將編碼器加到ClientHandler的前面

      public class MyClientChannelInitializer extends ChannelInitializer<SocketChannel> {
      
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
              ch.pipeline()
                      .addLast(new MyClientDecoder())
                      .addLast(new MyClientEncode())
                      .addLast(new MyClientHandler())
              ;
      
          }
      }
    • 服務端運行結果
      MyServer06 is start ...................
      客戶端連接成功 time: 2019-11-19 16:35:47
      content:hhhhh0
      content:hhhhh1
      content:hhhhh2
      content:hhhhh3
      content:hhhhh4
      content:hhhhh5
      content:hhhhh6
      content:hhhhh7
      content:hhhhh8
      content:hhhhh9
    • 如果不用自定義的解碼器怎麼獲取到body內容呢

      將自定義編碼器換成LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 0)

      public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
      
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
              ch.pipeline()
      //                .addLast(new MyServerDecoderLength(10240, 0, 4, 0, 0))
                      .addLast(new LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 0))
      //                .addLast(new MyServerDecoder())
                      .addLast(new MyServerHandler())
              ;
          }
      }
      
      // 怕忘記的各個參數的含義在這在說明一次,自己不斷的修改每個值觀察結果就可以更加深刻的理解
      /**
      * maxFrameLength:消息體的最大長度,好像默認最大值為1024*1024
      * lengthFieldOffset 長度字段所在字節數組的下標 (我這是第一個write的所以下標是0)
      * lengthFieldLength 長度字段的字節長度(int類型佔4個字節)
      * lengthAdjustment 長度字段補償的數值 (lengthAdjustment =  數據包長度 - lengthFieldOffset - lengthFieldLength - 長度域的值),解析需要減去對應的數值
      * initialBytesToStrip 是否去掉長度字段(0不去除,對應長度域字節長度)
      */
      public LengthFieldBasedFrameDecoder(
                  int maxFrameLength,
                  int lengthFieldOffset, int lengthFieldLength,
                  int lengthAdjustment, int initialBytesToStrip)
      結果: 前都帶上了長度
      MyServer06 is start ...................
      客戶端連接成功 time: 2019-11-19 17:53:42
      收到客戶端發來的消息:   hhhhh0, time: 2019-11-19 17:53:42
      收到客戶端發來的消息:   hhhhh1, time: 2019-11-19 17:53:42
      收到客戶端發來的消息:   hhhhh2, time: 2019-11-19 17:53:42
      收到客戶端發來的消息:   hhhhh3, time: 2019-11-19 17:53:42
      收到客戶端發來的消息:   hhhhh4, time: 2019-11-19 17:53:42
      收到客戶端發來的消息:   hhhhh5, time: 2019-11-19 17:53:42
      收到客戶端發來的消息:   hhhhh6, time: 2019-11-19 17:53:42
      收到客戶端發來的消息:   hhhhh7, time: 2019-11-19 17:53:42
      收到客戶端發來的消息:   hhhhh8, time: 2019-11-19 17:53:42
      收到客戶端發來的消息:   hhhhh9, time: 2019-11-19 17:53:42

      如果我們在客戶端的長度域中做手腳 LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 0)

      舊: out.writeInt(request.length);
      新: out.writeInt(request.length + 1);
      // 看結果就不正常,0後面多了一個0;但是不知道為啥只解碼了一次??? 求解答
      MyServer06 is start ...................
      客戶端連接成功 time: 2019-11-19 17:56:55
      收到客戶端發來的消息:   hhhhh0 , time: 2019-11-19 17:56:55
      
      // 正確修改為 LengthFieldBasedFrameDecoder(10240, 0, 4, -1, 0)
      // 結果:
      MyServer06 is start ...................
      客戶端連接成功 time: 2019-11-19 18:02:18
      收到客戶端發來的消息:   hhhhh0, time: 2019-11-19 18:02:18
      收到客戶端發來的消息:   hhhhh1, time: 2019-11-19 18:02:18
      收到客戶端發來的消息:   hhhhh2, time: 2019-11-19 18:02:18
      收到客戶端發來的消息:   hhhhh3, time: 2019-11-19 18:02:18
      收到客戶端發來的消息:   hhhhh4, time: 2019-11-19 18:02:18
      收到客戶端發來的消息:   hhhhh5, time: 2019-11-19 18:02:18
      收到客戶端發來的消息:   hhhhh6, time: 2019-11-19 18:02:18
      收到客戶端發來的消息:   hhhhh7, time: 2019-11-19 18:02:18
      收到客戶端發來的消息:   hhhhh8, time: 2019-11-19 18:02:18
      收到客戶端發來的消息:   hhhhh9, time: 2019-11-19 18:02:18

      捨棄長度域 :LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 4)

      // 結果
      MyServer06 is start ...................
      客戶端連接成功 time: 2019-11-19 18:03:44
      收到客戶端發來的消息:hhhhh0, time: 2019-11-19 18:03:44
      收到客戶端發來的消息:hhhhh1, time: 2019-11-19 18:03:44
      收到客戶端發來的消息:hhhhh2, time: 2019-11-19 18:03:44
      收到客戶端發來的消息:hhhhh3, time: 2019-11-19 18:03:44
      收到客戶端發來的消息:hhhhh4, time: 2019-11-19 18:03:44
      收到客戶端發來的消息:hhhhh5, time: 2019-11-19 18:03:44
      收到客戶端發來的消息:hhhhh6, time: 2019-11-19 18:03:44
      收到客戶端發來的消息:hhhhh7, time: 2019-11-19 18:03:44
      收到客戶端發來的消息:hhhhh8, time: 2019-11-19 18:03:44
      收到客戶端發來的消息:hhhhh9, time: 2019-11-19 18:03:44
      分析源碼示例中的 lengthAdjustment = 消息字節長度 – lengthFieldOffset-lengthFieldLength-長度域中的值
    • 源碼中的示例
       * <pre>
       * lengthFieldOffset   =  0
       * lengthFieldLength   =  2
       * <b>lengthAdjustment</b>    = <b>-2</b> (= the length of the Length field)
       * initialBytesToStrip =  0
       *
       * BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
       * +--------+----------------+      +--------+----------------+
       * | Length | Actual Content |----->| Length | Actual Content |
       * | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
       * +--------+----------------+      +--------+----------------+
       * </pre>
      長度域中0x000E為16進制,轉換成10進制是14,說明消息體長度為14;根據公式:14-0-2-14 = -2
      * <pre>
       * lengthFieldOffset   = 0
       * lengthFieldLength   = 3
       * <b>lengthAdjustment</b>    = <b>2</b> (= the length of Header 1)
       * initialBytesToStrip = 0
       *
       * BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
       * +----------+----------+----------------+      +----------+----------+----------------+
       * |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
       * | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
       * +----------+----------+----------------+      +----------+----------+----------------+
       * </pre>
      從上的例子可以知道;lengthAdjustment(2) = 17- 12(00000C)-lengthFieldOffset(0) - lengthFieldLength(3);

      …….等等

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業 缺乏曝光? 下一步”網站設計“幫您第一時間規劃公司的門面形象

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

    ※試算大陸海運運費!

  • Hadoop壓縮的圖文教程

    Hadoop壓縮的圖文教程

    近期由於Hadoop集群機器硬盤資源緊張,有需求讓把 Hadoop 集群上的歷史數據進行下壓縮,開始從網上查找的都是關於各種壓縮機制的對比,很少有關於怎麼壓縮的教程(我沒找到。。),再此特記錄下本次壓縮的過程,方便以後查閱,利己利人。

     

    本文涉及的所有 jar包、腳本、native lib 見文末的相關下載 ~

     

    我的壓縮版本: 

    Jdk 1.7及以上

    Hadoop-2.2.0 版本

     

    壓縮前環境準備:

    關於壓縮算法對比,網上資料很多,這裏我用的是 Bzip2 的壓縮方式,比較中庸,由於是Hadoop自帶的壓縮機制,也不需要額外下載別的東西,只需要在 Hadoop根目錄下 lib/native 文件下有如下文件即可:

     

     

     

     

    壓縮之前要檢查 Hadoop 集群支持的壓縮算法: hadoop checknative

    每台機器都要檢查一下,都显示如圖 true 則說明 集群支持 bzip2 壓縮,

    如果显示false 則需要將上圖的文件下載拷貝到 Hadoop根目錄下 lib/native

     

     

     

    壓縮程序介紹:

     

    壓縮程序用到的類 getFileList(獲取文件路徑) 、 FileHdfsCompress(壓縮類)、FileHdfsDeCompress(解壓縮類) ,只用到這三個類即可完成壓縮/解壓縮操作。

     

     

     

     

    getFileList 作用:遞歸打印 傳入文件目錄下文件的根路徑,包括子目錄下的文件。開始想直接輸出到文件中,後來打包放到集群上運行時,發現文件沒有內容,可能是由於分佈式運行的關係,所以就把路徑打印出來,人工在放到文件中。

     

    核心代碼:
    public static void listAllFiles(String path, List<String> strings) throws IOException {
            FileSystem hdfs = getHdfs(path);
            Path[] filesAndDirs = getFilesAndDirs(path);
    
            for(Path p : filesAndDirs){
                if(hdfs.getFileStatus(p).isFile()){
                    if(!p.getName().contains("SUCCESS")){
                        System.out.println(p);
                    }
                }else{
                    listAllFiles(p.toString(),strings);
                }
            }
           // FileUtils.writeLines(new File(FILE_LIST_OUTPUT_PATH), strings,true);
    
        }

    public static FileSystem getHdfs(String path) throws IOException {
            Configuration conf = new Configuration();
            return FileSystem.get(URI.create(path),conf);
        }
    
    
    public static Path[] getFilesAndDirs(String path) throws IOException { FileStatus[] fs = getHdfs(path).listStatus(new Path(path)); return FileUtil.stat2Paths(fs); }

      

     FileHdfsCompress:壓縮程序非常簡單,對應程序里的 FileHdfsCompress 類,(解壓縮是 FileHdfsDeCompress),採用的是Hadoop 原生API  ,將Hadoop集群上原文件讀入作為輸入流,將壓縮路徑的輸入流作為輸出,再使用相關的壓縮算法即,代碼如下:

     

    核心代碼:
    
     //指定壓縮方式
                Class<?> codecClass = Class.forName(COMPRESS_CLASS_NAME);
    
                Configuration conf = new Configuration();
                CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf);
                // FileSystem fs = FileSystem.get(conf);
                FileSystem fs = FileSystem.get(URI.create(inputPath),conf);
    
                //原文件路徑 txt 用原本的輸入流讀入
                FSDataInputStream in = fs.open(new Path(inputPath));
    
                //創建 HDFS 上的輸出流,壓縮路徑
                //通過文件系統獲取輸出流
                OutputStream out = fs.create(new Path(FILE_OUTPUT_PATH));
                //對輸出流的數據壓縮
                CompressionOutputStream compressOut = codec.createOutputStream(out);
                //讀入原文件 壓縮到HDFS上 輸入--普通流  輸出-壓縮流
                IOUtils.copyBytes(in, compressOut, 4096,true);
    

      

    以下是代碼優化的過程,不涉及壓縮程序使用,不感興趣同學可以跳過 ~ :

     

    在實際編碼中,我其實是走了彎路的,一開始並沒有想到用 Hadoop API 就能實現壓縮解壓縮功能,代碼到此其實是經歷了優化迭代的過程。

     

    最開始時壓縮的思路就是 將文件讀進來,再壓縮出去,一開始使用了 MapReduce 的方式,在編碼過程中,由於對生成壓縮文件的路徑還有要求,又在 Hadoop 輸出時自定義了輸出類來使的輸出文件的名字符合要求,不是 part-r-0000.txt ,而是時間戳.txt 的格式,至此符合原線上路徑的要求。

     

    而在實際運行過程中發現,MR 程序需要啟動 Yarn,並佔用Yarn 資源,由於壓縮時間較長,有可能會長時間佔用 集群資源不釋放,後來發現 MR 程序的初衷是用來做并行計算的,而壓縮僅僅是 map 任務讀取一條就寫一條,不涉及計算,就是內容的簡單搬運。所以這裏放棄了使用 MR 想着可不可以就用簡單的 Hadoop API 就完成壓縮功能,經過一番嘗試后,發現真的可行! 使用了 Hadoop API 釋放了集群資源,壓縮速度也還可以,這樣就把這個壓縮程序當做一個後台進程跑就行了也不用考慮集群資源分配的問題

     

    實測壓縮步驟:

     

    1 將項目打包,上傳到hadoop 集群任一節點即可,準備好相應的腳本,輸入數據文件,日誌文件,如下圖:

     

     

     

     

    2 使用獲取文件路徑腳本,打印路徑: 

     

     

     

    getFileLish.sh 腳本內容如下,就是簡單調用,傳入參數為 hadoop集群上 HDFS 上目錄路徑

    #!/bin/sh
    
    echo "begin get fileList"
    
    echo "第一個參數$1"
    
    if [ ! -n "$1" ]; then
    echo "check param!"
    fi
    
    #original file
    hadoop jar hadoop-compress-1.0.jar com.people.util.getFileList  $1
    

      

    3 將 打印的路徑 粘貼到 compress.txt 中,第 2 步中會把目錄的文件路徑包括子目錄路徑都打印出來,將其粘貼進  compress.txt 中即可,注意 文件名可隨意定

     

    4 使用壓縮腳本即可,sh compress.sh /data/new_compress/compress.txt  ,加粗的部分是腳本的參數意思是 第3步中文件的路徑,注意:這裏只能是絕對路徑,不然可能報找不到文件的異常。

     

     

     compress.sh 腳本內容如下,就是簡單調用,傳入參數為 第3步中文件的絕對路徑

     

    #!/bin/sh
    echo "begin compress"
    
    echo "第一個參數$1"
    
    if [ ! -n "$1" ]; then
    echo "check param!"
    fi
    
    
    hadoop jar hadoop-compress-1.0.jar com.people.compress.FileHdfsCompress  $1 >> /data/new_compress/compress.log 2>&1 &
    

      

     

    5 查看壓縮日誌,發現後台程序已經開始壓縮了!,tail  -f  compress.log

     

    6 如果感覺壓縮速度不夠,可以多台機器執行腳本,也可以一台機器執行多個任務,因為這個腳本任務是一個後台進程,不會佔用集群 Yarn 資源。

     

     相關下載:

     

    程序源碼下載: git@github.com:fanpengyi/hadoop-compress.git

     

    Hadoop 壓縮相關需要的 腳本、jar包、lib 下載: 關注公眾號 “大數據江湖”,後台回復 “hadoop壓縮”,即可下載

     

    長按即可關注

     

     — The End —

     

     

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
    【其他文章推薦】

    ※為什麼 USB CONNECTOR 是電子產業重要的元件?

    網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

    ※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

    ※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

    ※專營大陸快遞台灣服務

    台灣快遞大陸的貨運公司有哪些呢?

  • Thrift總結(四)Thrift實現雙向通信

    Thrift總結(四)Thrift實現雙向通信

    前面介紹過 Thrift 安裝和使用,介紹了Thrift服務的發布和客戶端調用,可以查看我之前的文章:

    但是,之前介紹的都是單向的客戶端發送消息,服務端接收消息。而客戶端卻得不到服務器的響應。

    那如果我們要實現雙向通信(即:客戶端發送請求,服務端處理返回,服務端發送消息,客戶端處理返回)的功能,該怎麼實現呢?

     

    其實在不涉及語言平台的制約,WebService或是webapi 就可以實現這種客戶端發起請求,服務端的處理的單向流程。

    然而,實際場景中,可能我們的某些業務需求,更需要服務端能夠響應請求並處理數據。下面我通過一個demo案例,介紹下Thrift 是如何實現雙向通信的。

     

    一、安裝Thrift

    這裏不再贅述,戳這裏查看我上篇文章的介紹:

     

    二、編寫Thrift IDL文件 

    編寫thrift腳本,命名為student.thrift  如下:

    service HelloWorldService{
        void SayHello(1:string msg);
    }

    生成service 的方法,之前的文章有介紹,這裏就不介紹了。

     

    三、編寫服務端代碼

    創建HelloThrift.Server 服務端工程,添加HelloWorldBidirectionServer類,HelloWorldBidirectionServer 實現了Iface接口用於接收客戶端消息,並有一個客戶端傳輸層對象集合用於記錄所有已連接的客戶端。

     public class HelloWorldBidirectionServer : HelloWorldBidirectionService.Iface
        {
            public void Run(int port)
            {
                try
                {
                    TServerTransport transport = new TServerSocket(port);
    
                    TTransportFactory transportFac = new TTransportFactory();
    
                    TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
                    TThreadPoolServer server = new TThreadPoolServer(getProcessorFactory(), transport, transportFac, inputProtocolFactory);
    
                    server.Serve();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
    
            public static List<TTransport> TransportCollection = new List<TTransport>();
    
            public void SayHello(string msg)
            {
                Console.WriteLine(string.Format("{0:yyyy/MM/dd hh:mm:ss} 服務端接收到消息: {1}", DateTime.Now, msg));
            }
    
            public void SayToClient(string msg)
            {
                try
                {
                    foreach (TTransport trans in TransportCollection)
                    {
                        TBinaryProtocol protocol = new TBinaryProtocol(trans);
                        HelloWorldBidirectionService.Client client = new HelloWorldBidirectionService.Client(protocol);
                        //Thread.Sleep(1000);
                        client.SayHello(msg);
                        //Console.WriteLine("發給了客戶端喲");
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
    
            public TProcessorFactory getProcessorFactory()
            {
                return new HelloWorldBidirectionProcessor();
            }
        }
    
        public class HelloWorldBidirectionProcessor : TProcessorFactory
        {
            public TProcessor GetProcessor(TTransport trans, TServer server = null)
            {
                if (trans.IsOpen)
                {
                    HelloWorldBidirectionServer.TransportCollection.Add(trans);
                    Console.WriteLine("客戶端連上。");
                }
    
                HelloWorldBidirectionServer srv = new HelloWorldBidirectionServer();
                return new global::HelloWorldBidirectionService.Processor(srv);
            }
        }

     

    四、編寫客戶端代碼

    首先創建HelloThrift.Client客戶端項目,添加接收服務端消息的類HelloWorldBidirectionClient,裏面只有一個實現Iface接口的方法:

      public class HelloWorldBidirectionClient
        {
            static HelloWorldBidirectionService.Client client = null;
            public void ConnectAndListern(int port, string ip = "127.0.0.1")
            {
                //Tsocket: TCP/IP Socket接口
                TSocket tSocket = new TSocket(ip, port);
                //消息結構協議
                TProtocol protocol = new TBinaryProtocol(tSocket);
                try
                {
                    if (client == null)
                    {
                        client = new global::HelloWorldBidirectionService.Client(protocol);
                        tSocket.Open();//建立連接
                        StartListern(tSocket);//啟動監聽線程
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
    
            public void Say(string msg)
            {
                if (client != null)
                    client.SayHello(msg);
            }
    
            void StartListern(TSocket tSocket)
            {
                Thread t = new Thread(new ParameterizedThreadStart(Run));
                t.Start(tSocket);
            }
    
            public void Run(object tSocket)
            {
                HelloWorldBidirectionService.Processor process = new HelloWorldBidirectionService.Processor(new HelloWorldBidirectionFace());
    
                try
                {
                    while (process.Process(new TBinaryProtocol((TSocket)tSocket), new TBinaryProtocol((TSocket)tSocket)))
                    {
                        Console.WriteLine("消息接收完成,等下一波,阻塞中......");
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("連接斷開..." + ex.Message);
                }
            }
    
        }
        class HelloWorldBidirectionFace : HelloWorldBidirectionService.Iface
        {
            public void SayHello(string msg)
            {
                Console.WriteLine(string.Format("{0:yyyy/MM/dd hh:mm:ss} 收到服務端響應消息 {1}", DateTime.Now, msg));
    
            }
        }

     實現客戶端,ConnectAndListern方法可以與服務端建立連接,並開啟客戶端端口監聽來自服務端的信息。Say方法可將消息發送至服務端。

     

    五、測試

     測試效果如下:

     

     

     

    六、最後

      1. 關於使用Thrift 構建我們自己的rpc 的方法,這裏基本講完了。其他的方法本文就不再演示了,調用起來都是一樣。  

      2. 後續會簡單討論一下Thrift 框架的通信原理。

      3. 源代碼下載,

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
    【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包”嚨底家”

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    小三通海運與一般國際貿易有何不同?

    小三通快遞通關作業有哪些?

  • 美國蒙大拿州密蘇拉(Missoula)聯邦地區法官克里斯坦森(Dana Christensen)與環保人士及美國原住民站在同一陣線

    美國蒙大拿州密蘇拉(Missoula)聯邦地區法官克里斯坦森(Dana Christensen)與環保人士及美國原住民站在同一陣線,駁回美國魚類暨野生動物管理局(US Fish and Wildlife Service)將灰熊從瀕危物種名單除名的決定。

    環保人士主張,根據瀕臨滅絕物種保護法,對這些灰熊與蒙大拿州和下48州(Lower 48)的其他灰熊族群採取差別待遇,是生物學上靠不住且非法行為,法官也同意這類說法。

    環保人士說,儘管灰熊數量有所回升,倘若沒有受到聯邦持續保護,牠們的復育情況就會受到影響。此外,氣候變遷導致灰熊食物供給出現變化和人為死亡率高,也對灰熊生存構成威脅。

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業 缺乏曝光? 下一步”網站設計“幫您第一時間規劃公司的門面形象

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

    ※試算大陸海運運費!

  • 每年聯合國大會召開之際,多國元首和政府領袖同時舉行的「氣候週」今天(25日)開跑

    每年聯合國大會召開之際,多國元首和政府領袖同時舉行的「氣候週」今天(25日)開跑,他們敦促世界領袖緊急採取行動降低全球暖化。

    波蘭12月將主辦聯合國氣候變化綱要公約第24次締約方會議(COP24),聯合國氣候首長艾斯皮諾薩(Patricia Espinosa)呼籲各國團結,支持2015年巴黎協定所訂規定,將全球暖化升溫限制在攝氏兩度以下。

    艾斯皮諾薩表示,各國並未實現他們的承諾。並說:「各國目前依據巴黎協定做出的承諾,將使得全球溫度在2100年升高約三度。」

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※為什麼 USB CONNECTOR 是電子產業重要的元件?

    網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

    ※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

    ※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

    ※專營大陸快遞台灣服務

    台灣快遞大陸的貨運公司有哪些呢?

  • 北極白鯨驚奇現身泰晤士河 再不回家專家憂安危

    摘錄自2018年9月26日中央社報導

    據英國各大媒體報導,一隻迷途的白鯨現身英國泰晤士河,第一次被目擊是25日在肯特郡(Kent)格雷夫森德(Gravesend)的泰晤士河段,當時白鯨正在駁船附近覓食,被暱稱為「班尼」(Benny),距離其原棲息地北極圈水域達數千公里。26日白鯨再次現身同個地點,引發是否迷航及可能遇險的擔憂。

    鯨豚保育協會(WDC)海洋哺乳類動物科學家羅特(Rob Lott)表示,這隻白鯨正受到監控,以防牠擱淺。「但白鯨停留在泰晤士河口的時間愈長,就愈令人擔心。」

    海洋生物保育慈善團體ORCA的保育學家巴比(Lucy Babey)表示:「這是白鯨出現在英國的最南端紀錄。」

    英國海洋生物救援組織表示,英國最後一次發現白鯨蹤跡是3年前在北英格蘭的諾森伯蘭(Northumberland)海岸,以及北愛爾蘭,但極為罕見。

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

    ※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

    ※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

    台灣海運大陸貨務運送流程

    兩岸物流進出口一站式服務

  • 蘇門答臘瀕危老虎 受困捕獸陷阱後死亡

    摘錄自2018年9月26日中央社報導

    印尼官員今天(26日)說,一頭被列為極度瀕危物種的蘇門答臘虎死屍在蘇門答臘島廖內省(Riau)的慕亞拉藍布村(Muara Lembu)附近山溝被發現,根據虎屍肚子上圍繞著陷阱裡的繩索研判,應是受困獵人所設陷阱後死亡。

    稍早,當地村民告訴保育機構說,有人看到一頭雌蘇門答臘虎受困獵人為捕獵野豬設置的陷阱。但官員趕往現場後,已不見老虎。官員隔天回到原區域搜尋,才在附近山溝尋獲。

    國際自然保育聯盟(IUCN)將蘇門答臘虎列為「極危」的瀕危動物。自然界只剩不到400頭蘇門答臘虎,環保人士說,由於蘇門答臘虎自然棲息地迅速縮減,使牠們與人類發生衝突的機率升高。

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包”嚨底家”

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    小三通海運與一般國際貿易有何不同?

    小三通快遞通關作業有哪些?

  • 保護環境 越南明年起加徵汽油環保稅

    摘錄自2018年9月21日中央社報導

    為了保護和改善環境,越南政府自明年起將調漲對汽油等燃料產品的環境保護稅,其中汽油每公升環保稅將從現行收費3000越盾(約新台幣4元)提升為4000越盾。

    越南國會常務委員會昨天(20日)通過有關環境保護稅的決議,自明年起將對相關產品加徵環保稅,希望藉此保護與改善環境,減少污染物排放。

    根據這項決議,汽油每公升環保稅將從現行收費3000越盾提升為4000越盾,柴油從1500越盾升到2000越盾,燃油和潤滑油從900越盾升到2000越盾等;此外,煤炭、塑膠袋與除草劑等也被列入加徵環保稅的產品名單。

    越南國會官員指出,加徵對汽油等燃料產品的環保稅後,國家預算每年將增加15.7兆越盾,這是一筆很大的稅收,有利用於保護與改善環境。

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

    ※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

    ※Google地圖已可更新顯示潭子電動車充電站設置地點!!

    ※帶您來看台北網站建置台北網頁設計,各種案例分享

    小三通物流營運型態?

    ※快速運回,大陸空運推薦?

  • 米其林啟動循環經濟 2048年輪胎永續用料達80%、百分百回收

    米其林啟動循環經濟 2048年輪胎永續用料達80%、百分百回收

    環境資訊中心外電;姜唯 翻譯;林大利 審校;稿源:ENS

    法國輪胎製造商米其林提出「Ambition 2048」計畫,要在2048年達到輪胎用料80%為永續材料,並且回收再利用率達到100%。



    米其林新概念輪胎「VISION」。圖片來源:米其林Michelin

    2018年全球報廢輪胎達10億個

    世界永續發展工商理事會估計,2018年全世界將產生10億個報廢輪胎,約2500萬噸。今日全球輪胎再製率為70%,回收率為50%,多的20%轉化為能量。相較之下,塑膠包裝或容器每年回收率僅14%。

    為實現「Ambition 2048」,米其林正在投資高科技回收技術,以期將永續材料比例拉高到80%。

    米其林總部位於克萊蒙費朗,目前在全球有11萬1700名員工,遍佈170個國家,在17個國家擁有68個生產基地,2016年共生產1.87億個輪胎。

    「Biobutterfly」計畫 啟動輪胎循環經濟

    米其林計畫用它的新輪胎「VISION」建立循環經濟。這種新概念輪胎不需要充氣,將採用生物來源和回收材料製成,胎面可由生物分解,透過3D列印再製。



    米其林新輪胎「VISION」。圖片來源:米其林Michelin

    今日輪胎的成份包含超過200種原料。輪胎工業中使用的橡膠中有60%是用石油衍生碳氫化合物製成,剩下的40%仍然是天然橡膠。

    米其林的「Ambition 2048」永續發展目標包括致力研究生物來源材料。2012年米其林與「Axens」石化公司和法國石油能源研究所(IFP Energies Nouvelles)共同啟動「Biobutterfly」計畫。

    「Biobutterfly」計畫致力於透過生物材料製作合成橡膠,例如木材、禾稈(straw)和甜菜。

    專利技術 回收輪胎轉化為永續材料

    米其林也試著將更多可回收和可再生材料整合進輪胎中。2017年底米其林收購了總部位於美國喬治亞州的「Lehigh Technologies」化學公司,該公司的專利技術是把回收輪胎轉製成高科技微粉化橡膠粉末。



    微粉化橡膠粉末。圖片來源:Lehigh Technologies

    這些創新材料減少了輪胎生產所需的非再生原料數量,如合成橡膠或碳煙。

    微粉化橡膠粉末是一種低成本的永續材料,可代替輪胎製程中使用的其他材料,以及塑膠、瀝青和建築材料。

    世界許多輪胎大廠以及瀝青和建築材料專業公司已經是「Lehigh Technologies」公司的客戶。

    藉著「Ambition 2048」,米其林估計將實現:

    *每年省下3300萬桶石油,足以填滿16.5個超級油輪
    *法國每個人一個月的總能量消耗
    *每年省下一台一般轎車(8L / 100公里)跑650億公里的油耗

    Sustainable Ambitions: Michelin Plans for 2048 CLERMONT-FERRAND, France, September 24, 2018 (ENS)

    To the French tire manufacturer Michelin, Ambition 2048 means a whole new strategy of using sustainable materials in tire manufacturing and recycling. It means that in the year 2048 Michelin plans to manufacture its tires using 80 percent sustainable materials, and that 100 percent of those tires will be recycled.



    圖片來源:米其林Michelin

    Headquartered in Clermont-Ferrand, Michelin is present in 170 of the world’s 197 countries, has 111,700 employees and operates 68 production facilities in 17 countries, which collectively produced 187 million tires in 2016.

    The World Business Council for Sustainable Development estimates that in 2018 there will be one billion end-of-life tires generated in the world – around 25 million tons.

    Today the worldwide recovery rate for tires is 70 percent and the recycling rate is 50 percent. The remaining 20 percent are transformed into energy. By comparison, 14 percent of plastic packaging or containers are recovered each year.

    To accomplish Ambition 2048, Michelin is investing in high technology recycling technologies that will enable the company to increase this content to 80 percent sustainable material.

    Michelin plans to help create a circular economy with a new tire concept called VISION. This airless tire would be made of bio-sourced and recycled products with a biodegradable tread that is renewable with a 3D printer. 

    Today, over 200 raw materials go into tire composition. Sixty percent of the rubber used in the tire industry is synthetic, produced from petroleum-derived hydrocarbons, although natural rubber is still necessary for the remaining 40 percent.

    Michelin’s Ambition 2048 sustainable development goal includes a commitment to research into bio-sourced materials, such as Biobutterfly, a program launched in 2012 with Axens and IFP Energies Nouvelles.

    Biobutterfly involves the creation of synthetic elastomers from biomass such as wood, straw or beet.

    Michelin is integrating more recycled and renewable materials in its tires. This strategy motivated the acquisition in late 2017 of the American company Lehigh Technologies, based in Georgia, which specializes in high technology micronized rubber powders derived from recycled tires.

    These innovative materials reduce the amount of non-renewable raw materials needed for tire production, such as elastomers or carbon black.

    Micronized rubber powder is a low cost sustainable material that can substitute for other components used in the manufacture of tires, as well as plastics, asphalt and construction materials.

    Major world tire manufacturers, as well as companies specialized in asphalt and construction materials are already Lehigh Technologies’ customers.

    When Ambition 2048 is achieved, Michelin estimates that the potential savings will be equivalent to:

    * – 33 million barrels of oil saved per year, enough to fill 16.5 supertankers
    * – One month’s total energy consumption of everyone in France
    * – 65 billion kilometers driven by an average sedan (8L / 100 km) per year

    ※ 全文及圖片詳見:

    作者

    如果有一件事是重要的,如果能為孩子實現一個願望,那就是人類與大自然和諧共存。

    於特有生物研究保育中心服務,小鳥和棲地是主要的研究對象。是龜毛的讀者,認為龜毛是探索世界的美德。

    延伸閱讀

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

    台灣寄大陸海運貨物規則及重量限制?

    大陸寄台灣海運費用試算一覽表

    台中搬家,彰化搬家,南投搬家前需注意的眉眉角角,別等搬了再說!

  • 噴藥害死蜜蜂 奧地利果農遭判刑

    摘錄自2018年9月26日中央社報導

    奧地利一名果農因違法噴灑殺蟲劑,隸屬鄰近2個養蜂人超過50個蜂群因此死亡。26日奧地利克拉根福法院(Klagenfurt)以「蓄意危害環境」,判處果農1年有期徒刑,至少需服刑4個月才可假釋,以及賠償超過2萬歐元(2萬3500美元)。

    這名47歲果農針對他位於奧地利卡林西亞省(Carinthia)拉萬特地區(Lavanttal)的果樹噴灑藥效強大的殺蟲劑陶斯松(chlorpyrifos),當時果樹的花仍會吸引蜜蜂前去。法院指出,以他的經驗和訓練他人的角色,足以證明他知道自身行為會帶來何種後果。

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

    網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

    ※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

    大陸寄台灣空運注意事項

    大陸海運台灣交貨時間多久?

    ※避免吃悶虧無故遭抬價!台中搬家公司免費估價,有契約讓您安心有保障!