进程和线程的定义

  • 进程: 一个应用程序
  • 线程: 一个进程内的多个处理部分

每当启动一个进程,CPU会分配给进程一部分时间单元,进程内的线程可以调用本进程内的资源

eg.

对于Java程序来说,在DOS命令窗口中输入:

1
java HelloWorld

回车后,会先启动JVM, JVM就是一个进程,之后JVM在启动一个主线程调用main方法(main就是主线程)
同时启动垃圾回收线程复制看护,回收垃圾

最起码,Java程序至少有两个线程并发,一个是垃圾回收机制, 一个是main的主线程


线程的生命周期

  • 出生状态
  • 就绪状态
  • 执行状态
  • 阻塞状态
  • 等待状态
  • 死亡状态

线程的构造方法

实现线程主要是通过两种Java提供的方式, 分别为继承Java.lang.Thread类与实现Java,lang.Runnable接口

1. 继承Thread类

Thread类封装好了线程运作的基本操作,通过继承Thread类实现线程,只需要覆盖Thread类里面的run方法,之后调用start()方法可以启动线程。如果没有调用start方法,那么该对象只是一个实例,不是一个真正的线程

实际上,start方法调用了run方法,如果start方法调用一个启动了的线程,会抛出IllegalThreadStateException异常

1
2
3
public Thread(String name)
public Thread(Runnable target)
public Thread(Runnable target, String name)

2.实现Runnable接口

如果你的类继承自一个不是Thread的类,但是你又想启动一个新的线程,那么这时你需要实现Runnable接口去实现。实质上,Runnable是关联了一个Thread类,实现run接口相当于实现了Thread类的run方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class SwingandThread extends JFrame {
private JLabel = new JLabel();
private static Thread t;
private count = 0;
private Container container = getContentPane();

public SwingandThread() {
... // 写入一些方法
t = new Thread(new Runnable() {
public void run() {
...// 实现run
}
});
}
}

这样可以调用这个Thread, 注意道写接口的时候可以直接Lambda

3. 实现Callable接口

前两种线程创建方式都存在一个问题:

  • 假如线程执行完毕之后有一些数据需要放回,但是他们的run方法都不能返回接口
  • JDK 5.0提供了Callable接口和FutureTask类来实现(多线程的第三种创建方式)。
  • 最大的优点,可以放返回线程结束的结果
    futureTask本身是一个runnable类的对象,实现了runnable接口

步骤:

  • 定义类实现Callable接口,把Callable封装成FutureTask类
  • 用FutureTask创建线程
  • 调用进程
  • 通过FutureTask类得到执行结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.io.*;  
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;


public class Main {
public static void main(String[] args) {
Callable<String> t = new MyCallable(100);
FutureTask<String> s = new FutureTask<>(t);

Thread t1 = new Thread(s);
t1.start();

try {
// 如果线程还未执行完毕,会让出CPU等第一个线程执行完毕后往下执行。
System.out.println(s.get()); // 获得返回值
} catch (Exception e) {
e.printStackTrace();
}
}
}
class MyCallable implements Callable<String> {
private int n;
public MyCallable(int c) {
n = c;
} // 2.实现call,定义线程执行体
public String call() throws Exception {
int sum = 0;
for (int i = 1; i <= n; i++) {
sum += i;
}
return "子线程计算结果:" + sum;
}
}

操作线程的方法

  • run
  • start
  • String getName() : 默认是Thread-索引,主线程是”main “
  • setName(String name)
  • public static Thread currentThread() // 获取当前执行的线程对象
  • sleep()
  • public final void join().. : 让调用这个方法的线程先执行完

    1. 线程的休眠

一种能调用线程的方法是调用 sleep() 方法,传入一个参数(毫秒), 使得线程休眠

  • sleep语法
    1
    2
    3
    4
    5
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

2. 线程优先调用(线程插队)

让调用这个方法的线程先执行完

  • join
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.io.*;  
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;


public class Main {
public static void main(String[] args) {
Callable<String> t = new MyCallable(100);
FutureTask<String> s = new FutureTask<>(t);

Thread t1 = new Thread(s);
t1.start();

for (int i = 0; i < 5; i++) {
if (i == 1) {
try {
t1.join(); // 让t1跑完后再跑主线程
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "线程执行" + i);
}
}
}
class MyCallable implements Callable<String> {
private int n;
public MyCallable(int c) {
n = c;
} // 2.实现call,定义线程执行体
public String call() throws Exception {
int sum = 0;
for (int i = 1; i <= n; i++) {
sum += i;
}
return "子线程计算结果:" + sum;
}
}

线程安全*

多个线程,同时操作同一个贡献资源的时候,可能会出现业务安全问题

如 :银行里一个账户有10w,两个人同时去银行取钱,都取到10w,如果操作的太快,那两个人都拿到10w,银行就亏了10w元,这可太神秘。呵呵呵呵。

线程安全出现问题原因

