图片 5

一步一步的实现八个reentranklock,自个儿入手写把

Posted by

锁是整个Java并发包的实现基础,通过学习本系列文章,将对你理解Java并发包的本质有很大的帮助。

本文涵盖的知识点包括:

既然文章叫做《一步一步的实现一个reentranklock》,那么我们将从设计一个最简单的锁开始,一步步地深入,最后完整的展示一个reentranklock的设计过程。(未完成)

 

Lock和synchronized的对比AbstractQueuedSynchronizer的实现原理ReentrantLock的实现原理LockSupport的使用

一个实现锁的最简单的办法

我们知道cas操作是原子的,且自带更新,所以我们其实可以用cas设计一个最简单的自旋锁。方案很简单

前边几篇中,我已经把实现锁用到的技术,进行了一一讲述。这其中有原子性、内存模型、LockSupport还有CAS,掌握了这些技术,即使没有本篇,你也完全有能力自己写一把锁出来。但为了本系列的完整性,我在这里还是把最后这一篇补上。

Lock和synchronized的对比

在Lock接口出现之前,Java应用程序只能依靠synchronized关键字来实现同步锁的功能,在JDK1.5以后,增加了JUC(java.util.concurrent)的并发包且提供了Lock接口用来实现锁的功能。Lock和synchronized的对比:Ø
从层次上,一个是类、一个是关键字Ø
从使用上,lock具备更大的灵活性,可以显式地控制锁的释放和获取;
而synchronized的锁的释放是被动的,当出现异常或者同步代码块执行完以后,才会释放锁Ø
lock可以判断锁的状态,而synchronized无法做到;lock可以实现公平锁、非公平锁,
而synchronized只有非公平锁。

简单自旋锁的方案

设置一个公共的变量value,初始化值为a,然后每一个线程的加锁操作都是试图使用cas,校验当前变量值为a,如果为a将其设置为值b,b!=a。失败,就反复重试,直到成功为止。解锁的时候,使用cas校验当前值是否为b,如果是就更新为a。

 

AbstractQueuedSynchronizer

Lock之所以能实现线程安全的锁,主要的核心是AQS(AbstractQueuedSynchronizer),AbstractQueuedSynchronizer提供了一个FIFO队列,可以看做是一个用来实现锁以及其他需要同步功能的框架。这里简称该类为AQS。AQS的使用依靠继承来完成,子类通过继承自AQS并实现所需的方法来管理同步状态。例如常见的ReentrantLock,CountDownLatch。

从使用上来说,AQS的功能可以分为两种:EXCLUSIVE独占模式和SHARED共享模式。独占模式下,每次只能有一个线程持有锁,比如ReentrantLock就是以独占方式实现的互斥锁。共享模式下,允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLock中的读锁。

AQS依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。Node的主要属性如下

static final class Node { int waitStatus; //表示节点的状态,包含cancelled;condition 表示节点在等待condition也就是在condition队列中 Node prev; //前继节点 Node next; //后继节点 Node nextWaiter; //存储在condition队列中的后继节点 Thread thread; //当前线程}

AQS类底层的数据结构是使用双向链表,是队列的一种实现。包括一个head节点和一个tail节点,分别表示头结点和尾节点,其中头结点不存储Thread,仅保存next结点的引用。

图片 1

AQS提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node
expect,Nodeupdate),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

图片 2AQS遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。图片 3

AQS中,除了本身的链表结构以外,还有一个很关键的功能,就是CAS,这个是保证在多线程并发的情况下保证线程安全的前提下去把线程加入到AQS中的方法,可以简单理解为乐观锁。

private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update);}

这个方法里面,首先用到了unsafe类(Unsafe类是在sun.misc包下,不属于Java标准。但是很多Java的基础类库,包括一些被广泛使用的高性能开发库都是基于Unsafe类开发的,比如Netty、Hadoop、Kafka等;Unsafe可认为是Java中留下的后门,提供了一些低层次操作,如直接内存访问、线程调度等)。

