Linux | c&cpp | Email | github | QQ群:425043908 关注本站

itarticle.cc

您现在的位置是:网站首页 -> 代码相关 文章内容

内存屏障(Memory barrier)-itarticl.cc-IT技术类文章记录&分享

发布时间: 8年前代码相关 125人已围观返回

本文例子均在 Linux(g++)下验证通过,CPU 为 X86-64 处理器架构。所有罗列的 Linux 内核代码也均在(或只在)X86-64 下有效。

本文首先通过范例(以及内核代码)来解释 Memory barrier,然后介绍一个利用 Memory barrier 实现的无锁环形缓冲区。


Memory barrier 简介

程序在运行时内存实际的访问顺序和程序代码编写的访问顺序不一定一致,这就是内存乱序访问。内存乱序访问行为出现的理由是为了提升程序运行时的性能。内存乱序访问主要发生在两个阶段:

编译时,编译器优化导致内存乱序访问(指令重排)

运行时,多 CPU 间交互引起内存乱序访问

Memory barrier 能够让 CPU 或编译器在内存访问上有序。一个 Memory barrier 之前的内存访问操作必定先于其之后的完成。Memory barrier 包括两类:

编译器 barrier

CPU Memory barrier

很多时候,编译器和 CPU 引起内存乱序访问不会带来什么问题,但一些特殊情况下,程序逻辑的正确性依赖于内存访问顺序,这时候内存乱序访问会带来逻辑上的错误,例如:

// thread 1

while (!ok);

do(x);


// thread 2

x = 42;

ok = 1;

此段代码中,ok 初始化为 0,线程 1 等待 ok 被设置为 1 后执行 do 函数。假如说,线程 2 对内存的写操作乱序执行,也就是 x 赋值后于 ok 赋值完成,那么 do 函数接受的实参就很可能出乎程序员的意料,不为 42。

编译时内存乱序访问

在编译时,编译器对代码做出优化时可能改变实际执行指令的顺序(例如 gcc 下 O2 或 O3 都会改变实际执行指令的顺序):

// test.cpp

int x, y, r;

void f()

{

x = r;

y = 1;

}

编译器优化的结果可能导致 y = 1 在 x = r 之前执行完成。首先直接编译此源文件:

g++ -S test.cpp

得到相关的汇编代码如下:

movl r(%rip), %eax

movl %eax, x(%rip)

movl $1, y(%rip)

这里我们看到,x = r 和 y = 1 并没有乱序。现使用优化选项 O2(或 O3)编译上面的代码(g++ -O2 -S test.cpp),生成汇编代码如下:

movl r(%rip), %eax

movl $1, y(%rip)

movl %eax, x(%rip)

我们可以清楚的看到经过编译器优化之后 movl $1, y(%rip) 先于 movl %eax, x(%rip) 执行。避免编译时内存乱序访问的办法就是使用编译器 barrier(又叫优化 barrier)。Linux 内核提供函数 barrier() 用于让编译器保证其之前的内存访问先于其之后的完成。内核实现 barrier() 如下(X86-64 架构):

#define barrier() __asm__ __volatile__("" ::: "memory")

现在把此编译器 barrier 加入代码中:

int x, y, r;

void f()

{

x = r;

__asm__ __volatile__("" ::: "memory");

y = 1;

}

这样就避免了编译器优化带来的内存乱序访问的问题了(如果有兴趣可以再看看编译之后的汇编代码)。本例中,我们还可以使用 volatile 这个关键字来避免编译时内存乱序访问(而无法避免后面要说的运行时内存乱序访问)。volatile 关键字能够让相关的变量之间在内存访问上避免乱序,这里可以修改 x 和 y 的定义来解决问题:

volatile int x, y;

int r;

void f()

{

x = r;

y = 1;

}

现加上了 volatile 关键字,这使得 x 相对于 y、y 相对于 x 在内存访问上有序。在 Linux 内核中,提供了一个宏 ACCESS_ONCE 来避免编译器对于连续的 ACCESS_ONCE 实例进行指令重排。其实 ACCESS_ONCE 实现源码如下:

#define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x))

此代码只是将变量 x 转换为 volatile 的而已。现在我们就有了第三个修改方案:

int x, y, r;

void f()

{

ACCESS_ONCE(x) = r;

ACCESS_ONCE(y) = 1;

}

到此基本上就阐述完了我们的编译时内存乱序访问的问题。下面开始介绍运行时内存乱序访问。

运行时内存乱序访问

