分類: 3C資訊

  • 【algo&ds】8.最小生成樹

    【algo&ds】8.最小生成樹

    1.最小生成樹介紹

    什麼是最小生成樹?

    最小生成樹(Minimum spanning tree,MST)是在一個給定的無向圖G(V,E)中求一棵樹T,使得這棵樹擁有圖G中的所有頂點,且所有邊都是來自圖G中的邊,並且滿足整棵樹的邊權值和最小。

    2.prim算法

    和Dijkstra算法很像!!請看如下Gif圖,prim算法的核心思想是對圖G(V,E)設置集合S,存放已被訪問的頂點,然後每次從集合V-S中選擇與集合S的最短距離最小的一個頂點(記為u),訪問並加入集合S。之後,令頂點u為中間點,優化所有從u能到達的頂點v與集合s之間的最短距離。這樣的操作執行n次,直到集合s中包含所有頂點。

    不同的是,Dijkstra算法中的dist是從源點s到頂點w的最短路徑;而prim算法中的dist是從集合S到頂點w的最短路徑,以下是他們的偽碼描述對比,關於Dijkstra算法的詳細描述請

    算法實現:

    #include<iostream>
    #include<vector>
    #define INF 100000
    #define MaxVertex 105
    typedef int Vertex; 
    int G[MaxVertex][MaxVertex];
    int parent[MaxVertex];   // 並查集 
    int dist[MaxVertex]; // 距離 
    int Nv;    // 結點 
    int Ne;    // 邊 
    int sum;  // 權重和 
    using namespace std; 
    vector<Vertex> MST;  // 最小生成樹 
    
    // 初始化圖信息 
    void build(){
        Vertex v1,v2;
        int w;
        cin>>Nv>>Ne;
        for(int i=1;i<=Nv;i++){
            for(int j=1;j<=Nv;j++)
                G[i][j] = 0;  // 初始化圖 
            dist[i] = INF;   // 初始化距離
            parent[i] = -1;  // 初始化並查集 
        }
        // 初始化點
        for(int i=0;i<Ne;i++){
            cin>>v1>>v2>>w;
            G[v1][v2] = w;
            G[v2][v1] = w;
        }
    }
    
    // Prim算法前的初始化 
    void IniPrim(Vertex s){
        dist[s] = 0;
        MST.push_back(s);
        for(Vertex i =1;i<=Nv;i++)
            if(G[s][i]){
                dist[i] = G[s][i];
                parent[i] = s;
            } 
    }
    
    // 查找未收錄中dist最小的點 
    Vertex FindMin(){
        int min = INF;
        Vertex xb = -1;
        for(Vertex i=1;i<=Nv;i++)
            if(dist[i] && dist[i] < min){ 
                min = dist[i];
                xb = i;
            }
        return xb;
    }
    
    void output(){
        cout<<"被收錄順序:"<<endl; 
        for(Vertex i=1;i<=Nv;i++)
            cout<<MST[i]<<" ";
        cout<<"權重和為:"<<sum<<endl; 
        cout<<"該生成樹為:"<<endl; 
        for(Vertex i=1;i<=Nv;i++)
            cout<<parent[i]<<" ";
    }
    
    void Prim(Vertex s){
        IniPrim(s);
        while(1){
            Vertex v = FindMin();
            if(v == -1)
                break;
            sum += dist[v];
            dist[v] = 0;
            MST.push_back(v);
            for(Vertex w=1;w<=Nv;w++)
                if(G[v][w] && dist[w])
                    if(G[v][w] < dist[w]){
                        dist[w] = G[v][w];
                        parent[w] = v;
                    }
        }
    }
    
    
    int main(){
        build();
        Prim(1);
        output();
        return 0;
    } 

    關於prim算法的更加詳細講解請

    3.kruskal算法

    Kruskal算法也可以用來解決最小生成樹的問題,其算法思想很容易理解,典型的邊貪心,其算法思想為:

    • 在初始狀態時隱去圖中所有的邊,這樣圖中每個頂點都是一個單獨的連通塊,一共有n個連通塊
    • 對所有邊按邊權從小到大進行排序
    • 按邊權從小到大測試所有邊,如果當前測試邊所連接的兩個頂點不在同一個連通塊中,則把這條測試邊加入當前最小生成樹中,否則,將邊捨棄。
    • 重複執行上一步驟,直到最小生成樹中的邊數等於總頂點數減一 或者測試完所有邊時結束;如果結束時,最小生成樹的邊數小於總頂點數減一,說明該圖不連通。

    請看下面的Gif圖!

    算法實現:

    #include<iostream>
    #include<string>
    #include<vector>
    #include<queue>
    #define INF 100000
    #define MaxVertex 105
    typedef int Vertex; 
    int G[MaxVertex][MaxVertex];
    int parent[MaxVertex];   // 並查集最小生成樹 
    int Nv;    // 結點 
    int Ne;    // 邊 
    int sum;  // 權重和 
    using namespace std; 
    struct Node{
        Vertex v1;
        Vertex v2;
        int weight; // 權重 
        // 重載運算符成最大堆 
        bool operator < (const Node &a) const
        {
            return weight>a.weight;
        }
    };
    vector<Node> MST;  // 最小生成樹 
    priority_queue<Node> q;   // 最小堆 
    
    // 初始化圖信息 
    void build(){
        Vertex v1,v2;
        int w;
        cin>>Nv>>Ne;
        for(int i=1;i<=Nv;i++){
            for(int j=1;j<=Nv;j++)
                G[i][j] = 0;  // 初始化圖
            parent[i] = -1;
        }
        // 初始化點
        for(int i=0;i<Ne;i++){
            cin>>v1>>v2>>w;
            struct Node tmpE;
            tmpE.v1 = v1;
            tmpE.v2 = v2;
            tmpE.weight = w;
            q.push(tmpE); 
        }
    }
    
    //  路徑壓縮查找 
    int Find(int x){
        if(parent[x] < 0)
            return x;
        else
            return parent[x] = Find(parent[x]);
    } 
    
    //  按秩歸併 
    void Union(int x1,int x2){
        if(parent[x1] < parent[x2]){
            parent[x1] += parent[x2];
            parent[x2] = x1;
        }else{
            parent[x2] += parent[x1];
            parent[x1] = x2;
        }
    } 
    
    void Kruskal(){
        // 最小生成樹的邊不到 Nv-1 條且還有邊 
        while(MST.size()!= Nv-1 && !q.empty()){
            Node E = q.top();  // 從最小堆取出一條權重最小的邊
            q.pop(); // 出隊這條邊 
            if(Find(E.v1) != Find(E.v2)){  // 檢測兩條邊是否在同一集合 
                sum += E.weight; 
                Union(E.v1,E.v2);     // 並起來 
                MST.push_back(E);
            }
        }
        
    } 
    
    
    void output(){
        cout<<"被收錄順序:"<<endl; 
        for(Vertex i=0;i<Nv;i++)
            cout<<MST[i].weight<<" ";
        cout<<"權重和為:"<<sum<<endl; 
        for(Vertex i=1;i<=Nv;i++)
            cout<<parent[i]<<" ";
        cout<<endl;
    }
    
    
    int main(){
        build();
        Kruskal();
        output();
        return 0;
    } 

    關於kruskal算法更詳細的講解

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

    ※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

    ※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

  • 【原創】(十一)Linux內存管理slub分配器

    【原創】(十一)Linux內存管理slub分配器

    背景

    • Read the fucking source code! –By 魯迅
    • A picture is worth a thousand words. –By 高爾基

    說明:

    1. Kernel版本:4.14
    2. ARM64處理器,Contex-A53,雙核
    3. 使用工具:Source Insight 3.5, Visio

    1. 概述

    之前的文章分析的都是基於頁面的內存分配,而小塊內存的分配和管理是通過塊分配器來實現的。目前內核中,有三種方式來實現小塊內存分配:slab, slub, slob,最先有slab分配器,slub/slob分配器是改進版,slob分配器適用於小內存嵌入式設備,而slub分配器目前已逐漸成為主流塊分配器。接下來的文章,就是以slub分配器為目標,進一步深入。

    先來一個初印象:

    2. 數據結構

    有四個關鍵的數據結構:

    • struct kmem_cache:用於管理SLAB緩存,包括該緩存中對象的信息描述,per-CPU/Node管理slab頁面等;
      關鍵字段如下:
    /*
     * Slab cache management.
     */
    struct kmem_cache {
        struct kmem_cache_cpu __percpu *cpu_slab;       //每個CPU slab頁面
        /* Used for retriving partial slabs etc */
        unsigned long flags;
        unsigned long min_partial;
        int size;       /* The size of an object including meta data */
        int object_size;    /* The size of an object without meta data */
        int offset;     /* Free pointer offset. */
    #ifdef CONFIG_SLUB_CPU_PARTIAL
        /* Number of per cpu partial objects to keep around */
        unsigned int cpu_partial;
    #endif
        struct kmem_cache_order_objects oo;     //該結構體會描述申請頁面的order值,以及object的個數
    
        /* Allocation and freeing of slabs */
        struct kmem_cache_order_objects max;
        struct kmem_cache_order_objects min;
        gfp_t allocflags;   /* gfp flags to use on each alloc */
        int refcount;       /* Refcount for slab cache destroy */
        void (*ctor)(void *);           // 對象構造函數
        int inuse;      /* Offset to metadata */
        int align;      /* Alignment */
        int reserved;       /* Reserved bytes at the end of slabs */
        int red_left_pad;   /* Left redzone padding size */
        const char *name;   /* Name (only for display!) */
        struct list_head list;  /* List of slab caches */       //kmem_cache最終會鏈接在一個全局鏈表中
        struct kmem_cache_node *node[MAX_NUMNODES];     //Node管理slab頁面
    };
    • struct kmem_cache_cpu:用於管理每個CPU的slab頁面,可以使用無鎖訪問,提高緩存對象分配速度;
    struct kmem_cache_cpu {
        void **freelist;    /* Pointer to next available object */                  //指向空閑對象的指針
        unsigned long tid;  /* Globally unique transaction id */                
        struct page *page;  /* The slab from which we are allocating */     //slab緩存頁面
    #ifdef CONFIG_SLUB_CPU_PARTIAL
        struct page *partial;   /* Partially allocated frozen slabs */
    #endif
    #ifdef CONFIG_SLUB_STATS
        unsigned stat[NR_SLUB_STAT_ITEMS];
    #endif
    };
    • struct kmem_cache_node:用於管理每個Node的slab頁面,由於每個Node的訪問速度不一致,slab頁面由Node來管理;
    /*
     * The slab lists for all objects.
     */
    struct kmem_cache_node {
        spinlock_t list_lock;
    
    #ifdef CONFIG_SLUB
        unsigned long nr_partial;    //slab頁表數量
        struct list_head partial;       //slab頁面鏈表
    #ifdef CONFIG_SLUB_DEBUG
        atomic_long_t nr_slabs;
        atomic_long_t total_objects;
        struct list_head full;
    #endif
    #endif
    };
    • struct page:用於描述slab頁面struct page結構體中很多字段都是通過union聯合體進行復用的。
      struct page結構中,用於slub的成員如下:
    struct page {
        union {
           ...
            void *s_mem;            /* slab first object */
           ...
        };
        
        /* Second double word */
        union {
           ...
            void *freelist;     /* sl[aou]b first free object */
           ...
        };
        
        union {
           ...
            struct {
                union {
                  ...
                    struct {            /* SLUB */
                        unsigned inuse:16;
                        unsigned objects:15;
                        unsigned frozen:1;
                    };
                    ...
                };
           ...
            };       
        };   
        
        /*
         * Third double word block
         */
        union {
           ...
            struct {        /* slub per cpu partial pages */
                struct page *next;  /* Next partial slab */
    #ifdef CONFIG_64BIT
                int pages;  /* Nr of partial slabs left */
                int pobjects;   /* Approximate # of objects */
    #else
                short int pages;
                short int pobjects;
    #endif
            };
    
            struct rcu_head rcu_head;   /* Used by SLAB
                             * when destroying via RCU
                             */
        };
        ...
            struct kmem_cache *slab_cache;  /* SL[AU]B: Pointer to slab */    
        ...
    }

    圖來了:

    3. 流程分析

    針對Slub的使用,可以從三個維度來分析:

    1. slub緩存創建
    2. slub對象分配
    3. slub對象釋放

    下邊將進一步分析。

    3.1 kmem_cache_create

    在內核中通過kmem_cache_create接口來創建一個slab緩存

    先看一下這個接口的函數調用關係圖:

    1. kmem_cache_create完成的功能比較簡單,就是創建一個用於管理slab緩存kmem_cache結構,並對該結構體進行初始化,最終添加到全局鏈表中。kmem_cache結構體初始化,包括了上文中分析到的kmem_cache_cpukmem_cache_node兩個字段結構。

    2. 在創建的過程中,當發現已有的slab緩存中,有存在對象大小相近,且具有兼容標誌的slab緩存,那就只需要進行merge操作並返回,而無需進一步創建新的slab緩存

    3. calculate_sizes函數會根據指定的force_order或根據對象大小去計算kmem_cache結構體中的size/min/oo等值,其中kmem_cache_order_objects結構體,是由頁面分配order值和對象數量兩者通過位域拼接起來的。

    4. 在創建slab緩存的時候,有一個先雞后蛋的問題:kmem_cache結構體來管理一個slab緩存,而創建kmem_cache結構體又是從slab緩存中分配出來的對象,那麼這個問題是怎麼解決的呢?可以看一下kmem_cache_init函數,內核中定義了兩個靜態的全局變量kmem_cachekmem_cache_node,在kmem_cache_init函數中完成了這兩個結構體的初始化之後,相當於就是創建了兩個slab緩存,一個用於分配kmem_cache結構體對象的緩存池,一個用於分配kmem_cache_node結構體對象的緩存池。由於kmem_cache_cpu結構體是通過__alloc_percpu來分配的,因此不需要創建一個相關的slab緩存

    3.2 kmem_cache_alloc

    kmem_cache_alloc接口用於從slab緩存池中分配對象。

    看一下大體的調用流程圖:

    從上圖中可以看出,分配slab對象與Buddy System中分配頁面類似,存在快速路徑和慢速路徑兩種,所謂的快速路徑就是per-CPU緩存,可以無鎖訪問,因而效率更高。

    整體的分配流程大體是這樣的:優先從per-CPU緩存中進行分配,如果per-CPU緩存中已經全部分配完畢,則從Node管理的slab頁面中遷移slab頁per-CPU緩存中,再重新分配。當Node管理的slab頁面也不足的情況下,則從Buddy System中分配新的頁面,添加到per-CPU緩存中。

    還是用圖來說明更清晰,分為以下幾步來分配:

    1. fastpath
      快速路徑下,以原子的方式檢索per-CPU緩存的freelist列表中的第一個對象,如果freelist為空並且沒有要檢索的對象,則跳入慢速路徑操作,最後再返回到快速路徑中重試操作。

    2. slowpath-1
      將per-CPU緩存中page指向的slab頁中的空閑對象遷移到freelist中,如果有空閑對象,則freeze該頁面,沒有空閑對象則跳轉到slowpath-2

    3. slowpath-2
      將per-CPU緩存中partial鏈表中的第一個slab頁遷移到page指針中,如果partial鏈表為空,則跳轉到slowpath-3

    4. slowpath-3
      將Node管理的partial鏈表中的slab頁遷移到per-CPU緩存中的page中,並重複第二個slab頁將其添加到per-CPU緩存中的partial鏈表中。如果遷移的slab中空閑對象超過了kmem_cache.cpu_partial的一半,則僅遷移slab頁,並且不再重複。
      如果每個Node的partial鏈表都為空,跳轉到slowpath-4

    5. slowpath-4
      Buddy System中獲取頁面,並將其添加到per-CPU的page中。

    3.2 kmem_cache_free

    kmem_cache_free的操作,可以看成是kmem_cache_alloc的逆過程,因此也分為快速路徑和慢速路徑兩種方式,同時,慢速路徑中又分為了好幾種情況,可以參考kmem_cache_alloc的過程。

    調用流程圖如下:

    效果如下:

    1. 快速路徑釋放
      快速路徑下,直接將對象返回到freelist中即可。

    2. put_cpu_partial
      put_cpu_partial函數主要是將一個剛freeze的slab頁,放入到partial鏈表中。
      put_cpu_partial函數中調用unfreeze_partials函數,這時候會將per-CPU管理的partial鏈表中的slab頁面添加到Node管理的partial鏈表的尾部。如果超出了Node的partial鏈表,溢出的slab頁面中沒有分配對象的slab頁面將會返回到夥伴系統。

    3. add_partial
      添加slab頁到Node的partial鏈表中。

    4. remove_partial
      從Node的partial鏈表移除slab頁。

    具體釋放的流程走哪個分支,跟對象的使用情況,partial鏈表的個數nr_partial/min_partial等相關,細節就不再深入分析了。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

  • .NET高級特性-Emit(1)

    .NET高級特性-Emit(1)

      在這個大數據/雲計算/人工智能研發普及的時代,Python的崛起以及Javascript的前後端的侵略,程序員與企業似乎越來越青睞動態語言所帶來的便捷性與高效性,即使靜態語言在性能,錯誤檢查等方面的優於靜態語言。對於.NETer來說,.NET做為一門靜態語言,我們不僅要打好.NET的基本功,如基本類型/語法/底層原理/錯誤檢查等知識,也要深入理解.NET的一些高級特性,來為你的工作減輕負擔和提高代碼質量。

      ok,咱們今天開始聊一聊.NET中的Emit。

    一、什麼是Emit?

      Emit含義為發出、產生的含義,這是.NET中的一組類庫,命名空間為System.Reflection.Emit,幾乎所有的.NET版本(Framework/Mono/NetCore)都支持Emit,可以實現用C#代碼生成代碼的類庫

    二、Emit的本質

      我們知道.NET可以由各種語言進行編寫,比如VB,C++等,當然絕大部分程序員進行.NET開發都是使用C#語言進行的,這些語言都會被各自的語言解釋器解釋為IL語言並執行,而Emit類庫的作用就是用這些語言來編寫生成IL語言,並交給CLR(公共語言運行時)進行執行。

      我們先來看看IL語言長什麼樣子:

      (1) 首先我們創建一個Hello,World程序

        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Hello World!");
            }
        }

      (2) 將程序編譯成dll文件,我們可以看到在開發目錄下生成了bin文件夾

      

      (3) 向下尋找,我們可以看到dll文件已經生成,筆者使用netcore3進行開發,故路徑為bin/Debug/netcoreapp3.0

      

      (4) 這時候,我們就要祭出我們的il查看神器了,ildasm工具

      

      如何找到這個工具?打開開始菜單,找到Visual Studio文件夾,打開Developer Command Prompt,在打開的命令行中鍵入ildasm回車即可,筆者使用vs2019進行演示,其它vs版本操作方法均一致

      

     

     

     

     

     

     

       (5) 在dasm菜單欄選擇文件->打開,選擇剛剛生成的dll文件

      

     

     

       (6) 即可查看生成il代碼

      

     

      有了ildasm的輔助,我們就能夠更好的了解IL語言以及如何編寫IL語言,此外,Visual Studio中還有許多插件支持查看il代碼,比如JetBrains出品的Resharper插件等,如果覺得筆者方式較為麻煩可以使用以上插件查看il代碼

    三、理解IL代碼

      在上一章節中,我們理解了Emit的本質其實就是用C#來編寫IL代碼,既然要編寫IL代碼,那麼我們首先要理解IL代碼是如何進行工作的,IL代碼是如何完成C#當中的順序/選擇/循環結構的,是如何實現類的定義/字段的定義/屬性的定義/方法的定義的。

      IL代碼是一種近似於指令式的代碼語言,與彙編語言比較相近,所以習慣於寫高級語言的.NETer來說比較難以理解

      讓我們來看看Hello,World程序的IL代碼:

    IL_0000:  nop
    IL_0001:  ldstr      "Hello World!"
    IL_0006:  call       void [System.Console]System.Console::WriteLine(string)
    IL_000b:  nop
    IL_000c:  ret

      我們可以把IL代碼看成棧的運行

      第一條指令,nop表示不做任何事情,表示代碼不做任何事情

      第二條指令,ldstr表示將字符串放入棧中,字符串的值為“Hello,World!”

      第三條指令,call表示調用方法,參數為調用方法的方法信息,並把返回的結構壓入棧中,使用的參數為之前已經入棧的“Hello World!”,以此類推,如果方法有n個參數,那麼他就會調取棧中n個數據,並返回一個結果放回棧中

      第四條指令,nop表示不做任何事情

      第五條指令,ret表示將棧中頂部的數據返回,如果方法定義為void,則無返回值

      關於Hello,world程序IL的理解就說到這裏,更多的指令含義讀者可以參考微軟官方文檔,筆者之後也會繼續對Emit進行講解和Emit的應用

    四、用Emit類庫編寫IL代碼

      既然IL代碼咱們理解的差不多了,咱們就開始嘗試用C#來寫IL代碼了,有了IL代碼的參考,咱們也可以依葫蘆畫瓢的把代碼寫出來了

      (1) 引入Emit命名空間

    using System.Reflection.Emit;

      (2) 首先我們定義一個Main方法,入參無,返回類型void

    //定義方法名,返回類型,輸入類型
    var method = new DynamicMethod("Main", null, Type.EmptyTypes);

      (3) 生成IL代碼

    //生成IL代碼
    var ilGenerator = method.GetILGenerator();
    ilGenerator.Emit(OpCodes.Nop);
    ilGenerator.Emit(OpCodes.Ldstr,"Hello World!");
    ilGenerator.Emit(OpCodes.Call, typeof(Console).GetMethod("WriteLine", new Type[] { typeof(string) })); //尋找Console的WriteLine方法
    ilGenerator.Emit(OpCodes.Nop);
    ilGenerator.Emit(OpCodes.Ret);

      (4) 創建委託並調用

    //創建委託
    var helloWorldMethod = method.CreateDelegate(typeof(Action)) as Action;
    helloWorldMethod.Invoke();

      (5)運行,即輸出Hello World!

    五、小結

      Emit的本質是使用高級語言生成IL代碼,進而進行調用的的一組類庫,依賴Emit我們可以實現用代碼生成代碼的操作,即編程語言的自舉,可以有效彌補靜態語言的靈活性的缺失。

      Emit的性能非常好,除了第一次構建IL代碼所需要時間外,之後只要將操作緩存在計算機內存中,速度與手寫代碼相差無幾

      有許多著名.NET類庫均依賴於Emit:

      (.NET JSON操作庫)Json.NET/Newtonsoft.Json:

      (輕量ORM)Dapper:

      (ObjectToObjectMapper)EmitMapper:

      (AOP庫)Castle.DynamicProxy:

      學習Emit:

      .NET官方文檔:

      .NET API瀏覽器:

      之後作者將繼續講解.NET Emit的相關內容和應用,感謝閱讀

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    ※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

    ※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

    ※Google地圖已可更新顯示潭子電動車充電站設置地點!!

    ※帶您來看台北網站建置台北網頁設計,各種案例分享

  • 實現 Redis 協議解析器

    本文是 《用 Golang 實現一個 Redis》系列文章第二篇,本文將分別介紹 以及 的實現,若您對協議有所了解可以直接閱讀協議解析器部分。

    Redis 通信協議

    Redis 自 2.0 版本起使用了統一的協議 RESP (REdis Serialization Protocol),該協議易於實現,計算機可以高效的進行解析且易於被人類讀懂。

    RESP 是一個二進制安全的文本協議,工作於 TCP 協議上。客戶端和服務器發送的命令或數據一律以 \r\n (CRLF)結尾。

    RESP 定義了5種格式:

    • 簡單字符串(Simple String): 服務器用來返回簡單的結果,比如”OK”。非二進制安全,且不允許換行。
    • 錯誤信息(Error): 服務器用來返回簡單的結果,比如”ERR Invalid Synatx”。非二進制安全,且不允許換行。
    • 整數(Integer): llenscard等命令的返回值, 64位有符號整數
    • 字符串(Bulk String): 二進制安全字符串, get 等命令的返回值
    • 數組(Array, 舊版文檔中稱 Multi Bulk Strings): Bulk String 數組,客戶端發送指令以及lrange等命令響應的格式

    RESP 通過第一個字符來表示格式:

    • 簡單字符串:以”+” 開始, 如:”+OK\r\n”
    • 錯誤:以”-” 開始,如:”-ERR Invalid Synatx\r\n”
    • 整數:以”:”開始,如:”:1\r\n”
    • 字符串:以 $ 開始
    • 數組:以 * 開始

    Bulk String有兩行,第一行為 $+正文長度,第二行為實際內容。如:

    $3\r\nSET\r\n

    Bulk String 是二進制安全的可以包含任意字節,就是說可以在 Bulk String 內部包含 “\r\n” 字符(行尾的CRLF被隱藏):

    $4
    a\r\nb

    $-1 表示 nil, 比如使用 get 命令查詢一個不存在的key時,響應即為$-1

    Array 格式第一行為 “*”+數組長度,其後是相應數量的 Bulk String。如, ["foo", "bar"]的報文:

    *2
    $3
    foo
    $3
    bar

    客戶端也使用 Array 格式向服務端發送指令。命令本身將作為第一個參數,如 SET key value指令的RESP報文:

    *3
    $3
    SET
    $3
    key
    $5
    value

    將換行符打印出來:

    *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

    協議解析器

    我們在 一文中已經介紹過TCP服務器的實現,協議解析器將實現其 Handler 接口充當應用層服務器。

    協議解析器將接收 Socket 傳來的數據,並將其數據還原為 [][]byte 格式,如 "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n" 將被還原為 ['SET', 'key', 'value']

    本文完整代碼:

    來自客戶端的請求均為數組格式,它在第一行中標記報文的總行數並使用CRLF作為分行符。

    bufio 標準庫可以將從 reader 讀到的數據緩存到 buffer 中,直至遇到分隔符或讀取完畢后返回,所以我們使用 reader.ReadBytes('\n') 來保證每次讀取到完整的一行。

    需要注意的是RESP是二進制安全的協議,它允許在正文中使用CRLF字符。舉例來說 Redis 可以正確接收並執行SET "a\r\nb" 1指令, 這條指令的正確報文是這樣的:

    *3  
    $3
    SET
    $4
    a\r\nb 
    $7
    myvalue

    ReadBytes 讀取到第五行 “a\r\nb\r\n”時會將其誤認為兩行:

    *3  
    $3
    SET
    $4
    a  // 錯誤的分行
    b // 錯誤的分行
    $7
    myvalue

    因此當讀取到第四行$4后, 不應該繼續使用 ReadBytes('\n') 讀取下一行, 應使用 io.ReadFull(reader, msg) 方法來讀取指定長度的內容。

    msg = make([]byte, 4 + 2) // 正文長度4 + 換行符長度2
    _, err = io.ReadFull(reader, msg)

    定義 Client 結構體作為客戶端抽象:

    type Client struct {
        /* 與客戶端的 Tcp 連接 */
        conn   net.Conn
    
        /* 
         * 帶有 timeout 功能的 WaitGroup, 用於優雅關閉
         * 當響應被完整發送前保持 waiting 狀態, 阻止鏈接被關閉
         */
        waitingReply wait.Wait
    
        /* 標記客戶端是否正在發送指令 */ 
        sending atomic.AtomicBool
        
        /* 客戶端正在發送的參數數量, 即 Array 第一行指定的數組長度 */
        expectedArgsCount uint32
        
        /* 已經接收的參數數量, 即 len(args)*/ 
        receivedCount uint32
        
        /*
         * 已經接收到的命令參數,每個參數由一個 []byte 表示
         */
        args [][]byte
    }

    定義解析器:

    type Handler struct {
    
        /* 
         * 記錄活躍的客戶端鏈接 
         * 類型為 *Client -> placeholder 
         */
        activeConn sync.Map 
    
        /* 數據庫引擎,執行指令並返回結果 */
        db db.DB
    
        /* 關閉狀態標誌位,關閉過程中時拒絕新建連接和新請求 */
        closing atomic.AtomicBool 
    }

    接下來可以編寫主要部分了:

    func (h *Handler)Handle(ctx context.Context, conn net.Conn) {
        if h.closing.Get() {
            // 關閉過程中不接受新連接
            _ = conn.Close()
        }
    
        /* 初始化客戶端狀態 */
        client := &Client {
            conn:   conn,
        }
        h.activeConn.Store(client, 1)
    
        reader := bufio.NewReader(conn)
        var fixedLen int64 = 0 // 將要讀取的 BulkString 的正文長度
        var err error
        var msg []byte
        for {
            /* 讀取下一行數據 */ 
            if fixedLen == 0 { // 正常模式下使用 CRLF 區分數據行
                msg, err = reader.ReadBytes('\n')
                // 判斷是否以 \r\n 結尾
                if len(msg) == 0 || msg[len(msg) - 2] != '\r' {
                    errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                    _, _ =  client.conn.Write(errReply.ToBytes())
                }
            } else { // 當讀取到 BulkString 第二行時,根據給出的長度進行讀取
                msg = make([]byte, fixedLen + 2)
                _, err = io.ReadFull(reader, msg)
                // 判斷是否以 \r\n 結尾
                if len(msg) == 0 || 
                  msg[len(msg) - 2] != '\r' ||  
                  msg[len(msg) - 1] != '\n'{
                    errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                    _, _ =  client.conn.Write(errReply.ToBytes())
                }
                // Bulk String 讀取完畢,重新使用正常模式
                fixedLen = 0 
            }
            // 處理 IO 異常
            if err != nil {
                if err == io.EOF || err == io.ErrUnexpectedEOF {
                    logger.Info("connection close")
                } else {
                    logger.Warn(err)
                }
                _ = client.Close()
                h.activeConn.Delete(client)
                return // io error, disconnect with client
            }
    
            /* 解析收到的數據 */
            if !client.sending.Get() { 
                // sending == false 表明收到了一條新指令
                if msg[0] == '*' {
                    // 讀取第一行獲取參數個數
                    expectedLine, err := strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
                    if err != nil {
                        _, _ = client.conn.Write(UnknownErrReplyBytes)
                        continue
                    }
                    // 初始化客戶端狀態
                    client.waitingReply.Add(1) // 有指令未處理完成,阻止服務器關閉
                    client.sending.Set(true) // 正在接收指令中
                    // 初始化計數器和緩衝區 
                    client.expectedArgsCount = uint32(expectedLine) 
                    client.receivedCount = 0
                    client.args = make([][]byte, expectedLine)
                } else {
                    // TODO: text protocol
                }
            } else {
                // 收到了指令的剩餘部分(非首行)
                line := msg[0:len(msg)-2] // 移除換行符
                if line[0] == '$' { 
                    // BulkString 的首行,讀取String長度
                    fixedLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
                    if err != nil {
                        errReply := &reply.ProtocolErrReply{Msg:err.Error()}
                        _, _ = client.conn.Write(errReply.ToBytes())
                    }
                    if fixedLen <= 0 {
                        errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                        _, _ = client.conn.Write(errReply.ToBytes())
                    }
                } else { 
                    // 收到參數
                    client.args[client.receivedCount] = line
                    client.receivedCount++
                }
    
    
                // 一條命令發送完畢
                if client.receivedCount == client.expectedArgsCount {
                    client.sending.Set(false)
    
                    // 執行命令並響應
                    result := h.db.Exec(client.args)
                    if result != nil {
                        _, _ = conn.Write(result.ToBytes())
                    } else {
                        _, _ = conn.Write(UnknownErrReplyBytes)
                    }
    
                    // 重置客戶端狀態,等待下一條指令
                    client.expectedArgsCount = 0
                    client.receivedCount = 0
                    client.args = nil
                    client.waitingReply.Done()
                }
            }
        }
    }

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

  • I/O多路復用模型

    背景

    在文章中提到了五種I/O模型,其中前四種:阻塞模型、非阻塞模型、信號驅動模型、I/O復用模型都是同步模型;還有一種是異步模型。

    想寫一個系列的文章,介紹從I/O多路復用到異步編程和RPC框架,整個演進過程,這一系列可能包括:

    1. Reactor和Proactor模型
    2. 為什麼需要異步編程
    3. enable_shared_from_this用法分析
    4. 網絡通信庫和RPC

    為什麼有多路復用?

    多路復用技術要解決的是“通信”問題,解決核心在於“同步事件分離器”(de-multiplexer),linux系統帶有的分離器select、poll、epoll網上介紹的比較多,大家可以看看這篇介紹的不錯的文章:。通信的一方想要知道另一方的狀態(以決定自己做什麼),有兩種方法: 一是輪詢,二是消息通知。

    輪詢

    輪詢的一種典型的實現可能是這樣的:當然這裏的epoll_wait()也可以使用poll()或者select()替換。

    whiletrue) {
        active_stream[] = epoll_wait(epollfd)
        for i in active_stream[] {
            read or write till
        }
    }

    輪詢方式主要存在以下不足:

    • 增加系統開銷。無論是任務輪詢還是定時器輪詢都需要消耗對應的系統資源。
    • 無法及時感知設備狀態變化。在輪詢間隔內的設備狀態變化只有在下次輪詢時才能被發現,這將無法滿足對實時性敏感的應用場合。
    • 浪費CPU資源。無論設備是否發生狀態改變,輪詢總在進行。在實際情況中,大多數設備的狀態改變通常不會那麼頻繁,輪詢空轉將白白浪費CPU時間片。

    消息通知

    其實現方式通常是: “阻塞-通知”機制。阻塞會導致一個任務(task_struct,進程或者線程)只能處理一個”I/O流”或者類似的操作,要處理多個,就要多個任務(需要多個進程或線程),因此靈活性上又不如輪詢(一個任務足夠),很矛盾。

     

    select、poll、epoll對比

    矛盾的根源就是”一”和”多”的矛盾: 希望一個任務處理多個對象,同時避免處理阻塞-通知機制的內部細節。解決方案是多路復用(muliplex)。多路復用有3種基本方案,select()/poll()/epoll(),都是來解決這一矛盾的。

    • 通知代理: 用戶把需要關心的對象註冊給select()/poll()/epoll()函數。
    • 一對多: 所有的被關心的對象,只要有一個對象有了通知事件,select()/poll()/epoll()就會結束阻塞狀態。
    • 方便性: 用戶(程序員)不用再關心如何阻塞和被通知,以及哪些情況下會有通知產生。這件事情已經由上述幾個系統調用做了,用戶只需要實現”通知來了我該做什麼”。

     

    那麼上面3個系統調用的區別是什麼呢?
    第一個select(),結合了輪詢和阻塞兩種方式,沒有問題,每次有一個對象事件發生的時候,select()只是知道有事件發生了,具體是哪個對象發生的,不知道,需要從頭到尾輪詢一遍,複雜度是O(n)。poll函數相對select函數變化不大,只是提升了最大的可輪詢的對象個數。epoll函數把時間複雜度降到O(1)。

     

    為什麼select慢而epoll效率高?
    select()之所以慢,有幾個原因: select()的參數是一個FD數組,意味着每次select調用,都是一次新的註冊-阻塞-回調,每次select都要把一個數組從用戶空間拷貝到內核空間,內核檢測到某個對象狀態變化並寫入后,再從內核空間拷貝回用戶空間,select再把這個數組讀取一遍,並返回。這個過程非常低效。

    epoll的解決方案相當於是一種對select()的算法優化: 它把select()一個函數做的事情分解成了3步,首先epoll_create()創建一個epollfd對象(相當於一個池子),然後所有被監聽的fd通過epoll_ctrl()註冊到這個池子,也就是為每個fd指定了一個內部的回調函數(這樣,就沒有了每次調用時的來回拷貝,用戶空間的數組到內核空間只有這一次拷貝)。epoll_wait阻塞等待。在內核態有一個和epoll_wait對應的函數調用,把就緒的fd,填入到一個就緒列表中,而epoll_wait讀取這個就緒列表,做到了快速返回(O(1))。

    詳細的對比可以參考select、poll、epoll之間的區別總結:

     

    有了上面的原理介紹,這裏舉例來說明下epoll到底是怎麼使用的,加深理解。舉兩個例子:

    一個是比較簡單的父子進程通信的例子,單個小程序,不需要跑多個應用實例,不需要用戶輸入。
    一個是比較實戰的socket+epoll,畢竟現實案例中哪有兩個父子進程間通訊這麼簡單的應用場景。

    有了多路復用,難道還不夠?

    有了I/O復用,有了epoll已經可以使服務器併發幾十萬連接的同時,維持高TPS了,難道這還不夠嗎?答案是,技術層面足夠了,但在軟件工程層面卻是不夠的。例如,總要有個for循環去調用epoll,總來處理epoll的返回,這是每次都要重複的工作。for循環體裏面寫什麼—-通知返回之後,做事情的程序最好能以一種回調的機制,提供一個編程框架,讓程序更有結構一些。另一方面,如果希望每個事件通知之後,做的事情能有機會被代理到某個線程裏面去單獨運行,而線程完成的狀態又能通知回主任務,那麼”異步”的進制就必須被引入。

    所以,還有兩個問題要解決,一是”編程框架”,一是”異步”。我們先看幾個目前流行的框架,大部分框架已經包含了某種異步的機制。我們接下來的篇章將介紹“編程框架”和“異步I/O模型”。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

    網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

    ※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

  • pod刪除主要流程源碼解析

    本文以v1.12版本進行分析

    當一個pod刪除時,client端向apiserver發送請求,apiserver將pod的deletionTimestamp打上時間。kubelet watch到該事件,開始處理。

    syncLoop

    kubelet對pod的處理主要都是在syncLoop中處理的。

    func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    for {
    ...
            if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
                break
            }
    ...

    與pod刪除主要在syncLoopIteration中需要關注的是以下這兩個。

    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
        syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
        select {
        case u, open := <-configCh:
    ...
            switch u.Op {
    ...
            case kubetypes.UPDATE:
                handler.HandlePodUpdates(u.Pods)
    ...
        case <-housekeepingCh:
            if !kl.sourcesReady.AllReady() {
            } else {
                if err := handler.HandlePodCleanups(); err != nil {
                    glog.Errorf("Failed cleaning pods: %v", err)
                }
            }
        }

    第一個是由於發送給apiserver的DELETE請求觸發的,增加了deletionTimestamp的事件。這裏對應於kubetypes.UPDATE。也就是會走到HandlePodUpdates函數。

    另外一個與delete相關的是每2s執行一次的來自於housekeepingCh的定時事件,用於清理pod,執行的是handler.HandlePodCleanups函數。這兩個作用不同,下面分別進行介紹。

    HandlePodUpdates

    先看HandlePodUpdates這個流程。只要打上了deletionTimestamp,就必然走到這個流程里去。

    func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
        for _, pod := range pods {
    ...
            kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
        }
    }

    在HandlePodUpdates中,進而將pod的信息傳遞到dispatchWork中處理。

    func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
        if kl.podIsTerminated(pod) {
            if pod.DeletionTimestamp != nil {
                kl.statusManager.TerminatePod(pod)
            }
            return
        }
        // Run the sync in an async worker.
        kl.podWorkers.UpdatePod(&UpdatePodOptions{
            Pod:        pod,
            MirrorPod:  mirrorPod,
            UpdateType: syncType,
            OnCompleteFunc: func(err error) {
    ...

    這裏首先通過判斷了kl.podIsTerminated(pod)判斷pod是不是已經處於了Terminated狀態。如果是的話,則不進行下面的kl.podWorkers.UpdatePod。

    func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
        status, ok := kl.statusManager.GetPodStatus(pod.UID)
        if !ok {
            status = pod.Status
        }
        return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
    }

    這個地方特別值得注意的是,並不是由了DeletionTimestamp就會認為是Terminated狀態,而是有DeletionTimestamp且所有的容器不在運行了。也就是說如果是一個正在正常運行的pod,是也會走到kl.podWorkers.UpdatePod中的。UpdatePod通過一系列函數調用,最終會通過異步的方式執行syncPod函數中進入到syncPod函數中。

    func (kl *Kubelet) syncPod(o syncPodOptions) error {
    ...
        if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
            var syncErr error
            if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
    ...

    在syncPod中,調用killPod從而對pod進行停止操作。

    killPod

    killPod是停止pod的主體。在很多地方都會使用。這裏主要介紹下起主要的工作流程。停止pod的過程主要發生在killPodWithSyncResult函數中。

    func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
        killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)
    ...
        for _, podSandbox := range runningPod.Sandboxes {
                if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil {
    ...

    killPodWithSyncResult的主要工作分為兩個部分。killContainersWithSyncResult負責將pod中的container停止掉,在停止后再執行StopPodSandbox。

    func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, reason string, gracePeriodOverride *int64) error {
        if err := m.internalLifecycle.PreStopContainer(containerID.ID); err != nil {
            return err
        }
    ...
        err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)

    killContainersWithSyncResult的主要工作是在killContainer中完成的,這裏可以看到,其中的主要兩個步驟是在容器中進行prestop的操作。待其成功后,進行container的stop工作。至此所有的應用容器都已經停止了。下一步是停止pause容器。而StopPodSandbox就是執行這一過程的。將sandbox,也就是pause容器停止掉。StopPodSandbox是在dockershim中執行的。

    func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) {
    ...
    if !hostNetwork && (ready || !ok) {
    ...
            err := ds.network.TearDownPod(namespace, name, cID, annotations)
    ...
        }
        if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil {

    StopPodSandbox中主要的部分是先進行網絡卸載,再停止相應的容器。在完成StopPodSandbox后,至此pod的所有容器都已經停止完成。至於volume的卸載,是在volumeManager中進行的。本文不做單獨介紹了。停止后的容器在pod徹底清理后,會被gc回收。這裏也不展開講了。

    HandlePodCleanups

    上面這個流程並不是刪除流程的全部。一個典型的情況就是,如果container都不是running,那麼在UpdatePod的時候都return了,那麼又由誰來處理呢?這裏我們回到最開始,就是那個每2s執行一次的HandlePodCleanups的流程。也就是說比如container處於crash,container正好不是running等情況,其實是在這個流程里進行處理的。當然HandlePodCleanups的作用不僅僅是清理not running的pod,再比如數據已經在apiserver中強制清理掉了,或者由於其他原因這個節點上還有一些沒有完成清理的pod,都是在這個流程中進行處理。

    func (kl *Kubelet) HandlePodCleanups() error {
    ... 
        for _, pod := range runningPods {
            if _, found := desiredPods[pod.ID]; !found {
                kl.podKillingCh <- &kubecontainer.PodPair{APIPod: nil, RunningPod: pod}
            }
        }

    runningPods是從cache中獲取節點現有的pod,而desiredPods則是節點上應該存在未被停止的pod。如果存在runningPods中有而desiredPods中沒有的pod,那麼它應該被停止,所以發送到podKillingCh中。

    func (kl *Kubelet) podKiller() {
    ...
        for podPair := range kl.podKillingCh {
    ...
    
            if !exists {
                go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
                    glog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
                    err := kl.killPod(apiPod, runningPod, nil, nil)
    ...
                }(apiPod, runningPod)
            }
        }
    }

    在podKiller流程中,會去接收來自podKillingCh的消息,從而執行killPod,上文已經做了該函數的介紹了。

    statusManager

    在最後,statusManager中的syncPod流程,將會進行檢測,通過canBeDeleted確認是否所有的容器關閉了,volume卸載了,cgroup清理了等等。如果這些全部完成了,則從apiserver中將pod信息徹底刪除。

    func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
    ...
        if m.canBeDeleted(pod, status.status) {
            deleteOptions := metav1.NewDeleteOptions(0)
            deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
            err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
    ...

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

  • MySQL 5.7 – 通過 BINLOG 恢複數據

    MySQL 5.7 – 通過 BINLOG 恢複數據

    日常開發,運維中,經常會出現誤刪數據的情況。誤刪數據的類型大致可分為以下幾類:

    • 使用 delete 誤刪行
    • 使用 drop table 或 truncate table 誤刪表
    • 使用 drop database 語句誤刪數據庫
    • 使用 rm 命令誤刪整個 MySQL 實例。

    不同的情況,都會有其優先的解決方案:

    • 針對誤刪行,可以通過 Flashback 工具將數據恢復
    • 針對誤刪表或庫,一般採用通過 BINLOG 將數據恢復。
    • 而對於誤刪 MySQL 實例,則需要我們搭建 HA 的 MySQL 集群,並保證我們的數據跨機房,跨城市保存。

    本篇主要討論的內容是誤刪表或者庫,會先介紹有關 BINLOG 的操作命令,然後會對誤刪表的這種情況進行實際的模擬。

    BINLOG 常見操作命令

    BINLOG 的查詢方式一般分為兩種,一種是進入 MySQL 控制台進行查詢,另一種是通過 MySQL 提供的工具 mysqlbinlog 進行查詢,兩者的不同會在下面介紹。

    通過 MySQL Cli 查詢 BINLOG 信息

    在 cli 中,常見的命令如下:

    # 查詢 BINLOG 格式
    show VARIABLES like 'binlog_format';
    
    # 查詢 BINLOG 位置
    show VARIABLES like 'datadir';
    
    # 查詢當前數據庫中 BINLOG 名稱及大小
    show binary logs;
    
    # 查看 master 正在寫入的 BINLOG 信息
    show master status\G;
    
    # 通過 offset 查看 BINLOG 信息
    show BINLOG events in 'mysql-bin.000034' limit 9000,  10;
    
    # 通過 position 查看 binlog 信息
    show BINLOG events in 'mysql-bin.000034' from 1742635 limit 10;

    使用 show BINLOG events 的問題:

    • 使用該命令時,如果當前 binlog 文件很大,而且沒有指定 limit,會引發對資源的過度消耗。因為 MySQL 客戶端需要將 binlog 的全部內容處理,返回並显示出來。為了防止這種情況,mysqlbinlog 工具是一個很好的選擇。

    通過 mysqlbinlog 查詢 BINLOG 信息

    在介紹 mysqlbinlog 工具使用前,先來看下 BINLOG 文件的內容:

    # 查詢 BINLOG 的信息
    mysqlbinlog  --no-defaults mysql-bin.000034 | less
    # at 141
    #100309  9:28:36 server id 123  end_log_pos 245
      Query thread_id=3350  exec_time=11  error_code=0
    • at 表示 offset 或者說事件開始的起始位置
    • 100309 9:28:36 server id 123 表示 server 123 開始執行事件的日期
    • end_log_pos 245 表示事件的結束位置 + 1,或者說是下一個事件的起始位置。
    • exec_time 表示在 master 上花費的時間,在 salve 上,記錄的時間是從 Master 記錄開始,一直到 Slave 結束完成所花費的時間。
    • rror_code=0 表示沒有錯誤發生。

    在大致了解 binlog 的內容后,mysqlbinlog 的用途有哪些?:

    • mysqlbinlog 可以作為代替 cli 讀取 binlog 的工具。
    • mysqlbinlog 可以將執行過的 SQL 語句輸出,用於數據的恢復或備份。

    查詢 BINLOG 日誌:

    # 查詢規定時候后發生的 BINLOG 日誌
    mysqlbinlog --no-defaults --base64-output=decode-rows -v  --start-datetime  "2019-11-22 14:00:00" --database sync_test  mysql-bin.000034 | less

    導出 BINLOG 日誌,用於分析和排查 sql 語句:

    mysqlbinlog --no-defaults --base64-output=decode-rows -v  --start-datetime  "2019-11-22 14:00:00" --database sync_test  mysql-bin.000034 > /home/mysql_backup/binlog_raw.sql

    導入 BINLOG 日誌

    # 通過 BINLOG 進行恢復。
    mysqlbinlog --start-position=1038 --stop-position=1164 --database=db_name  mysql-bin.000034 | mysql  -u cisco -p db_name
    
    # 通過 BINLOG 導出的 sql 進行恢復。
    mysql -u cisco -p db_name < binlog_raw.sql.sql

    mysqlbinlog 的常用參數:

    • --database 僅僅列出配置的數據庫信息
    • --no-defaults 讀取沒有選項的文件, 指定的原因是由於 mysqlbinlog 無法識別 BINLOG 中的 default-character-set=utf8 指令
    • --offset 跳過 log 中 N 個條目
    • --verbose 將日誌信息重建為原始的 SQL 陳述。
      • -v 僅僅解釋行信息
      • -vv 不但解釋行信息,還將 SQL 列類型的註釋信息也解析出來
    • --start-datetime 显示從指定的時間或之後的時間的事件。
      • 接收 DATETIME 或者 TIMESTRAMP 格式。
    • --base64-output=decode-rows 將 BINLOG 語句中事件以 base-64 的編碼显示,對一些二進制的內容進行屏蔽。
      • AUTO 默認參數,自動显示 BINLOG 中的必要的語句
      • NEVER 不會显示任何的 BINLOG 語句,如果遇到必須显示的 BINLOG 語言,則會報錯退出。
      • DECODE-ROWS 显示通過 -v 显示出來的 SQL 信息,過濾到一些 BINLOG 二進制數據。

    MySQL Cli 和 mysqlbinlog 工具之間的比較

    如果想知道當前 MySQL 中正在寫入的 BINLOG 的名稱,大小等基本信息時,可以通過 Cli 相關的命令來查詢。

    但想查詢,定位,恢復 BINLOG 中具體的數據時,要通過 mysqlbinlog 工具,因為相較於 Cli 來說,mysqlbinlog 提供了 --start-datetime--stop-position 等這樣更為豐富的參數供我們選擇。這時 Cli 中 SHOW BINLOG EVENTS 的簡要語法就變得相形見絀了。

    使用 BINLOG 恢複數據

    恢復的大致流程如下:

    1. 會創建數據庫和表,並插入數據。
    2. 誤刪一條數據。
    3. 繼續插入數據。
    4. 誤刪表。
    5. 最後將原來以及之後插入的數據進行恢復。

    準備數據

    準備數據庫,表及數據:

    # 創建臨時數據庫
    CREATE DATABASE IF NOT EXISTS test_binlog default charset utf8 COLLATE utf8_general_ci; 
    
    
    # 創建臨時表
    CREATE TABLE `sync_test` (`id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    
    # 添加數據
    insert into sync_test (id, name) values (null, 'xiaoa');
    insert into sync_test (id, name) values (null, 'xiaob');
    insert into sync_test (id, name) values (null, 'xiaoc');
    
    # 查看添加的數據
    select * from sync_test;

    刪除表或者數據

    誤刪操作:

    # 刪除 name=xiaob 的數據
    delete from sync_test where id=3
    
    # 插入幾條數據
    insert into sync_test (id, name) values (null, 'xiaod');
    insert into sync_test (id, name) values (null, 'xiaoe');
    insert into sync_test (id, name) values (null, 'xiaof');
    
    # 刪除表
    DROP TABLE sync_test;

    數據的恢復

    在執行數據恢復前,如果操作的是生產環境,會有如下的建議:

    • 使用 flush logs 命令,替換當前主庫中正在使用的 binlog 文件,好處如下:
      • 可將誤刪操作,定位在一個 BINLOG 文件中,便於之后的數據分析和恢復。
      • 避免操作正在被使用的 BINLOG 文件,防止發生意外情況。
    • 數據的恢復不要在生產庫中執行,先在臨時庫恢復,確認無誤后,再倒回生產庫。防止對數據的二次傷害。

    通常來說,恢復主要有兩個步驟:

    1. 在臨時庫中,恢復定期執行的全量備份數據。
    2. 然後基於全量備份的數據點,通過 BINLOG 來恢復誤操作和正常的數據。

    使用 BINLOG 做數據恢復前:

    # 查看正在使用的 Binlog 文件
    show master status\G;
    # 显示結果是: mysql-bin.000034
    
    # 執行 flush logs 操作,生成新的 BINLOG
    flush logs;
    
    # 查看正在使用的 Binlog 文件
    show master status\G;
    # 結果是:mysql-bin.000035

    確定恢複數據的步驟:

    這裏主要是有兩條誤刪的操作,數據行的誤刪和表的誤刪。有兩種方式進行恢復。

    • 方式一:首先恢復到刪除表操作之前的位置,然後再單獨恢復誤刪的數據行。
    • 方式二:首先恢復到誤刪數據行的之前的位置,然後跳過誤刪事件再恢複數據表操作之前的位置。

    這裏採用方式一的方案進行演示,由於是演示,就不額外找一個臨時庫進行全量恢復了,直接進行操作。

    查詢創建表的事件位置和刪除表的事件位置

    #  根據時間確定位置信息
    mysqlbinlog --no-defaults --base64-output=decode-rows -v  --start-datetime  "2019-11-22 14:00:00" --database test_binlog  mysql-bin.000034 | less

    創建表的開始位置:

    刪除表的結束位置:

    插入 name=’xiaob’ 的位置:

    # 根據位置導出 SQL 文件
    mysqlbinlog --no-defaults --base64-output=decode-rows -v --start-position "2508132" --stop-position "2511004" --database test_binlog  mysql-bin.000034 > /home/mysql_backup/test_binlog_step1.sql
     
     
    mysqlbinlog --no-defaults --base64-output=decode-rows -v --start-position "2508813" --stop-position "2509187" --database test_binlog  mysql-bin.000034 > /home/mysql_backup/test_binlog_step2.sql
     
    
    # 使用 mysql 進行恢復
    mysql -u cisco -p < /home/mysql_backup/test_binlog_step1.sql
    mysql -u cisco -p < /home/mysql_backup/test_binlog_step2.sql

    MySQL 5.7 中無論是否打開 GTID 的配置,在每次事務開啟時,都首先會出 GTID 的一個事務,用於并行複製。所以在確定導出開始事務位置時,要算上這個事件。

    在使用 –stop-position 導出時,會導出在指定位置的前一個事件,所以這裏要推后一個事務。

    對於 DML 的語句,主要結束位置要算上 COMMIT 的位置。

    總結

    在文章開始時,我們熟悉了操作 BINLOG 的兩種方式 CLI 和 mysqlbinlog 工具,接着介紹了其間的區別和使用場景,對於一些大型的 BINLOG 文件,使用 mysqlbinlog 會更加的方便和效率。並對 mysqlbinlog 的一些常見參數進行了介紹。

    接着通過使用 mysqlbinlog 實際模擬了數據恢復的過程,並在恢複數據時,提出了一些需要注意的事項,比如 flush logs 等。

    最後在恢複數據時,要注意 start-positionend-position 的一些小細節,來保證找到合適的位置。

    參考

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    ※為什麼 USB CONNECTOR 是電子產業重要的元件?

    網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

    ※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

    ※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

  • 網絡相關的命令工具研究報告

    網絡相關的命令工具研究報告

    主機配置:DHCP

      DHCP(動態主機配置協議),是在一台主機啟動后,第一個運行的客戶/服務器應用程序。換言之,當一台主機啟動后,如果它認為自己當前應當連接到因特網上,但又不知道自己的IP地址時,DHCP就以引導程序的身份發揮作用。

      每個連接到TCP/IP互聯網的計算機都必須知道自己的IP地址、一個路由器的IP地址、一個名字服務器的IP地址以及自己的子網掩碼這四種信息。

     DHCP分組格式:

     

     

    一、曾經使用過的協議

      在DHCP成為正式的主機配置協議之前,還有過一些其他的協議。

    1.RARP:

      在因特網時代的初期,人們曾設計了一個稱為逆地址解析協議(Reverse Address Resolution Protocol,RARP)來向被引導的主機提供IP地址。實際上,RARP是ARP的一個版本。ARP將一個IP地址映射為一個物理地址,而RARP則將一個物理地址映射成為一個IP地址。但是RARP已經被淘汰了,原因有兩個:首先,RARP利用了數據鏈路層的廣播服務,這也就表示每個網絡上都必須存在一台RARP服務器。第二,RARP只能提供計算機的IP地址,但如今的計算機需要前面提到的所有四種信息。

    2.BOOTP:

      引導程序協議(BOOTstrap Protocol,BOOTP)是DHCP的先驅。它是一個客戶/服務器協議,被設計用來克服RARP協議存在的缺陷。但是BOOTP是一個靜態配置協議,當客戶請求自己的IP地址時,BOOTP服務器就諮詢一張表,將客戶的物理地址映射成相應的IP地址。這就意味着客戶的物理地址和IP地址之間的綁定是已經存在的。這個綁定關係是事先設定好的。

      在某些場合,我們需要的是一個動態配置協議。例如,當一台主機從一個物理網絡移動到另一個物理網絡時,它的物理地址就改變了。再比如,有時候主機需要在某一段時間內使用一個臨時的IP地址。BOOTP無法處理這種狀況,因為物理地址和IP地址之間的綁定是靜態的,是固定存放在一張表中的,除非管理員更改這張表。

      而DHCP的設計就是為了解決這些不足之處。

    3. DHCP:

      動態主機配置協議(Dynamic Host Configuration Protocol,DHCP)是一種客戶/服務器協議,設計這個協議是為了將上述四種信息傳遞給無盤計算機或者第一次啟動的計算機。DHCP是BOOTP的繼承者,並且能夠兼容BOOTP。 

    二、DHCP操作

      DHCP客戶和DHCP服務器可以在同一個網絡上,也可以位於不同的網絡。

    1.DHCP客戶和DHCP服務器在同一個網絡

      雖然這種情況不是很常見,不過管理員可以把客戶和服務器放在同一個網絡中。如圖所示:

     

    這種情況的操作如下:

    (1)DHCP服務器在UDP端口67發出被動打開命令,等待客戶請求。

    (2)被引導的客戶在UDP端口68發出主動打開命令。這個報文被封裝成UDP用戶報,其目的端口是67,源端口號是68。這個UDP用戶數據報在封裝成IP數據包。客戶使用的是全0的源地址和全1的目的地址。

    (3)服務器或者用廣播報文,或者用單播報文來響應這個用戶,它使用了UDP源端口號67和目的端口68.這個響應可以是單播的,因為服務器知道客戶的IP地址,同時也知道客戶的物理地址,也就是說它不需要使用ARP的服務進行從邏輯地址到物理地址的映射。但是某些系統不允許旁路掉ARP,結果就要使用廣播地址。

    2. DHCP客戶和DHCP服務器在不同的網絡

    如圖所示:

     
      像其他應用層的進程一樣,客戶可以在某個網絡上,而服務器可以在相隔好幾個網絡之外的另一網絡上。這就帶來了一個必須要解決的問題。DHCP請求是廣播發送的,因為客戶不知道服務器的IP地址。而廣播的IP數據報不能通過任何路由器。路由器收到這樣的分組就丟棄它。

      要解決這個問題,就需要一个中介物。某台主機(或是一台能夠配置為在應用層工作的路由器)可以用來充當中繼。在這種情況下,該主機就稱為中繼代理。中繼代理知道DHCP服務器的單播地址,並在端口67監聽廣播報文。當它收到這種類型的分組后,就把它封裝成一個單播數據報,並且把此請求發送給DHCP服務器。攜帶了單播目的地址的分組可以被任何一個路由器轉發,最終到達DHCP服務器。DHCP服務器知道這個報文來自中繼代理,因為在請求報文中有一個字段定義了中繼代理的IP地址。中繼代理在收到回答后,再把它發送給DHCP客戶。 

    三、配置

      人們設計DHCP是為了提供靜態和動態的地址分配。

    1.靜態地址分配

      對於靜態地址分配,DHCP有一個專門的數據庫,可以靜態地吧物理地址綁定到IP地址。

    2.動態地址分配

      DHCP還有第二個數據庫,包括一個可用的IP地址池。第二個數據庫使DHCP成為動態的。當DHCP客戶請求臨時的IP地址時,DHCP服務器就從可用(即為使用的)IP地址池中取出一個IP地址進行指派,這個IP地址的使用時間長短可協商。

      當DHCP客戶想DHCP服務器發送請求是,服務器首先檢查它的靜態數據庫。若靜態數據庫中存在所請求物理地址的表項,則返回給這個客戶的永久IP地址。反之,若靜態數據庫中沒有這個表項,服務器就從可用IP地址池中選擇一個IP地址,並把這個地址指派給客戶,然後再把相應的表項加入到動態數據庫中。

      如果主機要從一個網絡移動到另一個網絡,或者與一個網絡時連時斷,那麼DHCP的這種動態特性就有了用武之地。DHCP可以在有限時間內提供一個臨時的IP地址。

    從地址池指派的地址都是臨時地址。DHCP服務器向客戶授予某一段時間內對該地址池的租用權。當租用時效過期,客戶或者停止使用這個IP地址,或者續租。服務器有權力選擇同意或不同意續租。若服務器不同意,客戶就停止使用這個地址。

    3.轉換狀態

      為了提供動態的地址分配,DHCP客戶可以像狀態機那樣從一個狀態轉換到另一個狀態,狀態轉換取決於收到的報文和發送的報文。在這種情況下,報文的類型是由包含在DHCP分組中的標記為53的選項來定義的。標記為53 的選項如圖所示:

     

    DHCP的不同狀態:

    (1)INIT狀態

      當DHCP客戶首次啟動時,它處於INIT狀態(初始化狀態)。客戶使用端口67廣播DHCPDISCOVER報文(一個帶有DHCPDISCOVER選項的請求報文)。

    (2)SELECTING狀態

      在發送DHCPDISCOVER報文後,客戶就進入SELECTING(選擇)狀態。能夠提供這種類型服務的服務器要用DHCPOFFER報文進行相應。在此類報文中,服務器提供了一個IP地址。它們還要提供租用時間長度,其默認值是1小時。在發送DHCPOFFER報文的服務器,把提供的IP地址鎖定,使這個地址不會再提供給任何其他的客戶。客戶選擇所提供的地址中的一個,並向所選擇的服務器發送DHCPREQUEST報文。然後就進入REQUESTING狀態。如果客戶沒有收到DHCPOFFER報文,它還要再嘗試四次,每一次間隔2秒。如果對這些DHCPDISCOVER都沒有收到回答,客戶就睡眠5分鐘后再試。

    (3)REQUESTING狀態

      客戶保持在這個REQUESTING(請求)狀態,直至它收到來自服務器的DHCPACK報文為止,這個報文創建了客戶物理地址和它的IP地址之間的綁定。客戶收到DHCPACK報文後進入BOUND狀態。

    (4)BOUND狀態

      在這種狀態下,客戶可以使用該IP地址,直到租用時間到期。當到達租用時間的50%時,客戶就再發送一個DHCPREQUEST報文以請求更新。於是,客戶進入RENEWING。在BOUND(綁定)狀態時,客戶也可以取消租用,並進入到初始化狀態。

    (5)RENEWING狀態

      客戶保持在RENEWING(更新)狀態,直至下面兩個事件之一發生。客戶可以收到更新租用協定的DHCPACK報文。在這種情況下,客戶把計時器複位,然後回到BOUND狀態。或者,如果沒有收到DHCPACK,同時到達租用時間的87.5%時,客戶就進入REBINDING狀態。

    (6)REBINDING狀態

      客戶保持在REBINDING(重新綁定)狀態,直至下面三個事件之一發生。若客戶收到一個DHCPNACK報文或者租用時間到期,則回到初始化狀態,並嘗試得到另一個IP地址。若客戶收到DHCPACK報文,它就進入綁定狀態,並把計時器複位。

    DHCP不同狀態的轉換圖:

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

    ※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

    ※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

  • MySQL 複製表結構和表數據

    1、前言

      在功能開發完畢,在本地或者測試環境進行測試時,經常會遇到這種情況:有專門的測試數據,測試過程會涉及到修改表中的數據,經常不能一次測試成功,所以,每次執行測試后,原來表中的數據其實已經被修改了,下一次測試,就需要將數據恢復。

      我一般的做法是:先創建一個副本表,比如測試使用的user表,我在測試前創建副本表user_bak,每次測試后,將user表清空,然後將副本表user_bak的數據導入到user表中。

      上面的操作是對一個table做備份,如果涉及到的table太多,可以創建database的副本。

      接下來我將對此處的表結構複製以及表數據複製進行闡述,並非數據庫的複製原理!!!!

      下面是staff表的表結構

    create table staff (
    	id int not null auto_increment comment '自增id',
    	name char(20) not null comment '用戶姓名',
    	dep char(20) not null comment '所屬部門',
    	gender tinyint not null default 1 comment '性別:1男; 2女',
    	addr char(30) not null comment '地址',
    	primary key(id),
    	index idx_1 (name, dep),
    	index idx_2 (name, gender)
    ) engine=innodb default charset=utf8mb4 comment '員工表';
    

     

    2、具體方式 

      2.1、執行舊錶的創建SQL來創建表

      如果原始表已經存在,那麼可以使用命令查看該表的創建語句:

    mysql> show create table staff\G
    *************************** 1. row ***************************
           Table: staff
    Create Table: CREATE TABLE `staff` (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
      `name` char(20) NOT NULL COMMENT '用戶姓名',
      `dep` char(20) NOT NULL COMMENT '所屬部門',
      `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
      `addr` char(30) NOT NULL,
      PRIMARY KEY (`id`),
      KEY `idx_1` (`name`,`dep`),
      KEY `idx_2` (`name`,`gender`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='員工表'
    1 row in set (0.01 sec)
    

      可以看到,上面show creat table xx的命令執行結果中,Create Table的值就是創建表的語句,此時可以直接複製創建表的SQL,然後重新執行一次就行了。

      當數據表中有數據的時候,看到的創建staff表的sql就會稍有不同。比如,我在staff中添加了兩條記錄:

    mysql> insert into staff values (null, '李明', 'RD', 1, '北京');
    Query OK, 1 row affected (0.00 sec)
    
    mysql> insert into staff values (null, '張三', 'PM', 0, '上海');
    Query OK, 1 row affected (0.00 sec)
    
    mysql> select * from staff;
    +----+--------+-----+--------+--------+
    | id | name   | dep | gender | addr   |
    +----+--------+-----+--------+--------+
    |  1 | 李明   | RD  |      1 | 北京   |
    |  2 | 張三   | PM  |      0 | 上海   |
    +----+--------+-----+--------+--------+
    2 rows in set (0.00 sec)
    

      此時在執行show create table命令:

    mysql> show create table staff\G
    *************************** 1. row ***************************
           Table: staff
    Create Table: CREATE TABLE `staff` (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
      `name` char(20) NOT NULL COMMENT '用戶姓名',
      `dep` char(20) NOT NULL COMMENT '所屬部門',
      `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
      `addr` char(30) NOT NULL,
      PRIMARY KEY (`id`),
      KEY `idx_1` (`name`,`dep`),
      KEY `idx_2` (`name`,`gender`)
    ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT='員工表'
    1 row in set (0.00 sec)
    

      注意,上面結果中的倒數第二行

        ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT=’員工表’

      因為staff表的id是自增的,且已經有了2條記錄,所以下一次插入數據的自增id應該為3,這個信息,也會出現在表的創建sql中。

        

      2.2、使用like創建新表(僅包含表結構)

      使用like根據已有的表來創建新表,特點如下:

      1、方便,不需要查看原表的表結構定義信息;

      2、創建的新表中,表結構定義、完整性約束,都與原表保持一致。

      3、創建的新表是一個空表,全新的表,沒有數據。

      用法如下:

    mysql> select * from staff;  #舊錶中已有2條數據
    +----+--------+-----+--------+--------+
    | id | name   | dep | gender | addr   |
    +----+--------+-----+--------+--------+
    |  1 | 李明   | RD  |      1 | 北京   |
    |  2 | 張三   | PM  |      0 | 上海   |
    +----+--------+-----+--------+--------+
    2 rows in set (0.00 sec)
    
    mysql> create table staff_bak_1 like staff;  # 直接使用like,前面指定新表名,後面指定舊錶(參考的表)
    Query OK, 0 rows affected (0.02 sec)
    
    mysql> show create table staff_bak_1\G
    *************************** 1. row ***************************
           Table: staff_bak_1
    Create Table: CREATE TABLE `staff_bak_1` (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
      `name` char(20) NOT NULL COMMENT '用戶姓名',
      `dep` char(20) NOT NULL COMMENT '所屬部門',
      `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
      `addr` char(30) NOT NULL,
      PRIMARY KEY (`id`),
      KEY `idx_1` (`name`,`dep`),
      KEY `idx_2` (`name`,`gender`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='員工表'  # 注意沒有AUTO_INCREMENT=3
    1 row in set (0.00 sec)
    
    mysql> select * from staff_bak_1; # 沒有包含舊錶的數據
    Empty set (0.00 sec)
    

      

      2.3、使用as來創建新表(包含數據)

      使用as來創建新表,有一下特點:

      1、可以有選擇性的決定新表包含哪些字段;

      2、創建的新表中,會包含舊錶的數據;

      3、創建的新表不會包含舊錶的完整性約束(比如主鍵、索引等),僅包含最基礎的表結構定義。

      用法如下:

    mysql> create table staff_bak_2 as select * from staff;
    Query OK, 2 rows affected (0.02 sec)
    Records: 2  Duplicates: 0  Warnings: 0
    
    mysql> select * from staff_bak_2;
    +----+--------+-----+--------+--------+
    | id | name   | dep | gender | addr   |
    +----+--------+-----+--------+--------+
    |  1 | 李明   | RD  |      1 | 北京   |
    |  2 | 張三   | PM  |      0 | 上海   |
    +----+--------+-----+--------+--------+
    2 rows in set (0.00 sec)
    
    mysql> show create table staff_bak_2\G
    *************************** 1. row ***************************
           Table: staff_bak_2
    Create Table: CREATE TABLE `staff_bak_2` (
      `id` int(11) NOT NULL DEFAULT '0' COMMENT '自增id',
      `name` char(20) CHARACTER SET utf8mb4 NOT NULL COMMENT '用戶姓名',
      `dep` char(20) CHARACTER SET utf8mb4 NOT NULL COMMENT '所屬部門',
      `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
      `addr` char(30) CHARACTER SET utf8mb4 NOT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1
    1 row in set (0.00 sec)
    

      

      利用as創建表的時候沒有保留完整性約束,其實這個仔細想一下也能想明白。因為使用as創建表的時候,可以指定新表包含哪些字段呀,如果你創建新表時,忽略了幾個字段,這樣的話即使保留了完整約束,保存數據是也不能滿足完整性約束。

      比如,staff表有一個索引idx1,由name和dep字段組成;但是我創建的新表中,沒有name和dep字段(只選擇了其他字段),那麼新表中保存idx1也沒有必要,對吧。

    mysql> --  只選擇id、gender、addr作為新表的字段,那麼name和dep組成的索引就沒必要存在了
    mysql> create table staff_bak_3 as (select id, gender, addr from staff);
    Query OK, 2 rows affected (0.02 sec)
    Records: 2  Duplicates: 0  Warnings: 0
    
    mysql> show create table staff_bak_3\G
    *************************** 1. row ***************************
           Table: staff_bak_3
    Create Table: CREATE TABLE `staff_bak_3` (
      `id` int(11) NOT NULL DEFAULT '0' COMMENT '自增id',
      `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
      `addr` char(30) CHARACTER SET utf8mb4 NOT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1
    1 row in set (0.00 sec)
    
    mysql> select * from staff_bak_3;
    +----+--------+--------+
    | id | gender | addr   |
    +----+--------+--------+
    |  1 |      1 | 北京   |
    |  2 |      0 | 上海   |
    +----+--------+--------+
    2 rows in set (0.00 sec)
    

      

      2.4、使用like+insert+select創建原表的副本(推薦)

      使用like創建新表,雖然保留了舊錶的各種表結構定義以及完整性約束,但是如何將舊錶的數據導入到新表中呢?

      最極端的方式:寫一個程序,先將舊錶數據讀出來,然後寫入到新表中,這個方式我就不嘗試了。

      有一個比較簡單的命令:

    mysql> select * from staff; #原表數據
    +----+--------+-----+--------+--------+
    | id | name   | dep | gender | addr   |
    +----+--------+-----+--------+--------+
    |  1 | 李明   | RD  |      1 | 北京   |
    |  2 | 張三   | PM  |      0 | 上海   |
    +----+--------+-----+--------+--------+
    2 rows in set (0.00 sec)
    
    mysql> select * from staff_bak_1; # 使用like創建的表,與原表相同的表結構和完整性約束(自增除外)
    Empty set (0.00 sec)
    
    mysql> insert into staff_bak_1 select * from staff;  # 將staff表的所有記錄的所有字段值都插入副本表中
    Query OK, 2 rows affected (0.00 sec)
    Records: 2  Duplicates: 0  Warnings: 0
    
    mysql> select * from staff_bak_1;
    +----+--------+-----+--------+--------+
    | id | name   | dep | gender | addr   |
    +----+--------+-----+--------+--------+
    |  1 | 李明   | RD  |      1 | 北京   |
    |  2 | 張三   | PM  |      0 | 上海   |
    +----+--------+-----+--------+--------+
    2 rows in set (0.00 sec)
    

      

      其實這條SQL語句,是知道兩個表的表結構和完整性約束相同,所以,可以直接select *。

    insert into staff_bak_1 select * from staff;
    

      

      如果兩個表結構不相同,其實也是可以這個方式的,比如:

    mysql> show create table demo\G
    *************************** 1. row ***************************
           Table: demo
    Create Table: CREATE TABLE `demo` (
      `_id` int(11) NOT NULL AUTO_INCREMENT,
      `_name` char(20) DEFAULT NULL,
      `_gender` tinyint(4) DEFAULT '1',
      PRIMARY KEY (`_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
    1 row in set (0.00 sec)
    
    # 只將staff表中的id和name字段組成的數據記錄插入到demo表中,對應_id和_name字段
    mysql> insert into demo (_id, _name) select id,name from staff;
    Query OK, 2 rows affected (0.00 sec)
    Records: 2  Duplicates: 0  Warnings: 0
    
    mysql> select * from demo;
    +-----+--------+---------+
    | _id | _name  | _gender |
    +-----+--------+---------+
    |   1 | 李明   |       1 |
    |   2 | 張三   |       1 |
    +-----+--------+---------+
    2 rows in set (0.00 sec)
    

      這是兩個表的字段數量不相同的情況,此時需要手動指定列名,否則就會報錯

     

      另外,如果兩個表的字段數量,以及相同順序的字段類型相同,如果是全部字段複製,即使字段名不同,也可以直接複製

    # staff_bak_5的字段名與staff表並不相同,但是字段數量、相同順序字段的類型相同,所以可以直接插入
    mysql> show create table staff_bak_5\G
    *************************** 1. row ***************************
           Table: staff_bak_5
    Create Table: CREATE TABLE `staff_bak_5` (
      `_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
      `_name` char(20) NOT NULL COMMENT '用戶姓名',
      `_dep` char(20) NOT NULL COMMENT '所屬部門',
      `_gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
      `_addr` char(30) NOT NULL,
      PRIMARY KEY (`_id`),
      KEY `idx_1` (`_name`,`_dep`),
      KEY `idx_2` (`_name`,`_gender`)
    ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT='員工表'
    1 row in set (0.00 sec)
    
    mysql> insert into staff_bak_5 select * from staff;
    Query OK, 2 rows affected (0.00 sec)
    Records: 2  Duplicates: 0  Warnings: 0
    
    mysql> select * from staff_bak_5;
    +-----+--------+------+---------+--------+
    | _id | _name  | _dep | _gender | _addr  |
    +-----+--------+------+---------+--------+
    |   1 | 李明   | RD   |       1 | 北京   |
    |   2 | 張三   | PM   |       0 | 上海   |
    +-----+--------+------+---------+--------+
    2 rows in set (0.00 sec)
    

      

      

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

  • 源碼分析RocketMQ消息軌跡

    源碼分析RocketMQ消息軌跡

    目錄

    本文沿着的思路,從如下3個方面對其源碼進行解讀:

    1. 發送消息軌跡
    2. 消息軌跡格式
    3. 存儲消息軌跡數據

    @(本節目錄)

    1、發送消息軌跡流程

    首先我們來看一下在消息發送端如何啟用消息軌跡,示例代碼如下:

    public class TraceProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);      // @1
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
            for (int i = 0; i < 10; i++)
                try {
                    {
                        Message msg = new Message("TopicTest",
                            "TagA",
                            "OrderID188",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                        SendResult sendResult = producer.send(msg);
                        System.out.printf("%s%n", sendResult);
                    }
    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            producer.shutdown();
        }
    }

    從上述代碼可以看出其關鍵點是在創建DefaultMQProducer時指定開啟消息軌跡跟蹤。我們不妨瀏覽一下DefaultMQProducer與啟用消息軌跡相關的構造函數:

    public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
    public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)

    參數如下:

    • String producerGroup
      生產者所屬組名。
    • boolean enableMsgTrace
      是否開啟跟蹤消息軌跡,默認為false。
    • String customizedTraceTopic
      如果開啟消息軌跡跟蹤,用來存儲消息軌跡數據所屬的主題名稱,默認為:RMQ_SYS_TRACE_TOPIC。

    1.1 DefaultMQProducer構造函數

    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {      // @1
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
        //if client open the message trace feature
        if (enableMsgTrace) {                                                                                                                                                                                            // @2
            try {
                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);                                                         
                dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
                traceDispatcher = dispatcher;
                this.getDefaultMQProducerImpl().registerSendMessageHook(
                    new SendMessageTraceHookImpl(traceDispatcher));                                                                                                                             // @3
            } catch (Throwable e) {
                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
            }
        }
    }

    代碼@1:首先介紹一下其局部變量。

    • String producerGroup
      生產者所屬組。
    • RPCHook rpcHook
      生產者發送鈎子函數。
    • boolean enableMsgTrace
      是否開啟消息軌跡跟蹤。
    • String customizedTraceTopic
      定製用於存儲消息軌跡的數據。

    代碼@2:用來構建AsyncTraceDispatcher,看其名:異步轉發消息軌跡數據,稍後重點關注。

    代碼@3:構建SendMessageTraceHookImpl對象,並使用AsyncTraceDispatcher用來異步轉發。

    1.2 SendMessageTraceHookImpl鈎子函數

    1.2.1 SendMessageTraceHookImpl類圖

    1. SendMessageHook
      消息發送鈎子函數,用於在消息發送之前、發送之後執行一定的業務邏輯,是記錄消息軌跡的最佳擴展點。
    2. TraceDispatcher
      消息軌跡轉發處理器,其默認實現類AsyncTraceDispatcher,異步實現消息軌跡數據的發送。下面對其屬性做一個簡單的介紹:
      • int queueSize
        異步轉發,隊列長度,默認為2048,當前版本不能修改。
      • int batchSize
        批量消息條數,消息軌跡一次消息發送請求包含的數據條數,默認為100,當前版本不能修改。
      • int maxMsgSize
        消息軌跡一次發送的最大消息大小,默認為128K,當前版本不能修改。
      • DefaultMQProducer traceProducer
        用來發送消息軌跡的消息發送者。
      • ThreadPoolExecutor traceExecuter
        線程池,用來異步執行消息發送。
      • AtomicLong discardCount
        記錄丟棄的消息個數。
      • Thread worker
        woker線程,主要負責從追加隊列中獲取一批待發送的消息軌跡數據,提交到線程池中執行。
      • ArrayBlockingQueue< TraceContext> traceContextQueue
        消息軌跡TraceContext隊列,用來存放待發送到服務端的消息。
      • ArrayBlockingQueue< Runnable> appenderQueue
        線程池內部隊列,默認長度1024。
      • DefaultMQPushConsumerImpl hostConsumer
        消費者信息,記錄消息消費時的軌跡信息。
      • String traceTopicName
        用於跟蹤消息軌跡的topic名稱。

    1.2.2 源碼分析SendMessageTraceHookImpl

    1.2.2.1 sendMessageBefore
    public void sendMessageBefore(SendMessageContext context) { 
        //if it is message trace data,then it doesn't recorded
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {   // @1
            return;
        }
        //build the context content of TuxeTraceContext
        TraceContext tuxeContext = new TraceContext();
        tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
        context.setMqTraceContext(tuxeContext);
        tuxeContext.setTraceType(TraceType.Pub);
        tuxeContext.setGroupName(context.getProducerGroup());                                                                                                                       // @2
        //build the data bean object of message trace
        TraceBean traceBean = new TraceBean();                                                                                                                                                // @3
        traceBean.setTopic(context.getMessage().getTopic());
        traceBean.setTags(context.getMessage().getTags());
        traceBean.setKeys(context.getMessage().getKeys());
        traceBean.setStoreHost(context.getBrokerAddr());
        traceBean.setBodyLength(context.getMessage().getBody().length);
        traceBean.setMsgType(context.getMsgType());
        tuxeContext.getTraceBeans().add(traceBean);
    }

    代碼@1:如果topic主題為消息軌跡的Topic,直接返回。

    代碼@2:在消息發送上下文中,設置用來跟蹤消息軌跡的上下環境,裏面主要包含一個TraceBean集合、追蹤類型(TraceType.Pub)與生產者所屬的組。

    代碼@3:構建一條跟蹤消息,用TraceBean來表示,記錄原消息的topic、tags、keys、發送到broker地址、消息體長度等消息。

    從上文看出,sendMessageBefore主要的用途就是在消息發送的時候,先準備一部分消息跟蹤日誌,存儲在發送上下文環境中,此時並不會發送消息軌跡數據。

    1.2.2.2 sendMessageAfter
    public void sendMessageAfter(SendMessageContext context) {
        //if it is message trace data,then it doesn't recorded
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())     // @1
            || context.getMqTraceContext() == null) {
            return;
        }
        if (context.getSendResult() == null) {
            return;
        }
    
        if (context.getSendResult().getRegionId() == null
            || !context.getSendResult().isTraceOn()) {
            // if switch is false,skip it
            return;
        }
    
        TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
        TraceBean traceBean = tuxeContext.getTraceBeans().get(0);                                                                                                // @2
        int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());     // @3
        tuxeContext.setCostTime(costTime);                                                                                                                                      // @4
        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {                                                                    
            tuxeContext.setSuccess(true);
        } else {
            tuxeContext.setSuccess(false);
        }
        tuxeContext.setRegionId(context.getSendResult().getRegionId());                                                                                      
        traceBean.setMsgId(context.getSendResult().getMsgId());
        traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
        traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
        localDispatcher.append(tuxeContext);                                                                                                                                   // @5
    }

    代碼@1:如果topic主題為消息軌跡的Topic,直接返回。

    代碼@2:從MqTraceContext中獲取跟蹤的TraceBean,雖然設計成List結構體,但在消息發送場景,這裏的數據永遠只有一條,及時是批量發送也不例外。

    代碼@3:獲取消息發送到收到響應結果的耗時。

    代碼@4:設置costTime(耗時)、success(是否發送成功)、regionId(發送到broker所在的分區)、msgId(消息ID,全局唯一)、offsetMsgId(消息物理偏移量,如果是批量消息,則是最後一條消息的物理偏移量)、storeTime,這裏使用的是(客戶端發送時間 + 二分之一的耗時)來表示消息的存儲時間,這裡是一個估值。

    代碼@5:將需要跟蹤的信息通過TraceDispatcher轉發到Broker服務器。其代碼如下:

    public boolean append(final Object ctx) {
        boolean result = traceContextQueue.offer((TraceContext) ctx);
        if (!result) {
            log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
        }
        return result;
    }

    這裏一個非常關鍵的點是offer方法的使用,當隊列無法容納新的元素時會立即返回false,並不會阻塞。

    接下來將目光轉向TraceDispatcher的實現。

    1.3 TraceDispatcher實現原理

    TraceDispatcher,用於客戶端消息軌跡數據轉發到Broker,其默認實現類:AsyncTraceDispatcher。

    1.3.1 TraceDispatcher構造函數

    public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {    
        // queueSize is greater than or equal to the n power of 2 of value
        this.queueSize = 2048;
        this.batchSize = 100;
        this.maxMsgSize = 128000;                                        
        this.discardCount = new AtomicLong(0L);         
        this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
        this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
        if (!UtilAll.isBlank(traceTopicName)) {
            this.traceTopicName = traceTopicName;
        } else {
            this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
        }                   // @1
        this.traceExecuter = new ThreadPoolExecutor(// :
            10, //
            20, //
            1000 * 60, //
            TimeUnit.MILLISECONDS, //
            this.appenderQueue, //
            new ThreadFactoryImpl("MQTraceSendThread_"));
        traceProducer = getAndCreateTraceProducer(rpcHook);      // @2
    }

    代碼@1:初始化核心屬性,該版本這些值都是“固化”的,用戶無法修改。

    • queueSize
      隊列長度,默認為2048,異步線程池能夠積壓的消息軌跡數量。
    • batchSize
      一次向Broker批量發送的消息條數,默認為100.
    • maxMsgSize
      向Broker彙報消息軌跡時,消息體的總大小不能超過該值,默認為128k。
    • discardCount
      整個運行過程中,丟棄的消息軌跡數據,這裏要說明一點的是,如果消息TPS發送過大,異步轉發線程處理不過來時,會主動丟棄消息軌跡數據。
    • traceContextQueue
      traceContext積壓隊列,客戶端(消息發送、消息消費者)在收到處理結果后,將消息軌跡提交到噶隊列中,則會立即返回。
    • appenderQueue
      提交到Broker線程池中隊列。
    • traceTopicName
      用於接收消息軌跡的Topic,默認為RMQ_SYS_TRANS_HALF_TOPIC。
    • traceExecuter
      用於發送到Broker服務的異步線程池,核心線程數默認為10,最大線程池為20,隊列堆積長度2048,線程名稱:MQTraceSendThread_。、
    • traceProducer
      發送消息軌跡的Producer。

    代碼@2:調用getAndCreateTraceProducer方法創建用於發送消息軌跡的Producer(消息發送者),下面詳細介紹一下其實現。

    1.3.2 getAndCreateTraceProducer詳解

    private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
            DefaultMQProducer traceProducerInstance = this.traceProducer;
            if (traceProducerInstance == null) {  //@1
                traceProducerInstance = new DefaultMQProducer(rpcHook);
                traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
                traceProducerInstance.setSendMsgTimeout(5000);
                traceProducerInstance.setVipChannelEnabled(false);
                // The max size of message is 128K
                traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
            }
            return traceProducerInstance;
        }

    代碼@1:如果還未建立發送者,則創建用於發送消息軌跡的消息發送者,其GroupName為:_INNER_TRACE_PRODUCER,消息發送超時時間5s,最大允許發送消息大小118K。

    1.3.3 start

    public void start(String nameSrvAddr) throws MQClientException {
        if (isStarted.compareAndSet(false, true)) {     // @1
            traceProducer.setNamesrvAddr(nameSrvAddr);
            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
            traceProducer.start();
        }
        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);   // @2
        this.worker.setDaemon(true);
        this.worker.start();                                                                                   
        this.registerShutDownHook();
    }

    開始啟動,其調用的時機為啟動DefaultMQProducer時,如果啟用跟蹤消息軌跡,則調用之。

    代碼@1:如果用於發送消息軌跡的發送者沒有啟動,則設置nameserver地址,並啟動着。

    代碼@2:啟動一個線程,用於執行AsyncRunnable任務,接下來將重點介紹。

    1.3.4 AsyncRunnable

    class AsyncRunnable implements Runnable {
             private boolean stopped;
        public void run() {
            while (!stopped) {
                List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);     // @1
                for (int i = 0; i < batchSize; i++) {
                    TraceContext context = null;
                    try {
                        //get trace data element from blocking Queue — traceContextQueue
                        context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);        // @2
                    } catch (InterruptedException e) {
                    }
                    if (context != null) {
                        contexts.add(context);
                    } else {
                        break;
                    }
                }
                if (contexts.size() > 0) {                                                                               :
                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);  // @3
                    traceExecuter.submit(request);                                                               
                } else if (AsyncTraceDispatcher.this.stopped) {
                    this.stopped = true;
                }
            }
        }
    }

    代碼@1:構建待提交消息跟蹤Bean,每次最多發送batchSize,默認為100條。

    代碼@2:從traceContextQueue中取出一個待提交的TraceContext,設置超時時間為5s,即如何該隊列中沒有待提交的TraceContext,則最多等待5s。

    代碼@3:向線程池中提交任務AsyncAppenderRequest。

    1.3.5 AsyncAppenderRequest#sendTraceData

    public void sendTraceData(List<TraceContext> contextList) {
        Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
        for (TraceContext context : contextList) {        //@1
            if (context.getTraceBeans().isEmpty()) {
                continue;
            }
            // Topic value corresponding to original message entity content
            String topic = context.getTraceBeans().get(0).getTopic();     // @2
            // Use  original message entity's topic as key
            String key = topic;
            List<TraceTransferBean> transBeanList = transBeanMap.get(key);
            if (transBeanList == null) {
                transBeanList = new ArrayList<TraceTransferBean>();
                transBeanMap.put(key, transBeanList);
            }
            TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);    // @3
            transBeanList.add(traceData);
        }
        for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {       // @4
            flushData(entry.getValue());
        }
    }

    代碼@1:遍歷收集的消息軌跡數據。

    代碼@2:獲取存儲消息軌跡的Topic。

    代碼@3:對TraceContext進行編碼,這裡是消息軌跡的傳輸數據,稍後對其詳細看一下,了解其上傳的格式。

    代碼@4:將編碼后的數據發送到Broker服務器。

    1.3.6 TraceDataEncoder#encoderFromContextBean

    根據消息軌跡跟蹤類型,其格式會有一些不一樣,下面分別來介紹其合適。

    1.3.6.1 PUB(消息發送)
    case Pub: {
        TraceBean bean = ctx.getTraceBeans().get(0);
        //append the content of context and traceBean to transferBean's TransData
        sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
         .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
    }

    消息軌跡數據的協議使用字符串拼接,字段的分隔符號為1,整個數據以2結尾,感覺這個設計還是有點“不可思議”,為什麼不直接使用json協議呢?

    1.3.6.2 SubBefore(消息消費之前)
    for (TraceBean bean : ctx.getTraceBeans()) {
        sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
        }
    }

    軌跡就是按照上述順序拼接而成,各個字段使用1分隔,每一條記錄使用2結尾。

    1.3.2.3 SubAfter(消息消費后)
    case SubAfter: {
        for (TraceBean bean : ctx.getTraceBeans()) {
            sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
              .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
              .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
              .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
              .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
              .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
              .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR);
            }
        }
    }

    格式編碼一樣,就不重複多說。

    經過上面的源碼跟蹤,消息發送端的消息軌跡跟蹤流程、消息軌跡數據編碼協議就清晰了,接下來我們使用一張序列圖來結束本部分的講解。

    其實行文至此,只關注了消息發送的消息軌跡跟蹤,消息消費的軌跡跟蹤又是如何呢?其實現原理其實是一樣的,就是在消息消費前後執行特定的鈎子函數,其實現類為ConsumeMessageTraceHookImpl,由於其實現與消息發送的思路類似,故就不詳細介紹了。

    2、 消息軌跡數據如何存儲

    其實從上面的分析,我們已經得知,RocketMQ的消息軌跡數據存儲在到Broker上,那消息軌跡的主題名如何指定?其路由信息又怎麼分配才好呢?是每台Broker上都創建還是只在其中某台上創建呢?RocketMQ支持系統默認與自定義消息軌跡的主題。

    2.1 使用系統默認的主題名稱

    RocketMQ默認的消息軌跡主題為:RMQ_SYS_TRACE_TOPIC,那該Topic需要手工創建嗎?其路由信息呢?

    {
        if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {    // @1
            String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
            TopicConfig topicConfig = new TopicConfig(topic);
            this.systemTopicList.add(topic);
            topicConfig.setReadQueueNums(1);                                              // @2
            topicConfig.setWriteQueueNums(1);
            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        }
    }

    上述代碼出自TopicConfigManager的構造函數,在Broker啟動的時候會創建topicConfigManager對象,用來管理topic的路由信息。

    代碼@1:如果Broker開啟了消息軌跡跟蹤(traceTopicEnable=true)時,會自動創建默認消息軌跡的topic路由信息,注意其讀寫隊列數為1。

    2.2 用戶自定義消息軌跡主題

    在創建消息發送者、消息消費者時,可以显示的指定消息軌跡的Topic,例如:

    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic)
    
    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
            AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)

    通過customizedTraceTopic來指定消息軌跡Topic。

    溫馨提示:通常在生產環境上,將不會開啟自動創建主題,故需要RocketMQ運維管理人員提前創建好Topic。

    好了,本文就介紹到這裏了,本文詳細介紹了RocktMQ消息軌跡的實現原理,下一篇,我們將進入到多副本的學習中。

    作者介紹:
    丁威,《RocketMQ技術內幕》作者,RocketMQ 社區佈道師,公眾號: 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    ※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

    ※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

    ※Google地圖已可更新顯示潭子電動車充電站設置地點!!

    ※帶您來看台北網站建置台北網頁設計,各種案例分享