方案的优缺点

先说一下锁的运行流程:多个线程抢占同一把锁,只有一个线程能抢占成功,抢占成功的线程继续执行下边的逻辑,抢占失败的线程进入阻塞等待。抢占成功的线程执行完毕后,释放锁,并从等待的线程中挑一个唤醒,让它继续竞争锁。

ReentrantLock的实现原理分析

之所以叫重入锁是因为同一个线程如果已经获得了锁,那么后续该线程调用lock方法时不需要再次获取锁,也就是不会阻塞;重入锁提供了两种实现,一种是非公平的重入锁,另一种是公平的重入锁。怎么理解公平和非公平呢?如果在绝对时间上,先对锁进行获取的请求一定先被满足获得锁,那么这个锁就是公平锁,反之,就是不公平的。简单来说公平锁就是等待时间最长的线程最优先获取锁。非公平锁的实现流程时序图

图片 41、调用ReentrantLock.lock,这个是获取锁的入口,调用了sync.lock。
sync是一个实现了AQS的抽象类,这个类的主要作用是用来实现同步控制的,并且sync有两个实现,一个是NonfairSync、另一个是FailSync。2、先分析非公平锁的情况NonfairSync.lock

final void lock() { if (compareAndSetState //这是跟公平锁的主要区别,一上来就试探锁是否空闲,如果可以插队,则设置获得锁的线程为当前线程//exclusiveOwnerThread属性是AQS从父类AbstractOwnableSynchronizer中继承的属性,用来保存当前占用同步状态的线程 setExclusiveOwnerThread(Thread.currentThread; else acquire; //尝试去获取锁}

compareAndSetState通过cas算法去改变state的值,而这个states是AQS中存在一个变量,对于ReentrantLock来说,如果state=0表示无锁状态、如果state>0表示有锁状态。由于ReentrantLock是可重入锁,所以持有锁的线程可以多次加锁,经过判断加锁线程就是当前持有锁的线程时(即exclusiveOwnerThread==Thread.currentThread,每次加锁都会将state的值+1,state等于几,就代表当前持有锁的线程加了几次锁;解锁时每解一次锁就会将state减1,state减到0后,锁就被释放掉,这时其它线程可以加锁。3、AbstractQueuedSynchronizer.acquire如果CAS操作未能成功,说明state已经不为0,此时继续acquire操作,acquire是AQS中的方法
当多个线程同时进入这个方法时,首先通过cas去修改state的状态,如果修改成功表示竞争锁成功,竞争失败的,tryAcquire会返回false。

public final void acquire { if (!tryAcquire && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}

4、NonfairSync.tryAcquirenofairTryAcquire这里可以看非公平锁的含义,即获取锁并不会严格根据争用锁的先后顺序决定。

final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //获取当前的状态,前面讲过,默认情况下是0表示无锁状态 if  { if (compareAndSetState(0, acquires)) { //通过cas来改变state状态的值,如果更新成功,表示获取锁成功, 这个操作外部方法lock()就做过一次,这里再做只是为了再尝试一次,尽量以最简单的方式获取锁。 setExclusiveOwnerThread; return true; } } else if (current == getExclusiveOwnerThread {//如果当前线程等于获取锁的线程,表示重入,直接累加重入次数 int nextc = c + acquires; if (nextc < 0) // overflow 如果这个状态值越界,抛出异常;如果没有越界,则设置后返回true throw new Error("Maximum lock count exceeded"); setState; return true; }如果状态不为0,且当前线程不是owner,则返回false。 return false; //获取锁失败,返回false}

5、addWaiter当前锁如果已经被其他线程锁持有,那么当前线程来去请求锁的时候,会进入这个方法,这个方法主要是把当前线程封装成node,添加到AQS的链表中。6、acquireQueuedaddWaiter返回了插入的节点,作为acquireQueued方法的入参,这个方法主要用于争抢锁。

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for  { final Node p = node.predecessor();// 获取prev节点,若为null即刻抛出NullPointException if (p == head && tryAcquire {// 如果前驱为head才有资格进行锁的抢夺 setHead; // 获取锁成功后就不需要再进行同步操作了,获取锁成功的线程作为新的head节点//凡是head节点,head.thread与head.prev永远为null, 但是head.next不为null p.next = null; // help GC failed = false; //获取锁成功 return interrupted; }//如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程 if (shouldParkAfterFailedAcquire && parkAndCheckInterrupt// 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志 interrupted = true; } } finally { if  // 如果抛出异常则取消锁的获取,进行出队(sync queue)操作 cancelAcquire; }}

原来的head节点释放锁以后,会从队列中移除,原来head节点的next节点会成为head节点.

图片 5

优点

优点很明显,就是这个方案很快的就实现了一个简单的加锁和解锁方案。

 

公平锁和非公平锁的区别

锁的公平性是相对于获取锁的顺序而言的,如果是一个公平锁,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。
在上面分析的例子来说,只要CAS设置同步状态成功,则表示当前线程获取了锁,而公平锁则不一样,差异点有两个:FairSync.tryAcquire:非公平锁在获取锁的时候,会先通过CAS进行抢占,而公平锁则不会

final void** lock() { acquire;}

FairSync.tryAcquire:这个方法与nonfairTryAcquire(int
acquires)比较,不同的地方在于判断条件多了hasQueuedPredecessors()方法,也就是加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

protected final boolean* tryAcquire(int acquires) { final Thread current = Thread.currentThread*(); int c = getState(); if  { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread; return true; } } else if (current == getExclusiveOwnerThread { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState; return true; } return false;}
缺点

这个锁是自旋锁,自旋本身就是一个耗费性能的操作。更为重要的是,对于解锁方没有控制,一个线程在得到value的时候,如果value已经是b了,该线程确实不能加锁,但是该线程却能强行使用cas,对该共享的变量进行解锁,这是不能接受的,因为一旦代码出现问题,会导致锁的逻辑本身失控。因为一个没有持有锁的线程,居然可以通过解锁来强行获取锁,这本身就是不对的。这个不应该指望使用者保证,而是应该锁本身来保证的。

转变成程序实现:我们首先定一个state变量,state=0表示未被加锁,state=1表示被加锁。多个线程在抢占锁时,竞争将state变量从0修改为1,修改成功的线程则加锁成功。state从0修改为1的过程,这里使用cas操作,以保证只有一个线程加锁成功,同时state需要用volatile修饰,已解决线程可见的问题。加锁成功的线程执行完业务逻辑后,将state从1修改回0,同时从等待的线程中选择一个线程唤醒。所以加锁失败的线程,在加锁失败时需要将自己放到一个集合中,以等待被唤醒。这个集合需要支持多线程并发安全,在这里我通过一个链表来实现,通过CAS操作来实现并发安全。

LockSupport

LockSupport类是JDK1.6引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:

public native void unpark(Thread jthread); public native void park(boolean isAbsolute, long time); 

LockSupport中的park() 和 unpark()
的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到“Thread.suspend
和 Thread.resume所可能引发的死锁”问题。因为park() 和
unpark()有许可的存在。

1、
pack时:如果许可存在,那么将这个许可使用掉,并且立即返回。如果许可不存在,那么挂起当前线程。2、
unpack时:设置线程许可为可用。如果线程当前已经被pack挂起,那么这个线程将会被唤醒。如果线程当前没有被挂起,那么下次调用pack不会挂起线程。注意,unpark函数可以先于park调用。比如线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现已经有“许可”了,那么它会马上再继续运行。

在使用LockSupport之前,我们对线程做同步,只能使用wait和notify,但是wait和notify其实不是很灵活,并且耦合性很高,调用notify必须要确保某个线程处于wait状态,而park/unpark模型真正解耦了线程之间的同步,先后顺序没有没有直接关联,同时线程之间不再需要一个Object或者其它变量来存储状态,不再需要关心对方的状态。

park/unpark以线程作为方法的参数,支持指定线程唤醒。wait/notify针对的是对象,通知实际上是不知道唤醒具体哪个线程的,要不随机唤醒一个线程要不唤醒所有的(notifyAll)。

保证解锁正确的方案

在简单自旋锁的方案中,已经提出了,没有获取锁的线程不应该拥有解锁的能力。解锁的能力,只能开放给拥有锁的线程,所以为了记录当前持有锁的线程,除了value的值之外,我们还需要额外的一个值,用于记录当前拥有锁的线程,在解锁的时候,必须校验,当且仅当是拥有当前锁的线程,才能解锁。
(ps:注意,对于cas的操作值a,b,这里的逻辑应该是对外层隔离的,外层只要有lock和unlock方法就好了。

 

一个可重入的锁

把思路说清楚了,咱们看下代码实现。

加锁可重入

一般来说,我们希望锁不仅是独占的,还是可重入的,也就说,已经获取当前锁的线程,还可以继续获取当前的锁。如果我们希望锁是可重入,那么最简单的思路,就是在试图获取锁的时候,判断当前持有锁的线程是否就是当前试图获取锁的线程,如果是就认为获取成功了。这样锁就可以重入了。下面是描述这个过程的伪代码。

if(System.currentThread() == lock.ownerThread()){
  //do something
}else{
  tryAcquireLock();
}

这样的设计,解决了持有锁的线程可以重复获取锁的问题。但是考虑解锁呢。假设我们现在有这么一个场景,线程thread在代码块A中,获取锁l,然后又在B中再一次获取锁l,A,B互相不知道对方的存在,A,B中都有释放锁的逻辑。这样就会导致一个问题,当B快要执行完成的时候,B释放了锁l,本来只想让B自己这部分代码可以继续进行下去,但这个时候A本应该还是锁着的,还没有到需要解锁的地步,这个时候在B中解锁,会导致A中本来是线程thread独占的代码块,被其它线程进入了,破坏了锁的原意。所以单纯判断当前锁的方案是有问题的。

 

解决办法

B解锁的时候,A不解锁,这个时候允许获取锁是危险的,而当A,B都解锁之后,才尝试获取锁呢,这个时候A,B两边都准备好了,再来的加锁请求都是安全的。为了保证这样的机制,我们需要记录锁被重入的次数,
我们知道java中有一个AtomicInteger类,该类是一个原子的integer类,可以用于保证并发情况下的原子性。假设我们现在有一个AtomicInteger的实例,我们给他命名为state,每当当前线程重入锁一次的时候,就调用AtomicInteger中的incrmentAndGet方法,每次释放锁,就调用decrementAndGet方法。这样的设计,就能记录下当前锁被重入的次数了,只有当所有的锁都被释放的时候,state才会为0,这个时候其它线程再来竞争锁的资源是安全,依循上面的设计就能在原有的设计的基础上,制作出一个可重入的锁了。

首先咱们实现一个ThreadList,这是一个链表结合,用来存放等待的处于等待唤醒的线程:

我们是否真的需要AtomicInteger

在上面的描述中,我们使用了一个AtomicInteger用于记录锁当前被重入的次数。使用atomicInteger当然没有错,不过我们也不得不承认,相比于普通的int和integer来说,atomicInteger确实是重了一些。一般来说在解决并发的问题的时候,我们倾向于解决一个我们已经解决的并发问题,其实其他问题一样,这种思想在数学里叫做:化归。不过这里我们重新思考一个问题。
在一个线程获取锁成功的前提下,在之后的过程中,是否有其它线程和它争用对state的控制权。仔细一想,其实是没有,因为能使用increment操作的线程,必须是获取到锁的当前线程,所以state,在锁从被某个线程获取到被释放的生命周期中,只能被持有锁的线程修改,既然只有一个线程能修改,就不存在什么并发问题,所以这里state其实根本不需要设计成使AtomicInteger,直接使用普通的int就好了。

public class ThreadList{
    private volatile Node head = null;
    private static  long headOffset;
    private static Unsafe unsafe;
    static {
        try {
            Constructor<Unsafe> constructor = Unsafe.class.getDeclaredConstructor(new Class<?>[0]);
            constructor.setAccessible(true);
            unsafe = constructor.newInstance(new Object[0]);
            headOffset = unsafe.objectFieldOffset(ThreadList.class.getDeclaredField("head"));
        }catch (Exception e){
        }
    }
    /**
     *
     * @param thread
     * @return 是否只有当前一个线程在等待
     */
    public boolean insert(Thread thread){
        Node node = new Node(thread);
        for(;;){
            Node first = getHead();
            node.setNext(first);
            if(unsafe.compareAndSwapObject(this, headOffset,first,node)){
                return first==null?true:false;
            }
        }
    }
    public Thread pop(){
        Node first = null;
        for(;;){
            first = getHead();
            Node next = null;
            if(first!=null){
                next = first.getNext();
            }
            if(unsafe.compareAndSwapObject(this, headOffset,first,next)){
                break;
            }
        }
        return first==null?null:first.getThread();
    }
    private Node getHead(){
        return this.head;
    }
    private static class Node{
        volatile Node next;
        volatile Thread thread;
        public Node(Thread thread){
            this.thread = thread;
        }
        public void setNext(Node next){
            this.next = next;
        }
        public Node getNext(){
            return next;
        }
        public Thread getThread(){
            return this.thread;
        }
    }
}

我们是否确实同时需要value和state

在最开始的设计中,value被设计用于表示锁当前是否被持有的状态,而最新加入的state被用于表示当前锁被重入的次数。但是当我们已经有了state的时候,我们还需要value吗?value的要求是,value所表示的集合至少要包含两个不同的值,或者说两个状态,一个代表被持有,另一个表示还未持有。state用于表示锁被重入的次数,0表示没有被重入,也即表示没有线程持有该锁。当有线程获取锁的时候,state加1,每次锁被重入的时候都加1.其实对于state来说,它不仅表示了锁当前被重入的次数,还反映了当前锁是否被线程持有,当state
== 0的时候,表示当前锁是空闲的,当state >
0的时候,表示当前锁被某个线程所持有。所以既然state身兼两义,所以站在性能最优的原则上来说,当我们有state的时候,就不再需要value了。

加锁失败的线程,调用insert方法将自己放入这个集合中,insert方法里将线程封装到Node中,然后使用cas操作将node添加到列表的头部。同样为了线程可见的问题,Node里的thread和next都用volatile修饰。

park和unpark

回顾我们之前设计的锁的方案。我们的锁,目前至少拥有了如下的设计

public class SimpleLock {
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;    
    static {        
        try {
            stateOffset = unsafe.objectFieldOffset(SimpleLock.class.getDeclaredField("state"));
        }catch (Exception e){
            throw new Error(e);
        }
    }
    private int state = 0;
    private Thread ownerThread;
    public boolean lock(){
        Thread currentThread = Thread.currentThread();
        if(currentThread == ownerThread){
            ++state;
            return true;
        }else {
            if(unsafe.compareAndSwapInt(this,stateOffset,0,1)){
                setOwnerThread(currentThread);
                return true;
            }
        }
        return false;
    }
    public void unlock(){
        --state;
    }
    public void setOwnerThread(Thread ownerThread) {
        this.ownerThread = ownerThread;
    }
}

在这个设计下,unsafe和stateOffset主要是为了实现CAS方法,关于他们的更多细节,请自行学习,上面的代码是无法运行的,主要原因在于unsafe要求只能java的标准库才能使用获取该实例。
由于CAS的方法会经常用到,后面的实例中,不会一一写CAS用unsafe实现的细节,我们假设在我们需要CAS的地方,都有这么一个方法,如下

/** cas操作,如果对象object的是期望的except,那么将其更新为newObject,返回成功与否
* @param object 
* @param except 
* @param newObject 
* @return 是否更新成功
*/
private boolean compareAndSet(Object object,Object except,Object newObject);

这里我们的关注点主要在state和ownerThread上。上面这个锁的设计,已经可以达到一个可以使用的可重入锁的需求了。而且也已经去掉了一些多余的数据,比如说value。但是这样的设计依旧不够好。我们来看SimpleLock的一个典型的用法。

SimpleLock simpleLock = new SimpleLock();
while (!simpleLock.lock()){
    System.out.println("获取锁失败,重试");
}
System.out.println("执行业务逻辑");
simpleLock.unlock();

我们可以看到,这个锁对于调用方来说,依旧是个自旋锁,当然我们可以包含掉这部分逻辑,让自旋这部分逻辑对外部透明,但是这依旧改变不了,这个锁需要自旋等待的结果。
自旋等待最大的问题就在于性能耗损严重,不断自旋的过程是对系统cpu资源极大的浪费,当锁被单个线程占用的时间越多,或者当有很多线程都在尝试获取锁的时候,对系统cpu的资源的浪费更是可观。所以我们希望,当一个线程第一次获取锁失败的时候,它可以去休息等待锁释放,而不是不断的自旋,询问锁是否准备好了。而需要实现这个目标,第一个需求就是需要有一个办法,可以让线程进入上述的等待状态。自己设计一个这样的东西无疑是很难的,幸好java在1.5之后,给我们提供了让线程进入等待的一个便捷的方法库java.util.concurrent.locks.LockSupport。

有了LockSupport之后,我们就可以将线程置为阻塞状态了。这样当线程获取失败后,就可以直接将线程置为阻塞,用于节约对资源的消耗了。

加锁成功的线程,调用pop方法获得一个线程,进行唤醒,这里边同样使用了cas操作来保证线程安全。

阻塞解除的方案

我们有了阻塞,自然就要有唤醒线程的方法,否则线程就会无限等待下去了。既然要唤醒等待的线程,自然的我们的第一个需求,就应该是先设计一个数据结构用于保存阻塞的线程。我们先不纠结集合的具体细节,我们先把它看成一个抽象的集合,我们先来想想这个集合需要满足哪些功能。

 

这个集合最好FIFO的

我们先来考虑公平锁的情况,公平锁希望的是,锁的获取顺序,是由试图获取锁的时候顺序决定的。所以这个集合如果能是一个先进先出队列,那么就可以满足公平锁的需求了。

接下来在看看锁的实现:

队列插入和读取是线程安全的

要一个FIFO的队列很简单,但是现在的情况,却并不是那么简单,因为现在的环境是多线程的,在一个线程插入的同时,另一个线程也可能也在插入或者删除,所以一个困难点,在于如何保证他的线程安全。

public class MyLock {
    private volatile int state = 0;
    private ThreadList threadList = new ThreadList();
    private static  long stateOffset;
    private static Unsafe unsafe;
    static {
       try {
           Constructor<Unsafe> constructor = Unsafe.class.getDeclaredConstructor(new Class<?>[0]);
           constructor.setAccessible(true);
           unsafe = constructor.newInstance(new Object[0]);
           stateOffset = unsafe.objectFieldOffset(MyLock.class.getDeclaredField("state"));
       }catch (Exception e){
       }

    }
    public void lock(){
        if(compareAndSetState(0,1)){
        }else{
            addNodeAndWait();
        }
    }
    public void unLock(){
        compareAndSetState(1,0);
        Thread thread = threadList.pop();
        if(thread != null){
            LockSupport.unpark(thread);
        }
    }
    private void addNodeAndWait(){
        //如果当前只有一个等待线程时,重新获取一下锁,防止永远不被唤醒。
        boolean isOnlyOne = threadList.insert(Thread.currentThread());
        if(isOnlyOne && compareAndSetState(0,1)){
            return;
        }
        LockSupport.park(this);//线程被挂起
        if(compareAndSetState(0,1)){//线程被唤醒后继续竞争锁
            return;
        }else{
            addNodeAndWait();
        }
    }
    private boolean compareAndSetState(int expect,int update){
        return unsafe.compareAndSwapInt(this,stateOffset,expect,update);
    }
}

线程不安全的原因

假设我们以最简单的单链表为基础,尝试在单链表的基础上,通过改进来设计一个线程安全的链表。首先,我们来看看为什么单链表不是线程安全的。

 

初始化环节

首先,一个链表并不是开始就可用的,还需要初始化的过程。当head为null的时候,我们需要对链表做初始化,但是这个初始化对于插入来说应该是透明的,或者说这个链表的初始化逻辑,应该是藏于插入等方法中的,外部应该尽可能的无感知。我们先来看看普通的链表初始化操作在多线程的环境下,会出现的问题

private void initFifo(){
    head = new Node();
    head.setNext(null);
}
  • head被重复初始化
  • 同一个时间,head被多个线程初始化,而各个线程所看到的head却是不一样的
    所以我们问题解决的重心,应该放在如何保证在head被第一次设置之后,不会被修改。我们知道head的初始值是null,所以问题转换为当head第一次不为null之后,head不会被改变了。
    在解决问题之前,先看看我们现在的工具和武器有什么,很可惜,目前我们只有CAS和LockSupport,LockSupport显然无法帮我解决这个问题。然后CAS刚刚好是可以解决这个问题的,只要我们将期望值设为null就好了,这样head就有且仅能被设置一次了,代码如下所示。

private void initFifoThreadSafe(){
    if(head == null){
        Node myHead = new Node();
        myHead.setNext(null);
        compareAndSet(head,null,myHead);
    }
}

上面的代码解决了head可能在运行的过程被改变的问题,但是不能解决向内存多次申请Node的问题,不过这个问题并不会影响链表的行为,而且java有一套优秀的内存回收机制,我们当然有办法让这个地方只初始化一个Node,但是比起这么一瞬间节约的内存,带来的编码性的复杂性和系统性能的下降,更为令人不能接受。而且更简单,一般来说总是更好的所以在这个问题上,我们可以选择忽略可能存在的一瞬间申请多个Node的问题。

线程调用lock方法进行加锁,cas将state从0修改1,修改成功则加锁成功,lock方法返回,否则调用addNodeAndWait方法将线程加入ThreadList队列,并使用LockSupport将线程挂起。(ThreadList的insert方法,返回一个boolean类型的值,用来处理一个特殊情况的,稍后再说。)

插入环节

初始化的问题解决了,然后我们再来看看插入环节的问题。同样的,我们研究一下,普通的插入策略有哪些线程安全的问题。

initFifoThreadSafe();
Node tail = head;
while (tail.getNext() != null){
    tail = tail.getNext();
}
tail.setNext(node);

在多线程环境下,这段代码存在的问题是

  • 我们无法准确的获知,在我们需要插入新的尾的一瞬间之前,当前看到的tail,是否是当前真正的tail。
    很显然,我们无法通过遍历数组来获取当前准确的尾节点,这个时候我们需要的额外的信息是,向我们表明当前的tail是什么。针对这个问题,我们可以添加一个指向tail的节点,每次添加通过cas更新它,以此来保证,我们可见的tail是当前的tail。代码如下。

private void addNodeThreadSafe(Node node){
    while (true){
        Node oldTail = tail;
        if(compareAndSet(tail,oldTail,node)){
            oldTail.setNext(node);
            break;
        }
    }
}

获得锁的线程执行完业务逻辑后,调用unLock方法释放锁,即通过cas操作将state修改回0,同时从ThreadList拿出一个等待线程,调用LockSupport的unpark方法,来将它唤醒。

删除环节/出队列

对于公平锁来说,由于是先进先出的,所以出队列的,只能是头结点。照常我们先来看看,普通的pop操作有什么问题。

private Node pop(){
    Node node = head;
    head = head.getNext();
    return node;
}

普通的pop操作在并发的环境下,一个重要的问题,是当你试图获取head的时候,可能当前head已经被改变了。所以需要控制,方案和之前大同小异。如下所示

private Node popThreadSafe(){
    while (true){
        Node tempHead = head;
        Node node = tempHead.getNext();
        if(compareAndSet(head,tempHead,node)){
            return node;
        }
    }
}

注意,在node出队列的时候,我们还需要知道node入队列的时候的对应的线程,所以node的构造函数,应该如下所示。

public Node(){
    thread = Thread.currentThread();
}

截止到现在,我们已经得到了一个比较完整的公平锁。我们目前得到的代码,如下所示。

/**
 * 一个携带线程信息的Node节点
**/
public class Node {
    private Node next;
    private Node previous;
    private Thread thread;
    public Node(){
        thread = Thread.currentThread();
    }
}

/**
 * 线程安全的FIFO队列
 */
public class ThreadSafeFifo {
    private Node head;
    private Node tail;
    private void addNodeThreadSafe(Node node){
        while (true){
            Node oldTail = tail;
            if(compareAndSet(tail,oldTail,node)){
                oldTail.setNext(node);
                break;
            }
        }
    }
    private void initFifoThreadSafe(){
        if(head == null){
            Node myHead = new Node();
            myHead.setNext(null);
            compareAndSet(head,null,myHead);
        }
    }
    private Node popThreadSafe(){
        while (true){
            Node tempHead = head;
            Node node = tempHead.getNext();
            if(compareAndSet(head,tempHead,node)){
                return node;
            }
        }
    }
}

(截止2016/04/26未完成)

 

 

将我们在《自己动手写把”锁”—锁的作用》的例子修改为如下,来测试下咱们的锁的效果:

public class TestMyLock {
    private static  List<Integer> list = new ArrayList<>();
    private static MyLock myLock = new MyLock();
    public static void main(String[] args){
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i=0;i<10000;i++){
                    add(i);
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                print();
            }
        });
        t1.start();
        t2.start();
    }
    private static void add(int i){
        myLock.lock();
        list.add(i);
        myLock.unLock();
    }
    private static void print(){
        myLock.lock();
        Iterator<Integer> iterator = list.iterator();
        while (iterator.hasNext()){
            System.out.println(iterator.next());
        }
        myLock.unLock();
    }
}

ok,正常运行了,不在报错。

 

到这里咱们的一个简单地锁已经实现了。接下来我再把上边的,一个没讲的细节说一下。即如下这段代码:

boolean isOnlyOne = threadList.insert(Thread.currentThread());
        if(isOnlyOne && compareAndSetState(0,1)){
            return;
        }

ThreadList的insert方法,在插入成功后,会判断当前链表中是否只有自己一个线程在等待,如果是则返回true。从而进入后边的if语句。这个逻辑的用意就是:如果只有自己一个线程在等待时,则试着通过cas操作重新获取锁,如果获取失败才进入阻塞等待。它是用来解决以下边界情况:

图片 6

在只有线程A和线程B两个线程的时候,如果没有以上判断逻辑,线程B将有可能会永远处于阻塞不被唤醒。 

 

以下是本系列其他的文章:

自己动手写把”锁”之—锁的作用

自己动手写把”锁”之—JMM和volatile

自己动手写把”锁”—原子性操作

自己动手写把”锁”—LockSupport深入浅出

 


有兴趣的朋友,可以加入我的知识圈,一起研究讨论。

我正在「JAVA互联网技术」和朋友们讨论有趣的话题,你一起来吧?

 

相关文章

Leave a Reply

电子邮件地址不会被公开。 必填项已用*标注