媒资管理模块

模块需求分析

模块介绍

媒资管理系统是每个在线教育平台所必须具备的,查阅百度百科对它的定义如下:

媒体资源管理(Media Asset Management,MAM)系统是建立在多媒体、网络、数据库和数字存储等先进技术基础上的一个对各种媒体及内容(如视/音频资料、文本文件、图表等)进行数字化存储、管理以及应用的总体解决方案,包括数字媒体的采集、编目、管理、传输和编码转换等所有环节。其主要是满足媒体资源拥有者收集、保存、查找、编辑、发布各种信息的要求,为媒体资源的使用者提供访问内容的便捷方法,实现对媒体资源的高效管理,大幅度提高媒体资源的价值。

每个教学机构都可以在媒资系统管理自己的教学资源,包括:视频、教案等文件。

目前媒资管理的主要管理对象是视频、图片、文档等,包括:媒资文件的查询、文件上传、视频处理等。

媒资查询:教学机构查询自己所拥有的媒资信息。

文件上传:包括上传图片、上传文档、上传视频。

视频处理:视频上传成功,系统自动对视频进行编码处理。

文件删除:教学机构删除自己上传的媒资文件。

搭建模块环境

架构的问题分析

当前要开发的是媒资管理服务,目前为止共三个三微服务:内容管理、系统管理、媒资管理,后期还会添加更多的微服务,当前这种由前端直接请求微服务的方式存在弊端:如果在前端对每个请求地址都配置绝对路径,非常不利于系统维护,当系统上线后这里需要改成公网的域名,如果这种地址非常多则非常麻烦。

基于这个问题可以采用网关来解决,这样在前端的代码中只需要指定每个接口的相对路径,在前端代码的一个固定的地方在接口地址前统一加网关的地址,每个请求统一到网关,由网关将请求转发到具体的微服务。

为什么所有的请求先到网关呢?

有了网关就可以对请求进行路由,比如:可以根据请求路径路由、根据host地址路由等,当微服务有多个实例时可以通过负载均衡算法进行路由,另外,网关还可以实现权限控制、限流等功能。

项目采用Spring Cloud Gateway作为网关,网关在请求路由时需要知道每个微服务实例的地址,项目使用Nacos作用服务发现中心和配置中心,流程如下:

  1. 微服务启动,将自己注册到Nacos,Nacos记录了各微服务实例的地址。

  2. 网关从Nacos读取服务列表,包括服务名称、服务地址等。

  3. 请求到达网关,网关将请求路由到具体的微服务。

要使用网关首先搭建Nacos,Nacos有两个作用:

  • 服务发现中心。

    • 微服务将自身注册至Nacos,网关从Nacos获取微服务列表。
  • 配置中心。

    • 微服务众多,它们的配置信息也非常复杂,为了提供系统的可维护性,微服务的配置信息统一在Nacos配置。

搭建Nacos

这里我踩了很多坑,但是删删改改的很难总结,所以直接将最终版的写法展示出来

服务发现中心

根据上节讲解的网关的架构图,要使用网关首先搭建Nacos。

首先搭建Nacos服务发现中心。

在搭建Nacos服务发现中心之前需要搞清楚两个概念:namespace和group

  • namespace:用于区分环境、比如:开发环境、测试环境、生产环境。

  • group:用于区分项目,比如:xuecheng-plus项目、xuecheng2.0项目

首先在nacos配置namespace:登录Centos,启动Naocs,使用 sh /data/soft/restart.sh 将自动启动Nacos。

1
2
3
4
访问:http://192.168.101.65:8848/nacos/

账号:nacos
密码:nacos

首先完成各服务注册到Naocs(导文件,改配置),将资料给的文件 nacos_config_export.zip 导入nacos

下边将服务注册到nacos中。

  1. 在xuecheng-parent中添加依赖管理

1
2
3
4
5
6
7
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

  1. content-api 工程、content-service工程、 system-api 工程以及system-service 工程中添加如下依赖

1
2
3
4
5
6
7
8
9
<!--添加Nacos 注册中心-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

注意是 discovery 和 config !!!


  1. 在idea中配置nacos的地址
  • system-service 的配置文件

    1. 改名:将 application.yml 改名为 bootstrap.yml

    2. 在里面写入以下配置:

注意:profiles是和 cloud 同级的,不要顶格写!!!


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
spring:
application:
name: system-service
cloud:
nacos:
server-addr: nacosip:8848
discovery:
namespace: ${spring.profiles.active}
group: xuecheng-online-project(自定义分组名称)
config:
namespace: xcdev
group: xuecheng-online-project
file-extension: yaml
refresh-enabled: true
shared-configs:
- data-id: logging-${spring.profiles.active}.yaml
group: common
refresh: true
profiles:
active: dev

对应的是nacos的 system-service-dev.yaml 文件——主要存放的是service层连接数据库的信息

image-20230203222912906


  • system-api 的配置文件

    1. 改名:将 application.yml 改名为 bootstrap.yml

    2. 在里面写入以下配置:(注释掉的东西都写在了nacos中)

注意:profiles是和 cloud 同级的,不要顶格写!!!


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#server:
# servlet:
# context-path: /system
# port: 63110
#微服务配置
spring:
application:
name: system-api
# mvc:
# pathmatch:
# matching-strategy: ANT_PATH_MATCHER
cloud:
nacos:
server-addr: nacos ip:8848
discovery:
namespace: xcdev
group: xuecheng-online-project
config:
namespace: xcdev
group: xuecheng-online-project
file-extension: yaml
refresh-enabled: true
extension-configs:
- data-id: system-service-${spring.profiles.active}.yaml
group: xuecheng-online-project
refresh: true
shared-configs:
- data-id: logging-${spring.profiles.active}.yaml
group: common
refresh: true
profiles:
active: dev
# 日志文件配置路径
#logging:
# config: classpath:log4j2-dev.xml


对应的是nacos的 system-api-dev.yaml 文件看上图——主要存放的是 api层的端口号以及配置消息


  • 在 content-api 的配置文件:

    1. 改名:将 application.yml 改名为 bootstrap.yml

    2. 在里面写入以下配置:(注释掉的东西都写在了nacos中)

注意:profiles是和 cloud 同级的,不要顶格写!!!


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#server:
# servlet:
# context-path: /content
# port: 63040
#微服务配置
spring:
application:
name: content-api
# mvc:
# pathmatch:
# matching-strategy: ANT_PATH_MATCHER
cloud:
nacos:
server-addr: ip:8848
discovery:
namespace: xcdev
group: xuecheng-online-project
config:
namespace: xcdev
group: xuecheng-online-project
file-extension: yaml
refresh-enabled: true
extension-configs:
- data-id: content-service-${spring.profiles.active}.yaml
group: xuecheng-online-project
refresh: true
shared-configs:
- data-id: logging-${spring.profiles.active}.yaml
group: common
refresh: true
profiles:
active: dev
# 日志文件配置路径
#logging:
# config: classpath:log4j2-dev.xml

对应的是nacos的 content-api-dev.yaml 文件看上图——主要存放的是 api层的端口号以及配置消息

image-20230203223739484


  • content-service 的配置文件

    1. 改名:将 application.yml 改名为 bootstrap.yml

    2. 在里面写入以下配置:

注意:profiles是和 cloud 同级的,不要顶格写!!!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
spring:
application:
name: content-service
cloud:
nacos:
server-addr: ip:8848
discovery:
namespace: ${spring.profiles.active}
group: xuecheng-online-project
config:
namespace: xcdev
group: xuecheng-online-project
file-extension: yaml
refresh-enabled: true
shared-configs:
- data-id: logging-${spring.profiles.active}.yaml
group: common
refresh: true
profiles:
active: dev

# 日志文件配置路径
#logging:
# config: classpath:log4j2-dev.xml

对应的是nacos的 content-service-dev.yaml 文件看上图——主要存放的是service层连接数据库的信息


  1. 重启内容管理服务content、系统管理服务system。

只要能在 “服务列表”看到启动情况:出现了 content-api 和 system-api,则说明nacos配置成功。


启动content-api工程,查询控制台是否打印出了请求nacos的日志,如下:


1
[NacosRestTemplate.java:476] - HTTP method: POST, url:http://192.168.101.65:8848/nacos/v1/cs/configs/listener

并使用Httpclient测试课程查询接口是否可以正常查询。

公用配置

如何在nacos中配置项目的公用配置呢?

nacos提供了 shared-configs 可以引入公用配置。

对于swagger或者 knife4j 这类接口测试文档,可以将配置定义为一个公用配置,哪个项目需要用直接引入即可。(甚至别的项目也可以直接用)

单独在common分组下创建xuecheng的公用配置,进入nacos的开发环境,添加公用配置:

image-20230204003934050

项目使用shared-configs可以引入公用配置。在接口工程的本地配置文件中引入公用配置,如上面的服务发现中心就已经添加了 shared-configs 引入了公用配置

配置完成后可以查看swagger/knife4j接口文档是否可以正常访问,查看控制台log4j2日志输出是否正常。

配置优先级

到目前为止已将所有微服务的配置统一在nacos进行配置,用到的配置文件有本地的配置文件bootstrap.yaml和nacos上的配置文件,引入配置文件的形式有:

  1. 通过dataid方式引入

  2. 以扩展配置文件方式引入

  3. 以共享配置文件 方式引入

那么这几种配置文件方式的优先级是怎么样的呢

  • 通过dataid方式引入优先级第一高
  • 其次是扩展类配置文件(extension-configs)
  • 再者是共享类配置文件(shared-configs)
  • 最后本地配置文件优先级最低

如果需要设置本地最优先,要在nacos配置文件中配置代码:

1
2
3
4
spring:
cloud:
config:
override-none: true

搭建Gateway

本项目使用Spring Cloud Gateway作为网关,下边创建网关工程。

新建一个网关工程xuecheng-gateway,工程结构如下:

image-20230204004555268

添加依赖

配置pom文件:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.xuecheng</groupId>
<artifactId>xuecheng-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../xuecheng-parent</relativePath> <!-- lookup parent from repository -->
</parent>
<artifactId>xuecheng-gateway</artifactId>
<name>xuecheng-gateway</name>
<description>xuecheng-gateway</description>

<dependencies>
<!--网关-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--服务发现中心-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

<!-- 排除 Spring Boot 依赖的日志包冲突 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Spring Boot 集成 log4j2 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- Spring Boot 集成 Junit -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>


配置bootstrap.yaml配置文件

在gateway工程的 resource 中将 application.properties 改成 bootstrap.yml,在里面配置:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
spring:
application:
name: gateway
cloud:
nacos:
server-addr: nacoip:8848
discovery:
namespace: xcdev
group: xuecheng-online-project
config:
namespace: xcdev
group: xuecheng-online-project
file-extension: yaml
refresh-enabled: true
shared-configs:
- data-id: logging-${spring.profiles.active}.yaml
group: common
refresh: true
profiles:
active: dev

注意:profiles是和 cloud 同级的,不要顶格写!!!


在nacos上配置网关路由策略,详细配置导材料就行


接口测试

启动网关工程,通过网关工程访问微服务进行测试。

在http-client-env.json中配置网关的地址,使用httpclient测试课程查询 接口,如下:


1
2
3
4
5
6
7
8
### 课程查询列表
POST {{gateway_host}}/content/course/list?pageNo=2&pageSize=1
Content-Type: application/json

{
"auditStatus": "202002",
"courseName": ""
}

运行,对照数据库信息,观察是否可以正常访问接口 ,如下所示可以正常请求接口。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
JSON
http://localhost:63010/content/course/list?pageNo=2&pageSize=1

HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/json
Date: Sun, 11 Sep 2022 09:54:32 GMT

{
"items": [
{
"id": 26,
"companyId": 1232141425,
"companyName": null,
"name": "spring cloud实战",
"users": "所有人",
"tags": null,
"mt": "1-3",
"mtName": null,
"st": "1-3-2",
"stName": null,
"grade": "200003",
"teachmode": "201001",
"description": "本课程主要从四个章节进行讲解: 1.微服务架构入门 2.spring cloud 基础入门 3.实战Spring Boot 4.注册中心eureka。",
"pic": "https://cdn.educba.com/academy/wp-content/uploads/2018/08/Spring-BOOT-Interview-questions.jpg",
"createDate": "2019-09-04 09:56:19",
"changeDate": "2021-12-26 22:10:38",
"createPeople": null,
"changePeople": null,
"auditStatus": "202002",
"status": "203001",
"coursePubId": null,
"coursePubDate": null
}
],
"counts": 29,
"page": 2,
"pageSize": 1
}

更改前端工程接口

网关工程搭建完成即可将前端工程中的接口地址改为网关的地址

image-20230204005521609

启动前端工程,测试之前开发内容管理模块的功能。

搭建媒资工程

至此网关、Nacos已经搭建完成,下边将媒资工程导入项目。(建议自己敲熟悉一下)

