志在指尖
用双手敲打未来

信号量Semaphore

Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时分能够指定一个值,但是不需求晓得需求同步的线程个数,只需求在同步的中央调用acquire办法时指定需求同步的线程个数;
一.简单运用
同步两个子线程,只要其中两个子线程执行终了,主线程才会执行:
packagecom.example.demo.study;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.Semaphore;
publicclassStudy0217{
//创立一个信号量的实例,信号量初始值为0
staticSemaphoresemaphore=newSemaphore(0);
publicstaticvoidmain(String[]args)throwsException{
ExecutorServicepool=Executors.newFixedThreadPool(3);
pool.submit(()->{
System.out.println(“Thread1—start”);
//信号量加一
semaphore.release();
});
pool.submit(()->{
System.out.println(“Thread2—start”);
//信号量加一
semaphore.release();
});
pool.submit(()->{
System.out.println(“Thread3—start”);
//信号量加一
semaphore.release();
});
//等候两个子线程执行终了就放过,必需要信号量等于2才放过
semaphore.acquire(2);
System.out.println(“两个子线程执行终了”);
//关闭线程池,正在执行的任务继续执行
pool.shutdown();
}
}
这个信号量也能够复用,相似CyclicBarrier:
packagecom.example.demo.study;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.Semaphore;
publicclassStudy0217{
//创立一个信号量的实例,信号量初始值为0
staticSemaphoresemaphore=newSemaphore(0);
publicstaticvoidmain(String[]args)throwsException{
ExecutorServicepool=Executors.newFixedThreadPool(3);
pool.submit(()->{
System.out.println(“Thread1—start”);
//信号量加一
semaphore.release();
});
pool.submit(()->{
System.out.println(“Thread2—start”);
//信号量加一
semaphore.release();
});
//等候两个子线程执行终了就放过,必需要信号量等于2才放过
semaphore.acquire(2);
System.out.println(“子线程1,2执行终了”);
pool.submit(()->{
System.out.println(“Thread3—start”);
//信号量加一
semaphore.release();
});
pool.submit(()->{
System.out.println(“Thread4—start”);
//信号量加一
semaphore.release();
});
semaphore.acquire(2);
System.out.println(“子线程3,4执行终了”);
//关闭线程池,正在执行的任务继续执行
pool.shutdown();
}
}
二.信号量原理
看看下面这个图,能够晓得信号量Semaphore还是依据AQS完成的,内部有个Sync工具类操作AQS,还分为公平战略和非公平战略;
结构器:
//默许是非公平战略
publicSemaphore(intpermits){
sync=newNonfairSync(permits);
}
//能够依据第二个参数选择是公平战略还是非公平战略
publicSemaphore(intpermits,booleanfair){
sync=fair?newFairSync(permits):newNonfairSync(permits);
}
acquire(intpermits)办法:
publicvoidacquire(intpermits)throwsInterruptedException{
if(permits<0)thrownewIllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
//AQS中的办法
publicfinalvoidacquireSharedInterruptibly(intarg)
throwsInterruptedException{
if(Thread.interrupted())thrownewInterruptedException();
//这里依据子类是公平战略还是非公平战略
if(tryAcquireShared(arg)<0)
//获取失败会进入这里,将线程放入阻塞队列,然后再尝试,还是失败的话就调用park办法挂起当前线程
doAcquireSharedInterruptibly(arg);
}
//非公平战略
protectedinttryAcquireShared(intacquires){
returnnonfairTryAcquireShared(acquires);
}
finalintnonfairTryAcquireShared(intacquires){
//一个无限循环,获取state剩余的信号量,由于每调用一次release()办法的话,信号量就会加一,这里将
//最新的信号量减去传进来的参数比拟,比方有两个线程,其中一个线程曾经调用了release办法,然后调用acquire(2)办法,那么
//这里remaining的值就是-1,返回-1,然后当前线程就会被丢到阻塞队列中去了;假如另外一个线程也调用了release办法,
//那么此时的remaining==0,所以在这里的if中会调用CAS将0设置到state
//
for(;;){
intavailable=getState();
intremaining=available-acquires;
if(remaining<0||compareAndSetState(available,remaining))
returnremaining;
}
}
//公平战略
//和上面非公平差不多,只不过这里会查看阻塞队列中当前节点前面有没有前驱节点,有的话直接返回-1,
//就会把当前线程丢到阻塞队列中阻塞去了,没有前驱节点的话,就跟非公平形式一样的了
protectedinttryAcquireShared(intacquires){
for(;;){
if(hasQueuedPredecessors())
return-1;
intavailable=getState();
intremaining=available-acquires;
if(remaining<0||compareAndSetState(available,remaining))
returnremaining;
}
}
再看看release(intpermits)办法:
//这个办法的作用就是将信号量加一
publicvoidrelease(intpermits){
if(permits<0)thrownewIllegalArgumentException();
sync.releaseShared(permits);
}
//AQS中办法
publicfinalbooleanreleaseShared(intarg){
//tryReleaseShared尝试释放资源
if(tryReleaseShared(arg)){
//释放资源胜利就调用park办法唤醒唤醒AQS队列中最前面的节点中的线程
doReleaseShared();
returntrue;
}
returnfalse;
}
protectedfinalbooleantryReleaseShared(intreleases){
//一个无限循环,获取state,然后加上传进去的参数,假如新的state的值小于旧的state,阐明曾经超越了state的最大值,溢出了
//没有溢出的话,就用CAS更新state的值
for(;;){
intcurrent=getState();
intnext=current+releases;
if(next<current)//overflow
thrownewError(“Maximumpermitcountexceeded”);
if(compareAndSetState(current,next))
returntrue;
}
}
privatevoiddoReleaseShared(){
for(;;){
Nodeh=head;
if(h!=null&&h!=tail){
intws=h.waitStatus;
//ws==Node.SIGNAL表示节点中线程需求被唤醒
if(ws==Node.SIGNAL){
if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))
continue;//looptorecheckcases
//调用阻塞队列中线程的unpark办法唤醒线程
unparkSuccessor(h);
}
//ws==0表示节点中线程是初始状态
elseif(ws==0&&!compareAndSetWaitStatus(h,0,Node.PROPAGATE))
continue;//looponfailedCAS
}
if(h==head)//loopifheadchanged
break;
}
}

未经允许不得转载:IT技术网站 » 信号量Semaphore
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!

 

志在指尖 用双手敲打未来

登录/注册IT技术大全

热门IT技术

C#基础入门   SQL server数据库   系统SEO学习教程   WordPress小技巧   WordPress插件   脚本与源码下载