標籤: 台北網頁設計

  • 深圳已成全球新能源汽車應用規模最大的城市

    目前深圳公交行業已累計投入運營的2350輛,開通新能源公交線路136條,各類新能源汽車累計行駛里程達1.9億公里,初步形成了新能源公交服務網路。成為全球新能源汽車應用規模最大的城市。

    深圳市交委日前介紹,2009年6月,深圳巴士集團率先使用混合動力公車輛,開創了使用新能源車輛進行公交營運的先河。至今,巴士集團已有超千輛新能源車輛在營運,深圳市成為世界最大的新能源車輛的營運城市。

    今年9月22日世界無車日當天,深圳首條純電動大巴線路226路全線投入運營,成為深圳純電動大巴從中等規模分散運營到較大規模集中運營的轉捩點。由於純電動大巴具有無噪音、無污染的特點,因而受到了市民的高度稱讚和熱捧。

    據瞭解,目前,深圳公交行業已累計投入運營的新能源汽車2350輛,其中插電式混合動力大巴1751輛,混合動力雙層大巴20輛,純電動大巴253輛,純電動中巴26輛,純電動計程車300輛。現已開通新能源公交線路136條,各類新能源汽車累計行駛里程達1.9億公里,初步形成了新能源公交服務網路。

    據悉,深圳交通運輸部門將逐步加大新能源汽車在公交行業的推廣應用。2012年,計畫推廣純電動公交大巴1000輛,純電動計程車500輛。力爭至2015年,推廣新能源公交大巴7000輛,純電動計程車3000輛,新能源公交大巴約占全市公車輛總數的50%,使深圳成為全球新能源公車投放最多、運行效果最好、管理最規範的示範城市。

    本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    新北清潔公司,居家、辦公、裝潢細清專業服務

    ※別再煩惱如何寫文案,掌握八大原則!

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※超省錢租車方案

    ※教你寫出一流的銷售文案?

    網頁設計最專業,超強功能平台可客製化

  • 中國高新汽車國際峰會

     

    展會名稱:

    中國高新汽車國際峰會與Automechanika Shanghai

    展館地址:

    中國‧上海,浦東嘉里大酒店

    展會日期:

    2012年12月11-13日

    主辦單位:

    法蘭克福展覽有限公司

    網址:

    會議演講主題:

     

    演講主題範圍包括商業模式,技術創新,基礎設施,服務和生態系統等相關的題目:   ‧ 汽車行業可持續發展的新興商業模式和生態系統合作 ‧ 提高能源效率的混合動力,清潔燃料和電動汽車解決方案 ‧ 個人移動服務的創新設計,技術,市場和服務 ‧ 商用車輛和運輸業的綠色商業模式和經營策略 ‧ 連網車輛和遠程信息處理技術,服務和應用所產生的新收益源 ‧ 充電技術,基礎設施和不斷發展的商業模式 ‧ 先進的電池技術和管理系統,以提高經濟負擔能力 ‧ 替代低碳燃料,能源效率和減少廢氣排放 ‧ 最高性能和效率的動力/傳動系統和電子控制系統 ‧ 先進的材料和製造工藝,以提高生產力和降低成本 ‧ 綠色汽車市場的發展,包括以消費者為導向的策略,售後市場的增長和經銷商的機會 ‧ 未來發展和國際化的投資及融資策略

    演講者及參會者

    峰會的來賓將有來自C級管理,戰略規劃等有影響力的高層決策者,產品創新與服務高管,專業技術人員和工程師:   ‧ 客運和商用車原始設備製造商 ‧ 主要供應商和零部件製造商 ‧ 戰略投資者 ‧ 專業技術人員和工程師 ‧ 車隊管理人員和經營者 ‧ 公用事業和能源服務公司 ‧ 替代燃料供應商 ‧ 先進的傳動系統和電池系統開發商 ‧ 服務提供商 ‧ 政府管理部門和市政領導 ‧ 具有影響力的行業研究機構  

     

    連絡資訊

    連絡人:葉家欣小姐

    電話:(852) 2230 9202

    電子郵件:keiann.yip@hongkong.messefrankfurt.com

    官方網站:www.nextgenautosummit-china.com

    本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※教你寫出一流的銷售文案?

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※回頭車貨運收費標準

    ※別再煩惱如何寫文案,掌握八大原則!

    ※超省錢租車方案

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

  • 程序員不能一直停留在愛學習的階段

    今天在人人都是產品經理的上,看到一篇文章 《一個創業程序員的35歲人生總結(下)》 。其實也道出了我曾經作為技術人員,各種失敗的嘗試。

     

    下面是一種的一段引用,我非常認可

    先說技術,技術是我死磕時間最長的技能。最早在大學選擇FLASH,完全是出於愛好,當時別說我,全世界估計也沒幾個人能預測到僅僅兩年後,FLASH程序員就會隨着網頁遊戲的興起,成為當時最搶手的程序員種類之一。後來畢業了,選工作的時候,更多是學習的心態,創業什麼的,甚至工資,都無所謂,只要能提升技術就行。後來技術到一定程度了,就希望能幫助項目和公司更好地實現大家想要的產品,最終實現大家共同的夢想。

    在類似我經歷的公司中,有兩個問題,會同時困擾大部分程序員和老闆。第一個問題就是“學習”!

    程序員,尤其是前端程序員,天生有一種極強的學習慾望。前端這門技術,半年不學習可能就要落後,一年不學習估計就有被淘汰的風險了。程序員愛學習,不停提升自己的專業技能,這本來是好事。

    但是對於很多創業公司,卻成為不能承受之重。因為很多程序員,會極端地掉入學習的漩渦中,簡直跟掉入錢眼兒里的老闆有得一拼,眼中除了學習啥也容不下,比如曾經的我。更要命的是,有些程序員,自己的人生規劃和學習方向,還跟公司的業務方向不太一致。

    我是06年畢業,畢業就進入了一所當時還不錯的互聯網公司,公司名稱就不說了,反正對這家公司也沒有啥好感,雖然現在很多人都想進去修福報。

      在這家公司裏面,認識了幾個比較好的朋友,算是一個非常大的收穫。更關鍵的是,我們都是一群比較有想法的人,喜歡用技術去做各種各樣的嘗試。   在2006年的時候,我們第一次嘗試做一個網址收藏夾。當時的想法很簡單,我們可以把自己的喜歡的網站地址收藏起來,並且可以隨時隨地的分享到網站裏面,實現起來算是比較簡答。但是對標了一個競品,名字忘記了,好像叫做 ”好網角“。當時三個人,一個負責產品,兩個搞技術。按照道理說,用wordpress之類的網站很快的。結果當時我們兩個做技術的,被公司的環境給洗腦了,覺得一定要用牛逼的技術做出來的東西才有價值。等基礎架構搭建完成之後,我們就不想寫業務代碼了,覺得好無趣,結果不了了之。   在2007年,由於當時我們都是單身,有一天吃飯,想到了是不是可以做一個妹子網站,上面都是妹子。作為單身者,尤其是程序員,完全可以去上面找妹子約會。結果還是卡在了產品設計,因為沒有願意做產品,都想着做技術。由於公司的引導,內部開始架構化轉型,從此開始了架構文化,我這個時候對技術的更加執着。   在2008年和2009年這兩年之間,加班比較多。正好趕緊上公司晉陞P,就比較老實了,沒有太多的想法。   在2010年,微博出來,比較火爆的時候。發現一個蠻有趣的現象,就是微博必須註冊才能看內容,屏蔽了百度的搜索。當時我們知道做垃圾站可以賺錢,就是利用百度seo的流量。就開始搞。這一次吸取了教訓,快速用wordpress搭建了一個網站,然後也不不用爬蟲,直接人工編輯的方式,每天人肉搬運微博最熱門的內容。後來就搬運百度top的內容,反正就是什麼熱門放什麼。這個時候就已經開始在考慮,能不能把今天最熱門的資訊信息找出來,做成一個類似今日熱門的諮詢網站。我們的方法很原始,就是爬蟲去top.baidu.com,微博熱門資訊排行的內容。運營半年的時候,有一天一個帖子爆了,當時獨立IP直接突破5W。不過也因為這個,被新聞辦公廳警告了。後面連續幾次違規,省新聞辦公廳以沒有新聞出版牌照,把網站關停了。不過這個網站看來,還算是比較成功的。至少運營了大半年,也開始讓我思考運營的價值,時間的價值。   2011-2013年,在網站被關停之後,我就意識到內容的價值,開始持續輸出技術文章。曾經的blog鏈接: https://www.cnblogs.com/aigongsi/ ,一共100多篇文章,大概500W閱讀。 不過在輸出內容上,也犯了一個同樣的錯誤,就是覺得應該寫能突出技術能力的內容。當時身邊的技術人都挺瞧不起阮一峰的,覺得他的內容太基礎,沒有啥技術含量。在我輸出內容的時候,純粹就是愛好,沒有考慮目標受眾和持續的運營,更沒有想過IP的問題。這個時候如果有產品經理思維,可能就會明白,阮一峰的目標受眾非常廣,並且商業變現價值是非常高。而所謂技術架構類文章,受眾就小了很多,商業的價值更在於合作,而非變現上。   2015年開始創業,就慢慢開始轉向產品、營銷,運營,算是徹底轉型了。   我現在身邊的技術圈的朋友都是我的同事,都在創業,但是和他們交流起來,他們談論最多的還是怎麼實現功能,用什麼樣的方案,能夠支撐多少用戶量。很少談論到我們的用戶是誰,他們有什麼樣的需求,我們的產品怎麼解決這個需求。更不會考慮我們的產品適合在什麼樣的推廣方式,是內容營銷,還是渠道合作,或者說sem。    
    為什麼不能一直停留在愛學習的階段 其實程序員應該是最愛學習的一個群體。包括很多知識付費,都在輸出各種知識,主要面向的對象都是各種專業技能的。 對於初學者,學習能力肯定是重要的能力。你必須具備基本的知識、技能、經驗。如果你是科研工作者,持續的學習研究是晉陞的必要條件。   商業或者創業,本質上是要拿結果的,反映在經營公司上就是“利潤”,反映在產品上,就是用戶數的增長。技術人員肯定有太多的理由拒絕這些結果,比如曾經我覺得做這些沒有技術挑戰;這些臟活累活對我的職業生涯發展不好;我不喜歡這樣的用戶,他們太low了。   反映在職場上,你也要拿結果。我第一家公司之前一直強調以結果為導向。如果你要晉陞,和領導關係是不是要處理,向上管理是不是要學會。晉陞的話,要準備PPT,基本的PPT技能和演講是不是要學會。如果說邀功是晉陞的必要條件,那就怎麼學者做自我營銷,在公司裏面做自己的IP,讓自己更多的曝光,這樣晉陞的幾率是不是更高一些。我過去這些基本上全部都沒有做到,以至於在職場很難混下去。   如果你以拿結果的思維去看一些事情,技術的牛逼與否僅僅是其中的一個環節。很多時候,我們說自己愛學習,其實是給自己找了一個不去拿結果的借口。因為拿結果太難了,並且很多時候都會面臨失敗。當我們害怕失敗的時候,就會找一個理由拒絕行動,並且這個理由會讓加強我們的某些特質,那麼“愛學習”這個就是非常好一個理由。某些知識付費上癮者,內心是在逃避。   本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※教你寫出一流的銷售文案?

  • C#數據結構與算法系列(十):逆波蘭計算器——逆波蘭表達式(後綴表達式)

    C#數據結構與算法系列(十):逆波蘭計算器——逆波蘭表達式(後綴表達式)

    1.介紹

    後綴表達式又稱逆波蘭表達式,與前綴表達式相似,只是運算符位於操作數之後

    2.舉例說明

    (3+4)*5-6對應的後綴表達式就是3 4 +5 * 6 –

    3.示例

    輸入一個逆波蘭表達式(後綴表達式),使用棧(Stack),計算其結果

    思路分析:

    從左至右掃描表達式,遇到数字時,將数字壓入堆棧,遇到運算符時,彈出棧頂的兩個數,用運算符對它們做相應的計算(次頂元素 和 棧頂元素),並將結果入棧;

    重複上述過程直到表達式最右端,最後運算得出的值即為表達式的結果例如: (3+4)×5-6 對應的後綴表達式就是 3 4 + 5 × 6 – , 

    針對後綴表達式求值步驟如下:

    從左至右掃描,將3和4壓入堆棧;
    遇到+運算符,因此彈出4和3(4為棧頂元素,3為次頂元素),計算出3+4的值,得7,再將7入棧;
    將5入棧;
    接下來是×運算符,因此彈出5和7,計算出7×5=35,將35入棧;
    將6入棧;
    最後是-運算符,計算出35-6的值,即29,由此得出最終結果

    代碼實現:

    using System;
    using System.Collections.Generic;
    using System.Text.RegularExpressions;
    
    namespace DataStructure
    {
        public class PolandNotation
        {
            public static void Test()
            {
                try
                {
                    //定義逆波蘭表達式
                    string suffixExpression = "3 4 + 5 * 6 -";
    
                    //將suffixExpression轉換成鏈表的方式
                    var list = GetListString(suffixExpression);
    
                    //輸出結果
                    var result = Calculate(list);
    
                    Console.WriteLine($"{suffixExpression}的結果是{result}");
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
               
            }
            /// <summary>
            /// 獲取集合
            /// </summary>
            /// <param name="suffixExpression"></param>
            /// <returns></returns>
            public static List<string> GetListString(string suffixExpression)
            {
                //首先實例化List
                List<string> list = new List<string>();
    
                //將字符串通過空格切換成數組
                string[] split=suffixExpression.Split(" ");
    
                //循環添加
                foreach (var item in split)
                {
                    list.Add(item);
                }
    
                return list;
            }
    
            /// <summary>
            /// 計算
            /// </summary>
            /// <param name="list"></param>
            /// <returns></returns>
            public static int Calculate(List<string> list)
            {
                //創建棧
                Stack<string> stack = new Stack<string>();
    
                //循環遍歷
                list.ForEach(item =>
                {
                    //正則表達式判斷是否是数字,匹配的是多位數
                    if (Regex.IsMatch(item,"\\d+"))
                    {
                        //如果是数字直接入棧
                        stack.Push(item);
                    }
                    //如果是操作符
                    else
                    {
                        //出棧兩個数字,並運算,再入棧
                        int num1 =int.Parse(stack.Pop());
    
                        int num2 = int.Parse(stack.Pop());
    
                        int result = 0;
    
                        if(item.Equals("+"))
                        {
                            result = num2 + num1;
                        }
                        else if(item.Equals("*"))
                        {
                            result = num2 * num1;
                        }
                        else if(item.Equals("/"))
                        {
                            result = num2 / num1;
                        }
                        else if (item.Equals("-"))
                        {
                            result = num2 - num1;
                        }
                        else
                        {
                            throw new Exception("無法識別符號");
                        }
    
                        stack.Push(""+result);
                    }
                });
    
                //最後把stack中數據返回
                return int.Parse(stack.Pop());
            }
        }
    }

    結果圖:

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※為什麼 USB CONNECTOR 是電子產業重要的元件?

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※台北網頁設計公司全省服務真心推薦

    ※想知道最厲害的網頁設計公司"嚨底家"!

    新北清潔公司,居家、辦公、裝潢細清專業服務

    ※推薦評價好的iphone維修中心

  • Kubernetes學習筆記(九):StatefulSet–部署有狀態的多副本應用

    StatefulSet如何提供穩定的網絡標識和狀態

    ReplicaSet中的Pod都是無狀態,可隨意替代的。又因為ReplicaSet中的Pod是根據模板生成的多副本,無法對每個副本都指定單獨的PVC。

    來看一下StatefulSet如何解決的。

    提供穩定的網絡標識

    StatefulSet創建Pod都有一個從零開始的順序索引,這會體現在Pod的名稱和主機名上,同樣也會體現在Pod對應的固定存儲上。所以這些名字是可預先知道的,不同於ReplicaSet的隨機生成名字。

    因為他們的名字都是固定的,而且彼此狀態都不同,通常會操作他們其中的一個。如此情況,一般都會創建一個與之對應的headless Service,通過這個Service,每個Pod將擁有獨立的DNS記錄。

    擴容一個StatefulSet會使用下一個順序索引創建一個新的Pod,縮容會刪除索引值最高的。並且縮容任何時候只會操作一個Pod。

    如何提供穩定的存儲

    StatefulSet可以擁有一個或多個PVC模板,這些PVC會在創建Pod前創建出來,綁定到一個Pod實例上。

    擴容的時候會創建一個Pod以及若干個PVC,刪除的時候只會刪除Pod。StatefulSet縮容時不會刪除PVC,擴容時會重新掛上。

    使用StatefulSet

    定義三個PV

    定義pv-(a|b|c)

    # stateful-pv-list.yaml
    apiVersion: v1
    kind: PersistentVolume
    metadata:
      name: pv-a
    spec:
      capacity:
        storage: 1Mi
      accessModes:
      - ReadWriteOnce
      persistentVolumeReclaimPolicy: Recycle
      hostPath:
        path: /tmp/pva
    ---
    apiVersion: v1
    kind: PersistentVolume
    # 以下忽略
    

    headless的Service

    # stateful-service-headless.yaml
    apiVersion: v1
    kind: Service
    metadata:
      name: rwfile
    spec:
      clusterIP: None
      selector:
        app: rwfile
      ports:
      - port: 80
    

    定義StatefulSet

    先創建兩個Pod副本。使用volumeClaimTemplates定義了PVC模板。

    # stateful.yaml
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: rwfile
    spec:
      replicas: 2
      serviceName: rwfile
      selector:
        matchLabels:
         app: rwfile
      template:
        metadata:
          labels:
            app: rwfile
        spec:
          containers:
          - image: registry.cn-hangzhou.aliyuncs.com/orzi/rwfile
            name: rwfile
            ports:
            - containerPort: 8000
            volumeMounts:
            - name: data
              mountPath: /tmp/data
      volumeClaimTemplates:
      - metadata:
          name: data
        spec:
          resources:
            requests:
              storage: 1Mi
          accessModes:
          - ReadWriteOnce
    

    創建三個PV,一個headless的Service,一個StatefulSet

    -> [root@kube0.vm] [~] k create -f stateful-pv-list.yaml
    persistentvolume/pv-a created
    persistentvolume/pv-b created
    persistentvolume/pv-c created
    
    -> [root@kube0.vm] [~] k create -f stateful-service-headless.yaml
    service/rwfile created
    
    -> [root@kube0.vm] [~] k create -f stateful.yaml
    statefulset.apps/rwfile created
    

    查看

    -> [root@kube0.vm] [~] k get all -o wide
    NAME                    READY   STATUS      RESTARTS   AGE   IP            NODE       NOMINATED NODE   READINESS GATES
    pod/rwfile-0            1/1     Running     0          12s   10.244.1.52   kube1.vm   <none>           <none>
    pod/rwfile-1            1/1     Running     0          8s    10.244.2.56   kube2.vm   <none>           <none>
    
    NAME                 TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)   AGE   SELECTOR
    service/kubernetes   ClusterIP   10.96.0.1    <none>        443/TCP   81s   <none>
    service/rwfile       ClusterIP   None         <none>        80/TCP    23s   app=rwfile
    
    NAME                      READY   AGE   CONTAINERS   IMAGES
    statefulset.apps/rwfile   2/2     12s   rwfile       registry.cn-hangzhou.aliyuncs.com/orzi/rwfile
    

    查看PV和PVC,可以看到已經有兩個PVC綁定了PV

    -> [root@kube0.vm] [~] k get pv,pvc -o wide
    NAME                    CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS      CLAIM                   STORAGECLASS   REASON   AGE     VOLUMEMODE
    persistentvolume/pv-a   1Mi        RWO            Recycle          Bound       default/data-rwfile-0                           7m20s   Filesystem
    persistentvolume/pv-b   1Mi        RWO            Recycle          Bound       default/data-rwfile-1                           7m20s   Filesystem
    persistentvolume/pv-c   1Mi        RWO            Recycle          Available                                                   7m20s   Filesystem
    
    NAME                                  STATUS   VOLUME   CAPACITY   ACCESS MODES   STORAGECLASS   AGE     VOLUMEMODE
    persistentvolumeclaim/data-rwfile-0   Bound    pv-a     1Mi        RWO                           6m55s   Filesystem
    persistentvolumeclaim/data-rwfile-1   Bound    pv-b     1Mi        RWO                           6m51s   Filesystem
    

    請求Pod

    啟動代理

    -> [root@kube0.vm] [~] k proxy
    Starting to serve on 127.0.0.1:8001
    

    發送請求

    -> [root@kube0.vm] [~] curl http://localhost:8001/api/v1/namespaces/default/pods/rwfile-0/proxy/ -d "a=123"
    data stored in : rwfile-0
    
    -> [root@kube0.vm] [~] curl http://localhost:8001/api/v1/namespaces/default/pods/rwfile-0/proxy/
    a=123
    

    刪除測試

    刪除rwfile-0,然後查看,從時間上看確實是被刪除重建的。

    -> [root@kube0.vm] [~] k delete po rwfile-0
    pod "rwfile-0" deleted
    
    -> [root@kube0.vm] [~] k get po
    NAME                READY   STATUS      RESTARTS   AGE
    rwfile-0            1/1     Running     0          7s
    rwfile-1            1/1     Running     0          19m
    

    看一下之前存儲的數據還在不在

    -> [root@kube0.vm] [~] curl http://localhost:8001/api/v1/namespaces/default/pods/rwfile-0/proxy/
    a=123
    

    還是在的,此次測試實際上也證明了StatefulSet提供了穩定的網絡標識和存儲。

    發現StatefulSet的夥伴節點

    使用DNS解析headless的Service的FQDN。
    例子以後再寫吧。。

    如何處理節點失效

    除非確定節點無法運行或者不會在訪問,否則不要強制刪除有狀態的Pod

    k delete pod rwfile-0 --force --grace-period 0
    

    小結

    • StatefulSet創建Pod都有一個從零開始的順序索引
    • 通常會創建一個與StatefulSet對應的headless Service。
    • 擴容一個StatefulSet會使用下一個順序索引創建一個新的Pod,縮容會刪除索引值最高的。
    • 新建StatefulSet需要指定headless ServiceName和volumeClaimTemplates。
    • 使用DNS發現StatefulSet的夥伴節點
    • 強制刪除:k delete pod rwfile-0 --force --grace-period 0

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
    【其他文章推薦】

    USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

    台北網頁設計公司這麼多該如何選擇?

    ※智慧手機時代的來臨,RWD網頁設計為架站首選

    ※評比南投搬家公司費用收費行情懶人包大公開

    ※幫你省時又省力,新北清潔一流服務好口碑

    ※回頭車貨運收費標準

  • 挖洞入門_union型SQL注入

    挖洞入門_union型SQL注入

    簡介:在漏洞盒子挖洞已經有一段時間了,雖說還不是大佬,但技術也有所進步,安全行業就是這樣,只有自己動手去做,才能將理論的知識變為個人的經驗。本篇文章打算分享一下我在挖union型SQL注入漏洞過程中的一些個人理解,如有不足也請大佬不吝指教。

    0x00:什麼是SQL注入

    SQL注入,相信大多數人一開始接觸安全,聽說的第一種漏洞類型就會是SQL注入,眾所周知,其本質就是將用戶輸入的數據當成了SQL語句來執行

    開發過網站的朋友應該都清楚,大多數的小型企業或個人的站點大都採用了LAMP結構,即Linux + Apache + MySQL + PHP,當然還有一些其它常見的技術如下錶:

    操作系統 Web服務器 數據庫 編程語言
    Linux Apache MySQL PHP
    Windows Server Nginx Oracle JSP
    Tomcat SQL Server ASP
    Python

    總的來說,絕大多數網站都採用了動態Web開發技術,而動態Web開發離不開數據庫,如果沒有處理好這兩者之間的關係,那麼SQL注入就會隨之而來了。

    舉例來說,當我們想要通過參數id來獲取相對應的新聞時,整個過程簡單來說就是用戶通過URL請求新聞–>後台通過用戶請求去數據庫查詢相對應的新聞–>將查詢到的新聞回傳給用戶。在第二步查詢相對應的新聞時,後台會執行SQL語句來查詢,就像SELECT * FROM news WHERE id=''id的值是用戶來控制的,當id=1時,就會返回id=1的新聞,id=2時返回id=2的新聞,以此類推,就可以動態的控制web界面了。

    這時,當用戶輸入的id值不正確時,後台就無法獲取相對應的新聞,前端就會沒有數據显示,可當用戶輸入的數據為1'; DROP TABLE news-- a時,恐怖的事情就發生了,數據庫中的news表被刪除了,這就說明這個參數存在SQL注入

    回到剛才用戶輸入的數據,拼接到後台查詢數據時,整個SQL語句就變成了SELECT * FROM news WHERE id='1'; DROP TABLE news-- a',分析這條語句可知,用戶輸入的單引號閉合了id的值分號閉合了SELECT語句,然後又新建了一條DROP語句刪除了表news,最後的— a註釋掉了id值后的那個單引號,SQL注入就這麼產生了。

    當一個站點存在SQL注入時用戶的輸入就可以傳入數據庫執行,理論上這樣可以獲得數據庫的全部數據,也就是常說的脫庫了。獲得數據的方法也多種多樣,可以通過頁面直接返回想要查詢的數據,也可以通過sleep延時函數猜測數據,都不行的話我們還可以使用DNS解析日誌來獲得數據。其中,最簡單的一種方法就是union型的SQL注入了。

    union型SQL注入只是SQL注入的其中一種,也是最簡單的一種,對於這種漏洞的防範也特別簡單,可這種漏洞在互聯網中仍不計其數…這也可見全國乃至全球對於網絡安全知識普及的不足,接下來,我會從三個方面來講講這種漏洞,分別是為什麼會產生怎麼利用以及怎麼防範

    0x01:為什麼會產生union型SQL注入

    union型SQL注入,看名字就能知道,使用這種方法可以直接在頁面中返回我們要查詢的數據,方法也很簡單,即使用UNION聯合查詢即可。

    但使用UNION聯合查詢時還要滿足一個條件,那就是我們構造的SELECT語句的字段數要和當前表的字段數相同才能聯合查詢,即首先我們要確定當前表的字段數。order by x是數據庫中的一個排序語句,order by 1即通過第一個字段進行排序。這時我們就可以構造SELECT * FROM news WHERE id='1' order by x-- a'來猜測當前表的字段數,x值遞增,當頁面返回數據異常時,即無當前字段時,用當前的x值減一即可得到當前表的字段數了。

    知道了當前表的字段數,就可以進行UNION聯合查詢了。但聯合查詢時,頁面只會显示查詢到數據的第一條,也就是UNION前的SELECT語句的結果,想要显示我們自己聯合查詢的結果時,還必須使前一條語句失效,這裏我們構造and 1=2使前一句SELECT語句失效。回到剛才的案例,假設當前表的字段數為3,我們就可以構造SELECT * FROM news WHERE id='1' and 1=2 UNION SELECT 1,2,3-- a'來查詢當前頁面的顯錯點了,通過下圖的案例可知,當前的顯錯點為第一字段第三字段

    這個顯錯點又是什麼意思呢?比如當前表中共有三個字段,一個是標題(title)、一個是時間(time)、一個是內容(data),而我們前端不需要显示時間,只需要展示標題和內容即可。那麼從數據庫獲得的數據中,也只有標題字段和內容字段會展示在頁面上,這兩個點就是顯錯點

    通過這裏的顯錯點,用戶就可以獲得數據庫中的所有數據了。當用戶輸入的數據為1' and 1=2 UNION SELECT 1,2,database()-- a時,即SQL語句為SELECT * FROM news WHERE id='1' and 1=2 UNION SELECT 1,2,database()-- a'時,就可以直接得到數據庫的庫名

    0x02:怎麼利用union型SQL注入

    1.判斷是否存在注入

    構造and 1=1/and 1=2查看頁面是否有異常,若有異常,即有可能存在注入,另外還可通過該語句判斷該站點是否有WAF,若有WAF的話會有攔截警告,當然,WAF也是可以繞過的。。。

    2.查詢當前表的字段數

    構造order by x,當頁面返回異常時,利用x減一即可得到當前表的字段數

    3.查詢顯錯點

    構造and 1=2 union select 1,2,3,若頁面显示了我們構造的1,2,3,則對應的字段即為顯錯點

    4.查詢數據庫庫名

    構造and 1=2 union select 1,2,database(),即可在顯錯點显示當前數據庫庫名

    一般挖漏洞的話到此步驟就可以提交了,切記千萬不可非法獲得數據,挖洞有風險,同志需謹慎!

    5.查詢數據庫中的表名

    構造and 1=2 union select 1,2,table_name from information_schema.tables where table_schema=database() limit 0,1,即可在顯錯點显示當前庫中的表名,因為顯錯點一次只能显示一條數據,這時可以通過limit語句選擇不同的表名進行查看。

    6.查詢選擇表中的字段名

    構造and 1=2 union select 1,2,column_name from information_schema.columns where table_schema=database() and table_name='XXX' limit 0,1,即可在顯錯點显示字段名,這裏也是通過limit語句選擇不同的字段名進行查看。

    7.查詢數據庫中的數據

    構造and 1=2 union select 1,2,XXX from XXX limit 0,1,即可獲得數據庫中的數據了。

    0x03:怎麼防範union型SQL注入

    針對union型SQL注入,我們可以對用戶輸入的數據進行一次篩查,設置黑名單,攔截注入常用的一些關鍵詞,比如andorder byunionselectfrom等。

    除了設置黑名單外,還有一種比較靠譜的方法,即使用預編譯語句,而不是動態的生成SQL語句,這樣可以有效的避免用戶輸入的數據連接到數據庫執行,就是實現起來比較複雜,需要設置大量的預編譯語句。

    另外還有一種目前最靠譜的方法,實現起來還簡單,就是上硬件防火牆。。。就是有點小貴。

    0x04:互聯網中的一些案例

    依據網絡安全法,本文旨在分享個人學習經驗,內容禁止用於違法犯罪行為!

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

  • RocketMQ(1)—架構原理及環境搭建

    RocketMQ(1)—架構原理及環境搭建

    一、架構簡述

    RocketMQ阿里開源的一個分佈式消息傳遞和流媒體平台,具有低延遲,高性能和可靠性, 萬億級容量和靈活的可伸縮性。跟其它中間件相比,RocketMQ的特點是JAVA實現,在發生宕機和其它故障時消息丟失率更低

    它由四個部分組成:nameserver,broker,生產者和消費者。它們中的每一個都可以水平擴展,而沒有單個故障點。  

    Nameserver:提供輕量級的服務發現和路由。生產者和消費者通過nameserver獲取broker信息。它幾乎是無狀態的,nameserver結點之間沒有任何的數據同步,broker註冊信息時會註冊到每一個nameserver結點上面,所以每個nameserver節點都記錄了完整broker信息,提供相應的讀寫服務,並支持快速的存儲擴展。

    Broker:通過提供輕量級的topic和queue機制來存儲消息。與nameserver中的每個節點建立長連接,定時註冊topic等信息到nameserver上面。broker一般都是主從模式,因為消息是真實存儲消息的地方,避免一個結點掛了,導致這個節點數據全部丟失。

    Producer:與nameserver集群中的一個結點建立長連接,定期的拉取broker 的topic路由信息,再將消息發送到對應broker的topic上面

    Consumer:與nameserver集群中的一個結點建立長連接,定期的拉取broker 的topic路由信息,再去消費對應broker的topic信息

     

     

    二、環境搭建

     1.官網下載:http://rocketmq.apache.org/release_notes/release-notes-4.7.0/

    2.解壓 unzip rocketmq-all-4.7.0-bin-release.zip

    3.修改啟動參數配置。默認的jvm參數內存設置特別大,如果自己機器不行的話需要手動改下bin目錄下的啟動參數文件:runbroker.sh 和runserver.sh文件 我的虛擬機內存分配不大,改成256m 256m 128m

       這是默認的

     

     

    4.啟動nameserver: nohup sh mqnamesrv ‐n 192.168.0.67:9876 &   (將日誌輸出當前目錄的nohub.out文件,方便查看啟動日誌,ip是當前機器的ip)

     

     

     

     5.啟動broker:nohup sh mqbroker ‐n 192.168.0.67:9876 autoCreateTopicEnable=true &  (autoCreateTopicEnable=true 自動創建topic,如果不設置true的話,生產者發送消息的時候如果沒有topic就會發送失敗,需要提前把topic創建好,設置true會在發送時自動創建topic,192.168.0.67:9876 是name server)

    也可以使用配置文件啟動broker:nohup sh mqbroker ‐n 192.168.0.67:9876 ‐c conf/broker.conf &

    簡單看下默認的配置文件中的一些參數:

    #集群名字
    brokerClusterName = DefaultCluster
    #broker名字,集群中主從都要用這個名字,才會組成一個集群
    brokerName = broker-a
    #id為0的是master  非0的slava
    brokerId = 0
    #消息處理時間,凌晨4點
    deleteWhen = 04
    #消息保存時間默認48小時,48小時之後的凌晨4點就會清理
    fileReservedTime = 48
    #集群主從之間數據同步方式 
    #異步只需要發到master成功就返回客戶端段成功,性能高,但是如果master掛了 slave還未同步就會丟失消息。根據自身業務場景選擇合適方式
    brokerRole = ASYNC_MASTER
    #消息刷盤機制,和主從數據同步類似,同步就是說需要寫進磁盤了才返回成功。異步就是寫進內存了就返回成功,後面再去落盤。
    flushDiskType = ASYNC_FLUSH
    #自動創建topic
    autoCreateTopicEnable=true

    使用配置文件啟動:nohup sh bin/mqbroker ‐n 192.168.0.67:9876 -c conf/broker.conf &

    broker 192.168.0.67:10911關聯的nameserver是192.168.0.67:9876

     

     

    至此一個單機的rocketMQ的環境就搭建好了   正常退出: sh mqshutdown broker  和  sh mqshutdown namesrv

     

    測試下消息發送,使用rocketMQ提供的測試腳本:

    export NAMESRV_ADDR=192.168.0.67:9876

    生產者腳本

    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 消費者腳本

    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 發送消息:  

     

     消費消息:

     

    三、控制台搭建

    1.下載rocketMQ的擴展包,master分支:https://github.com/apache/rocketmq-external 

     2.啟動rocketmq-console-ng模塊

     

     3.修改此模塊配置:

      3.1 maven依賴rocketMQ版本改成自己部署版本對應的,我部署的MQ是最新的4.7.0版本

     

     

      3.2 配置文件中配置namserver地址和控制台數據存放地址

     

     正常來說改完這兩個地方就可以直接啟動控制台的這個springboot程序了。

    但是因為我用的MQ是最新的4.7版本,控制台對應的還沒有更新到最新的。編譯都有會報錯的地方

    1.DefaultMQPullConsumer這個類已經不推薦使用了,並且4.7.0中有兩個類似的構造器,原來代碼直接傳了一個null,第二個參數無法識別是哪個構造器的。修改下把第二參數強轉String或者RPCHook

     

     

    2.MQAdminExt這個接口中加了新方法,但是控制台中MQAdminExtImpl還沒有實現對應的方法。這個問題在github上幾天前已經有人提了Issues了,我這裡是自己添加一下默認實現然後服務就可以正常啟動了,還不確定後續後面有什麼影響沒有,至少可以啟動了

     

     

     

     

    之前使用測試腳本發送的消息 以及topic都可以在控制台看到了

     

     

     

     

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※Google地圖已可更新顯示潭子電動車充電站設置地點!!

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※別再煩惱如何寫文案,掌握八大原則!

    網頁設計最專業,超強功能平台可客製化

  • 接口中的默認方法與靜態方法

    在Java8之前的版本中,接口中只能聲明常量和抽象方法,接口的實現類中必須實現接口中所有的抽象方法。而在Java8中,接口中可以聲明默認方法靜態方法。

    接口中的默認方法

    Java 8中允許接口中包含具有具體實現的方法,該方法稱為“默認方法”,默認方法使用“ default ”關鍵字修飾 。

    示例:

    public interface MyInterface {
        default String getMsg(String srcMsg){
            return "======"+srcMsg;
        }
    }

    接口中的默認方法,有一個“類優先”原則:

    若一個接口中定義了一個默認方法,而另外一個父類或接口中又定義了同一個同名的方法時:

    • 選擇父類中的方法。如果一個父類提供了具體的實現,那麼接口中具有相同名稱的參數的默認方法會被忽略。
    • 接口衝突。如果一個父接口提供一個默認方法,而另一個接口中也提供了一個具有相同名稱和參數列表的方法(不管方法是否是默認方法),那麼必須覆蓋該方法來解決衝突。

    示例1:

    public interface MyInterface1 {
        default String getMsg(String srcMsg){
            return "===我是MyInterface1111111==="+srcMsg;
        }
    }
    ///////////////////////////////////////////////////////
    public class MyClass1 {
        public String getMsg(String srcMsg){
            return "===我是MyClass11111==="+srcMsg;
        }
    }
    ///////////////////////////////////////////////////////
    public class MySubClass1 extends MyClass1 implements MyInterface1 {
    }
    
    ///////////////////////////////////////////////////////
    public class InterfaceTest {
    
        public static void main(String[] args) {
            MySubClass1 ms1 = new MySubClass1();
    
            String srcMsg = "Java 牛逼!!";
            //MySubClass1 類繼承了 MyClass1 類,實現了MyInterface1 接口,根據類優先原則,調用同名方法時,會忽略掉接口 MyInterface1 中的默認方法。
            System.out.println(ms1.getMsg(srcMsg));//輸出結果:===我是MyClass11111===Java 牛逼!!
    
        }
    }

    示例2:

    public interface MyInterface2 {
        default String getMsg(String srcMsg){
            return "===我是MyInterface2222222==="+srcMsg;
        }
    }
    ////////////////////////////////////////////////////////////////
    public class MySubClass2 implements MyInterface1,MyInterface2 {
        @Override
        public String getMsg(String srcMsg) {
            //同時實現了 MyInterface1,MyInterface2  接口,根據 類優先 原則,兩個父接口中都提供了相同的方法,那麼子類中就必須重寫這個方法來解決衝突。
            return MyInterface1.super.getMsg(srcMsg);
            //return MyInterface2.super.getMsg(srcMsg);
            //return "------"+srcMsg;
        }
    }
    ////////////////////////////////////////////////////////////////
    public class InterfaceTest {
    
        public static void main(String[] args) {
            MySubClass2 ms2 = new MySubClass2();
    
            //MySubClass2 重新實現了兩個父接口中都存在的相同名稱的方法。
            System.out.println(ms2.getMsg(srcMsg));//輸出結果:===我是MyInterface1111111===Java 牛逼!!
        }
    }

     

    接口中的靜態方法

    在Java8中,接口中允許添加 靜態方法,使用方式:“接口名.方法名”

    示例:

    public interface MyInterface3 {
        static String getMsg(String msg){
            return "我是接口中的靜態方法:"+msg;
        }
    
        static void main(String[] args) {
            System.out.println(MyInterface3.getMsg("Java牛逼!!"));
        }
    }

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    南投搬家公司費用需注意的眉眉角角,別等搬了再說!

    新北清潔公司,居家、辦公、裝潢細清專業服務

    ※教你寫出一流的銷售文案?

  • 磨皮美顏算法 附完整C代碼

    磨皮美顏算法 附完整C代碼

    前言

    2017年底時候寫了這篇《集 降噪 美顏 虛化 增強 為一體的極速圖像潤色算法 附Demo程序

    這也算是學習過程中比較有成就感的一個算法。

    自2015年做算法開始到今天,還有個把月,就滿五年了。

    歲月匆匆,人生能有多少個五年。

    這五年裡,從音頻圖像到視頻,從傳統算法到深度學習,從2D到3D各種算法幾乎都走了一個遍。

    好在,不論在哪個領域都能有些許建樹,這是博主我自身很欣慰的事情。

    雖然有所間斷但是仍然堅持寫博客,並且堅持完整開源分享。

    目的就是為了幫助那些一開始跟我一樣,想要學習算法的萌新,

    一起踏入算法領域去跟大家“排排坐,吃果果”。

    引子

    在這個特別的時間點,就想做點特別的事情。

    那就是開源當時寫的這個“美顏算法”,開源代碼和當時的版本有些許出入,但是思路是一樣的。

    早些年的時候大家發現採用保邊濾波的思路可以做到降噪,進而衍生出來針對皮膚的降噪,簡稱磨皮或者美顏。

    從此百家爭鳴,而這個課題到今天也還在發展,當然日新月異了。

    故此,想談談針對美顏磨皮的一些算法思路,為後續想學習並改進的萌新提供一些養分。

    概述美顏磨皮方法

    1.基於保邊降噪

    這類算法有很多方法,但不外乎2種基礎思路,

    基於空間和基於頻率,當然再展開的話,還可以細分為紋理和顏色。

    例如通過膚色或紋理區域做針對性的處理。

    這類算法的優點是計算簡單,通用型強,但缺點就是不夠細膩完美。

    2.基於人臉檢測貼圖

    這種嚴格意義上來說,是易容術,就是基於人臉檢測出的關鍵數據。

    例如人臉關鍵點,將人臉皮膚區域提取出來,重新貼上一張事先準備的皮膚圖,進行皮膚貼合融合。

    臉已經被置換了,效果很贊。有點繆修斯之船的味道。

    這類算法優點是效果極其驚艷,但是算法複雜通用性差,一般只能針對少數角度表情的人臉。

    3.結合1和2的深度學習方法

    前兩者的思路早期大行其道,如今到了數據時代,

    基於深度學習的工具方案,可以非常好地結合前兩者的思路,進行訓練,求一個數據解。

    很多人將深度學習等同於AI,這個做法有點激進。

    基於深度學習的做法,仍然存在前兩者一樣的問題,簡單的不夠細膩,細膩的不夠簡單,

    而如果要設計一個優秀的模型,其實跟設計一個傳統算法一樣困難。

    基於數據驅動的算法,驗證成本非常高,可控性比較差,當然在金錢的驅動下確實能產出還不錯的算法模型。

    這類算法的優點,往往能求出很不錯的局部最優解,甚至以假亂真,缺點就是需要大量金錢和數據的驅動。

    總結來說的話,不付出代價,就別想有好的結果,非常的現實。

     

    據我所知目前使用最多的方案是第一種和第三種,第二種可操作性不強,只有少數公司掌握了這方面的核心技術。

    但是不管是哪種方案,無非就是以下幾個步驟。

    1.確定人臉的皮膚區域

    2.定位人臉的雜質(痘痘,斑點,痣,膚色不均等)

    3.根據定位到雜質進行填補修復或濾除

     

    這就是圖像處理經典三部曲

    1.定位 2.檢測 3.處理

    每一個細分展開,都非常宏大且複雜的算法。

     

    以上,僅以磨皮美顏為例子,闡述圖像方面的算法想要或正在解決什麼樣的問題。

    我們在工作中碰到的圖像問題無非以上幾個核心問題,問題都是類似的,只是不同場景和需求下各有難處。

    本次開源的算法思路

    本次開源的算法是基於保邊降噪的思路,

    當然這個思路可以通過改寫,參數化后可以集成到深度學習中,作為一個先驗層輔助訓練。

    算法步驟如下:

    1.  檢測皮膚顏色,確定皮膚占圖像的比率

    2. 根據皮膚比率進行邊緣檢測,產出細節映射圖

    3. 基於細節映射圖和磨皮強度進行保邊降噪

    4. 對降噪好的圖進行再一次膚色檢測,保留膚色區域的降噪,其他區域還原為原圖

    步驟比較簡單,但是要同時兼顧效果性能,是很不容易的。

    當然這個算法膚色檢測那一部分可以採用深度學習“語義分割”方面的思路進而改進效果。

    做得好,將本算法改良到准商用,驚艷的程度是沒有問題的。

    深度學習相關技術就不展開細說了,有能力的朋友,感興趣的話,可以自行實操。

     

    完整源代碼開源地址:

    https://github.com/cpuimage/skin_smoothing

    項目沒有第三方依賴,完整純c代碼。

    有編譯問題的同學自行參考《Windows下C,C++開發環境搭建指南》搭建編譯環境。

    附上算法效果的示例:

     

     

    以上,權當拋磚引玉之用。

    授人以魚不如授人以漁。

     

    2020年,疫情之下,

    願大家都能事業有成,身體健康。

    世界和平,人們皆友愛。

     

    若有其他相關問題或者需求也可以郵件聯繫俺探討。

    郵箱地址是: gaozhihan@vip.qq.com

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※想知道最厲害的網頁設計公司"嚨底家"!

    ※幫你省時又省力,新北清潔一流服務好口碑

    ※別再煩惱如何寫文案,掌握八大原則!

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

  • Kafka源碼解析(二)—Log分析

    Kafka源碼解析(二)—Log分析

    上一篇文章講了LogSegment和Log的初始化,這篇來講講Log的主要操作有哪些。

    一般來說Log 的常見操作分為 4 大部分。

    1. 高水位管理操作
    2. 日誌段管理
    3. 關鍵位移值管理
    4. 讀寫操作

    其中關鍵位移值管理主要包含Log Start Offset 和 LEO等。

    高水位HighWatermark

    高水位HighWatermark初始化

    高水位是通過LogOffsetMetadata類來定義的:

    @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
    

    這裏傳入的初始值是logStartOffset,表明當首次構建高水位時,它會被賦值成 Log Start Offset 值。

    我們再來看看LogOffsetMetadata類:

    case class LogOffsetMetadata(messageOffset: Long,
                                 segmentBaseOffset: Long = Log.UnknownOffset,
                                 relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {
    
      // check if this offset is already on an older segment compared with the given offset
      def onOlderSegment(that: LogOffsetMetadata): Boolean = {
        if (messageOffsetOnly)
          throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
    
        this.segmentBaseOffset < that.segmentBaseOffset
      }
      ...
    }
    

    LogOffsetMetadata有三個初始值:

    messageOffset表示消息位移值;

    segmentBaseOffset保存消息位移值所在日誌段的起始位移,用來判斷兩條消息是否處於同一個日誌段的;

    relativePositionSegment保存消息位移值所在日誌段的物理磁盤位置;

    上面的onOlderSegment表明,要比較哪個日誌段更老,只需要比較segmentBaseOffset的大小就可以了。

    高水位HighWatermark設值與更新

      private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
        //高水位的值不可能小於零
        if (newHighWatermark.messageOffset < 0)
          throw new IllegalArgumentException("High watermark offset should be non-negative")
    
        lock synchronized {// 保護Log對象修改的Monitor鎖
          highWatermarkMetadata = newHighWatermark// 賦值新的高水位值
          //事務相關,暫時忽略
          producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
          //事務相關,暫時忽略
          maybeIncrementFirstUnstableOffset()
        }
        trace(s"Setting high watermark $newHighWatermark")
      }
    

    設置高水位的值是很簡單的,首先校驗高水位的值是否大於零,然後通過直接加鎖之後更新高水位的值。

    更新更新高水位值的方法有兩個:updateHighWatermark 和 maybeIncrementHighWatermark,我們分別分析。

    updateHighWatermark

      def updateHighWatermark(hw: Long): Long = {
        //傳入的高水位的值如果小於logStartOffset,設置為logStartOffset
        val newHighWatermark = if (hw < logStartOffset)
          logStartOffset
        //  傳入的高水位的值如果大於LEO,那麼設置為LEO
        else if (hw > logEndOffset)
          logEndOffset
        else
          hw
        //將newHighWatermark封裝成一個LogOffsetMetadata然後更新高水位的值
        updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
        //返回新的高水位的值
        newHighWatermark
      }
    

    這個方法邏輯也很簡潔,因為高水位的值是不可能大於LEO,也不可能小於logStartOffset,所以需要對傳入的hw校驗然後設置成正確的值,然後調用上面的設置高水位的方法設值。

    maybeIncrementHighWatermark

    /**
     * Update the high watermark to a new value if and only if it is larger than the old value. It is
     * an error to update to a value which is larger than the log end offset.
     *
     * This method is intended to be used by the leader to update the high watermark after follower
     * fetch offsets have been updated.
     *
     * @return the old high watermark, if updated by the new value
     */
    //  當新的高水位的值大於舊的高水位的值時才做更新,如果新的高水位的值大於LEO,會報錯
    //  這個方法是leader在確認Follower已經拉取了日誌之後才做更新
    def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
      //如果新的高水位的值大於LEO,會報錯
      if (newHighWatermark.messageOffset > logEndOffset)
        throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
          s"log end offset $logEndOffsetMetadata")
    
      lock.synchronized {
        // 獲取老的高水位值
        val oldHighWatermark = fetchHighWatermarkMetadata
    
        // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
        // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
        //只有當新的高水位值大於老的值,因為要維護高水位的單調遞增性
        //或者當新的高水位值和老的高水位值相等,但是新的高水位在一個新的日誌段上面時才做更新
        if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
          (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
          updateHighWatermarkMetadata(newHighWatermark)
          Some(oldHighWatermark)// 返回老的高水位值
        } else {
          None
        }
      }
    }
    
    

    這個方法我將這個方法的英文註釋貼出來了,這個註釋的說明我也寫到方法上了,邏輯很清楚,大家看看註釋應該能理解。

    這兩個方法主要的區別是,updateHighWatermark 方法,主要用在 Follower 副本從 Leader 副本獲取到消息后更新高水位值。而 maybeIncrementHighWatermark 方法,主要是用來更新 Leader 副本的高水位值。

    上面的方法中通過調用fetchHighWatermarkMetadata來獲取高水位的值,我們下面看看這個方法:

    fetchHighWatermarkMetadata

      private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
        // 讀取時確保日誌不能被關閉
        checkIfMemoryMappedBufferClosed()
    
        val offsetMetadata = highWatermarkMetadata
        if (offsetMetadata.messageOffsetOnly) {//沒有獲得到完整的高水位元數據
          lock.synchronized {
            // 通過讀日誌文件的方式把完整的高水位元數據信息拉出來
            val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
            updateHighWatermarkMetadata(fullOffset)
            fullOffset
          }
        } else {
          offsetMetadata
        }
      }
    
      private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
        //通過給的offset,去日誌文件中找到相應的日誌信息
        val fetchDataInfo = read(offset,
          maxLength = 1,
          isolation = FetchLogEnd,
          minOneMessage = false)
        fetchDataInfo.fetchOffsetMetadata
      }
    

    然後我們提前看一下日誌的read方法,是如何根據索引讀取數據的:

    日誌段操作

    日誌讀取操作

    read

      def read(startOffset: Long,
               maxLength: Int,
               isolation: FetchIsolation,
               minOneMessage: Boolean): FetchDataInfo = {
        maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
          trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")
    
          //convertToOffsetMetadataOrThrow傳進來是FetchLogEnd,所以這裡是false
          val includeAbortedTxns = isolation == FetchTxnCommitted
     
          // 由於沒有使用鎖,所以使用變量緩存當前的nextOffsetMetadata狀態
          val endOffsetMetadata = nextOffsetMetadata
          val endOffset = endOffsetMetadata.messageOffset
          // 到日字段中根據索引尋找最近的日誌段
          var segmentEntry = segments.floorEntry(startOffset)
    
          // return error on attempt to read beyond the log end offset or read below log start offset
          // 這裏給出了幾種異常場景:
          // 1. 給的日誌索引大於最大值;
          // 2. 通過索引找的日誌段為空;
          // 3. 給的日誌索引小於logStartOffset
          if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
            throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
              s"but we only have log segments in the range $logStartOffset to $endOffset.")
    
          //convertToOffsetMetadataOrThrow傳進來是FetchLogEnd,所以最大值是endOffsetMetadata
          // 查看一下讀取隔離級別設置。
          // 普通消費者能夠看到[Log Start Offset, LEO)之間的消息
          // 事務型消費者只能看到[Log Start Offset, Log Stable Offset]之間的消息。Log Stable Offset(LSO)是比LEO值小的位移值,為Kafka事務使用
          // Follower副本消費者能夠看到[Log Start Offset,高水位值]之間的消息
          val maxOffsetMetadata = isolation match {
            case FetchLogEnd => endOffsetMetadata
            case FetchHighWatermark => fetchHighWatermarkMetadata
            case FetchTxnCommitted => fetchLastStableOffsetMetadata
          }
          //如果尋找的索引等於maxOffsetMetadata,那麼直接返回
          if (startOffset == maxOffsetMetadata.messageOffset) {
            return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
          //如果尋找的索引大於maxOffsetMetadata,返回空的消息集合,因為沒法讀取任何消息
          } else if (startOffset > maxOffsetMetadata.messageOffset) {
            val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
            return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
          }
     
          // 開始遍曆日志段對象,直到讀出東西來或者讀到日誌末尾
          while (segmentEntry != null) {
            val segment = segmentEntry.getValue
            // 找到日誌段中最大的日誌位移
            val maxPosition = { 
              if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
                maxOffsetMetadata.relativePositionInSegment
              } else {
                segment.size
              }
            }
            // 根據位移信息從日誌段中讀取日誌信息
            val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
            // 如果找不到日誌信息,那麼去日誌段集合中找更大的日誌位移的日誌段
            if (fetchInfo == null) {
              segmentEntry = segments.higherEntry(segmentEntry.getKey)
            } else {
              return if (includeAbortedTxns)
                addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
              else
                fetchInfo
            }
          }
    
          //找了所有日誌段的位移依然找不到,這可能是因為大於指定的日誌位移的消息都被刪除了,這種情況返回空
          FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
        }
      }
    

    read方法,有四個參數,分別是:

    • startOffset:讀取的日誌索引位置。
    • maxLength:讀取數據量長度。
    • isolation:隔離級別,多用於 Kafka 事務。
    • minOneMessage:是否至少返回一條消息。設想如果消息很大,超過了 maxLength,正常情況下 read 方法永遠不會返回任何消息。但如果設置了該參數為 true,read 方法就保證至少能夠返回一條消息。

    代碼中使用了segments,來根據位移查找日誌段:

      private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
    

    我們下面看看read方法具體做了哪些事:

    1. 由於沒有使用鎖,所以使用變量緩存當前的nextOffsetMetadata狀態,作為最大索引LEO;
    2. 去日誌段集合里尋找小於或等於指定索引的日誌段;
    3. 校驗異常情況:
      1. startOffset是不是超過了LEO;
      2. 是不是日誌段集合里沒有索引小於startOffset;
      3. startOffset小於Log Start Offset;
    4. 接下來獲取一下隔離級別;
    5. 如果尋找的索引等於LEO,那麼返回空;
    6. 如果尋找的索引大於LEO,返回空的消息集合,因為沒法讀取任何消息;
    7. 開始遍曆日志段對象,直到讀出東西來或者讀到日誌末尾;
      1. 首先找到日誌段中最大的位置;
      2. 根據位移信息從日誌段中讀取日誌信息(這個read方法我們上一篇已經講解過了);
      3. 如果找不到日誌信息,那麼讀取日誌段集合中下一個日誌段;
    8. 找了所有日誌段的位移依然找不到,這可能是因為大於指定的日誌位移的消息都被刪除了,這種情況返回空;

    我們在上面的read操作中可以看到,使用了segments來查找日誌。我們主要看看刪除操作

    刪除日誌

    刪除日誌的入口是:deleteOldSegments

      //  如果topic deletion開關是打開的,那麼會刪去過期的日誌段以及超過設置保留日誌大小的日誌
      // 無論是否開啟刪除規則,都會刪除在log start offset之前的日誌段
      def deleteOldSegments(): Int = {
        if (config.delete) {
          deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
        } else {
          deleteLogStartOffsetBreachedSegments()
        }
      }
    

    deleteOldSegments方法會判斷是否開啟刪除規則,如果開啟,那麼會分別調用:

    deleteRetentionMsBreachedSegments刪除segment的時間戳超過了設置時間的日誌段;

    deleteRetentionSizeBreachedSegments刪除日誌段空間超過設置空間大小的日誌段;

    deleteLogStartOffsetBreachedSegments刪除日誌段的baseOffset小於logStartOffset的日誌段;

    我這裏列舉一下這三個方法主要是怎麼實現的:

      private def deleteRetentionMsBreachedSegments(): Int = {
        if (config.retentionMs < 0) return 0
        val startMs = time.milliseconds
        //調用deleteOldSegments方法,並傳入匿名函數,判斷當前的segment的時間戳是否超過了設置時間
        deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
          reason = s"retention time ${config.retentionMs}ms breach")
      }
      
      private def deleteRetentionSizeBreachedSegments(): Int = {
        if (config.retentionSize < 0 || size < config.retentionSize) return 0
        var diff = size - config.retentionSize
        //判斷日誌段空間是否超過設置空間大小
        //shouldDelete函數會將傳入的日誌段去減diff,直到小於等於零
        def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
          if (diff - segment.size >= 0) {
            diff -= segment.size
            true
          } else {
            false
          }
        }
    
        deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
      }
      
      private def deleteLogStartOffsetBreachedSegments(): Int = {
        //shouldDelete函數主要判斷日誌段的baseOffset是否小於logStartOffset
        def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
          nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
    
        deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
      }
    

    這種寫代碼的方式非常的靈活,通過不同方法設置不同的函數來實現代碼復用的目的,最後都是通過調用deleteOldSegments來實現刪除日誌段的目的。

    下面我們來看一下deleteOldSegments的操作:

    deleteOldSegments

    這個deleteOldSegments方法和上面的入口方法傳入的參數是不一致的,這個方法傳入了一個predicate函數,用於判斷哪些日誌段是可以被刪除的,reason用來說明被刪除的原因。

      private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
        //刪除任何匹配到predicate規則的日誌段
        lock synchronized {
          val deletable = deletableSegments(predicate)
          if (deletable.nonEmpty)
            info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
          deleteSegments(deletable)
        }
      }
    

    這個方法調用了兩個主要的方法,一個是deletableSegments,用於獲取可以被刪除的日誌段的集合;deleteSegments用於刪除日誌段。

    deletableSegments

      private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
        //如果日誌段是空的,那麼直接返回
        if (segments.isEmpty) {
          Seq.empty
        } else {
          val deletable = ArrayBuffer.empty[LogSegment]
          var segmentEntry = segments.firstEntry
          //如果日誌段集合不為空,找到第一個日誌段
          while (segmentEntry != null) {
            val segment = segmentEntry.getValue
            //獲取下一個日誌段
            val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
            val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
              (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
            else
              (null, logEndOffset, segment.size == 0)
            //如果下一個日誌段的位移沒有大於或等於HW,並且日誌段是匹配predicate函數的,下一個日誌段也不是空的
            //那麼將這個日誌段放入可刪除集合中,然後遍歷下一個日誌段
            if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
              deletable += segment
              segmentEntry = nextSegmentEntry
            } else {
              segmentEntry = null
            }
          }
          deletable
        }
      }
    

    這個方法邏輯十分清晰,主要做了如下幾件事:

    1. 判斷日誌段集合是否為空,為空那麼直接返回空集合;

    2. 如果日誌段集合不為空,那麼從日誌段集合的第一個日誌段開始遍歷;

    3. 判斷當前被遍曆日志段是否能夠被刪除

      1. 日誌段的下一個日誌段的位移有沒有大於或等於HW;
      2. 日誌段是否能夠通過predicate函數校驗;
      3. 日誌段是否是最後一個日誌段;
    4. 將符合條件的日誌段都加入到deletable集合中,並返回。

    接下來調用deleteSegments函數:

      private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
        maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
          val numToDelete = deletable.size
          if (numToDelete > 0) {
            // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
            // 我們至少保證要存在一個日誌段,如果要刪除所有的日誌;
            //所以調用roll方法創建一個全新的日誌段對象,並且關閉當前寫入的日誌段對象;
            if (segments.size == numToDelete)
              roll()
            lock synchronized {
              // 確保Log對象沒有被關閉
              checkIfMemoryMappedBufferClosed()
              // remove the segments for lookups
              // 刪除給定的日誌段對象以及底層的物理文件
              removeAndDeleteSegments(deletable, asyncDelete = true)
              // 嘗試更新日誌的Log Start Offset值
              maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
            }
          }
          numToDelete
        }
      }
    

    寫日誌

    寫日誌的方法主要有兩個:

    appendAsLeader

      def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,
                         interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
        append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
      }
    

    appendAsFollower

      def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
        append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1)
      }
    

    appendAsLeader 是用於寫 Leader 副本的,appendAsFollower 是用於 Follower 副本同步的。它們的底層都調用了 append 方法

    append

      private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
        maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
          // 第1步:分析和驗證待寫入消息集合,並返回校驗結果
          val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
    
          // return if we have no valid messages or if this is a duplicate of the last appended entry
          // 如果壓根就不需要寫入任何消息,直接返回即可
          if (appendInfo.shallowCount == 0)
            return appendInfo
    
          // trim any invalid bytes or partial messages before appending it to the on-disk log
          // 第2步:消息格式規整,即刪除無效格式消息或無效字節
          var validRecords = trimInvalidBytes(records, appendInfo)
    
          // they are valid, insert them in the log
          lock synchronized {
            // 確保Log對象未關閉
            checkIfMemoryMappedBufferClosed()
            //需要分配位移值
            if (assignOffsets) {
              // assign offsets to the message set
              // 第3步:使用當前LEO值作為待寫入消息集合中第一條消息的位移值,nextOffsetMetadata為LEO值
              val offset = new LongRef(nextOffsetMetadata.messageOffset)
              appendInfo.firstOffset = Some(offset.value)
              val now = time.milliseconds
              val validateAndOffsetAssignResult = try {
                LogValidator.validateMessagesAndAssignOffsets(validRecords,
                  topicPartition,
                  offset,
                  time,
                  now,
                  appendInfo.sourceCodec,
                  appendInfo.targetCodec,
                  config.compact,
                  config.messageFormatVersion.recordVersion.value,
                  config.messageTimestampType,
                  config.messageTimestampDifferenceMaxMs,
                  leaderEpoch,
                  isFromClient,
                  interBrokerProtocolVersion,
                  brokerTopicStats)
              } catch {
                case e: IOException =>
                  throw new KafkaException(s"Error validating messages while appending to log $name", e)
              }
              // 更新校驗結果對象類LogAppendInfo
              validRecords = validateAndOffsetAssignResult.validatedRecords
              appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
              appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
              appendInfo.lastOffset = offset.value - 1
              appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
              if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
                appendInfo.logAppendTime = now
    
              // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
              // format conversion)
              // 第4步:驗證消息,確保消息大小不超限
              if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
                for (batch <- validRecords.batches.asScala) {
                  if (batch.sizeInBytes > config.maxMessageSize) {
                    // we record the original message set size instead of the trimmed size
                    // to be consistent with pre-compression bytesRejectedRate recording
                    brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                    brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                    throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                      s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
                  }
                }
              }
              // 直接使用給定的位移值,無需自己分配位移值
            } else {
              // we are taking the offsets we are given
              if (!appendInfo.offsetsMonotonic)// 確保消息位移值的單調遞增性
                throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
                                                     records.records.asScala.map(_.offset))
    
              if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
                // we may still be able to recover if the log is empty
                // one example: fetching from log start offset on the leader which is not batch aligned,
                // which may happen as a result of AdminClient#deleteRecords()
                val firstOffset = appendInfo.firstOffset match {
                  case Some(offset) => offset
                  case None => records.batches.asScala.head.baseOffset()
                }
    
                val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
                throw new UnexpectedAppendOffsetException(
                  s"Unexpected offset in append to $topicPartition. $firstOrLast " +
                  s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
                  s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
                  s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
                  firstOffset, appendInfo.lastOffset)
              }
            }
    
            // update the epoch cache with the epoch stamped onto the message by the leader
            // 第5步:更新Leader Epoch緩存
            validRecords.batches.asScala.foreach { batch =>
              if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
                maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
              } else {
                // In partial upgrade scenarios, we may get a temporary regression to the message format. In
                // order to ensure the safety of leader election, we clear the epoch cache so that we revert
                // to truncation by high watermark after the next leader election.
                leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
                  warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
                  cache.clearAndFlush()
                }
              }
            }
    
            // check messages set size may be exceed config.segmentSize
            // 第6步:確保消息大小不超限
            if (validRecords.sizeInBytes > config.segmentSize) {
              throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
                s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
            }
    
            // maybe roll the log if this segment is full
            // 第7步:執行日誌切分。當前日誌段剩餘容量可能無法容納新消息集合,因此有必要創建一個新的日誌段來保存待寫入的所有消息
            //下面情況將會執行日誌切分:
            //logSegment 已經滿了
            //日誌段中的第一個消息的maxTime已經過期
            //index索引滿了
            val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
    
            val logOffsetMetadata = LogOffsetMetadata(
              messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
              segmentBaseOffset = segment.baseOffset,
              relativePositionInSegment = segment.size)
    
            // now that we have valid records, offsets assigned, and timestamps updated, we need to
            // validate the idempotent/transactional state of the producers and collect some metadata
            // 第8步:驗證事務狀態
            val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
              logOffsetMetadata, validRecords, isFromClient)
    
            maybeDuplicate.foreach { duplicate =>
              appendInfo.firstOffset = Some(duplicate.firstOffset)
              appendInfo.lastOffset = duplicate.lastOffset
              appendInfo.logAppendTime = duplicate.timestamp
              appendInfo.logStartOffset = logStartOffset
              return appendInfo
            }
            // 第9步:執行真正的消息寫入操作,主要調用日誌段對象的append方法實現
            segment.append(largestOffset = appendInfo.lastOffset,
              largestTimestamp = appendInfo.maxTimestamp,
              shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
              records = validRecords)
    
            // Increment the log end offset. We do this immediately after the append because a
            // write to the transaction index below may fail and we want to ensure that the offsets
            // of future appends still grow monotonically. The resulting transaction index inconsistency
            // will be cleaned up after the log directory is recovered. Note that the end offset of the
            // ProducerStateManager will not be updated and the last stable offset will not advance
            // if the append to the transaction index fails.
            // 第10步:更新LEO對象,其中,LEO值是消息集合中最後一條消息位移值+1
            // 前面說過,LEO值永遠指向下一條不存在的消息
            updateLogEndOffset(appendInfo.lastOffset + 1)
    
            // update the producer state
            // 第11步:更新事務狀態
            for (producerAppendInfo <- updatedProducers.values) {
              producerStateManager.update(producerAppendInfo)
            }
    
            // update the transaction index with the true last stable offset. The last offset visible
            // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
            for (completedTxn <- completedTxns) {
              val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
              segment.updateTxnIndex(completedTxn, lastStableOffset)
              producerStateManager.completeTxn(completedTxn)
            }
    
            // always update the last producer id map offset so that the snapshot reflects the current offset
            // even if there isn't any idempotent data being written
            producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
    
            // update the first unstable offset (which is used to compute LSO)
            maybeIncrementFirstUnstableOffset()
    
            trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
              s"first offset: ${appendInfo.firstOffset}, " +
              s"next offset: ${nextOffsetMetadata.messageOffset}, " +
              s"and messages: $validRecords")
            // 是否需要手動落盤。一般情況下我們不需要設置Broker端參數log.flush.interval.messages
            // 落盤操作交由操作系統來完成。但某些情況下,可以設置該參數來確保高可靠性
            if (unflushedMessages >= config.flushInterval)
              flush()
            // 第12步:返回寫入結果
            appendInfo
          }
        }
      }
    

    上面代碼的主要步驟如下:

    我們下面看看analyzeAndValidateRecords是如何進行消息校驗的:

    analyzeAndValidateRecords

      private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
        var shallowMessageCount = 0
        var validBytesCount = 0
        var firstOffset: Option[Long] = None
        var lastOffset = -1L
        var sourceCodec: CompressionCodec = NoCompressionCodec
        var monotonic = true
        var maxTimestamp = RecordBatch.NO_TIMESTAMP
        var offsetOfMaxTimestamp = -1L
        var readFirstMessage = false
        var lastOffsetOfFirstBatch = -1L
    
        for (batch <- records.batches.asScala) {
          // we only validate V2 and higher to avoid potential compatibility issues with older clients
          // 消息格式Version 2的消息批次,起始位移值必須從0開始
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
            throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
              s"be 0, but it is ${batch.baseOffset}")
    
          // update the first offset if on the first message. For magic versions older than 2, we use the last offset
          // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
          // For magic version 2, we can get the first offset directly from the batch header.
          // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
          // case, validation will be more lenient.
          // Also indicate whether we have the accurate first offset or not
          if (!readFirstMessage) {
            if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
              firstOffset = Some(batch.baseOffset) // 更新firstOffset字段
            lastOffsetOfFirstBatch = batch.lastOffset // 更新lastOffsetOfFirstBatch字段
            readFirstMessage = true
          }
    
          // check that offsets are monotonically increasing
          // 一旦出現當前lastOffset不小於下一個batch的lastOffset,說明上一個batch中有消息的位移值大於後面batch的消息
          // 這違反了位移值單調遞增性
          if (lastOffset >= batch.lastOffset)
            monotonic = false
    
          // update the last offset seen
          // 使用當前batch最後一條消息的位移值去更新lastOffset
          lastOffset = batch.lastOffset
    
          // Check if the message sizes are valid.
          val batchSize = batch.sizeInBytes
          // 檢查消息批次總字節數大小是否超限,即是否大於Broker端參數max.message.bytes值
          if (batchSize > config.maxMessageSize) {
            brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
            brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
            throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
              s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
          }
    
          // check the validity of the message by checking CRC
          // 執行消息批次校驗,包括格式是否正確以及CRC校驗
          if (!batch.isValid) {
            brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
            throw new CorruptRecordException(s"Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.")
          }
          // 更新maxTimestamp字段和offsetOfMaxTimestamp
          if (batch.maxTimestamp > maxTimestamp) {
            maxTimestamp = batch.maxTimestamp
            offsetOfMaxTimestamp = lastOffset
          }
          // 累加消息批次計數器以及有效字節數,更新shallowMessageCount字段
          shallowMessageCount += 1
          validBytesCount += batchSize
          // 從消息批次中獲取壓縮器類型
          val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
          if (messageCodec != NoCompressionCodec)
            sourceCodec = messageCodec
        }
    
        // Apply broker-side compression if any
        // 獲取Broker端設置的壓縮器類型,即Broker端參數compression.type值。
        // 該參數默認值是producer,表示sourceCodec用的什麼壓縮器,targetCodec就用什麼
        val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
        // 最後生成LogAppendInfo對象並返回
        LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
          RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
      }
    

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    新北清潔公司,居家、辦公、裝潢細清專業服務

    ※別再煩惱如何寫文案,掌握八大原則!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

    FB行銷專家,教你從零開始的技巧