image-20230204005814757

  1. 从课程资料中获取媒资工程 xuecheng-plus-media,拷贝到项目工程根目录。

  2. 右键pom.xml转为maven工程。

  3. 创建媒资数据库,并导入xcplus_media.sql

  4. 启动media-api工程测试运行

分布式文件系统

MinIO介绍

本项目采用MinIO构建分布式文件系统,MinIO是一个非常轻量的服务,可以很简单的和其他应用的结合使用,它兼容亚马逊 S3云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等。

它一大特点就是轻量,使用简单,功能强大,支持各种平台,单个文件最大5TB,兼容Amazon S3接口,提供了 Java、Python、GO等多版本SDK支持。

官网:https://min.io

中文:https://www.minio.org.cn/,http://docs.minio.org.cn/docs/

MinIO集群采用去中心化共享架构,每个结点是对等关系,通过Nginx可对MinIO进行负载均衡访问。

去中心化有什么好处?

在大数据领域,通常的设计理念都是无中心和分布式。Minio分布式模式可以帮助你搭建一个高可用的对象存储服务,你可以使用这些存储设备,而不用考虑其真实物理位置。

它将分布在不同服务器上的多块硬盘组成一个对象存储服务。由于硬盘分布在不同的节点上,分布式Minio避免了单点故障。

Minio使用纠删码技术来保护数据,它是一种恢复丢失和损坏数据的数学算法,它将数据分块冗余的分散存储在各各节点的磁盘上,所有的可用磁盘组成一个集合,上图由8块硬盘组成一个集合,当上传一个文件时会通过纠删码算法计算对文件进行分块存储,除了将文件本身分成4个数据块,还会生成4个校验块,数据块和校验块会分散的存储在这8块硬盘上。

使用纠删码的好处是即便丢失一半数量(N/2)的硬盘,仍然可以恢复数据。

MinIO安装

直接cmd运行MinIO.exe就能跑起来,最好看着视频弄,挂在虚拟机或者服务器上要去官网下对应的二进制文件

安装参考文档:安装参考

重定向问题参考:nohup出问题了

MinIO 对象存储参考:MinIO 对象存储

上传文件(图片)测试

MinIO提供多个语言版本SDK的支持,下边找到java版本的文档:

地址:https://docs.min.io/docs/java-client-quickstart-guide.html

最低需求Java 1.8或更高版本:

在media-service工程添加此依赖,依赖如下:(我直接附全部pom配置了)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.xuecheng</groupId>
<artifactId>xuecheng-media</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<artifactId>xuecheng-media-service</artifactId>
<name>xuecheng-media-service</name>
<description>xuecheng-media-service</description>

<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.xuecheng</groupId>
<artifactId>xuecheng-media-model</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>

<!-- MySQL 驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>

<!-- mybatis plus的依赖 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<!-- Spring Boot 集成 Junit -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 排除 Spring Boot 依赖的日志包冲突 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Spring Boot 集成 log4j2 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!--添加 minio 分布式文件系统-->
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.4.3</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.8.1</version>
</dependency>
<!--添加 xxl_job 调度中心-->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>

</dependencies>

</project>

MinIO参数说明

需要三个参数才能连接到minio服务。

参数 说明
Endpoint 对象存储服务的URL
Access Key Access key就像用户ID,可以唯一标识你的账户。
Secret Key Secret key是你账户的密码。

MinIO功能测试

在media-service中添加测试用例,测试,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.xuecheng.mediaService;

import io.minio.*;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.xml.bind.SchemaOutputResolver;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilterInputStream;

@SpringBootTest
class MediaServiceApplicationTests {

@Test
void contextLoads() {
}

static MinioClient minioClient =
MinioClient.builder()
.endpoint("http://MinIOIP:9000") //建议本机启动好看效果
.credentials("minioadmin", "minioadmin")
.build();

//上传
@Test
public void upload() {
try {
UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder()
.bucket("testbucket") //在minIo页面创建的 bucket名称
.object("rurisa.jpg") //要上传的文件名称
.filename("文件上传的本地路径\\rurisa.jpg")
.build();
minioClient.uploadObject(uploadObjectArgs);
System.out.println("上传成功");
} catch (Exception e) {
System.out.println("上传失败");
}
}

//删除
@Test
public void delete() {
try {
RemoveObjectArgs removeObjectArgs = RemoveObjectArgs.builder()
.bucket("testbucket")
.object("rurisa.jpg")
.build();
minioClient.removeObject(removeObjectArgs);
System.out.println("删除成功");
} catch (Exception e) {
System.out.println("删除失败");
}
}

// 查询/下载对象
@Test
public void getFile() {
GetObjectArgs getObjectArgs = GetObjectArgs.builder()
.bucket("testbucket")
.object("rurisa.jpg")
.build();
try (
FilterInputStream inputStream = minioClient.getObject(getObjectArgs);
FileOutputStream fileOutputStream = new FileOutputStream(new File("文件下载的存放路径\\pretty.jpg"));
) {
if (inputStream != null) {
IOUtils.copy(inputStream, fileOutputStream);
}
} catch (Exception e) {
System.out.println("查询失败");
}
}
}


执行方法,在MinIO页面查看效果,以及在文件下载指定文件夹中查看下载效果


上传图片

需求分析

业务流程

课程图片是宣传课程非常重要的信息,在新增课程界面上传课程图片,也可以修改课程图片。

image-20230204012551573

课程图片上传至分布式文件系统,在课程信息中保存课程图片路径,如下流程:

image-20230204012612381

  1. 前端进入上传图片界面

  2. 上传图片,请求媒资管理服务。

  3. 媒资管理服务将图片文件存储在MinIO。

  4. 媒资管理记录文件信息到数据库。

  5. 保存课程信息,在内容管理数据库保存图片地址。

媒资管理服务由接口层和业务层共同完成,具体分工如下:

用户上传图片请求至媒资管理的接口层,接口层解析文件信息通过业务层将文件保存至minio及数据库。如下图:

image-20230204012735647

数据模型

涉及到的数据表有:课程信息表courseBase中的图片字段、媒资数据库media的文件表mediaFiles

接口定义

根据需求分析,下边进行接口定义,此接口定义为一个通用的上传文件接口,可以上传图片或其它文件。

请求参数:

1
2
3
4
5
请求地址:/media/upload/coursefile

**Content-Type:** multipart/form-data;boundary=\.....

FormData: **filedata=??**

响应参数:文件信息


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
JSON
{
"id": "a16da7a132559daf9e1193166b3e7f52",
"companyId": 1232141425,
"companyName": null,
"filename": "1.jpg",
"fileType": "001001",
"tags": "",
"bucket": "/testbucket/2022/09/12/a16da7a132559daf9e1193166b3e7f52.jpg",
"fileId": "a16da7a132559daf9e1193166b3e7f52",
"url": "/testbucket/2022/09/12/a16da7a132559daf9e1193166b3e7f52.jpg",
"timelength": null,
"username": null,
"createDate": "2022-09-12T21:57:18",
"changeDate": null,
"status": "1",
"remark": "",
"auditStatus": null,
"auditMind": null,
"fileSize": 248329
}

准备环境

配置bucket

首先在minio配置bucket,bucket名称为:mediafiles,并设置bucket的权限为公开。

在nacos配置中minio的相关信息,进入media-service-dev.yaml,配置信息如下:


1
2
3
4
5
6
7
8
YAML
minio:
endpoint: http://minioIp:9000
accessKey: minioadmin
secretKey: minioadmin
bucket:
files: mediafiles
videofiles: video

编写minio配置类

在media-service工程中创建config文件夹,编写minio的配置类:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.xuecheng.mediaService.config;

import io.minio.MinioClient;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author xioaming
* @version 1.0
* @description Minio 配置类
* @date 2023/1/29 17:33
*/
@Configuration
public class MinioConfig {
//读取参数
@Value("${minio.endpoint}")
private String endpoint;
@Value("${minio.accessKey}")
private String accessKey;
@Value("${minio.secretKey}")
private String secretKey;

@Bean
public MinioClient minioClient() {

MinioClient minioClient =
MinioClient.builder()
.endpoint(endpoint)
.credentials(accessKey, secretKey)
.build();
return minioClient;
}
}


model-生成DTO

在media-model 中创建DTO类

定义媒资文件查询请求模型类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.xuecheng.mediaModel.dto;

import lombok.Data;
import lombok.ToString;

/**
* @author xioaming
* @version 1.0
* @description 媒资文件查询请求模型类
* @date 2023/1/29 17:28
*/
@Data
@ToString
public class QueryMediaParamsDto {
//媒资文件名称
private String filename;

//媒资类型
private String fileType;

//审核状态
private String auditStatus;
}

定义上传响应模型类


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.xuecheng.mediaModel.dto;

import com.xuecheng.mediaModel.po.MediaFiles;
import lombok.Data;

/**
* @author xioaming
* @version 1.0
* @description 上传普通文件响应参数
* @date 2023/1/29 17:31
*/
@Data
public class UploadFileResultDto extends MediaFiles {
}

定义请求参数类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.xuecheng.mediaModel.dto;

import lombok.Data;
import lombok.ToString;

/**
* @author xioaming
* @version 1.0
* @description 上传普通文件请求参数
* @date 2023/1/29 17:30
*/
@Data
@ToString
public class UploadFileParamsDto {
/**
* 文件名称
*/
private String filename;

/**
* 文件content-type
*/
private String contentType;

/**
* 文件类型(文档,音频,视频)
*/
private String fileType;
/**
* 文件大小
*/
private Long fileSize;

/**
* 标签
*/
private String tags;

/**
* 上传人
*/
private String username;

/**
* 备注
*/
private String remark;
}

service-接口开发

定义service方法

在mediafilesservice中定义方法:

注意:service和impl中涵盖了day04-06三天的接口方法,可以做到哪了再往上写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package com.xuecheng.mediaService.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.xuecheng.base.model.PageParams;
import com.xuecheng.base.model.PageResult;
import com.xuecheng.base.model.RestResponse;
import com.xuecheng.mediaModel.dto.QueryMediaParamsDto;
import com.xuecheng.mediaModel.dto.UploadFileParamsDto;
import com.xuecheng.mediaModel.dto.UploadFileResultDto;
import com.xuecheng.mediaModel.po.MediaFiles;
import org.springframework.transaction.annotation.Transactional;

import java.io.File;

/**
* @author xiaoming
* @description 媒资文件管理业务类
* @createDate 2023-01-29 17:12:59
*/
public interface MediaFilesService extends IService<MediaFiles> {
/**
* @description 媒资文件查询方法
* @param companyId 机构id
* @param pageParams 分页参数
* @param queryMediaParamsDto 查询条件
* @return com.xuecheng.base.model.PageResult<com.xuecheng.mediaModel.po.MediaFiles></>
* @author xiaoming
* @date 2023/1/29 17:39
*/
public PageResult<MediaFiles> queryMediaFiles(Long companyId, PageParams pageParams, QueryMediaParamsDto queryMediaParamsDto);

/**
* @description 上传文件的通用接口
* @param companyId 机构id
* @param uploadFileParamsDto 文件信息
* @param bytes 文件字节数组
* @param folder 桶下边的子目录
* @param objectName 对象名称
* @return com.xuecheng.mediaModel.dto.UploadFileResultDto
* @author xiaoming
* @date 2023/1/29 17:42
*/
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, byte[] bytes,String folder,String objectName);

/**
* @description 将文件信息添加到数据库
* @param companyId 机构id
* @param fileMd5 文件md5值
* @param uploadFileParamsDto 上传文件的信息
* @param bucket_files 桶
* @param objectName 对象名称
* @return com.xuecheng.mediaModel.po.MediaFiles
* @author xiaoming
* @date 2023/2/1 21:13
*/
@Transactional
public MediaFiles addMediaFilesToDB(Long companyId,String fileMd5,UploadFileParamsDto uploadFileParamsDto,String bucket_files,String objectName);

/**
* @description 将媒资文件添加到MinIO中
* @param bytes 字节数
* @param bucket_files 桶名
* @param objectName 对象名称
* @param contentType 内容类型
* @return void
* @author xiaoming
* @date 2023/2/3 11:33
*/
public void addMediaFilesToMinIO(byte[] bytes, String bucket_files, String objectName, String contentType);

/**
* @description 添加媒资文件到MinIO
* @param filePath 文件路径
* @param bucket 桶
* @param objectName 对象名称
* @return void
* @author xiaoming
* @date 2023/2/3 11:32
*/
public void addMediaFilesToMinIO(String filePath, String bucket, String objectName);

/**
* @description 根据桶和文件路径从minio下载文件
* @param file 文件
* @param bucket 桶
* @param objectName 对象名称
* @return java.io.File
* @author xiaoming
* @date 2023/2/3 11:31
*/
public File downloadFileFromMinIO(File file, String bucket, String objectName);

/**
* @description 检查文件是否存在
* @param fileMd5 文件的Md5
* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean> false不存在,true存在
* @author xiaoming
* @date 2023/2/2 14:14
*/
public RestResponse<Boolean> checkFile(String fileMd5);

