標籤: 銷售文案

  • 【String註解驅動開發】面試官讓我說說:如何使用FactoryBean向Spring容器中註冊bean?

    寫在前面

    在前面的文章中,我們知道可以通過多種方式向Spring容器中註冊bean。可以使用@Configuration結合@Bean向Spring容器中註冊bean;可以按照條件向Spring容器中註冊bean;可以使用@Import向容器中快速導入bean對象;可以在@Import中使用ImportBeanDefinitionRegistrar向容器中註冊bean。

    項目工程源碼已經提交到GitHub:https://github.com/sunshinelyz/spring-annotation

    FactoryBean概述

    一般情況下,Spring通過反射機制利用bean的class屬性指定實現類來實例化bean 。在某些情況下,實例化bean過程比較複雜,如果按照傳統的方式,則需要在 標籤中提供大量的配置信息,配置方式的靈活性是受限的,這時採用編碼的方式可以得到一個更加簡單的方案。Spring為此提供了一個org.springframework.bean.factory.FactoryBean的工廠類接口,用戶可以通過實現該接口定製實例化bean的邏輯。

    FactoryBean接口對於Spring框架來說佔有重要的地位,Spring 自身就提供了70多個FactoryBean的實現。它們隱藏了實例化一些複雜bean的細節,給上層應用帶來了便利。從Spring 3.0 開始, FactoryBean開始支持泛型,即接口聲明改為FactoryBean 的形式:

    在Spring 5.2.6版本中,FactoryBean接口的定義如下所示。

    package org.springframework.beans.factory;
    import org.springframework.lang.Nullable;
    
    public interface FactoryBean<T> {
    
    	String OBJECT_TYPE_ATTRIBUTE = "factoryBeanObjectType";
    
    	@Nullable
    	T getObject() throws Exception;
    
    	@Nullable
    	Class<?> getObjectType();
    
    	default boolean isSingleton() {
    		return true;
    	}
    }
    
    • T getObject():返回由FactoryBean創建的bean實例,如果isSingleton()返回true,則該實例會放到Spring容器中單實例緩存池中。
    • boolean isSingleton():返回由FactoryBean創建的bean實例的作用域是singleton還是prototype。
    • Class getObjectType():返回FactoryBean創建的bean類型。

    這裏,需要注意的是:當配置文件中 標籤的class屬性配置的實現類是FactoryBean時,通過 getBean()方法返回的不是FactoryBean本身,而是FactoryBean#getObject()方法所返回的對象,相當於FactoryBean#getObject()代理了getBean()方法。

    FactoryBean實例

    首先,創建一個PersonFactoryBean,實現FactoryBean接口,如下所示。

    package io.mykit.spring.plugins.register.bean;
    
    import org.springframework.beans.factory.FactoryBean;
    /**
     * @author binghe
     * @version 1.0.0
     * @description 商品的FactoryBean,測試FactoryBean
     */
    public class PersonFactoryBean implements FactoryBean<Person> {
    
        //返回一個Person對象,這個對象會被註冊到Spring容器中
        @Override
        public Person getObject() throws Exception {
            return new Person();
        }
    
        @Override
        public Class<?> getObjectType() {
            return Person.class;
        }
    
        //bean是否為單例;true:是;false:否
        @Override
        public boolean isSingleton() {
            return true;
        }
    }
    

    接下來,我們在PersonConfig2類中加入PersonFactoryBean的聲明,如下所示。

    @Bean
    public PersonFactoryBean personFactoryBean(){
        return new PersonFactoryBean();
    }
    

    這裏需要小夥伴們注意的是:我在這裏使用@Bean註解向Spring容器中添加的是PersonFactory對象。那我們就來看看Spring容器中有哪些bean。接下來,運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    

    可以看到,結果信息中輸出了一個personFactoryBean,我們看下這個personFactoryBean到底是個什麼鬼!此時,我們對SpringBeanTest類中的testAnnotationConfig7()方法稍加改動,添加獲取personFactoryBean的代碼,並輸出personFactoryBean實例的類型,如下所示。

    @Test
    public void testAnnotationConfig7(){
        ApplicationContext context = new AnnotationConfigApplicationContext(PersonConfig2.class);
        String[] names = context.getBeanDefinitionNames();
        Arrays.stream(names).forEach(System.out::println);
    
        Object personFactoryBean = context.getBean("personFactoryBean");
        System.out.println("personFactoryBean實例的類型為:" + personFactoryBean.getClass());
    }
    

    再次運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    personFactoryBean實例的類型為:class io.mykit.spring.plugins.register.bean.Person
    

    可以看到,雖然我在代碼中使用@Bean註解注入的PersonFactoryBean對象,但是,實際上從Spring容器中獲取到的bean對象卻是調用PersonFactoryBean類中的getObject()獲取到的Person對象。

    看到這裏,是不是有種豁然開朗的感覺!!!

    在PersonFactoryBean類中,我們將Person對象設置為單實例bean,接下來,我們在SpringBeanTest類中的testAnnotationConfig7()方法多次獲取Person對象,並輸出多次獲取的對象是否為同一對象,如下所示。

    @Test
    public void testAnnotationConfig7(){
        ApplicationContext context = new AnnotationConfigApplicationContext(PersonConfig2.class);
        String[] names = context.getBeanDefinitionNames();
        Arrays.stream(names).forEach(System.out::println);
    
        Object personFactoryBean1 = context.getBean("personFactoryBean");
        Object personFactoryBean2 = context.getBean("personFactoryBean");
        System.out.println(personFactoryBean1 == personFactoryBean2);
    }
    

    運行testAnnotationConfig7()方法輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    true
    

    可以看到,在PersonFactoryBean類的isSingleton()方法中返回true時,每次獲取到的Person對象都是同一個對象,說明Person對象是單實例bean。

    這裏,可能就會有小夥伴要問了,如果將Person對象修改成多實例bean呢?別急,這裏我們只需要在PersonFactoryBean類的isSingleton()方法中返回false,即可將Person對象設置為多實例bean,如下所示。

    //bean是否為單例;true:是;false:否
    @Override
    public boolean isSingleton() {
        return false;
    }
    

    再次運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    false
    

    可以看到,最終結果返回了false,說明此時Person對象是多實例bean。

    如何在Spring容器中獲取到FactoryBean對象?

    之前,我們使用@Bean註解向Spring容器中註冊的PersonFactoryBean,獲取出來的確實Person對象。那麼,小夥伴們可能會問:我就想獲取PersonFactoryBean實例,該怎麼辦呢?

    其實,這也很簡單, 只需要在獲取bean對象時,在id前面加上&符號即可

    打開我們的測試類SpringBeanTest,在testAnnotationConfig7()方法中添加獲取PersonFactoryBean實例的代碼,如下所示。

    @Test
    public void testAnnotationConfig7(){
        ApplicationContext context = new AnnotationConfigApplicationContext(PersonConfig2.class);
        String[] names = context.getBeanDefinitionNames();
        Arrays.stream(names).forEach(System.out::println);
    
        Object personFactoryBean1 = context.getBean("personFactoryBean");
        Object personFactoryBean2 = context.getBean("personFactoryBean");
        System.out.println("personFactoryBean1類型:" + personFactoryBean1.getClass());
        System.out.println("personFactoryBean2類型:" + personFactoryBean2.getClass());
        System.out.println(personFactoryBean1 == personFactoryBean2);
    
        Object personFactoryBean3 = context.getBean("&personFactoryBean");
        System.out.println("personFactoryBean3類型:" + personFactoryBean3.getClass());
    }
    

    運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    personFactoryBean1類型:class io.mykit.spring.plugins.register.bean.Person
    personFactoryBean2類型:class io.mykit.spring.plugins.register.bean.Person
    false
    personFactoryBean3類型:class io.mykit.spring.plugins.register.bean.PersonFactoryBean
    

    可以看到,在獲取bean時,在id前面加上&符號就會獲取到PersonFactoryBean實例對象。

    那問題又來了!!為什麼在id前面加上&符號就會獲取到PersonFactoryBean實例對象呢?

    接下來,我們就揭開這個神秘的面紗,打開BeanFactory接口,

    package org.springframework.beans.factory;
    import org.springframework.beans.BeansException;
    import org.springframework.core.ResolvableType;
    import org.springframework.lang.Nullable;
    
    public interface BeanFactory {
    	String FACTORY_BEAN_PREFIX = "&";
        /**************以下省略n行代碼***************/
    }
    

    看到這裏,是不是明白了呢?沒錯,在BeanFactory接口中定義了一個&前綴,只要我們使用bean的id來從Spring容器中獲取bean時,Spring就會知道我們是在獲取FactoryBean本身。

    好了,咱們今天就聊到這兒吧!別忘了給個在看和轉發,讓更多的人看到,一起學習一起進步!!

    項目工程源碼已經提交到GitHub:https://github.com/sunshinelyz/spring-annotation

    寫在最後

    如果覺得文章對你有點幫助,請微信搜索並關注「 冰河技術 」微信公眾號,跟冰河學習Spring註解驅動開發。公眾號回復“spring註解”關鍵字,領取Spring註解驅動開發核心知識圖,讓Spring註解驅動開發不再迷茫。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※別再煩惱如何寫文案,掌握八大原則!

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※超省錢租車方案

    ※教你寫出一流的銷售文案?

    網頁設計最專業,超強功能平台可客製化

    聚甘新

  • Netty中的這些知識點,你需要知道!

    Netty中的這些知識點,你需要知道!

    一、Channel

    Channel是一個接口,而且是一個很大的接口,我們稱之為“大而全”,囊括了server端及client端接口所需要的接口。

    Channel是一個門面,封裝了包括網絡I/O及相關的所有操作。

    Channel聚合了包括網絡讀寫、鏈路管理、網絡連接信息、獲取EventLoop、Pipeline等相關功能類;統一分配,調度實現相應場景的功能。

    一個Channel 對應一個物理連接,是基於物理連接上的操作包裝。

    二、EventLoop

    EventLoop,Event意為事件、Loop意為環,EventLoo即為事件環

    EventLoop是一種程序設計結構等待以及分發事件。

    NioEventLoop,是一個Netty工作線程,又不僅僅是一個Netty工作線程。

    標準的netty線程模型 中我們講過Netty的標準線程池模型,池子里的每個線程對象就是一個NioEventLoop對象。或負責接受連接,或負責網絡I/O

    說它不僅僅是一個Netty線程,因為它實現了很多功能,我們可以看下它的繼承圖:

    它的上方有兩個枝丫,一個線程屬性,一個EventLoop,它是Netty的Reactor線程

    既然是Reactor線程,那麼首先我們需要一個多路復用器。在Netty NioEventLoop中,包就含一個 Selector,它的操作對象是Channel。

    NioEventLoop的主要邏輯在它的run()方法,方法體內是一個無限循環 for (;;),循環體內實現Loop功能。這也是通用的NIO線程實現方式。

     

    Loop 從任務隊列里獲取任務,然後檢查多路復用器中就緒的Channel進行處理。

    三、Unsafe

    Netty中的Unsafe,一個Channel內部聚合接口,用以處理實際的網絡I/O讀寫。當然,取Unsafe命名,源碼中釋義:提供的網絡相關的操作方法,永遠不應該被開發人員操作使用。

    它是Channel的一個輔助接口,主要方法:

    1、register:註冊Channel

    2、deregister:取消註冊

    3、bind:綁定地址,服務端綁定監聽特定端口;客戶端指定本地綁定Socket地址。

    4、connect:建立連接

    5、disconnect:斷開連接

    6、close:關閉連接

    7、write:調度寫,將數據寫入buffer,並未真正進入Channel

    8、flush:將緩衝區中的數據寫入Channel

    四、AdaptiveRecvByteBufAllocator

    動態緩衝區分配器,源碼說明:根據實時的反饋動態的增加或者減少預需的緩衝區大小。

    如果一次分配的緩衝區被填滿了,則調高下一次分配的緩衝區大小。

    如果連續兩次實際使用的容量低於分配的緩衝區大小特定比例,則減小下一次分配的緩衝區大小。

    其它情景,保持分配大小不變。

    Netty的這種“智能化”處理,可以說是相當有用的:

    1、首先,實際的應用場景千差萬別,同一場景下不同時刻的緩衝區需求也是實時變化(一句話可以是一個字,也可能是1000個字),這就需要Netty動態調整緩衝分配大小以適應不同的業務場景,時刻場景

    2、其次,過大的不必要的內存分配,會導致Buffer處理性能下降;過小的內存分配,則會導致頻繁的分配釋放。這都是一個優良的網絡框架不應該有的。 

    3、最後,動態的調整最直接的好處就是內存的的高效使用,一定程度上做到了按需分配。 

    五、ChannelPipeline

    Pipeline 管道,Channel的數據流通管道,在這個管道中,可以做很多事情。

    ChannelPipeline 是一種職責鏈,可以對其中流動的數據進行過濾、攔截處理,是一種插拔式的鏈路裝配器

    1、ChannelPipline是一個容器

    支持查詢、添加、刪除、替換等容器操作。

    2、ChannelPipline支持動態的添加和刪除 Handler

    ChannelPipline的這種特性給了我們相當的想象空間,例如動態的添加系統擁塞保護Handler,敏感數據過濾Handler、日誌記錄Handler、性能統計Handler等。

    3、ChannelPipline 是線程安全的

    ChannelPipline使用 synchronized 實現線程安全,業務線程可以併發的操作ChannelPipline。但需要注意的是,Handler是非線程安全的

    六、HandlerAdapter

    Adapter是一種適配器,對於用戶自定義的Handler,可以通過繼承HandlerAdapter,來規避不必要的接口實現

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※教你寫出一流的銷售文案?

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※回頭車貨運收費標準

    ※別再煩惱如何寫文案,掌握八大原則!

    ※超省錢租車方案

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

    聚甘新

  • 【原創】強擼 .NET Redis Cluster 集群訪問組件

    【原創】強擼 .NET Redis Cluster 集群訪問組件

      Hello 大家好,我是TANZAME,我們又見面了。今天我們來聊聊怎麼手擼一個 Redis Cluster 集群客戶端,純手工有乾貨,您細品。

      隨着業務增長,線上環境的QPS暴增,自然而然將當前的單機 Redis 切換到群集模式。燃鵝,我們悲劇地發現,ServiceStack.Redis這個官方推薦的 .NET 客戶端並沒有支持集群模式。一通度娘翻牆無果后,決定自己強擼一個基於ServiceStack.Redis的Redis集群訪問組件。

      話不多說,先上運行效果圖:

     

      Redis-Cluster集群使用 hash slot 算法對每個key計算CRC16值,然後對16383取模,可以獲取key對應的 hash slot。Redis-Cluster中每個master都會持有部分 slot,在訪問key時根據計算出來的hash slot去找到具體的master節點,再由當前找到的節點去執行具體的 Redis 命令(具體可查閱官方說明文檔)。

      由於 ServiceStack.Redis已經實現了單個實例的Redis命令,因此我們可以將即將要實現的 Redis 集群客戶端當做一個代理,它只負責計算 key 落在哪一個具體節點(尋址)然後將Redis命令轉發給對應的節點執行即可。

      ServiceStack.Redis的RedisClient是非線程安全的,ServiceStack.Redis 使用緩存客戶端管理器(PooledRedisClientManager)來提高性能和併發能力,我們的Redis Cluster集群客戶端也應集成PooledRedisClientManager來獲取 RedisClient 實例。

      同時,Redis-Cluster集群支持在線動態擴容和slot遷移,我們的Redis集群客戶端也應具備自動智能發現新節點和自動刷新 slot 分佈的能力。

      總結起來,要實現一個Redis-Cluster客戶端,需要實現以下幾個要點:

    • 根據 key 計算 hash slot
    • 自動讀取群集上所有的節點信息
    • 為節點分配緩存客戶端管理器
    • 將 hash slot 路由到正確的節點
    • 自動發現新節點和自動刷新slot分佈

      如下面類圖所示,接下來我們詳細分析具體的代碼實現。

      

      一、CRC16  

      CRC即循環冗餘校驗碼,是信息系統中一種常見的檢錯碼。CRC校驗碼不同的機構有不同的標準,這裏Redis遵循的標準是CRC-16-CCITT標準,這也是被XMODEM協議使用的CRC標準,所以也常用XMODEM CRC代指,是比較經典的“基於字節查表法的CRC校驗碼生成算法”。 

     1 /// <summary>
     2 /// 根據 key 計算對應的哈希槽
     3 /// </summary>
     4 public static int GetSlot(string key)
     5 {
     6     key = CRC16.ExtractHashTag(key);
     7     // optimization with modulo operator with power of 2 equivalent to getCRC16(key) % 16384
     8     return GetCRC16(key) & (16384 - 1);
     9 }
    10 
    11 /// <summary>
    12 /// 計算給定字節組的 crc16 檢驗碼
    13 /// </summary>
    14 public static int GetCRC16(byte[] bytes, int s, int e)
    15 {
    16     int crc = 0x0000;
    17 
    18     for (int i = s; i < e; i++)
    19     {
    20         crc = ((crc << 8) ^ LOOKUP_TABLE[((crc >> 8) ^ (bytes[i] & 0xFF)) & 0xFF]);
    21     }
    22     return crc & 0xFFFF;
    23 }

     

      二、讀取集群節點

      從集群中的任意節點使用 CLUSTER NODES 命令可以讀取到集群中所有的節點信息,包括連接狀態,它們的標誌,屬性和分配的槽等等。CLUSTER NODES 以串行格式提供所有這些信息,輸出示例:

    d99b65a25ef726c64c565901e345f98c496a1a47 127.0.0.1:7007 master - 0 1592288083308 8 connected
    2d71879d6529d1edbfeed546443051986245c58e 127.0.0.1:7003 master - 0 1592288084311 11 connected 10923-16383
    654cdc25a5fa11bd44b5b716cdf07d4ce176efcd 127.0.0.1:7005 slave 484e73948d8aacd8327bf90b89469b52bff464c5 0 1592288085313 10 connected
    ed65d52dad7ef6854e0e261433b56a551e5e11cb 127.0.0.1:7004 slave 754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 0 1592288081304 9 connected
    754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 127.0.0.1:7001 master - 0 1592288080300 9 connected 0-5460
    484e73948d8aacd8327bf90b89469b52bff464c5 127.0.0.1:7002 master - 0 1592288082306 10 connected 5461-10922
    2223bc6d099bd9838e5d2f1fbd9a758f64c554c4 127.0.0.1:7006 myself,slave 2d71879d6529d1edbfeed546443051986245c58e 0 0 6 connected
    

      每個字段的含義如下:

      1. id:節點 ID,一個40個字符的隨機字符串,當一個節點被創建時不會再發生變化(除非CLUSTER RESET HARD被使用)。

      2. ip:port:客戶端應該聯繫節點以運行查詢的節點地址。

      3. flags:逗號列表分隔的標誌:myselfmasterslavefail?failhandshakenoaddrnoflags。標誌在下一節詳細解釋。

      4. master:如果節點是從屬節點,並且主節點已知,則節點ID為主節點,否則為“ – ”字符。

      5. ping-sent:以毫秒為單位的當前激活的ping發送的unix時間,如果沒有掛起的ping,則為零。

      6. pong-recv:毫秒 unix 時間收到最後一個乒乓球。

      7. config-epoch:當前節點(或當前主節點,如果該節點是從節點)的配置時期(或版本)。每次發生故障切換時,都會創建一個新的,唯一的,單調遞增的配置時期。如果多個節點聲稱服務於相同的哈希槽,則具有較高配置時期的節點將獲勝。

      8. link-state:用於節點到節點集群總線的鏈路狀態。我們使用此鏈接與節點進行通信。可以是connecteddisconnected

      9. slot:散列槽號或範圍。從參數9開始,但總共可能有16384個條目(限制從未達到)。這是此節點提供的散列槽列表。如果條目僅僅是一個数字,則被解析為這樣。如果它是一個範圍,它是在形式start-end,並且意味着節點負責所有散列時隙從startend包括起始和結束值。

    標誌的含義(字段編號3):

    • myself:您正在聯繫的節點。
    • master:節點是主人。
    • slave:節點是從屬的。
    • fail?:節點處於PFAIL狀態。對於正在聯繫的節點無法訪問,但仍然可以在邏輯上訪問(不處於FAIL狀態)。
    • fail:節點處於FAIL狀態。對於將PFAIL狀態提升為FAIL的多個節點而言,這是無法訪問的。
    • handshake:不受信任的節點,我們握手。
    • noaddr:此節點沒有已知的地址。
    • noflags:根本沒有標誌。
      1 // 讀取集群上的節點信息
      2 static IList<InternalClusterNode> ReadClusterNodes(IEnumerable<ClusterNode> source)
      3 {
      4     RedisClient c = null;
      5     StringReader reader = null;
      6     IList<InternalClusterNode> result = null;
      7 
      8     int index = 0;
      9     int rowCount = source.Count();
     10 
     11     foreach (var node in source)
     12     {
     13         try
     14         {
     15             // 從當前節點讀取REDIS集群節點信息
     16             index += 1;
     17             c = new RedisClient(node.Host, node.Port, node.Password);
     18             RedisData data = c.RawCommand("CLUSTER".ToUtf8Bytes(), "NODES".ToUtf8Bytes());
     19             string info = Encoding.UTF8.GetString(data.Data);
     20 
     21             // 將讀回的字符文本轉成強類型節點實體
     22             reader = new StringReader(info);
     23             string line = reader.ReadLine();
     24             while (line != null)
     25             {
     26                 if (result == null) result = new List<InternalClusterNode>();
     27                 InternalClusterNode n = InternalClusterNode.Parse(line);
     28                 n.Password = node.Password;
     29                 result.Add(n);
     30 
     31                 line = reader.ReadLine();
     32             }
     33 
     34             // 只要任意一個節點拿到集群信息,直接退出
     35             if (result != null && result.Count > 0) break;
     36         }
     37         catch (Exception ex)
     38         {
     39             // 出現異常,如果還沒到最後一個節點,則繼續使用下一下節點讀取集群信息
     40             // 否則拋出異常
     41             if (index < rowCount)
     42                 Thread.Sleep(100);
     43             else
     44                 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
     45         }
     46         finally
     47         {
     48             if (reader != null) reader.Dispose();
     49             if (c != null) c.Dispose();
     50         }
     51     }
     52 
     53 
     54     if (result == null)
     55         result = new List<InternalClusterNode>(0);
     56     return result;
     57 }
     58 
     59 /// <summary>
     60 /// 從 cluster nodes 的每一行命令里讀取出集群節點的相關信息
     61 /// </summary>
     62 /// <param name="line">集群命令</param>
     63 /// <returns></returns>
     64 public static InternalClusterNode Parse(string line)
     65 {
     66     if (string.IsNullOrEmpty(line))
     67         throw new ArgumentException("line");
     68 
     69     InternalClusterNode node = new InternalClusterNode();
     70     node._nodeDescription = line;
     71     string[] segs = line.Split(' ');
     72 
     73     node.NodeId = segs[0];
     74     node.Host = segs[1].Split(':')[0];
     75     node.Port = int.Parse(segs[1].Split(':')[1]);
     76     node.MasterNodeId = segs[3] == "-" ? null : segs[3];
     77     node.PingSent = long.Parse(segs[4]);
     78     node.PongRecv = long.Parse(segs[5]);
     79     node.ConfigEpoch = int.Parse(segs[6]);
     80     node.LinkState = segs[7];
     81 
     82     string[] flags = segs[2].Split(',');
     83     node.IsMater = flags[0] == MYSELF ? flags[1] == MASTER : flags[0] == MASTER;
     84     node.IsSlave = !node.IsMater;
     85     int start = 0;
     86     if (flags[start] == MYSELF)
     87         start = 1;
     88     if (flags[start] == SLAVE || flags[start] == MASTER)
     89         start += 1;
     90     node.NodeFlag = string.Join(",", flags.Skip(start));
     91 
     92     if (segs.Length > 8)
     93     {
     94         string[] slots = segs[8].Split('-');
     95         node.Slot.Start = int.Parse(slots[0]);
     96         if (slots.Length > 1) node.Slot.End = int.Parse(slots[1]);
     97 
     98         for (int index = 9; index < segs.Length; index++)
     99         {
    100             if (node.RestSlots == null)
    101                 node.RestSlots = new List<HashSlot>();
    102 
    103             slots = segs[index].Split('-');
    104 
    105             int s1 = 0;
    106             int s2 = 0;
    107             bool b1 = int.TryParse(slots[0], out s1);
    108             bool b2 = int.TryParse(slots[1], out s2);
    109             if (!b1 || !b2)
    110                 continue;
    111             else
    112                 node.RestSlots.Add(new HashSlot(s1, slots.Length > 1 ? new Nullable<int>(s2) : null));
    113         }
    114     }
    115 
    116     return node;
    117 }

    View Code

     

      三、為節點分配緩存客戶端管理器

      在單實例的Redis中,我們通過 PooledRedisClientManager 這個管理器來獲取RedisClient。借鑒這個思路,在Redis Cluster集群中,我們為每一個主節點實例化一個 PooledRedisClientManager,並且該主節點持有的 slot 都共享一個 PooledRedisClientManager 實例。以 slot 做為 key 將 slot 與 PooledRedisClientManager 一一映射並緩存起來。

     1 // 初始化集群管理
     2 void Initialize(IList<InternalClusterNode> clusterNodes = null)
     3 {
     4     // 從 redis 讀取集群信息
     5     IList<InternalClusterNode> nodes = clusterNodes == null ? RedisCluster.ReadClusterNodes(_source) : clusterNodes;
     6 
     7     // 生成主節點,每個主節點的 slot 對應一個REDIS客戶端緩衝池管理器
     8     IList<InternalClusterNode> masters = null;
     9     IDictionary<int, PooledRedisClientManager> managers = null;
    10     foreach (var n in nodes)
    11     {
    12         // 節點無效或者
    13         if (!(n.IsMater &&
    14             !string.IsNullOrEmpty(n.Host) &&
    15             string.IsNullOrEmpty(n.NodeFlag) &&
    16             (string.IsNullOrEmpty(n.LinkState) || n.LinkState == InternalClusterNode.CONNECTED))) continue;
    17 
    18         n.SlaveNodes = nodes.Where(x => x.MasterNodeId == n.NodeId);
    19         if (masters == null)
    20             masters = new List<InternalClusterNode>();
    21         masters.Add(n);
    22 
    23         // 用每一個主節點的哈希槽做鍵,導入REDIS客戶端緩衝池管理器
    24         // 然後,方法表指針(又名類型對象指針)上場,佔據 4 個字節。 4 * 16384 / 1024 = 64KB
    25         if (managers == null)
    26             managers = new Dictionary<int, PooledRedisClientManager>();
    27 
    28         string[] writeHosts = new[] { n.HostString };
    29         string[] readHosts = n.SlaveNodes.Where(n => false).Select(n => n.HostString).ToArray();
    30         var pool = new PooledRedisClientManager(writeHosts, readHosts, _config);
    31         managers.Add(n.Slot.Start, pool);
    32         if (n.Slot.End != null)
    33         {
    34             // 這個範圍內的哈希槽都用同一個緩衝池
    35             for (int s = n.Slot.Start + 1; s <= n.Slot.End.Value; s++)
    36                 managers.Add(s, pool);
    37         }
    38         if (n.RestSlots != null)
    39         {
    40             foreach (var slot in n.RestSlots)
    41             {
    42                 managers.Add(slot.Start, pool);
    43                 if (slot.End != null)
    44                 {
    45                     // 這個範圍內的哈希槽都用同一個緩衝池
    46                     for (int s = slot.Start + 1; s <= slot.End.Value; s++)
    47                         managers.Add(s, pool);
    48                 }
    49             }
    50         }
    51     }
    52 
    53     _masters = masters;
    54     _redisClientManagers = managers;
    55     _clusterNodes = nodes != null ? nodes : null;
    56 
    57     if (_masters == null) _masters = new List<InternalClusterNode>(0);
    58     if (_clusterNodes == null) _clusterNodes = new List<InternalClusterNode>(0);
    59     if (_redisClientManagers == null) _redisClientManagers = new Dictionary<int, PooledRedisClientManager>(0);
    60 
    61     if (_masters.Count > 0)
    62         _source = _masters.Select(n => new ClusterNode(n.Host, n.Port, n.Password)).ToList();
    63 }

    View Code

     

      四、將 hash slot 路由到正確的節點

      在訪問一個 key 時,根據第三步緩存起來的 PooledRedisClientManager ,用 key 計算出來的 hash slot 值可以快速找出這個 key 對應的 PooledRedisClientManager 實例,調用 PooledRedisClientManager.GetClient() 即可將 hash slot 路由到正確的主節點。

     1 // 執行指定動作並返回值
     2 private T DoExecute<T>(string key, Func<RedisClient, T> action) => this.DoExecute(() => this.GetRedisClient(key), action);
     3 
     4 // 執行指定動作並返回值
     5 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1)
     6 {
     7     RedisClient c = null;
     8     try
     9     {
    10         c = slot();
    11         return action(c);
    12     }
    13     catch (Exception ex)
    14     {
    15         // 此處省略 ...
    16     }
    17     finally
    18     {
    19         if (c != null)
    20             c.Dispose();
    21     }
    22 }
    23 
    24 // 獲取指定key對應的主設備節點
    25 private RedisClient GetRedisClient(string key)
    26 {
    27     if (string.IsNullOrEmpty(key))
    28         throw new ArgumentNullException("key");
    29 
    30     int slot = CRC16.GetSlot(key);
    31     if (!_redisClientManagers.ContainsKey(slot))
    32         throw new SlotNotFoundException(string.Format("No reachable node in cluster for slot {{{0}}}", slot), slot, key);
    33 
    34     var pool = _redisClientManagers[slot];
    35     return (RedisClient)pool.GetClient();
    36 }

       

      五、自動發現新節點和自動刷新slot分佈

      在實際生產環境中,Redis 集群經常會有添加/刪除節點、遷移 slot 、主節點宕機從節點轉主節點等,針對這些情況,我們的 Redis Cluster 組件必須具備自動發現節點和刷新在 第三步  緩存起來的 slot 的能力。在這裏我的實現思路是當節點執行 Redis 命令時返回 RedisException 異常時就強制刷新集群節點信息並重新緩存 slot 與 節點之間的映射。

      1 // 執行指定動作並返回值
      2 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1)
      3 {
      4     RedisClient c = null;
      5     try
      6     {
      7         c = slot();
      8         return action(c);
      9     }
     10     catch (Exception ex)
     11     {
     12         if (!(ex is RedisException) || tryTimes == 0) throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
     13         else
     14         {
     15             tryTimes -= 1;
     16             // 嘗試重新刷新集群信息
     17             bool isRefresh = DiscoveryNodes(_source, _config);
     18             if (isRefresh)
     19                 // 集群節點有更新過,重新執行
     20                 return this.DoExecute(slot, action, tryTimes);
     21             else
     22                 // 集群節點未更新過,直接拋出異常
     23                 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
     24         }
     25     }
     26     finally
     27     {
     28         if (c != null)
     29             c.Dispose();
     30     }
     31 }
     32 
     33 // 重新刷新集群信息
     34 private bool DiscoveryNodes(IEnumerable<ClusterNode> source, RedisClientManagerConfig config)
     35 {
     36     bool lockTaken = false;
     37     try
     38     {
     39         // noop
     40         if (_isDiscoverying) { }
     41 
     42         Monitor.Enter(_objLock, ref lockTaken);
     43 
     44         _source = source;
     45         _config = config;
     46         _isDiscoverying = true;
     47 
     48         // 跟上次同步時間相隔 {MONITORINTERVAL} 秒鐘以上才需要同步
     49         if ((DateTime.Now - _lastDiscoveryTime).TotalMilliseconds >= MONITORINTERVAL)
     50         {
     51             bool isRefresh = false;
     52             IList<InternalClusterNode> newNodes = RedisCluster.ReadClusterNodes(_source);
     53             foreach (var node in newNodes)
     54             {
     55                 var n = _clusterNodes.FirstOrDefault(x => x.HostString == node.HostString);
     56                 isRefresh =
     57                     n == null ||                        // 新節點                                                                
     58                     n.Password != node.Password ||      // 密碼變了                                                                
     59                     n.IsMater != node.IsMater ||        // 主變從或者從變主                                                                
     60                     n.IsSlave != node.IsSlave ||        // 主變從或者從變主                                                                
     61                     n.NodeFlag != node.NodeFlag ||      // 節點標記位變了                                                                
     62                     n.LinkState != node.LinkState ||    // 節點狀態位變了                                                                
     63                     n.Slot.Start != node.Slot.Start ||  // 哈希槽變了                                                                
     64                     n.Slot.End != node.Slot.End ||      // 哈希槽變了
     65                     (n.RestSlots == null && node.RestSlots != null) ||
     66                     (n.RestSlots != null && node.RestSlots == null);
     67                 if (!isRefresh && n.RestSlots != null && node.RestSlots != null)
     68                 {
     69                     var slots1 = n.RestSlots.OrderBy(x => x.Start).ToList();
     70                     var slots2 = node.RestSlots.OrderBy(x => x.Start).ToList();
     71                     for (int index = 0; index < slots1.Count; index++)
     72                     {
     73                         isRefresh =
     74                             slots1[index].Start != slots2[index].Start ||   // 哈希槽變了                                                                
     75                             slots1[index].End != slots2[index].End;         // 哈希槽變了
     76                         if (isRefresh) break;
     77                     }
     78                 }
     79 
     80                 if (isRefresh) break;
     81             }
     82 
     83             if (isRefresh)
     84             {
     85                 // 重新初始化集群
     86                 this.Dispose();
     87                 this.Initialize(newNodes);
     88                 this._lastDiscoveryTime = DateTime.Now;
     89             }
     90         }
     91 
     92         // 最後刷新時間在 {MONITORINTERVAL} 內,表示是最新群集信息 newest
     93         return (DateTime.Now - _lastDiscoveryTime).TotalMilliseconds < MONITORINTERVAL;
     94     }
     95     finally
     96     {
     97         if (lockTaken)
     98         {
     99             _isDiscoverying = false;
    100             Monitor.Exit(_objLock);
    101         }
    102     }
    103 }

    View Code

     

      六、配置訪問組件調用入口

      最後我們需要為組件提供訪問入口,我們用 RedisCluster 類實現 字符串、列表、哈希、集合、有序集合和Keys的基本操作,並且用 RedisClusterFactory 工廠類對外提供單例操作,這樣就可以像單實例 Redis 那樣調用 Redis Cluster 集群。調用示例:

    var node = new ClusterNode("127.0.0.1", 7001);
    var redisCluster = RedisClusterFactory.Configure(node, config);
    string key = "B070x14668";
    redisCluster.Set(key, key);
    string value = redisCluster.Get<string>(key);
    redisCluster.Del(key);
     1 /// <summary>
     2 /// REDIS 集群工廠
     3 /// </summary>
     4 public class RedisClusterFactory
     5 {
     6     static RedisClusterFactory _factory = new RedisClusterFactory();
     7     static RedisCluster _cluster = null;
     8 
     9     /// <summary>
    10     /// Redis 集群
    11     /// </summary>
    12     public static RedisCluster Cluster
    13     {
    14         get
    15         {
    16             if (_cluster == null)
    17                 throw new Exception("You should call RedisClusterFactory.Configure to config cluster first.");
    18             else
    19                 return _cluster;
    20         }
    21     }
    22 
    23     /// <summary>
    24     /// 初始化 <see cref="RedisClusterFactory"/> 類的新實例
    25     /// </summary>
    26     private RedisClusterFactory()
    27     {
    28     }
    29 
    30     /// <summary>
    31     /// 配置 REDIS 集群
    32     /// <para>若群集中有指定 password 的節點,必須使用  IEnumerable&lt;ClusterNode&gt; 重載列舉出這些節點</para>
    33     /// </summary>
    34     /// <param name="node">集群節點</param>
    35     /// <returns></returns>
    36     public static RedisCluster Configure(ClusterNode node)
    37     {
    38         return RedisClusterFactory.Configure(node, null);
    39     }
    40 
    41     /// <summary>
    42     /// 配置 REDIS 集群
    43     /// <para>若群集中有指定 password 的節點,必須使用  IEnumerable&lt;ClusterNode&gt; 重載列舉出這些節點</para>
    44     /// </summary>
    45     /// <param name="node">集群節點</param>
    46     /// <param name="config"><see cref="RedisClientManagerConfig"/> 客戶端緩衝池配置</param>
    47     /// <returns></returns>
    48     public static RedisCluster Configure(ClusterNode node, RedisClientManagerConfig config)
    49     {
    50         return RedisClusterFactory.Configure(new List<ClusterNode> { node }, config);
    51     }
    52 
    53     /// <summary>
    54     /// 配置 REDIS 集群
    55     /// </summary>
    56     /// <param name="nodes">集群節點</param>
    57     /// <param name="config"><see cref="RedisClientManagerConfig"/> 客戶端緩衝池配置</param>
    58     /// <returns></returns>
    59     public static RedisCluster Configure(IEnumerable<ClusterNode> nodes, RedisClientManagerConfig config)
    60     {
    61         if (nodes == null)
    62             throw new ArgumentNullException("nodes");
    63 
    64         if (nodes == null || nodes.Count() == 0)
    65             throw new ArgumentException("There is no nodes to configure cluster.");
    66 
    67         if (_cluster == null)
    68         {
    69             lock (_factory)
    70             {
    71                 if (_cluster == null)
    72                 {
    73                     RedisCluster c = new RedisCluster(nodes, config);
    74                     _cluster = c;
    75                 }
    76             }
    77         }
    78 
    79         return _cluster;
    80     }
    81 }

    View Code

     

      總結

      今天我們詳細介紹了如何從0手寫一個Redis Cluster集群客戶端訪問組件,相信對同樣在尋找類似解決方案的同學們會有一定的啟發,喜歡的同學請點個 star。在沒有相同案例可以參考的情況下筆者通過查閱官方說明文檔和借鑒 Java 的 JedisCluster 的實現思路,雖說磕磕碰碰但最終也初步完成這個組件並投入使用,必須給自己加一個雞腿!!在此我有一個小小的疑問,.NET 的同學們在用 Redis 集群時,你們是用什麼組件耍的,為何網上的相關介紹和現成組件幾乎都沒有?歡迎討論。

      GitHub 代碼託管:https://github.com/TANZAME/ServiceStack.Redis.Cluster

      技術交流 QQ 群:816425449

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※超省錢租車方案

    ※別再煩惱如何寫文案,掌握八大原則!

    ※回頭車貨運收費標準

    ※教你寫出一流的銷售文案?

    FB行銷專家,教你從零開始的技巧

    聚甘新

  • 荷蘭開放首條塑膠再製自行車道 約含50萬個瓶蓋

    摘錄自2018年09月18日科技新報報導

    荷蘭人以愛好騎單車聞名,該國約 1,700 萬人,卻擁有超過 2,200 萬輛腳踏車,光是阿姆斯特丹就鋪設了近 800 公里的自行車道,東北部城鎮茲沃勒(Zwolle)日前又正式開放一條全由回收塑膠再製生成的自行車道。

    該塑膠車道概念來自道路工程公司 KWS 的 Anne Koudstaal 和 Simon Jorritsma,道路全長 30 公尺,採用預製模塊,重量輕、易安裝,下方設計了排水系統讓管道、電纜通過,可讓水快速流通,遇到暴雨時還能充當臨時儲水槽、避免淹水。

    據估計,這條自行車道包含了約 218,000 個塑膠杯,或 50 萬個塑膠瓶蓋,使用年限可比傳統道路長 2~3 倍(有待觀察),路面應該也不會出現裂縫或坑洞。

     

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

  • 印度雨季後氣候乾燥 德里登革熱疫情飆升

    摘錄自2018年09月18日中央社報導

    「印度斯坦時報」(Hindustan Times)18日引述南德里市政機構(SDMC)彙整新德里、東德里和北德里等另3個市政機構的統計顯示,德里地區今年迄今的登革熱病例達243例。

    在15日之前的一個星期,德里地區就通報了106起登革熱病例,高達全年迄今243例的近一半。這段時間是德里近2個月連續降雨以來,首個未降雨的星期。而雨季後出現的乾燥氣候,專家認為是登革熱病例突然飆升的主因。醫護人員和專家都擔心,最近登革熱病例的上升,現在只是開端。

    為防止登革熱疫情傳布,南德里市政機構最近已對社區大規模噴灑殺蟲劑。北德里市政機構也表示,正派檢查員調查和處罰未清理積水者。

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

    聚甘新

  • 黑龍江大豆因霜害減產5.5億斤

    摘錄自2018年9月18日大紀元報導

    中國高度依賴進口大豆,中美貿易戰後,進口美國大豆數量驟減,黑龍江大豆主產區又遇9月上旬出現低溫霜凍天氣,導致近1,500萬畝大豆遭受凍害,估計減產5.5億斤。

    中國農業農村部前(17)日通報,黑龍江省西北部地區9月9-10日出現低溫霜凍天氣,監測結果表明,大豆低溫霜凍面積約為1,478.5萬畝,產量損失預計為5.5億斤左右。大豆是喜溫作物,在整個生產期,大豆最適合生長的溫度是日平均氣溫20°~25°C,5-9月份氣溫平均降低1度,大豆畝產減產15-20斤。

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    南投搬家公司費用需注意的眉眉角角,別等搬了再說!

    ※教你寫出一流的銷售文案?

    聚甘新

  • 造假醜聞、禁令、民眾信心跌 德國柴油車榮景恐回不去了

    環境資訊中心記者 陳文姿報導

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※別再煩惱如何寫文案,掌握八大原則!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

    FB行銷專家,教你從零開始的技巧

    聚甘新

  • 空污不只傷身 研究:廢氣吸多恐增失智症風險

    摘錄自2018年09月18日中央社報導

    根據發表在「英國醫學期刊網路版」(BMJOpen)的研究結果顯示,排除大量飲酒、吸菸及其他確定導致失智症的風險因素之後,都市地區主要由車輛排放廢氣所造成的空氣污染,與居民罹患失智症的風險增加有關。

    英國倫敦大學(University of London)人口健康研究所(Population Health Research Institute)的卡瑞(Iain Carey)率旗下研究團隊,分析大倫敦區(Greater London)13萬1000名居民的健康記錄,他們在2004年時年齡介於50和79歲間。當時這些人並無任何失智症症狀。

    根據這13萬多人的居住地址,估計他們每年接觸的二氧化氮及細懸浮微粒(PM2.5)量,並花7年追蹤這些人的健康情況。結果發現,有1.7%或將近2200人確診為失智症。

    研究作者也提醒,由於這項研究屬事後分析,而非在實驗場所進行的臨床分析,因此無法得出空污與失智症間的確切因果關係。但研究結果清楚顯示,燃燒柴油及汽油所排出的化學副產物將損害大腦功能。
     

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※別再煩惱如何寫文案,掌握八大原則!

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※超省錢租車方案

    ※教你寫出一流的銷售文案?

    網頁設計最專業,超強功能平台可客製化

    聚甘新

  • 微塑膠汙染新途徑 透過孑孓進入食物鏈

    摘錄自2018年09月19日中央社報導

    英國瑞丁大學(University of Reading)研究人員19日表示,蚊子幼蟲孑孓會吞食塑膠製品破裂後形成的微塑膠,被吞食的微粒中,許多會隨著孑孓的成長,轉移到成蚊體內,微塑膠可能透過蚊子和其他飛行昆蟲,從空中進入人類生態系。

    同時,這也代表若有任何生物吃下這些會飛的蚊子,也會跟著吃進微小塑膠粒,而眾所周知,會捕食這類昆蟲的動物包括數種鳥類、蝙蝠和蜘蛛,這些動物又成為其他動物獵食的目標,進而在食物鏈內傳遞。

    研究報告主要作者、瑞丁大學生物科學家賈拉漢(Amanda Callaghan)接受媒體訪問時談到:「這項發現的重要性是此現象可能相當普遍。我們只挑選蚊子作為樣本進行觀察,不過還有許多昆蟲活在水裡,生命週期也和孑孓一樣,會吞食水裡的東西,然後變成成蟲。」

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※教你寫出一流的銷售文案?

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※回頭車貨運收費標準

    ※別再煩惱如何寫文案,掌握八大原則!

    ※超省錢租車方案

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

    聚甘新

  • 德反煤護樹運動佔領森林六年 警方清場 一記者死亡

    環境資訊中心綜合外電;姜唯、彭瑞祥 編譯;林大利 審校

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※超省錢租車方案

    ※別再煩惱如何寫文案,掌握八大原則!

    ※回頭車貨運收費標準

    ※教你寫出一流的銷售文案?

    FB行銷專家,教你從零開始的技巧

    聚甘新