在运行时,CPU 虽然会乱序执行指令,但是在单个 CPU 的上,硬件能够保证程序执行时所有的内存访问操作看起来像是按程序代码编写的顺序执行的,这时候 Memory barrier 没有必要使用(不考虑编译器优化的情况下)。这里我们了解一下 CPU 乱序执行的行为。在乱序执行时,一个处理器真正执行指令的顺序由可用的输入数据决定,而非程序员编写的顺序。

早期的处理器为有序处理器(In-order processors),有序处理器处理指令通常有以下几步:

指令获取

如果指令的输入操作对象(input operands)可用(例如已经在寄存器中了),则将此指令分发到适当的功能单元中。如果一个或者多个操作对象不可用(通常是由于需要从内存中获取),则处理器会等待直到它们可用

指令被适当的功能单元执行

功能单元将结果写回寄存器堆(Register file,一个 CPU 中的一组寄存器)

相比之下,乱序处理器(Out-of-order processors)处理指令通常有以下几步:

指令获取

指令被分发到指令队列

指令在指令队列中等待,直到输入操作对象可用(一旦输入操作对象可用,指令就可以离开队列,即便更早的指令未被执行)

指令被分配到适当的功能单元并执行

执行结果被放入队列(而不立即写入寄存器堆)

只有所有更早请求执行的指令的执行结果被写入寄存器堆后,指令执行的结果才被写入寄存器堆(执行结果重排序,让执行看起来是有序的)

从上面的执行过程可以看出,乱序执行相比有序执行能够避免等待不可用的操作对象(有序执行的第二步)从而提高了效率。现代的机器上,处理器运行的速度比内存快很多,有序处理器花在等待可用数据的时间里已经可以处理大量指令了。

现在思考一下乱序处理器处理指令的过程,我们能得到几个结论:

对于单个 CPU 指令获取是有序的(通过队列实现)

对于单个 CPU 指令执行结果也是有序返回寄存器堆的(通过队列实现)

由此可知,在单 CPU 上,不考虑编译器优化导致乱序的前提下,多线程执行不存在内存乱序访问的问题。我们从内核源码也可以得到类似的结论(代码不完全的摘录):

#ifdef CONFIG_SMP

#define smp_mb() mb()

#else

#define smp_mb() barrier()

#endif

这里可以看到,如果是 SMP 则使用 mb,mb 被定义为 CPU Memory barrier(后面会讲到),而非 SMP 时,直接使用编译器 barrier。

在多 CPU 的机器上,问题又不一样了。每个 CPU 都存在 cache(cache 主要是为了弥补 CPU 和内存之间较慢的访问速度),当一个特定数据第一次被特定一个 CPU 获取时,此数据显然不在 CPU 的 cache 中(这就是 cache miss)。此 cache miss 意味着 CPU 需要从内存中获取数据(这个过程需要 CPU 等待数百个周期),此数据将被加载到 CPU 的 cache 中,这样后续就能直接从 cache 上快速访问。当某个 CPU 进行写操作时,它必须确保其他的 CPU 已经将此数据从它们的 cache 中移除(以便保证一致性),只有在移除操作完成后此 CPU 才能安全的修改数据。显然,存在多个 cache 时,我们必须通过一个 cache 一致性协议来避免数据不一致的问题,而这个通讯的过程就可能导致乱序访问的出现,也就是这里说的运行时内存乱序访问。这里不再深入讨论整个细节,这是一个比较复杂的问题,有兴趣可以研究http://www.rdrop.com/users/paulmck/scalability/paper/whymb.2010.06.07c.pdf 一文,其详细的分析了整个过程。

现在通过一个例子来说明多 CPU 下内存乱序访问:(这个举例有问题,由于线程的调度问题,并不能很好的说明内存屏障的)

// test2.cpp

#include <pthread.h>

#include <assert.h>


// -------------------

int cpu_thread1 = 0;

int cpu_thread2 = 1;


volatile int x, y, r1, r2;


void start()

{

x = y = r1 = r2 = 0;

}


void end()

{

assert(!(r1 == 0 && r2 == 0));

}


void run1()

{

x = 1;

r1 = y;

}


void run2()

{

y = 1;

r2 = x;

}


// -------------------

static pthread_barrier_t barrier_start;

static pthread_barrier_t barrier_end;


static void* thread1(void*)

{

while (1) {

pthread_barrier_wait(&barrier_start);

run1();

pthread_barrier_wait(&barrier_end);

}


return NULL;

}


static void* thread2(void*)

{

while (1) {

pthread_barrier_wait(&barrier_start);

run2();

pthread_barrier_wait(&barrier_end);

}


return NULL;

}


int main()