/**
* @description 检查分块是否存在
* @param fileMd5 文件的md5
* @param chunkIndex 分块序号
* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean>
* @author xiaoming
* @date 2023/2/2 14:16
*/
public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex);

/**
* @description 上传分块
* @param fileMd5 文件md5
* @param chunk 分块序号
* @param bytes 文件字节
* @return com.xuecheng.base.model.RestResponse
* @author xiaoming
* @date 2023/2/2 14:16
*/
public RestResponse uploadChunk(String fileMd5,int chunk,byte[] bytes);

/**
* @description 合并分块
* @param companyId 机构id
* @param fileMd5 文件md5
* @param chunkTotal 分块总和
* @param uploadFileParamsDto 文件信息
* @return com.xuecheng.base.model.RestResponse
* @author xiaoming
* @date 2023/2/2 14:17
*/
public RestResponse mergechunks(Long companyId,String fileMd5,int chunkTotal,UploadFileParamsDto uploadFileParamsDto);

/**
* @description 根据id查询文件信息
* @param id 文件id
* @return com.xuecheng.mediaModel.po.MediaFiles
* @author xiaoming
* @date 2023/2/2 16:27
*/
public MediaFiles getFileById(String id);
}

实现service方法impl

注意:service和impl中涵盖了三天的接口方法,可以做到哪了再往上写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
package com.xuecheng.mediaService.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.xuecheng.base.exception.XueChengException;
import com.xuecheng.base.model.PageParams;
import com.xuecheng.base.model.PageResult;
import com.xuecheng.base.model.RestResponse;
import com.xuecheng.mediaModel.dto.QueryMediaParamsDto;
import com.xuecheng.mediaModel.dto.UploadFileParamsDto;
import com.xuecheng.mediaModel.dto.UploadFileResultDto;
import com.xuecheng.mediaModel.po.MediaFiles;
import com.xuecheng.mediaModel.po.MediaProcess;
import com.xuecheng.mediaService.mapper.MediaFilesMapper;
import com.xuecheng.mediaService.mapper.MediaProcessMapper;
import com.xuecheng.mediaService.service.MediaFilesService;
import io.minio.GetObjectArgs;
import io.minio.MinioClient;
import io.minio.PutObjectArgs;
import io.minio.UploadObjectArgs;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.*;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.List;

/**
* @author xiaoming
* @description 媒资文件管理业务实现类
* @createDate 2023-01-29 17:12:59
*/
@Slf4j
@Service
public class MediaFilesServiceImpl extends ServiceImpl<MediaFilesMapper, MediaFiles>
implements MediaFilesService {

@Autowired
MediaFilesMapper mediaFilesMapper;

@Autowired
MediaProcessMapper mediaProcessMapper;

@Autowired
MinioClient minioClient;

@Autowired
MediaFilesService currentProxy;

@Value("${minio.bucket.files}")
private String bucket_files;

@Value("${minio.bucket.videofiles}")
private String bucket_videoFiles;

@Override
public PageResult<MediaFiles> queryMediaFiles(Long companyId, PageParams pageParams, QueryMediaParamsDto queryMediaParamsDto) {
//构建查询条件对象
LambdaQueryWrapper<MediaFiles> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(StringUtils.isNotBlank(queryMediaParamsDto.getFileType()), MediaFiles::getFileType, queryMediaParamsDto.getFileType());

queryWrapper.like(StringUtils.isNotBlank(queryMediaParamsDto.getFilename()), MediaFiles::getFilename, queryMediaParamsDto.getFilename());
//分页对象
Page<MediaFiles> page = new Page<>(pageParams.getPageNo(), pageParams.getPageSize());
// 查询数据内容获得结果
Page<MediaFiles> pageResult = mediaFilesMapper.selectPage(page, queryWrapper);
// 获取数据列表
List<MediaFiles> list = pageResult.getRecords();
// 获取数据总数
long total = pageResult.getTotal();
// 构建结果集
PageResult<MediaFiles> mediaListResult = new PageResult<>(list, total, pageParams.getPageNo(), pageParams.getPageSize());
return mediaListResult;
}


@Override
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, byte[] bytes, String folder, String objectName) {
//生成文件id,文件的md5值
String fileId = DigestUtils.md5Hex(bytes);
//得到文件的md5值
String fileMd5 = DigestUtils.md5Hex(bytes);

if (StringUtils.isEmpty(folder)) {
//自动生成目录的路径 按年月日生成,
folder = getFileFolder(new Date(), true, true, true);
} else if (folder.indexOf("/") < 0) {
folder = folder + "/";
}
//文件名称
String filename = uploadFileParamsDto.getFilename();

if (StringUtils.isEmpty(objectName)) {
//如果objectName为空,使用文件的md5值为objectName
objectName = fileMd5 + filename.substring(filename.lastIndexOf("."));
}

objectName = folder + objectName;
MediaFiles mediaFiles = null;

try {
//上传至文件系统
addMediaFilesToMinIO(bytes, bucket_files, objectName, uploadFileParamsDto.getContentType());
//写入文件表
mediaFiles = currentProxy.addMediaFilesToDB(companyId, fileId, uploadFileParamsDto, bucket_files, objectName);

//准备返回数据
UploadFileResultDto uploadFileResultDto = new UploadFileResultDto();
BeanUtils.copyProperties(mediaFiles, uploadFileResultDto);
return uploadFileResultDto;

} catch (Exception e) {
e.printStackTrace();
log.debug("上传文件失败:{}", e.getMessage());
}
return null;
}

/**
* @param companyId 机构id
* @param fileMd5 文件md5值
* @param uploadFileParamsDto 上传文件的信息
* @param bucket_files 桶
* @param objectName 对象名称
* @return com.xuecheng.mediaModel.po.MediaFiles
* @description 将文件信息添加到数据库
* @author xiaoming
* @date 2023/2/1 20:57
*/
@Transactional
public MediaFiles addMediaFilesToDB(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket_files, String objectName) {
//保存到数据库
MediaFiles mediaFiles = new MediaFiles();
//封装数据
BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
mediaFiles.setId(fileMd5);
mediaFiles.setFileId(fileMd5);
mediaFiles.setCompanyId(companyId);
mediaFiles.setBucket(bucket_files);
mediaFiles.setFilePath(objectName);
mediaFiles.setUrl("/" + bucket_files + "/" + objectName);
mediaFiles.setCreateDate(LocalDateTime.now());
mediaFiles.setStatus("1");
mediaFiles.setAuditStatus("002004");

//插入文件表
int insert = mediaFilesMapper.insert(mediaFiles);
if (insert < 0) {
XueChengException.cast("保存文件信息失败");
}
//取出文件扩展名
String extension = objectName.substring(objectName.lastIndexOf("."));
//如果是avi则加入视频处理表
if (extension.equalsIgnoreCase(".avi")) {
MediaProcess mediaProcess = new MediaProcess();
BeanUtils.copyProperties(mediaFiles, mediaProcess);
mediaProcessMapper.insert(mediaProcess);
//更新url为空
mediaFiles.setUrl(null);
mediaFilesMapper.updateById(mediaFiles);
}
return mediaFiles;
}

/**
* @param bytes 文件字节数组
* @param bucket_files 桶
* @param objectName 对象名称
* @param contentType 内容类型
* @return void
* @description 将文件写入minIO
* @author xiaoming
* @date 2023/2/1 20:58
*/
public void addMediaFilesToMinIO(byte[] bytes, String bucket_files, String objectName, String contentType) {
//转为流
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
try {
PutObjectArgs putObjectArgs = PutObjectArgs.builder()
.bucket(bucket_files)
.object(objectName)
//InputStream stream, long objectSize 对象大小, long partSize 分片大小(-1表示5M,最大不要超过5T,最多10000)
.stream(byteArrayInputStream, byteArrayInputStream.available(), -1)
.contentType(contentType)
.build();
//上传到minio
minioClient.putObject(putObjectArgs);
} catch (Exception e) {
e.printStackTrace();
XueChengException.cast("上传文件到文件系统出错");
}
}


@Override
public RestResponse<Boolean> checkFile(String fileMd5) {
//查询文件信息
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
if (mediaFiles != null) {
//桶
String bucket = mediaFiles.getBucket();
//存储目录
String filePath = mediaFiles.getFilePath();
//文件流
InputStream stream = null;
try {
stream = minioClient.getObject(
GetObjectArgs.builder()
.bucket(bucket)
.object(filePath)
.build());

if (stream != null) {
//文件已存在
return RestResponse.success(true);
}
} catch (Exception e) {
}
}
//文件不存在
return RestResponse.success(false);
}

@Override
public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex) {
//得到分块文件目录
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
//得到分块文件的路径
String chunkFilePath = chunkFileFolderPath + chunkIndex;

//文件流
InputStream fileInputStream = null;
try {
fileInputStream = minioClient.getObject(
GetObjectArgs.builder()
.bucket(bucket_videoFiles)
.object(chunkFilePath)
.build());

if (fileInputStream != null) {
//分块已存在
return RestResponse.success(true);
}
} catch (Exception e) {

}
//分块未存在
return RestResponse.success(false);
}

//得到分块文件的目录
private String getChunkFileFolderPath(String fileMd5) {
return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";
}

@Override
public RestResponse uploadChunk(String fileMd5, int chunk, byte[] bytes) {
//得到分块文件的目录路径
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
//得到分块文件的路径
String chunkFilePath = chunkFileFolderPath + chunk;

try {
//将文件存储至minIO
addMediaFilesToMinIO(bytes, bucket_videoFiles, chunkFilePath, "application/octet-stream");

} catch (Exception ex) {
ex.printStackTrace();
XueChengException.cast("上传过程出错请重试");
}
return RestResponse.success();
}

//检查所有分块是否上传完毕
private File[] checkChunkStatus(String fileMd5, int chunkTotal) {
//得到分块文件的目录路径
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
File[] files = new File[chunkTotal];
//检查分块文件是否上传完毕
for (int i = 0; i < chunkTotal; i++) {
String chunkFilePath = chunkFileFolderPath + i;
//下载文件
File chunkFile = null;
try {
chunkFile = File.createTempFile("chunk" + i, null);
} catch (IOException e) {
e.printStackTrace();
XueChengException.cast("下载分块时创建临时文件出错");
}
downloadFileFromMinIO(chunkFile, bucket_videoFiles, chunkFilePath);
files[i] = chunkFile;
}
return files;
}

//根据桶和文件路径从minio下载文件
public File downloadFileFromMinIO(File file, String bucket, String objectName) {
InputStream fileInputStream = null;
OutputStream fileOutputStream = null;
try {
fileInputStream = minioClient.getObject(
GetObjectArgs.builder()
.bucket(bucket)
.object(objectName)
.build());
try {
fileOutputStream = new FileOutputStream(file);
IOUtils.copy(fileInputStream, fileOutputStream);

} catch (IOException e) {
XueChengException.cast("下载文件" + objectName + "出错");
}
} catch (Exception e) {
e.printStackTrace();
XueChengException.cast("文件不存在" + objectName);
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return file;
}

@Override
public RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {
String fileName = uploadFileParamsDto.getFilename();
//下载所有分块文件
File[] chunkFiles = checkChunkStatus(fileMd5, chunkTotal);
//扩展名
String extName = fileName.substring(fileName.lastIndexOf("."));
//创建临时文件作为合并文件
File mergeFile = null;
try {
mergeFile = File.createTempFile(fileMd5, extName);
} catch (IOException e) {
XueChengException.cast("合并文件过程中创建临时文件出错");
}

try {
//开始合并
byte[] b = new byte[1024];
try (RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");) {
for (File chunkFile : chunkFiles) {
try (FileInputStream chunkFileStream = new FileInputStream(chunkFile);) {
int len = -1;
while ((len = chunkFileStream.read(b)) != -1) {
//向合并后的文件写
raf_write.write(b, 0, len);
}
}
}
} catch (IOException e) {
e.printStackTrace();
XueChengException.cast("合并文件过程中出错");
}
log.debug("合并文件完成{}", mergeFile.getAbsolutePath());
uploadFileParamsDto.setFileSize(mergeFile.length());

try (InputStream mergeFileInputStream = new FileInputStream(mergeFile);) {
//对文件进行校验,通过比较md5值
String newFileMd5 = DigestUtils.md5Hex(mergeFileInputStream);
if (!fileMd5.equalsIgnoreCase(newFileMd5)) {
//校验失败
XueChengException.cast("合并文件校验失败");
}
log.debug("合并文件校验通过{}", mergeFile.getAbsolutePath());
} catch (Exception e) {
e.printStackTrace();
//校验失败
XueChengException.cast("合并文件校验异常");
}

//将临时文件上传至minio
String mergeFilePath = getFilePathByMd5(fileMd5, extName);
try {

//上传文件到minIO
addMediaFilesToMinIO(mergeFile.getAbsolutePath(), bucket_videoFiles, mergeFilePath);
log.debug("合并文件上传MinIO完成{}", mergeFile.getAbsolutePath());
} catch (Exception e) {
e.printStackTrace();
XueChengException.cast("合并文件时上传文件出错");
}

//入数据库
MediaFiles mediaFiles = addMediaFilesToDB(companyId, fileMd5, uploadFileParamsDto, bucket_videoFiles, mergeFilePath);
if (mediaFiles == null) {
XueChengException.cast("媒资文件入库出错");
}

return RestResponse.success();
} finally {
//删除临时文件
for (File file : chunkFiles) {
try {
file.delete();
} catch (Exception e) {

}
}
try {
mergeFile.delete();
} catch (Exception e) {

}
}
}

private String getFilePathByMd5(String fileMd5, String fileExt) {
return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;
}

//将文件上传到minIO,传入文件绝对路径
public void addMediaFilesToMinIO(String filePath, String bucket, String objectName) {
try {
minioClient.uploadObject(
UploadObjectArgs.builder()
.bucket(bucket)
.object(objectName)
.filename(filePath)
.build());
} catch (Exception e) {
e.printStackTrace();
XueChengException.cast("上传文件到文件系统出错");
}
}

@Override
public MediaFiles getFileById(String id) {
return mediaFilesMapper.selectById(id);
}


/**
* @param date 日期
* @description 根据日期拼接目录
* @author xiaoming
* @date 2023/1/29 17:48
*/
private String getFileFolder(Date date, boolean year, boolean month, boolean day) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
//获取当前日期字符串
String dateString = sdf.format(new Date());
//取出年、月、日
String[] dateStringArray = dateString.split("-");
StringBuffer folderString = new StringBuffer();
if (year) {
folderString.append(dateStringArray[0]);
folderString.append("/");
}
if (month) {
folderString.append(dateStringArray[1]);
folderString.append("/");
}
if (day) {
folderString.append(dateStringArray[2]);
folderString.append("/");
}
return folderString.toString();
}
}

