標籤: 銷售文案

  • 空間大做工好!這是6萬元區間最好的7座車?

    空間大做工好!這是6萬元區間最好的7座車?

    相比較7座SUV來說,歐尚A800的後備箱也小有優勢,主要體現在A800的後備箱高度和進深上,在這兩個參數上A800十分有優勢,超過1米的後備箱高度十分誇張。A800搭載了一台1。5T渦輪增壓發動機,型號為JL476ZQCD,這台全鋁發動機帶有DVVT技術,最大功率156馬力,最大扭矩225牛米,參數並不是很高。

    看過了非常適合家用的SUV奇駿、夠大夠霸氣的銳界、大氣實用的奧德賽、精緻好用的途安L之後,你是覺得SUV好還是MpV好呢?有興趣的朋友可以點擊鏈接查看往期文章:

    奧德賽:67.9分

    途安L:64.6分

    銳界:66分

    奇駿:65.4分

    說起8萬左右的MpV車型,就不得不提歐尚A800了!它最大的特點當然是空間、動力以及配置,這些方面它絲毫不遜於對手寶駿,而且由於這台車還是我們的工作車的原因,長期使用下來我們對它也是非常熟悉,歐尚A800外表雖然不算出色,但是論及內在絕對是一名出色的选手!

    在測試中歐尚A800也表現出了強大的實力,無論是在外觀品質、動力表現以及車內空間上都可圈可點。

    相比較長安以往的車型,歐尚A800在設計上盡量營造出時尚感與精緻感,從外觀很多細節上都能看到它的設計思路,這樣的造型設計顯然是成功的,A800雖然尺寸龐大,但是看上去卻並不顯臃腫,而且較大的車窗也能夠提供非常不錯的採光。

    內飾也是如此,我們這台高配車型中控台非常簡潔,碩大的屏幕與空調操作區的按鈕擺放都很有檔次感,全液晶儀錶盤在這個價格區間的車型里也十分少見,加上內飾的材質比較考究,整體營造的氛圍還是不錯的。

    A800的外觀工藝相比較更高價位的車型也毫不遜色,無論是外觀的鈑金縫隙,還是車漆的噴漆均勻度都很不錯,不過車漆的厚度平均不足100微米則有點太薄了。

    雖然內飾看上去不錯,但是受限於價格,A800在內飾材質上大面積使用了硬塑料,如果真的談及觸感的話還是顯得有一些廉價,不過好在內飾的拼裝工藝還是不錯的,塑料件也沒有毛刺。

    有了龐大的尺寸以及方正的設計,A800的內部空間可以說十分寬裕,無論是前排後排還是第三排空間都可以用寬敞來形容,而且A800的第二排還是採用獨立座椅設計,相比較大多數轎車來說都要更加舒適,不過受限於第三排地板以及空間,第三排的座椅規格比前兩排要小一些,硬度上也更硬一點。

    相比較7座SUV來說,歐尚A800的後備箱也小有優勢,主要體現在A800的後備箱高度和進深上,在這兩個參數上A800十分有優勢,超過1米的後備箱高度十分誇張。

    A800搭載了一台1.5T渦輪增壓發動機,型號為JL476ZQCD,這台全鋁發動機帶有DVVT技術,最大功率156馬力,最大扭矩225牛米,參數並不是很高。

    與之匹配的是6擋手動變速箱,這台變速箱齒比比較綿密,尤其是前兩個擋位可以說是為拉貨設計的,非常大的齒比對於載重來說是一件好事。

    不過由於齒比比較綿密,因此在加速上A800就有些吃虧了,2擋僅能跑到70km/h的速度來,再升上3擋之後才能破百,而3擋的加速度就遠不如1/2擋了,因此最終A800的破百成績為12.5秒,這樣的成績對於這台大傢伙來說倒也還算可以。

    作為一台MpV車型,A800顯然和運動扯不上關係,對於這類車型來說我們的要求也就是好開,從這個角度考慮A800確實算得上不錯,首先A800的離合點十分清晰,變速箱的換擋手感也不錯!加上發動機的低扭還算不錯,開起來比較得心應手。

    不過由於尺寸龐大且車身較高,懸挂也偏軟,因此A800在高速行駛的穩定性上和轎車以及多數SUV比還是不佔優勢,尤其是面對橫風的時候需要更加集中精力駕駛。

    雖然加速成績是橫評車型里最慢的,不過在實際動力感受上還是不錯,尤其是低速駕駛的時候會感覺車子很有力,再加上不錯的變速箱,A800是一台很能輕鬆駕馭的手動擋車型。

    對於這類型的MpV,其實最讓人擔心的就是隔音了,由於車內空間比較大,車子的迎風面積也大,所以容易在第二/三排產生較大的共鳴聲和風聲,不過在實際體驗中A800這個問題倒也不算嚴重,當然相比較轎車那肯定是差一些了。

    在售價上歐尚A800的指導價算是自主入門MpV中比較低的了,性價比還是不錯的。

    A800在諸多方面的表現都堪稱出色,優異的配置、不錯的駕駛感受和寬敞的空間都是它的優勢所在,對於這個價位買車的消費者來說這恰恰也是它們最關心的,再加上較低的售價使得這款車有了不錯的性價比,所以在6-9萬的MpV市場中A800確實算得上一個稱心的好選擇!

    本站聲明:網站內容來源於http://www.auto6s.com/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※超省錢租車方案

    ※別再煩惱如何寫文案,掌握八大原則!

    ※回頭車貨運收費標準

    ※教你寫出一流的銷售文案?

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

  • ThreadLocal源碼解析-Java8,ThreadLocal的使用場景分析,利用線性探測法解決hash衝突,Java-強引用、軟引用、弱引用、虛引用,利用線性探測法解決hash衝突,分析ThreadLocal的弱引用與內存泄漏問題-Java8

    ThreadLocal源碼解析-Java8,ThreadLocal的使用場景分析,利用線性探測法解決hash衝突,Java-強引用、軟引用、弱引用、虛引用,利用線性探測法解決hash衝突,分析ThreadLocal的弱引用與內存泄漏問題-Java8

    目錄

    一.ThreadLocal介紹

      1.1 ThreadLocal的功能

      1.2 ThreadLocal使用示例

    二.源碼分析-ThreadLocal

      2.1 ThreadLocal的類層級關係

      2.2 ThreadLocal的屬性字段

      2.3 創建ThreadLocal對象

      2.4 ThreadLocal-set操作

      2.5 ThreadLocal-get操作

      2.6 ThreadLocal-remove操作

    三.ThreadLocalMap類

      3.0 線性探測算法解決hash衝突

      3.1 Entry內部類

      3.2 ThreadLocalMap的常量介紹

      3.3 實例化ThreadLocalMap

      3.4 ThreadLocalMap的set操作

      3.5 清理陳舊Entry和rehash

    四.總結 

     

    一.介紹ThreadLocal

    1.1ThreadLocal的功能

      我們知道,變量從作用域範圍進行分類,可以分為“全局變量”、“局部變量”兩種:

      1.全局變量(global variable),比如類的靜態屬性(加static關鍵字),在類的整個生命周期都有效;

      2.局部變量(local variable),比如在一個方法中定義的變量,作用域只是在當前方法內,方法執行完畢后,變量就銷毀(釋放)了;

      使用全局變量,當多個線程同時修改靜態屬性,就容易出現併發問題,導致臟數據;而局部變量一般來說不會出現併發問題(在方法中開啟多線程併發修改局部變量,仍可能引起併發問題);

      再看ThreadLocal,可以用來保存局部變量,只不過這個“局部”是指“線程”作用域,也就是說,該變量在該線程的整個生命周期中有效。

      關於ThreadLocal的使用場景,可以查看ThreadLocal的使用場景分析。

     

    1.2ThreadLocal的使用示例

      ThreadLocal使用非常簡單。

    package cn.ganlixin;
    
    import org.junit.Test;
    
    import java.util.Arrays;
    import java.util.List;
    
    public class TestThreadLocal {
    
        private static class Goods {
            public Integer id;
            public List<String> tags;
        }
    
        @Test
        public void testReference() {
            Goods goods1 = new Goods();
            goods1.id = 10;
            goods1.tags = Arrays.asList("healthy", "cheap");
    
            ThreadLocal<Goods> threadLocal = new ThreadLocal<>();
            threadLocal.set(goods1);
    
            Goods goods2 = threadLocal.get();
            System.out.println(goods1); // cn.ganlixin.TestThreadLocal$Goods@1c655221
            System.out.println(goods2); // cn.ganlixin.TestThreadLocal$Goods@1c655221
    
            goods2.id = 100;
            System.out.println(goods1.id);  // 100
            System.out.println(goods2.id);  // 100
    
            threadLocal.remove();
            System.out.println(threadLocal.get()); // null
        }
    
        @Test
        public void test2() {
            // 一個線程中,可以創建多個ThreadLocal對象,多個ThreadLoca對象互不影響
            ThreadLocal<String> threadLocal1 = new ThreadLocal<>();
            ThreadLocal<String> threadLocal2 = new ThreadLocal<>();
            // ThreadLocal存的值默認為null
    
            System.out.println(threadLocal1.get()); // null
    
            threadLocal1.set("this is value1");
            threadLocal2.set("this is value2");
            System.out.println(threadLocal1.get()); // this is value1
            System.out.println(threadLocal2.get());  // this is value2
    
            // 可以重寫initialValue進行設置初始值
            ThreadLocal<String> threadLocal3 = new ThreadLocal<String>() {
                @Override
                protected String initialValue() {
                    return "this is initial value";
                }
            };
            System.out.println(threadLocal3.get()); // this is initial value
        }
    }
    

      

    二.源碼分析-ThreadLocal

    2.1ThreadLocal類層級關係

      

      ThreadLocal類中有一個內部類ThreadLocalMap,這個類特別重要,ThreadLocal的各種操作基本都是圍繞ThreadLocalMap進行的

      對於ThreadLocalMap有來說,它內部定義了一個Entry內部類,有一個table屬性,是一個Entry數組,和HashMap有一些相似的地方,但是ThreadLocalMap和HashMap並沒有什麼關係。

      先大概看一下內存關係圖,不理解也沒關係,看了後面的代碼應該就能理解了:

       

      大概解釋一下,棧中的Thread ref(引用)堆中的Thread對象,Thread對象有一個屬性threadlocals(ThreadLocalMap類型),這個Map中每一項(Entry)的value是ThreadLocal.set()的值,而Map的key則是ThreadLocal對象。

      下面在介紹源碼的時候,會從兩部分進行介紹,先介紹ThreadLocal的常用api,然後再介紹ThreadLocalMap,因為ThreadLocal的api內部其實都是在操作ThreadLocalMap,所以看源碼時一定要知道他們倆之間的關係

     

    2.2ThreadLocal的屬性

      ThreadLocal有3個屬性,主要的功能就是生成ThreadLocal的hash值。

    // threadLocalHashCode用來表示當前ThreadLocal對象的hashCode,通過計算獲得
    private final int threadLocalHashCode = nextHashCode();
    
    // 一個AtomicInteger類型的屬性,功能就是計數,各種操作都是原子性的,在併發時不會出現問題
    private static AtomicInteger nextHashCode = new AtomicInteger();
    
    // hash值的增量,不是隨便指定的,被稱為“黃金分割數”,能讓hash結果均衡分佈
    private static final int HASH_INCREMENT = 0x61c88647;
    
    /**
     * 通過計算,為當前ThreadLocal對象生成一個HashCode
     */
    private static int nextHashCode() {
        // 獲取當前nextHashCode,然後遞增HASH_INCREMENT
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }
    

      

    2.3創建ThreadLocal對象

      ThreadLocal類,只有一個無參構造器,如果需要是指默認值,則可以重寫initialValue方法:

    public ThreadLocal() {}
    
    /**
     * 初始值默認為null,要設置初始值,只需要設置為方法返回值即可
     *
     * @return ThreadLocal的初始值
     */
    protected T initialValue() {
        return null;
    }
    

      需要注意的是initialValue方法並不會在創建ThreadLocal對象的時候設置初始值,而是延遲執行:當ThreadLocal直接調用get時才會觸發initialValue執行(get之前沒有調用set來設置過值),initialValue方法在後面還會介紹。 

     

    2.4ThreadLocal-set操作

      下面這段代碼只給出了ThreadLocal的set代碼:

    public void set(T value) {
        // 獲取當前線程
        Thread t = Thread.currentThread();
    
        // 獲取當前線程的ThreadLocalMap屬性,ThreadLocal有一個threadLocals屬性(ThreadLocalMap類型)
        ThreadLocalMap map = getMap(t);
    
        if (map != null) {
            // 如果當前線程有關聯的ThreadLocalMap對象,則調用ThreadLocalMap的set方法進行設置
            map.set(this, value);
        } else {
            // 創建一個與當前線程關聯的ThreadLocalMap對象,並設置對應的value
            createMap(t, value);
        }
    }
    
    /**
     * 獲取線程關聯的ThreadLocalMap對象
     */
    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }
    
    /**
     * 創建ThreadLocalMap
     * @param t          key為當前線程
     * @param firstValue value為ThreadLocal.set的值
     */
    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

      如果想立即了解ThreadLocalMap的set方法,則可點此跳轉!

     

    2.5ThreadLocal-get操作

      前面說過“重寫ThreadLocal的initialValue方法來設置ThreadLocal的默認值,並不是在創建ThreadLocal的時候執行的,而是在直接get的時候執行的”,看了下面的代碼,就知道這句話的具體含義了,感覺設計很巧妙:

    public T get() {
        // 獲取當前線程
        Thread t = Thread.currentThread();
    
        // 獲取當前線程對象的threadLocals屬性
        ThreadLocalMap map = getMap(t);
    
        // 若當前線程對象的threadLocals屬性不為空(map不為空)
        if (map != null) {
            // 當前ThreadLocal對象作為key,獲取ThreadLocalMap中對應的Entry
            ThreadLocalMap.Entry e = map.getEntry(this);
    
            // 如果找到對應的Entry,則證明該線程的該ThreadLocal有值,返回值即可
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T) e.value;
                return result;
            }
        }
    
        // 1.當前線程對象的threadLocals屬性為空(map為空)
        // 2.或者map不為空,但是未在map中查詢到以該ThreadLocal對象為key對應的entry
        // 這兩種情況,都會進行設置初始值,並將初始值返回
        return setInitialValue();
    }
    
    /**
     * 設置ThreadLocal初始值
     *
     * @return 初始值
     */
    private T setInitialValue() {
        // 調用initialValue方法,該方法可以在創建ThreadLocal的時候重寫
        T value = initialValue();
        Thread t = Thread.currentThread();
    
        // 獲取當前線程的threadLocals屬性(map)
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            // threadLocals屬性值不為空,則進行調用ThreadLocalMap的set方法
            map.set(this, value);
        } else {
            // 沒有關聯的threadLocals,則創建ThreadLocalMap,並在map中新增一個Entry
            createMap(t, value);
        }
    
        // 返回初始值
        return value;
    }
    
    /**
     * 初始值默認為null,要設置初始值,只需要設置為方法返回值即可
     * 創建ThreadLocal設置默認值,可以覆蓋initialValue方法,initialValue方法不是在創建ThreadLocal時執行,而是這個時候執行
     *
     * @return ThreadLocal的初始值
     */
    protected T initialValue() {
        return null;
    }
    

         

    2.6ThreadLocal-remove操作

      一般是在ThreadLocal對象使用完后,調用ThreadLocal的remove方法,在一定程度上,可以避免內存泄露;

     

    /**
     * 刪除當前線程中threadLocals屬性(map)中的Entry(以當前ThreadLocal為key的)
     */
    public void remove() {
        // 獲取當前線程的threadLocals屬性(ThreadLocalMap)
        ThreadLocalMap m = getMap(Thread.currentThread());
    
        if (m != null) {
            // 調用ThreadLocalMap的remove方法,刪除map中以當前ThreadLocal為key的entry
            m.remove(this);
        }
    }

     

    三.ThreadLocalMap內部類

    3.0 線性探測算法解決hash衝突

      在介紹ThreadLocalMap的之前,強烈建議先了解一下線性探測算法,這是一種解決Hash衝突的方案,如果不了解這個算法就去看ThreadLocalMap的源碼就會非常吃力,會感到莫名其妙。

      鏈接在此:利用線性探測法解決hash衝突

     

    3.1Entry內部類

      ThreadLocalMap是ThreadLocal的內部類,ThreadLocalMap底層使用數組實現,每一個數組的元素都是Entry類型(在ThreadLocalMap中定義的),源碼如下:

    /**
     * ThreadLocalMap中存放的元素類型,繼承了弱引用類
     */
    static class Entry extends WeakReference<ThreadLocal<?>> {
        // key對應的value,注意key是ThreadLocal類型
        Object value;
    
        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }

      ThreadLocalMap和HashMap類似,比較一下:

      a:底層都是使用數組實現,數組元素類型都是內部定義,Java8中,HashMap的元素是Node類型(或者TreeNode類型),ThreadLocalMap中的元素類型是Entry類型;

      b.都是通過計算得到一個值,將這個值與數組的長度(容量)進行與操作,確定Entry應該放到哪個位置;

      c.都有初始容量、負載因子,超過擴容閾值將會觸發擴容;但是HashMap的初始容量、負載因子是可以更改的,而ThreadLocalMap的初始容量和負載因子不可修改;

      注意Entry繼承自WeakReference類,在實例化Entry時,將接收的key傳給父類構造器(也就是WeakReference的構造器),WeakReference構造器又將key傳給它的父類構造器(Reference):

    // 創建Reference對象,接受一個引用
    Reference(T referent) {
        this(referent, null);
    }
    
    // 設置引用
    Reference(T referent, ReferenceQueue<? super T> queue) {
        this.referent = referent;
        this.queue = (queue == null) ? ReferenceQueue.NULL : queue;
    }
    

      關於Java的各種引用,可以參考:Java-強引用、軟引用、弱引用、虛引用

     

    3.2ThreadLocalMap的常量介紹

    // ThreadLocalMap的初始容量
    private static final int INITIAL_CAPACITY = 16;
    
    // ThreadLocalMap底層存數據的數組
    private Entry[] table;
    
    // ThreadLocalMap中元素的個數
    private int size = 0;
    
    // 擴容閾值,當size達到閾值時會觸發擴容(loadFactor=2/3;newCapacity=2*oldCapacity)
    private int threshold; // Default to 0
    

      

    3.3創建ThreadLocalMap對象

      創建ThreadLocalMap,是在第一次調用ThreadLocal的set或者get方法時執行,其中第一次未set值,直接調用get時,就會利用ThreadLocal的初始值來創建ThreadLocalMap。

      ThreadLocalMap內部類的源碼如下:

    /**
     * 初始化一個ThreadLocalMap對象(第一次調用ThreadLocal的set方法時創建),傳入ThreadLocal對象和對應的value
     */
    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
        // 創建一個Entry數組,容量為16(默認)
        table = new Entry[INITIAL_CAPACITY];
    
        // 計算新增的元素,應該放到數組的哪個位置,根據ThreadLocal的hash值與初始容量進行"與"操作
        int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    
        // 創建一個Entry,設置key和value,注意Entry中沒有key屬性,key屬性是傳給Entry的父類WeakReference
        table[i] = new Entry(firstKey, firstValue);
    
        // 初始容量為1
        size = 1;
    
        // 設置擴容閾值
        setThreshold(INITIAL_CAPACITY);
    }
    
    /**
     * 設置擴容閾值,接收容量值,負載因子固定為2/3
     */
    private void setThreshold(int len) {
        threshold = len * 2 / 3;
    }

     

    3.4 ThreadLocalMap的set操作

      ThreadLocal的set方法,其實核心就是調用ThreadLocalMap的set方法,set方法的流程比較長

    /**
     * 為當前ThreadLocal對象設置value
     */
    private void set(ThreadLocal<?> key, Object value) {
        Entry[] tab = table;
        int len = tab.length;
    
        // 計算新元素應該放到哪個位置(這個位置不一定是最終存放的位置,因為可能會出現hash衝突)
        int i = key.threadLocalHashCode & (len - 1);
    
        // 判斷計算出來的位置是否被佔用,如果被佔用,則需要找出應該存放的位置
        for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
            // 獲取Entry中key,也就是弱引用的對象
            ThreadLocal<?> k = e.get();
    
            // 判斷key是否相等(判斷弱引用的是否為同一個ThreadLocal對象)如果是,則進行覆蓋
            if (k == key) {
                e.value = value;
                return;
            }
    
            // k為null,也就是Entry的key已經被回收了,當前的Entry是一個陳舊的元素(stale entry)
            if (k == null) {
                // 用新元素替換掉陳舊元素,同時也會清理其他陳舊元素,防止內存泄露
                replaceStaleEntry(key, value, i);
                return;
            }
        }
    
        // map中沒有ThreadLocal對應的key,或者說沒有找到陳舊的Entry,則創建一個新的Entry,放入數組中
        tab[i] = new Entry(key, value);
        // ThreadLocalMap的元素數量加1
        int sz = ++size;
    
        // 先清理map中key為null的Entry元素,該Entry也應該被回收掉,防止內存泄露
        // 如果清理出陳舊的Entry,那麼就判斷是否需要擴容,如果需要的話,則進行rehash
        if (!cleanSomeSlots(i, sz) && sz >= threshold) {
            rehash();
        }
    }

      上面最後幾行代碼涉及到清理陳舊Entry和rehash,這兩塊的代碼在下面。

     

    3.5清理陳舊Entry和rehash

      陳舊的Entry,是指Entry的key為null,這種情況下,該Entry是不可訪問的,但是卻不會被回收,為了避免出現內存泄漏,所以需要在每次get、set、replace時,進行清理陳舊的Entry,下面只給出一部分代碼:

    /**
     * 清理map中key為null的Entry元素,該Entry也應該被回收掉,防止內存泄露
     *
     * @param i 新Entry插入的位置
     * @param n 數組中元素的數量
     * @return 是否有陳舊的entry的清除
     */
    private boolean cleanSomeSlots(int i, int n) {
        boolean removed = false;
        Entry[] tab = table;
        int len = tab.length;
        do {
            i = nextIndex(i, len);
            Entry e = tab[i];
            if (e != null && e.get() == null) {
                n = len;
                removed = true;
                i = expungeStaleEntry(i);
            }
        } while ((n >>>= 1) != 0);
        return removed;
    }
    
    private void rehash() {
        // 清除底層數組中所有陳舊的(stale)的Entry,也就是key為null的Entry
        // 同時每清除一個Entry,就對其後面的Entry重新計算hash,獲取新位置,使用線性探測法,重新確定最終位置
        expungeStaleEntries();
    
        // 清理完陳舊Entry后,判斷是否需要擴容
        if (size >= threshold - threshold / 4) {
            // 擴容時,容量變為舊容量的2倍,再進行rehash,並使用線性探測發確定Entry的新位置
            resize();
        }
    }
    

      在rehash的時候,涉及到“線性探測法”,是一種用來解決hash衝突的方案,可以查看利用線性探測法解決hash衝突了解詳情。

     

    3.6ThreadLocalMap-remove操作

      remove操作,是調用ThreadLocal.remove()方法時,刪除當前線程的ThreadLocalMap中該ThreadLocal為key的Entry。

    /**
     * 移除當前線程的threadLocals屬性中key為ThreadLocal的Entry
     *
     * @param key 要移除的Entry的key(ThreadLocal對象)
     */
    private void remove(ThreadLocal<?> key) {
        Entry[] tab = table;
        int len = tab.length;
    
        // 計算出該ThreadLocal對應的key應該存放的位置
        int i = key.threadLocalHashCode & (len - 1);
    
        // 找到指定位置,開始按照線性探測算法進行查找到該Thread的Entry
        for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
    
            // 如果Entry的key相同
            if (e.get() == key) {
                // 調用WeakReference的clear方法,Entry的key是弱引用,指向ThreadLocal,現在將key指向null
                // 則該ThreadLocal對象在會在下一次gc時,被垃圾收集器回收
                e.clear();
    
                // 將該位置的Entry中的value置為null,於是value引用的對象也會被垃圾收集器回收(不會造成內存泄漏)
                // 同時內部會調整Entry的順序(開放探測算法的特點,刪除元素後會重新調整順序)
                expungeStaleEntry(i);
    
                return;
            }
        }
    }

     

    四.總結

      在學習ThreadLocal類源碼的過程還是受益頗多的:

      1.ThreadLocal的使用場景;

      2.initialValue的延遲執行;

      3.HashMap使用鏈表+紅黑樹解決hash衝突,ThreadLocalMap使用線性探測算法(開放尋址)解決hash衝突

      另外,ThreadLocal還有一部分內容,是關於弱引用和內存泄漏的問題,可以參考:分析ThreadLocal的弱引用與內存泄漏問題-Java8。

     

      原文地址:https://www.cnblogs.com/-beyond/p/13093032.html

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※教你寫出一流的銷售文案?

  • Express4.x之中間件與路由詳解及源碼分析,Express4.x之API:express,Express4.x之API:express

    Express4.x之中間件與路由詳解及源碼分析,Express4.x之API:express,Express4.x之API:express

    • Application.use()
    • Application.router()
    • express核心源碼模擬

     一、express.use()

    1.1app.use([path,] callback [, callback …])

    通過語法結構可以看到Application.use()參數分別有以下幾種情形:

    app.use(function(){...}); //給全局添加一个中間件
    app.use(path,function(){...}); //給指定路由path添加一个中間件
    app.use(function(){...}, function(){...}, ...); //給全局添加n个中間件
    app.use(path,function(){...},function(){...}, ...); //給指定路由path添加n个中間件

    關於path最簡單也是最常用的就是字符串類型(例:‘/abcd’);除了字符串Express還提供了模板和正則格式(例:’/abc?d‘, ‘/ab+cd‘, ‘/ab\*cd‘, ‘/a(bc)?d‘, ‘/\/abc|\/xyz/‘);除了單個的字符串和模板還可以將多個path作為一個數組的元素,然後將這個數組作為use的path,這樣就可以同時給多個路由添加中間件,詳細內容可以參考官方文檔:https://www.expressjs.com.cn/4x/api.html#path-examples。

    關於callbakc多個或單个中間件程序這已經再語法結構中直觀的體現出來了,這裏重點來看看回調函數的參數:

    app.use(function(req,res,next){...}); //必須提供的參數 
    app.use(function(err,req,res,next){...}); //錯誤中間件需要在最前面添加一個錯誤參數

    關於中間件的簡單應用:

    let express = require('./express');
    let app = express();
    app.use('/',function(req,res,next){
        console.log("我是一個全局中間件");
        next(); //每个中間件的最末尾必須調用next
    });
    
    app.use('/',function(err,req,res,next){
        console.log("我是一個全局錯誤中間,當發生錯誤是調用")    
        console.error(err.stack);
        res.status(500).send('服務出錯誤了!');
        //由於這個錯誤處理直接響應了客戶端,可以不再調用next,當然後面還需要處理一些業務的話也是可以調用next的
    });

    1.2簡單的模擬Express源碼實現Appliction.use()以及各個請求方法的響應註冊方法(這裏個源碼模擬路由概念還比較模糊,所以使用請求方法的響應註冊API,而沒有使用路由描述):

     1 //文件結構
     2 express
     3     index.js
     4 //源碼模擬實現
     5 let http = require("http");
     6 let url = require('url');
     7 function createApplication(){
     8     //app是一個監聽函數
     9     let app = (req,res) =>{
    10         //取出每一個層
    11         //1.獲取請求的方法
    12         let m = req.method.toLowerCase();
    13         let {pathname} = url.parse(req.url,true);
    14 
    15         //通過next方法進行迭代
    16         let index = 0;
    17         function next(err){
    18             //如果routes迭代完成還沒有找到,說明路徑不存在
    19             if(index === app.routes.length) return res.end(`Cannot ${m} ${pathname}`);
    20             let {method, path, handler} = app.routes[index++];//每次調用next就應該取下一個layer
    21             if(err){
    22                 if(handler.length === 4){
    23                     handler(err,req,res,next);
    24                 }else{
    25                     next(err);
    26                 }
    27             }else{
    28                 if(method === 'middle'){ //處理中間件
    29                     if(path === '/' || path === pathname || pathname.startsWith(path+'/')){
    30                         handler(req,res,next);
    31                     }else{
    32                         next();//如果這个中間件沒有匹配到,繼續通過next迭代路由容器routes
    33                     }
    34                 }else{ //處理路由
    35                     if( (method === m || method ==='all') && (path === pathname || path === '*')){ //匹配請求方法和請求路徑(接口)
    36                         handler(req,res);//匹配成功后執行的Callback
    37                     }else{
    38                         next();
    39                     }
    40                 }
    41             }
    42         }
    43         next();
    44     }
    45     app.routes = [];//路由容器
    46     app.use = function(path,handler){
    47         if(typeof handler !== 'function'){
    48             handler = path;
    49             path = '/';
    50         }
    51         let layer = {
    52             method:'middle', //method是middle就表示它是一个中間件
    53             path,
    54             handler
    55         }
    56         app.routes.push(layer);
    57     }
    58     app.all = function(path,handler){
    59         let layer = {
    60             method:'all',
    61             path,
    62             handler
    63         }
    64         app.routes.push(layer);
    65     }
    66     console.log(http.METHODS);
    67     http.METHODS.forEach(method =>{
    68         method = method.toLocaleLowerCase();
    69         app[method] = function (path,handler){//批量生成各個請求方法的路由註冊方法
    70             let layer = {
    71                 method,
    72                 path,
    73                 handler
    74             }
    75             app.routes.push(layer);
    76         }
    77     });
    78     //內置中間件,給req擴展path、qury屬性
    79     app.use(function(req,res,next){
    80         let {pathname,query} = url.parse(req.url,true);
    81         let hostname = req.headers['host'].split(':')[0];
    82         req.path = pathname;
    83         req.query = query;
    84         req.hostname = hostname;
    85         next();
    86     });
    87     //通過app.listen調用http.createServer()掛在app(),啟動express服務
    88     app.listen = function(){
    89         let server = http.createServer(app);
    90         server.listen(...arguments);
    91     }
    92     return app;
    93 }
    94 module.exports = createApplication;

    View Code

    測試模擬實現的Express:

     1 let express = require('./express');
     2 
     3 let app = express();
     4 
     5 app.use('/',function(req,res,next){
     6     console.log("我是一個全局中間件");
     7     next();
     8 });
     9 app.use('/user',function(req,res,next){
    10     console.log("我是user接口的中間件");
    11     next();
    12 });
    13 app.get('/name',function(req,res){
    14     console.log(req.path,req.query,req.hostname);
    15     res.end('zfpx');
    16 });
    17 app.post('/name',function(req,res){
    18     res.end('post name');
    19 });
    20 
    21 
    22 app.all("*",function(req,res){
    23     res.end('all');
    24 });
    25 
    26 app.use(function(err,req,res,next){
    27     console.log(err);
    28     next();
    29 });
    30 
    31 app.listen(12306);

    View Code

    在windows系統下測試請求:

     

     

    關於源碼的構建詳細內容可以參考這個視頻教程:app.use()模擬構建視頻教程,前面就已經說明過這個模式實現僅僅是從表面的業務邏輯,雖然有一點底層的雛形,但與源碼還是相差甚遠,這一部分也僅僅只是想幫助理解Express採用最簡單的方式表現出來。

    1.3如果你看過上面的源碼或自己也實現過,就會發現Express關於中間件的添加方式除了app.use()還有app.all()及app.METHOD()。在模擬源碼中我並未就use和all的差異做處理,都是採用了請求路徑絕對等於path,這種方式是all的特性,use的path實際表示為請求路徑的開頭:

    app.use(path,callback):path表示請求路徑的開頭部分。

    app.all(path,callback):paht表示完全等於請求路徑。

    app.METHOD(path,callback):並不是真的有METHOD這個方法,而是指HTTP請求方法,實際上表示的是app.get()、app.post()、app.put()等方法,而有時候我們會將這些方法說成用來註冊路由,這是因為路由註冊的確使用這些方法,但同時這些方法也是可以用作中間的添加,這在前一篇博客中的功能解析中就有說明(Express4.x之API:express),詳細見過後面的路由解析就會更加明了。

     二、express.router()

    2.1在實例化一個Application時會實例化一個express.router()實例並被添加到app._router屬性上,實際上這個app使用的use、all、METHOD時都是在底層調用了該Router實例上對應的方法,比如看下面這些示例:

     1 let express = require("express");
     2 let app = express();
     3 
     4 app._router.use(function(req,res,next){
     5     console.log("--app.router--");
     6     next();
     7 });
     8 
     9 app._router.post("/csJSON",function(req,res,next){
    10     res.writeHead(200);
    11     res.write(JSON.stringify(req.body));
    12     res.end();
    13 });
    14 
    15 app.listen(12306);

    上面示例中的app._router.use、app._router.post分別同等與app.use、app.post,這裏到這裏也就說明了上一篇博客中的路由與Application的關係Express4.x之API:express。

    2.2Express中的Router除了為Express.Application提供路由功能以外,Express也將它作為一個獨立的路由工具分離了出來,也就是說Router自身可以獨立作為一個應用,如果我們在實際應用中有相關業務有類似Express.Application的路由需求,可以直接實例化一個Router使用,應用的方式如下:

    let express = require('/express');
    let router = express.Router();
    //這部分可以詳細參考官方文檔有詳細的介紹

    2.3由於這篇博客主要是分析Express的中間件及路由的底層邏輯,所以就不在這裏詳細介紹某個模塊的應用,如果有時間我再寫一篇關於Router模塊的應用,這裏我直接上一份模擬Express路由的代碼以供參考:

    文件結構:

    express //根路徑
        index.js //express主入口文件
        application.js //express應用構造模塊
        router //路由路徑
            index.js //路由主入口文件
            layer.js //構造層的模塊
            route.js //子路由模塊

    Express路由系統的邏輯結構圖:

    模擬代碼(express核心源碼模擬):

    1 //express主入口文件
    2 let Application = require('./application.js');
    3 
    4 function createApplication(){
    5     return new Application();
    6 }
    7 
    8 module.exports = createApplication;

    index.js //express主入口文件

     1 //用來創建應用app
     2 let http = require('http');
     3 let url = require('url');
     4 
     5 //導入路由系統模塊
     6 let Router = require('./router');
     7 
     8 const methods = http.METHODS;
     9 
    10 //Application ---- express的應用系統
    11 function Application(){
    12     //創建一個內置的路由系統
    13     this._router = new Router();
    14 }
    15 
    16 //app.get ---- 實現路由註冊業務
    17 // Application.prototype.get = function(path,...handlers){
    18 //     this._router.get(path,'use',handlers);
    19 // }
    20 
    21 methods.forEach(method => {
    22     method = method.toLocaleLowerCase();
    23     Application.prototype[method] = function(path,...handlers){
    24         this._router[method](path,handlers);
    25     }
    26 });
    27 
    28 //app.use ---- 實現中間件註冊業務
    29 //這裏模擬處理三種參數模式:
    30 // -- 1個回調函數:callback
    31 // -- 多個回調函數:[callback,] callback [,callback...]
    32 // -- 指定路由的中間件:[path,] callback [,callback...]
    33 // -- 注意源碼中可以處理這三種參數形式還可以處理上面數據的數組形式,以及其他Application(直接將其他app上的中間件添加到當前應用上)
    34 Application.prototype.use = function(fn){
    35     let path = '/';
    36     let fns = [];
    37     let arg = [].slice.call(arguments);
    38     if(typeof fn !== 'function' && arg.length >= 2){
    39         if(typeof arg[0] !== 'string'){
    40             fns = arg;
    41         }else{
    42             path = arg[0];
    43             fns = arg.slice(1);
    44         }
    45     }else{
    46         fns = arg;
    47     }
    48     this._router.use(path,'use',fns);
    49 }
    50 
    51 Application.prototype.all = function(fn){
    52     let path = '/';
    53     let fns = [];
    54     let arg = [].slice.call(arguments);
    55     if(typeof fn !== 'function' && arg.length >= 2){
    56         if(typeof arg[0] !== 'string'){
    57             fns = arg;
    58         }else{
    59             path = arg[0];
    60             fns = arg.slice(1);
    61         }
    62       }else{
    63         fns = arg;
    64     }
    65     this._router.use(path,'all',fns);
    66 }
    67 
    68 //將http的listen方法封裝到Application的原型上
    69 Application.prototype.listen = function(){
    70     let server = http.createServer((req,res)=>{
    71         //done 用於當路由無任何可匹配項時調用的處理函數
    72         function done(){
    73             res.end(`Cannot ${req.url} ${req.method}`);
    74         }
    75         this._router.handle(req,res,done); //調用路由系統的handle方法處理請求
    76     });
    77     server.listen(...arguments);
    78 };
    79 
    80 module.exports = Application;

    application.js //express應用構造模塊

     1 //express路由系統
     2 const Layer = require('./layer.js');
     3 const Route = require('./route.js');
     4 
     5 const http = require('http');
     6 const methods = http.METHODS;
     7 
     8 const url = require('url');
     9 
    10 
    11 //路由對象構造函數
    12 function Router(){
    13     this.stack = [];
    14 }
    15 
    16 //router.route ---- 用於創建子路由對象route與主路由上層(layer)的關係
    17 //並將主路由上的層緩存到路由對象的stack容器中,該層建立路徑與子路由處理請求的關係
    18 Router.prototype.route = function(path){
    19     let route = new Route();
    20     let layer = new Layer(path,route.dispatch.bind(route));
    21     this.stack.push(layer);
    22     return route;
    23 }
    24 
    25 //router.get ---- 實現路由註冊
    26 //實際上這個方法調用router.route方法分別創建一個主路由系統層、一個子路由系統,並建立兩者之間的關係,詳細見Router.prototype.route
    27 //然後獲取子路由系統對象,並將回調函數和請求方法註冊在這個子路由系統上
    28 
    29 
    30 // Router.prototype.get = function(path,handlers){
    31 //     let route = this.route(path);
    32 //     route.get(handlers);
    33 // }
    34 
    35 methods.forEach(method =>{ 
    36     method = method.toLocaleLowerCase();
    37     //注意下面這個方法會出現內存泄漏問題,有待改進
    38     Router.prototype[method] = function(path, handlers){
    39         let route = this.route(path);
    40         route[method](handlers);
    41     }
    42 });
    43 
    44 //router.use ---- 實現中間件註冊(按照路由開頭的路徑匹配,即相對路由匹配)
    45 Router.prototype.use = function(path,routerType,fns){
    46     let router = this;
    47     fns.forEach(function(fn){
    48         let layer = new Layer(path,fn);
    49         layer.middle = true; //標記這個層為相對路由中間件
    50         layer.routerType = routerType;
    51         router.stack.push(layer);
    52     });
    53 }
    54 
    55 //調用路由處理請求
    56 Router.prototype.handle = function(req,res,out){
    57     let {pathname} = url.parse(req.url);
    58     let index = 0;
    59     let next = () => {
    60         if(index >= this.stack.length) return out();
    61         let layer = this.stack[index++];
    62         if(layer.middle && (layer.path === '/' || pathname === layer.path || pathname.startsWith(layer.path + '/'))){
    63             //處理中間件
    64             if(layer.routerType === 'use'){
    65                 layer.handle_request(req,res,next);
    66             }else if(layer.routerType === 'all' && layer.path === pathname){
    67                 layer.handle_request(req,res,next);
    68             }else{
    69                 next();
    70             }
    71         }else if(layer.match(pathname)){
    72             //處理響應--更準確的說是處理具體請求方法上的中間件或響應
    73             layer.handle_request(req,res,next);
    74         }else{
    75             next();
    76         }
    77     }
    78     next();
    79 }
    80 
    81 module.exports = Router;

    index.js //路由主入口文件

     1 //Layer的構造函數
     2 function Layer(path,handler){
     3     this.path = path; //當前層的路徑
     4     this.handler = handler;  //當前層的回調函數
     5 }
     6 
     7 //判斷請求方法與當前層的方法是否一致
     8 Layer.prototype.match = function(pathname){
     9     return this.path === pathname;
    10 }
    11 
    12 //調用當前層的回調函數handler
    13 Layer.prototype.handle_request = function(req,res,next){
    14     this.handler(req,res,next);
    15 }
    16 
    17 module.exports = Layer;

    layer.js //構造層的模塊

     1 //Layer的構造函數
     2 function Layer(path,handler){
     3     this.path = path; //當前層的路徑
     4     this.handler = handler;  //當前層的回調函數
     5 }
     6 
     7 //判斷請求方法與當前層的方法是否一致
     8 Layer.prototype.match = function(pathname){
     9     return this.path === pathname;
    10 }
    11 
    12 //調用當前層的回調函數handler
    13 Layer.prototype.handle_request = function(req,res,next){
    14     this.handler(req,res,next);
    15 }
    16 
    17 module.exports = Layer;

    route.js //子路由模塊

    測試代碼:

     1 let express = require('./express');
     2 let app = express();
     3 
     4 
     5 app.use('/name',function(req,res,next){
     6     console.log('use1');
     7     next();
     8 });
     9 app.use(function(req,res,next){
    10     console.log('use2-1');
    11     next();
    12 },function(req,res,next){
    13     console.log('use2-2');
    14     next();
    15 });
    16 
    17 app.all('/name',function(req,res,next){
    18     console.log('all-1');
    19     next();
    20 });
    21 app.all('/name/app',function(req,res,next){
    22     console.log('all-2');
    23     next();
    24 });
    25 
    26 app.get('/name/app',function(req,res){
    27     res.end(req.url);
    28 });
    29 // console.log(app._router.stack);
    30 app.listen(12306);

    View Code

    測試結果:

     

     

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

  • 線上服務的FGC問題排查,看這篇就夠了!

    線上服務的FGC問題排查,看這篇就夠了!

    線上服務的GC問題,是Java程序非常典型的一類問題,非常考驗工程師排查問題的能力。同時,幾乎是面試必考題,但是能真正答好此題的人並不多,要麼原理沒吃透,要麼缺乏實戰經驗。

    過去半年時間里,我們的廣告系統出現了多次和GC相關的線上問題,有Full GC過於頻繁的,有Young GC耗時過長的,這些問題帶來的影響是:GC過程中的程序卡頓,進一步導致服務超時從而影響到廣告收入。

    這篇文章,我將以一個FGC頻繁的線上案例作為引子,詳細介紹下GC的排查過程,另外會結合GC的運行原理給出一份實踐指南,希望對你有所幫助。內容分成以下3個部分:

    1、從一次FGC頻繁的線上案例說起

    2、GC的運行原理介紹

    3、排查FGC問題的實踐指南

    01 從一次FGC頻繁的線上案例說起

    去年10月份,我們的廣告召回系統在程序上線后收到了FGC頻繁的系統告警,通過下面的監控圖可以看到:平均每35分鐘就進行了一次FGC。而程序上線前,我們的FGC頻次大概是2天一次。下面,詳細介紹下該問題的排查過程。

    1. 檢查JVM配置

    通過以下命令查看JVM的啟動參數:
    ps aux | grep “applicationName=adsearch”

    -Xms4g -Xmx4g -Xmn2g -Xss1024K
    -XX:ParallelGCThreads=5
    -XX:+UseConcMarkSweepGC
    -XX:+UseParNewGC
    -XX:+UseCMSCompactAtFullCollection
    -XX:CMSInitiatingOccupancyFraction=80

    可以看到堆內存為4G,新生代為2G,老年代也為2G,新生代採用ParNew收集器,老年代採用併發標記清除的CMS收集器,當老年代的內存佔用率達到80%時會進行FGC。

    進一步通過 jmap -heap 7276 | head -n20 可以得知新生代的Eden區為1.6G,S0和S1區均為0.2G。

    2. 觀察老年代的內存變化

    通過觀察老年代的使用情況,可以看到:每次FGC后,內存都能回到500M左右,因此我們排除了內存泄漏的情況。

    3. 通過jmap命令查看堆內存中的對象

    通過命令 jmap -histo 7276 | head -n20

    上圖中,按照對象所佔內存大小排序,显示了存活對象的實例數、所佔內存、類名。可以看到排名第一的是:int[],而且所佔內存大小遠遠超過其他存活對象。至此,我們將懷疑目標鎖定在了 int[] .

    4. 進一步dump堆內存文件進行分析

    鎖定 int[] 后,我們打算dump堆內存文件,通過可視化工具進一步跟蹤對象的來源。考慮堆轉儲過程中會暫停程序,因此我們先從服務管理平台摘掉了此節點,然後通過以下命令dump堆內存:

    jmap -dump:format=b,file=heap 7276

    通過JVisualVM工具導入dump出來的堆內存文件,同樣可以看到各個對象所佔空間,其中int[]佔到了50%以上的內存,進一步往下便可以找到 int[] 所屬的業務對象,發現它來自於架構團隊提供的codis基礎組件。

    5. 通過代碼分析可疑對象

    通過代碼分析,codis基礎組件每分鐘會生成約40M大小的int數組,用於統計TP99 和 TP90,數組的生命周期是一分鐘。而根據第2步觀察老年代的內存變化時,發現老年代的內存基本上也是每分鐘增加40多M,因此推斷:這40M的int數組應該是從新生代晉陞到老年代。

    我們進一步查看了YGC的頻次監控,通過下圖可以看到大概1分鐘有8次左右的YGC,這樣基本驗證了我們的推斷:因為CMS收集器默認的分代年齡是6次,即YGC 6次后還存活的對象就會晉陞到老年代,而codis組件中的大數組生命周期是1分鐘,剛好滿足這個要求。

    至此,整個排查過程基本結束了,那為什麼程序上線前沒出現此問題呢?通過上圖可以看到:程序上線前YGC的頻次在5次左右,此次上線后YGC頻次變成了8次左右,從而引發了此問題。

    6. 解決方案

    為了快速解決問題,我們將CMS收集器的分代年齡改成了15次,改完后FGC頻次恢復到了2天一次,後續如果YGC的頻次超過每分鐘15次還會再次觸發此問題。當然,我們最根本的解決方案是:優化程序以降低YGC的頻率,同時縮短codis組件中int數組的生命周期,這裏就不做展開了。

    02 GC的運行原理介紹

    上面整個案例的分析過程中,其實涉及到很多GC的原理知識,如果不懂得這些原理就着手處理,其實整個排查過程是很抓瞎的。

    這裏,我選擇幾個最核心的知識點,展開介紹下GC的運行原理,最後再給出一份實踐指南。

    1. 堆內存結構

    大家都知道: GC分為YGC和FGC,它們均發生在JVM的堆內存上。先來看下JDK8的堆內存結構:

    可以看到,堆內存採用了分代結構,包括新生代和老年代。新生代又分為:Eden區,From Survivor區(簡稱S0),To Survivor區(簡稱S1區),三者的默認比例為8:1:1。另外,新生代和老年代的默認比例為1:2。

    堆內存之所以採用分代結構,是考慮到絕大部分對象都是短生命周期的,這樣不同生命周期的對象可放在不同的區域中,然後針對新生代和老年代採用不同的垃圾回收算法,從而使得GC效率最高。

    2. YGC是什麼時候觸發的?

    大多數情況下,對象直接在年輕代中的Eden區進行分配,如果Eden區域沒有足夠的空間,那麼就會觸發YGC(Minor GC),YGC處理的區域只有新生代。因為大部分對象在短時間內都是可收回掉的,因此YGC后只有極少數的對象能存活下來,而被移動到S0區(採用的是複製算法)。

    當觸發下一次YGC時,會將Eden區和S0區的存活對象移動到S1區,同時清空Eden區和S0區。當再次觸發YGC時,這時候處理的區域就變成了Eden區和S1區(即S0和S1進行角色交換)。每經過一次YGC,存活對象的年齡就會加1。

    3. FGC又是什麼時候觸發的?

    下面4種情況,對象會進入到老年代中:

    1、YGC時,To Survivor區不足以存放存活的對象,對象會直接進入到老年代。

    2、經過多次YGC后,如果存活對象的年齡達到了設定閾值,則會晉陞到老年代中。

    3、動態年齡判定規則,To Survivor區中相同年齡的對象,如果其大小之和佔到了 To Survivor區一半以上的空間,那麼大於此年齡的對象會直接進入老年代,而不需要達到默認的分代年齡。

    4、大對象:由-XX:PretenureSizeThreshold啟動參數控制,若對象大小大於此值,就會繞過新生代, 直接在老年代中分配。

    當晉陞到老年代的對象大於了老年代的剩餘空間時,就會觸發FGC(Major GC),FGC處理的區域同時包括新生代和老年代。除此之外,還有以下4種情況也會觸發FGC:

    1、老年代的內存使用率達到了一定閾值(可通過參數調整),直接觸發FGC。

    2、空間分配擔保:在YGC之前,會先檢查老年代最大可用的連續空間是否大於新生代所有對象的總空間。如果小於,說明YGC是不安全的,則會查看參數 HandlePromotionFailure 是否被設置成了允許擔保失敗,如果不允許則直接觸發Full GC;如果允許,那麼會進一步檢查老年代最大可用的連續空間是否大於歷次晉陞到老年代對象的平均大小,如果小於也會觸發 Full GC。

    3、Metaspace(元空間)在空間不足時會進行擴容,當擴容到了-XX:MetaspaceSize 參數的指定值時,也會觸發FGC。

    4、System.gc() 或者Runtime.gc() 被顯式調用時,觸發FGC。

    4. 在什麼情況下,GC會對程序產生影響?

    不管YGC還是FGC,都會造成一定程度的程序卡頓(即Stop The World問題:GC線程開始工作,其他工作線程被掛起),即使採用ParNew、CMS或者G1這些更先進的垃圾回收算法,也只是在減少卡頓時間,而並不能完全消除卡頓。

    那到底什麼情況下,GC會對程序產生影響呢?根據嚴重程度從高到底,我認為包括以下4種情況:

    1、FGC過於頻繁:FGC通常是比較慢的,少則幾百毫秒,多則幾秒,正常情況FGC每隔幾個小時甚至幾天才執行一次,對系統的影響還能接受。但是,一旦出現FGC頻繁(比如幾十分鐘就會執行一次),這種肯定是存在問題的,它會導致工作線程頻繁被停止,讓系統看起來一直有卡頓現象,也會使得程序的整體性能變差。

    2、YGC耗時過長:一般來說,YGC的總耗時在幾十或者上百毫秒是比較正常的,雖然會引起系統卡頓幾毫秒或者幾十毫秒,這種情況幾乎對用戶無感知,對程序的影響可以忽略不計。但是如果YGC耗時達到了1秒甚至幾秒(都快趕上FGC的耗時了),那卡頓時間就會增大,加上YGC本身比較頻繁,就會導致比較多的服務超時問題。

    3、FGC耗時過長:FGC耗時增加,卡頓時間也會隨之增加,尤其對於高併發服務,可能導致FGC期間比較多的超時問題,可用性降低,這種也需要關注。

    4、YGC過於頻繁:即使YGC不會引起服務超時,但是YGC過於頻繁也會降低服務的整體性能,對於高併發服務也是需要關注的。

    其中,「FGC過於頻繁」和「YGC耗時過長」,這兩種情況屬於比較典型的GC問題,大概率會對程序的服務質量產生影響。剩餘兩種情況的嚴重程度低一些,但是對於高併發或者高可用的程序也需要關注。

    03 排查FGC問題的實踐指南

    通過上面的案例分析以及理論介紹,再總結下FGC問題的排查思路,作為一份實踐指南供大家參考。

    1. 清楚從程序角度,有哪些原因導致FGC?

    1、大對象:系統一次性加載了過多數據到內存中(比如SQL查詢未做分頁),導致大對象進入了老年代。

    2、內存泄漏:頻繁創建了大量對象,但是無法被回收(比如IO對象使用完后未調用close方法釋放資源),先引發FGC,最後導致OOM.

    3、程序頻繁生成一些長生命周期的對象,當這些對象的存活年齡超過分代年齡時便會進入老年代,最後引發FGC. (即本文中的案例)

    4、程序BUG導致動態生成了很多新類,使得 Metaspace 不斷被佔用,先引發FGC,最後導致OOM.

    5、代碼中顯式調用了gc方法,包括自己的代碼甚至框架中的代碼。

    6、JVM參數設置問題:包括總內存大小、新生代和老年代的大小、Eden區和S區的大小、元空間大小、垃圾回收算法等等。

    2. 清楚排查問題時能使用哪些工具

    1、公司的監控系統:大部分公司都會有,可全方位監控JVM的各項指標。

    2、JDK的自帶工具,包括jmap、jstat等常用命令:

    查看堆內存各區域的使用率以及GC情況
    jstat -gcutil -h20 pid 1000

    查看堆內存中的存活對象,並按空間排序
    jmap -histo pid | head -n20

    dump堆內存文件
    jmap -dump:format=b,file=heap pid

    3、可視化的堆內存分析工具:JVisualVM、MAT等

    3. 排查指南

    1、查看監控,以了解出現問題的時間點以及當前FGC的頻率(可對比正常情況看頻率是否正常)

    2、了解該時間點之前有沒有程序上線、基礎組件升級等情況。

    3、了解JVM的參數設置,包括:堆空間各個區域的大小設置,新生代和老年代分別採用了哪些垃圾收集器,然後分析JVM參數設置是否合理。

    4、再對步驟1中列出的可能原因做排除法,其中元空間被打滿、內存泄漏、代碼顯式調用gc方法比較容易排查。

    5、針對大對象或者長生命周期對象導致的FGC,可通過 jmap -histo 命令並結合dump堆內存文件作進一步分析,需要先定位到可疑對象。

    6、通過可疑對象定位到具體代碼再次分析,這時候要結合GC原理和JVM參數設置,弄清楚可疑對象是否滿足了進入到老年代的條件才能下結論。

    04 最後的話

    這篇文章通過線上案例並結合GC原理詳細介紹了FGC的排查過程,同時給出了一份實踐指南。

    後續會以類似的方式,再分享一個YGC耗時過長的案例,希望能幫助大家吃透GC問題排查,如果覺得本文對你有幫助,請大家關注我的個人公眾號!

    – End –

    作者簡介:程序員,985碩士,前亞馬遜Java工程師,現58轉轉技術總監。持續分享技術和管理方向的文章。如果感興趣,可微信掃描下面的二維碼關注我的公眾號:『IT人的職場進階』

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※教你寫出一流的銷售文案?

  • Dart Memo for Android Developers

    Dart Memo for Android Developers

    Dart語言一些語法特點和編程規範.

    本文適合: 日常使用Kotlin, 突然想寫個Flutter程序的Android程序員.

    Dart語言

    完整的請看A tour of the Dart language

    • 創建對象可以不用new. -> 並且規範不讓用new, lint會報錯.
    • 聲明變量可以用var, 也可以用具體類型如String. 不變量用final, 常量用const.
    • 沒有訪問修飾符, 用_來表示私有: 文件級別.
    • 字符串可以用單引號'.
    • 語句結尾要用;.
    • 創建數組可以用: var list = [1, 2, 3];.
    • assert()常用來斷定開發時不可能會出現的情況.
    • 空測試操作符: ??.
    • 過濾操作符: where.
    • 兩個點..表示鏈式調用.
    • dynamic說明類型未指定.
    • 除了throw異常, 還可以throw別的東西, 比如字符串.

    函數

    • 函數返回值在函數最開頭, 可以不標. -> 但是規範會建議標註返回值.
    bool isNoble(int atomicNumber) {
      return _nobleGases[atomicNumber] != null;
    }
    
    • =>箭頭符號, 用來簡化一句話的方法.
    bool isNoble(int atomicNumber) => _nobleGases[atomicNumber] != null;
    

    構造函數

    • 構造函數{}表示帶名字, 參數可選, 若要必選加上@required.
    const Scrollbar({Key key, @required Widget child})
    
    • 構造函數名可以是ClassName或者ClassName.identifier.
    • 空構造函數體可以省略, 用;結尾就行:
    class Point {
      double x, y;
      Point(this.x, this.y);
    }
    

    這裡會初始化相應的變量, 也不用聲明具體的參數類型.

    • factory構造, 可以用來返回緩存實例, 或者返回類型的子類:
    factory Logger(String name) {
        return _cache.putIfAbsent(name, () => Logger._internal(name));
    }
    

    異步代碼

    Future<String> lookUpVersion() async => '1.0.0';
    
    Future checkVersion() async {
      var version = await lookUpVersion();
      // Do something with version
    }
    

    編程規範類

    完整的規範在這裏: Effective Dart.

    有一些Good和Bad的舉例, 這裏僅列出比較常用的幾項.

    • 文件名要蛇形命名: lowercase_with_underscores. 類名: UpperCamelCase.
    • 對自己程序的文件, 兩種import都可以(package開頭或者相對路徑), 但是要保持一致.
    • Flutter程序嵌套比較多, 要用結尾的,來幫助格式化.

    本文緣由

    年初的時候學了一陣子Flutter, 寫了各種大小demo. 結果隔了兩個月之後, 突然心血來潮想寫個小東西, 打開Android Studio, 首先發現創建Flutter程序的按鈕都不見了. (估計是Android Studio4.0升級之後Flutter的插件沒跟上).

    接着用命令行創建了工程, 打開之後稍微整理了一下心情, 然後就….懵逼了.

    突然不知道如何下手.
    宏觀的東西還記得, 要用什麼package, 基本常用的幾個Widget都是啥, 但是微觀的, 忘了函數和數組都是咋定義的了.
    這種懵逼的狀態令我很憤怒, 果然是上年紀了嗎, 無縫切換個語言都不行.

    於是就想着還是寫個備忘錄吧.

    References

    • A tour of the Dart language
    • Effective Dart

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

  • 曹工說Redis源碼(8)–面試時,redis 內存淘汰總被問,但是總答不好

    曹工說Redis源碼(8)–面試時,redis 內存淘汰總被問,但是總答不好

    文章導航

    Redis源碼系列的初衷,是幫助我們更好地理解Redis,更懂Redis,而怎麼才能懂,光看是不夠的,建議跟着下面的這一篇,把環境搭建起來,後續可以自己閱讀源碼,或者跟着我這邊一起閱讀。由於我用c也是好幾年以前了,些許錯誤在所難免,希望讀者能不吝指出。

    曹工說Redis源碼(1)– redis debug環境搭建,使用clion,達到和調試java一樣的效果

    曹工說Redis源碼(2)– redis server 啟動過程解析及簡單c語言基礎知識補充

    曹工說Redis源碼(3)– redis server 啟動過程完整解析(中)

    曹工說Redis源碼(4)– 通過redis server源碼來理解 listen 函數中的 backlog 參數

    曹工說Redis源碼(5)– redis server 啟動過程解析,以及EventLoop每次處理事件前的前置工作解析(下)

    曹工說Redis源碼(6)– redis server 主循環大體流程解析

    曹工說Redis源碼(7)– redis server 的周期執行任務,到底要做些啥

    什麼是內存淘汰

    內存淘汰,和平時我們設置redis key的過期時間,不是一回事;內存淘汰是說,假設我們限定redis只能使用8g內存,現在已經使用了這麼多了(包括設置了過期時間的key和沒設過期時間的key),那,後續的set操作,還怎麼辦呢?

    是不是只能報錯了?

    那不行啊,不科學吧,因為有的key,可能已經很久沒人用了,可能以後也不會再用到了,那我們是不是可以把這類key給幹掉呢?

    幹掉key的過程,就是內存淘汰。

    內存淘汰什麼時候啟用

    當我們在配置文件里設置了如下屬性時:

    # maxmemory <bytes>
    

    默認,該屬性是被註釋掉的。

    其實,這個配置項的註釋,相當有價值,我們來看看:

    # Don't use more memory than the specified amount of bytes.
    # When the memory limit is reached Redis will try to remove keys
    # according to the eviction policy selected (see maxmemory-policy).
    #
    # If Redis can't remove keys according to the policy, or if the policy is
    # set to 'noeviction', Redis will start to reply with errors to commands
    # that would use more memory, like SET, LPUSH, and so on, and will continue
    # to reply to read-only commands like GET.
    #
    # This option is usually useful when using Redis as an LRU cache, or to set
    # a hard memory limit for an instance (using the 'noeviction' policy).
    #
    # WARNING: If you have slaves attached to an instance with maxmemory on,
    # the size of the output buffers needed to feed the slaves are subtracted
    # from the used memory count, so that network problems / resyncs will
    # not trigger a loop where keys are evicted, and in turn the output
    # buffer of slaves is full with DELs of keys evicted triggering the deletion
    # of more keys, and so forth until the database is completely emptied.
    #
    # In short... if you have slaves attached it is suggested that you set a lower
    # limit for maxmemory so that there is some free RAM on the system for slave
    # output buffers (but this is not needed if the policy is 'noeviction').
    #
    # maxmemory <bytes>
    

    渣翻譯如下:

    不能使用超過指定數量bytes的內存。當該內存限制被達到時,redis會根據過期策略(eviction policy,通過參數 maxmemory-policy來指定)來驅逐key。

    如果redis根據指定的策略,或者策略被設置為“noeviction”,redis會開始針對如下這種命令,回復錯誤。什麼命令呢?會使用更多內存的那類命令,比如set、lpush;只讀命令還是不受影響,可以正常響應。

    該選項通常在redis使用LRU緩存時有用,或者在使用noeviction策略時,設置一個進程級別的內存limit。

    內存淘汰策略

    所謂策略,意思是,當我們要刪除部分key的時候,刪哪些,不刪哪些?是不是需要一個策略?比如是隨機刪,就像滅霸一樣?還是按照lru時間來刪,lru的策略意思就是,最近最少使用的key,將被優先刪除。

    總之,我們需要定一個規則。

    redis默認支持以下策略:

    # MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
    # is reached. You can select among five behaviors:
    # 
    # volatile-lru -> remove the key with an expire set using an LRU algorithm
    # allkeys-lru -> remove any key accordingly to the LRU algorithm
    # volatile-random -> remove a random key with an expire set
    # allkeys-random -> remove a random key, any key
    # volatile-ttl -> remove the key with the nearest expire time (minor TTL)
    # noeviction -> don't expire at all, just return an error on write operations
    # 
    # Note: with any of the above policies, Redis will return an error on write
    #       operations, when there are not suitable keys for eviction.
    #
    #       At the date of writing this commands are: set setnx setex append
    #       incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd
    #       sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby
    #       zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby
    #       getset mset msetnx exec sort
    #
    # The default is:
    #
    # maxmemory-policy noeviction
    maxmemory-policy allkeys-lru
    
    針對設置了過期時間的,使用lru算法
    # volatile-lru -> remove the key with an expire set using an LRU algorithm
    
    針對全部key,使用lru算法
    # allkeys-lru -> remove any key accordingly to the LRU algorithm
    
    針對設置了過期時間的,隨機刪
    # volatile-random -> remove a random key with an expire set
    
    針對全部key,隨機刪
    # allkeys-random -> remove a random key, any key
    
    針對設置了過期時間的,馬上要過期的,刪掉
    # volatile-ttl -> remove the key with the nearest expire time (minor TTL)
    
    不過期,不能寫了,就報錯
    # noeviction -> don't expire at all, just return an error on write operations
    

    一般呢,我們會設置為:

    allkeys-lru,即,針對全部key,進行lru。

    源碼實現

    配置讀取

    在如下結構體中,定義了如下字段:

    struct redisServer {
    	...
    	unsigned long long maxmemory;   /* Max number of memory bytes to use */
        int maxmemory_policy;           /* Policy for key eviction */
        int maxmemory_samples;          /* Pricision of random sampling */
        ...
    }
    

    當我們在配置文件中,進入如下配置時,該結構體中幾個字段的值如下:

    maxmemory 3mb
    maxmemory-policy allkeys-lru
    # maxmemory-samples 5  這個取了默認值
    

    maxmemory_policy為3,是因為枚舉值為3:

    #define REDIS_MAXMEMORY_VOLATILE_LRU 0
    #define REDIS_MAXMEMORY_VOLATILE_TTL 1
    #define REDIS_MAXMEMORY_VOLATILE_RANDOM 2
    #define REDIS_MAXMEMORY_ALLKEYS_LRU 3
    #define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4
    #define REDIS_MAXMEMORY_NO_EVICTION 5
    #define REDIS_DEFAULT_MAXMEMORY_POLICY REDIS_MAXMEMORY_NO_EVICTION
    

    處理命令時,判斷是否進行內存淘汰

    在處理命令的時候,會調用中的

    redis.c  processCommand
        
    int processCommand(redisClient *c) {
        /* The QUIT command is handled separately. Normal command procs will
         * go through checking for replication and QUIT will cause trouble
         * when FORCE_REPLICATION is enabled and would be implemented in
         * a regular command proc. */
        // 特別處理 quit 命令
        void *commandName = c->argv[0]->ptr;
        redisLog(REDIS_NOTICE, "The server is now processing %s", commandName);
    
        if (!strcasecmp(c->argv[0]->ptr, "quit")) {
            addReply(c, shared.ok);
            c->flags |= REDIS_CLOSE_AFTER_REPLY;
            return REDIS_ERR;
        }
    
        /* Now lookup the command and check ASAP about trivial error conditions
         * such as wrong arity, bad command name and so forth. */
        // 1 查找命令,並進行命令合法性檢查,以及命令參數個數檢查
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
        if (!c->cmd) {
            // 沒找到指定的命令
            flagTransaction(c);
            addReplyErrorFormat(c, "unknown command '%s'",
                                (char *) c->argv[0]->ptr);
            return REDIS_OK;
        }
    
        /* Check if the user is authenticated */
        //2 檢查認證信息
        if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) {
            flagTransaction(c);
            addReply(c, shared.noautherr);
            return REDIS_OK;
        }
    
        /* If cluster is enabled perform the cluster redirection here.
         *
         * 3 如果開啟了集群模式,那麼在這裏進行轉向操作。
         *
         * However we don't perform the redirection if:
         *
         * 不過,如果有以下情況出現,那麼節點不進行轉向:
         *
         * 1) The sender of this command is our master.
         *    命令的發送者是本節點的主節點
         *
         * 2) The command has no key arguments. 
         *    命令沒有 key 參數
         */
        if (server.cluster_enabled &&
            !(c->flags & REDIS_MASTER) &&
            !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) {
            int hashslot;
    
            // 集群已下線
            if (server.cluster->state != REDIS_CLUSTER_OK) {
                flagTransaction(c);
                addReplySds(c, sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
                return REDIS_OK;
    
                // 集群運作正常
            } else {
                int error_code;
                clusterNode *n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code);
                // 不能執行多鍵處理命令
                if (n == NULL) {
                    flagTransaction(c);
                    if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
                        addReplySds(c, sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
                    } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
                        /* The request spawns mutliple keys in the same slot,
                         * but the slot is not "stable" currently as there is
                         * a migration or import in progress. */
                        addReplySds(c, sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
                    } else {
                        redisPanic("getNodeByQuery() unknown error.");
                    }
                    return REDIS_OK;
    
                    //3.1 命令針對的槽和鍵不是本節點處理的,進行轉向
                } else if (n != server.cluster->myself) {
                    flagTransaction(c);
                    // -<ASK or MOVED> <slot> <ip>:<port>
                    // 例如 -ASK 10086 127.0.0.1:12345
                    addReplySds(c, sdscatprintf(sdsempty(),
                                                "-%s %d %s:%d\r\n",
                                                (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
                                                hashslot, n->ip, n->port));
    
                    return REDIS_OK;
                }
    
                // 如果執行到這裏,說明鍵 key 所在的槽由本節點處理
                // 或者客戶端執行的是無參數命令
            }
        }
    
        /* Handle the maxmemory directive.
         *
         * First we try to free some memory if possible (if there are volatile
         * keys in the dataset). If there are not the only thing we can do
         * is returning an error. */
        //4 如果設置了最大內存,那麼檢查內存是否超過限制,並做相應的操作
        if (server.maxmemory) {
            //4.1 如果內存已超過限制,那麼嘗試通過刪除過期鍵來釋放內存
            int retval = freeMemoryIfNeeded();
            // 如果即將要執行的命令可能佔用大量內存(REDIS_CMD_DENYOOM)
            // 並且前面的內存釋放失敗的話
            // 那麼向客戶端返回內存錯誤
            if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
                flagTransaction(c);
                addReply(c, shared.oomerr);
                return REDIS_OK;
            }
        }    
        ....
    
    • 1處,查找命令,對應的函數指針(類似於java里的策略模式,根據命令,找對應的策略)
    • 2處,檢查,是否密碼正確
    • 3處,集群相關操作;
    • 3.1處,不是本節點處理,直接返回ask,指示客戶端轉向
    • 4處,判斷是否設置了maxMemory,這裏就是本文重點:設置了maxMemory時,內存淘汰策略
    • 4.1處,調用了下方的 freeMemoryIfNeeded

    接下來,深入4.1處:

    
    int freeMemoryIfNeeded(void) {
        size_t mem_used, mem_tofree, mem_freed;
        int slaves = listLength(server.slaves);
    
        /* Remove the size of slaves output buffers and AOF buffer from the
         * count of used memory. */
        // 計算出 Redis 目前佔用的內存總數,但有兩個方面的內存不會計算在內:
        // 1)從服務器的輸出緩衝區的內存
        // 2)AOF 緩衝區的內存
        mem_used = zmalloc_used_memory();
        if (slaves) {
    		...
        }
        if (server.aof_state != REDIS_AOF_OFF) {
            mem_used -= sdslen(server.aof_buf);
            mem_used -= aofRewriteBufferSize();
        }
    
        /* Check if we are over the memory limit. */
        //1 如果目前使用的內存大小比設置的 maxmemory 要小,那麼無須執行進一步操作
        if (mem_used <= server.maxmemory) return REDIS_OK;
    
        //2 如果佔用內存比 maxmemory 要大,但是 maxmemory 策略為不淘汰,那麼直接返回
        if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
            return REDIS_ERR; /* We need to free memory, but policy forbids. */
    
        /* Compute how much memory we need to free. */
        // 3 計算需要釋放多少字節的內存
        mem_tofree = mem_used - server.maxmemory;
    
        // 初始化已釋放內存的字節數為 0
        mem_freed = 0;
    
        // 根據 maxmemory 策略,
        //4 遍歷字典,釋放內存並記錄被釋放內存的字節數
        while (mem_freed < mem_tofree) {
            int j, k, keys_freed = 0;
    
            // 遍歷所有字典
            for (j = 0; j < server.dbnum; j++) {
                long bestval = 0; /* just to prevent warning */
                sds bestkey = NULL;
                dictEntry *de;
                redisDb *db = server.db + j;
                dict *dict;
    
                if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
                    server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM) {
                    // 如果策略是 allkeys-lru 或者 allkeys-random 
                    //5 那麼淘汰的目標為所有數據庫鍵
                    dict = server.db[j].dict;
                } else {
                    // 如果策略是 volatile-lru 、 volatile-random 或者 volatile-ttl 
                    //6 那麼淘汰的目標為帶過期時間的數據庫鍵
                    dict = server.db[j].expires;
                }
    
    
                /* volatile-random and allkeys-random policy */
                // 如果使用的是隨機策略,那麼從目標字典中隨機選出鍵
                if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM ||
                    server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                }
                /* volatile-lru and allkeys-lru policy */
                //7 
                else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
                         server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) {
                    struct evictionPoolEntry *pool = db->eviction_pool;
    
                    while (bestkey == NULL) {
                        // 8 
                        evictionPoolPopulate(dict, db->dict, db->eviction_pool);
                        /* Go backward from best to worst element to evict. */
                        for (k = REDIS_EVICTION_POOL_SIZE - 1; k >= 0; k--) {
                            if (pool[k].key == NULL) continue;
                            // 8.1
                            de = dictFind(dict, pool[k].key);
    
                            /* 8.2 Remove the entry from the pool. */
                            sdsfree(pool[k].key);
                            /* Shift all elements on its right to left. */
                            memmove(pool + k, pool + k + 1,
                                    sizeof(pool[0]) * (REDIS_EVICTION_POOL_SIZE - k - 1));
                            /* Clear the element on the right which is empty
                             * since we shifted one position to the left.  */
                            pool[REDIS_EVICTION_POOL_SIZE - 1].key = NULL;
                            pool[REDIS_EVICTION_POOL_SIZE - 1].idle = 0;
    
                            /* If the key exists, is our pick. Otherwise it is
                             * a ghost and we need to try the next element. */
                            // 8.3
                            if (de) {
                                bestkey = dictGetKey(de);
                                break;
                            } else {
                                /* Ghost... */
                                continue;
                            }
                        }
                    }
                }
    
                    /* volatile-ttl */
                    // 策略為 volatile-ttl ,從一集 sample 鍵中選出過期時間距離當前時間最接近的鍵
                else if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_TTL) {
                    ...
                }
    
                /* Finally remove the selected key. */
                // 8.4 刪除被選中的鍵
                if (bestkey) {
                    long long delta;
    
                    robj *keyobj = createStringObject(bestkey, sdslen(bestkey));
                    propagateExpire(db, keyobj);
                    /* We compute the amount of memory freed by dbDelete() alone.
                     * It is possible that actually the memory needed to propagate
                     * the DEL in AOF and replication link is greater than the one
                     * we are freeing removing the key, but we can't account for
                     * that otherwise we would never exit the loop.
                     *
                     * AOF and Output buffer memory will be freed eventually so
                     * we only care about memory used by the key space. */
                    // 計算刪除鍵所釋放的內存數量
                    delta = (long long) zmalloc_used_memory();
                    dbDelete(db, keyobj);
                    delta -= (long long) zmalloc_used_memory();
                    mem_freed += delta;
    
                    // 對淘汰鍵的計數器增一
                    server.stat_evictedkeys++;
    
                    notifyKeyspaceEvent(REDIS_NOTIFY_EVICTED, "evicted",
                                        keyobj, db->id);
                    decrRefCount(keyobj);
                    keys_freed++;
    				...
                }
            }
    
            if (!keys_freed) return REDIS_ERR; /* nothing to free... */
        }
    
        return REDIS_OK;
    }
    
    • 1處,如果目前使用的內存大小比設置的 maxmemory 要小,那麼無須執行進一步操作

    • 2處,如果佔用內存比 maxmemory 要大,但是 maxmemory 策略為不淘汰,那麼直接返回

    • 3處,計算需要釋放多少字節的內存

    • 4處,遍歷字典,釋放內存並記錄被釋放內存的字節數

    • 5處,如果策略是 allkeys-lru 或者 allkeys-random 那麼淘汰的目標為所有數據庫鍵

    • 6處,如果策略是 volatile-lru 、 volatile-random 或者 volatile-ttl ,那麼淘汰的目標為帶過期時間的數據庫鍵

    • 7處,如果使用的是 LRU 策略, 那麼從 sample 鍵中選出 IDLE 時間最長的那個鍵

    • 8處,調用evictionPoolPopulate,該函數在下面講解,該函數的功能是,傳入一個鏈表,即這裏的db->eviction_pool,然後在函數內部,隨機找出n個key,放入傳入的鏈表中,並按照空閑時間排序,空閑最久的,放到最後。

      當該函數,返回后,db->eviction_pool這個鏈表裡就存放了我們要淘汰的key。

    • 8.1處,找到這個key,這個key,在後邊會被刪除

    • 8.2處,下面這一段,從db->eviction_pool將這個已經處理了的key刪掉

    • 8.3處,如果這個key,是存在的,則跳出循環,在後面8.4處,會被刪除

    • 8.4處,刪除這個key

    選擇哪些key作為被淘汰的key

    前面我們看到,在7處,如果為lru策略,則會進入8處的函數:

    evictionPoolPopulate。

    該函數的名稱為:填充(populate)驅逐(eviction)對象池(pool)。驅逐的意思,就是現在達到了maxmemory,沒辦法,只能開始刪除掉一部分元素,來騰空間了,不然新的put類型的命令,根本沒辦法執行。

    該方法的大概思路是,使用lru的時候,隨機找n個key,類似於抽樣,然後放到一個鏈表,根據空閑時間排序。

    具體看看該方法的實現:

    void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    

    其中,傳入的第三個參數,是要被填充的對象,在c語言中,習慣傳入一個入參,然後在函數內部填充或者修改入參對象的屬性。

    該屬性,就是前面說的那個鏈表,用來存放收集的隨機的元素,該鏈表中節點的結構如下:

    struct evictionPoolEntry {
        unsigned long long idle;    /* Object idle time. */
        sds key;                    /* Key name. */
    };
    

    該結構共2個字段,一個存儲key,一個存儲空閑時間。

    該鏈表中,共maxmemory-samples個元素,會按照idle時間長短排序,idle時間長的在鏈表尾部,(假設頭在左,尾在右)。

    void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
        int j, k, count;
        dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
        dictEntry **samples;
    
        /* Try to use a static buffer: this function is a big hit...
         * Note: it was actually measured that this helps. */
        if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
            samples = _samples;
        } else {
            samples = zmalloc(sizeof(samples[0]) * server.maxmemory_samples);
        }
    
        /* 1 Use bulk get by default. */
        count = dictGetRandomKeys(sampledict, samples, server.maxmemory_samples);
    
    	// 2
        for (j = 0; j < count; j++) {
            unsigned long long idle;
            sds key;
            robj *o;
            dictEntry *de;
    
            de = samples[j];
            key = dictGetKey(de);
            /* If the dictionary we are sampling from is not the main
             * dictionary (but the expires one) we need to lookup the key
             * again in the key dictionary to obtain the value object. */
            if (sampledict != keydict) de = dictFind(keydict, key);
            // 3
            o = dictGetVal(de);
            // 4
            idle = estimateObjectIdleTime(o);
    
            /* 5  Insert the element inside the pool.
             * First, find the first empty bucket or the first populated
             * bucket that has an idle time smaller than our idle time. */
            k = 0;
            while (k < REDIS_EVICTION_POOL_SIZE &&
                   pool[k].key &&
                   pool[k].idle < idle)
                k++;
            
    		...
                
            // 6
            pool[k].key = sdsdup(key);
            pool[k].idle = idle;
        }
        if (samples != _samples) zfree(samples);
    }
    
    • 1處,獲取 server.maxmemory_samples個key,這裡是隨機獲取的,(dictGetRandomKeys),這個值,默認值為5,放到samples中

    • 2處,遍歷返回來的samples

    • 3處,調用如下宏,獲取val

      he的類型為dictEntry:

      /*
       * 哈希表節點
       */
      typedef struct dictEntry {
          
          // 鍵
          void *key;
      
          // 值
          union {
              // 1
              void *val;
              uint64_t u64;
              int64_t s64;
          } v;
      
          // 指向下個哈希表節點,形成鏈表
          struct dictEntry *next;
      
      } dictEntry;
      

      所以,這裏去

      robj *o; 
      
      o = dictGetVal(de);
      

      實際就是獲取其v屬性中的val,(1處):

      #define dictGetVal(he) ((he)->v.val)
      
    • 4處,準備計算該val的空閑時間

      我們上面3處,看到,獲取的o的類型為robj。我們現在看看怎麼計算對象的空閑時長:

      /* Given an object returns the min number of milliseconds the object was never
       * requested, using an approximated LRU algorithm. */
      unsigned long long estimateObjectIdleTime(robj *o) {
          //4.1 獲取系統的當前時間
          unsigned long long lruclock = LRU_CLOCK();
          // 4.2
          if (lruclock >= o->lru) {
              // 4.3
              return (lruclock - o->lru) * REDIS_LRU_CLOCK_RESOLUTION;
          } else {
              return (lruclock + (REDIS_LRU_CLOCK_MAX - o->lru)) *
                          REDIS_LRU_CLOCK_RESOLUTION;
          }
      }
      

      這裏,4.1處,獲取系統的當前時間;

      4.2處,如果系統時間,大於對象的lru時間

      4.3處,則用系統時間減去對象的lru時間,再乘以單位,換算為毫秒,最終返回的單位,為毫秒(可以看註釋。)

      #define REDIS_LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
      
    • 5處,這裏拿當前元素,和pool中已經放進去的元素,從第0個開始比較,如果當前元素的idle時長,大於pool中指針0指向的元素,則和pool中索引1的元素比較;直到條件不滿足為止。

      這句話意思就是,類似於冒泡,把當前元素一直往後冒,直到idle時長小於被比較的元素為止。

    • 6處,把當前元素放進pool中。

    經過上面的處理后,鏈表中存放了全部的抽樣元素,且ide時間最長的,在最右邊。

    對象還有字段存儲空閑時間?

    前面4處,說到,用系統的當前時間,減去對象的lru時間。

    大家看看對象的結構體

    typedef struct redisObject {
    
        // 類型
        unsigned type:4;
    
        // 編碼
        unsigned encoding:4;
    
        //1 對象最後一次被訪問的時間
        unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */
    
        // 引用計數
        int refcount;
    
        // 指向實際值的指針
        void *ptr;
    
    } robj;
    

    上面1處,lru屬性,就是用來存儲這個。

    創建對象時,直接使用當前系統時間創建

    robj *createObject(int type, void *ptr) {
    
        robj *o = zmalloc(sizeof(*o));
    
        o->type = type;
        o->encoding = REDIS_ENCODING_RAW;
        o->ptr = ptr;
        o->refcount = 1;
    
        /*1 Set the LRU to the current lruclock (minutes resolution). */
        o->lru = LRU_CLOCK();
        return o;
    }
    

    1處即是。

    robj *createEmbeddedStringObject(char *ptr, size_t len) {
        robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr)+len+1);
        struct sdshdr *sh = (void*)(o+1);
    
        o->type = REDIS_STRING;
        o->encoding = REDIS_ENCODING_EMBSTR;
        o->ptr = sh+1;
        o->refcount = 1;
        // 1
        o->lru = LRU_CLOCK();
    
        sh->len = len;
        sh->free = 0;
        if (ptr) {
            memcpy(sh->buf,ptr,len);
            sh->buf[len] = '\0';
        } else {
            memset(sh->buf,0,len+1);
        }
        return o;
    }
    

    1處即是。

    每次查找該key時,刷新時間

    robj *lookupKey(redisDb *db, robj *key) {
    
        // 查找鍵空間
        dictEntry *de = dictFind(db->dict,key->ptr);
    
        // 節點存在
        if (de) {
            
    
            // 取出值
            robj *val = dictGetVal(de);
    
            /* Update the access time for the ageing algorithm.
             * Don't do it if we have a saving child, as this will trigger
             * a copy on write madness. */
            // 更新時間信息(只在不存在子進程時執行,防止破壞 copy-on-write 機制)
            if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
                // 1
                val->lru = LRU_CLOCK();
    
            // 返回值
            return val;
        } else {
    
            // 節點不存在
    
            return NULL;
        }
    }
    

    1處即是,包括get、set等各種操作,都會刷新該時間。

    仔細看下面的堆棧,set的,get同理:

    總結

    大家有沒有更清楚一些呢?

    總的來說,就是,設置了max-memory后,達到該內存限制后,會在處理命令時,檢查是否要進行內存淘汰;如果要淘汰,則根據maxmemory-policy的策略來。

    隨機選擇maxmemory-sample個元素,按照空閑時間排序,拉鏈表;挨個挨個清除。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    南投搬家公司費用需注意的眉眉角角,別等搬了再說!

    ※教你寫出一流的銷售文案?

  • 面試官:線程池如何按照core、max、queue的執行循序去執行?(內附詳細解析)

    面試官:線程池如何按照core、max、queue的執行循序去執行?(內附詳細解析)

    前言

    這是一個真實的面試題。

    前幾天一個朋友在群里分享了他剛剛面試候選者時問的問題:“線程池如何按照core、max、queue的執行循序去執行?”

    我們都知道線程池中代碼執行順序是:corePool->workQueue->maxPool,源碼我都看過,你現在問題讓我改源碼??

    一時間群里炸開了鍋,小夥伴們紛紛打聽他所在的公司,然後拉黑避坑。(手動狗頭,大家一起調侃٩(๑ᴗ๑)۶)

    關於線程池他一共問了這麼幾個問題:

    • 線程池如何按照core、max、queue的順序去執行?
    • 子線程拋出的異常,主線程能感知到么?
    • 線程池發生了異常改怎樣處理?

    全是一些有意思的問題,我之前也寫過一篇很詳細的圖文教程:【萬字圖文-原創】 | 學會Java中的線程池,這一篇也許就夠了! ,不了解的小夥伴可以再回顧下~

    但是針對這幾個問題,可能大家一時間也有點懵。今天的文章我們以源碼為基礎來分析下該如何回答這三個問題。(之前沒閱讀過源碼也沒關係,所有的分析都會貼出源碼及圖解)

    線程池如何按照core、max、queue的順序執行?

    問題思考

    對於這個問題,很多小夥伴肯定會疑惑:“別人源碼中寫好的執行流程你為啥要改?這面試官腦子有病吧……”

    這裏來思考一下現實工作場景中是否有這種需求?之前也看到過一份簡歷也寫到過這個問題:

    一個線程池執行的任務屬於IO密集型,CPU大多屬於閑置狀態,系統資源未充分利用。如果一瞬間來了大量請求,如果線程池數量大於coreSize時,多餘的請求都會放入到等待隊列中。等待着corePool中的線程執行完成后再來執行等待隊列中的任務。

    試想一下,這種場景我們該如何優化?

    我們可以修改線程池的執行順序為corePool->maxPool->workQueue。 這樣就能夠充分利用CPU資源,提交的任務會被優先執行。當線程池中線程數量大於maxSize時才會將任務放入等待隊列中。

    你就說巧不巧?面試官的這個問題顯然是經過認真思考來提問的,這是一個很有意思的溫恩提,下面就一起看看如何解決吧。

    線程池運行流程

    我們都知道線程池執行流程是先corePoolworkQueue,最後才是maxPool的一個執行流程。

    線程池核心參數

    在回顧下ThreadPoolExecutor.execute()源碼前我們先回顧下線程池中的幾個重要參數:

    我們來看下這幾個參數的定義:
    corePoolSize: 線程池中核心線程數量
    maximumPoolSize: 線程池中最大線程數量
    keepAliveTime: 非核心的空閑線程等待新任務的時間
    unit: 時間單位。配合allowCoreThreadTimeOut也會清理核心線程池中的線程。
    workQueue: 基於Blocking的任務隊列,最好選用有界隊列,指定隊列長度
    threadFactory: 線程工廠,最好自定義線程工廠,可以自定義每個線程的名稱
    handler: 拒絕策略,默認是AbortPolicy

    ThreadPoolExecutor.execute()源碼分析

    我們可以看下execute()如下:

    接着來分析下執行過程:

    1. 第一步:workerCountOf(c)時間計算當前線程池中線程的個數,當線程個數小於核心線程數
    2. 第二步:線程池線程數量大於核心線程數,此時提交的任務會放入workQueue中,使用offer()進行操作
    3. 第三步:workQueue.offer()執行失敗,新提交的任務會直接執行,addWorker()會判斷如果當前線程池數量大於最大線程數,則執行拒絕策略

    好了,到了這裏我們都已經很清楚了,關鍵在於第二步和第三步如何交換順序執行呢?

    解決思路

    仔細想一想,如果修改workQueue.offer()的實現不就可以達到目的了?我們先來畫圖來看一下:

    現在的問題就在於,如果當前線程池中coreSize < workCount < maxSize時,一定會先執行offer()操作。

    我們如果修改offer的實現是否可以完成執行順序的更換呢?這裏也是畫圖來展示一下:

    Dubbo中EagerThreadPool解決方案

    湊巧Dubbo中也有類似的實現,在DubboEagerThreadPool自定義了一個BlockingQueue,在offer()方法中,如果當前線程池數量小於最大線程池時,直接返回false,這裏就達到了調節線程池執行順序的目的。

    源碼直達:https://github.com/apache/dubbo/blob/master/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java

    看到這裏一切都真相大白了,解決思路以及方案都很簡單,學會了沒有?

    這個問題背後還隱藏了一些場景的優化、源碼的擴展等等知識,果然是一個值得思考的好問題。

    子線程拋出的異常,主線程能感知到么?

    問題思考

    這個問題其實也很容易回答,也僅僅是一個面試題而已,實際工作中子線程的異常不應該由主線程來捕獲。

    針對這個問題,希望大家清楚的是: 我們要明確線程代碼的邊界,異步化過程中,子線程拋出的異常應該由子線程自己去處理,而不是需要主線程感知來協助處理。

    解決方案

    解決方案很簡單,在虛擬機中,當一個線程如果沒有顯式處理異常而拋出時會將該異常事件報告給該線程對象的 java.lang.Thread.UncaughtExceptionHandler 進行處理,如果線程沒有設置 UncaughtExceptionHandler,則默認會把異常棧信息輸出到終端而使程序直接崩潰。

    所以如果我們想在線程意外崩潰時做一些處理就可以通過實現 UncaughtExceptionHandler 來滿足需求。

    我們使用線程池設置ThreadFactory時可以指定UncaughtExceptionHandler,這樣就可以捕獲到子線程拋出的異常了。

    代碼示例

    具體代碼如下:

    /**
     * 測試子線程異常問題
     *
     * @author wangmeng
     * @date 2020/6/13 18:08
     */
    public class ThreadPoolExceptionTest {
    
        public static void main(String[] args) throws InterruptedException {
            MyHandler myHandler = new MyHandler();
            ExecutorService execute = new ThreadPoolExecutor(10, 10,
                    0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setUncaughtExceptionHandler(myHandler).build());
    
            TimeUnit.SECONDS.sleep(5);
            for (int i = 0; i < 10; i++) {
                execute.execute(new MyRunner());
            }
        }
    
    
        private static class MyRunner implements Runnable {
            @Override
            public void run() {
                int count = 0;
                while (true) {
                    count++;
                    System.out.println("我要開始生產Bug了============");
                    if (count == 10) {
                        System.out.println(1 / 0);
                    }
    
                    if (count == 20) {
                        System.out.println("這裡是不會執行到的==========");
                        break;
                    }
                }
            }
        }
    }
    
    class MyHandler implements Thread.UncaughtExceptionHandler {
        private final static Logger LOGGER = LoggerFactory.getLogger(MyHandler.class);
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            LOGGER.error("threadId = {}, threadName = {}, ex = {}", t.getId(), t.getName(), e.getMessage());
        }
    }
    

    執行結果:

    UncaughtExceptionHandler 解析

    我們來看下Thread中的內部接口UncaughtExceptionHandler

    public class Thread {
        ......
        /**
         * 當一個線程因未捕獲的異常而即將終止時虛擬機將使用 Thread.getUncaughtExceptionHandler()
         * 獲取已經設置的 UncaughtExceptionHandler 實例,並通過調用其 uncaughtException(...) 方
         * 法而傳遞相關異常信息。
         * 如果一個線程沒有明確設置其 UncaughtExceptionHandler,則將其 ThreadGroup 對象作為其
         * handler,如果 ThreadGroup 對象對異常沒有什麼特殊的要求,則 ThreadGroup 會將調用轉發給
         * 默認的未捕獲異常處理器(即 Thread 類中定義的靜態未捕獲異常處理器對象)。
         *
         * @see #setDefaultUncaughtExceptionHandler
         * @see #setUncaughtExceptionHandler
         * @see ThreadGroup#uncaughtException
         */
        @FunctionalInterface
        public interface UncaughtExceptionHandler {
            /**
             * 未捕獲異常崩潰時回調此方法
             */
            void uncaughtException(Thread t, Throwable e);
        }
    
        /**
         * 靜態方法,用於設置一個默認的全局異常處理器。
         */
        public static void setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
             defaultUncaughtExceptionHandler = eh;
         }
    
        /**
         * 針對某個 Thread 對象的方法,用於對特定的線程進行未捕獲的異常處理。
         */
        public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
            checkAccess();
            uncaughtExceptionHandler = eh;
        }
    
        /**
         * 當 Thread 崩潰時會調用該方法獲取當前線程的 handler,獲取不到就會調用 group(handler 類型)。
         * group 是 Thread 類的 ThreadGroup 類型屬性,在 Thread 構造中實例化。
         */
        public UncaughtExceptionHandler getUncaughtExceptionHandler() {
            return uncaughtExceptionHandler != null ?
                uncaughtExceptionHandler : group;
        }
    
        /**
         * 線程全局默認 handler。
         */
        public static UncaughtExceptionHandler getDefaultUncaughtExceptionHandler() {
            return defaultUncaughtExceptionHandler;
        }
        ......
    }
    

    部分內容參考自:https://mp.weixin.qq.com/s/ghnNQnpou6-NemhFjpl4Jg

    線程池發生了異常改怎樣處理?

    線程池中線程運行過程中出現了異常該怎樣處理呢?線程池提交任務有兩種方式,分別是execute()submit(),這裡會依次說明。

    ThreadPoolExecutor.runWorker()實現

    不管是使用execute()還是submit()提交任務,最終都會執行到ThreadPoolExecutor.runWorker(),我們來看下源碼(源碼基於JDK1.8):

    我們看到在執行task.run()時,出現異常會直接向上拋出,這裏處理的最好的方式就是在我們業務代碼中使用try...catch()來捕獲異常。

    FutureTask.run()實現

    如果我們使用submit()來提交任務,在ThreadPoolExecutor.runWorker()方法執行時最終會調用到FutureTask.run()方法裏面去,不清楚的小夥伴也可以看下我之前的文章:

    線程池續:你必須要知道的線程池submit()實現原理之FutureTask!

    這裏可以看到,如果業務代碼拋出異常后,會被catch捕獲到,然後調用setExeception()方法:

    可以看到其實類似於直接吞掉了,當我們調用get()方法的時候異常信息會包裝到FutureTask內部的變量outcome中,我們也會獲取到對應的異常信息。

    ThreadPoolExecutor.runWorker()最後finally中有一個afterExecute()鈎子方法,如果我們重寫了afterExecute()方法,就可以獲取到子線程拋出的具體異常信息Throwable了。

    結論

    對於線程池、包括線程的異常處理推薦以下方式:

    1. 直接使用try/catch,這個也是最推薦的方式
    2. 在我們構造線程池的時候,重寫uncaughtException()方法,上面示例代碼也有提到:
    public class ThreadPoolExceptionTest {
    
        public static void main(String[] args) throws InterruptedException {
            MyHandler myHandler = new MyHandler();
            ExecutorService execute = new ThreadPoolExecutor(10, 10,
                    0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setUncaughtExceptionHandler(myHandler).build());
    
            TimeUnit.SECONDS.sleep(5);
            for (int i = 0; i < 10; i++) {
                execute.execute(new MyRunner());
            }
        }
    }
    
    class MyHandler implements Thread.UncaughtExceptionHandler {
        private final static Logger LOGGER = LoggerFactory.getLogger(MyHandler.class);
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            LOGGER.error("threadId = {}, threadName = {}, ex = {}", t.getId(), t.getName(), e.getMessage());
        }
    }
    

    3 直接重寫afterExecute()方法,感知異常細節

    總結

    這篇文章到這裏就結束了,不知道小夥伴們有沒有一些感悟或收穫?

    通過這幾個面試問題,我也深刻的感受到學習知識要多思考,看源碼的過程中要多設置一些場景,這樣才會收穫更多。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※別再煩惱如何寫文案,掌握八大原則!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

  • [.NET 開源] 高性能的 Swifter.MessagePack 已發布,併發布新版本的 Swifter.Json 和 Swifter.Data。

    [.NET 開源] 高性能的 Swifter.MessagePack 已發布,併發布新版本的 Swifter.Json 和 Swifter.Data。

    抱歉各位朋友,由於各種私事公事,本應該在 19 年底發布的 Swifter.MessagePack 庫延遲了這麼久才發布,我深感抱歉。

    MsgPack 簡介

    MsgPack 一種非常輕巧的二進制數據交換格式,巧妙的設計讓它相比其他二進制數據格式更可讀,並且有着不錯的壓縮率和邏輯性能,是目前相當火熱的數據交換格式。

    Swifter.MessagePack 遵循 MsgPack 新的規範實現;相比 .NET 其他 MsgPack 序列化庫,Swifter.MessagePack 有着更好的性能,生成的內容更緊湊合理且更簡單易用。

    Nuget:Swifter.MessagePackSwifter.JsonSwifter.Data

    GitHub:Swifter.MessagePackSwifter.Json

    如果您想使用 Swifter 庫,請在 Nuget 上安裝/下載最新版本,如需單文件版本,請自行生成/合併。

     

    簡單使用 Swifter.MessagePack

    MessagePackFormatter 類內部還有十個方法重載,包括靜態和實例方法,總有一些適合您;這些方法都是線程安全的。

    更多使用方法請參考早期關於 Swifter.Json 的文章,GitHub 或 Wiki;學習交流進 Swifter 的 QQ 群:133630914(新群,歡迎加入)。

     

    Swifter 框架的特性

    (1) Swifter 可以運行在 .NET Framework 2.0+, .NET Core 2.0+, .NET Standard 2.0+, MONO JIT, MONO AOT, Xamarin.Android, Xamarin.iOS, Unity JIT 等平台/運行時上,Unity IL2CPP 運行時由於沒有我們測試環境,不知可否正常運行,更多信息請看下面的 AOT 說明

    (2) Swifter 有着深層的抽象封裝,這雖然帶來了一些性能和內存的損耗,但也獲得了更高的擴展性;Swifter.Json/Swifter.MessagePack/Swifter.Data 的可公用的代碼非常多,這使得在 Swifter 上實現一個新的序列化庫只需要編寫少量代碼即可實現,這是其他框架難實現的。

    (3) 雖然 Swifter 有很多接口和抽象編程,但是 Swifter 並沒有因此比其他的框架慢或內存佔用大,反比它們更快和更小內存佔用;這是因為 Swifter 從來都是使用更好算法和邏輯來獲取性能,而不是使用更直接的代碼獲取直接的性能。

    (4) 作為類庫開發者,我們深知每個人開發和測試的側重點都與他人不一樣,自己找出自己的問題太難,所以 Swifter.Json 和 Swifter.MessagePack 除了我們自己的測試單元之外, 還 “偷” 了 Newtonsoft, Neuecc 和 Spanjson 的 5000+ 個測試單元( 去除了 Newtonsoft 的部分測試單元);現已測試通過 4200+ 個,不通過 800+ 個是我們認為可以允許或是更加合理的行為。(不勞而獲的測試單元確實用着很爽,但事實是我們”搬”這些測試單元用了 3 天,無腦替換改到手指抽筋)

     

    Swifter.Json 和 Swifter.MessagePack

    (1) Swifter.MessagePack 和 Swifter.Json 一樣,都有着非常優異的性能和極小的額外內存分配。

    (2) Swifter.MessagePack 和 Swifter.Json 的 API 大致相同,如果使用者同時使用它們,那麼可以極小成本在它們之間切換。

    (3) 得益於 Swifter.Core 的強大數據映射,Swifter.MessagePack 和 Swifter.Json 都同時支持 .NET 上大多數常用的數據結構和類型。

    (4) Swifter.MessagePack 和 Swifter.Json 對重複引用的對象的表示方式不一樣,在開啟 MultiReferencingReference 配置項后,Swifter.Json 將使用 { “$ref”: “#/obj/1/target” } 來表示重複引用的對象,而 Swifter.MessagePack 使用對象在 MsgPack 內容的偏移量表示重複引用的對象;相比之下 Swifter.MessagePack 的方案更簡單性能更快,但是可讀性較差,不過說來 MsgPack 本來就是要專門的工具才能閱讀。

    (5) Swifter.MessagePack 在序列化基礎類型時,在保證精度不丟失的前提下,將大數據類型轉換為更小數據類型,以得到更緊湊的 MsgPack 內容(如將 double 123 轉換為 int 123,int 123 只需要 1 個字節即可表示,如果不做轉換則需要 9 個字節表示)。

    (6) Swifter.MessagePack 在序列化未知長度的集合時(如 Enumerable<T>),會將長度定義為四字節 (FixArray32),然後在寫入完成后把實際長度賦予這四字節長度;這樣雖然在較短的未知長度集合時,將產生 1-3 個 0;但是這避免了將未知長度的集合轉換為 List<T> 或 T[], 這提高了性能也減少了內存分配,這是不虧的(因為未知長度的集合很常用,如 Linq,DbDataReader 等)。

     

    新版本做了啥?

    (1) 主要是解決了已知 BUG,包括了 Issues 上提到的幾個。

    (2) 允許將 “” 值解析為 DateTime, int?, double 等基礎類型的默認值,但是需要啟用 EmptyStringAsDefault 配置項,默認未開啟。

    (3) 解決了 Swifter.Json 浮點數: float, double 失真的問題,並增加了 UseSystemFloatingPointsMethods 配置項使用系統的浮點數方法,此配置項的更多說明請看該配置項的註釋。

    (4) 增加了序列化的事件:ObjectFiltering 和 ArrayFiltering,這兩個事件可以對正在序列化中的 鍵/值 做處理和篩選,包括駝峰命名法,忽略一些值等。它們被放在 JsonFormatter 和 MessagePackFormatter 的實例裏面。

    (5) 增加了 .NET 對象的持久序列化和反序列化功能,這個功能將對象序列化為包含類型信息和字段值的內容,不包含邏輯信息;使用 SerializationBox<T> 盒子使用此功能。圖示:

    更多新增的功能請繼續看以下內容。

     

     

    AOT

    在 Swifter 新版本里,AOT 的 JIT 的界限更加明顯,由 VersionDifferences.IsSupportEmit 字段標識;當這個字段為 true 表示當前平台是 JIT 運行時,Swifter 將在一些類中使用 Emit 技術提高性能;當此字段為 false 時,Swifter 會完全不使用 Emit 技術。

    因我們設備有限,無法提供大規模的平台測試,但我們非常希望可以 Swifter 可以支持更多的平台,所以希望朋友們加 Swifter 交流 QQ 群:(133630914),在這裏我們可以更快的提供反饋。

     

    直接文檔讀取/寫入的 API

    通常情況下,將小型對象序列化為 Json/MsgPack 和將小型 Json/MsgPack 反序列化為對象是 .NET 程序中常見的操作,Swifter 也正以此為常用場景做優化,所以 Swifter 在對小型數據操作時性能最佳,且相比其他 Json/MsgPack 解析庫優勢明顯。

    但在大型數據下優勢減少,這主要原因是大型數據的存儲需要實體類或字典/集合存儲,創建/填充/遍歷這些對象消耗了大量資源(接口編程的損耗);所以 Swifter 提供了直接讀取/寫入的 API 來繞開了對存儲介質的操作,以更快更小損耗的讀寫大型數據。

    使用 JsonFormatter.CreateJsonReader/MessagePackFormatter.CreateMessagePackReader 函數來創建文檔讀取器,使用 JsonFormatter.CreateJsonWriter/MessagePackFormatter.CreateMessagePackWriter 函數來創建文檔寫入器。

    使用文檔讀取器完整的讀取一個 Json/MsgPack 文檔將比反序列化為對象快 4-8 倍!使用文檔寫入器生成文檔的性能與將實體類序列化為 Json/MsgPack 相差較小,前提是您已構建好了這些對象。

    讀取器演示:

    寫入器演示:

     

    擁有簡單預測數組的長度的能力

    Swifter 在對小型數組,部分集合寫入時,會根據數組的類型,來源(Data,Json,MsgPack 等),名稱等信息並結合之前的一些長度記錄,簡單的預測出新的數組的長度;在寫入完成后,如果預測長度與實際長度不符,則擴展或壓縮為實際長度;如果與實際長度相符,則不需要重新創建新數組。此能力有效提高反序列化小型數組和部分集合性能,並且減少額外內存分配。

    在其他高性能的 Json 解析庫,它們使用 ArrayPool<T> 同樣可以提高性能和減少內存分配;但是由於 Swifter 對兼容性的要求,使得我們不能使用 ArrayPool<T> 方案;在數組的長度比較穩定的情況下,我們的方案更好;但在數組長度非常不穩定的情況下,我們的方案可能仍需要 1-3 次的擴容/壓縮。

     

    假定有序的對象反序列化

    Swifter.Json 和 Swifter.MessagePack 都支持了假定有序的對象反序列化,當一個 Json/MsgPack 的對象與當前的實體類對象的字段順序一致時,將有效提升反序列化性能。

    此操作默認不開啟,可以使用 AsOrderedObjectDeserialize 配置項開啟。

     

    高性能的反射封裝

    Swifter.Core 里提供了一些對反射封裝的類,它們放在 Swifter.Reflection 命名空間下;這些類型主要功能就是提高了系統反射的性能;XObjectRW 正是使用它們實現不依賴 Emit 的高性能對象讀寫器。

    雖然放棄一些安全性檢查可以提高更多的性能,但是我們並沒有這麼做;我們仍然有類型安全檢查和防溢出檢查(事實上讀寫字段和屬性大多數的損害都在這裏,如果去掉這些檢查將得到上百倍的性能;事實上這些檢查只起到了提示程序員不能這麼做的作用,程序實際運行時這些檢查無意義)。

     

    高效的数字 ToString 和 Parse 方法

    Swifter.Core 提供了一些高性能数字算法,包括 Int64, UInt64, Double, Single, Decimal 的 Parse 和 ToString 算法,它們被放在 Swifter.Tools.NumberHelper 里,這些算法被應用與 Swifter.Json 和一些其他地方,這些算法支持 2-64 進制。

     

    XConvert 萬能類型轉換器

    Swifter.Tools.XConvert.Convert<TSource, TDestination> 是一個功能強大的萬能類型轉換函數,它在初始化時嘗試以下方式獲取合適的轉換函數:

    (1) 包含在 System.Convert 里的基礎轉換函數;

    (2) 類型兼容的隱式轉換(如:從子類轉換為父類,從 Int32 轉換為 Int64,從 Int64 轉換為 Double)。

    (3) 原類型和目標類型中的 static implicit operator (隱式轉換) 函數。

    (4) 原類型中的 ToXXX 實例函數。

    (5) 目標類型中的 Parse 和 ValueOf 靜態函數。

    (7) 目標類型的構造函數。

    (8) 原類型和目標類型中的 static explicit operator (顯式轉換) 函數。

    (9) 當以上方法都沒有找到合適函數時,將使用 (TDestination)(object)value 進行強制轉換。

    簡單示例:

     

    性能測試

    ServiceStack.Json, Jil, LitJson, NetJson 等庫因為出錯太多未展示出來;如果有需要,您可以到 GitHub 上自行克隆/修改/運行,已收錄了 .NET 的大多數 Json 序列化庫。

     

    更多實用功能等你發現…

    Swifter.Core 還提供了許許多多的工具類,包括反射,委託,類型轉換,字符串,加密,哈希,数字,日期,數組和集合等工具,它們被放在 Swifter.Tools 命名空間下,您可以使用它們來提高開發效率和運行效率。

    Swifter.RW 命名空間是整個 Swifter 框架的核心,它主要邏輯是:從讀取器中讀取值,寫入到寫入器中;如:從 JsonReader 讀取值到 ObjectWriter 或 DictionaryWriter 中;熟悉它們就等於精通了 Swifter 框架。

    Swifter.Json/Swifter.MessagePack 有一個非常重要的配置項 JsonFormatterOptions/MessagePackFormatterOptions;使用前建議先閱讀它們,以配置更適合您系統的序列化和反序列化方案。

     

    最後附上 Swifter.Data 的簡介

    Swifter.Data 是一個小型的 ORM 工具,它相比 Dapper 性能要快一些,功能要強大一些。

     

    感謝閱讀

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※別再煩惱如何寫文案,掌握八大原則!

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※超省錢租車方案

    ※教你寫出一流的銷售文案?

    網頁設計最專業,超強功能平台可客製化

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

  • foreach 集合又拋經典異常了,這次一定要刨根問底

    foreach 集合又拋經典異常了,這次一定要刨根問底

    一:背景

    1. 講故事

    最近同事在寫一段業務邏輯的時候,程序跑起來總是報:集合已修改;可能無法執行枚舉操作,硬是沒有找到什麼情況下會導致這個異常產生,就讓我來找一下bug,其實這個異常在座的每個程序員幾乎都遇到過,誰也不是一生下就是大牛,簡單看了下代碼,確實是多線程操作foreach,但並沒有對foreach進行Add,Remove操作,掃完代碼其實我也是有點懵,沒撤只能調試了,在foreach里套一層trycatch,查看異常的線程堆棧從而找出了問題代碼,代碼簡化如下:

    
            static void Main(string[] args)
            {
                var dict = new Dictionary<int, int>()
                {
                    [1001] = 1,
                    [1002] = 10,
                    [1003] = 20
                };
    
                foreach (var userid in dict.Keys)
                {
                    dict[userid] = dict[userid] + 1;
                }
            }
    

    先尋找點安慰,說實話,憑肉眼你覺得這段代碼會拋出異常嗎? 反正我是被騙過了,大寫的尷尬,結論如下,運行一下便知。

    從圖中看確實是異常,說明在foreach的過程中連迭代集合的 value 都不可以修改,這讓我激起了強烈的探索欲,看看FCL中到底是怎麼限制的。

    二:源碼探索

    1. 從IL中尋找答案

    C#已發展到 9.0 了,到處都充斥着語法糖,有時候不看一下底層的IL都不知道到底是轉化成了什麼,所以這個是必須的。

    
    	IL_000d: callvirt instance void class [System.Collections]System.Collections.Generic.Dictionary`2<int32, int32>::set_Item(!0, !1)
    	IL_001b: callvirt instance void class [System.Collections]System.Collections.Generic.Dictionary`2<int32, int32>::set_Item(!0, !1)
    	IL_0029: callvirt instance void class [System.Collections]System.Collections.Generic.Dictionary`2<int32, int32>::set_Item(!0, !1)
    	IL_0037: callvirt instance valuetype [System.Collections]System.Collections.Generic.Dictionary`2/KeyCollection/Enumerator<!0, !1> class [System.Collections]System.Collections.Generic.Dictionary`2/KeyCollection<int32, int32>::GetEnumerator()
    
    	.try
    	{
    		IL_003d: br.s IL_005a
    		// loop start (head: IL_005a)
    			IL_003f: ldloca.s 1
    			IL_0041: call instance !0 valuetype [System.Collections]System.Collections.Generic.Dictionary`2/KeyCollection/Enumerator<int32, int32>::get_Current()
    			IL_004c: callvirt instance !1 class [System.Collections]System.Collections.Generic.Dictionary`2<int32, int32>::get_Item(!0)
    			IL_0053: callvirt instance void class [System.Collections]System.Collections.Generic.Dictionary`2<int32, int32>::set_Item(!0, !1)
    			IL_005a: ldloca.s 1
    			IL_005c: call instance bool valuetype [System.Collections]System.Collections.Generic.Dictionary`2/KeyCollection/Enumerator<int32, int32>::MoveNext()
    			IL_0061: brtrue.s IL_003f
    		// end loop
    
    		IL_0063: leave.s IL_0074
    	} // end .try
    	finally
    	{
    
    	} // end handler    
    
    

    從IL代碼中可以看到,先執行了三次字典的索引器操作,然後調用了 Dictionary.GetEnumerator 來生成字典的迭代類,這思路就非常清晰了,然後我們看一下類索引器都做了些什麼。

    從圖中可以看到,每一次的索引器操作,這裏都執行了version++,所以字典初始化完成之後,這裏的 version=3,沒有問題吧,然後繼續看代碼,尋找 Dictionary.GetEnumerator 方法啟動迭代類。

    上面代碼的 _version = dictionary._version; 一定要看仔細了,在啟動迭代類的時候記錄了當時字典的版本號,也就是_version=3,然後繼續探索moveNext方法幹了什麼,如下圖:

    從圖中可以看到,當每次執行moveNext的過程中,都會判斷一下字典的 version 和 當初初始化迭代類中的version 版本號是否一致,如果不一致就拋出異常,所以這行代碼就是點睛之筆了,當在foreach體中執行了 dict[userid] = dict[userid] + 1; 語句,相當於又執行了一次類索引器操作,這時候字典的version就變成 4 了,而當初初始化迭代類的時候還是3,自然下一次執行 moveNext 就是 3 != 4 拋出異常了。

    如果你非要讓我證明給你看,這裏可以使用dnspy直接調試源碼,在異常那裡下一個斷點再查看兩個version版本號不就知道啦。。。

    2. 面對疾風

    有些朋友可能要說,碼農今天分享的這篇一點水準都沒有,我18年前就知道字典是不能動態修改的,還分析的頭頭是勁。

    但是我有話要說,這個還確實是我的一個盲區,平時在迭代字典的時候value一般都是引用類型,動態修改引用類型的值自然是沒有問題的,這是因為你不管怎麼修改都不會改變 _version 版本號,但質疑我的也不要把話說的太滿,因為這種操作是非常語義化非常大眾的需求,你能保證後面net版本不支持這個嗎??? 如果你說不可能,那恭喜你,被我帶到坑裡面去啦。

    下面我用原封不動的代碼在 .net 5 下跑一次,睜大眼睛好好看哦~~~

    驚訝吧, 居然在 .Net 5 中可以的,接下來用ILSpy去查查底層源碼,.netcore 3.1 和 net5 中分別對 類索引器 都做了啥修改。

    • netcore 3.1

    Path: C:\Program Files\dotnet\shared\Microsoft.NETCore.App\3.1.2\System.Private.CoreLib.dll

    • net5

    Path: C:\Program Files\dotnet\shared\Microsoft.NETCore.App\5.0.0-preview.5.20278.1\System.Private.CoreLib.dll

    對比兩張圖你會發現 .Net5 中並沒有做 _version++ 操作,這就了,如果你再細讀代碼,你還發現 .Net5 對字典進行了較大幅度的優化,哈哈,當初在 .Net5 之前產生的錯誤,在 .Net5 中居然沒有啦!

    四: 總結

    源碼面前,不談隱私,沒事多翻翻源碼,有可能還有意外收穫,比如在 .Net 5下的這點新發現,可能還是全網第一個哦,這要是兩個大牛爭吵,讓小白去相信誰呢,嘿嘿,源碼才是真正的專家~

    如您有更多問題與我互動,掃描下方進來吧~

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※教你寫出一流的銷售文案?

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※回頭車貨運收費標準

    ※別再煩惱如何寫文案,掌握八大原則!

    ※超省錢租車方案

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

  • .Net Core微服務入門全紀錄(三)——Consul-服務註冊與發現(下)

    .Net Core微服務入門全紀錄(三)——Consul-服務註冊與發現(下)

    前言

    上一篇【.Net Core微服務入門全紀錄(二)——Consul-服務註冊與發現(上)】已經成功將我們的服務註冊到Consul中,接下來就該客戶端通過Consul去做服務發現了。

    服務發現

    • 同樣Nuget安裝一下Consul:

    • 改造一下業務系統的代碼:

    ServiceHelper.cs:

        public class ServiceHelper : IServiceHelper
        {
            private readonly IConfiguration _configuration;
    
            public ServiceHelper(IConfiguration configuration)
            {
                _configuration = configuration;
            }
    
            public async Task<string> GetOrder()
            {
                //string[] serviceUrls = { "http://localhost:9060", "http://localhost:9061", "http://localhost:9062" };//訂單服務的地址,可以放在配置文件或者數據庫等等...
    
                var consulClient = new ConsulClient(c =>
                {
                    //consul地址
                    c.Address = new Uri(_configuration["ConsulSetting:ConsulAddress"]);
                });
    
                //consulClient.Catalog.Services().Result.Response;
                //consulClient.Agent.Services().Result.Response;
                var services = consulClient.Health.Service("OrderService", null, true, null).Result.Response;//健康的服務
    
                string[] serviceUrls = services.Select(p => $"http://{p.Service.Address + ":" + p.Service.Port}").ToArray();//訂單服務地址列表
    
                if (!serviceUrls.Any())
                {
                    return await Task.FromResult("【訂單服務】服務列表為空");
                }
    
                //每次隨機訪問一個服務實例
                var Client = new RestClient(serviceUrls[new Random().Next(0, serviceUrls.Length)]);
                var request = new RestRequest("/orders", Method.GET);
    
                var response = await Client.ExecuteAsync(request);
                return response.Content;
            }
    
            public async Task<string> GetProduct()
            {
                //string[] serviceUrls = { "http://localhost:9050", "http://localhost:9051", "http://localhost:9052" };//產品服務的地址,可以放在配置文件或者數據庫等等...
    
                var consulClient = new ConsulClient(c =>
                {
                    //consul地址
                    c.Address = new Uri(_configuration["ConsulSetting:ConsulAddress"]);
                });
    
                //consulClient.Catalog.Services().Result.Response;
                //consulClient.Agent.Services().Result.Response;
                var services = consulClient.Health.Service("ProductService", null, true, null).Result.Response;//健康的服務
    
                string[] serviceUrls = services.Select(p => $"http://{p.Service.Address + ":" + p.Service.Port}").ToArray();//產品服務地址列表
    
                if (!serviceUrls.Any())
                {
                    return await Task.FromResult("【產品服務】服務列表為空");
                }
    
                //每次隨機訪問一個服務實例
                var Client = new RestClient(serviceUrls[new Random().Next(0, serviceUrls.Length)]);
                var request = new RestRequest("/products", Method.GET);
    
                var response = await Client.ExecuteAsync(request);
                return response.Content;
            }
        }
    

    appsettings.json:

    {
      "Logging": {
        "LogLevel": {
          "Default": "Information",
          "Microsoft": "Warning",
          "Microsoft.Hosting.Lifetime": "Information"
        }
      },
      "AllowedHosts": "*",
      "ConsulSetting": {
        "ConsulAddress": "http://localhost:8500"
      }
    }
    

    OK,以上代碼就完成了服務列表的獲取。

    瀏覽器測試一下:

    隨便停止2個服務:

    繼續訪問:

    這時候停止的服務地址就獲取不到了,客戶端依然正常運行。

    這時候解決了服務的發現,新的問題又來了…

    • 客戶端每次要調用服務,都先去Consul獲取一下地址,這不僅浪費資源,還增加了請求的響應時間,這顯然讓人無法接受。

    那麼怎麼保證不要每次請求都去Consul獲取地址,同時又要拿到可用的地址列表呢?
    Consul提供的解決方案:——Blocking Queries (阻塞的請求)。詳情請見官網:https://www.consul.io/api-docs/features/blocking

    Blocking Queries

    這是什麼意思呢,簡單來說就是當客戶端請求Consul獲取地址列表時,需要攜帶一個版本號信息,Consul會比較這個客戶端版本號是否和Consul服務端的版本號一致,如果一致,則Consul會阻塞這個請求,直到Consul中的服務列表發生變化,或者到達阻塞時間上限;如果版本號不一致,則立即返回。這個阻塞時間默認是5分鐘,支持自定義。
    那麼我們另外啟動一個線程去干這件事情,就不會影響每次的用戶請求了。這樣既保證了客戶端服務列表的準確性,又節約了客戶端請求服務列表的次數。

    • 繼續改造代碼:
      IServiceHelper增加一個獲取服務列表的接口方法:
        public interface IServiceHelper
        {
            /// <summary>
            /// 獲取產品數據
            /// </summary>
            /// <returns></returns>
            Task<string> GetProduct();
    
            /// <summary>
            /// 獲取訂單數據
            /// </summary>
            /// <returns></returns>
            Task<string> GetOrder();
    
            /// <summary>
            /// 獲取服務列表
            /// </summary>
            void GetServices();
        }
    

    ServiceHelper實現接口:

        public class ServiceHelper : IServiceHelper
        {
            private readonly IConfiguration _configuration;
            private readonly ConsulClient _consulClient;
            private ConcurrentBag<string> _orderServiceUrls;
            private ConcurrentBag<string> _productServiceUrls;
    
            public ServiceHelper(IConfiguration configuration)
            {
                _configuration = configuration;
                _consulClient = new ConsulClient(c =>
                {
                    //consul地址
                    c.Address = new Uri(_configuration["ConsulSetting:ConsulAddress"]);
                });
            }
    
            public async Task<string> GetOrder()
            {
                if (_productServiceUrls == null)
                    return await Task.FromResult("【訂單服務】正在初始化服務列表...");
    
                //每次隨機訪問一個服務實例
                var Client = new RestClient(_orderServiceUrls.ElementAt(new Random().Next(0, _orderServiceUrls.Count())));
                var request = new RestRequest("/orders", Method.GET);
    
                var response = await Client.ExecuteAsync(request);
                return response.Content;
            }
    
            public async Task<string> GetProduct()
            {
                if(_productServiceUrls == null)
                    return await Task.FromResult("【產品服務】正在初始化服務列表...");
    
                //每次隨機訪問一個服務實例
                var Client = new RestClient(_productServiceUrls.ElementAt(new Random().Next(0, _productServiceUrls.Count())));
                var request = new RestRequest("/products", Method.GET);
    
                var response = await Client.ExecuteAsync(request);
                return response.Content;
            }
    
            public void GetServices()
            {
                var serviceNames = new string[] { "OrderService", "ProductService" };
                Array.ForEach(serviceNames, p =>
                {
                    Task.Run(() =>
                    {
                        //WaitTime默認為5分鐘
                        var queryOptions = new QueryOptions { WaitTime = TimeSpan.FromMinutes(10) };
                        while (true)
                        {
                            GetServices(queryOptions, p);
                        }
                    });
                });
            }
            private void GetServices(QueryOptions queryOptions, string serviceName)
            {
                var res = _consulClient.Health.Service(serviceName, null, true, queryOptions).Result;
                
                //控制台打印一下獲取服務列表的響應時間等信息
                Console.WriteLine($"{DateTime.Now}獲取{serviceName}:queryOptions.WaitIndex:{queryOptions.WaitIndex}  LastIndex:{res.LastIndex}");
    
                //版本號不一致 說明服務列表發生了變化
                if (queryOptions.WaitIndex != res.LastIndex)
                {
                    queryOptions.WaitIndex = res.LastIndex;
    
                    //服務地址列表
                    var serviceUrls = res.Response.Select(p => $"http://{p.Service.Address + ":" + p.Service.Port}").ToArray();
    
                    if (serviceName == "OrderService")
                        _orderServiceUrls = new ConcurrentBag<string>(serviceUrls);
                    else if (serviceName == "ProductService")
                        _productServiceUrls = new ConcurrentBag<string>(serviceUrls);
                }
            }
        }
    

    Startup的Configure方法中調用一下獲取服務列表:

            public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IServiceHelper serviceHelper)
            {
                if (env.IsDevelopment())
                {
                    app.UseDeveloperExceptionPage();
                }
                else
                {
                    app.UseExceptionHandler("/Home/Error");
                }
                app.UseStaticFiles();
    
                app.UseRouting();
    
                app.UseAuthorization();
    
                app.UseEndpoints(endpoints =>
                {
                    endpoints.MapControllerRoute(
                        name: "default",
                        pattern: "{controller=Home}/{action=Index}/{id?}");
                });
    
                //程序啟動時 獲取服務列表
                serviceHelper.GetServices();
            }
    

    代碼完成,運行測試:

    現在不用每次先請求服務列表了,是不是流暢多了?

    看一下控制台打印:

    這時候如果服務列表沒有發生變化的話,獲取服務列表的請求會一直阻塞到我們設置的10分鐘。

    隨便停止2個服務:

    這時候可以看到,數據被立馬返回了。

    繼續訪問客戶端網站,同樣流暢。
    (gif圖傳的有點問題。。。)

    至此,我們就通過Consul完成了服務的註冊與發現。
    接下來又引發新的思考。。。

    1. 每個客戶端系統都去維護這一堆服務地址,合理嗎?
    2. 服務的ip端口直接暴露給所有客戶端,安全嗎?
    3. 這種模式下怎麼做到客戶端的統一管理呢?

    代碼放在:https://github.com/xiajingren/NetCoreMicroserviceDemo

    未完待續…

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※超省錢租車方案

    ※別再煩惱如何寫文案,掌握八大原則!

    ※回頭車貨運收費標準

    ※教你寫出一流的銷售文案?

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益