標籤: 網頁設計公司推薦

  • Zabbix-(四)郵件、釘釘告警通知

    Zabbix-(四)郵件、釘釘告警通知

    Zabbix-(四)郵件、釘釘告警通知

    一.前言

    在之前的文章里,通過Zabbix對主機的磁盤、CPU以及內存進行了監控,並在首頁Dashboard里創建了監控圖形,但是只有當我們登錄到Zabbix后才能看到監控到的問題(Problem),因此在本篇文章里,將利用觸發器(Trigger),以及媒介(Media)等配置項,實現當觸發器觸發時,通過不同媒介,如:郵件、釘釘,發送動作(Action),實現實時通知告警功能。

    準備

    • Zabbix Server (Zabbix 4.4)
    • 在Zabbix中已配置一些監控項和觸發器(這些配置可以參考我的)

    二.安裝相關環境

    由於使用到腳本告警媒介,本文中通過調用Python腳本觸發告警,因此需要在Zabbix Server主機上安裝pip以及相關模塊。(這裏Python使用Centos7自帶的Python2.7.5)

    1. 安裝pip

      # yum install -y epel-release
      
      # yum install -y python-pip
    2. 安裝requests模塊

      # pip install requests

    三.配置告警媒介類型

    Zabbix默認自帶了2種報警媒介類型(Media Type)电子郵件以及短信,我們將修改电子郵件類型配置,並新建腳本類型和Webhook類型。希望通過腳本、Webhook告警媒介發送釘釘消息。

    注:Webhook告警媒介是Zabbix 4.4的新特性

    1. 修改电子郵件告警媒介

      點擊【管理】-【報警媒介類型】-【Email】

      修改Email配置,我這裏用的是Outlook郵箱,具體SMTP服務器可以參考。使用其他郵箱也可以去對應官網查詢SMTP配置。

      測試發送郵箱,點擊【測試】

      輸入收件人郵箱

      收到郵件

    2. 新增腳本告警媒介

      新建Python腳本告警媒介,用戶釘釘告警

      點擊【創建媒體類型】

      進行配置

      配置項
      * 名稱 Python腳本
      類型 腳本
      * 腳本名稱 pythonScript.py
      腳本參數(參數1) {ALERT.MESSAGE}
      腳本參數(參數2) {ALERT.SENDTO}
      腳本參數(參數3) {ALERT.SUBJECT}

      接下來新建Python腳本,Zabbix Server配置文件中可以配置告警腳本路徑,默認為 /usr/lib/zabbix/alertscripts

      # 查看告警腳本路徑
      # cat zabbix_server.conf | grep AlertScriptsPath

      編寫告警腳本

      # cd /usr/lib/zabbix/alertscripts
      # vim pythonScript.py

      腳本內容

      #!/usr/bin/env python
      #coding:utf-8
      
      import requests,json,sys,os,datetime
      
      # 釘釘機器人地址
      webhook="https://oapi.dingtalk.com/robot/send?access_token=your_dingding_robot_access_token"
      
      # 對應{ALERT.SENDTO}, Zabbix告警媒介配置界面第2個參數
      user=sys.argv[2]
      
      # 對應{ALERT.MESSAGE}, Zabbix告警媒介配置界面第1個參數
      text=sys.argv[1]
      data={
          "msgtype": "text",
          "text": {
              "content": text
          },
          "at": {
              "atMobiles": [
                  user
              ],
              "isAtAll": False
          }
      }
      headers = {'Content-Type': 'application/json'}
      x=requests.post(url=webhook,data=json.dumps(data),headers=headers)

      給腳本可執行權限

      # chmod uo+x /usr/lib/zabbix/alertscripts/pythonScript.py

      測試腳本

      釘釘收到消息

    3. 新增Webhook告警媒介

      配置項
      * 名稱 Webhook
      類型 Webhook
      參數: (名稱)
      user {ALERT.SENDTO}
      subject {ALERT.SUBJECT}
      message {ALERT.MESSAGE}

      腳本:

      try {
          Zabbix.Log(4, 'params= '+value);
      
          params = JSON.parse(value);
          req = new CurlHttpRequest();
          data = {};
          result = {};
      
          req.AddHeader('Content-Type: application/json');
      
          data.msgtype = "text";
          //   對應 message參數
          data.text = {"content" : params.message};
          //   對應 user參數
          data.at = {"atMobiles": [params.user], "isAtAll": "false"};
      
          //   釘釘機器人
          resp = req.Post('https://oapi.dingtalk.com/robot/send?access_token=your_access_token',
              JSON.stringify(data)
          );
      } catch (error) {
          result = {};
      }
      
      return JSON.stringify(result);

      測試Webhook

    四.為用戶添加告警媒介

    需要將新增的告警媒介添加給用戶

    點擊【用戶】-【告警媒介】

    將上述步驟添加的告警媒介(Python腳本、Webhoob、Email),進行添加(收件人根據告警媒介類型填寫郵箱手機號),嚴重性也根據需要勾選。

    五.配置動作

    完成上述配置完成后,需要創建動作(Action),將觸發器(Trigger)告警媒介(Media Type)進行關聯,一旦觸發器觸發,那麼Zabbix會執行動作,再去執行告警媒介。

    1. 添加動作

      點擊【配置】-【動作】-【創建動作】

    2. 配置【動作】相關信息

      配置項
      * 名稱 告警動作
      新的觸發條件 【觸發器】【等於】【Template Disk Free Size: 磁盤剩餘空間觸發器】

      操作步驟如下圖:

      群組選擇 ->Linux servers

      主機選擇 -> Template Disk Free Size 模板()

      勾選觸發器 -> 磁盤剩餘空間觸發器 ()

      勾選後點擊【選擇】

    3. 配置【操作】相關信息

      點擊【操作】

      先配置以下信息

      配置項
      * 默認操作步驟持續時間 1h(保持默認)
      默認標題 告警: {EVENT.NAME}
      消息內容 【磁盤空間不足告警】
      告警事件: {EVENT.DATE} {EVENT.TIME}
      告警問題: {EVENT.NAME}
      告警主機: {HOST.IP} {HOST.NAME}
      告警級別: {EVENT.SEVERITY}
      磁盤剩餘:{ITEM.VALUE}

      上述配置表格【默認標題】和【消息內容】值中形如{EVENT.NAME}的內容是Zabbix中的宏(Marco),宏是一個變量,例如 {HOST.IP} 表示告警主機的IP地址,Zabbix自帶的宏可以參考

      繼續配置操作

      點擊【新的】

      【操作類型】選擇發送消息,【發送到用戶】添加Admin

      【僅送到】根據需要選擇之前配置的,本文選擇Email和Python腳本(這裏只能單選或全選,所以需要先選擇一個,因此需要多次)

      添加完成後點擊【添加】

    六.測試

    向被監控主機拷貝或下載大文件,使其磁盤剩餘空間低於觸發器監控閾值,等待觸發器觸發問題,查看儀錶盤、郵件等。

    儀錶盤

    釘釘

    郵件

    七.參考文檔

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
    【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包”嚨底家”

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    小三通海運與一般國際貿易有何不同?

    小三通快遞通關作業有哪些?

  • Feign 調用丟失Header的解決方案

    Feign 調用丟失Header的解決方案

    問題

    在 Spring Cloud 中 微服務之間的調用會用到Feign,但是在默認情況下,Feign 調用遠程服務存在Header請求頭丟失問題。

    解決方案

    首先需要寫一個 Feign請求攔截器,通過實現RequestInterceptor接口,完成對所有的Feign請求,傳遞請求頭和請求參數。

    Feign 請求攔截器

    public class FeignBasicAuthRequestInterceptor implements RequestInterceptor {
    
        private static final Logger logger = LoggerFactory.getLogger(FeignBasicAuthRequestInterceptor.class);
    
        @Override
        public void apply(RequestTemplate requestTemplate) {
            ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder
                    .getRequestAttributes();
            HttpServletRequest request = attributes.getRequest();
            Enumeration<String> headerNames = request.getHeaderNames();
            if (headerNames != null) {
                while (headerNames.hasMoreElements()) {
                    String name = headerNames.nextElement();
                    String values = request.getHeader(name);
                    requestTemplate.header(name, values);
                }
            }
            Enumeration<String> bodyNames = request.getParameterNames();
            StringBuffer body =new StringBuffer();
            if (bodyNames != null) {
                while (bodyNames.hasMoreElements()) {
                    String name = bodyNames.nextElement();
                    String values = request.getParameter(name);
                    body.append(name).append("=").append(values).append("&");
                }
            }
            if(body.length()!=0) {
                body.deleteCharAt(body.length()-1);
                requestTemplate.body(body.toString());
                logger.info("feign interceptor body:{}",body.toString());
            }
        }
    }

    通過配置文件配置 讓 所有 FeignClient,來使用 FeignBasicAuthRequestInterceptor

    feign:
      client:
        config:
          default:
            connectTimeout: 5000
            readTimeout: 5000
            loggerLevel: basic
            requestInterceptors: com.leparts.config.FeignBasicAuthRequestInterceptor

    也可以配置讓 某個 FeignClient 來使用這個 FeignBasicAuthRequestInterceptor

    feign:
      client:
        config:
          xxxx: # 遠程服務名
            connectTimeout: 5000
            readTimeout: 5000
            loggerLevel: basic
            requestInterceptors: com.leparts.config.FeignBasicAuthRequestInterceptor

    經過測試,上面的解決方案可以正常的使用;但是出現了新的問題。

    在轉發Feign的請求頭的時候, 如果開啟了Hystrix, Hystrix的默認隔離策略是Thread(線程隔離策略), 因此轉發攔截器內是無法獲取到請求的請求頭信息的。

    可以修改默認隔離策略為信號量模式:

    hystrix.command.default.execution.isolation.strategy=SEMAPHORE

    但信號量模式不是官方推薦的隔離策略;另一個解決方法就是自定義Hystrix的隔離策略。

    自定義策略

    HystrixConcurrencyStrategy 是提供給開發者去自定義hystrix內部線程池及其隊列,還提供了包裝callable的方法,以及傳遞上下文變量的方法。所以可以繼承了HystrixConcurrencyStrategy,用來實現了自己的併發策略。

    @Component
    public class FeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    
        private static final Logger log = LoggerFactory.getLogger(FeignHystrixConcurrencyStrategy.class);
    
        private HystrixConcurrencyStrategy delegate;
    
        public FeignHystrixConcurrencyStrategy() {
            try {
                this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
                if (this.delegate instanceof FeignHystrixConcurrencyStrategy) {
                    // Welcome to singleton hell...
                    return;
                }
    
                HystrixCommandExecutionHook commandExecutionHook =
                        HystrixPlugins.getInstance().getCommandExecutionHook();
    
                HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
                HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
                HystrixPropertiesStrategy propertiesStrategy =
                        HystrixPlugins.getInstance().getPropertiesStrategy();
                this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher, propertiesStrategy);
    
                HystrixPlugins.reset();
                HystrixPlugins instance = HystrixPlugins.getInstance();
                instance.registerConcurrencyStrategy(this);
                instance.registerCommandExecutionHook(commandExecutionHook);
                instance.registerEventNotifier(eventNotifier);
                instance.registerMetricsPublisher(metricsPublisher);
                instance.registerPropertiesStrategy(propertiesStrategy);
            } catch (Exception e) {
                log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
            }
        }
    
        private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
                                                     HystrixMetricsPublisher metricsPublisher,
                                                     HystrixPropertiesStrategy propertiesStrategy) {
            if (log.isDebugEnabled()) {
                log.debug("Current Hystrix plugins configuration is [" + "concurrencyStrategy ["
                        + this.delegate + "]," + "eventNotifier [" + eventNotifier + "]," + "metricPublisher ["
                        + metricsPublisher + "]," + "propertiesStrategy [" + propertiesStrategy + "]," + "]");
                log.debug("Registering Sleuth Hystrix Concurrency Strategy.");
            }
        }
    
        @Override
        public <T> Callable<T> wrapCallable(Callable<T> callable) {
            RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
            return new WrappedCallable<>(callable, requestAttributes);
        }
    
        @Override
        public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                                HystrixProperty<Integer> corePoolSize,
                                                HystrixProperty<Integer> maximumPoolSize,
                                                HystrixProperty<Integer> keepAliveTime,
                                                TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime,
                    unit, workQueue);
        }
    
        @Override
        public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                                HystrixThreadPoolProperties threadPoolProperties) {
            return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
        }
    
        @Override
        public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
            return this.delegate.getBlockingQueue(maxQueueSize);
        }
    
        @Override
        public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
            return this.delegate.getRequestVariable(rv);
        }
    
        static class WrappedCallable<T> implements Callable<T> {
            private final Callable<T> target;
            private final RequestAttributes requestAttributes;
    
            WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
                this.target = target;
                this.requestAttributes = requestAttributes;
            }
    
            @Override
            public T call() throws Exception {
                try {
                    RequestContextHolder.setRequestAttributes(requestAttributes);
                    return target.call();
                } finally {
                    RequestContextHolder.resetRequestAttributes();
                }
            }
        }
    }

    致此,Feign調用丟失請求頭的問題就解決的了 。

    參考

    https://blog.csdn.net/zl1zl2zl3/article/details/79084368
    https://cloud.spring.io/spring-cloud-static/spring-cloud-openfeign/2.2.0.RC2/reference/html/

    歡迎掃碼或微信搜索公眾號《程序員果果》關注我,關注有驚喜~

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

    台灣寄大陸海運貨物規則及重量限制?

    大陸寄台灣海運費用試算一覽表

    台中搬家,彰化搬家,南投搬家前需注意的眉眉角角,別等搬了再說!

  • 一文帶你深入了解 Redis 的持久化方式及其原理

    一文帶你深入了解 Redis 的持久化方式及其原理

    Redis 提供了兩種持久化方式,一種是基於快照形式的 RDB,另一種是基於日誌形式的 AOF,每種方式都有自己的優缺點,本文將介紹 Redis 這兩種持久化方式,希望閱讀本文後你對 Redis 的這兩種方式有更加全面、清晰的認識。

    RDB 快照方式持久化

    先從 RDB 快照方式聊起,RDB 是 Redis 默認開啟的持久化方式,並不需要我們單獨開啟,先來看看跟 RDB 相關的配置信息:

    ################################ SNAPSHOTTING  ################################
    #
    # Save the DB on disk:
    #
    #   save <seconds> <changes>
    #
    #   Will save the DB if both the given number of seconds and the given
    #   number of write operations against the DB occurred.
    #
    #   In the example below the behaviour will be to save:
    #   after 900 sec (15 min) if at least 1 key changed
    #   after 300 sec (5 min) if at least 10 keys changed
    #   after 60 sec if at least 10000 keys changed
    #   save ""
    # 自動生成快照的觸發機制 中間的是時間,單位秒,後面的是變更數據 60 秒變更 10000 條數據則自動生成快照
    save 900 1
    save 300 10
    save 60 10000
    
    # 生成快照失敗時,主線程是否停止寫入
    stop-writes-on-bgsave-error yes
    
    # 是否採用壓縮算法存儲
    rdbcompression yes
    
    # 數據恢復時是否檢測 RDB文件有效性
    rdbchecksum yes
    
    # The filename where to dump the DB
    # RDB 快照生成的文件名稱
    dbfilename dump.rdb
    
    # 快照生成的路徑 AOF 也是存放在這個路徑下面
    dir .

    關於 RDB 相關配置信息不多,需要我們調整的就更少了,我們只需要根據自己的業務量修改生成快照的機制和文件存放路徑即可。

    RDB 有兩種持久化方式:手動觸發自動觸發手動觸發使用以下兩個命令:

    • save:會阻塞當前 Redis 服務器響應其他命令,直到 RDB 快照生成完成為止,對於內存 比較大的實例會造成長時間阻塞,所以線上環境不建議使用

    • bgsave:Redis 主進程會 fork 一個子進程,RDB 快照生成有子進程來負責,完成之後,子進程自動結束,bgsave 只會在 fork 子進程的時候短暫的阻塞,這個過程是非常短的,所以推薦使用該命令來手動觸發

    除了執行命令手動觸發之外,Redis 內部還存在自動觸發 RDB 的持久化機制,在以下幾種情況下 Redis 會自動觸發 RDB 持久化

    • 在配置中配置了 save 相關配置信息,如我們上面配置文件中的 save 60 10000 ,也可以把它歸類為“save m n”格式的配置,表示 m 秒內數據集存在 n 次修改時,會自動觸發 bgsave。

    • 在主從情況下,如果從節點執行全量複製操作,主節點自動執行 bgsave 生成 RDB 文件併發送給從節點

    • 執行 debug reload 命令重新加載 Redis 時,也會自動觸發 save 操作

    • 默認情況下執行 shutdown 命令時,如果沒有開啟 AOF 持久化功能則自動執行 bgsave

    上面就是 RDB 持久化的方式,可以看出 save 命令使用的比較少,大多數情況下使用的都是 bgsave 命令,所以這個 bgsave 命令還是有一些東西,那接下來我們就一起看看 bgsave 背後的原理,先從流程圖開始入手:

    bgsave 命令大概有以下幾個步驟:

    • 1、執行 bgsave 命令,Redis 主進程判斷當前是否存在正在執行的 RDB/AOF 子進程,如果存在, bgsave 命令直接返回不在往下執行。
    • 2、父進程執行 fork 操作創建子進程,fork 操作過程中父進程會阻塞,fork 完成後父進程將不在阻塞可以接受其他命令。
    • 3、子進程創建新的 RDB 文件,基於父進程當前內存數據生成臨時快照文件,完成後用新的 RDB 文件替換原有的 RDB 文件,並且給父進程發送 RDB 快照生成完畢通知

    上面就是 bgsave 命令背後的一些內容,RDB 的內容就差不多了,我們一起來總結 RDB 持久化的優缺點,RDB 方式的優點

    • RDB 快照是某一時刻 Redis 節點內存數據,非常適合做備份,上傳到遠程服務器或者文件系統中,用於容災備份
    • 數據恢復時 RDB 要遠遠快於 AOF

    有優點同樣存在缺點,RDB 的缺點有

    • RDB 持久化方式數據沒辦法做到實時持久化/秒級持久化。我們已經知道了 bgsave 命令每次運行都要執行 fork 操作創建子進程,屬於重量級操作,頻繁執行成本過高。
    • RDB 文件使用特定二進制格式保存,Redis 版本演進過程中有多個格式 的 RDB 版本,存在老版本 Redis 服務無法兼容新版 RDB 格式的問題

    如果我們對數據要求比較高,每一秒的數據都不能丟,RDB 持久化方式肯定是不能夠滿足要求的,那 Redis 有沒有辦法滿足呢,答案是有的,那就是接下來的 AOF 持久化方式

    AOF 持久化方式

    Redis 默認並沒有開啟 AOF 持久化方式,需要我們自行開啟,在 redis.conf 配置文件中將 appendonly no 調整為 appendonly yes,這樣就開啟了 AOF 持久化,與 RDB 不同的是 AOF 是以記錄操作命令的形式來持久化數據的,我們可以查看以下 AOF 的持久化文件 appendonly.aof

    *2
    $6
    SELECT
    $1
    0
    *3
    $3
    set
    $6
    mykey1
    $6
    你好
    *3
    $3
    set
    $4
    key2
    $5
    hello
    *1
    $8

    大概就是長這樣的,具體的你可以查看你 Redis 服務器上的 appendonly.aof 配置文件,這也意味着我們可以在 appendonly.aof 文件中國修改值,等 Redis 重啟時將會加載修改之後的值。看似一些簡單的操作命令,其實從命令到 appendonly.aof 這個過程中非常有學問的,下面時 AOF 持久化流程圖:

    在 AOF 持久化過程中有兩個非常重要的操作:一個是將操作命令追加到 AOF_BUF 緩存區,另一個是 AOF_buf 緩存區數據同步到 AOF 文件,接下來我們詳細聊一聊這兩個操作:

    1、為什麼要將命令寫入到 aof_buf 緩存區而不是直接寫入到 aof 文件?

    我們知道 Redis 是單線程響應,如果每次寫入 AOF 命令都直接追加到磁盤上的 AOF 文件中,這樣頻繁的 IO 開銷,Redis 的性能就完成取決於你的機器硬件了,為了提升 Redis 的響應效率就添加了一層 aof_buf 緩存層, 利用的是操作系統的 cache 技術,這樣就提升了 Redis 的性能,雖然這樣性能是解決了,但是同時也引入了一個問題,aof_buf 緩存區數據如何同步到 AOF 文件呢?由誰同步呢?這就是我們接下來要聊的一個操作:fsync 操作

    2、aof_buf 緩存區數據如何同步到 aof 文件中?

    aof_buf 緩存區數據寫入到 aof 文件是有 linux 系統去完成的,由於 Linux 系統調度機制周期比較長,如果系統故障宕機了,意味着一個周期內的數據將全部丟失,這不是我們想要的,所以 Linux 提供了一個 fsync 命令,fsync 是針對單個文件操作(比如這裏的 AOF 文件),做強制硬盤同步,fsync 將阻塞直到寫入硬盤完成后返回,保證了數據持久化,正是由於有這個命令,所以 redis 提供了配置項讓我們自行決定何時進行磁盤同步,redis 在 redis.conf 中提供了appendfsync 配置項,有如下三個選項:

    # appendfsync always
    appendfsync everysec
    # appendfsync no
    • always:每次有寫入命令都進行緩存區與磁盤數據同步,這樣保證不會有數據丟失,但是這樣會導致 redis 的吞吐量大大下降,下降到每秒只能支持幾百的 TPS ,這違背了 redis 的設計,所以不推薦使用這種方式
    • everysec:這是 redis 默認的同步機制,雖然每秒同步一次數據,看上去時間也很快的,但是它對 redis 的吞吐量沒有任何影響,每秒同步一次的話意味着最壞的情況下我們只會丟失 1 秒的數據, 推薦使用這種同步機制,兼顧性能和數據安全
    • no:不做任何處理,緩存區與 aof 文件同步交給系統去調度,操作系統同步調度的周期不固定,最長會有 30 秒的間隔,這樣出故障了就會丟失比較多的數據。

    這就是三種磁盤同步策略,但是你有沒有注意到一個問題,AOF 文件都是追加的,隨着服務器的運行 AOF 文件會越來越大,體積過大的 AOF 文件對 redis 服務器甚至是主機都會有影響,而且在 Redis 重啟時加載過大的 AOF 文件需要過多的時間,這些都是不友好的,那 Redis 是如何解決這個問題的呢?Redis 引入了重寫機制來解決 AOF 文件過大的問題。

    3、Redis 是如何進行 AOF 文件重寫的?

    Redis AOF 文件重寫是把 Redis 進程內的數據轉化為寫命令同步到新 AOF 文件的過程,重寫之後的 AOF 文件會比舊的 AOF 文件占更小的體積,這是由以下幾個原因導致的:

    • 進程內已經超時的數據不再寫入文件
    • 舊的 AOF 文件含有無效命令,如 del key1、hdel key2、srem keys、set a111、set a222等。重寫使用進程內數據直接生成,這樣新的AOF文件只保 留最終數據的寫入命令
    • 多條寫命令可以合併為一個,如:lpush list a、lpush list b、lpush list c可以轉化為:lpush list a b c。為了防止單條命令過大造成客戶端緩衝區溢 出,對於 list、set、hash、zset 等類型操作,以 64 個元素為界拆分為多條。

    重寫之後的 AOF 文件體積更小了,不但能夠節約磁盤空間,更重要的是在 Redis 數據恢復時,更小體積的 AOF 文件加載時間更短。AOF 文件重寫跟 RDB 持久化一樣分為手動觸發自動觸發,手動觸發直接調用 bgrewriteaof 命令就好了,我們後面會詳細聊一聊這個命令,自動觸發就需要我們在 redis.conf 中修改以下幾個配置

    auto-aof-rewrite-percentage 100
    auto-aof-rewrite-min-size 64mb
    • auto-aof-rewrite-percentage:代表當前 AOF文件空間 (aof_current_size)和上一次重寫后 AOF 文件空間(aof_base_size)的比值,默認是 100%,也就是一樣大的時候
    • auto-aof-rewrite-min-size:表示運行 AOF 重寫時 AOF 文件最小體積,默認為 64MB,也就是說 AOF 文件最小為 64MB 才有可能觸發重寫

    滿足了這兩個條件,Redis 就會自動觸發 AOF 文件重寫,AOF 文件重寫的細節跟 RDB 持久化生成快照有點類似,下面是 AOF 文件重寫流程圖:

    AOF 文件重寫也是交給子進程來完成,跟 RDB 生成快照很像,AOF 文件重寫在重寫期間建立了一個 aof_rewrite_buf 緩存區來保存重寫期間主進程響應的命令,等新的 AOF 文件重寫完成后,將這部分文件同步到新的 AOF 文件中,最後用新的 AOF 文件替換掉舊的 AOF 文件。需要注意的是在重寫期間,舊的 AOF 文件依然會進行磁盤同步,這樣做的目的是防止重寫失敗導致數據丟失,

    Redis 持久化數據恢復

    我們知道 Redis 是基於內存的,所有的數據都存放在內存中,由於機器宕機或者其他因素重啟了就會導致我們的數據全部丟失,這也就是要做持久化的原因,當服務器重啟時,Redis 會從持久化文件中加載數據,這樣我們的數據就恢復到了重啟前的數據,在數據恢復這一塊Redis 是如何實現的?我們先來看看數據恢復的流程圖:

    Redis 的數據恢複流程比較簡單,優先恢復的是 AOF 文件,如果 AOF 文件不存在時則嘗試加載 RDB 文件,為什麼 RDB 的恢復速度比 AOF 文件快,但是還是會優先加載 AOF 文件呢?我個人認為是 AOF 文件數據更全面並且 AOF 兼容性比 RDB 強,需要注意的是當存在 RDB/AOF 時,如果數據加載不成功,Redis 服務啟動會失敗。

    最後

    目前互聯網上很多大佬都有 Redis 系列教程,如有雷同,請多多包涵了。原創不易,碼字不易,還希望大家多多支持。若文中有所錯誤之處,還望提出,謝謝。

    歡迎掃碼關注微信公眾號:「平頭哥的技術博文」,和平頭哥一起學習,一起進步。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
    【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包”嚨底家”

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    小三通海運與一般國際貿易有何不同?

    小三通快遞通關作業有哪些?

  • 蘇門答臘瀕危老虎 受困捕獸陷阱後死亡

    摘錄自2018年9月26日中央社報導

    印尼官員今天(26日)說,一頭被列為極度瀕危物種的蘇門答臘虎死屍在蘇門答臘島廖內省(Riau)的慕亞拉藍布村(Muara Lembu)附近山溝被發現,根據虎屍肚子上圍繞著陷阱裡的繩索研判,應是受困獵人所設陷阱後死亡。

    稍早,當地村民告訴保育機構說,有人看到一頭雌蘇門答臘虎受困獵人為捕獵野豬設置的陷阱。但官員趕往現場後,已不見老虎。官員隔天回到原區域搜尋,才在附近山溝尋獲。

    國際自然保育聯盟(IUCN)將蘇門答臘虎列為「極危」的瀕危動物。自然界只剩不到400頭蘇門答臘虎,環保人士說,由於蘇門答臘虎自然棲息地迅速縮減,使牠們與人類發生衝突的機率升高。

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包”嚨底家”

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    小三通海運與一般國際貿易有何不同?

    小三通快遞通關作業有哪些?

  • 米其林啟動循環經濟 2048年輪胎永續用料達80%、百分百回收

    米其林啟動循環經濟 2048年輪胎永續用料達80%、百分百回收

    環境資訊中心外電;姜唯 翻譯;林大利 審校;稿源:ENS

    法國輪胎製造商米其林提出「Ambition 2048」計畫,要在2048年達到輪胎用料80%為永續材料,並且回收再利用率達到100%。



    米其林新概念輪胎「VISION」。圖片來源:米其林Michelin

    2018年全球報廢輪胎達10億個

    世界永續發展工商理事會估計,2018年全世界將產生10億個報廢輪胎,約2500萬噸。今日全球輪胎再製率為70%,回收率為50%,多的20%轉化為能量。相較之下,塑膠包裝或容器每年回收率僅14%。

    為實現「Ambition 2048」,米其林正在投資高科技回收技術,以期將永續材料比例拉高到80%。

    米其林總部位於克萊蒙費朗,目前在全球有11萬1700名員工,遍佈170個國家,在17個國家擁有68個生產基地,2016年共生產1.87億個輪胎。

    「Biobutterfly」計畫 啟動輪胎循環經濟

    米其林計畫用它的新輪胎「VISION」建立循環經濟。這種新概念輪胎不需要充氣,將採用生物來源和回收材料製成,胎面可由生物分解,透過3D列印再製。



    米其林新輪胎「VISION」。圖片來源:米其林Michelin

    今日輪胎的成份包含超過200種原料。輪胎工業中使用的橡膠中有60%是用石油衍生碳氫化合物製成,剩下的40%仍然是天然橡膠。

    米其林的「Ambition 2048」永續發展目標包括致力研究生物來源材料。2012年米其林與「Axens」石化公司和法國石油能源研究所(IFP Energies Nouvelles)共同啟動「Biobutterfly」計畫。

    「Biobutterfly」計畫致力於透過生物材料製作合成橡膠,例如木材、禾稈(straw)和甜菜。

    專利技術 回收輪胎轉化為永續材料

    米其林也試著將更多可回收和可再生材料整合進輪胎中。2017年底米其林收購了總部位於美國喬治亞州的「Lehigh Technologies」化學公司,該公司的專利技術是把回收輪胎轉製成高科技微粉化橡膠粉末。



    微粉化橡膠粉末。圖片來源:Lehigh Technologies

    這些創新材料減少了輪胎生產所需的非再生原料數量,如合成橡膠或碳煙。

    微粉化橡膠粉末是一種低成本的永續材料,可代替輪胎製程中使用的其他材料,以及塑膠、瀝青和建築材料。

    世界許多輪胎大廠以及瀝青和建築材料專業公司已經是「Lehigh Technologies」公司的客戶。

    藉著「Ambition 2048」,米其林估計將實現:

    *每年省下3300萬桶石油,足以填滿16.5個超級油輪
    *法國每個人一個月的總能量消耗
    *每年省下一台一般轎車(8L / 100公里)跑650億公里的油耗

    Sustainable Ambitions: Michelin Plans for 2048 CLERMONT-FERRAND, France, September 24, 2018 (ENS)

    To the French tire manufacturer Michelin, Ambition 2048 means a whole new strategy of using sustainable materials in tire manufacturing and recycling. It means that in the year 2048 Michelin plans to manufacture its tires using 80 percent sustainable materials, and that 100 percent of those tires will be recycled.



    圖片來源:米其林Michelin

    Headquartered in Clermont-Ferrand, Michelin is present in 170 of the world’s 197 countries, has 111,700 employees and operates 68 production facilities in 17 countries, which collectively produced 187 million tires in 2016.

    The World Business Council for Sustainable Development estimates that in 2018 there will be one billion end-of-life tires generated in the world – around 25 million tons.

    Today the worldwide recovery rate for tires is 70 percent and the recycling rate is 50 percent. The remaining 20 percent are transformed into energy. By comparison, 14 percent of plastic packaging or containers are recovered each year.

    To accomplish Ambition 2048, Michelin is investing in high technology recycling technologies that will enable the company to increase this content to 80 percent sustainable material.

    Michelin plans to help create a circular economy with a new tire concept called VISION. This airless tire would be made of bio-sourced and recycled products with a biodegradable tread that is renewable with a 3D printer. 

    Today, over 200 raw materials go into tire composition. Sixty percent of the rubber used in the tire industry is synthetic, produced from petroleum-derived hydrocarbons, although natural rubber is still necessary for the remaining 40 percent.

    Michelin’s Ambition 2048 sustainable development goal includes a commitment to research into bio-sourced materials, such as Biobutterfly, a program launched in 2012 with Axens and IFP Energies Nouvelles.

    Biobutterfly involves the creation of synthetic elastomers from biomass such as wood, straw or beet.

    Michelin is integrating more recycled and renewable materials in its tires. This strategy motivated the acquisition in late 2017 of the American company Lehigh Technologies, based in Georgia, which specializes in high technology micronized rubber powders derived from recycled tires.

    These innovative materials reduce the amount of non-renewable raw materials needed for tire production, such as elastomers or carbon black.

    Micronized rubber powder is a low cost sustainable material that can substitute for other components used in the manufacture of tires, as well as plastics, asphalt and construction materials.

    Major world tire manufacturers, as well as companies specialized in asphalt and construction materials are already Lehigh Technologies’ customers.

    When Ambition 2048 is achieved, Michelin estimates that the potential savings will be equivalent to:

    * – 33 million barrels of oil saved per year, enough to fill 16.5 supertankers
    * – One month’s total energy consumption of everyone in France
    * – 65 billion kilometers driven by an average sedan (8L / 100 km) per year

    ※ 全文及圖片詳見:

    作者

    如果有一件事是重要的,如果能為孩子實現一個願望,那就是人類與大自然和諧共存。

    於特有生物研究保育中心服務,小鳥和棲地是主要的研究對象。是龜毛的讀者,認為龜毛是探索世界的美德。

    延伸閱讀

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

    台灣寄大陸海運貨物規則及重量限制?

    大陸寄台灣海運費用試算一覽表

    台中搬家,彰化搬家,南投搬家前需注意的眉眉角角,別等搬了再說!

  • 百度聯手奇瑞安徽蕪湖建首個“全無人駕駛運營區域”

    日前,百度與安徽省蕪湖市人民政府正式簽訂合作協定,雙方將在蕪湖共同建設“全無人駕駛汽車運營區域”。

    安徽省蕪湖市市長潘朝輝稱,百度正在與奇瑞聯手打造全自動駕駛汽車。奇瑞汽車有限公司董事長尹同躍也表示,首批試運營車將由奇瑞提供。數量有20-30台,均達到全自動駕駛水準。目前車輛已經完成自動駕駛改裝,即將上路測試。

    此次與蕪湖市人民政府共建的運營區域會隨著時間逐漸擴大範圍。第一階段全自動駕駛汽車會在道況簡單的有限區域進行試運營,3-5年之間擴大區域,5年之後實現全市示範。

    關於車輛具體歸屬,技術分工,第一階段運營區規模、何時啟動,以及百度、蕪湖市政府、奇瑞的具體運營角色,現場稱會在未來給出更加詳細的資訊。

    與其他進入自動駕駛的企業相比,百度是首家與地方政府企業合作進行城市道路試運營的企業。去年年底自動駕駛汽車事業部成立時,就對外公佈了全自動駕駛的3年商用,5年量產計畫。百度全自動駕駛汽車會以公共車輛的形式,選擇10個左右國內不同城市和示範區商用運營。此次與蕪湖市政府運營簽約,也包括了3年商用和5年量產的長遠規劃。

    隨著自動駕駛汽車開始進入工程化和市場化階段,和現階段大量的封閉區域和高速公路測試相比,城市是下階段測試的理想場景。就百度而言,通過城市環境的試運營,會獲得更多道路行駛資料,説明快速反覆運算技術推向市場。除此之外,累計路測經驗也將有利於百度更加貼近未來制定的自動駕駛汽車法律法規以及行駛道路與環境標準。

    本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包”嚨底家”

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    小三通海運與一般國際貿易有何不同?

    小三通快遞通關作業有哪些?

  • 清華大學與日產聯手研發電動車與自動駕駛技術

    5月16日,“清華大學(汽車系)-日產智慧出行聯合研究中心”成立儀式在北京舉行,日產汽車攜手清華大學,針對中國市場的電動汽車和自動駕駛技術開展研發工作。

    合作框架包括:電動汽車以及電池相關技術、自動駕駛和未來中國道路系統三個方面,雙方將共用資源,調研中國道路情況以及各地的駕駛習慣,助力發展中國智慧出行交通方式。與此同時,日產汽車與清華大學還將培養優秀本地人才。

    據瞭解,日產汽車與清華大學有著多年的合作歷史,過去10年間,雙方在汽車核心技術開發,汽車發展戰略研究等領域一直保持密切合作,日產汽車將通過此次合作深入研究中國道路交通網絡,以最佳方式實現“日產智慧出行”。

    本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

    台灣寄大陸海運貨物規則及重量限制?

    大陸寄台灣海運費用試算一覽表

    台中搬家,彰化搬家,南投搬家前需注意的眉眉角角,別等搬了再說!

  • 2.NioEventLoop的創建

    2.NioEventLoop的創建

    NioEventLoop的創建

    NioEventLoop是netty及其重要的組成部件,它的首要職責就是為註冊在它上的channels服務,發現這些channels上發生的新連接、讀寫等I/O事件,然後將事件轉交 channel 流水線處理。使用netty時,我們首先要做的就是創建NioEventLoopGroup,這是一組NioEventLoop的集合,類似線程與線程池。通常,服務端會創建2個group,一個叫做bossGroup,一個叫做workerGroup。bossGroup負責監聽綁定的端口,接受請求並創建新連接,初始化后交由workerGroup處理後續IO事件。

    NioEventLoop和NioEventLoopGroup的類圖

    首先看看NioEventLoop和NioEventLoopGroup的類關係圖

    類多但不亂,可以發現三個特點:

    1. 兩者都繼承了ExecutorService,從而與線程池建立了聯繫
    2. NioEventLoop繼承的都是SingleThread,NioEventLoop繼承的是MultiThread
    3. NioEventLoop還繼承了AbstractScheduledEventExecutor,不難猜出這是個和定時任務調度有關的線程池

    NioEventLoopGroup的創建

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    我們先看看bossGroup和workerGroup的構造方法。

    public NioEventLoopGroup() {
        this(0);
    }
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    除此之外,還有多達8種構造方法,這些構造方法可以指定5種參數:
    1、最大線程數量。如果指定為0,那麼Netty會將線程數量設置為CPU邏輯處理器數量的2倍
    2、線程工廠。要求線程工廠類必須實現java.util.concurrent.ThreadFactory接口。如果沒有指定線程工廠,那麼默認DefaultThreadFactory。
    3、SelectorProvider。如果沒有指定SelectorProvider,那麼默認的SelectorProvider為SelectorProvider.provider()。
    4、SelectStrategyFactory。如果沒有指定則默認為DefaultSelectStrategyFactory.INSTANCE
    5、RejectedExecutionHandler。拒絕策略處理類,如果這個EventLoopGroup已被關閉,那麼之後提交的Runnable任務會默認調用RejectedExecutionHandler的reject方法進行處理。如果沒有指定,則默認調用拒絕策略。

    最終,NioEventLoopGroup會重載到父類MultiThreadEventExecutorGroup的構造方法上,這裏省略了一些健壯性代碼。

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
        // 步驟1
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        
        // 步驟2
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            children[i] = newChild(executor, args);
        }    
    
        // 步驟3
        chooser = chooserFactory.newChooser(children);
    
        // 步驟4
        final FutureListener<Object> terminationListener = future -> {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        };
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    
        // 步驟5
        Set<EventExecutor> childrenSet = new LinkedHashSet<>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }

    這裏可以分解為5個步驟,下面一步步講解

    步驟1

    第一個步驟是創建線程池executor。從workerGroup構造方法可知,默認傳進來的executor為null,所以首先創建executor。newDefaultThreadFactory的作用是設置線程的前綴名和線程優先級,默認情況下,前綴名是nioEventLoopGroup-x-y這樣的命名規則,而線程優先級則是5,處於中間位置。
    創建完newDefaultThreadFactory后,進入到ThreadPerTaskExecutor。它直接實現了juc包的線程池頂級接口,從構造方法可以看到它只是簡單的把factory賦值給自己的成員變量。而它實現的接口方法調用了threadFactory的newThread方法。從名字可以看出,它構造了一個thread,並立即啟動thread。

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }    

    那麼我們回過頭來看下DefaultThreadFactory的newThread方法,發現他創建了一個FastThreadLocalThread。這是netty自定義的一個線程類,顧名思義,netty認為它的性能更快。關於它的解析留待以後。這裏步驟1創建線程池就完成了。總的來說他與我們通常使用的線程池不太一樣,不設置線程池的線程數和任務隊列,而是來一個任務啟動一個線程。(問題:那任務一多豈不是直接線程爆炸?)

    @Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());        
        return t;
    }
    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    }

    步驟2

    步驟2是創建workerGroup中的NioEventLoop。在示例代碼中,傳進來的線程數是0,顯然不可能真的只創建0個nioEventLoop線程。在調用父類MultithreadEventLoopGroup構造函數時,對線程數進行了判斷,若為0,則傳入默認線程數,該值默認為2倍CPU核心數

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    // 靜態代碼塊初始化DEFAULT_EVENT_LOOP_THREADS
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads",     NettyRuntime.availableProcessors() * 2));
    }    

    接下來是通過newChild方法為每一個EventExecutor創建一個對應的NioEventLoop。這個方法傳入了一些args到NioEventLoop中,前三個是在NioEventLoopGroup創建時傳過來的參數。默認值見上文

    1. SlectorProvider.provider, 用於創建 Java NIO Selector 對象;
    2. SelectStrategyFactory, 選擇策略工廠;
    3. RejectedExecutionHandlers, 拒絕執行處理器;
    4. EventLoopTaskQueueFactory,任務隊列工廠,默認為null;

    進入NioEventLoop的構造函數,如下:

    NioEventLoop構造函數
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                     EventLoopTaskQueueFactory queueFactory) {
            super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                    rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            final SelectorTuple selectorTuple = openSelector();
            selector = selectorTuple.selector;
            unwrappedSelector = selectorTuple.unwrappedSelector;
            selectStrategy = strategy;
        }
    // 父類構造函數    
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                            boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                            RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        this.executor = ThreadExecutorMap.apply(executor, this);
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, taskQueue");
        rejectedExecutionHandler = ObjectUtil.checkNotNullrejectedHandler, "rejectedHandler");
    }    

    首先調用一個newTaskQueue方法創建一個任務隊列。這是一個mpsc即多生產者單消費者的無鎖隊列。之後調用父類的構造函數,在父類的構造函數中,將NioEventLoopGroup設置為自己的parent,並通過匿名內部類創建了這樣一個Executor————通過ThreadPerTaskExecutor執行傳進來的任務,並且在執行時將當前線程與NioEventLoop綁定。其他屬性也一一設置。
    在nioEventLoop構造函數中,我們發現創建了一個selector,不妨看一看netty對它的包裝。

    unwrappedSelector = provider.openSelector();
    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    首先看到netty定義了一個常量DISABLE_KEY_SET_OPTIMIZATION,如果這個常量設置為true,也即不對keyset進行優化,則直接返回未包裝的selector。那麼netty對selector進行了哪些優化?

    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
    final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    
        SelectionKey[] keys;
        int size;
    
        SelectedSelectionKeySet() {
            keys = new SelectionKey[1024];
        }
    } 

    往下,我們看到了一個叫做selectedSelectionKeySet的類,點進去可以看到,它繼承了AbstractSet,然而它的成員變量卻讓我們想到了ArrayList,再看看它定義的方法,除了不支持remove和contains外,活脫脫一個簡化版的ArrayList,甚至也支持擴容。
    沒錯,netty確實通過反射的方式,將selectionKey從Set替換為了ArrayList。仔細一想,卻又覺得此番做法有些道理。眾所周知,雖然HashSet和ArrayList隨機查找的時間複雜度都是o(1),但相比數組直接通過偏移量定位,HashSet由於需要Hash運算,時間消耗上又稍稍遜色了些。再加上使用場景上,都是獲取selectionKey集合然後遍歷,Set去重的特性完全用不上,也無怪乎追求性能的netty想要替換它了。

    步驟3

    創建完workerGroup的NioEventLoop后,如何挑選一個nioEventLoop進行工作是netty接下來想要做的事。一般來說輪詢是一個很容易想到的方案,為此需要創建一個類似負載均衡作用的線程選擇器。當然追求性能到喪心病狂的netty是不會輕易滿足的。我們看看netty在這樣常見的方案里又做了哪些操作。

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    // PowerOfTwo
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
    // Generic
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }

    可以看到,netty根據workerGroup內線程的數量採取了2種不同的線程選擇器,當線程數x是2的冪次方時,可以通過&(x-1)來達到對x取模的效果,其他情況則需要直接取模。這與hashmap強制設置容量為2的冪次方有異曲同工之妙。

    步驟4

    步驟4就是添加一些保證健壯性而添加的監聽器了,這些監聽器會在EventLoop被關閉后得到通知。

    步驟5

    創建一個只讀的NioEventLoop線程組

    到此NioEventLoopGroup及其包含的NioEventLoop組就創建完成了

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
    【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包”嚨底家”

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    小三通海運與一般國際貿易有何不同?

    小三通快遞通關作業有哪些?

  • Github PageHelper 原理解析

    任何服務對數據庫的日常操作,都離不開增刪改查。如果一次查詢的紀錄很多,那我們必須採用分頁的方式。對於一個Springboot項目,訪問和查詢MySQL數據庫,持久化框架可以使用MyBatis,分頁工具可以使用github的 PageHelper。我們來看一下PageHelper的使用方法:

     1 // 組裝查詢條件
     2 ArticleVO articleVO = new ArticleVO();
     3 articleVO.setAuthor("劉慈欣");
     4 
     5 // 初始化返回類
     6 // ResponsePages類是這樣一種返回類,其中包括返回代碼code和返回消息msg
     7 // 還包括返回的數據和分頁信息
     8 // 其中,分頁信息就是 com.github.pagehelper.Page<?> 類型
     9 ResponsePages<List<ArticleVO>> responsePages = new ResponsePages<>();
    10 
    11 // 這裏為了簡單,寫死分頁參數。正確的做法是從查詢條件中獲取
    12 // 假設需要獲取第1頁的數據,每頁20條記錄
    13 // com.github.pagehelper.Page<?> 類的基本字段如下
    14 // pageNum: 當前頁
    15 // pageSize: 每頁條數
    16 // total: 總記錄數
    17 // pages: 總頁數
    18 com.github.pagehelper.Page<?> page = PageHelper.startPage(1, 20);
    19 
    20 // 根據條件獲取文章列表
    21 List<ArticleVO> articleList = articleMapper.getArticleListByCondition(articleVO);
    22 
    23 // 設置返回數據
    24 responsePages.setData(articleList);
    25 
    26 // 設置分頁信息
    27 responsePages.setPage(page);

      

    如代碼所示,page 是組裝好的分頁參數,即每頁显示20條記錄,並且显示第1頁。然後我們執行mapper的獲取文章列表的方法,返回了結果。此時我們查看 responsePages 的內容,可以看到 articleList 中有20條記錄,page中包括當前頁,每頁條數,總記錄數,總頁數等信息。   使用方法就是這麼簡單,但是僅僅知道如何使用還不夠,還需要對原理有所了解。下面就來看看,PageHelper 實現分頁的原理。   我們先來看看 startPage 方法。進入此方法,發現一堆方法重載,最後進入真正的 startPage 方法,有5個參數,如下所示:

     1 /**
     2  * 開始分頁
     3  *
     4  * @param pageNum      頁碼
     5  * @param pageSize     每頁显示數量
     6  * @param count        是否進行count查詢
     7  * @param reasonable   分頁合理化,null時用默認配置
     8  * @param pageSizeZero true 且 pageSize=0 時返回全部結果,false時分頁, null時用默認配置
     9  */
    10 public static <E> Page<E> startPage(int pageNum, int pageSize, boolean count, Boolean reasonable, Boolean pageSizeZero) {
    11     Page<E> page = new Page<E>(pageNum, pageSize, count);
    12     page.setReasonable(reasonable);
    13     page.setPageSizeZero(pageSizeZero);
    14     // 當已經執行過orderBy的時候
    15     Page<E> oldPage = SqlUtil.getLocalPage();
    16     if (oldPage != null && oldPage.isOrderByOnly()) {
    17         page.setOrderBy(oldPage.getOrderBy());
    18     }
    19     SqlUtil.setLocalPage(page);
    20     return page;
    21 }

      

    getLocalPage 和 setLocalPage 方法做了什麼操作?我們進入基類 BaseSqlUtil 看一下:

     1 package com.github.pagehelper.util;
     2 ...
     3 
     4 public class BaseSqlUtil {
     5     // 省略其他代碼
     6 
     7     private static final ThreadLocal<Page> LOCAL_PAGE = new ThreadLocal<Page>();
     8     
     9     /**
    10      * 從 ThreadLocal<Page> 中獲取 page
    11      */ 
    12     public static <T> Page<T> getLocalPage() {
    13         return LOCAL_PAGE.get();
    14     }
    15     
    16     /**
    17      * 將 page 設置到 ThreadLocal<Page>
    18      */
    19     public static void setLocalPage(Page page) {
    20         LOCAL_PAGE.set(page);
    21     }
    22 
    23     // 省略其他代碼
    24 }

     

    原來是將 page 放入了 ThreadLocal<Page> 中。ThreadLocal 是每個線程獨有的變量,與其他線程不影響,是放置 page 的好地方。 setLocalPage 之後,一定有地方 getLocalPage,我們跟蹤進入代碼來看。   有了MyBatis動態代理的知識后,我們知道最終執行SQL的地方是 MapperMethod 的 execute 方法,作為回顧,我們來看一下:

     1 package org.apache.ibatis.binding;
     2 ...
     3 
     4 public class MapperMethod {
     5 
     6     public Object execute(SqlSession sqlSession, Object[] args) {
     7         Object result;
     8         if (SqlCommandType.INSERT == command.getType()) {
     9             // 省略
    10         } else if (SqlCommandType.UPDATE == command.getType()) {
    11             // 省略
    12         } else if (SqlCommandType.DELETE == command.getType()) {
    13             // 省略
    14         } else if (SqlCommandType.SELECT == command.getType()) {
    15             if (method.returnsVoid() && method.hasResultHandler()) {
    16                 executeWithResultHandler(sqlSession, args);
    17                 result = null;
    18             } else if (method.returnsMany()) {
    19                 /**
    20                  * 獲取多條記錄
    21                  */
    22                 result = executeForMany(sqlSession, args);
    23             } else if ...
    24                 // 省略
    25         } else if (SqlCommandType.FLUSH == command.getType()) {
    26             // 省略
    27         } else {
    28             throw new BindingException("Unknown execution method for: " + command.getName());
    29         }
    30         ...
    31         
    32         return result;
    33     }
    34 }

      

    由於執行的是select操作,並且需要查詢多條紀錄,所以我們進入 executeForMany 這個方法中,然後進入 selectList 方法,然後是 executor.query 方法。再然後突然進入到了 mybatis 的 Plugin 類的 invoke 方法,這是為什麼?   這裏就必須提到 mybatis 提供的 Interceptor 接口。
    Intercept 機制讓我們可以將自己製作的分頁插件 intercept 到查詢語句執行的地方,這是MyBatis對外提供的標準接口。藉助於Java的動態代理,標準的攔截器可以攔截在指定的數據庫訪問流程中,執行攔截器自定義的邏輯,比如在執行SQL之前攔截,拼裝一個分頁的SQL並執行。   讓我們回到MyBatis初始化的時候,我們發現 MyBatis 為我們組裝了 sqlSessionFactory,所有的 sqlSession 都是生成自這個 Factory。在這篇文章中,我們將重點放在 interceptorChain 上。程序啟動時,MyBatis 或者是 mybatis-spring 會掃描代碼中所有實現了 interceptor 接口的插件,並將它們以【攔截器集合】的方式,存儲在 interceptorChain 中。如下所示:

    # sqlSessionFactory 中的重要信息
    
    sqlSessionFactory
        configuration
            environment        
            mapperRegistry
                config         
                knownMappers   
            mappedStatements   
            resultMaps         
            sqlFragments       
            interceptorChain   # MyBatis攔截器調用鏈
                interceptors   # 攔截器集合,記錄了所有實現了Interceptor接口,並且使用了invocation變量的類

      

    如果MyBatis檢測到有攔截器,它就會在攔截器指定的執行點,首先執行 Plugin 的 invoke 方法,喚醒攔截器,然後執行攔截器定義的邏輯。因此,當 query 方法即將執行的時候,其實執行的是攔截器的邏輯。   MyBatis官網的說明: MyBatis 允許你在已映射語句執行過程中的某一點進行攔截調用。默認情況下,MyBatis 允許使用插件來攔截的方法調用包括:

    • Executor (update, query, flushStatements, commit, rollback, getTransaction, close, isClosed)
    • ParameterHandler (getParameterObject, setParameters)
    • ResultSetHandler (handleResultSets, handleOutputParameters)
    • StatementHandler (prepare, parameterize, batch, update, query)

      如果想了解更多攔截器的知識,可以看文末的參考資料。   我們回到主線,繼續看Plugin類的invoke方法:

     1 package org.apache.ibatis.plugin;
     2 ...
     3 
     4 public class Plugin implements InvocationHandler {
     5     ...
     6 
     7     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
     8         try {
     9            Set<Method> methods = signatureMap.get(method.getDeclaringClass());
    10            if (methods != null && methods.contains(method)) {
    11                // 執行攔截器的邏輯
    12                return interceptor.intercept(new Invocation(target, method, args));
    13            }
    14            return method.invoke(target, args);
    15        } catch (Exception e) {
    16            throw ExceptionUtil.unwrapThrowable(e);
    17        }
    18    }
    19    ...
    20 }

     

    我們去看 intercept 方法的實現,這裏我們進入【PageHelper】類來看:

     1 package com.github.pagehelper;
     2 ...
     3 
     4 /**
     5  * Mybatis - 通用分頁攔截器
     6  */
     7 @SuppressWarnings("rawtypes")
     8 @Intercepts(@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}))
     9 public class PageHelper extends BasePageHelper implements Interceptor {
    10     private final SqlUtil sqlUtil = new SqlUtil();
    11 
    12     @Override
    13     public Object intercept(Invocation invocation) throws Throwable {
    14         // 執行 sqlUtil 的攔截邏輯
    15         return sqlUtil.intercept(invocation);
    16     }
    17 
    18     @Override
    19     public Object plugin(Object target) {
    20         return Plugin.wrap(target, this);
    21     }
    22 
    23     @Override
    24     public void setProperties(Properties properties) {
    25         sqlUtil.setProperties(properties);
    26     }
    27 }

     

    可以看到最終調用了 SqlUtil 的intercept 方法,裏面的 doIntercept 方法是 PageHelper 原理中最重要的方法。跟進來看:

      1 package com.github.pagehelper.util;
      2 ...
      3 
      4 public class SqlUtil extends BaseSqlUtil implements Constant {
      5     ...
      6     
      7     /**
      8      * 真正的攔截器方法
      9      *
     10      * @param invocation
     11      * @return
     12      * @throws Throwable
     13      */
     14     public Object intercept(Invocation invocation) throws Throwable {
     15         try {
     16             return doIntercept(invocation);  // 執行攔截
     17         } finally {
     18             clearLocalPage();  // 清空 ThreadLocal<Page>
     19         }
     20     }
     21     
     22     /**
     23      * 真正的攔截器方法
     24      *
     25      * @param invocation
     26      * @return
     27      * @throws Throwable
     28      */
     29     public Object doIntercept(Invocation invocation) throws Throwable {
     30         // 省略其他代碼
     31         
     32         // 調用方法判斷是否需要進行分頁
     33         if (!runtimeDialect.skip(ms, parameterObject, rowBounds)) {
     34             ResultHandler resultHandler = (ResultHandler) args[3];
     35             // 當前的目標對象
     36             Executor executor = (Executor) invocation.getTarget();
     37             
     38             /**
     39              * getBoundSql 方法執行后,boundSql 中保存的是沒有 limit 的sql語句
     40              */
     41             BoundSql boundSql = ms.getBoundSql(parameterObject);
     42             
     43             // 反射獲取動態參數
     44             Map<String, Object> additionalParameters = (Map<String, Object>) additionalParametersField.get(boundSql);
     45             // 判斷是否需要進行 count 查詢,默認需要
     46             if (runtimeDialect.beforeCount(ms, parameterObject, rowBounds)) {
     47                 // 省略代碼
     48                 
     49                 // 執行 count 查詢
     50                 Object countResultList = executor.query(countMs, parameterObject, RowBounds.DEFAULT, resultHandler, countKey, countBoundSql);
     51                 Long count = (Long) ((List) countResultList).get(0);
     52                 
     53                 // 處理查詢總數,從 ThreadLocal<Page> 中取出 page 並設置 total
     54                 runtimeDialect.afterCount(count, parameterObject, rowBounds);
     55                 if (count == 0L) {
     56                     // 當查詢總數為 0 時,直接返回空的結果
     57                     return runtimeDialect.afterPage(new ArrayList(), parameterObject, rowBounds);
     58                 }
     59             }
     60             // 判斷是否需要進行分頁查詢
     61             if (runtimeDialect.beforePage(ms, parameterObject, rowBounds)) {
     62                 /**
     63                  * 生成分頁的緩存 key
     64                  * pageKey變量是分頁參數存放的地方
     65                  */
     66                 CacheKey pageKey = executor.createCacheKey(ms, parameterObject, rowBounds, boundSql);
     67                 /**
     68                  * 處理參數對象,會從 ThreadLocal<Page> 中將分頁參數取出來,放入 pageKey 中
     69                  * 主要邏輯就是這樣,代碼就不再單獨貼出來了,有興趣的同學可以跟進驗證
     70                  */
     71                 parameterObject = runtimeDialect.processParameterObject(ms, parameterObject, boundSql, pageKey);
     72                 /**
     73                  * 調用方言獲取分頁 sql
     74                  * 該方法執行后,pageSql中保存的sql語句,被加上了 limit 語句
     75                  */
     76                 String pageSql = runtimeDialect.getPageSql(ms, boundSql, parameterObject, rowBounds, pageKey);
     77                 BoundSql pageBoundSql = new BoundSql(ms.getConfiguration(), pageSql, boundSql.getParameterMappings(), parameterObject);
     78                 //設置動態參數
     79                 for (String key : additionalParameters.keySet()) {
     80                     pageBoundSql.setAdditionalParameter(key, additionalParameters.get(key));
     81                 }
     82                 /**
     83                  * 執行分頁查詢
     84                  */
     85                 resultList = executor.query(ms, parameterObject, RowBounds.DEFAULT, resultHandler, pageKey, pageBoundSql);
     86             } else {
     87                 resultList = new ArrayList();
     88             }
     89         } else {
     90             args[2] = RowBounds.DEFAULT;
     91             // 不需要分頁查詢,執行原方法,不走代理
     92             resultList = (List) invocation.proceed();
     93         }
     94         /**
     95          * 主要邏輯:
     96          * 從 ThreadLocal<Page> 中取出 page
     97          * 將 resultList 塞進 page,並返回
     98          */
     99         return runtimeDialect.afterPage(resultList, parameterObject, rowBounds);
    100     }
    101     ...
    102 }

     

    Count 查詢語句 countBoundSql 被執行了,分頁查詢語句 pageBoundSql 也被執行了。然後從 ThreadLocal<Page> 中將page 取出來,設置記錄總數,每頁條數等信息,同時也將查詢到的記錄塞進page,最後返回。再之後就是mybatis的常規後續操作了。

     

    知識拓展

    我們來看看 PageHelper 支持哪些數據庫的分頁操作:

    1. Oracle
    2. Mysql
    3. MariaDB
    4. SQLite
    5. Hsqldb
    6. PostgreSQL
    7. DB2
    8. SqlServer(2005,2008)
    9. Informix
    10. H2
    11. SqlServer2012
    12. Derby
    13. Phoenix

      原來 PageHelper 支持這麼多數據庫,那麼持久化工具mybatis為什麼不一口氣把分頁也做了呢? 其實mybatis也有自帶的分頁方法:RowBounds。
    RowBounds簡單地來說包括 offset 和 limit。實現原理是將所有符合條件的記錄獲取出來,然後丟棄 offset 之前的數據,只獲取 limit 條數據。這種做法效率低下,個人猜想mybatis只想把數據庫連接和SQL執行這方面做精做強,至於如分頁之類的細節,本身提供Intercept接口,讓第三方實現該接口來完成分頁。PageHelper 就是這樣的第三方分頁插件。甚至你可以實現該接口,製作你自己的業務邏輯,攔截到任何MyBatis允許你攔截的地方。  

    總結

    PageHelper 的分頁原理,最核心的部分是實現了 MyBatis 的 Interceptor 接口,從而將分頁參數攔截在執行sql之前,拼裝出分頁sql到數據庫中執行。 初始化的時候,因為 PageHelper 的 SqlUtil 中實例化了 intercept 方法,因此MyBatis 將它視作一個攔截器,記錄在 interceptorChain 中。 執行的時候,PageHelper首先將 page 需求記錄在 ThreadLocal<Page> 中,然後在攔截的時候,從 ThreadLocal<Page> 中取出 page,拼裝出分頁sql,然後執行。 同時將結果分頁信息(包括當前頁,每頁條數,總頁數,總記錄數等)設置回page,讓業務代碼可以獲取。  

    參考資料

    • PageHelper淺析:
    • MyBatis攔截器:
    • ThreadLocal理解:

     

    創作時間:2019-11-20 21:21

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

    台灣寄大陸海運貨物規則及重量限制?

    大陸寄台灣海運費用試算一覽表

    台中搬家,彰化搬家,南投搬家前需注意的眉眉角角,別等搬了再說!

  • 寶馬投資Scoop拼車應用公司進軍汽車共享市場

    據《路透社》報導,寶馬集團透露,其旗下風險投資部門iVentures已經為加利福尼亞拼車應用公司Scoop提供了一筆投資金額,具體數目未知。

    Scoop公司目前位於三藩市港灣區,通過智慧手機支援為乘客提供拼車服務,其開發出的應用軟體可將相鄰社區及工作區域的人們連接起來,共同拼車。寶馬此舉為汽車製造商為初創公司投資的最新一次行動,使消費者在未擁有私人車輛或經常駕駛的情況下也能夠出行。

    此外,寶馬還分別為車隊管理軟體公司RideCell、泊車點定位服務Zirx、簡化公共交通系統服務Moovit、從智慧手機上集成資料教授司機如何安全駕駛的Zendrive公司進行了投資。

    本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包”嚨底家”

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    小三通海運與一般國際貿易有何不同?

    小三通快遞通關作業有哪些?