標籤: USB CONNECTOR

  • 面試必問:分佈式鎖實現之zk(Zookeeper)

    面試必問:分佈式鎖實現之zk(Zookeeper)

    點贊再看,養成習慣,微信搜索【三太子敖丙】關注這個互聯網苟且偷生的工具人。

    本文 GitHub https://github.com/JavaFamily 已收錄,有一線大廠面試完整考點、資料以及我的系列文章。

    前言

    鎖我想不需要我過多的去說,大家都知道是怎麼一回事了吧?

    在多線程環境下,由於上下文的切換,數據可能出現不一致的情況或者數據被污染,我們需要保證數據安全,所以想到了加鎖。

    所謂的加鎖機制呢,就是當一個線程訪問該類的某個數據時,進行保護,其他線程不能進行訪問,直到該線程讀取完,其他線程才可使用。

    還記得我之前說過Redis在分佈式的情況下,需要對存在併發競爭的數據進行加鎖,老公們十分費解,Redis是單線程的嘛?為啥還要加鎖呢?

    看來老公們還是年輕啊,你說的不需要加鎖的情況是這樣的:

    單個服務去訪問Redis的時候,確實因為Redis本身單線程的原因是不用考慮線程安全的,但是,現在有哪個公司還是單機的呀?肯定都是分佈式集群了嘛。

    老公們你看下這樣的場景是不是就有問題了:

    你們經常不是說秒殺嘛,拿到庫存判斷,那老婆告訴你分佈式情況就是會出問題的。

    我們為了減少DB的壓力,把庫存預熱到了KV,現在KV的庫存是1。

    1. 服務A去Redis查詢到庫存發現是1,那說明我能搶到這個商品對不對,那我就準備減一了,但是還沒減。
    2. 同時服務B也去拿發現也是1,那我也搶到了呀,那我也減。
    3. C同理。
    4. 等所有的服務都判斷完了,你發現誒,怎麼變成-2了,超賣了呀,這下完了。

    老公們是不是發現問題了,這就需要分佈式鎖的介入了,我會分三個章節去分別介紹分佈式鎖的三種實現方式(Zookeeper,Redis,MySQL),說出他們的優缺點,以及一般大廠的實踐場景。

    正文

    一個騷里騷氣的面試官啥也沒拿的就走了進來,你一看,這不是你老婆嘛,你正準備叫他的時候,發現他一臉嚴肅,死鬼還裝嚴肅,肯定會給我放水的吧。

    B站搜:三太子敖丙

    咳咳,我們啥也不說了,開始今天的面試吧。

    正常線程進程同步的機制有哪些?

    • 互斥:互斥的機制,保證同一時間只有一個線程可以操作共享資源 synchronized,Lock等。
    • 臨界值:讓多線程串行話去訪問資源
    • 事件通知:通過事件的通知去保證大家都有序訪問共享資源
    • 信號量:多個任務同時訪問,同時限制數量,比如發令槍CDL,Semaphore等

    那分佈式鎖你了解過有哪些么?

    分佈式鎖實現主要以Zookeeper(以下簡稱zk)、Redis、MySQL這三種為主。

    那先跟我聊一下zk吧,你能說一下他常見的使用場景么?

    他主要的應用場景有以下幾個:

    • 服務註冊與訂閱(共用節點)
    • 分佈式通知(監聽znode)
    • 服務命名(znode特性)
    • 數據訂閱、發布(watcher)
    • 分佈式鎖(臨時節點)

    zk是啥?

    他是個數據庫,文件存儲系統,並且有監聽通知機制(觀察者模式)

    存文件系統,他存了什麼?

    節點

    zk的節點類型有4大類

    • 持久化節點(zk斷開節點還在)

    • 持久化順序編號目錄節點

    • 臨時目錄節點(客戶端斷開後節點就刪除了)

    • 臨時目錄編號目錄節點

    節點名稱都是唯一的。

    節點怎麼創建?

    我特么,這樣問的么?可是我面試只看了分佈式鎖,我得好好想想!!!

    還好我之前在自己的服務器搭建了一個zk的集群,我剛好跟大家回憶一波。

    create /test laogong // 創建永久節點 

    那臨時節點呢?

    create -e /test laogong // 創建臨時節點

    臨時節點就創建成功了,如果我斷開這次鏈接,這個節點自然就消失了,這是我的一個zk管理工具,目錄可能清晰點。

    如何創建順序節點呢?

    create -s /test // 創建順序節點

    臨時順序節點呢?

    我想聰明的老公都會搶答了

    create -e -s /test  // 創建臨時順序節點

    我退出后,重新連接,發現剛才創建的所有臨時節點都沒了。

    開篇演示這麼多呢,我就是想給大家看到的zk大概的一個操作流程和數據結構,中間涉及的搭建以及其他的技能我就不說了,我們重點聊一下他在分佈式鎖中的實現。

    zk就是基於節點去實現各種分佈式鎖的。

    就拿開頭的場景來說,zk應該怎麼去保證分佈式情況下的線程安全呢?併發競爭他是怎麼控制的呢?

    為了模擬併發競爭這樣一個情況,我寫了點偽代碼,大家可以先看看

    我定義了一個庫存inventory值為1,還用到了一個CountDownLatch發令槍,等10個線程都就緒了一起去扣減庫存。

    是不是就像10台機器一起去拿到庫存,然後扣減庫存了?

    所有機器一起去拿,發現都是1,那大家都認為是自己搶到了,都做了減一的操作,但是等所有人都執行完,再去set值的時候,發現其實已經超賣了,我打印出來給大家看看。

    是吧,這還不是超賣一個兩個的問題,超賣7個都有,代碼裏面明明判斷了庫存大於0才去減的,怎麼回事開頭我說明了。

    那怎麼解決這個問題?

    sync,lock也只能保證你當前機器線程安全,這樣分佈式訪問還是有問題。

    上面跟大家提到的zk的節點就可以解決這個問題。

    zk節點有個唯一的特性,就是我們創建過這個節點了,你再創建zk是會報錯的,那我們就利用一下他的唯一性去實現一下。

    怎麼實現呢?

    上面不是10個線程嘛?

    我們全部去創建,創建成功的第一個返回true他就可以繼續下面的扣減庫存操作,後續的節點訪問就會全部報錯,扣減失敗,我們把它們丟一個隊列去排隊。

    那怎麼釋放鎖呢?

    刪除節點咯,刪了再通知其他的人過來加鎖,依次類推。

    我們實現一下,zk加鎖的場景。

    是不是,只有第一個線程能扣減成功,其他的都失敗了。

    但是你發現問題沒有,你加了鎖了,你得釋放啊,你不釋放後面的報錯了就不重試了。

    那簡單,刪除鎖就釋放掉了,Lock在finally裏面unLock,現在我們在finally刪除節點。

    加鎖我們知道創建節點就夠了,但是你得實現一個阻塞的效果呀,那咋搞?

    死循環,遞歸不斷去嘗試,直到成功,一個偽裝的阻塞效果。

    怎麼知道前面的老哥刪除節點了嗯?

    監聽節點的刪除事件

    但是你發現你這樣做的問題沒?

    是的,會出現死鎖。

    第一個仔加鎖成功了,在執行代碼的時候,機器宕機了,那節點是不是就不能刪除了?

    你要故作沉思,自問自答,時而看看遠方,時而看看面試官,假裝自己什麼都不知道。

    哦我想起來了,創建臨時節點就好了,客戶端連接一斷開,別的就可以監聽到節點的變化了。

    嗯還不錯,那你發現還有別的問題沒?

    好像這種監聽機制也不好。

    怎麼個不好呢?

    你們可以看到,監聽,是所有服務都去監聽一個節點的,節點的釋放也會通知所有的服務器,如果是900個服務器呢?

    這對服務器是很大的一個挑戰,一個釋放的消息,就好像一個牧羊犬進入了羊群,大家都四散而開,隨時可能幹掉機器,會佔用服務資源,網絡帶寬等等。

    這就是羊群效應。

    那怎麼解決這個問題?

    繼續故作沉思,啊啊啊,好難,我的腦袋。。。。

    你TM別裝了好不好?

    好的,臨時順序節點,可以順利解決這個問題。

    怎麼實現老公你先別往下看,先自己想想。

    之前說了全部監聽一個節點問題很大,那我們就監聽我們的前一個節點,因為是順序的,很容易找到自己的前後。

    和之前監聽一個永久節點的區別就在於,這裏每個節點只監聽了自己的前一個節點,釋放當然也是一個個釋放下去,就不會出現羊群效應了。

    以上所有代碼我都會開源到我的https://github.com/AobingJava/Thanos其實上面的還有瑕疵,大家可以去拉下來改一下提交pr,我會看合適的會通過的。

    你說了這麼多,挺不錯的,你能說說ZK在分佈式鎖中實踐的一些缺點么?

    Zk性能上可能並沒有緩存服務那麼高。

    因為每次在創建鎖和釋放鎖的過程中,都要動態創建、銷毀瞬時節點來實現鎖功能。

    ZK中創建和刪除節點只能通過Leader服務器來執行,然後將數據同步到所有的Follower機器上。(這裏涉及zk集群的知識,我就不展開了,以後zk章節跟老公們細聊)

    還有么?

    使用Zookeeper也有可能帶來併發問題,只是並不常見而已。

    由於網絡抖動,客戶端可ZK集群的session連接斷了,那麼zk以為客戶端掛了,就會刪除臨時節點,這時候其他客戶端就可以獲取到分佈式鎖了。

    就可能產生併發問題了,這個問題不常見是因為zk有重試機制,一旦zk集群檢測不到客戶端的心跳,就會重試,Curator客戶端支持多種重試策略。

    多次重試之後還不行的話才會刪除臨時節點。

    Tip:所以,選擇一個合適的重試策略也比較重要,要在鎖的粒度和併發之間找一個平衡。

    有更好的實現么?

    基於Redis的分佈式鎖

    能跟我聊聊么?

    我看看了手上的表,老公,今天天色不早了,你全問完了,我怎麼多水幾篇文章呢?

    行確實很晚了,那你回家去把家務幹了吧?

    我????

    =

    總結

    zk通過臨時節點,解決掉了死鎖的問題,一旦客戶端獲取到鎖之後突然掛掉(Session連接斷開),那麼這個臨時節點就會自動刪除掉,其他客戶端自動獲取鎖。

    zk通過節點排隊監聽的機制,也實現了阻塞的原理,其實就是個遞歸在那無限等待最小節點釋放的過程。

    我上面沒實現鎖的可重入,但是也很好實現,可以帶上線程信息就可以了,或者機器信息這樣的唯一標識,獲取的時候判斷一下。

    zk的集群也是高可用的,只要半數以上的或者,就可以對外提供服務了。

    這周會寫完Redis和數據庫的分佈式鎖的,老公們等好。

    我是敖丙,一個在互聯網苟且偷生的工具人。

    最好的關係是互相成就老公們的「三連」就是丙丙創作的最大動力,我們下期見!

    注:如果本篇博客有任何錯誤和建議,歡迎老公們留言,老公你快說句話啊

    文章持續更新,可以微信搜索「 三太子敖丙 」第一時間閱讀,回復【資料】【面試】【簡歷】有我準備的一線大廠面試資料和簡歷模板,本文 GitHub https://github.com/JavaFamily 已經收錄,有大廠面試完整考點,歡迎Star。

    你知道的越多,你不知道的越多

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

    台北網頁設計公司這麼多該如何選擇?

    ※智慧手機時代的來臨,RWD網頁設計為架站首選

    ※評比南投搬家公司費用收費行情懶人包大公開

    ※回頭車貨運收費標準

  • 自己動手實現深度學習框架-8 RNN文本分類和文本生成模型

    自己動手實現深度學習框架-8 RNN文本分類和文本生成模型

    代碼倉庫: https://github.com/brandonlyg/cute-dl

    目標

            上階段cute-dl已經可以構建基礎的RNN模型。但對文本相模型的支持不夠友好, 這個階段的目標是, 讓框架能夠友好地支持文本分類和本文生成任務。具體包括:

    1. 添加嵌入層, 為文本尋找高效的向量表示。
    2. 添加類別抽樣函數, 根據模型輸出的類別分佈抽樣得到生成的文本。
    3. 使用imdb-review數據集驗證文本分類模型。
    4. 使用一個古詩數據集驗證文本生成模型。

            這階段涉及到的代碼比較簡單因此接下來會重點描述RNN語言相關模型中涉及到的數學原理和工程方法。

    數學原理

    文本分類模型

            可以把文本看成是一個詞的序列\(W=[w_1, w_2, …, w_T]\), 在訓練數據集中每個文本屬於一個類別\(a_i\), \(a_i∈A\), 集合 \(A = \{ a_1, a_2, …, a_k \}\) 是一個類別別集合. 分類模型要做的是給定一個文本W, 計算所有類別的后驗概率:

    \[P(a_i|W) = P(a_i|w_1,w_2,…,w_T), \quad i=1,2,…k \]

            那麼文本序列W的類別為:

    \[a = arg \max_{a_i} P(a_i|w_1,w_2,…,w_T) \]

            即在給定文本的條件下, 具有最大后驗概率的類別就是文本序列W所屬的類別.

    文本預測模型

            設任意一個文本序列為\(W=[w_1,w_2,…,W_T]\), 任意一個詞\(w_i ∈ V\), V是所有詞彙的集合,也叫詞彙表, 這裏需要強調的是\(w_i\)在V中是無序的, 但在W中是有序的, 文本預測的任務是, 計算任意一個詞\(w_i ∈ V\)在給定一個序列中的任意一個位置出現的概率:

    \[P(w_1,…,W_T) = ∏_{t=1}^T P(w_t|w_1,…,w_{t-1}) \]

            文本預測輸出一個\(w_i ∈ V\)的分佈列, 根據這個分佈列從V中抽取一個詞即為預測結果。不同於分類任務,這裏不是取概率最大的詞, 這裏的預測結果是某個詞出現的在一個序列特定位置的個概率,只要概率不是0都有可能出現,所以要用抽樣的方法確定某次預測的結果。

    詞的数字化表示

            任意一條數據在送入模型之前都要表示為一個数字化的向量, 文本數據也不例外。一個文本可以看成詞的序列,因此只要把詞数字化了,文本自然也就数字化了。對於詞來說,最簡單的方式是用詞在詞彙表中的唯一ID來表示, ID需要遵守兩個最基本的規則:

    1. 每個詞的ID在詞彙表中必須是唯一的.
    2. 每個詞的ID一旦確定不能變化.

            這種表示很難表達詞之間的關係, 例如: 在詞彙表中把”好”的ID指定為100, 如果希望ID能夠反映詞意的關係, 需要把”好”的近意詞: “善”, “美”, “良”, “可以”編碼為98, 99, 101, 102. 目前為止這看起還行. 如果還希望ID能夠反映詞之間的語法關係, “好”前後經常出現的詞: “友”, “人”, “的”, 這幾個詞的ID就很難選擇, 不論怎樣, 都會發現兩個詞它們在語義和語法上的關係都很遠,但ID卻很接近。這也說明了標量的表達能力很有限,無法表達多個維度的關係。為了能夠表達詞之間多個維度的的關係,多維向量是一個很好的選擇. 向量之間的夾大小衡量它們之間的關係:

    \[cos(θ) = \frac{<A, B>}{|A||B|} \]

            對於兩個向量A, B使用它們的點積, 模的乘積就能得到夾角θ餘弦值。當cos(θ)->1表示兩個向量的相似度高, cos(θ)->0 表示兩個向量是不相關的, cos(θ)->-1 表示兩個向量是相反的。

            把詞的ID轉換成向量,最簡單的辦法是使用one-hot編碼, 這樣得到的向量有兩個問題:

    1. 任意兩個向量A,B, <A,B>=0, 夾角的餘弦值cos(θ)=0, 不能表達詞之間的關係.
    2. 向量的維度等於詞彙表的大小, 而且是稀疏向量,這和導致模型有大量的參數,模型訓練過程的運算量也很大.

            詞嵌入技術就是為解決詞表示的問題而提出的。詞嵌入把詞ID映射到一個合適維度的向量空間中, 在這個向量空間中為每個ID分配一個唯一的向量, 把這些向量當成參數看待, 在特定任務的模型中學習這些參數。當模型訓練完成后, 這些向量就是詞在這個特定任務中的一個合適的表示。詞嵌入向量的訓練步驟有:

    1. 收集訓練數據集中的詞彙, 構建詞彙表。
    2. 為詞彙表中的每個詞分配一個唯一的ID。假設詞彙表中的詞彙量是N, 詞ID的取值為:0,1,2,…,N-1, 對人任意一個0<ID<N-1, 必然存在ID-1, ID+1.
    3. 隨機初始化N個D維嵌入向量, 向量的索引為0,1,2,…,N-1. 這樣詞ID就成了向量的索引.
    4. 定義一個模型, 把嵌入向量作為模型的輸入層參与訓練.
    5. 訓練模型.

    嵌入層實現

            代碼: cutedl/rnn_layers.py, Embedding類.

            初始化嵌入向量, 嵌入向量使用(-1, 1)區間均勻分佈的隨機變量初始化:

    '''
    dims 嵌入向量維數
    vocabulary_size 詞彙表大小
    need_train 是否需要訓練嵌入向量
    '''
    def __init__(self, dims, vocabulary_size, need_train=True):
        #初始化嵌入向量
        initializer = self.weight_initializers['uniform']
        self.__vecs = initializer((vocabulary_size, dims))
    
        super().__init__()
    
        self.__params = None
        if need_train:
            self.__params = []
            self.__cur_params = None
            self.__in_batch = None
    

            初始化層參數時把所有的嵌入向量變成參与訓練的參數:

    def init_params(self):
        if self.__params is None:
            return
    
        voc_size, _ = self.__vecs.shape
        for i in range(voc_size):
            pname = 'weight_%d'%i
            p = LayerParam(self.name, pname, self.__vecs[i])
            self.__params.append(p)
    

            向前傳播時, 把形狀為(m, t)的數據轉換成(m, t, n)形狀的數據, 其中t是序列長度, n是嵌入向量的維數.

    '''
    in_batch shape=(m, T)
    return shape (m, T, dims)
    '''
    def forward(self, in_batch, training):
        m,T = in_batch.shape
        outshape = (m, T, self.outshape[-1])
        out = np.zeros(outshape)
    
        #得到每個序列的嵌入向量表示
        for i in range(m):
            out[i] = self.__vecs[in_batch[i]]
    
        if training and self.__params is not None:
            self.__in_batch = in_batch
    
        return out
    

            反向傳播時只關注當前批次使用到的向量, 注意同一個向量可能被多次使用, 需要累加同一個嵌入向量的梯度.

    def backward(self, gradient):
        if self.__params is None:
            return
    
        #pdb.set_trace()
        in_batch = self.__in_batch
        params = {}
        m, T, _ = gradient.shape
        for i in range(m):
            for t in range(T):
                grad = gradient[i, t]
                idx = self.__in_batch[i, t]
    
                #更新當前訓練批次的梯度
                if idx not in params:
                    #當前批次第一次發現該嵌入向量
                    params[idx] = self.__params[idx]
                    params[idx].gradient = grad
                else:
                    #累加當前批次梯度
                    params[idx].gradient += grad
    
        self.__cur_params = list(params.values())
    

    驗證

    imdb-review數據集上的分類模型

            代碼: examples/rnn/text_classify.py.

            數據集下載地址: https://pan.baidu.com/s/13spS_Eac_j0uRvCVi7jaMw 密碼: ou26

    數據集處理

            數據集處理時有幾個需要注意的地方:

    1. imdb-review數據集由長度不同的文本構成, 送入模型的數據形狀為(m, t, n), 至少要求一個批次中的數據具有相同的序列長度, 因此在對數據進行分批時, 對數據按批次填充.
    2. 一般使用0為填充編碼. 在構建詞彙表時, 假設有v個詞彙, 詞彙的編碼為1,2,…,v.
    3. 由於對文本進行分詞, 編碼比較耗時。可以把編碼后的數據保存起來,作為數據集的預處理數據, 下次直接加載使用。

    模型

    def fit_gru():
        print("fit gru")
        model = Model([
                    rnn.Embedding(64, vocab_size+1),
                    wrapper.Bidirectional(rnn.GRU(64), rnn.GRU(64)),
                    nn.Filter(),
                    nn.Dense(64),
                    nn.Dropout(0.5),
                    nn.Dense(1, activation='linear')
                ])
        model.assemble()
        fit('gru', model)
    

            訓練報告:

    這個模型和tensorflow給出的模型略有差別, 少了一個RNN層wrapper.Bidirectional(rnn.GRU(32), rnn.GRU(32)), 這個模型經過16輪的訓練達到了tensorflow模型的水平.

    文本生成模型

            我自己收集了一個古由詩詞構成的小型數據集, 用來驗證文本生成模型. 代碼: examples/rnn/text_gen.py.

            數據集下載地址: https://pan.baidu.com/s/14oY_wol0d9hE_9QK45IkzQ 密碼: 5f3c

            模型定義:

    def fit_gru():
        vocab_size = vocab.size()
        print("vocab size: ", vocab_size)
        model = Model([
                    rnn.Embedding(256, vocab_size),
                    rnn.GRU(1024, stateful=True),
                    nn.Dense(1024),
                    nn.Dropout(0.5),
                    nn.Dense(vocab_size, activation='linear')
                ])
    
        model.assemble()
        fit("gru", model)
    

            訓練報告:

            生成七言詩:

    def gen_text():
        mpath = model_path+"gru"
    
        model = Model.load(mpath)
        print("loadding model finished")
        outshape = (4, 7)
    
        print("vocab size: ", vocab.size())
    
        def do_gen(txt):
            #編碼
            #pdb.set_trace()
            res = vocab.encode(sentence=txt)
    
            m, n = outshape
    
            for i in range(m*n - 1):
                in_batch = np.array(res).reshape((1, -1))
                preds = model.predict(in_batch)
                #取最後一維的預測結果
                preds = preds[:, -1]
                outs = dlmath.categories_sample(preds, 1)
                res.append(outs[0,0])
    
            #pdb.set_trace()
            txt = ""
            for i in range(m):
                txt = txt + ''.join(vocab.decode(res[i*n:(i+1)*n])) + "\n"
    
            return txt
    
    
        starts = ['雲', '故', '畫', '花']
        for txt in starts:
            model.reset()
            res = do_gen(txt)
            print(res)
    

            生成的文本:

    雲填纜首月悠覺
    纜濯醉二隱隱白
    湖杖雨遮雙雨鄉
    焉秣都滄楓寓功
    
    故民民時都人把
    陳雨積存手菜破
    好纜簾二龍藕卻
    趣晚城矣中村桐
    
    畫和春覺上蓋騎
    滿楚事勝便京兵
    肯霆唇恨朔上楊
    志月隨肯八焜著
    
    花夜維他客陳月
    客到夜狗和悲布
    關欲摻似瓦闊靈
    山商過牆灘幽惘
    

            是不是很像李商隱的風格?

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※教你寫出一流的銷售文案?

  • RocketMQ系列(六)批量發送與過濾

    RocketMQ系列(六)批量發送與過濾

    今天我們再來看看RocketMQ的另外兩個小功能,消息的批量發送和過濾。這兩個小功能提升了我們使用RocketMQ的效率。

    批量發送

    以前我們發送消息的時候,都是一個一個的發送,這樣效率比較低下。能不能一次發送多個消息呢?當然是可以的,RocketMQ為我們提供了這樣的功能。但是它也有一些使用的條件:

    • 同一批發送的消息的Topic必須相同;
    • 同一批消息的waitStoreMsgOK 必須相同;
    • 批量發送的消息不支持延遲,就是上一節說的延遲消息;
    • 同一批次的消息,大小不能超過1MiB;

    好了,只要我們滿足上面的這些限制,就可以使用批量發送了,我們來看看發送端的代碼吧,

    @Test
    public void producerBatch() throws Exception {
    
        List<Message> messages = new ArrayList<>();
        for (int i = 0;i<3;i++) {
            MessageExt message = new MessageExt();
            message.setTopic("cluster-topic");
            message.setKeys("key-"+i);
            message.setBody(("this is batchMQ,my NO is "+i+"---"+new Date()).getBytes());
            messages.add(message);
        }
        SendResult sendResult = defaultMQProducer.send(messages);
        System.out.println("sendResult:" + sendResult.getSendStatus().toString());
    }
    
    • 其實批量發送很簡單,我們只是把消息放到一個List當中,然後統一的調用send方法發送就可以了。

    再來看看消費端的代碼,

    @Bean(initMethod = "start",destroyMethod = "shutdown")
    public DefaultMQPushConsumer pushConsumer()  {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultMQPushConsumer");
            consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
            consumer.subscribe("cluster-topic", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.println("msgs.size():"+msgs.size());
                    if (msgs != null && msgs.size() > 0) {
                        for (MessageExt msg : msgs) {
                            System.out.println(new String(msg.getBody()));
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            return consumer;
        }catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    
    • 消費端的代碼沒有任何的變化,正常的接收消息就可以了,我們只是打印出了msgs.size(),看看一次接收一個消息,還是一次可以批量的接收多個消息。

    我們啟動項目,批量發送一下,看看效果吧,

    發送端的日誌如下:

    sendResult:SEND_OK
    

    發送成功,看來我們批量發送的3個消息都進入到了隊列中,再看看消費端,是一次消費一個,還是一次消費3個,如下:

    msgs.size():1
    this is batchMQ,my NO is 0---Mon Jun 15 09:31:04 CST 2020
    msgs.size():1
    this is batchMQ,my NO is 1---Mon Jun 15 09:31:04 CST 2020
    msgs.size():1
    this is batchMQ,my NO is 2---Mon Jun 15 09:31:04 CST 2020
    

    看樣子是一次只消費了一個消息,那麼能不能一次消費3個消息呢?當然是可以的,不過要進行特殊的設置,

    consumer.setConsumeMessageBatchMaxSize(5);
    

    在消費端,我們設置批量消費消息的數量是5,這個值默認是1。我們再看看消費端的日誌,

    msgs.size():3
    this is batchMQ,my NO is 0---Mon Jun 15 09:35:47 CST 2020
    this is batchMQ,my NO is 1---Mon Jun 15 09:35:47 CST 2020
    this is batchMQ,my NO is 2---Mon Jun 15 09:35:47 CST 2020
    

    這次一次消費了3個消息,如果消息比較多的話,最大一次能消費5個。這就是RocketMQ的批量發送和批量消費。

    消息過濾

    其實我們在大多數情況下,使用tag標籤就能夠很好的實現消息過濾。雖然tag標籤咱們並沒有過多的介紹,其實也很好理解,就是一個子Topic的概念,咱們在構建消息message的時候,message.setTags("xxx")。然後在消費的時候,訂閱Topic的時候,也可以指定訂閱的tag,

    consumer.subscribe("cluster-topic", "*");
    

    看到那個”*”了嗎?它就是訂閱的tag,”*”代表全部的tag,如果您想訂閱其中的一個或幾個,可以使用這種方式”tagA || tagB || tagC”,這是訂閱了cluster-topic下的3個tag,其他的tag是不會被消費的。

    這裏我們所說的消息過濾比tag要高級很多,是可以支持sql的,怎麼樣?高級吧。比如:我們訂閱”a > 5 and b = ‘abc’”的消息,如下圖:

    但是,RocketMQ畢竟不是數據庫,它只能支持一些基礎的SQL語句,並不是所有的SQL都支持,

    • 数字型的支持,>, >=, <, <=, BETWEEN, =

    • 字符串支持,=, <>, IN

    • IS NULL或者IS NOT NULL

    • 邏輯判斷,ANDORNOT

    字段的類型也只是簡單的幾種,

    • 数字型,支持123,543.123,整型、浮點都可以;
    • 字符串,必須使用單引號”括起來;
    • 空值,NULL;
    • 布爾型,TRUE或者FALSE;

    並且對消費者的類型也有一定的限制,只能使用push consumer才可以進行消息過濾。好了,說了這麼多了,我們看看怎麼使用吧,消費端和生產端都要進行相應的改造,先看看生產端吧,

    @Test
    public void producerBatch() throws Exception {
    
        List<Message> messages = new ArrayList<>();
        for (int i = 0;i<3;i++) {
            MessageExt message = new MessageExt();
            message.setTopic("cluster-topic");
            message.setKeys("key-"+i);
            message.setBody(("this is batchMQ,my NO is "+i+"---"+new Date()).getBytes());
    
            int a = i+4;
            message.putUserProperty("a",String.valueOf(a));
    
            messages.add(message);
        }
        SendResult sendResult = defaultMQProducer.send(messages);
        System.out.println("sendResult:" + sendResult.getSendStatus().toString());
    }
    

    我們在之前批量發送的基礎上進行了修改,定義了a的值,等於i+4,這樣循環3次,a的值就是4,5,6。然後調用message.putUserProperty("a",String.valueOf(a))注意,在使用消息過濾的時候,這些附加的條件屬性都是通過putUserProperty方法進行設置。這裏,我們設置了a的值。再看看消費端,

    consumer.subscribe("cluster-topic", MessageSelector.bySql("a > 5"));
    

    消費端,整體上沒有變化,只是在訂閱的方法中,使用MessageSelector.bySql("a > 5"),進行了條件的過濾。有的小夥伴可能會有疑問,我既想用sql過濾又想用tag過濾怎麼辦?當然也是可以,我們可以使用MessageSelector.bySql("a > 5").byTag("xx),byTag和bySql不分前後,怎麼樣,很強大吧。我們運行一下程序,看看效果吧。

    我們啟動一下服務,報錯了,怎麼回事?錯誤信息如下:

    The broker does not support consumer to filter message by SQL92
    

    隊列不支持過濾消息,我們查詢了RocketMQ源碼中的BrokerConfig類,這個類就是對broker的一些設置,其中發現了這兩個屬性,

    // whether do filter when retry.
    private boolean filterSupportRetry = false;
    private boolean enablePropertyFilter = false;
    
    • filterSupportRetry是在重試的時候,是否支持filter;
    • enablePropertyFilter,這個就是是否支持過濾消息的屬性;

    我們把這兩個屬性在broker的配置文件改為true吧,如下:

    filterSupportRetry=true
    enablePropertyFilter=true
    

    然後,再重新部署一下我們兩主兩從的集群環境。環境部署完以後,我們再重啟應用,沒有報錯。在生產端發送一下消息看看吧,

    sendResult:SEND_OK
    

    生產端發送消息沒有問題,說明3個消息都發送成功了。再看看消費端的日誌,

    msgs.size():1
    this is batchMQ,my NO is 2---Mon Jun 15 10:59:37 CST 2020
    

    只消費了一個消息,並且這個消息中i的值是2,那麼a的值就是2+4=6,它是>5的,滿足SQL的條件,所以被消費掉了。這完全符合我們的預期。

    總結

    今天的兩個小功能還是比較有意思的,但裡邊也有需要注意的地方,

    • 消息的批量發送,只要我們滿足它的條件,然後使用List發送就可以了;批量消費,默認的消費個數是1,我們可以調整它的值,這樣就可以一次消費多個消息了;
    • 過濾消息中,最大的坑就是隊列的配置里,需要設置enablePropertyFilter=true,否則消費端在啟動的時候報不支持SQL的錯誤;

    我們在使用的時候,多加留意就可以了,有問題,評論區留言吧~

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※為什麼 USB CONNECTOR 是電子產業重要的元件?

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※台北網頁設計公司全省服務真心推薦

    ※想知道最厲害的網頁設計公司"嚨底家"!

    ※推薦評價好的iphone維修中心

  • 分析ThreadLocal的弱引用與內存泄漏問題-Java8,利用線性探測法解決hash衝突

    分析ThreadLocal的弱引用與內存泄漏問題-Java8,利用線性探測法解決hash衝突

    目錄

    一.介紹

    二.問題提出

      2.1內存原理圖

      2.2幾個問題

    三.回答問題

      3.1為什麼會出現內存泄漏

      3.2若Entry使用弱引用

      3.3弱引用配合自動回收

    四.總結  

     

     

     

    一.介紹

      之前使用ThreadLocal的時候,就聽過ThreadLocal怎麼怎麼的可能會出現內存泄漏,具體原因也沒去深究,就是一種不清不楚的狀態。最近在看JDK的源碼,其中就包含ThreadLocal,在對ThreadLocal的使用場景介紹以及源碼的分析后,對於ThreadLocal中可能存在的內存泄漏問題也搞清楚了,所以這裏專門寫一篇博客分析一下。

      在分析內存泄漏之前,先了解2個概念,就是內存泄漏和內存溢出:

      內存溢出(memory overflow):是指不能申請到足夠的內存進行使用,就會發生內存溢出,比如出現的OOM(Out Of Memory)

      內存泄漏(memory lack):內存泄露是指在程序中已經動態分配的堆內存由於某種原因未釋放或者無法釋放(已經沒有用處了,但是沒有釋放),造成系統內存的浪費,這種現象叫“內存泄露”。

      當內存泄露到達一定規模后,造成系統能申請的內存較少,甚至無法申請內存,最終導致內存溢出,所以內存泄露是導致內存溢出的一個原因。

     

    二.問題提出

    2.1內存原理圖

      下圖是程序運行中的內存分布圖,簡要介紹一下這種圖:當前線程有一個threadLocals屬性(ThreadLocalMap屬性),該map的底層是數組,每個數組元素時Entry類型,Entry類型的key是ThreadLocal類型(也就是創建的ThreadLocal對象),而value是則是ThreadLocal.set()方法設置的value。

      

      需要注意的是ThreadLocalMap的Entry,繼承自弱引用,定義如下,關於Java的引用介紹,可以參考:Java-強引用、軟引用、弱引用、虛引用

    /**
     * ThreadLocalMap中存放的元素類型,繼承了弱引用類
     */
    static class Entry extends WeakReference<ThreadLocal<?>> {
        // key對應的value,注意key是ThreadLocal類型
        Object value;
    
        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    

     

    2.2問題提出

      在看了上面ThreadLocal和ThreadLocalMap相關的內存分佈以及關聯后,提出這樣幾個問題:

      1.ThreadLocal為什麼會出現內存溢出?

      2.Entry的key為什麼要用弱引用?

      3.使用弱引用是否就能解決內存溢出?

      為了回答上面這3個問題,我寫了一段代碼,後面根據這段代碼進行分析:

    public void step1() {
        // some action
        
        step2();
        step3();
        
        // other action
    }
    
    // 在stepX中都會創建threadLocal對象
    public void step2() {
        ThreadLocal<String> tl = new ThreadLocal<>();
        tl.set("this is value");
    }
    public void step3() {
        ThreadLocal<Integer> tl = new ThreadLocal<>();
        tl.set(99);
    }
    

      在step1中會調用step2和step3,step2和step3都會創建ThreadLocal對象,當step2和step3執行完畢后,其中的棧內存中ThreadLocal引用就會被清除。

     

    三.回答問題

     

      

      現在針對這個圖,一步一步的分析問題,中途會得出一些臨時的結論,但是最終的結論才是正確的

     

    3.1為什麼會出現內存泄露

      現在有2點假設,本小節的分析都是基於這兩個假設之上的:

      1.Entry的key使用強引用,key對ThreadLocal對象使用強引用,也就是上面圖中連線5是強引用(key強引用ThreadLocal對象);

      2.ThreadLocalMap中不會對過期的Entry進行清理。

      上面代碼中,如果ThreadLocalMap的key使用強引用,那麼即使棧內存的ThreadLocal引用被清除,但是堆中的ThreadLocal對象卻並不會被清除,這是因為ThreadLocalMap中Entry的key對ThreadLocal對象是強引用。

      如果當前線程不結束,那麼堆中的ThreadLocal對象將會一直存在,對應的內存就不會被回收,與之關聯的Entry也不會被回收(Entry對應的value也不會被回收),當這種情況出現數量比較多的時候,未釋放的內存就會上升,就可能出現內存泄漏的問題。

      上面的結論是暫時的,有前提假設!!!最終結論還需要看後面分析。

     

    3.2若Entry使用弱引用

      

      仍舊有1個假設,就是ThreadLocalMap中不會對過期的Entry進行清理,陳舊的Entry是指Entry的key為null。

      按照源碼,Entry繼承弱引用,其Key對ThreadLocal是弱引用,也就是上圖中連線5是弱引用,連線6仍為強引用。

      同樣以上面代碼為例,step2和step3創建了ThreadLocal對象,step2和step3執行完后,棧中的ThreadLocal引用被清除了;由於堆內存中ThreadLocalMap的Entry key弱引用ThreadLocal對象,根據垃圾收集器對弱引用對象的處理:

    當垃圾收集器工作時,無論當前內存是否足夠,都會回收掉只被弱引用關聯的對象。

      此時堆中ThreadLocal對象會被gc回收(因為現在沒有對ThreadLocal的強引用,只有一個弱引用ThreadLocal對象),Entry的key為null,但是value不為null,且value也是強引用(連線6),所以Entry仍舊不能回收,只能釋放ThreadLocal的內存,仍舊可能導致內存泄漏

      在沒有自動清理陳舊Entry的前提下,即使Entry使用弱引用,仍可能出現內存泄漏。

     

    3.3弱引用配合自動回收

      通過3.2的分析,其實只要陳舊的Entry能自動被回收,就能解決內存泄漏的問題,其實JDK就是這麼做的。

      如果看過源碼,就知道,ThreadLocalMap底層使用數組來保存元素,使用“線性探測法”來解決hash衝突,關於線性探測法的介紹可以查看:利用線性探測法解決hash衝突

      在每次調用ThreadLocal類的get、set、remove這些方法的時候,內部其實都是對ThreadLocalMap進行操作,對應ThreadLocalMap的get、set、remove操作。

      重點來了!重點來了!重點來了!

      ThreadLocalMap的每次get、set、remove,都會清理過期的Entry,下面以get操作解釋,其他操作也是一個意思,大致如下:

      1.ThreadLocalMap底層用數組保存元素,當get一個Entry時,根據key的hash值(非hashCode)計算出該Entry應該出在什麼位置;

      2.計算出的位置可能會有衝突,比如預期位置是position=5,但是position=5的位置已經有其他Entry了;

      3.出現衝突后,會使用線性探測法,找position=6位置上的Entry是否匹配(匹配是指hash相同),如果匹配,則返回position=6的Entry。

      4.在這個過程中,如果position=5位置上的Entry已經是陳舊的Entry(Entry的key為null),此時position=5的key就應該被清理;

      5.光清理position=5的Entry還不夠,為了保證線性探測法的規則,需要判斷數組中的其他元素是否需要調整位置(如果需要,則調整位置),在這個過程中,也會進行清理陳舊Entry的操作。

      上面這5個步驟就保證了每次get都會清理數組中(map)的陳舊Entry,清理一個陳舊的Entry,就是下面這三行代碼:

    Entry.value = null; // 將Entry的value設為null
    table[index] = null;// 將數組中該Entry的位置設置null
    size--;	// map的size減一
    

      對於ThreadLocal的set、remove也類似這個原理。

      有了自動回收陳舊Entry的操作,需要注意的是,在這個時候,key使用弱引用就是至關重要的一點!!!

      因為key使用弱引用后,當弱引用的ThreadLocal對象被會回收后,該key的引用為null,則該Entry在下一次get、set、remove的時候就才會被清理,從未避免內存泄漏的問題。

      

    四.總結

      在上面的分析中,看到ThreadLocal基本不會出現內存泄漏的問題了,因為ThreadLocalMap中會在get、set、remove的時候清理陳舊的Entry,與Entry的key使用弱引用密不可分。

      當然我們也可以在代碼中手動調用ThreadLocal的remove方法進行清除map中key為該threadLocal對象的Entry,同時清理過期的Entry。

      

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

  • 豐田奕澤怎麼樣?讓它的設計師來解答這個問題吧

    豐田奕澤怎麼樣?讓它的設計師來解答這個問題吧

    具體的策略,用我的概來概括就是“非常6+1”,六個策略+一個特別關注的事情,具體分為品牌策略、車型策略、網絡策略、區域策略、經銷商盈利策略,一個就是跟經銷店溝通支持,提供快速解決。通過非常6+1的落實,Q1結果出來,銷售了16萬,加上關閉了的4款車型,去年同比增長11%,應該說順利實現開門紅。

    如果你要問我一汽豐田近期最受關注的一款車是什麼,我絕對會毫不猶豫告訴你——奕澤,作為一汽豐田旗下首款小型SUV,奕澤憑藉設計犀利的造型、雙色的車身等諸多亮點吸引了不少人的關注,而這一次北京車展上奕澤也正式亮相,雖然並沒有上市,但是距離奕澤馳騁在中國大街小巷那一天已經比較近了,而這一次北京車展我們有幸採訪到一汽豐田汽車銷售有限公司的田青久先生以及一汽豐田銷售常務副總經理水谷雅史先生,一起看看他們能夠給我們分享哪些關於奕澤以及一汽豐田的秘密吧!

    主題:一汽豐田專訪

    時間:2018年4月25日下午2:30

    地點:中國國際會展中心E1

    受訪人:一汽豐田汽車銷售有限公司 田青久先生

    一汽豐田銷售常務副總經理水谷雅史先生

    一汽豐田銷售常務副總經理水谷雅史先生

    提問:一汽豐田的各位領導好,我想請問一下炫酷的奕澤6月份上市,有哪些亮點抓住用戶呢?

    田青久:他的亮點很多:首先,在基礎的駕駛性能上,無論是駕趣、拐彎、剎車技術性,均達到世界領先的水平。前期在珠海,我們經銷商進行的對比試駕活動,我也參与了試駕,確實有着改朝換代的感覺,跟其他的SUV完全不同,我們將儘快的安排在座的媒體老師體驗這款車。

    說起來亮點幾大方面:第一,奕動美學的設計,顏值即正義,顏值用太多的話無法說清楚,只有大家真正感受,用我的話就是一見傾心,心動不已;二見鍾情,買它回家。中看不中用也不行,是吧?首先,鑽石動力組合,搭載了Dynamic Force Engine 2.0L “噴汽流控發動機”加上10速變動器。為什麼是鑽石動力組合,它的性能達到了非常高的水平,126千瓦,熱效率達到世界領先40%。這是從中用的角度來說。中看中用之後,安全不安全呢?在安全配置上從起步車型開始,全系搭載了豐田智行安全系統(TSS系統)和10氣囊,媒體老師都很了解,這種配置一般只在豪華車型上會配備。所以,概括來說就是前所未有的顏值,前所未有的駕趣,前所未有的安全。

    提問:伴隨着消費升級,一汽豐田2018年會進行怎麼樣的調整,來應對中國汽車市場的變化?

    水谷雅史:首先感謝您對一汽豐田的關心,感謝您的提問。接下來我來回答一下您的問題。

    首先,中國市場在飛速的變化發展,這是我的認知。通過這一次的車展,我們看到新的國產車不斷出現,以及新的品牌不斷出現,新的市場在充實。同時,我們也感覺危機感。所以,我想未來的汽車市場外資品牌與純國資品牌的競爭會日趨激烈,這是毋庸置疑的。所以,在這樣的競爭環境下,各大企業必須要提升自己商品的魅力與實力才可以。

    為此,從今年開始我們開始導入了TNGA豐巢概念下的新車型。昨天大家看到奕澤IZOA這款車型,在外觀、環境貢獻能力、安全都有進化和提升,商品大幅進化,就是激烈的市場,帶給我們的價值。

    第二,我們經銷店給我們客戶提供什麼樣的服務,在未來是越來越重要的。未來,我們將吸取海外的服務經驗,將它們引進國內,給客戶提供更好的服務。謝謝您的問題。

    提問:我想問田總一個問題,大家都關注奕澤的問題,我想問一下銷量的問題,2018年銷量目標定位69.5萬台,目標設定是基於什麼樣的考慮,今年將採取什麼樣的措施來確保目標的完成?

    田青久:我們考慮了幾個因素:

    第一,外部環境。媒體老師都知道,中國車市進入穩定低速增長狀態,前些年動輒二位數高速增長時代一去不復返。

    第二,內部因素。一汽豐田正處於發展調整期,今年有四款車型退市,奕澤下半年才上市量銷,有拉伸的過程。今年的增量,需要我們通過現有車型營銷努力,提高營銷質量來解決。

    那麼我們在制訂目標的時候,不局限於這個目標,而是要挑戰71,乃至於更高,具體涉及到打法、策略的問題。總體策略是12個字:“增量為本,節奏為先,結構為王”,找增量出口,這是本源。節奏上半年贏,全年贏,把車型銷售作為調整,有明星車型。具體的策略,用我的概來概括就是“非常6+1”,六個策略+一個特別關注的事情,具體分為品牌策略、車型策略、網絡策略、區域策略、經銷商盈利策略,一個就是跟經銷店溝通支持,提供快速解決。通過非常6+1的落實,Q1結果出來,銷售了16萬,加上關閉了的4款車型,去年同比增長11%,應該說順利實現開門紅。我相信後幾個季度堅定執行策略,很好的超額完成,在69.5萬基礎上的目標。

    提問:我想問水谷雅史先生一個問題,上午一汽豐田中國發布會公布了卡羅拉雙擎E+的上市會,你能否介紹一下這款車以及戰略意義?

    水谷雅史:感謝您的提問。今天上午剛剛公布了卡羅拉雙擎E+這款車型,我認為這款車型的戰略意義,未來面向新能源需求的1號車,具體會在2019年正式進行導入。

    我認為這款卡羅拉車型,是目前為止豐田在全球銷售的混合動力的進化版、升級版。大家可能都知道,豐田電動化車型在全世界銷售了100多個國家,銷售量達到1100萬台以上。那麼這一次,在原有車型基礎上,進一步技術能力出現了插電式混合動力卡羅拉雙擎E+的這款車型。這款車型未來面對中國能源政策,是一款毫無問題的車型,而且在充電續航能力上,即使沒有油電支持下,只有純電動的情況下,也可以行駛相當長距離。同時大家也知道,卡羅拉這款車型長期受到全球歡迎,結合上新能源技術,強強聯合出來的車型。

    我們會好好準備,把這樣的好產品奉獻給中國市場。這裏我再做一個預告,2020年作為新能源政策的對應第二號車型,是EV車型。再跟大家預告一下,這款車型將成為一汽豐田在全球導入第一款奕澤的EV車型,中國是先於全世界的任何一個國家。

    提問:移動互聯網增長迅猛,應對新時代變革方面,一汽豐田在数字營銷、粉絲營銷方面,有沒有什麼創新的想法?

    田青久:這個問題我來回答,一汽豐田董事長特別提倡營銷創新,有一句話創新到不能再創新。那麼一汽豐田的創新突破點在哪?最後選在数字體驗營銷方面。為什麼選擇這個端口呢?根據統計目前汽車市場消費者75%是80、90后,而這部分客戶從小接觸互聯網,是互聯網的原住民。在中國消費市場上,移動互聯網應用越來越廣泛,所以我們把創新的突破點鎖定在這個窗口上。

    具體正在嘗試做兩方面的工作,一是我們開發了数字聯合運營平台,流量聚合,全過程進行可視化管理,支持渠道(經銷商)DCC業務,提高信息量。第二方面打造粉絲營銷矩陣,600萬保有客戶、620家經銷家、3000零件供應商的現有龐大基盤、加上以在座各位為代表的四五百家友好的媒體、廣大的員工,這五個維度加在一起,有好內容產生的話,在粉絲營銷矩陣上一次觸達接近2000萬人群,二次裂變就可以覆蓋上億的人群。

    當今時代人人都是自媒體,就是媒體平台,傳播效率不言而喻。同時建立粉絲社群,通過跟粉絲互動,及時了解粉絲需求的效果,進而通過蓄水和放水的功能,實現銷售的轉化。假設按照0.5%轉化率來算,我們有200萬的粉絲,就是有一萬台增量。所以,在這方面积極探索。目前來看是效果是非常好的。今後大家有好主意,歡迎积極跟我說一下,吸收大家的智慧,我們一起來做嘗試。

    提問:我想問田總一個問題,去年600萬達成,一汽豐田推出“安享管家”計劃。能否從客戶利好的角度,給我們介紹具體的內容?

    田青久:營銷2.0時代汽車銷售與服務之外,確實各個品牌都在研究汽車延伸價值鏈的計劃。我們“安享計劃”,是把客戶在購車、用車過程當中的痛點都覆蓋到,具體安享計劃有八個方面,包括安心二手車、純牌零件、純正精品、安心租車、AAA延保、AAA保險、貼心金融等。只要用戶購車、用車的過程當中有困難,我們都有相應的產品服務來對應到他們。

    謝謝。本站聲明:網站內容來源於http://www.auto6s.com/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※教你寫出一流的銷售文案?

  • 10萬級最多人買的SUV,換代后還能不能讓你買單?

    10萬級最多人買的SUV,換代后還能不能讓你買單?

    新車還針對霧燈區域裝飾調整,而大燈光源也更換為LED光源,照明效果更出眾。煥然一新的全新哈弗H6無疑是耐看的。要知道 耐看 向來就是比較難以保證的,在面對不斷變化的消費者眼光,懂得適時微調車型造型是一件識時務的事。

    前幾天,奔赴北京,試駕了全新哈弗H6超豪型。與全新哈弗H6超豪型近距離接觸了幾天,哈弗H6已經是一款非常熟悉的SUV車型,對於它的了解也已經比較深刻;但是一款重點的全新換代,還是會讓人有不少的期待,更何況哈弗H6是一款在銷量上讓眾多同級別競品汗顏的熱銷產品,全新換代後會呈現怎樣的產品力,不得不說十分令人期待。

    畢竟這是一場試駕為主的活動,所以更多的是衝著這款車本身的試駕感受去的,那文章的開始就先聊聊全新哈弗H6超豪型的駕駛體驗好了。

    我從來不會擔心哈弗H6的底盤質感,在第一代哈弗H6上市后我就對它厚實穩重的底盤姿態感到十分的欣喜,而全新哈弗H6的底盤在此基礎上做了更加深度和全面的優化,底盤噪音降至一個很低的水平,在行駛過程中的舒適性得以充分保障;前麥弗遜,后多連桿的前後獨立懸架將路面振動過濾得比較徹底,而且避震響應動作同樣也比較利落,在面對常見顛簸路面的時候不會有晃晃悠悠的動作,行駛姿態非常從容。

    其次動力總成的變化同樣讓人印象深刻,新的1.5T渦輪增壓發動機新增了長城自己正向研發的可變氣門升程技術,新技術的引入不僅改善了油耗水平,更在功率和峰值扭矩上做了12.7%和35.7%的提升,讓哈弗H6的綜合性能有了明顯的提高。

    雙離合變速箱的標定水平也是有了長足的進步,這款濕式雙離合變速箱在實際駕駛中可以很明顯的感知到工程師在對它進行標定的時候極大程度上考慮到了行駛平順的重要性,換擋平順程度十分友好,不會有明顯突兀的頓挫感。

    除了駕駛感受給留下不錯的印象,對哈弗H6的外觀印象也越來越好了。

    全新哈弗H6超豪型藍標版的前臉有着比較大變化,新車用上了面積更大的進氣格柵,且鍍鉻裝飾條數量增加為五條;前保險杠處的通風口更換為網狀設計;而新車尾部整體造型變化不大。整車看起來更時尚、動感了。

    至於全新哈弗H6超豪型紅標版呢?外觀也是有着不容忽視的變化。與藍標版的改變相似,紅標版新車型的進氣格柵也稍微變大了,且有着5條鍍鉻裝飾條;新車還針對霧燈區域裝飾調整,而大燈光源也更換為LED光源,照明效果更出眾。

    煥然一新的全新哈弗H6無疑是耐看的。要知道 耐看 向來就是比較難以保證的,在面對不斷變化的消費者眼光,懂得適時微調車型造型是一件識時務的事。哈弗H6就很好地做到這一點,全新哈弗H6超豪型更符合當下年輕消費者的口味,潛在消費人群無疑會也進一步擴大。

    進入車內,可以感受到全新哈弗H6超豪型的座椅包裹性有了明顯改善,增強了乘坐舒適性。不僅如此,新車內飾是蒙皮經純植物提取的香料處理了,還配有pM2.5粉塵過濾系統,車內的乘坐環境更為舒適。此外,新車還在後排空調出風口下方設計了兩個USB接口,車內配備了0.82㎡的超大全景天窗、前排座椅加熱等配置,這一切都令全新哈弗H6超豪型的駕乘舒適性達到了新的高度。

    儲物空間方面,哈弗的工程師針對換擋桿前方置物盒造型進行優化,優化后的儲物盒能容納下更多更大的隨身物品。而新車的後備廂經優化后,整體容積增大近20L,實用性更強了。

    安全,也是全新哈弗H6超豪型所注重的。新車增加了四項主動安全配置,包含半自動泊車系統、ACC自適應巡航系統、360°環視系統、FCW前碰撞預警系統+AEB自動剎車系統。此外,全新哈弗H6超豪型配備了側氣簾。主/被動安全系統更加完善的哈弗H6有助於對乘客進行全方位智能防護。

    比你優秀的人不可怕,可怕的是比你優秀的人比你更努力。哈弗H6就是那個很努力的優秀“人”。作為國產緊湊型SUV市場的常勝將軍,哈弗H6一向都是走實力派路線的。試駕過很多版本的哈弗H6,可以說是看着它將自己的小毛病一個一個地改掉,到近年來接近完美的狀態。至於經多方升級的全新哈弗H6超豪型,我們有理由相信它未來將助力哈弗H6家族繼續在SUV市場叱吒風雲。

    本站聲明:網站內容來源於http://www.auto6s.com/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※為什麼 USB CONNECTOR 是電子產業重要的元件?

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※台北網頁設計公司全省服務真心推薦

    ※想知道最厲害的網頁設計公司"嚨底家"!

    ※推薦評價好的iphone維修中心

  • 跑得不夠快,配置不夠高,但TA的魅力依然擋不住

    跑得不夠快,配置不夠高,但TA的魅力依然擋不住

    聽瑪莎拉蒂堅持與寶華韋健合作隨車打造的音響系統可以帶給你在聽覺上殿堂級的享受,甚至可以調整側重地方,比如往後排照顧,就可以向後排調節,能輕易分出前排後排的區別。最神奇的是,經過無數次調校之後,發動機艙傳來的聲浪並不會影響音響效果,甚至會相互加成,帶給你獨特的聽覺享受。

    瑪莎拉蒂,一個最喜愛的超豪華品牌,沒有之一。在有的人眼裡,瑪莎拉蒂是個跑車裡的異類,沒有讓人驚艷的百公里加速成績,沒有讓人眼前一亮的高科技配置,更沒有跑車具有的戰鬥姿態,但是,可以告訴你,瑪莎拉蒂具有的是其他品牌所沒有的一種浪漫,把豪華、運動、享受都融合在一起的意式浪漫。

    在這次北京車展專門跑去瑪莎拉蒂展台個性化配置專區感受一番這種浪漫情懷,並通過看、聽、聞、觸、味全方位體驗。



    看瑪莎拉蒂全系車型就是一種享受,造型不僅獨特個性,且在不經意間會發現瑪莎拉蒂融入車裡的品牌造型,比如,你會發現在輪轂的形狀就是圍繞瑪莎拉蒂的標誌(海神三叉戟)而設計。並且全系車型都接受定製,從車身顏色、車漆種類、輪轂、制動卡鉗到內飾材料、多功能方向盤、豪華還是運動座椅、座椅面料等等,無不體現屬於自己的個性訂製。



    瑪莎拉蒂堅持與寶華韋健合作隨車打造的音響系統可以帶給你在聽覺上殿堂級的享受,甚至可以調整側重地方,比如往後排照顧,就可以向後排調節,能輕易分出前排後排的區別。最神奇的是,經過無數次調校之後,發動機艙傳來的聲浪並不會影響音響效果,甚至會相互加成,帶給你獨特的聽覺享受。



    意大利頂級皮革帶給車內獨特的芳香,讓駕駛者更覺舒適,展台專門聘請的咖啡師調出香濃的意式咖啡,帶來極致的享受。



    意大利頂級皮革以及精緻的手工縫製,無一不在體現瑪莎拉蒂在工藝方面的水平以及造車的誠意。



    最後,在瑪莎拉蒂個性化配置專區還提供了頂級廚師烹飪的意大利美食,開啟味覺享受的同時也結束了本次瑪莎拉蒂的體驗之旅。

    瑪莎拉蒂,一個百年來始終堅持做自己的個性品牌,這是喜歡它的原因,有人說,瑪莎拉蒂是一個“全靠浪(聲浪)”的品牌,這個說法對也不對,不可否認,瑪莎拉蒂在聲浪方面所下的功夫確實無人能敵,但是其在豪華體驗上也是讓人不得不翹起個大拇指說“Good”!本站聲明:網站內容來源於http://www.auto6s.com/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

    台北網頁設計公司這麼多該如何選擇?

    ※智慧手機時代的來臨,RWD網頁設計為架站首選

    ※評比南投搬家公司費用收費行情懶人包大公開

    ※回頭車貨運收費標準

  • 深入理解React:懶加載(lazy)實現原理

    目錄

    • 代碼分割
    • React的懶加載
      • import() 原理
      • React.lazy 原理
      • Suspense 原理
    • 參考

    1.代碼分割

    (1)為什麼要進行代碼分割?

    現在前端項目基本都採用打包技術,比如 Webpack,JS邏輯代碼打包後會產生一個 bundle.js 文件,而隨着我們引用的第三方庫越來越多或業務邏輯代碼越來越複雜,相應打包好的 bundle.js 文件體積就會越來越大,因為需要先請求加載資源之後,才會渲染頁面,這就會嚴重影響到頁面的首屏加載。

    而為了解決這樣的問題,避免大體積的代碼包,我們則可以通過技術手段對代碼包進行分割,能夠創建多個包並在運行時動態地加載。現在像 Webpack、 Browserify等打包器都支持代碼分割技術。

    (2)什麼時候應該考慮進行代碼分割?

    這裏舉一個平時開發中可能會遇到的場景,比如某個體積相對比較大的第三方庫或插件(比如JS版的PDF預覽庫)只在單頁應用(SPA)的某一個不是首頁的頁面使用了,這種情況就可以考慮代碼分割,增加首屏的加載速度。

    2.React的懶加載

    示例代碼:

    import React, { Suspense } from 'react';
    
    const OtherComponent = React.lazy(() => import('./OtherComponent'));
    
    function MyComponent() {
      return (
        <div>
          <Suspense fallback={<div>Loading...</div>}>
            <OtherComponent />
          </Suspense>
        </div>
      );
    }
    

    如上代碼中,通過 import() React.lazySuspense 共同一起實現了 React 的懶加載,也就是我們常說了運行時動態加載,即 OtherComponent 組件文件被拆分打包為一個新的包(bundle)文件,並且只會在 OtherComponent 組件渲染時,才會被下載到本地。

    那麼上述中的代碼拆分以及動態加載究竟是如何實現的呢?讓我們來一起探究其原理是怎樣的。

    import() 原理

    import() 函數是由TS39提出的一種動態加載模塊的規範實現,其返回是一個 promise。在瀏覽器宿主環境中一個import()的參考實現如下:

    function import(url) {
      return new Promise((resolve, reject) => {
        const script = document.createElement("script");
        const tempGlobal = "__tempModuleLoadingVariable" + Math.random().toString(32).substring(2);
        script.type = "module";
        script.textContent = `import * as m from "${url}"; window.${tempGlobal} = m;`;
    
        script.onload = () => {
          resolve(window[tempGlobal]);
          delete window[tempGlobal];
          script.remove();
        };
    
        script.onerror = () => {
          reject(new Error("Failed to load module script with URL " + url));
          delete window[tempGlobal];
          script.remove();
        };
    
        document.documentElement.appendChild(script);
      });
    }
    

    當 Webpack 解析到該import()語法時,會自動進行代碼分割。

    React.lazy 原理

    以下 React 源碼基於 16.8.0 版本

    React.lazy 的源碼實現如下:

    export function lazy<T, R>(ctor: () => Thenable<T, R>): LazyComponent<T> {
      let lazyType = {
        $$typeof: REACT_LAZY_TYPE,
        _ctor: ctor,
        // React uses these fields to store the result.
        _status: -1,
        _result: null,
      };
    
      return lazyType;
    }
    

    可以看到其返回了一個 LazyComponent 對象。

    而對於 LazyComponent 對象的解析:

    ...
    case LazyComponent: {
      const elementType = workInProgress.elementType;
      return mountLazyComponent(
        current,
        workInProgress,
        elementType,
        updateExpirationTime,
        renderExpirationTime,
      );
    }
    ...
    
    function mountLazyComponent(
      _current,
      workInProgress,
      elementType,
      updateExpirationTime,
      renderExpirationTime,
    ) { 
      ...
      let Component = readLazyComponentType(elementType);
      ...
    }
    
    // Pending = 0, Resolved = 1, Rejected = 2
    export function readLazyComponentType<T>(lazyComponent: LazyComponent<T>): T {
      const status = lazyComponent._status;
      const result = lazyComponent._result;
      switch (status) {
        case Resolved: {
          const Component: T = result;
          return Component;
        }
        case Rejected: {
          const error: mixed = result;
          throw error;
        }
        case Pending: {
          const thenable: Thenable<T, mixed> = result;
          throw thenable;
        }
        default: { // lazyComponent 首次被渲染
          lazyComponent._status = Pending;
          const ctor = lazyComponent._ctor;
          const thenable = ctor();
          thenable.then(
            moduleObject => {
              if (lazyComponent._status === Pending) {
                const defaultExport = moduleObject.default;
                lazyComponent._status = Resolved;
                lazyComponent._result = defaultExport;
              }
            },
            error => {
              if (lazyComponent._status === Pending) {
                lazyComponent._status = Rejected;
                lazyComponent._result = error;
              }
            },
          );
          // Handle synchronous thenables.
          switch (lazyComponent._status) {
            case Resolved:
              return lazyComponent._result;
            case Rejected:
              throw lazyComponent._result;
          }
          lazyComponent._result = thenable;
          throw thenable;
        }
      }
    }
    

    注:如果 readLazyComponentType 函數多次處理同一個 lazyComponent,則可能進入Pending、Rejected等 case 中。

    從上述代碼中可以看出,對於最初 React.lazy() 所返回的 LazyComponent 對象,其 _status 默認是 -1,所以首次渲染時,會進入 readLazyComponentType 函數中的 default 的邏輯,這裏才會真正異步執行 import(url)操作,由於並未等待,隨後會檢查模塊是否 Resolved,如果已經Resolved了(已經加載完畢)則直接返回moduleObject.default(動態加載的模塊的默認導出),否則將通過 throw 將 thenable 拋出到上層。

    為什麼要 throw 它?這就要涉及到 Suspense 的工作原理,我們接着往下分析。

    Suspense 原理

    由於 React 捕獲異常並處理的代碼邏輯比較多,這裏就不貼源碼,感興趣可以去看 throwException 中的邏輯,其中就包含了如何處理捕獲的異常。簡單描述一下處理過程,React 捕獲到異常之後,會判斷異常是不是一個 thenable,如果是則會找到 SuspenseComponent ,如果 thenable 處於 pending 狀態,則會將其 children 都渲染成 fallback 的值,一旦 thenable 被 resolve 則 SuspenseComponent 的子組件會重新渲染一次。

    為了便於理解,我們也可以用 componentDidCatch 實現一個自己的 Suspense 組件,如下:

    class Suspense extends React.Component {
      state = {
        promise: null
      }
    
      componentDidCatch(err) {
        // 判斷 err 是否是 thenable
        if (err !== null && typeof err === 'object' && typeof err.then === 'function') {
          this.setState({ promise: err }, () => {
            err.then(() => {
              this.setState({
                promise: null
              })
            })
          })
        }
      }
    
      render() {
        const { fallback, children } = this.props
        const { promise } = this.state
        return <>{ promise ? fallback : children }</>
      }
    }
    

    小結

    至此,我們分析完了 React 的懶加載原理。簡單來說,React利用 React.lazyimport()實現了渲染時的動態加載 ,並利用Suspense來處理異步加載資源時頁面應該如何显示的問題。

    3.參考

    代碼分割– React

    動態import – MDN – Mozilla

    proposal-dynamic-import

    React Lazy 的實現原理

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※帶您來了解什麼是 USB CONNECTOR  ?

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    ※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

    ※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※教你寫出一流的銷售文案?

  • 如何使用 Shell 腳本來查看多個服務器的端口是否打開?

    如何使用 Shell 腳本來查看多個服務器的端口是否打開?

    我們在進行服務器配置的時候,經常要查看服務器的某個端口是否已經開放。如果服務器只有一兩台的話,那很好辦,只需要使用 nc 命令一個個查看即可。

    但是,如果你的服務器是個集群,有很多台呢?那如果還一個個手動去檢查的話,效率肯定是無比低下的,年底裁員名單里肯定有你。

    在這種情況下,我們完全可以使用 Shell 腳本配合 nc 命令來達到我們的目的。而且,不管服務器有幾台,需要檢查的端口有幾個,都可以實現這樣的目標。

    在本文里,我們用 Shell 腳本來實現兩個需求:

    • 掃描多台服務器的一個端口是否打開
    • 掃描多台服務器的多個端口是否打開

    在開始之前,我們先來了解一下 nc 命令。

    nc 命令簡介

    nc 是英文單詞 netcat 的縮寫,它是通過使用 TCP 或 UDP 的網絡協議的連接來讀或寫數據,可以直接被第三方程序或腳本直接調用。

    同時,它是一款功能非常強大的網絡調試工具,因為它可以創建幾乎所有你所需要的連接方式。

    nc 工具主要有三種功能模式:連接模式、監聽模式、通道模式。它的一般使用格式如下:

    $ nc [-options] [HostName or IP] [PortNumber]
    

    接下來,我們就用 Shell 腳本結合 nc 命令來實現我們的兩個需求。

    1. 掃描多台服務器的一個端口是否打開

    在這裏,我們先把需要查詢的所有服務器地址全部放在一個 server-list.txt 文件里,每個地址單獨一行,如下:

    # cat server-list.txt
    192.168.1.2
    192.168.1.3
    192.168.1.4
    192.168.1.5
    192.168.1.6
    192.168.1.7
    

    然後,我們再用 for 循環依次掃描 server-list.txt 里對應服務器的端口是否打開。在這裏,我們掃描 22 端口是否打開。

    # vi port_scan.sh
    
    #!/bin/sh
    for server in `more server-list.txt`
    do
    #echo $i
    nc -zvw3 $server 22
    done
    

    最後,我們給這個腳本賦予可執行權限即可。

    $ chmod +x port_scan.sh
    

    之後,我們就可以用這個腳本來自動依次檢查多個服務器的 22 端口是否已打開。

    # sh port_scan.sh
    
    Connection to 192.168.1.2 22 port [tcp/ssh] succeeded!
    Connection to 192.168.1.3 22 port [tcp/ssh] succeeded!
    Connection to 192.168.1.4 22 port [tcp/ssh] succeeded!
    Connection to 192.168.1.5 22 port [tcp/ssh] succeeded!
    Connection to 192.168.1.6 22 port [tcp/ssh] succeeded!
    Connection to 192.168.1.7 22 port [tcp/ssh] succeeded!
    

    2. 掃描多台服務器的多個端口是否打開

    在這裏,我們同樣把需要查詢的所有服務器地址全部放在一個 server-list.txt 文件里,每個地址單獨一行。這裏就不重複演示了。

    與此同時,我們也把需要查詢的服務器端口放在另一個 port-list.txt 文件里,每個端口單獨一行,如下所示:

    # cat port-list.txt
    22
    80
    

    然後,我們再用 for 循環依次掃描 server-list.txt 里對應服務器 port-list.txt 所列的端口是否打開。注意,這裏用到了兩個 for 循環,第一層是服務器列表,第二層是端口列表。

    # vi multiple_port_scan.sh
    
    #!/bin/sh
    for server in `more server-list.txt`
    do
    for port in `more port-list.txt`
    do
    #echo $server
    nc -zvw3 $server $port
    echo ""
    done
    done
    

    最後,我們給這個腳本賦予可執行權限即可。

    $ chmod +x multiple_port_scan.sh
    

    之後,我們就可以用這個腳本來自動依次檢查多個服務器的多個端口是否已打開。

    # sh multiple_port_scan.sh
    Connection to 192.168.1.2 22 port [tcp/ssh] succeeded!
    Connection to 192.168.1.2 80 port [tcp/http] succeeded!
    
    Connection to 192.168.1.3 22 port [tcp/ssh] succeeded!
    Connection to 192.168.1.3 80 port [tcp/http] succeeded!
    
    Connection to 192.168.1.4 22 port [tcp/ssh] succeeded!
    Connection to 192.168.1.4 80 port [tcp/http] succeeded!
    
    Connection to 192.168.1.5 22 port [tcp/ssh] succeeded!
    Connection to 192.168.1.5 80 port [tcp/http] succeeded!
    
    Connection to 192.168.1.6 22 port [tcp/ssh] succeeded!
    Connection to 192.168.1.6 80 port [tcp/http] succeeded!
    
    Connection to 192.168.1.7 22 port [tcp/ssh] succeeded!
    Connection to 192.168.1.7 80 port [tcp/http] succeeded!
    

    公眾號:良許Linux

    有收穫?希望老鐵們來個三連擊,給更多的人看到這篇文章

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※為什麼 USB CONNECTOR 是電子產業重要的元件?

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※台北網頁設計公司全省服務真心推薦

    ※想知道最厲害的網頁設計公司"嚨底家"!

    ※推薦評價好的iphone維修中心

  • Spark文檔閱讀之二:Programming Guides – Quick Start

    Spark文檔閱讀之二:Programming Guides – Quick Start

    Quick Start: https://spark.apache.org/docs/latest/quick-start.html

     

    在Spark 2.0之前,Spark的編程接口為RDD (Resilient Distributed Dataset)。而在2.0之後,RDDs被Dataset替代。Dataset很像RDD,但是有更多優化。RDD仍然支持,不過強烈建議切換到Dataset,以獲得更好的性能。 RDD文檔: https://spark.apache.org/docs/latest/rdd-programming-guide.html Dataset文檔: https://spark.apache.org/docs/latest/sql-programming-guide.html  

    一、最簡單的Spark Shell交互分析

    scala> val textFile = spark.read.textFile("README.md")   # 構建一個Dataset
    textFile: org.apache.spark.sql.Dataset[String] = [value: string]
    
    scala> textFile.count()  # Dataset的簡單計算
    res0: Long = 104 
    
    scala> val linesWithSpark = textFile.filter(line => line.contain("Spark"))  # 由現有Dataset生成新Dataset
    res1: org.apache.spark.sql.Dataset[String] = [value: string]
    # 等價於:
    # res1 = new Dataset()
    # for line in textFile:
    #     if line.contain("Spark"):
    #         res1.append(line)
    # linesWithSpark = res1
    
    scala> linesWithSpark.count()
    res2: Long = 19
    
    # 可以將多個操作串行起來
    scala> textFile.filter(line => line.contain("Spark")).count()
    res3: Long = 19

     

    進一步的Dataset分析:

    scala> textFile.map(line => line.split(" ").size).reduce((a,b) => if (a > b) a else b)
    res12: Int = 16
    # 其實map和reduce就是兩個普通的算子,不要被MapReduce中一個map配一個reduce、先map后reduce的思想所束縛
    # map算子就是對Dataset的元素X計算fun(X),並且將所有f(X)作為新的Dataset返回
    # reduce算子其實就是通過兩兩計算fun(X,Y)=Z,將Dataset中的所有元素歸約為1個值
    
    # 也可以引入庫進行計算
    scala> import java.lang.Math
    import java.lang.Math
    
    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
    res14: Int = 16
    
    # 還可以使用其他算子
    scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
    
    # flatMap算子也是對Dataset的每個元素X執行fun(X)=Y,只不過map的res是
    #     res.append(Y),如[[Y11, Y12], [Y21, Y22]],結果按元素區分
    # 而flatMap是
    #     res += Y,如[Y11, Y12, Y21, Y22],各元素結果合在一起
    
    # groupByKey算子將Dataset的元素X作為參數傳入進行計算f(X),並以f(X)作為key進行分組,返回值為KeyValueGroupedDataset類型
    # 形式類似於(key: k; value: X1, X2, ...),不過KeyValueGroupedDataset不是一個Dataset,value列表也不是一個array
    # 注意:這裏的textFile和textFile.flatMap都是Dataset,不是RDD,groupByKey()中可以傳func;如果以sc.textFile()方法讀文件,得到的是RDD,groupByKey()中間不能傳func
    
    # identity就是函數 x => x,即返回自身的函數
    
    # KeyValueGroupedDataset的count()方法返回(key, len(value))列表,結果是Dataset類型
    
    scala> wordCounts.collect()
    res37: Array[(String, Long)] = Array((online,1), (graphs,1), ...
    # collect操作:將分佈式存儲在集群上的RDD/Dataset中的所有數據都獲取到driver端

     

    數據的cache:

    scala> linesWithSpark.cache()  # in-memory cache,讓數據在分佈式內存中緩存
    res38: linesWithSpark.type = [value: string]
    
    scala> linesWithSpark.count()
    res41: Long = 19

     

    二、最簡單的獨立Spark任務(spark-submit提交)

    需提前安裝sbt,sbt是scala的編譯工具(Scala Build Tool),類似java的maven。 brew install sbt   1)編寫SimpleApp.scala

    import org.apache.spark.sql.SparkSession
    
    object SimpleApp {
        def main(args: Array[String]) {
            val logFile = "/Users/dxm/work-space/spark-2.4.5-bin-hadoop2.7/README.md"
            val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
            val logData = spark.read.textFile(logFile).cache()
            val numAs = logData.filter(line => line.contains("a")).count()  # 包含字母a的行數
            val numBs = logData.filter(line => line.contains("b")).count()  # 包含字母b的行數
            println(s"Lines with a: $numAs, Lines with b: $numBs")
            spark.stop()
        }
    }

     

    2)編寫sbt依賴文件build.sbt

    name := "Simple Application"
    
    version := "1.0"
    
    scalaVersion := "2.12.10"
    
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"

     

    其中,”org.apache.spark” %% “spark-sql” % “2.4.5”這類庫名可以在網上查到,例如https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10/1.0.0

     

    3)使用sbt打包 目錄格式如下,如果SimpleApp.scala和build.sbt放在一個目錄下會編不出來

    $ find .
    .
    ./build.sbt
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/SimpleApp.scala

     

    sbt目錄格式要求見官方文檔 https://www.scala-sbt.org/1.x/docs/Directories.html

    src/
      main/
        resources/
           <files to include in main jar here>
        scala/
           <main Scala sources>
        scala-2.12/
           <main Scala 2.12 specific sources>
        java/
           <main Java sources>
      test/
        resources
           <files to include in test jar here>
        scala/
           <test Scala sources>
        scala-2.12/
           <test Scala 2.12 specific sources>
        java/
           <test Java sources>

     

    使用sbt打包

    # 打包
    $ sbt package
    ...
    [success] Total time: 97 s (01:37), completed 2020-6-10 10:28:24
    # jar包位於 target/scala-2.12/simple-application_2.12-1.0.jar

     

    4)提交並執行Spark任務

    $ bin/spark-submit --class "SimpleApp" --master spark://xxx:7077 ../scala-tests/SimpleApp/target/scala-2.12/simple-application_2.12-1.0.jar
    # 報錯:Caused by: java.lang.ClassNotFoundException: scala.runtime.LambdaDeserialize
    # 參考:https://stackoverflow.com/questions/47172122/classnotfoundexception-scala-runtime-lambdadeserialize-when-spark-submit
    # 這是spark版本和scala版本不匹配導致的

     

    查詢spark所使用的scala的版本

    $ bin/spark-shell --master spark://xxx:7077
    
    scala> util.Properties.versionString
    res0: String = version 2.11.12

     

    修改build.sbt: scalaVersion := “2.11.12” 從下載頁也可驗證,下載的spark 2.4.5使用的是scala 2.11  

     

    重新sbt package,產出位置變更為target/scala-2.11/simple-application_2.11-1.0.jar 再次spark-submit,成功

     

    $ bin/spark-submit --class "SimpleApp" --master spark://xxx:7077 ../scala-tests/SimpleApp/target/scala-2.11/simple-application_2.11-1.0.jar 
    Lines with a: 61, Lines with b: 30

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

    台北網頁設計公司這麼多該如何選擇?

    ※智慧手機時代的來臨,RWD網頁設計為架站首選

    ※評比南投搬家公司費用收費行情懶人包大公開

    ※回頭車貨運收費標準