媒资管理模块
模块需求分析
模块介绍
媒资管理系统是每个在线教育平台所必须具备的,查阅百度百科对它的定义如下:
媒体资源管理(Media Asset Management,MAM)系统是建立在多媒体、网络、数据库和数字存储等先进技术基础上的一个对各种媒体及内容(如视/音频资料、文本文件、图表等)进行数字化存储、管理以及应用的总体解决方案,包括数字媒体的采集、编目、管理、传输和编码转换等所有环节。其主要是满足媒体资源拥有者收集、保存、查找、编辑、发布各种信息的要求,为媒体资源的使用者提供访问内容的便捷方法,实现对媒体资源的高效管理,大幅度提高媒体资源的价值。
每个教学机构都可以在媒资系统管理自己的教学资源,包括:视频、教案等文件。
目前媒资管理的主要管理对象是视频、图片、文档等,包括:媒资文件的查询、文件上传、视频处理等。
媒资查询:教学机构查询自己所拥有的媒资信息。
文件上传:包括上传图片、上传文档、上传视频。
视频处理:视频上传成功,系统自动对视频进行编码处理。
文件删除:教学机构删除自己上传的媒资文件。
搭建模块环境
架构的问题分析
当前要开发的是媒资管理服务,目前为止共三个三微服务:内容管理、系统管理、媒资管理,后期还会添加更多的微服务,当前这种由前端直接请求微服务的方式存在弊端:如果在前端对每个请求地址都配置绝对路径,非常不利于系统维护,当系统上线后这里需要改成公网的域名,如果这种地址非常多则非常麻烦。
基于这个问题可以采用网关来解决,这样在前端的代码中只需要指定每个接口的相对路径,在前端代码的一个固定的地方在接口地址前统一加网关的地址,每个请求统一到网关,由网关将请求转发到具体的微服务。
为什么所有的请求先到网关呢?
有了网关就可以对请求进行路由,比如:可以根据请求路径路由、根据host地址路由等,当微服务有多个实例时可以通过负载均衡算法进行路由,另外,网关还可以实现权限控制、限流等功能。
项目采用Spring Cloud Gateway作为网关,网关在请求路由时需要知道每个微服务实例的地址,项目使用Nacos作用服务发现中心和配置中心,流程如下:
微服务启动,将自己注册到Nacos,Nacos记录了各微服务实例的地址。
网关从Nacos读取服务列表,包括服务名称、服务地址等。
请求到达网关,网关将请求路由到具体的微服务。
要使用网关首先搭建Nacos,Nacos有两个作用:
服务发现中心。
- 微服务将自身注册至Nacos,网关从Nacos获取微服务列表。
配置中心。
- 微服务众多,它们的配置信息也非常复杂,为了提供系统的可维护性,微服务的配置信息统一在Nacos配置。
搭建Nacos
这里我踩了很多坑,但是删删改改的很难总结,所以直接将最终版的写法展示出来
服务发现中心
根据上节讲解的网关的架构图,要使用网关首先搭建Nacos。
首先搭建Nacos服务发现中心。
在搭建Nacos服务发现中心之前需要搞清楚两个概念:namespace和group
首先在nacos配置namespace:登录Centos,启动Naocs,使用 sh /data/soft/restart.sh
将自动启动Nacos。
1 2 3 4
| 访问:http://192.168.101.65:8848/nacos/
账号:nacos 密码:nacos
|
首先完成各服务注册到Naocs(导文件,改配置),将资料给的文件 nacos_config_export.zip 导入nacos
下边将服务注册到nacos中。
- 在xuecheng-parent中添加依赖管理
1 2 3 4 5 6 7
| <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>${spring-cloud-alibaba.version}</version> <type>pom</type> <scope>import</scope> </dependency>
|
- 在
content-api
工程、content-service
工程、 system-api
工程以及system-service
工程中添加如下依赖
1 2 3 4 5 6 7 8 9
| <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency>
|
注意是 discovery 和 config !!!
- 在idea中配置nacos的地址
system-service 的配置文件
改名:将 application.yml
改名为 bootstrap.yml
在里面写入以下配置:
注意:profiles是和 cloud 同级的,不要顶格写!!!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| spring: application: name: system-service cloud: nacos: server-addr: nacosip:8848 discovery: namespace: ${spring.profiles.active} group: xuecheng-online-project(自定义分组名称) config: namespace: xcdev group: xuecheng-online-project file-extension: yaml refresh-enabled: true shared-configs: - data-id: logging-${spring.profiles.active}.yaml group: common refresh: true profiles: active: dev
|
对应的是nacos的 system-service-dev.yaml 文件——主要存放的是service层连接数据库的信息
system-api 的配置文件
改名:将 application.yml
改名为 bootstrap.yml
在里面写入以下配置:(注释掉的东西都写在了nacos中)
注意:profiles是和 cloud 同级的,不要顶格写!!!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
|
spring: application: name: system-api
cloud: nacos: server-addr: nacos ip:8848 discovery: namespace: xcdev group: xuecheng-online-project config: namespace: xcdev group: xuecheng-online-project file-extension: yaml refresh-enabled: true extension-configs: - data-id: system-service-${spring.profiles.active}.yaml group: xuecheng-online-project refresh: true shared-configs: - data-id: logging-${spring.profiles.active}.yaml group: common refresh: true profiles: active: dev
|
对应的是nacos的 system-api-dev.yaml 文件看上图——主要存放的是 api层的端口号以及配置消息
在 content-api 的配置文件:
改名:将 application.yml
改名为 bootstrap.yml
在里面写入以下配置:(注释掉的东西都写在了nacos中)
注意:profiles是和 cloud 同级的,不要顶格写!!!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
spring: application: name: content-api
cloud: nacos: server-addr: ip:8848 discovery: namespace: xcdev group: xuecheng-online-project config: namespace: xcdev group: xuecheng-online-project file-extension: yaml refresh-enabled: true extension-configs: - data-id: content-service-${spring.profiles.active}.yaml group: xuecheng-online-project refresh: true shared-configs: - data-id: logging-${spring.profiles.active}.yaml group: common refresh: true profiles: active: dev
|
对应的是nacos的 content-api-dev.yaml 文件看上图——主要存放的是 api层的端口号以及配置消息
content-service 的配置文件
改名:将 application.yml
改名为 bootstrap.yml
在里面写入以下配置:
注意:profiles是和 cloud 同级的,不要顶格写!!!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| spring: application: name: content-service cloud: nacos: server-addr: ip:8848 discovery: namespace: ${spring.profiles.active} group: xuecheng-online-project config: namespace: xcdev group: xuecheng-online-project file-extension: yaml refresh-enabled: true shared-configs: - data-id: logging-${spring.profiles.active}.yaml group: common refresh: true profiles: active: dev
|
对应的是nacos的 content-service-dev.yaml 文件看上图——主要存放的是service层连接数据库的信息
- 重启内容管理服务content、系统管理服务system。
只要能在 “服务列表”看到启动情况:出现了 content-api 和 system-api,则说明nacos配置成功。
启动content-api工程,查询控制台是否打印出了请求nacos的日志,如下:
1
| [NacosRestTemplate.java:476] - HTTP method: POST, url:http://192.168.101.65:8848/nacos/v1/cs/configs/listener
|
并使用Httpclient测试课程查询接口是否可以正常查询。
公用配置
如何在nacos中配置项目的公用配置呢?
nacos提供了 shared-configs 可以引入公用配置。
对于swagger或者 knife4j 这类接口测试文档,可以将配置定义为一个公用配置,哪个项目需要用直接引入即可。(甚至别的项目也可以直接用)
单独在common分组下创建xuecheng的公用配置,进入nacos的开发环境,添加公用配置:
项目使用shared-configs可以引入公用配置。在接口工程的本地配置文件中引入公用配置,如上面的服务发现中心就已经添加了 shared-configs 引入了公用配置
配置完成后可以查看swagger/knife4j接口文档是否可以正常访问,查看控制台log4j2日志输出是否正常。
配置优先级
到目前为止已将所有微服务的配置统一在nacos进行配置,用到的配置文件有本地的配置文件bootstrap.yaml和nacos上的配置文件,引入配置文件的形式有:
通过dataid方式引入
以扩展配置文件方式引入
以共享配置文件 方式引入
那么这几种配置文件方式的优先级是怎么样的呢
- 通过dataid方式引入优先级第一高
- 其次是扩展类配置文件(extension-configs)
- 再者是共享类配置文件(shared-configs)
- 最后本地配置文件优先级最低
如果需要设置本地最优先,要在nacos配置文件中配置代码:
1 2 3 4
| spring: cloud: config: override-none: true
|
搭建Gateway
本项目使用Spring Cloud Gateway作为网关,下边创建网关工程。
新建一个网关工程xuecheng-gateway,工程结构如下:
添加依赖
配置pom文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.xuecheng</groupId> <artifactId>xuecheng-parent</artifactId> <version>0.0.1-SNAPSHOT</version> <relativePath>../xuecheng-parent</relativePath> </parent> <artifactId>xuecheng-gateway</artifactId> <name>xuecheng-gateway</name> <description>xuecheng-gateway</description>
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
</project>
|
配置bootstrap.yaml配置文件
在gateway工程的 resource 中将 application.properties 改成 bootstrap.yml,在里面配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| spring: application: name: gateway cloud: nacos: server-addr: nacoip:8848 discovery: namespace: xcdev group: xuecheng-online-project config: namespace: xcdev group: xuecheng-online-project file-extension: yaml refresh-enabled: true shared-configs: - data-id: logging-${spring.profiles.active}.yaml group: common refresh: true profiles: active: dev
|
注意:profiles是和 cloud 同级的,不要顶格写!!!
在nacos上配置网关路由策略,详细配置导材料就行
接口测试
启动网关工程,通过网关工程访问微服务进行测试。
在http-client-env.json中配置网关的地址,使用httpclient测试课程查询 接口,如下:
1 2 3 4 5 6 7 8
| ### 课程查询列表 POST {{gateway_host}}/content/course/list?pageNo=2&pageSize=1 Content-Type: application/json
{ "auditStatus": "202002", "courseName": "" }
|
运行,对照数据库信息,观察是否可以正常访问接口 ,如下所示可以正常请求接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| JSON http:
HTTP/1.1 200 OK transfer-encoding: chunked Content-Type: application/json Date: Sun, 11 Sep 2022 09:54:32 GMT
{ "items": [ { "id": 26, "companyId": 1232141425, "companyName": null, "name": "spring cloud实战", "users": "所有人", "tags": null, "mt": "1-3", "mtName": null, "st": "1-3-2", "stName": null, "grade": "200003", "teachmode": "201001", "description": "本课程主要从四个章节进行讲解: 1.微服务架构入门 2.spring cloud 基础入门 3.实战Spring Boot 4.注册中心eureka。", "pic": "https://cdn.educba.com/academy/wp-content/uploads/2018/08/Spring-BOOT-Interview-questions.jpg", "createDate": "2019-09-04 09:56:19", "changeDate": "2021-12-26 22:10:38", "createPeople": null, "changePeople": null, "auditStatus": "202002", "status": "203001", "coursePubId": null, "coursePubDate": null } ], "counts": 29, "page": 2, "pageSize": 1 }
|
更改前端工程接口
网关工程搭建完成即可将前端工程中的接口地址改为网关的地址
启动前端工程,测试之前开发内容管理模块的功能。
搭建媒资工程
至此网关、Nacos已经搭建完成,下边将媒资工程导入项目。(建议自己敲熟悉一下)
从课程资料中获取媒资工程 xuecheng-plus-media,拷贝到项目工程根目录。
右键pom.xml转为maven工程。
创建媒资数据库,并导入xcplus_media.sql
启动media-api工程测试运行
分布式文件系统
MinIO介绍
本项目采用MinIO构建分布式文件系统,MinIO是一个非常轻量的服务,可以很简单的和其他应用的结合使用,它兼容亚马逊 S3云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等。
它一大特点就是轻量,使用简单,功能强大,支持各种平台,单个文件最大5TB,兼容Amazon S3接口,提供了 Java、Python、GO等多版本SDK支持。
官网:https://min.io
中文:https://www.minio.org.cn/,http://docs.minio.org.cn/docs/
MinIO集群采用去中心化共享架构,每个结点是对等关系,通过Nginx可对MinIO进行负载均衡访问。
去中心化有什么好处?
在大数据领域,通常的设计理念都是无中心和分布式。Minio分布式模式可以帮助你搭建一个高可用的对象存储服务,你可以使用这些存储设备,而不用考虑其真实物理位置。
它将分布在不同服务器上的多块硬盘组成一个对象存储服务。由于硬盘分布在不同的节点上,分布式Minio避免了单点故障。
Minio使用纠删码技术来保护数据,它是一种恢复丢失和损坏数据的数学算法,它将数据分块冗余的分散存储在各各节点的磁盘上,所有的可用磁盘组成一个集合,上图由8块硬盘组成一个集合,当上传一个文件时会通过纠删码算法计算对文件进行分块存储,除了将文件本身分成4个数据块,还会生成4个校验块,数据块和校验块会分散的存储在这8块硬盘上。
使用纠删码的好处是即便丢失一半数量(N/2)的硬盘,仍然可以恢复数据。
MinIO安装
直接cmd运行MinIO.exe就能跑起来,最好看着视频弄,挂在虚拟机或者服务器上要去官网下对应的二进制文件
安装参考文档:安装参考
重定向问题参考:nohup出问题了
MinIO 对象存储参考:MinIO 对象存储
上传文件(图片)测试
MinIO提供多个语言版本SDK的支持,下边找到java版本的文档:
地址:https://docs.min.io/docs/java-client-quickstart-guide.html
最低需求Java 1.8或更高版本:
在media-service工程添加此依赖,依赖如下:(我直接附全部pom配置了)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.xuecheng</groupId> <artifactId>xuecheng-media</artifactId> <version>0.0.1-SNAPSHOT</version> <relativePath/> </parent> <artifactId>xuecheng-media-service</artifactId> <name>xuecheng-media-service</name> <description>xuecheng-media-service</description>
<dependencies> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency> <dependency> <groupId>com.xuecheng</groupId> <artifactId>xuecheng-media-model</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency>
<dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <dependency> <groupId>io.minio</groupId> <artifactId>minio</artifactId> <version>8.4.3</version> </dependency> <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>4.8.1</version> </dependency> <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> </dependency>
</dependencies>
</project>
|
MinIO参数说明
需要三个参数才能连接到minio服务。
参数 |
说明 |
Endpoint |
对象存储服务的URL |
Access Key |
Access key就像用户ID,可以唯一标识你的账户。 |
Secret Key |
Secret key是你账户的密码。 |
MinIO功能测试
在media-service中添加测试用例,测试,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| package com.xuecheng.mediaService;
import io.minio.*; import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest;
import javax.xml.bind.SchemaOutputResolver; import java.io.File; import java.io.FileOutputStream; import java.io.FilterInputStream;
@SpringBootTest class MediaServiceApplicationTests {
@Test void contextLoads() { }
static MinioClient minioClient = MinioClient.builder() .endpoint("http://MinIOIP:9000") .credentials("minioadmin", "minioadmin") .build();
@Test public void upload() { try { UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder() .bucket("testbucket") .object("rurisa.jpg") .filename("文件上传的本地路径\\rurisa.jpg") .build(); minioClient.uploadObject(uploadObjectArgs); System.out.println("上传成功"); } catch (Exception e) { System.out.println("上传失败"); } }
@Test public void delete() { try { RemoveObjectArgs removeObjectArgs = RemoveObjectArgs.builder() .bucket("testbucket") .object("rurisa.jpg") .build(); minioClient.removeObject(removeObjectArgs); System.out.println("删除成功"); } catch (Exception e) { System.out.println("删除失败"); } }
@Test public void getFile() { GetObjectArgs getObjectArgs = GetObjectArgs.builder() .bucket("testbucket") .object("rurisa.jpg") .build(); try ( FilterInputStream inputStream = minioClient.getObject(getObjectArgs); FileOutputStream fileOutputStream = new FileOutputStream(new File("文件下载的存放路径\\pretty.jpg")); ) { if (inputStream != null) { IOUtils.copy(inputStream, fileOutputStream); } } catch (Exception e) { System.out.println("查询失败"); } } }
|
执行方法,在MinIO页面查看效果,以及在文件下载指定文件夹中查看下载效果
上传图片
需求分析
业务流程
课程图片是宣传课程非常重要的信息,在新增课程界面上传课程图片,也可以修改课程图片。
课程图片上传至分布式文件系统,在课程信息中保存课程图片路径,如下流程:
前端进入上传图片界面
上传图片,请求媒资管理服务。
媒资管理服务将图片文件存储在MinIO。
媒资管理记录文件信息到数据库。
保存课程信息,在内容管理数据库保存图片地址。
媒资管理服务由接口层和业务层共同完成,具体分工如下:
用户上传图片请求至媒资管理的接口层,接口层解析文件信息通过业务层将文件保存至minio及数据库。如下图:
数据模型
涉及到的数据表有:课程信息表courseBase中的图片字段、媒资数据库media的文件表mediaFiles
接口定义
根据需求分析,下边进行接口定义,此接口定义为一个通用的上传文件接口,可以上传图片或其它文件。
请求参数:
1 2 3 4 5
| 请求地址:/media/upload/coursefile
**Content-Type:** multipart/form-data;boundary=\.....
FormData: **filedata=??**
|
响应参数:文件信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| JSON { "id": "a16da7a132559daf9e1193166b3e7f52", "companyId": 1232141425, "companyName": null, "filename": "1.jpg", "fileType": "001001", "tags": "", "bucket": "/testbucket/2022/09/12/a16da7a132559daf9e1193166b3e7f52.jpg", "fileId": "a16da7a132559daf9e1193166b3e7f52", "url": "/testbucket/2022/09/12/a16da7a132559daf9e1193166b3e7f52.jpg", "timelength": null, "username": null, "createDate": "2022-09-12T21:57:18", "changeDate": null, "status": "1", "remark": "", "auditStatus": null, "auditMind": null, "fileSize": 248329 }
|
准备环境
配置bucket
首先在minio配置bucket,bucket名称为:mediafiles,并设置bucket的权限为公开。
在nacos配置中minio的相关信息,进入media-service-dev.yaml,配置信息如下:
1 2 3 4 5 6 7 8
| YAML minio: endpoint: http://minioIp:9000 accessKey: minioadmin secretKey: minioadmin bucket: files: mediafiles videofiles: video
|
编写minio配置类
在media-service工程中创建config文件夹,编写minio的配置类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package com.xuecheng.mediaService.config;
import io.minio.MinioClient;
import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class MinioConfig { @Value("${minio.endpoint}") private String endpoint; @Value("${minio.accessKey}") private String accessKey; @Value("${minio.secretKey}") private String secretKey;
@Bean public MinioClient minioClient() {
MinioClient minioClient = MinioClient.builder() .endpoint(endpoint) .credentials(accessKey, secretKey) .build(); return minioClient; } }
|
model-生成DTO
在media-model 中创建DTO类
定义媒资文件查询请求模型类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package com.xuecheng.mediaModel.dto;
import lombok.Data; import lombok.ToString;
@Data @ToString public class QueryMediaParamsDto { private String filename;
private String fileType;
private String auditStatus; }
|
定义上传响应模型类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| package com.xuecheng.mediaModel.dto;
import com.xuecheng.mediaModel.po.MediaFiles; import lombok.Data;
@Data public class UploadFileResultDto extends MediaFiles { }
|
定义请求参数类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package com.xuecheng.mediaModel.dto;
import lombok.Data; import lombok.ToString;
@Data @ToString public class UploadFileParamsDto {
private String filename;
private String contentType;
private String fileType;
private Long fileSize;
private String tags;
private String username;
private String remark; }
|
service-接口开发
定义service方法
在mediafilesservice中定义方法:
注意:service和impl中涵盖了day04-06三天的接口方法,可以做到哪了再往上写
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
| package com.xuecheng.mediaService.service;
import com.baomidou.mybatisplus.extension.service.IService; import com.xuecheng.base.model.PageParams; import com.xuecheng.base.model.PageResult; import com.xuecheng.base.model.RestResponse; import com.xuecheng.mediaModel.dto.QueryMediaParamsDto; import com.xuecheng.mediaModel.dto.UploadFileParamsDto; import com.xuecheng.mediaModel.dto.UploadFileResultDto; import com.xuecheng.mediaModel.po.MediaFiles; import org.springframework.transaction.annotation.Transactional;
import java.io.File;
public interface MediaFilesService extends IService<MediaFiles> {
public PageResult<MediaFiles> queryMediaFiles(Long companyId, PageParams pageParams, QueryMediaParamsDto queryMediaParamsDto);
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, byte[] bytes,String folder,String objectName);
@Transactional public MediaFiles addMediaFilesToDB(Long companyId,String fileMd5,UploadFileParamsDto uploadFileParamsDto,String bucket_files,String objectName);
public void addMediaFilesToMinIO(byte[] bytes, String bucket_files, String objectName, String contentType);
public void addMediaFilesToMinIO(String filePath, String bucket, String objectName);
public File downloadFileFromMinIO(File file, String bucket, String objectName);
public RestResponse<Boolean> checkFile(String fileMd5);
public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex);
public RestResponse uploadChunk(String fileMd5,int chunk,byte[] bytes);
public RestResponse mergechunks(Long companyId,String fileMd5,int chunkTotal,UploadFileParamsDto uploadFileParamsDto);
public MediaFiles getFileById(String id); }
|
实现service方法impl
注意:service和impl中涵盖了三天的接口方法,可以做到哪了再往上写

