参考视频:黑马学成在线项目
一 分布式任务处理
1.1 什么是分布式任务调度
对一个视频的转码可以理解为一个任务的执行,如果视频的数量比较多,如何去高效处理一批任务呢?
1、多线程
多线程是充分利用单机的资源。
2、分布式加多线程
充分利用多台计算机,每台计算机使用多线程处理。
方案2可扩展性更强。
方案2是一种分布式任务调度的处理方案。
什么是分布式任务调度?
我们可以先思考一下下面业务场景的解决方案:
每隔24小时执行数据备份任务。
12306网站会根据车次不同,设置几个时间点分批次放票。
某财务系统需要在每天上午10点前结算前一天的账单数据,统计汇总。
商品成功发货后,需要向客户发送短信提醒。
类似的场景还有很多,我们该如何实现?
多线程方式实现:
学过多线程的同学,可能会想到,我们可以开启一个线程,每sleep一段时间,就去检查是否已到预期执行时间。
以下代码简单实现了任务调度的功能:
public static void main(String[] args) {
//任务执行间隔时间
final long timeInterval = 1000;
Runnable runnable = new Runnable() {
public void run() {
while (true) {
//TODO:something
try {
Thread.sleep(timeInterval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread thread = new Thread(runnable);
thread.start();
}上面的代码实现了按一定的间隔时间执行任务调度的功能。
Jdk也为我们提供了相关支持,如Timer、ScheduledExecutor,下边我们了解下。
Timer方式实现:
public static void main(String[] args){
Timer timer = new Timer();
timer.schedule(new TimerTask(){
@Override
public void run() {
//TODO:something
}
}, 1000, 2000); //1秒后开始调度,每2秒执行一次
}- Timer 的优点在于简单易用,每个Timer对应一个线程,因此可以同时启动多个Timer并行执行多个任务,同一个Timer中的任务是串行执行。
ScheduledExecutor方式实现:
public static void main(String [] agrs){
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
//TODO:something
System.out.println("todo something");
}
}, 1,
2, TimeUnit.SECONDS);
}Java 5 推出了基于线程池设计的 ScheduledExecutor,其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。
Timer 和 ScheduledExecutor 都仅能提供基于开始时间与重复间隔的任务调度,不能胜任更加复杂的调度需求。比如,设置每月第一天凌晨1点执行任务、复杂调度任务的管理、任务间传递数据等等。
第三方Quartz方式实现,项目地址:https://github.com/quartz-scheduler/quartz
- Quartz 是一个功能强大的任务调度框架,它可以满足更多更复杂的调度需求,Quartz 设计的核心类包括 Scheduler, Job 以及 Trigger。其中,Job 负责定义需要执行的任务,Trigger 负责设置调度策略,Scheduler 将二者组装在一起,并触发任务开始执行。Quartz支持简单的按时间间隔调度、还支持按日历调度方式,通过设置CronTrigger表达式(包括:秒、分、时、日、月、周、年)进行任务调度。
下边是一个例子代码:
public static void main(String [] agrs) throws SchedulerException {
//创建一个Scheduler
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
//创建JobDetail
JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);
jobDetailBuilder.withIdentity("jobName","jobGroupName");
JobDetail jobDetail = jobDetailBuilder.build();
//创建触发的CronTrigger 支持按日历调度
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName", "triggerGroupName")
.startNow()
.withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?"))
.build();
scheduler.scheduleJob(jobDetail,trigger);
scheduler.start();
}
public class MyJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext){
System.out.println("todo something");
}
}通过以上内容我们学习了什么是任务调度,任务调度所解决的问题,以及任务调度的多种实现方式。
任务调度顾名思义,就是对任务的调度,它是指系统为了完成特定业务,基于给定时间点,给定时间间隔或者给定执行次数自动执行任务。
什么是分布式任务调度?
- 通常任务调度的程序是集成在应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,结算服务中包括了定期生成报表的任务调度程序,由于采用分布式架构,一个服务往往会部署多个冗余实例来运行我们的业务,在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度,如下图:

分布式调度要实现的目标:
- 不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力:
1、并行任务调度
并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。
如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。
2、高可用
- 若某一个实例宕机,不影响其他实例来执行任务。
3、弹性扩容
- 当集群中增加实例就可以提高并执行任务的处理效率。
4、任务管理与监测
- 对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。
5、避免任务重复执行
- 当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中到点发优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次。
1.2 XXL-JOB介绍
XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
官网:https://www.xuxueli.com/xxl-job/
文档:https://www.xuxueli.com/xxl-job/#《分布式任务调度平台XXL-JOB》
XXL-JOB主要有调度中心、执行器、任务:

调度中心:
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码;
主要职责为执行器管理、任务管理、监控运维、日志管理等
任务执行器:
负责接收调度请求并执行任务逻辑;
只要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等
**任务:**负责执行具体的业务处理。
调度中心与执行器之间的工作流程如下:

执行流程:
- 1.任务执行器根据配置的调度中心的地址,自动注册到调度中心
- 2.达到任务触发条件,调度中心下发任务
- 3.执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
- 4.执行器消费内存队列中的执行结果,主动上报给调度中心
- 5.当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情
二 任务调度实战—视频处理
2.1 视频转码需求
2.1.1 什么是视频编码
视频上传成功后需要对视频进行转码处理。
什么是视频编码?查阅百度百科如下:

详情参考 :https://baike.baidu.com/item/%E8%A7%86%E9%A2%91%E7%BC%96%E7%A0%81/839038
首先我们要分清文件格式和编码格式:
文件格式:是指.mp4、.avi、.rmvb等 这些不同扩展名的视频文件的文件格式 ,视频文件的内容主要包括视频和音频,其文件格式是按照一 定的编码格式去编码,并且按照该文件所规定的封装格式将视频、音频、字幕等信息封装在一起,播放器会根据它们的封装格式去提取出编码,然后由播放器解码,最终播放音视频。
音视频编码格式:通过音视频的压缩技术,将视频格式转换成另一种视频格式,通过视频编码实现流媒体的传输。比如:一个.avi的视频文件原来的编码是a,通过编码后编码格式变为b,音频原来为c,通过编码后变为d。
音视频编码格式各类繁多,主要有几下几类:
MPEG系列
(由ISO[国际标准组织机构]下属的MPEG[运动图象专家组]开发 )视频编码方面主要是Mpeg1(vcd用的就是它)、Mpeg2(DVD使用)、Mpeg4(的DVDRIP使用的都是它的变种,如:divx,xvid等)、Mpeg4 AVC(正热门);音频编码方面主要是MPEG Audio Layer 1/2、MPEG Audio Layer 3(大名鼎鼎的mp3)、MPEG-2 AAC 、MPEG-4 AAC等等。注意:DVD音频没有采用Mpeg的。
H.26X系列
(由ITU[国际电传视讯联盟]主导,侧重网络传输,注意:只是视频编码)
包括
H.261、H.262、H.263、H.263+、H.263++、H.264(就是MPEG4 AVC-合作的结晶)目前最常用的编码标准是视频
H.264,音频AAC。
提问:
H.264是编码格式还是文件格式?mp4是编码格式还是文件格式?
2.1.2 FFmpeg 的基本使用
我们将视频录制完成后,使用视频编码软件对视频进行编码,本项目 使用FFmpeg对视频进行编码 。

FFmpeg被许多开源项目采用,QQ影音、暴风影音、VLC等。
下载:FFmpeg https://www.ffmpeg.org/download.html#build-windows
请从常用工具软件目录找到ffmpeg.exe,并将ffmpeg.exe加入环境变量path中。
测试是否正常:cmd运行
ffmpeg -version

安装成功,作下简单测试
将一个.avi文件转成mp4、mp3、gif等。
比如我们将nacos.avi文件转成mp4,运行如下命令:
D:\soft\ffmpeg\ffmpeg.exe -i 1.avi 1.mp4可以将ffmpeg.exe配置到环境变量path中,进入视频目录直接运行:
ffmpeg.exe -i 1.avi 1.mp4转成mp3:
ffmpeg -i nacos.avi nacos.mp3转成gif:
ffmpeg -i nacos.avi nacos.gif官方文档(英文):
http://ffmpeg.org/ffmpeg.html
2.1.3 视频处理工具类
将课程资料的工具类中的util拷贝至base工程。