{

assert(pthread_barrier_init(&barrier_start, NULL, 3) == 0);

assert(pthread_barrier_init(&barrier_end, NULL, 3) == 0);


pthread_t t1;

pthread_t t2;

assert(pthread_create(&t1, NULL, thread1, NULL) == 0);

assert(pthread_create(&t2, NULL, thread2, NULL) == 0);


cpu_set_t cs;

CPU_ZERO(&cs);

CPU_SET(cpu_thread1, &cs);

assert(pthread_setaffinity_np(t1, sizeof(cs), &cs) == 0);

CPU_ZERO(&cs);

CPU_SET(cpu_thread2, &cs);

assert(pthread_setaffinity_np(t2, sizeof(cs), &cs) == 0);


while (1) {

start();

pthread_barrier_wait(&barrier_start);

pthread_barrier_wait(&barrier_end);

end();

}


return 0;

}

这里创建了两个线程来运行测试代码(需要测试的代码将放置在 run 函数中)。我使用了 pthread barrier(区别于本文讨论的 Memory barrier)主要为了让两个子线程能够同时运行它们的 run 函数。此段代码不停的尝试同时运行两个线程的 run 函数,以便得出我们期望的结果。在每次运行 run 函数前会调用一次 start 函数(进行数据初始化),run 运行后会调用一次 end 函数(进行结果检查)。run1 和 run2 两个函数运行在哪个 CPU 上则通过 cpu_thread1 和 cpu_thread2 两个变量控制。

先编译此程序:g++ -lpthread -o test2 test2.cpp(这里未优化,目的是为了避免编译器优化的干扰)。需要注意的是,两个线程运行在两个不同的 CPU 上(CPU 0 和 CPU 1)。只要内存不出现乱序访问,那么 r1 和 r2 不可能同时为 0,因此断言失败表示存在内存乱序访问。编译之后运行此程序,会发现存在一定概率导致断言失败。为了进一步说明问题,我们把 cpu_thread2 的值改为 0,换而言之就是让两个线程跑在同一个 CPU 下,再运行程序发现断言不再失败。

最后,我们使用 CPU Memory barrier 来解决内存乱序访问的问题(X86-64 架构下):

int cpu_thread1 = 0;

int cpu_thread2 = 1;


void run1()

{

x = 1;

__asm__ __volatile__("mfence" ::: "memory");

r1 = y;

}


void run2()

{

y = 1;

__asm__ __volatile__("mfence" ::: "memory");

r2 = x;

}


准备使用 Memory barrier

Memory barrier 常用场合包括:

实现同步原语(synchronization primitives)

实现无锁数据结构(lock-free data structures)

驱动程序

实际的应用程序开发中,开发者可能完全不知道 Memory barrier 就可以开发正确的多线程程序,这主要是因为各种同步机制中已经隐含了 Memory barrier(但和实际的 Memory barrier 有细微差别),这就使得不直接使用 Memory barrier 不会存在任何问题。但是如果你希望编写诸如无锁数据结构,那么 Memory barrier 还是很有用的。

通常来说,在单个 CPU 上,存在依赖的内存访问有序:

Q = P;

D = *Q;

这里内存操作有序。然而在 Alpha CPU 上,存在依赖的内存读取操作不一定有序,需要使用数据依赖 barrier(由于 Alpha 不常见,这里就不详细解释了)。

在 Linux 内核中,除了前面说到的编译器 barrier — barrier() 和 ACCESS_ONCE(),还有 CPU Memory barrier:

通用 barrier,保证读写操作有序的,mb() 和 smp_mb()

写操作 barrier,仅保证写操作有序的,wmb() 和 smp_wmb()

读操作 barrier,仅保证读操作有序的,rmb() 和 smp_rmb()

注意,所有的 CPU Memory barrier(除了数据依赖 barrier 之外)都隐含了编译器 barrier。这里的 smp 开头的 Memory barrier 会根据配置在单处理器上直接使用编译器 barrier,而在 SMP 上才使用 CPU Memory barrier(也就是 mb()、wmb()、rmb(),回忆上面相关内核代码)。

最后需要注意一点的是,CPU Memory barrier 中某些类型的 Memory barrier 需要成对使用,否则会出错,详细来说就是:一个写操作 barrier 需要和读操作(或数据依赖)barrier 一起使用(当然,通用 barrier 也是可以的),反之依然。


Memory barrier 的范例

读内核代码进一步学习 Memory barrier 的使用。

Linux 内核实现的无锁(只有一个读线程和一个写线程时)环形缓冲区 kfifo 就使用到了 Memory barrier,实现源码如下:

