分類: 3C資訊

  • [源碼解析] GroupReduce,GroupCombine 和 Flink SQL group by

    [源碼解析] GroupReduce,GroupCombine和Flink SQL group by

    目錄

    • [源碼解析] GroupReduce,GroupCombine和Flink SQL group by
      • 0x00 摘要
      • 0x01 緣由
      • 0x02 概念
        • 2.1 GroupReduce
        • 2.2 GroupCombine
        • 2.3 例子
      • 0x03 代碼
      • 0x04 Flink SQL內部翻譯
      • 0x05 JobGraph
      • 0x06 Runtime
        • 6.1 ChainedFlatMapDriver
        • 6.2 GroupReduceCombineDriver
        • 6.3 GroupReduceDriver & ChainedFlatMapDriver
      • 0x07 總結
      • 0x08 參考

    0x00 摘要

    本文從源碼和實例入手,為大家解析 Flink 中 GroupReduce 和 GroupCombine 的用途。也涉及到了 Flink SQL group by 的內部實現。

    0x01 緣由

    在前文[源碼解析] Flink的Groupby和reduce究竟做了什麼中,我們剖析了Group和reduce都做了些什麼,也對combine有了一些了解。但是總感覺意猶未盡,因為在Flink還提出了若干新算子,比如GroupReduce和GroupCombine。這幾個算子不搞定,總覺得如鯁在喉,但沒有找到一個良好的例子來進行剖析說明。

    本文是筆者在探究Flink SQL UDF問題的一個副產品。起初是為了調試一段sql代碼,結果發現Flink本身給出了一個GroupReduce和GroupCombine使用的完美例子。於是就拿出來和大家共享,一起分析看看究竟如何使用這兩個算子。

    請注意:這個例子是Flink SQL,所以本文中將涉及Flink SQL goup by內部實現的知識。

    0x02 概念

    Flink官方對於這兩個算子的使用說明如下:

    2.1 GroupReduce

    GroupReduce算子應用在一個已經分組了的DataSet上,其會對每個分組都調用到用戶定義的group-reduce函數。它與Reduce的區別在於用戶定義的函數會立即獲得整個組。

    Flink將在組的所有元素上使用Iterable調用用戶自定義函數,並且可以返回任意數量的結果元素。

    2.2 GroupCombine

    GroupCombine轉換是可組合GroupReduceFunction中組合步驟的通用形式。它在某種意義上被概括為允許將輸入類型 I 組合到任意輸出類型O。與此相對的是,GroupReduce中的組合步驟僅允許從輸入類型 I 到輸出類型 I 的組合。這是因為GroupReduceFunction的 “reduce步驟” 期望自己的輸入類型為 I。

    在一些應用中,我們期望在執行附加變換(例如,減小數據大小)之前將DataSet組合成中間格式。這可以通過CombineGroup轉換能以非常低的成本實現。

    注意:分組數據集上的GroupCombine在內存中使用貪婪策略執行,該策略可能不會一次處理所有數據,而是以多個步驟處理。它也可以在各個分區上執行,而無需像GroupReduce轉換那樣進行數據交換。這可能會導致輸出的是部分結果,所以GroupCombine是不能替代GroupReduce操作的,儘管它們的操作內容可能看起來都一樣。

    2.3 例子

    是不是有點暈?還是直接讓代碼來說話吧。以下官方示例演示了如何將CombineGroup和GroupReduce轉換用於WordCount實現。即通過combine操作先對單詞數目進行初步排序,然後通過reduceGroup對combine產生的結果進行最終排序。因為combine進行了初步排序,所以在算子之間傳輸的數據量就少多了

    DataSet<String> input = [..] // The words received as input
    
    // 這裏通過combine操作先對單詞數目進行初步排序,其優勢在於用戶定義的combine函數只調用一次,因為runtime已經把輸入數據一次性都提供給了自定義函數。  
    DataSet<Tuple2<String, Integer>> combinedWords = input
      .groupBy(0) // group identical words
      .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() {
    
        public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine
            String key = null;
            int count = 0;
    
            for (String word : words) {
                key = word;
                count++;
            }
            // emit tuple with word and count
            out.collect(new Tuple2(key, count));
        }
    });
    
    // 這裏對combine的結果進行第二次排序,其優勢在於用戶定義的reduce函數只調用一次,因為runtime已經把輸入數據一次性都提供給了自定義函數。  
    DataSet<Tuple2<String, Integer>> output = combinedWords
      .groupBy(0)                              // group by words again
      .reduceGroup(new GroupReduceFunction() { // group reduce with full data exchange
    
        public void reduce(Iterable<Tuple2<String, Integer>>, Collector<Tuple2<String, Integer>>) {
            String key = null;
            int count = 0;
    
            for (Tuple2<String, Integer> word : words) {
                key = word;
                count++;
            }
            // emit tuple with word and count
            out.collect(new Tuple2(key, count));
        }
    });
    

    看到這裏,有的兄弟已經明白了,這和mapPartition很類似啊,都是runtime做了大量工作。為了讓大家這兩個算子的使用情形有深刻的認識,我們再通過一個sql的例子,向大家展示Flink內部是怎麼應用這兩個算子的,也能看出來他們的強大之處

    0x03 代碼

    下面代碼主要參考自 flink 使用問題匯總。我們可以看到這裏通過groupby進行了聚合操作。其中collect方法,類似於mysql的group_concat。

    public class UdfExample {
        public static class MapToString extends ScalarFunction {
    
            public String eval(Map<String, Integer> map) {
                if(map==null || map.size()==0) {
                    return "";
                }
                StringBuffer sb=new StringBuffer();
                for(Map.Entry<String, Integer> entity : map.entrySet()) {
                    sb.append(entity.getKey()+",");
                }
                String result=sb.toString();
                return result.substring(0, result.length()-1);
            }
        }
    
        public static void main(String[] args) throws Exception {
            MemSourceBatchOp src = new MemSourceBatchOp(new Object[][]{
                    new Object[]{"1", "a", 1L},
                    new Object[]{"2", "b33", 2L},
                    new Object[]{"2", "CCC", 2L},
                    new Object[]{"2", "xyz", 2L},
                    new Object[]{"1", "u", 1L}
            }, new String[]{"f0", "f1", "f2"});
    
            BatchTableEnvironment environment = MLEnvironmentFactory.getDefault().getBatchTableEnvironment();
            Table table = environment.fromDataSet(src.getDataSet());
            environment.registerTable("myTable", table);
            BatchOperator.registerFunction("MapToString",  new MapToString());
            BatchOperator.sqlQuery("select f0, mapToString(collect(f1)) as type from myTable group by f0").print();
        }
    }
    

    程序輸出是

    f0|type
    --|----
    1|a,u
    2|CCC,b33,xyz
    

    0x04 Flink SQL內部翻譯

    這個SQL語句的重點是group by。這個是程序猿經常使用的操作。但是大家有沒有想過這個group by在真實運行起來時候是怎麼操作的呢?針對大數據環境有沒有做了什麼優化呢?其實,Flink正是使用了GroupReduce和GroupCombine來實現並且優化了group by的功能。優化之處在於:

    • GroupReduce和GroupCombine的函數調用次數要遠低於正常的reduce算子,如果reduce操作中涉及到頻繁創建額外的對象或者外部資源操作,則會相當省時間。
    • 因為combine進行了初步排序,所以在算子之間傳輸的數據量就少多了。

    SQL生成Flink的過程十分錯綜複雜,所以我們只能找最關鍵處。其是在 DataSetAggregate.translateToPlan 完成的。我們可以看到,對於SQL語句 “select f0, mapToString(collect(f1)) as type from myTable group by f0”,Flink系統把它翻譯成如下階段,即

    • pre-aggregation :排序 + combine;
    • final aggregation :排序 + reduce;

    從之前的文章我們可以知道,groupBy這個其實不是一個算子,它只是排序過程中的一個輔助步驟而已,所以我們重點還是要看combineGroup和reduceGroup。這恰恰是我們想要的完美例子。

    input ----> (groupBy + combineGroup) ----> (groupBy + reduceGroup) ----> output
    

    SQL生成的Scala代碼如下,其中 combineGroup在後續中將生成GroupCombineOperator,reduceGroup將生成GroupReduceOperator。

      override def translateToPlan(
          tableEnv: BatchTableEnvImpl,
          queryConfig: BatchQueryConfig): DataSet[Row] = {
    
        if (grouping.length > 0) {
          // grouped aggregation
          ...... 
          if (preAgg.isDefined) { // 我們的例子是在這裏
            inputDS          
              // pre-aggregation
              .groupBy(grouping: _*)
              .combineGroup(preAgg.get) // 將生成GroupCombineOperator算子
              .returns(preAggType.get)
              .name(aggOpName)
              // final aggregation
              .groupBy(grouping.indices: _*) //將生成GroupReduceOperator算子。
              .reduceGroup(finalAgg.right.get)
              .returns(rowTypeInfo)
              .name(aggOpName)
          } else {
            ......
          }
        }
        else {
          ......
        }
      }
    }
    
    // 程序變量打印如下
    this = {DataSetAggregate@5207} "Aggregate(groupBy: (f0), select: (f0, COLLECT(f1) AS $f1))"
     cluster = {RelOptCluster@5220} 
    

    0x05 JobGraph

    LocalExecutor.execute中會生成JobGraph。JobGraph是提交給 JobManager 的數據結構,是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。

    在生成JobGraph時候,系統得到如下JobVertex。

    jobGraph = {JobGraph@5652} "JobGraph(jobId: 6aae8b5e5ad32f588136bef26f8b65f6)"
     taskVertices = {LinkedHashMap@5655}  size = 4
    
    {JobVertexID@5677} "c625209bb7fb9a098807551840aeaa99" -> {InputOutputFormatVertex@5678} "CHAIN DataSource (at initializeDataSource(MemSourceBatchOp.java:98) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (select: (f0, f1)) (org.apache.flink.runtime.operators.DataSourceTask)"
    
    {JobVertexID@5679} "b56ace4acd7a2f69ea110a9f262ff80a" -> {JobVertex@5680} "CHAIN GroupReduce (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) -> FlatMap (select: (f0, mapToString($f1) AS type)) -> Map (Map at linkFrom(MapBatchOp.java:35)) (org.apache.flink.runtime.operators.BatchTask)"
     
    {JobVertexID@5681} "3f5e2a0f700421d80ce85e02a6d9db73" -> {InputOutputFormatVertex@5682} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"
     
    {JobVertexID@5683} "ad29dc5b2e0a39ad2cd1d164b6f859f7" -> {JobVertex@5684} "GroupCombine (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) (org.apache.flink.runtime.operators.BatchTask)"
    

    我們可以看到,在JobGraph中就生成了對應的兩個算子。其中這裏的FlatMap就是用戶的UDF函數MapToString的映射生成。

    GroupCombine (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1))  
      
    CHAIN GroupReduce (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) -> FlatMap (select: (f0, mapToString($f1) AS type)) -> Map 
    

    0x06 Runtime

    最後,讓我們看看runtime會如何處理這兩個算子。

    6.1 ChainedFlatMapDriver

    首先,Flink會在ChainedFlatMapDriver.collect中對record進行處理,這是從Table中提取數據所必須經歷的,與後續的group by關係不大。

    @Override
    public void collect(IT record) {
       try {
          this.numRecordsIn.inc();
          this.mapper.flatMap(record, this.outputCollector);
       } catch (Exception ex) {
          throw new ExceptionInChainedStubException(this.taskName, ex);
       }
    }
    
    // 這裡能夠看出來,我們獲取了第一列記錄
    record = {Row@9317} "1,a,1"
     fields = {Object[3]@9330} 
    this.taskName = "FlatMap (select: (f0, f1))"
    
    // 程序堆棧打印如下
    collect:80, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
    collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
    invoke:196, DataSourceTask (org.apache.flink.runtime.operators)
    doRun:707, Task (org.apache.flink.runtime.taskmanager)
    run:532, Task (org.apache.flink.runtime.taskmanager)
    run:748, Thread (java.lang)
    

    6.2 GroupReduceCombineDriver

    其次,GroupReduceCombineDriver.run()中會進行combine操作。

    1. 會通過this.sorter.write(value)把數據寫到排序緩衝區。
    2. 會通過sortAndCombineAndRetryWrite(value)進行實際的排序,合併。

    因為是系統實現,所以Combine的用戶自定義函數就是由Table API提供的,比如org.apache.flink.table.functions.aggfunctions.CollectAccumulator.accumulate

    @Override
    public void run() throws Exception {
       final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
       final TypeSerializer<IN> serializer = this.serializer;
    
       if (objectReuseEnabled) {
        .....
       }
       else {
          IN value;
          while (running && (value = in.next()) != null) {
             // try writing to the sorter first
             if (this.sorter.write(value)) {
                continue;
             }
    
             // do the actual sorting, combining, and data writing
             sortAndCombineAndRetryWrite(value);
          }
       }
    
       // sort, combine, and send the final batch
       if (running) {
          sortAndCombine();
       }
    }
    
    // 程序變量如下
    value = {Row@9494} "1,a"
     fields = {Object[2]@9503} 
    

    sortAndCombine是具體排序/合併的過程。

    1. 排序是通過 org.apache.flink.runtime.operators.sort.QuickSort 完成的。
    2. 合併是通過 org.apache.flink.table.functions.aggfunctions.CollectAccumulator.accumulate 完成的。
    3. 給下游是由 org.apache.flink.table.runtime.aggregate.DataSetPreAggFunction.combine 調用 out.collect(output) 完成的。
    private void sortAndCombine() throws Exception {
       final InMemorySorter<IN> sorter = this.sorter;
       // 這裏進行實際的排序
       this.sortAlgo.sort(sorter);
       final GroupCombineFunction<IN, OUT> combiner = this.combiner;
       final Collector<OUT> output = this.output;
    
       // iterate over key groups
       if (objectReuseEnabled) {
    			......		
       } else {
          final NonReusingKeyGroupedIterator<IN> keyIter = 
                new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
          // 這裡是歸併操作
          while (this.running && keyIter.nextKey()) {
             // combiner.combiner 是用戶定義操作,runtime把某key對應的數據一次性傳給它
             combiner.combine(keyIter.getValues(), output);
          }
       }
    }
    

    具體調用棧如下:

    accumulate:57, CollectAggFunction (org.apache.flink.table.functions.aggfunctions)
    accumulate:-1, DataSetAggregatePrepareMapHelper$5
    combine:71, DataSetPreAggFunction (org.apache.flink.table.runtime.aggregate)
    sortAndCombine:213, GroupReduceCombineDriver (org.apache.flink.runtime.operators)
    run:188, GroupReduceCombineDriver (org.apache.flink.runtime.operators)
    run:504, BatchTask (org.apache.flink.runtime.operators)
    invoke:369, BatchTask (org.apache.flink.runtime.operators)
    doRun:707, Task (org.apache.flink.runtime.taskmanager)
    run:532, Task (org.apache.flink.runtime.taskmanager)
    run:748, Thread (java.lang)
    

    6.3 GroupReduceDriver & ChainedFlatMapDriver

    這兩個放在一起,是因為他們組成了Operator Chain。

    GroupReduceDriver.run中完成了reduce。具體reduce 操作是在 org.apache.flink.table.runtime.aggregate.DataSetFinalAggFunction.reduce 完成的,然後在其中直接發送給下游 out.collect(output)

    @Override
    public void run() throws Exception {
       // cache references on the stack
       final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub();
     
       if (objectReuseEnabled) {
           ......	
       }
       else {
          final NonReusingKeyGroupedIterator<IT> iter = new NonReusingKeyGroupedIterator<IT>(this.input, this.comparator);
          // run stub implementation
          while (this.running && iter.nextKey()) {
             // stub.reduce 是用戶定義操作,runtime把某key對應的數據一次性傳給它
             stub.reduce(iter.getValues(), output);
          }
       }
    }
    

    從前文我們可以,這裏已經配置成了Operator Chain,所以out.collect(output)會調用到CountingCollector。CountingCollector的成員變量collector已經配置成了ChainedFlatMapDriver。

    public void collect(OUT record) {
       this.numRecordsOut.inc();
       this.collector.collect(record);
    }
    
    this.collector = {ChainedFlatMapDriver@9643} 
     mapper = {FlatMapRunner@9610} 
     config = {TaskConfig@9655} 
     taskName = "FlatMap (select: (f0, mapToString($f1) AS type))"
    

    於是程序就調用到了 ChainedFlatMapDriver.collect

    public void collect(IT record) {
       try {
          this.numRecordsIn.inc();
          this.mapper.flatMap(record, this.outputCollector);
       } catch (Exception ex) {
          throw new ExceptionInChainedStubException(this.taskName, ex);
       }
    }
    

    最終調用棧如如下:

    eval:21, UdfExample$MapToString (com.alibaba.alink)
    flatMap:-1, DataSetCalcRule$14
    flatMap:52, FlatMapRunner (org.apache.flink.table.runtime)
    flatMap:31, FlatMapRunner (org.apache.flink.table.runtime)
    collect:80, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
    collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
    reduce:80, DataSetFinalAggFunction (org.apache.flink.table.runtime.aggregate)
    run:131, GroupReduceDriver (org.apache.flink.runtime.operators)
    run:504, BatchTask (org.apache.flink.runtime.operators)
    invoke:369, BatchTask (org.apache.flink.runtime.operators)
    doRun:707, Task (org.apache.flink.runtime.taskmanager)
    run:532, Task (org.apache.flink.runtime.taskmanager)
    run:748, Thread (java.lang)
    

    0x07 總結

    由此我們可以看到:

    • GroupReduce,GroupCombine和mapPartition十分類似,都是從系統層面對算子進行優化,把循環操作放到用戶自定義函數來處理。
    • 對於group by這個SQL語句,Flink將其翻譯成 GroupReduce + GroupCombine,採用兩階段優化的方式來完成了對大數據下的處理。

    0x08 參考

    flink 使用問題匯總

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

    南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

  • 漲姿勢了解一下Kafka消費位移可好?

    漲姿勢了解一下Kafka消費位移可好?

    摘要:Kafka中的位移是個極其重要的概念,因為數據一致性、準確性是一個很重要的語義,我們都不希望消息重複消費或者丟失。而位移就是控制消費進度的大佬。本文就詳細聊聊kafka消費位移的那些事,包括:

    概念剖析

    kafka的兩種位移

    關於位移(Offset),其實在kafka的世界里有兩種位移:

    • 分區位移:生產者向分區寫入消息,每條消息在分區中的位置信息由一個叫offset的數據來表徵。假設一個生產者向一個空分區寫入了 10 條消息,那麼這 10 條消息的位移依次是 0、1、…、9;

    • 消費位移:消費者需要記錄消費進度,即消費到了哪個分區的哪個位置上,這是消費者位移(Consumer Offset)。

    注意,這和上面所說的消息在分區上的位移完全不是一個概念。上面的“位移”表徵的是分區內的消息位置,它是不變的,即一旦消息被成功寫入到一個分區上,它的位移值就是固定的了。而消費者位移則不同,它可能是隨時變化的,畢竟它是消費者消費進度的指示器。

    消費位移

    消費位移,記錄的是 Consumer 要消費的下一條消息的位移,切記,是下一條消息的位移! 而不是目前最新消費消息的位移

    假設一個分區中有 10 條消息,位移分別是 0 到 9。某個 Consumer 應用已消費了 5 條消息,這就說明該 Consumer 消費了位移為 0 到 4 的 5 條消息,此時 Consumer 的位移是 5,指向了下一條消息的位移。

    至於為什麼要有消費位移,很好理解,當 Consumer 發生故障重啟之後,就能夠從 Kafka 中讀取之前提交的位移值,然後從相應的位移處繼續消費,從而避免整個消費過程重來一遍。就好像書籤一樣,需要書籤你才可以快速找到你上次讀書的位置。

    那麼了解了位移是什麼以及它的重要性,我們自然而然會有一個疑問,kafka是怎麼記錄、怎麼保存、怎麼管理位移的呢?

    位移的提交

    Consumer 需要上報自己的位移數據,這個彙報過程被稱為位移提交。因為 Consumer 能夠同時消費多個分區的數據,所以位移的提交實際上是在分區粒度上進行的,即Consumer 需要為分配給它的每個分區提交各自的位移數據。

    鑒於位移提交甚至是位移管理對 Consumer 端的巨大影響,KafkaConsumer API提供了多種提交位移的方法,每一種都有各自的用途,這些都是本文將要談到的方案。

    void commitSync(Duration timeout);
    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
    void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
    void commitAsync();
    void commitAsync(OffsetCommitCallback callback);
    void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
    

    先粗略的總結一下。位移提交分為自動提交和手動提交;而手動提交又分為同步提交和異步提交。

    自動提交

    當消費配置enable.auto.commit=true的時候代表自動提交位移。

    自動提交位移是發生在什麼時候呢?auto.commit.interval.ms默認值是50000ms。即kafka每隔5s會幫你自動提交一次位移。自動位移提交的動作是在 poll()方法的邏輯里完成的,在每次真正向服務端發起拉取請求之前會檢查是否可以進行位移提交,如果可以,那麼就會提交上一次輪詢的位移。假如消費數據量特別大,可以設置的短一點。

    越簡單的東西功能越不足,自動提交位移省事的同時肯定會帶來一些問題。自動提交帶來重複消費和消息丟失的問題:

    • 重複消費: 在默認情況下,Consumer 每 5 秒自動提交一次位移。現在,我們假設提交位移之後的 3 秒發生了 Rebalance 操作。在 Rebalance 之後,所有 Consumer 從上一次提交的位移處繼續消費,但該位移已經是 3 秒前的位移數據了,故在 Rebalance 發生前 3 秒消費的所有數據都要重新再消費一次。雖然你能夠通過減少 auto.commit.interval.ms 的值來提高提交頻率,但這麼做只能縮小重複消費的時間窗口,不可能完全消除它。這是自動提交機制的一個缺陷。

    • 消息丟失: 假設拉取了100條消息,正在處理第50條消息的時候,到達了自動提交窗口期,自動提交線程將拉取到的每個分區的最大消息位移進行提交,如果此時消費服務掛掉,消息並未處理結束,但卻提交了最大位移,下次重啟就從100條那消費,即發生了50-100條的消息丟失。

    手動提交

    當消費配置enable.auto.commit=false的時候代表手動提交位移。用戶必須在適當的時機(一般是處理完業務邏輯后),手動的調用相關api方法提交位移。比如在下面的案例中,我需要確認我的業務邏輯返回true之後再手動提交位移

     while (true) {
         try {
             ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
             if (!consumerRecords.isEmpty()) {
                 for (ConsumerRecord<String, String> record : consumerRecords) {
                     KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                     // 處理業務
                     boolean handleResult = handle(kafkaMessage);
                     if (handleResult) {
                         log.info(" handle success, kafkaMessage={}" ,kafkaMessage);
                     } else {
                         log.info(" handle failed, kafkaMessage={}" ,kafkaMessage);
                     }
                 }
                 // 手動提交offset
                 consumer.commitSync(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
            
             } 
         } catch (Exception e) {
             log.info("kafka consume error." ,e);
         }
     }
    

    手動提交明顯能解決消息丟失的問題,因為你是處理完業務邏輯后再提交的,假如此時消費服務掛掉,消息並未處理結束,那麼重啟的時候還會重新消費。

    但是對於業務層面的失敗導致消息未消費成功,是無法處理的。因為業務層的邏輯千變萬化、比如格式不正確,你叫Kafka消費端程序怎麼去處理?應該要業務層面自己處理,記錄失敗日誌做好監控等。

    但是手動提交不能解決消息重複的問題,也很好理解,假如消費0-100條消息,50條時掛了,重啟後由於沒有提交這一批消息的offset,是會從0開始重新消費。至於如何避免重複消費的問題,在這篇文章有說。

    手動提交又分為異步提交和同步提交。

    同步提交

    上面案例代碼使用的是commitSync() ,顧名思義,是同步提交位移的方法。同步提交位移Consumer 程序會處於阻塞狀態,等待 Broker 返回提交結果。同步模式下提交失敗的時候一直嘗試提交,直到遇到無法重試的情況下才會結束。在任何系統中,因為程序而非資源限制而導致的阻塞都可能是系統的瓶頸,會影響整個應用程序的 TPS。當然,你可以選擇拉長提交間隔,但這樣做的後果是 Consumer 的提交頻率下降,在下次 Consumer 重啟回來后,會有更多的消息被重新消費。因此,為了解決這些不足,kafka還提供了異步提交方法。

    異步提交

    異步提交會立即返回,不會阻塞,因此不會影響 Consumer 應用的 TPS。由於它是異步的,Kafka 提供了回調函數,供你實現提交之後的邏輯,比如記錄日誌或處理異常等。下面這段代碼展示了調用 commitAsync() 的方法

     consumer.commitAsync((offsets, exception) -> {
     if (exception != null)
         handleException(exception);
     });
    

    但是異步提交會有一個問題,那就是它沒有重試機制,不過一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。所以消息也是不會丟失和重複消費的。
    但如果這是發生在關閉消費者或再均衡前的最後一次提交,就要確保能夠提交成功。因此,組合使用commitAsync()commitSync()是最佳的方式。

    try {
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
            if (!consumerRecords.isEmpty()) {
                 for (ConsumerRecord<String, String> record : consumerRecords) {
                    KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                    boolean handleResult = handle(kafkaMessage);             
                 }
                 //異步提交位移               
                 consumer.commitAsync((offsets, exception) -> {
                 if (exception != null)
                     handleException(exception);
                 });
               
            }
        }
    } catch (Exception e) {
        System.out.println("kafka consumer error:" + e.toString());
    } finally {
        try {
            //最後同步提交位移
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }
    

    讓位移提交更加靈活和可控

    如果細心的閱讀了上面所有demo的代碼,那麼你會發現這樣幾個問題:

    1、所有的提交,都是提交 poll 方法返回的所有消息的位移,poll 方法一次返回1000 條消息,則一次性地將這 1000 條消息的位移一併提交。可這樣一旦中間出現問題,位移沒有提交,下次會重新消費已經處理成功的數據。所以我想做到細粒度控制,比如每次提交100條,該怎麼辦?

    答:可以通過commitSync(Map<TopicPartition, OffsetAndMetadata>)commitAsync(Map<TopicPartition, OffsetAndMetadata>)對位移進行精確控制。

    2、poll和commit方法對於普通的開發人員而言是一個黑盒,無法精確地掌控其消費的具體位置。我都不知道這次的提交,是針對哪個partition,提交上去的offset是多少。

    答:可以通過record.topic()獲取topic信息, record.partition()獲取分區信息,record.offset() + 1獲取消費位移,記住消費位移是指示下一條消費的位移,所以要加一。

    3、我想自己管理offset怎麼辦?一方面更加保險,一方面下次重啟之後可以精準的從數據庫讀取最後的offset就不存在丟失和重複消費了。
    答:可以將消費位移保存在數據庫中。消費端程序使用comsumer.seek方法指定從某個位移開始消費。

    綜合以上幾個可優化點,並結合全文,可以給出一個比較完美且完整的demo:聯合異步提交和同步提交,對處理過程中所有的異常都進行了處理。細粒度的控制了消費位移的提交,並且保守的將消費位移記錄到了數據庫中,重新啟動消費端程序的時候會從數據庫讀取位移。這也是我們消費端程序位移提交的最佳實踐方案。你只要繼承這個抽象類,實現你具體的業務邏輯就可以了。

    public abstract class PrefectCosumer {
        private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        int count = 0;
        public final void consume() {
            Properties properties = PropertiesConfig.getConsumerProperties();
            properties.put("group.id", getGroupId());
            Consumer<String, String> consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(getTopics());
            consumer.poll(0);
            // 把offset記錄到數據庫中 從指定的offset處消費 
            consumer.partitionsFor(getTopics()).stream().map(info ->
            new TopicPartition(getTopics(), info.partition()))
            .forEach(tp -> {
                   consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));   
             });
            try {
                while (true) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
                    if (!consumerRecords.isEmpty()) {
                        for (ConsumerRecord<String, String> record : consumerRecords) {
    
                            KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                            boolean handleResult = handle(kafkaMessage);
                            if (handleResult) {
                                //注意:提交的是下一條消息的位移。所以OffsetAndMetadata 對象時,必須使用當前消息位移加 1。
                                offsets.put(new TopicPartition(record.topic(), record.partition()),
                                        new OffsetAndMetadata(record.offset() + 1));
    
                                // 細粒度控制提交 每10條提交一次offset
                                if (count % 10 == 0) {
                                    // 異步提交offset
                                    consumer.commitAsync(offsets, (offsets, exception) -> {
                                        if (exception != null) {
                                            handleException(exception);
                                        }
                                        // 將消費位移再記錄一份到數據庫中
                                        offsets.forEach((k, v) -> {
                                            String s = "insert into kafka_offset(`topic`,`group_id`,`partition_id`,`offset`) values" +
                                                    " ('" + k.topic() + "','" + getGroupId() + "'," + k.partition() + "," + v.offset() + ")" +
                                                    " on duplicate key update offset=values(offset);";
                                            JdbcUtils.insertTable(s);
                                        });
    
    
                                    });
                                }
                                count++;
                            } else {         
                                System.out.println("消費消息失敗 kafkaMessage={}" + getTopics() + getGroupId() + kafkaMessage.toString());                         
                            }
                        }
    
    
                    }
                }
            } catch (Exception e) {
                System.out.println("kafka consumer error:" + e.toString());
            } finally {
                try {
                    // 最後一次提交 使用同步提交offset
                    consumer.commitSync();
                } finally {
                    consumer.close();
                }
    
    
            }
        }
    
    
        /**
         * 具體的業務邏輯
         *
         * @param kafkaMessage
         * @return
         */
        public abstract boolean handle(KafkaMessage kafkaMessage);
    
        public abstract List<String> getTopics();
    
        public abstract String getGroupId();
    
        void handleException(Exception e) {
            //異常處理
        }
    }
    

    控制位移提交的N種方式

    剛剛我們說自己控制位移,使用seek方法可以指定offset消費。那到底怎麼控制位移?怎麼重設消費組位移?seek是什麼?現在就來仔細說說。

    並不是所有的消息隊列都可以重設消費者組位移達到重新消費的目的。比如傳統的RabbitMq,它們處理消息是一次性的,即一旦消息被成功消費,就會被刪除。而Kafka消費消息是可以重演的,因為它是基於日誌結構(log-based)的消息引擎,消費者在消費消息時,僅僅是從磁盤文件上讀取數據而已,所以消費者不會刪除消息數據。同時,由於位移數據是由消費者控制的,因此它能夠很容易地修改位移的值,實現重複消費歷史數據的功能。

    了解如何重設位移是很重要的。假設這麼一個場景,我已經消費了1000條消息后,我發現處理邏輯錯了,所以我需要重新消費一下,可是位移已經提交了,我到底該怎麼重新消費這1000條呢??假設我想從某個時間點開始消費,我又該如何處理呢?

    首先說個誤區:auto.offset.reset=earliest/latest這個參數大家都很熟悉,但是初學者很容易誤會它。大部分朋友都覺得在任何情況下把這兩個值設置為earliest或者latest ,消費者就可以從最早或者最新的offset開始消費,但實際上並不是那麼回事,他們生效都有一個前提條件,那就是對於同一個groupid的消費者,如果這個topic某個分區有已經提交的offset,那麼無論是把auto.offset.reset=earliest還是latest,都將失效,消費者會從已經提交的offset開始消費。因此這個參數並不能解決用戶想重設消費位移的需求。

    kafka有七種控制消費組消費offset的策略,主要分為位移維度和時間維度,包括:

    • 位移維度。這是指根據位移值來重設。也就是說,直接把消費者的位移值重設成我們給定的位移值。包括Earliest/Latest/Current/Specified-Offset/Shift-By-N策略

    • 時間維度。我們可以給定一個時間,讓消費者把位移調整成大於該時間的最小位移;也可以給出一段時間間隔,比如 30 分鐘前,然後讓消費者直接將位移調回 30 分鐘之前的位移值。包括DateTime和Duration策略

    說完了重設策略,我們就來看一下具體應該如何實現,可以從兩個角度,API方式和命令行方式。

    重設位移的方法之API方式

    API方式只要記住用seek方法就可以了,包括seek,seekToBeginning 和 seekToEnd。

    void seek(TopicPartition partition, long offset);    
    void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);    
    void seekToBeginning(Collection<TopicPartition> partitions);    
    void seekToEnd(Collection<TopicPartition> partitions);    
    

    從方法簽名我們可以看出seekToBeginningseekToEnd是可以一次性重設n個分區的位移,而seek 只允許重設指定分區的位移,即為每個分區都單獨設置位移,因為不難得出,如果要自定義每個分區的位移值則用seek,如果希望kafka幫你批量重設所有分區位移,比如從最新數據消費或者從最早數據消費,那麼用seekToEnd和seekToBeginning。

    Earliest 策略:從最早的數據開始消費

    從主題當前最早位移處開始消費,這個最早位移不一定就是 0 ,因為很久遠的消息會被 Kafka 自動刪除,主要取決於你的刪除配置。

    代碼如下:

    Properties properties = PropertiesConfig.getConsumerProperties();
    properties.put("group.id", getGroupId());
    Consumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(getTopics());
    consumer.poll(0);
    consumer.seekToBeginning(
    consumer.partitionsFor(getTopics()).stream().map(partitionInfo ->
       new TopicPartition(getTopics(), partitionInfo.partition()))
       .collect(Collectors.toList()));
    

    首先是構造consumer對象,這樣我們可以通過partitionsFor獲取到分區的信息,然後我們就可以構造出TopicPartition集合,傳給seekToBegining方法。需要注意的一個地方是:需要用consumer.poll(0),而不能用consumer.poll(Duration.ofMillis(0))

    在poll(0)中consumer會一直阻塞直到它成功獲取了所需的元數據信息,之後它才會發起fetch請求去獲取數據。而poll(Duration)會把元數據獲取也計入整個超時時間。由於本例中使用的是0,即瞬時超時,因此consumer根本無法在這麼短的時間內連接上coordinator,所以只能趕在超時前返回一個空集合。

    Latest策略:從最新的數據開始消費

        consumer.seekToEnd(
            consumer.partitionsFor(getTopics().get(0)).stream().map(partitionInfo ->
                new TopicPartition(getTopics().get(0), partitionInfo.partition()))
                  .collect(Collectors.toList()));
    
    

    Current策略:從當前已經提交的offset處消費

    consumer.partitionsFor(getTopics().get(0)).stream().map(info ->
            new TopicPartition(getTopics().get(0), info.partition()))
            .forEach(tp -> {
                long committedOffset = consumer.committed(tp).offset();
                consumer.seek(tp, committedOffset);
            });
    

    **Special-offset策略:從指定的offset處消費 **

    該策略使用的方法和current策略一樣,區別在於,current策略是直接從kafka元信息中讀取中已經提交的offset值,而special策略需要用戶自己為每一個分區指定offset值,我們一般是把offset記錄到數據庫中然後可以從數據庫去讀取這個值

        consumer.partitionsFor(getTopics().get(0)).stream().map(info ->
                    new TopicPartition(getTopics().get(0), info.partition()))
                    .forEach(tp -> {
                        try {
                            consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                    });
    
    

    以上演示了用API方式重設位移,演示了四種常見策略的代碼,另外三種沒有演示,一方面是大同小異,另一方面在實際生產中,用API的方式不太可能去做時間維度的重設,而基本都是用命令行方式。

    重設位移的方法之命令行方式

    命令行方式重設位移是通過 kafka-consumer-groups 腳本。比起 API 的方式,用命令行重設位移要簡單得多。

    Earliest 策略指定–to-earliest。

    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
    

    Latest 策略指定–to-latest。

    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
    

    Current 策略指定–to-current。

    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
    

    Specified-Offset 策略指定–to-offset。

    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
    

    Shift-By-N 策略指定–shift-by N。

    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute
    

    DateTime 策略指定–to-datetime。

    DateTime 允許你指定一個時間,然後將位移重置到該時間之後的最早位移處。常見的使用場景是,你想重新消費昨天的數據,那麼你可以使用該策略重設位移到昨天 0 點。

    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
    

    Duration 策略指定–by-duration。
    Duration 策略則是指給定相對的時間間隔,然後將位移調整到距離當前給定時間間隔的位移處,具體格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 類的話,你應該不會對這個格式感到陌生。它就是一個符合 ISO-8601 規範的 Duration 格式,以字母 P 開頭,後面由 4 部分組成,即 D、H、M 和 S,分別表示天、小時、分鐘和秒。舉個例子,如果你想將位移調回到 15 分鐘前,那麼你就可以指定 PT0H15M0S

    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute
    

    提交的位移都去哪了?

    通過上面那幾部分的內容,我們已經搞懂了位移提交的方方面面,那麼提交的位移它保存在哪裡呢?這就要去位移主題的的世界里一探究竟了。kafka把位移保存在一個叫做__consumer_offsets的內部主題中,叫做位移主題。

    注意:老版本的kafka其實是把位移保存在zookeeper中的,但是zookeeper並不適合這種高頻寫的場景。所以新版本已經是改進了這個方案,直接保存到kafka。畢竟kafka本身就適合高頻寫的場景,並且kafka也可以保證高可用性和高持久性。

    既然它也是主題,那麼離不開分區和副本這兩個機制。我們並沒有手動創建這個主題並且指定,所以是kafka自動創建的, 分區的數量取決於Broker 端參數 offsets.topic.num.partitions,默認是50個分區,而副本參數取決於offsets.topic.replication.factor,默認是3。

    既然也是主題,肯定會有消息,那麼消息格式是什麼呢?參考前面我們手動設計將位移寫入數據庫的方案,我們保存了topic,group_id,partition,offset四個字段。topic,group_id,partition無疑是數據表中的聯合主鍵,而offset是不斷更新的。無疑kafka的位移主題消息也是類似這種設計。key也是那三個字段,而消息體其實很複雜,你可以先簡單理解為就是offset。

    既然也是主題,肯定也會有刪除策略,否則消息會無限膨脹。但是位移主題的刪除策略和其他主題刪除策略又不太一樣。我們知道普通主題的刪除是可以通過配置刪除時間或者大小的。而位移主題的刪除,叫做 Compaction。Kafka 使用Compact 策略來刪除位移主題中的過期消息,對於同一個 Key 的兩條消息 M1 和 M2,如果 M1 的發送時間早於 M2,那麼 M1 就是過期消息。Compact 的過程就是掃描日誌的所有消息,剔除那些過期的消息,然後把剩下的消息整理在一起。

    Kafka 提供了專門的後台線程定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可刪除數據。這個後台線程叫 Log Cleaner。很多實際生產環境中都出現過位移主題無限膨脹佔用過多磁盤空間的問題,如果你的環境中也有這個問題,我建議你去檢查一下 Log Cleaner 線程的狀態,通常都是這個線程掛掉了導致的。

    總結

    kafka的位移是個極其重要的概念,控制着消費進度,也即控制着消費的準確性,完整性,為了保證消息不重複和不丟失。我們最好做到以下幾點:

    • 手動提交位移。

    • 手動提交有異步提交和同步提交兩種方式,既然兩者有利也有弊,那麼我們可以結合起來使用。

    • 細粒度的控制消費位移的提交,這樣可以避免重複消費的問題。

    • 保守的將消費位移再記錄到了數據庫中,重新啟動消費端程序的時候從數據庫讀取位移。

    獲取Kafka全套原創學習資料及思維導圖,關注【胖滾豬學編程】公眾號,回復”kafka”。

    本文來源於公眾號:【胖滾豬學編程】。一枚集顏值與才華於一身,不算聰明卻足夠努力的女程序媛。用漫畫形式讓編程so easy and interesting!求關注!

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※Google地圖已可更新顯示潭子電動車充電站設置地點!!

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※別再煩惱如何寫文案,掌握八大原則!

    網頁設計最專業,超強功能平台可客製化

  • VSCode + WSL 2 + Ruby環境搭建詳解

    VSCode + WSL 2 + Ruby環境搭建詳解

    vscode配置ruby開發環境

    vscode近年來發展迅速,幾乎在3年之間就搶佔了原來vim、sublime text的很多份額,猶記得在2015-2016年的時候,ruby推薦的開發環境基本上都是vim和sublime text,然而,隨着vscode的發展,vscode下ruby的開發體驗已經非常不錯。現在基本上使用win 10 wsl2 + vscode + windows terminal的體驗已經不遜於mac + vim (sublime) + item 2的體驗了

    總體步驟

    使用win10專業版配置ruby開發環境大致分為以下幾步:

    1. 開啟win10 wsl功能
    2. 升級wsl2
    3. 安裝ubuntu
    4. 安裝ruby(rvm)
    5. 安裝vscode
    6. 安裝vscode wsl擴展
    7. 安裝vscode ruby相關擴展

    經過以上7步就可以開始愉悅的ruby開發了,再開始之前,可以先看個效果圖。

    1. 開啟win10 wsl功能

    ruby對Linux和Mac比較友好,在windows下很多第三方庫要配合mingw或msys2才能安裝,不過好在windows 10提供了Linux子系統,在win10 2004版本中wsl也升級到了wsl2,速度更快,功能更完善。

    要使用wsl2需要先在控制面板中開啟wsl功能:

    • 適用於Linux的Windows子系統
    • 虛擬機平台

    2. 升級wsl2

    目前wsl2還需要安裝一個內核升級包,具體可參考微軟說明:

    • wsl2安裝說明
    • wsl2 update包

    更新包安裝完成后,輸入命令

    wsl --set-default-version 2
    

    3. 安裝Ubuntu

    在微軟應用商店安裝Ubuntu,當前Ubuntu版本為20.04 LTS

    安裝完成以後,配置Ubuntu默認為wsl2

    # 查看
    wsl --list --verbose
    
    # 設置
    wsl --set-version Ubuntu 2
    

    4. 安裝ruby

    在Linux下安裝ruby有多種方法,比較主流的方法是RVM,不過為了簡單起見,我直接通過ubuntu的apt工具進行了安裝。

    關於RVM的安裝可參考如下網站:

    • RVM官網
    • RVM實用指南

    通過APT安裝,輸入下列命令即可

    sudo apt install ruby ruby-dev ri ruby-bundle
    

    安裝完成以後需要配置gem國內鏡像,參考如下網址:

    • gem中文鏡像

    輸入下列命令

    # 設置gem source
    gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
    
    # 查看gem source
    gem sources -l
    
    # 設置bundle
    bundle config mirror.https://rubygems.org https://gems.ruby-china.com
    

    5. 安裝vscode

    vscode直接在官網下載安裝即可,這裏我選擇了System Installer

    • vscode官網下載頁面

    6. 安裝vscode wsl擴展

    vscode安裝完成以後,可以在plugin中找到Remote – WSL擴展,點擊安裝即可

    7. 安裝vscode ruby相關擴展

    直接在plugin中搜索ruby在wsl中安裝下列五個擴展即可

    • Peng Lv/Ruby
    • Castwide/Ruby Solargraph(Language Server)
    • misogi/ruby-rubocop(Lint)
    • Simple Ruby ERB
    • endwise

    其中,ruby solargraphrubocop除了安裝擴展,還需要通過gem安裝第三方包

    sudo gem install rubocop
    sudo gem install solargraph
    

    重新加載vscode-wsl就可以愉快的使用ruby language進行開發了

    vscode使用

    在使用上基本只要require了相應的庫,就solargraph就會對require的庫中涉及的類和模塊進行提示,非常方便。唯一有問題的地方就是require的時候沒有提示,這可能就需要自己記一下庫的名稱,不過相比於原來已經好太多了,應該說在可以接受的範圍內。

    1. 如果安裝了新的第三方庫會提示嗎?

    如果安裝了sinatra這樣的庫,vscode-ruby如何給出提示呢?只需要Ctrl + Shift + P,選擇solargraph: build new gem documention即可

    2. rubocop如何使用?

    rubocop是一個Ruby Lint工具,可以進行Ruby代碼風格檢查,並能夠自動修復,只需要Ctrl + Shift + P,選擇Ruby: autocorrect by rubocop即可

    3. 常用類型註釋

    ruby是動態強類型語言,由於不需要指定函數返回值類型,這導致IDE無法自動推斷一些變量的類型。目前Python、PHP、TypeScript都在不斷的強化類型以方便IDE進行靜態檢查。IDE只有在知道類型的情況下才能準確地進行智能提示。

    在ruby 2當中,我們可以通過類型註釋的方式增強IDE推斷能力。常見的類型註釋可參考YARD項目

    下面代碼給出了一些示例。

    require 'socket'
    
    server = TCPServer.new 2000
    loop do
      # 代碼塊參數類型註釋
      # @param {TCPSocket} client
      Thread.start(server.accept) do |client|
        client.puts 'hello !'
        client.puts "Time is #{Time.now}"
        client.close
      end
    end
    
    server = TCPServer.new 2000
    loop do
      # 變量註釋
      # @type {TCPSocket} client
      client = server.accept
    end
    
    # 函數參數和返回值註釋,數組類型
    # @param {Array(Integer)} nums
    # @param {Integer} target
    # @return {Array(Integer)}
    def two_sum(nums, target)
      hash_nums = {}
      result = []
      nums.each_with_index do |num, index|
        hash_nums[num] = index
      end
    
      nums.each_with_index do |num, index|
        another = target - num
        if hash_nums[another] && hash_nums[another] != index
          result.push(index, hash_nums[another])
          break
        end
      end
    
      result
    end
    

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    南投搬家公司費用需注意的眉眉角角,別等搬了再說!

    ※教你寫出一流的銷售文案?

  • SpringMVC-攔截器

    SpringMVC-攔截器

    概述

    Java 里的攔截器是動態攔截 action 調用的對象。

    可以在Controller 中的方法執行之前與執行之後,及頁面显示完畢后,執行指定的方法,自定義的攔截器必須實現HandlerInterceptor 接口。

    方法介紹

    preHandle

    在業務處理器處理請求之前被調用

    postHandle

    在業務處理器處理完請求后

    afterCompletion

    在 DispatcherServlet 完全處理完請求后被調用

    SpringMVC攔截器使用

    攔截所有請求

    創建一個類實現 HandlerInterceptor 接口

     

     配置文件當中添加攔截器

     

     內部源碼分析

     

     

     

     

    攔截指定請求

     

     

    配置多個攔截器執行順序

     

     

     

     

     

     

     

    第 2 個返回 false

     

     

     

     

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※想知道最厲害的網頁設計公司"嚨底家"!

    ※別再煩惱如何寫文案,掌握八大原則!

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

  • salesforce零基礎學習(九十八)Type淺談

    salesforce零基礎學習(九十八)Type淺談

    在Salesforce的世界,凡事皆Metadata。

    先通過一句經常使用的代碼帶入一下:

    Account accountItem = (Account)JSON.deserialize(accountString,Account.class);

    這種代碼相信大部分開發都會寫過,前台將數據序列化,通過字符串的形參傳遞給後台,後台將數據進行反序列化,從而獲取到這個表或者這個自定義類的實例。所以問題來了,為啥第二個參數是 Account.class?我們通過官方的API描述可能更好的進行了解。

     這裏我們引出了 Type的概念,他是apex 預定的類型,包括 基礎數據類型(Integer等) , 集合, sObject類型以及 用戶定義的類。基礎數據類型等等都是 object類型,所以當我們理解salesforce裏面的類型時,可以簡單的分成兩個大類:Object & sObject。所以Type概念引入完了,它用來幹嘛?怎麼聲明?什麼時候用呢?

    Type t1 = Integer.class;
    Type t2 = Type.forName('Integer');
    system.debug(t1.equals(t2));

    上面的簡單的demo中提供了兩種聲明Type的方式,一種是根據 object | sObject使用 .class聲明,另外一種是使用 Type的方法forName來實例化變量。既然變量可以聲明出來,我們就可以看看他的方法考慮如何搞事情了。

     Type的方法非常少,所以我們想要查看其對應的方法描述以及使用很容易就看完。這裏針對幾個重要的方法進行描述:

    • forName(fullyQualifiedName):返回與指定的完全限定的類名相對應的類型。這裏的類名包括salesforce系統預製的類,自定義的類以及sObject名稱;
    • isAssignableFrom(sourceType):如果object指定類型的引用可以從子類型分配,則返回true,否則返回false。這個方法我們可能首先會先想到 instanceof,因為都是來判斷類型是否相兼容匹配,但是 instanceof必須是初始化這個類或者對象的變量,才能使用 instanceof 來進行判斷,使用 此方法可以省去了靜態編譯時的依賴,所以如果想判斷依賴性,可以優先考慮此方法。
    • newInstance():此方法用來實例化一個指定Type的對象,返回類型是一個object對象,我們可以強制轉換成我們需要的對象類型。因為apex也是面向對象的語言,封裝,繼承,多態三大特性,我們可以通過 newInstance實現父子類型的輕鬆轉換調用相關的方法從而實現動態配置。

    基礎信息介紹完畢,此篇淺入淺出,介紹兩種可能用到的場景。

    1.  JSON序列化與反序列化

    這個我們經常使用,一筆帶過:通過字符串以及指定的 Type類型可以轉換成指定的數據類型。

    Account accountItem = (Account)JSON.deserialize(accountString,Account.class);

    2. 針對Custom Setting等根據配置的動態的類調用動態方法的使用

    ParentClass是一個父類,有兩個變量以及一個虛擬的方法,繼承的子類可以進行實現

    public abstract class ParentClass {
        public String param1 { get; set; }
        public String param2 { get; set; }
    
        public ParentClass() {
            this.param1 = 'value1';
            this.param2 = 'value2';
        }
    
        public virtual String joinParam() {
            return param1 + param2;
        }
    }

    SonClass1繼承了它並實現了它相關的方法

    public class SonClass1 extends ParentClass {
        public SonClass1() {
            super();
            this.param1 = 'son value1';
            this.param2 = 'son value2';
        }
    
        public override String joinParam() {
            return super.joinParam();
        }
    }

    還有其他的SonClassn繼承了它並實現了它的相關的方法,我們在custom setting中配置了不同的場景應該調用的不同的子類,公共方法中,我們只需要使用以下的代碼來判斷和調用即可。

    public String executeJoin(String className) {
        Type t = Type.forName(className);
        Type t2 = Type.forName('ParentClass');
        if(!t2.isAssignableFrom(t)) {
            throw new CustomException('should be son class of ParentClass');
        }
        ParentClass newObj = (ParentClass)t.newInstance();
        return newObj.joinParam();
    }

    總結:篇中簡單的介紹了salesforce中的 Type的使用,拋磚引玉,想要深入了解還要自行查看官方文檔。篇中有錯誤的地方歡迎指出,有不懂的歡迎留言。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※別再煩惱如何寫文案,掌握八大原則!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

    FB行銷專家,教你從零開始的技巧

  • Nginx服務器的使用與反向代理負載均衡

    Nginx服務器的使用與反向代理負載均衡

    Nginx服務器

    一:什麼是Nginx?

    我們生活的世界中,有的時候需要上網。我們可以瀏覽很多很多的網頁,這些網頁都是由一系列的程序組成,但是我們是否想過,這些程序存儲在什麼地方呢?沒錯,這些程序都是存儲在一種名叫服務器的硬件上,比如我們的電腦也是一種服務器,只不過我們的個人電腦作為服務器的話性能會比較低。我們的網頁程序存儲在服務器硬件上,是否可以隨意存儲呢?不是的,我們需要在服務器硬件的操作系統中搭建一個服務器軟件,那麼這樣,有服務器軟件跟服務器硬件配合,才形成一個完整的服務器。服務器軟件有非常多,比如Apache、tomcat等等都是服務器軟件,而我們今天要學習的Nginx也是一種服務器軟件之一。

    Nginx是一種服務器軟件,故而其最主要、最基本的功能當然是可以與服務器硬件結合,讓程序員可以將程序放在Nginx服務器上,將程序發布出去,讓成千上萬的網民可以瀏覽。除此之外,Nginx是一種高性能的HTTP和反向代理服務器,同時也是一個代理郵件服務器。也就是說,我們Nginx上可以發布網站,也可以實現負載均衡的功能,還可以作為郵件服務器實現收發郵件等功能。所謂的負載均衡是指,當同時有N多用戶訪問我們服務器的時候,為了減少服務器壓力,我們需要將用戶分別引入各服務器,分擔服務器的壓力。

    Nginx與其他服努器的性能比較

    首先說IIS, IIS服務器只能在Windows上運行,Windows服務器性能不如Linux— 類服務器。其次說Tomcat,Tomcat服務器面向的是Java語言,是一種重量級的服 務器,而Nginx是輕量級服務器,Tomcat與Nginx不具備可比性。最後,我們講一 下Apache,Apache優點非常多,比如穩定、幵源、跨平台等等,但是Apache不支 持高併發。Nginx能支持處理百萬級的TCP連接,10萬以上的併發連接,並且是一 個很好的跨平台服務器。

    Nginx的主要優點有可以實現高併發、部署簡單、內存消耗少、成本低等等。

    Nginx的主要缺點有rewrite功能不夠強大,模塊沒有Apache的多。

    本文版權歸微信公眾號”代碼藝術”(ID:onblog)所有,若是轉載請務必保留本段原創聲明,違者必究。若是文章有不足之處,歡迎關注微信公眾號私信與我進行交流!

    二:Linux中搭建Nginx服務器

    新建壓縮包下載位置(可選)

    新建目錄
    mkdir /usr/local/nginx_down
    切換目錄
    cd /usr/local/nginx_down
    

    下載解壓 Nginx

    下載
    wget http://nginx.org/download/nginx-1.14.0.tar.gz
    解壓
    tar -zxvf nginx-1.14.0.tar.gz
    切換目錄
    cd nginx-1.14.0
    

    配置 Nginx

    ./configure --with-http_ssl_module
    
    1. 這樣會默認安裝nginx在 /usr/local/nginx 目錄,可以使用--prefix=/usr/local/nginx指定安裝位置。

    2. 如果需要HTTPS(SSL)的支持,需要指定參數--with-http_ssl_module

    如果提示錯誤,那麼需要其它環境,請參考下面

    安裝 make

    yum -y install gcc automake autoconf libtool make
    

    安裝 g++

    yum -y install gcc gcc-c++
    

    安裝 PCRE庫

    yum -y install pcre pcre-devel
    

    安裝 Zlib

    yum -y install zlib zlib-devel
    

    安裝 GD library

    yum -y install gd-devel
    

    安裝 openssl

    yum -y install openssl openssl-devel
    

    -y:跳過所有手動確認輸入

    如果./configure安裝成功,只需要再執行兩個命令:

    make
    
    make install
    

    查看是否安裝成功

    cd /usr/local/nginx
    

    如果安裝成功,則會出現下列目錄:

    conf  html  logs  sbin
    

    切換到sbin目錄

    cd /sbin
    

    啟動程序

    ./nginx
    

    追加參數 -c 可以指定配置文件。

    常見的錯誤

    在Linux操作系統下搭建Nginx服務器,很多時候會出現不同的錯誤,在此,我們對搭建過程中出現的錯誤進行一些總結。主要有這些類型:

    防火牆問題,缺少gc++,缺少pcre、zlib等庫。

    三:Nginx的反向代理和負載均衡

    什麼是反向代理

    我們有時候,用自己的計算機A想訪問國外的某個網站B,但是訪問不了,此時,有一台中間服務器C可以訪問國外的網站B,那麼,我們可以用自己的電腦訪問服務器C,通過C來訪問B這個網站。那麼這個時候,服務器C稱為代理服務器,這種訪問方式叫做正向代理。正向代理有一個特點,就是我們明確知道要訪問哪個網站。再如,當我們有一個服務器集中,並且服務器集群中的每台服務器的內容一樣的時候,同樣我們要直接從個人電腦訪問到服務器集中的服務器的時候無法訪問,且此時第三方服務器能訪問集群,這個時候,我捫通過第三方服務器訪問服務器集群的內容,但是此吋我們並不知道是哪一台服務器提供的內容,此時的代理方式稱為反向代理。

    正向代理

    反向代理

    什麼是負載均衡

    當一台服務器的單位時間內的訪問量越大的時候,服務器的壓力會越大。當一台服務器壓力大得超過自身的承受能力的時候,服務器會崩潰。為了避免服務器崩潰,讓用戶有更好地體驗,我們通常通過負載均衡的方式來分擔服務器的壓力。那麼什麼是負載均衡呢?是這樣,我們可以建立很多很多個服務器,這些服務器組成一個服務器集群,然後,當用戶訪問我們網站的時候,先訪問一个中間服務器,再讓這个中間服務器在服務器集群中選擇一個壓力較小的服務器,然後將該訪問請求引入該選擇的服務器。這樣,用戶的每次訪問,都會保證服務器集群中的每個服務器的壓力趨於平衡,分擔了服務器壓力,避免了服務器崩潰的情況。

    基於反向代理的原理實現負載均衡

    四:Nginx負載均衡的實現

    Nginx 是一款可以通過反向代理實現負載均衡的服務器,使用 Nginx 服務實現負載均衡的時候,用戶的訪問首先會訪問到 Nginx 服務器,然後 Nginx 服務器再從服務器集群表中選擇壓力較小的服務器,然後將該訪問請求引向該服務器。若服務器集群中的某個服務器崩潰,那麼從待選服務器列表中將該服務器刪除,也就是說一個服務器假如崩潰了,那麼 Nginx 就肯定不會將訪問請求引入該服務器了。那麼下面,我們通過實例來講解一下 Nginx 負載均衡的實現。

    負載均衡配置文件

    默認配置文件nginx.conf 很重要,我們一般是新建一個配置文件,在啟動時指定加載。

    [root@hostname conf]# ls  //查看目錄
    fastcgi.conf            koi-win             scgi_params
    fastcgi.conf.default    mime.types          scgi_params.default
    fastcgi_params          mime.types.default  uwsgi_params
    fastcgi_params.default  nginx.conf          uwsgi_params.default
    koi-utf                 nginx.conf.default  win-utf
    [root@hostname conf]# touch fzjh.conf  //新建負載均衡配置文件
    

    編輯fzjh.conf配置文件

    vi fzjh.conf
    
    worker_processes  1;#工作進程的個數,一般與計算機的cpu核數一致  
      
    events {  
        worker_connections  1024;#單個進程最大連接數(最大連接數=連接數*進程數) 併發 
    }  
      
    http {  
        include       mime.types; #文件擴展名與文件類型映射表  
        default_type  application/octet-stream;#默認文件類型  
      
        sendfile        on;#開啟高效文件傳輸模式,sendfile指令指定nginx是否調用sendfile函數來輸出文件,對於普通應用設為 on,如果用來進行下載等應用磁盤IO重負載應用,可設置為off,以平衡磁盤與網絡I/O處理速度,降低系統的負載。注意:如果圖片显示不正常把這個改成off。  
          
        keepalive_timeout  65; #長連接超時時間,單位是秒  
      
        gzip  on;#啟用Gizp壓縮  
          
        #服務器的集群  
        upstream  myproject {  #服務器集群名字   
            server    220.181.111.188:80  weight=1;#服務器配置   weight是權重的意思,權重越大,分配的概率越大。  
            server    42.121.252.58:80  weight=2;  
        }     
      
        #當前的Nginx的配置  
        server {  
            listen       80;  #監聽80端口,可以改成其他端口  
            server_name  localhost; ##############   當前服務的域名  
      
            location / {  ##配置路徑/下實現負載均衡
                proxy_pass http://myproject;  ##對應哪個服務器集群
                proxy_redirect default;  
            }  
              
            error_page   500 502 503 504  /50x.html;  
            location = /50x.html {  
               root   html;  #根目錄
            }  
        }  
    }  
    

    我們在nginx目錄下執行命令,啟動負載均衡

    ./sbin/nginx -c ./conf/fzjh.conf
    

    然後嘗試訪問你的服務器,多訪問幾次,你會發現它會訪問2次csdn網站,1次百度。說明我們的負載均衡已經部署完畢。

    如何重啟Nginx

    【本文版權歸微信公眾號”代碼藝術”(ID:onblog)所有,若是轉載請務必保留本段原創聲明,違者必究。若是文章有不足之處,歡迎關注微信公眾號私信與我進行交流!】

    ./sbin/nginx -s reload
    

    如何關閉Nginx服務器

    1.查看nginx進程號

    ps -ef|grep nginx
    

    2.kill掉進程即可 (1709是第二列的進程號)

    kill 1709
    

    或者

    killall -9 nginx
    

    五:HTTP Upstream 模塊

    HTTP Upstream 模塊

    Upstream 模塊是 Nginx 服務器的一個重要模塊。 Upstream 模塊實現在輪詢和客戶端 iP 之間實現後端的負載均衡。常用的指令有 ip_hash指令、 server 指令和 upstream 指令等,下面我們分別來講一下。

    Http Upstream模塊- ip_hash指令

    在負載均衡系統中,假如用戶在某台服務器上登錄,那麼如果該用戶第二次請求的時候,因為我們是負載均衡系統,每次請求都會重新定位到服務器集群中的一個服務器,那麼此時如果將已經登錄服務器A的用戶再定位到其他服務器,顯然不妥。故而,我們可以採用 ip_hash指令解決這個問題,如果客戶端請求已經訪問了服務器A並登錄,那麼第二次請求的時候,會將該請求通過哈希算法自動定位到該後端服務器中。下面我們通過實例講解。

    實例

    此時不應該使用weight權重。

    
    tp {  
       ....
      upstream  myproject {    
          ip_hash; #實現會話跟蹤
          server 140.205.140.234;
          server 61.135.169.125;
    
      }     
     ....
    
    

    Http Upstream 模塊一 upstream 指令及相關變量

    upstream 指令主要是用於設置一組可以在 proxy_pass 和 fastcgi_pass 指令中使用代理服務器,默認負載均衡方式為輪詢。

    六:其他負載均衡實現方式

    負載均衡的實現方法除了可以使用 Nginx服務器實現外,還可以通過很多種方法來實現。負載均衡的核心就是建立一個服務器集群,然後用戶首先訪問到第三方代理服務器,然後由代理服務器選擇一個集群中的服務器,然後將請求引入選定的服務器。那麼代理服務器可以使用多種方式來充當,故而實現負載均衡的方式也是多種。總的來說,負載均衡實現的方式分為軟件實現和硬件實現兩種,如果中間的代理機構是硬件,那麼就是通過硬件設備來實現負載均衡的方式,如果中間的代理機構為軟件,就是軟件實現負載均衡的方式。而其中,軟件又可以是服務器軟件、系統軟件以及應用軟件等充當。

    負載均衡實現方式小結

    下面我們簡單總結一下負載均衡不同實現方式的優缺點:

    假如使用硬件的方式實現負載均衡,那麼中間的轉發機構就是硬件,這個時候運行的效率非常高,但是對應的成本也非常高。如果我們採用軟件的方式來實現負載均衡,那麼中間的轉發機構就是軟件,這個時候,運行效率不如硬件,但是成本相對來說低得多。而使用Nginx服務器實現負載均衡,那麼就是通過軟件的方式來實現負載均衡,並且 Nginx本身支持高併發等。故而使用 Nginx服務器實現負載均衡,能大大節約企業的成本,並且由於 Nginx是服務器軟件,其執行效率也是非常高。

    七:location匹配順序

    1. “=”前綴指令匹配,如果匹配成功,則停止其他匹配
    2. 普通字符串指令匹配,順序是從長到短,匹配成功的location如果使用^~,則停止其他匹配(正則匹配)
    3. 正則表達式指令匹配,按照配置文件里的順序,成功就停止其他匹配
    4. 如果第三步中有匹配成功,則使用該結果,否則使用第二步結果

    注意點

    1. 匹配的順序是先匹配普通字符串,然後再匹配正則表達式。另外普通字符串匹配順序是根據配置中字符長度從長到短,也就是說使用普通字符串配置的location順序是無關緊要的,反正最後nginx會根據配置的長短來進行匹配,但是需要注意的是正則表達式按照配置文件里的順序測試。找到第一個比配的正則表達式將停止搜索。
    2. 一般情況下,匹配成功了普通字符串location后還會進行正則表達式location匹配。有兩種方法改變這種行為,其一就是使用“=”前綴,這時執行的是嚴格匹配,並且匹配成功后立即停止其他匹配,同時處理這個請求;另外一種就是使用“^~”前綴,如果把這個前綴用於一個常規字符串那麼告訴nginx 如果路徑匹配那麼不測試正則表達式。

    匹配模式及順序

      location = /uri    =開頭表示精確匹配,只有完全匹配上才能生效。

      location ^~ /uri   ^~ 開頭對URL路徑進行前綴匹配,並且在正則之前。

      location ~ pattern  ~開頭表示區分大小寫的正則匹配。

      location ~* pattern  ~*開頭表示不區分大小寫的正則匹配。

      location /uri     不帶任何修飾符,也表示前綴匹配,但是在正則匹配之後。

      location /      通用匹配,任何未匹配到其它location的請求都會匹配到,相當於switch中的default。

    八:配置HTTPS

    1.獲取證書

    獲得SSL證書文件 1_www.domain.com_bundle.crt 和私鑰文件 2_www.domain.com.key

    2.證書安裝

    將域名 www.domain.com 的證書文件1_www.domain.com_bundle.crt 、私鑰文件2_www.domain.com.key保存到同一個目錄,例如/usr/local/nginx/conf目錄下。

    3.配置conf

    打開nginx.conf文件,找到nginx.conf的下段配置內容:

    # HTTPS server
    #
    #server {
    #    listen       443 ssl;
    #    server_name  localhost;
    
    #    ssl_certificate      cert.pem;
    #    ssl_certificate_key  cert.key;
    
    #    ssl_session_cache    shared:SSL:1m;
    #    ssl_session_timeout  5m;
    
    #    ssl_ciphers  HIGH:!aNULL:!MD5;
    #    ssl_prefer_server_ciphers  on;
    
    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}
    

    打開註釋,修改server_name為綁定證書的域名(如:www.domain.com),修改ssl_certificate 為 1_www.domain.com_bundle.crt,修改 ssl_certificate_key 為 2_www.domain.com.key 即可。

    4.HTTP自動跳轉HTTPS

    對於用戶不知道網站可以進行https訪問的情況下,讓服務器自動把http的請求重定向到https。 在服務器這邊的話配置的話,可以在頁面里加js腳本,也可以在後端程序里寫重定向,當然也可以在web服務器來實現跳轉。

    Nginx是支持rewrite的(只要在編譯的時候沒有去掉pcre) 在http的server里增加rewrite ^(.*) https://$host$1 permanent; 這樣就可以實現80進來的請求,重定向為https了。

    還是在此配置文件中,加入下面一句:

    server {
            listen       80;
            server_name  localhost;
            
    		rewrite ^(.*) https://$host$1 permanent;
    		...
    

    版權聲明

    本文版權歸微信公眾號”代碼藝術”(ID:onblog)所有,若是轉載請務必保留本段原創聲明,違者必究。若是文章有不足之處,歡迎關注微信公眾號私信與我進行交流!

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※別再煩惱如何寫文案,掌握八大原則!

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※超省錢租車方案

    ※教你寫出一流的銷售文案?

    網頁設計最專業,超強功能平台可客製化

  • drf之框架基礎

    drf之框架基礎

    (一)drf基礎

    全稱:django-rest framework

    接口:什麼是接口、restful接口規範(協議)

    CBV(基於FBV的基礎上形成)、CBV生命周期源碼—-基於restful規範下的CBV接口

    請求生命周期:請求組件、解析組件、響應組件

    序列化組件(序列化、反序列化簡單來說就是對象轉為字符串、字符串轉為對象,目的是為傳輸數據(傳給別的語言或者存儲))

    三大認證(重中之重):認證(用戶是否合法)、權限(管理員、普通用戶)、頻率(次數過多限制)

    其他組件(過濾、篩選、排序、分頁、路由)

     

    1.接口

    概念:聯繫兩個物質的媒介,完成信息的交互。在web程序中,聯繫前台頁面後台數據庫的媒介。

    web接口組成:

    url:長得像返回數據的url鏈接。如api.baidu.map/search

    www.baidu.com不叫接口,叫url鏈接,訪問只能拿到主頁。api.baidu.map/search是接口,返回的是json數據)

    請求參數:前台按照指定的key提供數據給後台。(必須是給定的,這樣後台才能以此提取數據再到數據庫查詢返回)

    響應數據:後台與數據庫交互后,將數據反饋給前台。

    因此,接口就是帶着請求參數訪問能夠得到響應數據的url鏈接。

    接口 = url + 請求參數 + 響應數據

     

     

    2.restful接口規範

    接口規範:不同後台語言,使用同樣的接口返回同樣的數據。

    如何寫接口:要寫url和響應數據。如果將請求參數也加上,就是在寫接口文檔。

     

    兩大部分:

    1url

    1)用api關鍵字標識接口url。方式1api.baidu.com;方式2:www.baudu.com/api

    2)接口數據安全性決定優先選用https協議

    3)如果一個接口有多版本存在,需要在url中標識體現。如下的v1v2

    api.baidu.com/v1/….  api.baidu.com/v2/….

    4)操作中的數據稱為資源,在url中資源一般採用複數形式,一個接口可以概括對該資源的多種操作方式。(一個url對應一個類,類裏面可以有多個請求方法)

    可以對操作隱藏,並且復用性更強(寫動作了,只能適用這一個動作,不寫其他動作都可以用)如api.baidu.com/books    api.baidu.com/books/(pk)

    5)請求方式有多種,用一個url處理如何讓保證不混亂——通過不同的請求方式標識不同操作資源的方式

    /books      get     獲取所有

    /books post   增加一個(多個)

    /books/(pk) delete 刪除一個

    /books/(pk)  put 整體更新一個  #改一個用戶

    /books/(pk)  patch 局部更新一個 #改一個用戶的密碼

     

    6)資源往往涉及數據的各種操作方式:篩選、排序、限制

    api.baidu.com/books/?search=寶馬&ordering=-price&limit=3

     

    2)響應數據

    1http請求的響應會有響應狀態碼,接口用來返回操作的資源數據,也有自己操作數據結果的資源狀態碼status 0代表操作資源成功,1代表操作失敗,2代表操作成功,但沒匹配結果)

    注:資源狀態碼和http狀態碼不一樣,為前後台的約定

    2)資源狀態碼的文字提示。

    status ok “賬號有誤或者密碼有誤”

    3)資源本身

    results

    :刪除資源成功不做任何數據返回(只返回空字符串,連狀態碼、狀態信息都不返回)

    4)不能直接返回的資源(子資源、圖片、視頻等資源),返回該資源的url鏈接。

     

    https://api.baidu.com/v1/books?limit=3
    get|post|delete|put|patch   
    {
      “status” : 0,
      “msg” : “ok”,
      “results”: [
    {
      “title”: “三國”,
      “price”: 78,
      “img”: “https://.....”   
        }
      ]
    }

     

    3.django流程

    1)項目準備:

     

    1.分發路由

    在項目文件夾的urls複製一份到應用文件夾中。然後在項目文件夾的urls分發路由給app:導入include,然後url(r’^api/’, include(‘api.urls’))。再在app文件夾的urls.py中分發路由給CBV

    2.視圖

    在應用中分發路由前,先寫類視圖

    from django.http import JsonResponse
    from django.views import View
    
    class Book(View):
    
        def get(self, request, *args, **kwargs):
            return JsonResponse('get ok', safe=False)
    
        def post(self, request, *args, **kwargs):
            return JsonResponse('get ok', safe=False)   #safe默認為true,要返回字典。不是字典否則拋異常。

     

    3.在應用urls下分發路由

    from django.conf.urls import url
    from . import views    #注意在應用中導入視圖都是.   從當前應用中導入
    
    urlpatterns = [
        url(r'^books/', views.Book.as_view()),
    ]

     

    4.定義模型類

    1models.py中定義類

    from django.db import models
    
    class Book(models.Model):
    
        title = models.CharField(max_length=64)
        price = models.DecimalField(max_digits=5, decimal_places=2) #整數、小數位
    
        class Meta:    #嵌套類(給上級類添加功能或指定標準)
            
           db_table = 'book'    #自定義數據庫表名
            verbose_name = "book"   #給模型起個可讀的名字,默認是複數
            verbose_name_plural = verbose_name   #取消上面的複數
    
        def __str__(self):      #显示的內容
            return '<<%s>>' % self.title    

     

     

       (2)數據庫遷移

    進入djangoshell環境中:Tools—-> run manage.py task

    shell環境中生成遷移文件:makemigrations。然後遷移:migrate

     

    5.生成admin

    1)在amin.py中註冊並且導入模型

    from django.contrib import admin
    port models
    
    admin.site.register(models.Book)

     

    2)創建用戶

    shell環境中:createsuper創建超級用戶,然後輸入用戶密碼(郵箱不用)

     

     

     2CBV的請求生命周期

    請求如何到CBV下的getpost

    a.請求過來,項目文件中路由分發給應用api的路由

    b.應用分發路由走as_view函數。

    views.Book.as_view()  保存一系列數據(requestargs**kwargs等)給Book對象,然後都給dispatch進行路由分發。

    dispatch乾的事:判斷請求方式是否支持,然後返回(通過getattr)支持的這些請求方法(getpost等,在視圖中自定義getpost的返回值)的結果。

    c.通過dispatch就執行了CBV下請求方式的結果,返回結果

     

     

    4.django原生的接口、序列化

      六大基礎接口:獲取一個、獲取所有、增加一個、刪除一個、整體更新一個、局部更新一個

     十大接口:6大基礎、群增、群刪、整體群改、局部群改

     

    1.在應用的urls.py下分發路由

    url(r’^books/$’, views.Book.as_view()),   #必須要加$,否則後面匹配不到

    url(r’^books/(?P<pk>.*)/$’, views.Book.as_view()),有名分組

    在視圖函數中通過kwargs.get(pk)取到匹配的值

     

    2.views.py里寫邏輯

    class Book(View):

     

        def get(self, request, *args, **kwargs):

            pk = kwargs.get(‘pk’)  #獲取參數

            if not pk:  #群查接口

                #操作數據庫

                book_obj_list = models.Book.objects.all()

                #序列化過程

                book_list = []

                for obj in book_obj_list:   #將查到的對象序列化

                    dic = {}

                    dic[‘title’] = obj.title

                    dic[‘price’] = obj.price

                    book_list.append(dic)

                return JsonResponse({

                    ‘status’ : 0,

                    “msg” : “ok”,

                    “results”: book_list,

                }, json_dumps_params={‘ensure_ascii’:False})

            else:    #單查接口

                book_dic = models.Book.objects.filter(pk=pk).values(

                    ‘title’, ‘price’).first()

                if book_dic:

                    return JsonResponse({

                        ‘status’: 0,

                        “msg”: “ok”,

                        “results”: book_dic,

                    }, json_dumps_params={‘ensure_ascii’: False})

     

                return JsonResponse({

                    ‘status’: 2,

                    “msg”: “no results”,

                }, json_dumps_params={‘ensure_ascii’: False})

     

     

        def post(self, request, *args, **kwargs):

            #前台通過urlencoded方式提交數據

            try:

                book_obj = models.Book.objects.create(**request.POST.dict()) #create創建對象。將request.POST中存放的提交的關鍵詞參數轉化為字典以**方式傳進去。沒傳參數,這邊會報錯。

                if book_obj:

                    return JsonResponse({

                    ‘status’: 0,

                    “msg”: “ok”,

                    “results”: {‘title’:book_obj.title, “price”:book_obj.price}

                }, json_dumps_params={‘ensure_ascii’: False})

            except:    #健壯性

                return JsonResponse({

                    ‘status’: 1,

                    “msg”: “wrong params”,

                }, json_dumps_params={‘ensure_ascii’: False})

            return JsonResponse({    #可能操作數據庫失敗了

                    ‘status’: 2,

                    “msg”: “created failed”,

                }, json_dumps_params={‘ensure_ascii’: False})

               

      

    JsonResponse返回時,中文會變成unicode,要加json_dumps_params={‘ensure_ascii’:False}選項。但在linux環境下的火狐瀏覽器,加了是亂碼。

     

    filter返回queryset對象,對象里是個列表(表名:對象信息(有自定義str就是自定義的信息))。first取里第一個對象(相當於print(第一個對象)values展示對應的對象里的值

    <QuerySet [<Book: <<三國演義>>>]>                   #直接.filter

    <<三國演義>>                                    #.first()

    <QuerySet [{‘title’: ‘三國演義‘, ‘price’: Decimal(‘56.00’)}]>  #.values(‘title’,’price’)

    {‘title’: ‘三國演義‘, ‘price’: Decimal(‘56.00’)}             #.values.first()  是個字典

     

     上面序列化的工作很麻煩。drf就是為了方便序列化的。

     

    postman可以完成不同方式的請求:getpostput

    postman發送數據包有三種方式:form-dataurlencodedjson. 原生djangourlencoded數據提交兼容。

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※教你寫出一流的銷售文案?

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※回頭車貨運收費標準

    ※別再煩惱如何寫文案,掌握八大原則!

    ※超省錢租車方案

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

  • 『圖論』LCA 最近公共祖先

    『圖論』LCA 最近公共祖先

    概述篇

    LCA (Least Common Ancestors) ,即最近公共祖先,是指這樣的一個問題:在一棵有根樹中,找出某兩個節點 uv 最近的公共祖先。

    LCA 可分為在線算法離線算法

    • 在線算法:指程序可以以序列化的方式一個一個處理輸入,也就是說在一開始並不需要知道所有的輸入。
    • 離線算法:指一開始就需要知道問題的所有輸入數據,而在解決一個問題后立即輸出結果。

    算法篇

    對於該問題,很容易想到的做法是從 u、v 分別回溯到根節點,然後這兩條路徑中的第一個交點即為 u、v 的最近公共祖先,在一棵平衡二叉樹中,該算法的時間複雜度可以達到 O(logn)O(log⁡n) ,但是對於某些退化為鏈狀的樹來說,算法的時間複雜度最壞為 O(n)O(n) ,顯然無法滿足更高頻率的查詢。

    本節將介紹幾種比較高效的算法來解決這一問題,常見的算法有三種:在線 DFS + ST 算法、倍增算法、離線 Tarjan 算法。

    接下來我們來一一解釋這三種 /* 看似高深,其實也不簡單 */ 的算法。

    在線 DFS + ST 算法

    首先看到 ST 你會想到什麼呢?(腦補許久都沒有想到它會是哪個單詞的縮寫)

    看過前文 『數據結構』RMQ 問題 的話你便可以明白 ST算法 的思路啦~

    So ,關於 LCA 的這種在線算法也是可以建立在 RMQ 問題的基礎上咯~

    我們設 LCA(T,u,v) 為在有根樹 T 中節點 u、v 的最近公共祖先, RMQ(A,i,j) 為線性序列 A 中區間 [i,j] 上的最小(大)值。

    如下圖這棵有根樹:

    我們令節點編號滿足父節點編號小於子節點編號(編號條件)

    可以看出 LCA(T,4,5) = 2, LCA(T,2,8) = 1, LCA(T,3,9) = 3

    設線性序列 A 為有根樹 T 的中序遍歷,即 A = [4,2,5,1,8,6,9,3,7]

    由中序遍歷的性質我們可以知道,任意兩點 u、v 的最近公共祖先總在以該兩點所在位置為端點的區間內,且編號最小。

    舉個栗子:

    假設 u = 8, v = 7 ,則該兩點所確定的一段區間為 [8,6,9,3,7] ,而區間最小值為 3 ,也就是說,節點 3u、v 的最近公共祖先。

    解決區間最值問題我們可以採用 RMQ 問題中的 ST 算法

    但是在有些問題中給出的節點並不一定滿足我們所說的父節點編號小於子節點編號,因此我們可以利用節點間的關係建圖,然後採用前序遍歷來為每一個節點重新編號以生成線性序列 A ,於是問題又被轉化為了區間最值的查詢,和之前一樣的做法咯~

    時間複雜度: n×O(logn)n×O(log⁡n) 預處理 + O(1)O(1) 查詢

    想了解 RMQ 問題 的解法可以戳上面的鏈接哦~

    以上部分介紹了 LCA 如何轉化為 RMQ 問題,而在實際中這兩種方案之間可以相互轉化

    類比之前的做法,我們如何將一個線性序列轉化為滿足編號條件的有根樹呢?

    1. 設序列中的最小值為 AkAk ,建立優先級為 AkAk 的根節點 TkTk
    2. 將 A[1…k−1]A[1…k−1] 遞歸建樹作為 TkTk 的左子樹
    3. 將 A[k+1…n]A[k+1…n] 遞歸建樹作為 TkTk 的右子樹

    讀者可以試着利用此方法將之前的線性序列 A = [4,2,5,1,8,6,9,3,7] 構造出有根樹 T ,結果一定滿足之前所說的編號條件,但卻不一定唯一。

    離線 Tarjan 算法

    Tarjan 算法是一種常見的用於解決 LCA 問題的離線算法,它結合了深度優先搜索與並查集,整個算法為線性處理時間。

    首先來介紹一下 Tarjan 算法的基本思路:

    1. 任選一個節點為根節點,從根節點開始
    2. 遍歷該點 u 的所有子節點 v ,並標記 v 已經被訪問過
    3. 若 v 還有子節點,返回 2 ,否則下一步
    4. 合併 v 到 u 所在集合
    5. 尋找與當前點 u 有詢問關係的點 e
    6. 若 e 已經被訪問過,則可以確定 u、e 的最近公共祖先為 e 被合併到的父親節點

    偽代碼:

    Tarjan(u)               // merge 和 find 為並查集合併函數和查找函數
    {
        for each(u,v)       // 遍歷 u 的所有子節點 v
        {
            Tarjan(v);      // 繼續往下遍歷
            merge(u,v);     // 合併 v 到 u 這一集合
            標記 v 已被訪問過;
        }
        for each(u,e)       // 遍歷所有與 u 有查詢關係的 e
        {
            if (e 被訪問過)
                u, e 的最近公共祖先為 find(e);
        }
    }
    C++
    

    感覺講到這裏已經沒有其它內容了,但是一定會有好多人沒有理解怎麼辦呢?

    我們假設在如下樹中模擬 Tarjan 過程(節點數量少一點可以畫更少的圖o( ̄▽ ̄)o)

    存在查詢: LCA(T,3,4)、LCA(T,4,6)、LCA(T,2,1)

    注意:每個節點的顏色代表它當前屬於哪一個集合,橙色線條為搜索路徑,黑色線條為合併路徑。

    當前所在位置為 u = 1 ,未遍歷孩子集合 v = {2,5} ,向下遍歷。

    當前所在位置為 u = 2 ,未遍歷孩子集合 v = {3,4} ,向下遍歷。

    當前所在位置為 u = 3 ,未遍歷孩子集合 v = {} ,遞歸到達最底層,遍歷所有相關查詢發現存在 LCA(T,3,4) ,但是節點 4 此時標記未訪問,因此什麼也不做,該層遞歸結束。

    遞歸返回,當前所在位置 u = 2 ,合併節點 3u 所在集合,標記 vis[3] = true ,此時未遍歷孩子集合 v = {4} ,向下遍歷。

    當前所在位置 u = 4 ,未遍歷孩子集合 v = {} ,遍歷所有相關查詢發現存在 LCA(T,3,4) ,且 vis[3] = true ,此時得到該查詢的解為節點 3 所在集合的首領,即 LCA(T,3,4) = 2 ;又發現存在相關查詢 LCA(T,4,6) ,但是節點 6 此時標記未訪問,因此什麼也不做。該層遞歸結束。

    遞歸返回,當前所在位置 u = 2 ,合併節點 4u 所在集合,標記 vis[4] = true ,未遍歷孩子集合 v = {} ,遍歷相關查詢發現存在 LCA(T,2,1) ,但是節點 1 此時標記未訪問,因此什麼也不做,該層遞歸結束。

    遞歸返回,當前所在位置 u = 1 ,合併節點 2u 所在集合,標記 vis[2] = true ,未遍歷孩子集合 v = {5} ,繼續向下遍歷。

    當前所在位置 u = 5 ,未遍歷孩子集合 v = {6} ,繼續向下遍歷。

    當前所在位置 u = 6 ,未遍歷孩子集合 v = {} ,遍歷相關查詢發現存在 LCA(T,4,6) ,且 vis[4] = true ,因此得到該查詢的解為節點 4 所在集合的首領,即 LCA(T,4,6) = 1 ,該層遞歸結束。

    遞歸返回,當前所在位置 u = 5 ,合併節點 6u 所在集合,並標記 vis[6] = true ,未遍歷孩子集合 v = {} ,無相關查詢因此該層遞歸結束。

    遞歸返回,當前所在位置 u = 1 ,合併節點 5u 所在集合,並標記 vis[5] = true ,未遍歷孩子集合 v = {} ,遍歷相關查詢發現存在 LCA(T,2,1) ,此時該查詢的解便是節點 2 所在集合的首領,即 LCA(T,2,1) = 1 ,遞歸結束。

    至此整個 Tarjan 算法便結束啦~

    PS:不要在意最終根節點的顏色和其他節點顏色有一點點小小差距,可能是在染色的時候沒仔細看,總之就這樣咯~

    PPS:所謂的首領就是、就是首領啦~

    倍增算法

    哇!還有一個倍增算法以後繼續補充吧!

    總結篇

    對於不同的 LCA 問題我們可以選擇不同的算法。

    假若一棵樹存在動態更新,此時離線算法就顯得有點力不從心了,但是在其他情況下,離線算法往往效率更高(雖然不能保證得到解的順序與輸入一致,不過我們有 sort 呀)

    總之,喜歡哪種風格的 code 是我們自己的意願咯~

    另外, LCA 和 RMQ 問題是兩個非常基礎的問題,很多複雜問題都可以轉化為這兩類問題來解決。(當然這兩類問題之間也可以相互轉化啦~)

    參考資料

    OI wiki https://oi-wiki.org/graph/lca/

    https://blog.csdn.net/my_sunshine26/article/details/72717112

    https://wizardforcel.gitbooks.io/the-art-of-programming-by-july/content/03.03.html

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※超省錢租車方案

    ※別再煩惱如何寫文案,掌握八大原則!

    ※回頭車貨運收費標準

    ※教你寫出一流的銷售文案?

    FB行銷專家,教你從零開始的技巧

  • 帶你學夠浪:Go語言基礎系列 – 8分鐘學複合類型

    文章每周持續更新,原創不易,「三連」讓更多人看到是對我最大的肯定。可以微信搜索公眾號「 後端技術學堂 」第一時間閱讀(一般比博客早更新一到兩篇)

    對於一般的語言使用者來說 ,20% 的語言特性就能夠滿足 80% 的使用需求,剩下在使用中掌握。基於這一理論,Go 基礎系列的文章不會刻意追求面面俱到,但該有知識點都會覆蓋,目的是帶你快跑趕上 Golang 這趟新車。

    Hurry up , Let’s go !

    前面我們學習過 Golang 中基礎數據類型,比如內置類型 int string bool 等,其實還有一些複雜一點點,但很好用的複合類型,類似 C 中的數組和 struct、C++ 中的 map ,今天我們就來學習 Go 中的複合類型。

    通過本文的學習你將掌握以下知識:

    • 結構體
    • 指針類型
    • 數組和切片
    • 映射類型

    指針

    指針不保存實際數據的內容,而是保存了指向值的內存地址 。用 & 對變量取內存地址,用 * 來訪問指向的內存。這點和 C 中的指針是一樣,唯一不同的是 Go 中的指針不能運算。

     a := 3
     pa := &a // 用 `&` 對變量取內存地址
     fmt.Println("point", a, *pa) // 用 `*` 來訪問指向的內存
    

    只聲明沒賦值的指針值是 nil ,代表空指針。

     var a0 *int // 只聲明沒賦值的指針是nil
     if a0 == nil {
      fmt.Println("point", "it is nil point")
     }
    

    結構體

    與C中的結構體類似, 結構體是一種聚合的數據類型,是由零個或多個任意類型的值聚合成的實體。每個值稱為結構體的成員,看例子:

    type Test struct {
      a int
      b int
     }
    

    語法上的不同看到了嗎? 每個結構體字段之後沒有分號,沒有分號寫起來還是很舒服的。

    初始化

    可以在定義的時候初始化

    test := Test{1, 2}  // 定義結構體變量並初始化
    

    初始化部分結構體字段

    t2  = Test{a: 3}   //指定賦值Test.a為3 Test.b隱式賦值0
    

    隱式初始化

    t3  = Test{}       // .a .b都隱式賦值0
    

    多個變量可以分組一起賦值

    var (
        t1  = Test{8, 6}
        t2  = Test{a: 3}  //指定賦值Test.a Test.b隱式賦值0
        t3  = Test{}      // .a .b都隱式賦值0
        pt4 = &Test{8, 6} // 指針
    )
    

    訪問成員

    通過 . 運算來訪問結構體成員,不區分結構體類型或是結構體指針類型。

    fmt.Println("struct", st0.a, st0.b) // 通過 . 運算來訪問結構體成員
    

    對於只聲明沒賦值的結構體,其內部變量被賦予零值,下面我們聲明了 st0 但沒有對其賦值。

    var st0 Test  
    fmt.Println("struct", st0.a, st0.b) //輸出:struct 0 0
    

    數組

    數組是一個由固定長度的特定類型元素組成的序列,一個數組可以由零個或多個元素組成。 數組可以用下標訪問元素,下標從 0 開始。

    數組聲明后賦值

     var strarr [2]string // 數組聲明語法
     strarr[0] = "ready"
     strarr[1] = "go"
    

    聲明賦值同時完成

     intarr := [5]int{6, 8, 9, 10, 7} // 聲明賦值同時完成
    

    對於確定初始值個數的數組,可以省略數組長度

     intarr := [...]int{6, 8, 9, 10, 7} // 聲明賦值同時完成
    

    Slice 切片

    切片是變長的序列,序列中每個元素都有相同的類型。slice 語法和數組很像,只是沒有固定長度而已,切片底層引用一個數組對象,修改切片會修改原數組。

    通過切片可以訪問數組的部分或全部元素,正因為切片長度不是固定的,因此切片比數組更加的常用。

    聲明與初始化

    常規初始化

    簡短聲明並初始化切片

    s0 := []int{1, 2, 3, 4, 5, 6} // 簡短聲明加賦值
    

    聲明后再初始化

    var s []int        // 聲明切片s
    s = s0     // 用切片s0初始化切片s
    

    聲明並初始化切片

    var s00 []int = s0 // 用切片s0初始化切片s
    

    切片的零值是 nil

    // 切片的零值是nil 空切片長度和容量都是0
    var nilslice []int
    if nilslice == nil {
        fmt.Println("slice", "nilslice is nil ", len(nilslice), cap(nilslice))
    }
    
    

    make初始化

    除了上述的常規初始化方法,還可以用 make 內置函數來創建切片

    // 內建函數make創建切片,指定切片長度和容量
    // make 函數會分配一個元素為零值的數組並返回一個引用了它的切片
    s2 := make([]int, 4, 6) //創建元素都是0的切片s2, 長度為4,容量為6 第三個參數可以省略
    fmt.Println("slice", len(s2), cap(s2), s2)
    

    切片長度

    長度表示切片中元素的數目,可用內置函數 len 函數得到。

    切片容量

    容量表示切片中第一個元素到引用的底層數組結尾所包含元素個數,可用內置函數 cap 求得。

    切片區間

    切片區間遵循「左閉右開」原則,

    s0 := [5]int{6, 8, 9, 10, 7} // 數組定義
    var slice []int = intarr[1:4]    // 創建切片slice 包含數組子序列
    

    默認上下界。切片下界的默認值為 0,上界默認是該切片的長度。

    fmt.Println("slice", s0[:], s0[0:], s0[:5], s0[0:5]) // 這四個切片相同
    

    切片append操作

    append 函數用於在切片末尾追加新元素。

    添加元素也分兩種情況。

    添加之後長度還在原切片容量範圍內

    s2 := make([]int, 4, 6) //創建元素都是0的切片s2, 長度為4,容量為6 第三個參數可以省略
    s22 := append(s2, 2)    // append每次都是在最後添加,所以此時,s21 s22指向同一個底層數組
    fmt.Println(s21, s22)   // [0 0 0 0 2] [0 0 0 0 2]
    

    添加元素之後長度超出原切片容量

    此時會分配新的數組空間,並返回指向這個新分配的數組的切片。

    下面例子中 s24 切片已經指向新分配的數組,s22 依然指向的是原來的數組空間,而 s24 已經指向了新的底層數組。

     s24 := append(s2, 1, 2, 3)
     fmt.Println(s24, s22) // s24 [0 0 0 0 1 2 3] [0 0 0 0 2]
    

    二維切片

    可以定義切片的切片,類似其他語言中的二維數組用法。參考代碼:

     s3 := [][]int{
      {1, 1, 1},
      {2, 2, 2},
     }
     fmt.Println(s3, s3[0], len(s3), cap(s3)) // 輸出: [[1 1 1] [2 2 2]] [1 1 1] 2 2
    

    map 映射類型

    在 Go 中 map 是鍵值對類型,代表 keyvalue 的映射關係,一個map就是一個哈希表的引用 。

    定義和初始化

    下面這樣定義並初始化一個 map 變量

     m0 := map[int]string{
      0: "0",
      1: "1",
     }
    

    也可以用內置 make 函數來初始化一個 map 變量,後續再向其中添加鍵值對。像下面這樣:

     m1 := make(map[int]string) // make 函數會返回給定類型的映射,並將其初始化備用
     if m1 != nil {
      fmt.Println("map", "m1 is not nil", m1) // m1 不是nil
     }
     m1[0] = "1"
     m1[1] = "2"
    

    注意:只聲明不初始化的map變量是 nil 映射,不能直接拿來用!

     var m map[int]string // 未初始化的m零值是nil映射
     if m == nil {
      fmt.Println("map", "m is nil", m)
     }
     //m[0] = "1" // 這句引發panic異常, 映射的零值為 nil 。nil映射既沒有鍵,也不能添加鍵。
    

    元素讀取

    使用語法:vaule= m[key] 獲取鍵 key 對應的元素 vaule 。

    上面我們只用了一個變量來獲取元素,其實這個操作會返回兩個值,第一個返回值代表讀書的元素,第二個返回值是代表鍵是否存在的 bool 類型,舉例說明:

     v, st := m1[0]  // v是元素值,下標對應的元素存在st=true 否則st=false
     _, st1 := m1[0] // _ 符號表示忽略第一個元素
     v1, _ := m1[0]  // _ 符號表示忽略第二個元素 
     fmt.Println(v, st, v1, st1, m1[2]) // m1[2]不存在,返回元素string的零值「空字符」
    

    刪除元素

    內置函數 delete 可以刪除 map 元素,舉例:

    delete(m1, 1)  // 刪除鍵是 1 的元素
    

    range 遍歷

    range 用於遍歷 切片 或 映射。

    數組或切片遍歷

    當使用for 循環和 range 遍曆數組或切片時,每次迭代都會返回兩個值。第一個值為當前元素的下標,第二個值為該下標所對應元素的一份副本。

    s1 := []int{1, 2, 3, 4, 5, 6}  
    for key, vaule := range s1 {
        fmt.Println("range", key, vaule)
    }
    
    for key := range s1 { // 只需要索引,忽略第二個變量即可
        fmt.Println("range", key)
    }
    
    for _, vaule := range s1 { // 只需要元素值,用'_'忽略索引
        fmt.Println("range", vaule)
    }
    

    map 遍歷

    當使用for 循環和 range 遍歷map 時,每次迭代都會返回兩個值。第一個值為當前元素 key , 第二個值是 value

    m0 := map[int]string{
        0: "0",
        1: "1",
    }
    fmt.Println("map", m0)
    
    for k, v := range m0 { // range遍歷映射,返回key 和 vaule
        fmt.Println("map", "m0 key:", k, "vaule:", v)
    }
    

    總結

    通過本文的學習,我們掌握了 Golang 中基本的控制流語句,利用這些控制語句加上一節介紹的變量等基礎知識,可以構成豐富的程序邏輯,你就能用 Golang 來做一些有意思的事情了。

    感謝各位的閱讀,文章的目的是分享對知識的理解,技術類文章我都會反覆求證以求最大程度保證準確性,若文中出現明顯紕漏也歡迎指出,我們一起在探討中學習.

    今天的技術分享就到這裏,我們下期再見。

    創作不易,白票不是好習慣,如果在我這有收穫,動動手指「點贊」「關注」是對我持續創作的最大支持。

    可以微信搜索公眾號「 後端技術學堂 」回復「資料」「1024」有我給你準備的各種編程學習資料。文章每周持續更新,我們下期見!

    本文使用 mdnice 排版

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

    ※想知道最厲害的網頁設計公司"嚨底家"!

    ※別再煩惱如何寫文案,掌握八大原則!

    ※產品缺大量曝光嗎?你需要的是一流包裝設計!

  • 最新 iOS 框架整體梳理(三),最新 iOS 框架整體梳理(一),最新 iOS 框架整體梳理(二),iOS – QuartzCore

    最新 iOS 框架整體梳理(三),最新 iOS 框架整體梳理(一),最新 iOS 框架整體梳理(二),iOS – QuartzCore

     

          這一篇得把介紹框架這個系列終結了,不能超過三篇了,不然太長了….. 還是老規矩,前面兩篇的機票在下方:

          最新 iOS 框架整體梳理(一)

          最新 iOS 框架整體梳理(二)

     

    Part – 3

     

               

     

    62、Metal  MetalKit

           Metal ( [ˈmetl] )  這是一個和 OpenGLES 類似的面向底層的圖形處理接口,這也是蘋果自己搞出來的,所以這個框架我還是推薦要有一個大概的了解。

           Metal 系列教程(1)- Metal 介紹及基本使用  (系列文章三篇都是講述 Metal 的,可以學習一下)

           iOS漸變二維碼之Metal實現篇

           官方文檔

    63、MetalPerdormanceShaders

           其實這個 MetalPerdormanceShaders 也是屬於Metal的內容,關於它的具體的使用我推薦一篇利用它組高斯迷糊的文章。

           學習用MetalPerformanceShaders進行圖像處理

           官方文檔

    64、MetricKit

           這是一個在 iOS 13 中新加入的框架,iOS 13 中推出了MetricKit,它用於收集和處理電池和性能指標。

           iOS MetricsKit 收集電量和性能數據

           官方文檔

    65、MobileCoreServices

           要是在iOS10 以後在有一些APP之間跳轉的時候是需要這個框架的,我也了解了一下關於這個框架,幾乎說的都是使用它的私有API的情況下跳轉,所以不推薦使用!按照現在的審核要求私有API是行不通的,要承擔被下架的風險,具體的UTIs可以在下面查詢.

           UTIs

    66、ModelIo

          這個框架出來的相對比較早了 iOS 9 的時候發布的,但在日常中使用的還真的不多,但關於這個框架的基本的認知還是可以通過官方文檔了解到的。

          官方文檔 

    67、MultiPeerConnectivity

           這個框架我們也是有必要了解一下的,它主要是用於iOS設備間的通信,就像我們兩台iOS設備間使用 Airdrop 傳輸文件等都是屬於iOS通訊的,藉助這個機會我也給大家介紹一個直接從手機拍照導入mac的快速方法,右鍵桌面,見下圖。這個是我自己經常會用到的一個東西。

     

     

           下面是對於iOS設備間通信方式的一個總結小圖:

     

     

            圖片來源於  iOS近距離實時通信解決方案 這篇文章也能讓我們了解這個框架。

            官方文檔

    68、NaturalLanguage、

           這是一個很有趣的框架,是在iOS12中新加入的,大家在發微信消息的時候比如說了句“我想你了”微信就會有小星星雨下落,當然不一定微信是利用這個框架實現的,但這個自然語言分析框架也的確能幫我們實現這一點。具體它的使用以及怎樣分析語言的就需要我們自己探索一下了。

           Apple NLP框架NaturalLanguage的應用實例

           官方文檔

    69、NetWork  NetWorkExtension

          它可給系統WiFi列表列表裡邊的WiFi設置密碼 、標籤(副標題)。 還可獲取整個WiFi列表。獲取到WIFI列表之後呢,判斷有沒有連接上自己公司的WIFI,然後讓他打卡上班?這個我真沒試過,要有這種需求還真的是有點厲害!

         iOS 獲取系統wifi列表,wifi信號強度,並給wifi設置密碼,標籤(副標題)

         官方文檔

    70、NewsstandKit ( deprecated 

    71、NotificationCenter

          框架這東西整理的時候我發現兩個問題,最不常用的、最常用的反而是最難料理的。這個通知就是,不管是本地通知還是遠程通知我相信大家用的都很熟悉很熟悉了!所以關於它真的也只能一筆帶過了,不過還是提一句,通知框架里的東西的確需要我們掌握的,尤其是在iOS10之後蘋果在通知上是下了一份功夫的。

    72、OpenAL

          它也是一個音頻播放的框架,我們前面說過的關於音頻播放的框架真的不少了,像 AudioToolbox ,但它們之間還是有區別的,在延時、緩存等方面存在着區別。

          OpenAL的一些知識點

    73、OpenGLES

          iOS上繪製圖形的方式很多,UIKit,CoreGraphics,SpriteKit,OpenGL ES,Metal等。OpenGL ES是一套非常底層但使用非常廣泛的C語言API,專為移動設備定製,可在不同的手機系統或瀏覽器上使用,渲染效果非常好。

          iOS-OpenGLES  這是個系列文章,從這裏進去有好多的東西等着你學習呢。

    74、PassKit

          PassKit 框架在您的應用程序中請求和處理Apple Pay付款。 創建,分發和更新电子錢包應用的通行證。

          iOS PassKit Wallet 開發

          官方文檔

    75、PDFKit

           iOS 11 后蘋果在iOS平台開放了PDFKit SDK,可以使用這個框架显示和操作 pdf 文件,此項目應用PDFKit實現显示pdf、显示縮略圖、展開大綱和搜索文字的功能。這個框架還是值得我們好好學習一下的。

           iOS PDFKit框架講解

           官方文檔

    76、PencilKit

           這個框架是在iOS13中加入的,PencilKit可讓您輕鬆快捷地將手繪內容整合到iOS或macOS應用中。 PencilKit為iOS應用程序提供了一個繪圖環境,該環境可以從Apple Pencil或用戶的手指中獲取輸入,並將其轉換為您在iOS或macOS中显示的高質量圖像。該環境附帶了用於創建,擦除和選擇線條的工具。

           官方文檔

    77、Photos   PhotosUI

           這兩個框架是開發者比較熟悉常用的,它的最低適配版本是iOS 8,所以以前的相冊框架幾乎也都是不用了。關於它的資料網絡是哪個還真的不少,所以我們也就不多說了。

           官方文檔

    78、PuskKit  (很慚愧,沒找到資料)

    79、QuartzCore

           這個框架相信大家還是比較熟悉的,它裏面的內容我們在日常開發中也經常會用到,比如 CAAnimation(動畫),CADisplayLink(定時器),CAShapeLayer(圖層),CAGradientLayer(漸變)等等,一起拿我有寫文章大概的介紹過這個框架。

           iOS – QuartzCore

    80、QuickLook  QuickLookThumbnailing (Thumbnail [ˈθʌmneɪl] 縮略圖)

           QuickLook幾乎可以預覽幾乎所有的文件,像圖片、音樂,視頻、PDF、Word等都是可以。但是其可定製部分比較少,樣式比較單一,這是它的缺點。

           iOS快速預覽——QuickLook

           QuickLook官方文檔

           QuickLookThumbnailing官方文檔

    81、RealityKit

          RealityKit 是iOS 13 + 專為增強現實技術開發的一款新的高級框架,它可以處理渲染的所有方面,包括材質、陰影、反射,甚至相機的運動模糊。它還為多人AR應用程序處理網絡,這意味着開發人員不需要成為網絡工程師就可以來開發共享AR體驗,這個框架會和後面介紹的 SceneKit 和 ARKit 配合使用

          iOS ARKit,SceneKit,RealityKit總結

          官方文檔

    82、ReplayKit

          這是一個錄製屏幕的框架,但在不同的iOS版本中確有許多不同的表現,這個大家可以看下面分享的文章看一下。這一塊的需求應該也有,主要應該還是集中在遊戲中吧。

          iOS端使用replaykit錄製屏幕的技術細節

          官方文檔

    83、SafariServices

          這個框架看前面的Safari就知道和Safari瀏覽器相關了,你可以把瀏覽器集成到項目中然後瀏覽器上面能做的事你都可以做。具體的還是見官方文檔,在實際的項目中我們對這個框架的利用率感覺不是特別高。

          官方文檔

    84、SceneKit

           在前面說RealityKit框架的時候有提過這個框架,還是那句話它和RealityKit還有ARKit都是處理AR方面的內容的,你了解其中一個的時候回自然的了解到別的框架。

           官方文檔

    85、Security

          Security 框架用於保證應用程序所管理之數據的安全。該框架提供的接口可用於管理證書、公鑰、私鑰以及信任策略。它支持生成加密的安全偽隨機數。同時,它也支持對證書和Keychain密鑰進行保存,是用戶敏感數據的安全倉庫。

          關於它官方文檔最後面一個注意點說的挺明確的,內容如下:

           其實上面的大致意思就是說在iOS中我們平常使用的像URL等都是建立在安全框架基礎上的,所以我們沒必要刻意的使用這個安全框架,要視情況而定。

           官方文檔

    86、Social

           這也是一個社會化分享框架,只不過的原生的,所以在一些簡單的分享中我覺得還是可以一試的,沒必要一個不怎麼沉重的功能上一把第三方的殺牛刀。

           ios原生社交分享實踐

           官方文檔

    87、SoundAnalysis

           使用SoundAnalysis框架來分析音頻,並將其識別為特定類型,比如笑聲或掌聲。框架使用由MLSoundClassifier訓練的核心ML模型來執行分析。使用框架的能力分析流或基於文件的音頻,讓您添加智能音頻識別功能到您的應用程序。這個框架看介紹我覺得是一個很有意思的點,有空研究一下。

           官方文檔

    88、Speech

           這是一個語音識別的框架,也是很有趣的一個框架。建議大家都了解學習一下。

           iOS-Speech Framework

           官方文檔

    89、SpriteKit

           以前在接觸Cocos2d-JS的是有才有的“精靈”這個概念,你要不涉及這一塊那你知道那是一個和遊戲來發相關的框架就可以了,要是你是做遊戲的那我相信這個框架你也早都應該了解了。

           iOS SpriteKit 遊戲

           官方文檔

    90、StoreKit

           蘋果的內購相信大家也都有了解,這個框架就是專門用來處理內容的,有條件的我建議還是好好了解一下關於內購的知識。你再找它的資料的時候不塌搜索這個框架名稱,你直接搜索iOS 內購即可,這樣找打的資源相對多一些。以前有寫過關於內購的內容,有興趣的可以翻翻我以前的博客。

          官方文檔

    91、SwiftUI

          這個是一個全新的UI框架,它應該在以後也是一個趨勢,就像Swift一樣,它裏面的東西我們是有必要進行一個學習的。當然學習的資料也是相當的豐富。所以下面我們就只給出一個官方的文檔,具體的內容可以自己上網去篩選。

          官方文檔

    92、SystemConfiguration

          看網上的資源說這個框架也是一個用來測試網絡連接狀態的框架,但具體的使用又似乎不多。但的確可以嘗試,要是效果不多的話我建議能用原生的盡量避免使用第三方。

    93、Twiteer  UIKit  這兩個框架知道就行了,因為一個幾乎不用一個幾乎每天都用,的確沒有更多的可以說了。

    94、UserNotifications UserNotificationsUI

           這兩個框架在iOS10給的最大的一個驚喜,的確在10以後把通知優化的很是強大。這兩個框架相信很多人都知道,就沒必要在細說,葯還有不知道該怎麼處理的的確是應該去好好的研究一下他們。

    95、VideoSubscriberAccount

           iOS10引入了Video Subscriber Account框架(VideoSubscriberAccount.framework)來幫助應用支持流媒體認證或認證視頻點播(也被稱為TV Everywhere)與他們的有線電視或衛星電視供應商認證。 對於那些用戶註冊一次就能解鎖流媒體訂閱服務的應用來說,使用這個框架中的API可以幫助你支持單一登錄體驗。   

           這個框架的確我也沒有使用過,它是一個和AppleTV掛鈎的框架,具體的信息大家可以去看官方文檔。

           官方文檔

    96、VideoToolbox

           這個框架使讓用戶可以自行對視頻進行硬編解碼操作。關於視頻的硬編碼和解碼我也在學習計劃的當中,建議還是過一遍裏面的東西。

           iOS 利用VideoToolBox對視頻進行編解碼

           iOS利用VideoToolbox實現視頻硬解碼

           官方文檔

    97、Vision VisionKit ([ˈvɪʒn] 視力;美景;眼力;幻象)

           這個框架也是一個比較值得我們深入研究的框架,它是一個可以用來做識別圖像的框架。像面部檢測、矩陣碼/條形碼檢測等等,具體的可以在官方文檔裏面看到或者下面的文章都是可以看到的。

           iOS Vision 框架概覽

           iOS Vision的使用

           官方文檔

    98、WatchConnectivity

           這個框架看名字就能很好的理解它的作用了,它是用於 Watch 應用和 iOS 設備傳輸數據的框架。

           WatchConnectivity 介紹:告別加載等待。

           官方文檔

    99、WebKit

           這個框架也是日常中經常會用到的一個框架,WKWebView就是它裏面的Web頁面展示View,現在iOS端的網頁幾乎應該都是使用WK展示的吧,UIWebView已經被廢棄了,再用會影響到審核。這個框架具體的內容像和JS交互這個我們就不再提了,網上關於它的資料還真的不少。

     

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

    ※別再煩惱如何寫文案,掌握八大原則!

    ※教你寫出一流的銷售文案?

    ※超省錢租車方案

    FB行銷專家,教你從零開始的技巧