
在C++中终了无锁部队是一个复杂的任务,因为无锁编程触及到复杂的内存操作和同步机制,需要真切相聚并发编程和硬件级别的原子操作。无锁部队的上风在于高并发场景下不错提供更好的性能四色播播,因为它幸免了传统锁机制带来的线程竞争和崎岖文切换支拨。
C++无锁部队是一种多线程编程技巧,它不错在不使用锁的情况下终了线程安全的部队。它不错提高多线程重要的性能,无锁部队的主要想想是让多个线程同期阅览部队,而不需要使用锁来保护分享资源。这不错幸免锁竞争和死锁等问题,从而提高重要的成果。终了无锁部队闲居需要使用C++11或更高版块的原子操作库(如<atomic>),以及一些高等的并发截止技巧。
一、无锁部队详细无锁部队(Lock-Free Queue)是一种并发数据结构,用于在多线程环境下终了高效的数据交换。与传统的基于锁的部队比较,无锁部队使用了一些特殊的算法和技巧,幸免了线程之间的互斥操作,从而提高了并发性能和反映性。
无锁部队闲居基于原子操作(atomic operations)或其他底层同步原语来终了,况兼它们遴选一些私密的步骤来确保操作的正确性。主要想想是通过使用原子读写操作或肖似的机制,在莫得显式锁定系数这个词部队的情况下终了线程安全。
典型的无锁部队算法有轮回缓冲区(Circular Buffer)和链表(Linked List)等。轮回缓冲区闲居使用两个指针(head 和 tail)来线路部队的开首和收尾位置,讹诈自旋、CAS (Compare-and-Swap) 等原子操作来进行入队和出队操作。链表则通过讹诈 CAS 操作插入或删除节点来终了并发阅览。
优点:
提供了更好的并发性能,幸免了互斥操作带来的性能瓶颈。
对于高度竞争情况下不错提供更好的可伸缩性。
缺陷:
终了相对复杂,需要研讨并发安全和正确性问题。
在高度竞争的情况下可能出现自旋恭候导致的性能亏空。
二、无锁部队旨趣无锁部队的旨趣是通过使用原子操作(atomic operations)或其他底层同步原语来终了并发安全。它们幸免了传统锁机制中的互斥操作,以提高并发性能和反映性。
典型的无锁部队算法有轮回缓冲区(Circular Buffer)和链表(Linked List)等。
在轮回缓冲区的终了中,闲居使用两个指针来线路部队的开首位置和收尾位置,即头指针(head)和尾指针(tail)。入队时,通过自旋、CAS (Compare-and-Swap) 等原子操作更新尾指针,并将元素放入相应位置。出队时,相通讹诈原子操作更新头指针,并复返对应位置上的元素。
链表终了无锁部队时,在插入或删除节点时使用 CAS 操作来确保唯唯独个线程到手修改节点的指针值。这样不错幸免对系数这个词链表进行加锁操作。
不管是轮回缓冲区照旧链表终了,重要点在于怎样讹诈原子操作确保不同线程之间的配合与一致性。需要仔细处理并发情况下可能出现的竞争要求,并瞎想合适的算法来保证正确性和性能。
2.1部队操作模子部队是一种相等伏击的数据结构,其秉性是先进先出(FIFO),合适活水线业务经由。在程度间通讯、收罗通讯间时常遴选部队作念缓存,缓解数据处理压力。凭据操作部队的场景分为:单坐蓐者——单铺张者、多坐蓐者——单铺张者、单坐蓐者——多铺张者、多坐蓐者——多铺张者四大模子。凭据部队中数据分为:部队中的数据是定长的、部队中的数据是变长的。
(1)单坐蓐者——单铺张者
图片
(2)多坐蓐者——单铺张者
图片
(3)单坐蓐者——多铺张者
图片
(4)多坐蓐者——多铺张者
图片
2.2CAS操作CAS即Compare and Swap,是系数CPU领导都相沿CAS的原子操作(X86中CMPXCHG汇编领导),用于终了终了各式无锁(lock free)数据结构。
CAS操作的C话语终了如下:
bool compare_and_swap ( int *memory_location, int expected_value, int new_value){ if (*memory_location == expected_value) { *memory_location = new_value; return true; } return false;}
CAS用于搜检一个内存位置是否包含预期值,若是包含,则把新值复赋值到内存位置。到手复返true,失败复返false。
(1)GGC对CAS相沿,GCC4.1+版块中相沿CAS原子操作。
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);
(2)Windows对CAS相沿,Windows中使用Windows API相沿CAS。
LONG InterlockedCompareExchange( LONG volatile *Destination, LONG ExChange, LONG Comperand);
(3)C11对CAS相沿,C11 STL中atomic函数相沿CAS并不错跨平台。
template< class T >bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired );template< class T >bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );
其它原子操作如下:
Fetch-And-Add:一般用来对变量作念+1的原子操作
Test-and-set:写值到某个内存位置并传回其旧值
2.3部队数据定长与变长(1)部队数据定长
图片
(2)部队数据变长
图片
三、无锁部队有经营3.1boost有经营boost提供了三种无锁有经营,区分适用不同使用场景。
boost::lockfree::queue是相沿多个坐蓐者和多个铺张者线程的无锁部队。
boost::lockfree::stack是相沿多个坐蓐者和多个铺张者线程的无锁栈。
boost::lockfree::spsc_queue是仅相沿单个坐蓐者和单个铺张者线程的无锁部队,比boost::lockfree::queue性能更好。
Boost无锁数据结构的API通过轻量级原子锁终了lock-free,不是真确真义真义的无锁。
Boost提供的queue不错建设运转容量,添加新元素时若是容量不够,则总容量自动增长;但对于无锁数据结构,添加新元素时若是容量不够,总容量不会自动增长。
3.2并发部队ConcurrentQueue遴选了无锁算法来终了并发操作。它基于CAS(Compare-and-Swap)原子操作和其他底层同步原语来保证线程安全性。具体来说,它使用自旋锁和原子领导来确保对部队的修改是原子的,况兼在多个线程之间分享数据时提供正确性保证。
ConcurrentQueue是基于C++终了的工业级无锁部队有经营。GitHub:https://github.com/cameron314/concurrentqueueReaderWriterQueue是基于C++终了的单坐蓐者单铺张者场景的无锁部队有经营。GitHub:https://github.com/cameron314/readerwriterqueue
ConcurrentQueue具有以下特色:
线程安全:多个线程不错同期对部队进行操作而无需异常加锁。
无贬抑:入队和出队操作闲居是曲贬抑的,况兼具有较低的支拨。
先进先出(FIFO)规章:元素按照插入规章陈列,在出队时会复返最早入队的元素。
使用ConcurrentQueue不错便捷地处理多个线程之间分享数据,并减少由于加锁引起的性能支拨。但需要防卫,天然ConcurrentQueue提供了高效、线程安全的并发操作,但在某些特定情况下可能不得当系数应用场景,因此在取舍数据结构时需要凭据具体需求进行评估。
3.3DisruptorDisruptor是一种高性能的并发编程框架,用于终了无锁(lock-free)的并发数据结构。它率先由LMAX Exchange开拓,并成为了其中枢来往引擎的重要组件。
Disruptor旨在料理在高度多线程环境下的数据分享和通讯问题。它基于环形缓冲区(Ring Buffer)和事件驱动模子,通过优化内存阅览和线程调遣,提供了相等高效的音信传递机制。
Disruptor是英外洋汇来往公司LMAX基于JAVA开拓的一个高性能部队。GitHub:https://github.com/LMAX-Exchange/disruptor
主要特色如下:
无锁瞎想:Disruptor使用CAS(Compare-and-Swap)等无锁算法来幸免使用传统锁带来的竞争和贬抑。
高蒙胧量:Disruptor讹诈环形缓冲区和预分派内存等技巧,在保证正确性前提下追求尽可能高的处理速率。
低蔓延:由于无锁瞎想和紧凑的内存布局,Disruptor省略终了相等低的音信处理蔓延。
线程间配合:Disruptor提供了生动而浩大的事件发布、铺张者恭候及触发机制,可用于终了复杂的线程间通讯形态。
使用Disruptor不错有用地料理坐蓐者-铺张者模子中数据传递过程中的性能瓶颈,零星适用于高并发、低蔓延的应用场景,举例金融来往系统、音信部队等。计算词,由于Disruptor对编程模子和相聚要求较高,使用时需要仔细研讨,并凭据具体需求评估是否得当。
四、无锁部队终了4.1环形缓冲区RingBuffer是坐蓐者和铺张者模子中常用的数据结构,坐蓐者将数据追加到数组尾端,当达到数组的尾部时,坐蓐者绕回到数组的头部;铺张者从数组头端取走数据,当到达数组的尾部时,铺张者绕回到数组头部。
若是唯唯独个坐蓐者和一个铺张者,环形缓冲区不错无锁阅览,环形缓冲区的写入index只允许坐蓐者阅览并修改,只须坐蓐者在更新index前将新的值保存到缓冲区中,则铺张者将永恒看到一致的数据结构;读取index也只允许铺张者阅览并修改,铺张者只须在取走数据后更新读index,则坐蓐者将永恒看到一致的数据结构。
空部队时,front与rear绝顶;当有元素进队,则rear后移;有元素出队,则front后移。
空部队时,rear等于front;满部队时,部队尾部空一个位置,因此判断轮回部队满时使用(rear-front+maxn)%maxn。
入队操作:
data[rear] = x;rear = (rear+1)%maxn;
出队操作:
x = data[front];front = (front+1)%maxn;
单坐蓐者单铺张者
对于单坐蓐者和单铺张者场景,由于read_index和write_index都只会有一个线程写,因此不需要加锁也不需要原子操作,凯旋修改即可,但读写数据时需要研讨碰到数组尾部的情况。
线程对write_index和read_index的读写操作如下:
(1)写操作。先判断部队时否为满,若是部队未满,则先写数据,写完数据后再修改write_index。
(2)读操作。先判断部队是否为空,若是部队不为空,则先读数据,读完再修改read_index。
多坐蓐者单铺张者
多坐蓐者和单铺张者场景中,由于多个坐蓐者都会修改write_index,是以在不加锁的情况下必须使用原子操作。
4.2RingBuffer终了RingBuffer.hpp文献:
#pragma once template <class T>class RingBuffer{public: RingBuffer(unsigned size): m_size(size), m_front(0), m_rear(0) { m_data = new T[size]; } ~RingBuffer() { delete [] m_data; m_data = NULL; } inline bool isEmpty() const { return m_front == m_rear; } inline bool isFull() const { return m_front == (m_rear + 1) % m_size; } bool push(const T& value) { if(isFull()) { return false; } m_data[m_rear] = value; m_rear = (m_rear + 1) % m_size; return true; } bool push(const T* value) { if(isFull()) { return false; } m_data[m_rear] = *value; m_rear = (m_rear + 1) % m_size; return true; } inline bool pop(T& value) { if(isEmpty()) { return false; } value = m_data[m_front]; m_front = (m_front + 1) % m_size; return true; } inline unsigned int front()const { return m_front; } inline unsigned int rear()const { return m_rear; } inline unsigned int size()const { return m_size; }private: unsigned int m_size;// 部队长度 int m_front;// 部队头部索引 int m_rear;// 部队尾部索引 T* m_data;// 数据缓冲区};
RingBufferTest.cpp测试代码:
#include <stdio.h>#include <thread>#include <unistd.h>#include <sys/time.h>#include "RingBuffer.hpp" class Test{public: Test(int id = 0, int value = 0) { this->id = id; this->value = value; sprintf(data, "id = %d, value = %d\n", this->id, this->value); } void display() { printf("%s", data); }private: int id; int value; char data[128];}; double getdetlatimeofday(struct timeval *begin, struct timeval *end){ return (end->tv_sec + end->tv_usec * 1.0 / 1000000) - (begin->tv_sec + begin->tv_usec * 1.0 / 1000000);} RingBuffer<Test> queue(1 << 12);2u000 #define N (10 * (1 << 20)) void produce(){ struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.push(Test(i % 1024, i))) { i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);} void consume(){ sleep(1); Test test; struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.pop(test)) { // test.display(); i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f, size=%u \n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);} int main(int argc, char const *argv[]){ std::thread producer1(produce); std::thread consumer(consume); producer1.join(); consumer.join(); return 0;}
编译:
g++ --std=c++11 RingBufferTest.cpp -o test -pthread
单坐蓐者单铺张者场景下,音信蒙胧量为350万条/秒阁下。
4.3LockFreeQueue终了LockFreeQueue.hpp:
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <fcntl.h>#include <stdbool.h>#include <sys/stat.h>#include <sys/types.h>#include <sys/time.h>#include <sys/mman.h> #define SHM_NAME_LEN 128#define MIN(a, b) ((a) > (b) ? (b) : (a))#define IS_POT(x) ((x) && !((x) & ((x)-1)))#define MEMORY_BARRIER __sync_synchronize() template <class T>class LockFreeQueue{protected: typedef struct { int m_lock; inline void spinlock_init() { m_lock = 0; } inline void spinlock_lock() { while(!__sync_bool_compare_and_swap(&m_lock, 0, 1)) {} } inline void spinlock_unlock() { __sync_lock_release(&m_lock); } } spinlock_t; public: // size:部队大小 // name:分享内存key的旅途称号,默许为NULL,使用数组算作底层缓冲区。 LockFreeQueue(unsigned int size, const char* name = NULL) { memset(shm_name, 0, sizeof(shm_name)); createQueue(name, size); } ~LockFreeQueue() { if(shm_name[0] == 0) { delete [] m_buffer; m_buffer = NULL; } else { if (munmap(m_buffer, m_size * sizeof(T)) == -1) { perror("munmap"); } if (shm_unlink(shm_name) == -1) { perror("shm_unlink"); } } } bool isFull()const {#ifdef USE_POT return m_head == (m_tail + 1) & (m_size - 1);#else return m_head == (m_tail + 1) % m_size;#endif } bool isEmpty()const { return m_head == m_tail; } unsigned int front()const { return m_head; } unsigned int tail()const { return m_tail; } bool push(const T& value) {#ifdef USE_LOCK m_spinLock.spinlock_lock();#endif if(isFull()) {#ifdef USE_LOCK m_spinLock.spinlock_unlock();#endif return false; } memcpy(m_buffer + m_tail, &value, sizeof(T));#ifdef USE_MB MEMORY_BARRIER;#endif #ifdef USE_POT m_tail = (m_tail + 1) & (m_size - 1);#else m_tail = (m_tail + 1) % m_size;#endif #ifdef USE_LOCK m_spinLock.spinlock_unlock();#endif return true; } bool pop(T& value) {#ifdef USE_LOCK m_spinLock.spinlock_lock();#endif if (isEmpty()) {#ifdef USE_LOCK m_spinLock.spinlock_unlock();#endif return false; } memcpy(&value, m_buffer + m_head, sizeof(T));#ifdef USE_MB MEMORY_BARRIER;#endif #ifdef USE_POT m_head = (m_head + 1) & (m_size - 1);#else m_head = (m_head + 1) % m_size;#endif #ifdef USE_LOCK m_spinLock.spinlock_unlock();#endif return true; } protected: virtual void createQueue(const char* name, unsigned int size) {#ifdef USE_POT if (!IS_POT(size)) { size = roundup_pow_of_two(size); }#endif m_size = size; m_head = m_tail = 0; if(name == NULL) { m_buffer = new T[m_size]; } else { int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666); if (shm_fd < 0) { perror("shm_open"); } if (ftruncate(shm_fd, m_size * sizeof(T)) < 0) { perror("ftruncate"); close(shm_fd); } void *addr = mmap(0, m_size * sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); if (addr == MAP_FAILED) { perror("mmap"); close(shm_fd); } if (close(shm_fd) == -1) { perror("close"); exit(1); } m_buffer = static_cast<T*>(addr); memcpy(shm_name, name, SHM_NAME_LEN - 1); }#ifdef USE_LOCK spinlock_init(m_lock);#endif } inline unsigned int roundup_pow_of_two(size_t size) { size |= size >> 1; size |= size >> 2; size |= size >> 4; size |= size >> 8; size |= size >> 16; size |= size >> 32; return size + 1; }protected: char shm_name[SHM_NAME_LEN]; volatile unsigned int m_head; volatile unsigned int m_tail; unsigned int m_size;#ifdef USE_LOCK spinlock_t m_spinLock;#endif T* m_buffer;};
#define USE_LOCK开启spinlock锁,多坐蓐者多铺张者场景#define USE_MB开启Memory Barrier#define USE_POT开启部队大小的2的幂对皆
LockFreeQueueTest.cpp测试文献:
#include "LockFreeQueue.hpp"#include <thread> //#define USE_LOCK class Test{public: Test(int id = 0, int value = 0) { this->id = id; this->value = value; sprintf(data, "id = %d, value = %d\n", this->id, this->value); } void display() { printf("%s", data); }private: int id; int value; char data[128];}; double getdetlatimeofday(struct timeval *begin, struct timeval *end){ return (end->tv_sec + end->tv_usec * 1.0 / 1000000) - (begin->tv_sec + begin->tv_usec * 1.0 / 1000000);} LockFreeQueue<Test> queue(1 << 10, "/shm"); #define N ((1 << 20)) void produce(){ struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.push(Test(i >> 10, i))) i++; } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);} void consume(){ Test test; struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.pop(test)) { //test.display(); i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);} int main(int argc, char const *argv[]){ std::thread producer1(produce); //std::thread producer2(produce); std::thread consumer(consume); producer1.join(); //producer2.join(); consumer.join(); return 0;}
多线程场景下,需要界说USE_LOCK宏,开启锁保护。
编译:
g++ --std=c++11 -O3 LockFreeQueueTest.cpp -o test -lrt -pthread五、kfifo内核部队
缱绻机科学家仍是证明,当唯唯独个读线程和一个写线程并发操作时,不需要任何异常的锁,就不错确保是线程安全的,也即kfifo使用了无锁编程技巧,以提高kernel的并发。
Linux kernel内部从来就不清寒浮松,优雅和高效的代码,仅仅咱们清寒发现和试吃的眼神。在Linux kernel内部,浮松并不线路代码使用神出鬼没的超然手段,相背,它使用的不外是环球相等练习的基础数据结构,然则kernel开拓者能从基础的数据结构中,索求出优好意思的秉性。
kfifo等于这样的一类优好意思代码,它十分浮松,绝无过剩的一转代码,却相等高效。
对于kfifo信息如下:
本文分析的原代码版块: 2.6.24.4kfifo的界说文献: kernel/kfifo.ckfifo的头文献: include/linux/kfifo.h
kfifo是Linux内核的一个FIFO数据结构,遴选环形轮回部队的数据结构来终了,提供一个在意界的字节流就业,况兼使用并行无锁编程技巧,即单坐蓐者单铺张者场景下两个线程不错并发操作,不需要任何加锁行径就不错保证kfifo线程安全。
kfifo代码既然肩负着这样多秉性,那咱们先一敝它的代码:
struct kfifo { unsigned char *buffer; /* the buffer holding the data */ unsigned int size; /* the size of the allocated buffer */ unsigned int in; /* data is added at offset (in % size) */ unsigned int out; /* data is extracted from off. (out % size) */ spinlock_t *lock; /* protects concurrent modifications */};
这是kfifo的数据结构,kfifo主要提供了两个操作,__kfifo_put(入队操作)和__kfifo_get(出队操作)。 它的各个数据成员如下:
buffer: 用于存放数据的缓存
size: buffer空间的大小,在初化时,将它进取膨胀成2的幂(如5,进取膨胀 与它最接近的值且是2的n次方的值是2^3,即8)
lock: 若是使用不成保证任何时辰最多唯唯独个读线程和写线程,需要使用该lock实施同步。
in, out: 和buffer一谈组成一个轮回部队。 in指向buffer中队头,而且out指向buffer中的队尾
它的结构如示图如下:
+--------------------------------------------------------------+| |<----------data---------->| |+--------------------------------------------------------------+ ^ ^ ^ | | | out in size
天然,内核开拓者使用了一种更好的技巧处理了in, out和buffer的计算,咱们将鄙人面进行详确分析。
5.1kfifo功能描写kfifo提供如下对外功能规格
擦玻璃 裸舞只相沿一个读者和一个读者并发操作
无贬抑的读写操作,若是空间不够,则复返本色阅览空间
(1)kfifo_alloc 分派kfifo内存和运更正责任
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock){ unsigned char *buffer; struct kfifo *ret; /* * round up to the next power of 2, since our 'let the indices * wrap' tachnique works only in this case. */ if (size & (size - 1)) { BUG_ON(size > 0x80000000); size = roundup_pow_of_two(size); } buffer = kmalloc(size, gfp_mask); if (!buffer) return ERR_PTR(-ENOMEM); ret = kfifo_init(buffer, size, gfp_mask, lock); if (IS_ERR(ret)) kfree(buffer); return ret;}
这里值得一提的是,kfifo->size的值老是在调用者传进来的size参数的基础上向2的幂膨胀(roundup_pow_of_two,我我方的终了在著作末尾),这是内核一贯的作念法。这样的平正了然于目——对kfifo->size取模运算不错更正为与运算,如下:
kfifo->in % kfifo->size 不错更正为 kfifo->in & (kfifo->size – 1)
在kfifo_alloc函数中,使用size & (size – 1)来判断size 是否为2幂,若是要求为真,则线路size不是2的幂,然后调用roundup_pow_of_two将之进取膨胀为2的幂。
这都是常用的手段,只不外环球莫得将它们迎阿起来使用长途,底下要分析的__kfifo_put和__kfifo_get则是将kfifo->size的特色证实到了极致。
(2)__kfifo_put和__kfifo_get私密的入队和出队
__kfifo_put是入队操作,它先将数据放入buffer内部,临了才修改in参数;__kfifo_get是出队操作,它先将数据从buffer中移走,临了才修改out。(确保即使in和out修改失败,也不错再来一遍)
你会发现in和out两者各司其职。底下是__kfifo_put和__kfifo_get的代码
unsigned int __kfifo_put(struct kfifo *fifo, unsigned char *buffer, unsigned int len){ unsigned int l; len = min(len, fifo->size - fifo->in + fifo->out); /* * Ensure that we sample the fifo->out index -before- we * start putting bytes into the kfifo. */ smp_mb(); /* first put the data starting from fifo->in to buffer end */ l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); /* then put the rest (if any) at the beginning of the buffer */ memcpy(fifo->buffer, buffer + l, len - l); /* * Ensure that we add the bytes to the kfifo -before- * we update the fifo->in index. */ smp_wmb(); fifo->in += len; return len;}
奇怪吗?代码实足是线性结构,莫得任何if-else分支来判断是否有实足的空间存放数据。内核在这里的代码相等浮松,莫得一转过剩的代码。
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
这个抒发式缱绻刻下写入的空间,换成东谈主可相聚的话语等于:
l = kfifo可写空间和预期写入空间的最小值
(3)使用min宏来代if-else分支
__kfifo_get也应用了相通手段,代码如下:
unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len){ unsigned int l; len = min(len, fifo->in - fifo->out); /* * Ensure that we sample the fifo->in index -before- we * start removing bytes from the kfifo. */ smp_rmb(); /* first get the data from fifo->out until the end of the buffer */ l = min(len, fifo->size - (fifo->out & (fifo->size - 1))); memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l); /* then get the rest (if any) from the beginning of the buffer */ memcpy(buffer + l, fifo->buffer, len - l); /* * Ensure that we remove the bytes from the kfifo -before- * we update the fifo->out index. */ smp_mb(); fifo->out += len; return len;}
肃穆读两遍吧,我也读了屡次,每次老是有新发现,因为in, out和size的计算太私密了,居然能讹诈上unsigned int回绕的秉性。
原来,kfifo每次入队或出队,kfifo->in或kfifo->out仅仅绵薄地kfifo->in/kfifo->out += len,并莫得对kfifo->size 进行取模运算。因此kfifo->in和kfifo->out老是一直增大,直到unsigned in最大值时,又会绕回到0这一肇始端。但永恒得志:
kfifo->in - kfifo->out <= kfifo->size即使kfifo->in回绕到了0的那一端,这个性质仍然是保握的。
对于给定的kfifo:
数据空间长度为:kfifo->in - kfifo->out而剩余空间(可写入空间)长度为:kfifo->size - (kfifo->in - kfifo->out)
尽管kfifo->in和kfofo->out一直逾越kfifo->size进行增长,但它对应在kfifo->buffer空间的下标却是如下:
kfifo->in % kfifo->size (i.e. kfifo->in & (kfifo->size - 1))kfifo->out % kfifo->size (i.e. kfifo->out & (kfifo->size - 1))
往kfifo内部写一块数据时,数据空间、写入空间和kfifo->size的计算若是得志:
kfifo->in % size + len > size
那就要作念写拆分了,见下图:
kfifo_put(写)空间开首地址 | \_/ |XXXXXXXXXXXXXXXXXX| +--------------------------------------------------------------+| |<----------data---------->| |+--------------------------------------------------------------+ ^ ^ ^ | | | out%size in%size size ^ | 写空间收尾地址
第一块天然是: [kfifo->in % kfifo->size, kfifo->size]第二块天然是:[0, len - (kfifo->size - kfifo->in % kfifo->size)]
底下是代码,细细体味吧:
/* first put the data starting from fifo->in to buffer end */ l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); /* then put the rest (if any) at the beginning of the buffer */ memcpy(fifo->buffer, buffer + l, len - l);
对于kfifo_get过程,亦然肖似的,请诸位自行分析。
(4)kfifo_get和kfifo_put无锁并发操作
缱绻机科学家仍是证明,当唯唯独个读经程和一个写线程并发操作时,不需要任何异常的锁,就不错确保是线程安全的,也即kfifo使用了无锁编程技巧,以提高kernel的并发。
kfifo使用in和out两个指针来描写写入和读取游标,对于写入操作,只更新in指针,而读取操作,只更新out指针,可谓是曲分明,线路图如下:
|<--写入-->|+--------------------------------------------------------------+| |<----------data----->| |+--------------------------------------------------------------+ |<--读取-->| ^ ^ ^ | | | out in size
为了幸免读者看到写者瞻望写入,但本色莫得写入数据的空间,写者必须保证以下的写入规章:
往[kfifo->in, kfifo->in + len]空间写入数据更新kfifo->in指针为 kfifo->in + len
在操作1完成时,读者是还莫得看到写入的信息的,因为kfifo->in莫得变化,觉得读者还莫得开首写操作,唯独更新kfifo->in之后,读者智商看到。
那么怎样保证1必须在2之前完成,秘密等于使用内存障蔽:smp_mb(),smp_rmb(), smp_wmb(),来保证对方不雅察到的内存操作规章。
5.2kfifo内核部队终了kfifo数据结构界说如下:
struct kfifo{ unsigned char *buffer; unsigned int size; unsigned int in; unsigned int out; spinlock_t *lock;}; // 创建部队struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock){ struct kfifo *fifo; // 判断是否为2的幂 BUG_ON(!is_power_of_2(size)); fifo = kmalloc(sizeof(struct kfifo), gfp_mask); if (!fifo) return ERR_PTR(-ENOMEM); fifo->buffer = buffer; fifo->size = size; fifo->in = fifo->out = 0; fifo->lock = lock; return fifo;} // 分派空间struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock){ unsigned char *buffer; struct kfifo *ret; // 判断是否为2的幂 if (!is_power_of_2(size)) { BUG_ON(size > 0x80000000); // 进取膨胀成2的幂 size = roundup_pow_of_two(size); } buffer = kmalloc(size, gfp_mask); if (!buffer) return ERR_PTR(-ENOMEM); ret = kfifo_init(buffer, size, gfp_mask, lock); if (IS_ERR(ret)) kfree(buffer); return ret;} void kfifo_free(struct kfifo *fifo){ kfree(fifo->buffer); kfree(fifo);} // 入队操作static inline unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len){ unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_put(fifo, buffer, len); spin_unlock_irqrestore(fifo->lock, flags); return ret;} // 出队操作static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len){ unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_get(fifo, buffer, len); //当fifo->in == fifo->out时,buufer为空 if (fifo->in == fifo->out) fifo->in = fifo->out = 0; spin_unlock_irqrestore(fifo->lock, flags); return ret;} // 入队操作unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len){ unsigned int l; //buffer中空的长度 len = min(len, fifo->size - fifo->in + fifo->out); // 内存障蔽:smp_mb(),smp_rmb(), smp_wmb()来保证对方不雅察到的内存操作规章 smp_mb(); // 将数据追加到部队尾部 l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); memcpy(fifo->buffer, buffer + l, len - l); smp_wmb(); //每次累加,到达最大值后溢出,自动转为0 fifo->in += len; return len;} // 出队操作unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len){ unsigned int l; //罕有据的缓冲区的长度 len = min(len, fifo->in - fifo->out); smp_rmb(); l = min(len, fifo->size - (fifo->out & (fifo->size - 1))); memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l); memcpy(buffer + l, fifo->buffer, len - l); smp_mb(); fifo->out += len; //每次累加,到达最大值后溢出,自动转为0 return len;} static inline void __kfifo_reset(struct kfifo *fifo){ fifo->in = fifo->out = 0;} static inline void kfifo_reset(struct kfifo *fifo){ unsigned long flags; spin_lock_irqsave(fifo->lock, flags); __kfifo_reset(fifo); spin_unlock_irqrestore(fifo->lock, flags);} static inline unsigned int __kfifo_len(struct kfifo *fifo){ return fifo->in - fifo->out;} static inline unsigned int kfifo_len(struct kfifo *fifo){ unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_len(fifo); spin_unlock_irqrestore(fifo->lock, flags); return ret;}5.3kfifo瞎想重点
(1)保证buffer size为2的幂
kfifo->size值在调用者传递参数size的基础上向2的幂膨胀,方向是使kfifo->size取模运算不错更正为位与运算(提高运行成果)。kfifo->in % kfifo->size更正为 kfifo->in & (kfifo->size – 1)
保证size是2的幂不错通过位运算的形貌求余,在频繁操作部队的情况下不错大大提高成果。
(2)使用spin_lock_irqsave与spin_unlock_irqrestore 终了同步。
Linux内核中有spin_lock、spin_lock_irq和spin_lock_irqsave保证同步。
static inline void __raw_spin_lock(raw_spinlock_t *lock){ preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);} static inline void __raw_spin_lock_irq(raw_spinlock_t *lock){ local_irq_disable(); preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);}
spin_lock比spin_lock_irq速率快,但并不是线程安全的。spin_lock_irq增多调用local_irq_disable函数,即回绝腹地中断,是线程安全的,既回绝腹地中断,又回绝内核霸占。
spin_lock_irqsave是基于spin_lock_irq终了的一个缓助接口,在插足和离开临界区后,不会改变中断的开启、关闭景况。
若是自旋锁在中断处理函数中被用到,在赢得自旋锁前需要关闭腹地中断,spin_lock_irqsave终了如下:
A、保存腹地中断景况;
B、关闭腹地中断;
C、赢得自旋锁。
解锁时通过 spin_unlock_irqrestore完成开释锁、收复腹地中断到原来景况等责任。
(3)线性代码结构
代码中莫得任何if-else分支来判断是否有实足的空间存放数据,kfifo每次入队或出队仅仅绵薄的 +len 判断剩余空间,并莫得对kfifo->size 进行取模运算,是以kfifo->in和kfifo->out老是一直增大,直到unsigned in逾越最大值时绕回到0这一肇始端,但永恒得志:kfifo->in - kfifo->out <= kfifo->size。
(4)使用Memory Barrier
mb():适用于多处理器和单处理器的内存障蔽。
rmb():适用于多处理器和单处理器的读内存障蔽。
wmb():适用于多处理器和单处理器的写内存障蔽。
smp_mb():适用于多处理器的内存障蔽。
smp_rmb():适用于多处理器的读内存障蔽。
smp_wmb():适用于多处理器的写内存障蔽。
Memory Barrier使用场景如下:
A、终了同步原语(synchronization primitives)
B、终了无锁数据结构(lock-free data structures)
C、驱动重要
重要在运行时内存本色阅览规章和重要代码编写的阅览规章不一定一致,即内存乱序阅览。内存乱序阅览行径出现是为了擢升重要运行时的性能。内存乱序阅览主要发生在两个阶段:
A、编译时,编译器优化导致内存乱序阅览(领导重排)。
B、运行时,多CPU间交互引起内存乱序阅览。
Memory Barrier省略让CPU或编译器在内存阅览上有序。Memory barrier前的内存阅览操作必定先于自后的完成。Memory Barrier包括两类:
A、编译器Memory Barrier。
B、CPU Memory Barrier。
闲居,编译器和CPU引起内存乱序阅览不会带来问题,但若是重要逻辑的正确性依赖于内存阅览规章,内存乱序阅览会带来逻辑上的无理。
在编译时,编译器对代码作念出优化时可能改变本色推论领导的规章(如GCC的O2或O3都会改变本色推论领导的规章)。
在运行时,CPU天然会乱序推论领导四色播播,但在单个CPU上,硬件省略保证重要推论时系数的内存阅览操作都是按重要代码编写的规章推论的,Memory Barrier莫得必要使用(不研讨编译器优化)。为了更快推论领导,CPU遴选活水线的推论形貌,编译器在编译代码时为了使领导更得当CPU的活水线推论形貌以及多CPU推论,原来领导就会出现乱序的情况。在乱序推论时,CPU真确推论领导的规章由可用的输入数据决定,而非重要员编写的规章。
本站仅提供存储就业,系数内容均由用户发布,如发现存害或侵权内容,请点击举报。