| package com.xuecheng.mediaService.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.xuecheng.base.exception.XueChengException; import com.xuecheng.base.model.PageParams; import com.xuecheng.base.model.PageResult; import com.xuecheng.base.model.RestResponse; import com.xuecheng.mediaModel.dto.QueryMediaParamsDto; import com.xuecheng.mediaModel.dto.UploadFileParamsDto; import com.xuecheng.mediaModel.dto.UploadFileResultDto; import com.xuecheng.mediaModel.po.MediaFiles; import com.xuecheng.mediaModel.po.MediaProcess; import com.xuecheng.mediaService.mapper.MediaFilesMapper; import com.xuecheng.mediaService.mapper.MediaProcessMapper; import com.xuecheng.mediaService.service.MediaFilesService; import io.minio.GetObjectArgs; import io.minio.MinioClient; import io.minio.PutObjectArgs; import io.minio.UploadObjectArgs; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.compress.utils.IOUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;
import java.io.*; import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.util.Date; import java.util.List;
@Slf4j @Service public class MediaFilesServiceImpl extends ServiceImpl<MediaFilesMapper, MediaFiles> implements MediaFilesService {
@Autowired MediaFilesMapper mediaFilesMapper;
@Autowired MediaProcessMapper mediaProcessMapper;
@Autowired MinioClient minioClient;
@Autowired MediaFilesService currentProxy;
@Value("${minio.bucket.files}") private String bucket_files;
@Value("${minio.bucket.videofiles}") private String bucket_videoFiles;
@Override public PageResult<MediaFiles> queryMediaFiles(Long companyId, PageParams pageParams, QueryMediaParamsDto queryMediaParamsDto) { LambdaQueryWrapper<MediaFiles> queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(StringUtils.isNotBlank(queryMediaParamsDto.getFileType()), MediaFiles::getFileType, queryMediaParamsDto.getFileType());
queryWrapper.like(StringUtils.isNotBlank(queryMediaParamsDto.getFilename()), MediaFiles::getFilename, queryMediaParamsDto.getFilename()); Page<MediaFiles> page = new Page<>(pageParams.getPageNo(), pageParams.getPageSize()); Page<MediaFiles> pageResult = mediaFilesMapper.selectPage(page, queryWrapper); List<MediaFiles> list = pageResult.getRecords(); long total = pageResult.getTotal(); PageResult<MediaFiles> mediaListResult = new PageResult<>(list, total, pageParams.getPageNo(), pageParams.getPageSize()); return mediaListResult; }
@Override public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, byte[] bytes, String folder, String objectName) { String fileId = DigestUtils.md5Hex(bytes); String fileMd5 = DigestUtils.md5Hex(bytes);
if (StringUtils.isEmpty(folder)) { folder = getFileFolder(new Date(), true, true, true); } else if (folder.indexOf("/") < 0) { folder = folder + "/"; } String filename = uploadFileParamsDto.getFilename();
if (StringUtils.isEmpty(objectName)) { objectName = fileMd5 + filename.substring(filename.lastIndexOf(".")); }
objectName = folder + objectName; MediaFiles mediaFiles = null;
try { addMediaFilesToMinIO(bytes, bucket_files, objectName, uploadFileParamsDto.getContentType()); mediaFiles = currentProxy.addMediaFilesToDB(companyId, fileId, uploadFileParamsDto, bucket_files, objectName);
UploadFileResultDto uploadFileResultDto = new UploadFileResultDto(); BeanUtils.copyProperties(mediaFiles, uploadFileResultDto); return uploadFileResultDto;
} catch (Exception e) { e.printStackTrace(); log.debug("上传文件失败:{}", e.getMessage()); } return null; }
@Transactional public MediaFiles addMediaFilesToDB(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket_files, String objectName) { MediaFiles mediaFiles = new MediaFiles(); BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles); mediaFiles.setId(fileMd5); mediaFiles.setFileId(fileMd5); mediaFiles.setCompanyId(companyId); mediaFiles.setBucket(bucket_files); mediaFiles.setFilePath(objectName); mediaFiles.setUrl("/" + bucket_files + "/" + objectName); mediaFiles.setCreateDate(LocalDateTime.now()); mediaFiles.setStatus("1"); mediaFiles.setAuditStatus("002004");
int insert = mediaFilesMapper.insert(mediaFiles); if (insert < 0) { XueChengException.cast("保存文件信息失败"); } String extension = objectName.substring(objectName.lastIndexOf(".")); if (extension.equalsIgnoreCase(".avi")) { MediaProcess mediaProcess = new MediaProcess(); BeanUtils.copyProperties(mediaFiles, mediaProcess); mediaProcessMapper.insert(mediaProcess); mediaFiles.setUrl(null); mediaFilesMapper.updateById(mediaFiles); } return mediaFiles; }
public void addMediaFilesToMinIO(byte[] bytes, String bucket_files, String objectName, String contentType) { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); try { PutObjectArgs putObjectArgs = PutObjectArgs.builder() .bucket(bucket_files) .object(objectName) .stream(byteArrayInputStream, byteArrayInputStream.available(), -1) .contentType(contentType) .build(); minioClient.putObject(putObjectArgs); } catch (Exception e) { e.printStackTrace(); XueChengException.cast("上传文件到文件系统出错"); } }
@Override public RestResponse<Boolean> checkFile(String fileMd5) { MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5); if (mediaFiles != null) { String bucket = mediaFiles.getBucket(); String filePath = mediaFiles.getFilePath(); InputStream stream = null; try { stream = minioClient.getObject( GetObjectArgs.builder() .bucket(bucket) .object(filePath) .build());
if (stream != null) { return RestResponse.success(true); } } catch (Exception e) { } } return RestResponse.success(false); }
@Override public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex) { String chunkFileFolderPath = getChunkFileFolderPath(fileMd5); String chunkFilePath = chunkFileFolderPath + chunkIndex;
InputStream fileInputStream = null; try { fileInputStream = minioClient.getObject( GetObjectArgs.builder() .bucket(bucket_videoFiles) .object(chunkFilePath) .build());
if (fileInputStream != null) { return RestResponse.success(true); } } catch (Exception e) {
} return RestResponse.success(false); }
private String getChunkFileFolderPath(String fileMd5) { return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/"; }
@Override public RestResponse uploadChunk(String fileMd5, int chunk, byte[] bytes) { String chunkFileFolderPath = getChunkFileFolderPath(fileMd5); String chunkFilePath = chunkFileFolderPath + chunk;
try { addMediaFilesToMinIO(bytes, bucket_videoFiles, chunkFilePath, "application/octet-stream");
} catch (Exception ex) { ex.printStackTrace(); XueChengException.cast("上传过程出错请重试"); } return RestResponse.success(); }
private File[] checkChunkStatus(String fileMd5, int chunkTotal) { String chunkFileFolderPath = getChunkFileFolderPath(fileMd5); File[] files = new File[chunkTotal]; for (int i = 0; i < chunkTotal; i++) { String chunkFilePath = chunkFileFolderPath + i; File chunkFile = null; try { chunkFile = File.createTempFile("chunk" + i, null); } catch (IOException e) { e.printStackTrace(); XueChengException.cast("下载分块时创建临时文件出错"); } downloadFileFromMinIO(chunkFile, bucket_videoFiles, chunkFilePath); files[i] = chunkFile; } return files; }
public File downloadFileFromMinIO(File file, String bucket, String objectName) { InputStream fileInputStream = null; OutputStream fileOutputStream = null; try { fileInputStream = minioClient.getObject( GetObjectArgs.builder() .bucket(bucket) .object(objectName) .build()); try { fileOutputStream = new FileOutputStream(file); IOUtils.copy(fileInputStream, fileOutputStream);
} catch (IOException e) { XueChengException.cast("下载文件" + objectName + "出错"); } } catch (Exception e) { e.printStackTrace(); XueChengException.cast("文件不存在" + objectName); } finally { if (fileInputStream != null) { try { fileInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (fileOutputStream != null) { try { fileOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } return file; }
@Override public RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) { String fileName = uploadFileParamsDto.getFilename(); File[] chunkFiles = checkChunkStatus(fileMd5, chunkTotal); String extName = fileName.substring(fileName.lastIndexOf(".")); File mergeFile = null; try { mergeFile = File.createTempFile(fileMd5, extName); } catch (IOException e) { XueChengException.cast("合并文件过程中创建临时文件出错"); }
try { byte[] b = new byte[1024]; try (RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");) { for (File chunkFile : chunkFiles) { try (FileInputStream chunkFileStream = new FileInputStream(chunkFile);) { int len = -1; while ((len = chunkFileStream.read(b)) != -1) { raf_write.write(b, 0, len); } } } } catch (IOException e) { e.printStackTrace(); XueChengException.cast("合并文件过程中出错"); } log.debug("合并文件完成{}", mergeFile.getAbsolutePath()); uploadFileParamsDto.setFileSize(mergeFile.length());
try (InputStream mergeFileInputStream = new FileInputStream(mergeFile);) { String newFileMd5 = DigestUtils.md5Hex(mergeFileInputStream); if (!fileMd5.equalsIgnoreCase(newFileMd5)) { XueChengException.cast("合并文件校验失败"); } log.debug("合并文件校验通过{}", mergeFile.getAbsolutePath()); } catch (Exception e) { e.printStackTrace(); XueChengException.cast("合并文件校验异常"); }
String mergeFilePath = getFilePathByMd5(fileMd5, extName); try {
addMediaFilesToMinIO(mergeFile.getAbsolutePath(), bucket_videoFiles, mergeFilePath); log.debug("合并文件上传MinIO完成{}", mergeFile.getAbsolutePath()); } catch (Exception e) { e.printStackTrace(); XueChengException.cast("合并文件时上传文件出错"); }
MediaFiles mediaFiles = addMediaFilesToDB(companyId, fileMd5, uploadFileParamsDto, bucket_videoFiles, mergeFilePath); if (mediaFiles == null) { XueChengException.cast("媒资文件入库出错"); }
return RestResponse.success(); } finally { for (File file : chunkFiles) { try { file.delete(); } catch (Exception e) {
} } try { mergeFile.delete(); } catch (Exception e) {
} } }
private String getFilePathByMd5(String fileMd5, String fileExt) { return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt; }
public void addMediaFilesToMinIO(String filePath, String bucket, String objectName) { try { minioClient.uploadObject( UploadObjectArgs.builder() .bucket(bucket) .object(objectName) .filename(filePath) .build()); } catch (Exception e) { e.printStackTrace(); XueChengException.cast("上传文件到文件系统出错"); } }
@Override public MediaFiles getFileById(String id) { return mediaFilesMapper.selectById(id); }
private String getFileFolder(Date date, boolean year, boolean month, boolean day) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); String dateString = sdf.format(new Date()); String[] dateStringArray = dateString.split("-"); StringBuffer folderString = new StringBuffer(); if (year) { folderString.append(dateStringArray[0]); folderString.append("/"); } if (month) { folderString.append(dateStringArray[1]); folderString.append("/"); } if (day) { folderString.append(dateStringArray[2]); folderString.append("/"); } return folderString.toString(); } }
|
api-接口定义
在 media-api 中创建 mediaFilesController,实现以下接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| package com.xuecheng.mediaApi.api;
import com.xuecheng.base.exception.XueChengException; import com.xuecheng.base.model.PageParams; import com.xuecheng.base.model.PageResult; import com.xuecheng.mediaModel.dto.QueryMediaParamsDto; import com.xuecheng.mediaModel.dto.UploadFileParamsDto; import com.xuecheng.mediaModel.dto.UploadFileResultDto; import com.xuecheng.mediaModel.po.MediaFiles; import com.xuecheng.mediaService.service.MediaFilesService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile;
@RestController public class MediaFilesController {
@Autowired MediaFilesService mediaFilesService;
@PostMapping("/files") public PageResult<MediaFiles> list(PageParams pageParams, @RequestBody QueryMediaParamsDto queryMediaParamsDto) { Long companyId = 1232141425L; return mediaFilesService.queryMediaFiles(companyId, pageParams, queryMediaParamsDto);
}
@RequestMapping(value = "/upload/coursefile", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE}) public UploadFileResultDto upload( @RequestPart("filedata") MultipartFile filedata, @RequestParam(value = "folder", required = false) String folder, @RequestParam(value = "objectName", required = false) String objectName ) { Long companyId = 1232141425L; UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto(); String contentType = filedata.getContentType(); uploadFileParamsDto.setContentType(contentType); uploadFileParamsDto.setFileSize(filedata.getSize()); if (contentType.indexOf("image") >= 0) { uploadFileParamsDto.setFileType("001001"); } else { uploadFileParamsDto.setFileType("001003"); } uploadFileParamsDto.setFilename(filedata.getOriginalFilename()); UploadFileResultDto uploadFileResultDto = null; try { uploadFileResultDto = mediaFilesService.uploadFile(companyId, uploadFileParamsDto, filedata.getBytes(), folder, objectName); } catch (Exception e) { XueChengException.cast("上传文件过程中出错"); }
return uploadFileResultDto;
}
}
|
接口测试
httpClient测试
1 2 3 4 5 6 7 8 9
| ### 上传文件 POST {{media_host}}/media/upload/coursefile Content-Type: multipart/form-data; boundary=WebAppBoundary
--WebAppBoundary Content-Disposition: form-data; name="filedata"; filename="文件名称" Content-Type: application/octet-stream
< 文件路径/rurisa.jpg
|
前后端联调测试
在新增课程、编辑课程界面上传图,保存课程信息后再次进入编辑课程界面,查看是否可以正常保存课程图片信息。
上图图片完成后,进入媒资管理,查看文件列表中是否有刚刚上传的图片信息。
bug修复
上文代码已经修复,主要是对 queryMediaFiles 查询media 信息的方法添加 querywrapper自定义查询,可自行上翻查阅
上传视频
需求分析
教学机构人员进入媒资管理列表查询自己上传的媒资文件。
教育机构用户在"媒资管理"页面中点击 "上传视频" 按钮。
选择要上传的文件,自动执行文件上传。
视频上传成功会自动处理,处理完成可以预览视频。
断点续传流程
什么是断点续传
通常视频文件都比较大,所以对于媒资系统上传文件的需求要满足大文件的上传要求。http协议本身对上传文件大小没有限制,但是客户的网络环境质量、电脑硬件环境等参差不齐,如果一个大文件快上传完了网断了没有上传完成,需要客户重新上传,用户体验非常差,所以对于大文件上传的要求最基本的是断点续传。
引用百度百科:断点续传指的是在下载或上传时,将下载或上传任务(一个文件或一个压缩包)人为的划分为几个部分,每一个部分采用一个线程进行上传或下载,如果碰到网络故障,可以从已经上传或下载的部分开始继续上传下载未完成的部分,而没有必要从头开始上传下载,断点续传可以提高节省操作时间,提高用户体验性。
流程如下:
前端上传前先把文件分成块
一块一块的上传,上传中断后重新上传,已上传的分块则不用再上传
各分块上传完成最后在服务端合并文件
分块原理
为了更好的理解文件分块上传的原理,下边用java代码测试文件的分块与合并。文件分块的流程如下:
获取源文件长度
根据设定的分块文件的大小计算出块数
从源文件读数据依次向每一个块文件写数据。
合并原理
文件合并流程:
找到要合并的文件并按文件合并的先后进行排序。
创建合并文件
依次从合并的文件中读取数据向合并文件写入数
上传视频流程
下图是上传视频的整体流程:
- 前端上传文件前请求媒资接口层检查文件是否存在,如果已经存在则不再上传。
- 如果文件在系统不存在前端开始上传,首先对视频文件进行分块
- 前端分块进行上传,上传前首先检查分块是否上传,如已上传则不再上传,如果未上传则开始上传分块。
- 前端请求媒资管理接口层请求上传分块。
- 接口层请求服务层上传分块。
- 服务端将分块信息上传到MinIO。
- 前端将分块上传完毕请求接口层合并分块。
- 接口层请求服务层合并分块。
- 服务层根据文件信息找到MinIO中的分块文件,下载到本地临时目录,将所有分块下载完毕后开始合并。
- 合并完成将合并后的文件上传到MinIO。
准备环境
在 base 工程的 model 中创建 RestResponse 通用结果模型类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| package com.xuecheng.base.model;
import lombok.Data; import lombok.ToString;
@Data @ToString public class RestResponse<T> {
private int code;
private String msg;
private T result;
public RestResponse() { this(0, "success"); }
public RestResponse(int code, String msg) { this.code = code; this.msg = msg; }
public static <T> RestResponse<T> validfail(String msg) { RestResponse<T> response = new RestResponse<T>(); response.setCode(-1); response.setMsg(msg); return response; }
public static <T> RestResponse<T> success(T result) { RestResponse<T> response = new RestResponse<T>(); response.setResult(result); return response; }
public static <T> RestResponse<T> success() { return new RestResponse<T>(); }
public Boolean isSuccessful() { return this.code == 0; } }
|
model-生成DTO
无新添加的配置,略
service-接口开发
上传视频主要实现四个接口:检查文件是否存在、检查分块是否存在、上传分块 以及 合并分块
定义service方法
在mediafilesservice中定义接口方法:
如果是导入我上传图片时的service,应该已经写好了,这里也是偷懒直接将整个service复制过来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
| package com.xuecheng.mediaService.service;
import com.baomidou.mybatisplus.extension.service.IService; import com.xuecheng.base.model.PageParams; import com.xuecheng.base.model.PageResult; import com.xuecheng.base.model.RestResponse; import com.xuecheng.mediaModel.dto.QueryMediaParamsDto; import com.xuecheng.mediaModel.dto.UploadFileParamsDto; import com.xuecheng.mediaModel.dto.UploadFileResultDto; import com.xuecheng.mediaModel.po.MediaFiles; import org.springframework.transaction.annotation.Transactional;
import java.io.File;
public interface MediaFilesService extends IService<MediaFiles> {
public PageResult<MediaFiles> queryMediaFiles(Long companyId, PageParams pageParams, QueryMediaParamsDto queryMediaParamsDto);
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, byte[] bytes,String folder,String objectName);
@Transactional public MediaFiles addMediaFilesToDB(Long companyId,String fileMd5,UploadFileParamsDto uploadFileParamsDto,String bucket_files,String objectName);
public void addMediaFilesToMinIO(byte[] bytes, String bucket_files, String objectName, String contentType);
public void addMediaFilesToMinIO(String filePath, String bucket, String objectName);
public File downloadFileFromMinIO(File file, String bucket, String objectName);
public RestResponse<Boolean> checkFile(String fileMd5);
public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex);
public RestResponse uploadChunk(String fileMd5,int chunk,byte[] bytes);
public RestResponse mergechunks(Long companyId,String fileMd5,int chunkTotal,UploadFileParamsDto uploadFileParamsDto);
public MediaFiles getFileById(String id); }
|
实现service方法impl
同复制粘贴上传图片的 mediafilesserviceimpl

