標籤: 租車

  • 基於NACOS和JAVA反射機制動態更新JAVA靜態常量非@Value註解

    基於NACOS和JAVA反射機制動態更新JAVA靜態常量非@Value註解

    1.前言

    項目中都會使用常量類文件, 這些值如果需要變動需要重新提交代碼,或者基於@Value註解實現動態刷新, 如果常量太多也是很麻煩; 那麼 能不能有更加簡便的實現方式呢?

    本文講述的方式是, 一個JAVA類對應NACOS中的一個配置文件,優先使用nacos中的配置,不配置則使用程序中的默認值;

    2.正文

    nacos的配置如下圖所示,為了滿足大多數情況,配置了 namespace命名空間和group;

     

     

     新建個測試工程 cloud-sm.

    bootstrap.yml 中添加nacos相關配置;

    為了支持多配置文件需要注意ext-config節點,group對應nacos的添加的配置文件的group; data-id 對應nacos上配置的data-id

    配置如下:

    server:
      port: 9010
      servlet:
        context-path: /sm
    spring:
      application:
        name: cloud-sm
      cloud:
        nacos:
          discovery:
            server-addr: 192.168.100.101:8848 #Nacos服務註冊中心地址
            namespace: 1
          config:
            server-addr: 192.168.100.101:8848 #Nacos作為配置中心地址
            namespace: 1
            ext-config:
              - group: TEST_GROUP
                data-id: cloud-sm.yaml
                refresh: true
              - group: TEST_GROUP
                data-id: cloud-sm-constant.properties
                refresh: true

    接下來是本文重點:

    1)新建註解ConfigModule,用於在配置類上;一個value屬性;

    2)新建個監聽類,用於獲取最新配置,並更新常量值

    實現流程:

    1)項目初始化時獲取所有nacos的配置

    2)遍歷這些配置文件,從nacos上獲取配置

    3)遍歷nacos配置文件,獲取MODULE_NAME的值

    4)尋找配置文件對應的常量類,從spring容器中尋找 常量類 有註解ConfigModule 且值是 MODULE_NAME對應的

    5)使用JAVA反射更改常量類的值

    6)增加監聽,用於動態刷新

     

    import org.springframework.stereotype.Component;
    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    @Component
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE})
    public @interface ConfigModule {
        /**
         *  對應配置文件裏面key為( MODULE_NAME ) 的值
         * @return
         */
        String value();
    }
    import com.alibaba.cloud.nacos.NacosConfigProperties;
    import com.alibaba.druid.support.json.JSONUtils;
    import com.alibaba.nacos.api.NacosFactory;
    import com.alibaba.nacos.api.PropertyKeyConst;
    import com.alibaba.nacos.api.config.ConfigService;
    import com.alibaba.nacos.api.config.listener.Listener;
    import com.alibaba.nacos.api.exception.NacosException;
    import com.alibaba.nacos.client.utils.LogUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.ApplicationContext;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.io.IOException;
    import java.io.StringReader;
    import java.lang.reflect.Field;
    import java.util.Enumeration;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.Executor;
    
    /**
     * nacos 自定義監聽
     *
     * @author zch
     */
    @Component
    public class NacosConfigListener {
        private Logger LOGGER = LogUtils.logger(NacosConfigListener.class);
        @Autowired
        private NacosConfigProperties configs;
        @Value("${spring.cloud.nacos.config.server-addr:}")
        private String serverAddr;
        @Value("${spring.cloud.nacos.config.namespace:}")
        private String namespace;
        @Autowired
        private ApplicationContext applicationContext;
        /**
         * 目前只考慮properties 文件
         */
        private String fileType = "properties";
        /**
         * 需要在配置文件中增加一條 MODULE_NAME 的配置,用於找到對應的 常量類
         */
        private String MODULE_NAME = "MODULE_NAME";
    
        /**
         * NACOS監聽方法
         *
         * @throws NacosException
         */
        public void listener() throws NacosException {
            if (StringUtils.isBlank(serverAddr)) {
                LOGGER.info("未找到 spring.cloud.nacos.config.server-addr");
                return;
            }
            Properties properties = new Properties();
            properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr.split(":")[0]);
            if (StringUtils.isNotBlank(namespace)) {
                properties.put(PropertyKeyConst.NAMESPACE, namespace);
            }
    
            ConfigService configService = NacosFactory.createConfigService(properties);
            // 處理每個配置文件
            for (NacosConfigProperties.Config config : configs.getExtConfig()) {
                String dataId = config.getDataId();
                String group = config.getGroup();
                //目前只考慮properties 文件
                if (!dataId.endsWith(fileType)) continue;
    
                changeValue(configService.getConfig(dataId, group, 5000));
    
                configService.addListener(dataId, group, new Listener() {
                    @Override
                    public void receiveConfigInfo(String configInfo) {
                        changeValue(configInfo);
                    }
    
                    @Override
                    public Executor getExecutor() {
                        return null;
                    }
                });
            }
        }
    
        /**
         * 改變 常量類的 值
         *
         * @param configInfo
         */
        private void changeValue(String configInfo) {
            if(StringUtils.isBlank(configInfo)) return;
            Properties proper = new Properties();
            try {
                proper.load(new StringReader(configInfo)); //把字符串轉為reader
            } catch (IOException e) {
                e.printStackTrace();
            }
            String moduleName = "";
            Enumeration enumeration = proper.propertyNames();
            //尋找MODULE_NAME的值
            while (enumeration.hasMoreElements()) {
                String strKey = (String) enumeration.nextElement();
                if (MODULE_NAME.equals(strKey)) {
                    moduleName = proper.getProperty(strKey);
                    break;
                }
            }
            if (StringUtils.isBlank(moduleName)) return;
            Class curClazz = null;
            // 尋找配置文件對應的常量類
            // 從spring容器中 尋找類的註解有ConfigModule 且值是 MODULE_NAME對應的
            for (String beanName : applicationContext.getBeanDefinitionNames()) {
                Class clazz = applicationContext.getBean(beanName).getClass();
                ConfigModule configModule = (ConfigModule) clazz.getAnnotation(ConfigModule.class);
                if (configModule != null && moduleName.equals(configModule.value())) {
                    curClazz = clazz;
                    break;
                }
            }
            if (curClazz == null) return;
            // 使用JAVA反射機制 更改常量
            enumeration = proper.propertyNames();
            while (enumeration.hasMoreElements()) {
                String key = (String) enumeration.nextElement();
                String value = proper.getProperty(key);
                if (MODULE_NAME.equals(key)) continue;
                try {
                    Field field = curClazz.getDeclaredField(key);
                    //忽略屬性的訪問權限
                    field.setAccessible(true);
                    Class<?> curFieldType = field.getType();
                    //其他類型自行拓展
                    if (curFieldType.equals(String.class)) {
                        field.set(null, value);
                    } else if (curFieldType.equals(List.class)) { // 集合List元素
                        field.set(null, JSONUtils.parse(value));
                    } else if (curFieldType.equals(Map.class)) { //Map
                        field.set(null, JSONUtils.parse(value));
                    }
                } catch (NoSuchFieldException | IllegalAccessException e) {
                    LOGGER.info("設置屬性失敗:{} {} = {} ", curClazz.toString(), key, value);
                }
            }
        }
    
        @PostConstruct
        public void init() throws NacosException {
            listener();
        }
    }

     3.測試

    1)新建常量類Constant,增加註解@ConfigModule(“sm”),盡量測試全面, 添加常量類型有 String, List,Map

    @ConfigModule("sm")
    public class Constant {
    
        public static volatile String TEST = new String("test");
    
        public static volatile List<String> TEST_LIST = new ArrayList<>();
        static {
            TEST_LIST.add("默認值");
        }
        public static volatile Map<String,Object> TEST_MAP = new HashMap<>();
        static {
            TEST_MAP.put("KEY","初始化默認值");
        }
        public static volatile List<Integer> TEST_LIST_INT = new ArrayList<>();
        static {
            TEST_LIST_INT.add(1);
        }
    }

    2)新建個Controller用於測試這些值

    @RestController
    public class TestController {
    
        @GetMapping("/t1")
        public Map<String, Object> test1() {
            Map<String, Object> result = new HashMap<>();
    
            result.put("string" , Constant.TEST);
            result.put("list" , Constant.TEST_LIST);
            result.put("map" , Constant.TEST_MAP);
            result.put("list_int" , Constant.TEST_LIST_INT);
            result.put("code" , 1);
            return result;
        }
    }

    3)當前nacos的配置文件cloud-sm-constant.properties為空

     4)訪問測試路徑localhost:9010/sm/t1,返回為默認值

    {
        "code": 1,
        "string": "test",
        "list_int": [
            1
        ],
        "list": [
            "默認值"
        ],
        "map": {
            "KEY": "初始化默認值"
        }
    }

    5)然後更改nacos的配置文件cloud-sm-constant.properties;

     6)再次訪問測試路徑localhost:9010/sm/t1,返回為nacos中的值

    {
        "code": 1,
        "string": "12351",
        "list_int": [
            1,
            23,
            4
        ],
        "list": [
            "123",
            "sss"
        ],
        "map": {
            "A": 12,
            "B": 432
        }
    }

    4.結語

    這種實現方式優點如下:

    1)動態刷新配置,不需要重啟即可改變程序中的靜態常量值

    2)使用簡單,只需在常量類上添加一個註解

    3)避免在程序中大量使用@Value,@RefreshScope註解

     不足:

    此代碼是個人業餘時間的想法,未經過生產驗證,實現的數據類型暫時只寫幾個,其餘的需要自行拓展

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※教你寫出一流的銷售文案?

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※回頭車貨運收費標準

    ※別再煩惱如何寫文案,掌握八大原則!

    ※超省錢租車方案

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

    聚甘新

  • 學習ASP.NET Core(11)-解決跨域問題與程序部署

    學習ASP.NET Core(11)-解決跨域問題與程序部署

    上一篇我們介紹了系統日誌與測試相關的內容並添加了相關的功能;本章我們將介紹跨域與程序部署相關的內容

    一、跨域

    1、跨域的概念

    1、什麼是跨域?

    一個請求的URL由協議,域名,端口號組成,以百度的https://www.baidu.com為例,協議為https,域名由子域名www和主域名baidu組成,端口號若為80會自動隱藏(也可以配置為其它端口,通過代理服務器將80端口請求轉發給實際的端口號)。而當請求的URL的協議,域名,端口號任意一個於當前頁面的URL不同即為跨域

    2、什麼是同源策略?

    瀏覽器存在一個同源策略,即為了防範跨站腳本的攻擊,出現跨域請求時瀏覽器會限制自身不能執行其它網站的腳本(如JavaScript)。所以說當我們把項目部署到Web服務器后,通過瀏覽器進行請求時就會出現同源策略問題;而像PostMan軟件因其是客戶端形式的,所以不存在此類問題

    3、跨域會導致什麼問題?

    同源策略會限制以下行為:

    • Cookie、LocalStorage和IndexDb的讀取
    • DOM和JS對象的獲取
    • Ajax請求的發送

    2、常用的解決方法

    這裏我們將簡單介紹針對跨域問題常用的幾種解決辦法,並就其中的Cors方法進行配置,若對其它方式感興趣,可參照老張的哲學的文章,⅖ 種方法實現完美跨域

    2.1、JsonP

    1、原理

    上面有提到瀏覽器基於其同源策略會限制部分行為,但對於Script標籤是沒有限制的,而JsonP就是基於這一點,它會在頁面種動態的插入Script標籤,其Src屬性對應的就是api接口的地址,前端會以Get方式將處理函數以回調的形式傳遞給後端,後端響應後會再以回調的方式傳遞給前端,最終頁面得以显示

    2、優缺點

    JsonP出現時間較早,所以對舊版本瀏覽器支持性較好;但自身只支持Get請求,無法確認請求是否成功

    2.2、 CORS

    1、原理

    CORS的全稱是Corss Origin Resource Sharing,即跨域資源共享,它允許將當前域下的資源被其它域的腳本請求訪問。其實現原理就是在響應的head中添加Access-Control-Allow-Origin,只要有該字段就支持跨域請求

    2、優缺點

    Cors支持所有Http方法,不用考慮接口規則,使用簡單;但是對一些舊版本的瀏覽器支持性欠佳

    3、使用

    其使用非常簡單,以我們的項目為例,在BlogSystem.Core項目的Startup類的ConfigureServices方法中進行如下配置

    同時需要開啟使用中間件,如下:

    2.3、Nginx

    1、原理

    跨域問題是指在一個地址中發起另一個地址的請求,而Nginx可以利用其反向代理的功能,接受請求后直接請求該地址,類似打開了一個新的頁面,所以可以避開跨域的問題

    2、優缺點

    配置簡單,可以降低開發成本,方便配置負載均衡;靈活性差,每個環境都需要進行不同的配置

    二、程序部署

    1、部署模式

    在.NET Core中,有兩種部署模式,分別為FDD(Framework-dependent)框架依賴發布模式和SCD(Self-contained)自包含獨立發布模式

    • FDD:此類部署需要服務器安裝.NET Core SDK環境,部署的包容量會比較小,但可能因SDK版本存在兼容性問題;
    • SCD:此類部署自包含.NET Core SDK的環境,不同.NET Core版本可以共存,其部署包容量會較大,且需要對服務器進行相關配置

    2、常用部署方式

    以下內容均參考老張的哲學的文章最全的部署方案 & 最豐富的錯誤分析,有興趣的朋友可以參考原文

    2.1、Windows平台

    • 直接運行:發布目標選擇windows時會在文件夾中生成一個exe文件,我們可以直接執行exe文件或使用CLI命令調用dll運行;這種方式雖然方便,卻存在一些弊端,比如說部署多個的情況下會存在很多控制台窗口,如誤操作會導致窗口關閉等;
    • 部署服務:除了上述直接運行的方式外,我們還可以將程序發布為服務,發布后我們可以像控制系統服務一樣控製程序的啟動和關閉

    需要注意的是上述兩類方法都需要藉助IIS或者是代理服務器進行服務的轉發,否則只能在本地進行訪問;

    2.2、Linux平台

    Linux平台常用的部署方式即為程序+代理服務器,但是當我們配置完成后運行程序時,該運行命令會一直佔用操作窗口,所以我們需要使用“守護進程”來解決這個問題,簡單來說就是將程序放到後台運行,不影響我們進行其他操作

    綜上,部署模式、部署方式及部署平台有多種組合方式,接下來我們挑選下述3種方法進行演示:

    方案 依賴運行時/宿主機 依賴代理服務器 其它配置
    Windows程序(SCD)+Nginx
    Windows服務(FDD)+IIS 設置為服務
    Linux程序(FDD)+Nginx 守護進程

    3、程序發布

    1、這裏我們右擊BlogSystem.Core項目,選擇發布,選擇文件夾后,點擊高級

    2、為了演示後面的發布實例,這裏我們分別選擇3種組合模式,①獨立+win-x64;②框架依賴+win-x64;③框架依賴+linux-x64

    3、將發布實例拷貝到單獨的文件夾種,這裏我們使用SCD-Window驗證下程序能否直接運行,運行BlogSystem.Core.exe,報錯:

    原來還是老問題,BLL沒有添加到發布文件中,我們到項目的bin文件夾下將BLL和DAL的dll文件分別拷貝至3個文件夾,再次運行,出現404錯誤,經過確認發現,首頁對應的是Swagger文檔頁面,而在配置中間件時我們有添加開發環境才配置swagger的邏輯,所以這裏我們可以根據個人需求決定是否添加。

    這裏我為了方便確認發布是否成功,所以將其從判斷邏輯中取出了。重新生成發布文件,拷貝BLL和DAL的dll文件,再次運行,還是報錯。原來時Swagger的XML文件缺失,從bin文件夾下拷貝添加至發布文件,運行后成功显示頁面

    4、有的朋友會說了,每次都要拷貝這兩個dll和這兩個xml文件,太麻煩了。其實也是有對應的解決辦法的,我們可以使用dotnet的CLI命令進行發布,選擇引用的發布文件夾為bin文件夾,拷貝至發布文件夾即可,有興趣的朋友可以自行研究

    三、服務器發布

    這裏我用的是阿里雲服務器,Window系統版本是Window Server2012 R2,Linux系統版本是CentOS 8.0;在操作前記得確認拷貝的發布文件能否在本地正常運行

    1、Windows程序(SCD)+Nginx

    1、解壓后雙擊exe文件網站可以正常運行,如下:

    2、這個時候我們發現了一個問題,服務器上沒有數據庫,所以無法確認功能是否正常,這裏我們先下載安裝一個Microsoft SQL Server 2012 Express數據庫(建項目時沒有考慮到發布后測試的問題,實際上像SQLite數據庫是非常符合這類場景的)

    安裝完成后我們新建一個BlogSystem的數據庫,通過Sql文件的形式將數據庫結構和數據導入至服務器數據庫,這時候又發現一個問題,由於我們連接數據庫的邏輯放置在model層的BlogSystemContext文件夾下,所以需要將連接中的DataSource更改為Express數據庫,重新發布后覆蓋舊的發布文件(系統設計有缺陷,可以將EF上下文文件放在應用程序層或單獨一層),再次運行,成功執行查詢,如下:

    3、這個時候本地已經可以進行正常的訪問了,但是外部網絡是無法訪問調用接口的,這裏我們藉助Nginx進行服務的轉發。下載Nginx后解壓對conf文件夾下的nginx.conf文件進行如下配置:

    4、在nginx.exe文件所在目錄的文件路徑輸入cmd,鍵入nginx啟動服務訪問8081端口,成功显示頁面(確保core程序正常運行)如下:

    5、這個時候我們使用其它電腦訪問接口,發現還是無法訪問,經過查詢是阿里雲服務器進行了相關的限制,在阿里雲控制台配置安全組規則后即可正常訪問,如下:

    6、配置完成后運行,成功訪問該網站且功能正常。這類方法不需要藉助Core的運行時環境,可以說十分便捷

    2、Windows服務(FDD)+IIS

    1、首先我們將FDD發布文件壓縮后拷貝至Window Server主機,因FDD的部署方法需要藉助.NET Core運行時環境,所以這裏我們首先到官網https://dotnet.microsoft.com/download/dotnet-core/current/runtime下載安裝.NET Core運行時,這裏我們選擇的是右邊這個,安裝完需要重新啟動

    2、上一個方法中桌面显示控制台窗口顯然不是一個較佳的方案,所以這裏我們將其註冊為服務。官方提供了ASP.NET Core服務託管的方法,但使用較為複雜,這裏我們藉助一個名為nssm的工具來達到同樣的目的。我們下載nssm后,在其exe路徑運行cmd命令,執行nssm install,在彈出的窗口中進行如下配置:

    3、我們在系統服務中開啟BlogSytem.Core_Server,在控制面版中選擇安裝IIS服務,併發布對應的項目,安裝完成后,添加部署為8082端口,將應用程序池修改為無託管,如下:

    4、運行網站,成功显示頁面,但是進行功能試用時發現報錯;經過確認是由於IIS應用程序池的用戶驗證模式和sqlserver的驗證模式不同,解決辦法有三種①修改應用程序池高級設置中的進程模型中的標識②將連接數據庫字符串中的Integrated Security=True去除,並添加數據庫連接對應的賬號密碼③在數據庫的“安全性”>“登錄名”裏面,添加對應IIS程序池的名稱,並在這個用戶的“服務器角色”和“用戶映射”中給他對應的權限

    後續嘗試方案一失敗,嘗試方案二成功,方案三由於要安裝SSMS所以沒有嘗試,有遇到相同問題的朋友可以自己試下

    3、Linux程序(FDD)+Nginx

    1、首先我們使用MobaXterm工具登錄至Linux主機(選擇此工具是由於其)同時支持文件傳送和命令行操作),這裏使用的Linux版本是CentOS 8.0;藉助MobaXterm工具在home文件夾下創建WebSite文件夾,並在其內部創建BlogSystem文件夾,將我們準備好的FDD部署方式的發布文件上傳至此文件夾后,使用命令sudo dnf install dotnet-sdk-3.1安裝.net core sdk,如下圖

    2、輸入cd /home/WebSite/BlogSystem切換至項目文件夾后,使用dotnet BlogSystem.Core.dll運行程序,成功執行,但是由於我們沒有數據庫,且未配置代理服務器,所以無法驗證服務是否正常運行;所以這裏我們先參照微軟doc快速入門:在 Red Hat 上安裝 SQL Server 並創建數據庫安裝Sql Server數據庫(阿里雲默認安裝了python3作為解釋器所以無需重複安裝),安裝完成后我們開放在阿里雲實例中開放1433端口,使用可視化工具導入表結構和數據

    3、完成上述操作后我們需要配置守護進程,將程序放在後台運行。首先我們在/etc/systemd/system下新建守護進程文件,文件名以.service結尾,這裏我們新建名為BlogSystem.service文件,使用MobaXterm自帶的編輯器打開文件後進行如下配置,注意後面的中文備註需要去除否則會報錯

    [Unit]
    Description=BlogSystem    #服務描述,隨便填就好
    
    [Service]
    WorkingDirectory=/home/WebSite/BlogSystem/   #工作目錄,填你應用的絕對路徑
    ExecStart=/usr/bin/dotnet /home/WebSite/BlogSystem/BlogSystem.Core.dll    #啟動:前半截是你dotnet的位置(一般都在這個位置),後半部分是你程序入口的dll,中間用空格隔開
    Restart=always  
    RestartSec=25 #如果服務出現問題會在25秒后重啟,數值可自己設置
    SyslogIdentifier=BlogSystem  #設置日誌標識,此行可以沒有
    User=root   #配置服務用戶,越高越好
    Environment=ASPNETCORE_ENVIRONMENT=Production
    [Install]
    WantedBy=multi-user.target
    

    我們使用cd /etc/systemd/system/切換至BlogSystem.service對應的目錄,使用systemctl enable BlogSystem.service設置為開機運行后,再使用systemctl start BlogSystem.service啟動服務,另外可以使用systemctl status BlogSystem確認服務狀態

    4、接下來我們安裝代理Nginx代理默認的5000端口,使用sudo yum install nginx安裝nginx后,我們到\etc\nginx文件夾下打開nginx.conf文件進行如下配置:

    配置完成我們進入\etc\nginx文件夾下,使用systemctl enable nginx將nginx設置為開機啟動,並使用systemctl start nginx啟用服務,同樣可以使用systemctl status nginx確認其狀態。確認無誤后在阿里雲中開放8081端口,外網可正常訪問,但功能試用時報錯,原來是數據庫連接錯誤,重新設置后即可正常訪問

    本章完~

    本人知識點有限,若文中有錯誤的地方請及時指正,方便大家更好的學習和交流。

    本文部分內容參考了網絡上的視頻內容和文章,僅為學習和交流,地址如下:

    老張的哲學,系列一、ASP.NET Core 學習視頻教程

    solenovex,ASP.NET Core 3.x 入門視頻

    聲明

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※超省錢租車方案

    ※別再煩惱如何寫文案,掌握八大原則!

    ※回頭車貨運收費標準

    ※教你寫出一流的銷售文案?

    FB行銷專家,教你從零開始的技巧

    聚甘新

  • 文本挖掘之情感分析(一)

    一、文本挖掘  

         文本挖掘則是對文本進行處理,從中挖掘出來文本中有用的信息和關鍵的規則,在文本挖掘領域應用最往廣泛的是對文本進行分類和聚類,其挖掘的方法分為無監督學習和監督學習。文本挖掘還可以劃分為7大類:關鍵詞提取、文本摘要、文本主題模型、文本聚類、文本分類、觀點提取、情感分析。

       關鍵詞提取:對長文本的內容進行分析,輸出能夠反映文本關鍵信息的關鍵詞。

       文本摘要:許多文本挖掘應用程序需要總結文本文檔,以便對大型文檔或某一主題的文檔集合做出簡要概述。

       文本聚類:主要是對未標註的文本進行標註,常見的有 K均值聚類和層次聚類。

       文本分類:文本分類使用監督學習的方法,以對未知數據的分類進行預測的機器學習方法。

       文本主題模型 LDA:LDA(Latent Dirichlet Allocation)是一種文檔主題生成模型,也稱為一個三層貝恭弘=叶 恭弘斯概率模型,包含詞、主題和文檔三層結構,該模型可以用於獲取語料的主題提取和對不同類別的文檔進行分類。

       觀點抽取:對文本(主要針對評論)進行分析,抽取出核心觀點,並判斷極性(正負面),主要用於電商、美食、酒店、汽車等評論進行分析。

       情感分析:對文本進行情感傾向判斷,將文本情感分為正向、負向、中性。用於口碑分析、話題監控、輿情分析。

       因為自己的論文寫的是關於情感分析方面的內容,因此打算接下來主要寫情感分析系列的內容,今天主要寫關於情感分析的介紹以及發展史。

    二、情感分析

    1. 含義

         情感分析主要是通過分析人們對於服務、產品、事件、話題來挖掘出說話人/作者觀點、情感、情緒等的研究。情感分析按照研究內容的不同,可以分為:意見挖掘 / 意見提取 / 主觀性分析 / 情感傾向分析、情感情緒分析、情感打分等。情感傾向問題,即是指挖掘出一段語料中說話人/作者對於某一話題/事件所持有的態度,如褒義、貶義、中性、兩者兼有。情感情緒,則是將情感傾向進行更進一步的細化,依據“大連理工大學的情感詞彙本體庫”可知,可以將情感傾向可以細化為:“喜歡”、“憤怒”、“討厭”等具體的7個大類——21個小類別。情感打分,即根據情感態度對於某一事物進行評分,如淘寶系統的1-5分。 文本中的情感分析還可以分為:顯式情感、隱式情感,顯式情感是指包含明顯的情感詞語(如:高興、漂亮、討厭等),隱式情感則是指不包含情感詞語的情感文本,如:“這個杯子上面有一層灰”。由於隱式情感分析難度較大,因此目前的工作多集中在顯式情感分析領域。

         情感分析按照不同的分析對象,可以分為:文章級別的情感分析、句子級別的情感分析、詞彙級別的情感分析。按照不同的研究內容以及研究的粒度的不同,其研究情感分析的方法也有很大的變化。

         目前的情感分析研究可歸納為:情感資源構建、情感元素抽取、情感分類及情感分析應用系統;

         情感資源構建:情感資源一般來說有:情感詞典、情感語料庫。情感詞典的構建即是將現有的、整理好的情感詞典資源進行整合,比如中文情感詞典有:大連理工大學的情感詞彙本體庫、知網Hownet情感詞典、台灣大學的NTUSD簡體中文情感詞典等,根據不同的需求,應用這些情感詞典。情感語料庫,則是我們要分析的文本,如關於新聞的文本、微博評論文本、商品評論文本、電影評論文本等,這些語料的獲取可以是尋找已經整理好的數據,或者自己爬蟲獲取。推薦一個比較全的中文語料庫網站:中文NLP語料庫。

        情感元素抽取:情感元素抽取則是從語料中抽取出來能夠代表說話人/作者情感態度問題的詞彙,也稱為細粒度情感分析。語料中的評價對象和表達抽取是情感元素抽取的核心內容。評價對象是指語料中被討論的主題,比如對於商品評論來說,用戶常提起的“外觀”、“快遞”、“包裝”等方面;表達抽取主要針對顯式情感表達的文本,是指文本抽取出來能夠代表說話人/作者情感、情緒、意見等的詞彙,比如“漂亮”、“贊同”、“不贊同”等。一般來說,評價對象和表達抽取也可以作為相互獨立的兩個任務。一般來說,分析這兩者的方法有:基於規則、基於機器學習。對於評價對象來說,現如今使用最多的方法是利用主題模型中的LDA(Latent Dirichlet Allocation)模型進行分析;對於表達抽取則有:深度學習的方法、基於JST (Joint Sentiment/Topic )模型的方法等。

         情感分類:情感分類則是將文本分為一個具體的類別,比如情感傾向分析,則是將文檔分為:褒義、貶義、中性等。一般來說,進行情感分類的方法有,基於情感詞典、基於機器學習。基於情感詞典,最典型的方法則是基於知網Hownet情感詞典的So-Hownet指標進行情感分類,基於機器學習的方法則有監督學習方法、半監督學習方法等。

        針對於情感分析,現已經存在一些專有平台,如:基於Boson 數據的情感分析平台,基於產品評論的平台Google Shopping。情感分析除了在電商平台應用廣泛之外,情感分析技術還被引入到對話機器人領域。例如,微軟的“小冰”機器人 可以通過分析用戶的文本輸入和表情貼圖,理解用戶當前的情緒狀況,並據此回復文本或者語音等情感回應。部分研究機構還將情感分析技術融入實體機器人中。

     2. 發展史

         V. H. 和 K. R. McKeown 於 1997 年發表的論文 [1],該論文使用對數線性回歸模型從大量語料庫中識別形容詞的正面或負面語義,同時藉助該模型對語料中出現的形容詞進行分類預測。

         Peter Turney在 2002年在論文 [2] 提出了一種無監督學習的算法,其可以很好的將語料中的詞語分類成正面情感詞和負面情感詞。

        2002 年 Bo Pang 等人在論文 [3] 中使用了傳統的機器學習方法對電影評論數據進行分類,同時也驗證了機器學習的方法的確要比之前基於規則的方法要優。

        2003 年 Blei 等人在論文 [4] 中提出了 LDA(Latent Dirichlet Allocation)模型,在之後的情感分析領域的工作中,很多學者/研究人員都使用主題模型來進行情感分析,當然也不只是基於主題模型來進行情感分析研究,還有很多利用深度學習方法來進行情感分析的研究。

       Lin 和 He在 2009 年的論文 [5] 提出了一種基於主題模型的模型 —JST(Joint Sentiment/Topic),其有效的將情感加入到了經典的主題模型當中,因此利用該模型可以獲取到不同情感極性標籤下不同主題的分佈情況。傳統的主題模型獲取文檔的主題以及詞的分佈情況,但並沒有關注到情感的存在,因此基於該模型可以對文檔的情感傾向進行分析。利用 JST 模型可以有效的直接將語料的主題、情感信息挖掘出來,同時 JST 模型還考慮到了主題、文檔、情感、詞之間的聯繫。

        基於對於語義和句法的考慮,Jo 和 H.OH 在 2011 年提出了 ASUM(Aspect and Sentiment Unification Model)模型,該模型和 JST 模型很相似都是四層的貝恭弘=叶 恭弘斯網絡結構 [6] 。

        基於神經網絡的語義組合算法被驗證是一種非常有效的特徵學習手段,2013年,Richard Socher和Christopher Potts等人提出多個基於樹結構的Recursive Neural Network,該方法通過迭代運算的方式學習變量長度的句子或短語的語義表示,在斯坦福情感分析樹庫(Stanford Sentiment Treebank)上驗證了該方法的有效性 [7]。Nal Kalchbrenner等人描述了一個卷積體繫結構,稱為動態卷積神經網絡(DCNN),他們採用它來進行句子的語義建模。 該網絡使用動態k-Max池,這是一種線性序列的全局池操作。 該網絡處理不同長度的輸入句子,並在句子上引入能夠明確捕獲短程和長程關係的特徵圖。 網絡不依賴於解析樹,並且很容易適用於任何語言。該模型在句子級情感分類任務上取得了非常出色的效果[8]。2015年,Kai Sheng Tai,Richard Socher, Christopher D. Manning在序列化的LSTM (Long Short-Term Memory)模型的基礎上加入了句法結構的因素,該方法在句法分析的結果上進行語義組合,在句子級情感分類和文本蘊含任務(text entailment)上都取得了很好的效果[9]。

       2016年,Qiao Qian, Xiaoyan Zhu等人在LSTM和Bi-LSTM模型的基礎上加入四種規則約束,這四種規則分別是: Non-Sentiment Regularizer,Sentiment Regularizer, Negation Regularizer, Intensity Regularizer,利用語言資源和神經網絡相結合來提升情感分類問題的精度。

      除了上面的一些研究,關於情感分析領域的應用仍然有很多,比如:2015 年鄭祥雲等人通過主題模型提取出來圖書館用戶的主題信息,最後利用這些信息來進行個性化圖書的有效推薦。將 JST 模型中直接引入了情感詞典作為外部先驗知識來對新聞文本進行分析,獲取其中的主旨句,並對主旨句進行情感打分,同時利用情感主旨句來代替全文,這樣能夠使得用戶更有效、更快速的閱讀文章。

      總的來說,情感分析在很多的領域被應用,當然情感分析也有很多的局限性,就是過多的依賴於語料庫信息,同時還需要使用自然語言、人工智能的方法來能夠最大化的挖掘出來其中的信息。

     

     參考文獻:

     [1] :Predicting the semantic orientation of adjectives

     [2]:  Thumbs up or thumbs down? Semantic orientation applied to unsupervised classification of reviews 

     [3] : Thumbs up? Sentiment Classification using Machine Learning Techniques

     [4] :   Latent Dirichlet Allocation

     [5] : Joint sentiment/topic model for sentiment analysis

     [6] : Aspect and sentiment unification model for online review analysis

     [7] : Recursive Deep Models for Semantic Compositionality Over a Sentiment Treebank

     [8]:  A Convolutional Neural Network for Modelling Sentences

     [9]: Improved Semantic Representations From Tree-Structured Long Short-Term Memory Networks

     [10] : Linguistically Regularized LSTMs for Sentiment Classification

     

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

    聚甘新

  • 環境篇:Kylin3.0.1集成CDH6.2.0

    環境篇:Kylin3.0.1集成CDH6.2.0

    環境篇:Kylin3.0.1集成CDH6.2.0

    Kylin是什麼?

    Apache Kylin™是一個開源的、分佈式的分析型數據倉庫,提供Hadoop/Spark 之上的 SQL 查詢接口及多維分析(OLAP)能力以支持超大規模數據,最初由 eBay 開發並貢獻至開源社區。它能在亞秒內查詢巨大的表。

    Apache Kylin™ 令使用者僅需三步,即可實現超大數據集上的亞秒級查詢。

    1. 定義數據集上的一個星形或雪花形模型
    2. 在定義的數據表上構建cube
    3. 使用標準 SQL 通過 ODBC、JDBC 或 RESTFUL API 進行查詢,僅需亞秒級響應時間即可獲得查詢結果

    如果沒有Kylin

    大數據在數據積累后,需要計算,而數據越多,算力越差,內存需求也越高,詢時間與數據量成線性增長,而這些對於Kylin影響不大,大數據中硬盤往往比內存要更便宜,Kylin通過與計算的形式,以空間換時間,亞秒級的響應讓人們愛不釋手。

    注:所謂詢時間與數據量成線性增長:假設查詢 1 億條記錄耗時 1 分鐘,那麼查詢 10 億條記錄就需 10分鐘,100 億條記錄就至少需要 1 小時 40 分鐘。

    http://kylin.apache.org/cn/

    1 Kylin架構

    Kylin 提供與多種數據可視化工具的整合能力,如 Tableau,PowerBI 等,令用戶可以使用 BI 工具對 Hadoop 數據進行分析

    1. REST Server REST Server

    是一套面嚮應用程序開發的入口點,旨在實現針對 Kylin 平台的應用開發 工作。 此類應用程序可以提供查詢、獲取結果、觸發 cube 構建任務、獲取元數據以及獲取 用戶權限等等。另外可以通過 Restful 接口實現 SQL 查詢。

    1. 查詢引擎(Query Engine)

    當 cube 準備就緒后,查詢引擎就能夠獲取並解析用戶查詢。它隨後會與系統中的其它 組件進行交互,從而向用戶返回對應的結果。

    1. 路由器(Routing)

    在最初設計時曾考慮過將 Kylin 不能執行的查詢引導去 Hive 中繼續執行,但在實踐后 發現 Hive 與 Kylin 的速度差異過大,導致用戶無法對查詢的速度有一致的期望,很可能大 多數查詢幾秒內就返回結果了,而有些查詢則要等幾分鐘到幾十分鐘,因此體驗非常糟糕。 最後這個路由功能在發行版中默認關閉。

    1. 元數據管理工具(Metadata)

    Kylin 是一款元數據驅動型應用程序。元數據管理工具是一大關鍵性組件,用於對保存 在 Kylin 當中的所有元數據進行管理,其中包括最為重要的 cube 元數據。其它全部組件的 正常運作都需以元數據管理工具為基礎。 Kylin 的元數據存儲在 hbase 中。

    1. 任務引擎(Cube Build Engine)

    這套引擎的設計目的在於處理所有離線任務,其中包括 shell 腳本、Java API 以及 MapReduce 任務等等。任務引擎對 Kylin 當中的全部任務加以管理與協調,從而確保每一項任務 都能得到切實執行並解決其間出現的故障。

    2 Kylin軟硬件要求

    • 軟件要求
      • Hadoop: 2.7+, 3.1+ (since v2.5)
      • Hive: 0.13 – 1.2.1+
      • HBase: 1.1+, 2.0 (since v2.5)
      • Spark (optional) 2.3.0+
      • Kafka (optional) 1.0.0+ (since v2.5)
      • JDK: 1.8+ (since v2.5)
      • OS: Linux only, CentOS 6.5+ or Ubuntu 16.0.4+
    • 硬件要求
      • 最低配置:4 core CPU, 16 GB memory
      • 高負載場景:24 core CPU, 64 GB memory

    3 Kylin單機安裝

    3.1 修改環境變量

    vim /etc/profile 
    #>>>注意地址指定為自己的
    #kylin
    export KYLIN_HOME=/usr/local/src/kylin/apache-kylin-3.0.1-bin-cdh60
    export PATH=$PATH:$KYLIN_HOME/bin
        
    #cdh
    export CDH_HOME=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373
    
    #hadoop
    export HADOOP_HOME=${CDH_HOME}/lib/hadoop
    export HADOOP_DIR=${HADOOP_HOME}
    export HADOOP_CLASSPATH=${HADOOP_HOME}
    export PATH=$PATH:$HADOOP_HOME/bin
    export PATH=$PATH:$HADOOP_HOME/sbin
        
    #hbase
    export HBASE_HOME=${CDH_HOME}/lib/hbase
    export PATH=$PATH:$HBASE_HOME/bin
        
     #hive
    export HIVE_HOME=${CDH_HOME}/lib/hive
    export PATH=$PATH:$HIVE_HOME/bin
        
    #spark
    export SPARK_HOME=${CDH_HOME}/lib/spark
    export PATH=$PATH:$SPARK_HOME/bin   
    
    #kafka
    export KAFKA_HOME=${CDH_HOME}/lib/kafka
    export PATH=$PATH:$KAFKA_HOME/bin 
    #<<<
    
    source /etc/profile 
    

    3.2 修改hdfs用戶權限

    usermod -s /bin/bash hdfs
    su hdfs
    hdfs dfs -mkdir /kylin
    hdfs dfs -chmod a+rwx /kylin
    su
    

    3.3 上傳安裝包解壓

    mkdir /usr/local/src/kylin
    cd /usr/local/src/kylin
    tar -zxvf apache-kylin-3.0.1-bin-cdh60.tar.gz
    cd /usr/local/src/kylin/apache-kylin-3.0.1-bin-cdh60
    

    3.4 Java兼容hbase

    • hbase 所有節點

    在CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar后添加

    >>---
    :/opt/cloudera/parcels/CDH/lib/hbase/lib/*
    <<---
    
    • Kylin節點添加jar包
    cp /opt/cloudera/cm/common_jars/commons-configuration-1.9.cf57559743f64f0b3a504aba449c9649.jar /usr/local/src/kylin/apache-kylin-3.0.1-bin-cdh60/tomcat/lib
    

    這2步不做會引起 Could not find or load main class org.apache.hadoop.hbase.util.GetJavaProperty

    3.5 啟動停止

    ./bin/kylin.sh start
    #停止  ./bin/kylin.sh stop
    

    3.6 web頁面

    訪問端口7070

    賬號密碼:ADMIN / KYLIN

    4 Kylin集群安裝

    4.1 修改環境變量

    vim /etc/profile 
    #>>>注意地址指定為自己的
    #kylin
    export KYLIN_HOME=/usr/local/src/kylin/apache-kylin-3.0.1-bin-cdh60
    export PATH=$PATH:$KYLIN_HOME/bin
        
    #cdh
    export CDH_HOME=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373
    
    #hadoop
    export HADOOP_HOME=${CDH_HOME}/lib/hadoop
    export HADOOP_DIR=${HADOOP_HOME}
    export HADOOP_CLASSPATH=${HADOOP_HOME}
    export PATH=$PATH:$HADOOP_HOME/bin
    export PATH=$PATH:$HADOOP_HOME/sbin
        
    #hbase
    export HBASE_HOME=${CDH_HOME}/lib/hbase
    export PATH=$PATH:$HBASE_HOME/bin
        
     #hive
    export HIVE_HOME=${CDH_HOME}/lib/hive
    export PATH=$PATH:$HIVE_HOME/bin
        
    #spark
    export SPARK_HOME=${CDH_HOME}/lib/spark
    export PATH=$PATH:$SPARK_HOME/bin   
    
    #kafka
    export KAFKA_HOME=${CDH_HOME}/lib/kafka
    export PATH=$PATH:$KAFKA_HOME/bin 
    #<<<
    
    source /etc/profile 
    

    4.2 修改hdfs用戶權限

    usermod -s /bin/bash hdfs
    su hdfs
    hdfs dfs -mkdir /kylin
    hdfs dfs -chmod a+rwx /kylin
    su
    

    4.3 上傳安裝包解壓

    mkdir /usr/local/src/kylin
    cd /usr/local/src/kylin
    tar -zxvf apache-kylin-3.0.1-bin-cdh60.tar.gz
    cd /usr/local/src/kylin/apache-kylin-3.0.1-bin-cdh60
    

    4.4 Java兼容hbase

    • hbase 所有節點

    在CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar后添加

    vim /opt/cloudera/parcels/CDH/lib/hbase/bin/hbase
    >>---
    :/opt/cloudera/parcels/CDH/lib/hbase/lib/*
    <<---
    
    • Kylin節點添加jar包
    cp /opt/cloudera/cm/common_jars/commons-configuration-1.9.cf57559743f64f0b3a504aba449c9649.jar /usr/local/src/kylin/apache-kylin-3.0.1-bin-cdh60/tomcat/lib
    

    這2步不做會引起 Could not find or load main class org.apache.hadoop.hbase.util.GetJavaProperty

    4.5 修改kylin配置文件

    Kylin根據自己的運行職責狀態,可以劃分為以下三大類角色

    • Job節點:僅用於任務調度,不用於查詢
    • Query節點:僅用於查詢,不用於構建任務的調度
    • All節點:模式代表該服務同時用於任務調度和 SQL 查詢
      • 2.0以前同一個集群只能有一個節點(Kylin實例)用於job調度(all或者job模式的只能有一個實例)
      • 2.0開始可以多個job或者all節點實現HA
    vim conf/kylin.properties
    >>----
    #指定元數據庫路徑,默認值為 kylin_metadata@hbase,確保kylin集群使用一致
    kylin.metadata.url=kylin_metadata@hbase
    #指定 Kylin 服務所用的 HDFS 路徑,默認值為 /kylin,請確保啟動 Kylin 實例的用戶有讀寫該目錄的權限
    kylin.env.hdfs-working-dir=/kylin
    kylin.server.mode=all
    kylin.server.cluster-servers=cdh01.cm:7070,cdh02.cm:7070,cdh03.cm:7070
    kylin.storage.url=hbase
    #構建任務失敗后的重試次數,默認值為 0
    kylin.job.retry=2
    #最大構建併發數,默認值為 10
    kylin.job.max-concurrent-jobs=10
    #構建引擎間隔多久檢查 Hadoop 任務的狀態,默認值為 10(s)
    kylin.engine.mr.yarn-check-interval-seconds=10
    #MapReduce 任務啟動前會依據輸入預估 Reducer 接收數據的總量,再除以該參數得出 Reducer 的數目,默認值為 500(MB)
    kylin.engine.mr.reduce-input-mb=500
    #MapReduce 任務中 Reducer 數目的最大值,默認值為 500
    kylin.engine.mr.max-reducer-number=500
    #每個 Mapper 可以處理的行數,默認值為 1000000,如果將這個值調小,會起更多的 Mapper
    kylin.engine.mr.mapper-input-rows=1000000
    #啟用分佈式任務鎖
    kylin.job.scheduler.default=2
    kylin.job.lock=org.apache.kylin.storage.hbase.util.ZookeeperJobLock
    <<----
    

    4.6 啟動停止

    所有Kylin節點

    ./bin/kylin.sh start
    #停止  ./bin/kylin.sh stop
    

    4.7 nginx負載均衡

    yum -y install nginx
    
    vim /etc/nginx/nginx.conf
    >>---http中添加替換內容
    upstream kylin {
            least_conn;
            server 192.168.37.10:7070 weight=8;
            server 192.168.37.11:7070 weight=7;
            server 192.168.37.12:7070 weight=7;
    	}
        server {
            listen       9090;
            server_name  localhost;
    
            location / {
                    proxy_pass http://kylin;
            }
        }
    
    <<---
    
    #重啟 nginx 服務
    systemctl restart nginx  
    

    4.8 訪問web頁面

    訪問任何節點的7070端口都可以進入kylin

    訪問nginx所在機器9090端口/kylin負載均衡進入kylin

    賬號密碼:ADMIN / KYLIN

    4 大規模并行處理@列式存儲

    自從 10 年前 Hadoop 誕生以來,大數據的存儲和批處理問題均得到了妥善解決,而如何高速地分析數據也就成為了下一個挑戰。於是各式各樣的“SQL on Hadoop”技術應運而生,其中以 Hive 為代表,Impala、Presto、Phoenix、Drill、 SparkSQL 等緊隨其後(何以解憂–唯有CV SQL BOY)。它們的主要技術是“大規模并行處理”(Massive Parallel Processing,MPP)和“列式存儲”(Columnar Storage)

    大規模并行處理可以調動多台機器一起進行并行計算,用線性增加的資源來換取計算時間的線性下降

    列式存儲則將記錄按列存放,這樣做不僅可以在訪問時只讀取需要的列,還可以利用存儲設備擅長連續讀取的特點,大大提高讀取的速率。

    這兩項關鍵技術使得 Hadoop 上的 SQL 查詢速度從小時提高到了分鐘。 然而分鐘級別的查詢響應仍然離交互式分析的現實需求還很遠。分析師敲入 查詢指令,按下回車,還需要去倒杯咖啡,靜靜地等待查詢結果。得到結果之後才能根據情況調整查詢,再做下一輪分析。如此反覆,一個具體的場景分析常常需要幾小時甚至幾天才能完成,效率低下。 這是因為大規模并行處理和列式存儲雖然提高了計算和存儲的速度,但並沒有改變查詢問題本身的時間複雜度,也沒有改變查詢時間與數據量成線性增長的關係這一事實。

    假設查詢 1 億條記錄耗時 1 分鐘,那麼查詢 10 億條記錄就需 10分鐘,100 億條記錄就至少需要 1 小時 40 分鐘。 當然,可以用很多的優化技術縮短查詢的時間,比如更快的存儲、更高效的壓縮算法,等等,但總體來說,查詢性能與數據量呈線性相關這一點是無法改變的。雖然大規模并行處理允許十倍或百倍地擴張計算集群,以期望保持分鐘級別的查詢速度,但購買和部署十倍或百倍的計算集群又怎能輕易做到,更何況還有 高昂的硬件運維成本。 另外,對於分析師來說,完備的、經過驗證的數據模型比分析性能更加重要, 直接訪問紛繁複雜的原始數據並進行相關分析其實並不是很友好的體驗,特別是在超大規模的數據集上,分析師將更多的精力花在了等待查詢結果上,而不是在更加重要的建立領域模型上

    5 Kylin如何解決海量數據的查詢問題

    **Apache Kylin 的初衷就是要解決千億條、萬億條記錄的秒級查詢問題,其中的關鍵就是要打破查詢時間隨着數據量成線性增長的這個規律。根據OLAP分析,可以注意到兩個結論: **

    • 大數據查詢要的一般是統計結果,是多條記錄經過聚合函數計算后的統計值。原始的記錄則不是必需的,或者訪問頻率和概率都極低。

    • 聚合是按維度進行的,由於業務範圍和分析需求是有限的,有意義的維度聚合組合也是相對有限的,一般不會隨着數據的膨脹而增長。

    **基於以上兩點,我們可以得到一個新的思路——“預計算”。應盡量多地預先計算聚合結果,在查詢時刻應盡量使用預算的結果得出查詢結果,從而避免直 接掃描可能無限增長的原始記錄。 **

    舉例來說,使用如下的 SQL 來查詢 11月 11日 那天銷量最高的商品:

    select item,sum(sell_amount)
    from sell_details
    where sell_date='2020-11-11'
    group by item
    order by sum(sell_amount) desc
    

    用傳統的方法時需要掃描所有的記錄,再找到 11月 11日 的銷售記錄,然後按商品聚合銷售額,最後排序返回。

    假如 11月 11日 有 1 億條交易,那麼查詢必須讀取並累計至少 1 億條記錄,且這個查詢速度會隨將來銷量的增加而逐步下降。如果日交易量提高一倍到 2 億,那麼查詢執行的時間可能也會增加一倍。

    而使用預 計算的方法則會事先按維度 [sell_date , item] 計 算 sum(sell_amount)並存儲下來,在查詢時找到 11月 11日 的銷售商品就可以直接排序返回了。讀取的記錄數最大不會超過維度[sell_date,item]的組合數。

    顯然這個数字將遠遠小於實際的銷售記錄,比如 11月 11日 的 1 億條交易包含了 100萬條商品,那麼預計算后就只有 100 萬條記錄了,是原來的百分之一。並且這些 記錄已經是按商品聚合的結果,因此又省去了運行時的聚合運算。從未來的發展來看,查詢速度只會隨日期和商品數目(時間,商品維度)的增長而變化,與銷售記錄的總數不再有直接聯繫。假如日交易量提高一倍到 2 億,但只要商品的總數不變,那麼預計算的結果記錄總數就不會變,查詢的速度也不會變。

    預計算就是 Kylin 在“大規模并行處理”和“列式存儲”之外,提供給大數據分析的第三個關鍵技術。

    6 Kylin 入門案例

    6.1 hive數據準備

    --創建數據庫kylin_hive
    create database kylin_hive; 
    
    --創建表部門表dept
    create external table if not exists kylin_hive.dept(
    deptno int,
    dname string,
    loc int )
    row format delimited fields terminated by '\t';
    --添加數據
    INSERT INTO TABLE kylin_hive.dept VALUES(10,"ACCOUNTING",1700),(20,"RESEARCH",1800),(30,"SALES",1900),(40,"OPERATIONS",1700)
    --查看數據
    SELECT * FROM kylin_hive.dept
    
    --創建員工表emp
    create external table if not exists kylin_hive.emp(
    empno int,
    ename string,
    job string,
    mgr int,
    hiredate string, 
    sal double, 
    comm double,
    deptno int)
    row format delimited fields terminated by '\t';
    
    --添加數據
    INSERT INTO TABLE kylin_hive.emp VALUES(7369,"SMITHC","LERK",7902,"1980-12-17",800.00,0.00,20),(7499,"ALLENS","ALESMAN",7698,"1981-2-20",1600.00,300.00,30),(7521,"WARDSA","LESMAN",7698,"1981-2-22",1250.00,500.00,30),(7566,"JONESM","ANAGER",7839,"1981-4-2",2975.00,0.00,20),(7654,"MARTIN","SALESMAN",7698,"1981-9-28",1250.00,1400.00,30),(7698,"BLAKEM","ANAGER",7839,"1981-5-1",2850.00,0.00,30),(7782,"CLARKM","ANAGER",7839,"1981-6-9",2450.00,0.00,10),(7788,"SCOTTA","NALYST",7566,"1987-4-19",3000.00,0.00,20),(7839,"KINGPR","ESIDENT",7533,"1981-11-17",5000.00,0.00,10),(7844,"TURNER","SALESMAN",7698,"1981-9-8",1500.00,0.00,30),(7876,"ADAMSC","LERK",7788,"1987-5-23",1100.00,0.00,20),(7900,"JAMESC","LERK",7698,"1981-12-3",950.00,0.00,30),(7902,"FORDAN","ALYST",7566,"1981-12-3",3000.00,0.00,20),(7934,"MILLER","CLERK",7782,"1982-1-23",1300.00,0.00,10)
    --查看數據
    SELECT * FROM kylin_hive.emp
    

    6.2 創建工程

    • 輸入工程名稱以及工程描述

    6.3 Kylin加載Hive表

    雖然 Kylin 使用 SQL 作為查詢接口並利用 Hive 元數據,Kylin 不會讓用戶查詢所有的 hive 表,因為到目前為止它是一個預構建 OLAP(MOLAP) 系統。為了使表在 Kylin 中可用,使用 “Sync” 方法能夠方便地從 Hive 中同步表。

    • 選擇項目添加hive數據源
    • 添加數據源表–>hive庫名稱.表名稱(以逗號分隔)

    • 這裏只添加了表的Schema元信息,如果需要加載數據,還需要點擊Reload Table

    6.4 Kylin添加Models(模型)

    • 填寫模型名字
    • 選擇事實表,這裏選擇員工EMP表為事實表
    • 添加維度表,這裏選擇部門DEPT表為維度表,並選擇我們的join方式,以及join連接字段

    • 選擇聚合維度信息
    • 選擇度量信息
    • 添加分區信息及過濾條件之後“Save”

    6.5 Kylin構建Cube

    Kylin 的 OLAP Cube 是從星型模式的 Hive 表中獲取的預計算數據集,這是供用戶探索、管理所有 cube 的網頁管理頁面。由菜單欄進入Model 頁面,系統中所有可用的 cube 將被列出。

    • 創建一個new cube
    • 選擇我們的model以及指定cube name
    • 添加我們的自定義維度,這裡是在創建Models模型時指定的事實表和維度表中取
      • LookUpTable可選擇normal或derived(一般列、衍生列)
      • normal緯度作為普通獨立的緯度,而derived 維度不會計算入cube,將由事實表的外鍵推算出

    • 添加統計維度,勾選相應列作為度量,kylin提供8種度量:SUM、MAX、MIN、COUNT、COUNT_DISTINCT、TOP_N、EXTENDED_COLUMN、PERCENTILE
      • DISTINCT_COUNT有兩個實現:
        1. 近似實現 HyperLogLog,選擇可接受的錯誤率,低錯誤率需要更多存儲;
        2. 精確實現 bitmap
      • TopN 度量在每個維度結合時預計算,需要兩個參數:
        1. 一是被用來作為 Top 記錄的度量列,Kylin 將計算它的 SUM 值並做倒序排列,如sum(price)
        2. 二是 literal ID,代表最 Top 的記錄,如seller_id
      • EXTENDED_COLUMN
        • Extended_Column 作為度量比作為維度更節省空間。一列和零一列可以生成新的列
      • PERCENTILE
        • Percentile 代表了百分比。值越大,錯誤就越少。100為最合適的值

    • 設置多個分區cube合併信息

    如果是分區統計,需要關於歷史cube的合併,

    這裡是全量統計,不涉及多個分區cube進行合併,所以不用設置歷史多個cube進行合併

    • Auto Merge Thresholds:

      • 自動合併小的 segments 到中等甚至更大的 segment。如果不想自動合併,刪除默認2個選項
    • Volatile Range:

      • 默認為0,會自動合併所有可能的cube segments,或者用 ‘Auto Merge’ 將不會合併最新的 [Volatile Range] 天的 cube segments
    • Retention Threshold:

      • 默認為0,只會保存 cube 過去幾天的 segment,舊的 segment 將會自動從頭部刪除
    • Partition Start Date:

      • cube 的開始日期
    • 高級設置

    暫時也不做任何設

    置高級設定關係到立方體是否足夠優化,可根據實際情況將維度列定義為強制維度、層級維度、聯合維度

    • Mandatory維度指的是總會存在於group by或where中的維度
    • Hierarchy是一組有層級關係的維度,如國家、省份、城市
    • Joint是將多個維度組合成一個維度

    • 額外的其他的配置屬性

    這裏也暫時不做配置

    Kylin 允許在 Cube 級別覆蓋部分 kylin.properties 中的配置

    • 完成保存配置

    通過Planner計劃者,可以看到4個維度,得到Cuboid Conut=15,為2的4次方-1,因為全部沒有的指標不會使用,所以結果等於15。

    • 構建Cube

    6.6 數據查詢

    • 根據部門查詢,部門工資總和
    SELECT  DEPT.DNAME,SUM(EMP.SAL) 
    FROM EMP 
    LEFT JOIN DEPT 
    ON DEPT.DEPTNO = EMP.DEPTNO  
    GROUP BY DEPT.DNAME
    

    7 入門案例構建流程

    • 動畫演示

    8 Kylin的工作原理

    就是對數據模型做 Cube 預計算,並利用計算的結果加速查詢,具體工作過程如下:

    1. 指定數據模型,定義維度和度量。

    2. 預計算 Cube,計算所有 Cuboid 並保存為物化視圖。

    3. 執行查詢時,讀取 Cuboid,運算,產生查詢結果。

    由於 Kylin 的查詢過程不會掃描原始記錄,而是通過預計算預先完成表的關聯、聚合等複雜運算,並利用預計算的結果來執行查詢,因此相比非預計算的查詢技術,其速度一般要快一到兩個數量級,並且這點在超大的數據集上優勢更明顯。當數據集達到千億乃至萬億級別時,Kylin 的速度甚至可以超越其他非預計算技術 1000 倍以上。

    9 Cube 和 Cuboid

    Cube(或 Data Cube),即數據立方體,是一種常用於數據分析與索引的技術;它可以對原始數據建立多維度索引。通過 Cube 對數據進行分析,可以大大加快數據的查詢效率。

    Cuboid 特指在某一種維度組合下所計算的數據。 給定一個數據模型,我們可以對其上的所有維度進行組合。對於 N 個維度來說,組合的所有可能性共有 2 的 N 次方種。對於每一種維度的組合,將度量做 聚合運算,然後將運算的結果保存為一個物化視圖,稱為 Cuboid。

    所有維度組合的 Cuboid 作為一個整體,被稱為 Cube。所以簡單來說,一個 Cube 就是許多按維度聚合的物化視圖的集合。

    下面來列舉一個具體的例子:

    假定有一個電商的銷售數據集,其中維度包括 時間(Time)、商品(Item)、地點(Location)和供應商(Supplier),度量為銷售額(GMV)。

    • 那麼所有維度的組合就有 2 的 4 次方 =16 種
      • 一維度(1D) 的組合有[Time]、[Item]、[Location]、[Supplier]4 種
      • 二維度(2D)的組合 有[Time,Item]、[Time,Location]、[Time、Supplier]、[Item,Location]、 [Item,Supplier]、[Location,Supplier]6 種
      • 三維度(3D)的組合也有 4 種
      • 零維度(0D)的組合有 1 種
      • 四維度(4D)的組合有 1 種

    10 cube構建算法

    10.1 逐層構建算法

    我們知道,一個N維的Cube,是由1個N維子立方體、N個(N-1)維子立方體、N*(N-1)/2個(N-2)維子立方體、……、N個1維子立方體和1個0維子立方體構成,總共有2^N個子立方體組成。

    在逐層算法中,按維度數逐層減少來計算,每個層級的計算(除了第一層,它是從原始數據聚合而來),是基於它上一層級的結果來計算的。比如,[Group by A, B]的結果,可以基於[Group by A, B, C]的結果,通過去掉C后聚合得來的;這樣可以減少重複計算;當 0維度Cuboid計算出來的時候,整個Cube的計算也就完成了。

    每一輪的計算都是一個MapReduce任務,且串行執行;一個N維的Cube,至少需要N次MapReduce Job。

    算法優點:

    1. 此算法充分利用了MapReduce的優點,處理了中間複雜的排序和shuffle工作,故而算法代碼清晰簡單,易於維護;

    2. 受益於Hadoop的日趨成熟,此算法非常穩定,即便是集群資源緊張時,也能保證最終能夠完成。

    算法缺點:

    1. 當Cube有比較多維度的時候,所需要的MapReduce任務也相應增加;由於Hadoop的任務調度需要耗費額外資源,特別是集群較龐大的時候,反覆遞交任務造成的額外開銷會相當可觀;

    2. 由於Mapper邏輯中並未進行聚合操作,所以每輪MR的shuffle工作量都很大,導致效率低下。

    3. 對HDFS的讀寫操作較多:由於每一層計算的輸出會用做下一層計算的輸入,這些Key-Value需要寫到HDFS上;當所有計算都完成后,Kylin還需要額外的一輪任務將這些文件轉成HBase的HFile格式,以導入到HBase中去;

    總體而言,該算法的效率較低,尤其是當Cube維度數較大的時候。

    10.2 快速構建算法

    也被稱作“逐段”(By Segment) 或“逐塊”(By Split) 算法,從1.5.x開始引入該算法,該算法的主要思想是,每個Mapper將其所分配到的數據塊,計算成一個完整的小Cube 段(包含所有Cuboid)。每個Mapper將計算完的Cube段輸出給Reducer做合併,生成大Cube,也就是最終結果。如圖所示解釋了此流程。

    與舊的逐層構建算法相比,快速算法主要有兩點不同:

    1. Mapper會利用內存做預聚合,算出所有組合;Mapper輸出的每個Key都是不同的,這樣會減少輸出到Hadoop MapReduce的數據量,Combiner也不再需要;

    2. 一輪MapReduce便會完成所有層次的計算,減少Hadoop任務的調配。

    11 備份及恢復

    Kylin將它全部的元數據(包括cube描述和實例、項目、倒排索引描述和實例、任務、表和字典)組織成層級文件系統的形式。然而,Kylin使用hbase來存儲元數據,而不是一個普通的文件系統。如果你查看過Kylin的配置文件(kylin.properties),你會發現這樣一行:

    ## The metadata store in hbase
    kylin.metadata.url=kylin_metadata@hbase
    

    這表明元數據會被保存在一個叫作“kylin_metadata”的htable里。你可以在hbase shell里scan該htbale來獲取它。

    11.1 使用二進制包來備份Metadata Store

    有時你需要將Kylin的Metadata Store從hbase備份到磁盤文件系統。在這種情況下,假設你在部署Kylin的hadoop命令行(或沙盒)里,你可以到KYLIN_HOME並運行:

    ./bin/metastore.sh backup
    

    來將你的元數據導出到本地目錄,這個目錄在KYLIN_HOME/metadata_backps下,它的命名規則使用了當前時間作為參數:KYLIN_HOME/meta_backups/meta_year_month_day_hour_minute_second,如:meta_backups/meta_2020_06_18_19_37_49/

    11.2 使用二進制包來恢復Metatdara Store

    萬一你發現你的元數據被搞得一團糟,想要恢復先前的備份:

    1. 首先,重置Metatdara Store(這個會清理Kylin在hbase的Metadata Store的所有信息,請確保先備份):
    ./bin/metastore.sh reset
    
    1. 然後上傳備份的元數據到Kylin的Metadata Store:
    ./bin/metastore.sh restore $KYLIN_HOME/meta_backups/meta_xxxx_xx_xx_xx_xx_xx
    
    1. 等恢復操作完成,可以在“Web UI”的“System”頁面單擊“Reload Metadata”按鈕對元數據緩存進行刷新,即可看到最新的元數據

    做完備份,刪除一些文件,然後進行恢複測試,完美恢復,叮叮叮!

    12 kylin的垃圾清理

    Kylin在構建cube期間會在HDFS上生成中間文件;除此之外,當清理/刪除/合併cube時,一些HBase表可能被遺留在HBase卻以後再也不會被查詢;雖然Kylin已經開始做自動化的垃圾回收,但不一定能覆蓋到所有的情況;你可以定期做離線的存儲清理:

    1. 檢查哪些資源可以清理,這一步不會刪除任何東西:
    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.StorageCleanupJob --delete false
    
    1. 你可以抽查一兩個資源來檢查它們是否已經沒有被引用了;然後加上“–delete true”選項進行清理。
    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.StorageCleanupJob --delete true
    

    完成后,中間HDFS上的中間文件和HTable會被移除。

    13 Kylin優化

    13.1 維度優化

    如果不進行任何維度優化,直接將所有的維度放在一個聚集組裡,Kylin就會計算所有的維度組合(cuboid)。

    比如,有12個維度,Kylin就會計算2的12次方即4096個cuboid,實際上查詢可能用到的cuboid不到1000個,甚至更少。 如果對維度不進行優化,會造成集群計算和存儲資源的浪費,也會影響cube的build時間和查詢性能,所以我們需要進行cube的維度優化。

    當你在保存cube時遇到下面的異常信息時,意味1個聚集組的維度組合數已經大於 4096 ,你就必須進行維度優化了。

    或者發現cube的膨脹率過大。

    但在現實情況中,用戶的維度數量一般遠遠大於4個。假設用戶有10 個維度,那麼沒有經過任何優化的Cube就會存在 2的10次方 = 1024個Cuboid;雖然每個Cuboid的大小存在很大的差異,但是單單想到Cuboid的數量就足以讓人想象到這樣的Cube對構建引擎、存儲引擎來說壓力有多麼巨大。因此,在構建維度數量較多的Cube時,尤其要注意Cube的剪枝優化(即減少Cuboid的生成)。

    13.2 使用衍生維度

    • 衍生維度:維表中可以由主鍵推導出值的列可以作為衍⽣維度。

    • 使用場景:以星型模型接入時。例如用戶維表可以從userid推導出用戶的姓名,年齡,性別。

    • 優化效果:維度表的N個維度組合成的cuboid個數會從2的N次方降為2。

    衍生維度用於在有效維度內將維度表上的非主鍵維度排除掉,並使用維度表的主鍵(其實是事實表上相應的外鍵)來替代它們。Kylin會在底層記錄維度表主鍵與維度表其他維度之間的映射關係,以便在查詢時能夠動態地將維度表的主鍵“翻譯”成這些非主鍵維度,並進行實時聚合。

    雖然衍生維度具有非常大的吸引力,但這也並不是說所有維度表上的維度都得變成衍生維度,如果從維度表主鍵到某個維度表維度所需要的聚合工作量非常大,則不建議使用衍生維度。

    13.3 使用聚合組(Aggregation group)

    聚合組(Aggregation Group)是一種強大的剪枝工具。聚合組假設一個Cube的所有維度均可以根據業務需求劃分成若干組(當然也可以是一個組),由於同一個組內的維度更可能同時被同一個查詢用到,因此會表現出更加緊密的內在關聯。每個分組的維度集合均是Cube所有維度的一個子集,不同的分組各自擁有一套維度集合,它們可能與其他分組有相同的維度,也可能沒有相同的維度。每個分組各自獨立地根據自身的規則貢獻出一批需要被物化的Cuboid,所有分組貢獻的Cuboid的並集就成為了當前Cube中所有需要物化的Cuboid的集合。不同的分組有可能會貢獻出相同的Cuboid,構建引擎會察覺到這點,並且保證每一個Cuboid無論在多少個分組中出現,它都只會被物化一次。

    對於每個分組內部的維度,用戶可以使用如下三種可選的方式定義,它們之間的關係,具體如下。

    1. 強制維度(Mandatory)

      • 強制維度:所有cuboid必須包含的維度,不會計算不包含強制維度的cuboid。

      • 適用場景:可以將確定在查詢時一定會使用的維度設為強制維度。例如,時間維度。

      • 優化效果:將一個維度設為強制維度,則cuboid個數直接減半。

    如果一個維度被定義為強制維度,那麼這個分組產生的所有Cuboid中每一個Cuboid都會包含該維度。每個分組中都可以有0個、1個或多個強制維度。如果根據這個分組的業務邏輯,則相關的查詢一定會在過濾條件或分組條件中,因此可以在該分組中把該維度設置為強制維度。

    1. 層級維度(Hierarchy),

      • 層級維度:具有一定層次關係的維度。

      • 使用場景:像年,月,日;國家,省份,城市這類具有層次關係的維度。

      • 優化效果:將N個維度設置為層次維度,則這N個維度組合成的cuboid個數會從2的N次方減少到N+1。

    每個層級包含兩個或更多個維度。假設一個層級中包含D1,D2…Dn這n個維度,那麼在該分組產生的任何Cuboid中, 這n個維度只會以(),(D1),(D1,D2)…(D1,D2…Dn)這n+1種形式中的一種出現。每個分組中可以有0個、1個或多個層級,不同的層級之間不應當有共享的維度。如果根據這個分組的業務邏輯,則多個維度直接存在層級關係,因此可以在該分組中把這些維度設置為層級維度。

    1. 聯合維度(Joint),

      • 聯合維度:將幾個維度視為一個維度。

      • 適用場景:

        1. 可以將確定在查詢時一定會同時使用的幾個維度設為一個聯合維度。
        2. 可以將基數很小的幾個維度設為一個聯合維度。
        3. 可以將查詢時很少使用的幾個維度設為一個聯合維度。
      • 優化效果:將N個維度設置為聯合維度,則這N個維度組合成的cuboid個數會從2的N次方減少到1。

    每個聯合中包含兩個或更多個維度,如果某些列形成一個聯合,那麼在該分組產生的任何Cuboid中,這些聯合維度要麼一起出現,要麼都不出現。每個分組中可以有0個或多個聯合,但是不同的聯合之間不應當有共享的維度(否則它們可以合併成一個聯合)。如果根據這個分組的業務邏輯,多個維度在查詢中總是同時出現,則可以在該分組中把這些維度設置為聯合維度。

    這些操作可以在Cube Designer的Advanced Setting中的Aggregation Groups區域完成,如下圖所示。

    聚合組的設計非常靈活,甚至可以用來描述一些極端的設計。假設我們的業務需求非常單一,只需要某些特定的Cuboid,那麼可以創建多個聚合組,每個聚合組代表一個Cuboid。具體的方法是在聚合組中先包含某個Cuboid所需的所有維度,然後把這些維度都設置為強制維度。這樣當前的聚合組就只能產生我們想要的那一個Cuboid了。

    再比如,有的時候我們的Cube中有一些基數非常大的維度,如果不做特殊處理,它就會和其他的維度進行各種組合,從而產生一大堆包含它的Cuboid。包含高基數維度的Cuboid在行數和體積上往往非常龐大,這會導致整個Cube的膨脹率變大。如果根據業務需求知道這個高基數的維度只會與若干個維度(而不是所有維度)同時被查詢到,那麼就可以通過聚合組對這個高基數維度做一定的“隔離”。我們把這個高基數的維度放入一個單獨的聚合組,再把所有可能會與這個高基數維度一起被查詢到的其他維度也放進來。這樣,這個高基數的維度就被“隔離”在一個聚合組中了,所有不會與它一起被查詢到的維度都沒有和它一起出現在任何一個分組中,因此也就不會有多餘的Cuboid產生。這點也大大減少了包含該高基數維度的Cuboid的數量,可以有效地控制Cube的膨脹率。

    13.4 併發粒度優化

    當Segment中某一個Cuboid的大小超出一定的閾值時,系統會將該Cuboid的數據分片到多個分區中,以實現Cuboid數據讀取的并行化,從而優化Cube的查詢速度。具體的實現方式如下:構建引擎根據Segment估計的大小,以及參數“kylin.hbase.region.cut”的設置決定Segment在存儲引擎中總共需要幾個分區來存儲,如果存儲引擎是HBase,那麼分區的數量就對應於HBase中的Region數量。kylin.hbase.region.cut的默認值是5.0,單位是GB,也就是說對於一個大小估計是50GB的Segment,構建引擎會給它分配10個分區。用戶還可以通過設置kylin.hbase.region.count.min(默認為1)和kylin.hbase.region.count.max(默認為500)兩個配置來決定每個Segment最少或最多被劃分成多少個分區。

    由於每個Cube的併發粒度控制不盡相同,因此建議在Cube Designer 的Configuration Overwrites(上圖所示)中為每個Cube量身定製控制併發粒度的參數。假設將把當前Cube的kylin.hbase.region.count.min設置為2,kylin.hbase.region.count.max設置為100。這樣無論Segment的大小如何變化,它的分區數量最小都不會低於2,最大都不會超過100。相應地,這個Segment背後的存儲引擎(HBase)為了存儲這個Segment,也不會使用小於兩個或超過100個的分區。我們還調整了默認的kylin.hbase.region.cut,這樣50GB的Segment基本上會被分配到50個分區,相比默認設置,我們的Cuboid可能最多會獲得5倍的併發量。

    13.5 Row Key優化

    Kylin會把所有的維度按照順序組合成一個完整的Rowkey,並且按照這個Rowkey升序排列Cuboid中所有的行。

    設計良好的Rowkey將更有效地完成數據的查詢過濾和定位,減少IO次數,提高查詢速度,維度在rowkey中的次序,對查詢性能有顯著的影響。

    Row key的設計原則如下:

    1. 被用作where過濾的維度放在前邊。
    1. 基數大的維度放在基數小的維度前邊。

    13.6 增量cube構建

    構建全量cube,也可以實現增量cube的構建,就是通過分區表的分區時間字段來進行增量構建

    1. 更改model

    1. 更改cube

    14 Kafka 流構建 Cube(Kylin實時案例)

    Kylin v1.6 發布了可擴展的 streaming cubing 功能,它利用 Hadoop 消費 Kafka 數據的方式構建 cube。

    參考:http://kylin.apache.org/blog/2016/10/18/new-nrt-streaming/

    前期準備:kylin v1.6.0 或以上版本 和 可運行的 Kafka(v0.10.0 或以上版本)的 Hadoop 環境

    14.1 Kafka創建Topic

    • 創建樣例名為 “kylin_streaming_topic” 具有一個副本三個分區的 topic
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kylin_streaming_topic
    

    • 將樣例數據放入 topic,Kylin 有一個實用類可以做這項工作;
    cd $KYLIN_HOME
    ./bin/kylin.sh org.apache.kylin.source.kafka.util.KafkaSampleProducer --topic kylin_streaming_topic --broker cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
    

    工具每一秒會向 Kafka 發送 100 條記錄。直至本案例結束請讓其一直運行。

    14.2 用streaming定義一張表

    登陸 Kylin Web GUI,選擇一個已存在的 project 或創建一個新的 project;點擊 “Model” -> “Data Source”,點擊 “Add Streaming Table” 圖標

    • 在彈出的對話框中,輸入您從 kafka-console-consumer 中獲得的樣例記錄,點擊 “»” 按鈕,Kylin 會解析 JSON 消息並列出所有的消息
    {"country":"CHINA","amount":41.53789973661185,"qty":6,"currency":"USD","order_time":1592485535129,"category":"TOY","device":"iOS","user":{"gender":"Male","id":"12d127ab-707e-592f-2e4c-69ad654afa48","first_name":"unknown","age":25}}
    
    • 您需要為這個 streaming 數據源起一個邏輯表名;該名字會在後續用於 SQL 查詢;這裡是在 “Table Name” 字段輸入 “STREAMING_SALES_TABLE” 作為樣例。

    • 您需要選擇一個時間戳字段用來標識消息的時間;Kylin 可以從這列值中獲得其他時間值,如 “year_start”,”quarter_start”,這為您構建和查詢 cube 提供了更高的靈活性。這裏可以查看 “order_time”。您可以取消選擇那些 cube 不需要的屬性。這裏我們保留了所有字段。

    • 注意 Kylin 從 1.6 版本開始支持結構化 (或稱為 “嵌入”) 消息,會將其轉換成一個 flat table structure。默認使用 “_” 作為結構化屬性的分隔符。

    • 點擊 “Next”。在這個頁面,提供了 Kafka 集群信息;輸入 “kylin_streaming_topic” 作為 “Topic” 名;集群有 3 個 broker,其主機名為”cdh01.cm,cdh02.cm,cdh03.cm“,端口為 “9092”,點擊 “Save”。
    • 在 “Advanced setting” 部分,”timeout” 和 “buffer size” 是和 Kafka 進行連接的配置,保留它們。

    • 在 “Parser Setting”,Kylin 默認您的消息為 JSON 格式,每一個記錄的時間戳列 (由 “tsColName” 指定) 是 bigint (新紀元時間) 類型值;在這個例子中,您只需設置 “tsColumn” 為 “order_time”;

    • 在現實情況中如果時間戳值為 string 如 “Jul 20,2016 9:59:17 AM”,您需要用 “tsParser” 指定解析類和時間模式例如:
    • 點擊 “Submit” 保存設置。現在 “Streaming” 表就創建好了。

    14.3 定義數據模型

    • 有了上一步創建的表,現在我們可以創建數據模型了。步驟和您創建普通數據模型是一樣的,但有兩個要求:

      • Streaming Cube 不支持與 lookup 表進行 join;當定義數據模型時,只選擇 fact 表,不選 lookup 表;
      • Streaming Cube 必須進行分區;如果您想要在分鐘級別增量的構建 Cube,選擇 “MINUTE_START” 作為 cube 的分區日期列。如果是在小時級別,選擇 “HOUR_START”。
    • 這裏我們選擇 13 個 dimension 和 2 個 measure 列:

    保存數據模型。

    14.4 創建 Cube

    Streaming Cube 和普通的 cube 大致上一樣. 有以下幾點需要您注意:

    • 分區時間列應該是 Cube 的一個 dimension。在 Streaming OLAP 中時間總是一個查詢條件,Kylin 利用它來縮小掃描分區的範圍。
    • 不要使用 “order_time” 作為 dimension 因為它非常的精細;建議使用 “mintue_start”,”hour_start” 或其他,取決於您如何檢查數據。
    • 定義 “year_start”,”quarter_start”,”month_start”,”day_start”,”hour_start”,”minute_start” 作為層級以減少組合計算。
    • 在 “refersh setting” 這一步,創建更多合併的範圍,如 0.5 小時,4 小時,1 天,然後是 7 天;這將會幫助您控制 cube segment 的數量。
    • 在 “rowkeys” 部分,拖拽 “minute_start” 到最上面的位置,對於 streaming 查詢,時間條件會一直显示;將其放到前面將會幫助您縮小掃描範圍。

    保存 cube。

    14.5 運行Cube

    可以在 web GUI 觸發 build,通過點擊 “Actions” -> “Build”,或用 ‘curl’ 命令發送一個請求到 Kylin RESTful API:

    curl -X PUT --user ADMIN:KYLIN -H "Content-Type: application/json;charset=utf-8" -d '{ "sourceOffsetStart": 0, "sourceOffsetEnd": 9223372036854775807, "buildType": "BUILD"}' http://localhost:7070/kylin/api/cubes/{your_cube_name}/build2
    

    請注意 API 終端和普通 cube 不一樣 (這個 URL 以 “build2” 結尾)。

    這裏的 0 表示從最後一個位置開始,9223372036854775807 (Long 類型的最大值) 表示到 Kafka topic 的結束位置。如果這是第一次 build (沒有以前的 segment),Kylin 將會尋找 topics 的開頭作為開始位置。

    在 “Monitor” 頁面,一個新的 job 生成了;等待其直到 100% 完成。

    14.6 查看結果

    點擊 “Insight” 標籤,編寫 SQL 運行,例如:

    select minute_start, count(*), sum(amount), sum(qty) from streaming_sales_table group by minute_start order by minute_start
    

    14.7 自動 build

    一旦第一個 build 和查詢成功了,您可以按照一定的頻率調度增量 build。Kylin 將會記錄每一個 build 的 offsets;當收到一個 build 請求,它將會從上一個結束的位置開始,然後從 Kafka 獲取最新的 offsets。有了 REST API 您可以使用任何像 Linux cron 調度工具觸發它:

    crontab -e
    */5 * * * * curl -X PUT --user ADMIN:KYLIN -H "Content-Type: application/json;charset=utf-8" -d '{ "sourceOffsetStart": 0, "sourceOffsetEnd": 9223372036854775807, "buildType": "BUILD"}' http://localhost:7070/kylin/api/cubes/{your_cube_name}/build2
    

    現在您可以觀看 cube 從 streaming 中自動 built。當 cube segments 累積到更大的時間範圍,Kylin 將會自動的將其合併到一個更大的 segment 中。

    15 JDBC查詢kylin

    • maven依賴
        <dependencies>
            <dependency>
                <groupId>org.apache.kylin</groupId>
                <artifactId>kylin-jdbc</artifactId>
                <version>3.0.1</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <!-- 限制jdk版本插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    • java類
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    
    public class KylinJdbc {
        public static void main(String[] args) throws Exception {
            //Kylin_JDBC 驅動
            String KYLIN_DRIVER = "org.apache.kylin.jdbc.Driver";
            //Kylin_URL
            String KYLIN_URL = "jdbc:kylin://localhost:9090/kylin_hive";
            //Kylin的用戶名
            String KYLIN_USER = "ADMIN";
            //Kylin的密碼
            String KYLIN_PASSWD = "KYLIN";
            //添加驅動信息
            Class.forName(KYLIN_DRIVER);
            //獲取連接
            Connection connection = DriverManager.getConnection(KYLIN_URL, KYLIN_USER, KYLIN_PASSWD);
            //預編譯SQL
            PreparedStatement ps = connection.prepareStatement("SELECT sum(sal) FROM emp group by deptno");
            //執行查詢
            ResultSet resultSet = ps.executeQuery();
            //遍歷打印
            while (resultSet.next()) {
                        System.out.println(resultSet.getInt(1));
            }
        }
    }
    

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

    聚甘新

  • 80386學習(五) 80386分頁機制與虛擬內存

    80386學習(五) 80386分頁機制與虛擬內存

    一. 頁式內存管理介紹

      80386能夠將內存分為不同屬性的段,並通過段描述符、段表以及段選擇子等機制,通過段基址和段內偏移量計算出線性地址進行訪問,這一內存管理方式被稱為段式內存管理

      這裏要介紹的是另一種內存管理的方式:80386在開啟了分頁機制后,便能夠將物理內存劃分為一個個大小相同且連續的物理內存頁,訪問時通過物理內存頁號和頁內偏移計算出最終需要訪問的線性地址進行訪問,由於內存管理單元由段變成了頁,因此這一內存管理方式被稱為頁式內存管理

      80386的分頁機制只能在保護模式下開啟。

    為什麼需要頁式內存管理?

      在介紹80386分頁機制前,需要先理解為什麼CPU在管理內存時,要在段式內存管理的基礎上再引入一種有很大差異的頁式內存管理方式?頁式內存管理與純段式內存管理相比到底具有哪些優點?

      一個很重要的原因是為了解決多任務環境下,段式內存管理中多任務的創建與終止時會產生較多內存碎片,使得內存空間使用率不高的問題

      內存碎片分為外碎片和內碎片兩種。

    外碎片

      對於指令和數據的訪問通常都是連續的,所以需要為一個任務分配連續的內存空間。在段式內存管理中,通常為任務分配一個完整的內存段,或是按照任務內段功能的不同,分配包括代碼段、數據段和堆棧段在內的多個完整連續段空間。支持多道任務的系統分配的內存空間,會在某些任務退出並釋放內存時,產生外部內存碎片。

      舉個例子,假設當前存在10MB的內存空間,存在A/B/C/D四個任務,併為每個任務分配一整塊的內存空間,其所佔用的內存空間分別為3MB/2MB/4MB/1MB,如下圖所示(一個格子代表1MB內存)。

      當任務B和任務D執行完成后,所佔用的內存空間被釋放,10MB的內存空間中出現了3MB大小的空閑內存。如果此時出現了一個任務E,需要為其分配3MB的內存空間,此時內存雖然存在3MB的內存空間,卻由於空閑內存的不連續,碎片化,導致無法直接分配給任務E使用。而這裏任務B、任務D結束后釋放的空餘內存空間就被視為外碎片。

      這裏的例子任務數量少且內存空間也很小。而在實際的32位甚至64位的系統中,物理內存空間少則4GB,多則幾十甚至上百GB,由於任務內存的反覆分配和釋放,導致出現的外碎片的數量及浪費的內存空間會很多,很大程度上降低了內存空間的利用率。

      雖然理論上能夠通過操作系統小心翼翼的挪動內存,使得外碎片能夠拼接為連續的大塊,得以被有效利用(內存緊縮)。但是操作系統挪動、複製內存本身很佔用CPU資源,且存在對指令進行地址重定位、暫時暫停對所挪動內存區域的訪問等附加問題,造成的效率降低程度幾乎是不可忍受的,因此這一解決方案並沒有被廣泛使用。

      

    內碎片

      外碎片指的是不同任務內存之間的碎片,而內碎片指的是一個任務內產生的內存碎片。

      通常操作系統為了管理多任務環境下的物理內存,會將內存分隔為固定大小的分區,使用系統表記錄對應分區內存的使用情況(如是否已分配等)。分區的大小必須適當,如果分區過小,則相同物理內存大小下,系統表項過多使得所佔用的空間過大;可如果分區過大,則會產生過大的內碎片,造成不必要的內存空間浪費。

      以上述介紹外碎片的數據為例,系統中的內存分區固定大小為1MB,其中為任務C分配了4個內存分區,共4MB大小。可實際上任務C實際只需要3.5MB的空間即可滿足需求,但由於分區是內存管理的最小單元,只能為任務分配整數個的內存分區。3個分區3MB並不滿足任務C的3.5MB的內存需求,因此只能分配4個分區給任務C。而這裏任務C額外多佔用的0.5MB內存就是內碎片。 

      內碎片就是已經被分配出去,卻不能被有效利用的內存空間。

    80386是如何解決內存碎片問題的?

    外碎片的解決

      外碎片問題產生的主要原因是程序所需要分配的內存空間是連續的。為此,80386提供了分頁機制,使得最終分配給任務的物理內存空間可以不連續。如果任務所使用的內存不必連續,前面外碎片例子中提到的任務E就能夠在1MB+2MB的離散物理內存上正常運行,外碎片問題自然就得到了解決。

    內碎片的解決

      內碎片從本質上來說是很難完全避免的(內存管理最小單元不能過小),主要的問題在於前面提到的內存分區管理單元大小的較優值不好確定。開啟了分頁管理的80386,允許將物理內存分割最小為4KB固定大小的管理單元,這個固定大小的內存管理單元被稱為頁,並由專門的被稱為頁表的數據結構來追蹤內存頁的使用情況。

      對於頁表項過多的問題,80386的設計者提供了多級頁表機制,減少了頁表所佔用的空間。

      對於內碎片過大的問題,由於80386所運行的任務所佔用的內存段一般遠大於一個內存頁的大小,因此頁機制下所產生的內部碎片是十分有限的,可以達到一個令人滿意的內存使用率。

    二. 虛擬內存簡單介紹 

      為了解決應用程序高速增長的內存需求與物理內存增加緩慢的矛盾,計算機科學家們提供了虛擬內存的概念。使用了虛擬內存的系統,可以使得系統內運行的程序所佔用的內存空間總量,遠大於實際物理內存的容量。

      能夠實現虛擬內存的關鍵在於程序在特定時刻所需要訪問的內存地址是符合局部性原理的。通過操作系統和硬件的緊密配合,能夠將任務暫時不需要訪問的內存交換到外部硬盤中,而將物理內存留給真正需要訪問的那部分內存(工作集內存)。

      虛擬內存和分頁機制是一對好搭檔,分頁機制提供了管理內存的基本單位:頁,80386的頁式虛擬內存實現在工作集內存調度時也依賴分頁機制提供的頁來進行。隨着程序的執行,程序的工作集內存在動態變化,當CPU檢測到當前所訪問的內存頁不在物理內存中時,便會通知操作系統(內存缺頁異常),操作系統的缺頁異常處理程序會將硬盤交換區中的對應內存頁數據寫回物理內存。如果物理內存頁已經滿了的情況下,則還需要根據某種算法將另一個物理內存頁替換,來容納這一換入的內存頁。

    三. 80386分頁機制原理

      在介紹分頁機制原理之前,需要先理解關於80386保護模式下32位內存尋址時幾種地址的概念。

    物理地址(Physical Address):

      物理地址就是32位的地址總線所對應的真實的硬件存儲空間。對於物理內存的訪問,無論中間會經過多少次轉換,最終必須轉換為最終的物理地址進行訪問。

    邏輯地址(Logical Address):

      在80386保護模式的程序指令中,對內存的訪問是由段選擇子和段內偏移決定的。段選擇子+段內偏移 –> 邏輯地址。

    線性地址(Linear Address):

      CPU在內存尋址時,從指令中獲得段選擇子和段內偏移,即邏輯地址。由段選擇子在段表(GDT或LDT)中找到對應的段描述符,獲取段基址。段基址+段內偏移決定線性地址。

      如果沒有開啟分頁,CPU就使用生成的線性地址直接作為最終的物理地址進行訪問;如果開啟了分頁,則還需要通過頁表等機制,將線性地址進一步處理才能生成物理地址進行訪問。

    頁式虛擬內存實現原理

      程序要求訪問一個段時,其線性地址必須是連續的。在純粹的段式內存管理中,線性地址等於物理地址的情況下,就會出現外碎片的問題。而在段式內存管理的基礎上,80386如果還開啟了頁機制,就能通過抽象出一層線性地址到物理地址的映射,使得最終分配給程序的物理內存段不必連續。

      80386中的內存頁大小為4KB,在32位的內存尋址空間中(4GB),存在着0x10000 = 1048576個頁。每個頁對應的起始地址低12位都為0,第一個物理內存頁的物理地址為0x00000000,第二個物理內存頁的物理地址為0x00001000,依此類推,最後一個物理頁的物理地址是0xFFFFF000。

    頁表

      在80386的分頁機制的實現中,是通過頁表來實現線性地址到物理地址映射轉換的。每個任務都有一個自己的頁表記錄著任務的線性地址到物理地址的映射關係。

      開啟了頁機制后的線性地址也被稱為虛擬地址,這是因為線性地址已經不再直接對應真實的物理地址,而是一個不承載真實數據的虛擬內存地址。開啟了分頁機制后,一個任務的虛擬地址空間依然是連續的,但所佔用的物理地址空間卻可以不連續

      頁表保存着被稱為頁表項的數據結構集合,每一個頁表項都記載着一個虛擬內存頁到物理內存頁的映射關係。開啟了頁機制之後,CPU在內存尋址時,在通過段表計算出了線性地址(虛擬地址)后,便可以在連續排布的虛擬地址空間中找到對應的頁表項,通過頁表項獲取虛擬內存頁所對應的物理內存頁地址,進行物理內存的訪問。虛擬地址到物理地址映射的細節會在後面進行展開。

      由於是將不斷變化的虛擬內存頁裝載進相對不變的物理內存頁中,就像畫廊中展示的畫會不斷的更替,但畫框基本不變一樣。為了更好的區分這兩者,頁通常特指虛擬內存頁,而物理內存頁則被稱為頁框。

    頁表項介紹

      頁表項是32位的,其結構如下圖所示。

      

    P位:

      P(Present),存在位。標識當前虛擬內存頁是否存在於物理內存頁中。當P位為1時,表示當前虛擬內存頁存在於物理內存中,可以直接進行訪問。當P位為0時,表示對應的物理內存頁不存在,需要新分配物理內存頁或是從磁盤中將其調度回物理內存。

      分頁模式下的內存尋址,如果CPU發現對應的頁表項P位為0,會引發缺頁異常中斷,操作系統在缺頁異常處理程序中進行對應的處理,以實現虛擬內存。

    RW位:

      RW(Read/Write)位,讀寫位。標識當前頁是否能夠寫入。當RW為1時,代表當前頁可讀可寫;當RW為0時,代表當前頁是只讀的。

    US位:

      US(User/Supervisor)位,用戶/管理位。當US為1時,標識當前頁是用戶級別的,允許所有當前特權級的任務進行訪問。當US為0時,表示當前頁是屬於管理員級別的,只允許當前特權級為0、1、2的任務進行訪問,而當前特權級為3的用戶態任務無法進行訪問。

    PWT位/PCD位:

      PWT(Page-level Write Through)位,頁級通寫位。PWT為1時,表示當前物理頁的高速緩存採用通寫法;PWT為0時,表示當前物理頁的高速緩存採用回寫法。

      PCD(Page-level Cache Disable)位,頁級高速緩存禁止位。PCD為1時,表示訪問當前物理頁禁用高速緩存;PCD為0時,表示訪問當前物理頁時允許使用高速緩存。

      PWT與PCD位的使用,涉及到了80386高速緩存的工作原理與內存一致性問題,限於篇幅不在這裏展開。

    A位:

      A(Access)位,訪問位。A位為1時,代表當前頁曾經被訪問過;A位為0時,代表當前頁沒有被訪問過。

      A位的設置由CPU固件在對應內存頁訪問時自動設置為1,且可以由操作系統在適當的時候通過程序指令重置為0,用以計算內存頁的訪問頻率。通過訪問頻率,操作系統能夠以此作為虛擬內存調度算法中評估的依據,在物理內存緊張的情況下,可以選擇將最少使用的內存頁換出,以減少不必要的虛擬內存頁調度時的磁盤I/O,提高虛擬內存的效率。

    D位:

      D(Dirty)位,臟位。當D位為1時,表示當前頁被寫入修改過;D位為0時,代表當前頁沒有被寫入修改過。

      臟位由CPU在對應內存頁被寫入時自動設置為1。操作系統在進行內存頁調度時,如果發現需要被換出的內存頁D位為1時,則需要將對應物理內存頁數據寫回虛擬頁對應的磁盤交換區,保證磁盤/內存數據的一致性;當發現需要被換出的物理內存頁的D位為0時,表示當前頁自從換入物理內存以來沒有被修改過,和磁盤交換區中的數據一致,便直接將其覆蓋,而不進行磁盤的寫回,減少不必要的I/O以提高效率。

    PAT位:

      PAT(Page Attribute Table),頁屬性表支持位。PAT位的存在使得CPU能夠支持更複雜的,不同頁大小的分頁管理。當PAT=0時,每一頁的大小為4KB;當PAT=1時,每一頁的大小是4MB,或是其它大小(分CPU的情況而定)。

    G位:

      G(Global),全局位。表示當前頁是否是全局的,而不是屬於某一特定任務的。G=1時,表示當前頁是全局的;G=0時,表示當前頁是屬於特定任務的。

      為了加速頁表項的訪問,80386提供了TLB快表,作為頁表訪問的高速緩存。當任務切換時,TLB內所有G=0的非全局頁將會被清除,G=1的全局頁將會被保留。將操作系統內核中關鍵的,頻繁訪問的頁設置為全局頁,使得其能夠一直保存在TLB快表中,加速對其的訪問速度,提高效率。

    AVL位:

      AVL(Avaliable),可用位。和段描述符中的AVL位功能類似,CPU並不使用它,而是提供給操作系統軟件自定義使用。

    頁物理基地址字段:     

      頁物理基地址字段用於標識對應的物理頁,共20位。

      由於32位的80386的頁最小是4KB,而4GB的物理內存被分解為了最多0x10000個4KB的物理頁。20位的頁物理基地址字段作為物理頁的索引標號與每一個具體的物理頁一一對應。通過頁物理基地址字段,便能找到唯一對應的物理內存頁。

    多級頁表

      在32位的CPU中,操作系統可以給每個程序分配至多4GB的虛擬內存空間,如果一個內存頁佔4KB,那麼對應的每個程序的頁表中最多需要存放着0x10000個頁表項來進行映射。即使每個頁表項只佔小小的32位共4個字節(4Byte),這依然是一個不小的內存開銷(0x10000個頁表項的大小為4MB)。

      一個應用程序雖然可以被分配4GB的虛擬內存空間,但實際上可能只使用其中的一小部分,例如40MB的大小。通常程序的堆棧段和數據段都分別位於虛擬內存空間的高低兩端,並隨着程序的執行慢慢的向中間擴展,由於頁表項對應與虛擬地址空間的連續性,這就要求任務在執行時必須完整的定義整張頁表。

      可以看到,一級的平面頁表結構存在着明顯的頁表空間浪費的問題。雖然可以要求應用程序不要一下子就以4GB的內存規格進行編程,而是一開始用較小的內存,並在需要更大內存時梯度的申請更大的內存空間,並重新構造數據段和堆棧段以減少每個任務的無用頁表項空間的浪費。但這將頁表空間優化的繁重任務強加給了應用程序,並不是一個好的解決辦法。

      為此,計算機科學家們提出了多級頁表的方案來解決頁表項過多的問題。多級頁表顧名思義,頁表的結構不再是一個一級的平面結構(一級頁表),而是像一顆樹一樣,由頁目錄項節點頁表項節點組成。目錄節點中保存着下一級節點的物理頁地址等信息,恭弘=叶 恭弘子節點中則包含着真正的頁表項信息。查詢頁表項時,從一級頁目錄節點(根目錄)出發,按照一定的規則可以找到對應的下一級子目錄節點,直到查詢出對應的恭弘=叶 恭弘子節點為止。

      

    80386頁目錄項介紹

      80386採用的是二級頁表的設計,二級頁表由頁目錄表和頁表共同組成。頁目錄表中存放的是頁目錄項,頁目錄項的大小和頁表項一致,為4字節。

      通過80386指令得到的32位線性地址,其中高20位作為頁表項索引,低12位作為頁內偏移地址(4KB大小的物理頁)。如果採用的是一級頁表結構,20位的頁表項索引能直接找到4MB頁表中的對應頁表項。

      而對於80386二級頁表的設計來說,由於一個物理頁大小為4KB,最多可以容納1024(2^10)個頁表項或者頁目錄項,所以將頁表項索引的高10位作為根目錄頁中頁目錄項的索引值,通過頁目錄項中的頁表項物理頁號可以找到對應的頁表物理頁;再根據頁表項索引的后10位找到頁表中對應的頁表項。

      

    80386頁目錄項結構圖

       80386的二級頁表的頁目錄項佔32位,其低12位的含義與頁表項一致。主要區別在於其高20位存放的是下一級頁表的物理頁索引,而不是虛擬地址映射的物理內存頁地址。

      

    頁表基址寄存器

      前面提到過,和LDT一樣,每個任務都擁有着自己獨立的頁表。為此80386CPU提供了一個專門的寄存器用於追蹤定位任務自己的頁表,這個寄存器的名稱叫做頁表基址寄存器(Page Directory Base Register,PDBR),也就是控制寄存器CR3。

      由於80386分頁機制使用的是二級頁表,因此PDBR指向的是二級頁表結構中的頁目錄,通過頁目錄表便能夠間接的訪問整個二級頁表。為了效率其中存放的直接就是頁目錄表的32位物理地址,一般由操作系統負責在任務切換時將新任務對應的頁目錄表預先加載進物理內存。

      由於PDBR是和當前任務有關的,在任務切換時會被新任務TSS中的PDBR字段值所替換,指向新任務的頁目錄表,而舊任務的PDBR的值則在保護現場時被存入對應的TSS中。

    多級頁表是如何解決頁表項浪費問題的?

      以80386的二級頁表設計為例,最大4GB的虛擬內存空間下,無論如何一級頁目錄表是必須存在的。當不需要為應用程序分配過多的內存時,頁目錄表中的頁目錄項所指向的對應頁表可以不存在,即頁目錄項的P位為0,實際不使用的虛擬內存空間將沒有對應的二級頁表節點,相比一級頁表的設計其浪費的內存會少很多。

      假設需要為一個虛擬地址首尾各需要分配20MB,共佔用40MB內存的任務構建對應的頁表。

      1. 如果使用一級頁表,4GB的虛擬內存空間下需要提供0x10000個頁表項,共4MB,頁表的體積達到了任務自身所需40MB內存的10%,但其中絕大多數的頁表項都是沒用的(P位為0),不會對應實際的物理內存,空間效率很低。

      2. 如果使用二級頁表,除了佔一個物理頁4KB大小的頁目錄表是必須存在的外,其頁目錄表中只有首尾兩項的P位為1,分別指向一個實際存在的頁表(二級節點),頁目錄表中間其它的頁目錄項P位都為0,不需要為這些不會使用到的虛擬地址分配頁表。對於這個40MB的程序來說,其頁表只佔了3個物理頁面,共12KB,空間效率相比一級頁表高很多。

    TLB快表

      前面提到了多級頁表所帶來的好處:通過頁表分層,可以減少順序排列的無效頁表項數量,節約內存空間;頁表的層級越多,空間效率也越高。

      計算機領域中,通常並沒有免費的午餐,一個問題的解決,往往會帶來新的問題:多級頁表本質上是一個樹狀結構,每一個節點頁都是離散的,因此每一層級訪問都需要進行一次內存尋址操作,頁表的層級越多,訪問的次數也就越多,虛擬頁地址映射過程也越慢。在32位的80386中,2級頁表下問題還不算特別嚴重;但64位CPU的出現帶來了更大的尋址空間,也需要更多的頁表項,頁表的層級也漸漸的從2級變成了3級、4級甚至更多。頁機制開啟之後,所有的內存尋址都需要經過CPU的頁部件進行轉化才能獲得最終的物理地址,因此這一過程必須要快,不能因為頁表的離散層次訪問就嚴重影響虛擬地址空間到物理地址空間的轉換速度。

      要加快原本相對耗時的查詢操作,一個常用的辦法便是引入緩存。為了加速通用內存的訪問,80386利用局部性原理提供了高速緩存;為了加速多級頁表的頁表項訪問,80386提供了TLB。

      TLB(Translation Lookaside Buffer)直譯為地址轉換後援緩衝器,根據其作用也被稱為頁表緩存或是快表(快速頁表)。TLB中存放着一張表,其中的每一項用於緩存當前任務虛擬頁號和對應頁表項中的關鍵信息,被稱為TLB項。

      TLB的工作原理和高速緩存類似:當CPU訪問某一虛擬頁時,通過虛擬頁號先在TLB中尋找,如果發現對應的TLB項存在,則直接以TLB項中的數據進行物理地址的轉換,這被稱為TLB命中;當發現對應的TLB項不存在時(TLB未命中),則進行內存的訪問,在獲取內存中頁表項數據的同時,也將對應頁表項緩存入TLB中。如果TLB已滿則需要通過某種置換算法選出一個已存在的TLB項將其替換。

      TLB的查詢速度比內存快,但容量相對內存小很多,因此只能緩存數量有限的頁表項。但由於內存訪問的局部性,只要通過合理的設計提高TLB的命中率(通常可以達到90%以上),就能達到很好的效果。 

    四. 80386分頁機制下的內存尋址流程

      下面總結一下開啟了分頁機制的80386是如何進行內存尋址的。

      1. CPU首先從內存訪問指令中獲取段選擇子和段內偏移地址

      2. 根據段選擇子從段表(GDT或LDT)中查詢出對應的段描述符

      3. 根據段描述符中的段基址和指令中的段內偏移地址生成32位的線性地址(頁機制下的虛擬地址)

      4. 32位的線性地址根據80386二級頁表的設計,拆分成三個部分:高10位作為頁目錄項索引,中間次高10位作為頁表項索引,低12位作為頁內偏移地址。

      5. 通過高10位的頁目錄項索引從一級頁目錄表中獲取二級頁表的物理頁地址(通過物理頁框號可得),再根據中間10位的頁表項索引找到對應的物理頁框。根據物理頁框號與頁內偏移地址共同生成最終的物理地址,進行物理內存的訪問。

    五. 總結

      想要通過學習操作系統來更好的理解計算機程序底層的工作原理,基礎的硬件知識是必須要了解的。紙上得來終覺淺,絕知此事要躬行,在理解了基礎原理后,還需要通過實踐來加深對原理知識的理解,而閱讀相關操作系統的實現源碼就是一個很好的將實踐與原理緊密結合的學習方式。

      希望通過對硬件和操作系統的學習能幫助我打開計算機程序底層運行的神秘黑盒子一窺究竟,在思考問題時能夠換一個角度從底層的視角出發,去更好的理解和掌握上層的應用技術,以避免迷失在快速發展的技術浪潮中。

       

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※別再煩惱如何寫文案,掌握八大原則!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

    FB行銷專家,教你從零開始的技巧

    聚甘新

  • 【String註解驅動開發】面試官讓我說說:如何使用FactoryBean向Spring容器中註冊bean?

    寫在前面

    在前面的文章中,我們知道可以通過多種方式向Spring容器中註冊bean。可以使用@Configuration結合@Bean向Spring容器中註冊bean;可以按照條件向Spring容器中註冊bean;可以使用@Import向容器中快速導入bean對象;可以在@Import中使用ImportBeanDefinitionRegistrar向容器中註冊bean。

    項目工程源碼已經提交到GitHub:https://github.com/sunshinelyz/spring-annotation

    FactoryBean概述

    一般情況下,Spring通過反射機制利用bean的class屬性指定實現類來實例化bean 。在某些情況下,實例化bean過程比較複雜,如果按照傳統的方式,則需要在 標籤中提供大量的配置信息,配置方式的靈活性是受限的,這時採用編碼的方式可以得到一個更加簡單的方案。Spring為此提供了一個org.springframework.bean.factory.FactoryBean的工廠類接口,用戶可以通過實現該接口定製實例化bean的邏輯。

    FactoryBean接口對於Spring框架來說佔有重要的地位,Spring 自身就提供了70多個FactoryBean的實現。它們隱藏了實例化一些複雜bean的細節,給上層應用帶來了便利。從Spring 3.0 開始, FactoryBean開始支持泛型,即接口聲明改為FactoryBean 的形式:

    在Spring 5.2.6版本中,FactoryBean接口的定義如下所示。

    package org.springframework.beans.factory;
    import org.springframework.lang.Nullable;
    
    public interface FactoryBean<T> {
    
    	String OBJECT_TYPE_ATTRIBUTE = "factoryBeanObjectType";
    
    	@Nullable
    	T getObject() throws Exception;
    
    	@Nullable
    	Class<?> getObjectType();
    
    	default boolean isSingleton() {
    		return true;
    	}
    }
    
    • T getObject():返回由FactoryBean創建的bean實例,如果isSingleton()返回true,則該實例會放到Spring容器中單實例緩存池中。
    • boolean isSingleton():返回由FactoryBean創建的bean實例的作用域是singleton還是prototype。
    • Class getObjectType():返回FactoryBean創建的bean類型。

    這裏,需要注意的是:當配置文件中 標籤的class屬性配置的實現類是FactoryBean時,通過 getBean()方法返回的不是FactoryBean本身,而是FactoryBean#getObject()方法所返回的對象,相當於FactoryBean#getObject()代理了getBean()方法。

    FactoryBean實例

    首先,創建一個PersonFactoryBean,實現FactoryBean接口,如下所示。

    package io.mykit.spring.plugins.register.bean;
    
    import org.springframework.beans.factory.FactoryBean;
    /**
     * @author binghe
     * @version 1.0.0
     * @description 商品的FactoryBean,測試FactoryBean
     */
    public class PersonFactoryBean implements FactoryBean<Person> {
    
        //返回一個Person對象,這個對象會被註冊到Spring容器中
        @Override
        public Person getObject() throws Exception {
            return new Person();
        }
    
        @Override
        public Class<?> getObjectType() {
            return Person.class;
        }
    
        //bean是否為單例;true:是;false:否
        @Override
        public boolean isSingleton() {
            return true;
        }
    }
    

    接下來,我們在PersonConfig2類中加入PersonFactoryBean的聲明,如下所示。

    @Bean
    public PersonFactoryBean personFactoryBean(){
        return new PersonFactoryBean();
    }
    

    這裏需要小夥伴們注意的是:我在這裏使用@Bean註解向Spring容器中添加的是PersonFactory對象。那我們就來看看Spring容器中有哪些bean。接下來,運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    

    可以看到,結果信息中輸出了一個personFactoryBean,我們看下這個personFactoryBean到底是個什麼鬼!此時,我們對SpringBeanTest類中的testAnnotationConfig7()方法稍加改動,添加獲取personFactoryBean的代碼,並輸出personFactoryBean實例的類型,如下所示。

    @Test
    public void testAnnotationConfig7(){
        ApplicationContext context = new AnnotationConfigApplicationContext(PersonConfig2.class);
        String[] names = context.getBeanDefinitionNames();
        Arrays.stream(names).forEach(System.out::println);
    
        Object personFactoryBean = context.getBean("personFactoryBean");
        System.out.println("personFactoryBean實例的類型為:" + personFactoryBean.getClass());
    }
    

    再次運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    personFactoryBean實例的類型為:class io.mykit.spring.plugins.register.bean.Person
    

    可以看到,雖然我在代碼中使用@Bean註解注入的PersonFactoryBean對象,但是,實際上從Spring容器中獲取到的bean對象卻是調用PersonFactoryBean類中的getObject()獲取到的Person對象。

    看到這裏,是不是有種豁然開朗的感覺!!!

    在PersonFactoryBean類中,我們將Person對象設置為單實例bean,接下來,我們在SpringBeanTest類中的testAnnotationConfig7()方法多次獲取Person對象,並輸出多次獲取的對象是否為同一對象,如下所示。

    @Test
    public void testAnnotationConfig7(){
        ApplicationContext context = new AnnotationConfigApplicationContext(PersonConfig2.class);
        String[] names = context.getBeanDefinitionNames();
        Arrays.stream(names).forEach(System.out::println);
    
        Object personFactoryBean1 = context.getBean("personFactoryBean");
        Object personFactoryBean2 = context.getBean("personFactoryBean");
        System.out.println(personFactoryBean1 == personFactoryBean2);
    }
    

    運行testAnnotationConfig7()方法輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    true
    

    可以看到,在PersonFactoryBean類的isSingleton()方法中返回true時,每次獲取到的Person對象都是同一個對象,說明Person對象是單實例bean。

    這裏,可能就會有小夥伴要問了,如果將Person對象修改成多實例bean呢?別急,這裏我們只需要在PersonFactoryBean類的isSingleton()方法中返回false,即可將Person對象設置為多實例bean,如下所示。

    //bean是否為單例;true:是;false:否
    @Override
    public boolean isSingleton() {
        return false;
    }
    

    再次運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    false
    

    可以看到,最終結果返回了false,說明此時Person對象是多實例bean。

    如何在Spring容器中獲取到FactoryBean對象?

    之前,我們使用@Bean註解向Spring容器中註冊的PersonFactoryBean,獲取出來的確實Person對象。那麼,小夥伴們可能會問:我就想獲取PersonFactoryBean實例,該怎麼辦呢?

    其實,這也很簡單, 只需要在獲取bean對象時,在id前面加上&符號即可

    打開我們的測試類SpringBeanTest,在testAnnotationConfig7()方法中添加獲取PersonFactoryBean實例的代碼,如下所示。

    @Test
    public void testAnnotationConfig7(){
        ApplicationContext context = new AnnotationConfigApplicationContext(PersonConfig2.class);
        String[] names = context.getBeanDefinitionNames();
        Arrays.stream(names).forEach(System.out::println);
    
        Object personFactoryBean1 = context.getBean("personFactoryBean");
        Object personFactoryBean2 = context.getBean("personFactoryBean");
        System.out.println("personFactoryBean1類型:" + personFactoryBean1.getClass());
        System.out.println("personFactoryBean2類型:" + personFactoryBean2.getClass());
        System.out.println(personFactoryBean1 == personFactoryBean2);
    
        Object personFactoryBean3 = context.getBean("&personFactoryBean");
        System.out.println("personFactoryBean3類型:" + personFactoryBean3.getClass());
    }
    

    運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    personFactoryBean1類型:class io.mykit.spring.plugins.register.bean.Person
    personFactoryBean2類型:class io.mykit.spring.plugins.register.bean.Person
    false
    personFactoryBean3類型:class io.mykit.spring.plugins.register.bean.PersonFactoryBean
    

    可以看到,在獲取bean時,在id前面加上&符號就會獲取到PersonFactoryBean實例對象。

    那問題又來了!!為什麼在id前面加上&符號就會獲取到PersonFactoryBean實例對象呢?

    接下來,我們就揭開這個神秘的面紗,打開BeanFactory接口,

    package org.springframework.beans.factory;
    import org.springframework.beans.BeansException;
    import org.springframework.core.ResolvableType;
    import org.springframework.lang.Nullable;
    
    public interface BeanFactory {
    	String FACTORY_BEAN_PREFIX = "&";
        /**************以下省略n行代碼***************/
    }
    

    看到這裏,是不是明白了呢?沒錯,在BeanFactory接口中定義了一個&前綴,只要我們使用bean的id來從Spring容器中獲取bean時,Spring就會知道我們是在獲取FactoryBean本身。

    好了,咱們今天就聊到這兒吧!別忘了給個在看和轉發,讓更多的人看到,一起學習一起進步!!

    項目工程源碼已經提交到GitHub:https://github.com/sunshinelyz/spring-annotation

    寫在最後

    如果覺得文章對你有點幫助,請微信搜索並關注「 冰河技術 」微信公眾號,跟冰河學習Spring註解驅動開發。公眾號回復“spring註解”關鍵字,領取Spring註解驅動開發核心知識圖,讓Spring註解驅動開發不再迷茫。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※別再煩惱如何寫文案,掌握八大原則!

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※超省錢租車方案

    ※教你寫出一流的銷售文案?

    網頁設計最專業,超強功能平台可客製化

    聚甘新

  • Netty中的這些知識點,你需要知道!

    Netty中的這些知識點,你需要知道!

    一、Channel

    Channel是一個接口,而且是一個很大的接口,我們稱之為“大而全”,囊括了server端及client端接口所需要的接口。

    Channel是一個門面,封裝了包括網絡I/O及相關的所有操作。

    Channel聚合了包括網絡讀寫、鏈路管理、網絡連接信息、獲取EventLoop、Pipeline等相關功能類;統一分配,調度實現相應場景的功能。

    一個Channel 對應一個物理連接,是基於物理連接上的操作包裝。

    二、EventLoop

    EventLoop,Event意為事件、Loop意為環,EventLoo即為事件環

    EventLoop是一種程序設計結構等待以及分發事件。

    NioEventLoop,是一個Netty工作線程,又不僅僅是一個Netty工作線程。

    標準的netty線程模型 中我們講過Netty的標準線程池模型,池子里的每個線程對象就是一個NioEventLoop對象。或負責接受連接,或負責網絡I/O

    說它不僅僅是一個Netty線程,因為它實現了很多功能,我們可以看下它的繼承圖:

    它的上方有兩個枝丫,一個線程屬性,一個EventLoop,它是Netty的Reactor線程

    既然是Reactor線程,那麼首先我們需要一個多路復用器。在Netty NioEventLoop中,包就含一個 Selector,它的操作對象是Channel。

    NioEventLoop的主要邏輯在它的run()方法,方法體內是一個無限循環 for (;;),循環體內實現Loop功能。這也是通用的NIO線程實現方式。

     

    Loop 從任務隊列里獲取任務,然後檢查多路復用器中就緒的Channel進行處理。

    三、Unsafe

    Netty中的Unsafe,一個Channel內部聚合接口,用以處理實際的網絡I/O讀寫。當然,取Unsafe命名,源碼中釋義:提供的網絡相關的操作方法,永遠不應該被開發人員操作使用。

    它是Channel的一個輔助接口,主要方法:

    1、register:註冊Channel

    2、deregister:取消註冊

    3、bind:綁定地址,服務端綁定監聽特定端口;客戶端指定本地綁定Socket地址。

    4、connect:建立連接

    5、disconnect:斷開連接

    6、close:關閉連接

    7、write:調度寫,將數據寫入buffer,並未真正進入Channel

    8、flush:將緩衝區中的數據寫入Channel

    四、AdaptiveRecvByteBufAllocator

    動態緩衝區分配器,源碼說明:根據實時的反饋動態的增加或者減少預需的緩衝區大小。

    如果一次分配的緩衝區被填滿了,則調高下一次分配的緩衝區大小。

    如果連續兩次實際使用的容量低於分配的緩衝區大小特定比例,則減小下一次分配的緩衝區大小。

    其它情景,保持分配大小不變。

    Netty的這種“智能化”處理,可以說是相當有用的:

    1、首先,實際的應用場景千差萬別,同一場景下不同時刻的緩衝區需求也是實時變化(一句話可以是一個字,也可能是1000個字),這就需要Netty動態調整緩衝分配大小以適應不同的業務場景,時刻場景

    2、其次,過大的不必要的內存分配,會導致Buffer處理性能下降;過小的內存分配,則會導致頻繁的分配釋放。這都是一個優良的網絡框架不應該有的。 

    3、最後,動態的調整最直接的好處就是內存的的高效使用,一定程度上做到了按需分配。 

    五、ChannelPipeline

    Pipeline 管道,Channel的數據流通管道,在這個管道中,可以做很多事情。

    ChannelPipeline 是一種職責鏈,可以對其中流動的數據進行過濾、攔截處理,是一種插拔式的鏈路裝配器

    1、ChannelPipline是一個容器

    支持查詢、添加、刪除、替換等容器操作。

    2、ChannelPipline支持動態的添加和刪除 Handler

    ChannelPipline的這種特性給了我們相當的想象空間,例如動態的添加系統擁塞保護Handler,敏感數據過濾Handler、日誌記錄Handler、性能統計Handler等。

    3、ChannelPipline 是線程安全的

    ChannelPipline使用 synchronized 實現線程安全,業務線程可以併發的操作ChannelPipline。但需要注意的是,Handler是非線程安全的

    六、HandlerAdapter

    Adapter是一種適配器,對於用戶自定義的Handler,可以通過繼承HandlerAdapter,來規避不必要的接口實現

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※教你寫出一流的銷售文案?

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※回頭車貨運收費標準

    ※別再煩惱如何寫文案,掌握八大原則!

    ※超省錢租車方案

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

    聚甘新

  • 【原創】強擼 .NET Redis Cluster 集群訪問組件

    【原創】強擼 .NET Redis Cluster 集群訪問組件

      Hello 大家好,我是TANZAME,我們又見面了。今天我們來聊聊怎麼手擼一個 Redis Cluster 集群客戶端,純手工有乾貨,您細品。

      隨着業務增長,線上環境的QPS暴增,自然而然將當前的單機 Redis 切換到群集模式。燃鵝,我們悲劇地發現,ServiceStack.Redis這個官方推薦的 .NET 客戶端並沒有支持集群模式。一通度娘翻牆無果后,決定自己強擼一個基於ServiceStack.Redis的Redis集群訪問組件。

      話不多說,先上運行效果圖:

     

      Redis-Cluster集群使用 hash slot 算法對每個key計算CRC16值,然後對16383取模,可以獲取key對應的 hash slot。Redis-Cluster中每個master都會持有部分 slot,在訪問key時根據計算出來的hash slot去找到具體的master節點,再由當前找到的節點去執行具體的 Redis 命令(具體可查閱官方說明文檔)。

      由於 ServiceStack.Redis已經實現了單個實例的Redis命令,因此我們可以將即將要實現的 Redis 集群客戶端當做一個代理,它只負責計算 key 落在哪一個具體節點(尋址)然後將Redis命令轉發給對應的節點執行即可。

      ServiceStack.Redis的RedisClient是非線程安全的,ServiceStack.Redis 使用緩存客戶端管理器(PooledRedisClientManager)來提高性能和併發能力,我們的Redis Cluster集群客戶端也應集成PooledRedisClientManager來獲取 RedisClient 實例。

      同時,Redis-Cluster集群支持在線動態擴容和slot遷移,我們的Redis集群客戶端也應具備自動智能發現新節點和自動刷新 slot 分佈的能力。

      總結起來,要實現一個Redis-Cluster客戶端,需要實現以下幾個要點:

    • 根據 key 計算 hash slot
    • 自動讀取群集上所有的節點信息
    • 為節點分配緩存客戶端管理器
    • 將 hash slot 路由到正確的節點
    • 自動發現新節點和自動刷新slot分佈

      如下面類圖所示,接下來我們詳細分析具體的代碼實現。

      

      一、CRC16  

      CRC即循環冗餘校驗碼,是信息系統中一種常見的檢錯碼。CRC校驗碼不同的機構有不同的標準,這裏Redis遵循的標準是CRC-16-CCITT標準,這也是被XMODEM協議使用的CRC標準,所以也常用XMODEM CRC代指,是比較經典的“基於字節查表法的CRC校驗碼生成算法”。 

     1 /// <summary>
     2 /// 根據 key 計算對應的哈希槽
     3 /// </summary>
     4 public static int GetSlot(string key)
     5 {
     6     key = CRC16.ExtractHashTag(key);
     7     // optimization with modulo operator with power of 2 equivalent to getCRC16(key) % 16384
     8     return GetCRC16(key) & (16384 - 1);
     9 }
    10 
    11 /// <summary>
    12 /// 計算給定字節組的 crc16 檢驗碼
    13 /// </summary>
    14 public static int GetCRC16(byte[] bytes, int s, int e)
    15 {
    16     int crc = 0x0000;
    17 
    18     for (int i = s; i < e; i++)
    19     {
    20         crc = ((crc << 8) ^ LOOKUP_TABLE[((crc >> 8) ^ (bytes[i] & 0xFF)) & 0xFF]);
    21     }
    22     return crc & 0xFFFF;
    23 }

     

      二、讀取集群節點

      從集群中的任意節點使用 CLUSTER NODES 命令可以讀取到集群中所有的節點信息,包括連接狀態,它們的標誌,屬性和分配的槽等等。CLUSTER NODES 以串行格式提供所有這些信息,輸出示例:

    d99b65a25ef726c64c565901e345f98c496a1a47 127.0.0.1:7007 master - 0 1592288083308 8 connected
    2d71879d6529d1edbfeed546443051986245c58e 127.0.0.1:7003 master - 0 1592288084311 11 connected 10923-16383
    654cdc25a5fa11bd44b5b716cdf07d4ce176efcd 127.0.0.1:7005 slave 484e73948d8aacd8327bf90b89469b52bff464c5 0 1592288085313 10 connected
    ed65d52dad7ef6854e0e261433b56a551e5e11cb 127.0.0.1:7004 slave 754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 0 1592288081304 9 connected
    754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 127.0.0.1:7001 master - 0 1592288080300 9 connected 0-5460
    484e73948d8aacd8327bf90b89469b52bff464c5 127.0.0.1:7002 master - 0 1592288082306 10 connected 5461-10922
    2223bc6d099bd9838e5d2f1fbd9a758f64c554c4 127.0.0.1:7006 myself,slave 2d71879d6529d1edbfeed546443051986245c58e 0 0 6 connected
    

      每個字段的含義如下:

      1. id:節點 ID,一個40個字符的隨機字符串,當一個節點被創建時不會再發生變化(除非CLUSTER RESET HARD被使用)。

      2. ip:port:客戶端應該聯繫節點以運行查詢的節點地址。

      3. flags:逗號列表分隔的標誌:myselfmasterslavefail?failhandshakenoaddrnoflags。標誌在下一節詳細解釋。

      4. master:如果節點是從屬節點,並且主節點已知,則節點ID為主節點,否則為“ – ”字符。

      5. ping-sent:以毫秒為單位的當前激活的ping發送的unix時間,如果沒有掛起的ping,則為零。

      6. pong-recv:毫秒 unix 時間收到最後一個乒乓球。

      7. config-epoch:當前節點(或當前主節點,如果該節點是從節點)的配置時期(或版本)。每次發生故障切換時,都會創建一個新的,唯一的,單調遞增的配置時期。如果多個節點聲稱服務於相同的哈希槽,則具有較高配置時期的節點將獲勝。

      8. link-state:用於節點到節點集群總線的鏈路狀態。我們使用此鏈接與節點進行通信。可以是connecteddisconnected

      9. slot:散列槽號或範圍。從參數9開始,但總共可能有16384個條目(限制從未達到)。這是此節點提供的散列槽列表。如果條目僅僅是一個数字,則被解析為這樣。如果它是一個範圍,它是在形式start-end,並且意味着節點負責所有散列時隙從startend包括起始和結束值。

    標誌的含義(字段編號3):

    • myself:您正在聯繫的節點。
    • master:節點是主人。
    • slave:節點是從屬的。
    • fail?:節點處於PFAIL狀態。對於正在聯繫的節點無法訪問,但仍然可以在邏輯上訪問(不處於FAIL狀態)。
    • fail:節點處於FAIL狀態。對於將PFAIL狀態提升為FAIL的多個節點而言,這是無法訪問的。
    • handshake:不受信任的節點,我們握手。
    • noaddr:此節點沒有已知的地址。
    • noflags:根本沒有標誌。
      1 // 讀取集群上的節點信息
      2 static IList<InternalClusterNode> ReadClusterNodes(IEnumerable<ClusterNode> source)
      3 {
      4     RedisClient c = null;
      5     StringReader reader = null;
      6     IList<InternalClusterNode> result = null;
      7 
      8     int index = 0;
      9     int rowCount = source.Count();
     10 
     11     foreach (var node in source)
     12     {
     13         try
     14         {
     15             // 從當前節點讀取REDIS集群節點信息
     16             index += 1;
     17             c = new RedisClient(node.Host, node.Port, node.Password);
     18             RedisData data = c.RawCommand("CLUSTER".ToUtf8Bytes(), "NODES".ToUtf8Bytes());
     19             string info = Encoding.UTF8.GetString(data.Data);
     20 
     21             // 將讀回的字符文本轉成強類型節點實體
     22             reader = new StringReader(info);
     23             string line = reader.ReadLine();
     24             while (line != null)
     25             {
     26                 if (result == null) result = new List<InternalClusterNode>();
     27                 InternalClusterNode n = InternalClusterNode.Parse(line);
     28                 n.Password = node.Password;
     29                 result.Add(n);
     30 
     31                 line = reader.ReadLine();
     32             }
     33 
     34             // 只要任意一個節點拿到集群信息,直接退出
     35             if (result != null && result.Count > 0) break;
     36         }
     37         catch (Exception ex)
     38         {
     39             // 出現異常,如果還沒到最後一個節點,則繼續使用下一下節點讀取集群信息
     40             // 否則拋出異常
     41             if (index < rowCount)
     42                 Thread.Sleep(100);
     43             else
     44                 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
     45         }
     46         finally
     47         {
     48             if (reader != null) reader.Dispose();
     49             if (c != null) c.Dispose();
     50         }
     51     }
     52 
     53 
     54     if (result == null)
     55         result = new List<InternalClusterNode>(0);
     56     return result;
     57 }
     58 
     59 /// <summary>
     60 /// 從 cluster nodes 的每一行命令里讀取出集群節點的相關信息
     61 /// </summary>
     62 /// <param name="line">集群命令</param>
     63 /// <returns></returns>
     64 public static InternalClusterNode Parse(string line)
     65 {
     66     if (string.IsNullOrEmpty(line))
     67         throw new ArgumentException("line");
     68 
     69     InternalClusterNode node = new InternalClusterNode();
     70     node._nodeDescription = line;
     71     string[] segs = line.Split(' ');
     72 
     73     node.NodeId = segs[0];
     74     node.Host = segs[1].Split(':')[0];
     75     node.Port = int.Parse(segs[1].Split(':')[1]);
     76     node.MasterNodeId = segs[3] == "-" ? null : segs[3];
     77     node.PingSent = long.Parse(segs[4]);
     78     node.PongRecv = long.Parse(segs[5]);
     79     node.ConfigEpoch = int.Parse(segs[6]);
     80     node.LinkState = segs[7];
     81 
     82     string[] flags = segs[2].Split(',');
     83     node.IsMater = flags[0] == MYSELF ? flags[1] == MASTER : flags[0] == MASTER;
     84     node.IsSlave = !node.IsMater;
     85     int start = 0;
     86     if (flags[start] == MYSELF)
     87         start = 1;
     88     if (flags[start] == SLAVE || flags[start] == MASTER)
     89         start += 1;
     90     node.NodeFlag = string.Join(",", flags.Skip(start));
     91 
     92     if (segs.Length > 8)
     93     {
     94         string[] slots = segs[8].Split('-');
     95         node.Slot.Start = int.Parse(slots[0]);
     96         if (slots.Length > 1) node.Slot.End = int.Parse(slots[1]);
     97 
     98         for (int index = 9; index < segs.Length; index++)
     99         {
    100             if (node.RestSlots == null)
    101                 node.RestSlots = new List<HashSlot>();
    102 
    103             slots = segs[index].Split('-');
    104 
    105             int s1 = 0;
    106             int s2 = 0;
    107             bool b1 = int.TryParse(slots[0], out s1);
    108             bool b2 = int.TryParse(slots[1], out s2);
    109             if (!b1 || !b2)
    110                 continue;
    111             else
    112                 node.RestSlots.Add(new HashSlot(s1, slots.Length > 1 ? new Nullable<int>(s2) : null));
    113         }
    114     }
    115 
    116     return node;
    117 }

    View Code

     

      三、為節點分配緩存客戶端管理器

      在單實例的Redis中,我們通過 PooledRedisClientManager 這個管理器來獲取RedisClient。借鑒這個思路,在Redis Cluster集群中,我們為每一個主節點實例化一個 PooledRedisClientManager,並且該主節點持有的 slot 都共享一個 PooledRedisClientManager 實例。以 slot 做為 key 將 slot 與 PooledRedisClientManager 一一映射並緩存起來。

     1 // 初始化集群管理
     2 void Initialize(IList<InternalClusterNode> clusterNodes = null)
     3 {
     4     // 從 redis 讀取集群信息
     5     IList<InternalClusterNode> nodes = clusterNodes == null ? RedisCluster.ReadClusterNodes(_source) : clusterNodes;
     6 
     7     // 生成主節點,每個主節點的 slot 對應一個REDIS客戶端緩衝池管理器
     8     IList<InternalClusterNode> masters = null;
     9     IDictionary<int, PooledRedisClientManager> managers = null;
    10     foreach (var n in nodes)
    11     {
    12         // 節點無效或者
    13         if (!(n.IsMater &&
    14             !string.IsNullOrEmpty(n.Host) &&
    15             string.IsNullOrEmpty(n.NodeFlag) &&
    16             (string.IsNullOrEmpty(n.LinkState) || n.LinkState == InternalClusterNode.CONNECTED))) continue;
    17 
    18         n.SlaveNodes = nodes.Where(x => x.MasterNodeId == n.NodeId);
    19         if (masters == null)
    20             masters = new List<InternalClusterNode>();
    21         masters.Add(n);
    22 
    23         // 用每一個主節點的哈希槽做鍵,導入REDIS客戶端緩衝池管理器
    24         // 然後,方法表指針(又名類型對象指針)上場,佔據 4 個字節。 4 * 16384 / 1024 = 64KB
    25         if (managers == null)
    26             managers = new Dictionary<int, PooledRedisClientManager>();
    27 
    28         string[] writeHosts = new[] { n.HostString };
    29         string[] readHosts = n.SlaveNodes.Where(n => false).Select(n => n.HostString).ToArray();
    30         var pool = new PooledRedisClientManager(writeHosts, readHosts, _config);
    31         managers.Add(n.Slot.Start, pool);
    32         if (n.Slot.End != null)
    33         {
    34             // 這個範圍內的哈希槽都用同一個緩衝池
    35             for (int s = n.Slot.Start + 1; s <= n.Slot.End.Value; s++)
    36                 managers.Add(s, pool);
    37         }
    38         if (n.RestSlots != null)
    39         {
    40             foreach (var slot in n.RestSlots)
    41             {
    42                 managers.Add(slot.Start, pool);
    43                 if (slot.End != null)
    44                 {
    45                     // 這個範圍內的哈希槽都用同一個緩衝池
    46                     for (int s = slot.Start + 1; s <= slot.End.Value; s++)
    47                         managers.Add(s, pool);
    48                 }
    49             }
    50         }
    51     }
    52 
    53     _masters = masters;
    54     _redisClientManagers = managers;
    55     _clusterNodes = nodes != null ? nodes : null;
    56 
    57     if (_masters == null) _masters = new List<InternalClusterNode>(0);
    58     if (_clusterNodes == null) _clusterNodes = new List<InternalClusterNode>(0);
    59     if (_redisClientManagers == null) _redisClientManagers = new Dictionary<int, PooledRedisClientManager>(0);
    60 
    61     if (_masters.Count > 0)
    62         _source = _masters.Select(n => new ClusterNode(n.Host, n.Port, n.Password)).ToList();
    63 }

    View Code

     

      四、將 hash slot 路由到正確的節點

      在訪問一個 key 時,根據第三步緩存起來的 PooledRedisClientManager ,用 key 計算出來的 hash slot 值可以快速找出這個 key 對應的 PooledRedisClientManager 實例,調用 PooledRedisClientManager.GetClient() 即可將 hash slot 路由到正確的主節點。

     1 // 執行指定動作並返回值
     2 private T DoExecute<T>(string key, Func<RedisClient, T> action) => this.DoExecute(() => this.GetRedisClient(key), action);
     3 
     4 // 執行指定動作並返回值
     5 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1)
     6 {
     7     RedisClient c = null;
     8     try
     9     {
    10         c = slot();
    11         return action(c);
    12     }
    13     catch (Exception ex)
    14     {
    15         // 此處省略 ...
    16     }
    17     finally
    18     {
    19         if (c != null)
    20             c.Dispose();
    21     }
    22 }
    23 
    24 // 獲取指定key對應的主設備節點
    25 private RedisClient GetRedisClient(string key)
    26 {
    27     if (string.IsNullOrEmpty(key))
    28         throw new ArgumentNullException("key");
    29 
    30     int slot = CRC16.GetSlot(key);
    31     if (!_redisClientManagers.ContainsKey(slot))
    32         throw new SlotNotFoundException(string.Format("No reachable node in cluster for slot {{{0}}}", slot), slot, key);
    33 
    34     var pool = _redisClientManagers[slot];
    35     return (RedisClient)pool.GetClient();
    36 }

       

      五、自動發現新節點和自動刷新slot分佈

      在實際生產環境中,Redis 集群經常會有添加/刪除節點、遷移 slot 、主節點宕機從節點轉主節點等,針對這些情況,我們的 Redis Cluster 組件必須具備自動發現節點和刷新在 第三步  緩存起來的 slot 的能力。在這裏我的實現思路是當節點執行 Redis 命令時返回 RedisException 異常時就強制刷新集群節點信息並重新緩存 slot 與 節點之間的映射。

      1 // 執行指定動作並返回值
      2 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1)
      3 {
      4     RedisClient c = null;
      5     try
      6     {
      7         c = slot();
      8         return action(c);
      9     }
     10     catch (Exception ex)
     11     {
     12         if (!(ex is RedisException) || tryTimes == 0) throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
     13         else
     14         {
     15             tryTimes -= 1;
     16             // 嘗試重新刷新集群信息
     17             bool isRefresh = DiscoveryNodes(_source, _config);
     18             if (isRefresh)
     19                 // 集群節點有更新過,重新執行
     20                 return this.DoExecute(slot, action, tryTimes);
     21             else
     22                 // 集群節點未更新過,直接拋出異常
     23                 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
     24         }
     25     }
     26     finally
     27     {
     28         if (c != null)
     29             c.Dispose();
     30     }
     31 }
     32 
     33 // 重新刷新集群信息
     34 private bool DiscoveryNodes(IEnumerable<ClusterNode> source, RedisClientManagerConfig config)
     35 {
     36     bool lockTaken = false;
     37     try
     38     {
     39         // noop
     40         if (_isDiscoverying) { }
     41 
     42         Monitor.Enter(_objLock, ref lockTaken);
     43 
     44         _source = source;
     45         _config = config;
     46         _isDiscoverying = true;
     47 
     48         // 跟上次同步時間相隔 {MONITORINTERVAL} 秒鐘以上才需要同步
     49         if ((DateTime.Now - _lastDiscoveryTime).TotalMilliseconds >= MONITORINTERVAL)
     50         {
     51             bool isRefresh = false;
     52             IList<InternalClusterNode> newNodes = RedisCluster.ReadClusterNodes(_source);
     53             foreach (var node in newNodes)
     54             {
     55                 var n = _clusterNodes.FirstOrDefault(x => x.HostString == node.HostString);
     56                 isRefresh =
     57                     n == null ||                        // 新節點                                                                
     58                     n.Password != node.Password ||      // 密碼變了                                                                
     59                     n.IsMater != node.IsMater ||        // 主變從或者從變主                                                                
     60                     n.IsSlave != node.IsSlave ||        // 主變從或者從變主                                                                
     61                     n.NodeFlag != node.NodeFlag ||      // 節點標記位變了                                                                
     62                     n.LinkState != node.LinkState ||    // 節點狀態位變了                                                                
     63                     n.Slot.Start != node.Slot.Start ||  // 哈希槽變了                                                                
     64                     n.Slot.End != node.Slot.End ||      // 哈希槽變了
     65                     (n.RestSlots == null && node.RestSlots != null) ||
     66                     (n.RestSlots != null && node.RestSlots == null);
     67                 if (!isRefresh && n.RestSlots != null && node.RestSlots != null)
     68                 {
     69                     var slots1 = n.RestSlots.OrderBy(x => x.Start).ToList();
     70                     var slots2 = node.RestSlots.OrderBy(x => x.Start).ToList();
     71                     for (int index = 0; index < slots1.Count; index++)
     72                     {
     73                         isRefresh =
     74                             slots1[index].Start != slots2[index].Start ||   // 哈希槽變了                                                                
     75                             slots1[index].End != slots2[index].End;         // 哈希槽變了
     76                         if (isRefresh) break;
     77                     }
     78                 }
     79 
     80                 if (isRefresh) break;
     81             }
     82 
     83             if (isRefresh)
     84             {
     85                 // 重新初始化集群
     86                 this.Dispose();
     87                 this.Initialize(newNodes);
     88                 this._lastDiscoveryTime = DateTime.Now;
     89             }
     90         }
     91 
     92         // 最後刷新時間在 {MONITORINTERVAL} 內,表示是最新群集信息 newest
     93         return (DateTime.Now - _lastDiscoveryTime).TotalMilliseconds < MONITORINTERVAL;
     94     }
     95     finally
     96     {
     97         if (lockTaken)
     98         {
     99             _isDiscoverying = false;
    100             Monitor.Exit(_objLock);
    101         }
    102     }
    103 }

    View Code

     

      六、配置訪問組件調用入口

      最後我們需要為組件提供訪問入口,我們用 RedisCluster 類實現 字符串、列表、哈希、集合、有序集合和Keys的基本操作,並且用 RedisClusterFactory 工廠類對外提供單例操作,這樣就可以像單實例 Redis 那樣調用 Redis Cluster 集群。調用示例:

    var node = new ClusterNode("127.0.0.1", 7001);
    var redisCluster = RedisClusterFactory.Configure(node, config);
    string key = "B070x14668";
    redisCluster.Set(key, key);
    string value = redisCluster.Get<string>(key);
    redisCluster.Del(key);
     1 /// <summary>
     2 /// REDIS 集群工廠
     3 /// </summary>
     4 public class RedisClusterFactory
     5 {
     6     static RedisClusterFactory _factory = new RedisClusterFactory();
     7     static RedisCluster _cluster = null;
     8 
     9     /// <summary>
    10     /// Redis 集群
    11     /// </summary>
    12     public static RedisCluster Cluster
    13     {
    14         get
    15         {
    16             if (_cluster == null)
    17                 throw new Exception("You should call RedisClusterFactory.Configure to config cluster first.");
    18             else
    19                 return _cluster;
    20         }
    21     }
    22 
    23     /// <summary>
    24     /// 初始化 <see cref="RedisClusterFactory"/> 類的新實例
    25     /// </summary>
    26     private RedisClusterFactory()
    27     {
    28     }
    29 
    30     /// <summary>
    31     /// 配置 REDIS 集群
    32     /// <para>若群集中有指定 password 的節點,必須使用  IEnumerable&lt;ClusterNode&gt; 重載列舉出這些節點</para>
    33     /// </summary>
    34     /// <param name="node">集群節點</param>
    35     /// <returns></returns>
    36     public static RedisCluster Configure(ClusterNode node)
    37     {
    38         return RedisClusterFactory.Configure(node, null);
    39     }
    40 
    41     /// <summary>
    42     /// 配置 REDIS 集群
    43     /// <para>若群集中有指定 password 的節點,必須使用  IEnumerable&lt;ClusterNode&gt; 重載列舉出這些節點</para>
    44     /// </summary>
    45     /// <param name="node">集群節點</param>
    46     /// <param name="config"><see cref="RedisClientManagerConfig"/> 客戶端緩衝池配置</param>
    47     /// <returns></returns>
    48     public static RedisCluster Configure(ClusterNode node, RedisClientManagerConfig config)
    49     {
    50         return RedisClusterFactory.Configure(new List<ClusterNode> { node }, config);
    51     }
    52 
    53     /// <summary>
    54     /// 配置 REDIS 集群
    55     /// </summary>
    56     /// <param name="nodes">集群節點</param>
    57     /// <param name="config"><see cref="RedisClientManagerConfig"/> 客戶端緩衝池配置</param>
    58     /// <returns></returns>
    59     public static RedisCluster Configure(IEnumerable<ClusterNode> nodes, RedisClientManagerConfig config)
    60     {
    61         if (nodes == null)
    62             throw new ArgumentNullException("nodes");
    63 
    64         if (nodes == null || nodes.Count() == 0)
    65             throw new ArgumentException("There is no nodes to configure cluster.");
    66 
    67         if (_cluster == null)
    68         {
    69             lock (_factory)
    70             {
    71                 if (_cluster == null)
    72                 {
    73                     RedisCluster c = new RedisCluster(nodes, config);
    74                     _cluster = c;
    75                 }
    76             }
    77         }
    78 
    79         return _cluster;
    80     }
    81 }

    View Code

     

      總結

      今天我們詳細介紹了如何從0手寫一個Redis Cluster集群客戶端訪問組件,相信對同樣在尋找類似解決方案的同學們會有一定的啟發,喜歡的同學請點個 star。在沒有相同案例可以參考的情況下筆者通過查閱官方說明文檔和借鑒 Java 的 JedisCluster 的實現思路,雖說磕磕碰碰但最終也初步完成這個組件並投入使用,必須給自己加一個雞腿!!在此我有一個小小的疑問,.NET 的同學們在用 Redis 集群時,你們是用什麼組件耍的,為何網上的相關介紹和現成組件幾乎都沒有?歡迎討論。

      GitHub 代碼託管:https://github.com/TANZAME/ServiceStack.Redis.Cluster

      技術交流 QQ 群:816425449

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※超省錢租車方案

    ※別再煩惱如何寫文案,掌握八大原則!

    ※回頭車貨運收費標準

    ※教你寫出一流的銷售文案?

    FB行銷專家,教你從零開始的技巧

    聚甘新

  • 印度雨季後氣候乾燥 德里登革熱疫情飆升

    摘錄自2018年09月18日中央社報導

    「印度斯坦時報」(Hindustan Times)18日引述南德里市政機構(SDMC)彙整新德里、東德里和北德里等另3個市政機構的統計顯示,德里地區今年迄今的登革熱病例達243例。

    在15日之前的一個星期,德里地區就通報了106起登革熱病例,高達全年迄今243例的近一半。這段時間是德里近2個月連續降雨以來,首個未降雨的星期。而雨季後出現的乾燥氣候,專家認為是登革熱病例突然飆升的主因。醫護人員和專家都擔心,最近登革熱病例的上升,現在只是開端。

    為防止登革熱疫情傳布,南德里市政機構最近已對社區大規模噴灑殺蟲劑。北德里市政機構也表示,正派檢查員調查和處罰未清理積水者。

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

    聚甘新

  • 造假醜聞、禁令、民眾信心跌 德國柴油車榮景恐回不去了

    環境資訊中心記者 陳文姿報導

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※別再煩惱如何寫文案,掌握八大原則!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

    FB行銷專家,教你從零開始的技巧

    聚甘新