其中Mp4VideoUtil类是用于将视频转为mp4格式,是我们项目要使用的工具类。
下边看下这个类的代码,并进行测试。
我们要通过ffmpeg对视频转码,Java程序调用ffmpeg,使用java.lang.ProcessBuilder去完成,具体在Mp4VideoUtil类的63行,下边进行简单的测试,下边的代码运行本机安装的QQ软件。
ProcessBuilder builder = new ProcessBuilder();
builder.command("C:\\Program Files (x86)\\Tencent\\QQ\\Bin\\QQScLauncher.exe");
//将标准输入流和错误输入流合并,通过标准输入流程读取信息
builder.redirectErrorStream(true);
Process p = builder.start();对Mp4VideoUtil类需要学习使用方法,下边代码将一个avi视频转为mp4视频,如下:
public static void main(String[] args) throws IOException {
//ffmpeg的路径
String ffmpeg_path = "D:\\soft\\ffmpeg\\ffmpeg.exe";//ffmpeg的安装位置
//源avi视频的路径
String video_path = "D:\\develop\\bigfile_test\\nacos01.avi";
//转换后mp4文件的名称
String mp4_name = "nacos01.mp4";
//转换后mp4文件的路径
String mp4_path = "D:\\develop\\bigfile_test\\nacos01.mp4";
//创建工具类对象
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);
//开始视频转换,成功将返回success
String s = videoUtil.generateMp4();
System.out.println(s);
}执行main方法,最终在控制台输出 success 表示执行成功。
2.2 搭建XXL-JOB
2.2.1 调度中心
1)创建挂载目录
mkdir -p -m 777 /mydata/xxl-job/{logs,conf}conf 目录下创建 application.properties 文件。
### web
server.port=8088
server.servlet.context-path=/xxl-job-admin
### actuator
management.server.servlet.context-path=/actuator
management.health.mail.enabled=false
### resources
spring.mvc.servlet.load-on-startup=0
spring.mvc.static-path-pattern=/static/**
spring.resources.static-locations=classpath:/static/
### freemarker
spring.freemarker.templateLoaderPath=classpath:/templates/
spring.freemarker.suffix=.ftl
spring.freemarker.charset=UTF-8
spring.freemarker.request-context-attribute=request
spring.freemarker.settings.number_format=0.##########
### mybatis
mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
#mybatis.type-aliases-package=com.xxl.job.admin.core.model
###更改配置文件连接mysql信息
### xxl-job, datasource
spring.datasource.url=jdbc:mysql://dockermysql:3306/xxl_job?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false&autoReconnect=true ###ip
spring.datasource.username=root
spring.datasource.password=1234
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
### datasource-pool
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.maximum-pool-size=30
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.pool-name=HikariCP
spring.datasource.hikari.max-lifetime=900000
spring.datasource.hikari.connection-timeout=10000
spring.datasource.hikari.connection-test-query=SELECT 1
spring.datasource.hikari.validation-timeout=1000
### xxl-job, email
spring.mail.host=smtp.qq.com
spring.mail.port=25
spring.mail.username=xxx@qq.com
spring.mail.from=xxx@qq.com
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
### xxl-job, access token
xxl.job.accessToken=default_token
### xxl-job, i18n (default is zh_CN, and you can choose "zh_CN", "zh_TC" and "en")
xxl.job.i18n=zh_CN
## xxl-job, triggerpool max size
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
### xxl-job, log retention days
xxl.job.logretentiondays=302)下载XXL-JOB
GitHub:https://github.com/xuxueli/xxl-job
码云:https://gitee.com/xuxueli0323/xxl-job
项目使用2.3.1版本: https://github.com/xuxueli/xxl-job/releases/tag/2.3.1
也可从课程资料目录获取,解压xxl-job-2.3.1.zip
使用IDEA打开解压后的目录

xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用)
:xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
:xxl-job-executor-sample-frameless:无框架版本;
doc :文档资料,包含数据库脚本
在下发的虚拟机的MySQL中已经创建了xxl_job_2.3.1数据库

如下图:

执行sh /data/soft/restart.sh自动启动xxl-job调度中心
3)创建容器
docker run -d \
-p 8088:8080 \
--name=xxl-job-admin \
--link mysql:dockermysql \
--restart=always \
-v ~/docker/software/xxl-job/conf/application.properties:/application.properties \
-e PARAMS='--spring.config.location=/application.properties' \
xuxueli/xxl-job-admin:2.3.1--link:请确保 Docker 中运行着名为 mysql 的容器。连接上 MySQL 容器后,实现网络互通。
4)访问后台
访问:http://192.168.2.203:8088/xxl-job-admin/
账号和密码:admin/123456
如果无法使用虚拟机运行xxl-job可以在本机idea运行xxl-job调度中心。
2.2.2 执行器
下边配置执行器,执行器负责与调度中心通信接收调度中心发起的任务调度请求。
1、下边进入调度中心添加执行器

点击新增,填写执行器信息,appname是前边在nacos中配置xxl信息时指定的执行器的应用名。

添加成功:

2、首先在媒资管理模块的service工程添加依赖,在项目的父工程已约定了版本2.3.1
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>3、在nacos下的media-service-dev.yaml下配置xxl-job
xxl:
job:
admin:
addresses: http://192.168.2.203:8088/xxl-job-admin
executor:
appname: media-process-service
address:
ip:
port: 9999
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30
accessToken: default_token注意配置中的appname这是执行器的应用名,port是执行器启动的端口,如果本地启动多个执行器注意端口不能重复。
4、配置xxl-job的执行器
将xxl-job示例工程下配置类拷贝到媒资管理的service工程下

/**
* @author Klaus
* @date 2023/06/24 1:52
* @description TODO
*/
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}到此完成媒资管理模块service工程配置xxl-job执行器,在xxl-job调度中心添加执行器,下边准备测试执行器与调度中心是否正常通信,因为接口工程依赖了service工程,所以启动媒资管理模块的接口工程。
启动后观察日志,出现下边的日志表示执行器在调度中心注册成功

同时观察调度中心中的执行器界面

在线机器地址处已显示1个执行器。
2.2.3 执行任务
下边编写任务,参考示例工程中任务类的编写方法
在媒资服务service包下新建jobhandler存放任务类,下边参考示例工程编写一个任务类
/**
* @description 测试执行器
*/
@Component
@Slf4j
public class SampleJob {
/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("testJob")
public void testJob() throws Exception {
log.info("开始执行.....");
}
}下边在调度中心添加任务,进入任务管理

点击新增,填写任务信息

注意红色标记处:
调度类型:
固定速度指按固定的间隔定时调度。
Cron,通过Cron表达式实现更丰富的定时调度策略。
Cron表达式是一个字符串,通过它可以定义调度策略,格式如下:
{秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
xxl-job提供图形界面去配置:

一些例子如下:
30 10 1 * * ?每天1点10分30秒触发
0/30 * * * * ?每30秒触发一次
\* 0/10 * * * ?每10分钟触发一次运行模式有BEAN和GLUE,bean模式较常用就是在项目工程中编写执行器的任务代码,GLUE是将任务代码编写在调度中心。
JobHandler即任务方法名,填写任务方法上边@XxlJob注解中的名称。
路由策略:当执行器集群部署时,调度中心向哪个执行器下发任务,这里选择第一个表示只向第一个执行器下发任务,路由策略的其它选项稍后在分片广播章节详细解释。
高级配置的其它配置项稍后在分片广播章节详细解释。
添加成功,启动任务

通过调度日志查看任务执行情况

下边启动媒资管理的service工程,启动执行器。
观察执行器方法的执行。

如果要停止任务需要在调度中心操作

任务跑一段时间注意清理日志

2.3 分片广播
掌握了xxl-job的基本使用,下边思考如何进行分布式任务处理呢?如下图,我们会启动多个执行器组成一个集群,去执行任务。

执行器在集群部署下调度中心有哪些路由策略呢?
查看xxl-job官方文档,阅读高级配置相关的内容:
高级配置: - 路由策略:当执行器集群部署时,提供丰富的路由策略,包括; FIRST(第一个):固定选择第一个机器; LAST(最后一个):固定选择最后一个机器; ROUND(轮询):; RANDOM(随机):随机选择在线的机器; CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。 LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举; LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举; FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度; BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度; SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
- 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。 - 调度过期策略: - 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间; - 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间; - 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略; 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行; 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败; 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务; - 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务; - 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
下边要重点说的是分片广播策略,分片是指是调度中心以执行器为维度进行分片,将集群中的执行器标上序号:0,1,2,3…,广播是指每次调度会向集群中的所有执行器发送任务调度,请求中携带分片参数。
如下图:

每个执行器收到调度请求同时接收分片参数。
xxl-job支持动态扩容执行器集群从而动态增加分片数量,当有任务量增加可以部署更多的执行器到集群中,调度中心会动态修改分片的数量。
作业分片适用哪些场景呢?
• 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
• 广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。
所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。
使用说明:
"分片广播" 和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理。
Java语言任务获取分片参数方式:
BEAN、GLUE模式(Java),可参考Sample示例执行器中的示例任务"ShardingJobHandler":
/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
// 分片序号,从0开始
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数
int shardTotal = XxlJobHelper.getShardTotal();
....下边测试作业分片:
1、定义作业分片的任务方法
/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.info("开始执行第"+shardIndex+"批任务");
}2、在调度中心添加任务

添加成功:

启动任务,观察日志

下边启动两个执行器实例,观察每个实例的执行情况
首先在nacos中配置media-service的本地优先配置:
#配置本地优先
spring:
cloud:
config:
override-none: true将media-service启动两个实例
两个实例的在启动时注意端口不能冲突:
实例1 在VM options处添加:-Dserver.port=63051 -Dxxl.job.executor.port=9998
实例2 在VM options处添加:-Dserver.port=63050 -Dxxl.job.executor.port=9999
例如:

启动两个实例
观察任务调度中心,稍等片刻执行器有两个

观察两个执行实例的日志:

另一实例的日志如下:

从日志可以看每个实例的分片序号不同。
如果其中一个执行器挂掉,只剩下一个执行器在工作,稍等片刻调用中心发现少了一个执行器将动态调整总分片数为1。
到此作业分片任务调试完成,此时我们可以思考:
当一次分片广播到来,各执行器如何根据分片参数去分布式执行任务,保证执行器之间执行的任务不重复呢?
2.4 技术方案
2.4.1 作业分片方案
掌握了xxl-job的分片广播调度方式,下边思考如何分布式去执行学成在线平台中的视频处理任务。
任务添加成功后,对于要处理的任务会添加到待处理任务表中,现在启动多个执行器实例去查询这些待处理任务,此时如何保证多个执行器不会查询到重复的任务呢?
XXL-JOB并不直接提供数据处理的功能,它只会给执行器分配好分片序号,在向执行器任务调度的同时下发分片总数以及分片序号等参数,执行器收到这些参数根据自己的业务需求去利用这些参数。
下图表示了多个执行器获取视频处理任务的结构:

每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id 模上 分片总数,如果等于分片序号则执行此任务。
上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:
1 % 2 = 1 执行器2执行
2 % 2 = 0 执行器1执行
3 % 2 = 1 执行器2执行
以此类推.
2.4.2 保证任务不重复执行
通过作业分片方案保证了执行器之间查询到不重复的任务,如果一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?
首先配置调度过期策略:
查看文档如下:
- 调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等; - 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间; - 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间; - 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
这里我们选择忽略,如果立即执行一次就可能重复执行相同的任务。

其次,再看阻塞处理策略,阻塞处理策略就是当前执行器正在执行任务还没有结束时调度中心进行任务调度,此时该如何处理。
查看文档如下: 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行; 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败; 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
这里如果选择覆盖之前调度则可能重复执行任务,这里选择 丢弃后续调度或单机串行方式来避免任务重复执行。
只做这些配置可以保证任务不会重复执行吗?
做不到,还需要保证任务处理的幂等性,什么是任务的幂等性?任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。在本项目中要实现的是不论多少次任务调度同一个视频只执行一次成功的转码。
什么是幂等性?
它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。
幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。
解决幂等性常用的方案:
1)数据库约束,比如:唯一索引,主键。
2)乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
3)唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。
基于以上分析,在执行器接收调度请求去执行视频处理任务时要实现视频处理的幂等性,要有办法去判断该视频是否处理完成,如果正在处理中或处理完则不再处理。这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理。
2.4.3 视频处理方案
确定了分片方案,下边梳理整个视频上传及处理的业务流程。

