好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Java current并发包超详细分析

并发包

current并发包、在JDK1.5之前Java并没有提供线程安全的一些工具类去操作多线程,需要开发人员自行编写实现线程安全,但仍然无法完全避免低性能、死锁、资源管理等问题。在JDK1.5时新增了 java.util.current 并发包,其中提供了许多供我们使用的并发编程工具类。本文对于典型的并发包做出讲解

ConcurrentHashMap

Java集合框架提供了存储容器HashMap用于存储键值对,但是HashMap是线程不安全的。在并发编程中,我们向HashMap添加大量数据时,可能会出现各种预料之外的问题。

同时Java也提供了线程安全的集合类HashTable,打开HashTable的底层我们会发现HashTable的所有方法都利用synchtonized进行了上锁机制来保证了线程安全,但是利用这种阻塞同步的机制来保证线程安全的同时会大大降低程序的性能和执行效率,这也是为什么HashTable被淘汰的原因

在JDK1.5之后Java就提供了保证性能高效、线程安全的键值对存储容器ConcurrentHashMap

下面我们看下HashMap、HashTable、ConcurrentHashMap的对比

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

public class Demo01 {

     //public static Map<String,String> maps = new HashMap<String, String>();

     //public static Map<String,String> maps = new Hashtable<String, String>();

     public static Map<String,String> maps = new ConcurrentHashMap<String, String>();

     public static void main(String[] args) throws Exception {

         Runnable task = new Temp();

         Thread t1 = new Thread(task, "A线程" );

         Thread t2 = new Thread(task, "B线程" );

         t1.start();

         t2.start();

         // 保证t1和t2先执行完

         t1.join();

         t2.join();

         System.out.println( "最终集合长度:" +maps.size());

     }

}

class Temp implements Runnable{

     @Override

     public void run() {

         for ( int i = 0 ; i < 500000 ; i++) {

  Demo01.maps.put(Thread.currentThread().getName()+i,Thread.currentThread().getName()+i);

         }

     }

}

如上述代码所示,我们启动两条线程执行同一任务:向容器中添加50万条数据,预期最终容器中的数据将会达到100万条。

利用HashMap存储时,发现程序会出现各种各样的异常状况

程序卡顿,不报异常也不停止

报异常

java.lang.ClassCastException: java.util.HashMap$Node cannot be cast to java.util.HashMap$TreeNode

最终产生错误数据

利用HashTable存储时,发现HashTable可以准确存储。并且对比HashTable和ConcurrentHashMap两者的存储速度,发现大差小不差甚至HashTable还要更快。那么为什么还要说HashTable效率低下呢?

是因为我们只是测试了对数据进行的写操作,而没有测试其他的像查询、修改等操作。综合来讲ConcurrentHashMap的各项性能优于HashTbale,所以我们在需要考虑线程安全时,就可以采用ConcurrentHashMap进行存储数据

那么ConcurrentHashMap是如何既保证线程安全又不失高性能的存储数据呢?

首先明确它的底层实现机制是用CAS机制+synchronized分段式锁,属于是悲观和乐观相结合

HashTable工作时会将整个哈希表进行上锁,此时所有其他线程都将被阻塞,效率低下

ConcurrentHashMap工作时利用synchronized进行分段式上锁,我们知道哈希表底层基于数组实现,数组中每个位置形成槽位以便后续成链或者转换树结构。而分段式上锁就是将当前线程所存储的该位置进行上锁,其他位置仍可以被其他线程进行操作。

CountDownLatch倒计数触发

CountDownLatch同样是current包下的一个同步工具,它的主要作用就是使当前线程等待一条或多条线程执行完毕后再执行当前线程。同时提供了两个主要方法来控制线程的交替执行

?

1

2

3

4

// 创建CountDownLatch

CountDownLatch cdl = new CountDownLatch( 1 );

cdl.await() // 让出cpu,使当前线程等待

cdl.CountDown() // 计数器减1,只有当计数器为零时才会唤醒被await的线程