/*

* A simple kernel FIFO implementation.

*

* Copyright (C) 2004 Stelian Pop <stelian@popies.net>

*

* This program is free software; you can redistribute it and/or modify

* it under the terms of the GNU General Public License as published by

* the Free Software Foundation; either version 2 of the License, or

* (at your option) any later version.

*

* This program is distributed in the hope that it will be useful,

* but WITHOUT ANY WARRANTY; without even the implied warranty of

* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

* GNU General Public License for more details.

*

* You should have received a copy of the GNU General Public License

* along with this program; if not, write to the Free Software

* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.

*

*/


#include <linux/kernel.h>

#include <linux/module.h>

#include <linux/slab.h>

#include <linux/err.h>

#include <linux/kfifo.h>

#include <linux/log2.h>


/**

* kfifo_init - allocates a new FIFO using a preallocated buffer

* @buffer: the preallocated buffer to be used.

* @size: the size of the internal buffer, this have to be a power of 2.

* @gfp_mask: get_free_pages mask, passed to kmalloc()

* @lock: the lock to be used to protect the fifo buffer

*

* Do NOT pass the kfifo to kfifo_free() after use! Simply free the

* &struct kfifo with kfree().

*/

struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size,

gfp_t gfp_mask, spinlock_t *lock)

{

struct kfifo *fifo;


/* size must be a power of 2 */

BUG_ON(!is_power_of_2(size));


fifo = kmalloc(sizeof(struct kfifo), gfp_mask);

if (!fifo)

return ERR_PTR(-ENOMEM);


fifo->buffer = buffer;

fifo->size = size;

fifo->in = fifo->out = 0;

fifo->lock = lock;


return fifo;

}

EXPORT_SYMBOL(kfifo_init);


/**

* kfifo_alloc - allocates a new FIFO and its internal buffer

* @size: the size of the internal buffer to be allocated.

* @gfp_mask: get_free_pages mask, passed to kmalloc()

* @lock: the lock to be used to protect the fifo buffer

*

* The size will be rounded-up to a power of 2.

*/

struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)

{

unsigned char *buffer;

struct kfifo *ret;


/*

* round up to the next power of 2, since our 'let the indices

* wrap' technique works only in this case.

*/

if (!is_power_of_2(size)) {

BUG_ON(size > 0x80000000);

size = roundup_pow_of_two(size);

}


buffer = kmalloc(size, gfp_mask);

if (!buffer)

return ERR_PTR(-ENOMEM);


ret = kfifo_init(buffer, size, gfp_mask, lock);


if (IS_ERR(ret))

kfree(buffer);


return ret;

}

EXPORT_SYMBOL(kfifo_alloc);


/**

* kfifo_free - frees the FIFO

* @fifo: the fifo to be freed.

*/

void kfifo_free(struct kfifo *fifo)

{

kfree(fifo->buffer);

kfree(fifo);

}

EXPORT_SYMBOL(kfifo_free);


/**

* __kfifo_put - puts some data into the FIFO, no locking version

* @fifo: the fifo to be used.

* @buffer: the data to be added.

* @len: the length of the data to be added.

*

* This function copies at most @len bytes from the @buffer into

* the FIFO depending on the free space, and returns the number of

* bytes copied.

*

* Note that with only one concurrent reader and one concurrent

* writer, you don't need extra locking to use these functions.

*/

unsigned int __kfifo_put(struct kfifo *fifo,

const unsigned char *buffer, unsigned int len)

{

unsigned int l;


len = min(len, fifo->size - fifo->in + fifo->out);


/*

* Ensure that we sample the fifo->out index -before- we

* start putting bytes into the kfifo.

*/


smp_mb();


/* first put the data starting from fifo->in to buffer end */

l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));

memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);


/* then put the rest (if any) at the beginning of the buffer */

memcpy(fifo->buffer, buffer + l, len - l);


/*

* Ensure that we add the bytes to the kfifo -before-

* we update the fifo->in index.

*/


smp_wmb();


fifo->in += len;


return len;

}

EXPORT_SYMBOL(__kfifo_put);


/**

* __kfifo_get - gets some data from the FIFO, no locking version

* @fifo: the fifo to be used.

* @buffer: where the data must be copied.

* @len: the size of the destination buffer.

*

* This function copies at most @len bytes from the FIFO into the

* @buffer and returns the number of copied bytes.

*

* Note that with only one concurrent reader and one concurrent

* writer, you don't need extra locking to use these functions.

*/

unsigned int __kfifo_get(struct kfifo *fifo,

unsigned char *buffer, unsigned int len)