| package com.xuecheng.mediaService.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.xuecheng.base.exception.XueChengException; import com.xuecheng.base.model.PageParams; import com.xuecheng.base.model.PageResult; import com.xuecheng.base.model.RestResponse; import com.xuecheng.mediaModel.dto.QueryMediaParamsDto; import com.xuecheng.mediaModel.dto.UploadFileParamsDto; import com.xuecheng.mediaModel.dto.UploadFileResultDto; import com.xuecheng.mediaModel.po.MediaFiles; import com.xuecheng.mediaModel.po.MediaProcess; import com.xuecheng.mediaService.mapper.MediaFilesMapper; import com.xuecheng.mediaService.mapper.MediaProcessMapper; import com.xuecheng.mediaService.service.MediaFilesService; import io.minio.GetObjectArgs; import io.minio.MinioClient; import io.minio.PutObjectArgs; import io.minio.UploadObjectArgs; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.compress.utils.IOUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;
import java.io.*; import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.util.Date; import java.util.List;
@Slf4j @Service public class MediaFilesServiceImpl extends ServiceImpl<MediaFilesMapper, MediaFiles> implements MediaFilesService {
@Autowired MediaFilesMapper mediaFilesMapper;
@Autowired MediaProcessMapper mediaProcessMapper;
@Autowired MinioClient minioClient;
@Autowired MediaFilesService currentProxy;
@Value("${minio.bucket.files}") private String bucket_files;
@Value("${minio.bucket.videofiles}") private String bucket_videoFiles;
@Override public PageResult<MediaFiles> queryMediaFiles(Long companyId, PageParams pageParams, QueryMediaParamsDto queryMediaParamsDto) { LambdaQueryWrapper<MediaFiles> queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(StringUtils.isNotBlank(queryMediaParamsDto.getFileType()), MediaFiles::getFileType, queryMediaParamsDto.getFileType());
queryWrapper.like(StringUtils.isNotBlank(queryMediaParamsDto.getFilename()), MediaFiles::getFilename, queryMediaParamsDto.getFilename()); Page<MediaFiles> page = new Page<>(pageParams.getPageNo(), pageParams.getPageSize()); Page<MediaFiles> pageResult = mediaFilesMapper.selectPage(page, queryWrapper); List<MediaFiles> list = pageResult.getRecords(); long total = pageResult.getTotal(); PageResult<MediaFiles> mediaListResult = new PageResult<>(list, total, pageParams.getPageNo(), pageParams.getPageSize()); return mediaListResult; }
@Override public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, byte[] bytes, String folder, String objectName) { String fileId = DigestUtils.md5Hex(bytes); String fileMd5 = DigestUtils.md5Hex(bytes);
if (StringUtils.isEmpty(folder)) { folder = getFileFolder(new Date(), true, true, true); } else if (folder.indexOf("/") < 0) { folder = folder + "/"; } String filename = uploadFileParamsDto.getFilename();
if (StringUtils.isEmpty(objectName)) { objectName = fileMd5 + filename.substring(filename.lastIndexOf(".")); }
objectName = folder + objectName; MediaFiles mediaFiles = null;
try { addMediaFilesToMinIO(bytes, bucket_files, objectName, uploadFileParamsDto.getContentType()); mediaFiles = currentProxy.addMediaFilesToDB(companyId, fileId, uploadFileParamsDto, bucket_files, objectName);
UploadFileResultDto uploadFileResultDto = new UploadFileResultDto(); BeanUtils.copyProperties(mediaFiles, uploadFileResultDto); return uploadFileResultDto;
} catch (Exception e) { e.printStackTrace(); log.debug("上传文件失败:{}", e.getMessage()); } return null; }
@Transactional public MediaFiles addMediaFilesToDB(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket_files, String objectName) { MediaFiles mediaFiles = new MediaFiles(); BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles); mediaFiles.setId(fileMd5); mediaFiles.setFileId(fileMd5); mediaFiles.setCompanyId(companyId); mediaFiles.setBucket(bucket_files); mediaFiles.setFilePath(objectName); mediaFiles.setUrl("/" + bucket_files + "/" + objectName); mediaFiles.setCreateDate(LocalDateTime.now()); mediaFiles.setStatus("1"); mediaFiles.setAuditStatus("002004");
int insert = mediaFilesMapper.insert(mediaFiles); if (insert < 0) { XueChengException.cast("保存文件信息失败"); } String extension = objectName.substring(objectName.lastIndexOf(".")); if (extension.equalsIgnoreCase(".avi")) { MediaProcess mediaProcess = new MediaProcess(); BeanUtils.copyProperties(mediaFiles, mediaProcess); mediaProcessMapper.insert(mediaProcess); mediaFiles.setUrl(null); mediaFilesMapper.updateById(mediaFiles); } return mediaFiles; }
public void addMediaFilesToMinIO(byte[] bytes, String bucket_files, String objectName, String contentType) { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); try { PutObjectArgs putObjectArgs = PutObjectArgs.builder() .bucket(bucket_files) .object(objectName) .stream(byteArrayInputStream, byteArrayInputStream.available(), -1) .contentType(contentType) .build(); minioClient.putObject(putObjectArgs); } catch (Exception e) { e.printStackTrace(); XueChengException.cast("上传文件到文件系统出错"); } }
@Override public RestResponse<Boolean> checkFile(String fileMd5) { MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5); if (mediaFiles != null) { String bucket = mediaFiles.getBucket(); String filePath = mediaFiles.getFilePath(); InputStream stream = null; try { stream = minioClient.getObject( GetObjectArgs.builder() .bucket(bucket) .object(filePath) .build());
if (stream != null) { return RestResponse.success(true); } } catch (Exception e) { } } return RestResponse.success(false); }
@Override public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex) { String chunkFileFolderPath = getChunkFileFolderPath(fileMd5); String chunkFilePath = chunkFileFolderPath + chunkIndex;
InputStream fileInputStream = null; try { fileInputStream = minioClient.getObject( GetObjectArgs.builder() .bucket(bucket_videoFiles) .object(chunkFilePath) .build());
if (fileInputStream != null) { return RestResponse.success(true); } } catch (Exception e) {
} return RestResponse.success(false); }
private String getChunkFileFolderPath(String fileMd5) { return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/"; }
@Override public RestResponse uploadChunk(String fileMd5, int chunk, byte[] bytes) { String chunkFileFolderPath = getChunkFileFolderPath(fileMd5); String chunkFilePath = chunkFileFolderPath + chunk;
try { addMediaFilesToMinIO(bytes, bucket_videoFiles, chunkFilePath, "application/octet-stream");
} catch (Exception ex) { ex.printStackTrace(); XueChengException.cast("上传过程出错请重试"); } return RestResponse.success(); }
private File[] checkChunkStatus(String fileMd5, int chunkTotal) { String chunkFileFolderPath = getChunkFileFolderPath(fileMd5); File[] files = new File[chunkTotal]; for (int i = 0; i < chunkTotal; i++) { String chunkFilePath = chunkFileFolderPath + i; File chunkFile = null; try { chunkFile = File.createTempFile("chunk" + i, null); } catch (IOException e) { e.printStackTrace(); XueChengException.cast("下载分块时创建临时文件出错"); } downloadFileFromMinIO(chunkFile, bucket_videoFiles, chunkFilePath); files[i] = chunkFile; } return files; }
public File downloadFileFromMinIO(File file, String bucket, String objectName) { InputStream fileInputStream = null; OutputStream fileOutputStream = null; try { fileInputStream = minioClient.getObject( GetObjectArgs.builder() .bucket(bucket) .object(objectName) .build()); try { fileOutputStream = new FileOutputStream(file); IOUtils.copy(fileInputStream, fileOutputStream);
} catch (IOException e) { XueChengException.cast("下载文件" + objectName + "出错"); } } catch (Exception e) { e.printStackTrace(); XueChengException.cast("文件不存在" + objectName); } finally { if (fileInputStream != null) { try { fileInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (fileOutputStream != null) { try { fileOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } return file; }
@Override public RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) { String fileName = uploadFileParamsDto.getFilename(); File[] chunkFiles = checkChunkStatus(fileMd5, chunkTotal); String extName = fileName.substring(fileName.lastIndexOf(".")); File mergeFile = null; try { mergeFile = File.createTempFile(fileMd5, extName); } catch (IOException e) { XueChengException.cast("合并文件过程中创建临时文件出错"); }
try { byte[] b = new byte[1024]; try (RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");) { for (File chunkFile : chunkFiles) { try (FileInputStream chunkFileStream = new FileInputStream(chunkFile);) { int len = -1; while ((len = chunkFileStream.read(b)) != -1) { raf_write.write(b, 0, len); } } } } catch (IOException e) { e.printStackTrace(); XueChengException.cast("合并文件过程中出错"); } log.debug("合并文件完成{}", mergeFile.getAbsolutePath()); uploadFileParamsDto.setFileSize(mergeFile.length());
try (InputStream mergeFileInputStream = new FileInputStream(mergeFile);) { String newFileMd5 = DigestUtils.md5Hex(mergeFileInputStream); if (!fileMd5.equalsIgnoreCase(newFileMd5)) { XueChengException.cast("合并文件校验失败"); } log.debug("合并文件校验通过{}", mergeFile.getAbsolutePath()); } catch (Exception e) { e.printStackTrace(); XueChengException.cast("合并文件校验异常"); }
String mergeFilePath = getFilePathByMd5(fileMd5, extName); try {
addMediaFilesToMinIO(mergeFile.getAbsolutePath(), bucket_videoFiles, mergeFilePath); log.debug("合并文件上传MinIO完成{}", mergeFile.getAbsolutePath()); } catch (Exception e) { e.printStackTrace(); XueChengException.cast("合并文件时上传文件出错"); }
MediaFiles mediaFiles = addMediaFilesToDB(companyId, fileMd5, uploadFileParamsDto, bucket_videoFiles, mergeFilePath); if (mediaFiles == null) { XueChengException.cast("媒资文件入库出错"); }
return RestResponse.success(); } finally { for (File file : chunkFiles) { try { file.delete(); } catch (Exception e) {
} } try { mergeFile.delete(); } catch (Exception e) {
} } }
private String getFilePathByMd5(String fileMd5, String fileExt) { return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt; }
public void addMediaFilesToMinIO(String filePath, String bucket, String objectName) { try { minioClient.uploadObject( UploadObjectArgs.builder() .bucket(bucket) .object(objectName) .filename(filePath) .build()); } catch (Exception e) { e.printStackTrace(); XueChengException.cast("上传文件到文件系统出错"); } }
@Override public MediaFiles getFileById(String id) { return mediaFilesMapper.selectById(id); }
private String getFileFolder(Date date, boolean year, boolean month, boolean day) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); String dateString = sdf.format(new Date()); String[] dateStringArray = dateString.split("-"); StringBuffer folderString = new StringBuffer(); if (year) { folderString.append(dateStringArray[0]); folderString.append("/"); } if (month) { folderString.append(dateStringArray[1]); folderString.append("/"); } if (day) { folderString.append(dateStringArray[2]); folderString.append("/"); } return folderString.toString(); } }
|
api-接口定义
根据上传视频流程,在media-api中新创建controller,定义如下接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
| package com.xuecheng.mediaApi.api;
import com.j256.simplemagic.ContentInfo; import com.j256.simplemagic.ContentInfoUtil; import com.xuecheng.base.exception.XueChengException; import com.xuecheng.base.model.RestResponse; import com.xuecheng.mediaModel.dto.UploadFileParamsDto; import com.xuecheng.mediaModel.po.MediaFiles; import com.xuecheng.mediaService.service.MediaFilesService; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile;
@RestController public class BigFilesController { @Autowired MediaFilesService mediaFilesService;
@PostMapping("/upload/checkfile") public RestResponse<Boolean> checkfile( @RequestParam("fileMd5") String fileMd5 ) throws Exception { return mediaFilesService.checkFile(fileMd5); }
@PostMapping("/upload/checkchunk") public RestResponse<Boolean> checkchunk( @RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk ) throws Exception { return mediaFilesService.checkChunk(fileMd5, chunk); }
@PostMapping("/upload/uploadchunk") public RestResponse uploadchunk( @RequestParam("file") MultipartFile file, @RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk ) throws Exception { return mediaFilesService.uploadChunk(fileMd5,chunk,file.getBytes()); }
@PostMapping("/upload/mergechunks") public RestResponse mergechunks( @RequestParam("fileMd5") String fileMd5, @RequestParam("fileName") String fileName, @RequestParam("chunkTotal") int chunkTotal ) throws Exception { Long companyId = 1232141425L;
UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto(); uploadFileParamsDto.setFileType("001002"); uploadFileParamsDto.setTags("课程视频"); uploadFileParamsDto.setRemark(""); uploadFileParamsDto.setFilename(fileName); ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(fileName); String mimeType = extensionMatch.getMimeType(); uploadFileParamsDto.setContentType(mimeType);
return mediaFilesService.mergechunks(companyId, fileMd5, chunkTotal, uploadFileParamsDto); }
@GetMapping("/preview/{mediaId}") public RestResponse<String> getPlayUrlByMediaId(@PathVariable String mediaId){ MediaFiles mediaFiles = mediaFilesService.getFileById(mediaId); if (mediaFiles == null || StringUtils.isEmpty(mediaFiles.getUrl())){ XueChengException.cast("视频并未进行转码处理"); } return RestResponse.success(mediaFiles.getUrl()); }
}
|
接口测试
如果是单个接口测试使用httpclient
建议直接前后端联调,更好看到结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| Java ### 检查文件 POST{{media_host}}/media/upload/register Content-Type: application/x-www-form-urlencoded;
fileMd5=查表找filemd5
### 上传分块前检查 POST {{media_host}}/media/upload/checkchunk Content-Type: application/x-www-form-urlencoded;
fileMd5=c5c75d70f382e6016d2f506d134eee11&chunk=0
### 上传分块文件 POST {{media_host}}/media/upload/uploadchunk?fileMd5=c5c75d70f382e6016d2f506d134eee11&chunk=1 Content-Type: multipart/form-data; boundary=WebAppBoundary
--WebAppBoundary Content-Disposition: form-data; name="file"; filename="1" Content-Type: application/octet-stream
< E:/ffmpeg_test/chunks/1
### 合并文件 POST {{media_host}}/media/upload/mergechunks Content-Type: application/x-www-form-urlencoded;
fileMd5=dcb37b85c9c03fc5243e20ab4dfbc1c8&fileName=8.avi&chunkTotal=1
|
下边介绍采用前后联调:
- 首先在每个接口层方法上打开断点
在前端上传视频,观察接口层是否收到参数。
进入service方法逐行跟踪。
断点续传测试
上传一部分后,停止刷新浏览器再重新上传,通过浏览器日志发现已经上传过的分块不再重新上传
文件预览
需求分析
图片上传成功、视频上传成功可以通过预览功能查看文件的内容。
预览的方式是通过浏览器直接打文件,对于图片和浏览器支持的视频格式的视频文件可以直接预览。
业务流程如下:
前端请求接口层预览文件
接口层将文件id传递给服务层
服务层使用文件id查询媒资数据库文件表,获取文件的url
接口层将文件url返回给前端,通过浏览器打开URL。
接口定义
根据需求分析定义接口如下:
1 2 3 4 5 6
| Java @ApiOperation("预览文件") @GetMapping("/preview/{mediaId}") public RestResponse<String> getPlayUrlByMediaId(@PathVariable String mediaId){
}
|
model-生成DTO
无新增配置,略
service-接口开发
定义service方法
还是在media-service中创建接口
1 2 3 4 5 6 7 8
|
public MediaFiles getFileById(String id);
|
实现service方法impl
1 2 3 4
| Java public MediaFiles getFileById(String id) { return mediaFilesMapper.selectById(id); }
|
api-接口定义:
在 bigFilescontroller中完善:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@GetMapping("/preview/{mediaId}") public RestResponse<String> getPlayUrlByMediaId(@PathVariable String mediaId){ MediaFiles mediaFiles = mediaFilesService.getFileById(mediaId); if (mediaFiles == null || StringUtils.isEmpty(mediaFiles.getUrl())){ XueChengException.cast("视频并未进行转码处理"); } return RestResponse.success(mediaFiles.getUrl()); }
|
接口测试
使用前后端联调
上传mp4视频文件,预览文件。
上传图片文件,预览文件。
对于无法预览的视频文件,稍后通过视频处理对视频转码。
视频处理
FFmpeg 的基本使用
我们将视频录制完成后,使用视频编码软件对视频进行编码,本项目使用FFmpeg对视频进行编码 。
FFmpeg被许多开源项目采用,QQ影音、暴风影音、VLC等。
下载:FFmpeg https://www.ffmpeg.org/download.html#build-windows
请从课程资料目录解压ffmpeg.zip,并将解压得到的exe文件加入环境变量。
测试是否正常:cmd运行 ffmpeg -v
安装成功,作下简单测试:将一个.avi文件转成mp4、mp3、gif等。
比如我们将nacos.avi文件转成mp4,运行如下命令:
ffmpeg -i nacos.avi nacos.mp4
转成mp3:ffmpeg -i nacos.avi nacos.mp3
转成gif:ffmpeg -i nacos.avi nacos.gif
官方文档(英文):http://ffmpeg.org/ffmpeg.html
分布式任务处理概念
什么是分布式任务调度
如何去高效处理一批任务呢?
- 多线程
多线程是充分利用单机的资源。
- 分布式加多线程
充分利用我台计算机,每台计算机多线程处理。
方案2可扩展性更强,是一种分布式任务调度的处理方案。
在思考什么是分布式任务调度之前,我们可以先思考一下下面业务场景的解决方案:
某电商系统需要在每天上午10点,下午3点,晚上8点发放一批优惠券。
某财务系统需要在每天上午10点前结算前一天的账单数据,统计汇总。
某电商平台每天凌晨3点,要对订单中的无效订单进行清理。
12306网站会根据车次不同,设置几个时间点分批次放票。
电商整点抢购,商品价格某天上午8点整开始优惠。
商品成功发货后,需要向客户发送短信提醒。
以上这些场景,就是任务调度所需要解决的问题。
任务调度顾名思义,就是对任务的调度,它是指系统为了完成特定业务,基于给定时间点,给定时间间隔或者给定执行次数自动执行任务。
如何实现任务调度?
多线程方式实现:
学过多线程的同学,可能会想到,我们可以开启一个线程,每sleep一段时间,就去检查是否已到预期执行时间。
以下代码简单实现了任务调度的功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Java public static void main(String[] args) { final long timeInterval = 1000; Runnable runnable = new Runnable() { public void run() { while (true) { try { Thread.sleep(timeInterval); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Thread thread = new Thread(runnable); thread.start(); }
|
上面的代码实现了按一定的间隔时间执行任务调度的功能。
Jdk也为我们提供了相关支持,如Timer、ScheduledExecutor,下边我们了解下。
Timer方式实现:
1 2 3 4 5 6 7 8 9 10
| Java public static void main(String[] args){ Timer timer = new Timer(); timer.schedule(new TimerTask(){ @Override public void run() { } }, 1000, 2000); }
|
Timer的优点在于简单易用,每个Timer对应一个线程,因此可以同时启动多个Timer并行执行多个任务,同一个Timer中的任务是串行执行。
ScheduledExecutor方式实现:
1 2 3 4 5 6 7 8 9 10 11 12 13
| Java public static void main(String [] agrs){ ScheduledExecutorService service = Executors.newScheduledThreadPool(10); service.scheduleAtFixedRate( new Runnable() { @Override public void run() { System.out.println("todo something"); } }, 1, 2, TimeUnit.SECONDS); }
|
Java 5 推出了基于线程池设计的ScheduledExecutor,其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。
Timer 和 ScheduledExecutor都仅能提供基于开始时间与重复间隔的任务调度,不能胜任更加复杂的调度需求。比如,设置每月第一天凌晨1点执行任务、复杂调度任务的管理、任务间传递数据等等。
Quartz
Quartz是一个功能强大的任务调度框架,它可以满足更多更复杂的调度需求,Quartz设计的核心类包括 Scheduler, Job 以及 Trigger。其中,Job负责定义需要执行的任务,Trigger 负责设置调度策略,Scheduler将二者组装在一起,并触发任务开始执行。Quartz支持简单的按时间间隔调度、还支持按日历调度方式,通过设置CronTrigger表达式(包括:秒、分、时、日、月、周、年)进行任务调度。
第三方Quartz方式实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| Java public static void main(String [] agrs) throws SchedulerException { SchedulerFactory schedulerFactory = new StdSchedulerFactory(); Scheduler scheduler = schedulerFactory.getScheduler(); JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class); jobDetailBuilder.withIdentity("jobName","jobGroupName"); JobDetail jobDetail = jobDetailBuilder.build(); CronTrigger trigger = TriggerBuilder.newTrigger() .withIdentity("triggerName", "triggerGroupName") .startNow() .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")) .build();
scheduler.scheduleJob(jobDetail,trigger); scheduler.start(); }
public class MyJob implements Job { @Override public void execute(JobExecutionContext jobExecutionContext){ System.out.println("todo something"); } }
|
通过以上内容我们学习了什么是任务调度,任务调度所解决的问题,以及任务调度的多种实现方式。
什么是分布式任务调度?
通常任务调度的程序是集成在应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,结算服务中包括了定期生成报表的任务调度程序,由于采用分布式架构,一个服务往往会部署多个冗余实例来运行我们的业务,在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度。
分布式调度要实现的目标:
不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力:
- 并行任务调度
并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。
如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。
- 高可用
若某一个实例宕机,不影响其他实例来执行任务。
- 弹性扩容
当集群中增加实例就可以提高并执行任务的处理效率。
- 任务管理与监测
对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。
- 避免任务重复执行
当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中到点发优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次。
XXL-JOB介绍
XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
官网:https://www.xuxueli.com/xxl-job/
文档:https://www.xuxueli.com/xxl-job/#%E3%80%8A%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%B9%B3%E5%8F%B0XXL-JOB%E3%80%8B
XXL-JOB主要有调度中心、执行器、任务:
调度中心:
任务执行器:
任务:负责执行具体的业务处理。
执行流程:
任务执行器根据配置的调度中心的地址,自动注册到调度中心
达到任务触发条件,调度中心下发任务
执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
执行器消费内存队列中的执行结果,主动上报给调度中心
当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情
搭建XXL-JOB
下载XXL-JOB
GitHub:https://github.com/xuxueli/xxl-job
码云:https://gitee.com/xuxueli0323/xxl-job
项目使用2.3.1版本:
https://github.com/xuxueli/xxl-job/releases/tag/2.3.1
也可从课程资料目录获取,解压xxl-job-2.3.1.zip,使用IDEA打开解压后的目录
doc :文档资料,包含数据库脚本
创建数据库
首先修改doc下的tables_xxl_job.sql脚本内容:
1 2 3
| Java CREATE database if NOT EXISTS `xxl_job_2.3.1` default character set utf8mb4 collate utf8mb4_unicode_ci; use `xxl_job_2.3.1`;
|
将tables_xxl_job.sql脚本导入xxl_job_2.3.1数据库
修改配置文件
修改xxl-job-admin任务调度中心下application.properties的配置文件内容,修改数据库链接地址:
1 2 3 4
| Java spring.datasource.url=jdbc:mysql: spring.datasource.username=账号 spring.datasource.password=密码
|
启动工程
启动xxl-job-admin任务调度中心,运行com.xxl.job.admin.XxlJobAdminApplication
启动成功访问http://localhost:8080/xxl-job-admin
账号:admin
密码:123456
到此调度中心启动成功。
配置执行器
执行器负责与调度中心通信接收调度中心发起的任务调度请求。
添加依赖
在media-service工程添加依赖,在项目的父工程已约定了版本2.3.1
1 2 3 4 5
| XML <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> </dependency>
|
配置nacos信息
在nacos下的media-service-dev.yaml下配置xxl-job
1 2 3 4 5 6 7 8 9 10 11 12 13
| YAML xxl: job: admin: addresses: http://localhost:端口号/xxl-job-admin executor: appname: media-process-service address: ip: port: 9999 logpath: /data/applogs/xxl-job/jobhandler logretentiondays: 30 accessToken: default_token
|
注意配置中的appname这是执行器的应用名,稍后在调度中心配置执行器时要使用。
配置xxl-job执行器
将示例工程下配置类:
拷贝到media-service工程的config文件夹下:
添加执行器
进入调度中心添加执行器
点击新增,填写执行器信息,appname是前边在nacos中配置xxl信息时指定的执行器的应用名。
添加成功:
到此完成媒资管理模块service工程配置xxl-job执行器,在xxl-job调度中心添加执行器
测试连接
准备测试执行器与调度中心是否正常通信,因为接口工程依赖了service工程,所以启动media-api工程。
启动后观察日志,出现下边的日志表示执行器在调度中心注册成功
同时观察调度中心中的执行器界面
在线机器地址处已显示1个执行器。
测试任务类
- 在media-service包下新建jobhandler存放任务类,下边参考示例工程编写一个任务类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package com.xuecheng.mediaService.jobhandler;
import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component;
@Component @Slf4j public class SampleJob {
@XxlJob("testJob") public void testJob() throws Exception { log.info("开始执行.....");
}
@XxlJob("shardingJobHandler") public void shardingJobHandler() throws Exception {
int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal();
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); log.info("开始执行第"+shardIndex+"批任务");
} }
|
- 在调度中心添加任务,进入任务管理
点击新增,填写任务信息
注意红色标记处:
调度类型选择Cron,并配置Cron表达式设置定时策略。
运行模式有BEAN和GLUE,bean模式较常用就是在项目工程中编写执行器的任务代码,GLUE是将任务代码编写在调度中心。
JobHandler任务方法名填写@XxlJob注解中的名称。
- 添加成功,启动任务
通过调度日志查看任务执行情况
- 启动媒资管理的service工程,启动执行器,观察执行器方法的执行。
- 如果要停止任务需要在调度中心操作
- 任务跑一段时间注意清理日志
需求分析
作业分片方案
掌握了xxl-job的作业分片调度方式,下边思考如何分布式去执行学成在线平台中的视频处理任务。
任务添加成功后,对于要处理的任务会添加到待处理任务表中,现在启动多个执行器实例去查询这些待处理任务,此时如何保证多个执行器不会重复执行任务?
执行器收到调度请求后各自己查询属于自己的任务,这样就保证了执行器之间不会重复执行任务。
xxl-job设计作业分片就是为了分布式执行任务,XXL-JOB并不直接提供数据处理的功能,它只会给执行器分配好分片序号并向执行器传递分片总数、分片序号这些参数,开发者需要自行处理分片项与真实数据的对应关系。
每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id
模上 分片总数,如果等于分片序号则执行此任务。
上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:
1 % 2 = 1 执行器2执行
2 % 2 = 0 执行器1执行
3 % 2 = 1 执行器2执行
以此类推.
保证任务不重复执行
通过作业分片的方式保证了执行器之间分配的任务不重复,如果同一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?
- 首先配置调度过期策略,查看文档如下:
调度过期策略:
忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
这里我们选择忽略,如果立即执行一次可能会重复调度。
- 看阻塞处理策略,阻塞处理策略就是当前执行器正在执行任务还没有结束,此时调度时间到该如何处理,查看文档如下:
- 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
- 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
- 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
这里选择 丢弃后续调度,避免重复调度。
- 注意保证任务处理的幂等性
什么是任务的幂等性?
任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。执行器接收调度请求去执行任务,要有办法去判断该任务是否处理完成,如果处理完则不再处理,即使重复调度处理相同的任务也不能重复处理相同的视频。
什么是幂等性?
它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。
解决幂等性常用的方案:
数据库约束,比如:唯一索引,主键。
乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。
这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理。
业务流程
确定了分片方案,下边梳理整个视频上传及处理的业务流程。
视频处理的详细流程如下:
任务调度中心广播作业分片。
执行器收到广播作业分片,从数据库读取待处理任务。
执行器根据任务内容从MinIO下载要处理的文件。
执行器启动多线程去处理任务。
任务处理完成,上传处理后的视频到MinIO。
将更新任务处理结果,如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中,最后将任务完成记录写入历史表。
model-生成DTO
根据需求分析,DAO接口包括如下:
- 查询待处理视频列表。
- 保证视频处理结果
- 添加视频历史处理记录表。
- 删除已完成视频处理记录
- 上传处理后的视频文件向media_files插入记录
在 mediaProcessMapper中编写根据分片参数获取待处理任务的DAO方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.xuecheng.mediaService.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.xuecheng.mediaModel.po.MediaProcess; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select;
import java.util.List;
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {
@Select("SELECT t.* FROM media_process t WHERE t.id % #{shardTotal} = #{shardindex} and t.status='1' limit #{count}") List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal, @Param("shardindex") int shardindex, @Param("count") int count); }
|
Service-接口开发
定义Service方法
在MediaFileProcessService中定义方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package com.xuecheng.mediaService.service;
import com.baomidou.mybatisplus.extension.service.IService; import com.xuecheng.mediaModel.po.MediaProcess; import org.springframework.transaction.annotation.Transactional;
import java.util.List;
public interface MediaProcessService extends IService<MediaProcess> {
@Transactional public void saveProcessFinishStatus(int status, String fileId, String url, String errorMsg);
public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count);
}
|
实现service方法impl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| package com.xuecheng.mediaService.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.xuecheng.mediaModel.po.MediaFiles; import com.xuecheng.mediaModel.po.MediaProcess; import com.xuecheng.mediaModel.po.MediaProcessHistory; import com.xuecheng.mediaService.mapper.MediaFilesMapper; import com.xuecheng.mediaService.mapper.MediaProcessHistoryMapper; import com.xuecheng.mediaService.mapper.MediaProcessMapper; import com.xuecheng.mediaService.service.MediaProcessService; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
import java.time.LocalDateTime; import java.util.List;
@Service public class MediaProcessServiceImpl extends ServiceImpl<MediaProcessMapper, MediaProcess> implements MediaProcessService {
@Autowired MediaFilesMapper mediaFilesMapper;
@Autowired MediaProcessMapper mediaProcessMapper;
@Autowired MediaProcessHistoryMapper mediaProcessHistoryMapper;
@Override public void saveProcessFinishStatus(int status, String fileId, String url, String errorMsg) { LambdaQueryWrapper<MediaProcess> queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(MediaProcess::getFileId, fileId); MediaProcess mediaProcess = mediaProcessMapper.selectOne(queryWrapper); if (mediaProcess == null){ return; }
if (status == 3){ mediaProcess.setStatus("3"); mediaProcess.setErrormsg(errorMsg); mediaProcessMapper.updateById(mediaProcess); return; }
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId); if (mediaFiles != null){ mediaFiles.setUrl(url); mediaFilesMapper.updateById(mediaFiles); }
mediaProcess.setUrl(url); mediaProcess.setStatus("2"); mediaProcess.setFinishDate(LocalDateTime.now()); mediaProcessMapper.updateById(mediaProcess);
MediaProcessHistory mediaProcessHistory = new MediaProcessHistory(); BeanUtils.copyProperties(mediaProcess, mediaProcessHistory); mediaProcessHistoryMapper.insert(mediaProcessHistory); mediaProcessMapper.deleteById(mediaProcess.getId()); }
@Override public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count) { List<MediaProcess> mediaProcessList = mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count); return mediaProcessList; } }
|
utils-任务类开发
视频处理工具类
将课程资料目录中的util.zip解压,将解压出的工具类拷贝至base工程,并在base工程添加如下依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </dependency>
<dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <scope>provided</scope> </dependency>
<dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.11</version> </dependency>
|
其中Mp4VideoUtil类是用于将视频转为mp4格式,是我们项目要使用的工具类。
下边看下这个类的代码,并进行测试。
我们要通过ffmpeg对视频转码,Java程序调用ffmpeg,使用java.lang.ProcessBuilder去完成,具体在Mp4VideoUtil类的63行。
使用该类的main方法进行测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Java public static void main(String[] args) throws IOException { String ffmpeg_path = "D:\\soft\\ffmpeg\\ffmpeg.exe"; String video_path = "D:\\develop\\bigfile_test\\nacos01.avi"; String mp4_name = "nacos01.mp4"; String mp4_path = "D:\\develop\\bigfile_test\\nacos01.mp4"; Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path); String s = videoUtil.generateMp4(); System.out.println(s); }
|
执行main方法,最终在控制台输出 success 表示执行成功。
定义任务类
视频采用并发处理,每个视频使用一个线程去处理,每次处理的视频数量不要超过cpu核心数。
所有视频处理完成结束本次执行,为防止代码异常出现无限期等待添加超时设置,到达超时时间还没有处理完成仍结束任务。
在media-service工程的 jobhandler文件夹中定义任务类VideoTask 如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
| package com.xuecheng.mediaService.jobhandler;
import com.xuecheng.base.utils.Mp4VideoUtil; import com.xuecheng.mediaModel.po.MediaProcess; import com.xuecheng.mediaService.service.MediaFilesService; import com.xuecheng.mediaService.service.MediaProcessService; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;
import java.io.File; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
@Component @Slf4j public class VideoTask {
@Autowired private MediaFilesService mediaFilesService;
@Autowired private MediaProcessService mediaProcessService;
@Value("${videoprocess.ffmpegpath}") String ffmpegPath;
@XxlJob("videoJobHandler") public void videoJobHandler() throws Exception { int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); log.debug("shardIndex=" + shardIndex + ",shardTotal=" + shardTotal);
List<MediaProcess> mediaProcesses = mediaProcessService.getMediaProcessList(shardIndex, shardTotal, 2); int size = mediaProcesses.size();
log.debug("取出待处理视频记录" + size + "条"); if (size <= 0) { return; }
ExecutorService threadPool = Executors.newFixedThreadPool(size); CountDownLatch countDownLatch = new CountDownLatch(size);
mediaProcesses.forEach(mediaProcess -> { threadPool.execute(() -> { String bucket = mediaProcess.getBucket(); String filePath = mediaProcess.getFilePath(); String fileId = mediaProcess.getFileId(); String filename = mediaProcess.getFilename();
File originalVideo = null; File mp4Video = null; try { originalVideo = File.createTempFile("original", null); mp4Video = File.createTempFile("mp4", ".mp4"); } catch (IOException e) { log.error("下载待处理的原始文件前创建临时文件失败"); } try { originalVideo = mediaFilesService.downloadFileFromMinIO(originalVideo, bucket, filePath);
Mp4VideoUtil mp4VideoUtil = new Mp4VideoUtil(ffmpegPath, originalVideo.getAbsolutePath(), mp4Video.getName(), mp4Video.getAbsolutePath()); String result = mp4VideoUtil.generateMp4(); if (!result.equals("success")) { log.error("generateMp4 error ,video_path is {},error msg is {}", bucket + filePath, result); mediaProcessService.saveProcessFinishStatus(3, fileId, null, result); return; } String objectName = getFilePath(fileId, ".mp4"); mediaFilesService.addMediaFilesToMinIO(mp4Video.getAbsolutePath(), bucket, objectName);
String url = "/" + bucket + "/" + objectName; mediaProcessService.saveProcessFinishStatus(2, fileId, url, null);
} catch (Exception e) { e.printStackTrace(); } finally { if (originalVideo != null) { try { originalVideo.delete(); } catch (Exception e) {
} } if (mp4Video != null) { try { mp4Video.delete(); } catch (Exception e) {
} } } countDownLatch.countDown();
}); });
countDownLatch.await(); }
private String getFilePath(String fileMd5,String fileExt){ return fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt; } }
|
视频处理测试
上传视频代码完善
上传视频成功根据视频文件格式判断,如果是avi文件则需要加入视频待处理表。
前面mediaserviceImpl的代码已经修改完毕
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
|
@Transactional public MediaFiles addMediaFilesToDB(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket_files, String objectName) { MediaFiles mediaFiles = new MediaFiles(); BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles); mediaFiles.setId(fileMd5); mediaFiles.setFileId(fileMd5); mediaFiles.setCompanyId(companyId); mediaFiles.setBucket(bucket_files); mediaFiles.setFilePath(objectName); mediaFiles.setUrl("/" + bucket_files + "/" + objectName); mediaFiles.setCreateDate(LocalDateTime.now()); mediaFiles.setStatus("1"); mediaFiles.setAuditStatus("002004");
int insert = mediaFilesMapper.insert(mediaFiles); if (insert < 0) { XueChengException.cast("保存文件信息失败"); } String extension = objectName.substring(objectName.lastIndexOf(".")); if (extension.equalsIgnoreCase(".avi")) { MediaProcess mediaProcess = new MediaProcess(); BeanUtils.copyProperties(mediaFiles, mediaProcess); mediaProcessMapper.insert(mediaProcess); mediaFiles.setUrl(null); mediaFilesMapper.updateById(mediaFiles); } return mediaFiles; }
|
视频处理调度策略
进入xxl-job调度中心添加执行器和任务,在xxl-job配置任务调度策略:
配置阻塞处理策略为:丢弃后续调度。
配置视频处理调度时间间隔不用根据视频处理时间去确定,可以配置的小一些,如:5分钟,即使到达调度时间如果视频没有处理完会丢失调度请求。
配置完成开始测试视频处理:
首先上传至少4个视频,非mp4格式。
启动媒资管理服务,观察日志
注意:如果数据库在导表的时候添加了 processHistory 数据,要记得删除,确保 process 表和 processhistory 表都是空的,否则在视频转码完成写入进历史表时会冲突
绑定媒资
需求分析
业务流程
到目前为止,媒资管理已完成文件上传、视频处理、我的媒资功能等基本功能,其它模块可以使用媒资文件,本节要讲解课程计划绑定媒资文件。
- 首先进入课程计划界面,然后选择要绑定的视频进行绑定即可。
具体的业务流程如下:
教育机构用户进入课程管理页面并编辑某一个课程,在"课程大纲"标签页的某一小节后可点击”添加视频”。
弹出添加视频对话框,可通过视频关键字搜索已审核通过的视频媒资。
选择视频媒资,点击提交按钮,完成课程计划绑定媒资流程。
8.1.2 数据模型
课程计划绑定媒资文件后存储至课程计划teachplan绑定媒资表media
接口定义
根据业务流程,用户进入课程计划列表,首先确定向哪个课程计划添加视频,点击”添加视频”后用户选择视频,选择视频,点击提交,提交媒资文件id、文件名称、教学计划id,示例如下:
1 2 3 4 5 6
| JSON { "mediaId": "70a98b4a2fffc89e50b101f959cc33ca", "fileName": "22-Hmily实现TCC事务-开发bank2的confirm方法.avi", "teachplanId": 257 }
|
此接口在内容管理模块content提供
model-生成DTO
在内容管理模块content定义请求参数模型类型:
注意:对象类型要和数据库的对应
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.xuecheng.contentModel.dto;
import io.swagger.annotations.ApiModelProperty; import lombok.Data;
@Data public class BindTeachplanMediaDto { @ApiModelProperty(value = "媒资文件id", required = true) private String mediaId;
@ApiModelProperty(value = "媒资文件名称", required = true) private String fileName;
@ApiModelProperty(value = "课程计划标识", required = true) private Long teachplanId; }
|
service-接口开发
定义service方法
根据需求在teachplanservice中定义接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
public TeachplanMedia associationMedia(BindTeachplanMediaDto bindTeachplanMediaDto);
public void deleteAssociationMedia(Long teachPlanId, String mediaId);
|
实现service方法impl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Override public TeachplanMedia associationMedia(BindTeachplanMediaDto bindTeachplanMediaDto) { Long teachplanId = bindTeachplanMediaDto.getTeachplanId(); Teachplan teachplan = teachplanMapper.selectById(teachplanId); if (teachplan == null) { XueChengException.cast("教学计划不存在"); } Integer grade = teachplan.getGrade(); if (grade != 2) { XueChengException.cast("只允许第二级教学计划绑定媒资文件"); } Long courseId = teachplan.getCourseId();
teachplanMediaMapper.delete(new LambdaQueryWrapper<TeachplanMedia>().eq(TeachplanMedia::getTeachplanId, teachplanId));
TeachplanMedia teachplanMedia = new TeachplanMedia(); teachplanMedia.setCourseId(courseId); teachplanMedia.setTeachplanId(teachplanId); teachplanMedia.setMediaFilename(bindTeachplanMediaDto.getFileName()); teachplanMedia.setMediaId(bindTeachplanMediaDto.getMediaId()); teachplanMedia.setCreateDate(LocalDateTime.now()); teachplanMediaMapper.insert(teachplanMedia); return teachplanMedia; }
@Override public void deleteAssociationMedia(Long teachPlanId, String mediaId) { LambdaQueryWrapper<TeachplanMedia> queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(TeachplanMedia::getTeachplanId, teachPlanId); queryWrapper.eq(TeachplanMedia::getMediaId, mediaId); teachplanMediaMapper.delete(queryWrapper); }
|
api-定义接口
在teachplancontroller中添加接口:
1 2 3 4 5 6 7 8 9
| @PostMapping("/teachplan/association/media") public void associationMedia(@RequestBody BindTeachplanMediaDto bindTeachplanMediaDto){ teachplanService.associationMedia(bindTeachplanMediaDto); }
@DeleteMapping("/teachplan/association/media/{teachPlanId}/{mediaId}") public void deleteAssociationMedia(@PathVariable Long teachPlanId, @PathVariable String mediaId){ teachplanService.deleteAssociationMedia(teachPlanId, mediaId); }
|
接口测试
- 使用httpclient测试
1 2 3 4 5 6 7 8 9 10 11 12 13
| Plain Text ### 课程计划绑定视频 POST {{media_host}}/media/teachplan/association/media Content-Type: application/json
{ "mediaId": "", "fileName": "", "teachplanId": "" }
### 课程计划接触视频绑定 DELETE {{media_host}}/media/teachplan/association/media/{teachPlanId}/{mediaId}
|
- 前后端联调
此功能较为简单推荐直接前后端联调:重启content-api工程,向指定课程计划添加视频、解除视频绑定。
OVER~~