线程
进程和线程的定义
- 进程: 一个应用程序
- 线程: 一个进程内的多个处理部分
每当启动一个进程,CPU会分配给进程一部分时间单元,进程内的线程可以调用本进程内的资源
eg.
对于Java程序来说,在DOS命令窗口中输入:1
java HelloWorld
回车后,会先启动JVM, JVM就是一个进程,之后JVM在启动一个主线程调用main方法(main就是主线程)
同时启动垃圾回收线程复制看护,回收垃圾
最起码,Java程序至少有两个线程并发,一个是垃圾回收机制, 一个是main的主线程。
线程的生命周期
- 出生状态
- 就绪状态
- 执行状态
- 阻塞状态
- 等待状态
- 死亡状态
线程的构造方法
实现线程主要是通过两种Java提供的方式, 分别为继承Java.lang.Thread类与实现Java,lang.Runnable接口
1. 继承Thread类
Thread类封装好了线程运作的基本操作,通过继承Thread类实现线程,只需要覆盖Thread类里面的run方法,之后调用start()方法可以启动线程。如果没有调用start方法,那么该对象只是一个实例,不是一个真正的线程
实际上,start方法调用了run方法,如果start方法调用一个启动了的线程,会抛出IllegalThreadStateException异常
1 | public Thread(String name) |
2.实现Runnable接口
如果你的类继承自一个不是Thread的类,但是你又想启动一个新的线程,那么这时你需要实现Runnable接口去实现。实质上,Runnable是关联了一个Thread类,实现run接口相当于实现了Thread类的run方法,
1 | public class SwingandThread extends JFrame { |
这样可以调用这个Thread, 注意道写接口的时候可以直接Lambda
3. 实现Callable接口
前两种线程创建方式都存在一个问题:
- 假如线程执行完毕之后有一些数据需要放回,但是他们的run方法都不能返回接口
- JDK 5.0提供了Callable接口和FutureTask类来实现(多线程的第三种创建方式)。
- 最大的优点,可以放返回线程结束的结果
futureTask本身是一个runnable类的对象,实现了runnable接口
步骤:
- 定义类实现Callable接口,把Callable封装成FutureTask类
- 用FutureTask创建线程
- 调用进程
- 通过FutureTask类得到执行结果
1 | import java.io.*; |
操作线程的方法
- run
- start
- String getName() : 默认是Thread-索引,主线程是”main “
- setName(String name)
- public static Thread currentThread() // 获取当前执行的线程对象
- sleep()
- public final void join().. : 让调用这个方法的线程先执行完
1. 线程的休眠
一种能调用线程的方法是调用 sleep() 方法,传入一个参数(毫秒), 使得线程休眠
- sleep语法
1
2
3
4
5try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
2. 线程优先调用(线程插队)
让调用这个方法的线程先执行完
- join
1 | import java.io.*; |
线程安全*
多个线程,同时操作同一个贡献资源的时候,可能会出现业务安全问题
如 :银行里一个账户有10w,两个人同时去银行取钱,都取到10w,如果操作的太快,那两个人都拿到10w,银行就亏了10w元,这可太神秘。呵呵呵呵。
线程安全出现问题原因
- 存在多个线程
- 同时访问了一个共享资源
- 存在修改该共享资源
解决线程问题 (很重要)
核心思想: 让多个线程先后依次访问共享资源,避免出现线程安全问题
常见方案: 加锁 : 每次只允许一个线程加锁, 加锁后才能进入访问,访问完毕后自动解锁,然后其他线程才能加锁进来
1. 同步代码块
- synchronized
1
2
3synchronized(同步锁) {
访问共享资源核心代码
}
注意道同步锁必须是唯一的一个对象(不能new, 不能new,不能new)
原理是, 调用后,把锁这一块内存占用,别的来了得等,等当前占用完毕后再交给下一个,延续之前取钱问题,模拟一下:
1 | public class account { |
锁对象选择的时候别乱选,最好是自己选择一些个新开匹配的唯一空间,或者是直接用对象的名字,或者对于实例是直接用this(推荐,且是高度面向对象)
对于静态方法,最好直接使用字节码(类名.class)对象作为锁对象。
2. 同步方法
- 把访问共享资源的核心方法上锁,以此保证线程安全。(HashTable用的就是同步方法
1 | 修饰符 synchronized 返回值类型(形参列表) { |
同步方法底层原理:
- 底层也是有隐式对象锁,锁的范围是整个方法代码
- 如果方法是实例方法:同步方法默认用this作为锁的对象。
- 如果是静态方法:同步方法默认用类名.class作为锁的对象
虽然性能差一点,但是安全
3. Lock锁
- Lock锁是JDK5提供的锁操作,可以创建出一个锁对象进行加锁和解锁
- Lock是接口,不能实例化,可以采用它的实现类ReentrantLock来构建Lock锁对象
1 | // 构造器 |
1 | lock() //上锁 |
继续之前例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class account {
private double money;
private final Lock lk = new ReentrantLock(); // 加上了final, 保证锁不会被撬(不然别人来个lk = null 哥们你不是炸了吗)
public void drawmoney(double money) {
String name = Thread.currentThread().getName();
lk.lock(); // 上锁
try {
if (this.money >= money) {
System.out.println("取钱成功");
this.money -= money;
System.out.println("当前账户剩余" + this.money);
} else {
System.out.println("余额不足");
}
} finally {
lk.unlock(); // 解锁
}
}
}
线程池(important)
线程池就是一个可以复用线程的技术。 如果不用线程池,那每次有新的线程,都会创建一个线程,那以及其消耗性能
工作线程
|
|
任务队列(只有runnable和callable接口)
1. 创建线程池
- 使用ExecutorService的实现类ThreadPoolExecutor自创建一个线程池对象
使用Excutors(线程池的工具类)调用方法返回不同特点的线程池对象
ThreadPoolExrcutor
1
2
3
4
5
6
7
8
9public ThreadPoolExecutor(
int corePoolSize, // 核心线程池数量
int maximunPoolSize, // 最大线程数量
long keepAliveTime, // 临时线程存活时间
TimeUnit unit, // 临时存活时间单位
BlockingQueue<Runnable> workQueue, // 指定任务队列
ThreadFactory threadFactort, // 指定线程工厂
RejectedExecutionHandler handler //指定线程池的拒绝策略(太多了的处理方式)
)
1 | import java.util.concurrent.*; |
处理任务
ExecutorService常用方法:
- void execute(Runnable command) : 执行Runnable任务
Future<T> submit(Callable<T>) task: 执行Callable任务,返回任务对象- void shutdown() : 等全部任务执行完了之后关闭线程池
List<Runnable> shutdownNow(): 立马关闭,停止正在执行的任务,并返回队列中未执行的任务
通过Executors创建线程池
- Executors是线程池的一个工具类,提供了很多静态方法返回不同的线程池对象
Java通过Executors工厂类提供四种线程池,分别为:
- newCachedThreadPool :创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,否则新建线程。(线程最大并发数不可控制)
- newFixedThreadPool:创建一个固定大小的线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- newScheduledThreadPool : 创建一个定时线程池,支持定时及周期性任务执行。
- newSingleThreadExecutor :创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
我们先创建一个统一的线程任务,方便测试四种线程池
1 | public class MyRunnable implements Runnable { |
newSingleThreadExecutor1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class SingleThreadExecutorTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
MyRunnable myRunnable = new MyRunnable();
for (int i = 0; i < 5; i++) {
executorService.execute(myRunnable);
}
System.out.println("线程任务开始执行");
executorService.shutdown();
}
}
输出结果1
2
3
4
5
6
7线程任务开始执行
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-1 is running...
底层实现1
2
3
4
5
6
7
8
9
10
11
12
13/**
* 核心线程池大小=1
* 最大线程池大小为1
* 线程过期时间为0ms
* LinkedBlockingQueue作为工作队列
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
从参数可以看出来,SingleThreadExecutor 相当于特殊的 FixedThreadPool,它的执行流程如下:
线程池中没有线程时,新建一个线程执行任务
有一个线程以后,将任务加入阻塞队列,不停的加
唯一的这一个线程不停地去队列里取任务执行
SingleThreadExecutor 用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class FixedThreadPoolTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
MyRunnable myRunnable = new MyRunnable();
for (int i = 0; i < 5; i++) {
executorService.execute(myRunnable);
}
System.out.println("线程任务开始执行");
executorService.shutdown();
}
}
输出结果1
2
3
4
5
6
7线程任务开始执行
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
pool-1-thread-2 is running...
底层实现1
2
3
4
5
6
7
8
9
10
11
12/**
* 核心线程池大小=传入参数
* 最大线程池大小为传入参数
* 线程过期时间为0ms
* LinkedBlockingQueue作为工作队列
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可以看到,FixedThreadPool 的核心线程数和最大线程数都是指定值,也就是说当线程池中的线程数超过核心线程数后,任务都会被放到阻塞队列中。
此外 keepAliveTime 为 0,也就是多余的空余线程会被立即终止(由于这里没有多余线程,这个参数也没什么意义了)。
而这里选用的阻塞队列是 LinkedBlockingQueue,使用的是默认容量 Integer.MAX_VALUE,相当于没有上限。
因此这个线程池执行任务的流程如下:
线程数少于核心线程数,也就是设置的线程数时,新建线程执行任务
线程数等于核心线程数后,将任务加入阻塞队列
由于队列容量非常大,可以一直加
执行完任务的线程反复去队列中取任务执行
FixedThreadPool 用于负载比较重的服务器,为了资源的合理利用,需要限制当前线程数量。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class CachedThreadPoolTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
MyRunnable myRunnable = new MyRunnable();
for (int i = 0; i < 5; i++) {
executorService.execute(myRunnable);
}
System.out.println("线程任务开始执行");
executorService.shutdown();
}
}
输出结果
1 | 线程任务开始执行 |
底层实现1
2
3
4
5
6
7
8
9
10
11
12/**
* 核心线程池大小=0
* 最大线程池大小为Integer.MAX_VALUE
* 线程过期时间为60s
* 使用SynchronousQueue作为工作队列
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以看到,CachedThreadPool 没有核心线程,非核心线程数无上限,也就是全部使用外包,但是每个外包空闲的时间只有 60 秒,超过后就会被回收。
CachedThreadPool 使用的队列是 SynchronousQueue,这个队列的作用就是传递任务,并不会保存。
因此当提交任务的速度大于处理任务的速度时,每次提交一个任务,就会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。
它的执行流程如下:
没有核心线程,直接向 SynchronousQueue 中提交任务
如果有空闲线程,就去取出任务执行;如果没有空闲线程,就新建一个
执行完任务的线程有 60 秒生存时间,如果在这个时间内可以接到新任务,就可以继续活下去,否则就拜拜
由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。
CachedThreadPool 用于并发执行大量短期的小任务,或者是负载较轻的服务器。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class ScheduledThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
MyRunnable myRunnable = new MyRunnable();
for (int i = 0; i < 5; i++) {
// 参数1:目标对象,参数2:隔多长时间开始执行线程,参数3:执行周期,参数4:时间单位
scheduledExecutorService.scheduleAtFixedRate(myRunnable, 1, 2, TimeUnit.SECONDS);
}
System.out.println("线程任务开始执行");
}
}
输出结果1
2
3
4
5
6
7
8
9
10
11
12
13
14线程任务开始执行
// 打印【线程任务开始执行】后1秒输出
pool-1-thread-1 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
pool-1-thread-3 is running...
pool-1-thread-2 is running...
// 2秒后输出
pool-1-thread-1 is running...
pool-1-thread-3 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
pool-1-thread-3 is running...
底层实现
1 | /** |
ScheduledThreadPoolExecutor 的执行流程如下:
添加一个任务
线程池中的线程从 DelayQueue 中取任务
然后执行任务
具体执行任务的步骤也比较复杂:
线程从 DelayQueue 中获取 time 大于等于当前时间的 ScheduledFutureTask
执行完后修改这个 task 的 time 为下次被执行的时间
然后再把这个 task 放回队列中
ScheduledThreadPoolExecutor 用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景。
Executors和ThreaPoolExecutor创建线程池的区别
Executors 各个方法的弊端:
newFixedThreadPool 和 newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。
newCachedThreadPool 和 newScheduledThreadPool:
主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。
ThreaPoolExecutor
创建线程池方式只有一种,就是走它的构造函数,参数自己指定
大型并发系统中使用Executors如果不注意可能会出现系统风险。
线程池注意事项:
什么时候创建临时线程?
新任务提交时发现核心线程都在忙,任务队列也满了,并且可以创建临时线程,此时才会创建临时线程核心线程和临时线程都在忙,任务队列也满了,新的任务过来的时候才会拒绝任务
任务拒绝策略:
| 策略 | 说明 |
|---|---|
| ThreadPoolExecutor.AbortPolicy() | 丢弃任务并抛出RejectedExecutionExeception异常,是默认的策略 |
| ThreadPoolExecutor.DiscardPolicy() | 丢弃任务但是不抛出异常,不推荐 |
| ThreadPoolExecutor.DiscardOldestPolicy() | 丢弃队列中等待最久的任务,把当前任务加入队列 |
| ThreadPoolExecutor.CallerRunsPolicy() | 由主线程负责调用任务的run()方法从而绕过线程池直接执行 |
多线程实际案例与遇到的问题
写一个抢红包程序,一共200个红包,100个员工去抢,抢完了之后排序输出金额
Main:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35import java.util.ArrayList;
import java.util.List;
public class Main {
public static void main(String[] args) {
RedPool rd = new RedPool();
List<peo> p = new ArrayList<>();
System.out.println("总红包数: " + rd.B.size());
// 创建 100 个线程
for (int i = 1; i <= 100; i++) {
p.add(new peo(i, rd));
}
// 启动所有线程
for (peo thread : p) {
thread.start();
}
// 等待所有线程完成
for (peo thread : p) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 线程执行完毕后进行排序
p.sort((a, b) -> Double.compare(b.money, a.money)); // 从大到小排序
System.out.println("-------------游戏结束-------------");
for (peo person : p) {
System.out.println("用户 " + person.id + " 总共抢到了 " + person.money);
}
}
}
Peo:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31import java.util.Random;
public class peo extends Thread {
public double money;
public final int id;
private final RedPool rd;
private final Random r = new Random();
public peo(int id, RedPool r) {
this.id = id;
this.money = 0;
this.rd = r;
}
public void run() {
while (true) {
RedPool.bao selectedBao;
synchronized (rd.B) {
if (rd.B.isEmpty()) {
break;
}
int idx = r.nextInt(rd.B.size()); // 随机选红包
selectedBao = rd.B.get(idx);
rd.B.remove(idx); // 只同步这个操作
} // 🔥 释放锁,让其他线程有机会进入
// 处理金额(这部分不需要锁)
this.money += selectedBao.money();
System.out.println("用户 " + this.id + " 抢到了 " + selectedBao.money());
}
}
}
注意道在处理加锁的时候,如果加锁的范围太大,比如加到了处理金额部分,那么会出现bug,某个用户拿到了第一次进入锁的机会,就会把所有的钱全部抢完(因为把所有逻辑结束后他又开始继续枪锁使得别的线程无法run)
RedPool:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class RedPool {
private static final int TOT = 200;
private static int now = 0;
private static int small = 0;
public final List<bao> B = new ArrayList<>();
public volatile boolean f = false; // 解决可见性问题
public RedPool() {
pushr();
}
public void pushr() {
while (now < TOT) { // 用循环代替递归,防止栈溢出
now++;
int g = 1 + (int) (Math.random() * 2);
if (g == 1 && small < 160) {
B.add(new bao(now, 1 + Math.random() * 30));
small++; // small 需要增加,否则可能超出 160 限制
} else {
B.add(new bao(now, 31 + Math.random() * 70));
}
}
}
public record bao(int id, double money) {}
}