{

unsigned int l;


len = min(len, fifo->in - fifo->out);


/*

* Ensure that we sample the fifo->in index -before- we

* start removing bytes from the kfifo.

*/


smp_rmb();


/* first get the data from fifo->out until the end of the buffer */

l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));

memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);


/* then get the rest (if any) from the beginning of the buffer */

memcpy(buffer + l, fifo->buffer, len - l);


/*

* Ensure that we remove the bytes from the kfifo -before-

* we update the fifo->out index.

*/


smp_mb();


fifo->out += len;


return len;

}

EXPORT_SYMBOL(__kfifo_get);

为了更好的理解上面的源码,这里顺带说一下此实现使用到的一些和本文主题无关的技巧:

使用与操作来求取环形缓冲区的下标,相比取余操作来求取下标的做法效率要高不少。使用与操作求取下标的前提是环形缓冲区的大小必须是 2 的 N 次方,换而言之就是说环形缓冲区的大小为一个仅有一个 1 的二进制数,那么 index & (size – 1) 则为求取的下标(这不难理解)

使用了 in 和 out 两个索引且 in 和 out 是一直递增的(此做法比较巧妙),这样能够避免一些复杂的条件判断(某些实现下,in == out 时还无法区分缓冲区是空还是满)

这里,索引 in 和 out 被两个线程访问。in 和 out 指明了缓冲区中实际数据的边界,也就是 in 和 out 同缓冲区数据存在访问上的顺序关系,由于未使用同步机制,那么保证顺序关系就需要使用到 Memory barrier 了。索引 in 和 out 都分别只被一个线程修改,而被两个线程读取。__kfifo_put 先通过 in 和 out 来确定可以向缓冲区中写入数据量的多少,这时,out 索引应该先被读取后才能真正的将用户 buffer 中的数据写入缓冲区,因此这里使用到了 smp_mb(),对应的,__kfifo_get 也使用 smp_mb() 来确保修改 out 索引之前缓冲区中数据已经被成功读取并写入用户 buffer 中了。对于 in 索引,在 __kfifo_put 中,通过 smp_wmb() 保证先向缓冲区写入数据后才修改 in 索引,由于这里只需要保证写入操作有序,故选用写操作 barrier,在 __kfifo_get 中,通过 smp_rmb() 保证先读取了 in 索引(这时候 in 索引用于确定缓冲区中实际存在多少可读数据)才开始读取缓冲区中数据(并写入用户 buffer 中),由于这里只需要保证读取操作有序,故选用读操作 barrier。


参考文献

http://en.wikipedia.org/wiki/Memory_barrier

http://en.wikipedia.org/wiki/Memory_ordering

http://en.wikipedia.org/wiki/Out-of-order_execution

https://www.kernel.org/doc/Documentation/memory-barriers.txt


一方面,CPU由于采用指令流水线和超流水线技术,可能导致CPU虽然顺序取指令、但有可能会出现“乱序”执行的情况,当然,对于” a++;b = f(a);c = f”等存在依赖关系的指令,CPU则会在“b= f(a)”执行阶段之前被阻塞;另一方面,编译器也有可能将依赖关系很近“人为地”拉开距离以防止阻塞情况的发生,从而导致编译器乱序,如“a++ ;c = f;b = f(a)”。

一个CPU对指令顺序提供如下保证:

(1) On any given CPU, dependent memory accesses will be issued in order, with respect to itself.如Q = P; D = *Q;将保证其顺序执行

(2) Overlapping loads and stores within a particular CPU will appear to be ordered within that CPU.重叠的Load和Store操作将保证顺序执行(目标地址相同的Load、Store),如:a = *X; *X = b;

(3) It _must_not_ be assumed that independent loads and stores will be issued in the order given.

(4) It _must_ be assumed that overlapping memory accesses may be merged or discarded.如*A = X; Y = *A; => STORE *A = X; Y = LOAD *A; / or STORE *A = Y = X;


由此可见,无关的内存操作会被按随机顺序有效的得到执行,但是在CPU与CPU交互时或CPU与IO设备交互时, 这可能会成为问题. 我们需要一些手段来干预编译器和CPU, 使其限制指令顺序。内存屏障就是这样的干预手段. 他们能保证处于内存屏障两边的内存操作满足部分有序.(译注: 这里"部分有序"的意思是, 内存屏障之前的操作都会先于屏障之后的操作, 但是如果几个操作出现在屏障的同一边, 则不保证它们的顺序.)

(1) 写(STORE)内存屏障。在写屏障之前的STORE操作将先于所有在写屏障之后的STORE操作。

(2) 数据依赖屏障。两条Load指令,第二条Load指令依赖于第一条Load指令的结果,则数据依赖屏障保障第二条指令的目标地址将被更新。