CountDownLatch提供了一个构造器用于参数Count,在创建时就给定计数个数。每次调用CountDown方法就减一知道减为0时才会执行被await等待的线程。

我们来看下面这个示例,目的是顺序打印出[A、B、C]

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

public class Demo02 {

     public static void main(String[] args) {

         CountDownLatch count = new CountDownLatch( 1 );

         new ThreadA(count).start();

         new ThreadB(count).start();

     }

}

class ThreadA extends Thread{

     private CountDownLatch count;

     public ThreadA(CountDownLatch count) {

         this .count = count;

     }

     @Override

     public void run() {

         System.out.println( "A" );

         // 使当前线程等待  等待打印B之后宰继续执行打印A

         try {

             count.await();

         } catch (InterruptedException e) {

             throw new RuntimeException(e);

         }

         System.out.println( "C" );

     }

}

class ThreadB extends Thread{

     private CountDownLatch count;

     public ThreadB(CountDownLatch count) {

         this .count = count;

     }

     @Override

     public void run() {

         System.out.println( "B" );

         // 当前线程执行完后倒计数减一

         count.countDown();

     }

}

但是有序线程执行先后 顺序不确定,也有可能打印出[B、A、C]

CyclicBarrier循环屏障

CyclicBarrier与CountDownLatch很容易弄混

CountDownLatch:使一条或多条线程等待其他线程执行完毕之后再执行自己,内部使用倒计数,最终执行被await等待的线程

CyclicBarrier:阻塞一个线程组,内部采用正计数。当被阻塞的线程达到某个数量时才能执行指定的任务。我们每调用一次await代表阻塞了一条线程。

假设示例:五个人进入会议室执行开会任务

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

// 六条线程:五个员工进入会议室、一个开会

public class CyclicBarrierDemo {

     public static void main(String[] args) {

         // 创建循环屏障

         CyclicBarrier cb = new CyclicBarrier( 5 , new Metting());

         for ( int i = 1 ; i <= 4 ; i++) {

             new Employee(i+ "号员工" ,cb).start();

         }

     }

}

class Employee extends Thread{

     private CyclicBarrier cb;

     public Employee(String s, CyclicBarrier cb) {

         super (s);

         this .cb = cb;

     }

     @Override

     public void run() {

         System.out.println(Thread.currentThread().getName()+ "进入会议室" );

 

         try {

             Thread.sleep( 1000 );

             cb.await();

         } catch (Exception e) {

             e.printStackTrace();

         }

     }

}

class Metting implements Runnable{

     @Override

     public void run() {

         System.out.println(Thread.currentThread().getName()+ "组织会议,会议开始" );

     }

}

上述代码所示:

?

1

CyclicBarrier cb = new CyclicBarrier( 5 , new Metting());

我们创建了一个循环屏障用于控制线程执行,当被await阻塞的线程数==5时将会执行newMetting的Runnable线程任务

同时会发现最后一个到达会议室的人(线程)将会组织会议开始,这说明我们调用了await方法并不是将该线程阻塞。是由于CyclicBarrier底层由线程池实现,每一条线程执行完毕之后都会被线程池回收而不是阻塞

Semaphore指示灯

Semaphore用于设置一个或多个线程可以同时执行即控制线程的并发数量,其他线程被阻塞。常用于限流操作。同时可以设置公平锁和非公平锁

Semaphore的使用与Lock工具有些类似,同样是提供了两个方法用于上锁和解锁。只是Semaphore可以自由的控制能拿到锁的线程数

Semaphore提供了如下两个构造器

?

1

2

3

public Semaphore( int permits) // permits为允许执行的线程数

public Semaphore( int permits, boolean fair)

     // fair为true表示公平锁,等待时间最长的线程将在下次进入 反之是不公平锁

Semphore提供的两个操作锁方法

?

1

2

public void acquire()  // 表示获得许可

public void release()  // 表示释放许可

示例:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

public class SemaphoreDemo {

