標籤: 聚甘新

  • Spring Data 教程 – Redis

    Spring Data 教程 – Redis

    1. Redis簡介

    Redis(Remote Dictionary Server ),即遠程字典服務,是一個開源的使用ANSI C語言編寫、支持網絡、可基於內存亦可持久化的日誌型、Key-Value 數據庫,並提供多種語言的API。Redis 是一個高性能的key-value數據庫。 redis的出現,在部分場合可以對關係數據庫起到很好的補充作用。它提供了Java,C/C++,C#,PHP,JavaScript,Perl,Object-C,Python,Ruby,Erlang等客戶端,使用很方便。

    Redis支持主從同步。數據可以從主服務器向任意數量的從服務器上同步,從服務器可以是關聯其他從服務器的主服務器。這使得Redis可執行單層樹複製。存盤可以有意無意的對數據進行寫操作。由於完全實現了發布/訂閱機制,使得從數據庫在任何地方同步樹時,可訂閱一個頻道並接收主服務器完整的消息發布記錄。同步對讀取操作的可擴展性和數據冗餘很有幫助。

    redis的key都是字符串String類型,它的value是多樣化的,如下圖:

    redis數據類型 ENCODING返回的編碼 底層對應的數據結構
    string int long類型的整數
    string embstr embstr編碼的簡單動態字符串
    string raw 簡單動態字符串
    list ziplist 壓縮列表
    list linkedlist 雙向鏈表
    hash ziplist 壓縮列表
    hash ht 字典
    set intset 整數集合
    set ht 字典
    zset ziplist 壓縮列表
    zset skiplist 跳錶

    2. Redis的五種數據類型

    2.1 字符串對象(String)

    字符串對象的模型:

    redis底層提供了三種不同的數據結構實現字符串對象,根據不同的數據自動選擇合適的數據結構。這裏的字符串對象並不是指的純粹的字符串,数字也是可以的。

    • int:當數據是long類型的整数字符串時,底層使用long類型的整數實現。這個值會直接存儲在字符串對象的ptr屬性中,同時OBJECT ENCODING為int。

    • raw:當數據為長度大於44字節的字符串時,底層使用簡單動態字符串實現,說到這裏就不得不提下redis的簡單隨機字符串(Simple Dynamic String,SDS),SDS有三個屬性,free,len和buf。free存的是還剩多少空間,len存的是目前字符串長度,不包含結尾的空字符。buf是一個list,存放真實字符串數據,包含free和空字符。針對SDS本文不做詳細介紹,歡迎點擊SDS了解。

    • embstr:當數據為長度小於44字節的字符串時,底層使用embstr編碼的簡單動態字符串實現。相比於raw,embstr內存分配只需要一次就可完成,分配的是一塊連續的內存空間。

    2.2 列表對象(List)

    列表對象的模型:

    redis中的列表對象經常被用作消息隊列使用,底層採用ziplist和linkedlist實現。大家使用的時候當作鏈表使用就可以了。

    • ziplist

      列表對象使用ziplist編碼需要滿足兩個要求,一是所有字符串長度都小於設定值值64字節(可以在配置文件中修改list-max-ziplist-value字段改變)。二是所存元素數量小於設定值512個(可以在配置文件中修改list-max-ziplist-entries字段改變)。ziplist類似與python中的list,佔用一段連續的內存地址,由此減小指針內存佔用。

      zlbytes:占內存總數

      zltail:到尾部的偏移量

      zllen:內部節點數

      node:節點

      zlend:尾部標識

      previous_entry_length:前一節點的長度

      encoding:數據類型

      content:真實數據

      遍歷的時候會根據zlbytes和zltail直接找到尾部節點nodeN,然後根據每個節點的previous_entry_length反向遍歷。增加和刪除節點會導致其他節點連鎖更新,因為每個節點都存儲了前一節點的長度。

    • linkedlist

      linkedlist有三個屬性,head,tail和len。head指向鏈表的頭部,tail指向鏈表的尾部,len為鏈表的長度。

    2.3 哈希類型對象(Hash)

    哈希類型對象的模型:

    redis的value類型hash類型,其實就是map類型,就是在值的位置放一個map類型的數據。大家想詳細了解一下,可以參考一下這篇文章:https://www.jianshu.com/p/658365f0abfc 。

    2.4 集合對象(Set)

    集合對象類型的模型:

    Set類型的value保證每個值都不重複。

    redis中的集合對象底層有兩種實現方式,分別有整數集合和hashtable。當所有元素都是整數且元素數小於512(可在配置文件中set-max-intset-entries字段配置)時採用整數集合實現,其餘情況都採用hashtable實現。hashtable請移駕上文鏈接查閱,接下來介紹整數集合intset。intset有三個屬性,encoding:記錄数字的類型,有int16,int32和int64等,length:記錄集合的長度,content:存儲具體數據。具體結構如下圖:

    2.5 有序集合對象

    有序集合對象(zset)和集合對象(set)沒有很大區別,僅僅是多了一個分數(score)用來排序。

    redis中的有序集合底層採用ziplist和skiplist跳錶實現,當所有字符串長度都小於設定值值64字節(可以在配置文件中修改list-max-ziplist-value字段改變),並且所存元素數量小於設定值512個(可以在配置文件中修改list-max-ziplist-entries字段改變)使用ziplist實現,其他情況均使用skiplist實現,跳躍表的實現原理這裏偷個懶,給大家推薦一篇寫的非常好的博客,點擊查看跳躍表原理。

    3. Redis的安裝

    可以去官網或者中文網下載Redis。redis的windows版本現在已經不更新了,所以我們安裝redis的6.0.3版本,這個版本支持的東西很多,在此次教程中,我們只對redis的五種數據類型做解釋和學習。

    官網:https://redis.io/

    中文網:https://www.redis.net.cn/

    本教程安裝的redis版本為6.0.3版本,redis使用C語言編寫的,CentOS7的gcc自帶版本為4.8.5,而redis6.0+需要的gcc版本為5.3及以上,所以需要升級gcc版本。

    下載Linux版本的tar.gz包,解壓以後進入解壓產生的包:

    cd redis-6.0.3
    

    發現沒有bin目錄,這裏需要通過make進行安裝。

    # 先檢查gcc的環境 
    gcc -v 
    # 查看gcc版本 
    yum -y install centos-release-scl 
    # 升級到9.1版本 
    yum -y install devtoolset-9-gcc devtoolset-9-gcc- c++ devtoolset-9-binutils 
    
    scl enable devtoolset-9 bash 
    #以上為臨時啟用,如果要長期使用gcc 9.1的話: 
    echo "source /opt/rh/devtoolset-9/enable" >>/etc/profile 
    # 進入redis解壓文件 
    make 
    # 6.0的坑,gcc版本 9.0 以上
    # 等待完畢
    

    執行完make操作之後,就可以在redis目錄看到src目錄了。進入src目錄后就可以看到redis-serverredis-cli

    這裏建議將Redis的配置文件複製,保留一份原生的配置文件。

    redis的配置大家可以在網上搜一下常用的配置,在這裏給大家推薦一個常用的配置,比較詳細:

    https://blog.csdn.net/ymrfzr/article/details/51362125

    到這裏redis就可以啟動並且正常訪問了。

    注意:一定要將redis的IP地址綁定註釋掉,允許所有的IP地址訪問,不然我們從Windows訪問就訪問不了。

    註釋掉下面的這一行:

    同時關閉Redis的服務保護模式,將protected-mode設置為no。如下:

    4. Spring Boot 整合 Redis

    • 4.1 搭建工程,引入依賴

      搭建工程的操作我這裏就不在寫出來了。直接上pom.xml

      <!--springboot父工程-->
      <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.2.2.RELEASE</version>
          <relativePath/> <!-- lookup parent from repository -->
      </parent>
      
      <dependencies>
          <!--springboot-web組件-->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
              <version>2.2.2.RELEASE</version>
          </dependency>
          <!--redis整合springboot組件-->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-data-redis</artifactId>
              <version>2.3.0.RELEASE</version>
          </dependency>
          <!--lombok組件-->
          <dependency>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
              <version>1.18.10</version>
          </dependency>
      </dependencies>
      
    • 4.2 redis的配置

      項目的配置文件,application.yml

      butterflytri:
        host: 127.0.0.1
      server:
        port: 8080 # 應用端口
        servlet:
          context-path: /butterflytri # 應用映射
      spring:
        application:
          name: redis # 應用名稱
        redis:
          host: ${butterflytri.host} # redis地址
          port: 6379 # redis端口,默認是6379
          timeout: 10000 # 連接超時時間(ms)
          database: 0 # redis默認情況下有16個分片,這裏配置具體使用的分片,默認是0
          jedis: # 使用連接redis的工具-jedis
            pool:
              max-active: 8 # 連接池最大連接數(使用負值表示沒有限制) 默認 8
              max-wait: -1 # 連接池最大阻塞等待時間(使用負值表示沒有限制) 默認 -1
              max-idle: 8 # 連接池中的最大空閑連接 默認 8
              min-idle: 0 # 連接池中的最小空閑連接 默認 0
      

      另外還有額外的配置類RedisConfig.java

      package com.butterflytri.config;
      
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.data.redis.connection.RedisConnectionFactory;
      import org.springframework.data.redis.core.RedisTemplate;
      import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
      import org.springframework.data.redis.serializer.RedisSerializer;
      import org.springframework.data.redis.serializer.StringRedisSerializer;
      
      /**
       * @author: WJF
       * @date: 2020/5/24
       * @description: RedisConfig
       */
      
      @Configuration
      public class RedisConfig {
      
          /**
           * redis鍵值對的值的序列化方式:通用方式
           * @return RedisSerializer
           */
          private RedisSerializer redisValueSerializer() {
              return new GenericJackson2JsonRedisSerializer();
          }
      
          /**
           * redis鍵值對的健的序列化方式:所有的健都是字符串
           * @return RedisSerializer
           */
          private RedisSerializer redisKeySerializer() {
              return new StringRedisSerializer();
          }
      
          @Bean("redisTemplate")
          public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
              RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
              redisTemplate.setConnectionFactory(redisConnectionFactory);
              redisTemplate.setKeySerializer(redisKeySerializer());
              redisTemplate.setValueSerializer(redisValueSerializer());
              return redisTemplate;
          }
      
      }
      
    • 4.3 redisTemplate的使用

      value類型的值的CRUD:

      ValueServiceImpl.java

      package com.butterflytri.service.impl;
      
      import com.butterflytri.service.ValueService;
      import org.springframework.data.redis.core.RedisTemplate;
      import org.springframework.stereotype.Service;
      
      import javax.annotation.Resource;
      
      /**
       * @author: WJF
       * @date: 2020/5/27
       * @description: ValueServiceImpl
       */
      @Service
      public class ValueServiceImpl implements ValueService {
      
          @Resource
          private RedisTemplate<String, Object> redisTemplate;
      
          @Override
          public void addValue(String key, Object value) {
              redisTemplate.opsForValue().set(key,value);
          }
      
          @Override
          public Object get(String key) {
              return redisTemplate.opsForValue().get(key);
          }
      
          @Override
          public Object update(String key, Object newValue) {
              return redisTemplate.opsForValue().getAndSet(key,newValue);
          }
      
          @Override
          public void delete(String key) {
              redisTemplate.delete(key);
          }
      }	
      

      List類型的值的CRUD:

      這裏我加了枚舉類型用來控制增加的位置,因為List類型對應的是鏈表。

      ListServiceImpl.java

      package com.butterflytri.service.impl;
      
      import com.butterflytri.enums.OpsType;
      import com.butterflytri.service.ListService;
      import org.springframework.data.redis.core.RedisTemplate;
      import org.springframework.stereotype.Service;
      
      import javax.annotation.Resource;
      import java.util.List;
      
      /**
       * @author: WJF
       * @date: 2020/5/28
       * @description: ListServiceImpl
       */
      @Service
      public class ListServiceImpl implements ListService {
      
          @Resource
          private RedisTemplate<String, Object> redisTemplate;
      
          @Override
          public void addList(String key, List<Object> list, OpsType type) {
              switch (type) {
                  case RIGHT:
                      redisTemplate.opsForList().rightPushAll(key, list);
                      break;
                  case LEFT:
                      redisTemplate.opsForList().leftPushAll(key, list);
                      break;
                  default:
                      throw new RuntimeException("type不能為null");
              }
          }
      
          @Override
          public void add(String redisKey, Object value, OpsType type) {
              switch (type) {
                  case RIGHT:
                      redisTemplate.opsForList().rightPush(redisKey, value);
                      break;
                  case LEFT:
                      redisTemplate.opsForList().leftPush(redisKey, value);
                      break;
                  default:
                      throw new RuntimeException("type不能為null");
              }
          }
      
          @Override
          public List<Object> get(String key) {
              return redisTemplate.opsForList().range(key, 0, -1);
          }
      
          @Override
          public Object update(String key, Object value, Integer index) {
              Object obj = redisTemplate.opsForList().index(key, index);
              redisTemplate.opsForList().set(key,index,value);
              return obj;
          }
      
          @Override
          public void delete(String key) {
              redisTemplate.delete(key);
          }
      
          @Override
          public void deleteValue(String redisKey, OpsType type) {
              switch (type) {
                  case RIGHT:
                      redisTemplate.opsForList().rightPop(redisKey);
                      break;
                  case LEFT:
                      redisTemplate.opsForList().leftPop(redisKey);
                      break;
                  default:
                      throw new RuntimeException("type不能為null");
              }
          }
      }
      

      Hash類型的值的CRUD:

      hash類型是我們使用最常用的類型。

      HashServiceImpl.java:

      package com.butterflytri.service.impl;
      
      import com.butterflytri.service.HashService;
      import org.springframework.data.redis.core.RedisTemplate;
      import org.springframework.stereotype.Service;
      
      import javax.annotation.Resource;
      import java.util.Map;
      
      /**
       * @author: WJF
       * @date: 2020/5/28
       * @description: HashServiceImpl
       */
      @Service
      public class HashServiceImpl implements HashService {
      
          @Resource
          private RedisTemplate<String, Object> redisTemplate;
      
          @Override
          public void addHashAll(String key, Map<String, Object> value) {
              redisTemplate.opsForHash().putAll(key, value);
          }
      
          @Override
          public void addHash(String redisKey, String key, Object value) {
              redisTemplate.opsForHash().put(redisKey, key, value);
          }
      
          @Override
          public Object get(String redisKey, String key) {
              return redisTemplate.opsForHash().get(redisKey, key);
          }
      
          @Override
          public Object update(String redisKey, String key, Object value) {
              Object obj = this.get(redisKey, key);
              this.delete(redisKey,key);
              redisTemplate.opsForHash().put(redisKey, key, value);
              return obj;
          }
      
          @Override
          public void delete(String redisKey, String key) {
              redisTemplate.opsForHash().delete(redisKey, key);
          }
      
          @Override
          public void deleteAll(String redisKey) {
              redisTemplate.delete(redisKey);
          }
      }
      

      Set的值的CRUD:

      package com.butterflytri.service.impl;
      
      import com.butterflytri.service.SetService;
      import org.springframework.data.redis.core.RedisTemplate;
      import org.springframework.stereotype.Service;
      
      import javax.annotation.Resource;
      import java.util.Set;
      
      /**
       * @author: WJF
       * @date: 2020/5/28
       * @description: SetServiceImpl
       */
      @Service
      public class SetServiceImpl implements SetService {
      
          @Resource
          private RedisTemplate<String, Object> redisTemplate;
      
      
          @Override
          public void addAll(String key, Set<Object> set) {
              redisTemplate.opsForSet().add(key,set);
          }
      
          @Override
          public void add(String key, Object value) {
              redisTemplate.opsForSet().add(key,value);
          }
      
          @Override
          public Set<Object> findAll(String key) {
              return redisTemplate.opsForSet().members(key);
          }
      
          @Override
          public void deleteValue(String key, Object value) {
              redisTemplate.opsForSet().remove(key,value);
          }
      
          @Override
          public void delete(String key) {
              redisTemplate.delete(key);
          }
      }
      

      ZSet類型的值的CRUD:

      package com.butterflytri.service.impl;
      
      import com.butterflytri.service.SortedSetService;
      import org.springframework.data.redis.core.RedisTemplate;
      import org.springframework.stereotype.Service;
      
      import javax.annotation.Resource;
      import java.util.LinkedHashSet;
      
      /**
       * @author: WJF
       * @date: 2020/5/28
       * @description: SortedSetServiceImpl
       */
      @Service
      public class SortedSetServiceImpl implements SortedSetService {
      
          @Resource
          private RedisTemplate<String, Object> redisTemplate;
      
          @Override
          public void add(String key, String value, Double score) {
              redisTemplate.opsForZSet().add(key, value, score);
          }
      
          @Override
          public LinkedHashSet<Object> findAll(String key) {
              return (LinkedHashSet<Object>) redisTemplate.opsForZSet().range(key,0,-1);
          }
      
          @Override
          public Long count(String key, Double scoreFrom, Double scoreTo) {
              return redisTemplate.opsForZSet().count(key,scoreFrom,scoreTo);
          }
      
          @Override
          public LinkedHashSet<Object> findByScore(String key, Double scoreFrom, Double scoreTo) {
              return (LinkedHashSet<Object>) redisTemplate.opsForZSet().rangeByScore(key,scoreFrom,scoreTo);
          }
      
          @Override
          public Long rank(String key, Object value) {
              return redisTemplate.opsForZSet().rank(key,value);
          }
      
          @Override
          public void remove(String key, String value) {
              redisTemplate.opsForZSet().remove(key,value);
          }
      
          @Override
          public void delete(String key) {
              redisTemplate.delete(key);
          }
      
      }
      

      redis的Java客戶端有很多,在這裏我們使用的是jedis,還有一個很好的Java語言的客戶端叫lettuce,大家可以去了解一下,Spring從不重複造輪子,只會簡化輪子的使用,redisTemplate就是一個超級簡單的使用實現。到這裏redis整合Spring Boot 就結束了。

    5. 項目地址

    本項目傳送門:

    • GitHub —> spring-data-redis
    • Gitee —> spring-data-redis

    此教程會一直更新下去,覺得博主寫的可以的話,關注一下,也可以更方便下次來學習。

    • 作者:Butterfly-Tri
    • 出處:Butterfly-Tri個人博客
    • 版權所有,歡迎保留原文鏈接進行轉載

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※Google地圖已可更新顯示潭子電動車充電站設置地點!!

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※別再煩惱如何寫文案,掌握八大原則!

    網頁設計最專業,超強功能平台可客製化

    聚甘新

  • xenomai內核解析之雙核系統調用(一)

    xenomai內核解析之雙核系統調用(一)

    版權聲明:本文為本文為博主原創文章,轉載請註明出處。如有錯誤,歡迎指正。博客地址:https://www.cnblogs.com/wsg1100/

    目錄

    • xenomai 內核系統調用
      • 一、32位Linux系統調用
      • 二、32位實時系統調用
      • 三、 64位系統調用
      • 五、 實時系統調用表cobalt_syscalls
      • 六、實時系統調用權限控制cobalt_sysmodes
      • 參考

    xenomai 內核系統調用

    解析系統調用是了解內核架構最有力的一把鑰匙,在這之前先搞懂xenomai與linux兩個內核共存后系統調用是如何實現的。

    為什麼需要系統調用

    linux內核中設置了一組用於實現系統功能的子程序,稱為系統調用。系統調用和普通庫函數調用非常相似,只是系統調用由操作系統核心提供,運行於內核態,而普通的函數調用由函數庫或用戶自己提供,運行於用戶態

    一般的,進程是不能訪問內核的。它不能訪問內核所佔內存空間也不能調用內核函數。CPU硬件決定了這些(這就是為什麼它被稱作“保護模式”

    為了和用戶空間上運行的進程進行交互,內核提供了一組接口。透過該接口,應用程序可以訪問硬件設備和其他操作系統資源。這組接口在應用程序和內核之間扮演了使者的角色,應用程序發送各種請求,而內核負責滿足這些請求(或者讓應用程序暫時擱置)。實際上提供這組接口主要是為了保證系統穩定可靠,避免應用程序肆意妄行,惹出大麻煩。

    系統調用在用戶空間進程和硬件設備之間添加了一个中間層。該層主要作用有三個:

    • 它為用戶空間提供了一種統一的硬件的抽象接口。比如當需要讀些文件的時候,應用程序就可以不去管磁盤類型和介質,甚至不用去管文件所在的文件系統到底是哪種類型。
    • 系統調用保證了系統的穩定和安全。作為硬件設備和應用程序之間的中間人,內核可以基於權限和其他一些規則對需要進行的訪問進行裁決。舉例來說,這樣可以避免應用程序不正確地使用硬件設備,竊取其他進程的資源,或做出其他什麼危害系統的事情。
    • 每個進程都運行在虛擬系統中,而在用戶空間和系統的其餘部分提供這樣一層公共接口,也是出於這種考慮。如果應用程序可以隨意訪問硬件而內核又對此一無所知的話,幾乎就沒法實現多任務和虛擬內存,當然也不可能實現良好的穩定性和安全性。在Linux中,系統調用是用戶空間訪問內核的惟一手段;除異常和中斷外,它們是內核惟一的合法入口。

    Linux加上實時系統內核xenomai后,實時任務常調用xenomai系統調用來完成實時的服務,如果實時任務需要用到linux的服務,還會調用linux的系統調用。

    一、32位Linux系統調用

    linux應用程序除直接系統調用外還會由glibc觸發系統調用,glibc為了提高應用程序的性能,對一些系統調用進行了封裝。
    32位系統系統調用使用軟中斷int 0x80指令實現,軟中斷屬於異常的一種,通過它陷入(trap)內核,trap在整理的文檔x86 Linux中斷系統有說明。tarp_init()中設置IDT(Interrupt Descriptor Table 每个中斷處理程序的地址都保存在一個特殊的位置)由關int 0x80的IDT如下:

    static const __initconst struct idt_data def_idts[] = {
    	......
    	SYSG(IA32_SYSCALL_VECTOR,	entry_INT80_32),
    	......
    };
    

    當生系統調用時,硬件根據向量號在 IDT 中找到對應的表項,即中斷描述符,進行特權級檢查,發現 DPL = CPL = 3 ,允許調用。然後硬件將切換到內核棧 (tss.ss0 : tss.esp0)。接着根據中斷描述符的 segment selector 在 GDT / LDT 中找到對應的段描述符,從段描述符拿到段的基址,加載到 cs 。將 offset 加載到 eip。最後硬件將 ss / sp / eflags / cs / ip / error code 依次壓到內核棧。於是開始執行entry_INT80_32函數,該函數在entry_32.S定義:

    ENTRY(entry_INT80_32)
    	ASM_CLAC
    	pushl	%eax		/* pt_regs->orig_ax */
    	SAVE_ALL pt_regs_ax=$-ENOSYS	/* *存儲當前用戶態寄存器,保存在pt_regs結構里*/
    	/*
    	 * User mode is traced as though IRQs are on, and the interrupt gate
    	 * turned them off.
    	 */
    	TRACE_IRQS_OFF
    
    	movl	%esp, %eax
    	call	do_int80_syscall_32
    .Lsyscall_32_done:
    	.......
    .Lirq_return:
    	INTERRUPT_RETURN/*iret 指令將原來用戶態保存的現場恢復回來,包含代碼段、指令指針寄存器等。這時候用戶態
    進程恢復執行。*/
    

    在內核棧的最高地址端,存放的是結構 pt_regs,首先通過 push 和 SAVE_ALL 將當前用戶態的寄存器,保存在棧中 pt_regs 結構裏面.保存完畢后,關閉中斷,將當前棧指針保存到 eax,即do_int80_syscall_32的參數1。
    調用do_int80_syscall_32=>do_syscall_32_irqs_on。先看看沒有ipipe時Linux實現如下:

    __always_inline void do_syscall_32_irqs_on(struct pt_regs *regs)
    {
    	struct thread_info *ti = pt_regs_to_thread_info(regs);
    	unsigned int nr = (unsigned int)regs->orig_ax;
    
    	.....
    	if (likely(nr < IA32_NR_syscalls)) {
    		nr = array_index_nospec(nr, IA32_NR_syscalls);
    		regs->ax = ia32_sys_call_table[nr](	/*根據系統調用號索引直接執行*/
    			(unsigned int)regs->bx, (unsigned int)regs->cx,
    			(unsigned int)regs->dx, (unsigned int)regs->si,
    			(unsigned int)regs->di, (unsigned int)regs->bp);
    	}
    	syscall_return_slowpath(regs);
    }
    

    在這裏,將系統調用號從pt_reges中eax 裏面取出來,然後根據系統調用號,在系統調用表中找到相應的函數進行調用,並將寄存器中保存的參數取出來,作為函數參數。如果仔細比對,就能發現,這些參數所對應的寄存器,和 Linux 的註釋是一樣的。ia32_sys_call_table系統調用表生成後面解析(此圖來源於網絡)。

    相關內核調用執行完后,一直返回到 do_syscall_32_irqs_on ,如果系統調用有返回值,會被保存到 regs->ax 中。接着返回 entry_INT80_32 繼續執行,最後執行 INTERRUPT_RETURN 。 INTERRUPT_RETURN 在 arch/x86/include/asm/irqflags.h 中定義為 iret ,iret 指令將原來用戶態保存的現場恢復回來,包含代碼段、指令指針寄存器等。這時候用戶態進程恢復執行。

    系統調用執行完畢。

    二、32位實時系統調用

    Xenomai使用I-pipe 攔截常規Linux系統調用調度程序,並將系統調用定向到實現它們的系統。

    實時系統調用,除了直接系統調用外,xenomai還實現了libcoblat實時庫,相當於glibc,通過libcoblat進行xenomai系統調用,以libcoblat庫函數sem_open為例,libcolat庫中C函數實現如下:

    COBALT_IMPL(sem_t *, sem_open, (const char *name, int oflags, ...))
    {
    	......
    	err = XENOMAI_SYSCALL5(sc_cobalt_sem_open,
    			       &rsem, name, oflags, mode, value);
    	if (err == 0) {
    		if (rsem != sem)
    			free(sem);
    		return &rsem->native_sem;
    	}
    	.......
    	return SEM_FAILED;
    }
    

    libcolat庫調用系統調用使用宏XENOMAI_SYSCALL5XENOAI_SYSCALL宏在\include\asm\xenomai\syscall.h中聲明,XENOMAI_SYSCALL5中的’5’代表’該系統調用有五個參數:

    #define XENOMAI_DO_SYSCALL(nr, op, args...)			\
    ({								\
    	unsigned __resultvar;					\
    	asm volatile (						\
    		LOADARGS_##nr					\
    		"movl %1, %%eax\n\t"				\
    		DOSYSCALL					\
    		RESTOREARGS_##nr				\
    		: "=a" (__resultvar)				\
    		: "i" (__xn_syscode(op)) ASMFMT_##nr(args)	\
    		: "memory", "cc");				\
    	(int) __resultvar;					\
    })
    
    #define XENOMAI_SYSCALL0(op)			XENOMAI_DO_SYSCALL(0,op)
    #define XENOMAI_SYSCALL1(op,a1)			XENOMAI_DO_SYSCALL(1,op,a1)
    #define XENOMAI_SYSCALL2(op,a1,a2)		XENOMAI_DO_SYSCALL(2,op,a1,a2)
    #define XENOMAI_SYSCALL3(op,a1,a2,a3)		XENOMAI_DO_SYSCALL(3,op,a1,a2,a3)
    #define XENOMAI_SYSCALL4(op,a1,a2,a3,a4)	XENOMAI_DO_SYSCALL(4,op,a1,a2,a3,a4)
    #define XENOMAI_SYSCALL5(op,a1,a2,a3,a4,a5)	XENOMAI_DO_SYSCALL(5,op,a1,a2,a3,a4,a5)
    

    每個宏中,內嵌另一個宏DOSYSCALL,即實現系統調用的int指令:int $0x80

    #define DOSYSCALL  "int $0x80\n\t"
    

    系統調用過程硬件處理及中斷入口上節一致,從do_syscall_32_irqs_on開始不同,有ipipe后變成下面這樣子:

    static __always_inline void do_syscall_32_irqs_on(struct pt_regs *regs)
    {
    	struct thread_info *ti = current_thread_info();
    	unsigned int nr = (unsigned int)regs->orig_ax;/*取出系統調用號*/
    	int ret;
    	
    	ret = pipeline_syscall(ti, nr, regs);/*pipeline 攔截系統調用*/
    	......
    done:
    	syscall_return_slowpath(regs);
    }
    

    套路和ipipe接管中斷類似,在關鍵路徑上攔截系統調用,然後調用ipipe_handle_syscall(ti, nr, regs)讓ipipe來接管處理:

    int ipipe_handle_syscall(struct thread_info *ti,
    			 unsigned long nr, struct pt_regs *regs)
    {
    	unsigned long local_flags = READ_ONCE(ti->ipipe_flags);
    	int ret; 
    	if (nr >= NR_syscalls && (local_flags & _TIP_HEAD)) {/*運行在head域且者系統調用號超過linux*/
    		ipipe_fastcall_hook(regs);			/*快速系統調用路徑*/
    		local_flags = READ_ONCE(ti->ipipe_flags);
    		if (local_flags & _TIP_HEAD) {
    			if (local_flags &  _TIP_MAYDAY)
    				__ipipe_call_mayday(regs);
    			return 1; /* don't pass down, no tail work. */
    		} else {
    			sync_root_irqs();
    			return -1; /* don't pass down, do tail work. */
    		}
    	}
    
    	if ((local_flags & _TIP_NOTIFY) || nr >= NR_syscalls) {
    		ret =__ipipe_notify_syscall(regs);
    		local_flags = READ_ONCE(ti->ipipe_flags);
    		if (local_flags & _TIP_HEAD)
    			return 1; /* don't pass down, no tail work. */
    		if (ret)
    			return -1; /* don't pass down, do tail work. */
    	}
    
    	return 0; /* pass syscall down to the host. */
    }
    

    這個函數的處理邏輯是這樣,怎樣區分xenomai系統調用和linux系統調用?每個CPU架構不同linux系統調用總數不同,在x86系統中有300多個,用變量NR_syscalls表示,系統調用號與系統調用一一對應。首先獲取到的系統調用號nr >= NR_syscalls,不用多想,那這個系統調用是xenomai內核的系統調用。
    另外還有個問題,如果是Linux非實時任務觸發的xenomai系統調用,或者xenomai 實時任務要調用linux的服務,這些交叉服務涉及實時任務與非實時任務在兩個內核之間運行,優先級怎麼處理等問題。這些涉及cobalt_sysmodes[].

    首先看怎麼區分一個任務是realtime還是no_realtime。在task_struct結構的頭有一個成員結構體thread_info,存儲着當前線程的信息,ipipe在結構體thread_info中增加了兩個成員變量ipipe_flagsipipe_data,ipipe_flags用來來標示一個線程是實時還是非實時,_TIP_HEAD置位表示已經是實時上下文。對於需要切換到xenomai上下文的系統調用_TIP_NOTIFY置位。

    struct thread_info {
    	unsigned long		flags;		/* low level flags */
    	u32			status;		/* thread synchronous flags */
    #ifdef CONFIG_IPIPE
    	unsigned long		ipipe_flags;
    	struct ipipe_threadinfo ipipe_data;
    #endif
    };
    

    ipipe_handle_syscall處理邏輯:
    1.對於已經在實時上下文的實時任務發起xenomai的系統調用,使用快速調用路徑函數ipipe_fastcall_hook(regs);
    2.需要切換到實時上下文或者非實時調用實時的,使用慢速調用路徑:

    __ipipe_notify_syscall(regs)
    ->ipipe_syscall_hook(caller_domain, regs)

    快速調用ipipe_fastcall_hook(regs)內直接handle_head_syscall執行代碼如下:

    static int handle_head_syscall(struct ipipe_domain *ipd, struct pt_regs *regs)
    {
    	....
    	code = __xn_syscall(regs);
    	nr = code & (__NR_COBALT_SYSCALLS - 1);
    	......
    	handler = cobalt_syscalls[code];
    	sysflags = cobalt_sysmodes[nr];
    	........
    
    	ret = handler(__xn_reg_arglist(regs));
    	.......
    
    	__xn_status_return(regs, ret);
    
    	.......
    }
    

    這個函數很複雜,涉及xenomai與linux之間很多聯繫,代碼是簡化后的,先取出系統調用號,然後從cobalt_syscalls取出系統調用入口handler,然後執行handler(__xn_reg_arglist(regs))執行完成后將執行結果放到寄存器ax,後面的文章會詳細分析ipipe如何處理系統調用。

    三、 64位系統調用

    我們再來看 64 位的情況,系統調用,不是用中斷了,而是改用 syscall 指令。並且傳遞參數的寄存器也變了。

    #define DO_SYSCALL(name, nr, args...)			\
    ({							\
    	unsigned long __resultvar;			\
    	LOAD_ARGS_##nr(args)				\
    	LOAD_REGS_##nr					\
    	asm volatile (					\
    		"syscall\n\t"				\
    		: "=a" (__resultvar)			\
    		: "0" (name) ASM_ARGS_##nr		\
    		: "memory", "cc", "r11", "cx");		\
    	(int) __resultvar;				\
    })
    
    #define XENOMAI_DO_SYSCALL(nr, op, args...) \
    	DO_SYSCALL(__xn_syscode(op), nr, args)
    
    #define XENOMAI_SYSBIND(breq) \
    	XENOMAI_DO_SYSCALL(1, sc_cobalt_bind, breq)
    

    這裏將系統調用號使用__xn_syscode(op)處理了一下,把最高位置1,表示Cobalt系統調用,然後使用syscall 指令。

    #define __COBALT_SYSCALL_BIT	0x10000000
    #define __xn_syscode(__nr)	(__COBALT_SYSCALL_BIT | (__nr))
    

    syscall 指令還使用了一種特殊的寄存器,我們叫特殊模塊寄存器(Model Specific Registers,簡稱 MSR)。這種寄存器是 CPU 為了完成某些特殊控制功能為目的的寄存器,其中就有系統調用。在系統初始化的時候,trap_init 除了初始化上面的中斷模式,這裏面還會調用 cpu_init->syscall_init。這裏面有這樣的代碼:

    wrmsrl(MSR_LSTAR, (unsigned long)entry_SYSCALL_64);
    

    rdmsr 和 wrmsr 是用來讀寫特殊模塊寄存器的。MSR_LSTAR 就是這樣一個特殊的寄存器, 當 syscall 指令調用的時候,會從這個寄存器裏面拿出函數地址來調用,也就是調entry_SYSCALL_64。
    該函數在’entry_64.S’定義:

    ENTRY(entry_SYSCALL_64)
    	UNWIND_HINT_EMPTY
    	......
    	swapgs
    	/*
    	 * This path is only taken when PAGE_TABLE_ISOLATION is disabled so it
    	 * is not required to switch CR3.
    	 */
    	movq	%rsp, PER_CPU_VAR(rsp_scratch)
    	movq	PER_CPU_VAR(cpu_current_top_of_stack), %rsp
    
    	/* Construct struct pt_regs on stack */
    	pushq	$__USER_DS			/* pt_regs->ss */
    	pushq	PER_CPU_VAR(rsp_scratch)	/* pt_regs->sp */
    	pushq	%r11				/* pt_regs->flags */
    	pushq	$__USER_CS			/* pt_regs->cs */
    	pushq	%rcx				/* pt_regs->ip *//*保存用戶太指令指針寄存器*/
    GLOBAL(entry_SYSCALL_64_after_hwframe)
    	pushq	%rax				/* pt_regs->orig_ax */
    
    	PUSH_AND_CLEAR_REGS rax=$-ENOSYS
    
    	TRACE_IRQS_OFF
    
    	/* IRQs are off. */
    	movq	%rsp, %rdi
    	call	do_syscall_64		/* returns with IRQs disabled */
    
    	TRACE_IRQS_IRETQ		/* we're about to change IF */
    
    	/*
    	 * Try to use SYSRET instead of IRET if we're returning to
    	 * a completely clean 64-bit userspace context.  If we're not,
    	 * go to the slow exit path.
    	 */
    	movq	RCX(%rsp), %rcx
    	movq	RIP(%rsp), %r11
    
    	cmpq	%rcx, %r11	/* SYSRET requires RCX == RIP */
    	jne	swapgs_restore_regs_and_return_to_usermode
    	.......
    	testq	$(X86_EFLAGS_RF|X86_EFLAGS_TF), %r11
    	jnz	swapgs_restore_regs_and_return_to_usermode
    
    	/* nothing to check for RSP */
    
    	cmpq	$__USER_DS, SS(%rsp)		/* SS must match SYSRET */
    	jne	swapgs_restore_regs_and_return_to_usermode
    
    	/*
    	 * We win! This label is here just for ease of understanding
    	 * perf profiles. Nothing jumps here.
    	 */
    syscall_return_via_sysret:
    	/* rcx and r11 are already restored (see code above) */
    	UNWIND_HINT_EMPTY
    	POP_REGS pop_rdi=0 skip_r11rcx=1
    
    	/*
    	 * Now all regs are restored except RSP and RDI.
    	 * Save old stack pointer and switch to trampoline stack.
    	 */
    	movq	%rsp, %rdi
    	movq	PER_CPU_VAR(cpu_tss_rw + TSS_sp0), %rsp
    
    	pushq	RSP-RDI(%rdi)	/* RSP */
    	pushq	(%rdi)		/* RDI */
    
    	/*
    	 * We are on the trampoline stack.  All regs except RDI are live.
    	 * We can do future final exit work right here.
    	 */
    	SWITCH_TO_USER_CR3_STACK scratch_reg=%rdi
    
    	popq	%rdi
    	popq	%rsp
    	USERGS_SYSRET64
    END(entry_SYSCALL_64)
    

    這裏先保存了很多寄存器到 pt_regs 結構裏面,例如用戶態的代碼段、數據段、保存參數的寄存器.

    然後調用 entry_SYSCALL64_slow_pat->do_syscall_64

    __visible void do_syscall_64(struct pt_regs *regs)
    {
    	struct thread_info *ti = current_thread_info();
    	unsigned long nr = regs->orig_ax;	/*取出系統調用號*/
    	int ret;
    
    	enter_from_user_mode();
    	enable_local_irqs();
    
    	ret = ipipe_handle_syscall(ti, nr & __SYSCALL_MASK, regs);
    	if (ret > 0) {
    		disable_local_irqs();
    		return;
    	}
    	if (ret < 0)
    		goto done;
    	......
    	if (likely((nr & __SYSCALL_MASK) < NR_syscalls)) {
    		nr = array_index_nospec(nr & __SYSCALL_MASK, NR_syscalls);
    		regs->ax = sys_call_table[nr](
    			regs->di, regs->si, regs->dx,
    			regs->r10, regs->r8, regs->r9);
    	}
    done:
    	syscall_return_slowpath(regs);
    }
    

    與32位一樣,ipipe攔截了系統調用,後面的處理流程類似所以,無論是 32 位,還是 64 位,都會到linux系統調用表 sys_call_table和xenomai系統調用表cobalt_syscalls[] 這裏來。

    五、 實時系統調用表cobalt_syscalls

    xenomai每個系統的系統系統調用號在\cobalt\uapi\syscall.h中:

    
    #define sc_cobalt_bind				0
    #define sc_cobalt_thread_create			1
    #define sc_cobalt_thread_getpid			2
    	......
    #define sc_cobalt_extend			96
    

    bind()函數在內核代碼中對應的聲明和實現為:

    /*聲明*/
    #define COBALT_SYSCALL_DECL(__name, __args)	\
    	long CoBaLt_ ## __name __args
    static COBALT_SYSCALL_DECL(bind, lostage,
    		      (struct cobalt_bindreq __user *u_breq));
    /*實現*/
    #define COBALT_SYSCALL(__name, __mode, __args)	\
    	long CoBaLt_ ## __name __args
    static COBALT_SYSCALL(bind, lostage,
    		      (struct cobalt_bindreq __user *u_breq)){......}
    
    

    其中__name表示系統調用名對應bind、__mode表示該系統調用模式對應lostage。COBALT_SYSCALL展開定義的bind函數后如下:

    long CoBaLt_bind(struct cobalt_bindreq __user *u_breq){......}
    

    怎麼將CoBaLt_bind與系統調用號sc_cobalt_bind聯繫起來後放入cobalt_syscalls[]的呢?
    在編譯過程中Makefile使用腳本gen-syscall-entries.sh處理各個.c文件中的COBALT_SYSCALL宏,生成一個頭文件syscall_entries.h,裏面是對每個COBALT_SYSCALL宏處理后后的項,以上面COBALT_SYSCALL(bind,...)為例syscall_entries.h中會生成如下兩項,第一項為系統調用入口,第二項為系統調用的模式:

    #define __COBALT_CALL_ENTRIES __COBALT_CALL_ENTRY(bind)
    #define __COBALT_CALL_MODES	__COBALT_MODE(lostage)
    

    實時系統調用表cobalt_syscalls[]定義在文件kernel\cobalt\posix\syscall.c中:

    #define __syshand__(__name)	((cobalt_syshand)(CoBaLt_ ## __name))
    
    #define __COBALT_NI	__syshand__(ni)
    
    #define __COBALT_CALL_NI				\
    	[0 ... __NR_COBALT_SYSCALLS-1] = __COBALT_NI,	\
    	__COBALT_CALL32_INITHAND(__COBALT_NI)
    
    #define __COBALT_CALL_NFLAGS				\
    	[0 ... __NR_COBALT_SYSCALLS-1] = 0,		\
    	__COBALT_CALL32_INITMODE(0)
    
    #define __COBALT_CALL_ENTRY(__name)				\
    	[sc_cobalt_ ## __name] = __syshand__(__name),		\
    	__COBALT_CALL32_ENTRY(__name, __syshand__(__name))
    
    #define __COBALT_MODE(__name, __mode)	\
    	[sc_cobalt_ ## __name] = __xn_exec_##__mode,
    	
    #include "syscall_entries.h"		/*該頭文件由腳本生成*/
    
    static const cobalt_syshand cobalt_syscalls[] = {
    	__COBALT_CALL_NI
    	__COBALT_CALL_ENTRIES
    };
    
    static const int cobalt_sysmodes[] = {
    	__COBALT_CALL_NFLAGS
    	__COBALT_CALL_MODES
    };
    

    __COBALT_CALL_NI宏表示數組空間大小為__NR_COBALT_SYSCALLS(128),每一項由__COBALT_CALL_ENTRIES定義,即腳本頭文件syscall_entries.h中生成的每一項來填充:

    #define __COBALT_CALL_ENTRY(__name)				\
    	[sc_cobalt_ ## __name] = __syshand__(__name),		\
    	__COBALT_CALL32_ENTRY(__name, __syshand__(__name))
    

    __COBALT_CALL32_ENTRY是定義兼容的系統調用,宏展開如下,相當於在數組的多個位置定義包含了同一項CoBaLt_bind

    #define __COBALT_CALL32_ENTRY(__name, __handler)	\
    	__COBALT_CALL32x_ENTRY(__name, __handler)	\
    	__COBALT_CALL32emu_ENTRY(__name, __handler)
    
    #define __COBALT_CALL32emu_ENTRY(__name, __handler)		\
    			[sc_cobalt_ ## __name + 256] = __handler,
    #define __COBALT_CALL32x_ENTRY(__name, __handler)		\
    		[sc_cobalt_ ## __name + 128] = __handler,
    

    最後bind系統調用在cobalt_syscalls[]中如下

    static const cobalt_syshand cobalt_syscalls[] = {
    	[sc_cobalt_bind] = CoBaLt_bind,
        [sc_cobalt_bind + 128] = CoBaLt_bind,   /*x32 support */
        [sc_cobalt_bind + 256] = CoBaLt_bind,   /*ia32 emulation support*/
    	.....
    };
    

    相應的數組cobalt_sysmodes[]中的內容如下:

    static const int cobalt_sysmodes[] = {
    	[sc_cobalt_bind] = __xn_exec_bind,
        [sc_cobalt_bind + 256] = __xn_exec_lostage, /*x32 support */
        [sc_cobalt_bind + 128] = __xn_exec_lostage, /*ia32 emulation support*/
        ......
    };
    

    六、實時系統調用權限控制cobalt_sysmodes

    上面說到,ipipe管理應用的系統調用時需要分清該系統調用是否合法,是否需要域切換等等。cobalt_sysmodes[]就是每個系統調用對應的模式,控制着每個系統調用的調用路徑。系統調用號為下標,值為具體模式。每個系統調用的sysmode如何生成見上一節,還是以實時應用的bind系統調用為例:

    static const int cobalt_sysmodes[] = {
    	[sc_cobalt_bind] = __xn_exec_bind,
        [sc_cobalt_bind + 256] = __xn_exec_lostage, /*x32 support */
        [sc_cobalt_bind + 128] = __xn_exec_lostage, /*ia32 emulation support*/
        ......
    };
    

    xenomai中所有的系統調用模式定義如下:

    /*xenomai\posix\syscall.c*/
    #define __xn_exec_lostage    0x1	/*必須在linux域運行該系統調用*/	
    #define __xn_exec_histage    0x2	/*必須在Xenomai域運行該系統調用*/	
    #define __xn_exec_shadow     0x4		/*影子系統調用:必須映射調用方*/
    #define __xn_exec_switchback 0x8 	/*切換回切換; 調用者必須返回其原始模式*/
    #define __xn_exec_current    0x10		/*在不管域直接執行。*/
    #define __xn_exec_conforming 0x20  	/*在兼容域(Xenomai或Linux)中執行*/
    #define __xn_exec_adaptive   0x40	/* 先直接執行如果返回-ENOSYS,則嘗試在相反的域中重新執行系統調用 */
    #define __xn_exec_norestart  0x80  /*收到信號后不要重新啟動syscall*/
     /*Shorthand初始化系統調用的簡寫*/
    #define __xn_exec_init       __xn_exec_lostage 
    /*Xenomai空間中shadow系統調用的簡寫*/
    #define __xn_exec_primary   (__xn_exec_shadow|__xn_exec_histage) 
    /*Linux空間中shadow系統調用的簡寫*/
    #define __xn_exec_secondary (__xn_exec_shadow|__xn_exec_lostage)
    /*Linux空間中syscall的簡寫,如果有shadow則切換回linux*/
    #define __xn_exec_downup    (__xn_exec_lostage|__xn_exec_switchback)
    /* 主域系統不可重啟調用的簡寫 */
    #define __xn_exec_nonrestartable (__xn_exec_primary|__xn_exec_norestart)
    /*域探測系統調用簡寫*/
    #define __xn_exec_probing   (__xn_exec_conforming|__xn_exec_adaptive)
    /*將模式選擇移交給syscall。*/
    #define __xn_exec_handover  (__xn_exec_current|__xn_exec_adaptive)
    

    使用一個無符號32 位數的每一位來表示一種模式,各模式註釋已經很清楚,不在解釋,後面文章解析ipipe是如何根據mode來處理的。

    參考

    英特爾® 64 位和 IA-32 架構軟件開發人員手冊第 3 卷 :系統編程指南
    極客時間專欄-趣談Linux操作系統
    《linux內核源代碼情景分析》

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    南投搬家公司費用需注意的眉眉角角,別等搬了再說!

    ※教你寫出一流的銷售文案?

    聚甘新

  • 如何只用5分鐘完成數據 列表、創建頁面

    如何只用5分鐘完成數據 列表、創建頁面

    前言

    我們當然希望能夠更快的完成我們的工作,這樣我們才能有更多的時間做其他的事情,比如說測試、學習、放鬆。

    背景

    軟件一般也就這麼幾個方面的工作要做,增、刪、改、查。如果歸結到頁面上來說,那麼無非也就這麼幾個頁面Form頁面(增)、列表頁面(查、刪)、編輯頁面(改)。很大程度上,你的項目就是由不同的實體的這麼幾個頁面組裝起來的。既然他們都是這麼幾個頁面,那麼,我們是不是可以考慮針對這幾個頁面進行抽象呢?然後使用數據描述這幾個頁面的行為。

    效果

    經典倒敘,先上效果圖

    列表頁面

    創建頁面

    目前就簡單實現了列表頁面和創建頁面。編輯頁面,跟創建頁面太像了。暫時還沒有實現相關內容,不過,這個不是很重要了。

    實現過程

    需求分析

    其實,每個頁面都是存在固定的路數的。

    比如說:

    列表頁面裡邊主要存在這麼幾個參數:列表名、列表頭上的按鈕、列表的表頭、列表內容、列表每一行中的操作、分頁控件。

    表單頁面列表主要存在這麼幾個參數:表單名、表單內容項。

    主要的參數出現的位置都是固定的。但是什麼地方出現什麼內容則是可以變化的,一般情況下,我們都是通過代碼,一遍一遍的重寫這些頁面,然後來達到不同的應用之間的變化的目的。其實我們是可以通過數據來描述他們的。比如說向下面這樣。

    列表頁面的定義

    Form表單頁面的定義

    原始數據的定義

    然後將這些定義好的屬性通過後端渲染到頁面上。

    就可以達到,前邊展示的這種效果了。

    數據存儲

    因為數據類型是自定義的,所以數據存儲的字段也是可以自己隨便預設的。然後系統就可以直接支持這一數據類型。在這個Demo裡邊,我是簡單粗暴的使用了文件存儲Json文件的方式來進行保存的數據。

    其實應該鏈接數據庫的。不過我在Demo項目裡邊留下了相關的接口,只要再實現一個數據庫版本的實例就可以無縫對接了。

    其實

    當然了這隻是他的最初級的形態,因為現在寫的配置文件都是通過手寫來實現,將來可以做一個編輯器。並且可以實時看到調整過的效果。

    其實這個做法,是來源於PaaS項目中的一個很小很小的功能塊。真正的PaaS項目這一整套東西都是在線上直接編輯看效果的。

    最後

    系列

    這個項目將來會融入到我寫的PaaS Demo中作為前端展示部分。 系列的目錄在 https://juejin.im/post/5eca2a186fb9a047e96b2884 這個部分會一點點完善。

    開源

    雖然東西不大,但是還是希望能給你一點點啟發。 項目地址 https://gitee.com/anxin1225/Dov.GenericWeb

    簡單的體驗

    部署到雲端了,可以簡單體驗一下。

    http://gw.ash50p.com/Generic/Meeting.Record/List

    轉載莫忘原文地址:https://juejin.im/post/5eeb85b8e51d45740850f755

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※想知道最厲害的網頁設計公司“嚨底家”!

    ※別再煩惱如何寫文案,掌握八大原則!

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

    聚甘新

  • 80386學習(五) 80386分頁機制與虛擬內存

    80386學習(五) 80386分頁機制與虛擬內存

    一. 頁式內存管理介紹

      80386能夠將內存分為不同屬性的段,並通過段描述符、段表以及段選擇子等機制,通過段基址和段內偏移量計算出線性地址進行訪問,這一內存管理方式被稱為段式內存管理

      這裏要介紹的是另一種內存管理的方式:80386在開啟了分頁機制后,便能夠將物理內存劃分為一個個大小相同且連續的物理內存頁,訪問時通過物理內存頁號和頁內偏移計算出最終需要訪問的線性地址進行訪問,由於內存管理單元由段變成了頁,因此這一內存管理方式被稱為頁式內存管理

      80386的分頁機制只能在保護模式下開啟。

    為什麼需要頁式內存管理?

      在介紹80386分頁機制前,需要先理解為什麼CPU在管理內存時,要在段式內存管理的基礎上再引入一種有很大差異的頁式內存管理方式?頁式內存管理與純段式內存管理相比到底具有哪些優點?

      一個很重要的原因是為了解決多任務環境下,段式內存管理中多任務的創建與終止時會產生較多內存碎片,使得內存空間使用率不高的問題

      內存碎片分為外碎片和內碎片兩種。

    外碎片

      對於指令和數據的訪問通常都是連續的,所以需要為一個任務分配連續的內存空間。在段式內存管理中,通常為任務分配一個完整的內存段,或是按照任務內段功能的不同,分配包括代碼段、數據段和堆棧段在內的多個完整連續段空間。支持多道任務的系統分配的內存空間,會在某些任務退出並釋放內存時,產生外部內存碎片。

      舉個例子,假設當前存在10MB的內存空間,存在A/B/C/D四個任務,併為每個任務分配一整塊的內存空間,其所佔用的內存空間分別為3MB/2MB/4MB/1MB,如下圖所示(一個格子代表1MB內存)。

      當任務B和任務D執行完成后,所佔用的內存空間被釋放,10MB的內存空間中出現了3MB大小的空閑內存。如果此時出現了一個任務E,需要為其分配3MB的內存空間,此時內存雖然存在3MB的內存空間,卻由於空閑內存的不連續,碎片化,導致無法直接分配給任務E使用。而這裏任務B、任務D結束后釋放的空餘內存空間就被視為外碎片。

      這裏的例子任務數量少且內存空間也很小。而在實際的32位甚至64位的系統中,物理內存空間少則4GB,多則幾十甚至上百GB,由於任務內存的反覆分配和釋放,導致出現的外碎片的數量及浪費的內存空間會很多,很大程度上降低了內存空間的利用率。

      雖然理論上能夠通過操作系統小心翼翼的挪動內存,使得外碎片能夠拼接為連續的大塊,得以被有效利用(內存緊縮)。但是操作系統挪動、複製內存本身很佔用CPU資源,且存在對指令進行地址重定位、暫時暫停對所挪動內存區域的訪問等附加問題,造成的效率降低程度幾乎是不可忍受的,因此這一解決方案並沒有被廣泛使用。

      

    內碎片

      外碎片指的是不同任務內存之間的碎片,而內碎片指的是一個任務內產生的內存碎片。

      通常操作系統為了管理多任務環境下的物理內存,會將內存分隔為固定大小的分區,使用系統表記錄對應分區內存的使用情況(如是否已分配等)。分區的大小必須適當,如果分區過小,則相同物理內存大小下,系統表項過多使得所佔用的空間過大;可如果分區過大,則會產生過大的內碎片,造成不必要的內存空間浪費。

      以上述介紹外碎片的數據為例,系統中的內存分區固定大小為1MB,其中為任務C分配了4個內存分區,共4MB大小。可實際上任務C實際只需要3.5MB的空間即可滿足需求,但由於分區是內存管理的最小單元,只能為任務分配整數個的內存分區。3個分區3MB並不滿足任務C的3.5MB的內存需求,因此只能分配4個分區給任務C。而這裏任務C額外多佔用的0.5MB內存就是內碎片。 

      內碎片就是已經被分配出去,卻不能被有效利用的內存空間。

    80386是如何解決內存碎片問題的?

    外碎片的解決

      外碎片問題產生的主要原因是程序所需要分配的內存空間是連續的。為此,80386提供了分頁機制,使得最終分配給任務的物理內存空間可以不連續。如果任務所使用的內存不必連續,前面外碎片例子中提到的任務E就能夠在1MB+2MB的離散物理內存上正常運行,外碎片問題自然就得到了解決。

    內碎片的解決

      內碎片從本質上來說是很難完全避免的(內存管理最小單元不能過小),主要的問題在於前面提到的內存分區管理單元大小的較優值不好確定。開啟了分頁管理的80386,允許將物理內存分割最小為4KB固定大小的管理單元,這個固定大小的內存管理單元被稱為頁,並由專門的被稱為頁表的數據結構來追蹤內存頁的使用情況。

      對於頁表項過多的問題,80386的設計者提供了多級頁表機制,減少了頁表所佔用的空間。

      對於內碎片過大的問題,由於80386所運行的任務所佔用的內存段一般遠大於一個內存頁的大小,因此頁機制下所產生的內部碎片是十分有限的,可以達到一個令人滿意的內存使用率。

    二. 虛擬內存簡單介紹 

      為了解決應用程序高速增長的內存需求與物理內存增加緩慢的矛盾,計算機科學家們提供了虛擬內存的概念。使用了虛擬內存的系統,可以使得系統內運行的程序所佔用的內存空間總量,遠大於實際物理內存的容量。

      能夠實現虛擬內存的關鍵在於程序在特定時刻所需要訪問的內存地址是符合局部性原理的。通過操作系統和硬件的緊密配合,能夠將任務暫時不需要訪問的內存交換到外部硬盤中,而將物理內存留給真正需要訪問的那部分內存(工作集內存)。

      虛擬內存和分頁機制是一對好搭檔,分頁機制提供了管理內存的基本單位:頁,80386的頁式虛擬內存實現在工作集內存調度時也依賴分頁機制提供的頁來進行。隨着程序的執行,程序的工作集內存在動態變化,當CPU檢測到當前所訪問的內存頁不在物理內存中時,便會通知操作系統(內存缺頁異常),操作系統的缺頁異常處理程序會將硬盤交換區中的對應內存頁數據寫回物理內存。如果物理內存頁已經滿了的情況下,則還需要根據某種算法將另一個物理內存頁替換,來容納這一換入的內存頁。

    三. 80386分頁機制原理

      在介紹分頁機制原理之前,需要先理解關於80386保護模式下32位內存尋址時幾種地址的概念。

    物理地址(Physical Address):

      物理地址就是32位的地址總線所對應的真實的硬件存儲空間。對於物理內存的訪問,無論中間會經過多少次轉換,最終必須轉換為最終的物理地址進行訪問。

    邏輯地址(Logical Address):

      在80386保護模式的程序指令中,對內存的訪問是由段選擇子和段內偏移決定的。段選擇子+段內偏移 –> 邏輯地址。

    線性地址(Linear Address):

      CPU在內存尋址時,從指令中獲得段選擇子和段內偏移,即邏輯地址。由段選擇子在段表(GDT或LDT)中找到對應的段描述符,獲取段基址。段基址+段內偏移決定線性地址。

      如果沒有開啟分頁,CPU就使用生成的線性地址直接作為最終的物理地址進行訪問;如果開啟了分頁,則還需要通過頁表等機制,將線性地址進一步處理才能生成物理地址進行訪問。

    頁式虛擬內存實現原理

      程序要求訪問一個段時,其線性地址必須是連續的。在純粹的段式內存管理中,線性地址等於物理地址的情況下,就會出現外碎片的問題。而在段式內存管理的基礎上,80386如果還開啟了頁機制,就能通過抽象出一層線性地址到物理地址的映射,使得最終分配給程序的物理內存段不必連續。

      80386中的內存頁大小為4KB,在32位的內存尋址空間中(4GB),存在着0x10000 = 1048576個頁。每個頁對應的起始地址低12位都為0,第一個物理內存頁的物理地址為0x00000000,第二個物理內存頁的物理地址為0x00001000,依此類推,最後一個物理頁的物理地址是0xFFFFF000。

    頁表

      在80386的分頁機制的實現中,是通過頁表來實現線性地址到物理地址映射轉換的。每個任務都有一個自己的頁表記錄著任務的線性地址到物理地址的映射關係。

      開啟了頁機制后的線性地址也被稱為虛擬地址,這是因為線性地址已經不再直接對應真實的物理地址,而是一個不承載真實數據的虛擬內存地址。開啟了分頁機制后,一個任務的虛擬地址空間依然是連續的,但所佔用的物理地址空間卻可以不連續

      頁表保存着被稱為頁表項的數據結構集合,每一個頁表項都記載着一個虛擬內存頁到物理內存頁的映射關係。開啟了頁機制之後,CPU在內存尋址時,在通過段表計算出了線性地址(虛擬地址)后,便可以在連續排布的虛擬地址空間中找到對應的頁表項,通過頁表項獲取虛擬內存頁所對應的物理內存頁地址,進行物理內存的訪問。虛擬地址到物理地址映射的細節會在後面進行展開。

      由於是將不斷變化的虛擬內存頁裝載進相對不變的物理內存頁中,就像畫廊中展示的畫會不斷的更替,但畫框基本不變一樣。為了更好的區分這兩者,頁通常特指虛擬內存頁,而物理內存頁則被稱為頁框。

    頁表項介紹

      頁表項是32位的,其結構如下圖所示。

      

    P位:

      P(Present),存在位。標識當前虛擬內存頁是否存在於物理內存頁中。當P位為1時,表示當前虛擬內存頁存在於物理內存中,可以直接進行訪問。當P位為0時,表示對應的物理內存頁不存在,需要新分配物理內存頁或是從磁盤中將其調度回物理內存。

      分頁模式下的內存尋址,如果CPU發現對應的頁表項P位為0,會引發缺頁異常中斷,操作系統在缺頁異常處理程序中進行對應的處理,以實現虛擬內存。

    RW位:

      RW(Read/Write)位,讀寫位。標識當前頁是否能夠寫入。當RW為1時,代表當前頁可讀可寫;當RW為0時,代表當前頁是只讀的。

    US位:

      US(User/Supervisor)位,用戶/管理位。當US為1時,標識當前頁是用戶級別的,允許所有當前特權級的任務進行訪問。當US為0時,表示當前頁是屬於管理員級別的,只允許當前特權級為0、1、2的任務進行訪問,而當前特權級為3的用戶態任務無法進行訪問。

    PWT位/PCD位:

      PWT(Page-level Write Through)位,頁級通寫位。PWT為1時,表示當前物理頁的高速緩存採用通寫法;PWT為0時,表示當前物理頁的高速緩存採用回寫法。

      PCD(Page-level Cache Disable)位,頁級高速緩存禁止位。PCD為1時,表示訪問當前物理頁禁用高速緩存;PCD為0時,表示訪問當前物理頁時允許使用高速緩存。

      PWT與PCD位的使用,涉及到了80386高速緩存的工作原理與內存一致性問題,限於篇幅不在這裏展開。

    A位:

      A(Access)位,訪問位。A位為1時,代表當前頁曾經被訪問過;A位為0時,代表當前頁沒有被訪問過。

      A位的設置由CPU固件在對應內存頁訪問時自動設置為1,且可以由操作系統在適當的時候通過程序指令重置為0,用以計算內存頁的訪問頻率。通過訪問頻率,操作系統能夠以此作為虛擬內存調度算法中評估的依據,在物理內存緊張的情況下,可以選擇將最少使用的內存頁換出,以減少不必要的虛擬內存頁調度時的磁盤I/O,提高虛擬內存的效率。

    D位:

      D(Dirty)位,臟位。當D位為1時,表示當前頁被寫入修改過;D位為0時,代表當前頁沒有被寫入修改過。

      臟位由CPU在對應內存頁被寫入時自動設置為1。操作系統在進行內存頁調度時,如果發現需要被換出的內存頁D位為1時,則需要將對應物理內存頁數據寫回虛擬頁對應的磁盤交換區,保證磁盤/內存數據的一致性;當發現需要被換出的物理內存頁的D位為0時,表示當前頁自從換入物理內存以來沒有被修改過,和磁盤交換區中的數據一致,便直接將其覆蓋,而不進行磁盤的寫回,減少不必要的I/O以提高效率。

    PAT位:

      PAT(Page Attribute Table),頁屬性表支持位。PAT位的存在使得CPU能夠支持更複雜的,不同頁大小的分頁管理。當PAT=0時,每一頁的大小為4KB;當PAT=1時,每一頁的大小是4MB,或是其它大小(分CPU的情況而定)。

    G位:

      G(Global),全局位。表示當前頁是否是全局的,而不是屬於某一特定任務的。G=1時,表示當前頁是全局的;G=0時,表示當前頁是屬於特定任務的。

      為了加速頁表項的訪問,80386提供了TLB快表,作為頁表訪問的高速緩存。當任務切換時,TLB內所有G=0的非全局頁將會被清除,G=1的全局頁將會被保留。將操作系統內核中關鍵的,頻繁訪問的頁設置為全局頁,使得其能夠一直保存在TLB快表中,加速對其的訪問速度,提高效率。

    AVL位:

      AVL(Avaliable),可用位。和段描述符中的AVL位功能類似,CPU並不使用它,而是提供給操作系統軟件自定義使用。

    頁物理基地址字段:     

      頁物理基地址字段用於標識對應的物理頁,共20位。

      由於32位的80386的頁最小是4KB,而4GB的物理內存被分解為了最多0x10000個4KB的物理頁。20位的頁物理基地址字段作為物理頁的索引標號與每一個具體的物理頁一一對應。通過頁物理基地址字段,便能找到唯一對應的物理內存頁。

    多級頁表

      在32位的CPU中,操作系統可以給每個程序分配至多4GB的虛擬內存空間,如果一個內存頁佔4KB,那麼對應的每個程序的頁表中最多需要存放着0x10000個頁表項來進行映射。即使每個頁表項只佔小小的32位共4個字節(4Byte),這依然是一個不小的內存開銷(0x10000個頁表項的大小為4MB)。

      一個應用程序雖然可以被分配4GB的虛擬內存空間,但實際上可能只使用其中的一小部分,例如40MB的大小。通常程序的堆棧段和數據段都分別位於虛擬內存空間的高低兩端,並隨着程序的執行慢慢的向中間擴展,由於頁表項對應與虛擬地址空間的連續性,這就要求任務在執行時必須完整的定義整張頁表。

      可以看到,一級的平面頁表結構存在着明顯的頁表空間浪費的問題。雖然可以要求應用程序不要一下子就以4GB的內存規格進行編程,而是一開始用較小的內存,並在需要更大內存時梯度的申請更大的內存空間,並重新構造數據段和堆棧段以減少每個任務的無用頁表項空間的浪費。但這將頁表空間優化的繁重任務強加給了應用程序,並不是一個好的解決辦法。

      為此,計算機科學家們提出了多級頁表的方案來解決頁表項過多的問題。多級頁表顧名思義,頁表的結構不再是一個一級的平面結構(一級頁表),而是像一顆樹一樣,由頁目錄項節點頁表項節點組成。目錄節點中保存着下一級節點的物理頁地址等信息,恭弘=叶 恭弘子節點中則包含着真正的頁表項信息。查詢頁表項時,從一級頁目錄節點(根目錄)出發,按照一定的規則可以找到對應的下一級子目錄節點,直到查詢出對應的恭弘=叶 恭弘子節點為止。

      

    80386頁目錄項介紹

      80386採用的是二級頁表的設計,二級頁表由頁目錄表和頁表共同組成。頁目錄表中存放的是頁目錄項,頁目錄項的大小和頁表項一致,為4字節。

      通過80386指令得到的32位線性地址,其中高20位作為頁表項索引,低12位作為頁內偏移地址(4KB大小的物理頁)。如果採用的是一級頁表結構,20位的頁表項索引能直接找到4MB頁表中的對應頁表項。

      而對於80386二級頁表的設計來說,由於一個物理頁大小為4KB,最多可以容納1024(2^10)個頁表項或者頁目錄項,所以將頁表項索引的高10位作為根目錄頁中頁目錄項的索引值,通過頁目錄項中的頁表項物理頁號可以找到對應的頁表物理頁;再根據頁表項索引的后10位找到頁表中對應的頁表項。

      

    80386頁目錄項結構圖

       80386的二級頁表的頁目錄項佔32位,其低12位的含義與頁表項一致。主要區別在於其高20位存放的是下一級頁表的物理頁索引,而不是虛擬地址映射的物理內存頁地址。

      

    頁表基址寄存器

      前面提到過,和LDT一樣,每個任務都擁有着自己獨立的頁表。為此80386CPU提供了一個專門的寄存器用於追蹤定位任務自己的頁表,這個寄存器的名稱叫做頁表基址寄存器(Page Directory Base Register,PDBR),也就是控制寄存器CR3。

      由於80386分頁機制使用的是二級頁表,因此PDBR指向的是二級頁表結構中的頁目錄,通過頁目錄表便能夠間接的訪問整個二級頁表。為了效率其中存放的直接就是頁目錄表的32位物理地址,一般由操作系統負責在任務切換時將新任務對應的頁目錄表預先加載進物理內存。

      由於PDBR是和當前任務有關的,在任務切換時會被新任務TSS中的PDBR字段值所替換,指向新任務的頁目錄表,而舊任務的PDBR的值則在保護現場時被存入對應的TSS中。

    多級頁表是如何解決頁表項浪費問題的?

      以80386的二級頁表設計為例,最大4GB的虛擬內存空間下,無論如何一級頁目錄表是必須存在的。當不需要為應用程序分配過多的內存時,頁目錄表中的頁目錄項所指向的對應頁表可以不存在,即頁目錄項的P位為0,實際不使用的虛擬內存空間將沒有對應的二級頁表節點,相比一級頁表的設計其浪費的內存會少很多。

      假設需要為一個虛擬地址首尾各需要分配20MB,共佔用40MB內存的任務構建對應的頁表。

      1. 如果使用一級頁表,4GB的虛擬內存空間下需要提供0x10000個頁表項,共4MB,頁表的體積達到了任務自身所需40MB內存的10%,但其中絕大多數的頁表項都是沒用的(P位為0),不會對應實際的物理內存,空間效率很低。

      2. 如果使用二級頁表,除了佔一個物理頁4KB大小的頁目錄表是必須存在的外,其頁目錄表中只有首尾兩項的P位為1,分別指向一個實際存在的頁表(二級節點),頁目錄表中間其它的頁目錄項P位都為0,不需要為這些不會使用到的虛擬地址分配頁表。對於這個40MB的程序來說,其頁表只佔了3個物理頁面,共12KB,空間效率相比一級頁表高很多。

    TLB快表

      前面提到了多級頁表所帶來的好處:通過頁表分層,可以減少順序排列的無效頁表項數量,節約內存空間;頁表的層級越多,空間效率也越高。

      計算機領域中,通常並沒有免費的午餐,一個問題的解決,往往會帶來新的問題:多級頁表本質上是一個樹狀結構,每一個節點頁都是離散的,因此每一層級訪問都需要進行一次內存尋址操作,頁表的層級越多,訪問的次數也就越多,虛擬頁地址映射過程也越慢。在32位的80386中,2級頁表下問題還不算特別嚴重;但64位CPU的出現帶來了更大的尋址空間,也需要更多的頁表項,頁表的層級也漸漸的從2級變成了3級、4級甚至更多。頁機制開啟之後,所有的內存尋址都需要經過CPU的頁部件進行轉化才能獲得最終的物理地址,因此這一過程必須要快,不能因為頁表的離散層次訪問就嚴重影響虛擬地址空間到物理地址空間的轉換速度。

      要加快原本相對耗時的查詢操作,一個常用的辦法便是引入緩存。為了加速通用內存的訪問,80386利用局部性原理提供了高速緩存;為了加速多級頁表的頁表項訪問,80386提供了TLB。

      TLB(Translation Lookaside Buffer)直譯為地址轉換後援緩衝器,根據其作用也被稱為頁表緩存或是快表(快速頁表)。TLB中存放着一張表,其中的每一項用於緩存當前任務虛擬頁號和對應頁表項中的關鍵信息,被稱為TLB項。

      TLB的工作原理和高速緩存類似:當CPU訪問某一虛擬頁時,通過虛擬頁號先在TLB中尋找,如果發現對應的TLB項存在,則直接以TLB項中的數據進行物理地址的轉換,這被稱為TLB命中;當發現對應的TLB項不存在時(TLB未命中),則進行內存的訪問,在獲取內存中頁表項數據的同時,也將對應頁表項緩存入TLB中。如果TLB已滿則需要通過某種置換算法選出一個已存在的TLB項將其替換。

      TLB的查詢速度比內存快,但容量相對內存小很多,因此只能緩存數量有限的頁表項。但由於內存訪問的局部性,只要通過合理的設計提高TLB的命中率(通常可以達到90%以上),就能達到很好的效果。 

    四. 80386分頁機制下的內存尋址流程

      下面總結一下開啟了分頁機制的80386是如何進行內存尋址的。

      1. CPU首先從內存訪問指令中獲取段選擇子和段內偏移地址

      2. 根據段選擇子從段表(GDT或LDT)中查詢出對應的段描述符

      3. 根據段描述符中的段基址和指令中的段內偏移地址生成32位的線性地址(頁機制下的虛擬地址)

      4. 32位的線性地址根據80386二級頁表的設計,拆分成三個部分:高10位作為頁目錄項索引,中間次高10位作為頁表項索引,低12位作為頁內偏移地址。

      5. 通過高10位的頁目錄項索引從一級頁目錄表中獲取二級頁表的物理頁地址(通過物理頁框號可得),再根據中間10位的頁表項索引找到對應的物理頁框。根據物理頁框號與頁內偏移地址共同生成最終的物理地址,進行物理內存的訪問。

    五. 總結

      想要通過學習操作系統來更好的理解計算機程序底層的工作原理,基礎的硬件知識是必須要了解的。紙上得來終覺淺,絕知此事要躬行,在理解了基礎原理后,還需要通過實踐來加深對原理知識的理解,而閱讀相關操作系統的實現源碼就是一個很好的將實踐與原理緊密結合的學習方式。

      希望通過對硬件和操作系統的學習能幫助我打開計算機程序底層運行的神秘黑盒子一窺究竟,在思考問題時能夠換一個角度從底層的視角出發,去更好的理解和掌握上層的應用技術,以避免迷失在快速發展的技術浪潮中。

       

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※別再煩惱如何寫文案,掌握八大原則!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

    FB行銷專家,教你從零開始的技巧

    聚甘新

  • 【String註解驅動開發】面試官讓我說說:如何使用FactoryBean向Spring容器中註冊bean?

    寫在前面

    在前面的文章中,我們知道可以通過多種方式向Spring容器中註冊bean。可以使用@Configuration結合@Bean向Spring容器中註冊bean;可以按照條件向Spring容器中註冊bean;可以使用@Import向容器中快速導入bean對象;可以在@Import中使用ImportBeanDefinitionRegistrar向容器中註冊bean。

    項目工程源碼已經提交到GitHub:https://github.com/sunshinelyz/spring-annotation

    FactoryBean概述

    一般情況下,Spring通過反射機制利用bean的class屬性指定實現類來實例化bean 。在某些情況下,實例化bean過程比較複雜,如果按照傳統的方式,則需要在 標籤中提供大量的配置信息,配置方式的靈活性是受限的,這時採用編碼的方式可以得到一個更加簡單的方案。Spring為此提供了一個org.springframework.bean.factory.FactoryBean的工廠類接口,用戶可以通過實現該接口定製實例化bean的邏輯。

    FactoryBean接口對於Spring框架來說佔有重要的地位,Spring 自身就提供了70多個FactoryBean的實現。它們隱藏了實例化一些複雜bean的細節,給上層應用帶來了便利。從Spring 3.0 開始, FactoryBean開始支持泛型,即接口聲明改為FactoryBean 的形式:

    在Spring 5.2.6版本中,FactoryBean接口的定義如下所示。

    package org.springframework.beans.factory;
    import org.springframework.lang.Nullable;
    
    public interface FactoryBean<T> {
    
    	String OBJECT_TYPE_ATTRIBUTE = "factoryBeanObjectType";
    
    	@Nullable
    	T getObject() throws Exception;
    
    	@Nullable
    	Class<?> getObjectType();
    
    	default boolean isSingleton() {
    		return true;
    	}
    }
    
    • T getObject():返回由FactoryBean創建的bean實例,如果isSingleton()返回true,則該實例會放到Spring容器中單實例緩存池中。
    • boolean isSingleton():返回由FactoryBean創建的bean實例的作用域是singleton還是prototype。
    • Class getObjectType():返回FactoryBean創建的bean類型。

    這裏,需要注意的是:當配置文件中 標籤的class屬性配置的實現類是FactoryBean時,通過 getBean()方法返回的不是FactoryBean本身,而是FactoryBean#getObject()方法所返回的對象,相當於FactoryBean#getObject()代理了getBean()方法。

    FactoryBean實例

    首先,創建一個PersonFactoryBean,實現FactoryBean接口,如下所示。

    package io.mykit.spring.plugins.register.bean;
    
    import org.springframework.beans.factory.FactoryBean;
    /**
     * @author binghe
     * @version 1.0.0
     * @description 商品的FactoryBean,測試FactoryBean
     */
    public class PersonFactoryBean implements FactoryBean<Person> {
    
        //返回一個Person對象,這個對象會被註冊到Spring容器中
        @Override
        public Person getObject() throws Exception {
            return new Person();
        }
    
        @Override
        public Class<?> getObjectType() {
            return Person.class;
        }
    
        //bean是否為單例;true:是;false:否
        @Override
        public boolean isSingleton() {
            return true;
        }
    }
    

    接下來,我們在PersonConfig2類中加入PersonFactoryBean的聲明,如下所示。

    @Bean
    public PersonFactoryBean personFactoryBean(){
        return new PersonFactoryBean();
    }
    

    這裏需要小夥伴們注意的是:我在這裏使用@Bean註解向Spring容器中添加的是PersonFactory對象。那我們就來看看Spring容器中有哪些bean。接下來,運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    

    可以看到,結果信息中輸出了一個personFactoryBean,我們看下這個personFactoryBean到底是個什麼鬼!此時,我們對SpringBeanTest類中的testAnnotationConfig7()方法稍加改動,添加獲取personFactoryBean的代碼,並輸出personFactoryBean實例的類型,如下所示。

    @Test
    public void testAnnotationConfig7(){
        ApplicationContext context = new AnnotationConfigApplicationContext(PersonConfig2.class);
        String[] names = context.getBeanDefinitionNames();
        Arrays.stream(names).forEach(System.out::println);
    
        Object personFactoryBean = context.getBean("personFactoryBean");
        System.out.println("personFactoryBean實例的類型為:" + personFactoryBean.getClass());
    }
    

    再次運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    personFactoryBean實例的類型為:class io.mykit.spring.plugins.register.bean.Person
    

    可以看到,雖然我在代碼中使用@Bean註解注入的PersonFactoryBean對象,但是,實際上從Spring容器中獲取到的bean對象卻是調用PersonFactoryBean類中的getObject()獲取到的Person對象。

    看到這裏,是不是有種豁然開朗的感覺!!!

    在PersonFactoryBean類中,我們將Person對象設置為單實例bean,接下來,我們在SpringBeanTest類中的testAnnotationConfig7()方法多次獲取Person對象,並輸出多次獲取的對象是否為同一對象,如下所示。

    @Test
    public void testAnnotationConfig7(){
        ApplicationContext context = new AnnotationConfigApplicationContext(PersonConfig2.class);
        String[] names = context.getBeanDefinitionNames();
        Arrays.stream(names).forEach(System.out::println);
    
        Object personFactoryBean1 = context.getBean("personFactoryBean");
        Object personFactoryBean2 = context.getBean("personFactoryBean");
        System.out.println(personFactoryBean1 == personFactoryBean2);
    }
    

    運行testAnnotationConfig7()方法輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    true
    

    可以看到,在PersonFactoryBean類的isSingleton()方法中返回true時,每次獲取到的Person對象都是同一個對象,說明Person對象是單實例bean。

    這裏,可能就會有小夥伴要問了,如果將Person對象修改成多實例bean呢?別急,這裏我們只需要在PersonFactoryBean類的isSingleton()方法中返回false,即可將Person對象設置為多實例bean,如下所示。

    //bean是否為單例;true:是;false:否
    @Override
    public boolean isSingleton() {
        return false;
    }
    

    再次運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    false
    

    可以看到,最終結果返回了false,說明此時Person對象是多實例bean。

    如何在Spring容器中獲取到FactoryBean對象?

    之前,我們使用@Bean註解向Spring容器中註冊的PersonFactoryBean,獲取出來的確實Person對象。那麼,小夥伴們可能會問:我就想獲取PersonFactoryBean實例,該怎麼辦呢?

    其實,這也很簡單, 只需要在獲取bean對象時,在id前面加上&符號即可

    打開我們的測試類SpringBeanTest,在testAnnotationConfig7()方法中添加獲取PersonFactoryBean實例的代碼,如下所示。

    @Test
    public void testAnnotationConfig7(){
        ApplicationContext context = new AnnotationConfigApplicationContext(PersonConfig2.class);
        String[] names = context.getBeanDefinitionNames();
        Arrays.stream(names).forEach(System.out::println);
    
        Object personFactoryBean1 = context.getBean("personFactoryBean");
        Object personFactoryBean2 = context.getBean("personFactoryBean");
        System.out.println("personFactoryBean1類型:" + personFactoryBean1.getClass());
        System.out.println("personFactoryBean2類型:" + personFactoryBean2.getClass());
        System.out.println(personFactoryBean1 == personFactoryBean2);
    
        Object personFactoryBean3 = context.getBean("&personFactoryBean");
        System.out.println("personFactoryBean3類型:" + personFactoryBean3.getClass());
    }
    

    運行SpringBeanTest類中的testAnnotationConfig7()方法,輸出的結果信息如下所示。

    org.springframework.context.annotation.internalConfigurationAnnotationProcessor
    org.springframework.context.annotation.internalAutowiredAnnotationProcessor
    org.springframework.context.annotation.internalCommonAnnotationProcessor
    org.springframework.context.event.internalEventListenerProcessor
    org.springframework.context.event.internalEventListenerFactory
    personConfig2
    io.mykit.spring.plugins.register.bean.Department
    io.mykit.spring.plugins.register.bean.Employee
    io.mykit.spring.plugins.register.bean.User
    io.mykit.spring.plugins.register.bean.Role
    person
    binghe001
    personFactoryBean
    company
    personFactoryBean1類型:class io.mykit.spring.plugins.register.bean.Person
    personFactoryBean2類型:class io.mykit.spring.plugins.register.bean.Person
    false
    personFactoryBean3類型:class io.mykit.spring.plugins.register.bean.PersonFactoryBean
    

    可以看到,在獲取bean時,在id前面加上&符號就會獲取到PersonFactoryBean實例對象。

    那問題又來了!!為什麼在id前面加上&符號就會獲取到PersonFactoryBean實例對象呢?

    接下來,我們就揭開這個神秘的面紗,打開BeanFactory接口,

    package org.springframework.beans.factory;
    import org.springframework.beans.BeansException;
    import org.springframework.core.ResolvableType;
    import org.springframework.lang.Nullable;
    
    public interface BeanFactory {
    	String FACTORY_BEAN_PREFIX = "&";
        /**************以下省略n行代碼***************/
    }
    

    看到這裏,是不是明白了呢?沒錯,在BeanFactory接口中定義了一個&前綴,只要我們使用bean的id來從Spring容器中獲取bean時,Spring就會知道我們是在獲取FactoryBean本身。

    好了,咱們今天就聊到這兒吧!別忘了給個在看和轉發,讓更多的人看到,一起學習一起進步!!

    項目工程源碼已經提交到GitHub:https://github.com/sunshinelyz/spring-annotation

    寫在最後

    如果覺得文章對你有點幫助,請微信搜索並關注「 冰河技術 」微信公眾號,跟冰河學習Spring註解驅動開發。公眾號回復“spring註解”關鍵字,領取Spring註解驅動開發核心知識圖,讓Spring註解驅動開發不再迷茫。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※別再煩惱如何寫文案,掌握八大原則!

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※超省錢租車方案

    ※教你寫出一流的銷售文案?

    網頁設計最專業,超強功能平台可客製化

    聚甘新

  • Netty中的這些知識點,你需要知道!

    Netty中的這些知識點,你需要知道!

    一、Channel

    Channel是一個接口,而且是一個很大的接口,我們稱之為“大而全”,囊括了server端及client端接口所需要的接口。

    Channel是一個門面,封裝了包括網絡I/O及相關的所有操作。

    Channel聚合了包括網絡讀寫、鏈路管理、網絡連接信息、獲取EventLoop、Pipeline等相關功能類;統一分配,調度實現相應場景的功能。

    一個Channel 對應一個物理連接,是基於物理連接上的操作包裝。

    二、EventLoop

    EventLoop,Event意為事件、Loop意為環,EventLoo即為事件環

    EventLoop是一種程序設計結構等待以及分發事件。

    NioEventLoop,是一個Netty工作線程,又不僅僅是一個Netty工作線程。

    標準的netty線程模型 中我們講過Netty的標準線程池模型,池子里的每個線程對象就是一個NioEventLoop對象。或負責接受連接,或負責網絡I/O

    說它不僅僅是一個Netty線程,因為它實現了很多功能,我們可以看下它的繼承圖:

    它的上方有兩個枝丫,一個線程屬性,一個EventLoop,它是Netty的Reactor線程

    既然是Reactor線程,那麼首先我們需要一個多路復用器。在Netty NioEventLoop中,包就含一個 Selector,它的操作對象是Channel。

    NioEventLoop的主要邏輯在它的run()方法,方法體內是一個無限循環 for (;;),循環體內實現Loop功能。這也是通用的NIO線程實現方式。

     

    Loop 從任務隊列里獲取任務,然後檢查多路復用器中就緒的Channel進行處理。

    三、Unsafe

    Netty中的Unsafe,一個Channel內部聚合接口,用以處理實際的網絡I/O讀寫。當然,取Unsafe命名,源碼中釋義:提供的網絡相關的操作方法,永遠不應該被開發人員操作使用。

    它是Channel的一個輔助接口,主要方法:

    1、register:註冊Channel

    2、deregister:取消註冊

    3、bind:綁定地址,服務端綁定監聽特定端口;客戶端指定本地綁定Socket地址。

    4、connect:建立連接

    5、disconnect:斷開連接

    6、close:關閉連接

    7、write:調度寫,將數據寫入buffer,並未真正進入Channel

    8、flush:將緩衝區中的數據寫入Channel

    四、AdaptiveRecvByteBufAllocator

    動態緩衝區分配器,源碼說明:根據實時的反饋動態的增加或者減少預需的緩衝區大小。

    如果一次分配的緩衝區被填滿了,則調高下一次分配的緩衝區大小。

    如果連續兩次實際使用的容量低於分配的緩衝區大小特定比例,則減小下一次分配的緩衝區大小。

    其它情景,保持分配大小不變。

    Netty的這種“智能化”處理,可以說是相當有用的:

    1、首先,實際的應用場景千差萬別,同一場景下不同時刻的緩衝區需求也是實時變化(一句話可以是一個字,也可能是1000個字),這就需要Netty動態調整緩衝分配大小以適應不同的業務場景,時刻場景

    2、其次,過大的不必要的內存分配,會導致Buffer處理性能下降;過小的內存分配,則會導致頻繁的分配釋放。這都是一個優良的網絡框架不應該有的。 

    3、最後,動態的調整最直接的好處就是內存的的高效使用,一定程度上做到了按需分配。 

    五、ChannelPipeline

    Pipeline 管道,Channel的數據流通管道,在這個管道中,可以做很多事情。

    ChannelPipeline 是一種職責鏈,可以對其中流動的數據進行過濾、攔截處理,是一種插拔式的鏈路裝配器

    1、ChannelPipline是一個容器

    支持查詢、添加、刪除、替換等容器操作。

    2、ChannelPipline支持動態的添加和刪除 Handler

    ChannelPipline的這種特性給了我們相當的想象空間,例如動態的添加系統擁塞保護Handler,敏感數據過濾Handler、日誌記錄Handler、性能統計Handler等。

    3、ChannelPipline 是線程安全的

    ChannelPipline使用 synchronized 實現線程安全,業務線程可以併發的操作ChannelPipline。但需要注意的是,Handler是非線程安全的

    六、HandlerAdapter

    Adapter是一種適配器,對於用戶自定義的Handler,可以通過繼承HandlerAdapter,來規避不必要的接口實現

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※教你寫出一流的銷售文案?

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※回頭車貨運收費標準

    ※別再煩惱如何寫文案,掌握八大原則!

    ※超省錢租車方案

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

    聚甘新

  • 【原創】強擼 .NET Redis Cluster 集群訪問組件

    【原創】強擼 .NET Redis Cluster 集群訪問組件

      Hello 大家好,我是TANZAME,我們又見面了。今天我們來聊聊怎麼手擼一個 Redis Cluster 集群客戶端,純手工有乾貨,您細品。

      隨着業務增長,線上環境的QPS暴增,自然而然將當前的單機 Redis 切換到群集模式。燃鵝,我們悲劇地發現,ServiceStack.Redis這個官方推薦的 .NET 客戶端並沒有支持集群模式。一通度娘翻牆無果后,決定自己強擼一個基於ServiceStack.Redis的Redis集群訪問組件。

      話不多說,先上運行效果圖:

     

      Redis-Cluster集群使用 hash slot 算法對每個key計算CRC16值,然後對16383取模,可以獲取key對應的 hash slot。Redis-Cluster中每個master都會持有部分 slot,在訪問key時根據計算出來的hash slot去找到具體的master節點,再由當前找到的節點去執行具體的 Redis 命令(具體可查閱官方說明文檔)。

      由於 ServiceStack.Redis已經實現了單個實例的Redis命令,因此我們可以將即將要實現的 Redis 集群客戶端當做一個代理,它只負責計算 key 落在哪一個具體節點(尋址)然後將Redis命令轉發給對應的節點執行即可。

      ServiceStack.Redis的RedisClient是非線程安全的,ServiceStack.Redis 使用緩存客戶端管理器(PooledRedisClientManager)來提高性能和併發能力,我們的Redis Cluster集群客戶端也應集成PooledRedisClientManager來獲取 RedisClient 實例。

      同時,Redis-Cluster集群支持在線動態擴容和slot遷移,我們的Redis集群客戶端也應具備自動智能發現新節點和自動刷新 slot 分佈的能力。

      總結起來,要實現一個Redis-Cluster客戶端,需要實現以下幾個要點:

    • 根據 key 計算 hash slot
    • 自動讀取群集上所有的節點信息
    • 為節點分配緩存客戶端管理器
    • 將 hash slot 路由到正確的節點
    • 自動發現新節點和自動刷新slot分佈

      如下面類圖所示,接下來我們詳細分析具體的代碼實現。

      

      一、CRC16  

      CRC即循環冗餘校驗碼,是信息系統中一種常見的檢錯碼。CRC校驗碼不同的機構有不同的標準,這裏Redis遵循的標準是CRC-16-CCITT標準,這也是被XMODEM協議使用的CRC標準,所以也常用XMODEM CRC代指,是比較經典的“基於字節查表法的CRC校驗碼生成算法”。 

     1 /// <summary>
     2 /// 根據 key 計算對應的哈希槽
     3 /// </summary>
     4 public static int GetSlot(string key)
     5 {
     6     key = CRC16.ExtractHashTag(key);
     7     // optimization with modulo operator with power of 2 equivalent to getCRC16(key) % 16384
     8     return GetCRC16(key) & (16384 - 1);
     9 }
    10 
    11 /// <summary>
    12 /// 計算給定字節組的 crc16 檢驗碼
    13 /// </summary>
    14 public static int GetCRC16(byte[] bytes, int s, int e)
    15 {
    16     int crc = 0x0000;
    17 
    18     for (int i = s; i < e; i++)
    19     {
    20         crc = ((crc << 8) ^ LOOKUP_TABLE[((crc >> 8) ^ (bytes[i] & 0xFF)) & 0xFF]);
    21     }
    22     return crc & 0xFFFF;
    23 }

     

      二、讀取集群節點

      從集群中的任意節點使用 CLUSTER NODES 命令可以讀取到集群中所有的節點信息,包括連接狀態,它們的標誌,屬性和分配的槽等等。CLUSTER NODES 以串行格式提供所有這些信息,輸出示例:

    d99b65a25ef726c64c565901e345f98c496a1a47 127.0.0.1:7007 master - 0 1592288083308 8 connected
    2d71879d6529d1edbfeed546443051986245c58e 127.0.0.1:7003 master - 0 1592288084311 11 connected 10923-16383
    654cdc25a5fa11bd44b5b716cdf07d4ce176efcd 127.0.0.1:7005 slave 484e73948d8aacd8327bf90b89469b52bff464c5 0 1592288085313 10 connected
    ed65d52dad7ef6854e0e261433b56a551e5e11cb 127.0.0.1:7004 slave 754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 0 1592288081304 9 connected
    754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 127.0.0.1:7001 master - 0 1592288080300 9 connected 0-5460
    484e73948d8aacd8327bf90b89469b52bff464c5 127.0.0.1:7002 master - 0 1592288082306 10 connected 5461-10922
    2223bc6d099bd9838e5d2f1fbd9a758f64c554c4 127.0.0.1:7006 myself,slave 2d71879d6529d1edbfeed546443051986245c58e 0 0 6 connected
    

      每個字段的含義如下:

      1. id:節點 ID,一個40個字符的隨機字符串,當一個節點被創建時不會再發生變化(除非CLUSTER RESET HARD被使用)。

      2. ip:port:客戶端應該聯繫節點以運行查詢的節點地址。

      3. flags:逗號列表分隔的標誌:myselfmasterslavefail?failhandshakenoaddrnoflags。標誌在下一節詳細解釋。

      4. master:如果節點是從屬節點,並且主節點已知,則節點ID為主節點,否則為“ – ”字符。

      5. ping-sent:以毫秒為單位的當前激活的ping發送的unix時間,如果沒有掛起的ping,則為零。

      6. pong-recv:毫秒 unix 時間收到最後一個乒乓球。

      7. config-epoch:當前節點(或當前主節點,如果該節點是從節點)的配置時期(或版本)。每次發生故障切換時,都會創建一個新的,唯一的,單調遞增的配置時期。如果多個節點聲稱服務於相同的哈希槽,則具有較高配置時期的節點將獲勝。

      8. link-state:用於節點到節點集群總線的鏈路狀態。我們使用此鏈接與節點進行通信。可以是connecteddisconnected

      9. slot:散列槽號或範圍。從參數9開始,但總共可能有16384個條目(限制從未達到)。這是此節點提供的散列槽列表。如果條目僅僅是一個数字,則被解析為這樣。如果它是一個範圍,它是在形式start-end,並且意味着節點負責所有散列時隙從startend包括起始和結束值。

    標誌的含義(字段編號3):

    • myself:您正在聯繫的節點。
    • master:節點是主人。
    • slave:節點是從屬的。
    • fail?:節點處於PFAIL狀態。對於正在聯繫的節點無法訪問,但仍然可以在邏輯上訪問(不處於FAIL狀態)。
    • fail:節點處於FAIL狀態。對於將PFAIL狀態提升為FAIL的多個節點而言,這是無法訪問的。
    • handshake:不受信任的節點,我們握手。
    • noaddr:此節點沒有已知的地址。
    • noflags:根本沒有標誌。
      1 // 讀取集群上的節點信息
      2 static IList<InternalClusterNode> ReadClusterNodes(IEnumerable<ClusterNode> source)
      3 {
      4     RedisClient c = null;
      5     StringReader reader = null;
      6     IList<InternalClusterNode> result = null;
      7 
      8     int index = 0;
      9     int rowCount = source.Count();
     10 
     11     foreach (var node in source)
     12     {
     13         try
     14         {
     15             // 從當前節點讀取REDIS集群節點信息
     16             index += 1;
     17             c = new RedisClient(node.Host, node.Port, node.Password);
     18             RedisData data = c.RawCommand("CLUSTER".ToUtf8Bytes(), "NODES".ToUtf8Bytes());
     19             string info = Encoding.UTF8.GetString(data.Data);
     20 
     21             // 將讀回的字符文本轉成強類型節點實體
     22             reader = new StringReader(info);
     23             string line = reader.ReadLine();
     24             while (line != null)
     25             {
     26                 if (result == null) result = new List<InternalClusterNode>();
     27                 InternalClusterNode n = InternalClusterNode.Parse(line);
     28                 n.Password = node.Password;
     29                 result.Add(n);
     30 
     31                 line = reader.ReadLine();
     32             }
     33 
     34             // 只要任意一個節點拿到集群信息,直接退出
     35             if (result != null && result.Count > 0) break;
     36         }
     37         catch (Exception ex)
     38         {
     39             // 出現異常,如果還沒到最後一個節點,則繼續使用下一下節點讀取集群信息
     40             // 否則拋出異常
     41             if (index < rowCount)
     42                 Thread.Sleep(100);
     43             else
     44                 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
     45         }
     46         finally
     47         {
     48             if (reader != null) reader.Dispose();
     49             if (c != null) c.Dispose();
     50         }
     51     }
     52 
     53 
     54     if (result == null)
     55         result = new List<InternalClusterNode>(0);
     56     return result;
     57 }
     58 
     59 /// <summary>
     60 /// 從 cluster nodes 的每一行命令里讀取出集群節點的相關信息
     61 /// </summary>
     62 /// <param name="line">集群命令</param>
     63 /// <returns></returns>
     64 public static InternalClusterNode Parse(string line)
     65 {
     66     if (string.IsNullOrEmpty(line))
     67         throw new ArgumentException("line");
     68 
     69     InternalClusterNode node = new InternalClusterNode();
     70     node._nodeDescription = line;
     71     string[] segs = line.Split(' ');
     72 
     73     node.NodeId = segs[0];
     74     node.Host = segs[1].Split(':')[0];
     75     node.Port = int.Parse(segs[1].Split(':')[1]);
     76     node.MasterNodeId = segs[3] == "-" ? null : segs[3];
     77     node.PingSent = long.Parse(segs[4]);
     78     node.PongRecv = long.Parse(segs[5]);
     79     node.ConfigEpoch = int.Parse(segs[6]);
     80     node.LinkState = segs[7];
     81 
     82     string[] flags = segs[2].Split(',');
     83     node.IsMater = flags[0] == MYSELF ? flags[1] == MASTER : flags[0] == MASTER;
     84     node.IsSlave = !node.IsMater;
     85     int start = 0;
     86     if (flags[start] == MYSELF)
     87         start = 1;
     88     if (flags[start] == SLAVE || flags[start] == MASTER)
     89         start += 1;
     90     node.NodeFlag = string.Join(",", flags.Skip(start));
     91 
     92     if (segs.Length > 8)
     93     {
     94         string[] slots = segs[8].Split('-');
     95         node.Slot.Start = int.Parse(slots[0]);
     96         if (slots.Length > 1) node.Slot.End = int.Parse(slots[1]);
     97 
     98         for (int index = 9; index < segs.Length; index++)
     99         {
    100             if (node.RestSlots == null)
    101                 node.RestSlots = new List<HashSlot>();
    102 
    103             slots = segs[index].Split('-');
    104 
    105             int s1 = 0;
    106             int s2 = 0;
    107             bool b1 = int.TryParse(slots[0], out s1);
    108             bool b2 = int.TryParse(slots[1], out s2);
    109             if (!b1 || !b2)
    110                 continue;
    111             else
    112                 node.RestSlots.Add(new HashSlot(s1, slots.Length > 1 ? new Nullable<int>(s2) : null));
    113         }
    114     }
    115 
    116     return node;
    117 }

    View Code

     

      三、為節點分配緩存客戶端管理器

      在單實例的Redis中,我們通過 PooledRedisClientManager 這個管理器來獲取RedisClient。借鑒這個思路,在Redis Cluster集群中,我們為每一個主節點實例化一個 PooledRedisClientManager,並且該主節點持有的 slot 都共享一個 PooledRedisClientManager 實例。以 slot 做為 key 將 slot 與 PooledRedisClientManager 一一映射並緩存起來。

     1 // 初始化集群管理
     2 void Initialize(IList<InternalClusterNode> clusterNodes = null)
     3 {
     4     // 從 redis 讀取集群信息
     5     IList<InternalClusterNode> nodes = clusterNodes == null ? RedisCluster.ReadClusterNodes(_source) : clusterNodes;
     6 
     7     // 生成主節點,每個主節點的 slot 對應一個REDIS客戶端緩衝池管理器
     8     IList<InternalClusterNode> masters = null;
     9     IDictionary<int, PooledRedisClientManager> managers = null;
    10     foreach (var n in nodes)
    11     {
    12         // 節點無效或者
    13         if (!(n.IsMater &&
    14             !string.IsNullOrEmpty(n.Host) &&
    15             string.IsNullOrEmpty(n.NodeFlag) &&
    16             (string.IsNullOrEmpty(n.LinkState) || n.LinkState == InternalClusterNode.CONNECTED))) continue;
    17 
    18         n.SlaveNodes = nodes.Where(x => x.MasterNodeId == n.NodeId);
    19         if (masters == null)
    20             masters = new List<InternalClusterNode>();
    21         masters.Add(n);
    22 
    23         // 用每一個主節點的哈希槽做鍵,導入REDIS客戶端緩衝池管理器
    24         // 然後,方法表指針(又名類型對象指針)上場,佔據 4 個字節。 4 * 16384 / 1024 = 64KB
    25         if (managers == null)
    26             managers = new Dictionary<int, PooledRedisClientManager>();
    27 
    28         string[] writeHosts = new[] { n.HostString };
    29         string[] readHosts = n.SlaveNodes.Where(n => false).Select(n => n.HostString).ToArray();
    30         var pool = new PooledRedisClientManager(writeHosts, readHosts, _config);
    31         managers.Add(n.Slot.Start, pool);
    32         if (n.Slot.End != null)
    33         {
    34             // 這個範圍內的哈希槽都用同一個緩衝池
    35             for (int s = n.Slot.Start + 1; s <= n.Slot.End.Value; s++)
    36                 managers.Add(s, pool);
    37         }
    38         if (n.RestSlots != null)
    39         {
    40             foreach (var slot in n.RestSlots)
    41             {
    42                 managers.Add(slot.Start, pool);
    43                 if (slot.End != null)
    44                 {
    45                     // 這個範圍內的哈希槽都用同一個緩衝池
    46                     for (int s = slot.Start + 1; s <= slot.End.Value; s++)
    47                         managers.Add(s, pool);
    48                 }
    49             }
    50         }
    51     }
    52 
    53     _masters = masters;
    54     _redisClientManagers = managers;
    55     _clusterNodes = nodes != null ? nodes : null;
    56 
    57     if (_masters == null) _masters = new List<InternalClusterNode>(0);
    58     if (_clusterNodes == null) _clusterNodes = new List<InternalClusterNode>(0);
    59     if (_redisClientManagers == null) _redisClientManagers = new Dictionary<int, PooledRedisClientManager>(0);
    60 
    61     if (_masters.Count > 0)
    62         _source = _masters.Select(n => new ClusterNode(n.Host, n.Port, n.Password)).ToList();
    63 }

    View Code

     

      四、將 hash slot 路由到正確的節點

      在訪問一個 key 時,根據第三步緩存起來的 PooledRedisClientManager ,用 key 計算出來的 hash slot 值可以快速找出這個 key 對應的 PooledRedisClientManager 實例,調用 PooledRedisClientManager.GetClient() 即可將 hash slot 路由到正確的主節點。

     1 // 執行指定動作並返回值
     2 private T DoExecute<T>(string key, Func<RedisClient, T> action) => this.DoExecute(() => this.GetRedisClient(key), action);
     3 
     4 // 執行指定動作並返回值
     5 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1)
     6 {
     7     RedisClient c = null;
     8     try
     9     {
    10         c = slot();
    11         return action(c);
    12     }
    13     catch (Exception ex)
    14     {
    15         // 此處省略 ...
    16     }
    17     finally
    18     {
    19         if (c != null)
    20             c.Dispose();
    21     }
    22 }
    23 
    24 // 獲取指定key對應的主設備節點
    25 private RedisClient GetRedisClient(string key)
    26 {
    27     if (string.IsNullOrEmpty(key))
    28         throw new ArgumentNullException("key");
    29 
    30     int slot = CRC16.GetSlot(key);
    31     if (!_redisClientManagers.ContainsKey(slot))
    32         throw new SlotNotFoundException(string.Format("No reachable node in cluster for slot {{{0}}}", slot), slot, key);
    33 
    34     var pool = _redisClientManagers[slot];
    35     return (RedisClient)pool.GetClient();
    36 }

       

      五、自動發現新節點和自動刷新slot分佈

      在實際生產環境中,Redis 集群經常會有添加/刪除節點、遷移 slot 、主節點宕機從節點轉主節點等,針對這些情況,我們的 Redis Cluster 組件必須具備自動發現節點和刷新在 第三步  緩存起來的 slot 的能力。在這裏我的實現思路是當節點執行 Redis 命令時返回 RedisException 異常時就強制刷新集群節點信息並重新緩存 slot 與 節點之間的映射。

      1 // 執行指定動作並返回值
      2 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1)
      3 {
      4     RedisClient c = null;
      5     try
      6     {
      7         c = slot();
      8         return action(c);
      9     }
     10     catch (Exception ex)
     11     {
     12         if (!(ex is RedisException) || tryTimes == 0) throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
     13         else
     14         {
     15             tryTimes -= 1;
     16             // 嘗試重新刷新集群信息
     17             bool isRefresh = DiscoveryNodes(_source, _config);
     18             if (isRefresh)
     19                 // 集群節點有更新過,重新執行
     20                 return this.DoExecute(slot, action, tryTimes);
     21             else
     22                 // 集群節點未更新過,直接拋出異常
     23                 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
     24         }
     25     }
     26     finally
     27     {
     28         if (c != null)
     29             c.Dispose();
     30     }
     31 }
     32 
     33 // 重新刷新集群信息
     34 private bool DiscoveryNodes(IEnumerable<ClusterNode> source, RedisClientManagerConfig config)
     35 {
     36     bool lockTaken = false;
     37     try
     38     {
     39         // noop
     40         if (_isDiscoverying) { }
     41 
     42         Monitor.Enter(_objLock, ref lockTaken);
     43 
     44         _source = source;
     45         _config = config;
     46         _isDiscoverying = true;
     47 
     48         // 跟上次同步時間相隔 {MONITORINTERVAL} 秒鐘以上才需要同步
     49         if ((DateTime.Now - _lastDiscoveryTime).TotalMilliseconds >= MONITORINTERVAL)
     50         {
     51             bool isRefresh = false;
     52             IList<InternalClusterNode> newNodes = RedisCluster.ReadClusterNodes(_source);
     53             foreach (var node in newNodes)
     54             {
     55                 var n = _clusterNodes.FirstOrDefault(x => x.HostString == node.HostString);
     56                 isRefresh =
     57                     n == null ||                        // 新節點                                                                
     58                     n.Password != node.Password ||      // 密碼變了                                                                
     59                     n.IsMater != node.IsMater ||        // 主變從或者從變主                                                                
     60                     n.IsSlave != node.IsSlave ||        // 主變從或者從變主                                                                
     61                     n.NodeFlag != node.NodeFlag ||      // 節點標記位變了                                                                
     62                     n.LinkState != node.LinkState ||    // 節點狀態位變了                                                                
     63                     n.Slot.Start != node.Slot.Start ||  // 哈希槽變了                                                                
     64                     n.Slot.End != node.Slot.End ||      // 哈希槽變了
     65                     (n.RestSlots == null && node.RestSlots != null) ||
     66                     (n.RestSlots != null && node.RestSlots == null);
     67                 if (!isRefresh && n.RestSlots != null && node.RestSlots != null)
     68                 {
     69                     var slots1 = n.RestSlots.OrderBy(x => x.Start).ToList();
     70                     var slots2 = node.RestSlots.OrderBy(x => x.Start).ToList();
     71                     for (int index = 0; index < slots1.Count; index++)
     72                     {
     73                         isRefresh =
     74                             slots1[index].Start != slots2[index].Start ||   // 哈希槽變了                                                                
     75                             slots1[index].End != slots2[index].End;         // 哈希槽變了
     76                         if (isRefresh) break;
     77                     }
     78                 }
     79 
     80                 if (isRefresh) break;
     81             }
     82 
     83             if (isRefresh)
     84             {
     85                 // 重新初始化集群
     86                 this.Dispose();
     87                 this.Initialize(newNodes);
     88                 this._lastDiscoveryTime = DateTime.Now;
     89             }
     90         }
     91 
     92         // 最後刷新時間在 {MONITORINTERVAL} 內,表示是最新群集信息 newest
     93         return (DateTime.Now - _lastDiscoveryTime).TotalMilliseconds < MONITORINTERVAL;
     94     }
     95     finally
     96     {
     97         if (lockTaken)
     98         {
     99             _isDiscoverying = false;
    100             Monitor.Exit(_objLock);
    101         }
    102     }
    103 }

    View Code

     

      六、配置訪問組件調用入口

      最後我們需要為組件提供訪問入口,我們用 RedisCluster 類實現 字符串、列表、哈希、集合、有序集合和Keys的基本操作,並且用 RedisClusterFactory 工廠類對外提供單例操作,這樣就可以像單實例 Redis 那樣調用 Redis Cluster 集群。調用示例:

    var node = new ClusterNode("127.0.0.1", 7001);
    var redisCluster = RedisClusterFactory.Configure(node, config);
    string key = "B070x14668";
    redisCluster.Set(key, key);
    string value = redisCluster.Get<string>(key);
    redisCluster.Del(key);
     1 /// <summary>
     2 /// REDIS 集群工廠
     3 /// </summary>
     4 public class RedisClusterFactory
     5 {
     6     static RedisClusterFactory _factory = new RedisClusterFactory();
     7     static RedisCluster _cluster = null;
     8 
     9     /// <summary>
    10     /// Redis 集群
    11     /// </summary>
    12     public static RedisCluster Cluster
    13     {
    14         get
    15         {
    16             if (_cluster == null)
    17                 throw new Exception("You should call RedisClusterFactory.Configure to config cluster first.");
    18             else
    19                 return _cluster;
    20         }
    21     }
    22 
    23     /// <summary>
    24     /// 初始化 <see cref="RedisClusterFactory"/> 類的新實例
    25     /// </summary>
    26     private RedisClusterFactory()
    27     {
    28     }
    29 
    30     /// <summary>
    31     /// 配置 REDIS 集群
    32     /// <para>若群集中有指定 password 的節點,必須使用  IEnumerable&lt;ClusterNode&gt; 重載列舉出這些節點</para>
    33     /// </summary>
    34     /// <param name="node">集群節點</param>
    35     /// <returns></returns>
    36     public static RedisCluster Configure(ClusterNode node)
    37     {
    38         return RedisClusterFactory.Configure(node, null);
    39     }
    40 
    41     /// <summary>
    42     /// 配置 REDIS 集群
    43     /// <para>若群集中有指定 password 的節點,必須使用  IEnumerable&lt;ClusterNode&gt; 重載列舉出這些節點</para>
    44     /// </summary>
    45     /// <param name="node">集群節點</param>
    46     /// <param name="config"><see cref="RedisClientManagerConfig"/> 客戶端緩衝池配置</param>
    47     /// <returns></returns>
    48     public static RedisCluster Configure(ClusterNode node, RedisClientManagerConfig config)
    49     {
    50         return RedisClusterFactory.Configure(new List<ClusterNode> { node }, config);
    51     }
    52 
    53     /// <summary>
    54     /// 配置 REDIS 集群
    55     /// </summary>
    56     /// <param name="nodes">集群節點</param>
    57     /// <param name="config"><see cref="RedisClientManagerConfig"/> 客戶端緩衝池配置</param>
    58     /// <returns></returns>
    59     public static RedisCluster Configure(IEnumerable<ClusterNode> nodes, RedisClientManagerConfig config)
    60     {
    61         if (nodes == null)
    62             throw new ArgumentNullException("nodes");
    63 
    64         if (nodes == null || nodes.Count() == 0)
    65             throw new ArgumentException("There is no nodes to configure cluster.");
    66 
    67         if (_cluster == null)
    68         {
    69             lock (_factory)
    70             {
    71                 if (_cluster == null)
    72                 {
    73                     RedisCluster c = new RedisCluster(nodes, config);
    74                     _cluster = c;
    75                 }
    76             }
    77         }
    78 
    79         return _cluster;
    80     }
    81 }

    View Code

     

      總結

      今天我們詳細介紹了如何從0手寫一個Redis Cluster集群客戶端訪問組件,相信對同樣在尋找類似解決方案的同學們會有一定的啟發,喜歡的同學請點個 star。在沒有相同案例可以參考的情況下筆者通過查閱官方說明文檔和借鑒 Java 的 JedisCluster 的實現思路,雖說磕磕碰碰但最終也初步完成這個組件並投入使用,必須給自己加一個雞腿!!在此我有一個小小的疑問,.NET 的同學們在用 Redis 集群時,你們是用什麼組件耍的,為何網上的相關介紹和現成組件幾乎都沒有?歡迎討論。

      GitHub 代碼託管:https://github.com/TANZAME/ServiceStack.Redis.Cluster

      技術交流 QQ 群:816425449

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※超省錢租車方案

    ※別再煩惱如何寫文案,掌握八大原則!

    ※回頭車貨運收費標準

    ※教你寫出一流的銷售文案?

    FB行銷專家,教你從零開始的技巧

    聚甘新

  • 荷蘭開放首條塑膠再製自行車道 約含50萬個瓶蓋

    摘錄自2018年09月18日科技新報報導

    荷蘭人以愛好騎單車聞名,該國約 1,700 萬人,卻擁有超過 2,200 萬輛腳踏車,光是阿姆斯特丹就鋪設了近 800 公里的自行車道,東北部城鎮茲沃勒(Zwolle)日前又正式開放一條全由回收塑膠再製生成的自行車道。

    該塑膠車道概念來自道路工程公司 KWS 的 Anne Koudstaal 和 Simon Jorritsma,道路全長 30 公尺,採用預製模塊,重量輕、易安裝,下方設計了排水系統讓管道、電纜通過,可讓水快速流通,遇到暴雨時還能充當臨時儲水槽、避免淹水。

    據估計,這條自行車道包含了約 218,000 個塑膠杯,或 50 萬個塑膠瓶蓋,使用年限可比傳統道路長 2~3 倍(有待觀察),路面應該也不會出現裂縫或坑洞。

     

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

  • 德國小城弗萊堡 交通轉型有成 35年私人汽車減半

    環境資訊中心特約記者 陳文姿報導

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※為什麼 USB CONNECTOR 是電子產業重要的元件?

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※台北網頁設計公司全省服務真心推薦

    ※想知道最厲害的網頁設計公司“嚨底家”!

    ※推薦評價好的iphone維修中心

    聚甘新

  • 維珍航空成功生產環保飛機燃料 減低環境影響

    摘錄自2018年09月18日科技新報報導

    維珍航空與 LanzaTech 從 2011 年開始研發環保飛機燃料技術,最近宣布終於開發成功,從鋼鐵煉製廠的工業廢氣提煉了 1,500 加侖飛機燃料。

    此技術把原本排出大氣層的廢氣,透過發酵過程轉換成低碳乙醇 Lanzanol,首批 Lanzanol 在中國首鋼集團廠房生產。初步測試顯示,這種飛機燃料比傳統燃料減少 65% 碳排放,意味除了生產過程可持續,實際使用也相當環保。

    兩家公司希望未來擴充生產,預計如果把計劃推展到全球鋼鐵煉製廠,可生產目前全球所有航班燃料的五分之一。維珍航空也希望明年可使用這種新燃料首次測試飛行。

    本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

    台北網頁設計公司這麼多該如何選擇?

    ※智慧手機時代的來臨,RWD網頁設計為架站首選

    ※評比南投搬家公司費用收費行情懶人包大公開

    ※回頭車貨運收費標準

    聚甘新