標籤: 台北網頁設計公司

  • 【Leetcode 做題學算法周刊】第四期

    首發於微信公眾號《前端成長記》,寫於 2019.11.21

    背景

    本文記錄刷題過程中的整個思考過程,以供參考。主要內容涵蓋:

    • 題目分析設想
    • 編寫代碼驗證
    • 查閱他人解法
    • 思考總結

    目錄

    Easy

    67.二進制求和

    題目描述

    給定兩個二進制字符串,返回他們的和(用二進製表示)。

    輸入為非空字符串且只包含数字 10

    示例:

    輸入: a = "11", b = "1"
    輸出: "100"
    
    輸入: a = "1010", b = "1011"
    輸出: "10101"

    題目分析設想

    這道題又是一道加法題,所以記住下,直接轉数字進行加法可能會溢出,所以不可取。所以我們需要遍歷每一位來做解答。我這有兩個大方向:補0后遍歷,和不補0遍歷。但是基本的依據都是本位相加,逢2進1即可,類似手寫10進制加法。

    • 補0后遍歷,可以採用先算出的位數推入數組最後反轉,也可以採用先算出的位數填到對應位置后直接輸出
    • 不補0遍歷,根據短數組的長度進行遍歷,長數組剩下的数字與短數組生成的進位進行計算

    查閱他人解法

    Ⅰ.補0后遍歷,先算先推

    代碼:

    /**
     * @param {string} a
     * @param {string} b
     * @return {string}
     */
    var addBinary = function(a, b) {
        let times = Math.max(a.length, b.length) // 需要遍歷次數
        // 補 0
        while(a.length < times) {
            a = '0' + a
        }
        while(b.length < times) {
            b = '0' + b
        }
        let res = []
        let carry = 0 // 是否進位
        for(let i = times - 1; i >= 0; i--) {
            const num = carry + (a.charAt(i) | 0) + (b.charAt(i) | 0)
            carry = num >= 2 ? 1 : 0
            res.push(num % 2)
        }
        if (carry === 1) {
            res.push(1)
        }
        return res.reverse().join('')
    };

    結果:

    • 294/294 cases passed (68 ms)
    • Your runtime beats 95.13 % of javascript submissions
    • Your memory usage beats 72.58 % of javascript submissions (35.4 MB)
    • 時間複雜度 O(n)

    Ⅱ.補0后遍歷,按位運算

    代碼:

    /**
     * @param {string} a
     * @param {string} b
     * @return {string}
     */
    var addBinary = function(a, b) {
        let times = Math.max(a.length, b.length) // 需要遍歷次數
        // 補 0
        while(a.length < times) {
            a = '0' + a
        }
        while(b.length < times) {
            b = '0' + b
        }
        let res = []
        let carry = 0 // 是否進位
        for(let i = times - 1; i >= 0; i--) {
            res[i] = carry + (a.charAt(i) | 0) + (b.charAt(i) | 0)
            carry = res[i] >= 2 ? 1 : 0
            res[i] %= 2
        }
        if (carry === 1) {
            res.unshift(1)
        }
        return res.join('')
    };

    結果:

    • 294/294 cases passed (60 ms)
    • Your runtime beats 99.65 % of javascript submissions
    • Your memory usage beats 65.82 % of javascript submissions (35.5 MB)
    • 時間複雜度 O(n)

    Ⅲ.不補0遍歷

    當然處理方式還是可以選擇上面兩種,我這就採用先算先推來處理了。

    代碼:

    /**
     * @param {string} a
     * @param {string} b
     * @return {string}
     */
    var addBinary = function(a, b) {
        let max = Math.max(a.length, b.length) // 最大長度
        let min = Math.min(a.length, b.length) // 最大公共長度
    
        // 將長字符串拆成兩部分
        let left = a.length > b.length ? a.substr(0, a.length - b.length) : b.substr(0, b.length - a.length)
        let right = a.length > b.length ? a.substr(a.length - b.length) : b.substr(b.length - a.length)
    
        // 公共長度部分遍歷
        let rightRes = []
        let carry = 0
        for(let i = min - 1; i >= 0; i--) {
            const num = carry + (right.charAt(i) | 0) + (((a.length > b.length ? b : a)).charAt(i) | 0)
            carry = num >= 2 ? 1 : 0
            rightRes.push(num % 2)
        }
    
        let leftRes = []
        for(let j = max - min - 1; j >= 0; j--) {
            const num = carry + (left.charAt(j) | 0)
            carry = num >= 2 ? 1 : 0
            leftRes.push(num % 2)
        }
    
        if (carry === 1) {
            leftRes.push(1)
        }
        return leftRes.reverse().join('') + rightRes.reverse().join('')
    };

    結果:

    • 294/294 cases passed (76 ms)
    • Your runtime beats 80.74 % of javascript submissions
    • Your memory usage beats 24.48 % of javascript submissions (36.2 MB)
    • 時間複雜度 O(n)

    查閱他人解法

    看到一些細節上的區別,我這使用 '1' | 0 來轉数字,有的使用 ''1' - '0''。另外還有就是初始化結果數組長度為最大長度加1后,最後判斷首位是否為0需要剔除的,我這使用的是判斷最後是否還要進位補1。

    這裏還看到用一個提案中的 BigInt 類型來解決的

    Ⅰ.BigInt

    代碼:

    /**
     * @param {string} a
     * @param {string} b
     * @return {string}
     */
    var addBinary = function(a, b) {
        return (BigInt("0b"+a) + BigInt("0b"+b)).toString(2);
    };

    結果:

    • 294/294 cases passed (52 ms)
    • Your runtime beats 100 % of javascript submissions
    • Your memory usage beats 97.05 % of javascript submissions (34.1 MB)
    • 時間複雜度 O(1)

    思考總結

    通過 BigInt 的方案我們能看到,使用原生方法確實性能更優。簡單說一下這個類型,目前還在提案階段,看下面的等式基本就能知道實現原理自己寫對應 Hack 來實現了:

    BigInt(10) = '10n'
    BigInt(20) = '20n'
    BigInt(10) + BigInt(20) = '30n'

    雖然這種方式很友好,但是還是希望看到加法題的時候,能考慮到遍歷按位處理。

    69.x的平方根

    題目描述

    實現 int sqrt(int x) 函數。

    計算並返回 x 的平方根,其中 x 是非負整數。

    由於返回類型是整數,結果只保留整數的部分,小數部分將被捨去。

    示例:

    輸入: 4
    輸出: 2
    
    輸入: 8
    輸出: 2
    說明: 8 的平方根是 2.82842...,
         由於返回類型是整數,小數部分將被捨去。

    題目分析設想

    同樣,這裏類庫提供的方法 Math.sqrt(x) 就不說了,這也不是本題想考察的意義。所以這裡有幾種方式:

    • 暴力法,這裏不用考慮溢出是因為x沒溢出,所以即使加到平方根加1,也會終止循環
    • 二分法,直接取中位數運算,可以快速排除當前區域一半的區間

    編寫代碼驗證

    Ⅰ.暴力法

    代碼:

    /**
     * @param {number} x
     * @return {number}
     */
    var mySqrt = function(x) {
        if (x === 0) return 0
        let i = 1
        while(i * i < x) {
            i++
        }
        return i * i === x ? i : i - 1
    };

    結果:

    • 1017/1017 cases passed (120 ms)
    • Your runtime beats 23 % of javascript submissions
    • Your memory usage beats 34.23 % of javascript submissions (35.7 MB)
    • 時間複雜度 O(n)

    Ⅱ.二分法

    代碼:

    /**
     * @param {number} x
     * @return {number}
     */
    var mySqrt = function(x) {
        if (x === 0) return 0
        let l = 1
        let r = x >>> 1
        while(l < r) {
            // 這裏要用大於判斷,所以取右中位數
            const mid = (l + r + 1) >>> 1
    
            if (mid * mid > x) {
                r = mid - 1
            } else {
                l = mid
            }
        }
        return l
    };

    結果:

    • 1017/1017 cases passed (76 ms)
    • Your runtime beats 96.08 % of javascript submissions
    • Your memory usage beats 59.17 % of javascript submissions (35.5 MB)
    • 時間複雜度 O(log2(n))

    查閱他人解法

    這裏看見了兩個有意思的解法:

    • 2的冪次底層優化
    • 牛頓法

    Ⅰ.冪次優化

    稍微解釋一下,二分法需要做乘法運算,他這裏改用加減法

    /**
     * @param {number} x
     * @return {number}
     */
    var mySqrt = function(x) {
        let l = 0
        let r = 1 << 16 // 2的16次方,這裏我猜是因為上限2^32所以取一半
        while (l < r - 1) {
            const mid = (l + r) >>> 1
            if (mid * mid <= x) {
                l = mid
            } else {
                r = mid
            }
        }
        return l
    };

    結果:

    1017/1017 cases passed (72 ms)
    Your runtime beats 98.46 % of javascript submissions
    Your memory usage beats 70.66 % of javascript submissions (35.4 MB)

    • 時間複雜度 O(log2(n))

    Ⅱ.牛頓法

    算法說明:

    在迭代過程中,以直線代替曲線,用一階泰勒展式(即在當前點的切線)代替原曲線,求直線與 xx 軸的交點,重複這個過程直到收斂。

    首先隨便猜一個近似值 x,然後不斷令 x 等於 xa/x 的平均數,迭代個六七次后 x 的值就已經相當精確了。

    公式可以寫為 X[n+1]=(X[n]+a/X[n])/2

    代碼:

    /**
     * @param {number} x
     * @return {number}
     */
    var mySqrt = function(x) {
        if (x === 0 || x === 1) return x
    
        let a = x >>> 1
        while(true) {
            let cur = a
            a = (a + x / a) / 2
            // 這裡是為了消除浮點運算的誤差,1e-5是我試出來的
            if (Math.abs(a - cur) < 1e-5) {
                return parseInt(cur)
            }
        }
    };

    結果:

    • 1017/1017 cases passed (68 ms)
    • Your runtime beats 99.23 % of javascript submissions
    • Your memory usage beats 9.05 % of javascript submissions (36.1 MB)
    • 時間複雜度 O(log2(n))

    思考總結

    這裏就提一下新接觸的牛頓法吧,實際上是牛頓迭代法,主要是迭代操作。由於在單根附近具有平方收斂,所以可以轉換成線性問題去求平方根的近似值。主要應用場景有這兩個方向:

    • 求方程的根
    • 求解最優化問題

    70.爬樓梯

    題目描述

    假設你正在爬樓梯。需要 n 階你才能到達樓頂。

    每次你可以爬 12 個台階。你有多少種不同的方法可以爬到樓頂呢?

    注意:給定 n 是一個正整數。

    示例:

    輸入: 2
    輸出: 2
    解釋: 有兩種方法可以爬到樓頂。
    1.  1 階 + 1 階
    2.  2 階
    
    輸入: 3
    輸出: 3
    解釋: 有三種方法可以爬到樓頂。
    1.  1 階 + 1 階 + 1 階
    2.  1 階 + 2 階
    3.  2 階 + 1 階

    題目分析設想

    這道題很明顯可以用動態規劃和斐波那契數列來求解。然後我們來看看其他正常思路,如果使用暴力法的話,那麼複雜度將會是 2^n,很容易溢出,但是如果能夠優化成 n 的話,其實還可以求解的。所以這道題我就從以下三個方向來作答:

    • 哈希遞歸,也就是暴力運算的改進版,通過存下算過的值降低複雜度
    • 動態規劃
    • 斐波那契數列

    編寫代碼驗證

    Ⅰ.哈希遞歸

    代碼:

    /**
     * @param {number} n
     * @return {number}
     */
    var climbStairs = function(n) {
        let hash = {}
        return count(0)
        function count (i) {
            if (i > n) return 0
            if (i === n) return 1
    
            // 這步節省運算
            if(hash[i] > 0) {
                return hash[i]
            }
    
            hash[i] = count(i + 1) + count(i + 2)
            return hash[i]
        }
    };

    結果:

    • 45/45 cases passed (52 ms)
    • Your runtime beats 98.67 % of javascript submissions
    • Your memory usage beats 48.29 % of javascript submissions (33.7 MB)
    • 時間複雜度 O(n)

    Ⅱ.動態規劃

    代碼:

    /**
     * @param {number} n
     * @return {number}
     */
    var climbStairs = function(n) {
        if (n === 1) return 1
        if (n === 2) return 2
        // dp[0] 多一位空間,省的後面做減法
        let dp = new Array(n + 1).fill(0)
        dp[1] = 1
        dp[2] = 2
        for(let i = 3; i <= n; i++) {
            dp[i] = dp[i - 1] + dp[i - 2]
        }
        return dp[n]
    };

    結果:

    • 45/45 cases passed (48 ms)
    • Your runtime beats 99.48 % of javascript submissions
    • Your memory usage beats 21.49 % of javascript submissions (33.8 MB)
    • 時間複雜度 O(n)

    Ⅲ.斐波那契數列

    其實斐波那契數列就可以用動態規劃來實現,所以下面的代碼思路很相似。

    代碼:

    /**
     * @param {number} n
     * @return {number}
     */
    var climbStairs = function(n) {
        if (n === 1) return 1
        if (n === 2) return 2
        let num1 = 1
        let num2 = 2
        for(let i = 3; i <= n; i++) {
            let count = num1 + num2
            num1 = num2
            num2 = count
        }
        // 相當於fib(n)
        return num2
    };

    結果:

    • 45/45 cases passed (56 ms)
    • Your runtime beats 95.49 % of javascript submissions
    • Your memory usage beats 46.1 % of javascript submissions (33.7 MB)
    • 時間複雜度 O(n)

    查閱他人解法

    查看題解發現這麼幾種解法:

    • 斐波那契公式(原來有計算公式可以直接用,尷尬)
    • Binets 方法
    • 排列組合

    Ⅰ.斐波那契公式

    代碼:

    /**
     * @param {number} n
     * @return {number}
     */
    var climbStairs = function(n) {
        const sqrt_5 = Math.sqrt(5)
        // 由於 F0 = 1,所以相當於需要求 n+1 的值
        const fib_n = Math.pow((1 + sqrt_5) / 2, n + 1) - Math.pow((1 - sqrt_5) / 2, n + 1)
        return Math.round(fib_n / sqrt_5)
    };

    結果:

    • 45/45 cases passed (52 ms)
    • Your runtime beats 98.67 % of javascript submissions
    • Your memory usage beats 54.98 % of javascript submissions (33.6 MB)
    • 時間複雜度 O(log(n))

    Ⅱ.Binets 方法

    算法說明:

    使用矩陣乘法來得到第 n 個斐波那契數。注意需要將初始項從 fib(2)=2,fib(1)=1 改成 fib(2)=1,fib(1)=0 ,來達到矩陣等式的左右相等。

    代碼:

    /**
     * @param {number} n
     * @return {number}
     */
    var climbStairs = function(n) {
    
        function pow(a, n) {
            let ret = [[1,0],[0,1]] // 矩陣
            while(n > 0) {
                if ((n & 1) === 1) {
                    ret = multiply(ret, a)
                }
                n >> 1
                a = multiply(a, a)
            }
            return ret;
        }
        function multiply(a, b) {
            let c = [[0,0], [0,0]]
            for (let i = 0; i < 2; i++) {
                for(let j = 0; j < 2; j++) {
                    c[i][j] = a[i][0] * b[0][j] + a[i][1] * b[1][j]
                }
            }
            return c
        }
    
        let q = [[1,1], [1, 0]]
        let res = pow(q, n)
        return res[0][0]
    };

    結果:

    測試用例可以輸出,提交發現超時。

    這個筆者還沒完全理解,所以很抱歉,暫時沒有 js 相應代碼分析,後續會補上。也歡迎您補充給我,感謝!

    Ⅲ.排列組合

    代碼:

    /**
     * @param {number} n
     * @return {number}
     */
    var climbStairs = function(n) {
        // n 個台階走 i 次1階和 j 次2階走到,推導出 i + 2*j = n
        function combine(m, n) {
            if (m < n) [m, n] = [n, m];
            let count = 1;
            for (let i = m + n, j = 1; i > m; i--) {
                count *= i;
                if (j <= n) count /= j++;
            }
            return count;
        }
        let total = 0;
        // 取出所有滿足條件的解
        for (let i = 0,j = n; j >= 0; j -= 2, i++) {
          total += combine(i, j);
        }
        return total;
    };

    結果:

    • 45/45 cases passed (60 ms)
    • Your runtime beats 87.94 % of javascript submissions
    • Your memory usage beats 20.72 % of javascript submissions (33.8 MB)
    • 時間複雜度 O(n^2)

    思考總結

    這種疊加的問題,首先就會想到動態規劃的解法,剛好這裏又滿足斐波那契數列,所以我是推薦首選這兩種解法。另外通過查看他人解法學到了斐波那契公式,以及站在排列組合的角度去解,開拓了思路。

    83.刪除排序鏈表中的重複元素

    題目描述

    給定一個排序鏈表,刪除所有重複的元素,使得每個元素只出現一次。

    示例:

    輸入: 1->1->2
    輸出: 1->2
    
    輸入: 1->1->2->3->3
    輸出: 1->2->3

    題目分析設想

    注意一下,給定的是一個排序鏈表,所以只需要依次更改指針就可以直接得出結果。當然,也可以使用雙指針來跳過重複項即可。所以這裡有兩個方向:

    • 直接運算,通過改變指針指向
    • 雙指針,通過跳過重複項

    如果是無序鏈表,我會建議先得到所有值然後去重后(比如通過Set)生成新鏈表作答。

    編寫代碼驗證

    Ⅰ.直接運算

    代碼:

    /**
     * @param {ListNode} head
     * @return {ListNode}
     */
    var deleteDuplicates = function(head) {
        // 複製一個用做操作,由於對象是傳址,所以改指針指向即可
        let cur = head
        while(cur !== null && cur.next !== null) {
            if (cur.val === cur.next.val) { // 值相等
                cur.next = cur.next.next
            } else {
                cur = cur.next
            }
        }
        return head
    };

    結果:

    • 165/165 cases passed (76 ms)
    • Your runtime beats 87.47 % of javascript submissions
    • Your memory usage beats 81.21 % of javascript submissions (35.5 MB)
    • 時間複雜度 O(n)

    Ⅱ.雙指針法

    代碼:

    /**
     * @param {ListNode} head
     * @return {ListNode}
     */
    var deleteDuplicates = function(head) {
        // 新建哨兵指針和當前遍歷指針
        if (head === null || head.next === null) return head
        let pre = head
        let cur = head
        while(cur !== null) {
            debugger
            if (cur.val === pre.val) {
                // 當前指針移動
                cur = cur.next
            } else {
                pre.next = cur
                pre = cur
            }
        }
        // 最後一項如果重複需要把head.next指向null
        pre.next = null
        return head
    };

    結果:

    • 165/165 cases passed (80 ms)
    • Your runtime beats 77.31 % of javascript submissions
    • Your memory usage beats 65.1 % of javascript submissions (35.7 MB)
    • 時間複雜度 O(n)

    查閱他人解法

    忘記了,這裏確實還可以使用遞歸來作答。

    Ⅰ.遞歸法

    代碼:

    /**
     * @param {ListNode} head
     * @return {ListNode}
     */
    var deleteDuplicates = function(head) {
        if(head === null || head.next === null) return head
        if (head.val === head.next.val) { // 值相等
            return deleteDuplicates(head.next)
        } else {
            head.next = deleteDuplicates(head.next)
        }
        return head
    };

    結果:

    • 165/165 cases passed (80 ms)
    • Your runtime beats 77.31 % of javascript submissions
    • Your memory usage beats 81.21 % of javascript submissions (35.5 MB)
    • 時間複雜度 O(n)

    思考總結

    關於鏈表的題目一般都是通過修改指針指向來作答,區分單指針和雙指針法。另外,遍歷也是可以實現的。

    88.合併兩個有序數組

    題目描述

    給定兩個有序整數數組 nums1nums2,將 nums2 合併到 nums1 中,使得 num1 成為一個有序數組。

    說明:

    • 初始化 nums1nums2 的元素數量分別為 mn
    • 你可以假設 nums1 有足夠的空間(空間大小大於或等於 m + n)來保存 nums2 中的元素。

    示例:

    輸入:
    nums1 = [1,2,3,0,0,0], m = 3
    nums2 = [2,5,6],       n = 3
    
    輸出: [1,2,2,3,5,6]

    題目分析設想

    之前我們做過刪除排序數組中的重複項,其實這裏也類似。可以從這幾個方向作答:

    • 數組合併後排序
    • 遍曆數組並進行插入
    • 雙指針法,輪流比較

    但是由於題目有限定空間都在 nums1 ,並且不要寫 return ,直接在 nums1 上修改,所以我這裏主要的思路就是遍歷,通過 splice 來修改數組。區別就在於遍歷的方式方法。

    • 從前往後
    • 從后往前
    • 合併後排序再賦值

    編寫代碼驗證

    Ⅰ.從前往後

    代碼:

    /**
     * @param {number[]} nums1
     * @param {number} m
     * @param {number[]} nums2
     * @param {number} n
     * @return {void} Do not return anything, modify nums1 in-place instead.
     */
    var merge = function(nums1, m, nums2, n) {
        // 兩個數組對應指針
        let p1 = 0
        let p2 = 0
        // 這裏需要提前把nums1的元素拷貝出來,要不然比較賦值后就丟失了
        let cpArr = nums1.splice(0, m)
    
        // 數組指針
        let p = 0
        while(p1 < m && p2 < n) {
            // 先賦值,再進行+1操作
            nums1[p++] = cpArr[p1] < nums2[p2] ? cpArr[p1++] : nums2[p2++]
        }
        // 已經有p個元素了,多餘的元素要刪除,剩餘的要加上
        if (p1 < m) {
            // 剩餘元素,p1 + m + n - p = m + n - (p - p1) = m + n - p2
            nums1.splice(p, m + n - p, ...cpArr.slice(p1, m + n - p2))
        }
        if (p2 < n) {
            // 剩餘元素,p2 + m + n - p = m + n - (p - p2) = m + n - p1
            nums1.splice(p, m + n - p, ...nums2.slice(p2, m + n - p1))
        }
    };

    結果:

    • 59/59 cases passed (48 ms)
    • Your runtime beats 100 % of javascript submissions
    • Your memory usage beats 64.97 % of javascript submissions (33.8 MB)
    • 時間複雜度 O(m + n)

    Ⅱ.從后往前

    代碼:

    /**
     * @param {number[]} nums1
     * @param {number} m
     * @param {number[]} nums2
     * @param {number} n
     * @return {void} Do not return anything, modify nums1 in-place instead.
     */
    var merge = function(nums1, m, nums2, n) {
        // 避免 nums1 = [0,0,0,0], nums2 = [1,2] 這種 nums1.length > nums2.length 並且 m = 0
        nums1.splice(m, nums1.length - m)
        // 兩個數組對應指針
        let p1 = m - 1
        let p2 = n - 1
        // 數組指針
        let p = m + n - 1
        while(p1 >= 0 && p2 >= 0) {
            // 先賦值,再進行-1操作
            nums1[p--] = nums1[p1] < nums2[p2] ? nums2[p2--] : nums1[p1--]
        }
        // 可能nums2有剩餘,由於指針是下標,所以截取數量需要加1
        nums1.splice(0, p2 + 1, ...nums2.slice(0, p2 + 1))
    };

    結果:

    • 59/59 cases passed (52 ms)
    • Your runtime beats 99.76 % of javascript submissions
    • Your memory usage beats 78.3 % of javascript submissions (33.6 MB)
    • 時間複雜度 O(m + n)

    Ⅲ.合併後排序再賦值

    代碼:

    /**
     * @param {number[]} nums1
     * @param {number} m
     * @param {number[]} nums2
     * @param {number} n
     * @return {void} Do not return anything, modify nums1 in-place instead.
     */
    var merge = function(nums1, m, nums2, n) {
        arr = [].concat(nums1.splice(0, m), nums2.splice(0, n))
        arr.sort((a, b) => a - b)
        for(let i = 0; i < arr.length; i++) {
            nums1[i] = arr[i]
        }
    };

    結果:

    • 59/59 cases passed (64 ms)
    • Your runtime beats 90.11 % of javascript submissions
    • Your memory usage beats 31.21 % of javascript submissions (34.8 MB)
    • 時間複雜度 O(m + n)

    查閱他人解法

    這裏看到一個直接用兩次 while ,然後直接用 m/n 來計算下標的,沒有額外空間,但是本質上也是從后往前遍歷。

    Ⅰ.兩次while

    代碼:

    /**
     * @param {number[]} nums1
     * @param {number} m
     * @param {number[]} nums2
     * @param {number} n
     * @return {void} Do not return anything, modify nums1 in-place instead.
     */
    var merge = function(nums1, m, nums2, n) {
        // 避免 nums1 = [0,0,0,0], nums2 = [1,2] 這種 nums1.length > nums2.length 並且 m = 0
        // nums1.splice(m, nums1.length - m)
        // 從后開始賦值
        while(m !== 0 && n !== 0) {
            nums1[m + n - 1] = nums1[m - 1] > nums2[n - 1] ? nums1[--m] : nums2[--n]
        }
        // nums2 有剩餘
        while(n !== 0) {
            nums1[m + n - 1] = nums2[--n]
        }
    };

    結果:

    • 59/59 cases passed (56 ms)
    • Your runtime beats 99.16 % of javascript submissions
    • Your memory usage beats 64.26 % of javascript submissions (33.8 MB)
    • 時間複雜度 O(m + n)

    思考總結

    碰到數組操作,會優先考慮雙指針法,具體指針方向可以由題目邏輯來決定。

    (完)

    本文為原創文章,可能會更新知識點及修正錯誤,因此轉載請保留原出處,方便溯源,避免陳舊錯誤知識的誤導,同時有更好的閱讀體驗
    如果能給您帶去些許幫助,歡迎 ⭐️star 或 ️ fork
    (轉載請註明出處:https://chenjiahao.xyz)

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

    ※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

    ※Google地圖已可更新顯示潭子電動車充電站設置地點!!

    ※帶您來看台北網站建置台北網頁設計,各種案例分享

  • 新能源積分交易制度 或替代財政補貼

    新能源積分交易制度 或替代財政補貼

    新能源汽車業發展日益成熟之時,單純的財政補貼帶來的車企騙補等諸多負面效應逐漸顯現。對此,有專家提出建議採用積分交易機制替代財政補貼政策,促進車企在傳統燃油車節能和新能源車技術進步兩方面共同發展。  
      中國汽車技術研究中心新能源汽車積分政策負責人時間表示,相比真金白銀的財政補貼,新能源積分交易制度靈活性更高,是當下國家推動新能源產業發展的一種可行方式。   新能源積分交易制度,即政府將企業年度“零排放”車型的銷售情況記錄成積分,以積分為依據來考核企業在節能減排方面是否達標。若企業積分不達標,可以購買同行業其餘公司的積分,或者向政府繳納高額罰款。   中國若要實施積分交易制度 政府部門職責需合理分配。   為此,積分政策負責人時間給出幾點建議:作為政策的制定者政府,需要部門間進行合理的職責分配,物質保障方面,中國新能源基礎設施建設尚需進一步完善。意識形態方面,當前,消費者對新能源車的認識還不充足,對制度實施造成一定阻礙。企業方面,不同規模的企業須在政策上區別對待,為中小企業的發展提供空間;當企業規模有所變更時,應當提供相應的扶持政策。   文章來源:人民網

    本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包”嚨底家”

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

  • 國務院:外商可獨資製造新能源汽車動力電池等領域

    國務院:外商可獨資製造新能源汽車動力電池等領域

    7月19日,國務院下發第2016年第41號文件,關於在自由貿易試驗區暫時調整有關行政法規、國務院文件和經國務院批准的部門規章規定的決定。  
      相關自由貿易試驗區涉及上海市、天津市、廣東省、福建省四區域。國務院決定,在自由貿易試驗區暫時調整《中華人民共和國外資企業法實施細則》等18部行政法規、《國務院關於投資體制改革的決定》等4件國務院檔、《外商投資產業指導目錄(2015年修訂)》等4件經國務院批准的部門規章的有關規定。   據分析,此次重大調整專案涉及51項。其中放開合資門檻,允許外商以獨資形式從事生產經營活動的項目多達12項。在新能源汽車關鍵零部件及整車領域,涉及3項,具體如下:   1、允許外商以獨資形式從事能量型動力電池(能量密度≥110Wh,迴圈壽命≥2000次)的製造; 2、允許外商以獨資形式從事汽車電子匯流排網路技術、電動助力轉向系統電子控制器的製造與研發; 3、允許外商以獨資形式從事摩托車生產;   由此可見,在新能源汽車重要零部件動力電池領域,國外一線大廠終於擺脫了合資電池廠的固定模式,三星SDI、松下、LG化學等電池大頭將獲益。   文章來源:上海蓋世

    本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    ※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

    ※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

    ※Google地圖已可更新顯示潭子電動車充電站設置地點!!

    ※帶您來看台北網站建置台北網頁設計,各種案例分享

  • 星際爭霸2 AI開發(持續更新)

    星際爭霸2 AI開發(持續更新)

    準備

    我的環境是python3.6,sc2包0.11.1
    機器學習包下載鏈接:
    地圖下載鏈接
    pysc2是DeepMind開發的星際爭霸Ⅱ學習環境。 它是封裝星際爭霸Ⅱ機器學習API,同時也提供Python增強學習環境。
    以神族為例編寫代碼,神族建築科技圖如下:

    採礦

    # -*- encoding: utf-8 -*-
    '''
    @File    :   __init__.py.py    
    @Modify Time      @Author       @Desciption
    ------------      -------       -----------
    2019/11/3 12:32   Jonas           None
    '''
    
    import sc2
    from sc2 import run_game, maps, Race, Difficulty
    from sc2.player import Bot, Computer
    
    
    class SentdeBot(sc2.BotAI):
        async def on_step(self, iteration: int):
            await self.distribute_workers()
    
    
    run_game(maps.get("AcidPlantLE"), [
        Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Easy)
    ],realtime = True)

    注意
    game_data.py的assert self.id != 0註釋掉
    pixel_map.py的assert self.bits_per_pixel % 8 == 0, "Unsupported pixel density"註釋掉
    否則會報錯

    運行結果如下,农民開始採礦

    可以正常採礦

    建造农民和水晶塔

    import sc2
    from sc2 import run_game, maps, Race, Difficulty
    from sc2.player import Bot, Computer
    from sc2.constants import *
    
    
    class SentdeBot(sc2.BotAI):
        async def on_step(self, iteration: int):
            await self.distribute_workers()
            await self.build_workers()
            await self.build_pylons()
    
        # 建造农民
        async def build_workers(self):
            # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
            for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
                # 是否有50晶體礦
                if self.can_afford(UnitTypeId.PROBE):
                    await self.do(nexus.train(UnitTypeId.PROBE))
    
        ## 建造水晶
        async def build_pylons(self):
            ## 供應人口和現有人口之差小於5且水晶不是正在建造
            if self.supply_left<5 and not self.already_pending(UnitTypeId.PYLON):
                nexuses = self.units(UnitTypeId.NEXUS).ready
                if nexuses.exists:
                    if self.can_afford(UnitTypeId.PYLON):
                        await self.build(UnitTypeId.PYLON,near=nexuses.first)
    
    ## 啟動遊戲
    run_game(maps.get("AcidPlantLE"), [
        Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Easy)
    ],realtime = True)
    

    運行結果如下,基地造农民,农民造水晶

    收集氣體和開礦

    代碼如下

    import sc2
    from sc2 import run_game, maps, Race, Difficulty
    from sc2.player import Bot, Computer
    from sc2.constants import *
    
    
    class SentdeBot(sc2.BotAI):
        async def on_step(self, iteration: int):
            await self.distribute_workers()
            await self.build_workers()
            await self.build_pylons()
            await self.build_assimilators()
            await self.expand()
    
        # 建造农民
        async def build_workers(self):
            # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
            for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
                # 是否有50晶體礦
                if self.can_afford(UnitTypeId.PROBE):
                    await self.do(nexus.train(UnitTypeId.PROBE))
    
        ## 建造水晶
        async def build_pylons(self):
            ## 供應人口和現有人口之差小於5且建築不是正在建造
            if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
                nexuses = self.units(UnitTypeId.NEXUS).ready
                if nexuses.exists:
                    if self.can_afford(UnitTypeId.PYLON):
                        await self.build(UnitTypeId.PYLON, near=nexuses.first)
    
        ## 建造吸收廠
        async def build_assimilators(self):
            for nexus in self.units(UnitTypeId.NEXUS).ready:
                # 在瓦斯泉上建造吸收廠
                vaspenes = self.state.vespene_geyser.closer_than(15.0,nexus)
                for vaspene in vaspenes:
                    if not self.can_afford(UnitTypeId.ASSIMILATOR):
                        break
                    worker = self.select_build_worker(vaspene.position)
                    if worker is None:
                        break
                    if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0,vaspene).exists:
                        await self.do(worker.build(UnitTypeId.ASSIMILATOR,vaspene))
    
        ## 開礦
        async def expand(self):
            if self.units(UnitTypeId.NEXUS).amount<3 and self.can_afford(UnitTypeId.NEXUS):
                await self.expand_now()
    
    ## 啟動遊戲
    run_game(maps.get("AcidPlantLE"), [
        Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Easy)
    ], realtime=False)
    

    run_game的realtime設置成False,可以在加速模式下運行遊戲。
    運行效果如下:

    可以建造吸收廠和開礦

    建造軍隊

    import sc2
    from sc2 import run_game, maps, Race, Difficulty
    from sc2.player import Bot, Computer
    from sc2.constants import *
    
    
    class SentdeBot(sc2.BotAI):
        async def on_step(self, iteration: int):
            await self.distribute_workers()
            await self.build_workers()
            await self.build_pylons()
            await self.build_assimilators()
            await self.expand()
            await self.offensive_force_buildings()
            await self.build_offensive_force()
    
        # 建造农民
        async def build_workers(self):
            # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
            for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
                # 是否有50晶體礦
                if self.can_afford(UnitTypeId.PROBE):
                    await self.do(nexus.train(UnitTypeId.PROBE))
    
        ## 建造水晶
        async def build_pylons(self):
            ## 供應人口和現有人口之差小於5且建築不是正在建造
            if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
                nexuses = self.units(UnitTypeId.NEXUS).ready
                if nexuses.exists:
                    if self.can_afford(UnitTypeId.PYLON):
                        await self.build(UnitTypeId.PYLON, near=nexuses.first)
    
        ## 建造吸收廠
        async def build_assimilators(self):
            for nexus in self.units(UnitTypeId.NEXUS).ready:
                # 在瓦斯泉上建造吸收廠
                vaspenes = self.state.vespene_geyser.closer_than(15.0,nexus)
                for vaspene in vaspenes:
                    if not self.can_afford(UnitTypeId.ASSIMILATOR):
                        break
                    worker = self.select_build_worker(vaspene.position)
                    if worker is None:
                        break
                    if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0,vaspene).exists:
                        await self.do(worker.build(UnitTypeId.ASSIMILATOR,vaspene))
    
        ## 開礦
        async def expand(self):
            if self.units(UnitTypeId.NEXUS).amount<2 and self.can_afford(UnitTypeId.NEXUS):
                await self.expand_now()
    
        ## 建造進攻性建築
        async def offensive_force_buildings(self):
            if self.units(UnitTypeId.PYLON).ready.exists:
                pylon = self.units(UnitTypeId.PYLON).ready.random
                if self.units(UnitTypeId.PYLON).ready.exists:
                    # 根據神族建築科技圖,折躍門建造過後才可以建造控制核心
                    if self.units(UnitTypeId.GATEWAY).ready.exists:
                        if not self.units(UnitTypeId.CYBERNETICSCORE):
                            if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
                                await self.build(UnitTypeId.CYBERNETICSCORE,near = pylon)
                    # 否則建造折躍門
                    else:
                        if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
                            await self.build(UnitTypeId.GATEWAY,near=pylon)
    
        # 造兵
        async def build_offensive_force(self):
            # 無隊列化建造
            for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
                if self.can_afford(UnitTypeId.STALKER) and self.supply_left>0:
                    await self.do(gw.train(UnitTypeId.STALKER))
    
    
    
    ## 啟動遊戲
    run_game(maps.get("AcidPlantLE"), [
        Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Easy)
    ], realtime=False)
    

    運行結果如下:

    可以看到,我們建造了折躍門和控制核心並訓練了追獵者

    控制部隊進攻

    代碼如下

    
    import sc2
    from sc2 import run_game, maps, Race, Difficulty
    from sc2.player import Bot, Computer
    from sc2.constants import *
    import random
    
    class SentdeBot(sc2.BotAI):
        async def on_step(self, iteration: int):
            await self.distribute_workers()
            await self.build_workers()
            await self.build_pylons()
            await self.build_assimilators()
            await self.expand()
            await self.offensive_force_buildings()
            await self.build_offensive_force()
            await self.attack()
    
        # 建造农民
        async def build_workers(self):
            # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
            for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
                # 是否有50晶體礦
                if self.can_afford(UnitTypeId.PROBE):
                    await self.do(nexus.train(UnitTypeId.PROBE))
    
        ## 建造水晶
        async def build_pylons(self):
            ## 供應人口和現有人口之差小於5且建築不是正在建造
            if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
                nexuses = self.units(UnitTypeId.NEXUS).ready
                if nexuses.exists:
                    if self.can_afford(UnitTypeId.PYLON):
                        await self.build(UnitTypeId.PYLON, near=nexuses.first)
    
        ## 建造吸收廠
        async def build_assimilators(self):
            for nexus in self.units(UnitTypeId.NEXUS).ready:
                # 在瓦斯泉上建造吸收廠
                vaspenes = self.state.vespene_geyser.closer_than(15.0,nexus)
                for vaspene in vaspenes:
                    if not self.can_afford(UnitTypeId.ASSIMILATOR):
                        break
                    worker = self.select_build_worker(vaspene.position)
                    if worker is None:
                        break
                    if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0,vaspene).exists:
                        await self.do(worker.build(UnitTypeId.ASSIMILATOR,vaspene))
    
        ## 開礦
        async def expand(self):
            if self.units(UnitTypeId.NEXUS).amount<3 and self.can_afford(UnitTypeId.NEXUS):
                await self.expand_now()
    
        ## 建造進攻性建築
        async def offensive_force_buildings(self):
            if self.units(UnitTypeId.PYLON).ready.exists:
                pylon = self.units(UnitTypeId.PYLON).ready.random
                # 根據神族建築科技圖,折躍門建造過後才可以建造控制核心
                if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
                    if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
                        await self.build(UnitTypeId.CYBERNETICSCORE,near = pylon)
                # 否則建造折躍門
                elif len(self.units(UnitTypeId.GATEWAY))<=3:
                    if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
                        await self.build(UnitTypeId.GATEWAY,near=pylon)
    
        ## 造兵
        async def build_offensive_force(self):
            # 無隊列化建造
            for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
                if self.can_afford(UnitTypeId.STALKER) and self.supply_left>0:
                    await self.do(gw.train(UnitTypeId.STALKER))
    
        ## 尋找目標
        def find_target(self,state):
            if len(self.known_enemy_units)>0:
                # 隨機選取敵方單位
                return random.choice(self.known_enemy_units)
            elif len(self.known_enemy_units)>0:
                # 隨機選取敵方建築
                return random.choice(self.known_enemy_structures)
            else:
                # 返回敵方出生點位
                return self.enemy_start_locations[0]
    
        ## 進攻
        async def attack(self):
            # 追獵者數量超過15個開始進攻
            if self.units(UnitTypeId.STALKER).amount>15:
                for s in self.units(UnitTypeId.STALKER).idle:
                    await self.do(s.attack(self.find_target(self.state)))
    
            # 防衛模式:視野範圍內存在敵人,開始攻擊
            if self.units(UnitTypeId.STALKER).amount>5:
                if len(self.known_enemy_units)>0:
                    for s in self.units(UnitTypeId.STALKER).idle:
                        await self.do(s.attack(random.choice(self.known_enemy_units)))
    
    ## 啟動遊戲
    run_game(maps.get("AcidPlantLE"), [
        Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Medium)
    ], realtime=False)
    

    運行結果如下

    可以看到,4個折躍門訓練追獵者並發動進攻。

    擊敗困難電腦

    我們目前的代碼只能擊敗中等和簡單電腦,那麼如何擊敗困難電腦呢?
    代碼如下

    
    import sc2
    from sc2 import run_game, maps, Race, Difficulty
    from sc2.player import Bot, Computer
    from sc2.constants import *
    import random
    
    
    class SentdeBot(sc2.BotAI):
        def __init__(self):
            # 經過計算,每分鐘大約165迭代次數
            self.ITERATIONS_PER_MINUTE = 165
            # 最大农民數量
            self.MAX_WORKERS = 65
    
        async def on_step(self, iteration: int):
            self.iteration = iteration
            await self.distribute_workers()
            await self.build_workers()
            await self.build_pylons()
            await self.build_assimilators()
            await self.expand()
            await self.offensive_force_buildings()
            await self.build_offensive_force()
            await self.attack()
    
        # 建造农民
        async def build_workers(self):
            # 星靈樞鈕*16(一個基地配備16個农民)大於農民數量並且現有农民數量小於MAX_WORKERS
            if len(self.units(UnitTypeId.NEXUS))*16>len(self.units(UnitTypeId.PROBE)) and len(self.units(UnitTypeId.PROBE))<self.MAX_WORKERS:
                    # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
                    for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
                        # 是否有50晶體礦建造农民
                        if self.can_afford(UnitTypeId.PROBE):
                            await self.do(nexus.train(UnitTypeId.PROBE))
    
        ## 建造水晶
        async def build_pylons(self):
            ## 供應人口和現有人口之差小於5且建築不是正在建造
            if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
                nexuses = self.units(UnitTypeId.NEXUS).ready
                if nexuses.exists:
                    if self.can_afford(UnitTypeId.PYLON):
                        await self.build(UnitTypeId.PYLON, near=nexuses.first)
    
        ## 建造吸收廠
        async def build_assimilators(self):
            for nexus in self.units(UnitTypeId.NEXUS).ready:
                # 在瓦斯泉上建造吸收廠
                vaspenes = self.state.vespene_geyser.closer_than(15.0,nexus)
                for vaspene in vaspenes:
                    if not self.can_afford(UnitTypeId.ASSIMILATOR):
                        break
                    worker = self.select_build_worker(vaspene.position)
                    if worker is None:
                        break
                    if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0,vaspene).exists:
                        await self.do(worker.build(UnitTypeId.ASSIMILATOR,vaspene))
    
        ## 開礦
        async def expand(self):
            # (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
            if self.units(UnitTypeId.NEXUS).amount<self.iteration / self.ITERATIONS_PER_MINUTE and self.can_afford(UnitTypeId.NEXUS):
                await self.expand_now()
    
        ## 建造進攻性建築
        async def offensive_force_buildings(self):
            print(self.iteration / self.ITERATIONS_PER_MINUTE)
            if self.units(UnitTypeId.PYLON).ready.exists:
                pylon = self.units(UnitTypeId.PYLON).ready.random
                # 根據神族建築科技圖,折躍門建造過後才可以建造控制核心
                if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
                    if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
                        await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
                # 否則建造折躍門
                # (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
                elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
                    if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
                        await self.build(UnitTypeId.GATEWAY, near=pylon)
                # 控制核心存在的情況下建造星門
                if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
                    if len(self.units(UnitTypeId.STARGATE)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
                        if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
                            await self.build(UnitTypeId.STARGATE, near=pylon)
    
        ## 造兵
        async def build_offensive_force(self):
            # 無隊列化建造
            for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
                if not self.units(UnitTypeId.STALKER).amount > self.units(UnitTypeId.VOIDRAY).amount:
    
                    if self.can_afford(UnitTypeId.STALKER) and self.supply_left > 0:
                        await self.do(gw.train(UnitTypeId.STALKER))
    
            for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
                if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
                    await self.do(sg.train(UnitTypeId.VOIDRAY))
    
        ## 尋找目標
        def find_target(self,state):
            if len(self.known_enemy_units)>0:
                # 隨機選取敵方單位
                return random.choice(self.known_enemy_units)
            elif len(self.known_enemy_units)>0:
                # 隨機選取敵方建築
                return random.choice(self.known_enemy_structures)
            else:
                # 返回敵方出生點位
                return self.enemy_start_locations[0]
    
        ## 進攻
        async def attack(self):
            # {UNIT: [n to fight, n to defend]}
            aggressive_units = {UnitTypeId.STALKER: [15, 5],
                                UnitTypeId.VOIDRAY: [8, 3]}
    
            for UNIT in aggressive_units:
                # 攻擊模式
                if self.units(UNIT).amount > aggressive_units[UNIT][0] and self.units(UNIT).amount > aggressive_units[UNIT][
                    1]:
                    for s in self.units(UNIT).idle:
                        await self.do(s.attack(self.find_target(self.state)))
                # 防衛模式
                elif self.units(UNIT).amount > aggressive_units[UNIT][1]:
                    if len(self.known_enemy_units) > 0:
                        for s in self.units(UNIT).idle:
                            await self.do(s.attack(random.choice(self.known_enemy_units)))
    ## 啟動遊戲
    run_game(maps.get("AcidPlantLE"), [
        Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Hard)
    ], realtime=False)
    

    運行結果如下

    可以看到,擊敗了困難人族電腦,但是電腦選擇了rush戰術,我們寫得AI腳本會輸掉遊戲。顯然,這不是最佳方案。
    “只有AI才能拯救我的勝率”,請看下文。

    採集地圖數據

    這次我們只造一個折躍門,全力通過星門造虛空光輝艦
    修改offensive_force_buildings(self)方法的判斷

    elif len(self.units(GATEWAY)) < 1:
                    if self.can_afford(GATEWAY) and not self.already_pending(GATEWAY):
                        await self.build(GATEWAY, near=pylon)

    註釋或者刪除build_offensive_force(self)的建造追獵者的代碼

            ## 造兵
        async def build_offensive_force(self):
            # 無隊列化建造
            # for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
            #     if not self.units(UnitTypeId.STALKER).amount > self.units(UnitTypeId.VOIDRAY).amount:
            #
            #         if self.can_afford(UnitTypeId.STALKER) and self.supply_left > 0:
            #             await self.do(gw.train(UnitTypeId.STALKER))
    
            for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
                if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
                    await self.do(sg.train(UnitTypeId.VOIDRAY))

    attack(self)中的aggressive_units註釋掉Stalker
    導入numpy和cv2庫

    game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)

    建立以地圖Heigt為行,Width為列的三維矩陣

    for nexus in self.units(NEXUS):
                nex_pos = nexus.position
                print(nex_pos)
                cv2.circle(game_data, (int(nex_pos[0]), int(nex_pos[1])), 10, (0, 255, 0), -1)  # BGR

    遍歷星靈樞紐,獲取下一個位置,畫圓,circle(承載圓的img, 圓心, 半徑, 顏色, thickness=-1表示填充)
    接下來我們要垂直翻轉三維矩陣,因為我們建立的矩陣左上角是原點(0,0),縱坐標向下延申,橫坐標向右延申。翻轉之後就成了正常的坐標系。

    flipped = cv2.flip(game_data, 0)

    圖像縮放,達到可視化最佳。

            resized = cv2.resize(flipped, dsize=None, fx=2, fy=2)
            cv2.imshow('Intel', resized)
            cv2.waitKey(1)

    至此,完整代碼如下

    import sc2
    from sc2 import run_game, maps, Race, Difficulty
    from sc2.player import Bot, Computer
    from sc2.constants import *
    import random
    import numpy as np
    import cv2
    
    
    class SentdeBot(sc2.BotAI):
        def __init__(self):
            # 經過計算,每分鐘大約165迭代次數
            self.ITERATIONS_PER_MINUTE = 165
            # 最大农民數量
            self.MAX_WORKERS = 65
    
        async def on_step(self, iteration: int):
            self.iteration = iteration
            await self.distribute_workers()
            await self.build_workers()
            await self.build_pylons()
            await self.build_assimilators()
            await self.expand()
            await self.offensive_force_buildings()
            await self.build_offensive_force()
            await self.intel()
            await self.attack()
    
        async def intel(self):
            # 根據地圖建立的三維矩陣
            game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
            for nexus in self.units(UnitTypeId.NEXUS):
                nex_pos = nexus.position
                # circle(承載圓的img, 圓心, 半徑, 顏色, thickness=-1表示填充)
                # 記錄星靈樞紐的位置
                cv2.circle(game_data, (int(nex_pos[0]), int(nex_pos[1])), 10, (0, 255, 0), -1)
            # 圖像翻轉垂直鏡像
            flipped = cv2.flip(game_data, 0)
            # 圖像縮放
            # cv2.resize(原圖像,輸出圖像的大小,width方向的縮放比例,height方向縮放的比例)
            resized = cv2.resize(flipped, dsize=None, fx=2, fy=2)
            cv2.imshow('Intel', resized)
    
            # cv2.waitKey(每Xms刷新圖像)
            cv2.waitKey(1)
    
        # 建造农民
        async def build_workers(self):
            # 星靈樞鈕*16(一個基地配備16個农民)大於農民數量並且現有农民數量小於MAX_WORKERS
            if len(self.units(UnitTypeId.NEXUS)) * 16 > len(self.units(UnitTypeId.PROBE)) and len(
                    self.units(UnitTypeId.PROBE)) < self.MAX_WORKERS:
                # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
                for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
                    # 是否有50晶體礦建造农民
                    if self.can_afford(UnitTypeId.PROBE):
                        await self.do(nexus.train(UnitTypeId.PROBE))
    
        ## 建造水晶
        async def build_pylons(self):
            ## 供應人口和現有人口之差小於5且建築不是正在建造
            if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
                nexuses = self.units(UnitTypeId.NEXUS).ready
                if nexuses.exists:
                    if self.can_afford(UnitTypeId.PYLON):
                        await self.build(UnitTypeId.PYLON, near=nexuses.first)
    
        ## 建造吸收廠
        async def build_assimilators(self):
            for nexus in self.units(UnitTypeId.NEXUS).ready:
                # 在瓦斯泉上建造吸收廠
                vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
                for vaspene in vaspenes:
                    if not self.can_afford(UnitTypeId.ASSIMILATOR):
                        break
                    worker = self.select_build_worker(vaspene.position)
                    if worker is None:
                        break
                    if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
                        await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))
    
        ## 開礦
        async def expand(self):
            # (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
            if self.units(UnitTypeId.NEXUS).amount < self.iteration / self.ITERATIONS_PER_MINUTE and self.can_afford(
                    UnitTypeId.NEXUS):
                await self.expand_now()
    
        ## 建造進攻性建築
        async def offensive_force_buildings(self):
            print(self.iteration / self.ITERATIONS_PER_MINUTE)
            if self.units(UnitTypeId.PYLON).ready.exists:
                pylon = self.units(UnitTypeId.PYLON).ready.random
                # 根據神族建築科技圖,折躍門建造過後才可以建造控制核心
                if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
                    if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
                        await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
                # 否則建造折躍門
                # (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
                # elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
                elif len(self.units(UnitTypeId.GATEWAY)) < 1:
                    if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
                        await self.build(UnitTypeId.GATEWAY, near=pylon)
                # 控制核心存在的情況下建造星門
                if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
                    if len(self.units(UnitTypeId.STARGATE)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
                        if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
                            await self.build(UnitTypeId.STARGATE, near=pylon)
    
        ## 造兵
        async def build_offensive_force(self):
            # 無隊列化建造
            for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
                if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
                    await self.do(sg.train(UnitTypeId.VOIDRAY))
    
        ## 尋找目標
        def find_target(self, state):
            if len(self.known_enemy_units) > 0:
                # 隨機選取敵方單位
                return random.choice(self.known_enemy_units)
            elif len(self.known_enemy_units) > 0:
                # 隨機選取敵方建築
                return random.choice(self.known_enemy_structures)
            else:
                # 返回敵方出生點位
                return self.enemy_start_locations[0]
    
        ## 進攻
        async def attack(self):
            # {UNIT: [n to fight, n to defend]}
            aggressive_units = {UnitTypeId.VOIDRAY: [8, 3]}
    
            for UNIT in aggressive_units:
                # 攻擊模式
                if self.units(UNIT).amount > aggressive_units[UNIT][0] and self.units(UNIT).amount > aggressive_units[UNIT][1]:
                    for s in self.units(UNIT).idle:
                        await self.do(s.attack(self.find_target(self.state)))
                # 防衛模式
                elif self.units(UNIT).amount > aggressive_units[UNIT][1]:
                    if len(self.known_enemy_units) > 0:
                        for s in self.units(UNIT).idle:
                            await self.do(s.attack(random.choice(self.known_enemy_units)))
    
    
    ## 啟動遊戲
    run_game(maps.get("AcidPlantLE"), [
        Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Hard)
    ], realtime=False)
    

    運行結果如下

    採集到了地圖位置。

    偵察

    在intel(self)里創建一個字典draw_dict,UnitTypeId作為key,半徑和顏色是value

    
            draw_dict = {
                UnitTypeId.NEXUS: [15, (0, 255, 0)],
                UnitTypeId.PYLON: [3, (20, 235, 0)],
                UnitTypeId.PROBE: [1, (55, 200, 0)],
                UnitTypeId.ASSIMILATOR: [2, (55, 200, 0)],
                UnitTypeId.GATEWAY: [3, (200, 100, 0)],
                UnitTypeId.CYBERNETICSCORE: [3, (150, 150, 0)],
                UnitTypeId.STARGATE: [5, (255, 0, 0)],
                UnitTypeId.ROBOTICSFACILITY: [5, (215, 155, 0)],
    
                UnitTypeId.VOIDRAY: [3, (255, 100, 0)]
            }

    迭代同上

    for unit_type in draw_dict:
                for unit in self.units(unit_type).ready:
                    pos = unit.position
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)

    存儲三族的主基地名稱(星靈樞紐,指揮中心,孵化場),刻畫敵方建築。

    # 主基地名稱
            main_base_names = ["nexus", "supplydepot", "hatchery"]
            # 記錄敵方基地位置
            for enemy_building in self.known_enemy_structures:
                pos = enemy_building.position
                if enemy_building.name.lower() not in main_base_names:
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
            for enemy_building in self.known_enemy_structures:
                pos = enemy_building.position
                if enemy_building.name.lower() in main_base_names:
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)

    刻畫敵方單位,如果是农民畫得小些,其他單位則畫大些。

            for enemy_unit in self.known_enemy_units:
    
                if not enemy_unit.is_structure:
                    worker_names = ["probe", "scv", "drone"]
                    # if that unit is a PROBE, SCV, or DRONE... it's a worker
                    pos = enemy_unit.position
                    if enemy_unit.name.lower() in worker_names:
                        cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
                    else:
                        cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)

    在offensive_force_buildings(self)方法中添加建造机械台

                if self.units(CYBERNETICSCORE).ready.exists:
                    if len(self.units(ROBOTICSFACILITY)) < 1:
                        if self.can_afford(ROBOTICSFACILITY) and not self.already_pending(ROBOTICSFACILITY):
                            await self.build(ROBOTICSFACILITY, near=pylon)

    創建scout(),訓練Observer

    async def scout(self):
            if len(self.units(OBSERVER)) > 0:
                scout = self.units(OBSERVER)[0]
                if scout.is_idle:
                    enemy_location = self.enemy_start_locations[0]
                    move_to = self.random_location_variance(enemy_location)
                    print(move_to)
                    await self.do(scout.move(move_to))
    
            else:
                for rf in self.units(ROBOTICSFACILITY).ready.noqueue:
                    if self.can_afford(OBSERVER) and self.supply_left > 0:
                        await self.do(rf.train(OBSERVER))

    生成隨機位置,很簡單。意思是橫坐標累計遞增-0.2和0.2倍的橫坐標,限制條件為如果x超過橫坐標,那麼就是橫坐標最大值。
    縱坐標同理。

        def random_location_variance(self, enemy_start_location):
            x = enemy_start_location[0]
            y = enemy_start_location[1]
    
            x += ((random.randrange(-20, 20))/100) * enemy_start_location[0]
            y += ((random.randrange(-20, 20))/100) * enemy_start_location[1]
    
            if x < 0:
                x = 0
            if y < 0:
                y = 0
            if x > self.game_info.map_size[0]:
                x = self.game_info.map_size[0]
            if y > self.game_info.map_size[1]:
                y = self.game_info.map_size[1]
    
            go_to = position.Point2(position.Pointlike((x,y)))
            return go_to

    完整代碼如下

    # -*- encoding: utf-8 -*-
    '''
    @File    :   demo.py
    @Modify Time      @Author       @Desciption
    ------------      -------       -----------
    2019/11/3 12:32   Jonas           None
    '''
    
    import sc2
    from sc2 import run_game, maps, Race, Difficulty, position
    from sc2.player import Bot, Computer
    from sc2.constants import *
    import random
    import numpy as np
    import cv2
    
    
    class SentdeBot(sc2.BotAI):
        def __init__(self):
            # 經過計算,每分鐘大約165迭代次數
            self.ITERATIONS_PER_MINUTE = 165
            # 最大农民數量
            self.MAX_WORKERS = 50
    
        async def on_step(self, iteration: int):
            self.iteration = iteration
            await self.scout()
            await self.distribute_workers()
            await self.build_workers()
            await self.build_pylons()
            await self.build_assimilators()
            await self.expand()
            await self.offensive_force_buildings()
            await self.build_offensive_force()
            await self.intel()
            await self.attack()
    
        ## 偵察
        async def scout(self):
            if len(self.units(UnitTypeId.OBSERVER)) > 0:
                scout = self.units(UnitTypeId.OBSERVER)[0]
                if scout.is_idle:
                    enemy_location = self.enemy_start_locations[0]
                    move_to = self.random_location_variance(enemy_location)
                    print(move_to)
                    await self.do(scout.move(move_to))
    
            else:
                for rf in self.units(UnitTypeId.ROBOTICSFACILITY).ready.noqueue:
                    if self.can_afford(UnitTypeId.OBSERVER) and self.supply_left > 0:
                        await self.do(rf.train(UnitTypeId.OBSERVER))
    
        async def intel(self):
            game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
    
            # UnitTypeId作為key,半徑和顏色是value
            draw_dict = {
                UnitTypeId.NEXUS: [15, (0, 255, 0)],
                UnitTypeId.PYLON: [3, (20, 235, 0)],
                UnitTypeId.PROBE: [1, (55, 200, 0)],
                UnitTypeId.ASSIMILATOR: [2, (55, 200, 0)],
                UnitTypeId.GATEWAY: [3, (200, 100, 0)],
                UnitTypeId.CYBERNETICSCORE: [3, (150, 150, 0)],
                UnitTypeId.STARGATE: [5, (255, 0, 0)],
                UnitTypeId.ROBOTICSFACILITY: [5, (215, 155, 0)],
    
                UnitTypeId.VOIDRAY: [3, (255, 100, 0)],
                # OBSERVER: [3, (255, 255, 255)],
            }
    
            for unit_type in draw_dict:
                for unit in self.units(unit_type).ready:
                    pos = unit.position
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)
    
            # 主基地名稱
            main_base_names = ["nexus", "supplydepot", "hatchery"]
            # 記錄敵方基地位置
            for enemy_building in self.known_enemy_structures:
                pos = enemy_building.position
                # 不是主基地建築,畫小一些
                if enemy_building.name.lower() not in main_base_names:
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
            for enemy_building in self.known_enemy_structures:
                pos = enemy_building.position
                if enemy_building.name.lower() in main_base_names:
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)
    
            for enemy_unit in self.known_enemy_units:
    
                if not enemy_unit.is_structure:
                    worker_names = ["probe", "scv", "drone"]
                    # if that unit is a PROBE, SCV, or DRONE... it's a worker
                    pos = enemy_unit.position
                    if enemy_unit.name.lower() in worker_names:
                        cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
                    else:
                        cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)
    
            for obs in self.units(UnitTypeId.OBSERVER).ready:
                pos = obs.position
                cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (255, 255, 255), -1)
    
            # flip horizontally to make our final fix in visual representation:
            flipped = cv2.flip(game_data, 0)
            resized = cv2.resize(flipped, dsize=None, fx=2, fy=2)
    
            cv2.imshow('Intel', resized)
            cv2.waitKey(1)
    
        def random_location_variance(self, enemy_start_location):
            x = enemy_start_location[0]
            y = enemy_start_location[1]
    
            x += ((random.randrange(-20, 20)) / 100) * enemy_start_location[0]
            y += ((random.randrange(-20, 20)) / 100) * enemy_start_location[1]
    
            if x < 0:
                x = 0
            if y < 0:
                y = 0
            if x > self.game_info.map_size[0]:
                x = self.game_info.map_size[0]
            if y > self.game_info.map_size[1]:
                y = self.game_info.map_size[1]
    
            go_to = position.Point2(position.Pointlike((x, y)))
            return go_to
    
        # 建造农民
        async def build_workers(self):
            # 星靈樞鈕*16(一個基地配備16個农民)大於農民數量並且現有农民數量小於MAX_WORKERS
            if len(self.units(UnitTypeId.NEXUS)) * 16 > len(self.units(UnitTypeId.PROBE)) and len(
                    self.units(UnitTypeId.PROBE)) < self.MAX_WORKERS:
                # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
                for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
                    # 是否有50晶體礦建造农民
                    if self.can_afford(UnitTypeId.PROBE):
                        await self.do(nexus.train(UnitTypeId.PROBE))
    
        ## 建造水晶
        async def build_pylons(self):
            ## 供應人口和現有人口之差小於5且建築不是正在建造
            if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
                nexuses = self.units(UnitTypeId.NEXUS).ready
                if nexuses.exists:
                    if self.can_afford(UnitTypeId.PYLON):
                        await self.build(UnitTypeId.PYLON, near=nexuses.first)
    
        ## 建造吸收廠
        async def build_assimilators(self):
            for nexus in self.units(UnitTypeId.NEXUS).ready:
                # 在瓦斯泉上建造吸收廠
                vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
                for vaspene in vaspenes:
                    if not self.can_afford(UnitTypeId.ASSIMILATOR):
                        break
                    worker = self.select_build_worker(vaspene.position)
                    if worker is None:
                        break
                    if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
                        await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))
    
        ## 開礦
        async def expand(self):
            # (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
            if self.units(UnitTypeId.NEXUS).amount < self.iteration / self.ITERATIONS_PER_MINUTE and self.can_afford(
                    UnitTypeId.NEXUS):
                await self.expand_now()
    
        ## 建造進攻性建築
        async def offensive_force_buildings(self):
            print(self.iteration / self.ITERATIONS_PER_MINUTE)
            if self.units(UnitTypeId.PYLON).ready.exists:
                pylon = self.units(UnitTypeId.PYLON).ready.random
                # 根據神族建築科技圖,折躍門建造過後才可以建造控制核心
                if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
                    if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
                        await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
                # 否則建造折躍門
                # (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
                # elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
                elif len(self.units(UnitTypeId.GATEWAY)) < 1:
                    if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
                        await self.build(UnitTypeId.GATEWAY, near=pylon)
                # 控制核心存在的情況下建造机械台
                if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
                    if len(self.units(UnitTypeId.ROBOTICSFACILITY)) < 1:
                        if self.can_afford(UnitTypeId.ROBOTICSFACILITY) and not self.already_pending(
                                UnitTypeId.ROBOTICSFACILITY):
                            await self.build(UnitTypeId.ROBOTICSFACILITY, near=pylon)
    
                # 控制核心存在的情況下建造星門
                if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
                    if len(self.units(UnitTypeId.STARGATE)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
                        if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
                            await self.build(UnitTypeId.STARGATE, near=pylon)
    
        ## 造兵
        async def build_offensive_force(self):
            # 無隊列化建造
            # for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
            #     if not self.units(UnitTypeId.STALKER).amount > self.units(UnitTypeId.VOIDRAY).amount:
            #
            #         if self.can_afford(UnitTypeId.STALKER) and self.supply_left > 0:
            #             await self.do(gw.train(UnitTypeId.STALKER))
    
            for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
                if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
                    await self.do(sg.train(UnitTypeId.VOIDRAY))
    
        ## 尋找目標
        def find_target(self, state):
            if len(self.known_enemy_units) > 0:
                # 隨機選取敵方單位
                return random.choice(self.known_enemy_units)
            elif len(self.known_enemy_units) > 0:
                # 隨機選取敵方建築
                return random.choice(self.known_enemy_structures)
            else:
                # 返回敵方出生點位
                return self.enemy_start_locations[0]
    
        ## 進攻
        async def attack(self):
            # {UNIT: [n to fight, n to defend]}
            aggressive_units = {UnitTypeId.VOIDRAY: [8, 3]}
    
            for UNIT in aggressive_units:
                # 攻擊模式
                if self.units(UNIT).amount > aggressive_units[UNIT][0] and self.units(UNIT).amount > aggressive_units[UNIT][
                    1]:
                    for s in self.units(UNIT).idle:
                        await self.do(s.attack(self.find_target(self.state)))
                # 防衛模式
                elif self.units(UNIT).amount > aggressive_units[UNIT][1]:
                    if len(self.known_enemy_units) > 0:
                        for s in self.units(UNIT).idle:
                            await self.do(s.attack(random.choice(self.known_enemy_units)))
    
    
    ## 啟動遊戲
    run_game(maps.get("AcidPlantLE"), [
        Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Hard)
    ], realtime=False)
    

    運行結果如下,紅色和粉紅色是敵方單位。

    創建訓練數據

    統計資源、人口和軍隊人口比,在intel方法添加如下代碼

            # 追蹤資源、人口和軍隊人口比
            line_max = 50
            mineral_ratio = self.minerals / 1500
            if mineral_ratio > 1.0:
                mineral_ratio = 1.0
    
            vespene_ratio = self.vespene / 1500
            if vespene_ratio > 1.0:
                vespene_ratio = 1.0
    
            population_ratio = self.supply_left / self.supply_cap
            if population_ratio > 1.0:
                population_ratio = 1.0
    
            plausible_supply = self.supply_cap / 200.0
    
            military_weight = len(self.units(UnitTypeId.VOIDRAY)) / (self.supply_cap - self.supply_left)
            if military_weight > 1.0:
                military_weight = 1.0
    
            # 农民/人口      worker/supply ratio
            cv2.line(game_data, (0, 19), (int(line_max * military_weight), 19), (250, 250, 200), 3)
            # 人口/200    plausible supply (supply/200.0)
            cv2.line(game_data, (0, 15), (int(line_max * plausible_supply), 15), (220, 200, 200), 3)
            # (人口-現有人口)/人口  population ratio (supply_left/supply)
            cv2.line(game_data, (0, 11), (int(line_max * population_ratio), 11), (150, 150, 150), 3)
            # 氣體/1500   gas/1500
            cv2.line(game_data, (0, 7), (int(line_max * vespene_ratio), 7), (210, 200, 0), 3)
            # 晶體礦/1500  minerals minerals/1500
            cv2.line(game_data, (0, 3), (int(line_max * mineral_ratio), 3), (0, 255, 25), 3)

    運行結果如下,左下角自上而下依次是“农民/人口”,“人口/200”,“(人口-現有人口)/人口”,“氣體/1500”,“晶體礦/1500”

    採集進攻行為數據,在attack方法中加入如下代碼

            if len(self.units(UnitTypeId.VOIDRAY).idle) > 0:
                choice = random.randrange(0, 4)
                target = False
                if self.iteration > self.do_something_after:
                    if choice == 0:
                        # 什麼都不做
                        wait = random.randrange(20, 165)
                        self.do_something_after = self.iteration + wait
    
                    elif choice == 1:
                        # 攻擊離星靈樞紐最近的單位
                        if len(self.known_enemy_units) > 0:
                            target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))
    
                    elif choice == 2:
                        # 攻擊敵方建築
                        if len(self.known_enemy_structures) > 0:
                            target = random.choice(self.known_enemy_structures)
    
                    elif choice == 3:
                        # 攻擊敵方出生位置
                        target = self.enemy_start_locations[0]
    
                    if target:
                        for vr in self.units(UnitTypeId.VOIDRAY).idle:
                            await self.do(vr.attack(target))
                    y = np.zeros(4)
                    y[choice] = 1
                    print(y)
                    self.train_data.append([y, self.flipped])

    輸出如下結果

    ···
    [1. 0. 0. 0.]
    [0. 0. 1. 0.]
    [0. 0. 0. 1.]
    [0. 0. 1. 0.]
    [1. 0. 0. 0.]
    ···

    為了使用self.flipped = cv2.flip(game_data, 0),修改

            flipped = cv2.flip(game_data, 0)
            resized = cv2.resize(flipped, dsize=None, fx=2, fy=2)

            self.flipped = cv2.flip(game_data, 0)
            resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)

    init 方法添加do_something_after和train_data

        def __init__(self):
            self.ITERATIONS_PER_MINUTE = 165
            self.MAX_WORKERS = 50
            self.do_something_after = 0
            self.train_data = []

    採集攻擊數據的時候不需要畫圖,我們在類前加HEADLESS = False,intel方法代碼修改如下

            self.flipped = cv2.flip(game_data, 0)
    
            if not HEADLESS:
                resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
                cv2.imshow('Intel', resized)
                cv2.waitKey(1)

    加入on_end方法,只存儲勝利的數據,在和代碼同級目錄新建train_data文件夾

        def on_end(self, game_result):
            print('--- on_end called ---')
            print(game_result)
    
            if game_result == Result.Victory:
                np.save("train_data/{}.npy".format(str(int(time.time()))), np.array(self.train_data))

    完整代碼如下

    import os
    import time
    
    import sc2
    from sc2 import run_game, maps, Race, Difficulty, position, Result
    from sc2.player import Bot, Computer
    from sc2.constants import *
    import random
    import numpy as np
    import cv2
    
    HEADLESS = True
    # os.environ["SC2PATH"] = 'F:\StarCraft II'
    
    class SentdeBot(sc2.BotAI):
        def __init__(self):
            # 經過計算,每分鐘大約165迭代次數
            self.ITERATIONS_PER_MINUTE = 165
            # 最大农民數量
            self.MAX_WORKERS = 50
            self.do_something_after = 0
            self.train_data = []
    
        def on_end(self, game_result):
            print('--- on_end called ---')
            print(game_result)
    
            if game_result == Result.Victory:
                np.save("train_data/{}.npy".format(str(int(time.time()))), np.array(self.train_data))
    
        async def on_step(self, iteration: int):
            self.iteration = iteration
            await self.scout()
            await self.distribute_workers()
            await self.build_workers()
            await self.build_pylons()
            await self.build_assimilators()
            await self.expand()
            await self.offensive_force_buildings()
            await self.build_offensive_force()
            await self.intel()
            await self.attack()
    
        ## 偵察
        async def scout(self):
            if len(self.units(UnitTypeId.OBSERVER)) > 0:
                scout = self.units(UnitTypeId.OBSERVER)[0]
                if scout.is_idle:
                    enemy_location = self.enemy_start_locations[0]
                    move_to = self.random_location_variance(enemy_location)
                    print(move_to)
                    await self.do(scout.move(move_to))
    
            else:
                for rf in self.units(UnitTypeId.ROBOTICSFACILITY).ready.noqueue:
                    if self.can_afford(UnitTypeId.OBSERVER) and self.supply_left > 0:
                        await self.do(rf.train(UnitTypeId.OBSERVER))
    
        async def intel(self):
            game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
    
            # UnitTypeId作為key,半徑和顏色是value
            draw_dict = {
                UnitTypeId.NEXUS: [15, (0, 255, 0)],
                UnitTypeId.PYLON: [3, (20, 235, 0)],
                UnitTypeId.PROBE: [1, (55, 200, 0)],
                UnitTypeId.ASSIMILATOR: [2, (55, 200, 0)],
                UnitTypeId.GATEWAY: [3, (200, 100, 0)],
                UnitTypeId.CYBERNETICSCORE: [3, (150, 150, 0)],
                UnitTypeId.STARGATE: [5, (255, 0, 0)],
                UnitTypeId.ROBOTICSFACILITY: [5, (215, 155, 0)],
    
                UnitTypeId.VOIDRAY: [3, (255, 100, 0)],
                # OBSERVER: [3, (255, 255, 255)],
            }
    
            for unit_type in draw_dict:
                for unit in self.units(unit_type).ready:
                    pos = unit.position
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)
    
            # 主基地名稱
            main_base_names = ["nexus", "supplydepot", "hatchery"]
            # 記錄敵方基地位置
            for enemy_building in self.known_enemy_structures:
                pos = enemy_building.position
                # 不是主基地建築,畫小一些
                if enemy_building.name.lower() not in main_base_names:
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
            for enemy_building in self.known_enemy_structures:
                pos = enemy_building.position
                if enemy_building.name.lower() in main_base_names:
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)
    
            for enemy_unit in self.known_enemy_units:
    
                if not enemy_unit.is_structure:
                    worker_names = ["probe", "scv", "drone"]
                    # if that unit is a PROBE, SCV, or DRONE... it's a worker
                    pos = enemy_unit.position
                    if enemy_unit.name.lower() in worker_names:
                        cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
                    else:
                        cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)
    
            for obs in self.units(UnitTypeId.OBSERVER).ready:
                pos = obs.position
                cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (255, 255, 255), -1)
    
    
            # 追蹤資源、人口和軍隊人口比
            line_max = 50
            mineral_ratio = self.minerals / 1500
            if mineral_ratio > 1.0:
                mineral_ratio = 1.0
    
            vespene_ratio = self.vespene / 1500
            if vespene_ratio > 1.0:
                vespene_ratio = 1.0
    
            population_ratio = self.supply_left / self.supply_cap
            if population_ratio > 1.0:
                population_ratio = 1.0
    
            plausible_supply = self.supply_cap / 200.0
    
            military_weight = len(self.units(UnitTypeId.VOIDRAY)) / (self.supply_cap - self.supply_left)
            if military_weight > 1.0:
                military_weight = 1.0
    
            # 农民/人口      worker/supply ratio
            cv2.line(game_data, (0, 19), (int(line_max * military_weight), 19), (250, 250, 200), 3)
            # 人口/200    plausible supply (supply/200.0)
            cv2.line(game_data, (0, 15), (int(line_max * plausible_supply), 15), (220, 200, 200), 3)
            # (人口-現有人口)/人口  population ratio (supply_left/supply)
            cv2.line(game_data, (0, 11), (int(line_max * population_ratio), 11), (150, 150, 150), 3)
            # 氣體/1500   gas/1500
            cv2.line(game_data, (0, 7), (int(line_max * vespene_ratio), 7), (210, 200, 0), 3)
            # 晶體礦/1500  minerals minerals/1500
            cv2.line(game_data, (0, 3), (int(line_max * mineral_ratio), 3), (0, 255, 25), 3)
    
    
    
    
            # flip horizontally to make our final fix in visual representation:
            self.flipped = cv2.flip(game_data, 0)
    
            if HEADLESS:
                resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
    
                cv2.imshow('Intel', resized)
                cv2.waitKey(1)
    
        def random_location_variance(self, enemy_start_location):
            x = enemy_start_location[0]
            y = enemy_start_location[1]
    
            x += ((random.randrange(-20, 20)) / 100) * enemy_start_location[0]
            y += ((random.randrange(-20, 20)) / 100) * enemy_start_location[1]
    
            if x < 0:
                x = 0
            if y < 0:
                y = 0
            if x > self.game_info.map_size[0]:
                x = self.game_info.map_size[0]
            if y > self.game_info.map_size[1]:
                y = self.game_info.map_size[1]
    
            go_to = position.Point2(position.Pointlike((x, y)))
            return go_to
    
        # 建造农民
        async def build_workers(self):
            # 星靈樞鈕*16(一個基地配備16個农民)大於農民數量並且現有农民數量小於MAX_WORKERS
            if len(self.units(UnitTypeId.NEXUS)) * 16 > len(self.units(UnitTypeId.PROBE)) and len(
                    self.units(UnitTypeId.PROBE)) < self.MAX_WORKERS:
                # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
                for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
                    # 是否有50晶體礦建造农民
                    if self.can_afford(UnitTypeId.PROBE):
                        await self.do(nexus.train(UnitTypeId.PROBE))
    
        ## 建造水晶
        async def build_pylons(self):
            ## 供應人口和現有人口之差小於5且建築不是正在建造
            if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
                nexuses = self.units(UnitTypeId.NEXUS).ready
                if nexuses.exists:
                    if self.can_afford(UnitTypeId.PYLON):
                        await self.build(UnitTypeId.PYLON, near=nexuses.first)
    
        ## 建造吸收廠
        async def build_assimilators(self):
            for nexus in self.units(UnitTypeId.NEXUS).ready:
                # 在瓦斯泉上建造吸收廠
                vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
                for vaspene in vaspenes:
                    if not self.can_afford(UnitTypeId.ASSIMILATOR):
                        break
                    worker = self.select_build_worker(vaspene.position)
                    if worker is None:
                        break
                    if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
                        await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))
    
        ## 開礦
        async def expand(self):
            # (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
            if self.units(UnitTypeId.NEXUS).amount < self.iteration / self.ITERATIONS_PER_MINUTE and self.can_afford(
                    UnitTypeId.NEXUS):
                await self.expand_now()
    
        ## 建造進攻性建築
        async def offensive_force_buildings(self):
            # print(self.iteration / self.ITERATIONS_PER_MINUTE)
            if self.units(UnitTypeId.PYLON).ready.exists:
                pylon = self.units(UnitTypeId.PYLON).ready.random
                # 根據神族建築科技圖,折躍門建造過後才可以建造控制核心
                if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
                    if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
                        await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
                # 否則建造折躍門
                # (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
                # elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
                elif len(self.units(UnitTypeId.GATEWAY)) < 1:
                    if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
                        await self.build(UnitTypeId.GATEWAY, near=pylon)
                # 控制核心存在的情況下建造机械台
                if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
                    if len(self.units(UnitTypeId.ROBOTICSFACILITY)) < 1:
                        if self.can_afford(UnitTypeId.ROBOTICSFACILITY) and not self.already_pending(
                                UnitTypeId.ROBOTICSFACILITY):
                            await self.build(UnitTypeId.ROBOTICSFACILITY, near=pylon)
    
                # 控制核心存在的情況下建造星門
                if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
                    if len(self.units(UnitTypeId.STARGATE)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
                        if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
                            await self.build(UnitTypeId.STARGATE, near=pylon)
    
        ## 造兵
        async def build_offensive_force(self):
            # 無隊列化建造
            # for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
            #     if not self.units(UnitTypeId.STALKER).amount > self.units(UnitTypeId.VOIDRAY).amount:
            #
            #         if self.can_afford(UnitTypeId.STALKER) and self.supply_left > 0:
            #             await self.do(gw.train(UnitTypeId.STALKER))
    
            for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
                if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
                    await self.do(sg.train(UnitTypeId.VOIDRAY))
    
        ## 尋找目標
        def find_target(self, state):
            if len(self.known_enemy_units) > 0:
                # 隨機選取敵方單位
                return random.choice(self.known_enemy_units)
            elif len(self.known_enemy_units) > 0:
                # 隨機選取敵方建築
                return random.choice(self.known_enemy_structures)
            else:
                # 返回敵方出生點位
                return self.enemy_start_locations[0]
    
        ## 進攻
        async def attack(self):
            if len(self.units(UnitTypeId.VOIDRAY).idle) > 0:
                choice = random.randrange(0, 4)
                target = False
                if self.iteration > self.do_something_after:
                    if choice == 0:
                        # 什麼都不做
                        wait = random.randrange(20, 165)
                        self.do_something_after = self.iteration + wait
    
                    elif choice == 1:
                        # 攻擊離星靈樞紐最近的單位
                        if len(self.known_enemy_units) > 0:
                            target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))
    
                    elif choice == 2:
                        # 攻擊敵方建築
                        if len(self.known_enemy_structures) > 0:
                            target = random.choice(self.known_enemy_structures)
    
                    elif choice == 3:
                        # 攻擊敵方出生位置
                        target = self.enemy_start_locations[0]
    
                    if target:
                        for vr in self.units(UnitTypeId.VOIDRAY).idle:
                            await self.do(vr.attack(target))
                    y = np.zeros(4)
                    y[choice] = 1
                    print(y)
                    self.train_data.append([y, self.flipped])
    
    
    ## 啟動遊戲
    run_game(maps.get("AcidPlantLE"), [
        Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Medium)
    ], realtime=False)
    

    可以看到train_data文件夾下存儲了勝利數據

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

  • 深入理解Kafka必知必會(2)

    深入理解Kafka必知必會(2)

    Kafka目前有哪些內部topic,它們都有什麼特徵?各自的作用又是什麼?

    __consumer_offsets:作用是保存 Kafka 消費者的位移信息
    __transaction_state:用來存儲事務日誌消息

    優先副本是什麼?它有什麼特殊的作用?

    所謂的優先副本是指在AR集合列表中的第一個副本。
    理想情況下,優先副本就是該分區的leader 副本,所以也可以稱之為 preferred leader。Kafka 要確保所有主題的優先副本在 Kafka 集群中均勻分佈,這樣就保證了所有分區的 leader 均衡分佈。以此來促進集群的負載均衡,這一行為也可以稱為“分區平衡”。

    Kafka有哪幾處地方有分區分配的概念?簡述大致的過程及原理

    1. 生產者的分區分配是指為每條消息指定其所要發往的分區。可以編寫一個具體的類實現org.apache.kafka.clients.producer.Partitioner接口。
    2. 消費者中的分區分配是指為消費者指定其可以消費消息的分區。Kafka 提供了消費者客戶端參數 partition.assignment.strategy 來設置消費者與訂閱主題之間的分區分配策略。
    3. 分區副本的分配是指為集群制定創建主題時的分區副本分配方案,即在哪個 broker 中創建哪些分區的副本。kafka-topics.sh 腳本中提供了一個 replica-assignment 參數來手動指定分區副本的分配方案。

    簡述Kafka的日誌目錄結構

    Kafka 中的消息是以主題為基本單位進行歸類的,各個主題在邏輯上相互獨立。每個主題又可以分為一個或多個分區。不考慮多副本的情況,一個分區對應一個日誌(Log)。為了防止 Log 過大,Kafka 又引入了日誌分段(LogSegment)的概念,將 Log 切分為多個 LogSegment,相當於一個巨型文件被平均分配為多個相對較小的文件。

    Log 和 LogSegment 也不是純粹物理意義上的概念,Log 在物理上只以文件夾的形式存儲,而每個 LogSegment 對應於磁盤上的一個日誌文件和兩個索引文件,以及可能的其他文件(比如以“.txnindex”為後綴的事務索引文件)

    Kafka中有那些索引文件?

    每個日誌分段文件對應了兩個索引文件,主要用來提高查找消息的效率。
    偏移量索引文件用來建立消息偏移量(offset)到物理地址之間的映射關係,方便快速定位消息所在的物理文件位置
    時間戳索引文件則根據指定的時間戳(timestamp)來查找對應的偏移量信息。

    如果我指定了一個offset,Kafka怎麼查找到對應的消息?

    Kafka是通過seek() 方法來指定消費的,在執行seek() 方法之前要去執行一次poll()方法,等到分配到分區之後會去對應的分區的指定位置開始消費,如果指定的位置發生了越界,那麼會根據auto.offset.reset 參數設置的情況進行消費。

    如果我指定了一個timestamp,Kafka怎麼查找到對應的消息?

    Kafka提供了一個 offsetsForTimes() 方法,通過 timestamp 來查詢與此對應的分區位置。offsetsForTimes() 方法的參數 timestampsToSearch 是一個 Map 類型,key 為待查詢的分區,而 value 為待查詢的時間戳,該方法會返回時間戳大於等於待查詢時間的第一條消息對應的位置和時間戳,對應於 OffsetAndTimestamp 中的 offset 和 timestamp 字段。

    聊一聊你對Kafka的Log Retention的理解

    日誌刪除(Log Retention):按照一定的保留策略直接刪除不符合條件的日誌分段。
    我們可以通過 broker 端參數 log.cleanup.policy 來設置日誌清理策略,此參數的默認值為“delete”,即採用日誌刪除的清理策略。

    1. 基於時間
      日誌刪除任務會檢查當前日誌文件中是否有保留時間超過設定的閾值(retentionMs)來尋找可刪除的日誌分段文件集合(deletableSegments)retentionMs 可以通過 broker 端參數 log.retention.hours、log.retention.minutes 和 log.retention.ms 來配置,其中 log.retention.ms 的優先級最高,log.retention.minutes 次之,log.retention.hours 最低。默認情況下只配置了 log.retention.hours 參數,其值為168,故默認情況下日誌分段文件的保留時間為7天。
      刪除日誌分段時,首先會從 Log 對象中所維護日誌分段的跳躍表中移除待刪除的日誌分段,以保證沒有線程對這些日誌分段進行讀取操作。然後將日誌分段所對應的所有文件添加上“.deleted”的後綴(當然也包括對應的索引文件)。最後交由一個以“delete-file”命名的延遲任務來刪除這些以“.deleted”為後綴的文件,這個任務的延遲執行時間可以通過 file.delete.delay.ms 參數來調配,此參數的默認值為60000,即1分鐘。

    2. 基於日誌大小
      日誌刪除任務會檢查當前日誌的大小是否超過設定的閾值(retentionSize)來尋找可刪除的日誌分段的文件集合(deletableSegments)。
      retentionSize 可以通過 broker 端參數 log.retention.bytes 來配置,默認值為-1,表示無窮大。注意 log.retention.bytes 配置的是 Log 中所有日誌文件的總大小,而不是單個日誌分段(確切地說應該為 .log 日誌文件)的大小。單個日誌分段的大小由 broker 端參數 log.segment.bytes 來限制,默認值為1073741824,即 1GB。
      這個刪除操作和基於時間的保留策略的刪除操作相同。
    3. 基於日誌起始偏移量
      基於日誌起始偏移量的保留策略的判斷依據是某日誌分段的下一個日誌分段的起始偏移量 baseOffset 是否小於等於 logStartOffset,若是,則可以刪除此日誌分段。

    如上圖所示,假設 logStartOffset 等於25,日誌分段1的起始偏移量為0,日誌分段2的起始偏移量為11,日誌分段3的起始偏移量為23,通過如下動作收集可刪除的日誌分段的文件集合 deletableSegments:

    從頭開始遍歷每個日誌分段,日誌分段1的下一個日誌分段的起始偏移量為11,小於 logStartOffset 的大小,將日誌分段1加入 deletableSegments。
    日誌分段2的下一個日誌偏移量的起始偏移量為23,也小於 logStartOffset 的大小,將日誌分段2加入 deletableSegments。
    日誌分段3的下一個日誌偏移量在 logStartOffset 的右側,故從日誌分段3開始的所有日誌分段都不會加入 deletableSegments。
    收集完可刪除的日誌分段的文件集合之後的刪除操作同基於日誌大小的保留策略和基於時間的保留策略相同

    聊一聊你對Kafka的Log Compaction的理解

    日誌壓縮(Log Compaction):針對每個消息的 key 進行整合,對於有相同 key 的不同 value 值,只保留最後一個版本。
    如果要採用日誌壓縮的清理策略,就需要將 log.cleanup.policy 設置為“compact”,並且還需要將 log.cleaner.enable (默認值為 true)設定為 true。

    如下圖所示,Log Compaction 對於有相同 key 的不同 value 值,只保留最後一個版本。如果應用只關心 key 對應的最新 value 值,則可以開啟 Kafka 的日誌清理功能,Kafka 會定期將相同 key 的消息進行合併,只保留最新的 value 值。

    聊一聊你對Kafka底層存儲的理解

    頁緩存

    頁緩存是操作系統實現的一種主要的磁盤緩存,以此用來減少對磁盤 I/O 的操作。具體來說,就是把磁盤中的數據緩存到內存中,把對磁盤的訪問變為對內存的訪問。

    當一個進程準備讀取磁盤上的文件內容時,操作系統會先查看待讀取的數據所在的頁(page)是否在頁緩存(pagecache)中,如果存在(命中)則直接返回數據,從而避免了對物理磁盤的 I/O 操作;如果沒有命中,則操作系統會向磁盤發起讀取請求並將讀取的數據頁存入頁緩存,之後再將數據返回給進程。

    同樣,如果一個進程需要將數據寫入磁盤,那麼操作系統也會檢測數據對應的頁是否在頁緩存中,如果不存在,則會先在頁緩存中添加相應的頁,最後將數據寫入對應的頁。被修改過後的頁也就變成了臟頁,操作系統會在合適的時間把臟頁中的數據寫入磁盤,以保持數據的一致性。

    用過 Java 的人一般都知道兩點事實:對象的內存開銷非常大,通常會是真實數據大小的幾倍甚至更多,空間使用率低下;Java 的垃圾回收會隨着堆內數據的增多而變得越來越慢。基於這些因素,使用文件系統並依賴於頁緩存的做法明顯要優於維護一個進程內緩存或其他結構,至少我們可以省去了一份進程內部的緩存消耗,同時還可以通過結構緊湊的字節碼來替代使用對象的方式以節省更多的空間。

    此外,即使 Kafka 服務重啟,頁緩存還是會保持有效,然而進程內的緩存卻需要重建。這樣也極大地簡化了代碼邏輯,因為維護頁緩存和文件之間的一致性交由操作系統來負責,這樣會比進程內維護更加安全有效。

    零拷貝

    除了消息順序追加、頁緩存等技術,Kafka 還使用零拷貝(Zero-Copy)技術來進一步提升性能。所謂的零拷貝是指將數據直接從磁盤文件複製到網卡設備中,而不需要經由應用程序之手。零拷貝大大提高了應用程序的性能,減少了內核和用戶模式之間的上下文切換。對 Linux 操作系統而言,零拷貝技術依賴於底層的 sendfile() 方法實現。對應於 Java 語言,FileChannal.transferTo() 方法的底層實現就是 sendfile() 方法。

    聊一聊Kafka的延時操作的原理

    Kafka 中有多種延時操作,比如延時生產,還有延時拉取(DelayedFetch)、延時數據刪除(DelayedDeleteRecords)等。
    延時操作創建之後會被加入延時操作管理器(DelayedOperationPurgatory)來做專門的處理。延時操作有可能會超時,每個延時操作管理器都會配備一個定時器(SystemTimer)來做超時管理,定時器的底層就是採用時間輪(TimingWheel)實現的。

    聊一聊Kafka控制器的作用

    在 Kafka 集群中會有一個或多個 broker,其中有一個 broker 會被選舉為控制器(Kafka Controller),它負責管理整個集群中所有分區和副本的狀態。當某個分區的 leader 副本出現故障時,由控制器負責為該分區選舉新的 leader 副本。當檢測到某個分區的 ISR 集合發生變化時,由控制器負責通知所有broker更新其元數據信息。當使用 kafka-topics.sh 腳本為某個 topic 增加分區數量時,同樣還是由控制器負責分區的重新分配。

    Kafka的舊版Scala的消費者客戶端的設計有什麼缺陷?

    如上圖,舊版消費者客戶端每個消費組( )在 ZooKeeper 中都維護了一個 /consumers/ /ids 路徑,在此路徑下使用臨時節點記錄隸屬於此消費組的消費者的唯一標識(consumerIdString),/consumers/ /owner 路徑下記錄了分區和消費者的對應關係,/consumers/ /offsets 路徑下記錄了此消費組在分區中對應的消費位移。

    每個消費者在啟動時都會在 /consumers/ /ids 和 /brokers/ids 路徑上註冊一個監聽器。當 /consumers/ /ids 路徑下的子節點發生變化時,表示消費組中的消費者發生了變化;當 /brokers/ids 路徑下的子節點發生變化時,表示 broker 出現了增減。這樣通過 ZooKeeper 所提供的 Watcher,每個消費者就可以監聽消費組和 Kafka 集群的狀態了。

    這種方式下每個消費者對 ZooKeeper 的相關路徑分別進行監聽,當觸發再均衡操作時,一個消費組下的所有消費者會同時進行再均衡操作,而消費者之間並不知道彼此操作的結果,這樣可能導致 Kafka 工作在一個不正確的狀態。與此同時,這種嚴重依賴於 ZooKeeper 集群的做法還有兩個比較嚴重的問題。

    1. 羊群效應(Herd Effect):所謂的羊群效應是指ZooKeeper 中一個被監聽的節點變化,大量的 Watcher 通知被發送到客戶端,導致在通知期間的其他操作延遲,也有可能發生類似死鎖的情況。
    2. 腦裂問題(Split Brain):消費者進行再均衡操作時每個消費者都與 ZooKeeper 進行通信以判斷消費者或broker變化的情況,由於 ZooKeeper 本身的特性,可能導致在同一時刻各個消費者獲取的狀態不一致,這樣會導致異常問題發生。

    消費再均衡的原理是什麼?(提示:消費者協調器和消費組協調器)

    就目前而言,一共有如下幾種情形會觸發再均衡的操作:

    • 有新的消費者加入消費組。
    • 有消費者宕機下線。消費者並不一定需要真正下線,例如遇到長時間的GC、網絡延遲導致消費者長時間未向 GroupCoordinator 發送心跳等情況時,GroupCoordinator 會認為消費者已經下線。
    • 有消費者主動退出消費組(發送 LeaveGroupRequest 請求)。比如客戶端調用了 unsubscrible() 方法取消對某些主題的訂閱。
    • 消費組所對應的 GroupCoorinator 節點發生了變更。
    • 消費組內所訂閱的任一主題或者主題的分區數量發生變化。

    GroupCoordinator 是 Kafka 服務端中用於管理消費組的組件。而消費者客戶端中的 ConsumerCoordinator 組件負責與 GroupCoordinator 進行交互。

    第一階段(FIND_COORDINATOR)

    消費者需要確定它所屬的消費組對應的 GroupCoordinator 所在的 broker,並創建與該 broker 相互通信的網絡連接。如果消費者已經保存了與消費組對應的 GroupCoordinator 節點的信息,並且與它之間的網絡連接是正常的,那麼就可以進入第二階段。否則,就需要向集群中的某個節點發送 FindCoordinatorRequest 請求來查找對應的 GroupCoordinator,這裏的“某個節點”並非是集群中的任意節點,而是負載最小的節點。

    第二階段(JOIN_GROUP)

    在成功找到消費組所對應的 GroupCoordinator 之後就進入加入消費組的階段,在此階段的消費者會向 GroupCoordinator 發送 JoinGroupRequest 請求,並處理響應。

    選舉消費組的leader
    如果消費組內還沒有 leader,那麼第一個加入消費組的消費者即為消費組的 leader。如果某一時刻 leader 消費者由於某些原因退出了消費組,那麼會重新選舉一個新的 leader

    選舉分區分配策略

    1. 收集各個消費者支持的所有分配策略,組成候選集 candidates。
    2. 每個消費者從候選集 candidates 中找出第一個自身支持的策略,為這個策略投上一票。
    3. 計算候選集中各個策略的選票數,選票數最多的策略即為當前消費組的分配策略。

    第三階段(SYNC_GROUP)

    leader 消費者根據在第二階段中選舉出來的分區分配策略來實施具體的分區分配,在此之後需要將分配的方案同步給各個消費者,通過 GroupCoordinator 這個“中間人”來負責轉發同步分配方案的。

    第四階段(HEARTBEAT)

    進入這個階段之後,消費組中的所有消費者就會處於正常工作狀態。在正式消費之前,消費者還需要確定拉取消息的起始位置。假設之前已經將最後的消費位移提交到了 GroupCoordinator,並且 GroupCoordinator 將其保存到了 Kafka 內部的 __consumer_offsets 主題中,此時消費者可以通過 OffsetFetchRequest 請求獲取上次提交的消費位移並從此處繼續消費。

    消費者通過向 GroupCoordinator 發送心跳來維持它們與消費組的從屬關係,以及它們對分區的所有權關係。只要消費者以正常的時間間隔發送心跳,就被認為是活躍的,說明它還在讀取分區中的消息。心跳線程是一個獨立的線程,可以在輪詢消息的空檔發送心跳。如果消費者停止發送心跳的時間足夠長,則整個會話就被判定為過期,GroupCoordinator 也會認為這個消費者已經死亡,就會觸發一次再均衡行為。

    Kafka中的冪等是怎麼實現的?

    為了實現生產者的冪等性,Kafka 為此引入了 producer id(以下簡稱 PID)和序列號(sequence number)這兩個概念。

    每個新的生產者實例在初始化的時候都會被分配一個 PID,這個 PID 對用戶而言是完全透明的。對於每個 PID,消息發送到的每一個分區都有對應的序列號,這些序列號從0開始單調遞增。生產者每發送一條消息就會將 <PID,分區> 對應的序列號的值加1。

    broker 端會在內存中為每一對 <PID,分區> 維護一個序列號。對於收到的每一條消息,只有當它的序列號的值(SN_new)比 broker 端中維護的對應的序列號的值(SN_old)大1(即 SN_new = SN_old + 1)時,broker 才會接收它。如果 SN_new< SN_old + 1,那麼說明消息被重複寫入,broker 可以直接將其丟棄。如果 SN_new> SN_old + 1,那麼說明中間有數據尚未寫入,出現了亂序,暗示可能有消息丟失,對應的生產者會拋出 OutOfOrderSequenceException,這個異常是一個嚴重的異常,後續的諸如 send()、beginTransaction()、commitTransaction() 等方法的調用都會拋出 IllegalStateException 的異常。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    ※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

    ※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

    ※Google地圖已可更新顯示潭子電動車充電站設置地點!!

    ※帶您來看台北網站建置台北網頁設計,各種案例分享

  • 【python測試開發棧】帶你徹底搞明白python3編碼原理

    【python測試開發棧】帶你徹底搞明白python3編碼原理

    在之前的文章中,我們介紹過編碼格式的發展史:[文章傳送門-todo]。今天我們通過幾個例子,來徹底搞清楚python3中的編碼格式原理,這樣你之後寫python腳本時碰到編碼問題,才能有章可循。

    我們先搞清楚幾個概念:

    • 系統默認編碼:指python解釋器默認的編碼格式,在python文件頭部沒有聲明其他編碼格式時,python3默認的編碼格式是utf-8。
    • 本地默認編碼:操作系統默認的編碼,常見的Windows的默認編碼是gbk,Linux的默認編碼是UTF-8。
    • python文件頭部聲明編碼格式:修改的是文件的默認編碼格式,只是會影響python解釋器讀取python文件時的編碼格式,並不會改變系統默認編碼和本地默認編碼。

    通過python自帶的庫,可以查看系統默認編碼和本地默認編碼

    Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import sys
    >>> sys.getdefaultencoding()
    'utf-8'
    >>> import locale
    >>> locale.getdefaultlocale()
    ('zh_CN', 'cp936')
    >>>

    注意,因為我在windows系統的電腦上 進行測試,所以系統默認編碼返回“cp936”, 這是代碼頁(是字符編碼集的別名),而936對應的就是gbk。如果你在linux或者mac上執行上面的代碼,應該會返回utf-8編碼。

    其實總結來看,容易出現亂碼的場景,基本都與讀寫程序有關,比如:讀取/寫入某個文件,或者從網絡流中讀取數據等,因為這個過程中涉及到了編碼解碼的過程,只要編碼和解碼的編碼格式對應不上,就容易出現亂碼。下面我們舉兩個具體的例子,來驗證下python的編碼原理,幫助你理解這個過程。注意:下面的例子都是在pycharm中寫的。

    01默認的編碼格式

    我們新建一個encode_demo.py的文件,其文件默認的編碼格式是UTF-8(可以從pycharm右下角看到編碼格式),代碼如下:

    """
        @author: asus
        @time: 2019/11/21
        @function: 驗證編碼格式
    """
    import sys, locale
    
    
    def write_str_default_encode():
        s = "我是一個str"
        print(s)
        print(type(s))
        print(sys.getdefaultencoding())
        print(locale.getdefaultlocale())
    
        with open("utf_file", "w", encoding="utf-8") as f:
            f.write(s)
        with open("gbk_file", "w", encoding="gbk") as f:
            f.write(s)
        with open("jis_file", "w", encoding="shift-jis") as f:
            f.write(s)
    
    
    if __name__ == '__main__':
        write_str_default_encode()
    

    我們先來猜測下結果,因為我們沒有聲明編碼格式,所以python解釋器默認用UTF-8去解碼文件,因為文件默認編碼格式就是UTF-8,所以字符串s可以正常打印。同時以UTF-8編碼格式寫文件不會出現亂碼,而以gbk和shift-jis(日文編碼)寫文件會出現亂碼(這裏說明一點,我是用pycharm直接打開生成的文件查看的,編輯器默認編碼是UTF-8,如果在windows上用記事本打開則其默認編碼跟隨系統是GBK,gbk_file和utf_file均不會出現亂碼,只有jis_file是亂碼),我們運行看下結果:

    # 運行結果
    我是一個str
    <class 'str'>
    utf-8
    ('zh_CN', 'cp936')
    
    # 寫文件utf_file、gbk_file、jis_file文件內容分別是:
    我是一個str
    ����һ��str
    �䐥�꘢str

    和我們猜測的結果一致,下面我們做個改變,在文件頭部聲明個編碼格式,再來看看效果。

    02 python頭文件聲明編碼格式

    因為上面文件encode_demo.py的格式是UTF-8,那麼我們就將其變為gbk編碼。同樣的我們先來推測下結果,在pycharm中,在python文件頭部聲明編碼為gbk后(頭部加上 # coding=gbk ),文件的編碼格式變成gbk,同時python解釋器會用gbk去解碼encode_demo.py文件,所以運行結果應該和用UTF-8編碼時一樣。運行結果如下:

    # 運行結果
    我是一個str
    <class 'str'>
    utf-8
    ('zh_CN', 'cp936')
    
    # 寫文件utf_file、gbk_file、jis_file文件內容分別是:
    我是一個str
    ����һ��str
    �䐥�꘢str

    結果確實是一樣的,證明我們推論是正確的。接下來我們再做個嘗試,假如我們將(# coding=gbk)去掉(需要注意,在pycharm中將 # coding=gbk去掉,並不會改變文件的編碼格式,也就是說encode_demo.py還是gbk編碼),我們再運行一次看結果:

      File "D:/codespace/python/pythonObject/pythonSample/basic/encodeDemo/encode_demo.py", line 4
    SyntaxError: Non-UTF-8 code starting with '\xd1' in file D:/codespace/python/pythonObject/pythonSample/basic/encodeDemo/encode_demo.py on line 5, but no encoding declared; see http://python.org/dev/peps/pep-0263/ for details
    

    運行直接報錯了,我們加個斷點,看看具體的異常信息:

    看錯誤提示是UnicodeDecodeError,python解釋器在對encode_demo.py文件解碼時,使用默認的UTF-8編碼,但是文件本身是gbk編碼,所以當碰到有中文沒辦法識別時,就拋出DecodeError。

    03 敲黑板,划重點

    python3中的str和bytes

    python3的重要特性之一就是對字符串和二進制流做了嚴格的區分,我們聲明的字符串都是str類型,不過Str和bytes是可以相互轉換的:

    def str_transfor_bytes():
        s = '我是一個測試Str'
        print(type(s))
        # str 轉bytes
        b = s.encode()
        print(b)
        print(type(b))
        # bytes轉str
        c = b.decode('utf-8')
        print(c)
        print(type(c))
    
    
    if __name__ == '__main__':
        str_transfor_bytes()

    需要注意一點:在調用encode()和decode()方法時,如果不傳參數,則會使用python解釋器默認的編碼格式UTF-8(如果不在python頭文件聲明編碼格式)。但是如果傳參的話,encode和decode使用的編碼格式要能對應上。

    python3默認編碼是UTF-8?還是Unicode?

    經常在很多文章里看到,python3的默認編碼格式是Unicode,但是我在本文中卻一直在說python3的默認編碼格式是UTF-8,那麼哪種說法是正確的呢?其實兩種說法都對,主要得搞清楚Unicode和UTF-8的區別(之前文章有提到):

    • Unicode是一個字符集,說白了就是把各種編碼的映射關係全都整合起來,不過它是不可變長的,全部都以兩個字節或四個字節來表示,佔用的內存空間比較大。
    • UTF-8是Unicode的一種實現方式,主要對 Unicode 碼的數據進行轉換,方便存儲和網絡傳輸 。它是可變長編碼,比如對於英文字母,它使用一個字節就可以表示。

    在python3內存中使用的字符串全都是Unicode碼,當python解釋器解析python文件時,默認使用UTF-8編碼。

    open()方法默認使用本地編碼

    在上面的例子中,我們往磁盤寫入文件時,都指定了編碼格式。如果不指定編碼格式,那麼默認將使用操作系統本地默認的編碼格式,比如:Linux默認是UTF-8,windows默認是GBK。其實這也好理解,因為和磁盤交互,肯定要考慮操作系統的編碼格式。這有區別於encode()和decode()使用的是python解釋器的默認編碼格式,千萬別搞混淆了。

    總結

    不知道你看完上面的例子后,是否已經徹底理解了python3的編碼原理。不過所有的編碼問題,都逃不過“編碼”和“解碼”兩個過程,當你碰到編碼問題時,先確定源文件使用的編碼,再確定目標文件需要的編碼格式,只要能匹配,一般就可以解決編碼的問題。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

  • 小白學 Python 爬蟲(2):前置準備(一)基本類庫的安裝

    小白學 Python 爬蟲(2):前置準備(一)基本類庫的安裝

    人生苦短,我用 Python

    前文傳送門:

    本篇內容較長,各位同學可以先收藏后再看~~

    在開始講爬蟲之前,還是先把環境搞搞好,工欲善其事必先利其器嘛~~~

    本篇文章主要介紹 Python 爬蟲所使用到的請求庫和解析庫,請求庫用來請求目標內容,解析庫用來解析請求回來的內容。

    開發環境

    首先介紹小編本地的開發環境:

    • Python3.7.4
    • win10

    差不多就這些,最基礎的環境,其他環境需要我們一個一個安裝,現在開始。

    請求庫

    雖然 Python 為我們內置了 HTTP 請求庫 urllib ,使用姿勢並不是很優雅,但是很多第三方的提供的 HTTP 庫確實更加的簡潔優雅,我們下面開始。

    Requests

    Requests 類庫是一個第三方提供的用於發送 HTTP 同步請求的類庫,相比較 Python 自帶的 urllib 類庫更加的方便和簡潔。

    Python 為我們提供了包管理工具 pip ,使用 pip 安裝將會非常的方便,安裝命令如下:

    pip install requests

    驗證:

    C:\Users\inwsy>python
    Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import requests

    首先在 CMD 命令行中輸入 python ,進入 python 的命令行模式,然後輸入 import requests 如果沒有任何錯誤提示,說明我們已經成功安裝 Requests 類庫。

    Selenium

    Selenium 現在更多的是用來做自動化測試工具,相關的書籍也不少,同時,我們也可以使用它來做爬蟲工具,畢竟是自動化測試么,利用它我們可以讓瀏覽器執行我們想要的動作,比如點擊某個按鈕、滾動滑輪之類的操作,這對我們模擬真實用戶操作是非常方便的。

    安裝命令如下:

    pip install selenium

    驗證:

    C:\Users\inwsy>python
    Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import selenium

    這樣沒報錯我們就安裝完成,但是你以為這樣就算好了么?圖樣圖森破啊。

    ChromeDriver

    我們還需要瀏覽器的支持來配合 selenium 的工作,開發人員嘛,常用的瀏覽器莫非那麼幾種:Chrome、Firefox,那位說 IE 的同學,你給我站起來,小心我跳起來打你膝蓋,還有說 360 瀏覽器的,你們可讓我省省心吧。

    接下來,安裝 Chrome 瀏覽器就不用講了吧。。。。

    再接下來,我們開始安裝 ChromeDriver ,安裝了 ChromeDriver 后,我們才能通過剛才安裝的 selenium 來驅動 Chrome 來完成各種騷操作。

    首先,我們需要查看自己的 Chrome 瀏覽器的版本,在 Chrome 瀏覽器右上角的三個點鐘,點擊 幫助 -> 關於,如下圖:

    將這個版本找個小本本記下來,小編這裏的版本為: 版本 78.0.3904.97(正式版本) (64 位)

    接下來我們需要去 ChromeDriver 的官網查看當前 Chrome 對應的驅動。

    官網地址:

    因某些原因,訪問時需某些手段,訪問不了的就看小編為大家準備的版本對應表格吧。。。

    ChromeDriver Version Chrome Version
    78.0.3904.11 78
    77.0.3865.40 77
    77.0.3865.10 77
    76.0.3809.126 76
    76.0.3809.68 76
    76.0.3809.25 76
    76.0.3809.12 76
    75.0.3770.90 75
    75.0.3770.8 75
    74.0.3729.6 74
    73.0.3683.68 73
    72.0.3626.69 72
    2.46 71-73
    2.45 70-72
    2.44 69-71
    2.43 69-71
    2.42 68-70
    2.41 67-69
    2.40 66-68
    2.39 66-68
    2.38 65-67
    2.37 64-66
    2.36 63-65
    2.35 62-64

    順便小編找到了國內對應的下載的鏡像站,由淘寶提供,如下:

    雖然和小編本地的小版本對不上,但是看樣子只要大版本符合應該沒啥問題,so,去鏡像站下載對應的版本即可,小編這裏下載的版本是:78.0.3904.70 ,ChromeDriver 78版本的最後一個小版本。

    下載完成后,將可執行文件 chromedriver.exe 移動至 Python 安裝目錄的 Scripts 目錄下。如果使用默認安裝未修改過安裝目錄的話目錄是:%homepath%\AppData\Local\Programs\Python\Python37\Scripts ,如果有過修改,那就自力更生吧。。。

    chromedriver.exe 添加后結果如下圖:

    驗證:

    還是在 CMD 命令行中,輸入以下內容:

    C:\Users\inwsy>python
    Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    >>> from selenium import webdriver
    >>> browser = webdriver.Chrome()

    如果打開一個空白的 Chrome 頁面說明安裝成功。

    GeckoDriver

    上面我們通過安裝 Chrome 的驅動完成了 Selenium 與 Chrome 的對接,想要完成 Selenium 與 FireFox 的對接則需要安裝另一個驅動 GeckoDriver 。

    FireFox 的安裝小編這裏就不介紹了,大家最好去官網下載安裝,路徑如下:

    FireFox 官網地址:

    GeckoDriver 的下載需要去 Github (全球最大的同性交友網站),下載路徑小編已經找好了,可以選擇最新的 releases 版本進行下載。

    下載地址:

    選擇對應自己的環境,小編這裏選擇 win-64 ,版本為 v0.26.0 進行下載。

    具體配置方式和上面一樣,將可執行的 .exe 文件放入 %homepath%\AppData\Local\Programs\Python\Python37\Scripts 目錄下即可。

    驗證:

    還是在 CMD 命令行中,輸入以下內容:

    C:\Users\inwsy>python
    Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    >>> from selenium import webdriver
    >>> browser = webdriver.Firefox()

    應該是可以正常打開一個空白的 FireFox 頁面的,結果如下:

    注意: GeckoDriver 指出一點,當前的版本在 win 下使用有已知 bug ,需要安裝微軟的一個插件才能解決,原文如下:

    You must still have the installed on your system for the binary to run. This is a known bug which we weren’t able fix for this release.

    插件下載地址:

    請各位同學選擇自己對應的系統版本進行下載安裝。

    Aiohttp

    上面我們介紹了同步的 Http 請求庫 Requests ,而 Aiohttp 則是一個提供異步 Http 請求的類庫。

    那麼,問題來了,什麼是同步請求?什麼是異步請求呢?

    • 同步:阻塞式,簡單理解就是當發出一個請求以後,程序會一直等待這個請求響應,直到響應以後,才繼續做下一步。
    • 異步:非阻塞式,還是上面的例子,當發出一個請求以後,程序並不會阻塞在這裏,等待請求響應,而是可以去做其他事情。

    從資源消耗和效率上來說,同步請求是肯定比不過異步請求的,這也是為什麼異步請求會比同步請求擁有更大的吞吐量。在抓取數據時使用異步請求,可以大大提升抓取的效率。

    如果還想了解跟多有關 aiohttp 的內容,可以訪問官方文檔: 。

    aiohttp 安裝如下:

    pip install aiohttp

    aiohttp 還推薦我們安裝另外兩個庫,一個是字符編碼檢測庫 cchardet ,另一個是加速DNS的解析庫 aiodns 。

    安裝 cchardet 庫:

    pip install cchardet

    安裝 aiodns 庫:

    pip install aiodns

    aiohttp 十分貼心的為我們準備了整合的安裝命令,無需一個一個鍵入命令,如下:

    pip install aiohttp[speedups]

    驗證:

    C:\Users\inwsy>python
    Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import aiohttp

    沒報錯就安裝成功。

    解析庫

    lxml

    lxml 是 Python 的一個解析庫,支持 HTML 和 XML 的解析,支持 XPath 的解析方式,而且解析效率非常高。

    什麼是 XPath ?

    XPath即為XML路徑語言(XML Path Language),它是一種用來確定XML文檔中某部分位置的語言。
    XPath基於XML的樹狀結構,提供在數據結構樹中找尋節點的能力。起初XPath的提出的初衷是將其作為一個通用的、介於XPointer與XSL間的語法模型。

    以上內容來源《百度百科》

    好吧,小編說人話,就是可以從 XML 文檔或者 HTML 文檔中快速的定位到所需要的位置的路徑語言。

    還沒看懂?emmmmmmmmmmm,我們可以使用 XPath 快速的取出 XML 或者 HTML 文檔中想要的值。用法的話我們放到後面再聊。

    安裝 lxml 庫:

    pip install lxml

    驗證:

    C:\Users\inwsy>python
    Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import lxml

    沒報錯就安裝成功。

    Beautiful Soup

    Beautiful Soup 同樣也是一個 Python 的 HTML 或 XML 的解析庫 。它擁有強大的解析能力,我們可以使用它更方便的從 HTML 文檔中提取數據。

    首先,放一下 Beautiful Soup 的官方網址,有各種問題都可以在官網查看文檔,各位同學養成一個好習慣,有問題找官方文檔,雖然是英文的,使用 Chrome 自帶的翻譯功能還是勉強能看的。

    官方網站:

    安裝方式依然使用 pip 進行安裝:

    pip install beautifulsoup4

    Beautiful Soup 的 HTML 和 XML 解析器是依賴於 lxml 庫的,所以在此之前請確保已經成功安裝好了 lxml 庫 。

    驗證:

    C:\Users\inwsy>python
    Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    >>> from bs4 import BeautifulSoup

    沒報錯就安裝成功。

    pyquery

    pyquery 同樣也是一個網頁解析庫,只不過和前面兩個有區別的是它提供了類似 jQuery 的語法來解析 HTML 文檔,前端有經驗的同學應該會非常喜歡這款解析庫。

    首先還是放一下 pyquery 的官方文檔地址。

    官方文檔:

    安裝:

    pip install pyquery

    驗證:

    C:\Users\inwsy>python
    Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import pyquery

    沒報錯就安裝成功。

    本篇的內容就先到這裏結束了。請各位同學在自己的電腦上將上面介紹的內容都安裝一遍,以便後續學習使用。

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    ※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

    ※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

    ※Google地圖已可更新顯示潭子電動車充電站設置地點!!

    ※帶您來看台北網站建置台北網頁設計,各種案例分享

  • fastjason常用方法

    fastjason常用方法

    什麼是fastjson?

    Fastjson是一個Java語言編寫的高性能功能完善的JSON庫。它採用一種“假定有序快速匹配”的算法,把JSON Parse的性能提升到極致,是目前Java語言中最快的JSON庫。Fastjson接口簡單易用,已經被廣泛使用在緩存序列化、協議交互、Web輸出、Android客戶端等多種應用場景。

    主要特點:

    • 快速FAST (比其它任何基於Java的解析器和生成器更快,包括jackson)
    • 強大(支持普通JDK類包括任意Java Bean Class、Collection、Map、Date或enum)
    • 零依賴(沒有依賴其它任何類庫除了JDK)

    背景

    最近關於fastjson的消息,引起了很多人的關注!

    fastjson爆出重大漏洞,攻擊者可使整個業務癱瘓

    漏洞描述

    常用JSON組件FastJson存在遠程代碼執行漏洞,攻擊者可通過精心構建的json報文對目標服務器執行任意命令,從而獲得服務器權限。此次爆發的漏洞為以往漏洞中autoType的繞過。

    影響範圍

    FastJson < 1.2.48

    很多開發者才猛然發現,fastjson已經深入到我們開發工作的方方面面。那麼除了趕快升級你的json外,我們來挖挖fastjson最常用的用法。

    fastjson常用方式

    1.maven依賴(記得升級到1.2.48以上版本哦)

            <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
            <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
             <version>1.2.62</version>
            </dependency>    

    2.FastJson對於json格式字符串的解析主要用到了一下三個類:

    (1)JSON:fastJson的解析器,用於JSON格式字符串與JSON對象及javaBean之間的轉換。

    (2)JSONObject:fastJson提供的json對象。

    (3)JSONArray:fastJson提供json數組對象。

    3.常用方式

    3.1 string和java對象

     

    實例1:對象轉json字符串

            Map<String,String> map=new HashMap<String,String>();
            map.put("code","0");
            map.put("message","ok");
            String json=JSON.toJSONString(map);
            System.out.println(json);

    輸出結果為:

    {"code":"0","message":"ok"}

    實例2:字符串轉對象

            Map<String,String> map=new HashMap<String,String>();
            map.put("code","0");
            map.put("message","ok");
            String json=JSON.toJSONString(map);
            System.out.println(json);
            
            Map obj=(Map)JSON.parse(json);
            System.out.println("code="+obj.get("code")+",message="+obj.get("message"));

    輸出結果

    {"code":"0","message":"ok"}
    code=0,message=ok

    3.2 工具類JSONObject

        public static void main(String[] args) {
            Map<String,String> map=new HashMap<String,String>();
            map.put("code","0");
            map.put("message","ok");
            String json=JSON.toJSONString(map);
            System.out.println(json);
            
            Map obj=(Map)JSON.parse(json);
            System.out.println("code="+obj.get("code")+",message="+obj.get("message"));        
            
            String code=JSON.parseObject(json).getString("code");
            String message=JSON.parseObject(json).getString("message");
            System.out.println("code="+code+",message="+message);
        }

    輸出結果

    {"code":"0","message":"ok"}
    code=0,message=ok
    code=0,message=ok

    3.3 數組對象

    List<user> list=new ArrayList<user>(JSONArray.parseArray(jsonString,user.class)); 

    Fastjson 與各種JSON庫的性能比較:

     

    json庫 序列化性能 反序列化性能 jar大小
    fastjson 1201 1216 fastjson-1.1.26.jar(356k)
    fastjson-1.1.25-android.jar(226k)
    jackson 1408 1915 jackson-annotations-2.1.1.jar(34k)
    jackson-core-2.1.1.jar(206k)
    jackson-databind-2.1.1.jar(922k)
    總共1162k
    gson 7421 5065 gson-2.2.2.jar(189k)
    json-lib 27555 87292 json-lib-2.4-jdk15.jar(159k)


    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

  • Spring Boot2 系列教程(二十五)Spring Boot 整合 Jpa 多數據源

    Spring Boot2 系列教程(二十五)Spring Boot 整合 Jpa 多數據源

    本文是 Spring Boot 整合數據持久化方案的最後一篇,主要和大夥來聊聊 Spring Boot 整合 Jpa 多數據源問題。在 Spring Boot 整合JbdcTemplate 多數據源、Spring Boot 整合 MyBatis 多數據源以及 Spring Boot 整合 Jpa 多數據源這三個知識點中,整合 Jpa 多數據源算是最複雜的一種,也是很多人在配置時最容易出錯的一種。本文大夥就跟着松哥的教程,一步一步整合 Jpa 多數據源。

    工程創建

    首先是創建一個 Spring Boot 工程,創建時添加基本的 Web、Jpa 以及 MySQL 依賴,如下:

    創建完成后,添加 Druid 依賴,這裏和前文的要求一樣,要使用專為 Spring Boot 打造的 Druid,大夥可能發現了,如果整合多數據源一定要使用這個依賴,因為這個依賴中才有 DruidDataSourceBuilder,最後還要記得鎖定數據庫依賴的版本,因為可能大部分人用的還是 5.x 的 MySQL 而不是 8.x。完整依賴如下:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.10</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.28</version>
        <scope>runtime</scope>
    </dependency>

    如此之後,工程就創建成功了。

    基本配置

    在基本配置中,我們首先來配置多數據源基本信息以及 DataSource,首先在 application.properties 中添加如下配置信息:

    #  數據源一
    spring.datasource.one.username=root
    spring.datasource.one.password=root
    spring.datasource.one.url=jdbc:mysql:///test01?useUnicode=true&characterEncoding=UTF-8
    spring.datasource.one.type=com.alibaba.druid.pool.DruidDataSource
    
    #  數據源二
    spring.datasource.two.username=root
    spring.datasource.two.password=root
    spring.datasource.two.url=jdbc:mysql:///test02?useUnicode=true&characterEncoding=UTF-8
    spring.datasource.two.type=com.alibaba.druid.pool.DruidDataSource
    
    # Jpa配置
    spring.jpa.properties.database=mysql
    spring.jpa.properties.show-sql=true
    spring.jpa.properties.database-platform=mysql
    spring.jpa.properties.hibernate.ddl-auto=update
    spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL57Dialect

    這裏 Jpa 的配置和上文相比 key 中多了 properties,多數據源的配置和前文一致,然後接下來配置兩個 DataSource,如下:

    @Configuration
    public class DataSourceConfig {
        @Bean
        @ConfigurationProperties(prefix = "spring.datasource.one")
        @Primary
        DataSource dsOne() {
            return DruidDataSourceBuilder.create().build();
        }
        @Bean
        @ConfigurationProperties(prefix = "spring.datasource.two")
        DataSource dsTwo() {
            return DruidDataSourceBuilder.create().build();
        }
    }

    這裏的配置和前文的多數據源配置基本一致,但是注意多了一個在 Spring 中使用較少的註解 @Primary,這個註解一定不能少,否則在項目啟動時會出錯,@Primary 表示當某一個類存在多個實例時,優先使用哪個實例。

    好了,這樣,DataSource 就有了。

    多數據源配置

    接下來配置 Jpa 的基本信息,這裏兩個數據源,我分別在兩個類中來配置,先來看第一個配置:

    @Configuration
    @EnableJpaRepositories(basePackages = "org.javaboy.jpa.dao",entityManagerFactoryRef = "localContainerEntityManagerFactoryBeanOne",transactionManagerRef = "platformTransactionManagerOne")
    public class JpaConfigOne {
        @Autowired
        @Qualifier(value = "dsOne")
        DataSource dsOne;
        @Autowired
        JpaProperties jpaProperties;
        @Bean
        @Primary
        LocalContainerEntityManagerFactoryBean localContainerEntityManagerFactoryBeanOne(EntityManagerFactoryBuilder builder) {
            return builder.dataSource(dsOne)
                    .packages("org.javaboy.jpa.model")
                    .properties(jpaProperties.getProperties())
                    .persistenceUnit("pu1")
                    .build();
        }
        @Bean
        PlatformTransactionManager platformTransactionManagerOne(EntityManagerFactoryBuilder builder) {
            LocalContainerEntityManagerFactoryBean factoryBeanOne = localContainerEntityManagerFactoryBeanOne(builder);
            return new JpaTransactionManager(factoryBeanOne.getObject());
        }
    }

    首先這裏注入 dsOne,再注入 JpaProperties,JpaProperties 是系統提供的一個實例,裡邊的數據就是我們在 application.properties 中配置的 jpa 相關的配置。然後我們提供兩個 Bean,分別是 LocalContainerEntityManagerFactoryBean 和 PlatformTransactionManager 事務管理器,不同於 MyBatis 和 JdbcTemplate,在 Jpa 中,事務一定要配置。在提供 LocalContainerEntityManagerFactoryBean 的時候,需要指定 packages,這裏的 packages 指定的包就是這個數據源對應的實體類所在的位置,另外在這裏配置類上通過 @EnableJpaRepositories 註解指定 dao 所在的位置,以及 LocalContainerEntityManagerFactoryBean 和 PlatformTransactionManager 分別對應的引用的名字。

    好了,這樣第一個就配置好了,第二個基本和這個類似,主要有幾個不同點:

    • dao 的位置不同
    • persistenceUnit 不同
    • 相關 bean 的名稱不同

    注意實體類可以共用。

    代碼如下:

    @Configuration
    @EnableJpaRepositories(basePackages = "org.javaboy.jpa.dao2",entityManagerFactoryRef = "localContainerEntityManagerFactoryBeanTwo",transactionManagerRef = "platformTransactionManagerTwo")
    public class JpaConfigTwo {
        @Autowired
        @Qualifier(value = "dsTwo")
        DataSource dsTwo;
        @Autowired
        JpaProperties jpaProperties;
        @Bean
        LocalContainerEntityManagerFactoryBean localContainerEntityManagerFactoryBeanTwo(EntityManagerFactoryBuilder builder) {
            return builder.dataSource(dsTwo)
                    .packages("org.javaboy.jpa.model")
                    .properties(jpaProperties.getProperties())
                    .persistenceUnit("pu2")
                    .build();
        }
        @Bean
        PlatformTransactionManager platformTransactionManagerTwo(EntityManagerFactoryBuilder builder) {
            LocalContainerEntityManagerFactoryBean factoryBeanTwo = localContainerEntityManagerFactoryBeanTwo(builder);
            return new JpaTransactionManager(factoryBeanTwo.getObject());
        }
    }

    接下來,在對應位置分別提供相關的實體類和 dao 即可,數據源一的 dao 如下:

    package org.javaboy.jpa.dao;
    public interface UserDao extends JpaRepository<User,Integer> {
        List<User> getUserByAddressEqualsAndIdLessThanEqual(String address, Integer id);
        @Query(value = "select * from t_user where id=(select max(id) from t_user)",nativeQuery = true)
        User maxIdUser();
    }

    數據源二的 dao 如下:

    package org.javaboy.jpa.dao2;
    public interface UserDao2 extends JpaRepository<User,Integer> {
        List<User> getUserByAddressEqualsAndIdLessThanEqual(String address, Integer id);
    
        @Query(value = "select * from t_user where id=(select max(id) from t_user)",nativeQuery = true)
        User maxIdUser();
    }

    共同的實體類如下:

    package org.javaboy.jpa.model;
    @Entity(name = "t_user")
    public class User {
        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        private Integer id;
        private String username;
        private String address;
        //省略getter/setter
    }

    到此,所有的配置就算完成了,接下來就可以在 Service 中注入不同的 UserDao,不同的 UserDao 操作不同的數據源。

    其實整合 Jpa 多數據源也不算難,就是有幾個細節問題,這些細節問題解決,其實前面介紹的其他多數據源整個都差不多。

    好了,本文就先介紹到這裏。

    相關案例已經上傳到 GitHub,歡迎小夥伴們們下載:

    掃碼關注松哥,公眾號後台回復 2TB,獲取松哥獨家 超2TB 免費 Java 學習乾貨

    本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

    ※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

    ※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

    ※Google地圖已可更新顯示潭子電動車充電站設置地點!!

    ※帶您來看台北網站建置台北網頁設計,各種案例分享

  • 德國頒佈最新電動汽車補貼計畫 共投入12億歐元

    德國頒佈最新電動汽車補貼計畫 共投入12億歐元

    據報導,自7月1日起,德國頒佈最新的電動車補貼計畫,截止目前已經有近2000位申請者,其中寶馬車主占多數。  
      為了促進電動車等環保車型的普及,德國為每位購買電動車的消費者提供4000歐元的補貼,插電式混合動力車的補貼為3000歐元。在計畫實施後,有1791位插電式混合動力車的車主申請了補貼,其中有581位購買了寶馬的車型。同時還有444位申請者購買了雷諾車型,大眾汽車買主為154位。   據統計,目前德國人汽車擁有量為4500萬輛,而其中僅有5萬輛是純電動或者是混合動力車輛。為改善這一情況,德國此次計畫共投入12億歐元,由政府和汽車製造商平攤,希望能夠在2019年6月底,即計畫截止期前售出40萬輛電動車。   文章來源:環球網

    本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

    【其他文章推薦】

    台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

    網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

    ※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整