     public static void main(String[] args) {

         // 创建任务

         Service service = new Service();

         for ( int i = 1 ; i <= 5 ; i++) {

             new MyThread(i+ "号线程" ,service).start();

         }

     }

}

// 线程类

class MyThread extends Thread{

     private Service service;

     public MyThread(String name,Service service){

         super (name);

         this .service = service;

     }

     @Override

     public void run() {

         try {

             service.testMethod();

         } catch (Exception e) {

             throw new RuntimeException(e);

         }

     }

}

// 抽离业务代码

class Service{

     // 创建Semaphore对象 并指定线程数

     private Semaphore sp = new Semaphore( 2 );

     public void testMethod() throws Exception {

         // 获取许可

         sp.acquire();

         System.out.println(Thread.currentThread().getName()+ "进入  时间:" +System.currentTimeMillis());

         Thread.sleep( 200 );

         System.out.println(Thread.currentThread().getName()+ "执行成功" );

         System.out.println(Thread.currentThread().getName()+ "离开  时间:" +System.currentTimeMillis());

         // 释放许可

         sp.release();

     }

}

如上述程序所示,我们在创建Semaphore时指定了允许的并发数量为2,那么业务代码同时只能被两个线程执行,一旦一条线程执行完毕之后将会释放许可,立刻会有其他线程获得许可进入执行

Exchanger交换者

Exchanger用于线程间的通信、数据交换。Exchanger提供了一个同步点exchange方法: public V exchange(V x) 互相交换数据的两条线程必须都运行到了同步点才能执行交换数据的操作,只有一方到达时就会进行等待,等待时间可以由开发人员设定

我们先来看下面的示例

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

public class ExchangerDemo {

     public static void main(String[] args) {

         // 创建交换者

         Exchanger<String> exchanger = new Exchanger<>();

         // 创建两条线程进行交换数据

         new ThreadN( "线程N" ,exchanger).start();

         new ThreadP( "线程P" ,exchanger).start();

     }

}

class ThreadN extends Thread{

     private Exchanger<String> exchanger;

     public ThreadN(String name,Exchanger<String> exchanger) {

         super (name);

         this .exchanger = exchanger;

     }

     @Override

     public void run() {

         System.out.println(Thread.currentThread().getName()+ "给线程P:" + "我是线程N" );

         try {

             String exchange = exchanger.exchange( "我是线程N" );

             System.out.println( "线程N拿到数据:" +exchange);

         } catch (InterruptedException e) {

             throw new RuntimeException(e);

         }

     }

}

class ThreadP extends Thread{

     private Exchanger<String> exchanger;

     public ThreadP(String name,Exchanger<String> exchanger) {

         super (name);

         this .exchanger = exchanger;

     }

     @Override

     public void run() {

         System.out.println(Thread.currentThread().getName()+ "给线程N:" + "我是线程P" );

         try {

             String exchange = exchanger.exchange( "我是线程P" );

             System.out.println( "线程P拿到数据:" +exchange);

         } catch (InterruptedException e) {

             throw new RuntimeException(e);

         }

     }

}

根据最终打印,可以发现两者交换了数据。这两条线程拥有的是同一个交换者对象,所以可以实现数据交换。

前文提到过我们可以自定义线程等待的时间,就是再同步点exchange处等待另一条线程执行到此的时间。利用exchange方法定义等待时间

?

1

2

3

4

public V exchange(V x, long timeout, TimeUnit unit)

     // timeout等待的时间数值  unit时间单位

     // 示例:只等待五秒

exchanger.exchange( "111" , "5000" , TimeUnit.SECONDS)

超出了规定的等待时间,正在等待的线程将被回收并抛出 java.util.TimeoutException 超时异常,所以交换数据的双方必须都执行到同步点才能进行数据交换

到此这篇关于Java current并发包超详细分析的文章就介绍到这了,更多相关Java current并发包内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://blog.csdn.net/yuqu1028/article/details/128687010

查看更多关于Java current并发包超详细分析的详细内容...

  阅读:28次