  • 存在多个线程
  • 同时访问了一个共享资源
  • 存在修改该共享资源

解决线程问题 (很重要)

核心思想: 让多个线程先后依次访问共享资源,避免出现线程安全问题

常见方案: 加锁 : 每次只允许一个线程加锁, 加锁后才能进入访问,访问完毕后自动解锁,然后其他线程才能加锁进来

1. 同步代码块

  • synchronized
    1
    2
    3
    synchronized(同步锁) {
    访问共享资源核心代码
    }

注意道同步锁必须是唯一的一个对象(不能new, 不能new,不能new)

原理是, 调用后,把锁这一块内存占用,别的来了得等,等当前占用完毕后再交给下一个,延续之前取钱问题,模拟一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class account {  
private double money;
public void drawmoney(double money) {
String name = Thread.currentThread().getName();

synchronized (this) { //最好用this
if (this.money >= money) {
System.out.println("取钱成功");
this.money -= money;
System.out.println("当前账户剩余" + this.money);
} else {
System.out.println("余额不足");
}
}
}
}

锁对象选择的时候别乱选,最好是自己选择一些个新开匹配的唯一空间,或者是直接用对象的名字,或者对于实例是直接用this(推荐,且是高度面向对象)
对于静态方法,最好直接使用字节码(类名.class)对象作为锁对象。

2. 同步方法

  • 把访问共享资源的核心方法上锁,以此保证线程安全。(HashTable用的就是同步方法
1
2
3
修饰符 synchronized 返回值类型(形参列表) {
//...
}

同步方法底层原理:

  • 底层也是有隐式对象锁,锁的范围是整个方法代码
  • 如果方法是实例方法:同步方法默认用this作为锁的对象。
  • 如果是静态方法:同步方法默认用类名.class作为锁的对象

虽然性能差一点,但是安全

3. Lock锁

  • Lock锁是JDK5提供的锁操作,可以创建出一个锁对象进行加锁和解锁
  • Lock是接口,不能实例化,可以采用它的实现类ReentrantLock来构建Lock锁对象
1
2
// 构造器
public ReentrantLock() // 获得Lock锁的实现类对象
1
2
lock() //上锁
unlock() //解锁

继续之前例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;

public class account {
private double money;
private final Lock lk = new ReentrantLock(); // 加上了final, 保证锁不会被撬(不然别人来个lk = null 哥们你不是炸了吗)
public void drawmoney(double money) {
String name = Thread.currentThread().getName();
lk.lock(); // 上锁

try {
if (this.money >= money) {
System.out.println("取钱成功");
this.money -= money;
System.out.println("当前账户剩余" + this.money);
} else {
System.out.println("余额不足");
}
} finally {
lk.unlock(); // 解锁
}
}
}


线程池(important)

线程池就是一个可以复用线程的技术。 如果不用线程池,那每次有新的线程,都会创建一个线程,那以及其消耗性能

工作线程
|
|
任务队列(只有runnable和callable接口)

1. 创建线程池

  • 使用ExecutorService的实现类ThreadPoolExecutor自创建一个线程池对象
  • 使用Excutors(线程池的工具类)调用方法返回不同特点的线程池对象

    ThreadPoolExrcutor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public ThreadPoolExecutor(
    int corePoolSize, // 核心线程池数量
    int maximunPoolSize, // 最大线程数量
    long keepAliveTime, // 临时线程存活时间
    TimeUnit unit, // 临时存活时间单位
    BlockingQueue<Runnable> workQueue, // 指定任务队列
    ThreadFactory threadFactort, // 指定线程工厂
    RejectedExecutionHandler handler //指定线程池的拒绝策略(太多了的处理方式)
    )
1
2
3
4
5
6
7
8
9
import java.util.concurrent.*;  

public class ThreadPool {
public static void main(String[] args) {
ExecutorService pool = new ThreadPoolExecutor(3, 5, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
}

处理任务

ExecutorService常用方法:

  • void execute(Runnable command) : 执行Runnable任务
  • Future<T> submit(Callable<T>) task : 执行Callable任务,返回任务对象
  • void shutdown() : 等全部任务执行完了之后关闭线程池
  • List<Runnable> shutdownNow() : 立马关闭,停止正在执行的任务,并返回队列中未执行的任务

通过Executors创建线程池

  • Executors是线程池的一个工具类,提供了很多静态方法返回不同的线程池对象

Java通过Executors工厂类提供四种线程池,分别为:

  • newCachedThreadPool :创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,否则新建线程。(线程最大并发数不可控制)
  • newFixedThreadPool:创建一个固定大小的线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool : 创建一个定时线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor :创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

我们先创建一个统一的线程任务,方便测试四种线程池

1
2
3
4
5
6
7
8
9
public class MyRunnable implements Runnable {

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is running...");
}

}

newSingleThreadExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class SingleThreadExecutorTest {

public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
MyRunnable myRunnable = new MyRunnable();
for (int i = 0; i < 5; i++) {
executorService.execute(myRunnable);
}

System.out.println("线程任务开始执行");
executorService.shutdown();
}

}


输出结果
1
2
3
4
5
6
7
线程任务开始执行
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-1 is running...


底层实现
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 核心线程池大小=1
* 最大线程池大小为1
* 线程过期时间为0ms
* LinkedBlockingQueue作为工作队列
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}


从参数可以看出来,SingleThreadExecutor 相当于特殊的 FixedThreadPool,它的执行流程如下:

线程池中没有线程时,新建一个线程执行任务
有一个线程以后,将任务加入阻塞队列,不停的加
唯一的这一个线程不停地去队列里取任务执行
SingleThreadExecutor 用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class FixedThreadPoolTest {

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
MyRunnable myRunnable = new MyRunnable();
for (int i = 0; i < 5; i++) {
executorService.execute(myRunnable);
}

System.out.println("线程任务开始执行");
executorService.shutdown();
}

}