(3) 读(LOAD)内存屏障。读屏障包含数据依赖屏障的功能, 并且保证所有出现在屏障之前的LOAD操作都将先于所有出现在屏障之后的LOAD操作被系统中的其他组件所感知.

(4) 通用内存屏障. 通用内存屏障保证所有出现在屏障之前的LOAD和STORE操作都将先于所有出现在屏障之后的LOAD和STORE操作被系统中的其他组件所感知.

(5) LOCK操作.它的作用相当于一个单向渗透屏障.它保证所有出现在LOCK之后的内存操作都将在LOCK操作被系统中的其他组件所感知之后才能发生. 出现在LOCK之前的内存操作可能在LOCK完成之后才发生.LOCK操作总是跟UNLOCK操作配对出现.

(6) UNLOCK操作。它保证所有出现在UNLOCK之前的内存操作都将在UNLOCK操作被系统中的其他组件所感知之前发生.


LINUX对于x86而言,在为UP(单核)体系统架构下,调用barrier()进行通用内存屏障。在SMP体系架构下,若为64位CPU或支持mfence、lfence、sfence指令的32位CPU,则smp_mb()、smp_rmb()、smp_smb()对应通用内存屏障、写屏障和读屏障;而不支持mfence、lfence、sfence指令的32位CPU则smp_mb()、smp_rmb()、smp_smb()对应LOCK操作。源码请参见《内存屏障源码分析》一节。


内存屏障源码分析

/include/asm-generic/system.h:

053 #ifdef CONFIG_SMP

054 #define smp_mb() mb()

055 #define smp_rmb() rmb()

056 #define smp_wmb() wmb()

057 #else

058 #define smp_mb() barrier()

059 #define smp_rmb() barrier()

060 #define smp_wmb() barrier()

061 #endif

在x86 UP体系架构中,smp_mb、smp_rmb、smp_wmb被翻译成barrier:

012 #define barrier() __asm__ __volatile__("": : :"memory")

__volatile告诉编译器此条语句不进行任何优化,"": : :"memory" 内存单元已被修改、需要重新读入。


在x86 SMP(多核)体系架构中,smp_mb、smp_rmb、smp_wmb如下定义:

/arch/x86/include/asm/system.h:

352 /*

353 * Force strict CPU ordering.

354 * And yes, this is required on UP too when we're talking

355 * to devices.

356 */

357 #ifdef CONFIG_X86_32

358 /*

359 * Some non-Intel clones support out of order store. wmb() ceases to be a

360 * nop for these.

361 */

362 #define mb() alternative("lock; addl $0,0(%%esp)", "mfence", X86_FEATURE_XMM2)

363 #define rmb() alternative("lock; addl $0,0(%%esp)", "lfence", X86_FEATURE_XMM2)

364 #define wmb() alternative("lock; addl $0,0(%%esp)", "sfence", X86_FEATURE_XMM)

365 #else

366 #define mb() asm volatile("mfence":::"memory")

367 #define rmb() asm volatile("lfence":::"memory")

368 #define wmb() asm volatile("sfence" ::: "memory")

369 #endif

362~364行针对x86的32位CPU,366~368行针对x86的64位CPU。


在x86的64位CPU中,mb()宏实际为:

asm volatile("sfence" ::: "memory")。

volatile告诉编译器严禁在此处汇编语句与其它语句重组优化,memory强制编译器假设RAM所有内存单元均被汇编指令修改,"sfence" ::: 表示在此插入一条串行化汇编指令sfence。

mfence:串行化发生在mfence指令之前的读写操作

lfence:串行化发生在mfence指令之前的读操作、但不影响写操作

sfence:串行化发生在mfence指令之前的写操作、但不影响读操作


在x86的32位CPU中,mb()宏实际为:

mb() alternative("lock; addl $0,0(%%esp)", "mfence", X86_FEATURE_XMM2)

由于x86的32位CPU有可能不提供mfence、lfence、sfence三条汇编指令的支持,故在不支持mfence的指令中使用:"lock; addl $0,0(%%esp)", "mfence"。lock表示将“addl $0,0(%%esp)”语句作为内存屏障。

关于lock的实现:cpu上有一根pin #HLOCK连到北桥,lock前缀会在执行这条指令前先去拉这根pin,持续到这个指令结束时放开#HLOCK pin,在这期间,北桥会屏蔽掉一切外设以及AGP的内存操作。也就保证了这条指令的atomic。


参考资料

《memroy-barries.txt》,/Documentation/memory-barriers.txt

《LINUX内核内存屏障》,http://blog.csdn.net/ljl1603/article/details/6793982