api-接口定义

在 media-api 中创建 mediaFilesController,实现以下接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.xuecheng.mediaApi.api;

import com.xuecheng.base.exception.XueChengException;
import com.xuecheng.base.model.PageParams;
import com.xuecheng.base.model.PageResult;
import com.xuecheng.mediaModel.dto.QueryMediaParamsDto;
import com.xuecheng.mediaModel.dto.UploadFileParamsDto;
import com.xuecheng.mediaModel.dto.UploadFileResultDto;
import com.xuecheng.mediaModel.po.MediaFiles;
import com.xuecheng.mediaService.service.MediaFilesService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

/**
* @author xioaming
* @version 1.0
* @description 媒资文件管理接口
* @date 2023/1/29 17:51
*/
@RestController
public class MediaFilesController {

@Autowired
MediaFilesService mediaFilesService;

@PostMapping("/files")
public PageResult<MediaFiles> list(PageParams pageParams, @RequestBody QueryMediaParamsDto queryMediaParamsDto) {
Long companyId = 1232141425L;
return mediaFilesService.queryMediaFiles(companyId, pageParams, queryMediaParamsDto);

}

@RequestMapping(value = "/upload/coursefile", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE})
public UploadFileResultDto upload(
@RequestPart("filedata") MultipartFile filedata,
@RequestParam(value = "folder", required = false) String folder,
@RequestParam(value = "objectName", required = false) String objectName
) {
Long companyId = 1232141425L;
UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto();
String contentType = filedata.getContentType();
uploadFileParamsDto.setContentType(contentType);
uploadFileParamsDto.setFileSize(filedata.getSize());//文件大小
if (contentType.indexOf("image") >= 0) {
//是个图片
uploadFileParamsDto.setFileType("001001");
} else {
uploadFileParamsDto.setFileType("001003");
}
uploadFileParamsDto.setFilename(filedata.getOriginalFilename());//文件名称
UploadFileResultDto uploadFileResultDto = null;
try {
uploadFileResultDto = mediaFilesService.uploadFile(companyId, uploadFileParamsDto, filedata.getBytes(), folder, objectName);
} catch (Exception e) {
XueChengException.cast("上传文件过程中出错");
}

return uploadFileResultDto;

}

}

接口测试

httpClient测试


1
2
3
4
5
6
7
8
9
### 上传文件
POST {{media_host}}/media/upload/coursefile
Content-Type: multipart/form-data; boundary=WebAppBoundary

--WebAppBoundary
Content-Disposition: form-data; name="filedata"; filename="文件名称"
Content-Type: application/octet-stream

< 文件路径/rurisa.jpg

前后端联调测试

在新增课程、编辑课程界面上传图,保存课程信息后再次进入编辑课程界面,查看是否可以正常保存课程图片信息。

上图图片完成后,进入媒资管理,查看文件列表中是否有刚刚上传的图片信息。

bug修复

上文代码已经修复,主要是对 queryMediaFiles 查询media 信息的方法添加 querywrapper自定义查询,可自行上翻查阅

上传视频

需求分析

  1. 教学机构人员进入媒资管理列表查询自己上传的媒资文件。

  2. 教育机构用户在"媒资管理"页面中点击 "上传视频" 按钮。

  3. 选择要上传的文件,自动执行文件上传。

  4. 视频上传成功会自动处理,处理完成可以预览视频。

断点续传流程

什么是断点续传

通常视频文件都比较大,所以对于媒资系统上传文件的需求要满足大文件的上传要求。http协议本身对上传文件大小没有限制,但是客户的网络环境质量、电脑硬件环境等参差不齐,如果一个大文件快上传完了网断了没有上传完成,需要客户重新上传,用户体验非常差,所以对于大文件上传的要求最基本的是断点续传。

引用百度百科:断点续传指的是在下载或上传时,将下载或上传任务(一个文件或一个压缩包)人为的划分为几个部分,每一个部分采用一个线程进行上传或下载,如果碰到网络故障,可以从已经上传或下载的部分开始继续上传下载未完成的部分,而没有必要从头开始上传下载,断点续传可以提高节省操作时间,提高用户体验性。

流程如下:

  1. 前端上传前先把文件分成块

  2. 一块一块的上传,上传中断后重新上传,已上传的分块则不用再上传

  3. 各分块上传完成最后在服务端合并文件

image-20230204131124400

分块原理

为了更好的理解文件分块上传的原理,下边用java代码测试文件的分块与合并。文件分块的流程如下:

  1. 获取源文件长度

  2. 根据设定的分块文件的大小计算出块数

  3. 从源文件读数据依次向每一个块文件写数据。


合并原理

文件合并流程:

  1. 找到要合并的文件并按文件合并的先后进行排序。

  2. 创建合并文件

  3. 依次从合并的文件中读取数据向合并文件写入数


上传视频流程

下图是上传视频的整体流程:

image-20230204131446844

  1. 前端上传文件前请求媒资接口层检查文件是否存在,如果已经存在则不再上传。
  2. 如果文件在系统不存在前端开始上传,首先对视频文件进行分块
  3. 前端分块进行上传,上传前首先检查分块是否上传,如已上传则不再上传,如果未上传则开始上传分块。
  4. 前端请求媒资管理接口层请求上传分块。
  5. 接口层请求服务层上传分块。
  6. 服务端将分块信息上传到MinIO。
  7. 前端将分块上传完毕请求接口层合并分块。
  8. 接口层请求服务层合并分块。
  9. 服务层根据文件信息找到MinIO中的分块文件,下载到本地临时目录,将所有分块下载完毕后开始合并。
  10. 合并完成将合并后的文件上传到MinIO。

准备环境

在 base 工程的 model 中创建 RestResponse 通用结果模型类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.xuecheng.base.model;

import lombok.Data;
import lombok.ToString;

/**
* @author xioaming
* @version 1.0
* @description 通用结果类型
* @date 2023/2/2 14:08
*/
@Data
@ToString
public class RestResponse<T> {
/**
* 响应编码,0为正常,-1错误
*/
private int code;

/**
* 响应提示信息
*/
private String msg;

/**
* 响应内容
*/
private T result;


public RestResponse() {
this(0, "success");
}

public RestResponse(int code, String msg) {
this.code = code;
this.msg = msg;
}

/**
* 错误信息的封装
*
* @param msg
* @param <T>
* @return
*/
public static <T> RestResponse<T> validfail(String msg) {
RestResponse<T> response = new RestResponse<T>();
response.setCode(-1);
response.setMsg(msg);
return response;
}

/**
* 添加正常响应数据(包含响应内容)
*
* @return RestResponse Rest服务封装相应数据
*/
public static <T> RestResponse<T> success(T result) {
RestResponse<T> response = new RestResponse<T>();
response.setResult(result);
return response;
}

/**
* 添加正常响应数据(不包含响应内容)
*
* @return RestResponse Rest服务封装相应数据
*/
public static <T> RestResponse<T> success() {
return new RestResponse<T>();
}


public Boolean isSuccessful() {
return this.code == 0;
}
}

model-生成DTO

无新添加的配置,略

service-接口开发

上传视频主要实现四个接口:检查文件是否存在、检查分块是否存在、上传分块 以及 合并分块

定义service方法

在mediafilesservice中定义接口方法:

如果是导入我上传图片时的service,应该已经写好了,这里也是偷懒直接将整个service复制过来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package com.xuecheng.mediaService.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.xuecheng.base.model.PageParams;
import com.xuecheng.base.model.PageResult;
import com.xuecheng.base.model.RestResponse;
import com.xuecheng.mediaModel.dto.QueryMediaParamsDto;
import com.xuecheng.mediaModel.dto.UploadFileParamsDto;
import com.xuecheng.mediaModel.dto.UploadFileResultDto;
import com.xuecheng.mediaModel.po.MediaFiles;
import org.springframework.transaction.annotation.Transactional;

import java.io.File;

/**
* @author xiaoming
* @description 媒资文件管理业务类
* @createDate 2023-01-29 17:12:59
*/
public interface MediaFilesService extends IService<MediaFiles> {
/**
* @description 媒资文件查询方法
* @param companyId 机构id
* @param pageParams 分页参数
* @param queryMediaParamsDto 查询条件
* @return com.xuecheng.base.model.PageResult<com.xuecheng.mediaModel.po.MediaFiles></>
* @author xiaoming
* @date 2023/1/29 17:39
*/
public PageResult<MediaFiles> queryMediaFiles(Long companyId, PageParams pageParams, QueryMediaParamsDto queryMediaParamsDto);

/**
* @description 上传文件的通用接口
* @param companyId 机构id
* @param uploadFileParamsDto 文件信息
* @param bytes 文件字节数组
* @param folder 桶下边的子目录
* @param objectName 对象名称
* @return com.xuecheng.mediaModel.dto.UploadFileResultDto
* @author xiaoming
* @date 2023/1/29 17:42
*/
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, byte[] bytes,String folder,String objectName);

/**
* @description 将文件信息添加到数据库
* @param companyId 机构id
* @param fileMd5 文件md5值
* @param uploadFileParamsDto 上传文件的信息
* @param bucket_files 桶
* @param objectName 对象名称
* @return com.xuecheng.mediaModel.po.MediaFiles
* @author xiaoming
* @date 2023/2/1 21:13
*/
@Transactional
public MediaFiles addMediaFilesToDB(Long companyId,String fileMd5,UploadFileParamsDto uploadFileParamsDto,String bucket_files,String objectName);

/**
* @description 将媒资文件添加到MinIO中
* @param bytes 字节数
* @param bucket_files 桶名
* @param objectName 对象名称
* @param contentType 内容类型
* @return void
* @author xiaoming
* @date 2023/2/3 11:33
*/
public void addMediaFilesToMinIO(byte[] bytes, String bucket_files, String objectName, String contentType);

/**
* @description 添加媒资文件到MinIO
* @param filePath 文件路径
* @param bucket 桶
* @param objectName 对象名称
* @return void
* @author xiaoming
* @date 2023/2/3 11:32
*/
public void addMediaFilesToMinIO(String filePath, String bucket, String objectName);

/**
* @description 根据桶和文件路径从minio下载文件
* @param file 文件
* @param bucket 桶
* @param objectName 对象名称
* @return java.io.File
* @author xiaoming
* @date 2023/2/3 11:31
*/
public File downloadFileFromMinIO(File file, String bucket, String objectName);

/**
* @description 检查文件是否存在
* @param fileMd5 文件的Md5
* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean> false不存在,true存在
* @author xiaoming
* @date 2023/2/2 14:14
*/
public RestResponse<Boolean> checkFile(String fileMd5);

/**
* @description 检查分块是否存在
* @param fileMd5 文件的md5
* @param chunkIndex 分块序号
* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean>
* @author xiaoming
* @date 2023/2/2 14:16
*/
public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex);

/**
* @description 上传分块
* @param fileMd5 文件md5
* @param chunk 分块序号
* @param bytes 文件字节
* @return com.xuecheng.base.model.RestResponse
* @author xiaoming
* @date 2023/2/2 14:16
*/
public RestResponse uploadChunk(String fileMd5,int chunk,byte[] bytes);