输出结果
1
2
3
4
5
6
7
线程任务开始执行
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
pool-1-thread-2 is running...


底层实现
1
2
3
4
5
6
7
8
9
10
11
12
/**
* 核心线程池大小=传入参数
* 最大线程池大小为传入参数
* 线程过期时间为0ms
* LinkedBlockingQueue作为工作队列
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}


可以看到,FixedThreadPool 的核心线程数和最大线程数都是指定值,也就是说当线程池中的线程数超过核心线程数后,任务都会被放到阻塞队列中。

此外 keepAliveTime 为 0,也就是多余的空余线程会被立即终止(由于这里没有多余线程,这个参数也没什么意义了)。

而这里选用的阻塞队列是 LinkedBlockingQueue,使用的是默认容量 Integer.MAX_VALUE,相当于没有上限。

因此这个线程池执行任务的流程如下:

线程数少于核心线程数,也就是设置的线程数时,新建线程执行任务
线程数等于核心线程数后,将任务加入阻塞队列
由于队列容量非常大,可以一直加
执行完任务的线程反复去队列中取任务执行
FixedThreadPool 用于负载比较重的服务器,为了资源的合理利用,需要限制当前线程数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CachedThreadPoolTest {

public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
MyRunnable myRunnable = new MyRunnable();
for (int i = 0; i < 5; i++) {
executorService.execute(myRunnable);
}

System.out.println("线程任务开始执行");
executorService.shutdown();
}

}


输出结果

1
2
3
4
5
6
7
线程任务开始执行
pool-1-thread-1 is running...
pool-1-thread-4 is running...
pool-1-thread-2 is running...
pool-1-thread-5 is running...
pool-1-thread-3 is running...

底层实现

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 核心线程池大小=0
* 最大线程池大小为Integer.MAX_VALUE
* 线程过期时间为60s
* 使用SynchronousQueue作为工作队列
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}


可以看到,CachedThreadPool 没有核心线程,非核心线程数无上限,也就是全部使用外包,但是每个外包空闲的时间只有 60 秒,超过后就会被回收。

CachedThreadPool 使用的队列是 SynchronousQueue,这个队列的作用就是传递任务,并不会保存。

因此当提交任务的速度大于处理任务的速度时,每次提交一个任务,就会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。

它的执行流程如下:

没有核心线程,直接向 SynchronousQueue 中提交任务
如果有空闲线程,就去取出任务执行;如果没有空闲线程,就新建一个
执行完任务的线程有 60 秒生存时间,如果在这个时间内可以接到新任务,就可以继续活下去,否则就拜拜
由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。
CachedThreadPool 用于并发执行大量短期的小任务,或者是负载较轻的服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ScheduledThreadPoolTest {

public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
MyRunnable myRunnable = new MyRunnable();
for (int i = 0; i < 5; i++) {
// 参数1:目标对象,参数2:隔多长时间开始执行线程,参数3:执行周期,参数4:时间单位
scheduledExecutorService.scheduleAtFixedRate(myRunnable, 1, 2, TimeUnit.SECONDS);
}

System.out.println("线程任务开始执行");
}

}


输出结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
线程任务开始执行
// 打印【线程任务开始执行】后1秒输出
pool-1-thread-1 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
pool-1-thread-3 is running...
pool-1-thread-2 is running...
// 2秒后输出
pool-1-thread-1 is running...
pool-1-thread-3 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
pool-1-thread-3 is running...


底层实现

1
2
3
4
5
6
7
8
9
10
11
/**
* 核心线程池大小=传入参数
* 最大线程池大小为Integer.MAX_VALUE
* 线程过期时间为0ms
* DelayedWorkQueue作为工作队列
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

ScheduledThreadPoolExecutor 的执行流程如下:

添加一个任务
线程池中的线程从 DelayQueue 中取任务
然后执行任务
具体执行任务的步骤也比较复杂:

线程从 DelayQueue 中获取 time 大于等于当前时间的 ScheduledFutureTask

执行完后修改这个 task 的 time 为下次被执行的时间

然后再把这个 task 放回队列中

ScheduledThreadPoolExecutor 用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景。

Executors和ThreaPoolExecutor创建线程池的区别
Executors 各个方法的弊端:

newFixedThreadPool 和 newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。
newCachedThreadPool 和 newScheduledThreadPool:
主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。
ThreaPoolExecutor

创建线程池方式只有一种,就是走它的构造函数,参数自己指定

大型并发系统中使用Executors如果不注意可能会出现系统风险。

线程池注意事项:

  • 什么时候创建临时线程?
    新任务提交时发现核心线程都在忙,任务队列也满了,并且可以创建临时线程,此时才会创建临时线程

  • 核心线程和临时线程都在忙,任务队列也满了,新的任务过来的时候才会拒绝任务

任务拒绝策略:

策略 说明
ThreadPoolExecutor.AbortPolicy() 丢弃任务并抛出RejectedExecutionExeception异常,是默认的策略
ThreadPoolExecutor.DiscardPolicy() 丢弃任务但是不抛出异常,不推荐
ThreadPoolExecutor.DiscardOldestPolicy() 丢弃队列中等待最久的任务,把当前任务加入队列
ThreadPoolExecutor.CallerRunsPolicy() 由主线程负责调用任务的run()方法从而绕过线程池直接执行

多线程实际案例与遇到的问题

写一个抢红包程序,一共200个红包,100个员工去抢,抢完了之后排序输出金额

Main:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.ArrayList;  
import java.util.List;

public class Main {
public static void main(String[] args) {
RedPool rd = new RedPool();
List<peo> p = new ArrayList<>();

System.out.println("总红包数: " + rd.B.size());

// 创建 100 个线程
for (int i = 1; i <= 100; i++) {
p.add(new peo(i, rd));
}
// 启动所有线程
for (peo thread : p) {
thread.start();
}
// 等待所有线程完成
for (peo thread : p) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 线程执行完毕后进行排序
p.sort((a, b) -> Double.compare(b.money, a.money)); // 从大到小排序

System.out.println("-------------游戏结束-------------");
for (peo person : p) {
System.out.println("用户 " + person.id + " 总共抢到了 " + person.money);
}
}
}

Peo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.Random;  
public class peo extends Thread {
public double money;
public final int id;
private final RedPool rd;
private final Random r = new Random();

public peo(int id, RedPool r) {
this.id = id;
this.money = 0;
this.rd = r;
}
@Override
public void run() {
while (true) {
RedPool.bao selectedBao;
synchronized (rd.B) {
if (rd.B.isEmpty()) {
break;
}
int idx = r.nextInt(rd.B.size()); // 随机选红包
selectedBao = rd.B.get(idx);
rd.B.remove(idx); // 只同步这个操作
} // 🔥 释放锁,让其他线程有机会进入

// 处理金额(这部分不需要锁)
this.money += selectedBao.money();
System.out.println("用户 " + this.id + " 抢到了 " + selectedBao.money());
}
}
}

注意道在处理加锁的时候,如果加锁的范围太大,比如加到了处理金额部分,那么会出现bug,某个用户拿到了第一次进入锁的机会,就会把所有的钱全部抢完(因为把所有逻辑结束后他又开始继续枪锁使得别的线程无法run)

RedPool:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.ArrayList;  
import java.util.List;
import java.util.Random;

public class RedPool {
private static final int TOT = 200;
private static int now = 0;
private static int small = 0;
public final List<bao> B = new ArrayList<>();
public volatile boolean f = false; // 解决可见性问题

public RedPool() {
pushr();
}
public void pushr() {
while (now < TOT) { // 用循环代替递归,防止栈溢出
now++;
int g = 1 + (int) (Math.random() * 2);
if (g == 1 && small < 160) {
B.add(new bao(now, 1 + Math.random() * 30));
small++; // small 需要增加,否则可能超出 160 限制
} else {
B.add(new bao(now, 31 + Math.random() * 70));
}
}
}
public record bao(int id, double money) {}
}