当我们在做多线程编程的时候,会涉及到一个称为memory order的问题。

例如

int x=0,y=0;

x=4;

y=3;

请问,实际执行的时候,这两条赋值语句谁先执行,谁后执行? 会不会有某个时间点,在某个CPU看来,y比x大?

答案很复杂。本文的目的是从非常实践的角度来考虑这个问题。

首先,它分为两个层面。在编译器看来,x和y是两个没有关联的变量,那么编译器有权利调整这两行代码的执行顺序,只要它乐意。

其次,CPU也有权利这么做。

如果我非要严格要求顺序,那么就应该插入一个memory barrier

int x=0,y=0;

x=4;

//在此插入memory barrier指令

y=3;

下面要论述,中间那行怎么写。请耐心看下去,因为大多数人都在瞎整。

gcc的手册中有一节叫做"Built-in functions for atomic memory access",然后里面列举了这样一个函数:

__sync_synchronize (...)

This builtin issues a full memory barrier.

来,我们写段代码试下:

int main(){

__sync_synchronize();

return 0;

}

然后用gcc4.2编译,

# gcc -S -c test.c

然后看对应的汇编代码,

main:

pushq %rbp

movq %rsp, %rbp

movl $0, %eax

leave

ret

嗯?Nothing at all !!! 不信你试一试,我的编译环境是Freebsd 9.0 release, gcc (GCC) 4.2.1 20070831 patched [FreeBSD]。 好,我换个高版本的gcc编译器试一试,gcc46 (FreeBSD Ports Collection) 4.6.3 20120113 (prerelease)

main:

pushq %rbp

movq %rsp, %rbp

mfence

movl $0, %eax

popq %rbp

ret

看,多了一行,mfence。 怎么回事呢?这是gcc之前的一个BUG:http://gcc.gnu.org/bugzilla/show_bug.cgi?id=36793 。 2008年被发现,然后修复的。其实它之所以是一个BUG,关键在于gcc的开发者很迷惑,mfence在x86 CPU上到底有没有用?有嘛用? 说到这里,我们得到一个结论:gcc的__sync_synchronize()尽量别用,因为你的代码在低版本的gcc下会有BUG。大部分人用的gcc都比4.4低。从CentOS 6开始,默认的编译器才是gcc 4.4。

那么mfence到底能不能提供我们想要的结果呢? 之前intel的手册一直语焉不详,没说清楚。

最新的手册对mfence的解释是:

"Serializes all store and load operations that occurred prior to the MFENCE instruction in the

program instruction stream"

并且特别强调,这个指令影响的是data memory子系统,而不是指令执行流。

对于单个CPU来说,

"Reads cannot pass earlier MFENCE instructions"

"Writes cannot pass earlier MFENCE instructions. "

"MFENCE instructions cannot pass earlier reads or writes"

而对于多个CPU来说,

Individual processors use the same ordering principles as in a single-processor system.

Writes by a single processor are observed in the same order by all processors.

Writes from an individual processor are NOT ordered with respect to the writes from other processors.

Memory ordering obeys causality (memory ordering respects transitive visibility).

Any two stores are seen in a consistent order by processors other than those performing the stores

简单点说,对于单个CPU,即便你不用mfence,写入顺序也是保证的。

假如你在C++中,

std::string* str=new std::string();

那么不会出现str指针已经被赋值但是它指向的对象还未被初始化好的情况。

另一个有趣的问题是,gcc有一个汇编指令是用来控制内存顺序的,请看这段文档:

Accesses to non-volatile objects are not ordered with respect to volatile accesses. You cannot use a volatile object as a memory barrier to order a sequence of writes to non-volatile memory. For instance:

int *ptr = something;

volatile int vobj;

*ptr = something;

vobj = 1;

Unless *ptr and vobj can be aliased, it is not guaranteed that the write to *ptr occurs by the time the update of vobj happens. If you need this guarantee, you must use a stronger memory barrier such as:

int *ptr = something;

volatile int vobj;

*ptr = something;

asm volatile ("" : : : "memory");

vobj = 1;

经我测试,

asm volatile ("" : : : "memory");

并不生成任何汇编代码。也就是说,这个仅仅是给编译器看的。

为了进一步证实我的观点,请看如下从Intel的Threading Building Blocks函数库中摘取的代码:

#define __TBB_compiler_fence() __asm__ __volatile__("": : :"memory")

#define __TBB_control_consistency_helper() __TBB_compiler_fence()

#define __TBB_acquire_consistency_helper() __TBB_compiler_fence()

#define __TBB_release_consistency_helper() __TBB_compiler_fence()

