標籤: 網頁設計公司

  • 這 10 行比較字符串相等的代碼給我整懵逼了,不信你也來看看

    這 10 行比較字符串相等的代碼給我整懵逼了,不信你也來看看

    抱歉用這種標題吸引你點進來了,不過你不妨看完,看看能否讓你有所收穫。​(有收穫,請評論區留個言,沒收穫,下周末我直播吃**,哈哈,這你也信)

    補充說明:微信公眾號改版,對各個號主影響還挺大的。目前從後台數據來看,對我影響不大,因為我這反正都是小號,閱讀量本身就少的可憐,真相了,狗頭(剛從交流群學會的表情)。

    先直接上代碼:

    boolean safeEqual(String a, String b) {
       if (a.length() != b.length()) {
           return false;
       }
       int equal = 0;
       for (int i = 0; i < a.length(); i++) {
           equal |= a.charAt(i) ^ b.charAt(i);
       }
       return equal == 0;
    }

    上面的代碼是我根據原版(Scala)翻譯成 Java的,Scala 版本(最開始吸引程序猿石頭注意力的代碼)如下:

    def safeEqual(a: String, b: String) = {
      if (a.length != b.length) {
        false
      } else {
        var equal = 0
        for (i <- Array.range(0, a.length)) {
          equal |= a(i) ^ b(i)
        }
        equal == 0
      }
    }

    剛開始看到這段源碼感覺挺奇怪的,這個函數的功能是比較兩個字符串是否相等,首先“長度不等結果肯定不等,立即返回”這個很好理解。

    再看看後面的,稍微動下腦筋,轉彎下也能明白這其中的門道:通過異或操作1^1=0, 1^0=1, 0^0=0,來比較每一位,如果每一位都相等的話,兩個字符串肯定相等,最後存儲累計異或值的變量equal必定為 0,否則為 1。

    再細想一下呢?

    for (i <- Array.range(0, a.length)) {
      if (a(i) ^ b(i) != 0// or a(i) != b[i]
        return false
    }

    我們常常講性能優化,從效率角度上講,難道不是應該只要中途發現某一位的結果不同了(即為1)就可以立即返回兩個字符串不相等了嗎?(如上所示)

    這其中肯定有……

    再再細想一下呢?

    結合方法名稱 safeEquals 可能知道些眉目,與安全有關。

    本文開篇的代碼來自playframewok 里用來驗證cookie(session)中的數據是否合法(包含簽名的驗證),也是石頭寫這篇文章的由來。

    以前知道通過延遲計算等手段來提高效率的手段,但這種已經算出結果卻延遲返回的,還是頭一回!

    我們來看看,JDK 中也有類似的方法,如下代碼摘自 java.security.MessageDigest

    public static boolean isEqual(byte[] digesta, byte[] digestb) {
       if (digesta == digestb) return true;
       if (digesta == null || digestb == null) {
           return false;
       }
       if (digesta.length != digestb.length) {
           return false;
       }

       int result = 0;
       // time-constant comparison
       for (int i = 0; i < digesta.length; i++) {
           result |= digesta[i] ^ digestb[i];
       }
       return result == 0;
    }

    看註釋知道了,目的是為了用常量時間複雜度進行比較。

    但這個計算過程耗費的時間不是常量有啥風險? (腦海里響起了背景音樂:“小朋友,你是否有很多問號?”)

    真相大白

    再深入探索和了解了一下,原來這麼做是為了防止計時攻擊(Timing Attack)。(也有人翻譯成時序攻擊​)​

    計時攻擊(Timing Attack)

    計時攻擊是邊信道攻擊(或稱”側信道攻擊”, Side Channel Attack, 簡稱SCA) 的一種,邊信道攻擊是一種針對軟件或硬件設計缺陷,走“歪門邪道”的一種攻擊方式。

    這種攻擊方式是通過功耗、時序、電磁泄漏等方式達到破解目的。在很多物理隔絕的環境中,往往也能出奇制勝,這類新型攻擊的有效性遠高於傳統的密碼分析的數學方法(某百科上說的)。

    這種手段可以讓調用 safeEquals("abcdefghijklmn", "xbcdefghijklmn") (只有首位不相同)和調用 safeEquals("abcdefghijklmn", "abcdefghijklmn") (兩個完全相同的字符串)的所耗費的時間一樣。防止通過大量的改變輸入並通過統計運行時間來暴力破解出要比較的字符串。

    舉個,如果用之前說的“高效”的方式來實現的話。假設某個用戶設置了密碼為 password,通過從a到z(實際範圍可能更廣)不斷枚舉第一位,最終統計發現 p0000000 的運行時間比其他從任意a~z的都長(因為要到第二位才能發現不同,其他非 p 開頭的字符串第一位不同就直接返回了),這樣就能猜測出用戶密碼的第一位很可能是p了,然後再不斷一位一位迭代下去最終破解出用戶的密碼。

    當然,以上是從理論角度分析,確實容易理解。但實際上好像通過統計運行時間總感覺不太靠譜,這個運行時間對環境太敏感了,比如網絡,內存,CPU負載等等都會影響。

    但安全問題感覺更像是 “寧可信其有,不可信其無”。為了防止(特別是與簽名/密碼驗證等相關的操作)被 timing attack,目前各大語言都提供了相應的安全比較函數。各種軟件系統(例如 OpenSSL)、框架(例如 Play)的實現也都採用了這種方式。

    例如 “世界上最好的編程語言”(粉絲較少,評論區應該打不起架來)—— php中的:

    // Compares two strings using the same time whether they're equal or not.
    // This function should be used to mitigate timing attacks; 
    // for instance, when testing crypt() password hashes.
    bool hash_equals ( string $known_string , string $user_string )

    //This function is safe against timing attacks.
    boolean password_verify ( string $password , string $hash )

    其實各種語言版本的實現方式都與上面的版本差不多,將兩個字符串每一位取出來異或(^)並用或(|)保存,最後通過判斷結果是否為 0 來確定兩個字符串是否相等。

    如果剛開始沒有用 safeEquals 去實現,後續的版本還會通過打補丁的方式去修復這樣的安全隱患。

    例如 JDK 1.6.0_17 中的Release Notes[1]中就提到了MessageDigest.isEqual 中的bug的修復,如下圖所示:

    MessageDigest timing attack vulnerabilities

    大家可以看看這次變更的的詳細信息openjdk中的 bug fix diff[2]為:

    MessageDigest.isEqual計時攻擊

    Timing Attack 真的可行嗎?

    我覺得各大語言的 API 都用這種實現,肯定還是有道理的,理論上應該可以被利用的。 這不,學術界的這篇論文就宣稱用這種計時攻擊的方法破解了 OpenSSL 0.9.7 的RSA加密算法了。關於 RSA 算法的介紹可以看看之前本人寫的這篇文章。

    這篇Remote Timing Attacks are Practical[3] 論文中指出(我大致翻譯下摘要,感興趣的同學可以通過文末鏈接去看原文):

    計時攻擊往往用於攻擊一些性能較弱的計算設備,例如一些智能卡。我們通過實驗發現,也能用於攻擊普通的軟件系統。本文通過實驗證明,通過這種計時攻擊方式能夠攻破一個基於 OpenSSL 的 web 服務器的私鑰。結果證明計時攻擊用於進行網絡攻擊在實踐中可行的,因此各大安全系統需要抵禦這種風險。

    最後,本人畢竟不是專研完全方向,以上描述是基於本人的理解,如果有不對的地方,還請大家留言指出來。感謝。

    補充說明2:感謝正在閱讀文章的你,讓我還有動力繼續堅持更新原創。

    本人發文不多,但希望寫的文章能達到的目的是:佔用你的閱讀時間,就盡量能夠讓你有所收穫。

    如果你覺得我的文章有所幫助,還請你幫忙轉發分享,另外請別忘了點擊公眾號右上角加個星標,好讓你別錯過後續的精彩文章(微信改版了,或許我發的文章都不能推送到你那了)。

    ​原創真心不易,希望你能幫我個小忙唄,如果本文內容你覺得有所啟發,有所收穫,請幫忙點個“在看”唄,或者轉發分享讓更多的小夥伴看到。 ​ 參考資料:

    • Timing Attacks on RSA: Revealing Your Secrets through the Fourth Dimension
    • Remote Timing Attacks are Practical

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※為什麼 USB CONNECTOR 是電子產業重要的元件?

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※台北網頁設計公司全省服務真心推薦

    ※想知道最厲害的網頁設計公司"嚨底家"!

    ※推薦評價好的iphone維修中心

  • Express4.x之中間件與路由詳解及源碼分析,Express4.x之API:express,Express4.x之API:express

    Express4.x之中間件與路由詳解及源碼分析,Express4.x之API:express,Express4.x之API:express

    • Application.use()
    • Application.router()
    • express核心源碼模擬

     一、express.use()

    1.1app.use([path,] callback [, callback …])

    通過語法結構可以看到Application.use()參數分別有以下幾種情形:

    app.use(function(){...}); //給全局添加一个中間件
    app.use(path,function(){...}); //給指定路由path添加一个中間件
    app.use(function(){...}, function(){...}, ...); //給全局添加n个中間件
    app.use(path,function(){...},function(){...}, ...); //給指定路由path添加n个中間件

    關於path最簡單也是最常用的就是字符串類型(例:‘/abcd’);除了字符串Express還提供了模板和正則格式(例:’/abc?d‘, ‘/ab+cd‘, ‘/ab\*cd‘, ‘/a(bc)?d‘, ‘/\/abc|\/xyz/‘);除了單個的字符串和模板還可以將多個path作為一個數組的元素,然後將這個數組作為use的path,這樣就可以同時給多個路由添加中間件,詳細內容可以參考官方文檔:https://www.expressjs.com.cn/4x/api.html#path-examples。

    關於callbakc多個或單个中間件程序這已經再語法結構中直觀的體現出來了,這裏重點來看看回調函數的參數:

    app.use(function(req,res,next){...}); //必須提供的參數 
    app.use(function(err,req,res,next){...}); //錯誤中間件需要在最前面添加一個錯誤參數

    關於中間件的簡單應用:

    let express = require('./express');
    let app = express();
    app.use('/',function(req,res,next){
        console.log("我是一個全局中間件");
        next(); //每个中間件的最末尾必須調用next
    });
    
    app.use('/',function(err,req,res,next){
        console.log("我是一個全局錯誤中間,當發生錯誤是調用")    
        console.error(err.stack);
        res.status(500).send('服務出錯誤了!');
        //由於這個錯誤處理直接響應了客戶端,可以不再調用next,當然後面還需要處理一些業務的話也是可以調用next的
    });

    1.2簡單的模擬Express源碼實現Appliction.use()以及各個請求方法的響應註冊方法(這裏個源碼模擬路由概念還比較模糊,所以使用請求方法的響應註冊API,而沒有使用路由描述):

     1 //文件結構
     2 express
     3     index.js
     4 //源碼模擬實現
     5 let http = require("http");
     6 let url = require('url');
     7 function createApplication(){
     8     //app是一個監聽函數
     9     let app = (req,res) =>{
    10         //取出每一個層
    11         //1.獲取請求的方法
    12         let m = req.method.toLowerCase();
    13         let {pathname} = url.parse(req.url,true);
    14 
    15         //通過next方法進行迭代
    16         let index = 0;
    17         function next(err){
    18             //如果routes迭代完成還沒有找到,說明路徑不存在
    19             if(index === app.routes.length) return res.end(`Cannot ${m} ${pathname}`);
    20             let {method, path, handler} = app.routes[index++];//每次調用next就應該取下一個layer
    21             if(err){
    22                 if(handler.length === 4){
    23                     handler(err,req,res,next);
    24                 }else{
    25                     next(err);
    26                 }
    27             }else{
    28                 if(method === 'middle'){ //處理中間件
    29                     if(path === '/' || path === pathname || pathname.startsWith(path+'/')){
    30                         handler(req,res,next);
    31                     }else{
    32                         next();//如果這个中間件沒有匹配到,繼續通過next迭代路由容器routes
    33                     }
    34                 }else{ //處理路由
    35                     if( (method === m || method ==='all') && (path === pathname || path === '*')){ //匹配請求方法和請求路徑(接口)
    36                         handler(req,res);//匹配成功后執行的Callback
    37                     }else{
    38                         next();
    39                     }
    40                 }
    41             }
    42         }
    43         next();
    44     }
    45     app.routes = [];//路由容器
    46     app.use = function(path,handler){
    47         if(typeof handler !== 'function'){
    48             handler = path;
    49             path = '/';
    50         }
    51         let layer = {
    52             method:'middle', //method是middle就表示它是一个中間件
    53             path,
    54             handler
    55         }
    56         app.routes.push(layer);
    57     }
    58     app.all = function(path,handler){
    59         let layer = {
    60             method:'all',
    61             path,
    62             handler
    63         }
    64         app.routes.push(layer);
    65     }
    66     console.log(http.METHODS);
    67     http.METHODS.forEach(method =>{
    68         method = method.toLocaleLowerCase();
    69         app[method] = function (path,handler){//批量生成各個請求方法的路由註冊方法
    70             let layer = {
    71                 method,
    72                 path,
    73                 handler
    74             }
    75             app.routes.push(layer);
    76         }
    77     });
    78     //內置中間件,給req擴展path、qury屬性
    79     app.use(function(req,res,next){
    80         let {pathname,query} = url.parse(req.url,true);
    81         let hostname = req.headers['host'].split(':')[0];
    82         req.path = pathname;
    83         req.query = query;
    84         req.hostname = hostname;
    85         next();
    86     });
    87     //通過app.listen調用http.createServer()掛在app(),啟動express服務
    88     app.listen = function(){
    89         let server = http.createServer(app);
    90         server.listen(...arguments);
    91     }
    92     return app;
    93 }
    94 module.exports = createApplication;

    View Code

    測試模擬實現的Express:

     1 let express = require('./express');
     2 
     3 let app = express();
     4 
     5 app.use('/',function(req,res,next){
     6     console.log("我是一個全局中間件");
     7     next();
     8 });
     9 app.use('/user',function(req,res,next){
    10     console.log("我是user接口的中間件");
    11     next();
    12 });
    13 app.get('/name',function(req,res){
    14     console.log(req.path,req.query,req.hostname);
    15     res.end('zfpx');
    16 });
    17 app.post('/name',function(req,res){
    18     res.end('post name');
    19 });
    20 
    21 
    22 app.all("*",function(req,res){
    23     res.end('all');
    24 });
    25 
    26 app.use(function(err,req,res,next){
    27     console.log(err);
    28     next();
    29 });
    30 
    31 app.listen(12306);

    View Code

    在windows系統下測試請求:

     

     

    關於源碼的構建詳細內容可以參考這個視頻教程:app.use()模擬構建視頻教程,前面就已經說明過這個模式實現僅僅是從表面的業務邏輯,雖然有一點底層的雛形,但與源碼還是相差甚遠,這一部分也僅僅只是想幫助理解Express採用最簡單的方式表現出來。

    1.3如果你看過上面的源碼或自己也實現過,就會發現Express關於中間件的添加方式除了app.use()還有app.all()及app.METHOD()。在模擬源碼中我並未就use和all的差異做處理,都是採用了請求路徑絕對等於path,這種方式是all的特性,use的path實際表示為請求路徑的開頭:

    app.use(path,callback):path表示請求路徑的開頭部分。

    app.all(path,callback):paht表示完全等於請求路徑。

    app.METHOD(path,callback):並不是真的有METHOD這個方法,而是指HTTP請求方法,實際上表示的是app.get()、app.post()、app.put()等方法,而有時候我們會將這些方法說成用來註冊路由,這是因為路由註冊的確使用這些方法,但同時這些方法也是可以用作中間的添加,這在前一篇博客中的功能解析中就有說明(Express4.x之API:express),詳細見過後面的路由解析就會更加明了。

     二、express.router()

    2.1在實例化一個Application時會實例化一個express.router()實例並被添加到app._router屬性上,實際上這個app使用的use、all、METHOD時都是在底層調用了該Router實例上對應的方法,比如看下面這些示例:

     1 let express = require("express");
     2 let app = express();
     3 
     4 app._router.use(function(req,res,next){
     5     console.log("--app.router--");
     6     next();
     7 });
     8 
     9 app._router.post("/csJSON",function(req,res,next){
    10     res.writeHead(200);
    11     res.write(JSON.stringify(req.body));
    12     res.end();
    13 });
    14 
    15 app.listen(12306);

    上面示例中的app._router.use、app._router.post分別同等與app.use、app.post,這裏到這裏也就說明了上一篇博客中的路由與Application的關係Express4.x之API:express。

    2.2Express中的Router除了為Express.Application提供路由功能以外,Express也將它作為一個獨立的路由工具分離了出來,也就是說Router自身可以獨立作為一個應用,如果我們在實際應用中有相關業務有類似Express.Application的路由需求,可以直接實例化一個Router使用,應用的方式如下:

    let express = require('/express');
    let router = express.Router();
    //這部分可以詳細參考官方文檔有詳細的介紹

    2.3由於這篇博客主要是分析Express的中間件及路由的底層邏輯,所以就不在這裏詳細介紹某個模塊的應用,如果有時間我再寫一篇關於Router模塊的應用,這裏我直接上一份模擬Express路由的代碼以供參考:

    文件結構:

    express //根路徑
        index.js //express主入口文件
        application.js //express應用構造模塊
        router //路由路徑
            index.js //路由主入口文件
            layer.js //構造層的模塊
            route.js //子路由模塊

    Express路由系統的邏輯結構圖:

    模擬代碼(express核心源碼模擬):

    1 //express主入口文件
    2 let Application = require('./application.js');
    3 
    4 function createApplication(){
    5     return new Application();
    6 }
    7 
    8 module.exports = createApplication;

    index.js //express主入口文件

     1 //用來創建應用app
     2 let http = require('http');
     3 let url = require('url');
     4 
     5 //導入路由系統模塊
     6 let Router = require('./router');
     7 
     8 const methods = http.METHODS;
     9 
    10 //Application ---- express的應用系統
    11 function Application(){
    12     //創建一個內置的路由系統
    13     this._router = new Router();
    14 }
    15 
    16 //app.get ---- 實現路由註冊業務
    17 // Application.prototype.get = function(path,...handlers){
    18 //     this._router.get(path,'use',handlers);
    19 // }
    20 
    21 methods.forEach(method => {
    22     method = method.toLocaleLowerCase();
    23     Application.prototype[method] = function(path,...handlers){
    24         this._router[method](path,handlers);
    25     }
    26 });
    27 
    28 //app.use ---- 實現中間件註冊業務
    29 //這裏模擬處理三種參數模式:
    30 // -- 1個回調函數:callback
    31 // -- 多個回調函數:[callback,] callback [,callback...]
    32 // -- 指定路由的中間件:[path,] callback [,callback...]
    33 // -- 注意源碼中可以處理這三種參數形式還可以處理上面數據的數組形式,以及其他Application(直接將其他app上的中間件添加到當前應用上)
    34 Application.prototype.use = function(fn){
    35     let path = '/';
    36     let fns = [];
    37     let arg = [].slice.call(arguments);
    38     if(typeof fn !== 'function' && arg.length >= 2){
    39         if(typeof arg[0] !== 'string'){
    40             fns = arg;
    41         }else{
    42             path = arg[0];
    43             fns = arg.slice(1);
    44         }
    45     }else{
    46         fns = arg;
    47     }
    48     this._router.use(path,'use',fns);
    49 }
    50 
    51 Application.prototype.all = function(fn){
    52     let path = '/';
    53     let fns = [];
    54     let arg = [].slice.call(arguments);
    55     if(typeof fn !== 'function' && arg.length >= 2){
    56         if(typeof arg[0] !== 'string'){
    57             fns = arg;
    58         }else{
    59             path = arg[0];
    60             fns = arg.slice(1);
    61         }
    62       }else{
    63         fns = arg;
    64     }
    65     this._router.use(path,'all',fns);
    66 }
    67 
    68 //將http的listen方法封裝到Application的原型上
    69 Application.prototype.listen = function(){
    70     let server = http.createServer((req,res)=>{
    71         //done 用於當路由無任何可匹配項時調用的處理函數
    72         function done(){
    73             res.end(`Cannot ${req.url} ${req.method}`);
    74         }
    75         this._router.handle(req,res,done); //調用路由系統的handle方法處理請求
    76     });
    77     server.listen(...arguments);
    78 };
    79 
    80 module.exports = Application;

    application.js //express應用構造模塊

     1 //express路由系統
     2 const Layer = require('./layer.js');
     3 const Route = require('./route.js');
     4 
     5 const http = require('http');
     6 const methods = http.METHODS;
     7 
     8 const url = require('url');
     9 
    10 
    11 //路由對象構造函數
    12 function Router(){
    13     this.stack = [];
    14 }
    15 
    16 //router.route ---- 用於創建子路由對象route與主路由上層(layer)的關係
    17 //並將主路由上的層緩存到路由對象的stack容器中,該層建立路徑與子路由處理請求的關係
    18 Router.prototype.route = function(path){
    19     let route = new Route();
    20     let layer = new Layer(path,route.dispatch.bind(route));
    21     this.stack.push(layer);
    22     return route;
    23 }
    24 
    25 //router.get ---- 實現路由註冊
    26 //實際上這個方法調用router.route方法分別創建一個主路由系統層、一個子路由系統,並建立兩者之間的關係,詳細見Router.prototype.route
    27 //然後獲取子路由系統對象,並將回調函數和請求方法註冊在這個子路由系統上
    28 
    29 
    30 // Router.prototype.get = function(path,handlers){
    31 //     let route = this.route(path);
    32 //     route.get(handlers);
    33 // }
    34 
    35 methods.forEach(method =>{ 
    36     method = method.toLocaleLowerCase();
    37     //注意下面這個方法會出現內存泄漏問題,有待改進
    38     Router.prototype[method] = function(path, handlers){
    39         let route = this.route(path);
    40         route[method](handlers);
    41     }
    42 });
    43 
    44 //router.use ---- 實現中間件註冊(按照路由開頭的路徑匹配,即相對路由匹配)
    45 Router.prototype.use = function(path,routerType,fns){
    46     let router = this;
    47     fns.forEach(function(fn){
    48         let layer = new Layer(path,fn);
    49         layer.middle = true; //標記這個層為相對路由中間件
    50         layer.routerType = routerType;
    51         router.stack.push(layer);
    52     });
    53 }
    54 
    55 //調用路由處理請求
    56 Router.prototype.handle = function(req,res,out){
    57     let {pathname} = url.parse(req.url);
    58     let index = 0;
    59     let next = () => {
    60         if(index >= this.stack.length) return out();
    61         let layer = this.stack[index++];
    62         if(layer.middle && (layer.path === '/' || pathname === layer.path || pathname.startsWith(layer.path + '/'))){
    63             //處理中間件
    64             if(layer.routerType === 'use'){
    65                 layer.handle_request(req,res,next);
    66             }else if(layer.routerType === 'all' && layer.path === pathname){
    67                 layer.handle_request(req,res,next);
    68             }else{
    69                 next();
    70             }
    71         }else if(layer.match(pathname)){
    72             //處理響應--更準確的說是處理具體請求方法上的中間件或響應
    73             layer.handle_request(req,res,next);
    74         }else{
    75             next();
    76         }
    77     }
    78     next();
    79 }
    80 
    81 module.exports = Router;

    index.js //路由主入口文件

     1 //Layer的構造函數
     2 function Layer(path,handler){
     3     this.path = path; //當前層的路徑
     4     this.handler = handler;  //當前層的回調函數
     5 }
     6 
     7 //判斷請求方法與當前層的方法是否一致
     8 Layer.prototype.match = function(pathname){
     9     return this.path === pathname;
    10 }
    11 
    12 //調用當前層的回調函數handler
    13 Layer.prototype.handle_request = function(req,res,next){
    14     this.handler(req,res,next);
    15 }
    16 
    17 module.exports = Layer;

    layer.js //構造層的模塊

     1 //Layer的構造函數
     2 function Layer(path,handler){
     3     this.path = path; //當前層的路徑
     4     this.handler = handler;  //當前層的回調函數
     5 }
     6 
     7 //判斷請求方法與當前層的方法是否一致
     8 Layer.prototype.match = function(pathname){
     9     return this.path === pathname;
    10 }
    11 
    12 //調用當前層的回調函數handler
    13 Layer.prototype.handle_request = function(req,res,next){
    14     this.handler(req,res,next);
    15 }
    16 
    17 module.exports = Layer;

    route.js //子路由模塊

    測試代碼:

     1 let express = require('./express');
     2 let app = express();
     3 
     4 
     5 app.use('/name',function(req,res,next){
     6     console.log('use1');
     7     next();
     8 });
     9 app.use(function(req,res,next){
    10     console.log('use2-1');
    11     next();
    12 },function(req,res,next){
    13     console.log('use2-2');
    14     next();
    15 });
    16 
    17 app.all('/name',function(req,res,next){
    18     console.log('all-1');
    19     next();
    20 });
    21 app.all('/name/app',function(req,res,next){
    22     console.log('all-2');
    23     next();
    24 });
    25 
    26 app.get('/name/app',function(req,res){
    27     res.end(req.url);
    28 });
    29 // console.log(app._router.stack);
    30 app.listen(12306);

    View Code

    測試結果:

     

     

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

  • Kubernetes-subpath的使用

    一、什麼是subpath

    為了支持單一個pod多次使用同一個volume而設計,subpath翻譯過來是子路徑的意思,如果是數據卷掛載在容器,指的是存儲卷目錄的子路徑,如果是配置項configMap/Secret,則指的是掛載在容器的子路徑。

     

    二、subpath的使用場景

    1、 1個pod中可以拉起多個容器,有時候希望將不同容器的路徑掛載在存儲卷volume的子路徑,這個時候需要用到subpath

    2、volume支持將configMap/Secret掛載在容器的路徑,但是會覆蓋掉容器路徑下原有的文件,如何支持選定configMap/Secret的每個key-value掛載在容器中,且不會覆蓋掉原目錄下的文件,這個時候也可以用到subpath

     

    三、subpath的使用

    1、存儲卷

        採用hostpath的方式創建PV,宿主機的映射目錄為/data/pod/volume5

    [root@k8s-master zhanglei]# cat pv-subpath.yaml 
    kind: PersistentVolume
    apiVersion: v1
    metadata:
      name: pv-subpath-05
      labels:
        release: stable
    spec:
      capacity:
        storage: 0.1Gi
      accessModes:
        - ReadWriteOnce
      persistentVolumeReclaimPolicy: Recycle
      hostPath:
        path: /data/pod/volume5                 # 宿主機的目錄

    [root@k8s-master zhanglei]# kubectl create -f pv-subpath.yaml  

    PV創建成功后,再創建PVC

    [root@k8s-master zhanglei]# cat pvc-subpath.yaml 
    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: pvc-subpath
      namespace: default
    spec:
     accessModes: ["ReadWriteOnce"]
     resources:
       requests: 
         storage: 0.05Gi
    [root@k8s-master zhanglei]# kubectl create -f pvc-subpath.yaml

    在pod中聲明並使用subpath

    [root@k8s-master zhanglei]# cat pod-subpath.yaml 
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-subpath-zltest
    spec:
        containers:
        - name: ubuntu-subpath-container
          image: ubuntu
          volumeMounts:
          - mountPath: /var/lib/ubuntu            # 容器1的掛載目錄
            name: subpath-vol
            subPath: ubuntutest                   # 宿主機volume5的子目錄1
        - name: nginx-subpath-container
          image: nginx
          volumeMounts:
          - mountPath: /var/www/nginx             # 容器2的掛載目錄
            name: subpath-vol
            subPath: nginxtest                   # 宿主機volume5的子目錄2 
        volumes:
        - name: subpath-vol
          persistentVolumeClaim:
            claimName: pvc-subpath               # PVC的名字

      [root@k8s-master zhanglei]# kubectl create -f pod-subpath.yaml

    [root@k8s-master zhanglei]# kubectl describe pod  pod-subpath-zltest 
    Name:         pod-subpath-zltest
    Namespace:    default
    Priority:     0
    Node:         k8s-master/192.168.126.129
    Start Time:   Fri, 29 May 2020 16:45:49 +0800
    Labels:       <none>
    Annotations:  cni.projectcalico.org/podIP: 10.122.235.235/32
                  cni.projectcalico.org/podIPs: 10.122.235.235/32
    Status:       Running
    IP:           10.122.235.235
    IPs:
      IP:  10.122.235.235
    Containers:
      ubuntu-subpath-container:
        Container ID:   docker://6e5cb30ee7e03b77d2ca22e4cd818ff326fa40836427fe17b1584646b4388dce
        Image:          ubuntu
        Image ID:       docker-pullable://ubuntu@sha256:747d2dbbaaee995098c9792d99bd333c6783ce56150d1b11e333bbceed5c54d7
        Port:           <none>
        Host Port:      <none>
        State:          Waiting
          Reason:       CrashLoopBackOff
        Last State:     Terminated
          Reason:       Completed
          Exit Code:    0
          Started:      Sun, 14 Jun 2020 22:38:11 +0800
          Finished:     Sun, 14 Jun 2020 22:38:11 +0800
        Ready:          False
        Restart Count:  558
        Environment:    <none>
        Mounts:
          /var/lib/ubuntu from subpath-vol (rw,path="ubuntutest")
          /var/run/secrets/kubernetes.io/serviceaccount from default-token-74s86 (ro)
      nginx-subpath-container:
        Container ID:   docker://95101741eb1b6aa4c1e53d8fc4ab8006e74fd2eb923eca211ca20a01edcd7630
        Image:          nginx
        Image ID:       docker-pullable://nginx@sha256:30dfa439718a17baafefadf16c5e7c9d0a1cde97b4fd84f63b69e13513be7097
        Port:           <none>
        Host Port:      <none>
        State:          Running
          Started:      Fri, 29 May 2020 16:47:14 +0800
        Ready:          True
        Restart Count:  0
        Environment:    <none>
        Mounts:
          /var/run/secrets/kubernetes.io/serviceaccount from default-token-74s86 (ro)
          /var/www/nginx from subpath-vol (rw,path="nginxtest")
    Conditions:
      Type              Status
      Initialized       True 
      Ready             False 
      ContainersReady   False 
      PodScheduled      True 
    Volumes:
      subpath-vol:
        Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
        ClaimName:  pvc-subpath
        ReadOnly:   false
      default-token-74s86:
        Type:        Secret (a volume populated by a Secret)
        SecretName:  default-token-74s86
        Optional:    false
    QoS Class:       BestEffort
    Node-Selectors:  <none>
    Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                     node.kubernetes.io/unreachable:NoExecute for 300s
    Events:
      Type     Reason   Age                    From                 Message
      ----     ------   ----                   ----                 -------
      Normal   Pulled   21m (x555 over 16d)    kubelet, k8s-master  Successfully pulled image "ubuntu"
      Normal   Created  21m (x555 over 16d)    kubelet, k8s-master  Created container ubuntu-subpath-container
      Normal   Started  21m (x555 over 16d)    kubelet, k8s-master  Started container ubuntu-subpath-container
      Normal   Pulling  6m10s (x562 over 16d)  kubelet, k8s-master  Pulling image "ubuntu"
      Warning  BackOff  71s (x11744 over 16d)  kubelet, k8s-master  Back-off restarting failed container

    現在來驗證下在宿主機存儲卷的目錄下是否有2個子目錄,1個是ubuntutest用來掛載容器1的,另外1個是nginxtest用來掛載容器2的

    [root@k8s-master /]# cd data/pod/volume5
    [root@k8s-master volume5]# ls
    nginxtest ubuntutest
    [root@k8s-master volume5]# cd nginxtest/     # 可以看到是1個目錄,非文件
    [root@k8s-master nginxtest]#

    進入到容器中,掛載一個文件,驗證是否可以同步到存儲卷

    [root@k8s-master nginxtest]# kubectl exec -it pod-subpath-zltest -c nginx-subpath-container -- bash
    root@pod-subpath-zltest:/# cd /var/www/nginx
    root@pod-subpath-zltest:/var/www/nginx# ls
    nginx-test-subpath.txt
    [root@k8s-master volume5]# cd nginxtest/
    [root@k8s-master nginxtest]# ls
    nginx-test-subpath.txt

    可以看到容器1的目錄/var/www/nginx 和存儲卷的子目錄 nginxtest完成了映射,容器2類似,這裏不再贅述。

    2、配置項-configMap

    1)創建configMap

    [root@k8s-master consecret]# cat conf-subpath.yaml 
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: conf-subpath-zltest
      namespace: default
    data:
      example.property.1: hello      # key-value鍵值對
      example.property.2: world
      example.property.file: |-
        property.1=value-1
        property.2=value-2
        property.3=value-3

    2)在Pod中使用configMap

    [root@k8s-master consecret]# cat pod-conf-subpath.yaml 
    apiVersion: v1
    kind: Pod
    metadata:
      labels:
        purpose: test-configmap-volume
      name: pod-conf-testvolume
    spec:
      containers:
        - name: test-configmap-volume
          image: nginx
          volumeMounts:
            - name: config-volume
              mountPath: /etc/nginx/example.property.1       # 容器掛載目錄
              subPath: example.property.1                    # 將key名稱作為文件名,hello作為文件內容
      volumes:
        - name: config-volume
          configMap:
             name: conf-subpath-zltest      # 指定使用哪個CM
            
    [root@k8s-master consecret]# kubectl create -f pod-conf-subpath.yaml 
    [root@k8s-master consecret]# kubectl describe pod  pod-conf-testvolume 
    Name:         pod-conf-testvolume
    Namespace:    default
    Priority:     0
    Node:         k8s-master/192.168.126.129
    Start Time:   Wed, 03 Jun 2020 11:46:36 +0800
    Labels:       purpose=test-configmap-volume
    Annotations:  cni.projectcalico.org/podIP: 10.122.235.249/32
                  cni.projectcalico.org/podIPs: 10.122.235.249/32
    Status:       Running
    IP:           10.122.235.249
    IPs:
      IP:  10.122.235.249
    Containers:
      test-configmap-volume:
        Container ID:   docker://e2cf37cb24af32023eb5d22389545c3468104a4344c47363b5330addc40cb914
        Image:          nginx
        Image ID:       docker-pullable://nginx@sha256:883874c218a6c71640579ae54e6952398757ec65702f4c8ba7675655156fcca6
        Port:           <none>
        Host Port:      <none>
        State:          Running
          Started:      Wed, 03 Jun 2020 11:46:53 +0800
        Ready:          True
        Restart Count:  0
        Environment:    <none>
        Mounts:
          /etc/nginx/example.property.1 from config-volume (rw,path="example.property.1")  
          /var/run/secrets/kubernetes.io/serviceaccount from default-token-74s86 (ro)
    Conditions:
      Type              Status
      Initialized       True 
      Ready             True 
      ContainersReady   True 
      PodScheduled      True 
    Volumes:
      config-volume:
        Type:      ConfigMap (a volume populated by a ConfigMap)
        Name:      conf-subpath-zltest
        Optional:  false
      default-token-74s86:
        Type:        Secret (a volume populated by a Secret)
        SecretName:  default-token-74s86
        Optional:    false
    QoS Class:       BestEffort
    Node-Selectors:  <none>
    Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                     node.kubernetes.io/unreachable:NoExecute for 300s
    Events:          <none>

    在容器掛載路徑驗證下是否將configMap中example.property.1掛載在容器中,且是否會覆蓋掉原有的目錄

    root@pod-conf-testvolume:/# cd  /etc/nginx 
    root@pod-conf-testvolume:/etc/nginx# ls
    conf.d            fastcgi_params  koi-win    modules     scgi_params   win-utf
    example.property.1  koi-utf        mime.types    nginx.conf  uwsgi_params

    從上可以看到example.property.1已經掛載到容器中,且未對目錄原有的文件進行覆蓋

    root@pod-conf-testvolume:/etc/nginx# cd example.property.1 
    bash: cd: example.property.1: Not a directory
    root@pod-conf-testvolume:/etc/nginx# cat example.property.1 helloroot@pod-conf-testvolume:/etc/nginx# 

    從上可以驗證configMap的subpath用法支持將configMap中的每對key-value以key名稱作為文件名,value作為文件內容掛載到容器的目錄中。

    四、總結

    本文介紹了subpath分別在持久化存儲卷和配置項configMap中的使用,豐富了volume在pod中的使用場景。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※Google地圖已可更新顯示潭子電動車充電站設置地點!!

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※別再煩惱如何寫文案,掌握八大原則!

    網頁設計最專業,超強功能平台可客製化

  • 線上服務的FGC問題排查,看這篇就夠了!

    線上服務的FGC問題排查,看這篇就夠了!

    線上服務的GC問題,是Java程序非常典型的一類問題,非常考驗工程師排查問題的能力。同時,幾乎是面試必考題,但是能真正答好此題的人並不多,要麼原理沒吃透,要麼缺乏實戰經驗。

    過去半年時間里,我們的廣告系統出現了多次和GC相關的線上問題,有Full GC過於頻繁的,有Young GC耗時過長的,這些問題帶來的影響是:GC過程中的程序卡頓,進一步導致服務超時從而影響到廣告收入。

    這篇文章,我將以一個FGC頻繁的線上案例作為引子,詳細介紹下GC的排查過程,另外會結合GC的運行原理給出一份實踐指南,希望對你有所幫助。內容分成以下3個部分:

    1、從一次FGC頻繁的線上案例說起

    2、GC的運行原理介紹

    3、排查FGC問題的實踐指南

    01 從一次FGC頻繁的線上案例說起

    去年10月份,我們的廣告召回系統在程序上線后收到了FGC頻繁的系統告警,通過下面的監控圖可以看到:平均每35分鐘就進行了一次FGC。而程序上線前,我們的FGC頻次大概是2天一次。下面,詳細介紹下該問題的排查過程。

    1. 檢查JVM配置

    通過以下命令查看JVM的啟動參數:
    ps aux | grep “applicationName=adsearch”

    -Xms4g -Xmx4g -Xmn2g -Xss1024K
    -XX:ParallelGCThreads=5
    -XX:+UseConcMarkSweepGC
    -XX:+UseParNewGC
    -XX:+UseCMSCompactAtFullCollection
    -XX:CMSInitiatingOccupancyFraction=80

    可以看到堆內存為4G,新生代為2G,老年代也為2G,新生代採用ParNew收集器,老年代採用併發標記清除的CMS收集器,當老年代的內存佔用率達到80%時會進行FGC。

    進一步通過 jmap -heap 7276 | head -n20 可以得知新生代的Eden區為1.6G,S0和S1區均為0.2G。

    2. 觀察老年代的內存變化

    通過觀察老年代的使用情況,可以看到:每次FGC后,內存都能回到500M左右,因此我們排除了內存泄漏的情況。

    3. 通過jmap命令查看堆內存中的對象

    通過命令 jmap -histo 7276 | head -n20

    上圖中,按照對象所佔內存大小排序,显示了存活對象的實例數、所佔內存、類名。可以看到排名第一的是:int[],而且所佔內存大小遠遠超過其他存活對象。至此,我們將懷疑目標鎖定在了 int[] .

    4. 進一步dump堆內存文件進行分析

    鎖定 int[] 后,我們打算dump堆內存文件,通過可視化工具進一步跟蹤對象的來源。考慮堆轉儲過程中會暫停程序,因此我們先從服務管理平台摘掉了此節點,然後通過以下命令dump堆內存:

    jmap -dump:format=b,file=heap 7276

    通過JVisualVM工具導入dump出來的堆內存文件,同樣可以看到各個對象所佔空間,其中int[]佔到了50%以上的內存,進一步往下便可以找到 int[] 所屬的業務對象,發現它來自於架構團隊提供的codis基礎組件。

    5. 通過代碼分析可疑對象

    通過代碼分析,codis基礎組件每分鐘會生成約40M大小的int數組,用於統計TP99 和 TP90,數組的生命周期是一分鐘。而根據第2步觀察老年代的內存變化時,發現老年代的內存基本上也是每分鐘增加40多M,因此推斷:這40M的int數組應該是從新生代晉陞到老年代。

    我們進一步查看了YGC的頻次監控,通過下圖可以看到大概1分鐘有8次左右的YGC,這樣基本驗證了我們的推斷:因為CMS收集器默認的分代年齡是6次,即YGC 6次后還存活的對象就會晉陞到老年代,而codis組件中的大數組生命周期是1分鐘,剛好滿足這個要求。

    至此,整個排查過程基本結束了,那為什麼程序上線前沒出現此問題呢?通過上圖可以看到:程序上線前YGC的頻次在5次左右,此次上線后YGC頻次變成了8次左右,從而引發了此問題。

    6. 解決方案

    為了快速解決問題,我們將CMS收集器的分代年齡改成了15次,改完后FGC頻次恢復到了2天一次,後續如果YGC的頻次超過每分鐘15次還會再次觸發此問題。當然,我們最根本的解決方案是:優化程序以降低YGC的頻率,同時縮短codis組件中int數組的生命周期,這裏就不做展開了。

    02 GC的運行原理介紹

    上面整個案例的分析過程中,其實涉及到很多GC的原理知識,如果不懂得這些原理就着手處理,其實整個排查過程是很抓瞎的。

    這裏,我選擇幾個最核心的知識點,展開介紹下GC的運行原理,最後再給出一份實踐指南。

    1. 堆內存結構

    大家都知道: GC分為YGC和FGC,它們均發生在JVM的堆內存上。先來看下JDK8的堆內存結構:

    可以看到,堆內存採用了分代結構,包括新生代和老年代。新生代又分為:Eden區,From Survivor區(簡稱S0),To Survivor區(簡稱S1區),三者的默認比例為8:1:1。另外,新生代和老年代的默認比例為1:2。

    堆內存之所以採用分代結構,是考慮到絕大部分對象都是短生命周期的,這樣不同生命周期的對象可放在不同的區域中,然後針對新生代和老年代採用不同的垃圾回收算法,從而使得GC效率最高。

    2. YGC是什麼時候觸發的?

    大多數情況下,對象直接在年輕代中的Eden區進行分配,如果Eden區域沒有足夠的空間,那麼就會觸發YGC(Minor GC),YGC處理的區域只有新生代。因為大部分對象在短時間內都是可收回掉的,因此YGC后只有極少數的對象能存活下來,而被移動到S0區(採用的是複製算法)。

    當觸發下一次YGC時,會將Eden區和S0區的存活對象移動到S1區,同時清空Eden區和S0區。當再次觸發YGC時,這時候處理的區域就變成了Eden區和S1區(即S0和S1進行角色交換)。每經過一次YGC,存活對象的年齡就會加1。

    3. FGC又是什麼時候觸發的?

    下面4種情況,對象會進入到老年代中:

    1、YGC時,To Survivor區不足以存放存活的對象,對象會直接進入到老年代。

    2、經過多次YGC后,如果存活對象的年齡達到了設定閾值,則會晉陞到老年代中。

    3、動態年齡判定規則,To Survivor區中相同年齡的對象,如果其大小之和佔到了 To Survivor區一半以上的空間,那麼大於此年齡的對象會直接進入老年代,而不需要達到默認的分代年齡。

    4、大對象:由-XX:PretenureSizeThreshold啟動參數控制,若對象大小大於此值,就會繞過新生代, 直接在老年代中分配。

    當晉陞到老年代的對象大於了老年代的剩餘空間時,就會觸發FGC(Major GC),FGC處理的區域同時包括新生代和老年代。除此之外,還有以下4種情況也會觸發FGC:

    1、老年代的內存使用率達到了一定閾值(可通過參數調整),直接觸發FGC。

    2、空間分配擔保:在YGC之前,會先檢查老年代最大可用的連續空間是否大於新生代所有對象的總空間。如果小於,說明YGC是不安全的,則會查看參數 HandlePromotionFailure 是否被設置成了允許擔保失敗,如果不允許則直接觸發Full GC;如果允許,那麼會進一步檢查老年代最大可用的連續空間是否大於歷次晉陞到老年代對象的平均大小,如果小於也會觸發 Full GC。

    3、Metaspace(元空間)在空間不足時會進行擴容,當擴容到了-XX:MetaspaceSize 參數的指定值時,也會觸發FGC。

    4、System.gc() 或者Runtime.gc() 被顯式調用時,觸發FGC。

    4. 在什麼情況下,GC會對程序產生影響?

    不管YGC還是FGC,都會造成一定程度的程序卡頓(即Stop The World問題:GC線程開始工作,其他工作線程被掛起),即使採用ParNew、CMS或者G1這些更先進的垃圾回收算法,也只是在減少卡頓時間,而並不能完全消除卡頓。

    那到底什麼情況下,GC會對程序產生影響呢?根據嚴重程度從高到底,我認為包括以下4種情況:

    1、FGC過於頻繁:FGC通常是比較慢的,少則幾百毫秒,多則幾秒,正常情況FGC每隔幾個小時甚至幾天才執行一次,對系統的影響還能接受。但是,一旦出現FGC頻繁(比如幾十分鐘就會執行一次),這種肯定是存在問題的,它會導致工作線程頻繁被停止,讓系統看起來一直有卡頓現象,也會使得程序的整體性能變差。

    2、YGC耗時過長:一般來說,YGC的總耗時在幾十或者上百毫秒是比較正常的,雖然會引起系統卡頓幾毫秒或者幾十毫秒,這種情況幾乎對用戶無感知,對程序的影響可以忽略不計。但是如果YGC耗時達到了1秒甚至幾秒(都快趕上FGC的耗時了),那卡頓時間就會增大,加上YGC本身比較頻繁,就會導致比較多的服務超時問題。

    3、FGC耗時過長:FGC耗時增加,卡頓時間也會隨之增加,尤其對於高併發服務,可能導致FGC期間比較多的超時問題,可用性降低,這種也需要關注。

    4、YGC過於頻繁:即使YGC不會引起服務超時,但是YGC過於頻繁也會降低服務的整體性能,對於高併發服務也是需要關注的。

    其中,「FGC過於頻繁」和「YGC耗時過長」,這兩種情況屬於比較典型的GC問題,大概率會對程序的服務質量產生影響。剩餘兩種情況的嚴重程度低一些,但是對於高併發或者高可用的程序也需要關注。

    03 排查FGC問題的實踐指南

    通過上面的案例分析以及理論介紹,再總結下FGC問題的排查思路,作為一份實踐指南供大家參考。

    1. 清楚從程序角度,有哪些原因導致FGC?

    1、大對象:系統一次性加載了過多數據到內存中(比如SQL查詢未做分頁),導致大對象進入了老年代。

    2、內存泄漏:頻繁創建了大量對象,但是無法被回收(比如IO對象使用完后未調用close方法釋放資源),先引發FGC,最後導致OOM.

    3、程序頻繁生成一些長生命周期的對象,當這些對象的存活年齡超過分代年齡時便會進入老年代,最後引發FGC. (即本文中的案例)

    4、程序BUG導致動態生成了很多新類,使得 Metaspace 不斷被佔用,先引發FGC,最後導致OOM.

    5、代碼中顯式調用了gc方法,包括自己的代碼甚至框架中的代碼。

    6、JVM參數設置問題:包括總內存大小、新生代和老年代的大小、Eden區和S區的大小、元空間大小、垃圾回收算法等等。

    2. 清楚排查問題時能使用哪些工具

    1、公司的監控系統:大部分公司都會有,可全方位監控JVM的各項指標。

    2、JDK的自帶工具,包括jmap、jstat等常用命令:

    查看堆內存各區域的使用率以及GC情況
    jstat -gcutil -h20 pid 1000

    查看堆內存中的存活對象,並按空間排序
    jmap -histo pid | head -n20

    dump堆內存文件
    jmap -dump:format=b,file=heap pid

    3、可視化的堆內存分析工具:JVisualVM、MAT等

    3. 排查指南

    1、查看監控,以了解出現問題的時間點以及當前FGC的頻率(可對比正常情況看頻率是否正常)

    2、了解該時間點之前有沒有程序上線、基礎組件升級等情況。

    3、了解JVM的參數設置,包括:堆空間各個區域的大小設置,新生代和老年代分別採用了哪些垃圾收集器,然後分析JVM參數設置是否合理。

    4、再對步驟1中列出的可能原因做排除法,其中元空間被打滿、內存泄漏、代碼顯式調用gc方法比較容易排查。

    5、針對大對象或者長生命周期對象導致的FGC,可通過 jmap -histo 命令並結合dump堆內存文件作進一步分析,需要先定位到可疑對象。

    6、通過可疑對象定位到具體代碼再次分析,這時候要結合GC原理和JVM參數設置,弄清楚可疑對象是否滿足了進入到老年代的條件才能下結論。

    04 最後的話

    這篇文章通過線上案例並結合GC原理詳細介紹了FGC的排查過程,同時給出了一份實踐指南。

    後續會以類似的方式,再分享一個YGC耗時過長的案例,希望能幫助大家吃透GC問題排查,如果覺得本文對你有幫助,請大家關注我的個人公眾號!

    – End –

    作者簡介:程序員,985碩士,前亞馬遜Java工程師,現58轉轉技術總監。持續分享技術和管理方向的文章。如果感興趣,可微信掃描下面的二維碼關注我的公眾號:『IT人的職場進階』

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※教你寫出一流的銷售文案?

  • 3dTiles 數據規範詳解[1] 介紹

    3dTiles 數據規範詳解[1] 介紹

    版權:轉載請帶原地址。https://www.cnblogs.com/onsummer/p/12799366.html @秋意正寒

    Web中的三維

    html5和webgl技術使得瀏覽器三維變成了可能。

    巧婦難為無米之炊,三維數據(三維模型)是三維可視化重要的一環,事實上就是:三維數據眾多,行業跨界廣。

    參考資料:http://www.bgteach.com/article/132

    three.js的各種加載器實現了大部分通用三維格式的加載,屏蔽了格式不同的數據結構差異。

    然而,這樣還是不能滿足日益增長的效果需求,比如場景一大,模型文件體積變大,解析所耗費的時間越來越長。

    webgl,包括所有gpu有關的圖形渲染編程,幾乎只認這樣的三維數據:頂點、頂點顏色、頂點法線、着色語言…

    所以,三維圖形界的通用格式:glTF應運而生,它面向終點,它按照圖形編程所需的格式來存儲數據,藉以二進制編碼提高傳輸速度。

    它不再使用面向對象的思維存儲三維模型、貼圖紋理,而是按顯卡的思維存儲,存的是頂點、法線、頂點顏色等最基礎的信息,只不過組織結構上進行了精心的設計。

    它面向終點,就意味着可編輯性差,因為渲染性能的提高犧牲了可編輯性,它不再像3ds、dae甚至是max、skp一樣容易編輯和轉換。

    事實上,大多數三維軟件提供了glTF格式的轉換,或多一步,或一步到位。

    地理真三維

    早年,地理的三維還處於地形三維上,即数字高程模型(DEM)提供地表的高度拉伸。柵格高程數據、等高線、不規則三角網等均是数字高程模型的具體案例。
    下圖是不規則三角網,也即所謂的三角面片(圖形渲染中很常見):

    隨着學科的融合、計算機技術和硬件的更新換代,使得有模型、有細節的真三維融入到GIS中成為了可能,或者說,計算機技術和硬件的升級,給GIS以更廣闊的視角觀察世界。

    cesium.js 號稱是 webgl 封裝的三維地理庫,是支持 gltf 模型的加載的。

    面對大規模精細三維數據的加載,還要照顧到GIS的各種坐標系統、分析計算,gltf這種單個模型的方案顯得力不從心。

    2016年,Cesium 團隊借鑒傳統2DGIS的地圖規範——WMTS,借鑒圖形學中的層次細節模型,打造出大規模的三維數據標準—— 3d-Tiles,中文譯名:三維瓦片。

    它在模型上利用了 gltf 渲染快的特點,對大規模的三維數據進行組織,包括層次細節模型、模型的屬性數據、模型的層級數據等。

    3dTiles的設計思想

    3dTiles 繼承了 gltf 的優點:貼合圖形渲染 API 的邏輯,討 GPU 喜愛,webgl 對其內部組織起來的三維模型數據,不需要轉換,可以直接渲染(glTF 的功勞)。

    關於 glTF 是如何嵌入到 3dTiles 中的,開篇不談,後續精講。

    我們區分一組概念:規範和實現。

    3dTiles 是一種規範,在規範的指導下,各種資源文件可以是獨立存在於硬盤中的目錄、文件,也可以以二進制形式寫入數據庫中。目前,3dTiles 的官方實現只有 “散列文件”,也就是文件、文件夾的形式存儲在硬盤中,有關如何存儲到數據庫中的討論,官方仍在進行中(截至發博客)。

    glTF 也是一種規範,它的數據文件不一定就是後綴名為 .gltf 的文件,也不一定只有一個文件(glTF 的文件還可以是二進制文件、紋理貼圖文件等,扯遠了哈)。
    在本文,會嚴格指明是數據還是數據標準,如果我說的是 “XXX文件(例如 Bird.glb 文件)” ,那就是在指特定的文件。

    3dTiles還有一個特點:那就是不記錄模型數據,只記錄各級“Tile”的邏輯關係,以及“Tile”自己的屬性信息。所謂的模型數據,是指三維模型的頂點、貼圖材質、法線、顏色等信息。邏輯關係是指,各級Tile是如何在空間中保持連續的,LOD是如何組織的。屬性信息就很簡單啦,門有門的生產商,窗戶有窗戶的使用年限等,往大了說,建築還有它自己的壽命、法人、施工單位等屬性信息。

    3dTiles的特點總結如下:

    • 三維模型使用了 glTF 規範,繼承它的渲染高性能
    • 除了嵌入的 glTF,3dTiles 自己 只記錄各級Tile的空間邏輯關係(如何構成整個3dtiles)和屬性信息,以及模型與屬性如何掛接在一起的信息

    我覺得你還是雲里霧裡的,下一節將展示3dTiles具體數據,說說3dTiles的組織結構,說說3dTiles中的”Tile”,也就是“三維瓦片數據”中的“瓦片”是什麼。

    3dTiles系列博客最終目錄:

    01 引入與博客目錄:3dTiles 數據規範詳解

    02 Tileset與Tile

    03 內嵌在瓦片文件中的兩大數據表

    04.1 B3dm 類型

    04.2 I3dm 類型

    04.3 Pnts 類型

    04.4 Cmpt 類型

    04.5 未發布的瓦片規範

    05 3dTiles強大的擴展能力

    06 優缺點

    07 與I3S比較

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※為什麼 USB CONNECTOR 是電子產業重要的元件?

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※台北網頁設計公司全省服務真心推薦

    ※想知道最厲害的網頁設計公司"嚨底家"!

    ※推薦評價好的iphone維修中心

  • Dart Memo for Android Developers

    Dart Memo for Android Developers

    Dart語言一些語法特點和編程規範.

    本文適合: 日常使用Kotlin, 突然想寫個Flutter程序的Android程序員.

    Dart語言

    完整的請看A tour of the Dart language

    • 創建對象可以不用new. -> 並且規範不讓用new, lint會報錯.
    • 聲明變量可以用var, 也可以用具體類型如String. 不變量用final, 常量用const.
    • 沒有訪問修飾符, 用_來表示私有: 文件級別.
    • 字符串可以用單引號'.
    • 語句結尾要用;.
    • 創建數組可以用: var list = [1, 2, 3];.
    • assert()常用來斷定開發時不可能會出現的情況.
    • 空測試操作符: ??.
    • 過濾操作符: where.
    • 兩個點..表示鏈式調用.
    • dynamic說明類型未指定.
    • 除了throw異常, 還可以throw別的東西, 比如字符串.

    函數

    • 函數返回值在函數最開頭, 可以不標. -> 但是規範會建議標註返回值.
    bool isNoble(int atomicNumber) {
      return _nobleGases[atomicNumber] != null;
    }
    
    • =>箭頭符號, 用來簡化一句話的方法.
    bool isNoble(int atomicNumber) => _nobleGases[atomicNumber] != null;
    

    構造函數

    • 構造函數{}表示帶名字, 參數可選, 若要必選加上@required.
    const Scrollbar({Key key, @required Widget child})
    
    • 構造函數名可以是ClassName或者ClassName.identifier.
    • 空構造函數體可以省略, 用;結尾就行:
    class Point {
      double x, y;
      Point(this.x, this.y);
    }
    

    這裡會初始化相應的變量, 也不用聲明具體的參數類型.

    • factory構造, 可以用來返回緩存實例, 或者返回類型的子類:
    factory Logger(String name) {
        return _cache.putIfAbsent(name, () => Logger._internal(name));
    }
    

    異步代碼

    Future<String> lookUpVersion() async => '1.0.0';
    
    Future checkVersion() async {
      var version = await lookUpVersion();
      // Do something with version
    }
    

    編程規範類

    完整的規範在這裏: Effective Dart.

    有一些Good和Bad的舉例, 這裏僅列出比較常用的幾項.

    • 文件名要蛇形命名: lowercase_with_underscores. 類名: UpperCamelCase.
    • 對自己程序的文件, 兩種import都可以(package開頭或者相對路徑), 但是要保持一致.
    • Flutter程序嵌套比較多, 要用結尾的,來幫助格式化.

    本文緣由

    年初的時候學了一陣子Flutter, 寫了各種大小demo. 結果隔了兩個月之後, 突然心血來潮想寫個小東西, 打開Android Studio, 首先發現創建Flutter程序的按鈕都不見了. (估計是Android Studio4.0升級之後Flutter的插件沒跟上).

    接着用命令行創建了工程, 打開之後稍微整理了一下心情, 然後就….懵逼了.

    突然不知道如何下手.
    宏觀的東西還記得, 要用什麼package, 基本常用的幾個Widget都是啥, 但是微觀的, 忘了函數和數組都是咋定義的了.
    這種懵逼的狀態令我很憤怒, 果然是上年紀了嗎, 無縫切換個語言都不行.

    於是就想着還是寫個備忘錄吧.

    References

    • A tour of the Dart language
    • Effective Dart

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

  • 【asp.net core 系列】9 實戰之 UnitOfWork以及自定義代碼生成

    【asp.net core 系列】9 實戰之 UnitOfWork以及自定義代碼生成

    0. 前言

    在前一篇中我們創建了一個基於EF的數據查詢接口實現基類,這一篇我將帶領大家講一下為這EF補充一些功能,並且提供一個解決避免寫大量配置類的方案。

    1. SaveChanges的外移

    在之前介紹EF Core的時候,我們提到過使用EF需要在每次使用之後,調用一次SaveChanges將數據提交給數據庫。在實際開發中,我們不能添加一條數據或者做一次修改就調用一次SaveChanges,這完全不現實。因為每次調用SaveChanges是EF向數據庫提交變更的時候,所以EF推薦的是每次執行完用戶的請求之後統一提交數據給數據庫。

    這樣就會造成一個問題,可能也不是問題:我們需要一個接口來管理EF 的SaveChanges操作。

    1.1 創建一個IUnitOfWork接口

    通常我們會在Domain項目中添加一個IUnitOfWork接口,這個接口有一個方法就是SaveChanges,代碼如下:

    namespace Domain.Insfrastructure
    {
        public interface IUnitOfWork
        {
            void SaveChanges();
        }
    }
    

    這個方法的意思表示到執行該方法的時候,一個完整的工作流程執行完成了。也就是說,當執行該方法后,當前請求不會再與數據庫發生連接。

    1.2 實現IUnitOfWork接口

    在 Domain.Implement中添加IUnitOfWork實現類:

    using Domain.Insfrastructure;
    using Microsoft.EntityFrameworkCore;
    
    namespace Domain.Implements.Insfrastructure
    {
        public class UnitOfWork: IUnitOfWork
        {
            private DbContext DbContext;
            public UnitOfWork(DbContext context)
            {
                DbContext = context;
            }
    
            public void SaveChanges()
            {
                DbContext.SaveChanges();
            }
        }
    }
    

    1.3 調用時機

    到現在我們已經創建了一個UnitOfWork的方法,那麼問題來了,我們該在什麼時候調用呢,或者說如何調用呢?

    我的建議是創建一個ActionFilter,針對所有的控制器進行SaveChanges進行處理。當然了,也可以在控制器中持有一個IUnitOfWork的示例,然後在Action結束的時候,執行SaveChanges。不過這樣存在一個問題,可能會存在遺漏的方法。所以我推薦這樣操作,這裏簡單演示一下如何創建攔截器:

    在Web的根目錄下,創建一個Filters目錄,這個目錄里用來存儲一些過濾器,創建我們需要的過濾器:

    using Domain.Insfrastructure;
    using Microsoft.AspNetCore.Mvc.Filters;
    
    namespace Web.Filters
    {
        public class UnitOfWorkFilterAttribute : ActionFilterAttribute
        {
            public IUnitOfWork UnitOfWork;
    
            public override void OnActionExecuted(ActionExecutedContext context)
            {
                UnitOfWork.SaveChanges();
            }
        }
    }
    

    使用一個ActionFilter可以很方便的解決一些容易遺漏但又必須執行的代碼。這裏就先不介紹如何配置Filter的啟用和詳細介紹了,請允許我賣個關子。當然了,有些小夥伴肯定也能猜到這是一個Attribute類,所以可以按照Attribute給Controller打標記。

    2. 創建一個簡單的代碼生成方法

    之前在介紹EF的時候,有個小夥伴跟我說,還要寫配置文件啊,太麻煩了。是的,之前我介紹了很多關於寫配置文件不使用特性的好處,但不解決這個問題就無法真正體檢配置類的好處。

    雖然說,EF Core約定優先,但是如果默認約定的話,得在DBContext中聲明 DbSet<T> 來聲明這個字段,實體類少的話,比較簡單。如果多個數據表的話,就會非常麻煩。

    所以這時候就要使用工具類, 那麼簡單的分析一下,這個工具類需要有哪些功能:

    • 第一步,找到實體類並解析出實體類的類名
    • 第二步,生成配置文件
    • 第三步,創建對應的Repository接口和實現類

    很簡單的三步,但是難點就是找實體類並解析出實體類名。

    在Util項目中添加一個Develop目錄,並創建Develop類:

    namespace Utils.Develop
    {
        public class Develop
        {
            
        }
    }
    

    定位當前類所在目錄,通過

    Directory.GetCurrentDirectory()
    

    這個方法可以獲取當前執行的DLL所在目錄,當然不同的編譯器在執行的時候,會有微妙的不同。所以我們需要以此為根據然後獲取項目的根目錄,一個簡單的方法,查找*.sln 所在目錄:

    public static string CurrentDirect
    {
        get
        {
            var execute = Directory.GetCurrentDirectory();
            var parent = Directory.GetParent(execute);
            while(parent.GetFiles("*.sln",SearchOption.TopDirectoryOnly).Length == 0)
            {
                parent = parent.Parent;
                if(parent == null)
                {
                    return null;
                }
            }
            return parent.FullName;
        }
    }
    

    2.1 獲取實體類

    那麼獲取到根目錄之後,我們下一步就是獲取實體類。因為我們的實體類都要求是繼承BaseEntity或者命名空間都是位於Data.Models下面。當然這個名稱都是根據實際業務場景約束的,這裏只是以當前項目舉例。那麼,我們可以通過以下方法找到我們設置的實體類:

    public static Type[] LoadEntities()
    {
        var assembly = Assembly.Load("Data");
        var allTypes = assembly.GetTypes();
        var ofNamespace = allTypes.Where(t => t.Namespace == "Data.Models" || t.Namespace.StartsWith("Data.Models."));
        var subTypes = allTypes.Where(t => t.BaseType.Name == "BaseEntity`1");
        return ofNamespace.Union(subTypes).ToArray();
    }
    

    通過 Assembly加載Data的程序集,然後選擇出符合我們要求的實體類。

    2.2 編寫Repository接口

    我們先約定Model的Repository接口定義在 Domain/Repository目錄下,所以它們的命名空間應該是:

    namespace Domain.Repository	
    {
    }
    

    假設目錄情況與Data/Models下面的代碼結構保持一致,然後生成代碼應該如下:

    public static void CreateRepositoryInterface(Type type)
    {
        var targetNamespace = type.Namespace.Replace("Data.Models", "");
        if (targetNamespace.StartsWith("."))
        {
            targetNamespace = targetNamespace.Remove(0);
        }
        var targetDir = Path.Combine(new[]{CurrentDirect,"Domain", "Repository"}.Concat(
            targetNamespace.Split('.')).ToArray());
        if (!Directory.Exists(targetDir))
        {
            Directory.CreateDirectory(targetDir);
        }
    
        var baseName = type.Name.Replace("Entity","");
    
        if (!string.IsNullOrEmpty(targetNamespace))
        {
            targetNamespace = $".{targetNamespace}";
        }
        var file = $"using {type.Namespace};\r\n"
            + $"using Domain.Insfrastructure;\r\n"
            + $"namespace Domain.Repository{targetNamespace}\r\n"
            + "{\r\n"
            + $"\tpublic interface I{baseName}ModifyRepository : IModifyRepository<{type.Name}>\r\n" +
            "\t{\r\n\t}\r\n"
            + $"\tpublic interface I{baseName}SearchRepository : ISearchRepository<{type.Name}>\r\n" +
            "\t{\r\n\t}\r\n}";
    
        File.WriteAllText(Path.Combine(targetDir, $"{baseName}Repository.cs"), file);
    }
    
    

    2.3 編寫Repository的實現類

    因為我們提供了一個基類,所以我們在生成方法的時候,推薦繼承這個類,那麼實現方法應該如下:

    public static void CreateRepositoryImplement(Type type)
    {
        var targetNamespace = type.Namespace.Replace("Data.Models", "");
        if (targetNamespace.StartsWith("."))
        {
            targetNamespace = targetNamespace.Remove(0);
        }
    
        var targetDir = Path.Combine(new[] {CurrentDirect, "Domain.Implements", "Repository"}.Concat(
            targetNamespace.Split('.')).ToArray());
        if (!Directory.Exists(targetDir))
        {
            Directory.CreateDirectory(targetDir);
        }
        var baseName = type.Name.Replace("Entity", "");
        if (!string.IsNullOrEmpty(targetNamespace))
        {
            targetNamespace = $".{targetNamespace}";
        }
    
        var file = $"using {type.Namespace};" +
            $"\r\nusing Domain.Implements.Insfrastructure;" +
            $"\r\nusing Domain.Repository{targetNamespace};" +
            $"\r\nusing Microsoft.EntityFrameworkCore;" +
            $"namespace Domain.Implements.Repository{targetNamespace}\r\n" +
            "{" +
            $"\r\n\tpublic class {baseName}Repository :BaseRepository<{type.Name}> ,I{baseName}ModifyRepository,I{baseName}SearchRepository " +
            "\r\n\t{" +
            $"\r\n\t\tpublic {baseName}Repository(DbContext context) : base(context)"+
            "\r\n\t\t{"+
            "\r\n\t\t}\r\n"+
            "\t}\r\n}";
        File.WriteAllText(Path.Combine(targetDir, $"{baseName}Repository.cs"), file);
    }
    

    2.4 配置文件的生成

    仔細觀察一下代碼,可以發現整體都是十分簡單的。所以這篇就不掩飾如何生成配置文件了,小夥伴們可以自行嘗試一下哦。具體實現可以等一下篇哦。

    3. 總結

    這一篇初略的介紹了兩個用來輔助EF Core實現的方法或類,這在開發中很重要。UnitOfWork用來確保一次請求一個工作流程,簡單的代碼生成類讓我們能讓我們忽略那些繁重的創建同類代碼的工作。

    更多內容煩請關注我的博客《高先生小屋》

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※Google地圖已可更新顯示潭子電動車充電站設置地點!!

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※別再煩惱如何寫文案,掌握八大原則!

    網頁設計最專業,超強功能平台可客製化

  • 曹工說Redis源碼(8)–面試時,redis 內存淘汰總被問,但是總答不好

    曹工說Redis源碼(8)–面試時,redis 內存淘汰總被問,但是總答不好

    文章導航

    Redis源碼系列的初衷,是幫助我們更好地理解Redis,更懂Redis,而怎麼才能懂,光看是不夠的,建議跟着下面的這一篇,把環境搭建起來,後續可以自己閱讀源碼,或者跟着我這邊一起閱讀。由於我用c也是好幾年以前了,些許錯誤在所難免,希望讀者能不吝指出。

    曹工說Redis源碼(1)– redis debug環境搭建,使用clion,達到和調試java一樣的效果

    曹工說Redis源碼(2)– redis server 啟動過程解析及簡單c語言基礎知識補充

    曹工說Redis源碼(3)– redis server 啟動過程完整解析(中)

    曹工說Redis源碼(4)– 通過redis server源碼來理解 listen 函數中的 backlog 參數

    曹工說Redis源碼(5)– redis server 啟動過程解析,以及EventLoop每次處理事件前的前置工作解析(下)

    曹工說Redis源碼(6)– redis server 主循環大體流程解析

    曹工說Redis源碼(7)– redis server 的周期執行任務,到底要做些啥

    什麼是內存淘汰

    內存淘汰,和平時我們設置redis key的過期時間,不是一回事;內存淘汰是說,假設我們限定redis只能使用8g內存,現在已經使用了這麼多了(包括設置了過期時間的key和沒設過期時間的key),那,後續的set操作,還怎麼辦呢?

    是不是只能報錯了?

    那不行啊,不科學吧,因為有的key,可能已經很久沒人用了,可能以後也不會再用到了,那我們是不是可以把這類key給幹掉呢?

    幹掉key的過程,就是內存淘汰。

    內存淘汰什麼時候啟用

    當我們在配置文件里設置了如下屬性時:

    # maxmemory <bytes>
    

    默認,該屬性是被註釋掉的。

    其實,這個配置項的註釋,相當有價值,我們來看看:

    # Don't use more memory than the specified amount of bytes.
    # When the memory limit is reached Redis will try to remove keys
    # according to the eviction policy selected (see maxmemory-policy).
    #
    # If Redis can't remove keys according to the policy, or if the policy is
    # set to 'noeviction', Redis will start to reply with errors to commands
    # that would use more memory, like SET, LPUSH, and so on, and will continue
    # to reply to read-only commands like GET.
    #
    # This option is usually useful when using Redis as an LRU cache, or to set
    # a hard memory limit for an instance (using the 'noeviction' policy).
    #
    # WARNING: If you have slaves attached to an instance with maxmemory on,
    # the size of the output buffers needed to feed the slaves are subtracted
    # from the used memory count, so that network problems / resyncs will
    # not trigger a loop where keys are evicted, and in turn the output
    # buffer of slaves is full with DELs of keys evicted triggering the deletion
    # of more keys, and so forth until the database is completely emptied.
    #
    # In short... if you have slaves attached it is suggested that you set a lower
    # limit for maxmemory so that there is some free RAM on the system for slave
    # output buffers (but this is not needed if the policy is 'noeviction').
    #
    # maxmemory <bytes>
    

    渣翻譯如下:

    不能使用超過指定數量bytes的內存。當該內存限制被達到時,redis會根據過期策略(eviction policy,通過參數 maxmemory-policy來指定)來驅逐key。

    如果redis根據指定的策略,或者策略被設置為“noeviction”,redis會開始針對如下這種命令,回復錯誤。什麼命令呢?會使用更多內存的那類命令,比如set、lpush;只讀命令還是不受影響,可以正常響應。

    該選項通常在redis使用LRU緩存時有用,或者在使用noeviction策略時,設置一個進程級別的內存limit。

    內存淘汰策略

    所謂策略,意思是,當我們要刪除部分key的時候,刪哪些,不刪哪些?是不是需要一個策略?比如是隨機刪,就像滅霸一樣?還是按照lru時間來刪,lru的策略意思就是,最近最少使用的key,將被優先刪除。

    總之,我們需要定一個規則。

    redis默認支持以下策略:

    # MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
    # is reached. You can select among five behaviors:
    # 
    # volatile-lru -> remove the key with an expire set using an LRU algorithm
    # allkeys-lru -> remove any key accordingly to the LRU algorithm
    # volatile-random -> remove a random key with an expire set
    # allkeys-random -> remove a random key, any key
    # volatile-ttl -> remove the key with the nearest expire time (minor TTL)
    # noeviction -> don't expire at all, just return an error on write operations
    # 
    # Note: with any of the above policies, Redis will return an error on write
    #       operations, when there are not suitable keys for eviction.
    #
    #       At the date of writing this commands are: set setnx setex append
    #       incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd
    #       sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby
    #       zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby
    #       getset mset msetnx exec sort
    #
    # The default is:
    #
    # maxmemory-policy noeviction
    maxmemory-policy allkeys-lru
    
    針對設置了過期時間的,使用lru算法
    # volatile-lru -> remove the key with an expire set using an LRU algorithm
    
    針對全部key,使用lru算法
    # allkeys-lru -> remove any key accordingly to the LRU algorithm
    
    針對設置了過期時間的,隨機刪
    # volatile-random -> remove a random key with an expire set
    
    針對全部key,隨機刪
    # allkeys-random -> remove a random key, any key
    
    針對設置了過期時間的,馬上要過期的,刪掉
    # volatile-ttl -> remove the key with the nearest expire time (minor TTL)
    
    不過期,不能寫了,就報錯
    # noeviction -> don't expire at all, just return an error on write operations
    

    一般呢,我們會設置為:

    allkeys-lru,即,針對全部key,進行lru。

    源碼實現

    配置讀取

    在如下結構體中,定義了如下字段:

    struct redisServer {
    	...
    	unsigned long long maxmemory;   /* Max number of memory bytes to use */
        int maxmemory_policy;           /* Policy for key eviction */
        int maxmemory_samples;          /* Pricision of random sampling */
        ...
    }
    

    當我們在配置文件中,進入如下配置時,該結構體中幾個字段的值如下:

    maxmemory 3mb
    maxmemory-policy allkeys-lru
    # maxmemory-samples 5  這個取了默認值
    

    maxmemory_policy為3,是因為枚舉值為3:

    #define REDIS_MAXMEMORY_VOLATILE_LRU 0
    #define REDIS_MAXMEMORY_VOLATILE_TTL 1
    #define REDIS_MAXMEMORY_VOLATILE_RANDOM 2
    #define REDIS_MAXMEMORY_ALLKEYS_LRU 3
    #define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4
    #define REDIS_MAXMEMORY_NO_EVICTION 5
    #define REDIS_DEFAULT_MAXMEMORY_POLICY REDIS_MAXMEMORY_NO_EVICTION
    

    處理命令時,判斷是否進行內存淘汰

    在處理命令的時候,會調用中的

    redis.c  processCommand
        
    int processCommand(redisClient *c) {
        /* The QUIT command is handled separately. Normal command procs will
         * go through checking for replication and QUIT will cause trouble
         * when FORCE_REPLICATION is enabled and would be implemented in
         * a regular command proc. */
        // 特別處理 quit 命令
        void *commandName = c->argv[0]->ptr;
        redisLog(REDIS_NOTICE, "The server is now processing %s", commandName);
    
        if (!strcasecmp(c->argv[0]->ptr, "quit")) {
            addReply(c, shared.ok);
            c->flags |= REDIS_CLOSE_AFTER_REPLY;
            return REDIS_ERR;
        }
    
        /* Now lookup the command and check ASAP about trivial error conditions
         * such as wrong arity, bad command name and so forth. */
        // 1 查找命令,並進行命令合法性檢查,以及命令參數個數檢查
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
        if (!c->cmd) {
            // 沒找到指定的命令
            flagTransaction(c);
            addReplyErrorFormat(c, "unknown command '%s'",
                                (char *) c->argv[0]->ptr);
            return REDIS_OK;
        }
    
        /* Check if the user is authenticated */
        //2 檢查認證信息
        if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) {
            flagTransaction(c);
            addReply(c, shared.noautherr);
            return REDIS_OK;
        }
    
        /* If cluster is enabled perform the cluster redirection here.
         *
         * 3 如果開啟了集群模式,那麼在這裏進行轉向操作。
         *
         * However we don't perform the redirection if:
         *
         * 不過,如果有以下情況出現,那麼節點不進行轉向:
         *
         * 1) The sender of this command is our master.
         *    命令的發送者是本節點的主節點
         *
         * 2) The command has no key arguments. 
         *    命令沒有 key 參數
         */
        if (server.cluster_enabled &&
            !(c->flags & REDIS_MASTER) &&
            !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) {
            int hashslot;
    
            // 集群已下線
            if (server.cluster->state != REDIS_CLUSTER_OK) {
                flagTransaction(c);
                addReplySds(c, sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
                return REDIS_OK;
    
                // 集群運作正常
            } else {
                int error_code;
                clusterNode *n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code);
                // 不能執行多鍵處理命令
                if (n == NULL) {
                    flagTransaction(c);
                    if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
                        addReplySds(c, sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
                    } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
                        /* The request spawns mutliple keys in the same slot,
                         * but the slot is not "stable" currently as there is
                         * a migration or import in progress. */
                        addReplySds(c, sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
                    } else {
                        redisPanic("getNodeByQuery() unknown error.");
                    }
                    return REDIS_OK;
    
                    //3.1 命令針對的槽和鍵不是本節點處理的,進行轉向
                } else if (n != server.cluster->myself) {
                    flagTransaction(c);
                    // -<ASK or MOVED> <slot> <ip>:<port>
                    // 例如 -ASK 10086 127.0.0.1:12345
                    addReplySds(c, sdscatprintf(sdsempty(),
                                                "-%s %d %s:%d\r\n",
                                                (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
                                                hashslot, n->ip, n->port));
    
                    return REDIS_OK;
                }
    
                // 如果執行到這裏,說明鍵 key 所在的槽由本節點處理
                // 或者客戶端執行的是無參數命令
            }
        }
    
        /* Handle the maxmemory directive.
         *
         * First we try to free some memory if possible (if there are volatile
         * keys in the dataset). If there are not the only thing we can do
         * is returning an error. */
        //4 如果設置了最大內存,那麼檢查內存是否超過限制,並做相應的操作
        if (server.maxmemory) {
            //4.1 如果內存已超過限制,那麼嘗試通過刪除過期鍵來釋放內存
            int retval = freeMemoryIfNeeded();
            // 如果即將要執行的命令可能佔用大量內存(REDIS_CMD_DENYOOM)
            // 並且前面的內存釋放失敗的話
            // 那麼向客戶端返回內存錯誤
            if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
                flagTransaction(c);
                addReply(c, shared.oomerr);
                return REDIS_OK;
            }
        }    
        ....
    
    • 1處,查找命令,對應的函數指針(類似於java里的策略模式,根據命令,找對應的策略)
    • 2處,檢查,是否密碼正確
    • 3處,集群相關操作;
    • 3.1處,不是本節點處理,直接返回ask,指示客戶端轉向
    • 4處,判斷是否設置了maxMemory,這裏就是本文重點:設置了maxMemory時,內存淘汰策略
    • 4.1處,調用了下方的 freeMemoryIfNeeded

    接下來,深入4.1處:

    
    int freeMemoryIfNeeded(void) {
        size_t mem_used, mem_tofree, mem_freed;
        int slaves = listLength(server.slaves);
    
        /* Remove the size of slaves output buffers and AOF buffer from the
         * count of used memory. */
        // 計算出 Redis 目前佔用的內存總數,但有兩個方面的內存不會計算在內:
        // 1)從服務器的輸出緩衝區的內存
        // 2)AOF 緩衝區的內存
        mem_used = zmalloc_used_memory();
        if (slaves) {
    		...
        }
        if (server.aof_state != REDIS_AOF_OFF) {
            mem_used -= sdslen(server.aof_buf);
            mem_used -= aofRewriteBufferSize();
        }
    
        /* Check if we are over the memory limit. */
        //1 如果目前使用的內存大小比設置的 maxmemory 要小,那麼無須執行進一步操作
        if (mem_used <= server.maxmemory) return REDIS_OK;
    
        //2 如果佔用內存比 maxmemory 要大,但是 maxmemory 策略為不淘汰,那麼直接返回
        if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
            return REDIS_ERR; /* We need to free memory, but policy forbids. */
    
        /* Compute how much memory we need to free. */
        // 3 計算需要釋放多少字節的內存
        mem_tofree = mem_used - server.maxmemory;
    
        // 初始化已釋放內存的字節數為 0
        mem_freed = 0;
    
        // 根據 maxmemory 策略,
        //4 遍歷字典,釋放內存並記錄被釋放內存的字節數
        while (mem_freed < mem_tofree) {
            int j, k, keys_freed = 0;
    
            // 遍歷所有字典
            for (j = 0; j < server.dbnum; j++) {
                long bestval = 0; /* just to prevent warning */
                sds bestkey = NULL;
                dictEntry *de;
                redisDb *db = server.db + j;
                dict *dict;
    
                if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
                    server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM) {
                    // 如果策略是 allkeys-lru 或者 allkeys-random 
                    //5 那麼淘汰的目標為所有數據庫鍵
                    dict = server.db[j].dict;
                } else {
                    // 如果策略是 volatile-lru 、 volatile-random 或者 volatile-ttl 
                    //6 那麼淘汰的目標為帶過期時間的數據庫鍵
                    dict = server.db[j].expires;
                }
    
    
                /* volatile-random and allkeys-random policy */
                // 如果使用的是隨機策略,那麼從目標字典中隨機選出鍵
                if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM ||
                    server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                }
                /* volatile-lru and allkeys-lru policy */
                //7 
                else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
                         server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) {
                    struct evictionPoolEntry *pool = db->eviction_pool;
    
                    while (bestkey == NULL) {
                        // 8 
                        evictionPoolPopulate(dict, db->dict, db->eviction_pool);
                        /* Go backward from best to worst element to evict. */
                        for (k = REDIS_EVICTION_POOL_SIZE - 1; k >= 0; k--) {
                            if (pool[k].key == NULL) continue;
                            // 8.1
                            de = dictFind(dict, pool[k].key);
    
                            /* 8.2 Remove the entry from the pool. */
                            sdsfree(pool[k].key);
                            /* Shift all elements on its right to left. */
                            memmove(pool + k, pool + k + 1,
                                    sizeof(pool[0]) * (REDIS_EVICTION_POOL_SIZE - k - 1));
                            /* Clear the element on the right which is empty
                             * since we shifted one position to the left.  */
                            pool[REDIS_EVICTION_POOL_SIZE - 1].key = NULL;
                            pool[REDIS_EVICTION_POOL_SIZE - 1].idle = 0;
    
                            /* If the key exists, is our pick. Otherwise it is
                             * a ghost and we need to try the next element. */
                            // 8.3
                            if (de) {
                                bestkey = dictGetKey(de);
                                break;
                            } else {
                                /* Ghost... */
                                continue;
                            }
                        }
                    }
                }
    
                    /* volatile-ttl */
                    // 策略為 volatile-ttl ,從一集 sample 鍵中選出過期時間距離當前時間最接近的鍵
                else if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_TTL) {
                    ...
                }
    
                /* Finally remove the selected key. */
                // 8.4 刪除被選中的鍵
                if (bestkey) {
                    long long delta;
    
                    robj *keyobj = createStringObject(bestkey, sdslen(bestkey));
                    propagateExpire(db, keyobj);
                    /* We compute the amount of memory freed by dbDelete() alone.
                     * It is possible that actually the memory needed to propagate
                     * the DEL in AOF and replication link is greater than the one
                     * we are freeing removing the key, but we can't account for
                     * that otherwise we would never exit the loop.
                     *
                     * AOF and Output buffer memory will be freed eventually so
                     * we only care about memory used by the key space. */
                    // 計算刪除鍵所釋放的內存數量
                    delta = (long long) zmalloc_used_memory();
                    dbDelete(db, keyobj);
                    delta -= (long long) zmalloc_used_memory();
                    mem_freed += delta;
    
                    // 對淘汰鍵的計數器增一
                    server.stat_evictedkeys++;
    
                    notifyKeyspaceEvent(REDIS_NOTIFY_EVICTED, "evicted",
                                        keyobj, db->id);
                    decrRefCount(keyobj);
                    keys_freed++;
    				...
                }
            }
    
            if (!keys_freed) return REDIS_ERR; /* nothing to free... */
        }
    
        return REDIS_OK;
    }
    
    • 1處,如果目前使用的內存大小比設置的 maxmemory 要小,那麼無須執行進一步操作

    • 2處,如果佔用內存比 maxmemory 要大,但是 maxmemory 策略為不淘汰,那麼直接返回

    • 3處,計算需要釋放多少字節的內存

    • 4處,遍歷字典,釋放內存並記錄被釋放內存的字節數

    • 5處,如果策略是 allkeys-lru 或者 allkeys-random 那麼淘汰的目標為所有數據庫鍵

    • 6處,如果策略是 volatile-lru 、 volatile-random 或者 volatile-ttl ,那麼淘汰的目標為帶過期時間的數據庫鍵

    • 7處,如果使用的是 LRU 策略, 那麼從 sample 鍵中選出 IDLE 時間最長的那個鍵

    • 8處,調用evictionPoolPopulate,該函數在下面講解,該函數的功能是,傳入一個鏈表,即這裏的db->eviction_pool,然後在函數內部,隨機找出n個key,放入傳入的鏈表中,並按照空閑時間排序,空閑最久的,放到最後。

      當該函數,返回后,db->eviction_pool這個鏈表裡就存放了我們要淘汰的key。

    • 8.1處,找到這個key,這個key,在後邊會被刪除

    • 8.2處,下面這一段,從db->eviction_pool將這個已經處理了的key刪掉

    • 8.3處,如果這個key,是存在的,則跳出循環,在後面8.4處,會被刪除

    • 8.4處,刪除這個key

    選擇哪些key作為被淘汰的key

    前面我們看到,在7處,如果為lru策略,則會進入8處的函數:

    evictionPoolPopulate。

    該函數的名稱為:填充(populate)驅逐(eviction)對象池(pool)。驅逐的意思,就是現在達到了maxmemory,沒辦法,只能開始刪除掉一部分元素,來騰空間了,不然新的put類型的命令,根本沒辦法執行。

    該方法的大概思路是,使用lru的時候,隨機找n個key,類似於抽樣,然後放到一個鏈表,根據空閑時間排序。

    具體看看該方法的實現:

    void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    

    其中,傳入的第三個參數,是要被填充的對象,在c語言中,習慣傳入一個入參,然後在函數內部填充或者修改入參對象的屬性。

    該屬性,就是前面說的那個鏈表,用來存放收集的隨機的元素,該鏈表中節點的結構如下:

    struct evictionPoolEntry {
        unsigned long long idle;    /* Object idle time. */
        sds key;                    /* Key name. */
    };
    

    該結構共2個字段,一個存儲key,一個存儲空閑時間。

    該鏈表中,共maxmemory-samples個元素,會按照idle時間長短排序,idle時間長的在鏈表尾部,(假設頭在左,尾在右)。

    void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
        int j, k, count;
        dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
        dictEntry **samples;
    
        /* Try to use a static buffer: this function is a big hit...
         * Note: it was actually measured that this helps. */
        if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
            samples = _samples;
        } else {
            samples = zmalloc(sizeof(samples[0]) * server.maxmemory_samples);
        }
    
        /* 1 Use bulk get by default. */
        count = dictGetRandomKeys(sampledict, samples, server.maxmemory_samples);
    
    	// 2
        for (j = 0; j < count; j++) {
            unsigned long long idle;
            sds key;
            robj *o;
            dictEntry *de;
    
            de = samples[j];
            key = dictGetKey(de);
            /* If the dictionary we are sampling from is not the main
             * dictionary (but the expires one) we need to lookup the key
             * again in the key dictionary to obtain the value object. */
            if (sampledict != keydict) de = dictFind(keydict, key);
            // 3
            o = dictGetVal(de);
            // 4
            idle = estimateObjectIdleTime(o);
    
            /* 5  Insert the element inside the pool.
             * First, find the first empty bucket or the first populated
             * bucket that has an idle time smaller than our idle time. */
            k = 0;
            while (k < REDIS_EVICTION_POOL_SIZE &&
                   pool[k].key &&
                   pool[k].idle < idle)
                k++;
            
    		...
                
            // 6
            pool[k].key = sdsdup(key);
            pool[k].idle = idle;
        }
        if (samples != _samples) zfree(samples);
    }
    
    • 1處,獲取 server.maxmemory_samples個key,這裡是隨機獲取的,(dictGetRandomKeys),這個值,默認值為5,放到samples中

    • 2處,遍歷返回來的samples

    • 3處,調用如下宏,獲取val

      he的類型為dictEntry:

      /*
       * 哈希表節點
       */
      typedef struct dictEntry {
          
          // 鍵
          void *key;
      
          // 值
          union {
              // 1
              void *val;
              uint64_t u64;
              int64_t s64;
          } v;
      
          // 指向下個哈希表節點,形成鏈表
          struct dictEntry *next;
      
      } dictEntry;
      

      所以,這裏去

      robj *o; 
      
      o = dictGetVal(de);
      

      實際就是獲取其v屬性中的val,(1處):

      #define dictGetVal(he) ((he)->v.val)
      
    • 4處,準備計算該val的空閑時間

      我們上面3處,看到,獲取的o的類型為robj。我們現在看看怎麼計算對象的空閑時長:

      /* Given an object returns the min number of milliseconds the object was never
       * requested, using an approximated LRU algorithm. */
      unsigned long long estimateObjectIdleTime(robj *o) {
          //4.1 獲取系統的當前時間
          unsigned long long lruclock = LRU_CLOCK();
          // 4.2
          if (lruclock >= o->lru) {
              // 4.3
              return (lruclock - o->lru) * REDIS_LRU_CLOCK_RESOLUTION;
          } else {
              return (lruclock + (REDIS_LRU_CLOCK_MAX - o->lru)) *
                          REDIS_LRU_CLOCK_RESOLUTION;
          }
      }
      

      這裏,4.1處,獲取系統的當前時間;

      4.2處,如果系統時間,大於對象的lru時間

      4.3處,則用系統時間減去對象的lru時間,再乘以單位,換算為毫秒,最終返回的單位,為毫秒(可以看註釋。)

      #define REDIS_LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
      
    • 5處,這裏拿當前元素,和pool中已經放進去的元素,從第0個開始比較,如果當前元素的idle時長,大於pool中指針0指向的元素,則和pool中索引1的元素比較;直到條件不滿足為止。

      這句話意思就是,類似於冒泡,把當前元素一直往後冒,直到idle時長小於被比較的元素為止。

    • 6處,把當前元素放進pool中。

    經過上面的處理后,鏈表中存放了全部的抽樣元素,且ide時間最長的,在最右邊。

    對象還有字段存儲空閑時間?

    前面4處,說到,用系統的當前時間,減去對象的lru時間。

    大家看看對象的結構體

    typedef struct redisObject {
    
        // 類型
        unsigned type:4;
    
        // 編碼
        unsigned encoding:4;
    
        //1 對象最後一次被訪問的時間
        unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */
    
        // 引用計數
        int refcount;
    
        // 指向實際值的指針
        void *ptr;
    
    } robj;
    

    上面1處,lru屬性,就是用來存儲這個。

    創建對象時,直接使用當前系統時間創建

    robj *createObject(int type, void *ptr) {
    
        robj *o = zmalloc(sizeof(*o));
    
        o->type = type;
        o->encoding = REDIS_ENCODING_RAW;
        o->ptr = ptr;
        o->refcount = 1;
    
        /*1 Set the LRU to the current lruclock (minutes resolution). */
        o->lru = LRU_CLOCK();
        return o;
    }
    

    1處即是。

    robj *createEmbeddedStringObject(char *ptr, size_t len) {
        robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr)+len+1);
        struct sdshdr *sh = (void*)(o+1);
    
        o->type = REDIS_STRING;
        o->encoding = REDIS_ENCODING_EMBSTR;
        o->ptr = sh+1;
        o->refcount = 1;
        // 1
        o->lru = LRU_CLOCK();
    
        sh->len = len;
        sh->free = 0;
        if (ptr) {
            memcpy(sh->buf,ptr,len);
            sh->buf[len] = '\0';
        } else {
            memset(sh->buf,0,len+1);
        }
        return o;
    }
    

    1處即是。

    每次查找該key時,刷新時間

    robj *lookupKey(redisDb *db, robj *key) {
    
        // 查找鍵空間
        dictEntry *de = dictFind(db->dict,key->ptr);
    
        // 節點存在
        if (de) {
            
    
            // 取出值
            robj *val = dictGetVal(de);
    
            /* Update the access time for the ageing algorithm.
             * Don't do it if we have a saving child, as this will trigger
             * a copy on write madness. */
            // 更新時間信息(只在不存在子進程時執行,防止破壞 copy-on-write 機制)
            if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
                // 1
                val->lru = LRU_CLOCK();
    
            // 返回值
            return val;
        } else {
    
            // 節點不存在
    
            return NULL;
        }
    }
    

    1處即是,包括get、set等各種操作,都會刷新該時間。

    仔細看下面的堆棧,set的,get同理:

    總結

    大家有沒有更清楚一些呢?

    總的來說,就是,設置了max-memory后,達到該內存限制后,會在處理命令時,檢查是否要進行內存淘汰;如果要淘汰,則根據maxmemory-policy的策略來。

    隨機選擇maxmemory-sample個元素,按照空閑時間排序,拉鏈表;挨個挨個清除。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    南投搬家公司費用需注意的眉眉角角,別等搬了再說!

    ※教你寫出一流的銷售文案?

  • 老大吩咐的可重入分佈式鎖,終於完美的實現了!!!

    老大吩咐的可重入分佈式鎖,終於完美的實現了!!!

    重做永遠比改造簡單

    最近在做一個項目,將一個其他公司的實現系統(下文稱作舊系統),完整的整合到自己公司的系統(下文稱作新系統)中,這其中需要將對方實現的功能完整在自己系統也實現一遍。

    舊系統還有一批存量商戶,為了不影響存量商戶的體驗,新系統提供的對外接口,還必須得跟以前一致。最後系統完整切換之後,功能只運行在新系統中,這就要求舊系統的數據還需要完整的遷移到新系統中。

    當然這些在做這個項目之前就有預期,想過這個過程很難,但是沒想到有那麼難。原本感覺排期大半年,時間還是挺寬裕,現在感覺就是大坑,還不得不在坑裡一點點去填。

    哎,說多都是淚,不吐槽了,等到下次做完再給大家復盤下真正心得體會。

    回到正文,上篇文章Redis 分佈式鎖,咱們基於 Redis 實現一個分佈式鎖。這個分佈式鎖基本功能沒什麼問題,但是缺少可重入的特性,所以這篇文章小黑哥就帶大家來實現一下可重入的分佈式鎖。

    本篇文章將會涉及以下內容:

    • 可重入
    • 基於 ThreadLocal 實現方案
    • 基於 Redis Hash 實現方案

    先贊后看,養成習慣。微信搜索「程序通事」,關注就完事了~

    可重入

    說到可重入鎖,首先我們來看看一段來自 wiki 上可重入的解釋:

    若一個程序或子程序可以“在任意時刻被中斷然後操作系統調度執行另外一段代碼,這段代碼又調用了該子程序不會出錯”,則稱其為可重入(reentrant或re-entrant)的。即當該子程序正在運行時,執行線程可以再次進入並執行它,仍然獲得符合設計時預期的結果。與多線程併發執行的線程安全不同,可重入強調對單個線程執行時重新進入同一個子程序仍然是安全的。

    當一個線程執行一段代碼成功獲取鎖之後,繼續執行時,又遇到加鎖的代碼,可重入性就就保證線程能繼續執行,而不可重入就是需要等待鎖釋放之後,再次獲取鎖成功,才能繼續往下執行。

    用一段 Java 代碼解釋可重入:

    public synchronized void a() {
        b();
    }
    
    public synchronized void b() {
        // pass
    }
    

    假設 X 線程在 a 方法獲取鎖之後,繼續執行 b 方法,如果此時不可重入,線程就必須等待鎖釋放,再次爭搶鎖。

    鎖明明是被 X 線程擁有,卻還需要等待自己釋放鎖,然後再去搶鎖,這看起來就很奇怪,我釋放我自己~

    可重入性就可以解決這個尷尬的問題,當線程擁有鎖之後,往後再遇到加鎖方法,直接將加鎖次數加 1,然後再執行方法邏輯。退出加鎖方法之後,加鎖次數再減 1,當加鎖次數為 0 時,鎖才被真正的釋放。

    可以看到可重入鎖最大特性就是計數,計算加鎖的次數。所以當可重入鎖需要在分佈式環境實現時,我們也就需要統計加鎖次數。

    分佈式可重入鎖實現方式有兩種:

    • 基於 ThreadLocal 實現方案
    • 基於 Redis Hash 實現方案

    首先我們看下基於 ThreadLocal 實現方案。

    基於 ThreadLocal 實現方案

    實現方式

    Java 中 ThreadLocal可以使每個線程擁有自己的實例副本,我們可以利用這個特性對線程重入次數進行技術。

    下面我們定義一個ThreadLocal的全局變量 LOCKS,內存存儲 Map 實例變量。

    private static ThreadLocal<Map<String, Integer>> LOCKS = ThreadLocal.withInitial(HashMap::new);
    

    每個線程都可以通過 ThreadLocal獲取自己的 Map實例,Mapkey 存儲鎖的名稱,而 value存儲鎖的重入次數。

    加鎖的代碼如下:

    /**
     * 可重入鎖
     *
     * @param lockName  鎖名字,代表需要爭臨界資源
     * @param request   唯一標識,可以使用 uuid,根據該值判斷是否可以重入
     * @param leaseTime 鎖釋放時間
     * @param unit      鎖釋放時間單位
     * @return
     */
    public Boolean tryLock(String lockName, String request, long leaseTime, TimeUnit unit) {
        Map<String, Integer> counts = LOCKS.get();
        if (counts.containsKey(lockName)) {
            counts.put(lockName, counts.get(lockName) + 1);
            return true;
        } else {
            if (redisLock.tryLock(lockName, request, leaseTime, unit)) {
                counts.put(lockName, 1);
                return true;
            }
        }
        return false;
    }
    

    ps: redisLock#tryLock 為上一篇文章實現的分佈鎖。

    由於公號外鏈無法直接跳轉,關注『程序通事』,回復分佈式鎖獲取源代碼。

    加鎖方法首先判斷當前線程是否已經已經擁有該鎖,若已經擁有,直接對鎖的重入次數加 1。

    若還沒擁有該鎖,則嘗試去 Redis 加鎖,加鎖成功之後,再對重入次數加 1 。

    釋放鎖的代碼如下:

    /**
     * 解鎖需要判斷不同線程池
     *
     * @param lockName
     * @param request
     */
    public void unlock(String lockName, String request) {
        Map<String, Integer> counts = LOCKS.get();
        if (counts.getOrDefault(lockName, 0) <= 1) {
            counts.remove(lockName);
            Boolean result = redisLock.unlock(lockName, request);
            if (!result) {
                throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName:+" + lockName + " with request: "
                        + request);
            }
    
        } else {
            counts.put(lockName, counts.get(lockName) - 1);
        }
    }
    

    釋放鎖的時首先判斷重入次數,若大於 1,則代表該鎖是被該線程擁有,所以直接將鎖重入次數減 1 即可。

    若當前可重入次數小於等於 1,首先移除 Map中鎖對應的 key,然後再到 Redis 釋放鎖。

    這裏需要注意的是,當鎖未被該線程擁有,直接解鎖,可重入次數也是小於等於 1 ,這次可能無法直接解鎖成功。

    ThreadLocal 使用過程要記得及時清理內部存儲實例變量,防止發生內存泄漏,上下文數據串用等問題。

    下次咱來聊聊最近使用 ThreadLocal 寫的 Bug。

    相關問題

    使用 ThreadLocal 這種本地記錄重入次數,雖然真的簡單高效,但是也存在一些問題。

    過期時間問題

    上述加鎖的代碼可以看到,重入加鎖時,僅僅對本地計數加 1 而已。這樣可能就會導致一種情況,由於業務執行過長,Redis 已經過期釋放鎖。

    而再次重入加鎖時,由於本地還存在數據,認為鎖還在被持有,這就不符合實際情況。

    如果要在本地增加過期時間,還需要考慮本地與 Redis 過期時間一致性的,代碼就會變得很複雜。

    不同線程/進程可重入問題

    狹義上可重入性應該只是對於同一線程的可重入,但是實際業務可能需要不同的應用線程之間可以重入同把鎖。

    ThreadLocal的方案僅僅只能滿足同一線程重入,無法解決不同線程/進程之間重入問題。

    不同線程/進程重入問題就需要使用下述方案 Redis Hash 方案解決。

    基於 Redis Hash 可重入鎖

    實現方式

    ThreadLocal 的方案中我們使用了 Map 記載鎖的可重入次數,而 Redis 也同樣提供了 Hash (哈希表)這種可以存儲鍵值對數據結構。所以我們可以使用 Redis Hash 存儲的鎖的重入次數,然後利用 lua 腳本判斷邏輯。

    加鎖的 lua 腳本如下:

    ---- 1 代表 true
    ---- 0 代表 false
    
    if (redis.call('exists', KEYS[1]) == 0) then
        redis.call('hincrby', KEYS[1], ARGV[2], 1);
        redis.call('pexpire', KEYS[1], ARGV[1]);
        return 1;
    end ;
    if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
        redis.call('hincrby', KEYS[1], ARGV[2], 1);
        redis.call('pexpire', KEYS[1], ARGV[1]);
        return 1;
    end ;
    return 0;
    

    如果 KEYS:[lock],ARGV[1000,uuid]

    不熟悉 lua 語言同學也不要怕,上述邏輯還是比較簡單的。

    加鎖代碼首先使用 Redis exists 命令判斷當前 lock 這個鎖是否存在。

    如果鎖不存在的話,直接使用 hincrby創建一個鍵為 lock hash 表,並且為 Hash 表中鍵為 uuid 初始化為 0,然後再次加 1,最後再設置過期時間。

    如果當前鎖存在,則使用 hexists判斷當前 lock 對應的 hash 表中是否存在 uuid 這個鍵,如果存在,再次使用 hincrby 加 1,最後再次設置過期時間。

    最後如果上述兩個邏輯都不符合,直接返回。

    加鎖代碼如下:

    // 初始化代碼
    
    String lockLuaScript = IOUtils.toString(ResourceUtils.getURL("classpath:lock.lua").openStream(), Charsets.UTF_8);
    lockScript = new DefaultRedisScript<>(lockLuaScript, Boolean.class);
    
    /**
     * 可重入鎖
     *
     * @param lockName  鎖名字,代表需要爭臨界資源
     * @param request   唯一標識,可以使用 uuid,根據該值判斷是否可以重入
     * @param leaseTime 鎖釋放時間
     * @param unit      鎖釋放時間單位
     * @return
     */
    public Boolean tryLock(String lockName, String request, long leaseTime, TimeUnit unit) {
        long internalLockLeaseTime = unit.toMillis(leaseTime);
        return stringRedisTemplate.execute(lockScript, Lists.newArrayList(lockName), String.valueOf(internalLockLeaseTime), request);
    }
    

    Spring-Boot 2.2.7.RELEASE

    只要搞懂 Lua 腳本加鎖邏輯,Java 代碼實現還是挺簡單的,直接使用 SpringBoot 提供的 StringRedisTemplate 即可。

    解鎖的 Lua 腳本如下:

    -- 判斷 hash set 可重入 key 的值是否等於 0
    -- 如果為 0 代表 該可重入 key 不存在
    if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
        return nil;
    end ;
    -- 計算當前可重入次數
    local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1);
    -- 小於等於 0 代表可以解鎖
    if (counter > 0) then
        return 0;
    else
        redis.call('del', KEYS[1]);
        return 1;
    end ;
    return nil;
    

    首先使用 hexists 判斷 Redis Hash 表是否存給定的域。

    如果 lock 對應 Hash 表不存在,或者 Hash 表不存在 uuid 這個 key,直接返回 nil

    若存在的情況下,代表當前鎖被其持有,首先使用 hincrby使可重入次數減 1 ,然後判斷計算之後可重入次數,若小於等於 0,則使用 del 刪除這把鎖。

    解鎖的 Java 代碼如下:

    // 初始化代碼:
    
    
    String unlockLuaScript = IOUtils.toString(ResourceUtils.getURL("classpath:unlock.lua").openStream(), Charsets.UTF_8);
    unlockScript = new DefaultRedisScript<>(unlockLuaScript, Long.class);
    
    /**
     * 解鎖
     * 若可重入 key 次數大於 1,將可重入 key 次數減 1 <br>
     * 解鎖 lua 腳本返回含義:<br>
     * 1:代表解鎖成功 <br>
     * 0:代表鎖未釋放,可重入次數減 1 <br>
     * nil:代表其他線程嘗試解鎖 <br>
     * <p>
     * 如果使用 DefaultRedisScript<Boolean>,由於 Spring-data-redis eval 類型轉化,<br>
     * 當 Redis 返回  Nil bulk, 默認將會轉化為 false,將會影響解鎖語義,所以下述使用:<br>
     * DefaultRedisScript<Long>
     * <p>
     * 具體轉化代碼請查看:<br>
     * JedisScriptReturnConverter<br>
     *
     * @param lockName 鎖名稱
     * @param request  唯一標識,可以使用 uuid
     * @throws IllegalMonitorStateException 解鎖之前,請先加鎖。若為加鎖,解鎖將會拋出該錯誤
     */
    public void unlock(String lockName, String request) {
        Long result = stringRedisTemplate.execute(unlockScript, Lists.newArrayList(lockName), request);
        // 如果未返回值,代表其他線程嘗試解鎖
        if (result == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName:+" + lockName + " with request: "
                    + request);
        }
    }
    

    解鎖代碼執行方式與加鎖類似,只不過解鎖的執行結果返回類型使用 Long。這裏之所以沒有跟加鎖一樣使用 Boolean ,這是因為解鎖 lua 腳本中,三個返回值含義如下:

    • 1 代表解鎖成功,鎖被釋放
    • 0 代表可重入次數被減 1
    • null 代表其他線程嘗試解鎖,解鎖失敗

    如果返回值使用 BooleanSpring-data-redis 進行類型轉換時將會把 null 轉為 false,這就會影響我們邏輯判斷,所以返回類型只好使用 Long

    以下代碼來自 JedisScriptReturnConverter

    相關問題

    spring-data-redis 低版本問題

    如果 Spring-Boot 使用 Jedis 作為連接客戶端,並且使用Redis Cluster 集群模式,需要使用 2.1.9 以上版本的spring-boot-starter-data-redis,不然執行過程中將會拋出:

    org.springframework.dao.InvalidDataAccessApiUsageException: EvalSha is not supported in cluster environment.
    

    如果當前應用無法升級 spring-data-redis也沒關係,可以使用如下方式,直接使用原生 Jedis 連接執行 lua 腳本。

    以加鎖代碼為例:

    public boolean tryLock(String lockName, String reentrantKey, long leaseTime, TimeUnit unit) {
        long internalLockLeaseTime = unit.toMillis(leaseTime);
        Boolean result = stringRedisTemplate.execute((RedisCallback<Boolean>) connection -> {
            Object innerResult = eval(connection.getNativeConnection(), lockScript, Lists.newArrayList(lockName), Lists.newArrayList(String.valueOf(internalLockLeaseTime), reentrantKey));
            return convert(innerResult);
        });
        return result;
    }
    
    private Object eval(Object nativeConnection, RedisScript redisScript, final List<String> keys, final List<String> args) {
    
        Object innerResult = null;
        // 集群模式和單點模式雖然執行腳本的方法一樣,但是沒有共同的接口,所以只能分開執行
        // 集群
        if (nativeConnection instanceof JedisCluster) {
            innerResult = evalByCluster((JedisCluster) nativeConnection, redisScript, keys, args);
        }
        // 單點
        else if (nativeConnection instanceof Jedis) {
            innerResult = evalBySingle((Jedis) nativeConnection, redisScript, keys, args);
        }
        return innerResult;
    }
    

    數據類型轉化問題

    如果使用 Jedis 原生連接執行 Lua 腳本,那麼可能又會碰到數據類型的轉換坑。

    可以看到 Jedis#eval返回 Object,我們需要具體根據 Lua 腳本的返回值的,再進行相關轉化。這其中就涉及到 Lua 數據類型轉化為 Redis 數據類型。

    下面主要我們來講下 Lua 數據轉化 Redis 的規則中幾條比較容易踩坑:

    1、Lua number 與 Redis 數據類型轉換

    Lua 中 number 類型是一個雙精度的浮點數,但是 Redis 只支持整數類型,所以這個轉化過程將會丟棄小數位。

    2、Lua boolean 與 Redis 類型轉換

    這個轉化比較容易踩坑,Redis 中是不存在 boolean 類型,所以當Lua 中 true 將會轉為 Redis 整數 1。而 Lua 中 false 並不是轉化整數,而是轉化 null 返回給客戶端。

    3、Lua nil 與 Redis 類型轉換

    Lua nil 可以當做是一個空值,可以等同於 Java 中的 null。在 Lua 中如果 nil 出現在條件表達式,將會當做 false 處理。

    所以 Lua nil 也將會 null 返回給客戶端。

    其他轉化規則比較簡單,詳情參考:

    http://doc.redisfans.com/script/eval.html

    總結

    可重入分佈式鎖關鍵在於對於鎖重入的計數,這篇文章主要給出兩種解決方案,一種基於 ThreadLocal 實現方案,這種方案實現簡單,運行也比較高效。但是若要處理鎖過期的問題,代碼實現就比較複雜。

    另外一種採用 Redis Hash 數據結構實現方案,解決了 ThreadLocal 的缺陷,但是代碼實現難度稍大,需要熟悉 Lua 腳本,以及Redis 一些命令。另外使用 spring-data-redis 等操作 Redis 時不經意間就會遇到各種問題。

    幫助

    https://www.sofastack.tech/blog/sofa-jraft-rheakv-distributedlock/

    https://tech.meituan.com/2016/09/29/distributed-system-mutually-exclusive-idempotence-cerberus-gtis.html

    最後說兩句(求關注)

    看完文章,哥哥姐姐們點個吧,周更真的超累,不知覺又寫了两天,拒絕白嫖,來點正反饋唄~。

    最後感謝各位的閱讀,才疏學淺,難免存在紕漏,如果你發現錯誤的地方,可以留言指出。如果看完文章還有其他不懂的地方,歡迎加我,互相學習,一起成長~

    最後謝謝大家支持~

    最最後,重要的事再說一篇~

    快來關注我呀~
    快來關注我呀~
    快來關注我呀~

    歡迎關注我的公眾號:程序通事,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的博客:studyidea.cn

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※想知道最厲害的網頁設計公司"嚨底家"!

    ※別再煩惱如何寫文案,掌握八大原則!

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

  • 雀巢號召新創尖兵 加速開發乳製品替代品

    摘錄自2020年9月29日中央社報導

    瑞士食品業巨擘雀巢集團(Nestle)今(29日)發表聲明稿說:「公司擬將旗下位於瑞士科諾爾芬根(Konolfingen)的研發中心,開放給新創公司、學生和科學家。」,加速開發以植物為主的乳製品替代品。

    雀巢表示,將會有內部、外部以及混合編組團隊在研發中心工作,為期六個月。

    除了對永續乳製品進行測試外,集團也計畫鼓勵開發以植物為基礎的乳製品替代品。雀巢發表以此程序研發出來的一種使用蔬菜為基礎乳品。

    氣候變遷
    國際新聞
    瑞士
    乳製品
    素食

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※教你寫出一流的銷售文案?