好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Java多线程之Worker Thread模式

一.Worker Thread模式

Worker 的意思是工作的人,在 Worker Thread 模式中,工人线程 Worker thread 会逐个取回工作并进行处理,当所有工作全部完成后,工人线程会等待新的工作到来。

Worker Thread 模式也被成为 Background Thread (背景线程)模式,另外,如果从保存多个工人线程的场所这一点看,我们也可以称这种模式为 Thread Pool 模式。

二   .Worker Thread模式中的角色

1.Client(委托者)

创建表示工作请求的 Request 并将其传递给 Channel 。在示例程序中, ClientThread 相当于该角色。

2.Channel(通信线路)

Channel 角色接受来自于 Client 的 Request ,并将其传递给 Worker 。在示例程序中, Channel 相当于该角色。

3.Worker(工人)

Worker 角色从 Channel 中获取 Request ,并进行工作,当一项工作完成后,它会继续去获取另外的 Request ,在示例程序中, WorkerThread 相当于该角色。

4.Request(请求)

Request 角色是表示工作的角色, Request 角色中保存了进行工作所必须的信息,在示例程序中, Request 相当于该角色。

三.Worker Thread使用场景

想象一个场景,一个工厂在生产玩具,在一个车间里,有几个工人,每次生产部件准备好车间外的人就将部件放到车间的一个桌子上,工人每次做完一个玩具就从桌子上取部件。在这里,注意到,部件并不是直接交给工人的,另外一点,工人并不是做完一个部件就回家换个新人,后者在现实有点滑稽,但是在程序中却对应一个典型的线程使用方法:线程池。

所谓线程池,就是对线程的复用,当线程执行完任务之后就继续取其他任务执行,而不是销毁启动新线程执行其他任务。因为线程的启动对于系统性能开销比较大,所以这样对于系统性能的提高很有好处。

四.Worker Thread模式程序示例

首先是请求,即玩具的部件

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

public class Request {

 

     private final String name;

     private final int number;

 

     public Request(String name, int number) {

         this .name = name;

         this .number = number;

     }

 

     public void execute(){

         System.out.println(Thread.currentThread().getName()+ " executed " + this );

     }

     @Override

     public String toString() {

         return "Request=> " + "No." + number + "  Name." + name;

     }

}

也就是拥有 name 和 number 并且 execute 的时候打印出字段的一个简单类。

ClientThread ,负责将请求放入 RequestQueue 中,即将部件放到桌子上。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

public class ClientThread extends Thread {

 

     private static final Random random = new Random(System.currentTimeMillis());

 

     private final Channel channel;

 

     public ClientThread(String name, Channel channel) {

         super (name);

         this .channel = channel;

     }

 

     @Override

     public void run() {

         try {

 

             for ( int i = 0 ; true ; i++) {

                 Request request = new Request(getName(),i);

                 this .channel.put(request);

                 Thread.sleep(random.nextInt(1_000));

             }

         } catch (Exception e) {

 

         }

     }

}

Channel类,可以当做车间

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

public class Channel {

 

     private final static int MAX_REQUEST = 100 ;

 

     private final Request[] requestQueue;

     private final WorkerThread[] workerPool;

     private int head;

     private int tail;

     private int count;

 

     public Channel( int workers) {

 

         this .requestQueue = new Request[MAX_REQUEST];

         this .head = 0 ;

         this .tail = 0 ;

         this .count = 0 ;

         this .workerPool = new WorkerThread[workers];

         this .init();

     }

 

     private void init() {

         for ( int i = 0 ; i < workerPool.length; i++) {

             workerPool[i] = new WorkerThread( "Worker-" + i, this );

         }

     }

 

     /**

      * push switch to start all of worker to work

      */

     public void startWorker() {

         Arrays.asList(workerPool).forEach(WorkerThread::start);

 

//        List<WorkerThread> workerThreads = Arrays.asList(workerPool);

//

//        workerThreads.stream().forEach(WorkerThread::start);

     }

 

