久久精品精选,精品九九视频,www久久只有这里有精品,亚洲熟女乱色综合一区
    分享

    探索C++無鎖隊列:多線程編程的高效利器

     深度Linux 2024-11-06 發布于湖南

    在 C++ 的多線程編程世界中,有一個神奇的存在 —— 無鎖隊列。它宛如一座堅固的橋梁,橫跨在多線程協作的鴻溝之上,成為提升程序性能和穩定性的關鍵角色。

    隨著計算機硬件的不斷發展,多核處理器已經成為主流。多線程編程由此變得愈發重要,然而,傳統基于鎖的同步機制卻逐漸暴露出諸多問題。在這一背景下,無鎖隊列應運而生,它宛如黑暗中的燈塔,為多線程程序的高效運行照亮了前行的道路。

    想象一下,在一個繁忙的交通樞紐,傳統的鎖機制就像是交通管制中的紅綠燈,雖然能維持秩序,但頻繁的等待和切換也會造成擁堵。而無鎖隊列則更像是智能交通系統,讓數據在多線程之間流暢地穿梭,無需不必要的停頓。它為我們打開了一扇通往更高效多線程編程的大門,讓我們一同走進 C++ 無鎖隊列的奇妙世界吧。

    一、無鎖隊列簡介

    無鎖隊列(Lock-Free Queue)是一種并發數據結構,用于在多線程環境下實現高效的數據交換。與傳統的基于鎖的隊列相比,無鎖隊列使用了一些特殊的算法和技術,避免了線程之間的互斥操作,從而提高了并發性能和響應性。

    無鎖隊列通常基于原子操作(atomic operations)或其他底層同步原語來實現,并且它們采用一些巧妙的方法來確保操作的正確性。主要思想是通過使用原子讀寫操作或類似的機制,在沒有顯式鎖定整個隊列的情況下實現線程安全。

    典型的無鎖隊列算法有循環緩沖區(Circular Buffer)和鏈表(Linked List)等。循環緩沖區通常使用兩個指針(head 和 tail)來表示隊列的開始和結束位置,利用自旋、CAS (Compare-and-Swap) 等原子操作來進行入隊和出隊操作。鏈表則通過利用 CAS 操作插入或刪除節點來實現并發訪問。

    在當今多核心優化的大背景下,無鎖隊列在多線程編程中起著至關重要的作用。多核心優化是當前游戲開發以及許多領域的重點課題,無論是工程實踐還是算法研究,將工作并行化交由多線程去處理是極為普遍的場景。

    在這種情況下,線程池與命令隊列的組合常常被采用,而其中的命令隊列就可以選擇使用互斥鎖或者無鎖隊列。由于命令隊列的讀寫通常是較輕量級的操作,采用無鎖隊列能夠獲得比有鎖操作更高的性能。無鎖隊列通過使用原子操作來確保線程安全,避免了鎖的開銷,包括上下文切換、線程調度延遲以及潛在的死鎖問題。

    在多處理器系統中,無鎖隊列可以更好地擴展。隨著處理器數量的增加,使用鎖的隊列可能會遇到瓶頸,因為多個線程競爭同一個鎖。而無鎖隊列通過減少這種競爭,可以提供更好的并行性。在實時系統中,無鎖隊列可以提供更高效的響應時間,確保系統能夠及時處理各種任務。

    優點:

    • 提供了更好的并發性能,避免了互斥操作帶來的性能瓶頸。

    • 對于高度競爭情況下可以提供更好的可伸縮性。

    缺點:

    • 實現相對復雜,需要考慮并發安全和正確性問題。

    • 在高度競爭的情況下可能出現自旋等待導致的性能損失。

    二、無鎖隊列工作原理

    無鎖隊列的原理是通過使用原子操作(atomic operations)或其他底層同步原語來實現并發安全。它們避免了傳統鎖機制中的互斥操作,以提高并發性能和響應性。典型的無鎖隊列算法有循環緩沖區(Circular Buffer)和鏈表(Linked List)等。

    在循環緩沖區的實現中,通常使用兩個指針來表示隊列的開始位置和結束位置,即頭指針(head)和尾指針(tail)。入隊時,通過自旋、CAS (Compare-and-Swap) 等原子操作更新尾指針,并將元素放入相應位置。出隊時,同樣利用原子操作更新頭指針,并返回對應位置上的元素。

    鏈表實現無鎖隊列時,在插入或刪除節點時使用 CAS 操作來確保只有一個線程成功修改節點的指針值。這樣可以避免對整個鏈表進行加鎖操作。

    無論是循環緩沖區還是鏈表實現,關鍵點在于如何利用原子操作確保不同線程之間的協調與一致性。需要仔細處理并發情況下可能出現的競爭條件,并設計合適的算法來保證正確性和性能。

    2.1隊列操作模型

    隊列是一種非常重要的數據結構,其特性是先進先出(FIFO),符合流水線業務流程。在進程間通信、網絡通信間經常采用隊列做緩存,緩解數據處理壓力。根據操作隊列的場景分為:單生產者——單消費者、多生產者——單消費者、單生產者——多消費者、多生產者——多消費者四大模型。根據隊列中數據分為:隊列中的數據是定長的、隊列中的數據是變長的。

    (1)單生產者——單消費者

    (2)多生產者——單消費者

    (3)單生產者——多消費者

    (4)多生產者——多消費者

    2.2CAS操作

    CAS即Compare and Swap,是所有CPU指令都支持CAS的原子操作(X86中CMPXCHG匯編指令),用于實現實現各種無鎖(lock free)數據結構。

    CAS操作的C語言實現如下:

    bool compare_and_swap ( int *memory_location, int expected_value, int new_value)
    {
    if (*memory_location == expected_value)
    {
    *memory_location = new_value;
    return true;
    }
    return false;
    }

    CAS用于檢查一個內存位置是否包含預期值,如果包含,則把新值復賦值到內存位置。成功返回true,失敗返回false。

    (1)GGC對CAS支持,GCC4.1+版本中支持CAS原子操作。

    bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);
    type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);

    (2)Windows對CAS支持,Windows中使用Windows API支持CAS。

    LONG InterlockedCompareExchange(
    LONG volatile *Destination,
    LONG ExChange,
    LONG Comperand
    );

    (3)C11對CAS支持,C11 STL中atomic函數支持CAS并可以跨平臺。

    template< class T >
    bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired );
    template< class T >
    bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );

    其它原子操作如下:

    • Fetch-And-Add:一般用來對變量做+1的原子操作

    • Test-and-set:寫值到某個內存位置并傳回其舊值

    2.3隊列數據定長與變長

    (1)隊列數據定長

    (2)隊列數據變長

    ⑴問題描述

    在多線程環境下,原始隊列會出現各種不可預料的問題。以兩個線程同時寫入為例,假設線程 A 和線程 B 同時對原始隊列進行寫入操作。首先看原始隊列的入隊偽代碼:void Enqueue(Node *node){m_Tail->next = node;m_Tail = node;},這個操作分為兩步。當兩個線程同時執行時,可能出現這樣的情況:線程 A 執行完第一步m_Tail->next = nodeC后,線程 B 開始執行并完成了整個入隊操作,接著線程 A 繼續執行第二步m_Tail = nodeB,這就導致了 Tail 指針失去與隊列的鏈接,后加的節點從 Head 開始就訪問不到了。這種情況會使得隊列的狀態變得混亂,無法保證數據的正確存儲和讀取。

    ⑵解決方法

    為了解決上述問題,可以使用原子操作實現無鎖同步。原子操作是不可分割的操作,CPU 的一個線程在執行原子操作時,不會被其他線程中斷或搶占。其中,典型的原子操作有 Load / Store(讀取與保存)、Test and Set(針對 bool 變量,如果為 true 則返回 true,如果為 false,則將變量置為 true 并返回 false)、Clear(將 bool 變量設為 false)、Exchange(將指定位置的值設置為傳入值,并返回其舊值)等。

    而 CAS(Compare And Swap)在實現無鎖同步中起著關鍵作用。CAS 操作包含三個參數:一個內存地址 V、一個期望值 A 和一個新值 B。當執行 CAS 操作時,如果當前內存地址 V 中存儲的值等于期望值 A,則將新值 B 寫入該內存地址,并返回 true;否則,不做任何修改,并返回 false。在無鎖隊列中,可以利用 CAS 操作來確保對 Head 或 Tail 指針的讀寫操作是原子性的,從而避免多線程同時寫入或讀取時出現的指針混亂問題。例如,在入隊操作中,可以使用 CAS 來確保在更新 Tail 指針時,不會被其他線程干擾。如果當前 Tail 指針指向的節點的_next指針與期望值不一致,說明有其他線程進行了寫入操作,此時可以重新嘗試 CAS 操作,直到成功為止。這樣就可以實現無鎖隊列的安全寫入和讀取操作。

    三、無鎖隊列方案

    3.1boost方案

    boost提供了三種無鎖方案,分別適用不同使用場景。

    • boost::lockfree::queue是支持多個生產者和多個消費者線程的無鎖隊列。

    • boost::lockfree::stack是支持多個生產者和多個消費者線程的無鎖棧。

    • boost::lockfree::spsc_queue是僅支持單個生產者和單個消費者線程的無鎖隊列,比boost::lockfree::queue性能更好。

    Boost無鎖數據結構的API通過輕量級原子鎖實現lock-free,不是真正意義的無鎖。

    Boost提供的queue可以設置初始容量,添加新元素時如果容量不夠,則總容量自動增長;但對于無鎖數據結構,添加新元素時如果容量不夠,總容量不會自動增長。

    3.2并發隊列

    ConcurrentQueue采用了無鎖算法來實現并發操作。它基于CAS(Compare-and-Swap)原子操作和其他底層同步原語來保證線程安全性。具體來說,它使用自旋鎖和原子指令來確保對隊列的修改是原子的,并且在多個線程之間共享數據時提供正確性保證。

    ConcurrentQueue是基于C++實現的工業級無鎖隊列方案。
    GitHub:https://github.com/cameron314/concurrentqueue
    ReaderWriterQueue是基于C++實現的單生產者單消費者場景的無鎖隊列方案。
    GitHub:https://github.com/cameron314/readerwriterqueue

    ConcurrentQueue具有以下特點:

    • 線程安全:多個線程可以同時對隊列進行操作而無需額外加鎖。

    • 無阻塞:入隊和出隊操作通常是非阻塞的,并且具有較低的開銷。

    • 先進先出(FIFO)順序:元素按照插入順序排列,在出隊時會返回最早入隊的元素。

    使用ConcurrentQueue可以方便地處理多個線程之間共享數據,并減少由于加鎖引起的性能開銷。但需要注意,雖然ConcurrentQueue提供了高效、線程安全的并發操作,但在某些特定情況下可能不適合所有應用場景,因此在選擇數據結構時需要根據具體需求進行評估。

    3.3Disruptor

    Disruptor是一種高性能的并發編程框架,用于實現無鎖(lock-free)的并發數據結構。它最初由LMAX Exchange開發,并成為了其核心交易引擎的關鍵組件。

    Disruptor旨在解決在高度多線程環境下的數據共享和通信問題。它基于環形緩沖區(Ring Buffer)和事件驅動模型,通過優化內存訪問和線程調度,提供了非常高效的消息傳遞機制。

    Disruptor是英國外匯交易公司LMAX基于JAVA開發的一個高性能隊列。

    GitHub:https://github.com/LMAX-Exchange/disruptor

    主要特點如下:

    • 無鎖設計:Disruptor使用CAS(Compare-and-Swap)等無鎖算法來避免使用傳統鎖帶來的競爭和阻塞。

    • 高吞吐量:Disruptor利用環形緩沖區和預分配內存等技術,在保證正確性前提下追求盡可能高的處理速度。

    • 低延遲:由于無鎖設計和緊湊的內存布局,Disruptor能夠實現非常低的消息處理延遲。

    • 線程間協調:Disruptor提供了靈活而強大的事件發布、消費者等待及觸發機制,可用于實現復雜的線程間通信模式。

    使用Disruptor可以有效地解決生產者-消費者模型中數據傳遞過程中的性能瓶頸,特別適用于高并發、低延遲的應用場景,例如金融交易系統、消息隊列等。然而,由于Disruptor對編程模型和理解要求較高,使用時需要仔細考慮,并根據具體需求評估是否適合。

    四、無鎖隊列的實現方式

    4.1環形緩沖區

    RingBuffer是生產者和消費者模型中常用的數據結構,生產者將數據追加到數組尾端,當達到數組的尾部時,生產者繞回到數組的頭部;消費者從數組頭端取走數據,當到達數組的尾部時,消費者繞回到數組頭部。

    如果只有一個生產者和一個消費者,環形緩沖區可以無鎖訪問,環形緩沖區的寫入index只允許生產者訪問并修改,只要生產者在更新index前將新的值保存到緩沖區中,則消費者將始終看到一致的數據結構;讀取index也只允許消費者訪問并修改,消費者只要在取走數據后更新讀index,則生產者將始終看到一致的數據結構。

    • 空隊列時,front與rear相等;當有元素進隊,則rear后移;有元素出隊,則front后移。

    • 空隊列時,rear等于front;滿隊列時,隊列尾部空一個位置,因此判斷循環隊列滿時使用(rear-front+maxn)%maxn。

    入隊操作:

    data[rear] = x;
    rear = (rear+1)%maxn;

    出隊操作:

    x = data[front];
    front = (front+1)%maxn;

    單生產者單消費者

    對于單生產者和單消費者場景,由于read_index和write_index都只會有一個線程寫,因此不需要加鎖也不需要原子操作,直接修改即可,但讀寫數據時需要考慮遇到數組尾部的情況。

    線程對write_index和read_index的讀寫操作如下:

    • (1)寫操作。先判斷隊列時否為滿,如果隊列未滿,則先寫數據,寫完數據后再修改write_index。

    • (2)讀操作。先判斷隊列是否為空,如果隊列不為空,則先讀數據,讀完再修改read_index。

    多生產者單消費者:多生產者和單消費者場景中,由于多個生產者都會修改write_index,所以在不加鎖的情況下必須使用原子操作。

    4.2RingBuffer實現

    RingBuffer.hpp文件:

    #pragma once

    template <class T>
    class RingBuffer
    {
    public:
    RingBuffer(unsigned size): m_size(size), m_front(0), m_rear(0)
    {
    m_data = new T[size];
    }

    ~RingBuffer()
    {
    delete [] m_data;
    m_data = NULL;
    }

    inline bool isEmpty() const
    {
    return m_front == m_rear;
    }

    inline bool isFull() const
    {
    return m_front == (m_rear + 1) % m_size;
    }

    bool push(const T& value)
    {
    if(isFull())
    {
    return false;
    }
    m_data[m_rear] = value;
    m_rear = (m_rear + 1) % m_size;
    return true;
    }

    bool push(const T* value)
    {
    if(isFull())
    {
    return false;
    }
    m_data[m_rear] = *value;
    m_rear = (m_rear + 1) % m_size;
    return true;
    }

    inline bool pop(T& value)
    {
    if(isEmpty())
    {
    return false;
    }
    value = m_data[m_front];
    m_front = (m_front + 1) % m_size;
    return true;
    }

    inline unsigned int front()const
    {
    return m_front;
    }

    inline unsigned int rear()const
    {
    return m_rear;
    }

    inline unsigned int size()const
    {
    return m_size;
    }
    private:
    unsigned int m_size;// 隊列長度
    int m_front;// 隊列頭部索引
    int m_rear;// 隊列尾部索引
    T* m_data;// 數據緩沖區
    };

    RingBufferTest.cpp測試代碼:

    #include <stdio.h>
    #include <thread>
    #include <unistd.h>
    #include <sys/time.h>
    #include "RingBuffer.hpp"


    class Test
    {
    public:
    Test(int id = 0, int value = 0)
    {
    this->id = id;
    this->value = value;
    sprintf(data, "id = %d, value = %d\n", this->id, this->value);
    }

    void display()
    {
    printf("%s", data);
    }
    private:
    int id;
    int value;
    char data[128];
    };

    double getdetlatimeofday(struct timeval *begin, struct timeval *end)
    {
    return (end->tv_sec + end->tv_usec * 1.0 / 1000000) -
    (begin->tv_sec + begin->tv_usec * 1.0 / 1000000);
    }

    RingBuffer<Test> queue(1 << 12);2u000

    #define N (10 * (1 << 20))

    void produce()
    {
    struct timeval begin, end;
    gettimeofday(&begin, NULL);
    unsigned int i = 0;
    while(i < N)
    {
    if(queue.push(Test(i % 1024, i)))
    {
    i++;
    }
    }

    gettimeofday(&end, NULL);
    double tm = getdetlatimeofday(&begin, &end);
    printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
    }

    void consume()
    {
    sleep(1);
    Test test;
    struct timeval begin, end;
    gettimeofday(&begin, NULL);
    unsigned int i = 0;
    while(i < N)
    {
    if(queue.pop(test))
    {
    // test.display();
    i++;
    }
    }
    gettimeofday(&end, NULL);
    double tm = getdetlatimeofday(&begin, &end);
    printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f, size=%u \n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
    }

    int main(int argc, char const *argv[])
    {
    std::thread producer1(produce);
    std::thread consumer(consume);
    producer1.join();
    consumer.join();
    return 0;
    }

    編譯:

    g++ --std=c++11  RingBufferTest.cpp -o test -pthread

    單生產者單消費者場景下,消息吞吐量為350萬條/秒左右。

    4.3LockFreeQueue實現

    LockFreeQueue.hpp:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <stdbool.h>
    #include <sys/stat.h>
    #include <sys/types.h>
    #include <sys/time.h>
    #include <sys/mman.h>

    #define SHM_NAME_LEN 128
    #define MIN(a, b) ((a) > (b) ? (b) : (a))
    #define IS_POT(x) ((x) && !((x) & ((x)-1)))
    #define MEMORY_BARRIER __sync_synchronize()

    template <class T>
    class LockFreeQueue
    {
    protected:
    typedef struct
    {
    int m_lock;
    inline void spinlock_init()
    {
    m_lock = 0;
    }

    inline void spinlock_lock()
    {
    while(!__sync_bool_compare_and_swap(&m_lock, 0, 1)) {}
    }

    inline void spinlock_unlock()
    {
    __sync_lock_release(&m_lock);
    }
    } spinlock_t;

    public:
    // size:隊列大小
    // name:共享內存key的路徑名稱,默認為NULL,使用數組作為底層緩沖區。
    LockFreeQueue(unsigned int size, const char* name = NULL)
    {
    memset(shm_name, 0, sizeof(shm_name));
    createQueue(name, size);
    }

    ~LockFreeQueue()
    {
    if(shm_name[0] == 0)
    {
    delete [] m_buffer;
    m_buffer = NULL;
    }
    else
    {
    if (munmap(m_buffer, m_size * sizeof(T)) == -1) {
    perror("munmap");
    }
    if (shm_unlink(shm_name) == -1) {
    perror("shm_unlink");
    }
    }
    }

    bool isFull()const
    {
    #ifdef USE_POT
    return m_head == (m_tail + 1) & (m_size - 1);
    #else
    return m_head == (m_tail + 1) % m_size;
    #endif
    }

    bool isEmpty()const
    {
    return m_head == m_tail;
    }

    unsigned int front()const
    {
    return m_head;
    }

    unsigned int tail()const
    {
    return m_tail;
    }

    bool push(const T& value)
    {
    #ifdef USE_LOCK
    m_spinLock.spinlock_lock();
    #endif
    if(isFull())
    {
    #ifdef USE_LOCK
    m_spinLock.spinlock_unlock();
    #endif
    return false;
    }
    memcpy(m_buffer + m_tail, &value, sizeof(T));
    #ifdef USE_MB
    MEMORY_BARRIER;
    #endif

    #ifdef USE_POT
    m_tail = (m_tail + 1) & (m_size - 1);
    #else
    m_tail = (m_tail + 1) % m_size;
    #endif

    #ifdef USE_LOCK
    m_spinLock.spinlock_unlock();
    #endif
    return true;
    }

    bool pop(T& value)
    {
    #ifdef USE_LOCK
    m_spinLock.spinlock_lock();
    #endif
    if (isEmpty())
    {
    #ifdef USE_LOCK
    m_spinLock.spinlock_unlock();
    #endif
    return false;
    }
    memcpy(&value, m_buffer + m_head, sizeof(T));
    #ifdef USE_MB
    MEMORY_BARRIER;
    #endif

    #ifdef USE_POT
    m_head = (m_head + 1) & (m_size - 1);
    #else
    m_head = (m_head + 1) % m_size;
    #endif

    #ifdef USE_LOCK
    m_spinLock.spinlock_unlock();
    #endif
    return true;
    }

    protected:
    virtual void createQueue(const char* name, unsigned int size)
    {
    #ifdef USE_POT
    if (!IS_POT(size))
    {
    size = roundup_pow_of_two(size);
    }
    #endif
    m_size = size;
    m_head = m_tail = 0;
    if(name == NULL)
    {
    m_buffer = new T[m_size];
    }
    else
    {
    int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666);
    if (shm_fd < 0)
    {
    perror("shm_open");
    }

    if (ftruncate(shm_fd, m_size * sizeof(T)) < 0)
    {
    perror("ftruncate");
    close(shm_fd);
    }

    void *addr = mmap(0, m_size * sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
    if (addr == MAP_FAILED)
    {
    perror("mmap");
    close(shm_fd);
    }
    if (close(shm_fd) == -1)
    {
    perror("close");
    exit(1);
    }

    m_buffer = static_cast<T*>(addr);
    memcpy(shm_name, name, SHM_NAME_LEN - 1);
    }
    #ifdef USE_LOCK
    spinlock_init(m_lock);
    #endif
    }
    inline unsigned int roundup_pow_of_two(size_t size)
    {
    size |= size >> 1;
    size |= size >> 2;
    size |= size >> 4;
    size |= size >> 8;
    size |= size >> 16;
    size |= size >> 32;
    return size + 1;
    }
    protected:
    char shm_name[SHM_NAME_LEN];
    volatile unsigned int m_head;
    volatile unsigned int m_tail;
    unsigned int m_size;
    #ifdef USE_LOCK
    spinlock_t m_spinLock;
    #endif
    T* m_buffer;
    };
    #define USE_LOCK

    開啟spinlock鎖,多生產者多消費者場景

    #define USE_MB

    開啟Memory Barrier

    #define USE_POT

    開啟隊列大小的2的冪對齊

    LockFreeQueueTest.cpp測試文件:

    #include "LockFreeQueue.hpp"
    #include <thread>

    //#define USE_LOCK

    class Test
    {
    public:
    Test(int id = 0, int value = 0)
    {
    this->id = id;
    this->value = value;
    sprintf(data, "id = %d, value = %d\n", this->id, this->value);
    }

    void display()
    {
    printf("%s", data);
    }
    private:
    int id;
    int value;
    char data[128];
    };

    double getdetlatimeofday(struct timeval *begin, struct timeval *end)
    {
    return (end->tv_sec + end->tv_usec * 1.0 / 1000000) -
    (begin->tv_sec + begin->tv_usec * 1.0 / 1000000);
    }

    LockFreeQueue<Test> queue(1 << 10, "/shm");

    #define N ((1 << 20))

    void produce()
    {
    struct timeval begin, end;
    gettimeofday(&begin, NULL);
    unsigned int i = 0;
    while(i < N)
    {
    if(queue.push(Test(i >> 10, i)))
    i++;
    }
    gettimeofday(&end, NULL);
    double tm = getdetlatimeofday(&begin, &end);
    printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
    }

    void consume()
    {
    Test test;
    struct timeval begin, end;
    gettimeofday(&begin, NULL);
    unsigned int i = 0;
    while(i < N)
    {
    if(queue.pop(test))
    {
    //test.display();
    i++;
    }
    }
    gettimeofday(&end, NULL);
    double tm = getdetlatimeofday(&begin, &end);
    printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
    }

    int main(int argc, char const *argv[])
    {
    std::thread producer1(produce);
    //std::thread producer2(produce);
    std::thread consumer(consume);
    producer1.join();
    //producer2.join();
    consumer.join();

    return 0;
    }

    多線程場景下,需要定義USE_LOCK宏,開啟鎖保護。

    編譯:

    g++ --std=c++11 -O3 LockFreeQueueTest.cpp -o test -lrt -pthread

    4.4場景分析(不同場景的實現)

    單生產者單消費者隊列(SPSC 隊列):在這種隊列中,只有一個生產者線程和一個消費者線程操作該隊列。由于只有一個線程操作隊列,因此不需要考慮線程同步和數據競爭的問題,可以實現非常高效的數據訪問。

    例如,在某些特定的任務處理場景中,一個線程負責生成任務,另一個線程負責處理任務,此時使用 SPSC 隊列可以避免復雜的同步機制,提高處理效率。

    多生產者多消費者隊列(MPMC 隊列):在這種隊列中,有多個生產者線程和多個消費者線程操作該隊列。由于存在多個線程同時操作隊列,因此必須考慮線程同步和數據競爭的問題,需要使用一些同步機制來保證數據的正確性。

    常見的實現方式是使用原子操作和 CAS 等技術。例如,在一個高并發的服務器程序中,多個客戶端請求可以看作是生產者,服務器的多個處理線程可以看作是消費者,使用 MPMC 隊列可以有效地管理這些請求和處理任務。

    單生產者多消費者隊列(SPMC 隊列):在這種隊列中,生產者線程向隊列中寫入數據,多個消費者線程從隊列中讀取數據。這種隊列的實現可以使用原子操作或者互斥鎖來實現線程同步。

    比如在一個視頻處理系統中,一個視頻采集線程作為生產者,多個視頻編碼線程作為消費者,使用 SPMC 隊列可以實現高效的數據傳輸和處理。

    多生產者單消費者隊列(MPSC 隊列):在這種隊列中,多個生產者線程向隊列中寫入數據,一個消費者線程從隊列中讀取數據。這種隊列的實現也可以使用原子操作或者互斥鎖來實現線程同步。

    例如在一個日志收集系統中,多個日志生成線程作為生產者,一個日志分析線程作為消費者,使用 MPSC 隊列可以方便地管理日志數據。

    4.5常見隊列形式

    ⑴鏈式隊列(Lock-free Linked Queue)

    • 鏈式隊列是一種基于鏈表實現的隊列,每個節點包含一個數據元素和一個指向下一個節點的指針。

    • 特點:可以動態地分配和釋放內存,適用于數據量不確定或者數據大小不固定的情況。在多線程環境下,需要使用無鎖算法來避免鎖的性能損失。

    • 例如,在一個實時數據采集系統中,數據量可能隨時變化,使用鏈式隊列可以靈活地適應這種變化。

    ⑵數組隊列(Lock-free Array Queue)

    • 數組隊列是一種基于數組實現的隊列,它可以提高數據的讀寫效率,適用于數據量比較大且大小固定的情況。

    • 實現比較簡單,可以使用一個指針來記錄隊尾位置,一個指針來記錄隊頭位置。在多線程環境下,需要使用無鎖算法來避免鎖的性能損失。

    • 比如在一個圖像處理系統中,圖像數據的大小相對固定,使用數組隊列可以提高數據處理效率。

    ⑶環形隊列

    • 實現環形隊列的方式可以基于數組或者基于鏈表。

    • 優點:可以有效地利用內存空間,避免了數據的移動和浪費。在多線程環境下,也可以使用無鎖算法來實現高效的數據訪問。

    • 例如,在一個網絡數據包處理系統中,環形隊列可以快速地存儲和讀取數據包,提高系統的性能。

    五、無鎖隊列的性能優勢

    5.1高效性

    無鎖隊列之所以能夠避免鎖競爭和開銷,從而提高性能,主要有以下幾個原因。首先,鎖的使用會帶來上下文切換和線程調度延遲。當一個線程獲取鎖時,如果其他線程也在競爭這個鎖,那么這些線程可能會被阻塞,等待鎖的釋放。而上下文切換和線程調度需要消耗一定的時間和系統資源,這會降低程序的執行效率。無鎖隊列通過使用原子操作和 CAS 等技術,避免了鎖的使用,從而減少了上下文切換和線程調度的次數,提高了程序的性能。

    其次,無鎖隊列可以更好地利用處理器的緩存。在多線程環境下,鎖的使用可能會導致緩存一致性問題,因為多個線程可能會同時訪問和修改共享數據。為了保證數據的一致性,處理器需要進行緩存同步操作,這會降低緩存的命中率,增加內存訪問的延遲。無鎖隊列通過使用原子操作和 CAS 等技術,可以避免緩存一致性問題,提高緩存的命中率,從而減少內存訪問的延遲,提高程序的性能。

    據統計,在某些高并發的場景下,無鎖隊列的性能可以比有鎖隊列提高數倍甚至數十倍。

    5.2線程安全

    無鎖隊列在多線程環境下具有很高的安全性。這是因為無鎖隊列通過使用原子操作和 CAS 等技術,確保了對隊列的操作是原子性的。原子操作是不可分割的操作,它要么全部執行成功,要么全部執行失敗,不會出現部分執行成功的情況。CAS 操作可以確保在對隊列進行操作時,不會被其他線程干擾。如果當前內存地址中存儲的值與期望值不一致,說明有其他線程進行了寫入操作,此時可以重新嘗試 CAS 操作,直到成功為止。

    此外,無鎖隊列還可以避免死鎖問題。在有鎖隊列中,如果多個線程同時獲取鎖,并且在獲取鎖的順序上出現問題,就可能會導致死鎖。而無鎖隊列不需要使用鎖,自然也就避免了死鎖問題。

    5.3可擴展性和低延遲

    無鎖隊列具有很好的可擴展性,可以在多個處理器上并行運行。在多處理器系統中,無鎖隊列可以更好地利用多個處理器的資源,提高隊列的吞吐量。這是因為無鎖隊列通過使用原子操作和 CAS 等技術,避免了鎖的競爭,從而可以讓多個線程在不同的處理器上同時對隊列進行操作。

    無鎖隊列還具有低延遲的特點。在實時系統中,低延遲是非常重要的。無鎖隊列可以實現非阻塞式的數據訪問,從而降低隊列的延遲。這是因為無鎖隊列通過使用原子操作和 CAS 等技術,可以在不阻塞其他線程的情況下,對隊列進行操作。如果當前操作無法成功,可以立即返回,而不會等待其他線程釋放鎖。這樣可以減少線程的等待時間,提高系統的響應速度。

    例如,在一個高并發的網絡服務器中,無鎖隊列可以快速地處理大量的網絡請求,提高服務器的響應速度和吞吐量。

    六、Kfifo內核隊列

    計算機科學家已經證明,當只有一個讀線程和一個寫線程并發操作時,不需要任何額外的鎖,就可以確保是線程安全的,也即kfifo使用了無鎖編程技術,以提高kernel的并發。

    Linux kernel里面從來就不缺少簡潔,優雅和高效的代碼,只是我們缺少發現和品味的眼光。在Linux kernel里面,簡潔并不表示代碼使用神出鬼沒的超然技巧,相反,它使用的不過是大家非常熟悉的基礎數據結構,但是kernel開發者能從基礎的數據結構中,提煉出優美的特性。

    kfifo就是這樣的一類優美代碼,它十分簡潔,絕無多余的一行代碼,卻非常高效。

    關于kfifo信息如下:

    本文分析的原代碼版本: 2.6.24.4
    kfifo的定義文件: kernel/kfifo.c
    kfifo的頭文件: include/linux/kfifo.h

    kfifo是Linux內核的一個FIFO數據結構,采用環形循環隊列的數據結構來實現,提供一個無邊界的字節流服務,并且使用并行無鎖編程技術,即單生產者單消費者場景下兩個線程可以并發操作,不需要任何加鎖行為就可以保證kfifo線程安全。

    kfifo代碼既然肩負著這么多特性,那我們先一敝它的代碼:

    struct kfifo {
    unsigned char *buffer; /* the buffer holding the data */
    unsigned int size; /* the size of the allocated buffer */
    unsigned int in; /* data is added at offset (in % size) */
    unsigned int out; /* data is extracted from off. (out % size) */
    spinlock_t *lock; /* protects concurrent modifications */
    };

    這是kfifo的數據結構,kfifo主要提供了兩個操作,__kfifo_put(入隊操作)和__kfifo_get(出隊操作)。 它的各個數據成員如下:

    • buffer: 用于存放數據的緩存

    • size: buffer空間的大小,在初化時,將它向上擴展成2的冪(如5,向上擴展 與它最接近的值且是2的n次方的值是2^3,即8)

    • lock: 如果使用不能保證任何時間最多只有一個讀線程和寫線程,需要使用該lock實施同步。

    • in, out: 和buffer一起構成一個循環隊列。 in指向buffer中隊頭,而且out指向buffer中的隊尾

    它的結構如示圖如下:

    +--------------------------------------------------------------+
    | |<----------data---------->| |
    +--------------------------------------------------------------+
    ^ ^ ^
    | | |
    out in size

    當然,內核開發者使用了一種更好的技術處理了in, out和buffer的關系,我們將在下面進行詳細分析。

    6.1kfifo功能描述

    kfifo提供如下對外功能規格

    • 只支持一個讀者和一個讀者并發操作

    • 無阻塞的讀寫操作,如果空間不夠,則返回實際訪問空間

    (1)kfifo_alloc 分配kfifo內存和初始化工作

    struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
    {
    unsigned char *buffer;
    struct kfifo *ret;

    /*
    * round up to the next power of 2, since our 'let the indices
    * wrap' tachnique works only in this case.
    */
    if (size & (size - 1)) {
    BUG_ON(size > 0x80000000);
    size = roundup_pow_of_two(size);
    }

    buffer = kmalloc(size, gfp_mask);
    if (!buffer)
    return ERR_PTR(-ENOMEM);

    ret = kfifo_init(buffer, size, gfp_mask, lock);

    if (IS_ERR(ret))
    kfree(buffer);

    return ret;
    }

    這里值得一提的是,kfifo->size的值總是在調用者傳進來的size參數的基礎上向2的冪擴展(roundup_pow_of_two,我自己的實現在文章末尾),這是內核一貫的做法。這樣的好處不言而喻——對kfifo->size取模運算可以轉化為與運算,如下:

    kfifo->in % kfifo->size 可以轉化為 kfifo->in & (kfifo->size – 1)

    在kfifo_alloc函數中,使用size & (size – 1)來判斷size 是否為2冪,如果條件為真,則表示size不是2的冪,然后調用roundup_pow_of_two將之向上擴展為2的冪。

    這都是常用的技巧,只不過大家沒有將它們結合起來使用而已,下面要分析的__kfifo_put和__kfifo_get則是將kfifo->size的特點發揮到了極致。

    (2)__kfifo_put和__kfifo_get巧妙的入隊和出隊

    __kfifo_put是入隊操作,它先將數據放入buffer里面,最后才修改in參數;__kfifo_get是出隊操作,它先將數據從buffer中移走,最后才修改out。(確保即使in和out修改失敗,也可以再來一遍)

    你會發現in和out兩者各司其職。下面是__kfifo_put和__kfifo_get的代碼

    unsigned int __kfifo_put(struct kfifo *fifo,
    unsigned char *buffer, unsigned int len)
    {
    unsigned int l;

    len = min(len, fifo->size - fifo->in + fifo->out);
    /*
    * Ensure that we sample the fifo->out index -before- we
    * start putting bytes into the kfifo.
    */
    smp_mb();

    /* first put the data starting from fifo->in to buffer end */
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);

    /* then put the rest (if any) at the beginning of the buffer */
    memcpy(fifo->buffer, buffer + l, len - l);

    /*
    * Ensure that we add the bytes to the kfifo -before-
    * we update the fifo->in index.
    */

    smp_wmb();

    fifo->in += len;

    return len;
    }

    奇怪嗎?代碼完全是線性結構,沒有任何if-else分支來判斷是否有足夠的空間存放數據。內核在這里的代碼非常簡潔,沒有一行多余的代碼。

    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));

    這個表達式計算當前寫入的空間,換成人可理解的語言就是:

    l = kfifo可寫空間和預期寫入空間的最小值

    (3)使用min宏來代if-else分支

    __kfifo_get也應用了同樣技巧,代碼如下:

    unsigned int __kfifo_get(struct kfifo *fifo,
    unsigned char *buffer, unsigned int len)
    {
    unsigned int l;

    len = min(len, fifo->in - fifo->out);
    /*
    * Ensure that we sample the fifo->in index -before- we
    * start removing bytes from the kfifo.
    */
    smp_rmb();

    /* first get the data from fifo->out until the end of the buffer */
    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);

    /* then get the rest (if any) from the beginning of the buffer */
    memcpy(buffer + l, fifo->buffer, len - l);

    /*
    * Ensure that we remove the bytes from the kfifo -before-
    * we update the fifo->out index.
    */

    smp_mb();

    fifo->out += len;

    return len;
    }

    認真讀兩遍吧,我也讀了多次,每次總是有新發現,因為in, out和size的關系太巧妙了,竟然能利用上unsigned int回繞的特性。

    原來,kfifo每次入隊或出隊,kfifo->in或kfifo->out只是簡單地kfifo->in/kfifo->out += len,并沒有對kfifo->size 進行取模運算。因此kfifo->in和kfifo->out總是一直增大,直到unsigned in最大值時,又會繞回到0這一起始端。但始終滿足:

    kfifo->in - kfifo->out <= kfifo->size
    即使kfifo->in回繞到了0的那一端,這個性質仍然是保持的。

    對于給定的kfifo:

    數據空間長度為:kfifo->in - kfifo->out
    而剩余空間(可寫入空間)長度為:kfifo->size - (kfifo->in - kfifo->out)

    盡管kfifo->in和kfofo->out一直超過kfifo->size進行增長,但它對應在kfifo->buffer空間的下標卻是如下:

    kfifo->in % kfifo->size (i.e. kfifo->in & (kfifo->size - 1))
    kfifo->out % kfifo->size (i.e. kfifo->out & (kfifo->size - 1))

    往kfifo里面寫一塊數據時,數據空間、寫入空間和kfifo->size的關系如果滿足:

    kfifo->in % size + len > size

    那就要做寫拆分了,見下圖:

    kfifo_put(寫)空間開始地址
    |
    \_/
    |XXXXXXXXXX
    XXXXXXXX|
    +--------------------------------------------------------------+
    | |<----------data---------->| |
    +--------------------------------------------------------------+
    ^ ^ ^
    | | |
    out%size in%size size
    ^
    |
    寫空間結束地址

    第一塊當然是: [kfifo->in % kfifo->size, kfifo->size]
    第二塊當然是:[0, len - (kfifo->size - kfifo->in % kfifo->size)]

    下面是代碼,細細體味吧:

    /* first put the data starting from fifo->in to buffer end */   
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);

    /* then put the rest (if any) at the beginning of the buffer */
    memcpy(fifo->buffer, buffer + l, len - l);

    對于kfifo_get過程,也是類似的,請各位自行分析。

    (4)kfifo_get和kfifo_put無鎖并發操作

    計算機科學家已經證明,當只有一個讀經程和一個寫線程并發操作時,不需要任何額外的鎖,就可以確保是線程安全的,也即kfifo使用了無鎖編程技術,以提高kernel的并發。

    kfifo使用in和out兩個指針來描述寫入和讀取游標,對于寫入操作,只更新in指針,而讀取操作,只更新out指針,可謂井水不犯河水,示意圖如下:

    |<--寫入-->|
    +--------------------------------------------------------------+
    | |<----------data----->| |
    +--------------------------------------------------------------+
    |<--讀取-->|
    ^ ^ ^
    | | |
    out in size

    為了避免讀者看到寫者預計寫入,但實際沒有寫入數據的空間,寫者必須保證以下的寫入順序:

    往[kfifo->in, kfifo->in + len]空間寫入數據
    更新kfifo->in指針為 kfifo->in + len

    在操作1完成時,讀者是還沒有看到寫入的信息的,因為kfifo->in沒有變化,認為讀者還沒有開始寫操作,只有更新kfifo->in之后,讀者才能看到。

    那么如何保證1必須在2之前完成,秘密就是使用內存屏障:smp_mb(),smp_rmb(), smp_wmb(),來保證對方觀察到的內存操作順序。

    6.2kfifo內核隊列實現

    kfifo數據結構定義如下:

    struct kfifo
    {
    unsigned char *buffer;
    unsigned int size;
    unsigned int in;
    unsigned int out;
    spinlock_t *lock;
    };

    // 創建隊列
    struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
    {
    struct kfifo *fifo;
    // 判斷是否為2的冪
    BUG_ON(!is_power_of_2(size));
    fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
    if (!fifo)
    return ERR_PTR(-ENOMEM);
    fifo->buffer = buffer;
    fifo->size = size;
    fifo->in = fifo->out = 0;
    fifo->lock = lock;

    return fifo;
    }

    // 分配空間
    struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
    {
    unsigned char *buffer;
    struct kfifo *ret;
    // 判斷是否為2的冪
    if (!is_power_of_2(size))
    {
    BUG_ON(size > 0x80000000);
    // 向上擴展成2的冪
    size = roundup_pow_of_two(size);
    }

    buffer = kmalloc(size, gfp_mask);
    if (!buffer)
    return ERR_PTR(-ENOMEM);
    ret = kfifo_init(buffer, size, gfp_mask, lock);

    if (IS_ERR(ret))
    kfree(buffer);
    return ret;
    }

    void kfifo_free(struct kfifo *fifo)
    {
    kfree(fifo->buffer);
    kfree(fifo);
    }

    // 入隊操作
    static inline unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
    {
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_put(fifo, buffer, len);
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
    }

    // 出隊操作
    static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
    {
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_get(fifo, buffer, len);
    //當fifo->in == fifo->out時,buufer為空
    if (fifo->in == fifo->out)
    fifo->in = fifo->out = 0;

    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
    }

    // 入隊操作
    unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
    {
    unsigned int l;
    //buffer中空的長度
    len = min(len, fifo->size - fifo->in + fifo->out);
    // 內存屏障:smp_mb(),smp_rmb(), smp_wmb()來保證對方觀察到的內存操作順序
    smp_mb();
    // 將數據追加到隊列尾部
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
    memcpy(fifo->buffer, buffer + l, len - l);

    smp_wmb();
    //每次累加,到達最大值后溢出,自動轉為0
    fifo->in += len;
    return len;
    }

    // 出隊操作
    unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
    {
    unsigned int l;
    //有數據的緩沖區的長度
    len = min(len, fifo->in - fifo->out);
    smp_rmb();
    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
    memcpy(buffer + l, fifo->buffer, len - l);
    smp_mb();
    fifo->out += len; //每次累加,到達最大值后溢出,自動轉為0

    return len;
    }

    static inline void __kfifo_reset(struct kfifo *fifo)
    {
    fifo->in = fifo->out = 0;
    }

    static inline void kfifo_reset(struct kfifo *fifo)
    {
    unsigned long flags;
    spin_lock_irqsave(fifo->lock, flags);
    __kfifo_reset(fifo);
    spin_unlock_irqrestore(fifo->lock, flags);
    }

    static inline unsigned int __kfifo_len(struct kfifo *fifo)
    {
    return fifo->in - fifo->out;
    }

    static inline unsigned int kfifo_len(struct kfifo *fifo)
    {
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_len(fifo);
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
    }

    6.3kfifo設計要點

    (1)保證buffer size為2的冪

    kfifo->size值在調用者傳遞參數size的基礎上向2的冪擴展,目的是使kfifo->size取模運算可以轉化為位與運算(提高運行效率)。kfifo->in % kfifo->size轉化為 kfifo->in & (kfifo->size – 1)

    保證size是2的冪可以通過位運算的方式求余,在頻繁操作隊列的情況下可以大大提高效率。

    (2)使用spin_lock_irqsave與spin_unlock_irqrestore 實現同步。

    Linux內核中有spin_lock、spin_lock_irq和spin_lock_irqsave保證同步。

    static inline void __raw_spin_lock(raw_spinlock_t *lock)
    {
    preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
    }

    static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
    {
    local_irq_disable();
    preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
    }

    spin_lock比spin_lock_irq速度快,但并不是線程安全的。spin_lock_irq增加調用local_irq_disable函數,即禁止本地中斷,是線程安全的,既禁止本地中斷,又禁止內核搶占。

    spin_lock_irqsave是基于spin_lock_irq實現的一個輔助接口,在進入和離開臨界區后,不會改變中斷的開啟、關閉狀態。

    如果自旋鎖在中斷處理函數中被用到,在獲取自旋鎖前需要關閉本地中斷,spin_lock_irqsave實現如下:

    • A、保存本地中斷狀態;

    • B、關閉本地中斷;

    • C、獲取自旋鎖。

    解鎖時通過 spin_unlock_irqrestore完成釋放鎖、恢復本地中斷到原來狀態等工作。

    (3)線性代碼結構

    代碼中沒有任何if-else分支來判斷是否有足夠的空間存放數據,kfifo每次入隊或出隊只是簡單的 +len 判斷剩余空間,并沒有對kfifo->size 進行取模運算,所以kfifo->in和kfifo->out總是一直增大,直到unsigned in超過最大值時繞回到0這一起始端,但始終滿足:kfifo->in - kfifo->out <= kfifo->size。

    (4)使用Memory Barrier

    • mb():適用于多處理器和單處理器的內存屏障。

    • rmb():適用于多處理器和單處理器的讀內存屏障。

    • wmb():適用于多處理器和單處理器的寫內存屏障。

    • smp_mb():適用于多處理器的內存屏障。

    • smp_rmb():適用于多處理器的讀內存屏障。

    • smp_wmb():適用于多處理器的寫內存屏障。

    Memory Barrier使用場景如下:

    • A、實現同步原語(synchronization primitives)

    • B、實現無鎖數據結構(lock-free data structures)

    • C、驅動程序

    程序在運行時內存實際訪問順序和程序代碼編寫的訪問順序不一定一致,即內存亂序訪問。內存亂序訪問行為出現是為了提升程序運行時的性能。內存亂序訪問主要發生在兩個階段:

    • A、編譯時,編譯器優化導致內存亂序訪問(指令重排)。

    • B、運行時,多CPU間交互引起內存亂序訪問。

    Memory Barrier能夠讓CPU或編譯器在內存訪問上有序。Memory barrier前的內存訪問操作必定先于其后的完成。Memory Barrier包括兩類:

    • A、編譯器Memory Barrier。

    • B、CPU Memory Barrier。

    通常,編譯器和CPU引起內存亂序訪問不會帶來問題,但如果程序邏輯的正確性依賴于內存訪問順序,內存亂序訪問會帶來邏輯上的錯誤。

    在編譯時,編譯器對代碼做出優化時可能改變實際執行指令的順序(如GCC的O2或O3都會改變實際執行指令的順序)。

    在運行時,CPU雖然會亂序執行指令,但在單個CPU上,硬件能夠保證程序執行時所有的內存訪問操作都是按程序代碼編寫的順序執行的,Memory Barrier沒有必要使用(不考慮編譯器優化)。為了更快執行指令,CPU采取流水線的執行方式,編譯器在編譯代碼時為了使指令更適合CPU的流水線執行方式以及多CPU執行,原本指令就會出現亂序的情況。在亂序執行時,CPU真正執行指令的順序由可用的輸入數據決定,而非程序員編寫的順序。

    七、總結

    C++ 無鎖隊列在多線程編程中占據著舉足輕重的地位。它不僅是解決多核心優化問題的關鍵技術之一,也是邁向高效多線程編程的重要基石。

    在多線程編程的實際應用中,無鎖隊列的性能優勢使其在各種場景下都能發揮出色的作用。無論是游戲開發、高性能計算、并發編程還是高并發網絡編程,無鎖隊列都能為程序提供高效的數據處理和同步機制。

    從單生產者單消費者隊列到多生產者多消費者隊列,不同的場景需求都能找到合適的無鎖隊列實現方式。鏈式隊列、數組隊列和環形隊列等多種形式的無鎖隊列,為開發者提供了豐富的選擇,以適應不同的數據量和應用場景。

    無鎖隊列的高效性、線程安全性、可擴展性和低延遲等特點,使其成為處理高并發任務的理想選擇。它避免了鎖競爭和死鎖問題,減少了上下文切換和線程調度的開銷,提高了緩存的命中率,從而極大地提升了程序的性能。

    隨著多核心處理器的普及和高并發應用的不斷增加,C++ 無鎖隊列的應用前景將更加廣闊。開發者可以繼續深入研究和優化無鎖隊列的實現,以滿足不斷增長的性能需求和復雜的應用場景。相信在未來的多線程編程領域,C++ 無鎖隊列將繼續發揮重要作用,為構建高效、可靠的多線程應用程序提供有力支持。

      轉藏 分享 獻花(0

      0條評論

      發表

      請遵守用戶 評論公約

      類似文章 更多

      主站蜘蛛池模板: 在国产线视频A在线视频| 中文人妻av高清一区二区| 无码人妻精品一区二区三区下载 | 国产成人A在线视频免费| 精品一区二区三区免费播放| 国产精品多p对白交换绿帽| 日本一高清二区视频久二区| 天堂资源中文最新版在线一区| 亚洲国产午夜精品福利| 国产精品毛片无遮挡高清| 99精品热在线在线观看视| 国产成人亚洲综合| 92精品国产自产在线观看481页| 人妻AV中文字幕一区二区三区 | 99riav国产精品视频| 国产精品亚洲А∨天堂免| 国产丰满乱子伦无码专区| 人人超人人超碰超国产| 四虎永久免费精品视频| 国产高清自产拍AV在线| 色狠狠色噜噜AV一区| 人人妻人人澡人人爽人人DVD| 亚洲欧洲日产国码AV天堂偷窥 | 中文字幕人妻无码一夲道| 99久久免费只有精品国产| 人妻丝袜AV中文系列先锋影音| 国内大量揄拍人妻精品視頻| 久久精品第九区免费观看| 欧美变态另类zozo| 性无码专区无码| 亚洲爆乳WWW无码专区| 欧美变态另类zozo| 猫咪AV成人永久网站在线观看| 亚洲欧美日韩综合一区在线| 中文字幕国产精品日韩| 亚洲AV区无码字幕中文色| 亚欧洲乱码视频一二三区| 九九在线精品国产| 最新国产精品亚洲| 久久亚洲男人第一AV网站| 亚洲中文字幕无码不卡电影|