/**
* @description 合并分块
* @param companyId 机构id
* @param fileMd5 文件md5
* @param chunkTotal 分块总和
* @param uploadFileParamsDto 文件信息
* @return com.xuecheng.base.model.RestResponse
* @author xiaoming
* @date 2023/2/2 14:17
*/
public RestResponse mergechunks(Long companyId,String fileMd5,int chunkTotal,UploadFileParamsDto uploadFileParamsDto);

/**
* @description 根据id查询文件信息
* @param id 文件id
* @return com.xuecheng.mediaModel.po.MediaFiles
* @author xiaoming
* @date 2023/2/2 16:27
*/
public MediaFiles getFileById(String id);
}

实现service方法impl

同复制粘贴上传图片的 mediafilesserviceimpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
package com.xuecheng.mediaService.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.xuecheng.base.exception.XueChengException;
import com.xuecheng.base.model.PageParams;
import com.xuecheng.base.model.PageResult;
import com.xuecheng.base.model.RestResponse;
import com.xuecheng.mediaModel.dto.QueryMediaParamsDto;
import com.xuecheng.mediaModel.dto.UploadFileParamsDto;
import com.xuecheng.mediaModel.dto.UploadFileResultDto;
import com.xuecheng.mediaModel.po.MediaFiles;
import com.xuecheng.mediaModel.po.MediaProcess;
import com.xuecheng.mediaService.mapper.MediaFilesMapper;
import com.xuecheng.mediaService.mapper.MediaProcessMapper;
import com.xuecheng.mediaService.service.MediaFilesService;
import io.minio.GetObjectArgs;
import io.minio.MinioClient;
import io.minio.PutObjectArgs;
import io.minio.UploadObjectArgs;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.*;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.List;

/**
* @author xiaoming
* @description 媒资文件管理业务实现类
* @createDate 2023-01-29 17:12:59
*/
@Slf4j
@Service
public class MediaFilesServiceImpl extends ServiceImpl<MediaFilesMapper, MediaFiles>
implements MediaFilesService {

@Autowired
MediaFilesMapper mediaFilesMapper;

@Autowired
MediaProcessMapper mediaProcessMapper;

@Autowired
MinioClient minioClient;

@Autowired
MediaFilesService currentProxy;

@Value("${minio.bucket.files}")
private String bucket_files;

@Value("${minio.bucket.videofiles}")
private String bucket_videoFiles;

@Override
public PageResult<MediaFiles> queryMediaFiles(Long companyId, PageParams pageParams, QueryMediaParamsDto queryMediaParamsDto) {
//构建查询条件对象
LambdaQueryWrapper<MediaFiles> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(StringUtils.isNotBlank(queryMediaParamsDto.getFileType()), MediaFiles::getFileType, queryMediaParamsDto.getFileType());

queryWrapper.like(StringUtils.isNotBlank(queryMediaParamsDto.getFilename()), MediaFiles::getFilename, queryMediaParamsDto.getFilename());
//分页对象
Page<MediaFiles> page = new Page<>(pageParams.getPageNo(), pageParams.getPageSize());
// 查询数据内容获得结果
Page<MediaFiles> pageResult = mediaFilesMapper.selectPage(page, queryWrapper);
// 获取数据列表
List<MediaFiles> list = pageResult.getRecords();
// 获取数据总数
long total = pageResult.getTotal();
// 构建结果集
PageResult<MediaFiles> mediaListResult = new PageResult<>(list, total, pageParams.getPageNo(), pageParams.getPageSize());
return mediaListResult;
}


@Override
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, byte[] bytes, String folder, String objectName) {
//生成文件id,文件的md5值
String fileId = DigestUtils.md5Hex(bytes);
//得到文件的md5值
String fileMd5 = DigestUtils.md5Hex(bytes);

if (StringUtils.isEmpty(folder)) {
//自动生成目录的路径 按年月日生成,
folder = getFileFolder(new Date(), true, true, true);
} else if (folder.indexOf("/") < 0) {
folder = folder + "/";
}
//文件名称
String filename = uploadFileParamsDto.getFilename();

if (StringUtils.isEmpty(objectName)) {
//如果objectName为空,使用文件的md5值为objectName
objectName = fileMd5 + filename.substring(filename.lastIndexOf("."));
}

objectName = folder + objectName;
MediaFiles mediaFiles = null;

try {
//上传至文件系统
addMediaFilesToMinIO(bytes, bucket_files, objectName, uploadFileParamsDto.getContentType());
//写入文件表
mediaFiles = currentProxy.addMediaFilesToDB(companyId, fileId, uploadFileParamsDto, bucket_files, objectName);

//准备返回数据
UploadFileResultDto uploadFileResultDto = new UploadFileResultDto();
BeanUtils.copyProperties(mediaFiles, uploadFileResultDto);
return uploadFileResultDto;

} catch (Exception e) {
e.printStackTrace();
log.debug("上传文件失败:{}", e.getMessage());
}
return null;
}

/**
* @param companyId 机构id
* @param fileMd5 文件md5值
* @param uploadFileParamsDto 上传文件的信息
* @param bucket_files 桶
* @param objectName 对象名称
* @return com.xuecheng.mediaModel.po.MediaFiles
* @description 将文件信息添加到数据库
* @author xiaoming
* @date 2023/2/1 20:57
*/
@Transactional
public MediaFiles addMediaFilesToDB(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket_files, String objectName) {
//保存到数据库
MediaFiles mediaFiles = new MediaFiles();
//封装数据
BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
mediaFiles.setId(fileMd5);
mediaFiles.setFileId(fileMd5);
mediaFiles.setCompanyId(companyId);
mediaFiles.setBucket(bucket_files);
mediaFiles.setFilePath(objectName);
mediaFiles.setUrl("/" + bucket_files + "/" + objectName);
mediaFiles.setCreateDate(LocalDateTime.now());
mediaFiles.setStatus("1");
mediaFiles.setAuditStatus("002004");

//插入文件表
int insert = mediaFilesMapper.insert(mediaFiles);
if (insert < 0) {
XueChengException.cast("保存文件信息失败");
}
//取出文件扩展名
String extension = objectName.substring(objectName.lastIndexOf("."));
//如果是avi则加入视频处理表
if (extension.equalsIgnoreCase(".avi")) {
MediaProcess mediaProcess = new MediaProcess();
BeanUtils.copyProperties(mediaFiles, mediaProcess);
mediaProcessMapper.insert(mediaProcess);
//更新url为空
mediaFiles.setUrl(null);
mediaFilesMapper.updateById(mediaFiles);
}
return mediaFiles;
}

/**
* @param bytes 文件字节数组
* @param bucket_files 桶
* @param objectName 对象名称
* @param contentType 内容类型
* @return void
* @description 将文件写入minIO
* @author xiaoming
* @date 2023/2/1 20:58
*/
public void addMediaFilesToMinIO(byte[] bytes, String bucket_files, String objectName, String contentType) {
//转为流
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
try {
PutObjectArgs putObjectArgs = PutObjectArgs.builder()
.bucket(bucket_files)
.object(objectName)
//InputStream stream, long objectSize 对象大小, long partSize 分片大小(-1表示5M,最大不要超过5T,最多10000)
.stream(byteArrayInputStream, byteArrayInputStream.available(), -1)
.contentType(contentType)
.build();
//上传到minio
minioClient.putObject(putObjectArgs);
} catch (Exception e) {
e.printStackTrace();
XueChengException.cast("上传文件到文件系统出错");
}
}


@Override
public RestResponse<Boolean> checkFile(String fileMd5) {
//查询文件信息
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
if (mediaFiles != null) {
//桶
String bucket = mediaFiles.getBucket();
//存储目录
String filePath = mediaFiles.getFilePath();
//文件流
InputStream stream = null;
try {
stream = minioClient.getObject(
GetObjectArgs.builder()
.bucket(bucket)
.object(filePath)
.build());

if (stream != null) {
//文件已存在
return RestResponse.success(true);
}
} catch (Exception e) {
}
}
//文件不存在
return RestResponse.success(false);
}

@Override
public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex) {
//得到分块文件目录
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
//得到分块文件的路径
String chunkFilePath = chunkFileFolderPath + chunkIndex;

//文件流
InputStream fileInputStream = null;
try {
fileInputStream = minioClient.getObject(
GetObjectArgs.builder()
.bucket(bucket_videoFiles)
.object(chunkFilePath)
.build());

if (fileInputStream != null) {
//分块已存在
return RestResponse.success(true);
}
} catch (Exception e) {

}
//分块未存在
return RestResponse.success(false);
}

//得到分块文件的目录
private String getChunkFileFolderPath(String fileMd5) {
return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";
}

@Override
public RestResponse uploadChunk(String fileMd5, int chunk, byte[] bytes) {
//得到分块文件的目录路径
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
//得到分块文件的路径
String chunkFilePath = chunkFileFolderPath + chunk;

try {
//将文件存储至minIO
addMediaFilesToMinIO(bytes, bucket_videoFiles, chunkFilePath, "application/octet-stream");

} catch (Exception ex) {
ex.printStackTrace();
XueChengException.cast("上传过程出错请重试");
}
return RestResponse.success();
}

//检查所有分块是否上传完毕
private File[] checkChunkStatus(String fileMd5, int chunkTotal) {
//得到分块文件的目录路径
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
File[] files = new File[chunkTotal];
//检查分块文件是否上传完毕
for (int i = 0; i < chunkTotal; i++) {
String chunkFilePath = chunkFileFolderPath + i;
//下载文件
File chunkFile = null;
try {
chunkFile = File.createTempFile("chunk" + i, null);
} catch (IOException e) {
e.printStackTrace();
XueChengException.cast("下载分块时创建临时文件出错");
}
downloadFileFromMinIO(chunkFile, bucket_videoFiles, chunkFilePath);
files[i] = chunkFile;
}
return files;
}

//根据桶和文件路径从minio下载文件
public File downloadFileFromMinIO(File file, String bucket, String objectName) {
InputStream fileInputStream = null;
OutputStream fileOutputStream = null;
try {
fileInputStream = minioClient.getObject(
GetObjectArgs.builder()
.bucket(bucket)
.object(objectName)
.build());
try {
fileOutputStream = new FileOutputStream(file);
IOUtils.copy(fileInputStream, fileOutputStream);

} catch (IOException e) {
XueChengException.cast("下载文件" + objectName + "出错");
}
} catch (Exception e) {
e.printStackTrace();
XueChengException.cast("文件不存在" + objectName);
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return file;
}

@Override
public RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {
String fileName = uploadFileParamsDto.getFilename();
//下载所有分块文件
File[] chunkFiles = checkChunkStatus(fileMd5, chunkTotal);
//扩展名
String extName = fileName.substring(fileName.lastIndexOf("."));
//创建临时文件作为合并文件
File mergeFile = null;
try {
mergeFile = File.createTempFile(fileMd5, extName);
} catch (IOException e) {
XueChengException.cast("合并文件过程中创建临时文件出错");
}

try {
//开始合并
byte[] b = new byte[1024];
try (RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");) {
for (File chunkFile : chunkFiles) {
try (FileInputStream chunkFileStream = new FileInputStream(chunkFile);) {
int len = -1;
while ((len = chunkFileStream.read(b)) != -1) {
//向合并后的文件写
raf_write.write(b, 0, len);
}
}
}
} catch (IOException e) {
e.printStackTrace();
XueChengException.cast("合并文件过程中出错");
}
log.debug("合并文件完成{}", mergeFile.getAbsolutePath());
uploadFileParamsDto.setFileSize(mergeFile.length());

try (InputStream mergeFileInputStream = new FileInputStream(mergeFile);) {
//对文件进行校验,通过比较md5值
String newFileMd5 = DigestUtils.md5Hex(mergeFileInputStream);
if (!fileMd5.equalsIgnoreCase(newFileMd5)) {
//校验失败
XueChengException.cast("合并文件校验失败");
}
log.debug("合并文件校验通过{}", mergeFile.getAbsolutePath());
} catch (Exception e) {
e.printStackTrace();
//校验失败
XueChengException.cast("合并文件校验异常");
}

//将临时文件上传至minio
String mergeFilePath = getFilePathByMd5(fileMd5, extName);
try {

//上传文件到minIO
addMediaFilesToMinIO(mergeFile.getAbsolutePath(), bucket_videoFiles, mergeFilePath);
log.debug("合并文件上传MinIO完成{}", mergeFile.getAbsolutePath());
} catch (Exception e) {
e.printStackTrace();
XueChengException.cast("合并文件时上传文件出错");
}

//入数据库
MediaFiles mediaFiles = addMediaFilesToDB(companyId, fileMd5, uploadFileParamsDto, bucket_videoFiles, mergeFilePath);
if (mediaFiles == null) {
XueChengException.cast("媒资文件入库出错");
}

return RestResponse.success();
} finally {
//删除临时文件
for (File file : chunkFiles) {
try {
file.delete();
} catch (Exception e) {

}
}
try {
mergeFile.delete();
} catch (Exception e) {

}
}
}

private String getFilePathByMd5(String fileMd5, String fileExt) {
return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;
}

//将文件上传到minIO,传入文件绝对路径
public void addMediaFilesToMinIO(String filePath, String bucket, String objectName) {
try {
minioClient.uploadObject(
UploadObjectArgs.builder()
.bucket(bucket)
.object(objectName)
.filename(filePath)
.build());
} catch (Exception e) {
e.printStackTrace();
XueChengException.cast("上传文件到文件系统出错");
}
}

@Override
public MediaFiles getFileById(String id) {
return mediaFilesMapper.selectById(id);
}


/**
* @param date 日期
* @description 根据日期拼接目录
* @author xiaoming
* @date 2023/1/29 17:48
*/
private String getFileFolder(Date date, boolean year, boolean month, boolean day) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
//获取当前日期字符串
String dateString = sdf.format(new Date());
//取出年、月、日
String[] dateStringArray = dateString.split("-");
StringBuffer folderString = new StringBuffer();
if (year) {
folderString.append(dateStringArray[0]);
folderString.append("/");
}
if (month) {
folderString.append(dateStringArray[1]);
folderString.append("/");
}
if (day) {
folderString.append(dateStringArray[2]);
folderString.append("/");
}
return folderString.toString();
}
}

