线程池
线程池结构框图
线程池的创建
5种常见的线程池生成方法,通过使用Executors工具类
ExecutorService threadPool = Executors.newFixedThreadPool(5); //一池5个处理线程
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); //一池一个处理线程
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); //一池N个线程
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);//创建拥有固定线程数量的定时线程任务的线程池
ExecutorService executorService = Executors.newWorkStealingPool(); //创建ForJoin线程池
线程池的执行
按有无返回值可以有两种方式操作
- 使用Runnable接口(无返回值)
static class MyThread extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"/t"+" 正在执行");
}
}
ExecutorService es = Executors.newFixedThreadPool(2);
MyThread t1 = new MyThread();
MyThread t2 = new MyThread();
MyThread t3 = new MyThread();
MyThread t4 = new MyThread();
MyThread t5 = new MyThread();
es.execute(t1);
es.execute(t2);
es.execute(t3);
es.execute(t4);
- 使用Callable接口
HashSet<Callable<String>> callables = new HashSet<>();
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("callable 1 -------------");
return "1";
}
});
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("callable 2 -------------");
return "2";
}
});
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("callable 3 -------------");
//int b = 1/0;
return "3";
}
});
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("callable 4 -------------");
return "4";
}
});
try {
List<Future<String>> futures = es.invokeAll(callables);
for (Future f : futures){
System.out.println(f.get());
}
es.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
高级线程池:ForkJoinPool
并发线程处理任务,分为两步:1. fork() 2.join()
public class TempTest {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);
forkJoinPool.invoke(myRecursiveAction);
}
static class MyRecursiveAction extends RecursiveAction{
private long workLoad = 0;
public MyRecursiveAction(long a){
workLoad = a;
}
@Override
//好像compute方法一般都是一个if-else来操作
protected void compute() {
//如果工作超过门槛,把任务分解成更小的任务
if(this.workLoad > 16){
System.out.println("Splitting workLoad : " + this.workLoad);
/*ArrayList<MyRecursiveAction> subTasks = new ArrayList<>();
subTasks.addAll(createSubtasks());
for (MyRecursiveAction subTask : subTasks){
subTask.fork();
}*/
//两步走战略:
//1. 分一个较小的任务出来
//2. 把分出来的子任务进行fork操作
MyRecursiveAction sub1 = new MyRecursiveAction(this.workLoad / 2);
MyRecursiveAction sub2 = new MyRecursiveAction(this.workLoad / 2);
sub1.fork();
sub2.fork();
}else {
System.out.println("Doing workLoad myself: " + this.workLoad);
}
}
public List<MyRecursiveAction> createSubtasks(){
ArrayList<MyRecursiveAction> subTasks = new ArrayList<>();
MyRecursiveAction sub1 = new MyRecursiveAction(this.workLoad / 2);
MyRecursiveAction sub2 = new MyRecursiveAction(this.workLoad / 2);
subTasks.add(sub1);
subTasks.add(sub2);
return subTasks;
}
}
}
- 多个并发子任务的情况:使用List
public class TempTest {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
MyRecursiveTask myRecursiveAction = new MyRecursiveTask(124);
Long invoke = forkJoinPool.invoke(myRecursiveAction);
System.out.println(invoke);
}
static class MyRecursiveTask extends RecursiveTask<Long>{
private long workLoad = 0;
public MyRecursiveTask(long a){
workLoad = a;
}
public List<MyRecursiveTask> createSubtasks(){
ArrayList<MyRecursiveTask> subTasks = new ArrayList<>();
MyRecursiveTask sub1 = new MyRecursiveTask(this.workLoad / 2);
MyRecursiveTask sub2 = new MyRecursiveTask(this.workLoad / 2);
subTasks.add(sub1);
subTasks.add(sub2);
return subTasks;
}
@Override
protected Long compute() {
//如果工作超过门槛,把任务分解成更小的任务
if(this.workLoad > 16){
System.out.println("Splitting workLoad : " + this.workLoad);
//分解子任务的操作
ArrayList<MyRecursiveTask> subTasks = new ArrayList<>();
subTasks.addAll(createSubtasks());
for (MyRecursiveTask subTask : subTasks){
subTask.fork();
}
//合并子任务的操作
long result = 0;
for(MyRecursiveTask subtask : subTasks) {
result += subtask.join();
}
return result;
}else {
//细化执行的步骤
System.out.println("Doing workLoad myself: " + this.workLoad);
return 3 * workLoad;
}
}
}
}
小练习(阻塞队列)
在多线程操作下,一个数组中最多只能存入 1 个元素。多放入不可以存入数组,
或等待某线程对数组中某个元素取走才能放入,要求使用java的多线程来实现。
public class TempTest {
public static void main(String[] args) {
final SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<Integer>();
new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("put start");
synchronousQueue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("put end");
}
}).start();
new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("take start");
synchronousQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("take end");
}
}).start();
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/202506.html