標籤: USB CONNECTOR

  • 日產聆風在美“充電不花錢”項目將新增三個城市

    日產聆風在美國進行的“充電不花錢”(No Charge to Charge)項目將新增三個城市。

    據報導,此前有26個不同市城市消費者可以享受這個項目,不過好消息是,現在位於紐約市、費城與聖巴巴拉市的聆風用戶也可以進行免費充電了。用戶可以在指定的公共充電站使用贈送的費用充電,同樣,用戶也可以使用EZ-Charge網站或APP輕鬆的定位合作的充電站。

    “充電不花錢”項目針對日產聆風電動汽車的購買者或租賃者,並提供免費兩年的充電機會。新車主將受到一張EZ-Charge卡,這張卡可以介入充電點(ChargePoint)。

    除了新增的城市,其他26個城市是三藩市、洛杉磯、沙加緬度、聖地牙哥、夫勒斯諾市、波特蘭、芝加哥、達拉斯-沃思堡、惠斯頓、印度安納波利斯、那什維爾、鳳凰城、丹佛、華府、巴爾的摩、波斯頓、蒙特利、亞特蘭大、 羅利、鹽湖城和明尼阿波利斯—聖保羅都會區。
     

    本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

  • 騰訊、和諧汽車、富士康“互聯網+智慧電動汽車”專案公佈企業負責人

    騰訊、和諧汽車、富士康“互聯網+智慧電動汽車”專案公佈企業負責人

    和諧富騰互聯網加智慧電動汽車公司(以下簡稱“和諧富騰”)日前宣佈旗下“互聯網+智慧電動汽車”企業負責人,畢福康博士(Dr. Carsten Breitfeld)將擔任該企業首席執行官,戴雷博士(Dr. Daniel Kirchert)將擔任首席運營官。兩位管理層到任後將即刻啟動企業的運營。畢福康博士和戴雷博士亦將是該企業的事業合夥人和公司董事會成員。

    左:畢福康博士(Dr. Carsten Breitfeld);右:戴雷博士(Dr. Daniel Kirchert)

    和諧富騰是由中國和諧新能源汽車控股有限公司、鴻海集團與騰訊集團透過各自附屬實體聯合創立的創新投資平臺,“互聯網+智慧電動汽車”是該平臺旗下核心戰略專案和獨立企業,旨在開發面向未來的個人出行解決方案,塑造源自中國、佈局世界的高端品牌,為消費者提供智慧、愉悅、生態友好的駕乘體驗。

    畢福康博士擁有機械工程學博士學位,是全球電動汽車研發領域的一流專家。畢福康博士此前在寶馬集團總部工作20年,擔任過底盤開發、傳動系統開發及產品戰略等方面的多個高級管理崗位,2010年起擔任寶馬集團新一代電動超級跑車i8項目總監,成為這一世界汽車行業劃時代旗艦車型的研發主腦。通過引入革命性的開發流程,畢福康博士帶領團隊實現了i8車型于2014年成功面世,在產品性能、材料、技術革新及研發速度等各個方面顯著優於傳統汽車產品,創下了全球汽車行業新的標杆。在畢福康博士的推動下,i8車型還引進了創新的銷售和客戶體驗機制。

    戴雷博士是中國豪華汽車領域擁有最豐富銷售、運營和品牌塑造經驗的高層主管之一,也是業內公認的“中國通”。戴雷博士此前曾擔任東風英菲尼迪汽車有限公司總經理和華晨寶馬汽車有限公司行銷高級副總裁,相關品牌在其任內均創下豪華車市場的銷售增長紀錄,在品牌塑造和市場行銷方面也創造了若干標杆性的案例。戴雷博士在產品戰略、銷售網路發展和合資企業組建運營方面也擁有深厚的經驗。

    畢福康博士和戴雷博士將在就任後與傳媒見面,並就“互聯網+智慧電動汽車”的戰略規劃和企業具體運營資訊與傳媒和公眾溝通。

    本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※為什麼 USB CONNECTOR 是電子產業重要的元件?

    網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

    ※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

    ※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

  • 戴姆勒總裁蔡澈:電動汽車續航里程應達499公里以上

    日前,戴姆勒集團總裁迪特•蔡澈(Dieter Zetsche)稱,電動汽車一次充電必須能供應至少310英里(499公里)的行程,才能替代燃油汽車,成為主流。

    但蔡澈在接受美國媒體採訪時稱,在短時期內讓所有消費者接受電動汽車不太可能,並稱這是一個連續的過程。首先要降低車載電池的成本。蔡澈估計該價格在170美元每千瓦時左右,而110-130美元則會使汽車很有競爭優勢。

    本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

    ※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

    ※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

    ※評比南投搬家公司費用收費行情懶人包大公開

  • HashMap7淺析

    HashMap7淺析

    一、概述

      HashMap,基於哈希結構的Map接口的一個實現,無序,允許null鍵值對,線程不安全的。可以使用集合工具類Collections中的synchronizedMap方法,去創建一個線程安全的集合map。

      在jdk1.7中,HashMap主要是基於 數組+鏈表 的結構實現的。鏈表的存在主要是解決 hash 衝突而存在的。插入數據的時候,計算key的hash值,取得存儲的數組下標,如果衝突已有元素,則會在衝突地址上生成個鏈表,再通過key的比較,鏈表是否已存在,存在則覆蓋,不存在則鏈表上添加。這種方式,如果存在大量衝突的時候,會導致鏈表過長,那麼直接導致的就是犧牲了查詢和添加的效率。所以在jdk1.8版本之後,使用的就是 數組 + 鏈表 + 紅黑樹,當鏈表長度超過 8(實際加上初始的節點,整個有效長度是 9) 的時候,轉為紅黑樹存儲。

      本文中內容,主要基於jdk1.7版本,單線程環境下使用的HahsMap沒有啥問題,但是當在多線程下使用的時候,則可能會出現併發異常,具體表象是CPU會直線上升100%。下面是主要介紹相關的存取以及為什麼會出現線程安全性問題。

    二、結構

      

      HashMap默認初始化size=16的哈希數組,然後通過計算待存儲的key的hash值,去計算得到哈希數組的下標值,然後放入鏈表中(新增節點或更新)。鏈表的存在即是解決hash衝突的。

    三、源碼實現分析

      1、存儲具體數據的table數組:

          

        Entry為HashMap中的靜態內部類,其具體結構如下圖

          

        key、value屬性就是存儲鍵值對的,next則是指向鏈表的下一個元素節點。

         2、 默認初始化方法:

        

        默認構造方法,不對table進行初始化new(真正初始化動作放在put中,後面會看到),只是設置參數的默認值,hashmap長度和table長度初始化成DEFAULT_INITIAL_CAPACITY(16),加載因子loadFactor默認DEFAULT_LOAD_FACTOR(0.75f,至於為什麼是0.75,這個可以參見 )。

        加載因子:默認情況下,16*0.75=12,也就是在存儲第13個元素的時候,就會進行擴容(jdk1.7的threshold真正計算放在第一次初始化中,後面會再提及)。此元素的設置,直接影響到的是key的hash衝突問題。

      3、put方法

     public V put(K key, V value) {
       
    if (table == EMPTY_TABLE) { inflateTable(threshold); } if (key == null) return putForNullKey(value); int hash = hash(key); int i = indexFor(hash, table.length); for (Entry<K,V> e = table[i]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } modCount++; addEntry(hash, key, value, i); return null; }

      3.1、EMPTY_TABLE是HashMap中的一個靜態的空的Entry數組,table也是HashMap的一個屬性,默認就是EMPTY_TABLE(這兩句可參見上面源碼),table就是我們真正數據存儲使用的。
      3.2、前面提及,無參構造的時候,並未真正完成對HashMap的初始化new操作,而僅僅只是設置幾個常量,所以在第一次put數據的時候,table是空的。則會進入下面的初始化table方法中。

    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    
    private void inflateTable(int toSize) {
        // Find a power of 2 >= toSize
        int capacity = roundUpToPowerOf2(toSize);
    
        threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1); //計算加載因子,默認情況下結果為12
        table = new Entry[capacity];  //真正的初始化table數組
        initHashSeedAsNeeded(capacity);
    }

      3.3、key的null判斷

    if (key == null)
        return putForNullKey(value);
    
    private V putForNullKey(V value) {
        for (Entry<K,V> e = table[0]; e != null; e = e.next) {
            if (e.key == null) {
                V oldValue = e.value;
                e.value = value;
                e.recordAccess(this);
                return oldValue;
            }
        }
        modCount++;
        addEntry(0, null, value, 0);
        return null;
    }
    
    void addEntry(int hash, K key, V value, int bucketIndex) {
        if ((size >= threshold) && (null != table[bucketIndex])) {
            resize(2 * table.length);
            hash = (null != key) ? hash(key) : 0;
            bucketIndex = indexFor(hash, table.length);
        }
    
        createEntry(hash, key, value, bucketIndex);
    }
    
    void createEntry(int hash, K key, V value, int bucketIndex) {
        Entry<K,V> e = table[bucketIndex];
        table[bucketIndex] = new Entry<>(hash, key, value, e);
        size++;
    }

      具體步驟解析:

        1、key為null,取出table[0]的鏈表結構Enrty,如果取出的元素不為null,則對其進行循環遍歷,查找其中是否存在key為null的節點元素。

           2、如果存在key == null的節點,則使用新的value去更新節點的oldValue,並且將oldValue返回。

        3、如果不存在key == null的元素,則執行新增元素addEntry方法:

          (1)判斷是否需要擴容,size為當前數組table中,已存放的Entry鏈表個數,更直接點說,就是map.size()方法的返回值。threshold上面的真正初始化HashMap的時候已經提到,默認情況下,計算得到 threshold=12。若同時滿足  (size >= threshold) && (null != table[bucketIndex]) ,則對map進行2倍的擴容,然後對key進行重新計算hash值和新的數組下標。

          (2)創建新的節點原色createEntry方法,首先獲取table數組中下標為bucketIndex的鏈表的表頭元素,然後新建個Entry作為新的表頭,並且新表頭其中的next指向老的表頭數據。

      3.4、key不為null的存儲  
        原理以及過程上通key==null的大體相同,只不過,key==null的時候,固定是獲取table[0]的鏈表進行操作,而在不為key != null的時候,下標位置是通過
      int hash = hash(key); int i = indexFor(hash, table.length); 計算得到的

      static int indexFor(int h, int length) {
            // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";
            return h & (length-1);
        }

      很清晰的就能看明白,先計算key的hash,然後與當前table的長度進行相與,這樣計算得到待存放數據的下標。得到下標后,過程就與key==null一致了,遍歷是否存在,存在則更新並返回oldVlaue,不存在則新建Entry。

      4、get方法

     public V get(Object key) {
            if (key == null)
                return getForNullKey();
            Entry<K,V> entry = getEntry(key);
    
            return null == entry ? null : entry.getValue();
        }
        如果key == null,則調用getForNullKey方法,遍歷table[0]處的鏈表。
    private V getForNullKey() {
            if (size == 0) {
                return null;
            }
            for (Entry<K,V> e = table[0]; e != null; e = e.next) {
                if (e.key == null)
                    return e.value;
            }
            return null;
        }

      如果key != null,則調用getEntry,根據key計算得到在table數組中的下標,獲取鏈表Entry,然後遍歷查找元素,key相等,則返回該節點元素。

     final Entry<K,V> getEntry(Object key) {
            if (size == 0) {
                return null;
            }
    
            int hash = (key == null) ? 0 : hash(key);
            for (Entry<K,V> e = table[indexFor(hash, table.length)];
                 e != null;
                 e = e.next) {
                Object k;
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    return e;
            }
            return null;
        }

    四、線程不安全分析

      上述,主要淺析了下HashMap的存取過程,HashMap的線程安全性問題主要也就是在上述的擴容resize方法上,下面來看看在高併發下,擴容后,是如何引起100%問題的。

      1、在進行新元素 put 的時候,這在上面中的3.3的代碼片段中可以查看,addEntry 添加新節點的時候,會計算是否需要擴容處理:(size >= threshold) && (null != table[bucketIndex]) 。

      2、如果擴容的話,會接下來調用 resize 方法

     void resize(int newCapacity) {
            Entry[] oldTable = table;
            int oldCapacity = oldTable.length;
            if (oldCapacity == MAXIMUM_CAPACITY) {
                threshold = Integer.MAX_VALUE;
                return;
            }
    
            Entry[] newTable = new Entry[newCapacity];
            //關鍵性代碼,構建新hashmap並將老的數據移動過來
            transfer(newTable, initHashSeedAsNeeded(newCapacity));
            table = newTable;
            threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
        }

      3、其中,出現100%問題的關鍵就是上面的 transfer 方法,新建hashmap移動複製老數據

     1  void transfer(Entry[] newTable, boolean rehash) {
     2         int newCapacity = newTable.length;
     3         for (Entry<K,V> e : table) {
     4             // 遍歷老的HashMap,當遇到不為空的節點的是,進入移動方法
     5             while(null != e) {
     6                 // 首先創建個Entry節點 指向該節點所在鏈表的下一個節點數據
     7                 Entry<K,V> next = e.next;
     8                 if (rehash) {
     9                     e.hash = null == e.key ? 0 : hash(e.key);
    10                 }
    11               // 計算老的數據在新Hashmap中的下標位置
    12                 int i = indexFor(e.hash, newCapacity);
    13              // 將新HashMap中相應位置的元素,掛載到老數據的後面(不管有無數據)
    14                 e.next = newTable[i];
    15                 // 將新HashMap中相應位置指向上面已經成功掛載新數據的老數據
    16              newTable[i] = e;
    17              // 移動到鏈表節點中的下一個數據,繼續複製節點
    18                 e = next;
    19             }
    20         }
    21     }    

      問題的關鍵就在上述的14、15行上,這兩行的動作,在高併發下可能就會造成循環鏈表,循環鏈表在等待下一個嘗試 get 獲取數據的時候,就悲劇了。下面舉例模擬說說這個過程:

      (1)假設目前某個位置的鏈表存儲結構為 A -> B -> C,有兩個線程同時進行擴容操作

      (2)線程1執行到第7行 Entry<K,V> next = e.next; 的時候被掛起了,此時,線程1的 e 指向 A , next 指向的是 B

      (3)線程2執行完成了整個的擴容過程,那麼此時的鏈表結構應該是變為了 C -> B -> A

      (4)線程1喚醒繼續執行,而需要操作的鏈表實際就變成了了上述線程2完成后的 C ->B -> A,下面分為幾步去完成整個操作:

          第一次循環:

            (i)執行 e.next = newTable[i] ,將 A 的 next 指向線程1的新的HashMap,由於此時無數據,所以 e.next = null

            (ii)執行 newTable[i] = e,將線程1的新的HashMap的第一個元素指向 A 

            (iii)執行e = next,移動到鏈表中的下一個元素,也就是上面的(2)中的 線程掛起的時候的 B

          第二次循環:

            (i)執行 Entry<K,V> next = e.next,此時的 e 指向 B,next指向 A

            (ii)執行 e.next = newTable[i] ,將 B 的 next 指向線程1的新的HashMap,由於此時有數據A,所以 e.next = A

            (iii)執行 newTable[i] = e,將線程1的新的HashMap的第一個元素指向 B,此時線程1的新Hashmap鏈表結構為B -> A

            (iiii)執行e = next,移動到鏈表中的下一個元素 A

          第三次循環:

            (i)執行 Entry<K,V> next = e.next,此時的 e 指向 A,next指向 null

            (ii)執行 e.next = newTable[i] ,將 A 的 next 指向線程1的新的HashMap,由於此時有數據B,所以 e.next = B

            (iii)執行 newTable[i] = e,將線程1的新的HashMap的第一個元素指向 A ,此時線程1的新Hashmap鏈表結構為 A -> B -> A

            (iiii)執行e = next,移動到鏈表中的下一個元素,已移動到鏈表結尾,結束 while 循環,完成鏈表的轉移。

      (5)上述過程中,很顯然的,最終的鏈表結構中,出現了 A -> B -> A 的循環結構。擴容完成了,剩下的等待的是get獲取的時候, getEntry 方法中 for循環e = e.next中就永遠出不來了。

      注意:擴容過程中,newTable是每個擴容線程獨有的,共享的只是每個Entry節點數據,最終的擴容是會調用 table = newTable 賦值操作完成。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

  • 9012年了,再不會Https就老了

    9012年了,再不會Https就老了

    前言  

           合格的web後端程序員,在開發之外,必須給IIS、Nginx、各種原生web服務器上配置Https,不然你就僅僅是個碼農,本博最近將專題記錄

    • 如何為IIS,Nginx配置Https

    • 如何申請適用於生產的免費SSL證書

    本博客小試牛刀,先實操在Nginx for Docker上添加自簽名SSL證書

    為啥先倒騰自簽名SSL證書,申請公網SSL證書需要公網可識別的域名或者公網IP;

    如果有實際SSL證書, 按照本文替換即可。

    手繪Https原理

    長話短說:  目前常見的Http請求明文傳輸, 報文可被截取並篡改,請求可被偽造; 

    因此基於常見HTTP(HTTP-TCP-IP)協議棧引入SSL/TSL(Transport Secure Layer) ,HTTPS在進行加密傳輸之前會進行一次握手,確定傳輸密鑰。

    流程解讀:

    ① 傳輸密鑰是對稱密鑰,用於雙方對傳輸數據的加解密

    ② 怎麼在傳輸之前確立傳輸密鑰呢? 針對普遍的多客戶端訪問受信web服務器的場景, 提出非對稱密鑰(公鑰存於客戶端,私鑰存於web服務器),雙方能互相加解密,說明中間數據(傳輸密鑰)沒被篡改。

    ③ 再拋出疑問,怎麼認定下發的公鑰是這個web服務器匹配的密鑰?怎麼確定這個公鑰下發過程沒被截取篡改? 這就是追溯到握手階段的下發證書過程,瀏覽器內置的CA機構認定該證書是其有效下發,並通過簽名認定該證書沒被篡改,最終認定該 證書下發的公鑰是受信web服務器準確下發。

    ④ 如果面向面試記憶Https原理,恐怕有些難度,所以個人用一種 【雞生蛋還是蛋生雞】的方式向上追溯流程, 方便大家知其然更知其所以然。

    前置準備

     >   CentOS機器上安裝Docker、 Docker-Compose

     >   常規操作構建 Nginx for Docker網站, 項目結構如下: 

    ssl-docker-nginx
        ├── docker-compose.yml
        ├── nginx   
        │      └── nginx.conf
        └── site
               └── index.html

        該項目將會使用  nginx/nginx.conf、site/index.html替換Nginx鏡像默認配置文件和默認啟動頁,docker-compose.yml 如下:

    version: '2'
    services:
      server:
        image: nginx:latest
        volumes:
          - ./nginx/nginx.conf:/etc/nginx/nginx.conf
          - ./site:/usr/share/nginx/html
        ports:
        - "8080:80"

        docker-compose  up -d 啟動Nginx容器,還是那樣熟悉的味道: 【chrome默認將http連接認定為不安全

    添加SSL自簽名證書

    很明顯: web服務器需要存儲證書(內置了公鑰)和私鑰

     ① 創建自簽名證書 (什麼叫自簽名,就是自己給自己頒發 SSL證書)

    [nodotnet@gs-server-5809 ssl-docker-nginx]$ openssl req -newkey rsa:2048 -nodes -keyout nginx/my-site.com.key -x509 -days 365out nginx/my-site.com.crt

    req是證書請求的子命令,-newkey rsa:2048 -keyout nginx/my-site.com.key表示生成私鑰(PKCS8格式),

              -nodes 表示私鑰不加密,

               -x509表示輸出證書,-days365 為有效期,此後根據提示輸入證書擁有者信息;

           之後會在nginx目錄下生產2個文件, 分別是私鑰、證書

     ② 將證書和密鑰掛載到Nginx Image, 修改docker-compose.yml

    version: '2'
    services:
      server:
        image: nginx:latest
        volumes:
          - ./nginx/nginx.conf:/etc/nginx/nginx.conf
          - ./site:/usr/share/nginx/html
          - ./nginx/my-site.com.crt:/etc/nginx/my-site.com.crt    #新行 - ./nginx/my-site.com.key:/etc/nginx/my-site.com.key    #新行
        ports:
        - "8080:80"
        - "443:443"        // 容器開啟HTTPS默認的443端口

     ③  修改nginx/nginx.conf,接受Https請求

    events {
      worker_connections  4096;  ## Default: 1024
    }
    
    http {
        server {
            listen 80;
            root         /usr/share/nginx/html/;
        }
    
        server {            # 新Server接受來自443端口的Https請求
            listen              443 ssl;
            ssl_certificate     /etc/nginx/my-site.com.crt;
            ssl_certificate_key /etc/nginx/my-site.com.key;
            root        /usr/share/nginx/html;
        }
    }

    執行docker-compose down && docker-compose up -d 發起https://10.201.80.126:443請求,當前自簽名證書頒發機構不在瀏覽器內置的CA機構,所以該證書目前被瀏覽器認為是無效。

    理論上將 該自簽名證書導出,之後在 【chrome瀏覽器】-【高級設置】-【管理證書】中導入該證書,即可讓 chrome接受自簽名SSL證書。

    That‘s All,  Https作為以後web的主流配置,碼農進階資深必須掌握;後續會記錄Https & HSTS, 申請免費SSL證書,盡請關注。

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※為什麼 USB CONNECTOR 是電子產業重要的元件?

    網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

    ※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

    ※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

  • 如何高效的學習技術

    如何高效的學習技術

      我們相信努力學習一定會有收穫,但是方法不當,既讓人身心疲憊,也沒有切實的回報。高中時代,我的同桌是個漂亮女同學。她的物理成績很差,雖然她非常勤奮的學習,但成績總是不理想。為了鞏固純潔的同學關係,我親密無間地輔導她的物理,發現她不知道題目考什麼。我們的教科書與試題都圍繞着考試大綱展開,看到一道題,應該先想想它在考哪些定理和公式的運用。
      不少朋友每天都閱讀技術文章,但是第二天就忘乾淨了。工作中領導和同事都認可你的溝通和技術能力,但是跳槽面試卻屢屢碰壁。面試官問技術方案,明明心裏清楚,用嘴說出來卻前言不搭后語。面試官再問底層算法,你說看過但是忘記了。他不在乎你看沒看過,答不上就是零分。正如男女相親,男方談吐瀟洒才能吸引姑娘。可是男方緊張了,平時挺能說,關鍵時候卻支支吾吾,姑娘必然認為他不行。人生充滿了許多考試,有形的和無形的,每次考試的機會只有一次。
      工作五年十年後,別人成了架構師,自己還在基層打滾,原因是什麼?職場上無法成功升遷的原因有很多,沒有持續學習、學習效果不好、無法通過心儀公司的的面試,一定是很重要的原因。
      把自己當成一台計算機,既有輸入,也要有輸出,用輸出倒逼輸入

      近些年誕生了許多新技術,比如最時髦的AI(目前還在智障階段),數學基礎是初中就接觸過的概率統計。萬丈高樓從地起,不要被新工具或者中間件迷住雙眼,一味地追新求快。基礎知識是所有技術的基石,在未來很長的時間都不會變化,應該花費足夠的時間鞏固基礎。
      以數據結構和算法為例,大家閱讀一下Java的BitSet的源碼,裏面有大量的移位操作,移位運算掌握的好,看這份源碼就沒問題。Java同步工具類AQS用到了雙向鏈表,鏈表知識不過關,肯定搞不懂它的原理。互聯網大廠都喜歡考算法,為了通過面試也要精通算法。
      以Java工程師應該掌握的知識為例,按重要程度排出六個梯度:

    • 第一梯度:計算機組成原理、數據結構和算法、網絡通信原理、操作系統原理;
    • 第二梯度:Java基礎、JVM內存模型和GC算法、JVM性能調優、JDK工具、設計模式;
    • 第三梯度:Spring系列、Mybatis、Dubbo等主流框架的運用和原理;
    • 第四梯度:MySQL(含SQL編程)、Redis、RabbitMQ/RocketMQ/Kafka、ZooKeeper等數據庫或者中間件的運用和原理;
    • 第五梯度:CAP理論、BASE理論、Paxos和Raft算法等其他分佈式理論;
    • 第六梯度:容器化、大數據、AI、區塊鏈等等前沿技術理論;

    有同學認為第五梯度應該在移到第一梯度。其實很多小公司的日活犹如古天樂一樣平平無奇,離大型分佈式架構還遠得很。學習框架和中間件的時候,順手掌握分佈式理論,效果更好。

      許多公司的招聘JD沒有設定技術人員年齡門檻,但是會加上一句“具備與年齡相當的知識的廣度與深度”。多廣才算廣,多深才算深?這是很主觀的話題,這裏不展開討論。
      如何變得更廣更深呢?突破收入上升的瓶頸,發掘自己真正的興趣
      大多數人只是公司的普通職員,收入上升的瓶頸就是升職加薪。許多IT公司會對技術人員有個評級,如果你的評級不高,那就依照晉級章程努力升級。如果你在一個小公司,收入一般,發展前景不明,準備大廠的面試就是最好的學習過程。在這些過程中,你必然學習更多知識,變得更廣更深。
      個人興趣是前進的動力之一,許多知名開源項目都源於作者的興趣。個人興趣並不局限技術領域,可以是其他學科。我有個朋友喜歡玩山地自行車,還給一些做自行車話題的自媒體投稿。久而久之,居然能夠寫一手好文章了,我相信他也能寫好技術文檔。

      哲學不是故作高深的學科,它的現實意義就是解決問題。年輕小伙是怎麼泡妞的?三天兩頭花不斷,大庭廣眾跪求愛。這類套路為什麼總是能成功呢?禮物滿足女人的物慾,當眾求愛滿足女人的虛榮心,投其所好。食堂大媽打菜的手越來越抖,辣子雞丁變成辣子辣丁,為什麼呢?食堂要控製成本,直接提價會惹眾怒。
      科學上的哲學,一般指研究事物發展的規律,歸納終極的解決方案。軟件行業充滿哲學味道的作品非常多,比如。舉個例子,當軟件系統遇到性能問題,嘗試下面兩種哲學思想提升性能:

    • 空間換時間:比如引入緩存,消耗額外的存儲提高響應速度。
    • 時間換空間:比如大文件的分片處理,分段處理后再匯總結果。

    設計穩健高可用的系統,嘗試從三個方面考慮問題:

    • 存儲:數據會丟失嗎,數據一致性怎麼解決。
    • 計算:計算怎麼擴容,應用允許任意增加節點嗎。
    • 傳輸:網絡中斷或擁塞怎麼辦。

    從無數的失敗或者成功的經驗中,總結出高度概括性的方案,讓我們下一步做的更好。

      英語是極為重要的基礎,學好英語與掌握編程語言一樣重要。且不說外企對英語的要求,許多知名博客就是把英文翻譯成中文,充當知識的搬運工。如果英語足夠好,直接閱讀一手英語資料,避免他人翻譯存在的謬誤。

      體系化的知識比零散的更容易記憶和理解,這正如一部好的電視劇,劇情環環相扣才能吸引觀眾。建議大家使用思維導圖羅列知識點,構建體繫結構,如下圖所示:

      高中是我們知識的巔峰時刻,每周小考每月大考,教輔資料堆成山,地獄式的反覆操練強化記憶。複習是對抗遺忘的唯一辦法。大腦的遺忘是有規律的,先快后慢。一天後,學到的知識只剩下原來的25%,甚至更低。隨着時間的推移,遺忘的速度減慢,遺忘的數量也就減少。

    時間間隔 記憶量
    剛看完 100%
    20分鐘后 60%
    1小時后 40%
    1天後 30%
    2天後 27%

    每個人的遺忘程度都不一樣,建議第二天複習前一天的內容,七天後複習這段時間的所有內容。

      不少朋友利用碎片時間學習,比如在公交上看公眾號的推送。其實我們都高估了自己的抗干擾能力,如果處在嘈雜的環境,注意力容易被打斷,記憶留存度也很低。碎片時間適合學習簡單孤立的知識點,比如鏈表的定義與實現。
      學習複雜的知識,需要大段的連續時間。圖書館是個好地方,安靜氛圍好。手機放一邊,不要理會QQ微信,最好閱讀紙質書,泡上一整天。有些城市出現了付費自習室,提供格子間、茶水等等,也是非常好的選擇。

      從下面這張圖我們可以看到,教授他人是知識留存率最高的方式。

      準備PPT和演講內容,給同事來一場技術分享。不光複習知識,還鍛煉口才。曾經有個同事說話又快又急,口頭禪也多,比如”對吧、是不是”,別人經常聽不清,但是他本人不以為然。領導讓他做了幾次技術分享,聽眾的反應可想而知,他才徹底認清缺點。
      堅持寫技術博客,別在意你寫的東西在網上已經重複千百遍。當自己動手的時候,才會意識到眼高手低。讓文章讀起來流暢清晰,需要嘔心瀝血的刪改。寫作是對大腦的長期考驗,想不到肯定寫不出,想不清楚肯定寫不清楚。

    我們經常說不要重複造輪子。為了開發效率,可以不造輪子,但是必須具備造輪子的能力。建議造一個簡單的MQ,你能用到通信協議、設計模式、隊列等許多知識。在造輪子的過程中,你會頻繁的翻閱各種手冊或者博客,這就是用輸出倒逼輸入

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

    ※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

    ※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

    ※評比南投搬家公司費用收費行情懶人包大公開

  • 5種常見Bean映射工具的性能比對

    5種常見Bean映射工具的性能比對

    本文由 JavaGuide 翻譯自 https://www.baeldung.com/java-performance-mapping-frameworks 。轉載請註明原文地址以及翻譯作者。

    1. 介紹

    創建由多個層組成的大型 Java 應用程序需要使用多種領域模型,如持久化模型、領域模型或者所謂的 DTO。為不同的應用程序層使用多個模型將要求我們提供 bean 之間的映射方法。手動執行此操作可以快速創建大量樣板代碼並消耗大量時間。幸運的是,Java 有多個對象映射框架。在本教程中,我們將比較最流行的 Java 映射框架的性能。

    綜合日常使用情況和相關測試數據,個人感覺 MapStruct、ModelMapper 這兩個 Bean 映射框架是最佳選擇。

    2. 常見 Bean 映射框架概覽

    2.1. Dozer

    Dozer 是一個映射框架,它使用遞歸將數據從一個對象複製到另一個對象。框架不僅能夠在 bean 之間複製屬性,還能夠在不同類型之間自動轉換。

    要使用 Dozer 框架,我們需要添加這樣的依賴到我們的項目:

    <dependency>
        <groupId>net.sf.dozer</groupId>
        <artifactId>dozer</artifactId>
        <version>5.5.1</version>
    </dependency>

    更多關於 Dozer 的內容可以在官方文檔中找到: http://dozer.sourceforge.net/documentation/gettingstarted.html ,或者你也可以閱讀這篇文章:https://www.baeldung.com/dozer 。

    2.2. Orika

    Orika 是一個 bean 到 bean 的映射框架,它遞歸地將數據從一個對象複製到另一個對象。

    Orika 的工作原理與 Dozer 相似。兩者之間的主要區別是 Orika 使用字節碼生成。這允許以最小的開銷生成更快的映射器。

    要使用 Orika 框架,我們需要添加這樣的依賴到我們的項目:

    <dependency>
        <groupId>ma.glasnost.orika</groupId>
        <artifactId>orika-core</artifactId>
        <version>1.5.2</version>
    </dependency>

    更多關於 Orika 的內容可以在官方文檔中找到:https://orika-mapper.github.io/orika-docs/,或者你也可以閱讀這篇文章:https://www.baeldung.com/orika-mapping。

    2.3. MapStruct

    MapStruct 是一個自動生成 bean mapper 類的代碼生成器。MapStruct 還能夠在不同的數據類型之間進行轉換。Github 地址:https://github.com/mapstruct/mapstruct。

    要使用 MapStruct 框架,我們需要添加這樣的依賴到我們的項目:

    <dependency>
        <groupId>org.mapstruct</groupId>
        <artifactId>mapstruct-processor</artifactId>
        <version>1.2.0.Final</version>
    </dependency>

    更多關於 MapStruct 的內容可以在官方文檔中找到:https://mapstruct.org/,或者你也可以閱讀這篇文章:https://www.baeldung.com/mapstruct。

    要使用 MapStruct 框架,我們需要添加這樣的依賴到我們的項目:

    <dependency>
        <groupId>org.mapstruct</groupId>
        <artifactId>mapstruct-processor</artifactId>
        <version>1.2.0.Final</version>
    </dependency>

    2.4. ModelMapper

    ModelMapper 是一個旨在簡化對象映射的框架,它根據約定確定對象之間的映射方式。它提供了類型安全的和重構安全的 API。

    更多關於 ModelMapper 的內容可以在官方文檔中找到:http://modelmapper.org/ 。

    要使用 ModelMapper 框架,我們需要添加這樣的依賴到我們的項目:

    <dependency>
      <groupId>org.modelmapper</groupId>
      <artifactId>modelmapper</artifactId>
      <version>1.1.0</version>
    </dependency>

    2.5. JMapper

    JMapper 是一個映射框架,旨在提供易於使用的、高性能的 Java bean 之間的映射。該框架旨在使用註釋和關係映射應用 DRY 原則。該框架允許不同的配置方式:基於註釋、XML 或基於 api。

    更多關於 JMapper 的內容可以在官方文檔中找到:https://github.com/jmapper-framework/jmapper-core/wiki。

    要使用 JMapper 框架,我們需要添加這樣的依賴到我們的項目:

    <dependency>
        <groupId>com.googlecode.jmapper-framework</groupId>
        <artifactId>jmapper-core</artifactId>
        <version>1.6.0.1</version>
    </dependency>
    

    3.測試模型

    為了能夠正確地測試映射,我們需要有一個源和目標模型。我們已經創建了兩個測試模型。

    第一個是一個只有一個字符串字段的簡單 POJO,它允許我們在更簡單的情況下比較框架,並檢查如果我們使用更複雜的 bean 是否會發生任何變化。

    簡單的源模型如下:

    public class SourceCode {
        String code;
        // getter and setter
    }
    

    它的目標也很相似:

    public class DestinationCode {
        String code;
        // getter and setter
    }

    源 bean 的實際示例如下:

    public class SourceOrder {
        private String orderFinishDate;
        private PaymentType paymentType;
        private Discount discount;
        private DeliveryData deliveryData;
        private User orderingUser;
        private List<Product> orderedProducts;
        private Shop offeringShop;
        private int orderId;
        private OrderStatus status;
        private LocalDate orderDate;
        // standard getters and setters
    }

    目標類如下圖所示:

    public class Order {
        private User orderingUser;
        private List<Product> orderedProducts;
        private OrderStatus orderStatus;
        private LocalDate orderDate;
        private LocalDate orderFinishDate;
        private PaymentType paymentType;
        private Discount discount;
        private int shopId;
        private DeliveryData deliveryData;
        private Shop offeringShop;
        // standard getters and setters
    }

    整個模型結構可以在這裏找到:https://github.com/eugenp/tutorials/tree/master/performance-tests/src/main/java/com/baeldung/performancetests/model/source。

    4. 轉換器

    為了簡化測試設置的設計,我們創建了如下所示的轉換器接口:

    public interface Converter {
        Order convert(SourceOrder sourceOrder);
        DestinationCode convert(SourceCode sourceCode);
    }

    我們所有的自定義映射器都將實現這個接口。

    4.1. OrikaConverter

    Orika 支持完整的 API 實現,這大大簡化了 mapper 的創建:

    public class OrikaConverter implements Converter{
        private MapperFacade mapperFacade;
    
        public OrikaConverter() {
            MapperFactory mapperFactory = new DefaultMapperFactory
              .Builder().build();
    
            mapperFactory.classMap(Order.class, SourceOrder.class)
              .field("orderStatus", "status").byDefault().register();
            mapperFacade = mapperFactory.getMapperFacade();
        }
    
        @Override
        public Order convert(SourceOrder sourceOrder) {
            return mapperFacade.map(sourceOrder, Order.class);
        }
    
        @Override
        public DestinationCode convert(SourceCode sourceCode) {
            return mapperFacade.map(sourceCode, DestinationCode.class);
        }
    }

    4.2. DozerConverter

    Dozer 需要 XML 映射文件,有以下幾個部分:

    <mappings xmlns="http://dozer.sourceforge.net"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://dozer.sourceforge.net
      http://dozer.sourceforge.net/schema/beanmapping.xsd">
    
        <mapping>
            <class-a>com.baeldung.performancetests.model.source.SourceOrder</class-a>
            <class-b>com.baeldung.performancetests.model.destination.Order</class-b>
            <field>
                <a>status</a>
                <b>orderStatus</b>
            </field>
        </mapping>
        <mapping>
            <class-a>com.baeldung.performancetests.model.source.SourceCode</class-a>
            <class-b>com.baeldung.performancetests.model.destination.DestinationCode</class-b>
        </mapping>
    </mappings>

    定義了 XML 映射后,我們可以從代碼中使用它:

    public class DozerConverter implements Converter {
        private final Mapper mapper;
    
        public DozerConverter() {
            DozerBeanMapper mapper = new DozerBeanMapper();
            mapper.addMapping(
              DozerConverter.class.getResourceAsStream("/dozer-mapping.xml"));
            this.mapper = mapper;
        }
    
        @Override
        public Order convert(SourceOrder sourceOrder) {
            return mapper.map(sourceOrder,Order.class);
        }
    
        @Override
        public DestinationCode convert(SourceCode sourceCode) {
            return mapper.map(sourceCode, DestinationCode.class);
        }
    }

    4.3. MapStructConverter

    Map 結構的定義非常簡單,因為它完全基於代碼生成:

    @Mapper
    public interface MapStructConverter extends Converter {
        MapStructConverter MAPPER = Mappers.getMapper(MapStructConverter.class);
    
        @Mapping(source = "status", target = "orderStatus")
        @Override
        Order convert(SourceOrder sourceOrder);
    
        @Override
        DestinationCode convert(SourceCode sourceCode);
    }

    4.4. JMapperConverter

    JMapperConverter 需要做更多的工作。接口實現后:

    public class JMapperConverter implements Converter {
        JMapper realLifeMapper;
        JMapper simpleMapper;
    
        public JMapperConverter() {
            JMapperAPI api = new JMapperAPI()
              .add(JMapperAPI.mappedClass(Order.class));
            realLifeMapper = new JMapper(Order.class, SourceOrder.class, api);
            JMapperAPI simpleApi = new JMapperAPI()
              .add(JMapperAPI.mappedClass(DestinationCode.class));
            simpleMapper = new JMapper(
              DestinationCode.class, SourceCode.class, simpleApi);
        }
    
        @Override
        public Order convert(SourceOrder sourceOrder) {
            return (Order) realLifeMapper.getDestination(sourceOrder);
        }
    
        @Override
        public DestinationCode convert(SourceCode sourceCode) {
            return (DestinationCode) simpleMapper.getDestination(sourceCode);
        }
    }

    我們還需要向目標類的每個字段添加@JMap註釋。此外,JMapper 不能在 enum 類型之間轉換,它需要我們創建自定義映射函數:

    @JMapConversion(from = "paymentType", to = "paymentType")
    public PaymentType conversion(com.baeldung.performancetests.model.source.PaymentType type) {
        PaymentType paymentType = null;
        switch(type) {
            case CARD:
                paymentType = PaymentType.CARD;
                break;
    
            case CASH:
                paymentType = PaymentType.CASH;
                break;
    
            case TRANSFER:
                paymentType = PaymentType.TRANSFER;
                break;
        }
        return paymentType;
    }

    4.5. ModelMapperConverter

    ModelMapperConverter 只需要提供我們想要映射的類:

    public class ModelMapperConverter implements Converter {
        private ModelMapper modelMapper;
    
        public ModelMapperConverter() {
            modelMapper = new ModelMapper();
        }
    
        @Override
        public Order convert(SourceOrder sourceOrder) {
           return modelMapper.map(sourceOrder, Order.class);
        }
    
        @Override
        public DestinationCode convert(SourceCode sourceCode) {
            return modelMapper.map(sourceCode, DestinationCode.class);
        }
    }
    

    5. 簡單的模型測試

    對於性能測試,我們可以使用 Java Microbenchmark Harness,關於如何使用它的更多信息可以在 這篇文章:https://www.baeldung.com/java-microbenchmark-harness 中找到。

    我們為每個轉換器創建了一個單獨的基準測試,並將基準測試模式指定為 Mode.All。

    5.1. 平均時間

    對於平均運行時間,JMH 返回以下結果(越少越好):

    這個基準測試清楚地表明,MapStruct 和 JMapper 都有最佳的平均工作時間。

    5.2. 吞吐量

    在這種模式下,基準測試返回每秒的操作數。我們收到以下結果(越多越好):

    在吞吐量模式中,MapStruct 是測試框架中最快的,JMapper 緊隨其後。

    5.3. SingleShotTime

    這種模式允許測量單個操作從開始到結束的時間。基準給出了以下結果(越少越好):

    這裏,我們看到 JMapper 返回的結果比 MapStruct 好得多。

    5.4. 採樣時間

    這種模式允許對每個操作的時間進行採樣。三個不同百分位數的結果如下:

    所有的基準測試都表明,根據場景的不同,MapStruct 和 JMapper 都是不錯的選擇,儘管 MapStruct 對 SingleShotTime 給出的結果要差得多。

    6. 真實模型測試

    對於性能測試,我們可以使用 Java Microbenchmark Harness,關於如何使用它的更多信息可以在 這篇文章:https://www.baeldung.com/java-microbenchmark-harness 中找到。

    我們為每個轉換器創建了一個單獨的基準測試,並將基準測試模式指定為 Mode.All。

    6.1. 平均時間

    JMH 返回以下平均運行時間結果(越少越好):

    該基準清楚地表明,MapStruct 和 JMapper 均具有最佳的平均工作時間。

    6.2. 吞吐量

    在這種模式下,基準測試返回每秒的操作數。我們收到以下結果(越多越好):

    在吞吐量模式中,MapStruct 是測試框架中最快的,JMapper 緊隨其後。

    6.3. SingleShotTime

    這種模式允許測量單個操作從開始到結束的時間。基準給出了以下結果(越少越好):

    6.4. 採樣時間

    這種模式允許對每個操作的時間進行採樣。三個不同百分位數的結果如下:

    儘管簡單示例和實際示例的確切結果明顯不同,但是它們的趨勢相同。在哪種算法最快和哪種算法最慢方面,兩個示例都給出了相似的結果。

    6.5. 結論

    根據我們在本節中執行的真實模型測試,我們可以看出,最佳性能顯然屬於 MapStruct。在相同的測試中,我們看到 Dozer 始終位於結果表的底部。

    7. 總結

    在這篇文章中,我們已經進行了五個流行的 Java Bean 映射框架性能測試:ModelMapper MapStruct Orika ,Dozer, JMapper。

    示例代碼地址:https://github.com/eugenp/tutorials/tree/master/performance-tests。

    開源項目推薦

    作者的其他開源項目推薦:

    1. :【Java學習+面試指南】 一份涵蓋大部分Java程序員所需要掌握的核心知識。
    2. : 適合新手入門以及有經驗的開發人員查閱的 Spring Boot 教程(業餘時間維護中,歡迎一起維護)。
    3. : 我覺得技術人員應該有的一些好習慣!
    4. :從零入門 !Spring Security With JWT(含權限驗證)後端部分代碼。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

  • RocketMQ ACL使用指南

    RocketMQ ACL使用指南

    目錄

    @(本節目錄)

    1、什麼是ACL?

    ACL是access control list的簡稱,俗稱訪問控制列表。訪問控制,基本上會涉及到用戶、資源、權限、角色等概念,那在RocketMQ中上述會對應哪些對象呢?

    • 用戶
      用戶是訪問控制的基礎要素,也不難理解,RocketMQ ACL必然也會引入用戶的概念,即支持用戶名、密碼。
    • 資源
      資源,需要保護的對象,在RocketMQ中,消息發送涉及的Topic、消息消費涉及的消費組,應該進行保護,故可以抽象成資源。
    • 權限
      針對資源,能進行的操作,
    • 角色
      RocketMQ中,只定義兩種角色:是否是管理員。

    另外,RocketMQ還支持按照客戶端IP進行白名單設置。

    2、ACL基本流程圖

    在講解如何使用ACL之前,我們先簡單看一下RocketMQ ACL的請求流程:

    對於上述具體的實現,將在後續文章中重點講解,本文的目的只是希望給讀者一個大概的了解。

    3、如何配置ACL

    3.1 acl配置文件

    acl默認的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目錄下。下面對其配置項一一介紹。

    3.1.1 globalWhiteRemoteAddresses

    全局白名單,其類型為數組,即支持多個配置。其支持的配置格式如下:


    • 表示不設置白名單,該條規則默認返回false。
    • “*”
      表示全部匹配,該條規則直接返回true,將會阻斷其他規則的判斷,請慎重使用。
    • 192.168.0.{100,101}
      多地址配置模式,ip地址的最後一組,使用{},大括號中多個ip地址,用英文逗號(,)隔開。
    • 192.168.1.100,192.168.2.100
      直接使用,分隔,配置多個ip地址。
    • 192.168..或192.168.100-200.10-20
      每個IP段使用 “*” 或”-“表示範圍。

    3.1.2 accounts

    配置用戶信息,該類型為數組類型。擁有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。

    3.1.2.1 accessKey

    登錄用戶名,長度必須大於6個字符。

    3.1.2.2 secretKey

    登錄密碼。長度必須大於6個字符。

    3.1.2.3 whiteRemoteAddress

    用戶級別的IP地址白名單。其類型為一個字符串,其配置規則與globalWhiteRemoteAddresses,但只能配置一條規則。

    3.1.2.4 admin

    boolean類型,設置是否是admin。如下權限只有admin=true時才有權限執行。

    • UPDATE_AND_CREATE_TOPIC
      更新或創建主題。
    • UPDATE_BROKER_CONFIG
      更新Broker配置。
    • DELETE_TOPIC_IN_BROKER
      刪除主題。
    • UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
      更新或創建訂閱組信息。
    • DELETE_SUBSCRIPTIONGROUP
      刪除訂閱組信息。
    3.1.2.5 defaultTopicPerm

    默認topic權限。該值默認為DENY(拒絕)。

    3.1.2.6 defaultGroupPerm

    默認消費組權限,該值默認為DENY(拒絕),建議值為SUB。

    3.1.2.7 topicPerms

    設置topic的權限。其類型為數組,其可選擇值在下節介紹。

    3.1.2.8 groupPerms

    設置消費組的權限。其類型為數組,其可選擇值在下節介紹。可以為每一消費組配置不一樣的權限。

    3.2 RocketMQ ACL權限可選值

    • DENY
      拒絕。
    • PUB
      擁有發送權限。
    • SUB
      擁有訂閱權限。

    3.3、權限驗證流程

    上面定義了全局白名單、用戶級別的白名單,用戶級別的權限,為了更好的配置ACL權限規則,下面給出權限匹配邏輯。

    4、使用示例

    4.1 Broker端安裝

    首先,需要在broker.conf文件中,增加參數aclEnable=true。並拷貝distribution/conf/plain_acl.yml文件到${ROCKETMQ_HOME}/conf目錄。

    broker.conf的配置文件如下:

    brokerClusterName = DefaultCluster
    brokerName = broker-b
    brokerId = 0
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH
    listenPort=10915
    storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store
    storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store/commitlog
    namesrvAddr=127.0.0.1:9876
    autoCreateTopicEnable=false
    aclEnable=true

    plain_acl.yml文件內容如下:

    globalWhiteRemoteAddresses:
    
    accounts:
    - accessKey: RocketMQ
      secretKey: 12345678
      whiteRemoteAddress:
      admin: false
      defaultTopicPerm: DENY
      defaultGroupPerm: SUB
      topicPerms:
      - TopicTest=PUB
      groupPerms:
      # the group should convert to retry topic
      - oms_consumer_group=DENY
    
    - accessKey: admin
      secretKey: 12345678
      whiteRemoteAddress:
      # if it is admin, it could access all resources
      admin: true

    從上面的配置可知,用戶RocketMQ只能發送TopicTest的消息,其他topic無權限發送;拒絕oms_consumer_group消費組的消息消費,其他消費組默認可消費。

    4.2 消息發送端示例

    public class AclProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook());
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
            for (int i = 0; i < 1; i++) {
                try {
                    Message msg = new Message("TopicTest3" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
            producer.shutdown();
        }
    
        static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
        }
    }

    運行效果如圖所示:

    4.3 消息消費端示例

    public class AclConsumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4", getAclRPCHook(),new AllocateMessageQueueAveragely());
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("TopicTest", "*");
            consumer.setNamesrvAddr("127.0.0.1:9876");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    
        static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
        }
    }

    發現並不沒有消費消息,符合預期。

    關於RocketMQ ACL的使用就介紹到這裏了,下一篇將介紹RocketMQ ACL實現原理。

    推薦閱讀:
    1、

    2、

    3、

    4、

    作者介紹:
    丁威,《RocketMQ技術內幕》作者,RocketMQ 社區佈道師,公眾號: 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※為什麼 USB CONNECTOR 是電子產業重要的元件?

    網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

    ※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

    ※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

  • 詳解Kafka Producer

    詳解Kafka Producer

    上一篇文章我們主要介紹了什麼是 Kafka,Kafka 的基本概念是什麼,Kafka 單機和集群版的搭建,以及對基本的配置文件進行了大致的介紹,還對 Kafka 的幾個主要角色進行了描述,我們知道,不管是把 Kafka 用作消息隊列、消息總線還是數據存儲平台來使用,最終是繞不過消息這個詞的,這也是 Kafka 最最核心的內容,Kafka 的消息從哪裡來?到哪裡去?都干什麼了?別著急,一步一步來,先說說 Kafka 的消息從哪來。

    生產者概述

    在 Kafka 中,我們把產生消息的那一方稱為生產者,比如我們經常回去淘寶購物,你打開淘寶的那一刻,你的登陸信息,登陸次數都會作為消息傳輸到 Kafka 後台,當你瀏覽購物的時候,你的瀏覽信息,你的搜索指數,你的購物愛好都會作為一個個消息傳遞給 Kafka 後台,然後淘寶會根據你的愛好做智能推薦,致使你的錢包從來都禁不住誘惑,那麼這些生產者產生的消息是怎麼傳到 Kafka 應用程序的呢?發送過程是怎麼樣的呢?

    儘管消息的產生非常簡單,但是消息的發送過程還是比較複雜的,如圖

    我們從創建一個ProducerRecord 對象開始,ProducerRecord 是 Kafka 中的一個核心類,它代表了一組 Kafka 需要發送的 key/value 鍵值對,它由記錄要發送到的主題名稱(Topic Name),可選的分區號(Partition Number)以及可選的鍵值對構成。

    在發送 ProducerRecord 時,我們需要將鍵值對對象由序列化器轉換為字節數組,這樣它們才能夠在網絡上傳輸。然後消息到達了分區器。

    如果發送過程中指定了有效的分區號,那麼在發送記錄時將使用該分區。如果發送過程中未指定分區,則將使用key 的 hash 函數映射指定一個分區。如果發送的過程中既沒有分區號也沒有,則將以循環的方式分配一個分區。選好分區后,生產者就知道向哪個主題和分區發送數據了。

    ProducerRecord 還有關聯的時間戳,如果用戶沒有提供時間戳,那麼生產者將會在記錄中使用當前的時間作為時間戳。Kafka 最終使用的時間戳取決於 topic 主題配置的時間戳類型。

    • 如果將主題配置為使用 CreateTime,則生產者記錄中的時間戳將由 broker 使用。
    • 如果將主題配置為使用LogAppendTime,則生產者記錄中的時間戳在將消息添加到其日誌中時,將由 broker 重寫。

    然後,這條消息被存放在一個記錄批次里,這個批次里的所有消息會被發送到相同的主題和分區上。由一個獨立的線程負責把它們發到 Kafka Broker 上。

    Kafka Broker 在收到消息時會返回一個響應,如果寫入成功,會返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區里的偏移量,上面兩種的時間戳類型也會返回給用戶。如果寫入失敗,會返回一個錯誤。生產者在收到錯誤之後會嘗試重新發送消息,幾次之後如果還是失敗的話,就返回錯誤消息。

    創建 Kafka 生產者

    要往 Kafka 寫入消息,首先需要創建一個生產者對象,並設置一些屬性。Kafka 生產者有3個必選的屬性

    • bootstrap.servers

    該屬性指定 broker 的地址清單,地址的格式為 host:port。清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找到其他的 broker 信息。不過建議至少要提供兩個 broker 信息,一旦其中一個宕機,生產者仍然能夠連接到集群上。

    • key.serializer

    broker 需要接收到序列化之後的 key/value值,所以生產者發送的消息需要經過序列化之後才傳遞給 Kafka Broker。生產者需要知道採用何種方式把 Java 對象轉換為字節數組。key.serializer 必須被設置為一個實現了org.apache.kafka.common.serialization.Serializer 接口的類,生產者會使用這個類把鍵對象序列化為字節數組。這裏拓展一下 Serializer 類

    Serializer 是一個接口,它表示類將會採用何種方式序列化,它的作用是把對象轉換為字節,實現了 Serializer 接口的類主要有 ByteArraySerializerStringSerializerIntegerSerializer ,其中 ByteArraySerialize 是 Kafka 默認使用的序列化器,其他的序列化器還有很多,你可以通過 查看其他序列化器。要注意的一點:key.serializer 是必須要設置的,即使你打算只發送值的內容

    • value.serializer

    與 key.serializer 一樣,value.serializer 指定的類會將值序列化。

    下面代碼演示了如何創建一個 Kafka 生產者,這裏只指定了必要的屬性,其他使用默認的配置

    private Properties properties = new Properties();
    properties.put("bootstrap.servers","broker1:9092,broker2:9092");
    properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    properties = new KafkaProducer<String,String>(properties);

    來解釋一下這段代碼

    • 首先創建了一個 Properties 對象
    • 使用 StringSerializer 序列化器序列化 key / value 鍵值對
    • 在這裏我們創建了一個新的生產者對象,併為鍵值設置了恰當的類型,然後把 Properties 對象傳遞給他。

    實例化生產者對象后,接下來就可以開始發送消息了,發送消息主要由下面幾種方式

    直接發送,不考慮結果

    使用這種發送方式,不會關心消息是否到達,會丟失一些消息,因為 Kafka 是高可用的,生產者會自動嘗試重發,這種發送方式和 UDP 運輸層協議很相似。

    同步發送

    同步發送仍然使用 send() 方法發送消息,它會返回一個 Future 對象,調用 get() 方法進行等待,就可以知道消息時候否發送成功。

    異步發送

    異步發送指的是我們調用 send() 方法,並制定一個回調函數,服務器在返迴響應時調用該函數。

    下一節我們會重新討論這三種實現。

    向 Kafka 發送消息

    簡單消息發送

    Kafka 最簡單的消息發送如下:

    ProducerRecord<String,String> record =
                    new ProducerRecord<String, String>("CustomerCountry","West","France");
    
    producer.send(record);

    代碼中生產者(producer)的 send() 方法需要把 ProducerRecord 的對象作為參數進行發送,ProducerRecord 有很多構造函數,這個我們下面討論,這裏調用的是

    public ProducerRecord(String topic, K key, V value) {}

    這個構造函數,需要傳遞的是 topic主題,key 和 value。

    把對應的參數傳遞完成后,生產者調用 send() 方法發送消息(ProducerRecord對象)。我們可以從生產者的架構圖中看出,消息是先被寫入分區中的緩衝區中,然後分批次發送給 Kafka Broker。

    發送成功后,send() 方法會返回一個 Future(java.util.concurrent) 對象,Future 對象的類型是 RecordMetadata 類型,我們上面這段代碼沒有考慮返回值,所以沒有生成對應的 Future 對象,所以沒有辦法知道消息是否發送成功。如果不是很重要的信息或者對結果不會產生影響的信息,可以使用這種方式進行發送。

    我們可以忽略發送消息時可能發生的錯誤或者在服務器端可能發生的錯誤,但在消息發送之前,生產者還可能發生其他的異常。這些異常有可能是 SerializationException(序列化失敗)BufferedExhaustedException 或 TimeoutException(說明緩衝區已滿),又或是 InterruptedException(說明發送線程被中斷)

    同步發送消息

    第二種消息發送機制如下所示

    ProducerRecord<String,String> record =
                    new ProducerRecord<String, String>("CustomerCountry","West","France");
    
    try{
      RecordMetadata recordMetadata = producer.send(record).get();
    }catch(Exception e){
      e.printStackTrace();
    }
    

    這種發送消息的方式較上面的發送方式有了改進,首先調用 send() 方法,然後再調用 get() 方法等待 Kafka 響應。如果服務器返回錯誤,get() 方法會拋出異常,如果沒有發生錯誤,我們會得到 RecordMetadata 對象,可以用它來查看消息記錄。

    生產者(KafkaProducer)在發送的過程中會出現兩類錯誤:其中一類是重試錯誤,這類錯誤可以通過重發消息來解決。比如連接的錯誤,可以通過再次建立連接來解決;無錯誤則可以通過重新為分區選舉首領來解決。KafkaProducer 被配置為自動重試,如果多次重試后仍無法解決問題,則會拋出重試異常。另一類錯誤是無法通過重試來解決的,比如消息過大對於這類錯誤,KafkaProducer 不會進行重試,直接拋出異常。

    異步發送消息

    同步發送消息都有個問題,那就是同一時間只能有一個消息在發送,這會造成許多消息無法直接發送,造成消息滯后,無法發揮效益最大化。

    比如消息在應用程序和 Kafka 集群之間一個來回需要 10ms。如果發送完每個消息后都等待響應的話,那麼發送100個消息需要 1 秒,但是如果是異步方式的話,發送 100 條消息所需要的時間就會少很多很多。大多數時候,雖然Kafka 會返回 RecordMetadata 消息,但是我們並不需要等待響應。

    為了在異步發送消息的同時能夠對異常情況進行處理,生產者提供了回掉支持。下面是回調的一個例子

    ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
            producer.send(producerRecord,new DemoProducerCallBack());
    
    
    class DemoProducerCallBack implements Callback {
    
      public void onCompletion(RecordMetadata metadata, Exception exception) {
        if(exception != null){
          exception.printStackTrace();;
        }
      }
    }

    首先實現回調需要定義一個實現了org.apache.kafka.clients.producer.Callback的類,這個接口只有一個 onCompletion方法。如果 kafka 返回一個錯誤,onCompletion 方法會拋出一個非空(non null)異常,這裏我們只是簡單的把它打印出來,如果是生產環境需要更詳細的處理,然後在 send() 方法發送的時候傳遞一個 Callback 回調的對象。

    生產者分區機制

    Kafka 對於數據的讀寫是以分區為粒度的,分區可以分佈在多個主機(Broker)中,這樣每個節點能夠實現獨立的數據寫入和讀取,並且能夠通過增加新的節點來增加 Kafka 集群的吞吐量,通過分區部署在多個 Broker 來實現負載均衡的效果。

    上面我們介紹了生產者的發送方式有三種:不管結果如何直接發送發送並返回結果發送並回調。由於消息是存在主題(topic)的分區(partition)中的,所以當 Producer 生產者發送產生一條消息發給 topic 的時候,你如何判斷這條消息會存在哪個分區中呢?

    這其實就設計到 Kafka 的分區機制了。

    分區策略

    Kafka 的分區策略指的就是將生產者發送到哪個分區的算法。Kafka 為我們提供了默認的分區策略,同時它也支持你自定義分區策略。

    如果要自定義分區策略的話,你需要显示配置生產者端的參數 Partitioner.class,我們可以看一下這個類它位於 org.apache.kafka.clients.producer 包下

    public interface Partitioner extends Configurable, Closeable {
      
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    
      public void close();
      
      default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
    }

    Partitioner 類有三個方法,分別來解釋一下

    • partition(): 這個類有幾個參數: topic,表示需要傳遞的主題;key 表示消息中的鍵值;keyBytes表示分區中序列化過後的key,byte數組的形式傳遞;value 表示消息的 value 值;valueBytes 表示分區中序列化后的值數組;cluster表示當前集群的原數據。Kafka 給你這麼多信息,就是希望讓你能夠充分地利用這些信息對消息進行分區,計算出它要被發送到哪個分區中。
    • close() : 繼承了 Closeable 接口能夠實現 close() 方法,在分區關閉時調用。
    • onNewBatch(): 表示通知分區程序用來創建新的批次

    其中與分區策略息息相關的就是 partition() 方法了,分區策略有下面這幾種

    順序輪訓

    順序分配,消息是均勻的分配給每個 partition,即每個分區存儲一次消息。就像下面這樣

    上圖表示的就是輪訓策略,輪訓策略是 Kafka Producer 提供的默認策略,如果你不使用指定的輪訓策略的話,Kafka 默認會使用順序輪訓策略的方式。

    隨機輪訓

    隨機輪訓簡而言之就是隨機的向 partition 中保存消息,如下圖所示

    實現隨機分配的代碼只需要兩行,如下

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    return ThreadLocalRandom.current().nextInt(partitions.size());

    先計算出該主題總的分區數,然後隨機地返回一個小於它的正整數。

    本質上看隨機策略也是力求將數據均勻地打散到各個分區,但從實際表現來看,它要遜於輪詢策略,所以如果追求數據的均勻分佈,還是使用輪詢策略比較好。事實上,隨機策略是老版本生產者使用的分區策略,在新版本中已經改為輪詢了。

    按照 key 進行消息保存

    這個策略也叫做 key-ordering 策略,Kafka 中每條消息都會有自己的key,一旦消息被定義了 Key,那麼你就可以保證同一個 Key 的所有消息都進入到相同的分區裏面,由於每個分區下的消息處理都是有順序的,故這個策略被稱為按消息鍵保序策略,如下圖所示

    實現這個策略的 partition 方法同樣簡單,只需要下面兩行代碼即可:

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    return Math.abs(key.hashCode()) % partitions.size();

    上面這幾種分區策略都是比較基礎的策略,除此之外,你還可以自定義分區策略。

    生產者壓縮機制

    壓縮一詞簡單來講就是一種互換思想,它是一種經典的用 CPU 時間去換磁盤空間或者 I/O 傳輸量的思想,希望以較小的 CPU 開銷帶來更少的磁盤佔用或更少的網絡 I/O 傳輸。如果你還不了解的話我希望你先讀完這篇文章 ,然後你就明白壓縮是怎麼回事了。

    Kafka 壓縮是什麼

    Kafka 的消息分為兩層:消息集合 和 消息。一個消息集合中包含若干條日誌項,而日誌項才是真正封裝消息的地方。Kafka 底層的消息日誌由一系列消息集合日誌項組成。Kafka 通常不會直接操作具體的一條條消息,它總是在消息集合這個層面上進行寫入操作。

    在 Kafka 中,壓縮會發生在兩個地方:Kafka Producer 和 Kafka Consumer,為什麼啟用壓縮?說白了就是消息太大,需要變小一點 來使消息發的更快一些。

    Kafka Producer 中使用 compression.type 來開啟壓縮

    private Properties properties = new Properties();
    properties.put("bootstrap.servers","192.168.1.9:9092");
    properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    properties.put("compression.type", "gzip");
    
    Producer<String,String> producer = new KafkaProducer<String, String>(properties);
    
    ProducerRecord<String,String> record =
      new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");

    上面代碼錶明該 Producer 的壓縮算法使用的是 GZIP

    有壓縮必有解壓縮,Producer 使用壓縮算法壓縮消息后併發送給服務器后,由 Consumer 消費者進行解壓縮,因為採用的何種壓縮算法是隨着 key、value 一起發送過去的,所以消費者知道採用何種壓縮算法。

    Kafka 重要參數配置

    在上一篇文章 中,我們主要介紹了一下 kafka 集群搭建的參數,本篇文章我們來介紹一下 Kafka 生產者重要的配置,生產者有很多可配置的參數,在文檔里(

    key.serializer

    用於 key 鍵的序列化,它實現了 org.apache.kafka.common.serialization.Serializer 接口

    value.serializer

    用於 value 值的序列化,實現了 org.apache.kafka.common.serialization.Serializer 接口

    acks

    acks 參數指定了要有多少個分區副本接收消息,生產者才認為消息是寫入成功的。此參數對消息丟失的影響較大

    • 如果 acks = 0,就表示生產者也不知道自己產生的消息是否被服務器接收了,它才知道它寫成功了。如果發送的途中產生了錯誤,生產者也不知道,它也比較懵逼,因為沒有返回任何消息。這就類似於 UDP 的運輸層協議,只管發,服務器接受不接受它也不關心。
    • 如果 acks = 1,只要集群的 Leader 接收到消息,就會給生產者返回一條消息,告訴它寫入成功。如果發送途中造成了網絡異常或者 Leader 還沒選舉出來等其他情況導致消息寫入失敗,生產者會受到錯誤消息,這時候生產者往往會再次重發數據。因為消息的發送也分為 同步異步,Kafka 為了保證消息的高效傳輸會決定是同步發送還是異步發送。如果讓客戶端等待服務器的響應(通過調用 Future 中的 get() 方法),顯然會增加延遲,如果客戶端使用回調,就會解決這個問題。
    • 如果 acks = all,這種情況下是只有當所有參与複製的節點都收到消息時,生產者才會接收到一個來自服務器的消息。不過,它的延遲比 acks =1 時更高,因為我們要等待不只一個服務器節點接收消息。

    buffer.memory

    此參數用來設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足。這個時候,send() 方法調用要麼被阻塞,要麼拋出異常,具體取決於 block.on.buffer.null 參數的設置。

    compression.type

    此參數來表示生產者啟用何種壓縮算法,默認情況下,消息發送時不會被壓縮。該參數可以設置為 snappy、gzip 和 lz4,它指定了消息發送給 broker 之前使用哪一種壓縮算法進行壓縮。下面是各壓縮算法的對比

    retries

    生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區找不到首領),在這種情況下,reteis 參數的值決定了生產者可以重發的消息次數,如果達到這個次數,生產者會放棄重試並返回錯誤。默認情況下,生產者在每次重試之間等待 100ms,這個等待參數可以通過 retry.backoff.ms 進行修改。

    batch.size

    當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。當批次被填滿,批次里的所有消息會被發送出去。不過生產者井不一定都會等到批次被填滿才發送,任意條數的消息都可能被發送。

    client.id

    此參數可以是任意的字符串,服務器會用它來識別消息的來源,一般配置在日誌里

    max.in.flight.requests.per.connection

    此參數指定了生產者在收到服務器響應之前可以發送多少消息,它的值越高,就會佔用越多的內存,不過也會提高吞吐量。把它設為1 可以保證消息是按照發送的順序寫入服務器。

    timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

    request.timeout.ms 指定了生產者在發送數據時等待服務器返回的響應時間,metadata.fetch.timeout.ms 指定了生產者在獲取元數據(比如目標分區的首領是誰)時等待服務器返迴響應的時間。如果等待時間超時,生產者要麼重試發送數據,要麼返回一個錯誤。timeout.ms 指定了 broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配—-如果在指定時間內沒有收到同步副本的確認,那麼 broker 就會返回一個錯誤。

    max.block.ms

    此參數指定了在調用 send() 方法或使用 partitionFor() 方法獲取元數據時生產者的阻塞時間當生產者的發送緩衝區已捕,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。

    max.request.size

    該參數用於控制生產者發送的請求大小。它可以指能發送的單個消息的最大值,也可以指單個請求里所有消息的總大小。

    receive.buffer.bytes 和 send.buffer.bytes

    Kafka 是基於 TCP 實現的,為了保證可靠的消息傳輸,這兩個參數分別指定了 TCP Socket 接收和發送數據包的緩衝區的大小。如果它們被設置為 -1,就使用操作系統的默認值。如果生產者或消費者與 broker 處於不同的數據中心,那麼可以適當增大這些值。

    文章參考:

    《Kafka 權威指南》

    極客時間 -《Kafka 核心技術與實戰》

    Kafka 源碼

    關注公眾號獲取更多優質电子書,關注一下你就知道資源是有多好了

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

    ※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

    ※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

    ※評比南投搬家公司費用收費行情懶人包大公開

  • EFK教程 – ElasticSearch高性能高可用架構

    EFK教程 – ElasticSearch高性能高可用架構

    通過將elasticsearch的data、ingest、master角色進行分離,搭建起高性能+高可用的ES架構

    作者:“發顛的小狼”,歡迎轉載與投稿

    目錄

    ▪ 用途
    ▪ 架構
    ▪ 步驟說明
    ▪ elasticsearch-data部署
    ▪ elasticsearch-ingest部署
    ▪ elasticsearch-master部署

    用途

    在第一篇《EFK教程 – 快速入門指南》中,闡述了EFK的安裝部署,其中ES的架構為三節點,即master、ingest、data角色同時部署在三台服務器上。

    在本文中,將進行角色分離部署,並且每個角色分別部署三節點,在實現性能最大化的同時保障高可用。

    ▷ elasticsearch的master節點:用於調度,採用普通性能服務器來部署
    ▷ elasticsearch的ingest節點:用於數據預處理,採用性能好的服務器來部署
    ▷ elasticsearch的data節點:用於數據落地存儲,採用存儲性能好的服務器來部署

    若不知道去哪找《EFK教程 - 快速入門指南》,可在主流搜索引擎里搜索:
    小慢哥 EFK教程 快速入門指南
    或者
    小慢哥 EFK教程 基於多節點ES的EFK安裝部署配置

    架構

    服務器配置

    注意:此處的架構是之前的文章《EFK教程 – 快速入門指南》的拓展,因此請先按照《EFK教程 – 快速入門指南》完成部署

    步驟說明

    1️⃣ 部署3台data節點,加入原集群
    2️⃣ 部署3台ingest節點,加入原集群
    3️⃣ 將原有的es索引遷移到data節點
    4️⃣ 將原有的es節點改造成master節點

    elasticsearch-data部署

    之前已完成了基礎的elasticsearch架構,現需要新增三台存儲節點加入集群,同時關閉master和ingest功能

    elasticsearch-data安裝:3台均執行相同的安裝步驟

    tar -zxvf elasticsearch-7.3.2-linux-x86_64.tar.gz
    mv elasticsearch-7.3.2 /opt/elasticsearch
    useradd elasticsearch -d /opt/elasticsearch -s /sbin/nologin
    mkdir -p /opt/logs/elasticsearch
    chown elasticsearch.elasticsearch /opt/elasticsearch -R
    chown elasticsearch.elasticsearch /opt/logs/elasticsearch -R
    # 數據盤需要elasticsearch寫權限
    chown elasticsearch.elasticsearch /data/SAS -R
    
    # 限制一個進程可以擁有的VMA(虛擬內存區域)的數量要超過262144,不然elasticsearch會報max virtual memory areas vm.max_map_count [65535] is too low, increase to at least [262144]
    echo "vm.max_map_count = 655350" >> /etc/sysctl.conf
    sysctl -p

    elasticsearch-data配置

    ▷ 192.168.1.51 /opt/elasticsearch/config/elasticsearch.yml

    cluster.name: my-application
    node.name: 192.168.1.51
    # 數據盤位置,如果有多個硬盤位置,用","隔開
    path.data: /data/SAS
    path.logs: /opt/logs/elasticsearch
    network.host: 192.168.1.51
    
    discovery.seed_hosts: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    cluster.initial_master_nodes: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    
    # 關閉master功能
    node.master: false
    # 關閉ingest功能
    node.ingest: false
    # 開啟data功能
    node.data: true

    ▷ 192.168.1.52 /opt/elasticsearch/config/elasticsearch.yml

    cluster.name: my-application
    node.name: 192.168.1.52
    # 數據盤位置,如果有多個硬盤位置,用","隔開
    path.data: /data/SAS
    path.logs: /opt/logs/elasticsearch
    network.host: 192.168.1.52
    
    discovery.seed_hosts: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    cluster.initial_master_nodes: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    
    # 關閉master功能
    node.master: false
    # 關閉ingest功能
    node.ingest: false
    # 開啟data功能
    node.data: true

    ▷ 192.168.1.53 /opt/elasticsearch/config/elasticsearch.yml

    cluster.name: my-application
    node.name: 192.168.1.53
    # 數據盤位置,如果有多個硬盤位置,用","隔開
    path.data: /data/SAS
    path.logs: /opt/logs/elasticsearch
    network.host: 192.168.1.53
    
    discovery.seed_hosts: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    cluster.initial_master_nodes: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    
    # 關閉master功能
    node.master: false
    # 關閉ingest功能
    node.ingest: false
    # 開啟data功能
    node.data: true

    elasticsearch-data啟動

    sudo -u elasticsearch /opt/elasticsearch/bin/elasticsearch

    elasticsearch集群狀態

    curl "http://192.168.1.31:9200/_cat/health?v"

    elasticsearch-data狀態

    curl "http://192.168.1.31:9200/_cat/nodes?v"

    elasticsearch-data參數說明

    status: green  # 集群健康狀態
    node.total: 6  # 有6台機子組成集群
    node.data: 6  # 有6個節點的存儲
    node.role: d  # 只擁有data角色
    node.role: i  # 只擁有ingest角色
    node.role: m  # 只擁有master角色
    node.role: mid  # 擁master、ingest、data角色

    elasticsearch-ingest部署

    現需要新增三台ingest節點加入集群,同時關閉master和data功能

    elasticsearch-ingest安裝:3台es均執行相同的安裝步驟

    tar -zxvf elasticsearch-7.3.2-linux-x86_64.tar.gz
    mv elasticsearch-7.3.2 /opt/elasticsearch
    useradd elasticsearch -d /opt/elasticsearch -s /sbin/nologin
    mkdir -p /opt/logs/elasticsearch
    chown elasticsearch.elasticsearch /opt/elasticsearch -R
    chown elasticsearch.elasticsearch /opt/logs/elasticsearch -R
    
    # 限制一個進程可以擁有的VMA(虛擬內存區域)的數量要超過262144,不然elasticsearch會報max virtual memory areas vm.max_map_count [65535] is too low, increase to at least [262144]
    echo "vm.max_map_count = 655350" >> /etc/sysctl.conf
    sysctl -p

    elasticsearch-ingest配置

    ▷ 192.168.1.41 /opt/elasticsearch/config/elasticsearch.yml

    cluster.name: my-application
    node.name: 192.168.1.41
    path.logs: /opt/logs/elasticsearch
    network.host: 192.168.1.41
    
    discovery.seed_hosts: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    cluster.initial_master_nodes: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    
    # 關閉master功能
    node.master: false
    # 開啟ingest功能
    node.ingest: true
    # 關閉data功能
    node.data: false

    ▷ 192.168.1.42 /opt/elasticsearch/config/elasticsearch.yml

    cluster.name: my-application
    node.name: 192.168.1.42
    path.logs: /opt/logs/elasticsearch
    network.host: 192.168.1.42
    
    discovery.seed_hosts: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    cluster.initial_master_nodes: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    
    # 關閉master功能
    node.master: false
    # 開啟ingest功能
    node.ingest: true
    # 關閉data功能
    node.data: false

    ▷ 192.168.1.43 /opt/elasticsearch/config/elasticsearch.yml

    cluster.name: my-application
    node.name: 192.168.1.43
    path.logs: /opt/logs/elasticsearch
    network.host: 192.168.1.43
    
    discovery.seed_hosts: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    cluster.initial_master_nodes: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    
    # 關閉master功能
    node.master: false
    # 開啟ingest功能
    node.ingest: true
    # 關閉data功能
    node.data: false

    elasticsearch-ingest啟動

    sudo -u elasticsearch /opt/elasticsearch/bin/elasticsearch

    elasticsearch集群狀態

    curl "http://192.168.1.31:9200/_cat/health?v"

    elasticsearch-ingest狀態

    curl "http://192.168.1.31:9200/_cat/nodes?v"

    elasticsearch-ingest參數說明

    status: green  # 集群健康狀態
    node.total: 9  # 有9台機子組成集群
    node.data: 6  # 有6個節點的存儲
    node.role: d  # 只擁有data角色
    node.role: i  # 只擁有ingest角色
    node.role: m  # 只擁有master角色
    node.role: mid  # 擁master、ingest、data角色

    elasticsearch-master部署

    首先,將上一篇《EFK教程 – 快速入門指南》中部署的3台es(192.168.1.31、192.168.1.32、192.168.1.33)改成只有master的功能, 因此需要先將這3台上的索引數據遷移到本次所做的data節點中

    1️⃣ 索引遷移:一定要做這步,將之前的索引放到data節點上

    curl -X PUT "192.168.1.31:9200/*/_settings?pretty" -H 'Content-Type: application/json' -d'
    {
      "index.routing.allocation.include._ip": "192.168.1.51,192.168.1.52,192.168.1.53"
    }'

    2️⃣ 確認當前索引存儲位置:確認所有索引不在192.168.1.31、192.168.1.32、192.168.1.33節點上

    curl "http://192.168.1.31:9200/_cat/shards?h=n"

    elasticsearch-master配置

    注意事項:修改配置,重啟進程,需要一台一台執行,要確保第一台成功后,再執行下一台。重啟進程的方法:由於上一篇文章《EFK教程 – 快速入門指南》里,是執行命令跑在前台,因此直接ctrl – c退出再啟動即可,啟動命令如下

    sudo -u elasticsearch /opt/elasticsearch/bin/elasticsearch

    ▷ 192.168.1.31 /opt/elasticsearch/config/elasticsearch.yml

    cluster.name: my-application
    node.name: 192.168.1.31
    path.logs: /opt/logs/elasticsearch
    network.host: 192.168.1.31
    
    discovery.seed_hosts: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    cluster.initial_master_nodes: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    
    #開啟master功能
    node.master: true
    #關閉ingest功能
    node.ingest: false
    #關閉data功能
    node.data: false

    ▷ 192.168.1.32 /opt/elasticsearch/config/elasticsearch.yml

    cluster.name: my-application
    node.name: 192.168.1.32
    path.logs: /opt/logs/elasticsearch
    network.host: 192.168.1.32
    
    discovery.seed_hosts: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    cluster.initial_master_nodes: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    
    #開啟master功能
    node.master: true
    #關閉ingest功能
    node.ingest: false
    #關閉data功能
    node.data: false

    ▷ 192.168.1.33 /opt/elasticsearch/config/elasticsearch.yml

    cluster.name: my-application
    node.name: 192.168.1.33
    path.logs: /opt/logs/elasticsearch
    network.host: 192.168.1.33
    
    discovery.seed_hosts: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    cluster.initial_master_nodes: ["192.168.1.31","192.168.1.32","192.168.1.33"]
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    
    #開啟master功能
    node.master: true
    #關閉ingest功能
    node.ingest: false
    #關閉data功能
    node.data: false

    elasticsearch集群狀態

    curl "http://192.168.1.31:9200/_cat/health?v"

    elasticsearch-master狀態

    curl "http://192.168.1.31:9200/_cat/nodes?v"

    至此,當node.role里所有服務器都不再出現“mid”,則表示一切順利完成。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益