api-接口定义

根据上传视频流程,在media-api中新创建controller,定义如下接口:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package com.xuecheng.mediaApi.api;

import com.j256.simplemagic.ContentInfo;
import com.j256.simplemagic.ContentInfoUtil;
import com.xuecheng.base.exception.XueChengException;
import com.xuecheng.base.model.RestResponse;
import com.xuecheng.mediaModel.dto.UploadFileParamsDto;
import com.xuecheng.mediaModel.po.MediaFiles;
import com.xuecheng.mediaService.service.MediaFilesService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

/**
* @author xioaming
* @version 1.0
* @description 大文件上传接口
* @date 2023/2/2 14:06
*/
@RestController
public class BigFilesController {
@Autowired
MediaFilesService mediaFilesService;

/**
* @description 文件上传前检查文件
* @param fileMd5
* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean>
* @author xiaoming
* @date 2023/2/2 14:12
*/
@PostMapping("/upload/checkfile")
public RestResponse<Boolean> checkfile(
@RequestParam("fileMd5") String fileMd5
) throws Exception {
return mediaFilesService.checkFile(fileMd5);
}

/**
* @description 分块文件上传前的检测
* @param fileMd5 文件md5
* @param chunk 分块序号
* @return com.xuecheng.base.model.RestResponse<java.lang.Boolean>
* @author xiaoming
* @date 2023/2/2 14:33
*/
@PostMapping("/upload/checkchunk")
public RestResponse<Boolean> checkchunk(
@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunk") int chunk
) throws Exception {
return mediaFilesService.checkChunk(fileMd5, chunk);
}

/**
* @description 上传分块文件
* @param file 文件
* @param fileMd5 文件md5
* @param chunk 分块序号
* @return com.xuecheng.base.model.RestResponse
* @author xiaoming
* @date 2023/2/2 14:35
*/
@PostMapping("/upload/uploadchunk")
public RestResponse uploadchunk(
@RequestParam("file") MultipartFile file,
@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunk") int chunk
) throws Exception {
return mediaFilesService.uploadChunk(fileMd5,chunk,file.getBytes());
}

/**
* @description 合并文件
* @param fileMd5 文件md5
* @param fileName 文件名
* @param chunkTotal 分块总和
* @return com.xuecheng.base.model.RestResponse
* @author xiaoming
* @date 2023/2/2 14:36
*/
@PostMapping("/upload/mergechunks")
public RestResponse mergechunks(
@RequestParam("fileMd5") String fileMd5,
@RequestParam("fileName") String fileName,
@RequestParam("chunkTotal") int chunkTotal
) throws Exception {
Long companyId = 1232141425L;

UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto();
uploadFileParamsDto.setFileType("001002");
uploadFileParamsDto.setTags("课程视频");
uploadFileParamsDto.setRemark("");
uploadFileParamsDto.setFilename(fileName);
ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(fileName);
String mimeType = extensionMatch.getMimeType();
uploadFileParamsDto.setContentType(mimeType);

return mediaFilesService.mergechunks(companyId, fileMd5, chunkTotal, uploadFileParamsDto);
}

/**
* @description 预览文件
* @param mediaId 文件id
* @return com.xuecheng.base.model.RestResponse<java.lang.String>
* @author xiaoming
* @date 2023/2/2 16:26
*/
@GetMapping("/preview/{mediaId}")
public RestResponse<String> getPlayUrlByMediaId(@PathVariable String mediaId){
MediaFiles mediaFiles = mediaFilesService.getFileById(mediaId);
if (mediaFiles == null || StringUtils.isEmpty(mediaFiles.getUrl())){
XueChengException.cast("视频并未进行转码处理");
}
return RestResponse.success(mediaFiles.getUrl());
}

}


接口测试

如果是单个接口测试使用httpclient

建议直接前后端联调,更好看到结果


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Java
### 检查文件
POST{{media_host}}/media/upload/register
Content-Type: application/x-www-form-urlencoded;

fileMd5=查表找filemd5

### 上传分块前检查
POST {{media_host}}/media/upload/checkchunk
Content-Type: application/x-www-form-urlencoded;

fileMd5=c5c75d70f382e6016d2f506d134eee11&chunk=0

### 上传分块文件
POST {{media_host}}/media/upload/uploadchunk?fileMd5=c5c75d70f382e6016d2f506d134eee11&chunk=1
Content-Type: multipart/form-data; boundary=WebAppBoundary

--WebAppBoundary
Content-Disposition: form-data; name="file"; filename="1"
Content-Type: application/octet-stream

< E:/ffmpeg_test/chunks/1


### 合并文件
POST {{media_host}}/media/upload/mergechunks
Content-Type: application/x-www-form-urlencoded;

fileMd5=dcb37b85c9c03fc5243e20ab4dfbc1c8&fileName=8.avi&chunkTotal=1

下边介绍采用前后联调:

  1. 首先在每个接口层方法上打开断点

在前端上传视频,观察接口层是否收到参数。

  1. 进入service方法逐行跟踪。

  2. 断点续传测试

上传一部分后,停止刷新浏览器再重新上传,通过浏览器日志发现已经上传过的分块不再重新上传

文件预览

需求分析

图片上传成功、视频上传成功可以通过预览功能查看文件的内容。

预览的方式是通过浏览器直接打文件,对于图片和浏览器支持的视频格式的视频文件可以直接预览。

业务流程如下:

  1. 前端请求接口层预览文件

  2. 接口层将文件id传递给服务层

  3. 服务层使用文件id查询媒资数据库文件表,获取文件的url

  4. 接口层将文件url返回给前端,通过浏览器打开URL。

接口定义

根据需求分析定义接口如下:


1
2
3
4
5
6
Java
@ApiOperation("预览文件")
@GetMapping("/preview/{mediaId}")
public RestResponse<String> getPlayUrlByMediaId(@PathVariable String mediaId){

}

model-生成DTO

无新增配置,略

service-接口开发

定义service方法

还是在media-service中创建接口


1
2
3
4
5
6
7
8
/**
* @description 根据id查询文件信息
* @param id 文件id
* @return com.xuecheng.mediaModel.po.MediaFiles
* @author xiaoming
* @date 2023/2/2 16:27
*/
public MediaFiles getFileById(String id);

实现service方法impl


1
2
3
4
Java
public MediaFiles getFileById(String id) {
return mediaFilesMapper.selectById(id);
}

api-接口定义:

在 bigFilescontroller中完善:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* @description 预览文件
* @param mediaId 文件id
* @return com.xuecheng.base.model.RestResponse<java.lang.String>
* @author xiaoming
* @date 2023/2/2 16:26
*/
@GetMapping("/preview/{mediaId}")
public RestResponse<String> getPlayUrlByMediaId(@PathVariable String mediaId){
MediaFiles mediaFiles = mediaFilesService.getFileById(mediaId);
if (mediaFiles == null || StringUtils.isEmpty(mediaFiles.getUrl())){
XueChengException.cast("视频并未进行转码处理");
}
return RestResponse.success(mediaFiles.getUrl());
}

接口测试

使用前后端联调

  • 上传mp4视频文件,预览文件。

  • 上传图片文件,预览文件。

对于无法预览的视频文件,稍后通过视频处理对视频转码。

视频处理

FFmpeg 的基本使用

我们将视频录制完成后,使用视频编码软件对视频进行编码,本项目使用FFmpeg对视频进行编码 。

FFmpeg被许多开源项目采用,QQ影音、暴风影音、VLC等。

下载:FFmpeg https://www.ffmpeg.org/download.html#build-windows

请从课程资料目录解压ffmpeg.zip,并将解压得到的exe文件加入环境变量。

测试是否正常:cmd运行 ffmpeg -v

安装成功,作下简单测试:将一个.avi文件转成mp4、mp3、gif等。

比如我们将nacos.avi文件转成mp4,运行如下命令:

ffmpeg -i nacos.avi nacos.mp4

转成mp3:ffmpeg -i nacos.avi nacos.mp3

转成gif:ffmpeg -i nacos.avi nacos.gif

官方文档(英文):http://ffmpeg.org/ffmpeg.html

分布式任务处理概念

什么是分布式任务调度

如何去高效处理一批任务呢?

  1. 多线程

多线程是充分利用单机的资源。

  1. 分布式加多线程

充分利用我台计算机,每台计算机多线程处理。

方案2可扩展性更强,是一种分布式任务调度的处理方案。

在思考什么是分布式任务调度之前,我们可以先思考一下下面业务场景的解决方案:

  • 某电商系统需要在每天上午10点,下午3点,晚上8点发放一批优惠券。

  • 某财务系统需要在每天上午10点前结算前一天的账单数据,统计汇总。

  • 某电商平台每天凌晨3点,要对订单中的无效订单进行清理。

  • 12306网站会根据车次不同,设置几个时间点分批次放票。

  • 电商整点抢购,商品价格某天上午8点整开始优惠。

  • 商品成功发货后,需要向客户发送短信提醒。

以上这些场景,就是任务调度所需要解决的问题。

任务调度顾名思义,就是对任务的调度,它是指系统为了完成特定业务,基于给定时间点,给定时间间隔或者给定执行次数自动执行任务。

如何实现任务调度?

多线程方式实现:

学过多线程的同学,可能会想到,我们可以开启一个线程,每sleep一段时间,就去检查是否已到预期执行时间。

以下代码简单实现了任务调度的功能:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Java
public static void main(String[] args) {
//任务执行间隔时间
final long timeInterval = 1000;
Runnable runnable = new Runnable() {
public void run() {
while (true) {
//TODO:something
try {
Thread.sleep(timeInterval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread thread = new Thread(runnable);
thread.start();
}

上面的代码实现了按一定的间隔时间执行任务调度的功能。

Jdk也为我们提供了相关支持,如Timer、ScheduledExecutor,下边我们了解下。

Timer方式实现


1
2
3
4
5
6
7
8
9
10
Java
public static void main(String[] args){
Timer timer = new Timer();
timer.schedule(new TimerTask(){
@Override
public void run() {
//TODO:something
}
}, 1000, 2000); //1秒后开始调度,每2秒执行一次
}

Timer的优点在于简单易用,每个Timer对应一个线程,因此可以同时启动多个Timer并行执行多个任务,同一个Timer中的任务是串行执行。

ScheduledExecutor方式实现


1
2
3
4
5
6
7
8
9
10
11
12
13
Java
public static void main(String [] agrs){
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
//TODO:something
System.out.println("todo something");
}
}, 1,
2, TimeUnit.SECONDS);
}

Java 5 推出了基于线程池设计的ScheduledExecutor,其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。

Timer 和 ScheduledExecutor都仅能提供基于开始时间与重复间隔的任务调度,不能胜任更加复杂的调度需求。比如,设置每月第一天凌晨1点执行任务、复杂调度任务的管理、任务间传递数据等等。

Quartz
Quartz是一个功能强大的任务调度框架,它可以满足更多更复杂的调度需求,Quartz设计的核心类包括 Scheduler, Job 以及 Trigger。其中,Job负责定义需要执行的任务,Trigger 负责设置调度策略,Scheduler将二者组装在一起,并触发任务开始执行。Quartz支持简单的按时间间隔调度、还支持按日历调度方式,通过设置CronTrigger表达式(包括:秒、分、时、日、月、周、年)进行任务调度。

第三方Quartz方式实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Java
public static void main(String [] agrs) throws SchedulerException {
//创建一个Scheduler
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
//创建JobDetail
JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);
jobDetailBuilder.withIdentity("jobName","jobGroupName");
JobDetail jobDetail = jobDetailBuilder.build();
//创建触发的CronTrigger 支持按日历调度
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName", "triggerGroupName")
.startNow()
.withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?"))
.build();
//创建触发的SimpleTrigger 简单的间隔调度
/*SimpleTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName","triggerGroupName")
.startNow()
.withSchedule(SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInSeconds(2)
.repeatForever())
.build();*/
scheduler.scheduleJob(jobDetail,trigger);
scheduler.start();
}

public class MyJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext){
System.out.println("todo something");
}
}