#ifndef __TBB_full_memory_fence

#define __TBB_full_memory_fence() __asm__ __volatile__("mfence": : :"memory")

#endif

能同时起编译器和硬件内存屏障作用的是

__asm__ __volatile__("mfence": : :"memory")

注意:mfence!

另外,我们在intel cpu上用的CAS指令都是带lock前缀的。所以在使用CAS的时候完全不必考虑memory order的问题。



/usr/include/c++/4.4.7/ext/atomicity.h #define _GLIBCXX_WRITE_MEM_BARRIER __asm __volatile ("":::"memory")



关于C++多线程程序中简单类型(int/bool)的安全性

关于这个问题,很少有听到权威的解答。偶这里也只是收集各处资料,以尝试对今后写出高质量的代码做一定的保证。

通常会联想到这个问题应该跟CPU架构相关。CSDN上也有人做了实验。根据其结论,在x86上,对1字节byte/2字节word/4字节int类型的读写操作都是原子性的。(类似java中的immutable objects的概念)亦即,1个int不会有中间状态,若它原始值是0,往其写入0x12345678,则另一个线程绝对不会读到0x12000000或是0x00005678的值。

根据偶自己的实验,不但int是原子的,在64位x86上,size_t(8字节)也是原子操作(多核,volatile)。

注意:

1. 通常偶们会以为64位系统中的int是64位的,但实际上不一定,请参见ABI这个概念,简单说就是gcc用-m64参数编译,不加其它选项的话,其int为32位,long也为32位,只有long long, size_t为64位。

2. 根据编译器优化选项不同,以上结论有可能被颠覆:一个64位类型的赋值有可能被编译器编译为两个32位的指令。请在实验时看好自己的编译器(确认反汇编 ^_^)

但是,从收集的资料来看,结果是令人沮丧的。要实现在各种CPU上都通用的原子操作,bool/int都没有得到标准的支持。

写一个字节也不安全?请见下面链接:

http://stackoverflow.com/questions/15246862/single-byte-write-by-multiple-threads-in-smp-without-using-lock

主要意思如下:有的架构如Alpha等,根本就没办法实现不读原始值而写1个byte

C++标准也没有规定bool/int类型变量的原子性:

http://stackoverflow.com/questions/4936289/is-volatile-a-proper-way-to-make-a-single-byte-atomic-in-c-c

所以C++0x中做出了规定。

有没有绝对支持的平台?在SMP架构的多核CPU环境下,如果读写指令能够编译成1条指令的话,根据其标准,byte和cache line对齐的int/short应该是原子操作:

http://en.wikipedia.org/wiki/Symmetric_multiprocessing

("ram access are serialized")

http://www.blachford.info/computer/articles/bigcrunch1.html

其中的The problem with cache部份

那么,以后是不是bool/int都要加锁了?这个问题偶也没办法回答。大家视情况而定吧。


增注:

本文讨论的是C/C++中int/bool写法的原子性,并非针对特定CPU。若程序目标平台一定是Intel CPU,可参考其架构手册《64-ia-32-architectures》。

摘抄一段如下:

The Intel486 processor (and newer processors since) guarantees that the following basic memory operations will

always be carried out atomically:

• Reading or writing a byte

• Reading or writing a word aligned on a 16-bit boundary

• Reading or writing a doubleword aligned on a 32-bit boundary

The Pentium processor (and newer processors since) guarantees that the following additional memory operations

will always be carried out atomically:

• Reading or writing a quadword aligned on a 64-bit boundary

• 16-bit accesses to uncached memory locations that fit within a 32-bit data bus

The P6 family processors (and newer processors since) guarantee that the following additional memory operation

will always be carried out atomically:

• Unaligned 16-, 32-, and 64-bit accesses to cached memory that fit within a cache line

翻译一下最后一段就是说,在P6以上CPU不管有没有进行对齐,只要访问的word/dword/qword所有字节都在cache line内部,就是原子操作。

cache line对齐大小可用以下命令获得:

# cat /proc/cpuinfo

cache_alignment : 64

当然你仍然要记住你写的是C/C++代码,将来它运行的架构并非确定的,所以不能无视C/C++标准。

发布时间: 8年前代码相关125人已围观返回回到顶端

很赞哦! (1)

文章评论

  • 请先说点什么
    热门评论
    124人参与,1条评论
    内存屏障 3年前
    [/鼓掌][/鼓掌][/鼓掌][/鼓掌]

站点信息

  • 建站时间:2016-04-01
  • 文章统计:728条
  • 文章评论:82条
  • QQ群二维码:扫描二维码,互相交流