java.util.concurrent包的拆解

java.util.concurrent包:
1.locks部分:显式锁(互斥锁和速写锁)相关
2.atomic部分:原子变量类相关,是构建非阻塞算法的基础
3.executor部分:线程池相关
4.collection部分:并发容器相关
5.tools部分:同步工具相关,如信号量、闭锁、栅栏等功能

创新互联建站专注于成都网站建设、网站制作、网页设计、网站制作、网站开发。公司秉持“客户至上,用心服务”的宗旨,从客户的利益和观点出发,让客户在网络营销中找到自己的驻足之地。尊重和关怀每一位客户,用严谨的态度对待客户,用专业的服务创造价值,成为客户值得信赖的朋友,为客户解除后顾之忧。

1.collection部分:
1.1 BlockingQueue
BlockingQueue为接口,如果要用它,需要实现它的子类:
ArrayBlockingQueue
DelayQueue
LinkedBlockingQueue
SynchronousQueue
PriorityBlockingQueue
TransferQueue

   /**
  • 在两个独立的线程中启动一个Producer和一个Consumer
  • Producer向一个共享的BlockingQueue注入字符串,而Comsumer从中拿出来
  • @Author mufeng
  • @Date 2019-7-8 11:25
    */
    public class BlockingQueueExample {
    public static void main(String[] args) {
    BlockingQueue blockingQueue=new ArrayBlockingQueue(1024);
    Producer producer=new Producer(blockingQueue);
    Consumer consumer=new Consumer(blockingQueue);
    new Thread(producer).start();
    new Thread(consumer).start();
    try {
    Thread.sleep(4000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    class Producer implements Runnable{
    private BlockingQueue blockingQueue;
    public Producer(BlockingQueue blockingQueue){this.blockingQueue=blockingQueue;
    }
    @Override
    br/>this.blockingQueue=blockingQueue;
    }
    @Override
    try {
    blockingQueue.put("1");
    Thread.sleep(1000);
    blockingQueue.put("2");
    Thread.sleep(1000);
    blockingQueue.put("3");
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    class Consumer implements Runnable{
    private BlockingQueue blockingQueue;
    public Consumer(BlockingQueue blockingQueue){
    this.blockingQueue=blockingQueue;
    }

    @Override
    public void run() {
    try {
    System.out.println(blockingQueue.take());
    System.out.println(blockingQueue.take());
    System.out.println(blockingQueue.take());
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

2.Tools部分
2.1 CountDownLatch用法

/**

  • @Author mufeng
  • @Date 2019-7-8 11:54
    */
    public class TestCountDownLatch {
    public static void main(String[] args) {
    final CountDownLatch countDownLatch=new CountDownLatch(2);
    new Thread(){
    public void run(){
    System.out.println(Thread.currentThread().getName()+"processing");
    try {
    Thread.sleep(3000);
    System.out.println(Thread.currentThread().getName()+"ended");
    countDownLatch.countDown();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }.start();
    new Thread(){
    public void run(){
    System.out.println(Thread.currentThread().getName()+"processing");
    try {
    Thread.sleep(3000);
    System.out.println(Thread.currentThread().getName()+"ended");
    countDownLatch.countDown();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }.start();
    try {
    System.out.println("wait success...");
    countDownLatch.await();//调用await()方法的线程会被挂起,会等待直到count值为0才继续执行
    System.out.println("2 end");
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

/**

  • 所有线程都停留在栅栏的位置,都结束,才继续执行
  • @Author mufeng
  • @Date 2019-7-8 14:10
    */
    public class TestCyclicBarrier {
    public static void main(String[] args) {
    int n=4;
    CyclicBarrier cyclicBarrier=new CyclicBarrier(n);
    for(int i=0;inew Writer(cyclicBarrier).start();
    }

}
class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier){
this.cyclicBarrier=cyclicBarrier;
}
public void run(){
try {
System.out.println("writer start");
Thread.sleep(5000);
System.out.println("writer end");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("continue...");
}
}

/**

  • Semaphore可以控制同时访问的线程个数,通过acquire()获取一个许可,如果没有就等待,而release()释放一个许可。
  • @Author mufeng
  • @Date 2019-7-8 14:32
    */
    public class TestSemaphore {
    public static void main(String[] args) {
    int N=8;//工人数
    Semaphore semaphore=new Semaphore(5);//机器数
    for(int i=0;inew Worker(i,semaphore).start();
    }
    }
    }
    class Worker extends Thread{
    private int num;
    private Semaphore semaphore;
    public Worker(int num,Semaphore semaphore){
    this.num=num;
    this.semaphore=semaphore;
    }
    public void run(){
    try {
    semaphore.acquire();
    System.out.println("工人"+this.num+"occupy a machine。。。");
    Thread.sleep(2000);
    semaphore.release();
    System.out.println("工人"+this.num+"release a machine");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

/**

  • Exchanger是在两个任务之间交换对象的栅栏,当这些任务进入栅栏时,它们各自拥有一个对象,当它们离开时,它们都拥有之前由对象持有的对象
  • @Author mufeng
  • @Date 2019-7-8 14:54
    */
    public class TestExchanger {
    public static void main(String[] args) {
    ExecutorService executor =Executors.newCachedThreadPool();
    final Exchanger exchanger=new Exchanger();
    executor.execute(new Runnable() {@Override
    br/>@Override
    String data1="li";
    doExchangeWork(data1,exchanger);
    }
    });
    executor.execute(new Runnable() {@Override
    br/>@Override
    String data1="zhang";
    doExchangeWork(data1,exchanger);
    }
    });
    executor.shutdown();
    }
    private static void doExchangeWork(String data1,Exchanger exchanger){
    System.out.println(Thread.currentThread().getName()+"exchange:"+data1);
    try {
    String data2 = (String) exchanger.exchange(data1);
    System.out.println(Thread.currentThread().getName()+"exchange to"+data2);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

3.Executor
四种线程池:newFixedThreadPool,newCachedThreadPool,newSingleThreadExecutor,newScheduledThreadPool
1.newFixedThreadPool创建一个可重用固定线程数的线程池,以共享的×××队列方式来运行线程。
2.newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程
3.newScheduledThreadPool创建一个定长线程池,支持定时及周期性任务执行
4.newSingleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO,LIFO,优先级)执行

任务分两类:一类是实现了Runnable接口的类,一类是实现了Callable接口的类, Callable的call()方法只能通过ExecutorService的submit(Callable task)方法来执行,
并且返回一个Future.

4.lock
Synchronized缺点
1.无法中断
2.无法设置超时
3.使用在方法上的synchronized其实是个语法糖

lock(),trylock(),tryLock(long time,TimeUnit unit)和lockInterruptibly()是用来获取锁的,unlock()方法是用来释放锁的。

ReentrantLock 可重入锁

5.atomic
标量类:AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
数组类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
更新器类:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
复合变量类:AtomicMarkableReference,AtomicStampedReference


分享文章:java.util.concurrent包的拆解
文章路径:http://hbruida.cn/article/ipdgdc.html