標籤: 網頁設計公司推薦

  • 動手造輪子:實現簡單的 EventQueue

    動手造輪子:實現簡單的 EventQueue

    動手造輪子:實現簡單的 EventQueue

    Intro

    最近項目里有遇到一些併發的問題,想實現一個隊列來將併發的請求一個一個串行處理,可以理解為使用消息隊列處理併發問題,之前實現過一個簡單的 EventBus,於是想在 EventBus 的基礎上改造一下,加一個隊列,改造成類似消息隊列的處理模式。消息的處理(Consumer)直接使用 .netcore 里的 IHostedService 來實現了一個簡單的後台任務處理。

    初步設計

    • Event 抽象的事件
    • EventHandler 處理 Event 的方法
    • EventStore 保存訂閱 Event 的 EventHandler
    • EventQueue 保存 Event 的隊列
    • EventPublisher 發布 Event
    • EventConsumer 處理 Event 隊列里的 Event
    • EventSubscriptionManager 管理訂閱 Event 的 EventHandler

    實現代碼

    EventBase 定義了基本事件信息,事件發生時間以及事件的id:

    public abstract class EventBase
    {
        [JsonProperty]
        public DateTimeOffset EventAt { get; private set; }
    
        [JsonProperty]
        public string EventId { get; private set; }
    
        protected EventBase()
        {
          this.EventId = GuidIdGenerator.Instance.NewId();
          this.EventAt = DateTimeOffset.UtcNow;
        }
    
        [JsonConstructor]
        public EventBase(string eventId, DateTimeOffset eventAt)
        {
          this.EventId = eventId;
          this.EventAt = eventAt;
        }
    }

    EventHandler 定義:

    public interface IEventHandler
    {
        Task Handle(IEventBase @event);
    }
    
    public interface IEventHandler<in TEvent> : IEventHandler where TEvent : IEventBase
    {
        Task Handle(TEvent @event);
    }
    
    public class EventHandlerBase<TEvent> : IEventHandler<TEvent> where TEvent : EventBase
    {
        public virtual Task Handle(TEvent @event)
        {
            return Task.CompletedTask;
        }
    
        public Task Handle(IEventBase @event)
        {
            return Handle(@event as TEvent);
        }
    }

    EventStore:

    public class EventStore
    {
        private readonly Dictionary<Type, Type> _eventHandlers = new Dictionary<Type, Type>();
    
        public void Add<TEvent, TEventHandler>() where TEventHandler : IEventHandler<TEvent> where TEvent : EventBase
        {
            _eventHandlers.Add(typeof(TEvent), typeof(TEventHandler));
        }
    
        public object GetEventHandler(Type eventType, IServiceProvider serviceProvider)
        {
            if (eventType == null || !_eventHandlers.TryGetValue(eventType, out var handlerType) || handlerType == null)
            {
                return null;
            }
            return serviceProvider.GetService(handlerType);
        }
    
        public object GetEventHandler(EventBase eventBase, IServiceProvider serviceProvider) =>
            GetEventHandler(eventBase.GetType(), serviceProvider);
    
        public object GetEventHandler<TEvent>(IServiceProvider serviceProvider) where TEvent : EventBase =>
            GetEventHandler(typeof(TEvent), serviceProvider);
    }

    EventQueue 定義:

    public class EventQueue
    {
        private readonly ConcurrentDictionary<string, ConcurrentQueue<EventBase>> _eventQueues =
            new ConcurrentDictionary<string, ConcurrentQueue<EventBase>>();
    
        public ICollection<string> Queues => _eventQueues.Keys;
    
        public void Enqueue<TEvent>(string queueName, TEvent @event) where TEvent : EventBase
        {
            var queue = _eventQueues.GetOrAdd(queueName, q => new ConcurrentQueue<EventBase>());
            queue.Enqueue(@event);
        }
    
        public bool TryDequeue(string queueName, out EventBase @event)
        {
            var queue = _eventQueues.GetOrAdd(queueName, q => new ConcurrentQueue<EventBase>());
            return queue.TryDequeue(out @event);
        }
    
        public bool TryRemoveQueue(string queueName)
        {
            return _eventQueues.TryRemove(queueName, out _);
        }
    
        public bool ContainsQueue(string queueName) => _eventQueues.ContainsKey(queueName);
    
        public ConcurrentQueue<EventBase> this[string queueName] => _eventQueues[queueName];
    }

    EventPublisher:

    public interface IEventPublisher
    {
        Task Publish<TEvent>(string queueName, TEvent @event)
            where TEvent : EventBase;
    }
    public class EventPublisher : IEventPublisher
    {
        private readonly EventQueue _eventQueue;
    
        public EventPublisher(EventQueue eventQueue)
        {
            _eventQueue = eventQueue;
        }
    
        public Task Publish<TEvent>(string queueName, TEvent @event)
            where TEvent : EventBase
        {
            _eventQueue.Enqueue(queueName, @event);
            return Task.CompletedTask;
        }
    }

    EventSubscriptionManager:

    public interface IEventSubscriptionManager
    {
        void Subscribe<TEvent, TEventHandler>()
            where TEvent : EventBase
            where TEventHandler : IEventHandler<TEvent>;
    }
    
    public class EventSubscriptionManager : IEventSubscriptionManager
    {
        private readonly EventStore _eventStore;
    
        public EventSubscriptionManager(EventStore eventStore)
        {
            _eventStore = eventStore;
        }
    
        public void Subscribe<TEvent, TEventHandler>()
            where TEvent : EventBase
            where TEventHandler : IEventHandler<TEvent>
        {
            _eventStore.Add<TEvent, TEventHandler>();
        }
    }
    

    EventConsumer:

    public class EventConsumer : BackgroundService
    {
        private readonly EventQueue _eventQueue;
        private readonly EventStore _eventStore;
        private readonly int maxSemaphoreCount = 256;
        private readonly IServiceProvider _serviceProvider;
        private readonly ILogger _logger;
    
        public EventConsumer(EventQueue eventQueue, EventStore eventStore, IConfiguration configuration, ILogger<EventConsumer> logger, IServiceProvider serviceProvider)
        {
            _eventQueue = eventQueue;
            _eventStore = eventStore;
            _logger = logger;
            _serviceProvider = serviceProvider;
        }
    
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            using (var semaphore = new SemaphoreSlim(Environment.ProcessorCount, maxSemaphoreCount))
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var queues = _eventQueue.Queues;
                    if (queues.Count > 0)
                    {
                        await Task.WhenAll(
                        queues
                            .Select(async queueName =>
                            {
                                if (!_eventQueue.ContainsQueue(queueName))
                                {
                                    return;
                                }
                                try
                                {
                                    await semaphore.WaitAsync(stoppingToken);
                                    //
                                    if (_eventQueue.TryDequeue(queueName, out var @event))
                                    {
                                        var eventHandler = _eventStore.GetEventHandler(@event, _serviceProvider);
                                        if (eventHandler is IEventHandler handler)
                                        {
                                            _logger.LogInformation(
                                                "handler {handlerType} begin to handle event {eventType}, eventId: {eventId}, eventInfo: {eventInfo}",
                                                eventHandler.GetType().FullName, @event.GetType().FullName,
                                                @event.EventId, JsonConvert.SerializeObject(@event));
    
                                            try
                                            {
                                                await handler.Handle(@event);
                                            }
                                            catch (Exception e)
                                            {
                                                _logger.LogError(e, "event  {eventId}  handled exception", @event.EventId);
                                            }
                                            finally
                                            {
                                                _logger.LogInformation("event {eventId} handled", @event.EventId);
                                            }
                                        }
                                        else
                                        {
                                            _logger.LogWarning(
                                                "no event handler registered for event {eventType}, eventId: {eventId}, eventInfo: {eventInfo}",
                                                @event.GetType().FullName, @event.EventId,
                                                JsonConvert.SerializeObject(@event));
                                        }
                                    }
                                }
                                catch (Exception ex)
                                {
                                    _logger.LogError(ex, "error running EventConsumer");
                                }
                                finally
                                {
                                    semaphore.Release();
                                }
                            })
                    );
                    }
    
                    await Task.Delay(50, stoppingToken);
                }
            }
        }
    }

    為了方便使用定義了一個 Event 擴展方法:

    public static IServiceCollection AddEvent(this IServiceCollection services)
    {
        services.TryAddSingleton<EventStore>();
        services.TryAddSingleton<EventQueue>();
        services.TryAddSingleton<IEventPublisher, EventPublisher>();
        services.TryAddSingleton<IEventSubscriptionManager, EventSubscriptionManager>();
    
        services.AddSingleton<IHostedService, EventConsumer>();
        return services;
    }

    使用示例

    定義 PageViewEvent 記錄請求信息:

    public class PageViewEvent : EventBase
    {
        public string Path { get; set; }
    }

    這裏作為示例只記錄了請求的Path信息,實際使用可以增加更多需要記錄的信息

    定義 PageViewEventHandler,處理 PageViewEvent

    public class PageViewEventHandler : EventHandlerBase<PageViewEvent>
    {
        private readonly ILogger _logger;
    
        public PageViewEventHandler(ILogger<PageViewEventHandler> logger)
        {
            _logger = logger;
        }
    
        public override Task Handle(PageViewEvent @event)
        {
            _logger.LogInformation($"handle pageViewEvent: {JsonConvert.SerializeObject(@event)}");
            return Task.CompletedTask;
        }
    }

    這個 handler 里什麼都沒做只是輸出一個日誌

    這個示例項目定義了一個記錄請求路徑的事件以及一個發布請求記錄事件的中間件

    // 發布 Event 的中間件
    app.Use(async (context, next) =>
    {
        var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();
        await eventPublisher.Publish("pageView", new PageViewEvent() { Path = context.Request.Path.Value });
        await next();
    });

    Startup 配置:

    public void ConfigureServices(IServiceCollection services)
    {
        // ...
        services.AddEvent();
        services.AddSingleton<PageViewEventHandler>();// 註冊 Handler
    }
    
    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IEventSubscriptionManager eventSubscriptionManager)
    {
        eventSubscriptionManager.Subscribe<PageViewEvent, PageViewEventHandler>();
        app.Use(async (context, next) =>
        {
            var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();
            await eventPublisher.Publish("pageView", new PageViewEvent() { Path = context.Request.Path.Value });
            await next();
        });
        // ...
    }

    使用效果:

    More

    注:只是一個初步設計,基本可以實現功能,還是有些不足,實際應用的話還有一些要考慮的事情

    1. Consumer 消息邏輯,現在的實現有些問題,我們的應用場景目前比較簡單還可以滿足,如果事件比較多就會而且每個事件可能處理需要的時間長短不一樣,會導致在一個批次中執行的 Event 中已經完成的事件要等待其他還沒完成的事件完成之後才能繼續取下一個事件,理想的消費模式應該是各個隊列相互獨立,在同一個隊列中保持順序消費即可
    2. 上面示例的 EventStore 的實現只是簡單的實現了一個事件一個 Handler 的處理情況,實際業務場景中很可能會有一個事件需要多個 Handler 的情況
    3. 這個實現是基於內存的,如果要在分佈式場景下使用就不適用了,需要自己實現一下基於redis或者數據庫的以滿足分佈式的需求
    4. and more…

    上面所有的代碼可以在 Github 上獲取,示例項目 Github 地址:

    Reference

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

  • 推薦算法之用矩陣分解做協調過濾——LFM模型

    隱語義模型(Latent factor model,以下簡稱LFM),是基於矩陣分解的推薦算法,在其基本算法上引入L2正則的FunkSVD算法在推薦系統領域更是廣泛使用,在Spark上也有其實現。本文將對 LFM原理進行詳細闡述,給出其基本算法原理。此外,還將介紹使得隱語義模型聲名大噪的算法FunkSVD和在其基礎上改進較為成功的BiasSVD。最後,對LFM進行一個較為全面的總結。

    1. 矩陣分解應用於推薦算法要解決的問題

    在推薦系統中,我們經常可能面臨的場景是:現有大量用戶和物品,以及少部分用戶對少部分物品的評分,我們需要使用現有的用戶對少部分物品的評分去推測用戶對物品集中其他物品的可能的評分,從而將預測中評分高的物品推薦給用戶。例如下面的用戶物品評分表:

    用戶\物品 物品 1 物品 2 物品 3 物品 4 物品 5
    用戶 1 3 2
    用戶 2 1 2 6
    用戶 3 3 4 6
    用戶 4 1 2 5
    用戶 5 4 2 3

    對於每個用戶,我們希望較準確的預測出其對未評分物品的評分。將m個用戶和n個物品的評分看做一個矩陣M,從而將矩陣分解應用到該場景,即可解決這一問題。而本文,將關注於矩陣分解用於到推薦的方法之一,即LFM算法的解決方案。

    2. LFM

    LFM算法的核心思想是通過隱含特徵(Latent factor)聯繫用戶和物品,該算法最早在文本挖掘領域中被提出用於找到文本的隱含語義,相關名詞還有LDATopic Model等。

    2.1 如何表示用戶的偏好和物品(item)屬性?

    在被問到這個問題時,針對MovieLens(電影評分)數據集,你可能會說用戶是否喜歡動作片,物品所屬電影類型去回答。但用戶對其他類型的電影的偏好程度呢?物品在其它類型所佔的權重又是多少呢?就算針對現有的電影類型去表徵用戶偏好和物品,那麼能否能夠完全的表示出用戶的偏好和物品屬性呢?答案是不能,比如用戶喜歡看成龍的電影這個偏好沒法表示出來,電影由誰導演,主演是誰沒法表示。但你要問我用哪些屬性去表徵呢?這個誰也無法給出一個很好的答案,粒度很難控制。

    2.2 LFM來救場

    隱語義模型較好的解決了該問題,它從數據出發,通過基於用戶行為統計的自動聚類,可指定出表徵用戶偏好和物品的向量維度,最終得到用戶的偏好向量以及物品的表徵向量。LFM通過以下公式計算用戶u對物品i的偏好:
    \[ preference(u, i)=p^T_u q_i=\sum_f^F{p_{u,k}q_{i,k}} \]
    這個公式,\(p_{u,k}\)度量了用戶u的偏好和第f個隱類的關係,\(q_{i,k}\)度量了物品i和第f個隱類的關係。

    那麼現在,我們期望用戶的評分矩陣M這樣分解:
    \[ M_{m*n}=P^T_{m*k}Q_{k*n} \]
    那麼,我們如何將矩陣分解呢?這裏採用了線性回歸的思想,即盡可能的讓用戶的評分和我們預測的評分的殘差盡可能小,也就是說,可以用均方差作為損失函數來尋找最終的PQ。考慮所有的用戶和樣本的組合,我們期望的最小化損失函數為:
    \[ \sum_{i,j}{(m_{ij}-p_i^Tq_j)^2} \]
    只要我們能夠最小化上面的式子,並求出極值所對應的\(p_i\)\(q_j\),則我們最終可以得到矩陣PQ,那麼對於任意矩陣M任意一個空白評分的位置,我們就可以通過\(p^T_i q_j\)計算預測評分。

    2.3 FunkSVD用於推薦

    上面是隱語義模型LFM的基本原理,但在實際業務中,為防止過擬合,我們常常會加入一個L2的正則化項,這也就誕生了我們的FunkSVD算法。其優化目標函數\(J(p,q)\)定義為:
    \[ \underbrace{argmin}_{p_i,q_j}\sum_{i,j}{(m_{ij}-p^T_iq_j)^2+\lambda({\Arrowvert{p_i}\Arrowvert}^2_2+{\Arrowvert{q_i}\Arrowvert}^2_2)} \]
    其中λ為正則化係數,需要調參。對於這個優化問題,我們一般通過梯度下降法來進行優化得到結果。

    將上式分別對\(p_i\)\(q_j\)求導我們得到:
    \[ \frac{\partial{J}}{\partial{p_i}}=-2(m_{ij}-p^T_iq_j)q_j+2\lambda{p_i} \]

    \[ \frac{\partial{J}}{\partial{q_j}}=-2(m_{ij}-p^T_iq_j)p_i+2\lambda{q_j} \]

    則梯度下降中迭代公式為:
    \[ p_i = p_i +\alpha((m_{ij}-p^T_iq_j)q_j-\lambda{p_i}) \]

    \[ q_j = q_j+\alpha((m_{ij}-p^T_iq_j)p_i-\lambda{q_j}) \]

    通過迭代我們最終可以得到PQ,進而用於推薦。

    為讀者進一步理解,筆者實現了基於MovieLens數據集實現了該方法。代碼詳見github:

    2.4 BiasSVD用於推薦

    BiasSVDFunkSVD較為成功的改進版算法。BiasSVD假設評分系統包括三部分的偏置因素:一些和用戶物品無關的評分因素。用戶有一些和物品無關的評分因素,稱為用戶偏置項。而物品也有一些和用戶無關的評分因素,稱為物品偏置項。這很好理解,對於樂觀的用戶來說,它的評分行為普遍偏高,而對批判性用戶來說,他的評分記錄普遍偏低,即使他們對同一物品的評分相同,但是他們對該物品的喜好程度卻並不一樣。同理,對物品來說,以電影為例,受大眾歡迎的電影得到的評分普遍偏高,而一些爛片的評分普遍偏低,這些因素都是獨立於用戶或產品的因素,而和用戶對產品的的喜好無關。

    假設評分系統平均分為μ,第i個用戶的用戶偏置項為\(b_i\),而第j個物品的物品偏置項為\(b_j\),則加入了偏置項以後的優化目標函數\(J(p_i,q_j)\)是這樣的:
    \[ \underbrace{argmin}_{p_i,q_j}\sum_{i,j}{(m_{ij}-p^T_iq_j-u-b_i-b_j)^2+\lambda({\Arrowvert{p_i}\Arrowvert}^2_2+{\Arrowvert{q_i}\Arrowvert}^2_2+{\Arrowvert{b_i}\Arrowvert}^2_2+{\Arrowvert{b_j}\Arrowvert}^2_2)} \]
    這個優化目標也可以採用梯度下降法求解。和FunkSVD不同的是,此時我們多了兩個偏執項\(b_i\)\(b_j\)\(p_i\)\(q_j\)的迭代公式和FunkSVD類似,只是每一步的梯度導數稍有不同而已。\(b_i\)\(b_j\)一般可以初始設置為0,然後參与迭代。迭代公式為:
    \[ p_i = p_i +\alpha((m_{ij}-p^T_iq_j-u-b_i-b_j)q_j-\lambda{p_i}) \]

    \[ q_j = q_j+\alpha((m_{ij}-p^T_iq_j-u-b_i-b_j)p_i-\lambda{q_j}) \]

    \[ b_i=b_i+\alpha(m_{ij}-p^T_iq_j-u-b_i-b_j-\lambda{b_i}) \]

    \[ b_j=b_j+\alpha(m_{ij}-p^T_iq_j-u-b_i-b_j-\lambda{b_j}) \]

    通過迭代我們最終可以得到PQ,進而用於推薦。BiasSVD增加了一些額外因素的考慮,因此在某些場景會比FunkSVD表現好。

    為讀者進一步理解,筆者實現了基於MovieLens數據集實現了該方法。代碼詳見github

    小結

    LFM 是一種基於機器學習的方法,具有比較好的理論基礎,通過優化一個設定的指標建立最優的模型。它實質上是矩陣分解應用到推薦的方法,其中FunkSVD更是將矩陣分解用於推薦方法推到了新的高度,在實際應用中使用非常廣泛。當然矩陣分解方法也在不停的進步,目前矩陣分解推薦算法中分解機方法(factorization machine, FM)已成為一個趨勢。

    對於矩陣分解用於推薦方法本身來說,它容易編程實現,實現複雜度低,預測效果也好,同時還能保持擴展性。這些都是其寶貴的優點。但是LFM 無法給出很好的推薦解釋,它計算出的隱類雖然在語義上確實代表了一類興趣和物品,卻很難用自然語言描述並生成解釋展現給用戶。

    LFM 在建模過程中,假設有 M 個用戶、 N 個物品、 K 條用戶對物品的行為記錄,如果是 F 個隱類,那麼它離線計算的空間複雜度是 \(O(F*(M+N))\) ,迭代 S次則時間複雜度為 \(O(K * F * S)\)。當 M(用戶數量)和 N(物品數量)很大時LFM相對於ItemCFUserCF可以很好地節省離線計算的內存,在時間複雜度由於LFM會多次迭代上所以和ItemCFUserCF沒有質的差別。

    同時,遺憾的是,LFM 無法進行在線實時推薦,即當用戶有了新的行為後,他的推薦列表不會發生變化。而從 LFM的預測公式可以看到, LFM 在給用戶生成推薦列表時,需要計算用戶對所有物品的興趣權重,然後排名,返回權重最大的 N 個物品。那麼,在物品數很多時,這一過程的時間複雜度非常高,可達 \(O(M*N*F)\) 。因此, LFM 不太適合用於物品數非常龐大的系統,如果要用,我們也需要一個比較快的算法給用戶先計算一個比較小的候選列表,然後再用LFM重新排名。另一方面,LFM 在生成一個用戶推薦列表時速度太慢,因此不能在線實時計算,而需要離線將所有用戶的推薦結果事先計算好存儲在數據庫中。

    參考:

    • 推薦系統實戰—項亮

    (歡迎轉載,轉載請註明出處。歡迎溝通交流: losstie@outlook.com)

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

  • .NET進階篇06-async異步、thread多線程3

    .NET進階篇06-async異步、thread多線程3

    知識需要不斷積累、總結和沉澱,思考和寫作是成長的催化劑

    梯子

    一、任務Task

    System.Threading.Tasks在.NET4引入,前麵線程的API太多了,控制不方便,而ThreadPool控制能力又太弱,比如做線程的延續、阻塞、取消、超時等功能不太方便,所以Task就抽象了線程功能,在後台使用ThreadPool

    1、啟動任務

    可以使用TaskFactory類或Task類的構造函數和Start()方法,委託可以提供帶有一個Object類型的輸入參數,所以可以給任務傳遞任意數據,還漏了一個常用的Task.Run

    TaskFactory taskFactory = new TaskFactory();
    taskFactory.StartNew(() => 
    {
        Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    });
    Task.Factory.StartNew(() =>
    {
        Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    });
    Task task = new Task(() =>
    {
        Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    });
    task.Start();

    只有Task類實例方式需要Start()去啟動任務,當然可以RunSynchronously()來同步執行任務,主線程會等待,就是用主線程來執行這個task任務

    Task task = new Task(() =>
    {
        Thread.Sleep(10000);
        Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    });
    task.RunSynchronously();

    2、阻塞延續

    在Thread中我們使用join來阻塞等待,在多個Thread時進行控制就不太方便。Task中我們使用實例方法Wait阻塞單個任務或靜態方法WaitAll和WaitAny阻塞多個任務

    var task = new Task(() =>
    {
        Thread.Sleep(5*1000);
        Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    });
    var task2 = new Task(() =>
    {
        Thread.Sleep(10 * 1000);
        Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    });
    task.Start();
    task2.Start();
    //task.Wait();//單任務等待
    //Task.WaitAny(task, task2);//任何一個任務完成就繼續
    Task.WaitAll(task, task2);//任務都完成才繼續

    如果不希望阻塞主線程,實現當一個任務或幾個任務完成后執行別的任務,可以使用Task靜態方法WhenAll和WhenAny,他們將返回一個Task,但這個Task不允許你控制,將會在滿足WhenAll和WhenAny里任務完成時自動完成,然後調用Task的ContinueWith方法,就可以在一個任務完成后緊跟開始另一個任務

    Task.WhenAll(task, task2).ContinueWith((t) =>
    {
        Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    });

    Task.Factory工廠中也存在類似ContinueWhenAll和ContinueWhenAny

    3、任務層次結構

    不僅可以在一個任務結束后執行另一個任務,也可以在一個任務內啟動一個任務,這就啟動了一個父子層次結構

    var parentTask = new Task(()=> 
    {
        Console.WriteLine($"parentId={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
        Thread.Sleep(5*1000);
        var childTask = new Task(() =>
        {
            Thread.Sleep(10 * 1000);
            Console.WriteLine($"childId={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}")
        });
        childTask.Start();
    });
    parentTask.Start();

    如果父任務在子任務之前結束,父任務的狀態為WaitingForChildrenToComplete,當子任務也完成時,父任務的狀態就變為RanToCompletion,當然,在創建任務時指定TaskCreationOptions枚舉參數,可以控制任務的創建和執行的可選行為

    4、枚舉參數

    簡單介紹下創建任務中的TaskCreationOptions枚舉參數,創建任務時我們可以提供TaskCreationOptions枚舉參數,用於控制任務的創建和執行的可選行為的標誌

    1. AttachedToParent:指定將任務附加到任務層次結構中的某個父級,意思就是建立父子關係,父任務必須等待子任務完成才可以繼續執行。和WaitAll效果一樣。上面例子如果在創建子任務時指定TaskCreationOptions.AttachedToParent,那麼父任務wait時也會等子任務的結束
    2. DenyChildAttach:不讓子任務附加到父任務上
    3. LongRunning:指定是長時間運行任務,如果事先知道這個任務會耗時比較長,建議設置此項。這樣,Task調度器會創建Thread線程,而不使用ThreadPool線程。因為你長時間佔用ThreadPool線程不還,那它可能必要時會在線程池中開啟新的線程,造成調度壓力
    4. PreferFairness:盡可能公平的安排任務,這意味着較早安排的任務將更可能較早運行,而較晚安排運行的任務將更可能較晚運行。實際通過把任務放到線程池的全局隊列中,讓工作線程去爭搶,默認是在本地隊列中。

    另一個枚舉參數是ContinueWith方法中的TaskContinuationOptions枚舉參數,它除了擁有幾個和上面同樣功能的枚舉值外,還擁有控制任務的取消延續等功能

    1. LazyCancellation:在延續取消的情況下,防止延續的完成直到完成先前的任務。什麼意思呢?
    CancellationTokenSource source = new CancellationTokenSource();
    source.Cancel();
    var task1 = new Task(() => 
    {
        Console.WriteLine($"task1 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    });
    var task2 = task1.ContinueWith(t =>
    {
        Console.WriteLine($"task2 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    },source.Token);
    var task3 = task2.ContinueWith(t =>
    {
        Console.WriteLine($"task3 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    });
    task1.Start();

    上面例子我們企圖task1->task2->task3順序執行,然後通過CancellationToken來取消task2的執行。結果會是怎樣呢?結果task1和task3會并行執行(task3也是會執行的,而且是和task1并行,等於原來的一條鏈變成了兩條鏈),然後我們嘗試使用LazyCancellation,

    var task2 = task1.ContinueWith(t =>
    {
        Console.WriteLine($"task2 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    },source.Token,TaskContinuationOptions.LazyCancellation,TaskScheduler.Current);

    這樣,將會在task1執行完成后,task2才去判斷source.Token,為Cancel就不執行,接下來執行task3就保證了原來的順序

    1. ExecuteSynchronously:指定應同步執行延續任務,比如上例中,在延續任務task2中指定此參數,則task2會使用執行task1的線程來執行,這樣防止線程切換,可以做些共有資源的訪問。不指定的話就隨機,但也能也用到task1的線程
    2. NotOnRanToCompletion:延續任務必須在前面任務非完成狀態下執行
    3. OnlyOnRanToCompletion:延續任務必須在前面任務完成狀態才能執行
    4. NotOnFaulted,OnlyOnCanceled,OnlyOnFaulted等等

    5、任務取消

    在上篇使用Thread時,我們使用一個變量isStop標記是否取消任務,這種訪問共享變量的方式難免會出問題。task中提出CancellationTokenSource類專門處理任務取消,常見用法看下面代碼註釋

    CancellationTokenSource source = new CancellationTokenSource();//構造函數中也可指定延遲取消
    //註冊一個取消時調用的委託
    source.Token.Register(() =>
    {
        Console.WriteLine("當前source已經取消,可以在這裏做一些其他事情(比如資源清理)...");
    });
    var task1 = new Task(() => 
    {
        while (!source.IsCancellationRequested)
        {
            Console.WriteLine($"task1 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
        }
    },source.Token);
    task1.Start();
    //source.Cancel();//取消
    source.CancelAfter(1000);//延時取消

    6、任務結果

    讓子線程返回結果,可以將信息寫入到線程安全的共享變量中去,或則使用可以返回結果的任務。使用Task的泛型版本Task<TResult>,就可以定義返回結果的任務。Task是繼承自Task的,Result獲取結果時是要阻塞等待直到任務完成返回結果的,內部判斷沒有完成則wait。通過TaskStatus屬性可獲得此任務的狀態是啟動、運行、異常還是取消等

    var task = new Task<string>(() =>
    {
         return "hello ketty";
    });
    task.Start();
    string result = task.Result;

    7、異常

    可以使用AggregateException來接受任務中的異常信息,這是一個聚合異常繼承自Exception,可以遍歷獲取包含的所有異常,以及進行異常處理,決定是否繼續往上拋異常等

    var task = Task.Factory.StartNew(() =>
    {
        var childTask1 = Task.Factory.StartNew(() =>
        {
            throw new Exception("childTask1異常...");
        },TaskCreationOptions.AttachedToParent);
        var childTask12= Task.Factory.StartNew(() =>
        {
            throw new Exception("childTask2異常...");
        }, TaskCreationOptions.AttachedToParent);
    });
    try
    {
        try
        {
            task.Wait();
        }
        catch (AggregateException ex)
        {
            foreach (var item in ex.InnerExceptions)
            {
                Console.WriteLine($"message{item.InnerException.Message}");
            }
            ex.Handle(x =>
            {
                if (x.InnerException.Message == "childTask1異常...")
                {
                    return true;//異常被處理,不繼續往上拋了
                }
                return false;
            });
        }
    }
    catch (Exception ex)
    {
        throw;
    }

    二、并行Parallel

    1、Parallel.For()、Parallel.ForEach()

    在.NET4中,另一個新增的抽象的線程時Parallel類。這個類定義了并行的for和foreach的靜態方法。Parallel.For()和Parallel.ForEach()方法多次調用一個方法,而Parallel.Invoke()方法允許同時調用不同的方法。首先Parallel是會阻塞主線程的,它將讓主線程也參与到任務中
    Parallel.For()類似於for允許語句,并行迭代同一個方法,迭代順序沒有保證的

    ParallelLoopResult result = Parallel.For(010, i =>
    {
        Console.WriteLine($"{i} task:{Task.CurrentId} thread:{Thread.CurrentThread.ManagedThreadId}");
    });
    Console.WriteLine(result.IsCompleted);

    也可以提前中斷Parallel.For()方法。For()方法的一個重載版本接受Action<int,parallelloopstate style=”font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;”>類型參數。一般不使用,像下面這樣,本想大於5就停止,但實際也可能有大於5的任務已經在跑了。可以通過ParallelOptions傳入允許最大線程數以及取消Token等

    ParallelLoopResult result = Parallel.For(010new ParallelOptions() { MaxDegreeOfParallelism = 8 },(i,loop) =>
    {
        Console.WriteLine($"{i} task:{Task.CurrentId} thread:{Thread.CurrentThread.ManagedThreadId}");
        if (i > 5)
        {
            loop.Break();
        }
    });

    2、Parallel.For<TLocal>

    For還有一個高級泛型版本,相當於并行的聚合計算

    ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, Func<TLocal> localInit, Func<int, ParallelLoopStateTLocalTLocal> body, Action<TLocal> localFinally);

    像下面這樣我們求0…100的和,第三個參數更定一個種子初始值,第四個參數迭代累計,最後聚合

    int totalNum = 0;
    Parallel.For<int>(0100() => { return 0; }, (current, loop, total) =>
    {
        total += current;
        return total;
    }, (total) =>
    {
        Interlocked.Add(ref totalNum, total);
    });

    上面For用來處理數組數據,ForEach()方法用來處理非數組的數據任務,比如字典數據繼承自IEnumerable的集合等

    3、Parallel.Invoke()

    Parallel.Invoke()則可以并行調用不同的方法,參數傳遞一個Action的委託數組

    Parallel.Invoke(() => { Console.WriteLine($"方法1 thread:{Thread.CurrentThread.ManagedThreadId}"); }
        , () => { Console.WriteLine($"方法2 thread:{Thread.CurrentThread.ManagedThreadId}"); }
        , () => { Console.WriteLine($"方法3 thread:{Thread.CurrentThread.ManagedThreadId}"); });

    4、PLinq

    Plinq,為了能夠達到最大的靈活度,linq有了并行版本。使用也很簡單,只需要將原始集合AsParallel就轉換為支持并行化的查詢。也可以AsOrdered來順序執行,取消Token,強制并行等

    var nums = Enumerable.Range(0100);
    var query = from n in nums.AsParallel()
                select new
                {
                    thread=$"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}"
                };

    三、異步等待AsyncAwait

    異步編程模型,可能還需要大篇幅來學習,這裏先介紹下基本用法,內在本質需要用ILSpy反編譯來看,以後可能要分專題總結。文末先給幾個參考資料,有興趣自己闊以先琢磨琢磨鴨

    1、簡單使用

    這是.NET4.5開始提供的一對語法糖,使得可以較簡便的使用異步編程。async用在方法定義前面,await只能寫在帶有async標記的方法中,任何方法都可以增加async,一般成對出現,只有async沒有意義,只有await會報錯,請先看下面的示例

    private static async void AsyncTest()
    {
        //主線程執行
        Console.WriteLine($"before await ThreadId={Thread.CurrentThread.ManagedThreadId}");
        TaskFactory taskFactory = new TaskFactory();
        Task task = taskFactory.StartNew(() =>
        {
            Thread.Sleep(3000);
            Console.WriteLine($"task ThreadId={Thread.CurrentThread.ManagedThreadId}");
        });
        await task;//主線程到這裏就返回了,執行主線程任務
        //子線程執行,其實是封裝成委託,在task之後成為回調(編譯器功能  狀態機實現) 後面相當於task.ContinueWith()
        //這個回調的線程是不確定的:可能是主線程  可能是子線程  也可能是其他線程,在winform中是主線程
        Console.WriteLine($"after await ThreadId={Thread.CurrentThread.ManagedThreadId}");
    }

    一般使用async都會讓方法返回一個Task的,像下面這樣複雜一點的

    private static async Task<stringAsyncTest2()
    {
        Console.WriteLine($"before await ThreadId={Thread.CurrentThread.ManagedThreadId}");
        TaskFactory taskFactory = new TaskFactory();
        string x = await taskFactory.StartNew(() =>
          {
              Thread.Sleep(3000);
              Console.WriteLine($"task ThreadId={Thread.CurrentThread.ManagedThreadId}");
              return "task over";
          });

        Console.WriteLine($"after await ThreadId={Thread.CurrentThread.ManagedThreadId}");
        return x;
    }

    通過var reslult = AsyncTest2().Result;調用即可。但注意如果調用Wait或Result的代碼位於UI線程,Task的實際執行在其他線程,其需要返回UI線程則會造成死鎖,所以應該Async all the way

    2、優雅

    從上面簡單示例中可以看出異步編程的執行邏輯:主線程A邏輯->異步任務線程B邏輯->主線程C邏輯
    異步方法的返回類型只能是void、Task、Task。示例中異步方法的返回值類型是Task,通常void也不推薦使用,沒有返回值直接用Task就是

    上一篇也大概了解到如果我們要在任務中更新UI,需要調用Invoke通知UI線程來更新,代碼看起來像下面這樣,在一個任務後去更新UI

    private void button1_Click(object sender, EventArgs e)
    {
        var ResultTask = Task.Run(() => {
            Thread.Sleep(5000);
            return "任務完成";
        });
        ResultTask.ContinueWith((r)=> 
        {
            textBox1.Invoke(() => {
                textBox1.Text = r.Result;
            });
        });
    }

    如果使用async/await會看起來像這樣,是不是優雅了許多。以看似同步編程的方式實現異步

    private async void button1_Click(object sender, EventArgs e)
    {
        var t = Task.Run(() => {
            Thread.Sleep(5000);
            return "任務完成";
        });
        textBox1.Text = await t;
    }

    3、最後

    在.NET 4.5中引入的Async和Await兩個新的關鍵字后,用戶能以一種簡潔直觀的方式實現異步編程。甚至都不需要改變代碼的邏輯結構,就能將原來的同步函數改造為異步函數。
    在內部實現上,Async和Await這兩個關鍵字由編譯器轉換為狀態機,通過System.Threading.Tasks中的并行類實現代碼的異步執行。

    字數有點多了,我的能力也就高考作文800字能寫的出奇好。看了很多異步編程,腦袋有點炸,等消化后再輸出一次,技藝不足,只能用輸出倒逼輸入了,下一篇會是線程安全集合、鎖問題、同步問題,基於事件的異步模式等

    Search the fucking web
    Read the fucking maunal

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

  • 同步鎖基本原理與實現

      為充分利用機器性能,人們發明了多線程。但同時帶來了線程安全問題,於是人們又發明了同步鎖。

      這個問題自然人人知道,但你真的了解同步鎖嗎?還是說你會用其中的上鎖與解鎖功能?

      今天我們就一起來深入看同步鎖的原理和實現吧!

     

    一、同步鎖的職責

      同步鎖的職責可以說就一個,限制資源的使用(線程安全從屬)。

      它一般至少會包含兩個功能: 1. 給資源加鎖; 2. 給資源解鎖;另外,它一般還有 等待/通知 即 wait/notify 的功能;

      同步鎖的應用場景:多個線程同時操作一個事務必須保證正確性;一個資源只能同時由一線程訪問操作;一個資源最多只能接入k的併發訪問;保證訪問的順序性;

      同步鎖的實現方式:操作系統調度實現;應用自行實現;CAS自旋;

      同步鎖的幾個問題:

        為什麼它能保證線程安全?

        鎖等待耗CPU嗎?

        使用鎖后性能下降嚴重的原因是啥?

     

    二、同步鎖的實現一:lock/unlock

      其實對於應用層來說,非常多就是 lock/unlock , 這也是鎖的核心。

      AQS 是java中很多鎖實現的基礎,因為它屏蔽了很多繁雜而底層的阻塞操作,為上層抽象出易用的接口。

      我們就以AQS作為跳板,先來看一下上鎖的過程。為不至於陷入具體鎖的業務邏輯中,我們先以最簡單的 CountDownLatch 看看。

        // 先看看 CountDownLatch 的基礎數據結構,可以說是不能再簡單了,就繼承了 AQS,然後簡單覆寫了幾個必要方法。
        // java.util.concurrent.CountDownLatch.Sync
        /**
         * Synchronization control For CountDownLatch.
         * Uses AQS state to represent count.
         */
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                // 只有一種情況會獲取鎖成功,即 state == 0 的時候
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    // 原始的鎖數量是在初始化時指定的不可變的,每次釋放一個鎖標識
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        // 只有一情況會釋放鎖成功,即本次釋放后 state == 0
                        return nextc == 0;
                }
            }
        }
        private final Sync sync;

     

    重點1,我們看看上鎖過程,即 await() 的調用。

        public void await() throws InterruptedException {
            // 調用 AQS 的接口,由AQS實現了鎖的骨架邏輯
            sync.acquireSharedInterruptibly(1);
        }
        
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
        /**
         * Acquires in shared mode, aborting if interrupted.  Implemented
         * by first checking interrupt status, then invoking at least once
         * {@link #tryAcquireShared}, returning on success.  Otherwise the
         * thread is queued, possibly repeatedly blocking and unblocking,
         * invoking {@link #tryAcquireShared} until success or the thread
         * is interrupted.
         * @param arg the acquire argument.
         * This value is conveyed to {@link #tryAcquireShared} but is
         * otherwise uninterpreted and can represent anything
         * you like.
         * @throws InterruptedException if the current thread is interrupted
         */
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 首先嘗試獲取鎖,如果成功就不用阻塞了
            // 而從上面的邏輯我們看到,獲取鎖相當之簡單,所以,獲取鎖本身並沒有太多的性能消耗喲
            // 如果獲取鎖失敗,則會進行稍後嘗試,這應該是複雜而精巧的
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
        
        /**
         * Acquires in shared interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            // 首先將當前線程添加排隊隊尾,此處會保證線程安全,稍後我們可以看到
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    // 獲取其上一節點,如果上一節點是頭節點,就代表當前線程可以再次嘗試獲取鎖了
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    // 先檢測是否需要阻塞,然後再進行阻塞等待,阻塞由 LockSupport 底層支持
                    // 如果阻塞后,將不會主動喚醒,只會由 unlock 時,主動被通知
                    // 因此,此處即是獲取鎖的最終等待點
                    // 操作系統將不會再次調度到本線程,直到獲取到鎖
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        // 如此線程安全地添加當前線程到隊尾? CAS 保證
        /**
         * Creates and enqueues node for current thread and given mode.
         *
         * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
         * @return the new node
         */
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
        /**
         * Inserts node into queue, initializing if necessary. See picture above.
         * @param node the node to insert
         * @return node's predecessor
         */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
        
        // 檢測是否需要進行阻塞
        /**
         * Checks and updates status for a node that failed to acquire.
         * Returns true if thread should block. This is the main signal
         * control in all acquire loops.  Requires that pred == node.prev.
         *
         * @param pred node's predecessor holding status
         * @param node the node
         * @return {@code true} if thread should block
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                 // 只有前置節點是 SIGNAL 狀態的節點,才需要進行 阻塞等待,當然前置節點會在下一次循環中被設置好
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
        
        // park 阻塞實現
        /**
         * Convenience method to park and then check if interrupted
         *
         * @return {@code true} if interrupted
         */
        private final boolean parkAndCheckInterrupt() {
            // 將當前 AQS 實例作為鎖對象 blocker, 進行操作系統調用阻塞, 所以所有等待鎖的線程將會在同一個鎖前提下執行
            LockSupport.park(this);
            return Thread.interrupted();
        }

      如上,上鎖過程是比較簡單明了的。加入一隊列,然後由操作系統將線程調出。(那麼操作系統是如何把線程調出的呢?有興趣自行研究)

     

    重點2. 解鎖過程,即 countDown() 調用

        public void countDown() {
            // 同樣直接調用 AQS 的接口,由AQS實現了鎖的釋放骨架邏輯
            sync.releaseShared(1);
        }
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
        /**
         * Releases in shared mode.  Implemented by unblocking one or more
         * threads if {@link #tryReleaseShared} returns true.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryReleaseShared} but is otherwise uninterpreted
         *        and can represent anything you like.
         * @return the value returned from {@link #tryReleaseShared}
         */
        public final boolean releaseShared(int arg) {
            // 調用業務實現的釋放邏輯,如果成功,再執行底層的釋放,如隊列移除,線程通知等等
            // 在 CountDownLatch 的實現中,只有 state == 0 時才會成功,所以它只會執行一次底層釋放
            // 這也是我們認為 CountDownLatch 能夠做到多線程同時執行的效果的原因之一
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
        
        /**
         * Release action for shared mode -- signals successor and ensures
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         */
        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                // 隊列不為空才進行釋放
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    // 看過上面的 lock 邏輯,我們知道只要在阻塞狀態,一定是 Node.SIGNAL 
                    if (ws == Node.SIGNAL) {
                        // 狀態改變成功,才進行後續的喚醒邏輯
                        // 因為先改變狀態成功,才算是線程安全的,再進行喚醒,否則進入下一次循環再檢查
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        // 將頭節點的下一節點喚醒,如有必要
                        unparkSuccessor(h);
                    }
                    // 這裏的 propagates, 是要傳播啥呢??
                    // 為什麼只喚醒了一個線程,其他線程也可以動了?
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
        /**
         * Wakes up node's successor, if one exists.
         *
         * @param node the node
         */
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            // 喚醒下一個節點
            // 但如果下一節點已經取消等待了,那麼就找下一個沒最近的沒被取消的線程進行喚醒
            // 喚醒只是針對一個線程的喲
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }

     

    重要3. 線程解鎖的傳播性?

      因為從上一節的講解中,我們看到,當用戶調用 countDown 時,僅僅是讓操作系統喚醒了 head 的下一個節點線程或者最近未取消的節點。那麼,從哪裡來的所有線程都獲取了鎖從而運行呢?

      其實是在 獲取鎖的過程中,還有一點我們未看清:

        // java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
        /**
         * Acquires in shared uninterruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        // 當countDown被調用后,head節點被喚醒,執行
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 獲取到鎖后,設置node為下一個頭節點,並把喚醒狀態傳播下去,而這裏面肯定會做一些喚醒其他線程的操作,請看下文
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
        
        /**
         * Sets head of queue, and checks if successor may be waiting
         * in shared mode, if so propagating if either propagate > 0 or
         * PROPAGATE status was set.
         *
         * @param node the node
         * @param propagate the return value from a tryAcquireShared
         */
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            /*
             * Try to signal next queued node if:
             *   Propagation was indicated by caller,
             *     or was recorded (as h.waitStatus either before
             *     or after setHead) by a previous operation
             *     (note: this uses sign-check of waitStatus because
             *      PROPAGATE status may transition to SIGNAL.)
             * and
             *   The next node is waiting in shared mode,
             *     or we don't know, because it appears null
             *
             * The conservatism in both of these checks may cause
             * unnecessary wake-ups, but only when there are multiple
             * racing acquires/releases, so most need signals now or soon
             * anyway.
             */
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                // 如果有必要,則做一次喚醒下一線程的操作
                // 在 countDown() 不會觸發此操作,所以這裏只是一個內部調用傳播
                Node s = node.next;
                if (s == null || s.isShared())
                    // 此處鎖釋放邏輯如上,總之,又是另一次的喚醒觸發
                    doReleaseShared();
            }
        }

      到此,我們明白了它是怎麼做到一個鎖釋放,所有線程可通行的。也從根本上回答了我們猜想,所有線程同時併發運行。然而並沒有,它只是通過喚醒傳播性來依次喚醒各個等待線程的。從絕對時間性上來講,都是有先後關係的。以後可別再淺顯說是同時執行了喲。

     

    三、 鎖的切換:wait/notify

      上面看出,針對一個lock/unlock 的過程還是很簡單的,由操作系統負責大頭,實現代碼也並不多。

      但是針對稍微有點要求的場景,就會進行條件式的操作。比如:持有某個鎖運行一段代碼,但是,運行時發現某條件不滿足,需要進行等待而不能直接結束,直到條件成立。即所謂的 wait 操作。

      乍一看,wait/notify 與 lock/unlock 很像,其實不然。區分主要是 lock/unlock 是針對整個代碼段的,而 wait/notify 則是針對某個條件的,即獲取了鎖不代表條件成立了,但是條件成立了一定要在鎖的前提下才能進行安全操作。

      那麼,是否 wait/notify 也一樣的實現簡單呢?比如java的最基礎類 Object 類就提供了 wait/notify 功能。

      我們既然想一探究竟,還是以併發包下的實現作為基礎吧,畢竟 java 才是我們的強項。

      本次,咱們以  ArrayBlockingQueue#put/take 作為基礎看下這種場景的使用先。

      ArrayBlockingQueue 的put/take 特性就是,put當隊列滿時,一直阻塞,直到有可用位置才繼續運行下一步。而take當隊列為空時一樣阻塞,直到隊列里有數據才運行下一步。這種場景使用鎖主不好搞了,因為這是一個條件判斷。put/take 如下:

        // java.util.concurrent.ArrayBlockingQueue#put
        /**
         * Inserts the specified element at the tail of this queue, waiting
         * for space to become available if the queue is full.
         *
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
         */
        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                // 當隊列滿時,一直等待
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
        
        // java.util.concurrent.ArrayBlockingQueue#take
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                // 當隊列為空時一直等待
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

      看起來相當簡單,完全符合人類思維。只是,這裏使用的兩個變量進行控制流程 notFull,notEmpty. 這兩個變量是如何進行關聯的呢?

      在這之前,我們還需要補充下上面的例子,即 notFull.await(), notEmpty.await(); 被阻塞了,何時才能運行呢?如上代碼在各自的入隊和出隊完成之後進行通知就可以了。

        // 與 put 對應,入隊完成后,隊列自然就不為空了,通知下 notEmpty 就好了
        /**
         * Inserts element at current put position, advances, and signals.
         * Call only when holding lock.
         */
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            // 我已放入一個元素,不為空了
            notEmpty.signal();
        }
        // 與 take 對應,出隊完成后,自然就不可能是滿的了,至少一個空餘空間。
        /**
         * Extracts element at current take position, advances, and signals.
         * Call only when holding lock.
         */
        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            // 我已移除一個元素,肯定沒有滿了,你們繼續放入吧
            notFull.signal();
            return x;
        }

      是不是超級好理解。是的。不過,我們不是想看 ArrayBlockingQueue 是如何實現的,我們是要論清 wait/notify 是如何實現的。因為畢竟,他們不是一個鎖那麼簡單。

        // 三個鎖的關係,即 notEmpty, notFull 都是 ReentrantLock 的條件鎖,相當於是其子集吧
        /** Main lock guarding all access */
        final ReentrantLock lock;
    
        /** Condition for waiting takes */
        private final Condition notEmpty;
    
        /** Condition for waiting puts */
        private final Condition notFull;
        
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
        // lock.newCondition() 是什麼鬼?它是 AQS 中實現的 ConditionObject
        // java.util.concurrent.locks.ReentrantLock#newCondition
        public Condition newCondition() {
            return sync.newCondition();
        }
            // java.util.concurrent.locks.ReentrantLock.Sync#newCondition
            final ConditionObject newCondition() {
                // AQS 中定義
                return new ConditionObject();
            }

      接下來,我們要帶着幾個疑問來看這個 Condition 的對象:

        1. 它的 wait/notify 是如何實現的?
        2. 它是如何與互相進行聯繫的?
        3. 為什麼 wait/notify 必須要在外面的lock獲取之後才能執行?
        4. 它與Object的wait/notify 有什麼相同和不同點?

      能夠回答了上面的問題,基本上對其原理與實現也就理解得差不多了。

     

    重點1. wait/notify 是如何實現的?

      我們從上面可以看到,它是通過調用 await()/signal() 實現的,到底做事如何,且看下面。

            // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
            /**
             * Implements interruptible condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with saved state as argument,
             *      throwing IllegalMonitorStateException if it fails.
             * <li> Block until signalled or interrupted.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * </ol>
             */
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                // 添加當前線程到 等待線程隊列中,有 lastWaiter/firstWaiter 維護
                Node node = addConditionWaiter();
                // 釋放當前lock中持有的鎖,詳情且看下文
                int savedState = fullyRelease(node);
                // 從以下開始,將不再保證線程安全性,因為當前的鎖已經釋放,其他線程將會重新競爭鎖使用
                int interruptMode = 0;
                // 循環判定,如果當前節點不在 sync 同步隊列中,那麼就反覆阻塞自己
                // 所以判斷是否在 同步隊列上,是很重要的
                while (!isOnSyncQueue(node)) {
                    // 沒有在同步隊列,阻塞
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                // 當條件被滿足后,需要重新競爭鎖,詳情看下文
                // 競爭到鎖后,原樣返回到 wait 的原點,繼續執行業務邏輯
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                // 下面是異常處理,忽略
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
        /**
         * Invokes release with current state value; returns saved state.
         * Cancels node and throws exception on failure.
         * @param node the condition node for this wait
         * @return previous sync state
         */
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                // 預期的,都是釋放鎖成功,如果失敗,說明當前線程並並未獲取到鎖,引發異常
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
        /**
         * Releases in exclusive mode.  Implemented by unblocking one or
         * more threads if {@link #tryRelease} returns true.
         * This method can be used to implement method {@link Lock#unlock}.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryRelease} but is otherwise uninterpreted and
         *        can represent anything you like.
         * @return the value returned from {@link #tryRelease}
         */
        public final boolean release(int arg) {
            // tryRelease 由客戶端自定義實現
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
        
        // 如何判定當前線程是否在同步隊列中或者可以進行同步隊列?
        /**
         * Returns true if a node, always one that was initially placed on
         * a condition queue, is now waiting to reacquire on sync queue.
         * @param node the node
         * @return true if is reacquiring
         */
        final boolean isOnSyncQueue(Node node) {
            // 如果上一節點還沒有被移除,當前節點就不能被加入到同步隊列
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            // 如果當前節點的下游節點已經存在,則它自身必定已經被移到同步隊列中
            if (node.next != null) // If has successor, it must be on queue
                return true;
            /*
             * node.prev can be non-null, but not yet on queue because
             * the CAS to place it on queue can fail. So we have to
             * traverse from tail to make sure it actually made it.  It
             * will always be near the tail in calls to this method, and
             * unless the CAS failed (which is unlikely), it will be
             * there, so we hardly ever traverse much.
             */
             // 最終直接從同步隊列中查找,如果找到,則自身已經在同步隊列中
            return findNodeFromTail(node);
        }
    
        /**
         * Returns true if node is on sync queue by searching backwards from tail.
         * Called only when needed by isOnSyncQueue.
         * @return true if present
         */
        private boolean findNodeFromTail(Node node) {
            Node t = tail;
            for (;;) {
                if (t == node)
                    return true;
                if (t == null)
                    return false;
                t = t.prev;
            }
        }
        
        // 當條件被滿足后,需要重新競爭鎖,以保證外部的鎖語義,因為之前自己已經將鎖主動釋放
        // 這個鎖與 lock/unlock 時的一毛一樣,沒啥可講的
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
        /**
         * Acquires in exclusive uninterruptible mode for thread already in
         * queue. Used by condition wait methods as well as acquire.
         *
         * @param node the node
         * @param arg the acquire argument
         * @return {@code true} if interrupted while waiting
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

      總結一下 wait 的邏輯:

        1. 前提:自身已獲取到外部鎖;
        2. 將當前線程添加到 ConditionQueue 等待隊列中;
        3. 釋放已獲取到的鎖;
        4. 反覆檢查進入等待,直到當前節點被移動到同步隊列中;
        5. 條件滿足被喚醒,重新競爭外部鎖,成功則返回,否則繼續阻塞;(外部鎖是同一個,這也是要求兩個對象必須存在依賴關係的原因)
        6. wait前線程持有鎖,wait后線程持有鎖,沒有一點外部鎖變化;

     

    重點2. 釐清了 wait, 接下來,我們看 signal() 通知喚醒的實現:

            // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
            /**
             * Moves the longest-waiting thread, if one exists, from the
             * wait queue for this condition to the wait queue for the
             * owning lock.
             *
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *         returns {@code false}
             */
            public final void signal() {
                // 只有獲取鎖的實例,才可以進行signal,否則你拿什麼去保證線程安全呢
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                // 通知 firstWaiter 
                if (first != null)
                    doSignal(first);
            }
            
            /**
             * Removes and transfers nodes until hit non-cancelled one or
             * null. Split out from signal in part to encourage compilers
             * to inline the case of no waiters.
             * @param first (non-null) the first node on condition queue
             */
            private void doSignal(Node first) {
                // 最多只轉移一個 節點
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
        // 將一個節點從 等待隊列 移動到 同步隊列中,即可參与下一輪競爭
        // 只有確實移動成功才會返回 true
        // 說明:當前線程是持有鎖的線程
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal
        /**
         * Transfers a node from a condition queue onto sync queue.
         * Returns true if successful.
         * @param node the node
         * @return true if successfully transferred (else the node was
         * cancelled before signal)
         */
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            // 同步隊列由 head/tail 指針維護
            Node p = enq(node);
            int ws = p.waitStatus;
            // 注意,此處正常情況下並不會喚醒等待線程,僅是將隊列轉移。 
            // 因為當前線程的鎖保護區域並未完成,完成后自然會喚醒其他等待線程
            // 否則將會存在當前線程任務還未執行完成,卻被其他線程搶了先去,那接下來的任務當如何??
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }

      總結一下,notify 的功能原理如下:

        1. 前提:自身已獲取到外部鎖;
        2. 轉移下一個等待隊列的節點到同步隊列中;
        3. 如果遇到下一節點被取消情況,順延到再下一節點直到為空,至多轉移一個節點;
        4. 正常情況下不做線程的喚醒操作;

      所以,實現 wait/notify, 最關鍵的就是維護兩個隊列,等待隊列與同步隊列,而且都要求是在有外部鎖保證的情況下執行。

      到此,我們也能回答一個問題:為什麼wait/notify一定要在鎖模式下才能運行?

      因為wait是等待條件成立,此時必定存在競爭需要做保護,而它自身又必須釋放鎖以使外部條件可成立,且後續需要做恢復動作;而notify之後可能還有後續工作必須保障安全,notify只是鎖的一個子集。。。

     

    四、通知所有線程的實現:notifyAll

      有時條件成立后,可以允許所有線程通行,這時就可以進行 notifyAll, 那麼如果達到通知所有的目的呢?是一起通知還是??

      以下是 AQS 中的實現:

            // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll
            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }
            /**
             * Removes and transfers all nodes.
             * @param first (non-null) the first node on condition queue
             */
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }

      可以看到,它是通過遍歷所有節點,依次轉移等待隊列到同步隊列(通知)的,原本就沒有人能同時干幾件事的!

      本文從java實現的角度去解析同步鎖的原理與實現,但並不局限於java。道理總是相通的,只是像操作系統這樣的大佬,能幹的活更純粹:比如讓cpu根本不用調度一個線程。

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

  • Kafka冪等性原理及實現剖析

    Kafka冪等性原理及實現剖析

    1.概述

    最近和一些同學交流的時候反饋說,在面試Kafka時,被問到Kafka組件組成部分、API使用、Consumer和Producer原理及作用等問題都能詳細作答。但是,問到一個平時不注意的問題,就是Kafka的冪等性,被卡主了。那麼,今天筆者就為大家來剖析一下Kafka的冪等性原理及實現。

    2.內容

    2.1 Kafka為啥需要冪等性?

    Producer在生產發送消息時,難免會重複發送消息。Producer進行retry時會產生重試機制,發生消息重複發送。而引入冪等性后,重複發送只會生成一條有效的消息。Kafka作為分佈式消息系統,它的使用場景常見與分佈式系統中,比如消息推送系統、業務平台系統(如物流平台、銀行結算平台等)。以銀行結算平台來說,業務方作為上游把數據上報到銀行結算平台,如果一份數據被計算、處理多次,那麼產生的影響會很嚴重。

    2.2 影響Kafka冪等性的因素有哪些?

    在使用Kafka時,需要確保Exactly-Once語義。分佈式系統中,一些不可控因素有很多,比如網絡、OOM、FullGC等。在Kafka Broker確認Ack時,出現網絡異常、FullGC、OOM等問題時導致Ack超時,Producer會進行重複發送。可能出現的情況如下:

     

     

    2.3 Kafka的冪等性是如何實現的?

    Kafka為了實現冪等性,它在底層設計架構中引入了ProducerID和SequenceNumber。那這兩個概念的用途是什麼呢?

    • ProducerID:在每個新的Producer初始化時,會被分配一個唯一的ProducerID,這個ProducerID對客戶端使用者是不可見的。
    • SequenceNumber:對於每個ProducerID,Producer發送數據的每個Topic和Partition都對應一個從0開始單調遞增的SequenceNumber值。

    2.3.1 冪等性引入之前的問題?

    Kafka在引入冪等性之前,Producer向Broker發送消息,然後Broker將消息追加到消息流中后給Producer返回Ack信號值。實現流程如下:

     

    上圖的實現流程是一種理想狀態下的消息發送情況,但是實際情況中,會出現各種不確定的因素,比如在Producer在發送給Broker的時候出現網絡異常。比如以下這種異常情況的出現:

     

    上圖這種情況,當Producer第一次發送消息給Broker時,Broker將消息(x2,y2)追加到了消息流中,但是在返回Ack信號給Producer時失敗了(比如網絡異常) 。此時,Producer端觸發重試機制,將消息(x2,y2)重新發送給Broker,Broker接收到消息后,再次將該消息追加到消息流中,然後成功返回Ack信號給Producer。這樣下來,消息流中就被重複追加了兩條相同的(x2,y2)的消息。

    2.3.2 冪等性引入之後解決了什麼問題?

    面對這樣的問題,Kafka引入了冪等性。那麼冪等性是如何解決這類重複發送消息的問題的呢?下面我們可以先來看看流程圖:

     

     同樣,這是一種理想狀態下的發送流程。實際情況下,會有很多不確定的因素,比如Broker在發送Ack信號給Producer時出現網絡異常,導致發送失敗。異常情況如下圖所示:

     

     當Producer發送消息(x2,y2)給Broker時,Broker接收到消息並將其追加到消息流中。此時,Broker返回Ack信號給Producer時,發生異常導致Producer接收Ack信號失敗。對於Producer來說,會觸發重試機制,將消息(x2,y2)再次發送,但是,由於引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發送給Broker,而之前Broker緩存過之前發送的相同的消息,那麼在消息流中的消息就只有一條(x2,y2),不會出現重複發送的情況。

    2.3.3 ProducerID是如何生成的?

    客戶端在生成Producer時,會實例化如下代碼:

    // 實例化一個Producer對象
    Producer<String, String> producer = new KafkaProducer<>(props);

    在org.apache.kafka.clients.producer.internals.Sender類中,在run()中有一個maybeWaitForPid()方法,用來生成一個ProducerID,實現代碼如下:

     private void maybeWaitForPid() {
            if (transactionState == null)
                return;
    
            while (!transactionState.hasPid()) {
                try {
                    Node node = awaitLeastLoadedNodeReady(requestTimeout);
                    if (node != null) {
                        ClientResponse response = sendAndAwaitInitPidRequest(node);
                        if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
                            InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
                            transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                        } else {
                            log.error("Received an unexpected response type for an InitPidRequest from {}. " +
                                    "We will back off and try again.", node);
                        }
                    } else {
                        log.debug("Could not find an available broker to send InitPidRequest to. " +
                                "We will back off and try again.");
                    }
                } catch (Exception e) {
                    log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
                }
                log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
                time.sleep(retryBackoffMs);
                metadata.requestUpdate();
            }
        }

    3.事務

    與冪等性有關的另外一個特性就是事務。Kafka中的事務與數據庫的事務類似,Kafka中的事務屬性是指一系列的Producer生產消息和消費消息提交Offsets的操作在一個事務中,即原子性操作。對應的結果是同時成功或者同時失敗。

    這裏需要與數據庫中事務進行區別,操作數據庫中的事務指一系列的增刪查改,對Kafka來說,操作事務是指一系列的生產和消費等原子性操作。

    3.1 Kafka引入事務的用途?

    在事務屬性引入之前,先引入Producer的冪等性,它的作用為:

    • Producer多次發送消息可以封裝成一個原子性操作,即同時成功,或者同時失敗;
    • 消費者&生產者模式下,因為Consumer在Commit Offsets出現問題時,導致重複消費消息時,Producer重複生產消息。需要將這個模式下Consumer的Commit Offsets操作和Producer一系列生產消息的操作封裝成一個原子性操作。

    產生的場景有:

    比如,在Consumer中Commit Offsets時,當Consumer在消費完成時Commit的Offsets為100(假設最近一次Commit的Offsets為50),那麼執行觸發Balance時,其他Consumer就會重複消費消息(消費的Offsets介於50~100之間的消息)。

    3.2 事務提供了哪些可使用的API?

    Producer提供了五種事務方法,它們分別是:initTransactions()、beginTransaction()、sendOffsetsToTransaction()、commitTransaction()、abortTransaction(),代碼定義在org.apache.kafka.clients.producer.Producer<K,V>接口中,具體定義接口如下:

    // 初始化事務,需要注意確保transation.id屬性被分配
    void initTransactions();
    
    // 開啟事務
    void beginTransaction() throws ProducerFencedException;
    
    // 為Consumer提供的在事務內Commit Offsets的操作
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                  String consumerGroupId) throws ProducerFencedException;
    
    // 提交事務
    void commitTransaction() throws ProducerFencedException;
    
    // 放棄事務,類似於回滾事務的操作
    void abortTransaction() throws ProducerFencedException;

    3.3 事務的實際應用場景有哪些?

    在Kafka事務中,一個原子性操作,根據操作類型可以分為3種情況。情況如下:

    • 只有Producer生產消息,這種場景需要事務的介入;
    • 消費消息和生產消息並存,比如Consumer&Producer模式,這種場景是一般Kafka項目中比較常見的模式,需要事務介入;
    • 只有Consumer消費消息,這種操作在實際項目中意義不大,和手動Commit Offsets的結果一樣,而且這種場景不是事務的引入目的。

    4.總結

    Kafka的冪等性和事務是比較重要的特性,特別是在數據丟失和數據重複的問題上非常重要。Kafka引入冪等性,設計的原理也比較好理解。而事務與數據庫的事務特性類似,有數據庫使用的經驗對理解Kafka的事務也比較容易接受。

    5.結束語

    這篇博客就和大家分享到這裏,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

    另外,博主出書了《》和《》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。 

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

  • 【原創】(十一)Linux內存管理slub分配器

    【原創】(十一)Linux內存管理slub分配器

    背景

    • Read the fucking source code! –By 魯迅
    • A picture is worth a thousand words. –By 高爾基

    說明:

    1. Kernel版本:4.14
    2. ARM64處理器,Contex-A53,雙核
    3. 使用工具:Source Insight 3.5, Visio

    1. 概述

    之前的文章分析的都是基於頁面的內存分配,而小塊內存的分配和管理是通過塊分配器來實現的。目前內核中,有三種方式來實現小塊內存分配:slab, slub, slob,最先有slab分配器,slub/slob分配器是改進版,slob分配器適用於小內存嵌入式設備,而slub分配器目前已逐漸成為主流塊分配器。接下來的文章,就是以slub分配器為目標,進一步深入。

    先來一個初印象:

    2. 數據結構

    有四個關鍵的數據結構:

    • struct kmem_cache:用於管理SLAB緩存,包括該緩存中對象的信息描述,per-CPU/Node管理slab頁面等;
      關鍵字段如下:
    /*
     * Slab cache management.
     */
    struct kmem_cache {
        struct kmem_cache_cpu __percpu *cpu_slab;       //每個CPU slab頁面
        /* Used for retriving partial slabs etc */
        unsigned long flags;
        unsigned long min_partial;
        int size;       /* The size of an object including meta data */
        int object_size;    /* The size of an object without meta data */
        int offset;     /* Free pointer offset. */
    #ifdef CONFIG_SLUB_CPU_PARTIAL
        /* Number of per cpu partial objects to keep around */
        unsigned int cpu_partial;
    #endif
        struct kmem_cache_order_objects oo;     //該結構體會描述申請頁面的order值,以及object的個數
    
        /* Allocation and freeing of slabs */
        struct kmem_cache_order_objects max;
        struct kmem_cache_order_objects min;
        gfp_t allocflags;   /* gfp flags to use on each alloc */
        int refcount;       /* Refcount for slab cache destroy */
        void (*ctor)(void *);           // 對象構造函數
        int inuse;      /* Offset to metadata */
        int align;      /* Alignment */
        int reserved;       /* Reserved bytes at the end of slabs */
        int red_left_pad;   /* Left redzone padding size */
        const char *name;   /* Name (only for display!) */
        struct list_head list;  /* List of slab caches */       //kmem_cache最終會鏈接在一個全局鏈表中
        struct kmem_cache_node *node[MAX_NUMNODES];     //Node管理slab頁面
    };
    • struct kmem_cache_cpu:用於管理每個CPU的slab頁面,可以使用無鎖訪問,提高緩存對象分配速度;
    struct kmem_cache_cpu {
        void **freelist;    /* Pointer to next available object */                  //指向空閑對象的指針
        unsigned long tid;  /* Globally unique transaction id */                
        struct page *page;  /* The slab from which we are allocating */     //slab緩存頁面
    #ifdef CONFIG_SLUB_CPU_PARTIAL
        struct page *partial;   /* Partially allocated frozen slabs */
    #endif
    #ifdef CONFIG_SLUB_STATS
        unsigned stat[NR_SLUB_STAT_ITEMS];
    #endif
    };
    • struct kmem_cache_node:用於管理每個Node的slab頁面,由於每個Node的訪問速度不一致,slab頁面由Node來管理;
    /*
     * The slab lists for all objects.
     */
    struct kmem_cache_node {
        spinlock_t list_lock;
    
    #ifdef CONFIG_SLUB
        unsigned long nr_partial;    //slab頁表數量
        struct list_head partial;       //slab頁面鏈表
    #ifdef CONFIG_SLUB_DEBUG
        atomic_long_t nr_slabs;
        atomic_long_t total_objects;
        struct list_head full;
    #endif
    #endif
    };
    • struct page:用於描述slab頁面struct page結構體中很多字段都是通過union聯合體進行復用的。
      struct page結構中,用於slub的成員如下:
    struct page {
        union {
           ...
            void *s_mem;            /* slab first object */
           ...
        };
        
        /* Second double word */
        union {
           ...
            void *freelist;     /* sl[aou]b first free object */
           ...
        };
        
        union {
           ...
            struct {
                union {
                  ...
                    struct {            /* SLUB */
                        unsigned inuse:16;
                        unsigned objects:15;
                        unsigned frozen:1;
                    };
                    ...
                };
           ...
            };       
        };   
        
        /*
         * Third double word block
         */
        union {
           ...
            struct {        /* slub per cpu partial pages */
                struct page *next;  /* Next partial slab */
    #ifdef CONFIG_64BIT
                int pages;  /* Nr of partial slabs left */
                int pobjects;   /* Approximate # of objects */
    #else
                short int pages;
                short int pobjects;
    #endif
            };
    
            struct rcu_head rcu_head;   /* Used by SLAB
                             * when destroying via RCU
                             */
        };
        ...
            struct kmem_cache *slab_cache;  /* SL[AU]B: Pointer to slab */    
        ...
    }

    圖來了:

    3. 流程分析

    針對Slub的使用,可以從三個維度來分析:

    1. slub緩存創建
    2. slub對象分配
    3. slub對象釋放

    下邊將進一步分析。

    3.1 kmem_cache_create

    在內核中通過kmem_cache_create接口來創建一個slab緩存

    先看一下這個接口的函數調用關係圖:

    1. kmem_cache_create完成的功能比較簡單,就是創建一個用於管理slab緩存kmem_cache結構,並對該結構體進行初始化,最終添加到全局鏈表中。kmem_cache結構體初始化,包括了上文中分析到的kmem_cache_cpukmem_cache_node兩個字段結構。

    2. 在創建的過程中,當發現已有的slab緩存中,有存在對象大小相近,且具有兼容標誌的slab緩存,那就只需要進行merge操作並返回,而無需進一步創建新的slab緩存

    3. calculate_sizes函數會根據指定的force_order或根據對象大小去計算kmem_cache結構體中的size/min/oo等值,其中kmem_cache_order_objects結構體,是由頁面分配order值和對象數量兩者通過位域拼接起來的。

    4. 在創建slab緩存的時候,有一個先雞后蛋的問題:kmem_cache結構體來管理一個slab緩存,而創建kmem_cache結構體又是從slab緩存中分配出來的對象,那麼這個問題是怎麼解決的呢?可以看一下kmem_cache_init函數,內核中定義了兩個靜態的全局變量kmem_cachekmem_cache_node,在kmem_cache_init函數中完成了這兩個結構體的初始化之後,相當於就是創建了兩個slab緩存,一個用於分配kmem_cache結構體對象的緩存池,一個用於分配kmem_cache_node結構體對象的緩存池。由於kmem_cache_cpu結構體是通過__alloc_percpu來分配的,因此不需要創建一個相關的slab緩存

    3.2 kmem_cache_alloc

    kmem_cache_alloc接口用於從slab緩存池中分配對象。

    看一下大體的調用流程圖:

    從上圖中可以看出,分配slab對象與Buddy System中分配頁面類似,存在快速路徑和慢速路徑兩種,所謂的快速路徑就是per-CPU緩存,可以無鎖訪問,因而效率更高。

    整體的分配流程大體是這樣的:優先從per-CPU緩存中進行分配,如果per-CPU緩存中已經全部分配完畢,則從Node管理的slab頁面中遷移slab頁per-CPU緩存中,再重新分配。當Node管理的slab頁面也不足的情況下,則從Buddy System中分配新的頁面,添加到per-CPU緩存中。

    還是用圖來說明更清晰,分為以下幾步來分配:

    1. fastpath
      快速路徑下,以原子的方式檢索per-CPU緩存的freelist列表中的第一個對象,如果freelist為空並且沒有要檢索的對象,則跳入慢速路徑操作,最後再返回到快速路徑中重試操作。

    2. slowpath-1
      將per-CPU緩存中page指向的slab頁中的空閑對象遷移到freelist中,如果有空閑對象,則freeze該頁面,沒有空閑對象則跳轉到slowpath-2

    3. slowpath-2
      將per-CPU緩存中partial鏈表中的第一個slab頁遷移到page指針中,如果partial鏈表為空,則跳轉到slowpath-3

    4. slowpath-3
      將Node管理的partial鏈表中的slab頁遷移到per-CPU緩存中的page中,並重複第二個slab頁將其添加到per-CPU緩存中的partial鏈表中。如果遷移的slab中空閑對象超過了kmem_cache.cpu_partial的一半,則僅遷移slab頁,並且不再重複。
      如果每個Node的partial鏈表都為空,跳轉到slowpath-4

    5. slowpath-4
      Buddy System中獲取頁面,並將其添加到per-CPU的page中。

    3.2 kmem_cache_free

    kmem_cache_free的操作,可以看成是kmem_cache_alloc的逆過程,因此也分為快速路徑和慢速路徑兩種方式,同時,慢速路徑中又分為了好幾種情況,可以參考kmem_cache_alloc的過程。

    調用流程圖如下:

    效果如下:

    1. 快速路徑釋放
      快速路徑下,直接將對象返回到freelist中即可。

    2. put_cpu_partial
      put_cpu_partial函數主要是將一個剛freeze的slab頁,放入到partial鏈表中。
      put_cpu_partial函數中調用unfreeze_partials函數,這時候會將per-CPU管理的partial鏈表中的slab頁面添加到Node管理的partial鏈表的尾部。如果超出了Node的partial鏈表,溢出的slab頁面中沒有分配對象的slab頁面將會返回到夥伴系統。

    3. add_partial
      添加slab頁到Node的partial鏈表中。

    4. remove_partial
      從Node的partial鏈表移除slab頁。

    具體釋放的流程走哪個分支,跟對象的使用情況,partial鏈表的個數nr_partial/min_partial等相關,細節就不再深入分析了。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

  • 實現 Redis 協議解析器

    本文是 《用 Golang 實現一個 Redis》系列文章第二篇,本文將分別介紹 以及 的實現,若您對協議有所了解可以直接閱讀協議解析器部分。

    Redis 通信協議

    Redis 自 2.0 版本起使用了統一的協議 RESP (REdis Serialization Protocol),該協議易於實現,計算機可以高效的進行解析且易於被人類讀懂。

    RESP 是一個二進制安全的文本協議,工作於 TCP 協議上。客戶端和服務器發送的命令或數據一律以 \r\n (CRLF)結尾。

    RESP 定義了5種格式:

    • 簡單字符串(Simple String): 服務器用來返回簡單的結果,比如”OK”。非二進制安全,且不允許換行。
    • 錯誤信息(Error): 服務器用來返回簡單的結果,比如”ERR Invalid Synatx”。非二進制安全,且不允許換行。
    • 整數(Integer): llenscard等命令的返回值, 64位有符號整數
    • 字符串(Bulk String): 二進制安全字符串, get 等命令的返回值
    • 數組(Array, 舊版文檔中稱 Multi Bulk Strings): Bulk String 數組,客戶端發送指令以及lrange等命令響應的格式

    RESP 通過第一個字符來表示格式:

    • 簡單字符串:以”+” 開始, 如:”+OK\r\n”
    • 錯誤:以”-” 開始,如:”-ERR Invalid Synatx\r\n”
    • 整數:以”:”開始,如:”:1\r\n”
    • 字符串:以 $ 開始
    • 數組:以 * 開始

    Bulk String有兩行,第一行為 $+正文長度,第二行為實際內容。如:

    $3\r\nSET\r\n

    Bulk String 是二進制安全的可以包含任意字節,就是說可以在 Bulk String 內部包含 “\r\n” 字符(行尾的CRLF被隱藏):

    $4
    a\r\nb

    $-1 表示 nil, 比如使用 get 命令查詢一個不存在的key時,響應即為$-1

    Array 格式第一行為 “*”+數組長度,其後是相應數量的 Bulk String。如, ["foo", "bar"]的報文:

    *2
    $3
    foo
    $3
    bar

    客戶端也使用 Array 格式向服務端發送指令。命令本身將作為第一個參數,如 SET key value指令的RESP報文:

    *3
    $3
    SET
    $3
    key
    $5
    value

    將換行符打印出來:

    *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

    協議解析器

    我們在 一文中已經介紹過TCP服務器的實現,協議解析器將實現其 Handler 接口充當應用層服務器。

    協議解析器將接收 Socket 傳來的數據,並將其數據還原為 [][]byte 格式,如 "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n" 將被還原為 ['SET', 'key', 'value']

    本文完整代碼:

    來自客戶端的請求均為數組格式,它在第一行中標記報文的總行數並使用CRLF作為分行符。

    bufio 標準庫可以將從 reader 讀到的數據緩存到 buffer 中,直至遇到分隔符或讀取完畢后返回,所以我們使用 reader.ReadBytes('\n') 來保證每次讀取到完整的一行。

    需要注意的是RESP是二進制安全的協議,它允許在正文中使用CRLF字符。舉例來說 Redis 可以正確接收並執行SET "a\r\nb" 1指令, 這條指令的正確報文是這樣的:

    *3  
    $3
    SET
    $4
    a\r\nb 
    $7
    myvalue

    ReadBytes 讀取到第五行 “a\r\nb\r\n”時會將其誤認為兩行:

    *3  
    $3
    SET
    $4
    a  // 錯誤的分行
    b // 錯誤的分行
    $7
    myvalue

    因此當讀取到第四行$4后, 不應該繼續使用 ReadBytes('\n') 讀取下一行, 應使用 io.ReadFull(reader, msg) 方法來讀取指定長度的內容。

    msg = make([]byte, 4 + 2) // 正文長度4 + 換行符長度2
    _, err = io.ReadFull(reader, msg)

    定義 Client 結構體作為客戶端抽象:

    type Client struct {
        /* 與客戶端的 Tcp 連接 */
        conn   net.Conn
    
        /* 
         * 帶有 timeout 功能的 WaitGroup, 用於優雅關閉
         * 當響應被完整發送前保持 waiting 狀態, 阻止鏈接被關閉
         */
        waitingReply wait.Wait
    
        /* 標記客戶端是否正在發送指令 */ 
        sending atomic.AtomicBool
        
        /* 客戶端正在發送的參數數量, 即 Array 第一行指定的數組長度 */
        expectedArgsCount uint32
        
        /* 已經接收的參數數量, 即 len(args)*/ 
        receivedCount uint32
        
        /*
         * 已經接收到的命令參數,每個參數由一個 []byte 表示
         */
        args [][]byte
    }

    定義解析器:

    type Handler struct {
    
        /* 
         * 記錄活躍的客戶端鏈接 
         * 類型為 *Client -> placeholder 
         */
        activeConn sync.Map 
    
        /* 數據庫引擎,執行指令並返回結果 */
        db db.DB
    
        /* 關閉狀態標誌位,關閉過程中時拒絕新建連接和新請求 */
        closing atomic.AtomicBool 
    }

    接下來可以編寫主要部分了:

    func (h *Handler)Handle(ctx context.Context, conn net.Conn) {
        if h.closing.Get() {
            // 關閉過程中不接受新連接
            _ = conn.Close()
        }
    
        /* 初始化客戶端狀態 */
        client := &Client {
            conn:   conn,
        }
        h.activeConn.Store(client, 1)
    
        reader := bufio.NewReader(conn)
        var fixedLen int64 = 0 // 將要讀取的 BulkString 的正文長度
        var err error
        var msg []byte
        for {
            /* 讀取下一行數據 */ 
            if fixedLen == 0 { // 正常模式下使用 CRLF 區分數據行
                msg, err = reader.ReadBytes('\n')
                // 判斷是否以 \r\n 結尾
                if len(msg) == 0 || msg[len(msg) - 2] != '\r' {
                    errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                    _, _ =  client.conn.Write(errReply.ToBytes())
                }
            } else { // 當讀取到 BulkString 第二行時,根據給出的長度進行讀取
                msg = make([]byte, fixedLen + 2)
                _, err = io.ReadFull(reader, msg)
                // 判斷是否以 \r\n 結尾
                if len(msg) == 0 || 
                  msg[len(msg) - 2] != '\r' ||  
                  msg[len(msg) - 1] != '\n'{
                    errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                    _, _ =  client.conn.Write(errReply.ToBytes())
                }
                // Bulk String 讀取完畢,重新使用正常模式
                fixedLen = 0 
            }
            // 處理 IO 異常
            if err != nil {
                if err == io.EOF || err == io.ErrUnexpectedEOF {
                    logger.Info("connection close")
                } else {
                    logger.Warn(err)
                }
                _ = client.Close()
                h.activeConn.Delete(client)
                return // io error, disconnect with client
            }
    
            /* 解析收到的數據 */
            if !client.sending.Get() { 
                // sending == false 表明收到了一條新指令
                if msg[0] == '*' {
                    // 讀取第一行獲取參數個數
                    expectedLine, err := strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
                    if err != nil {
                        _, _ = client.conn.Write(UnknownErrReplyBytes)
                        continue
                    }
                    // 初始化客戶端狀態
                    client.waitingReply.Add(1) // 有指令未處理完成,阻止服務器關閉
                    client.sending.Set(true) // 正在接收指令中
                    // 初始化計數器和緩衝區 
                    client.expectedArgsCount = uint32(expectedLine) 
                    client.receivedCount = 0
                    client.args = make([][]byte, expectedLine)
                } else {
                    // TODO: text protocol
                }
            } else {
                // 收到了指令的剩餘部分(非首行)
                line := msg[0:len(msg)-2] // 移除換行符
                if line[0] == '$' { 
                    // BulkString 的首行,讀取String長度
                    fixedLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
                    if err != nil {
                        errReply := &reply.ProtocolErrReply{Msg:err.Error()}
                        _, _ = client.conn.Write(errReply.ToBytes())
                    }
                    if fixedLen <= 0 {
                        errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                        _, _ = client.conn.Write(errReply.ToBytes())
                    }
                } else { 
                    // 收到參數
                    client.args[client.receivedCount] = line
                    client.receivedCount++
                }
    
    
                // 一條命令發送完畢
                if client.receivedCount == client.expectedArgsCount {
                    client.sending.Set(false)
    
                    // 執行命令並響應
                    result := h.db.Exec(client.args)
                    if result != nil {
                        _, _ = conn.Write(result.ToBytes())
                    } else {
                        _, _ = conn.Write(UnknownErrReplyBytes)
                    }
    
                    // 重置客戶端狀態,等待下一條指令
                    client.expectedArgsCount = 0
                    client.receivedCount = 0
                    client.args = nil
                    client.waitingReply.Done()
                }
            }
        }
    }

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

  • MySQL 複製表結構和表數據

    1、前言

      在功能開發完畢,在本地或者測試環境進行測試時,經常會遇到這種情況:有專門的測試數據,測試過程會涉及到修改表中的數據,經常不能一次測試成功,所以,每次執行測試后,原來表中的數據其實已經被修改了,下一次測試,就需要將數據恢復。

      我一般的做法是:先創建一個副本表,比如測試使用的user表,我在測試前創建副本表user_bak,每次測試后,將user表清空,然後將副本表user_bak的數據導入到user表中。

      上面的操作是對一個table做備份,如果涉及到的table太多,可以創建database的副本。

      接下來我將對此處的表結構複製以及表數據複製進行闡述,並非數據庫的複製原理!!!!

      下面是staff表的表結構

    create table staff (
    	id int not null auto_increment comment '自增id',
    	name char(20) not null comment '用戶姓名',
    	dep char(20) not null comment '所屬部門',
    	gender tinyint not null default 1 comment '性別:1男; 2女',
    	addr char(30) not null comment '地址',
    	primary key(id),
    	index idx_1 (name, dep),
    	index idx_2 (name, gender)
    ) engine=innodb default charset=utf8mb4 comment '員工表';
    

     

    2、具體方式 

      2.1、執行舊錶的創建SQL來創建表

      如果原始表已經存在,那麼可以使用命令查看該表的創建語句:

    mysql> show create table staff\G
    *************************** 1. row ***************************
           Table: staff
    Create Table: CREATE TABLE `staff` (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
      `name` char(20) NOT NULL COMMENT '用戶姓名',
      `dep` char(20) NOT NULL COMMENT '所屬部門',
      `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
      `addr` char(30) NOT NULL,
      PRIMARY KEY (`id`),
      KEY `idx_1` (`name`,`dep`),
      KEY `idx_2` (`name`,`gender`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='員工表'
    1 row in set (0.01 sec)
    

      可以看到,上面show creat table xx的命令執行結果中,Create Table的值就是創建表的語句,此時可以直接複製創建表的SQL,然後重新執行一次就行了。

      當數據表中有數據的時候,看到的創建staff表的sql就會稍有不同。比如,我在staff中添加了兩條記錄:

    mysql> insert into staff values (null, '李明', 'RD', 1, '北京');
    Query OK, 1 row affected (0.00 sec)
    
    mysql> insert into staff values (null, '張三', 'PM', 0, '上海');
    Query OK, 1 row affected (0.00 sec)
    
    mysql> select * from staff;
    +----+--------+-----+--------+--------+
    | id | name   | dep | gender | addr   |
    +----+--------+-----+--------+--------+
    |  1 | 李明   | RD  |      1 | 北京   |
    |  2 | 張三   | PM  |      0 | 上海   |
    +----+--------+-----+--------+--------+
    2 rows in set (0.00 sec)
    

      此時在執行show create table命令:

    mysql> show create table staff\G
    *************************** 1. row ***************************
           Table: staff
    Create Table: CREATE TABLE `staff` (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
      `name` char(20) NOT NULL COMMENT '用戶姓名',
      `dep` char(20) NOT NULL COMMENT '所屬部門',
      `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
      `addr` char(30) NOT NULL,
      PRIMARY KEY (`id`),
      KEY `idx_1` (`name`,`dep`),
      KEY `idx_2` (`name`,`gender`)
    ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT='員工表'
    1 row in set (0.00 sec)
    

      注意,上面結果中的倒數第二行

        ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT=’員工表’

      因為staff表的id是自增的,且已經有了2條記錄,所以下一次插入數據的自增id應該為3,這個信息,也會出現在表的創建sql中。

        

      2.2、使用like創建新表(僅包含表結構)

      使用like根據已有的表來創建新表,特點如下:

      1、方便,不需要查看原表的表結構定義信息;

      2、創建的新表中,表結構定義、完整性約束,都與原表保持一致。

      3、創建的新表是一個空表,全新的表,沒有數據。

      用法如下:

    mysql> select * from staff;  #舊錶中已有2條數據
    +----+--------+-----+--------+--------+
    | id | name   | dep | gender | addr   |
    +----+--------+-----+--------+--------+
    |  1 | 李明   | RD  |      1 | 北京   |
    |  2 | 張三   | PM  |      0 | 上海   |
    +----+--------+-----+--------+--------+
    2 rows in set (0.00 sec)
    
    mysql> create table staff_bak_1 like staff;  # 直接使用like,前面指定新表名,後面指定舊錶(參考的表)
    Query OK, 0 rows affected (0.02 sec)
    
    mysql> show create table staff_bak_1\G
    *************************** 1. row ***************************
           Table: staff_bak_1
    Create Table: CREATE TABLE `staff_bak_1` (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
      `name` char(20) NOT NULL COMMENT '用戶姓名',
      `dep` char(20) NOT NULL COMMENT '所屬部門',
      `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
      `addr` char(30) NOT NULL,
      PRIMARY KEY (`id`),
      KEY `idx_1` (`name`,`dep`),
      KEY `idx_2` (`name`,`gender`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='員工表'  # 注意沒有AUTO_INCREMENT=3
    1 row in set (0.00 sec)
    
    mysql> select * from staff_bak_1; # 沒有包含舊錶的數據
    Empty set (0.00 sec)
    

      

      2.3、使用as來創建新表(包含數據)

      使用as來創建新表,有一下特點:

      1、可以有選擇性的決定新表包含哪些字段;

      2、創建的新表中,會包含舊錶的數據;

      3、創建的新表不會包含舊錶的完整性約束(比如主鍵、索引等),僅包含最基礎的表結構定義。

      用法如下:

    mysql> create table staff_bak_2 as select * from staff;
    Query OK, 2 rows affected (0.02 sec)
    Records: 2  Duplicates: 0  Warnings: 0
    
    mysql> select * from staff_bak_2;
    +----+--------+-----+--------+--------+
    | id | name   | dep | gender | addr   |
    +----+--------+-----+--------+--------+
    |  1 | 李明   | RD  |      1 | 北京   |
    |  2 | 張三   | PM  |      0 | 上海   |
    +----+--------+-----+--------+--------+
    2 rows in set (0.00 sec)
    
    mysql> show create table staff_bak_2\G
    *************************** 1. row ***************************
           Table: staff_bak_2
    Create Table: CREATE TABLE `staff_bak_2` (
      `id` int(11) NOT NULL DEFAULT '0' COMMENT '自增id',
      `name` char(20) CHARACTER SET utf8mb4 NOT NULL COMMENT '用戶姓名',
      `dep` char(20) CHARACTER SET utf8mb4 NOT NULL COMMENT '所屬部門',
      `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
      `addr` char(30) CHARACTER SET utf8mb4 NOT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1
    1 row in set (0.00 sec)
    

      

      利用as創建表的時候沒有保留完整性約束,其實這個仔細想一下也能想明白。因為使用as創建表的時候,可以指定新表包含哪些字段呀,如果你創建新表時,忽略了幾個字段,這樣的話即使保留了完整約束,保存數據是也不能滿足完整性約束。

      比如,staff表有一個索引idx1,由name和dep字段組成;但是我創建的新表中,沒有name和dep字段(只選擇了其他字段),那麼新表中保存idx1也沒有必要,對吧。

    mysql> --  只選擇id、gender、addr作為新表的字段,那麼name和dep組成的索引就沒必要存在了
    mysql> create table staff_bak_3 as (select id, gender, addr from staff);
    Query OK, 2 rows affected (0.02 sec)
    Records: 2  Duplicates: 0  Warnings: 0
    
    mysql> show create table staff_bak_3\G
    *************************** 1. row ***************************
           Table: staff_bak_3
    Create Table: CREATE TABLE `staff_bak_3` (
      `id` int(11) NOT NULL DEFAULT '0' COMMENT '自增id',
      `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
      `addr` char(30) CHARACTER SET utf8mb4 NOT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1
    1 row in set (0.00 sec)
    
    mysql> select * from staff_bak_3;
    +----+--------+--------+
    | id | gender | addr   |
    +----+--------+--------+
    |  1 |      1 | 北京   |
    |  2 |      0 | 上海   |
    +----+--------+--------+
    2 rows in set (0.00 sec)
    

      

      2.4、使用like+insert+select創建原表的副本(推薦)

      使用like創建新表,雖然保留了舊錶的各種表結構定義以及完整性約束,但是如何將舊錶的數據導入到新表中呢?

      最極端的方式:寫一個程序,先將舊錶數據讀出來,然後寫入到新表中,這個方式我就不嘗試了。

      有一個比較簡單的命令:

    mysql> select * from staff; #原表數據
    +----+--------+-----+--------+--------+
    | id | name   | dep | gender | addr   |
    +----+--------+-----+--------+--------+
    |  1 | 李明   | RD  |      1 | 北京   |
    |  2 | 張三   | PM  |      0 | 上海   |
    +----+--------+-----+--------+--------+
    2 rows in set (0.00 sec)
    
    mysql> select * from staff_bak_1; # 使用like創建的表,與原表相同的表結構和完整性約束(自增除外)
    Empty set (0.00 sec)
    
    mysql> insert into staff_bak_1 select * from staff;  # 將staff表的所有記錄的所有字段值都插入副本表中
    Query OK, 2 rows affected (0.00 sec)
    Records: 2  Duplicates: 0  Warnings: 0
    
    mysql> select * from staff_bak_1;
    +----+--------+-----+--------+--------+
    | id | name   | dep | gender | addr   |
    +----+--------+-----+--------+--------+
    |  1 | 李明   | RD  |      1 | 北京   |
    |  2 | 張三   | PM  |      0 | 上海   |
    +----+--------+-----+--------+--------+
    2 rows in set (0.00 sec)
    

      

      其實這條SQL語句,是知道兩個表的表結構和完整性約束相同,所以,可以直接select *。

    insert into staff_bak_1 select * from staff;
    

      

      如果兩個表結構不相同,其實也是可以這個方式的,比如:

    mysql> show create table demo\G
    *************************** 1. row ***************************
           Table: demo
    Create Table: CREATE TABLE `demo` (
      `_id` int(11) NOT NULL AUTO_INCREMENT,
      `_name` char(20) DEFAULT NULL,
      `_gender` tinyint(4) DEFAULT '1',
      PRIMARY KEY (`_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
    1 row in set (0.00 sec)
    
    # 只將staff表中的id和name字段組成的數據記錄插入到demo表中,對應_id和_name字段
    mysql> insert into demo (_id, _name) select id,name from staff;
    Query OK, 2 rows affected (0.00 sec)
    Records: 2  Duplicates: 0  Warnings: 0
    
    mysql> select * from demo;
    +-----+--------+---------+
    | _id | _name  | _gender |
    +-----+--------+---------+
    |   1 | 李明   |       1 |
    |   2 | 張三   |       1 |
    +-----+--------+---------+
    2 rows in set (0.00 sec)
    

      這是兩個表的字段數量不相同的情況,此時需要手動指定列名,否則就會報錯

     

      另外,如果兩個表的字段數量,以及相同順序的字段類型相同,如果是全部字段複製,即使字段名不同,也可以直接複製

    # staff_bak_5的字段名與staff表並不相同,但是字段數量、相同順序字段的類型相同,所以可以直接插入
    mysql> show create table staff_bak_5\G
    *************************** 1. row ***************************
           Table: staff_bak_5
    Create Table: CREATE TABLE `staff_bak_5` (
      `_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
      `_name` char(20) NOT NULL COMMENT '用戶姓名',
      `_dep` char(20) NOT NULL COMMENT '所屬部門',
      `_gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
      `_addr` char(30) NOT NULL,
      PRIMARY KEY (`_id`),
      KEY `idx_1` (`_name`,`_dep`),
      KEY `idx_2` (`_name`,`_gender`)
    ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT='員工表'
    1 row in set (0.00 sec)
    
    mysql> insert into staff_bak_5 select * from staff;
    Query OK, 2 rows affected (0.00 sec)
    Records: 2  Duplicates: 0  Warnings: 0
    
    mysql> select * from staff_bak_5;
    +-----+--------+------+---------+--------+
    | _id | _name  | _dep | _gender | _addr  |
    +-----+--------+------+---------+--------+
    |   1 | 李明   | RD   |       1 | 北京   |
    |   2 | 張三   | PM   |       0 | 上海   |
    +-----+--------+------+---------+--------+
    2 rows in set (0.00 sec)
    

      

      

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

  • 【目標檢測實戰】目標檢測實戰之一–手把手教你LMDB格式數據集製作!

    【目標檢測實戰】目標檢測實戰之一–手把手教你LMDB格式數據集製作!

    文章目錄

    1 目標檢測簡介
    2 lmdb數據製作
        2.1 VOC數據製作
        2.2 lmdb文件生成

    lmdb格式的數據是在使用caffe進行目標檢測或分類時,使用的一種數據格式。這裏我主要以目標檢測為例講解lmdb格式數據的製作。

    1 目標檢測簡介

    【1】目標檢測主要有兩個任務:

    1. 判斷圖像中對象的類別
    2. 類別的位置

    【2】目標檢測需要的數據:

    1. 訓練所需的圖像數據,可以是jpg、png等圖片格式
    2. 圖像數據對應的類別信息和類別框的位置信息。

    2 lmdb數據製作

    caffe一般使用lmdb格式的數據,在製作數據之前,我們需要對數據進行標註,可以使用labelImg對圖像進行標註(),這裏就不多贅述數據標註的問題。總之,我們得到了圖像的標註Annotations數據。lmdb數據製作,首先需要將annotations數據和圖像數據製作為VOC格式,然後將其生成LMDB文件即可。下邊是詳細的步驟:

    2.1 VOC數據製作

    這裏我以caffe環境的Mobilenet+YOLOv3模型的代碼為例(),進行lmdb數據製作,並且也假設你已經對其配置編譯成功(如沒成功,可以參考博文進行配置),所以我們的根目錄為:caffe-Mobilenet-YOLO-master,下邊為詳細步驟:

    【1】VOC格式目錄建立:

    VOC格式目錄主要包含為:

    其中,Annotations里存儲的是xml標註信息,JPEGImages存儲的是圖片,ImageSets則是訓練和測試的txt列表等信息,下邊我們就要安裝如上的目錄進行建立我們自己的數據目錄。

    創建Annotations、JPEGImages、ImageSets/Main等文件,命令如下(也可直接界面操作哈):

    注:建議新手按照我的名稱,對於後續文件修改容易!!!

    cd ~/   # 進入home目錄
    cd Documents/  # 進入Documents目錄
    cd caffe-Mobilenet-YOLO-master/  # 進入我們的根目錄
    cd data         # 進入data目錄內
    mkdir VOCdevkit   # 創建存儲我們自己的數據的文件夾
    cd VOCdevkit
    mkdir MyDataSet  # 創建存儲voc的目錄
    cd MyDataSet   
    # 創建VOC格式目錄
    mkdir Annotations
    mkdir JPEGImages
    mkdir ImageSets
    cd ImageSets
    mkdir Main

    好啦,我們的文件夾就建立好了,如下圖所示:

    【2】將所有xml文件考入至Annotations文件夾內
    【3】將所有圖片考入至JPEGImages文件夾內
    【4】劃分訓練接、驗證集合測試集,如下為Python代碼,需要修改的地方註釋已標明:

    import os  
    import random 
    # 標註文件的路徑,需要你自己修改
    xmlfilepath=r'/home/Documents/caffe-Mobilenet-YOLO-master/data/VOCdevkit/MyDataSet/Annotations/'      
    # 這裡是存儲數據的本目錄,需要改為你自己的目錄              
    saveBasePath=r"/home/Documents/caffe-Mobilenet-YOLO-master/data/VOCdevkit/"                        
    trainval_percent=0.8            # 表示訓練集和驗證集所佔比例,你需要自己修改,也可選擇不修改
    train_percent=0.8               # 表示訓練集所佔訓練集驗證集的比例,你需要自己修改,也可選擇不修改       
    total_xml = os.listdir(xmlfilepath)
    num=len(total_xml)    
    list=range(num)    
    tv=int(num*trainval_percent)    
    tr=int(tv*train_percent)    
    trainval= random.sample(list,tv)    
    train=random.sample(trainval,tr)    
      
    print("train and val size",tv)  
    print("traub suze",tr)  
    ftrainval = open(os.path.join(saveBasePath,'MyDataSet/ImageSets/Main/trainval.txt'), 'w')    
    ftest = open(os.path.join(saveBasePath,'MyDataSet/ImageSets/Main/test.txt'), 'w')    
    ftrain = open(os.path.join(saveBasePath,'MyDataSet/ImageSets/Main/train.txt'), 'w')    
    fval = open(os.path.join(saveBasePath,'MyDataSet/ImageSets/Main/val.txt'), 'w')    
      
    for i  in list:    
        name=total_xml[i][:-4]+'\n'    
        if i in trainval:    
            ftrainval.write(name)    
            if i in train:    
                ftrain.write(name)    
            else:    
                fval.write(name)    
        else:    
            ftest.write(name)    
        
    ftrainval.close()    
    ftrain.close()    
    fval.close()    
    ftest .close() 

    上述代碼修改之後,在根目錄caffe-Mobilenet-YOLO-master執行上述代碼即可,
    在data/VOCdevkit/MyDataSet/ImageSets下生成trainval.txt、test.txt、train.txt、val.txt等所需的txt文件,如下圖所示:

    這些TXT文件會包含圖片的名字,不帶路徑,如下圖所示:

    2.2 lmdb文件生成

    【1】執行如下命令,將生成lmdb所需的腳本複製至data/VOCdevkit/MyDataSet文件夾內:

    cp data/VOC0712/create_* data/MyDataSet/                # 把create_list.sh和create_data.sh複製到MyDataSet目錄                  
    cp data/VOC0712/labelmap_voc.prototxt data/MyDataSet/   # 把labelmap_voc.prototxt複製到MyDataSet目錄 

    【2】修改create_list.sh文件:

    1 第3行修改目錄路徑,截止到VOCdevkit即可

    2 第13行修改為for name in MyDataSet(VOCdevkit下自己建立的文件夾名字)

    3 第15-18行註釋掉

    4 第41行get_image_size修改為自己的路徑(注意,這裡是build caffe_mobilenet_yolo之後才會形成的):

    #!/bin/bash
    # 如果嚴格安裝我上述的步驟,就可以不用修改路徑位置。
    # 需要修改的位置也使用註釋進行了標註和解釋
    
    # 這裏需要更改,你數據的根目錄位置,需要修改的地方!!!!
    root_dir="/home/Documents/Caffe_Mobilenet_YOLO/data/VOCdevkit/"   
    sub_dir=ImageSets/Main
    bash_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
    for dataset in trainval test
    do
      dst_file=$bash_dir/$dataset.txt
      if [ -f $dst_file ]
      then
        rm -f $dst_file
      fi
      for name in MyDataSet  # 如果你建立的不是MyDataSet,這裏需要修改為你自己的名字
      do
        # 這裏需要修改,註釋掉即可
        #if [[ $dataset == "test" && $name == "VOC2012" ]]
        #then
        #  continue
        #fi
        echo "Create list for $name $dataset..."
        dataset_file=$root_dir/$name/$sub_dir/$dataset.txt
    
        img_file=$bash_dir/$dataset"_img.txt"
        cp $dataset_file $img_file
        sed -i "s/^/$name\/JPEGImages\//g" $img_file
        sed -i "s/$/.jpg/g" $img_file
    
        label_file=$bash_dir/$dataset"_label.txt"
        cp $dataset_file $label_file
        sed -i "s/^/$name\/Annotations\//g" $label_file
        sed -i "s/$/.xml/g" $label_file
    
        paste -d' ' $img_file $label_file >> $dst_file
    
        rm -f $label_file
        rm -f $img_file
      done
    
      # Generate image name and size infomation.
      if [ $dataset == "test" ]
      then
        home/Documents/Caffe_Mobilenet_YOLO/caffe-MobileNet-YOLO-master/build/tools/get_image_size $root_dir $dst_file $bash_dir/$dataset"_name_size.txt"

    【3】creat_data.sh修改:

    1 第2行修改為自己的路徑:root_dir=”/home/Documents/caffe-MobileNet-YOLO-master/”

    2 第7行修改為:data_root_dir=”/home/Documents/caffe-MobileNet-YOLO-master/data/VOVdevkit/

    3 第8行修改為:dataset_name=”MyDataSet”

    4 第9行修改為:mapfile=”\(root_dir/data/VOCdevkit/\)dataset_name/labelmap_voc.prototxt”

    5 第26行修改為\(root_dir/data/VOCdevkit/\)dataset_name/$subset.txt

    cur_dir=$(cd $( dirname ${BASH_SOURCE[0]} ) && pwd )
    # 修改為自己的路徑
    root_dir="/home/Documents/Caffe_Mobilenet_YOLO/caffe-MobileNet-YOLO-master/"
    
    cd $root_dir
    
    redo=1
    # 這裏需要修改為自己的路徑
    data_root_dir="/home/Documents/Caffe_Mobilenet_YOLO/caffe-MobileNet-YOLO-master/data/VOCdevkit/"
    dataset_name="MyDataSet"  # 修改為自己的名字
    mapfile="$root_dir/data/VOCdevkit/$dataset_name/labelmap_voc.prototxt"  # 修改為自己的路徑
    anno_type="detection"
    db="lmdb"
    min_dim=0
    max_dim=0
    width=0
    height=0
    
    extra_cmd="--encode-type=jpg --encoded"
    if [ $redo ]
    then
      extra_cmd="$extra_cmd --redo"
    fi
    for subset in test trainval
    # subset.txt路徑需要修改
    do
      python $root_dir/scripts/create_annoset.py --anno-type=$anno_type \
      --label-map-file=$mapfile --min-dim=$min_dim --max-dim=$max_dim --resize-width=$width \
      --resize-height=$height --check-label $extra_cmd $data_root_dir $root_dir/data/VOCdevkit/$dataset_name/$subset.txt \
      $data_root_dir/$dataset_name/$db/$dataset_name"_"$subset"_"$db examples/$dataset_name

    【3】修改labelmap_voc.prototxt文件:

    除了第一個背景標籤部分不要修改,其他改成自己的標籤就行,多的刪掉,少了添加進入就行

    【4】最後在caffe-MobileNet-YOLO-master/examples文件夾內新建一個MyDataSet文件夾(空的)

    【5】運行create_list.sh腳本: ./data/VOCdevkit/MyDataSet/create_list.sh,運行完后,會在自己建的VOCdevkit/MyDataSet/目錄內生成trainval.txt, test.txt, test_name_size.txt。

    【6】運行create_data.sh腳本: ./data/VOCdevkit/MyDataSet/create_data.sh

    運行此命令時,提示:bash:./data/VOCdevkit/MyDataSet/create_list.sh:Permission denied,沒有權限,需要執行如下命令賦予執行命令:

    chmod u+x data/VOCdevkit/MyDataSet/create_data.sh

    出現了錯誤:ValueError: need more than 2 values to unpack,

    需要將create_annoset.py中第88行的seg去掉,因為我們的Annotations只有兩個值,img_file和anno。

    運行完后,會在會在自己建的VOCdevkit/MyDataSet目錄內生成lmdb文件夾:

    lmdb對應訓練集和測試集的lmdb格式的文件夾:

    ***
    好啦,今天的教程就到這裏,如有疑問,可關注公眾號【計算機視覺聯盟】私信我或留言交流!!

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

    ※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

  • 電動車商機大、日電池材料商瘋增產

    電動車商機大、日電池材料商瘋增產

     

    日本鋰離子電池材料商W-Scope於9月4日發布新聞稿宣布,因來自電動車的需求急增,加上家電產品、電動工具需求持續強勁,故決議投下200億日圓擴增鋰離子電池關鍵材料分隔膜(separator)產能,計畫在南韓子公司W-SCOPE CHUNGJU PLANT CO., LTD.增設四條產線,其中第12-13號產線預計2019年下半年量產、第14-15號產線預計2020年上半年量產。

    W-Scope指出,除上述四條新產線之外,該公司之前已進行增產投資,興建第8-11號分隔膜產線,其中第8-9號產線預計2017年下半年量產、第10-11號預計2018年上半年量產,而待上述八條產線全數導入量產後,2020年末整體分隔膜產能將擴增至2016年末的3.6倍水準。

    在電動車市場持續擴大的背景下,日本其他鋰電池材料廠紛紛傳出增產消息。日刊工業新聞8月1日報導,Toray計畫在2020年結束前總計砸下約1,200億日圓擴增車用鋰離子電池關鍵材料分隔膜產能,目標將分隔膜年產能擴增至19.5億平方公尺、將達現行的約五倍。

    日刊工業新聞6月23日報導,因車廠加快電動車的研發腳步、帶動電池材料市場成長速度超乎預期,故旭化成(Asahi Kasei)計畫上修鋰離子電池關鍵材料分隔膜的增產計畫,目標在2020年結束前將分隔膜年產能最高擴增至15億平方公尺、將達現行的2.5倍,且將遠高於原先規劃的11億平方公尺目標,期望藉由積極投資、鞏固全球龍頭位置。預估追加擴產所需的投資額約300億日圓。

    鋰離子電池四大關鍵材料分別為正極材、負極材、分隔膜和電解液,而這些電池材料皆由日系廠商握有高市佔率,其中,在全球分隔膜市場上,旭化成為龍頭廠、Toray緊追在後。

    根據日本市調機構富士經濟(Fuji Keizai)預估,2020年全球分隔膜市場規模將增至3,000億日圓、將達2015年的2倍水準,而電動車、混合動力車等車用用途是推動分隔膜需求急增的最大功臣,預估2020年車用分隔膜佔整體市場比重將達約45%。

    富士經濟6月22日公布調查報告指出,預估2030年時電動車年銷售量將增至407萬台、超越混合動力車,且之後雙方的差距將持續擴大。富士經濟預估,在中國需求增加加持下,2035年電動車全球銷售量將擴大至630萬台、將達2016年的13.4倍。

    (本文內容由授權使用。首圖為鋰電池材料分隔膜,出處:MoneyDJ)

    本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整