     public synchronized void put(Request request) {

         while (count >= requestQueue.length) {

             try {

                 this .wait();

             } catch (InterruptedException e) {

                 e.printStackTrace();

             }

         }

         this .requestQueue[tail] = request;

         this .tail = (tail + 1 ) % requestQueue.length;

         this .count++;

         this .notifyAll();

     }

 

     public synchronized Request take() {

         while (count <= 0 ) {

             try {

                 this .wait();

             } catch (InterruptedException e) {

                 e.printStackTrace();

             }

         }

         Request request = this .requestQueue[head];

         this .head = ( this .head + 1 ) % this .requestQueue.length;

         this .count--;

         this .notifyAll();

         return request;

     }

}

Requestqueue 可以当做桌子,是一个数量有限的请求队列。 threadPool 是一个工人线程的数组,这就是一个线程池。在这里提供了 putRequest 和 takeRequest 方法,分别是往请求队列放入请求和取出请,这里使用了上一篇博文讲到的生产者消费者模式 java 多线程设计模式之消费者生产者模式。确保了 WorkerThread 和 ClientThread 之间可以友好合作。

工人线程:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

public class WorkerThread extends Thread {

 

     private static final Random random = new Random(System.currentTimeMillis());

     private final Channel channel;

 

     public WorkerThread(String name, Channel channel) {

         super (name);

         this .channel = channel;

     }

 

     @Override

     public void run() {

         while ( true ) {

             channel.take().execute();

 

             try {

                 Thread.sleep(random.nextInt(1_000));

             } catch (InterruptedException e) {

                 e.printStackTrace();

             }

         }

     }

}

这里就是一个不断从请求队列中取出请求然后执行请求的过程,保证了工人线程的复用,并不会执行完一个请求任务就销毁。

最后是Main:

?

1

2

3

4

5

6

7

8

9

10

11

public class WorkerClient {

 

     public static void main(String[] args) {

         final Channel channel = new Channel( 5 );

         channel.startWorker();

 

         new ClientThread( "Alex" , channel).start();

         new ClientThread( "Jack" , channel).start();

         new ClientThread( "William" , channel).start();

     }

}

结果:

Worker-4 executed Request=> No.0  Name.Alex
Worker-2 executed Request=> No.0  Name.Jack
Worker-3 executed Request=> No.0  Name.William
Worker-4 executed Request=> No.1  Name.Jack
Worker-0 executed Request=> No.1  Name.William
Worker-3 executed Request=> No.2  Name.Jack
Worker-2 executed Request=> No.1  Name.Alex
Worker-4 executed Request=> No.2  Name.William
Worker-1 executed Request=> No.3  Name.Jack
Worker-3 executed Request=> No.2  Name.Alex
Worker-4 executed Request=> No.3  Name.William
Worker-0 executed Request=> No.4  Name.Jack
Worker-0 executed Request=> No.3  Name.Alex
Worker-1 executed Request=> No.5  Name.Jack
Worker-3 executed Request=> No.4  Name.William
Worker-1 executed Request=> No.6  Name.Jack
Worker-2 executed Request=> No.4  Name.Alex
Worker-3 executed Request=> No.7  Name.Jack
Worker-0 executed Request=> No.5  Name.William
Worker-1 executed Request=> No.5  Name.Alex
Worker-4 executed Request=> No.8  Name.Jack
Worker-2 executed Request=> No.6  Name.Alex
Worker-0 executed Request=> No.7  Name.Alex
Worker-4 executed Request=> No.8  Name.Alex
Worker-2 executed Request=> No.6  Name.William
省略...

可以看出线程执行任务的线程就是 WorkerThread1,2,3,4,5 五个,它们不断执行来自 ClientThread Alex , Jack , William 的请求任务。

到此这篇关于Java多线程之 Worker Thread 模式的文章就介绍到这了,更多相关Java多线程 Worker Thread内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://juejin.cn/post/7023986653987340319

查看更多关于Java多线程之Worker Thread模式的详细内容...

  阅读:17次