通过以上内容我们学习了什么是任务调度,任务调度所解决的问题,以及任务调度的多种实现方式。

什么是分布式任务调度?

通常任务调度的程序是集成在应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,结算服务中包括了定期生成报表的任务调度程序,由于采用分布式架构,一个服务往往会部署多个冗余实例来运行我们的业务,在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度

分布式调度要实现的目标:

不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力:

  1. 并行任务调度

并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。

如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。

  1. 高可用

若某一个实例宕机,不影响其他实例来执行任务。

  1. 弹性扩容

当集群中增加实例就可以提高并执行任务的处理效率。

  1. 任务管理与监测

对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。

  1. 避免任务重复执行

当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中到点发优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次。

XXL-JOB介绍

XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

官网:https://www.xuxueli.com/xxl-job/

文档:https://www.xuxueli.com/xxl-job/#%E3%80%8A%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%B9%B3%E5%8F%B0XXL-JOB%E3%80%8B

XXL-JOB主要有调度中心、执行器、任务:

  • 调度中心:

    • 负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码;

    • 主要职责为执行器管理、任务管理、监控运维、日志管理等

  • 任务执行器:

    • 负责接收调度请求并执行任务逻辑;

    • 只要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等

  • 任务:负责执行具体的业务处理。

    • 调度中心与执行器之间的工作流程如下:

image-20230204134128242

执行流程:

  1. 任务执行器根据配置的调度中心的地址,自动注册到调度中心

  2. 达到任务触发条件,调度中心下发任务

  3. 执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中

  4. 执行器消费内存队列中的执行结果,主动上报给调度中心

  5. 当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情

搭建XXL-JOB

下载XXL-JOB

GitHub:https://github.com/xuxueli/xxl-job

码云:https://gitee.com/xuxueli0323/xxl-job

项目使用2.3.1版本:
https://github.com/xuxueli/xxl-job/releases/tag/2.3.1

也可从课程资料目录获取,解压xxl-job-2.3.1.zip,使用IDEA打开解压后的目录

  • xxl-job-admin:调度中心

  • xxl-job-core:公共依赖

  • xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用)

    • xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;

    • xxl-job-executor-sample-frameless:无框架版本;

doc :文档资料,包含数据库脚本

创建数据库

首先修改doc下的tables_xxl_job.sql脚本内容:


1
2
3
Java
CREATE database if NOT EXISTS `xxl_job_2.3.1` default character set utf8mb4 collate utf8mb4_unicode_ci;
use `xxl_job_2.3.1`;

将tables_xxl_job.sql脚本导入xxl_job_2.3.1数据库

修改配置文件

修改xxl-job-admin任务调度中心下application.properties的配置文件内容,修改数据库链接地址:


1
2
3
4
Java
spring.datasource.url=jdbc:mysql://mysqlIP:3306/xxl_job_2.3.1?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=账号
spring.datasource.password=密码

启动工程

启动xxl-job-admin任务调度中心,运行com.xxl.job.admin.XxlJobAdminApplication

启动成功访问http://localhost:8080/xxl-job-admin

账号:admin

密码:123456

到此调度中心启动成功。

配置执行器

执行器负责与调度中心通信接收调度中心发起的任务调度请求。

添加依赖

在media-service工程添加依赖,在项目的父工程已约定了版本2.3.1


1
2
3
4
5
XML
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>

配置nacos信息

在nacos下的media-service-dev.yaml下配置xxl-job


1
2
3
4
5
6
7
8
9
10
11
12
13
YAML
xxl:
job:
admin:
addresses: http://localhost:端口号/xxl-job-admin
executor:
appname: media-process-service
address:
ip:
port: 9999
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30
accessToken: default_token

注意配置中的appname这是执行器的应用名,稍后在调度中心配置执行器时要使用。

配置xxl-job执行器

将示例工程下配置类:

image-20230204134833634

拷贝到media-service工程的config文件夹下:

image-20230204134920409

添加执行器

进入调度中心添加执行器

image-20230204134952095

点击新增,填写执行器信息,appname是前边在nacos中配置xxl信息时指定的执行器的应用名。

image-20230204135006426

添加成功:

image-20230204135023768

到此完成媒资管理模块service工程配置xxl-job执行器,在xxl-job调度中心添加执行器

测试连接

准备测试执行器与调度中心是否正常通信,因为接口工程依赖了service工程,所以启动media-api工程。

启动后观察日志,出现下边的日志表示执行器在调度中心注册成功

image-20230204135116819

同时观察调度中心中的执行器界面

image-20230204135132368

在线机器地址处已显示1个执行器。

测试任务类

  1. 在media-service包下新建jobhandler存放任务类,下边参考示例工程编写一个任务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.xuecheng.mediaService.jobhandler;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* @author xioaming
* @version 1.0
* @description 测试执行器
* @date 2023/2/2 22:02
*/
@Component
@Slf4j
public class SampleJob {
/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("testJob")
public void testJob() throws Exception {
log.info("开始执行.....");

}

/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.info("开始执行第"+shardIndex+"批任务");

}
}


  1. 在调度中心添加任务,进入任务管理

image-20230204135332870

点击新增,填写任务信息

image-20230204135349522

注意红色标记处

  • 调度类型选择Cron,并配置Cron表达式设置定时策略。

  • 运行模式有BEAN和GLUE,bean模式较常用就是在项目工程中编写执行器的任务代码,GLUE是将任务代码编写在调度中心。

  • JobHandler任务方法名填写@XxlJob注解中的名称。

  1. 添加成功,启动任务

image-20230204135451690

通过调度日志查看任务执行情况

image-20230204135505397

  1. 启动媒资管理的service工程,启动执行器,观察执行器方法的执行。

image-20230204135528438

  1. 如果要停止任务需要在调度中心操作

image-20230204135552671

  1. 任务跑一段时间注意清理日志

image-20230204135615285

需求分析

作业分片方案

掌握了xxl-job的作业分片调度方式,下边思考如何分布式去执行学成在线平台中的视频处理任务。

任务添加成功后,对于要处理的任务会添加到待处理任务表中,现在启动多个执行器实例去查询这些待处理任务,此时如何保证多个执行器不会重复执行任务?

执行器收到调度请求后各自己查询属于自己的任务,这样就保证了执行器之间不会重复执行任务。

xxl-job设计作业分片就是为了分布式执行任务,XXL-JOB并不直接提供数据处理的功能,它只会给执行器分配好分片序号并向执行器传递分片总数、分片序号这些参数,开发者需要自行处理分片项与真实数据的对应关系。

每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id
模上 分片总数,如果等于分片序号则执行此任务。

上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:

1 % 2 = 1 执行器2执行

2 % 2 = 0 执行器1执行

3 % 2 = 1 执行器2执行

以此类推.

保证任务不重复执行

通过作业分片的方式保证了执行器之间分配的任务不重复,如果同一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?

  1. 首先配置调度过期策略,查看文档如下:

调度过期策略:

  • 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;

  • 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;

  • 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;

这里我们选择忽略,如果立即执行一次可能会重复调度。

  1. 看阻塞处理策略,阻塞处理策略就是当前执行器正在执行任务还没有结束,此时调度时间到该如何处理,查看文档如下:
  • 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
  • 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
  • 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;

这里选择 丢弃后续调度,避免重复调度。

  1. 注意保证任务处理的幂等性

什么是任务的幂等性?

任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。执行器接收调度请求去执行任务,要有办法去判断该任务是否处理完成,如果处理完则不再处理,即使重复调度处理相同的任务也不能重复处理相同的视频。

什么是幂等性?

它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。

解决幂等性常用的方案:

  1. 数据库约束,比如:唯一索引,主键。

  2. 乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。

  3. 唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。

这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理。

业务流程

确定了分片方案,下边梳理整个视频上传及处理的业务流程。

image-20230204140310947

视频处理的详细流程如下:

image-20230204140329560

  1. 任务调度中心广播作业分片。

  2. 执行器收到广播作业分片,从数据库读取待处理任务。

  3. 执行器根据任务内容从MinIO下载要处理的文件。

  4. 执行器启动多线程去处理任务。

  5. 任务处理完成,上传处理后的视频到MinIO。

  6. 将更新任务处理结果,如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中,最后将任务完成记录写入历史表

model-生成DTO

根据需求分析,DAO接口包括如下:

  1. 查询待处理视频列表。
  2. 保证视频处理结果
  3. 添加视频历史处理记录表。
  4. 删除已完成视频处理记录
  5. 上传处理后的视频文件向media_files插入记录

在 mediaProcessMapper中编写根据分片参数获取待处理任务的DAO方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.xuecheng.mediaService.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.xuecheng.mediaModel.po.MediaProcess;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

import java.util.List;

/**
* @author 15182
* @description 针对表【media_process】的数据库操作Mapper
* @createDate 2023-01-29 17:12:59
* @Entity com.xuecheng.mediaModel.po.MediaProcess
*/
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {

/**
* @description 根据分片参数获取待处理任务
* @param shardTotal 分片总数
* @param shardindex 分片序号
* @param count 任务数
* @return java.util.List<com.xuecheng.mediaModel.po.MediaProcess>
* @author xiaoming
* @date 2023/2/3 10:58
*/
@Select("SELECT t.* FROM media_process t WHERE t.id % #{shardTotal} = #{shardindex} and t.status='1' limit #{count}")
List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal, @Param("shardindex") int shardindex, @Param("count") int count);
}

Service-接口开发

定义Service方法

在MediaFileProcessService中定义方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.xuecheng.mediaService.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.xuecheng.mediaModel.po.MediaProcess;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

/**
* @author 15182
* @description 针对表【media_process】的数据库操作Service
* @createDate 2023-01-29 17:12:59
*/
public interface MediaProcessService extends IService<MediaProcess> {
/**
* @description 将url存储至数据,并更新状态为成功,并将待处理视频记录删除存入历史
* @param status 处理结果,2:成功 / 3:失败
* @param fileId 文件Id
* @param url 文件访问url
* @param errorMsg 失败信息
* @return void
* @author xiaoming
* @date 2023/2/3 11:01
*/
@Transactional
public void saveProcessFinishStatus(int status, String fileId, String url, String errorMsg);

/**
* @description 获取待处理信息
* @param shardIndex 分配序号
* @param shardTotal 分配总数
* @param count 获取记录数
* @return java.util.List<com.xuecheng.mediaModel.po.MediaProcess>
* @author xiaoming
* @date 2023/2/4 14:07
*/
public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count);

}


实现service方法impl


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.xuecheng.mediaService.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.xuecheng.mediaModel.po.MediaFiles;
import com.xuecheng.mediaModel.po.MediaProcess;
import com.xuecheng.mediaModel.po.MediaProcessHistory;
import com.xuecheng.mediaService.mapper.MediaFilesMapper;
import com.xuecheng.mediaService.mapper.MediaProcessHistoryMapper;
import com.xuecheng.mediaService.mapper.MediaProcessMapper;
import com.xuecheng.mediaService.service.MediaProcessService;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.List;

/**
* @author 15182
* @description 针对表【media_process】的数据库操作Service实现
* @createDate 2023-01-29 17:12:59
*/
@Service
public class MediaProcessServiceImpl extends ServiceImpl<MediaProcessMapper, MediaProcess>
implements MediaProcessService {

@Autowired
MediaFilesMapper mediaFilesMapper;

@Autowired
MediaProcessMapper mediaProcessMapper;

@Autowired
MediaProcessHistoryMapper mediaProcessHistoryMapper;

@Override
public void saveProcessFinishStatus(int status, String fileId, String url, String errorMsg) {
LambdaQueryWrapper<MediaProcess> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(MediaProcess::getFileId, fileId);
MediaProcess mediaProcess = mediaProcessMapper.selectOne(queryWrapper);
if (mediaProcess == null){
return;
}

//处理失败结果
if (status == 3){
mediaProcess.setStatus("3");
mediaProcess.setErrormsg(errorMsg);
mediaProcessMapper.updateById(mediaProcess);
return;
}

MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);
if (mediaFiles != null){
mediaFiles.setUrl(url);
mediaFilesMapper.updateById(mediaFiles);
}

mediaProcess.setUrl(url);
mediaProcess.setStatus("2");
mediaProcess.setFinishDate(LocalDateTime.now());
mediaProcessMapper.updateById(mediaProcess);

//添加到历史记录
MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();
BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);
mediaProcessHistoryMapper.insert(mediaProcessHistory);
//删除mediaProcess
mediaProcessMapper.deleteById(mediaProcess.getId());
}

@Override
public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count) {
List<MediaProcess> mediaProcessList = mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count);
return mediaProcessList;
}
}


utils-任务类开发

视频处理工具类