上传视频成功向视频处理待处理表添加记录。
视频处理的详细流程如下:

1、任务调度中心广播作业分片。
2、执行器收到广播作业分片,从数据库读取待处理任务,读取未处理及处理失败的任务。
3、执行器更新任务为处理中,根据任务内容从MinIO下载要处理的文件。
4、执行器启动多线程去处理任务。
5、任务处理完成,上传处理后的视频到MinIO。
6、将更新任务处理结果,如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中,最后将任务完成记录写入历史表。
2.5 查询待处理任务
2.5.1 需求分析
查询待处理任务只处理未提交及处理失败的任务,任务处理失败后进行重试,最多重试3次。
任务处理成功将待处理记录移动到历史任务表。
下图是待处理任务表:

历史任务表与待处理任务表的结构相同。
2.5.2添加待处理任务
上传视频成功向视频处理待处理表添加记录,暂时只添加对avi视频的处理记录。
根据MIME Type去判断是否是avi视频,下边列出部分MIME Type

avi视频的MIME Type是video/x-msvideo
修改文件信息入库方法,如下:
@Transactional
public MediaFiles addMediaFilesToDb(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket, String objectName) {
//从数据库查询文件
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
if (mediaFiles == null) {
mediaFiles = new MediaFiles();
//拷贝基本信息
BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
mediaFiles.setId(fileMd5);
mediaFiles.setFileId(fileMd5);
mediaFiles.setCompanyId(companyId);
//媒体类型
mediaFiles.setUrl("/" + bucket + "/" + objectName);
mediaFiles.setBucket(bucket);
mediaFiles.setFilePath(objectName);
mediaFiles.setCreateDate(LocalDateTime.now());
mediaFiles.setAuditStatus("002003");
mediaFiles.setStatus("1");
//保存文件信息到文件表
int insert = mediaFilesMapper.insert(mediaFiles);
if (insert < 0) {
log.error("保存文件信息到数据库失败,{}", mediaFiles.toString());
XueChengPlusException.cast("保存文件信息失败");
}
//添加到待处理任务表
addWaitingTask(mediaFiles);
log.debug("保存文件信息到数据库成功,{}", mediaFiles.toString());
}
return mediaFiles;
}
/**
* 添加待处理任务
* @param mediaFiles 媒资文件信息
*/
private void addWaitingTask(MediaFiles mediaFiles){
//文件名称
String filename = mediaFiles.getFilename();
//文件扩展名
String extension = filename.substring(filename.lastIndexOf("."));
//文件mimeType
String mimeType = getMimeType(extension);
//如果是avi视频添加到视频待处理表
if(mimeType.equals("video/x-msvideo")){
MediaProcess mediaProcess = new MediaProcess();
BeanUtils.copyProperties(mediaFiles,mediaProcess);
mediaProcess.setStatus("1");//未处理
mediaProcess.setFailCount(0);//失败次数默认为0
mediaProcessMapper.insert(mediaProcess);
}
}进行前后端测试,上传4个avi视频,观察待处理任务表是否存在记录,记录是否完成。
2.5.3 查询待处理任务
如何保证查询到的待处理视频记录不重复?
编写根据分片参数获取待处理任务的DAO方法,定义DAO接口如下:
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {
/**
* @description 根据分片参数获取待处理任务
* @param shardTotal 分片总数
* @param shardindex 分片序号
* @param count 任务数
* @return java.util.List<com.xuecheng.media.model.po.MediaProcess>
*/
@Select("select * from media_process t where t.id % #{shardTotal} = #{shardIndex} and (t.status = '1' or t.status = '3') and t.fail_count < 3 limit #{count}")
List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal,@Param("shardIndex") int shardIndex,@Param("count") int count);
}定义Service接口,查询待处理
/**
* @description 媒资文件处理业务方法
*/
public interface MediaFileProcessService {
/**
* @description 获取待处理任务
* @param shardIndex 分片序号
* @param shardTotal 分片总数
* @param count 获取记录数
* @return java.util.List<com.xuecheng.media.model.po.MediaProcess>
*/
public List<MediaProcess> getMediaProcessList(int shardIndex,int shardTotal,int count);
}service接口实现
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {
@Autowired
MediaFilesMapper mediaFilesMapper;
@Autowired
MediaProcessMapper mediaProcessMapper;
@Override
public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count) {
List<MediaProcess> mediaProcesses = mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count);
return mediaProcesses;
}
}2.6 开始执行任务
2.6.1 分布式锁
前边分析了保证任务不重复执行的方案,理论上每个执行器分到的任务是不重复的,但是当在执行器弹性扩容时无法绝对避免任务不重复执行,比如:原来有四个执行器正在执行任务,由于网络问题原有的0、1号执行器无法与调度中心通信,调度中心就会对执行器重新编号,原来的3、4执行器可能就会执行和0、1号执行器相同的任务。
为了避免多线程去争抢同一个任务可以使用synchronized同步锁去解决,如下代码:
synchronized(锁对象){
执行任务...
}synchronized只能保证同一个虚拟机中多个线程去争抢锁。

如果是多个执行器分布式部署,并不能保证同一个视频只有一个执行器去处理。
现在要实现分布式环境下所有虚拟机中的线程去同步执行就需要让多个虚拟机去共用一个锁,虚拟机可以分布式部署,锁也可以分布式部署,如下图:

虚拟机都去抢占同一个锁,锁是一个单独的程序提供加锁、解锁服务。
该锁已不属于某个虚拟机,而是分布式部署,由多个虚拟机所共享,这种锁叫分布式锁。
实现分布式锁的方案有很多,常用的如下:
1、基于数据库实现分布锁
利用数据库主键唯一性的特点,或利用数据库唯一索引、行级锁的特点,比如:多个线程同时向数据库插入主键相同的同一条记录,谁插入成功谁就获取锁,多个线程同时去更新相同的记录,谁更新成功谁就抢到锁。
2、基于redis实现锁
redis提供了分布式锁的实现方案,比如:SETNX、set nx、redisson等。
拿SETNX举例说明,SETNX命令的工作过程是去set一个不存在的key,多个线程去设置同一个key只会有一个线程设置成功,设置成功的的线程拿到锁。
3、使用zookeeper实现
zookeeper是一个分布式协调服务,主要解决分布式程序之间的同步的问题。zookeeper的结构类似的文件目录,多线程向zookeeper创建一个子目录(节点)只会有一个创建成功,利用此特点可以实现分布式锁,谁创建该结点成功谁就获得锁。
本次我们选用数据库实现分布锁,后边的模块会选用其它方案到时再详细介绍。
2.6.2 开启任务
下边基于数据库方式实现分布锁,开始执行任务将任务执行状态更新为4表示任务执行中。
下边的sql语句可以实现更新操作:
update media_process m set m.status='4' where m.id=?如果是多个线程去执行该sql都将会执行成功,但需求是只能有一个线程抢到锁,所以此sql无法满足需求。
使用乐观锁方式实现更新操作:
update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=?多个线程同时执行上边的sql只会有一个线程执行成功。
什么是乐观锁、悲观锁?
synchronized是一种悲观锁,在执行被synchronized包裹的代码时需要首先获取锁,没有拿到锁则无法执行,是总悲观的认为别的线程会去抢,所以要悲观锁。
乐观锁的思想是它不认为会有线程去争抢,尽管去执行,如果没有执行成功就再去重试。
数据库的乐观锁实现方式是在表中增加一个version字段,更新时判断是否等于某个版本,等于则更新否则更新失败,如下方式。
update t1 set t1.data1 = '',t1.version='2' where t1.version='1'实现如下:
1、定义mapper
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {
/**
* 开启一个任务
* @param id 任务id
* @return 更新记录数
*/
@Update("update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=#{id}")
int startTask(@Param("id") long id);
}2、在MediaFileProcessService中定义接口
/**
* 开启一个任务
* @param id 任务id
* @return true开启任务成功,false开启任务失败
*/
public boolean startTask(long id);
//实现如下
public boolean startTask(long id) {
int result = mediaProcessMapper.startTask(id);
return result<=0?false:true;
}2.7 更新任务状态
任务处理完成需要更新任务处理结果,任务执行成功更新视频的URL、及任务处理结果,将待处理任务记录删除,同时向历史任务表添加记录。
在MediaFileProcessService接口添加方法
/**
* @description 保存任务结果
* @param taskId 任务id
* @param status 任务状态
* @param fileId 文件id
* @param url url
* @param errorMsg 错误信息
* @return void
*/
void saveProcessFinishStatus(Long taskId,String status,String fileId,String url,String errorMsg);service接口方法实现如下:
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {
@Autowired
MediaFilesMapper mediaFilesMapper;
@Autowired
MediaProcessMapper mediaProcessMapper;
@Autowired
MediaProcessHistoryMapper mediaProcessHistoryMapper;
@Transactional
@Override
public void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {
//查出任务,如果不存在则直接返回
MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);
if(mediaProcess == null){
return ;
}
//处理失败,更新任务处理结果
LambdaQueryWrapper<MediaProcess> queryWrapperById = new LambdaQueryWrapper<MediaProcess>().eq(MediaProcess::getId, taskId);
//处理失败
if(status.equals("3")){
MediaProcess mediaProcess_u = new MediaProcess();
mediaProcess_u.setStatus("3");
mediaProcess_u.setErrormsg(errorMsg);
mediaProcess_u.setFailCount(mediaProcess.getFailCount()+1);
mediaProcessMapper.update(mediaProcess_u,queryWrapperById);
log.debug("更新任务处理状态为失败,任务信息:{}",mediaProcess_u);
return ;
}
//任务处理成功
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);
if(mediaFiles!=null){
//更新媒资文件中的访问url
mediaFiles.setUrl(url);
mediaFilesMapper.updateById(mediaFiles);
}
//处理成功,更新url和状态
mediaProcess.setUrl(url);
mediaProcess.setStatus("2");
mediaProcess.setFinishDate(LocalDateTime.now());
mediaProcessMapper.updateById(mediaProcess);
//添加到历史记录
MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();
BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);
mediaProcessHistoryMapper.insert(mediaProcessHistory);
//删除mediaProcess
mediaProcessMapper.deleteById(mediaProcess.getId());
}
@Override
public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count) {
List<MediaProcess> mediaProcesses = mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count);
return mediaProcesses;
}
}2.8 视频处理
视频采用并发处理,每个视频使用一个线程去处理,每次处理的视频数量不要超过cpu核心数。
所有视频处理完成结束本次执行,为防止代码异常出现无限期等待则添加超时设置,到达超时时间还没有处理完成仍结束任务。
定义任务类VideoTask 如下:
@Slf4j
@Component
public class VideoTask {
@Autowired
MediaFileService mediaFileService;
@Autowired
MediaFileProcessService mediaFileProcessService;
@Value("${videoprocess.ffmpegpath}")
String ffmpegpath;
@XxlJob("videoJobHandler")
public void videoJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
List<MediaProcess> mediaProcessList = null;
int size = 0;
try {
//取出cpu核心数作为一次处理数据的条数
int processors = Runtime.getRuntime().availableProcessors();
//一次处理视频数量不要超过cpu核心数
mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors);
size = mediaProcessList.size();
log.debug("取出待处理视频任务{}条", size);
if (size <= 0) {
return;
}
} catch (Exception e) {
e.printStackTrace();
return;
}
//启动size个线程的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(size);
//计数器
CountDownLatch countDownLatch = new CountDownLatch(size);
//将处理任务加入线程池
mediaProcessList.forEach(mediaProcess -> {
threadPool.execute(() -> {
try {
//任务id
Long taskId = mediaProcess.getId();
//抢占任务
boolean b = mediaFileProcessService.startTask(taskId);
if (!b) {
return;
}
log.debug("开始执行任务:{}", mediaProcess);
//下边是处理逻辑
//桶
String bucket = mediaProcess.getBucket();
//存储路径
String filePath = mediaProcess.getFilePath();
//原始视频的md5值
String fileId = mediaProcess.getFileId();
//原始文件名称
String filename = mediaProcess.getFilename();
//将要处理的文件下载到服务器上
File originalFile = mediaFileService.downloadFileFromMinIO(mediaProcess.getBucket(), mediaProcess.getFilePath());
if (originalFile == null) {
log.debug("下载待处理文件失败,originalFile:{}", mediaProcess.getBucket().concat(mediaProcess.getFilePath()));
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "下载待处理文件失败");
return;
}
//处理下载的视频文件
File mp4File = null;
try {
mp4File = File.createTempFile("mp4", ".mp4");
} catch (IOException e) {
log.error("创建mp4临时文件失败");
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "创建mp4临时文件失败");
return;
}
//视频处理结果
String result = "";
try {
//开始处理视频
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpegpath, originalFile.getAbsolutePath(), mp4File.getName(), mp4File.getAbsolutePath());
//开始视频转换,成功将返回success
result = videoUtil.generateMp4();
} catch (Exception e) {
e.printStackTrace();
log.error("处理视频文件:{},出错:{}", mediaProcess.getFilePath(), e.getMessage());
}
if (!result.equals("success")) {
//记录错误信息
log.error("处理视频失败,视频地址:{},错误信息:{}", bucket + filePath, result);
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, result);
return;
}
//将mp4上传至minio
//mp4在minio的存储路径
String objectName = getFilePath(fileId, ".mp4");
//访问url
String url = "/" + bucket + "/" + objectName;
try {
mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), "video/mp4", bucket, objectName);
//将url存储至数据,并更新状态为成功,并将待处理视频记录删除存入历史
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "2", fileId, url, null);
} catch (Exception e) {
log.error("上传视频失败或入库失败,视频地址:{},错误信息:{}", bucket + objectName, e.getMessage());
//最终还是失败了
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "处理后视频上传或入库失败");
}
}finally {
countDownLatch.countDown();
}
});
});
//等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务
countDownLatch.await(30, TimeUnit.MINUTES);
}
private String getFilePath(String fileMd5,String fileExt){
return fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt;
}
}2.9 测试
2.9.1 基本测试
进入xxl-job调度中心添加执行器和视频处理任务
在xxl-job配置任务调度策略:
1)配置阻塞处理策略为:丢弃后续调度。
2)配置视频处理调度时间间隔不用根据视频处理时间去确定,可以配置的小一些,如:5分钟,即使到达调度时间如果视频没有处理完会丢弃调度请求。
配置完成开始测试视频处理:
1、首先上传至少4个视频,非mp4格式。
2、在xxl-job启动视频处理任务
3、观察媒资管理服务后台日志
2.9.2 失败测试
1、先停止调度中心的视频处理任务。
2、上传视频,手动修改待处理任务表中file_path字段为一个不存在的文件地址
3、启动任务
观察任务处理失败后是否会重试,并记录失败次数。
2.9.3 抢占任务测试
1、修改调度中心中视频处理任务的阻塞处理策略为“覆盖之间的调度”

2、在抢占任务代码处打断点并选择支持多线程方式

3、在抢占任务代码处的下边两行代码分别打上断点,避免观察时代码继续执行。
4、启动任务
此时多个线程执行都停留在断点处

依次放行,观察同一个任务只会被一个线程抢占成功。


2.10 其它问题
2.10.1 任务补偿机制
如果有线程抢占了某个视频的处理任务,如果线程处理过程中挂掉了,该视频的状态将会一直是处理中,其它线程将无法处理,这个问题需要用补偿机制。
单独启动一个任务找到待处理任务表中超过执行期限但仍在处理中的任务,将任务的状态改为执行失败。
任务执行期限是处理一个视频的最大时间,比如定为30分钟,通过任务的启动时间去判断任务是否超过执行期限。
大家思考这个sql该如何实现?
大家尝试自己实现此任务补偿机制。
2.10.2 达到最大失败次数
当任务达到最大失败次数时一般就说明程序处理此视频存在问题,这种情况就需要人工处理,在页面上会提示失败的信息,人工可手动执行该视频进行处理,或通过其它转码工具进行视频转码,转码后直接上传mp4视频。
2.10.3 分块文件清理问题
上传一个文件进行分块上传,上传一半不传了,之前上传到minio的分块文件要清理吗?怎么做的?
1、在数据库中有一张文件表记录minio中存储的文件信息。
2、文件开始上传时会写入文件表,状态为上传中,上传完成会更新状态为上传完成。
3、当一个文件传了一半不再上传了说明该文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除minio中没有上传成功的文件目录。
三 任务调度实战—课程发布
3.1 需求分析
3.1.1 数据模型
教学机构人员在课程审核通过后即可发布课程,课程发布后会公开展示在网站上供学生查看、选课和学习。
在网站上展示课程信息需要解决课程信息显示的性能问题,如果速度慢(排除网速)会影响用户的体验性。
如何去快速搜索课程?
打开课程详情页面仍然去查询数据库可行吗?
为了提高网站的速度需要将课程信息进行缓存,并且要将课程信息加入索引库方便搜索,下图显示了课程发布后课程信息的流转情况:

1、向内容管理数据库的课程发布表存储课程发布信息,更新课程基本信息表中发布状态为已发布。
2、向Redis存储课程缓存信息。
3、向Elasticsearch存储课程索引信息。
4、请求分布文件系统存储课程静态化页面(即html页面),实现快速浏览课程详情页面。
课程发布表的数据来源于课程预发布表,它们的结构基本一样,只是课程发布表中的状态是课程发布状态,如下图:

redis中的课程缓存信息是将课程发布表中的数据转为json进行存储。
elasticsearch中的课程索引信息是根据搜索需要将课程名称、课程介绍等信息进行索引存储。
MinIO中存储了课程的静态化页面文件(html网页),查看课程详情是通过文件系统去浏览课程详情页面。
3.2 课程发布的场景方案
目前我们已经有了任务调度的技术积累,这里选用任务调度的方案去实现分布式事务控制,课程发布满足:课程发布操作后,先更新数据库中的课程发布状态,更新后向redis、elasticsearch、MinIO写课程信息,只要在一定时间内最终向redis、elasticsearch、MinIO写数据成功即可。
下图是具体的技术方案:

1、在内容管理服务的数据库中添加一个消息表,消息表和课程发布表在同一个数据库。
2、点击课程发布通过本地事务向课程发布表写入课程发布信息,同时向消息表写课程发布的消息。通过数据库进行控制,只要课程发布表插入成功消息表也插入成功,消息表的数据就记录了某门课程发布的任务。
3、启动任务调度系统定时调度内容管理服务去定时扫描消息表的记录。
4、当扫描到课程发布的消息时即开始完成向redis、elasticsearch、MinIO同步数据的操作。
5、同步数据的任务完成后删除消息表记录。
时序图如下:
下图是课程发布操作的流程:

1、执行发布操作,内容管理服务存储课程发布表的同时向消息表添加一条“课程发布任务”。这里使用本地事务保证课程发布信息保存成功,同时消息表也保存成功。
2、任务调度服务定时调度内容管理服务扫描消息表,由于课程发布操作后向消息表插入一条课程发布任务,此时扫描到一条任务。
3、拿到任务开始执行任务,分别向redis、elasticsearch及文件系统存储数据。
4、任务完成后删除消息表记录。
3.3 课程发布接口
3.3.1 接口定义
根据课程发布的分布式事务控制方案,课程发布操作首先通过本地事务向课程发布表写入课程发布信息并向消息表插入一条消息,这里定义的课程发布接口要实现该功能。
在内容管理接口工程中定义课程发布接口。
/**
* @description 课程预览,发布
*/
@Api(value = "课程预览发布接口",tags = "课程预览发布接口")
@Controller
public class CoursePublishController {
...
@ApiOperation("课程发布")
@ResponseBody
@PostMapping ("/coursepublish/{courseId}")
public void coursepublish(@PathVariable("courseId") Long courseId){
}
...3.3.2 接口开发
3.3.2.1 DAO开发
课程发布操作对数据库操作如下:
1、向课程发布表course_publish插入一条记录,记录来源于课程预发布表,如果存在则更新,发布状态为:已发布。
2、更新course_base表的课程发布状态为:已发布
3、删除课程预发布表的对应记录。
4、向mq_message消息表插入一条消息,消息类型为:course_publish
约束:
1、课程审核通过方可发布。
2、本机构只允许发布本机构的课程。
以上功能使用自动生成的mapper接口即可完成。
1、在内容管理数据库创建mq_message消息表及消息历史消息表(历史表存储已经完成的消息)。

消息表结构如下:

2、生成mq_message消息表、course_publish课程发布表的po和mapper接口
稍后会开发一个通用的消息处理组件,这里先不生成代码。
3.3.2.2 Service开发
定义Service接口:
/**
* @description 课程发布接口
* @param companyId 机构id
* @param courseId 课程id
* @return void
*/
public void publish(Long companyId,Long courseId);编写课程发布的Service方法:
@Transactional
@Override
public void publish(Long companyId, Long courseId) {
//约束校验
//查询课程预发布表
CoursePublishPre coursePublishPre = coursePublishPreMapper.selectById(courseId);
if(coursePublishPre == null){
XueChengPlusException.cast("请先提交课程审核,审核通过才可以发布");
}
//本机构只允许提交本机构的课程
if(!coursePublishPre.getCompanyId().equals(companyId)){
XueChengPlusException.cast("不允许提交其它机构的课程。");
}
//课程审核状态
String auditStatus = coursePublishPre.getStatus();
//审核通过方可发布
if(!"202004".equals(auditStatus)){
XueChengPlusException.cast("操作失败,课程审核通过方可发布。");
}
//保存课程发布信息
saveCoursePublish(courseId);
//保存消息表
saveCoursePublishMessage(courseId);
//删除课程预发布表对应记录
coursePublishPreMapper.deleteById(courseId);
}
/**
* @description 保存课程发布信息
* @param courseId 课程id
* @return void
*/
private void saveCoursePublish(Long courseId){
//整合课程发布信息
//查询课程预发布表
CoursePublishPre coursePublishPre = coursePublishPreMapper.selectById(courseId);
if(coursePublishPre == null){
XueChengPlusException.cast("课程预发布数据为空");
}
CoursePublish coursePublish = new CoursePublish();
//拷贝到课程发布对象
BeanUtils.copyProperties(coursePublishPre,coursePublish);
coursePublish.setStatus("203002");
CoursePublish coursePublishUpdate = coursePublishMapper.selectById(courseId);
if(coursePublishUpdate == null){
coursePublishMapper.insert(coursePublish);
}else{
coursePublishMapper.updateById(coursePublish);
}
//更新课程基本表的发布状态
CourseBase courseBase = courseBaseMapper.selectById(courseId);
courseBase.setStatus("203002");
courseBaseMapper.updateById(courseBase);
}
/**
* @description 保存消息表记录,稍后实现
* @param courseId 课程id
* @return void
*/
private void saveCoursePublishMessage(Long courseId){
}3.3.2.3 接口完善
完善接口层代码
@ApiOperation("课程发布")
@ResponseBody
@PostMapping ("/coursepublish/{courseId}")
public void coursepublish(@PathVariable("courseId") Long courseId){
Long companyId = 1232141425L;
coursePublishService.publish(companyId,courseId);
}3.3.3 接口测试
先使用httpclient方法测试:
### 课程发布
POST {{content_host}}/content/coursepublish/2先测试约束条件:
1、在未提交审核时进行课程发布测试。
2、在课程未审核通过时进行发布。
正常流程测试:
1、提交审核课程
2、手动修改课程预发布表与课程基本信息的审核状态为审核通过。
3、执行课程发布
4、观察课程发布表记录是否正常,课程预发布表记录已经删除,课程基本信息表与课程发布表的发布状态为”发布“。
使用前后端联调方式测试。
3.4 消息处理SDK
3.4.1 消息模块技术方案
课程发布操作执行后需要扫描消息表的记录,有关消息表处理的有哪些?

上图中红色框内的都是与消息处理相关的操作:
1、新增消息表
2、扫描消息表。
3、更新消息表。
4、删除消息表。
使用消息表这种方式实现最终事务一致性的地方除了课程发布还有其它业务场景。

如果在每个地方都实现一套针对消息表定时扫描、处理的逻辑基本上都是重复的,软件的可复用性太低,成本太高。
如何解决这个问题?
针对这个问题可以想到将消息处理相关的逻辑做成一个通用的东西。
是做成通用的服务,还是做成通用的代码组件呢?
通用的服务是完成一个通用的独立功能,并提供独立的网络接口,比如:项目中的文件系统服务,提供文件的分布式存储服务。
代码组件也是完成一个通用的独立功能,通常会提供API的方式供外部系统使用,比如:fastjson、Apache commons工具包等。
如果将消息处理做成一个通用的服务,该服务需要连接多个数据库,因为它要扫描微服务数据库下的消息表,并且要提供与微服务通信的网络接口,单就针对当前需求而言开发成本有点高。
如果将消息处理做一个SDK工具包相比通用服务不仅可以解决将消息处理通用化的需求,还可以降低成本。
所以,本项目确定将对消息表相关的处理做成一个SDK组件供各微服务使用,如下图所示:

下边对消息SDK的设计内容进行说明:
sdk需要提供执行任务的逻辑吗?
拿课程发布任务举例,执行课程发布任务是要向redis、索引库等同步数据,其它任务的执行逻辑是不同的,所以执行任务在sdk中不用实现任务逻辑,只需要提供一个抽象方法由具体的执行任务方去实现。
如何保证任务的幂等性?
在视频处理章节介绍的视频处理的幂等性方案,这里可以采用类似方案,任务执行完成后会从消息表删除,如果消息的状态是完成或不存在消息表中则不用执行。
如何保证任务不重复执行?
采用和视频处理章节一致方案,除了保证任务的幂等性外,任务调度采用分片广播,根据分片参数去获取任务,另外阻塞调度策略为丢弃任务。
注意:这里是信息同步类任务,即使任务重复执行也没有关系,不再使用抢占任务的方式保证任务不重复执行。
还有一个问题,根据消息表记录是否存在或消息表中的任务状态去保证任务的幂等性,如果一个任务有好几个小任务,比如:课程发布任务需要执行三个同步操作:存储课程到redis、存储课程到索引库,存储课程页面到文件系统。如果其中一个小任务已经完成也不应该去重复执行。这里该如何设计?
将小任务作为任务的不同的阶段,在消息表中设计阶段状态。

每完成一个阶段在相应的阶段状态字段打上完成标记,即使这个大任务没有完成再重新执行时,如果小阶段任务完成了也不会重复执行某个小阶段的任务。
综上所述,除了消息表的基本的增、删、改、查的接口外,消息SDK还具有如下接口功能:
package com.xuecheng.messagesdk.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.xuecheng.messagesdk.model.po.MqMessage;
import java.util.List;
/**
* <p>
* 服务类
* </p>
*/
public interface MqMessageService extends IService<MqMessage> {
/**
* @description 扫描消息表记录,采用与扫描视频处理表相同的思路
* @param shardIndex 分片序号
* @param shardTotal 分片总数
* @param count 扫描记录数
* @return java.util.List 消息记录
*/
public List<MqMessage> getMessageList(int shardIndex, int shardTotal, String messageType,int count);
/**
* @description 完成任务
* @param id 消息id
* @return int 更新成功:1
*/
public int completed(long id);
/**
* @description 完成阶段任务
* @param id 消息id
* @return int 更新成功:1
*/
public int completedStageOne(long id);
public int completedStageTwo(long id);
public int completedStageThree(long id);
public int completedStageFour(long id);
/**
* @description 查询阶段状态
* @param id
* @return int
*/
public int getStageOne(long id);
public int getStageTwo(long id);
public int getStageThree(long id);
public int getStageFour(long id);
}消息SDK提供消息处理抽象类,此抽象类供使用方去继承使用,如下:
Java
package com.xuecheng.messagesdk.service;
import com.xuecheng.messagesdk.model.po.MqMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.concurrent.*;
/**
* @description 消息处理抽象类
*/
@Slf4j
@Data
public abstract class MessageProcessAbstract {
@Autowired
MqMessageService mqMessageService;
/**
* @param mqMessage 执行任务内容
* @return boolean true:处理成功,false处理失败
* @description 任务处理
*/
public abstract boolean execute(MqMessage mqMessage);
/**
* @description 扫描消息表多线程执行任务
* @param shardIndex 分片序号
* @param shardTotal 分片总数
* @param messageType 消息类型
* @param count 一次取出任务总数
* @param timeout 预估任务执行时间,到此时间如果任务还没有结束则强制结束 单位秒
* @return void
*/
public void process(int shardIndex, int shardTotal, String messageType,int count,long timeout) {
try {
//扫描消息表获取任务清单
List<MqMessage> messageList = mqMessageService.getMessageList(shardIndex, shardTotal,messageType, count);
//任务个数
int size = messageList.size();
log.debug("取出待处理消息"+size+"条");
if(size<=0){
return ;
}
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(size);
//计数器
CountDownLatch countDownLatch = new CountDownLatch(size);
messageList.forEach(message -> {
threadPool.execute(() -> {
log.debug("开始任务:{}",message);
//处理任务
try {
boolean result = execute(message);
if(result){
log.debug("任务执行成功:{})",message);
//更新任务状态,删除消息表记录,添加到历史表
int completed = mqMessageService.completed(message.getId());
if (completed>0){
log.debug("任务执行成功:{}",message);
}else{
log.debug("任务执行失败:{}",message);
}
}
} catch (Exception e) {
e.printStackTrace();
log.debug("任务出现异常:{},任务:{}",e.getMessage(),message);
}
//计数
countDownLatch.countDown();
log.debug("结束任务:{}",message);
});
});
//等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务
countDownLatch.await(timeout,TimeUnit.SECONDS);
System.out.println("结束....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}3.4.2 消息模块SDK测试
1、在内容管理数据库创建消息表和消息历史表
2、拷贝课程资料中的xuecheng-plus-message-sdk到工程目录,如下图:

3、修改test下的bootstrap.yml中的数据库连接
下边测试消息SDK的接口:
1、继承MessageProcessAbstract 抽象类编写任务执行方法
package com.xuecheng.messagesdk;
import com.xuecheng.messagesdk.model.po.MqMessage;
import com.xuecheng.messagesdk.service.MessageProcessAbstract;
import com.xuecheng.messagesdk.service.MqMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
* @description 消息处理测试类,继承MessageProcessAbstract
*/
@Slf4j
@Component
public class MessageProcessClass extends MessageProcessAbstract {
@Autowired
MqMessageService mqMessageService;
//执行任务
@Override
public boolean execute(MqMessage mqMessage) {
Long id = mqMessage.getId();
log.debug("开始执行任务:{}",id);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//取出阶段状态
int stageOne = mqMessageService.getStageOne(id);
if(stageOne<1){
log.debug("开始执行第一阶段任务");
System.out.println();
int i = mqMessageService.completedStageOne(id);
if(i>0){
log.debug("完成第一阶段任务");
}
}else{
log.debug("无需执行第一阶段任务");
}
return true;
}
}2、编写测试类
@SpringBootTest
public class MessageProcessClassTest {
@Autowired
MessageProcessClass messageProcessClass;
@Test
public void test() {
System.out.println("开始执行-----》" + LocalDateTime.now());
messageProcessClass.process(0, 1, "test", 5, 30);
System.out.println("结束执行-----》" + LocalDateTime.now());
Thread.sleep(9000000);
}
}3、准备测试数据,在消息表添加消息类型为"test"的消息
4、执行MessageProcessClassTest 类中的test()方法,观察控制台任务执行的日志信息。
3.4.3 集成消息SDK
3.4.3.1 添加消息
1、在内容管理数据库创建消息表和消息历史表
2、拷贝课程资料中的xuecheng-plus-message-sdk到工程目录,如下图:

3、在内容管理service工程中添加sdk依赖
<dependency>
<groupId>com.xuecheng</groupId>
<artifactId>xuecheng-plus-message-sdk</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>4、课程发布操作使用本地事务保存课程发布信息、添加消息表。
回到当初编写课程发布时的代码,如下:
@Transactional
@Override
public void publish(Long companyId, Long courseId) {
//约束校验
//查询课程预发布表
CoursePublishPre coursePublishPre = coursePublishPreMapper.selectById(courseId);
if(coursePublishPre == null){
XueChengPlusException.cast("请先提交课程审核,审核通过才可以发布");
}
//本机构只允许提交本机构的课程
if(!coursePublishPre.getCompanyId().equals(companyId)){
XueChengPlusException.cast("不允许提交其它机构的课程。");
}
//课程审核状态
String auditStatus = coursePublishPre.getStatus();
//审核通过方可发布
if(!"202004".equals(auditStatus)){
XueChengPlusException.cast("操作失败,课程审核通过方可发布。");
}
//保存课程发布信息
saveCoursePublish(courseId);
//保存消息表
saveCoursePublishMessage(courseId);
//删除课程预发布表对应记录
coursePublishPreMapper.deleteById(courseId);
}我们要填充的saveCoursePublishMessage(courseId)方法,如下:
/**
* @description 保存消息表记录
* @param courseId 课程id
* @return void
*/
private void saveCoursePublishMessage(Long courseId){
MqMessage mqMessage = mqMessageService.addMessage("course_publish", String.valueOf(courseId), null, null);
if(mqMessage==null){
XueChengPlusException.cast(CommonError.UNKOWN_ERROR);
}
}下边进行测试:
发布一门课程,观察消息表是否正常添加消息。
需要手动修改课程审核状态为审核通过执行发布操作,发布后可以修改发布状态为下架重新发布测试。
3.4.3.2 课程发布任务处理
在内容管理服务添加消息处理sdk的依赖即可使用它,实现sdk中的MessageProcessAbstract类,重写execte方法。
实现sdk中的MessageProcessAbstract类:
@Slf4j
@Component
public class CoursePublishTask extends MessageProcessAbstract {
//课程发布任务处理
@Override
public boolean execute(MqMessage mqMessage) {
//获取消息相关的业务信息
String businessKey1 = mqMessage.getBusinessKey1();
long courseId = Integer.parseInt(businessKey1);
//课程静态化
generateCourseHtml(mqMessage,courseId);
//课程索引
saveCourseIndex(mqMessage,courseId);
//课程缓存
saveCourseCache(mqMessage,courseId);
return true;
}
//生成课程静态化页面并上传至文件系统
public void generateCourseHtml(MqMessage mqMessage,long courseId){
log.debug("开始进行课程静态化,课程id:{}",courseId);
//消息id
Long id = mqMessage.getId();
//消息处理的service
MqMessageService mqMessageService = this.getMqMessageService();
//消息幂等性处理
int stageOne = mqMessageService.getStageOne(id);
if(stageOne >0){
log.debug("课程静态化已处理直接返回,课程id:{}",courseId);
return ;
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//保存第一阶段状态
mqMessageService.completedStageOne(id);
}
//将课程信息缓存至redis
public void saveCourseCache(MqMessage mqMessage,long courseId){
log.debug("将课程信息缓存至redis,课程id:{}",courseId);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//保存课程索引信息
public void saveCourseIndex(MqMessage mqMessage,long courseId){
log.debug("保存课程索引信息,课程id:{}",courseId);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}3.4.3.3 开启任务调度
1、首先在内容管理service工程中添加xxl-job依赖
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>2、配置执行器
在nacos中在content-service-dev.yaml中配置
xxl:
job:
admin:
addresses: http://192.168.2.203:8088/xxl-job-admin
executor:
appname: coursepublish-job
address:
ip:
port: 8999
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30
accessToken: default_token3、从媒资管理服务层工程中拷贝一个XxlJobConfig配置类到内容管理service工程中。
在xxl-job-admin控制台中添加执行器

3、编写任务调度入口
@Slf4j
@Component
public class CoursePublishTask extends MessageProcessAbstract {
//任务调度入口
@XxlJob("CoursePublishJobHandler")
public void coursePublishJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
log.debug("shardIndex="+shardIndex+",shardTotal="+shardTotal);
//参数:分片序号、分片总数、消息类型、一次最多取到的任务数量、一次任务调度执行的超时时间
process(shardIndex,shardTotal,"course_publish",30,60);
}
....4、在xxl-job添加任务

任务配置如下:

到此SDK开发、集成完成,下一步添加课程发布后页面静态化、课程缓存、课程索引等任务。
3.4.3.4 测试
在消息表添加课程发布的消息,消息类型为course_publish,business_key1为发布课程的ID
1、测试是否可以正常调度执行。
2、测试任务幂等性
在 saveCourseCache(mqMessage,courseId);处打断点,待执行到这里观察数据库第一阶段完成的标记预期标记为1。
结束进程,再重新启动,观察第一阶段的任务预期不再执行。
3、任务执行完成删除消息表记录,插入历史表,state状态字段为1
3.5 页面静态化
3.5.1 什么是页面静态化
根据课程发布的操作流程,执行课程发布后要将课程详情信息页面静态化,生成html页面上传至文件系统。
什么是页面静态化?
课程预览功能通过模板引擎技术在页面模板中填充数据,生成html页面,这个过程是当客户端请求服务器时服务器才开始渲染生成html页面,最后响应给浏览器,服务端渲染的并发能力是有限的。
页面静态化则强调将生成html页面的过程提前,提前使用模板引擎技术生成html页面,当客户端请求时直接请求html页面,由于是静态页面可以使用nginx、apache等高性能的web服务器,并发性能高。
什么时候能用页面静态化技术?
当数据变化不频繁,一旦生成静态页面很长一段时间内很少变化,此时可以使用页面静态化。因为如果数据变化频繁,一旦改变就需要重新生成静态页面,导致维护静态页面的工作量很大。
根据课程发布的业务需求,虽然课程发布后仍可以修改课程信息,但需要经过课程审核,且修改频度不大,所以适合使用页面静态化。
3.5.2 静态化测试
下边使用freemarker技术对页面静态化生成html页面。
在内容管理service工程中添加freemarker依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>编写测试方法
@SpringBootTest
public class FreemarkerTest {
@Autowired
CoursePublishService coursePublishService;
//测试页面静态化
@Test
public void testGenerateHtmlByTemplate() throws IOException, TemplateException {
//配置freemarker
Configuration configuration = new Configuration(Configuration.getVersion());
//加载模板
//选指定模板路径,classpath下templates下
//得到classpath路径
String classpath = this.getClass().getResource("/").getPath();
configuration.setDirectoryForTemplateLoading(new File(classpath + "/templates/"));
//设置字符编码
configuration.setDefaultEncoding("utf-8");
//指定模板文件名称
Template template = configuration.getTemplate("course_template.ftl");
//准备数据
CoursePreviewDto coursePreviewInfo = coursePublishService.getCoursePreviewInfo(2L);
Map<String, Object> map = new HashMap<>();
map.put("model", coursePreviewInfo);
//静态化
//参数1:模板,参数2:数据模型
String content = FreeMarkerTemplateUtils.processTemplateIntoString(template, map);
System.out.println(content);
//将静态化内容输出到文件中
InputStream inputStream = IOUtils.toInputStream(content);
//输出流
FileOutputStream outputStream = new FileOutputStream("D:\\develop\\test.html");
IOUtils.copy(inputStream, outputStream);
}
}将content-api工程下的模板拷贝到content-service工程下:

执行测试方法,观察D:\develop\test.html 是否成功生成。
3.5.3 上传文件测试
3.5.3.1 配置远程调用环境
静态化生成文件后需要上传至分布式文件系统,根据微服务的职责划分,媒资管理服务负责维护文件系统中的文件,所以内容管理服务对页面静态化生成html文件需要调用媒资管理服务的上传文件接口。如下图:

微服务之间难免会存在远程调用,在Spring Cloud中可以使用Feign进行远程调用,
Feign是一个声明式的http客户端,官方地址:https://github.com/OpenFeign/feign
其作用就是帮助我们优雅的实现http请求的发送,解决上面提到的问题。
下边先准备Feign的开发环境:
1、在内容管理content-service工程添加依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- Spring Cloud 微服务远程调用 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>
<!--feign支持Multipart格式传参-->
<dependency>
<groupId>io.github.openfeign.form</groupId>
<artifactId>feign-form</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>io.github.openfeign.form</groupId>
<artifactId>feign-form-spring</artifactId>
<version>3.8.0</version>
</dependency>2、在nacos配置feign-dev.yaml公用配置文件
feign:
hystrix:
enabled: true
circuitbreaker:
enabled: true
hystrix:
command:
default:
execution:
isolation:
thread:
timeoutInMilliseconds: 30000 #熔断超时时间
ribbon:
ConnectTimeout: 60000 #连接超时时间
ReadTimeout: 60000 #读超时时间
MaxAutoRetries: 0 #重试次数
MaxAutoRetriesNextServer: 1 #切换实例的重试次数3、在内容管理service工程和内容管理api工程都引入此配置文件
shared-configs:
- data-id: feign-${spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true4、在内容管理service工程配置feign支持Multipart,拷贝课程资料下的MultipartSupportConfig 到content-service工程下的config包下。
3.5.3.2 扩充上传文件接口
现在需要将课程的静态文件上传到minio,单独存储到course目录下,文件的objectname为"课程id.html",原有的上传文件接口需要增加一个参数 objectname。
下边扩充媒资服务的上传文件接口
@ApiOperation("上传文件")
@RequestMapping(value = "/upload/coursefile",consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public UploadFileResultDto upload(@RequestPart("filedata") MultipartFile filedata,
@RequestParam(value= "objectName",required=false) String objectName) throws IOException{
//....
}service接口也增加一个参数:
/**
* 上传文件
* @param companyId 机构id
* @param uploadFileParamsDto 上传文件信息
* @param localFilePath 文件磁盘路径
* @param objectName 对象名
* @return 文件信息
*/
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, String localFilePath,String objectName);修改原有uploadFile方法,判断如果objectName为空则采取年月日样式的路径方式。
//存储到minio中的对象名(带目录)
if(StringUtils.isEmpty(objectName)){
objectName = defaultFolderPath + fileMd5 + extension;
}
// String objectName = defaultFolderPath + fileMd5 + extension;3.5.3.3 远程调用测试
在content-service下编写feign接口
/**
* @description 媒资管理服务远程接口
*/
@FeignClient(value = "media-api",configuration = MultipartSupportConfig.class)
public interface MediaServiceClient {
@RequestMapping(value = "/media/upload/coursefile",consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
String uploadFile(@RequestPart("filedata") MultipartFile upload,@RequestParam(value = "objectName",required=false) String objectName);
}在启动类添加@EnableFeignClients注解
@EnableFeignClients(basePackages={"com.xuecheng.content.feignclient"})
编写测试方法
/**
* @description 测试使用feign远程上传文件
*/
@SpringBootTest
public class FeignUploadTest {
@Autowired
MediaServiceClient mediaServiceClient;
//远程调用,上传文件
@Test
public void test() {
MultipartFile multipartFile = MultipartSupportConfig.getMultipartFile(new File("D:\\develop\\test.html"));
mediaServiceClient.uploadFile(multipartFile,"course","test.html");
}
}下边进行测试,启动媒资服务,执行测试方法,上传文件成功,进入minIO查看文件

访问:http://192.168.101.65:9000/mediafiles/course/74b386417bb9f3764009dc94068a5e44.html
查看是否可以正常访问。
3.5.4 课程静态化开发
课程页面静态化和静态页面远程上传测试通过,下一步开发课程静态化功能,最终使用消息处理SDK去调度执行。
3.5.4.1 静态化实现
课程静态化包括两部分工作:生成课程静态化页面,上传静态页面到文件系统。
在课程发布的service编写这两部分内容,最后通过消息去调度执行。
1、接口定义
/**
* @description 课程静态化
* @param courseId 课程id
* @return File 静态化文件
*/
public File generateCourseHtml(Long courseId);
/**
* @description 上传课程静态化页面
* @param file 静态化文件
* @return void
*/
public void uploadCourseHtml(Long courseId,File file);2、接口实现
将之前编写的静态化测试代码以及上传静态文件测试代码拷贝过来使用
@Override
public File generateCourseHtml(Long courseId) {
//静态化文件
File htmlFile = null;
try {
//配置freemarker
Configuration configuration = new Configuration(Configuration.getVersion());
//加载模板
//选指定模板路径,classpath下templates下
//得到classpath路径
String classpath = this.getClass().getResource("/").getPath();
configuration.setDirectoryForTemplateLoading(new File(classpath + "/templates/"));
//设置字符编码
configuration.setDefaultEncoding("utf-8");
//指定模板文件名称
Template template = configuration.getTemplate("course_template.ftl");
//准备数据
CoursePreviewDto coursePreviewInfo = this.getCoursePreviewInfo(courseId);
Map<String, Object> map = new HashMap<>();
map.put("model", coursePreviewInfo);
//静态化
//参数1:模板,参数2:数据模型
String content = FreeMarkerTemplateUtils.processTemplateIntoString(template, map);
// System.out.println(content);
//将静态化内容输出到文件中
InputStream inputStream = IOUtils.toInputStream(content);
//创建静态化文件
htmlFile = File.createTempFile("course",".html");
log.debug("课程静态化,生成静态文件:{}",htmlFile.getAbsolutePath());
//输出流
FileOutputStream outputStream = new FileOutputStream(htmlFile);
IOUtils.copy(inputStream, outputStream);
} catch (Exception e) {
log.error("课程静态化异常:{}",e.toString());
XueChengPlusException.cast("课程静态化异常");
}
return htmlFile;
}
@Override
public void uploadCourseHtml(Long courseId, File file) {
MultipartFile multipartFile = MultipartSupportConfig.getMultipartFile(file);
String course = mediaServiceClient.uploadFile(multipartFile, "course/"+courseId+".html");
if(course==null){
XueChengPlusException.cast("上传静态文件异常");
}
}完善课程发布任务CoursePublishTask类的代码:
Java
//生成课程静态化页面并上传至文件系统
public void generateCourseHtml(MqMessage mqMessage,long courseId){
log.debug("开始进行课程静态化,课程id:{}",courseId);
//消息id
Long id = mqMessage.getId();
//消息处理的service
MqMessageService mqMessageService = this.getMqMessageService();
//消息幂等性处理
int stageOne = mqMessageService.getStageOne(id);
if(stageOne == 1){
log.debug("课程静态化已处理直接返回,课程id:{}",courseId);
return ;
}
//生成静态化页面
File file = coursePublishService.generateCourseHtml(courseId);
//上传静态化页面
if(file!=null){
coursePublishService.uploadCourseHtml(courseId,file);
}
//保存第一阶段状态
mqMessageService.completedStageOne(id);
}3.5.4.2 测试
1、启动网关、媒资管理服务工程。
2、在内容管理api工程的启动类上配置FeignClient
@EnableFeignClients(basePackages={"com.xuecheng.content.feignclient"})
在bootstrap.yml引用feign-dev.yaml
- data-id: feign-${spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true #profiles默认为dev启动内容管理接口工程。
在CoursePublishTask类的execute方法中打上断点。
3、发布一门课程,保存消息表存在未处理的处理。
4、启动xxl-job调度中心、启动课程发布任务,等待定时调度。

5、观察任务调度日志,观察任务是否可以正常处理。
6、处理完成进入文件系统,查询mediafiles桶内是否存在以课程id命名的html文件

如果不存在说明课程静态化存在问题,再仔细查看执行日志,排查问题。
如果存在则说明课程静态化并上传到minio成功。
3.5.4.3 浏览详细页面
课程静态化成功后可以用浏览器访问html文件是否可以正常浏览,下图表示可以正常浏览。

页面还没有样式,需要在nginx配置虚拟目录,在www.51xuecheng.cn下配置:
location /course/ {
proxy_pass http://fileserver/mediafiles/course/;
}加载nginx配置文件
访问:http://www.51xuecheng.cn/course/2.html
2.html为以课程id命名的html文件。
3.6 课程搜索
3.6.1 需求分析
3.6.1.1 模块介绍
搜索功能是一个系统的重要功能,是信息查询的方式。课程搜索是课程展示的渠道,用户通过课程搜索找到课程信息,进一步去查看课程的详细信息,进行选课、支付、学习。
本项目的课程搜索支持全文检索技术,什么是全文检索?
全文检索是指计算机索引程序通过扫描文章中的每一个词,对每一个词建立一个索引,指明该词在文章中出现的次数和位置,当用户查询时,检索程序就根据事先建立的索引进行查找,并将查找的结果反馈给用户的检索方式。这个过程类似于通过字典中的检索字表查字的过程。
全文检索可以简单理解为通过索引搜索文章。

全文检索的速度非常快,早期应用在搜索引擎技术中,比如:百度、google等,现在通常一些大型网站的搜索功能都是采用全文检索技术。

课程搜索也要将课程信息建立索引,在课程发布时建立课程索引,索引建立好用户可通过搜索网页去查询课程信息。

所以,课程搜索模块包括两部分:课程索引、课程搜索。
课程索引是将课程信息建立索引。
课程搜索是通过前端网页,通过关键字等条件去搜索课程。
3.6.1.2 业务流程
根据模块介绍的内容,课程搜索模块包括课程索引、课程搜索两部分。
1、课程索引
在课程发布操作执行后通过消息处理方式创建课程索引,如下图:

本项目使用elasticsearch作为索引及搜索服务。
2、课程搜索
课程索引创建完成,用户才可以通过前端搜索课程信息。
课程搜索可以从首页进入搜索页面。

下图是搜索界面,可以通过课程分类、课程难度等级等条件进行搜索。

3.6.2 准备环境
3.6.2.1 搭建elasticsearch
在课前下发的虚拟中已经在docker容器中安装了elasticsearch和kibana。
kibana 是 ELK(Elasticsearch , Logstash, Kibana )之一,kibana 一款开源的数据分析和可视化平台,通过可视化界面访问elasticsearch的索引库,并可以生成一个数据报表。
开发中主要使用kibana通过api对elasticsearch进行索引和搜索操作,通过浏览器访问 http://192.168.101.65:5601/app/dev_tools#/console进入kibana的开发工具界面。

可通过命令:GET /_cat/indices?v 查看所有的索引,通过此命令判断kibana是否正常连接elasticsearch。
索引相当于MySQL中的表,Elasticsearch与MySQL之间概念的对应关系见下表:

要使用elasticsearch需要建立索引,Mapping相当于表结构,Mapping创建后其字段不能删除,如果要删除需要删除整个索引,下边介绍创建索引、查询索引、删除索引的方法:
1、创建索引,并指定Mapping。
PUT /course-publish
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"companyId": {
"type": "keyword"
},
"companyName": {
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"type": "text"
},
"name": {
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"type": "text"
},
"users": {
"index": false,
"type": "text"
},
"tags": {
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"type": "text"
},
"mt": {
"type": "keyword"
},
"mtName": {
"type": "keyword"
},
"st": {
"type": "keyword"
},
"stName": {
"type": "keyword"
},
"grade": {
"type": "keyword"
},
"teachmode": {
"type": "keyword"
},
"pic": {
"index": false,
"type": "text"
},
"description": {
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"type": "text"
},
"createDate": {
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
},
"status": {
"type": "keyword"
},
"remark": {
"index": false,
"type": "text"
},
"charge": {
"type": "keyword"
},
"price": {
"type": "scaled_float",
"scaling_factor": 100
},
"originalPrice": {
"type": "scaled_float",
"scaling_factor": 100
},
"validDays": {
"type": "integer"
}
}
}
}2、查询索引
通过 GET /_cat/indices?v 查询所有的索引,查找course-publish是否创建成功。
通过GET /course-publish/_mapping 查询course-publish的索引结构。
{
"course-publish" : {
"mappings" : {
"properties" : {
"charge" : {
"type" : "keyword"
},
"companyId" : {
"type" : "keyword"
},
"companyName" : {
"type" : "text",
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart"
},
"createDate" : {
"type" : "date",
"format" : "yyyy-MM-dd HH:mm:ss"
},
"description" : {
"type" : "text",
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart"
},
"grade" : {
"type" : "keyword"
},
"id" : {
"type" : "keyword"
},
"mt" : {
"type" : "keyword"
},
"mtName" : {
"type" : "keyword"
},
"name" : {
"type" : "text",
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart"
},
"originalPrice" : {
"type" : "scaled_float",
"scaling_factor" : 100.0
},
"pic" : {
"type" : "text",
"index" : false
},
"price" : {
"type" : "scaled_float",
"scaling_factor" : 100.0
},
"remark" : {
"type" : "text",
"index" : false
},
"st" : {
"type" : "keyword"
},
"stName" : {
"type" : "keyword"
},
"status" : {
"type" : "keyword"
},
"tags" : {
"type" : "text",
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart"
},
"teachmode" : {
"type" : "keyword"
},
"users" : {
"type" : "text",
"index" : false
},
"validDays" : {
"type" : "integer"
}
}
}
}
}3、删除索引
如果发现创建的course-publish不正确可以删除重新创建。
删除索引后当中的文档数据也同时删除,一定要谨慎操作!
删除索引命令:DELETE /course-publish
3.6.2.2 部署搜索工程
拷贝课程资料中的xuecheng-plus-search搜索工程到自己的工程目录。

修改bootstrap.xml中nacos的namespace为自己的命名空间。
启动网关、搜索服务。
部署完成通过httpclient进行测试
Java
### 添加课程索引
POST {{search_host}}/search/index/course
Content-Type: application/json
{
"charge" : "201000",
"companyId" : 100000,
"companyName" : "北京黑马程序",
"createDate" : "2022-09-25 09:36:11",
"description" : "《Spring编程思想》是2007年6月1日机械工业出版社出版的图书,作者是埃克尔,译者是陈昊鹏。主要内容本书赢得了全球程序员的广泛赞誉,即使是最晦涩的概念,在Bruce Eckel的文字亲和力和小而直接的编程示例面前也会化解于无形。从Java的基础语法到最高级特性(深入的面向对象概念、多线程、自动项目构建、单元测试和调试等),本书都能逐步指导你轻松掌握。从本书获得的各项大奖以及来自世界各地的读者评论中,不难看出这是一本经典之作",
"grade" : "204001",
"id" : 102,
"mt" : "1-3",
"mtName" : "编程开发",
"name" : "Spring编程思想",
"originalPrice" : 200.0,
"pic" : "/mediafiles/2022/09/20/1d0f0e6ed8a0c4a89bfd304b84599d9c.png",
"price" : 100.0,
"remark" : "没有备注",
"st" : "1-3-2",
"stName" : "Java语言",
"status" : "203002",
"tags" : "没有标签",
"teachmode" : "200002",
"validDays" : 222
}### 搜索课程
GET {{search_host}}/search/course/list?pageNo=1&keywords=spring
Content-Type: application/json进入前端搜索界面http://www.51xuecheng.cn/course/search.html

3.6.3 课程信息索引同步
3.6.3.1 技术方案
通过向索引中添加课程信息最终实现了课程的搜索,我们发现课程信息是先保存在关系数据库中,而后再写入索引,这个过程是将关系数据中的数据同步到elasticsearch索引中的过程,可以简单成为索引同步。
通常项目中使用elasticsearch需要完成索引同步,索引同步的方法很多:
1、针对实时性非常高的场景需要满足数据的及时同步,可以同步调用,或使用Canal去实现。
1)同步调用即在向MySQL写数据后远程调用搜索服务的接口写入索引,此方法简单但是耦合代码太高。
2)可以使用一个中间的软件canal解决耦合性的问题,但存在学习与维护成本。
canal主要用途是基于 MySQL 数据库增量日志解析,并能提供增量数据订阅和消费,实现将MySQL的数据同步到消息队列、Elasticsearch、其它数据库等,应用场景十分丰富。

它的地址:
github地址:https://github.com/alibaba/canal
版本下载地址:https://github.com/alibaba/canal/releases
文档地址:https://github.com/alibaba/canal/wiki/Docker-QuickStart
Canal基于mysql的binlog技术实现数据同步,什么是binlog,它是一个文件,二进制格式,记录了对数据库更新的SQL语句,向数据库写数据的同时向binlog文件里记录对应的sql语句。当数据库服务器发生了故障就可以使用binlog文件对数据库进行恢复。
所以,使用canal是需要开启mysql的binlog写入功能,Canal工作原理如下:

1、canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump
协议
2、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
3、canal 解析 binary log 对象(原始为 byte 流)
详细使用Canal进行索引同步的步骤参考:Canal实现索引同步.pdf
2、当索引同步的实时性要求不高时可用的技术比较多,比如:MQ、Logstash、任务调度等。
MQ:向mysql写数据的时候向mq写入消息,搜索服务监听MQ,收到消息后写入索引。使用MQ的优势是代码解耦,但是需要处理消息可靠性的问题有一定的技术成本,做到消息可靠性需要做到生产者投递成功、消息持久化以及消费者消费成功三个方面,另外还要做好消息幂等性问题。
Logstash: 开源实时日志分析平台 ELK包括Elasticsearch、Kibana、Logstash,Logstash负责收集、解析和转换日志信息,可以实现MySQL与Elasticsearch之间的数据同步。也可以实现解耦合并且是官方推荐,但需要增加学习与维护成本。
任务调度:向mysql写数据的时候记录修改记录,开启一个定时任务根据修改记录将数据同步到Elasticsearch。
根据本项目的需求,课程发布后信息同步的实时性要求不高,从提交审核到发布成功一般两个工作日完成。综合比较以上技术方案本项目的索引同步技术使用任务调度的方法。
如下图:

1、课程发布向消息表插入记录。
2、由任务调度程序通过消息处理SDK对消息记录进行处理。
3、向elasticsearch索引中保存课程信息。
如何向向elasticsearch索引中保存课程信息?
执行流程如下:
由内容管理服务远程调用搜索服务添加课程信息索引,搜索服务再请求elasticsearch向课程索引中添加文档。
3.6.3.2 课程索引任务开发
1、拷贝CourseIndex 模型类到内容管理model 工程的dto包下。
2、在内容管理服务中添加FeignClient
/**
* @description 搜索服务远程接口
*/
@FeignClient(value = "search",fallbackFactory = SearchServiceClientFallbackFactory.class)
public interface SearchServiceClient {
@PostMapping("/search/index/course")
public Boolean add(@RequestBody CourseIndex courseIndex);
}定义SearchServiceClientFallbackFactory :
@Slf4j
@Component
public class SearchServiceClientFallbackFactory implements FallbackFactory<SearchServiceClient> {
@Override
public SearchServiceClient create(Throwable throwable) {
return new SearchServiceClient() {
@Override
public Boolean add(CourseIndex courseIndex) {
throwable.printStackTrace();
log.debug("调用搜索发生熔断走降级方法,熔断异常:", throwable.getMessage());
return false;
}
};
}
}3、编写课程索引任务执行方法
完善CoursePublishTask类中的saveCourseIndex方法
//保存课程索引信息
public void saveCourseIndex(MqMessage mqMessage,long courseId){
log.debug("保存课程索引信息,课程id:{}",courseId);
//消息id
Long id = mqMessage.getId();
//消息处理的service
MqMessageService mqMessageService = this.getMqMessageService();
//消息幂等性处理
int stageTwo = mqMessageService.getStageTwo(id);
if(stageTwo > 0){
log.debug("课程索引已处理直接返回,课程id:{}",courseId);
return ;
}
Boolean result = saveCourseIndex(courseId);
if(result){
//保存第一阶段状态
mqMessageService.completedStageTwo(id);
}
}
private Boolean saveCourseIndex(Long courseId) {
//取出课程发布信息
CoursePublish coursePublish = coursePublishMapper.selectById(courseId);
//拷贝至课程索引对象
CourseIndex courseIndex = new CourseIndex();
BeanUtils.copyProperties(coursePublish,courseIndex);
//远程调用搜索服务api添加课程信息到索引
Boolean add = searchServiceClient.add(courseIndex);
if(!add){
XueChengPlusException.cast("添加索引失败");
}
return add;
}3.6.3.3 测试
测试流程如下:
1、启动elasticsearch、kibana。
2、启动网关、内容管理、搜索服务、nginx。
3、启动xxl-job调度中心。
4、在任务调度中心开始课程发布任务。
5、发布一门课程,页面提示操作成功,查看发布课程任务是否写到任务表。
6、经过任务调度将课程信息写入索引。
7、通过门户进入搜索页面,查看课程信息是否展示。
Q.E.D.