将课程资料目录中的util.zip解压,将解压出的工具类拷贝至base工程,并在base工程添加如下依赖:

image-20230204141021004


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<!-- fast Json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>

<!-- servlet Api 依赖 -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>

<!-- 通用组件 -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.11</version>
</dependency>

其中Mp4VideoUtil类是用于将视频转为mp4格式,是我们项目要使用的工具类。

下边看下这个类的代码,并进行测试。

我们要通过ffmpeg对视频转码,Java程序调用ffmpeg,使用java.lang.ProcessBuilder去完成,具体在Mp4VideoUtil类的63行。

使用该类的main方法进行测试


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Java
public static void main(String[] args) throws IOException {
//ffmpeg的路径
String ffmpeg_path = "D:\\soft\\ffmpeg\\ffmpeg.exe";//ffmpeg的安装位置
//源avi视频的路径
String video_path = "D:\\develop\\bigfile_test\\nacos01.avi";
//转换后mp4文件的名称
String mp4_name = "nacos01.mp4";
//转换后mp4文件的路径
String mp4_path = "D:\\develop\\bigfile_test\\nacos01.mp4";
//创建工具类对象
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);
//开始视频转换,成功将返回success
String s = videoUtil.generateMp4();
System.out.println(s);
}

执行main方法,最终在控制台输出 success 表示执行成功。

定义任务类

视频采用并发处理,每个视频使用一个线程去处理,每次处理的视频数量不要超过cpu核心数。

所有视频处理完成结束本次执行,为防止代码异常出现无限期等待添加超时设置,到达超时时间还没有处理完成仍结束任务。

在media-service工程的 jobhandler文件夹中定义任务类VideoTask 如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package com.xuecheng.mediaService.jobhandler;

import com.xuecheng.base.utils.Mp4VideoUtil;
import com.xuecheng.mediaModel.po.MediaProcess;
import com.xuecheng.mediaService.service.MediaFilesService;
import com.xuecheng.mediaService.service.MediaProcessService;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author xioaming
* @version 1.0
* @description 视频处理任务
* @date 2023/2/3 11:23
*/
@Component
@Slf4j
public class VideoTask {

@Autowired
private MediaFilesService mediaFilesService;

@Autowired
private MediaProcessService mediaProcessService;

//ffmpeg绝对路径
@Value("${videoprocess.ffmpegpath}")
String ffmpegPath;

//视频处理
@XxlJob("videoJobHandler")
public void videoJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
log.debug("shardIndex=" + shardIndex + ",shardTotal=" + shardTotal);

//一次取出2条记录,可以调整此数据,一次处理的最大个数不要超过cpu核心数
List<MediaProcess> mediaProcesses = mediaProcessService.getMediaProcessList(shardIndex, shardTotal, 2);
//数据个数
int size = mediaProcesses.size();

log.debug("取出待处理视频记录" + size + "条");
if (size <= 0) {
return;
}

//启动size上线程数量的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(size);
//计数器
CountDownLatch countDownLatch = new CountDownLatch(size);

mediaProcesses.forEach(mediaProcess -> {
threadPool.execute(() -> {
//桶
String bucket = mediaProcess.getBucket();
//存储路径
String filePath = mediaProcess.getFilePath();
//原始视频的md5值
String fileId = mediaProcess.getFileId();
//原始文件名称
String filename = mediaProcess.getFilename();

//下载文件
//先创建临时文件,为原始的视频文件
File originalVideo = null;
//处理结束的mp4文件
File mp4Video = null;
try {
originalVideo = File.createTempFile("original", null);
mp4Video = File.createTempFile("mp4", ".mp4");
} catch (IOException e) {
log.error("下载待处理的原始文件前创建临时文件失败");
}
try {
originalVideo = mediaFilesService.downloadFileFromMinIO(originalVideo, bucket, filePath);

//开始处理视频
Mp4VideoUtil mp4VideoUtil = new Mp4VideoUtil(ffmpegPath, originalVideo.getAbsolutePath(), mp4Video.getName(), mp4Video.getAbsolutePath());
String result = mp4VideoUtil.generateMp4();
if (!result.equals("success")) {
//记录错误信息
log.error("generateMp4 error ,video_path is {},error msg is {}", bucket + filePath, result);
mediaProcessService.saveProcessFinishStatus(3, fileId, null, result);
return;
}
//将mp4上传至minio
//文件路径
String objectName = getFilePath(fileId, ".mp4");
mediaFilesService.addMediaFilesToMinIO(mp4Video.getAbsolutePath(), bucket, objectName);

//访问url
String url = "/" + bucket + "/" + objectName;
//将url存储至数据,并更新状态为成功,并将待处理视频记录删除存入历史
mediaProcessService.saveProcessFinishStatus(2, fileId, url, null);

} catch (Exception e) {
e.printStackTrace();
} finally {
//清理文件
if (originalVideo != null) {
try {
originalVideo.delete();
} catch (Exception e) {

}
}
if (mp4Video != null) {
try {
mp4Video.delete();
} catch (Exception e) {

}
}
}
countDownLatch.countDown();

});
});

//等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务
countDownLatch.await();
}

private String getFilePath(String fileMd5,String fileExt){
return fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt;
}
}

视频处理测试

上传视频代码完善

上传视频成功根据视频文件格式判断,如果是avi文件则需要加入视频待处理表。

前面mediaserviceImpl的代码已经修改完毕


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* @param companyId 机构id
* @param fileMd5 文件md5值
* @param uploadFileParamsDto 上传文件的信息
* @param bucket_files 桶
* @param objectName 对象名称
* @return com.xuecheng.mediaModel.po.MediaFiles
* @description 将文件信息添加到数据库
* @author xiaoming
* @date 2023/2/1 20:57
*/
@Transactional
public MediaFiles addMediaFilesToDB(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket_files, String objectName) {
//保存到数据库
MediaFiles mediaFiles = new MediaFiles();
//封装数据
BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
mediaFiles.setId(fileMd5);
mediaFiles.setFileId(fileMd5);
mediaFiles.setCompanyId(companyId);
mediaFiles.setBucket(bucket_files);
mediaFiles.setFilePath(objectName);
mediaFiles.setUrl("/" + bucket_files + "/" + objectName);
mediaFiles.setCreateDate(LocalDateTime.now());
mediaFiles.setStatus("1");
mediaFiles.setAuditStatus("002004");

//插入文件表
int insert = mediaFilesMapper.insert(mediaFiles);
if (insert < 0) {
XueChengException.cast("保存文件信息失败");
}
//取出文件扩展名
String extension = objectName.substring(objectName.lastIndexOf("."));
//如果是avi则加入视频处理表
if (extension.equalsIgnoreCase(".avi")) {
MediaProcess mediaProcess = new MediaProcess();
BeanUtils.copyProperties(mediaFiles, mediaProcess);
mediaProcessMapper.insert(mediaProcess);
//更新url为空
mediaFiles.setUrl(null);
mediaFilesMapper.updateById(mediaFiles);
}
return mediaFiles;
}

视频处理调度策略

进入xxl-job调度中心添加执行器和任务,在xxl-job配置任务调度策略:

  1. 配置阻塞处理策略为:丢弃后续调度。

  2. 配置视频处理调度时间间隔不用根据视频处理时间去确定,可以配置的小一些,如:5分钟,即使到达调度时间如果视频没有处理完会丢失调度请求。

image-20230204141648093

配置完成开始测试视频处理:

  1. 首先上传至少4个视频,非mp4格式。

  2. 启动媒资管理服务,观察日志

注意:如果数据库在导表的时候添加了 processHistory 数据,要记得删除,确保 process 表和 processhistory 表都是空的,否则在视频转码完成写入进历史表时会冲突

绑定媒资

需求分析

业务流程

到目前为止,媒资管理已完成文件上传、视频处理、我的媒资功能等基本功能,其它模块可以使用媒资文件,本节要讲解课程计划绑定媒资文件。

  1. 首先进入课程计划界面,然后选择要绑定的视频进行绑定即可。

具体的业务流程如下:

  1. 教育机构用户进入课程管理页面并编辑某一个课程,在"课程大纲"标签页的某一小节后可点击”添加视频”。

  2. 弹出添加视频对话框,可通过视频关键字搜索已审核通过的视频媒资。

  3. 选择视频媒资,点击提交按钮,完成课程计划绑定媒资流程。

8.1.2 数据模型

课程计划绑定媒资文件后存储至课程计划teachplan绑定媒资表media

接口定义

根据业务流程,用户进入课程计划列表,首先确定向哪个课程计划添加视频,点击”添加视频”后用户选择视频,选择视频,点击提交,提交媒资文件id、文件名称、教学计划id,示例如下:


1
2
3
4
5
6
JSON
{
"mediaId": "70a98b4a2fffc89e50b101f959cc33ca",
"fileName": "22-Hmily实现TCC事务-开发bank2的confirm方法.avi",
"teachplanId": 257
}

此接口在内容管理模块content提供

model-生成DTO

在内容管理模块content定义请求参数模型类型:

注意:对象类型要和数据库的对应


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.xuecheng.contentModel.dto;

import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

/**
* @author xioaming
* @version 1.0
* @description 教学计划-媒资绑定提交数据
* @date 2023/2/3 16:14
*/
@Data
public class BindTeachplanMediaDto {
@ApiModelProperty(value = "媒资文件id", required = true)
private String mediaId;

@ApiModelProperty(value = "媒资文件名称", required = true)
private String fileName;

@ApiModelProperty(value = "课程计划标识", required = true)
private Long teachplanId;
}


service-接口开发

定义service方法

根据需求在teachplanservice中定义接口


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* @description 教学计划板顶媒资
* @param bindTeachplanMediaDto 教学计划-媒资绑定提交数据
* @return com.xuecheng.contentModel.po.TeachplanMedia
* @author xiaoming
* @date 2023/2/3 16:18
*/
public TeachplanMedia associationMedia(BindTeachplanMediaDto bindTeachplanMediaDto);

/**
* @description 删除教学计划与媒资之间的绑定关系
* @param teachPlanId 教学计划Id
* @param mediaId 媒资文件Id
* @return void
* @author xiaoming
* @date 2023/2/3 16:20
*/
public void deleteAssociationMedia(Long teachPlanId, String mediaId);

实现service方法impl


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
public TeachplanMedia associationMedia(BindTeachplanMediaDto bindTeachplanMediaDto) {
//教学计划id
Long teachplanId = bindTeachplanMediaDto.getTeachplanId();
Teachplan teachplan = teachplanMapper.selectById(teachplanId);
if (teachplan == null) {
XueChengException.cast("教学计划不存在");
}
Integer grade = teachplan.getGrade();
if (grade != 2) {
XueChengException.cast("只允许第二级教学计划绑定媒资文件");
}
//课程id
Long courseId = teachplan.getCourseId();

//先删除原因该教学计划绑定的媒资
teachplanMediaMapper.delete(new LambdaQueryWrapper<TeachplanMedia>().eq(TeachplanMedia::getTeachplanId, teachplanId));

//再添加教学计划与媒资的绑定关系
TeachplanMedia teachplanMedia = new TeachplanMedia();
teachplanMedia.setCourseId(courseId);
teachplanMedia.setTeachplanId(teachplanId);
teachplanMedia.setMediaFilename(bindTeachplanMediaDto.getFileName());
teachplanMedia.setMediaId(bindTeachplanMediaDto.getMediaId());
teachplanMedia.setCreateDate(LocalDateTime.now());
teachplanMediaMapper.insert(teachplanMedia);
return teachplanMedia;
}

@Override
public void deleteAssociationMedia(Long teachPlanId, String mediaId) {
LambdaQueryWrapper<TeachplanMedia> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(TeachplanMedia::getTeachplanId, teachPlanId);
queryWrapper.eq(TeachplanMedia::getMediaId, mediaId);
teachplanMediaMapper.delete(queryWrapper);
}


api-定义接口

在teachplancontroller中添加接口:


1
2
3
4
5
6
7
8
9
@PostMapping("/teachplan/association/media")
public void associationMedia(@RequestBody BindTeachplanMediaDto bindTeachplanMediaDto){
teachplanService.associationMedia(bindTeachplanMediaDto);
}

@DeleteMapping("/teachplan/association/media/{teachPlanId}/{mediaId}")
public void deleteAssociationMedia(@PathVariable Long teachPlanId, @PathVariable String mediaId){
teachplanService.deleteAssociationMedia(teachPlanId, mediaId);
}

接口测试

  1. 使用httpclient测试

1
2
3
4
5
6
7
8
9
10
11
12
13
Plain Text
### 课程计划绑定视频
POST {{media_host}}/media/teachplan/association/media
Content-Type: application/json

{
"mediaId": "",
"fileName": "",
"teachplanId": ""
}

### 课程计划接触视频绑定
DELETE {{media_host}}/media/teachplan/association/media/{teachPlanId}/{mediaId}

  1. 前后端联调

此功能较为简单推荐直接前后端联调:重启content-api工程,向指定课程计划添加视频、解除视频绑定。

OVER~~