常用业务解决方案
约 16131 字大约 54 分钟
2025-04-28
接口日志,重放接口 AOP实现
整体目录

接口日志实体
package com.yupi.springbootinit.example.interfaceaop.model;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* API 调用日志
*/
@Data
@TableName(value = "api_log")
public class ApiLog implements Serializable {
/**
* 主键
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 请求唯一标识
*/
@TableField(value = "request_id")
private String requestId;
/**
* 请求URL
*/
@TableField(value = "url")
private String url;
/**
* HTTP方法
*/
@TableField(value = "http_method")
private String httpMethod;
/**
* 请求IP
*/
@TableField(value = "ip")
private String ip;
/**
* 调用方法
*/
@TableField(value = "class_method")
private String classMethod;
/**
* 请求参数
*/
@TableField(value = "request_params")
private String requestParams;
/**
* 响应数据
*/
@TableField(value = "response_data")
private String responseData;
/**
* 请求耗时(ms)
*/
@TableField(value = "time_consumed")
private Long timeConsumed;
/**
* 用户ID
*/
@TableField(value = "user_id")
private String userId;
/**
* 创建时间
*/
@TableField(value = "create_time")
private Date createTime;
/**
* 更新时间
*/
@TableField(value = "update_time")
private Date updateTime;
/**
* 是否删除
*/
@TableField(value = "is_deleted")
@TableLogic
private Integer isDeleted;
@TableField(exist = false)
private static final long serialVersionUID = 1L;
}sql语句
-- 接口日志表
CREATE TABLE api_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
request_id VARCHAR(64) NOT NULL COMMENT '请求唯一标识',
url VARCHAR(255) NOT NULL COMMENT '请求URL',
http_method VARCHAR(10) NOT NULL COMMENT 'HTTP方法',
ip VARCHAR(64) NOT NULL COMMENT '请求IP',
class_method VARCHAR(255) NOT NULL COMMENT '调用方法',
request_params TEXT COMMENT '请求参数',
response_data TEXT COMMENT '响应数据',
time_consumed BIGINT COMMENT '耗时(ms)',
user_id VARCHAR(64) COMMENT '用户ID',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted TINYINT NOT NULL DEFAULT 0 COMMENT '是否删除'
);
-- 请求记录表
CREATE TABLE IF NOT EXISTS `api_request_record` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
`url` varchar(255) NOT NULL COMMENT '请求URL',
`http_method` varchar(10) NOT NULL COMMENT 'HTTP方法',
`headers` text COMMENT '请求头',
`request_params` text COMMENT '请求参数',
`response_data` text COMMENT '响应数据',
`status` int DEFAULT NULL COMMENT '请求状态',
`time_consumed` bigint DEFAULT NULL COMMENT '耗时(ms)',
`user_id` varchar(64) DEFAULT NULL COMMENT '用户ID',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否删除',
PRIMARY KEY (`id`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='接口请求记录表';
ALTER TABLE `api_request_record`
ADD COLUMN `content_type` varchar(128) DEFAULT NULL COMMENT '请求内容类型' AFTER `headers`,
ADD COLUMN `is_array_request` tinyint(1) DEFAULT '0' COMMENT '是否数组请求' AFTER `time_consumed`;重放日志实体
package com.yupi.springbootinit.example.interfaceaop.model;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* 接口请求记录
*/
@Data
@TableName(value = "api_request_record")
public class ApiRequestRecord implements Serializable {
/**
* 主键
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 请求URL
*/
@TableField(value = "url")
private String url;
/**
* HTTP方法
*/
@TableField(value = "http_method")
private String httpMethod;
/**
* 请求头
*/
@TableField(value = "headers")
private String headers;
/**
* 请求参数
*/
@TableField(value = "request_params")
private String requestParams;
/**
* 请求内容类型
*/
@TableField(value = "content_type")
private String contentType;
/**
* 是否是数组请求
*/
@TableField(value = "is_array_request")
private Boolean isArrayRequest;
/**
* 响应数据
*/
@TableField(value = "response_data")
private String responseData;
/**
* 请求状态
*/
@TableField(value = "status")
private Integer status;
/**
* 耗时(ms)
*/
@TableField(value = "time_consumed")
private Long timeConsumed;
/**
* 用户ID
*/
@TableField(value = "user_id")
private String userId;
/**
* 创建时间
*/
@TableField(value = "create_time")
private Date createTime;
/**
* 更新时间
*/
@TableField(value = "update_time")
private Date updateTime;
/**
* 是否删除
*/
@TableField(value = "is_deleted")
@TableLogic
private Integer isDeleted;
@TableField(exist = false)
private static final long serialVersionUID = 1L;
}mapper层
package com.yupi.springbootinit.example.interfaceaop.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yupi.springbootinit.example.interfaceaop.model.ApiLog;
import org.apache.ibatis.annotations.Mapper;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/23
*/
@Mapper
public interface ApiLogMapper extends BaseMapper<ApiLog> {
}package com.yupi.springbootinit.example.interfaceaop.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yupi.springbootinit.example.interfaceaop.model.ApiRequestRecord;
import org.apache.ibatis.annotations.Mapper;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/23
*/
@Mapper
public interface ApiRequestRecordMapper extends BaseMapper<ApiRequestRecord> {
}service层
package com.yupi.springbootinit.example.interfaceaop.service;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/23
*/
public interface ApiReplayService {
/**
* 重放日志
* @param recordId
* @return
*/
Object replayRequest(Long recordId);
}package com.yupi.springbootinit.example.interfaceaop.service.impl;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import cn.hutool.http.Method;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.example.interfaceaop.mapper.ApiRequestRecordMapper;
import com.yupi.springbootinit.example.interfaceaop.model.ApiRequestRecord;
import com.yupi.springbootinit.example.interfaceaop.service.ApiReplayService;
import com.yupi.springbootinit.exception.BusinessException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;
import javax.annotation.Resource;
import java.util.Date;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/23
*/
@Service
@Slf4j
public class ApiReplayServiceImpl implements ApiReplayService {
@Resource
private ApiRequestRecordMapper apiRequestRecordMapper;
@Override
public Object replayRequest(Long recordId) {
// 获取历史请求记录
ApiRequestRecord record = apiRequestRecordMapper.selectById(recordId);
if (record == null) {
throw new BusinessException(ErrorCode.NOT_FOUND_ERROR);
}
// 创建计时器
StopWatch stopWatch = new StopWatch();
try {
stopWatch.start();
// 创建请求对象
HttpRequest request = createHttpRequest(record);
// 设置 Content-Type
String contentType = record.getContentType();
if (StringUtils.isBlank(contentType)) {
// 默认使用 application/json
contentType = "application/json";
}
request.header("Content-Type", contentType);
// 设置其他请求头
if (StringUtils.isNotBlank(record.getHeaders())) {
JSONObject headerJson = JSONUtil.parseObj(record.getHeaders());
headerJson.forEach((key, value) -> {
if (!"content-length".equalsIgnoreCase(key)) { // 跳过 content-length
request.header(key, String.valueOf(value));
}
});
}
// 设置请求体
if (StringUtils.isNotBlank(record.getRequestParams())) {
if (StringUtils.containsIgnoreCase(contentType, "application/json")) {
// JSON 格式
request.body(record.getRequestParams());
} else if (StringUtils.containsIgnoreCase(contentType, "application/x-www-form-urlencoded")) {
// 表单格式
JSONObject params = JSONUtil.parseObj(record.getRequestParams());
params.forEach((key, value) -> request.form(key, value));
} else if (StringUtils.containsIgnoreCase(contentType, "multipart/form-data")) {
// 文件上传格式
JSONObject params = JSONUtil.parseObj(record.getRequestParams());
params.forEach((key, value) -> request.form(key, value));
} else {
// 其他格式,直接设置为字符串
request.body(record.getRequestParams());
}
}
// 执行请求
HttpResponse response = request.execute();
stopWatch.stop();
// 保存重放结果
ApiRequestRecord replayRecord = new ApiRequestRecord();
BeanUtils.copyProperties(record, replayRecord);
replayRecord.setId(null);
replayRecord.setResponseData(response.body());
replayRecord.setStatus(response.getStatus());
replayRecord.setCreateTime(new Date());
replayRecord.setTimeConsumed(stopWatch.getLastTaskTimeMillis());
apiRequestRecordMapper.insert(replayRecord);
return response.body();
} catch (Exception e) {
log.error("接口重放失败", e);
throw new BusinessException(ErrorCode.SYSTEM_ERROR, "接口重放失败:" + e.getMessage());
}
}
/**
* 根据记录创建对应的 HttpRequest
*/
private HttpRequest createHttpRequest(ApiRequestRecord record) {
String method = record.getHttpMethod().toUpperCase();
switch (method) {
case "GET":
return HttpUtil.createGet(record.getUrl());
case "POST":
return HttpUtil.createPost(record.getUrl());
case "PUT":
return HttpUtil.createRequest(Method.PUT, record.getUrl());
case "DELETE":
return HttpUtil.createRequest(Method.DELETE, record.getUrl());
case "PATCH":
return HttpUtil.createRequest(Method.PATCH, record.getUrl());
case "HEAD":
return HttpUtil.createRequest(Method.HEAD, record.getUrl());
case "OPTIONS":
return HttpUtil.createRequest(Method.OPTIONS, record.getUrl());
default:
throw new BusinessException(ErrorCode.PARAMS_ERROR, "不支持的 HTTP 方法:" + record.getHttpMethod());
}
}
}日志注解
package com.yupi.springbootinit.example.interfaceaop.annotation;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/23
*/
import java.lang.annotation.*;
/**
* 接口日志注解
*/
@Target(ElementType.METHOD) // 作用在方法上
@Retention(RetentionPolicy.RUNTIME) // 运行时可见
@Documented // 生成文档
public @interface ApiLog {
/**
* 接口描述
*/
String value() default "";
/**
* 是否记录请求参数
*/
boolean logParams() default true;
/**
* 是否记录响应结果
*/
boolean logResponse() default true;
}package com.yupi.springbootinit.example.interfaceaop.aop;
import cn.hutool.json.JSONUtil;
import com.yupi.springbootinit.example.interfaceaop.mapper.ApiLogMapper;
import com.yupi.springbootinit.example.interfaceaop.model.ApiLog;
import com.yupi.springbootinit.model.entity.User;
import com.yupi.springbootinit.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
import java.util.UUID;
import static com.yupi.springbootinit.utils.NetUtils.getIpAddress;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/23
*/
@Aspect
@Component
@Slf4j
public class ApiLogAspect {
@Resource
private ApiLogMapper apiLogMapper;
@Resource
private UserService userService;
@Pointcut("@annotation(com.yupi.springbootinit.example.interfaceaop.annotation.ApiLog)")
public void apiLogPointcut() {
}
@Around("apiLogPointcut()")
public Object around(ProceedingJoinPoint point) throws Throwable {
long startTime = System.currentTimeMillis();
String requestId = UUID.randomUUID().toString();
// 获取请求信息
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
User loginUser = userService.getLoginUser(request);
// 记录请求信息
String url = request.getRequestURL().toString();
String httpMethod = request.getMethod();
String ip = getIpAddress(request);
String classMethod = point.getSignature().getDeclaringTypeName() + "." + point.getSignature().getName();
// 记录请求参数
Object[] args = point.getArgs();
String requestParams = null;
if (args != null && args.length > 0) {
// 检查参数类型并正确序列化
if (args[0] instanceof List) {
requestParams = JSONUtil.toJsonStr(args[0]); // 数组格式
} else {
requestParams = JSONUtil.toJsonStr(args[0]); // 对象格式
}
}
// 执行目标方法
Object result = null;
try {
result = point.proceed();
return result;
} finally {
// 记录响应信息
long endTime = System.currentTimeMillis();
long timeConsumed = endTime - startTime;
// 保存日志到数据库
ApiLog apiLog = new ApiLog();
apiLog.setRequestId(requestId);
apiLog.setUrl(url);
apiLog.setHttpMethod(httpMethod);
apiLog.setIp(ip);
apiLog.setClassMethod(classMethod);
apiLog.setRequestParams(requestParams);
apiLog.setResponseData(result != null ? JSONUtil.toJsonStr(result) : null);
apiLog.setTimeConsumed(timeConsumed);
apiLog.setUserId(String.valueOf(loginUser.getId()));
apiLogMapper.insert(apiLog);
}
}
}日志重放注解
package com.yupi.springbootinit.example.interfaceaop.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/23
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Replayable {
String value() default ""; // 接口描述
}package com.yupi.springbootinit.example.interfaceaop.aop;
import cn.hutool.json.JSONUtil;
import com.yupi.springbootinit.example.interfaceaop.mapper.ApiRequestRecordMapper;
import com.yupi.springbootinit.example.interfaceaop.model.ApiRequestRecord;
import com.yupi.springbootinit.model.entity.User;
import com.yupi.springbootinit.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/23
*/
@Aspect
@Component
@Slf4j
public class ReplayableAspect {
@Resource
private ApiRequestRecordMapper apiRequestRecordMapper;
@Resource
private UserService userService;
@Pointcut("@annotation(com.yupi.springbootinit.example.interfaceaop.annotation.Replayable)")
public void replayablePointcut() {}
@Around("replayablePointcut()")
public Object around(ProceedingJoinPoint point) throws Throwable {
long startTime = System.currentTimeMillis();
// 获取请求信息
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
User loginUser = userService.getLoginUser(request);
// 记录请求信息
ApiRequestRecord record = new ApiRequestRecord();
record.setUrl(request.getRequestURL().toString());
record.setHttpMethod(request.getMethod());
record.setHeaders(getHeadersJson(request));
record.setRequestParams(getRequestParams(request, point));
record.setUserId(String.valueOf(loginUser.getId()));
Object result = null;
try {
// 执行目标方法
result = point.proceed();
record.setStatus(200);
record.setResponseData(JSONUtil.toJsonStr(result));
} catch (Exception e) {
record.setStatus(500);
record.setResponseData(e.getMessage());
throw e;
} finally {
// 记录耗时
record.setTimeConsumed(System.currentTimeMillis() - startTime);
// 保存记录
apiRequestRecordMapper.insert(record);
}
return result;
}
private String getHeadersJson(HttpServletRequest request) {
Map<String, String> headers = new HashMap<>();
Enumeration<String> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
headers.put(headerName, request.getHeader(headerName));
}
return JSONUtil.toJsonStr(headers);
}
private String getRequestParams(HttpServletRequest request, ProceedingJoinPoint point) {
if ("POST".equalsIgnoreCase(request.getMethod())) {
Object[] args = point.getArgs();
// 如果只有一个参数,直接转换该参数对象
if (args != null && args.length == 1) {
return JSONUtil.toJsonStr(args[0]);
}
// 如果有多个参数,保持数组格式
return JSONUtil.toJsonStr(args);
} else {
return JSONUtil.toJsonStr(request.getParameterMap());
}
}
}控制层
package com.yupi.springbootinit.example.interfaceaop.controller;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.common.ResultUtils;
import com.yupi.springbootinit.example.interfaceaop.model.ApiRequestRecord;
import com.yupi.springbootinit.example.interfaceaop.service.ApiReplayService;
import com.yupi.springbootinit.example.interfaceaop.mapper.ApiRequestRecordMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/23
*/
@RestController
@RequestMapping("/api/replay")
@Slf4j
public class ApiReplayController {
@Resource
private ApiReplayService apiReplayService;
@Resource
private ApiRequestRecordMapper apiRequestRecordMapper;
@PostMapping("/execute")
public BaseResponse<Object> replayRequest(@RequestParam Long recordId) {
return ResultUtils.success(apiReplayService.replayRequest(recordId));
}
@GetMapping("/records")
public BaseResponse<List<ApiRequestRecord>> getReplayableRecords() {
LambdaQueryWrapper<ApiRequestRecord> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.orderByDesc(ApiRequestRecord::getCreateTime);
return ResultUtils.success(apiRequestRecordMapper.selectList(queryWrapper));
}
}测试
测试请求参数
package com.yupi.springbootinit.example.interfaceaop.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.Pattern;
import java.io.Serializable;
import java.util.Date;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/23
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TestRequest implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 名称
*/
private String name;
/**
* 年龄
*/
private Integer age;
/**
* 性别(0-男, 1-女)
*/
private Integer gender;
/**
* 邮箱
*/
@Pattern(regexp = "^[A-Za-z0-9+_.-]+@(.+)$", message = "邮箱格式不正确")
private String email;
/**
* 手机号
*/
@Pattern(regexp = "^1[3-9]\\d{9}$", message = "手机号格式不正确")
private String phone;
/**
* 创建时间
*/
private Date createTime;
/**
* 更新时间
*/
private Date updateTime;
/**
* 额外信息(JSON格式)
*/
private String extraInfo;
/**
* 参数校验
*/
private String content;
}测试控制层
package com.yupi.springbootinit.example.interfaceaop.controller;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.common.ResultUtils;
import com.yupi.springbootinit.example.interfaceaop.annotation.ApiLog;
import com.yupi.springbootinit.example.interfaceaop.annotation.Replayable;
import com.yupi.springbootinit.example.interfaceaop.model.TestRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/23
*/
@RestController
@RequestMapping("/api")
@Slf4j
public class TestController {
@Replayable("可重放的测试接口")
@ApiLog
@PostMapping("/test")
public BaseResponse<String> test(@RequestBody TestRequest request) {
// 业务逻辑
log.info("收到请求: {}", request);
return ResultUtils.success("ok");
}
}执行testcontroller.test

日志表有信息

重发接口信息表

查询最近的接口信息

测试重发接口

调用成功


callback
使用反射(可使用jdk或cglib代理) + transactionTemplate事务管理 + ServiceUtils.commonExec统一的service入口
选择建议:
如果目标类有接口,优先使用JDK动态代理
符合面向接口编程原则
代理生成速度快
内存占用少
如果目标类没有接口,使用CGLIB代理
运行时性能好
不需要额外定义接口
特殊情况考虑:
如果对性能要求特别高,可以考虑CGLIB
如果内存资源紧张,建议JDK动态代理
如果类有final方法,必须用JDK动态代理
sql语句
CREATE TABLE service_processor_config (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
serviceName VARCHAR(100) NOT NULL COMMENT '服务名称',
methodName VARCHAR(100) NOT NULL COMMENT '方法名称',
processorName VARCHAR(100) NOT NULL COMMENT '处理器名称',
timing VARCHAR(10) NOT NULL COMMENT '执行时机(before/after)',
isAsync TINYINT(1) NOT NULL DEFAULT 0 COMMENT '是否异步(0-同步,1-异步)',
ignoreError TINYINT(1) NOT NULL DEFAULT 1 COMMENT '是否忽略错误(0-不忽略,1-忽略)',
status TINYINT(1) NOT NULL DEFAULT 1 COMMENT '是否启用(0-禁用,1-启用)',
createTime DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updateTime DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
isDelete TINYINT NOT NULL DEFAULT 0 COMMENT '是否删除'
) COMMENT '服务处理器配置表';
INSERT INTO service_processor_config
(serviceName, methodName, processorName, timing, isAsync, ignoreError, status)
VALUES
('testService', 'testParameters', 'testNotReturnCallback', 'before', 1, 1, 1);
INSERT INTO service_processor_config
(serviceName, methodName, processorName, timing, isAsync, ignoreError, status)
VALUES
('testService', 'testNotReturn', 'testNotReturnCallback', 'before', 1, 1, 1);请求参数
package com.yupi.springbootinit.example.commonserviceExec.model.dto;
import lombok.Data;
import javax.validation.constraints.NotBlank;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/24
*/
@Data
public class CommonServiceRequest<T> {
/**
* 服务名称 (例如: "userService")
*/
@NotBlank(message = "服务名称不能为空")
private String serviceName;
/**
* 方法名称
*/
@NotBlank(message = "方法名称不能为空")
private String methodName;
/**
* 方法参数
*/
private T requestData;
}调用方法参数实体
package com.yupi.springbootinit.example.commonserviceExec.model.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.util.Date;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/24
*/
@Data
@TableName("service_processor_config")
public class ServiceProcessorConfig {
@TableId(type = IdType.AUTO)
private Long id;
private String serviceName;
private String methodName;
private String processorName;
private String timing;
private Boolean isAsync;
private Boolean ignoreError;
private Boolean status;
private Date createTime;
private Date updateTime;
/**
* 是否删除
*/
@TableLogic
private Integer isDelete;
@TableField(exist = false)
private static final long serialVersionUID = 1L;
}配置信息mapper
package com.yupi.springbootinit.example.commonserviceExec.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yupi.springbootinit.example.commonserviceExec.model.entity.ServiceProcessorConfig;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface ServiceProcessorConfigMapper extends BaseMapper<ServiceProcessorConfig> {
}获取配置信息service
package com.yupi.springbootinit.example.commonserviceExec.service;
import com.yupi.springbootinit.example.commonserviceExec.model.entity.ServiceProcessorConfig;
import java.util.List;
/**
* @author tuaofei
* @description 处理器配置服务
* @date 2024/12/24
*/
public interface ProcessorConfigService {
List<ServiceProcessorConfig> getConfigs(String serviceName, String methodName);
}package com.yupi.springbootinit.example.commonserviceExec.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yupi.springbootinit.example.commonserviceExec.mapper.ServiceProcessorConfigMapper;
import com.yupi.springbootinit.example.commonserviceExec.model.entity.ServiceProcessorConfig;
import com.yupi.springbootinit.example.commonserviceExec.service.ProcessorConfigService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/24
*/
@Service
public class ProcessorConfigServiceImpl implements ProcessorConfigService {
@Resource
private ServiceProcessorConfigMapper configMapper;
@Override
public List<ServiceProcessorConfig> getConfigs(String serviceName, String methodName) {
return configMapper.selectList(new QueryWrapper<ServiceProcessorConfig>()
.eq("serviceName", serviceName)
.eq("methodName", methodName)
.eq("status", true));
}
}处理器接口
package com.yupi.springbootinit.example.commonserviceExec.service;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.example.commonserviceExec.model.dto.CommonServiceRequest;
/**
* @author tuaofei
* @description 处理器接口
* @date 2024/12/24
*/
public interface ServiceProcessor {
/**
* 处理方法
*/
void process(String serviceName, String methodName,
CommonServiceRequest<?> request, BaseResponse<?> response);
}工具类
package com.yupi.springbootinit.example.commonserviceExec.utils;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.common.ResultUtils;
import com.yupi.springbootinit.example.commonserviceExec.model.dto.CommonServiceRequest;
import com.yupi.springbootinit.example.commonserviceExec.model.entity.ServiceProcessorConfig;
import com.yupi.springbootinit.example.commonserviceExec.service.ProcessorConfigService;
import com.yupi.springbootinit.example.commonserviceExec.service.ServiceProcessor;
import com.yupi.springbootinit.exception.BusinessException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/24
*/
@Component
@Slf4j
public class ServiceUtils implements ApplicationContextAware {
private static ThreadPoolTaskExecutor taskExecutor;
private static TransactionTemplate transactionTemplate;
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
applicationContext = context;
}
@Resource
public void setTransactionTemplate(TransactionTemplate transactionTemplate) {
ServiceUtils.transactionTemplate = transactionTemplate;
}
@Resource
public void setTaskExecutor(ThreadPoolTaskExecutor taskExecutor) {
ServiceUtils.taskExecutor = taskExecutor;
}
/**
* 通用服务方法执行
*
* @param request 请求参数
* @return 执行结果
*/
@SuppressWarnings("unchecked")
public static <T> BaseResponse<T> commonExec(CommonServiceRequest<?> request) {
return transactionTemplate.execute(status -> {
try {
String serviceName = request.getServiceName();
String methodName = request.getMethodName();
// 获取处理器配置
ProcessorConfigService configService = applicationContext.getBean(ProcessorConfigService.class);
List<ServiceProcessorConfig> configs = configService.getConfigs(serviceName, methodName);
// 1. 获取服务实例
Object service = applicationContext.getBean(serviceName);
// 2. 获取方法
Method method = service.getClass().getMethod(methodName,
CommonServiceRequest.class, BaseResponse.class);
// 3. 创建响应对象
BaseResponse<T> response = new BaseResponse<>(ErrorCode.SUCCESS);
// 执行前置处理器
executeProcessors(configs, "before", serviceName, methodName, request, response);
// 4. 执行方法
Object result = method.invoke(service, request, response);
// 执行后置处理器
executeProcessors(configs, "after", serviceName, methodName, request, response);
return ResultUtils.success((T) result);
} catch (Exception e) {
log.error("Service execution error", e);
status.setRollbackOnly();
return ResultUtils.error(ErrorCode.SYSTEM_ERROR, e.getMessage());
}
});
}
public static void executeProcessors(List<ServiceProcessorConfig> configs, String timing,
String serviceName, String methodName,
CommonServiceRequest<?> request,
BaseResponse<?> response) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (ServiceProcessorConfig config : configs) {
if (!config.getTiming().equals(timing)) {
continue;
}
ServiceProcessor processor = applicationContext.getBean(config.getProcessorName(), ServiceProcessor.class);
if (config.getIsAsync()) {
// 异步执行
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
// 在新的事务中执行处理器
transactionTemplate.execute(processorStatus -> {
try {
processor.process(serviceName, methodName, request, response);
return null;
} catch (Exception e) {
if (!config.getIgnoreError()) {
// 如果不忽略异常,设置事务回滚并抛出异常
processorStatus.setRollbackOnly();
throw new BusinessException(ErrorCode.SYSTEM_ERROR, e.getMessage());
}
// 如果忽略异常,只记录日志
log.error("Async processor execution error", e);
return null;
}
});
} catch (Exception e) {
if (!config.getIgnoreError()) {
// 外层异常处理,如果不忽略异常则抛出
throw new BusinessException(ErrorCode.SYSTEM_ERROR, e.getMessage());
}
log.error("Async processor execution error", e);
}
}, taskExecutor);
futures.add(future);
} else {
// 同步执行
try {
processor.process(serviceName, methodName, request, response);
} catch (Exception e) {
if (!config.getIgnoreError()) {
throw new BusinessException(ErrorCode.SYSTEM_ERROR, e.getMessage());
}
log.error("Sync processor execution error", e);
}
}
}
// 等待所有异步任务完成
if (!futures.isEmpty()) {
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new BusinessException(ErrorCode.SYSTEM_ERROR, e.getMessage());
}
}
}
}具体service的callback
package com.yupi.springbootinit.example.commonserviceExec.service.impl;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.example.commonserviceExec.model.dto.CommonServiceRequest;
import com.yupi.springbootinit.example.commonserviceExec.service.ServiceProcessor;
import com.yupi.springbootinit.model.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/24
*/
@Slf4j
@Component("TestParametersCallback")
public class TestParametersCallback implements ServiceProcessor {
@Override
public void process(String serviceName, String methodName,
CommonServiceRequest<?> request, BaseResponse<?> response) {
log.info("Service: {}, Method: {}, Request: {}, Response: {}",
serviceName, methodName, request, response);
Object requestData = request.getRequestData();
if (requestData instanceof User) {
int i = 1 / 0;
log.info("模拟执行 testNotReturnCallback ,参数:{}", requestData);
}
}
}package com.yupi.springbootinit.example.commonserviceExec.service.impl;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.example.commonserviceExec.model.dto.CommonServiceRequest;
import com.yupi.springbootinit.example.commonserviceExec.service.ServiceProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/24
*/
@Slf4j
@Component("testNotReturnCallback")
public class testNotReturnCallback implements ServiceProcessor {
@Override
public void process(String serviceName, String methodName,
CommonServiceRequest<?> request, BaseResponse<?> response) {
log.info("Service: {}, Method: {}, Request: {}, Response: {}",
serviceName, methodName, request, response);
int i = 1 / 0;
log.info("模拟执行 {testNotReturnCallback}");
}
}测试service实现类
package com.yupi.springbootinit.example.commonserviceExec.service;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.example.commonserviceExec.model.dto.CommonServiceRequest;
import com.yupi.springbootinit.model.entity.User;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/24
*/
public interface TestService {
void testNotReturn(CommonServiceRequest request, BaseResponse response);
BaseResponse<User> testParameters(CommonServiceRequest<User> request, BaseResponse<User> response);
}package com.yupi.springbootinit.example.commonserviceExec.service.impl;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.common.ResultUtils;
import com.yupi.springbootinit.example.commonserviceExec.model.dto.CommonServiceRequest;
import com.yupi.springbootinit.example.commonserviceExec.service.TestService;
import com.yupi.springbootinit.model.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/24
*/
@Service("testService")
@Slf4j
public class TestServiceImpl implements TestService {
@Override
public void testNotReturn(CommonServiceRequest request, BaseResponse response) {
log.info("exec testNotReturn");
}
@Override
public BaseResponse<User> testParameters(CommonServiceRequest<User> request, BaseResponse<User> response) {
log.info("exec testParameters");
User user = request.getRequestData();
user.setUserName("xiaofei");
return ResultUtils.success(user);
}
}测试controller
package com.yupi.springbootinit.example.commonserviceExec.controller;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.example.commonserviceExec.model.dto.CommonServiceRequest;
import com.yupi.springbootinit.example.commonserviceExec.utils.ServiceUtils;
import com.yupi.springbootinit.model.entity.User;
import org.apache.poi.ss.formula.functions.T;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/24
*/
@RestController("service")
@RequestMapping("/common/service")
public class TestController {
@PostMapping("/exec")
public BaseResponse<T> commonExec(@RequestBody CommonServiceRequest request) {
return ServiceUtils.commonExec(request);
}
@PostMapping("/textExec")
public BaseResponse<T> textExec() {
CommonServiceRequest serviceRequest = new CommonServiceRequest();
serviceRequest.setServiceName("testService");
serviceRequest.setMethodName("testNotReturn");
return commonExec(serviceRequest);
}
@PostMapping("/testParametersExec")
public BaseResponse<T> testParametersExec() {
User user = new User();
CommonServiceRequest serviceRequest = new CommonServiceRequest();
serviceRequest.setServiceName("testService");
serviceRequest.setMethodName("testParameters");
serviceRequest.setRequestData(user);
return commonExec(serviceRequest);
}
}可配置测试信息

测试异步执行,执行时机在方法之前,不忽略异常,主线程是否中断
测试异步执行,执行时机在方法之前,忽略异常,主线程是否中断
多数据源管理
继承 AbstractRoutingDataSource 抽象类
application.yml配置
spring:
datasource:
master:
jdbc-url: jdbc:mysql://localhost:3306/my_db?serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: root
lovefinder:
jdbc-url: jdbc:mysql://localhost:3306/lovefinder?serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: root数据源类型枚举
package com.yupi.springbootinit.config.datasource;
/**
* @author tuaofei
* @description 数据源类型枚举
* @date 2024/12/24
*/
public enum DataSourceType {
MASTER,
LOVEFINDER
}数据源配置类
package com.yupi.springbootinit.config.datasource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
/**
* @author tuaofei
* @description 数据源配置类
* @date 2024/12/24
*/
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties("spring.datasource.master")
public DataSource masterDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties("spring.datasource.lovefinder")
public DataSource lovefinderDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@Primary
public DataSource dynamicDataSource() {
Map<Object, Object> targetDataSources = new HashMap<>(2);
targetDataSources.put(DataSourceType.MASTER.name(), masterDataSource());
targetDataSources.put(DataSourceType.LOVEFINDER.name(), lovefinderDataSource());
return new DynamicDataSource(masterDataSource(), targetDataSources);
}
}动态数据源实现
package com.yupi.springbootinit.config.datasource;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import javax.sql.DataSource;
import java.util.Map;
/**
* @author tuaofei
* @description 动态数据源实现
* @date 2024/12/24
*/
public class DynamicDataSource extends AbstractRoutingDataSource {
/**
* 构造方法,初始化动态数据源
*
* @param defaultTargetDataSource 默认数据源
* @param targetDataSources 目标数据源集合
*/
public DynamicDataSource(DataSource defaultTargetDataSource, Map<Object, Object> targetDataSources) {
// 设置默认数据源
super.setDefaultTargetDataSource(defaultTargetDataSource);
// 设置数据源集合
super.setTargetDataSources(targetDataSources);
// 执行afterPropertiesSet方法,完成属性初始化
super.afterPropertiesSet();
}
/**
* 获取当前数据源的key
*
* @return 数据源key
*/
@Override
protected Object determineCurrentLookupKey() {
// 从ThreadLocal中获取当前数据源的key
return DataSourceContextHolder.getDataSource();
}
}数据源上下文
package com.yupi.springbootinit.config.datasource;
/**
* @author tuaofei
* @description 数据源上下文
* @date 2024/12/24
*/
public class DataSourceContextHolder {
/**
* 使用ThreadLocal存储当前线程的数据源key
*/
private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
/**
* 设置数据源
*
* @param dataSourceType 数据源类型
*/
public static void setDataSource(DataSourceType dataSourceType) {
CONTEXT_HOLDER.set(dataSourceType.name());
}
/**
* 获取数据源
*
* @return 数据源key
*/
public static String getDataSource() {
return CONTEXT_HOLDER.get();
}
/**
* 清除数据源
*/
public static void clearDataSource() {
CONTEXT_HOLDER.remove();
}
}MyBatis Plus 配置
package com.yupi.springbootinit.config;
import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.annotation.Resource;
import javax.sql.DataSource;
/**
* MyBatis Plus 配置
*/
@Configuration
@MapperScan("com.yupi.springbootinit.mapper")
public class MyBatisPlusConfig {
@Resource(name = "dynamicDataSource")
private DataSource dynamicDataSource;
@Bean
public PlatformTransactionManager transactionManager() {
return new DataSourceTransactionManager(dynamicDataSource);
}
}数据源切换切面
package com.yupi.springbootinit.aop;
import com.yupi.springbootinit.annotation.DataSource;
import com.yupi.springbootinit.config.datasource.DataSourceContextHolder;
import com.yupi.springbootinit.config.datasource.DataSourceType;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
/**
* @author tuaofei
* @description 数据源切换切面
* @date 2024/12/24
*/
@Slf4j
@Aspect
@Component
public class DataSourceAspect {
@Pointcut("@annotation(com.yupi.springbootinit.annotation.DataSource)")
public void apiLogPointcut() {
}
@Around("apiLogPointcut()")
public Object around(ProceedingJoinPoint point) throws Throwable {
MethodSignature signature = (MethodSignature) point.getSignature();
DataSource dataSource = signature.getMethod().getAnnotation(DataSource.class);
if (dataSource != null) {
DataSourceType dataSourceType = dataSource.value();
log.info("切换数据源到: {}", dataSourceType);
DataSourceContextHolder.setDataSource(dataSourceType);
}
try {
return point.proceed();
} finally {
log.info("清除数据源配置");
DataSourceContextHolder.clearDataSource();
}
}
}数据源切换注解
package com.yupi.springbootinit.annotation;
import com.yupi.springbootinit.config.datasource.DataSourceType;
import java.lang.annotation.*;
/**
* @author tuaofei
* @description 数据源切换注解
* @date 2024/12/24
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataSource {
DataSourceType value() default DataSourceType.MASTER;
}service测试
package com.yupi.springbootinit.service;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.IService;
import com.yupi.springbootinit.annotation.DataSource;
import com.yupi.springbootinit.config.datasource.DataSourceType;
import com.yupi.springbootinit.model.dto.user.UserQueryRequest;
import com.yupi.springbootinit.model.entity.User;
import com.yupi.springbootinit.model.vo.LoginUserVO;
import com.yupi.springbootinit.model.vo.UserVO;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import me.chanjar.weixin.common.bean.WxOAuth2UserInfo;
/**
* 用户服务
*
* @author <a href="https://github.com/liyupi">程序员鱼皮</a>
* @from <a href="https://yupi.icu">编程导航知识星球</a>
*/
public interface UserService extends IService<User> {
/**
* 同步用户数据(主库->从库)
* @return
*/
boolean synchronizationUser();
/**
* 执行插入操作,向主库和从库
* @return
*/
boolean execInsert();
/**
* 查询最新的用户
* @return
*/
User queryLastNewUser();
}package com.yupi.springbootinit.service.impl;
import static com.yupi.springbootinit.constant.UserConstant.USER_LOGIN_STATE;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yupi.springbootinit.annotation.DataSource;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.config.datasource.DataSourceContextHolder;
import com.yupi.springbootinit.config.datasource.DataSourceType;
import com.yupi.springbootinit.constant.CommonConstant;
import com.yupi.springbootinit.exception.BusinessException;
import com.yupi.springbootinit.mapper.UserMapper;
import com.yupi.springbootinit.model.dto.user.UserQueryRequest;
import com.yupi.springbootinit.model.entity.User;
import com.yupi.springbootinit.model.enums.UserRoleEnum;
import com.yupi.springbootinit.model.vo.LoginUserVO;
import com.yupi.springbootinit.model.vo.UserVO;
import com.yupi.springbootinit.service.UserService;
import com.yupi.springbootinit.utils.SqlUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.bean.WxOAuth2UserInfo;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.DigestUtils;
/**
* 用户服务实现
*
* @author <a href="https://github.com/liyupi">程序员鱼皮</a>
* @from <a href="https://yupi.icu">编程导航知识星球</a>
*/
@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
@Override
public boolean synchronizationUser() {
try {
DataSourceContextHolder.setDataSource(DataSourceType.MASTER);
QueryWrapper<User> wrapper = new QueryWrapper<>();
List<User> userPoList = list(wrapper);
if (CollectionUtil.isEmpty(userPoList)) {
log.info("主库无数据!");
return false;
}
DataSourceContextHolder.setDataSource(DataSourceType.LOVEFINDER);
boolean savedBatch = saveBatch(userPoList);
return savedBatch;
} finally {
DataSourceContextHolder.clearDataSource();
}
}
@Override
public boolean execInsert() {
User user = queryLastNewUser();
if (user != null){
user.setId(null);
}
boolean save = save(user);
// int i = 1/0;
return save;
}
@Override
@DataSource(DataSourceType.LOVEFINDER)
public User queryLastNewUser() {
Page<User> page = new Page<>(1, 1);
QueryWrapper<User> wrapper = new QueryWrapper<>();
wrapper.orderByDesc("createTime");
Page<User> userPage = userMapper.selectPage(page, wrapper);
List<User> records = userPage.getRecords();
return records.get(0);
}
}Controller测试
package com.yupi.springbootinit.example.datasource.controller;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.common.ResultUtils;
import com.yupi.springbootinit.model.entity.User;
import com.yupi.springbootinit.service.impl.UserServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/24
*/
@RestController("dataSource")
@RequestMapping("/dataSource")
public class DataSourceController {
@Autowired
private UserServiceImpl userService;
@PostMapping("/test1")
public BaseResponse<String> test1() {
boolean save = userService.synchronizationUser();
if (save) {
return ResultUtils.success("插入成功");
}
return ResultUtils.error(ErrorCode.SYSTEM_ERROR, "插入失败");
}
@PostMapping("/test2")
public BaseResponse<String> test2() {
boolean insert = userService.execInsert();
if (insert) {
return ResultUtils.success("插入成功");
}
return ResultUtils.error(ErrorCode.SYSTEM_ERROR, "插入失败");
}
@PostMapping("/test3")
public BaseResponse<User> test3() {
User user = userService.queryLastNewUser();
if (user != null) {
return ResultUtils.success(user);
}
return ResultUtils.error(ErrorCode.SYSTEM_ERROR, "查询失败");
}
}失效场景
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
// 这种情况下,@DataSource 注解不会生效,因为是类内部调用
public void methodA() {
methodB(); // 直接调用不会触发AOP
}
@DataSource(DataSourceType.LOVEFINDER)
public void methodB() {
// ...
}
// 解决方案1:使用AopContext获取代理对象
public void methodA() {
((UserServiceImpl) AopContext.currentProxy()).methodB();
}
// 解决方案2:注入自己
@Resource
private UserService userService; // 注入接口而不是实现类
public void methodA() {
userService.methodB(); // 通过代理对象调用
}
}接口调用-签名认证-AK/SK
AK/SK认证机制概述
AK (Access Key): 访问标识,相当于用户名
SK (Secret Key): 密钥,用于生成签名的密钥,必须保密
// 1. 获取请求头中的关键参数
String accessKey = headers.getFirst("accessKey"); // 访问密钥
String secretKey = user.getSecretKey(); // 密钥(从用户信息中获取)
String timestamp = headers.getFirst("timestamp"); // 时间戳
String sign = headers.getFirst("sign"); // 请求签名
String body = headers.getFirst("body"); // 请求体accessKey/secretKey创建用户时生成:
/**
* Return a hexadecimal string representation of the MD5 digest of the given bytes.
* @param bytes the bytes to calculate the digest over
* @return a hexadecimal digest string
*/
public static String md5DigestAsHex(byte[] bytes) {
return digestAsHexString(MD5_ALGORITHM_NAME, bytes);
}
/**
* 盐值,混淆密码
*/
String SALT = "api";
/**
* ak/sk 混淆
*/
String VOUCHER = "accessKey_secretKey";
// ak/sk
String accessKey = DigestUtils.md5DigestAsHex((userAccount + UserConstant.SALT + UserConstant.VOUCHER).getBytes());
String secretKey = DigestUtils.md5DigestAsHex((UserConstant.SALT + UserConstant.VOUCHER + userAccount).getBytes());安全校验步骤
1.参数完整性校验
if (StringUtils.isAnyBlank(body, sign, accessKey, timestamp)) {
throw new BusinessException(ResponseCode.FORBIDDEN_ERROR);
}2.防重放攻击
/**
* 五分钟过期时间
*/
private static final long FIVE_MINUTES = 5L * 60;
long currentTime = System.currentTimeMillis() / 1000;
if (currentTime - Long.parseLong(timestamp) >= FIVE_MINUTES) {
throw new BusinessException(ResponseCode.NOT_LOGIN_ERROR, "会话已过期,请重试!");
}3.AK验证
//通过访问密钥获取用户信息
user = dubboUserService.getInvokeUserByAccessKey(accessKey);
if (!user.getAccessKey().equals(accessKey)) {
throw new BusinessException(ResponseCode.NO_AUTH_ERROR, "请先获取请求密钥");
}4.签名验证
public static String getSign(String body, String secretKey) {
return MD5.create().digestHex(JSONUtil.toJsonStr(body) + '.' + secretKey);
}
if (!SignUtil.getSign(body, user.getSecretKey()).equals(sign)) {
throw new BusinessException(ResponseCode.NO_AUTH_ERROR, "非法请求");
}签名生成原理
客户端使用相同的算法,用 body 和 SK 生成签名
服务端用相同的参数和算法生成签名,与客户端传来的签名比对
签名一致才说明:
请求确实来自持有正确 SK 的客户端
请求参数在传输过程中未被篡改
安全特点
防篡改:任何参数被修改都会导致签名验证失败
防重放:使用时间戳确保请求的时效性
身份认证:通过 AK/SK 确保调用者身份
密钥安全:SK 不在网络传输,只用于本地签名
最佳实践
SK 必须妥善保管,不能泄露
时间戳应使用标准时间
签名算法要足够安全(通常使用 HMAC-SHA256 等)
关键操作要使用 HTTPS
Quartz
Spring Boot 官方文档:https://docs.spring.io/spring-boot/docs/2.1.0.RELEASE/reference/htmlsingle/#boot-features-quartz
Quartz 官方文档:http://www.quartz-scheduler.org/documentation/quartz-2.2.x/quick-start.html
Quartz 重复调度问题:https://segmentfault.com/a/1190000015492260
关于Quartz定时任务状态 (在
QRTZ_TRIGGERS表中的TRIGGER_STATE字段)
image-20241225124042872
升级spring boot异常:DataSource name not set-CSDN博客
目录结构

pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>创建表
mysql
#
# Quartz seems to work best with the driver mm.mysql-2.0.7-bin.jar
#
# PLEASE consider using mysql with innodb tables to avoid locking issues
#
# In your Quartz properties file, you'll need to set
# org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#
DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;
CREATE TABLE QRTZ_JOB_DETAILS
(
SCHED_NAME VARCHAR(120) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
JOB_CLASS_NAME VARCHAR(250) NOT NULL,
IS_DURABLE VARCHAR(1) NOT NULL,
IS_NONCONCURRENT VARCHAR(1) NOT NULL,
IS_UPDATE_DATA VARCHAR(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
);
CREATE TABLE QRTZ_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
NEXT_FIRE_TIME BIGINT(13) NULL,
PREV_FIRE_TIME BIGINT(13) NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE VARCHAR(16) NOT NULL,
TRIGGER_TYPE VARCHAR(8) NOT NULL,
START_TIME BIGINT(13) NOT NULL,
END_TIME BIGINT(13) NULL,
CALENDAR_NAME VARCHAR(200) NULL,
MISFIRE_INSTR SMALLINT(2) NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
);
CREATE TABLE QRTZ_SIMPLE_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
REPEAT_COUNT BIGINT(7) NOT NULL,
REPEAT_INTERVAL BIGINT(12) NOT NULL,
TIMES_TRIGGERED BIGINT(10) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_CRON_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
CRON_EXPRESSION VARCHAR(200) NOT NULL,
TIME_ZONE_ID VARCHAR(80),
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_SIMPROP_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
STR_PROP_1 VARCHAR(512) NULL,
STR_PROP_2 VARCHAR(512) NULL,
STR_PROP_3 VARCHAR(512) NULL,
INT_PROP_1 INT NULL,
INT_PROP_2 INT NULL,
LONG_PROP_1 BIGINT NULL,
LONG_PROP_2 BIGINT NULL,
DEC_PROP_1 NUMERIC(13,4) NULL,
DEC_PROP_2 NUMERIC(13,4) NULL,
BOOL_PROP_1 VARCHAR(1) NULL,
BOOL_PROP_2 VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_BLOB_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
BLOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_CALENDARS
(
SCHED_NAME VARCHAR(120) NOT NULL,
CALENDAR_NAME VARCHAR(200) NOT NULL,
CALENDAR BLOB NOT NULL,
PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
);
CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_FIRED_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
ENTRY_ID VARCHAR(95) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
FIRED_TIME BIGINT(13) NOT NULL,
SCHED_TIME BIGINT(13) NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE VARCHAR(16) NOT NULL,
JOB_NAME VARCHAR(200) NULL,
JOB_GROUP VARCHAR(200) NULL,
IS_NONCONCURRENT VARCHAR(1) NULL,
REQUESTS_RECOVERY VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,ENTRY_ID)
);
CREATE TABLE QRTZ_SCHEDULER_STATE
(
SCHED_NAME VARCHAR(120) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
CHECKIN_INTERVAL BIGINT(13) NOT NULL,
PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
);
CREATE TABLE QRTZ_LOCKS
(
SCHED_NAME VARCHAR(120) NOT NULL,
LOCK_NAME VARCHAR(40) NOT NULL,
PRIMARY KEY (SCHED_NAME,LOCK_NAME)
);
commit;oracle
--
-- A hint submitted by a user: Oracle DB MUST be created as "shared" and the
-- job_queue_processes parameter must be greater than 2
-- However, these settings are pretty much standard after any
-- Oracle install, so most users need not worry about this.
--
-- Many other users (including the primary author of Quartz) have had success
-- runing in dedicated mode, so only consider the above as a hint ;-)
--
delete from qrtz_fired_triggers;
delete from qrtz_simple_triggers;
delete from qrtz_simprop_triggers;
delete from qrtz_cron_triggers;
delete from qrtz_blob_triggers;
delete from qrtz_triggers;
delete from qrtz_job_details;
delete from qrtz_calendars;
delete from qrtz_paused_trigger_grps;
delete from qrtz_locks;
delete from qrtz_scheduler_state;
drop table qrtz_calendars;
drop table qrtz_fired_triggers;
drop table qrtz_blob_triggers;
drop table qrtz_cron_triggers;
drop table qrtz_simple_triggers;
drop table qrtz_simprop_triggers;
drop table qrtz_triggers;
drop table qrtz_job_details;
drop table qrtz_paused_trigger_grps;
drop table qrtz_locks;
drop table qrtz_scheduler_state;
CREATE TABLE qrtz_job_details
(
SCHED_NAME VARCHAR2(120) NOT NULL,
JOB_NAME VARCHAR2(200) NOT NULL,
JOB_GROUP VARCHAR2(200) NOT NULL,
DESCRIPTION VARCHAR2(250) NULL,
JOB_CLASS_NAME VARCHAR2(250) NOT NULL,
IS_DURABLE VARCHAR2(1) NOT NULL,
IS_NONCONCURRENT VARCHAR2(1) NOT NULL,
IS_UPDATE_DATA VARCHAR2(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR2(1) NOT NULL,
JOB_DATA BLOB NULL,
CONSTRAINT QRTZ_JOB_DETAILS_PK PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
);
CREATE TABLE qrtz_triggers
(
SCHED_NAME VARCHAR2(120) NOT NULL,
TRIGGER_NAME VARCHAR2(200) NOT NULL,
TRIGGER_GROUP VARCHAR2(200) NOT NULL,
JOB_NAME VARCHAR2(200) NOT NULL,
JOB_GROUP VARCHAR2(200) NOT NULL,
DESCRIPTION VARCHAR2(250) NULL,
NEXT_FIRE_TIME NUMBER(13) NULL,
PREV_FIRE_TIME NUMBER(13) NULL,
PRIORITY NUMBER(13) NULL,
TRIGGER_STATE VARCHAR2(16) NOT NULL,
TRIGGER_TYPE VARCHAR2(8) NOT NULL,
START_TIME NUMBER(13) NOT NULL,
END_TIME NUMBER(13) NULL,
CALENDAR_NAME VARCHAR2(200) NULL,
MISFIRE_INSTR NUMBER(2) NULL,
JOB_DATA BLOB NULL,
CONSTRAINT QRTZ_TRIGGERS_PK PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
CONSTRAINT QRTZ_TRIGGER_TO_JOBS_FK FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
);
CREATE TABLE qrtz_simple_triggers
(
SCHED_NAME VARCHAR2(120) NOT NULL,
TRIGGER_NAME VARCHAR2(200) NOT NULL,
TRIGGER_GROUP VARCHAR2(200) NOT NULL,
REPEAT_COUNT NUMBER(7) NOT NULL,
REPEAT_INTERVAL NUMBER(12) NOT NULL,
TIMES_TRIGGERED NUMBER(10) NOT NULL,
CONSTRAINT QRTZ_SIMPLE_TRIG_PK PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
CONSTRAINT QRTZ_SIMPLE_TRIG_TO_TRIG_FK FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE qrtz_cron_triggers
(
SCHED_NAME VARCHAR2(120) NOT NULL,
TRIGGER_NAME VARCHAR2(200) NOT NULL,
TRIGGER_GROUP VARCHAR2(200) NOT NULL,
CRON_EXPRESSION VARCHAR2(120) NOT NULL,
TIME_ZONE_ID VARCHAR2(80),
CONSTRAINT QRTZ_CRON_TRIG_PK PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
CONSTRAINT QRTZ_CRON_TRIG_TO_TRIG_FK FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE qrtz_simprop_triggers
(
SCHED_NAME VARCHAR2(120) NOT NULL,
TRIGGER_NAME VARCHAR2(200) NOT NULL,
TRIGGER_GROUP VARCHAR2(200) NOT NULL,
STR_PROP_1 VARCHAR2(512) NULL,
STR_PROP_2 VARCHAR2(512) NULL,
STR_PROP_3 VARCHAR2(512) NULL,
INT_PROP_1 NUMBER(10) NULL,
INT_PROP_2 NUMBER(10) NULL,
LONG_PROP_1 NUMBER(13) NULL,
LONG_PROP_2 NUMBER(13) NULL,
DEC_PROP_1 NUMERIC(13,4) NULL,
DEC_PROP_2 NUMERIC(13,4) NULL,
BOOL_PROP_1 VARCHAR2(1) NULL,
BOOL_PROP_2 VARCHAR2(1) NULL,
CONSTRAINT QRTZ_SIMPROP_TRIG_PK PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
CONSTRAINT QRTZ_SIMPROP_TRIG_TO_TRIG_FK FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE qrtz_blob_triggers
(
SCHED_NAME VARCHAR2(120) NOT NULL,
TRIGGER_NAME VARCHAR2(200) NOT NULL,
TRIGGER_GROUP VARCHAR2(200) NOT NULL,
BLOB_DATA BLOB NULL,
CONSTRAINT QRTZ_BLOB_TRIG_PK PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
CONSTRAINT QRTZ_BLOB_TRIG_TO_TRIG_FK FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE qrtz_calendars
(
SCHED_NAME VARCHAR2(120) NOT NULL,
CALENDAR_NAME VARCHAR2(200) NOT NULL,
CALENDAR BLOB NOT NULL,
CONSTRAINT QRTZ_CALENDARS_PK PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
);
CREATE TABLE qrtz_paused_trigger_grps
(
SCHED_NAME VARCHAR2(120) NOT NULL,
TRIGGER_GROUP VARCHAR2(200) NOT NULL,
CONSTRAINT QRTZ_PAUSED_TRIG_GRPS_PK PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
);
CREATE TABLE qrtz_fired_triggers
(
SCHED_NAME VARCHAR2(120) NOT NULL,
ENTRY_ID VARCHAR2(95) NOT NULL,
TRIGGER_NAME VARCHAR2(200) NOT NULL,
TRIGGER_GROUP VARCHAR2(200) NOT NULL,
INSTANCE_NAME VARCHAR2(200) NOT NULL,
FIRED_TIME NUMBER(13) NOT NULL,
SCHED_TIME NUMBER(13) NOT NULL,
PRIORITY NUMBER(13) NOT NULL,
STATE VARCHAR2(16) NOT NULL,
JOB_NAME VARCHAR2(200) NULL,
JOB_GROUP VARCHAR2(200) NULL,
IS_NONCONCURRENT VARCHAR2(1) NULL,
REQUESTS_RECOVERY VARCHAR2(1) NULL,
CONSTRAINT QRTZ_FIRED_TRIGGER_PK PRIMARY KEY (SCHED_NAME,ENTRY_ID)
);
CREATE TABLE qrtz_scheduler_state
(
SCHED_NAME VARCHAR2(120) NOT NULL,
INSTANCE_NAME VARCHAR2(200) NOT NULL,
LAST_CHECKIN_TIME NUMBER(13) NOT NULL,
CHECKIN_INTERVAL NUMBER(13) NOT NULL,
CONSTRAINT QRTZ_SCHEDULER_STATE_PK PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
);
CREATE TABLE qrtz_locks
(
SCHED_NAME VARCHAR2(120) NOT NULL,
LOCK_NAME VARCHAR2(40) NOT NULL,
CONSTRAINT QRTZ_LOCKS_PK PRIMARY KEY (SCHED_NAME,LOCK_NAME)
);
create index idx_qrtz_j_req_recovery on qrtz_job_details(SCHED_NAME,REQUESTS_RECOVERY);
create index idx_qrtz_j_grp on qrtz_job_details(SCHED_NAME,JOB_GROUP);
create index idx_qrtz_t_j on qrtz_triggers(SCHED_NAME,JOB_NAME,JOB_GROUP);
create index idx_qrtz_t_jg on qrtz_triggers(SCHED_NAME,JOB_GROUP);
create index idx_qrtz_t_c on qrtz_triggers(SCHED_NAME,CALENDAR_NAME);
create index idx_qrtz_t_g on qrtz_triggers(SCHED_NAME,TRIGGER_GROUP);
create index idx_qrtz_t_state on qrtz_triggers(SCHED_NAME,TRIGGER_STATE);
create index idx_qrtz_t_n_state on qrtz_triggers(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
create index idx_qrtz_t_n_g_state on qrtz_triggers(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
create index idx_qrtz_t_next_fire_time on qrtz_triggers(SCHED_NAME,NEXT_FIRE_TIME);
create index idx_qrtz_t_nft_st on qrtz_triggers(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
create index idx_qrtz_t_nft_misfire on qrtz_triggers(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
create index idx_qrtz_t_nft_st_misfire on qrtz_triggers(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
create index idx_qrtz_t_nft_st_misfire_grp on qrtz_triggers(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);
create index idx_qrtz_ft_trig_inst_name on qrtz_fired_triggers(SCHED_NAME,INSTANCE_NAME);
create index idx_qrtz_ft_inst_job_req_rcvry on qrtz_fired_triggers(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
create index idx_qrtz_ft_j_g on qrtz_fired_triggers(SCHED_NAME,JOB_NAME,JOB_GROUP);
create index idx_qrtz_ft_jg on qrtz_fired_triggers(SCHED_NAME,JOB_GROUP);
create index idx_qrtz_ft_t_g on qrtz_fired_triggers(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
create index idx_qrtz_ft_tg on qrtz_fired_triggers(SCHED_NAME,TRIGGER_GROUP);创建查询实体
package com.yupi.springbootinit.example.quartz.model.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import lombok.Data;
import java.math.BigInteger;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/25
*/
@Data
public class JobAndTrigger {
@TableId(type = IdType.AUTO)
private Long id;
/**
* 定时任务名称
*/
private String jobName;
/**
* 定时任务组
*/
private String jobGroup;
/**
* 定时任务全类名
*/
private String jobClassName;
/**
* 触发器名称
*/
private String triggerName;
/**
* 触发器组
*/
private String triggerGroup;
/**
* 重复间隔
*/
private BigInteger repeatInterval;
/**
* 触发次数
*/
private BigInteger timesTriggered;
/**
* cron 表达式
*/
private String cronExpression;
/**
* 时区
*/
private String timeZoneId;
/**
* 定时任务状态
*/
private String triggerState;
}mapper
package com.yupi.springbootinit.example.quartz.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.yupi.springbootinit.example.quartz.model.entity.JobAndTrigger;
import java.util.List;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/25
*/
public interface JobMapper extends BaseMapper<JobAndTrigger> {
/**
* 查询定时作业和触发器列表
* @return 定时作业和触发器列表
*/
List<JobAndTrigger> list(Page<JobAndTrigger> page);
/**
* 查询指定定时任务
* @param jobAndTrigger
* @return
*/
List<JobAndTrigger> selectQrtzCronTriggers(JobAndTrigger jobAndTrigger);
}xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.yupi.springbootinit.example.quartz.mapper.JobMapper">
<parameterMap id="jobAndTrigger" type="com.yupi.springbootinit.example.quartz.model.entity.JobAndTrigger"></parameterMap>
<select id="list" resultType="com.yupi.springbootinit.example.quartz.model.entity.JobAndTrigger">
SELECT job_details.JOB_NAME as jobName,
job_details.JOB_GROUP as jobGroup,
job_details.JOB_CLASS_NAME as jobClassName,
cron_triggers .CRON_EXPRESSION as cronExpression,
cron_triggers.TIME_ZONE_ID as timeZoneId,
qrtz_triggers.TRIGGER_NAME as triggerName,
qrtz_triggers.TRIGGER_GROUP as triggerGroup,
qrtz_triggers.TRIGGER_STATE as triggerState
FROM QRTZ_JOB_DETAILS job_details
LEFT JOIN QRTZ_CRON_TRIGGERS cron_triggers
ON job_details.JOB_NAME = cron_triggers.TRIGGER_NAME
AND job_details.JOB_GROUP = cron_triggers.TRIGGER_GROUP
LEFT JOIN QRTZ_TRIGGERS qrtz_triggers
ON qrtz_triggers.TRIGGER_NAME = job_details.JOB_NAME
AND qrtz_triggers.TRIGGER_GROUP = job_details.JOB_GROUP
</select>
<select id="selectQrtzCronTriggers" resultType="com.yupi.springbootinit.example.quartz.model.entity.JobAndTrigger" parameterMap="jobAndTrigger">
select
qrtz_triggers.TRIGGER_NAME as triggerName,
qrtz_triggers.TRIGGER_GROUP as triggerGroup,
qrtz_triggers.CRON_EXPRESSION as cronExpression
from QRTZ_CRON_TRIGGERS qrtz_triggers
<where>
<if test="triggerName != null and triggerName != ''">
AND cron_triggers.TRIGGER_NAME = #{triggerName}
</if>
<if test="triggerGroup != null and triggerGroup != ''">
AND cron_triggers.TRIGGER_GROUP = #{triggerGroup}
</if>
</where>
</select>
</mapper>请求实体
package com.yupi.springbootinit.example.quartz.model.dto;
import lombok.Data;
import javax.validation.constraints.NotBlank;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/25
*/
@Data
public class QueryJob {
/**
* 定时任务全类名
*/
@NotBlank(message = "类名不能为空")
private String jobClassName;
/**
* 任务组名
*/
@NotBlank(message = "任务组名不能为空")
private String jobGroupName;
/**
* 定时任务cron表达式
*/
private String cronExpression;
}package com.yupi.springbootinit.example.quartz.model.dto;
import lombok.Data;
import lombok.experimental.Accessors;
import javax.validation.constraints.NotBlank;
/**
* @author tuaofei
* @description 定时任务详情
* @date 2024/12/25
*/
@Data
@Accessors(chain = true)
public class JobForm {
/**
* 定时任务全类名
*/
@NotBlank(message = "类名不能为空")
private String jobClassName;
/**
* 任务组名
*/
@NotBlank(message = "任务组名不能为空")
private String jobGroupName;
/**
* 定时任务cron表达式
*/
@NotBlank(message = "cron表达式不能为空")
private String cronExpression;
}封装工作Job
package com.yupi.springbootinit.example.quartz.job.base;
import org.quartz.*;
/**
* @author tuaofei
* @description Job 基类,主要是在 {@link org.quartz.Job} 上再封装一层,只让我们自己项目里的Job去实现
* @date 2024/12/25
*/
public interface BaseJob extends Job {
/**
* <p>
* Called by the <code>{@link Scheduler}</code> when a <code>{@link Trigger}</code>
* fires that is associated with the <code>Job</code>.
* </p>
*
* <p>
* The implementation may wish to set a
* {@link JobExecutionContext#setResult(Object) result} object on the
* {@link JobExecutionContext} before this method exits. The result itself
* is meaningless to Quartz, but may be informative to
* <code>{@link JobListener}s</code> or
* <code>{@link TriggerListener}s</code> that are watching the job's
* execution.
* </p>
*
* @param context 上下文
* @throws JobExecutionException if there is an exception while executing the job.
*/
@Override
void execute(JobExecutionContext context) throws JobExecutionException;
}构建一个定时任务
package com.yupi.springbootinit.example.quartz.job;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.yupi.springbootinit.example.quartz.job.base.BaseJob;
import com.yupi.springbootinit.example.quartz.model.entity.JobAndTrigger;
import com.yupi.springbootinit.example.quartz.service.JobService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import javax.annotation.Resource;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/25
*/
@Slf4j
public class TestJob1 implements BaseJob {
@Resource
private JobService jobService;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
log.info("开始执行定时任务...");
log.warn("执行时间: {}", DateUtil.now());
JobDetail jobDetail = context.getJobDetail();
Page<JobAndTrigger> list = jobService.list(1, 10);
log.info("所有定时任务信息: {}", JSONUtil.toJsonStr(list));
log.info("结束执行定时任务...");
}
}定时任务反射工具类
package com.yupi.springbootinit.example.quartz.utils;
import com.yupi.springbootinit.example.quartz.job.base.BaseJob;
/**
* @author tuaofei
* @description 定时任务反射工具类
* @date 2024/12/25
*/
public class JobUtil {
/**
* 根据全类名获取Job实例
*
* @param classname Job全类名
* @return {@link BaseJob} 实例
* @throws Exception 泛型获取异常
*/
public static BaseJob getClass(String classname) throws Exception {
Class<?> clazz = Class.forName(classname);
return (BaseJob) clazz.newInstance();
}
}service
package com.yupi.springbootinit.example.quartz.service;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
import com.yupi.springbootinit.example.quartz.model.dto.JobForm;
import com.yupi.springbootinit.example.quartz.model.dto.QueryJob;
import com.yupi.springbootinit.example.quartz.model.entity.JobAndTrigger;
import org.quartz.SchedulerException;
import java.util.Date;
import java.util.List;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/25
*/
public interface JobService extends IService<JobAndTrigger> {
/**
* 添加并启动定时任务
*
* @param form 表单参数 {@link JobForm}
* @throws Exception 异常
*/
void addJob(JobForm form) throws Exception;
/**
* 删除定时任务
*
* @param form 表单参数 {@link JobForm}
* @throws SchedulerException 异常
*/
void deleteJob(JobForm form) throws SchedulerException;
/**
* 暂停定时任务
*
* @param form 表单参数 {@link JobForm}
* @throws SchedulerException 异常
*/
void pauseJob(JobForm form) throws SchedulerException;
/**
* 恢复定时任务
*
* @param form 表单参数 {@link JobForm}
* @throws SchedulerException 异常
*/
void resumeJob(JobForm form) throws SchedulerException;
/**
* 重新配置定时任务
*
* @param form 表单参数 {@link JobForm}
* @throws Exception 异常
*/
void cronJob(JobForm form) throws Exception;
/**
* 查询定时任务列表
*
* @param currentPage 当前页
* @param pageSize 每页条数
* @return 定时任务列表
*/
Page<JobAndTrigger> list(Integer currentPage, Integer pageSize);
/**
* 查询该表达式的下次执行时间
* @param cronExpression
* @param numTimes
* @return
*/
List<Date> getNextFireTimes(String cronExpression, Integer numTimes);
/**
* 通过定时任务获取执行时间
* @param queryJob
* @return
*/
List<Date> getNextFireTimesByJob(QueryJob queryJob);
}servieImpl
package com.yupi.springbootinit.example.quartz.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.example.quartz.job.base.BaseJob;
import com.yupi.springbootinit.example.quartz.mapper.JobMapper;
import com.yupi.springbootinit.example.quartz.model.dto.JobForm;
import com.yupi.springbootinit.example.quartz.model.dto.QueryJob;
import com.yupi.springbootinit.example.quartz.model.entity.JobAndTrigger;
import com.yupi.springbootinit.example.quartz.service.JobService;
import com.yupi.springbootinit.example.quartz.utils.JobUtil;
import com.yupi.springbootinit.exception.BusinessException;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/25
*/
@Service
@Slf4j
public class JobServiceImpl extends ServiceImpl<JobMapper, JobAndTrigger> implements JobService {
private final Scheduler scheduler;
private final JobMapper jobMapper;
@Autowired
public JobServiceImpl(Scheduler scheduler, JobMapper jobMapper) {
this.scheduler = scheduler;
this.jobMapper = jobMapper;
}
/**
* 添加并启动定时任务
*
* @param form 表单参数 {@link JobForm}
* @throws Exception 异常
*/
@Override
public void addJob(JobForm form) throws Exception {
//启动调度器
scheduler.start();
//构建Job信息
Class<? extends BaseJob> jobClass = JobUtil.getClass(form.getJobClassName()).getClass();
JobBuilder jobBuilder = JobBuilder.newJob(jobClass);
JobDetail jobDetail = jobBuilder.withIdentity(form.getJobClassName(), form.getJobGroupName()).build();
// cron表达式调度构建器(任务执行时间)
CronScheduleBuilder cron = CronScheduleBuilder.cronSchedule(form.getCronExpression());
//根据cron表达式构建一个trigger
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
CronTrigger trigger = triggerBuilder.withIdentity(form.getJobClassName(), form.getJobGroupName()).withSchedule(cron).build();
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
log.error("【定时任务】创建失败!", e);
throw new BusinessException(ErrorCode.SYSTEM_ERROR, "【定时任务】创建失败!");
}
}
/**
* 删除定时任务
*
* @param form 表单参数 {@link JobForm}
* @throws SchedulerException 异常
*/
@Override
public void deleteJob(JobForm form) throws SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey(form.getJobClassName(), form.getJobGroupName());
//暂停触发器
scheduler.pauseTrigger(triggerKey);
//移除触发器
scheduler.unscheduleJob(triggerKey);
JobKey jobKey = JobKey.jobKey(form.getJobClassName(), form.getJobGroupName());
//删除任务
scheduler.deleteJob(jobKey);
}
/**
* 暂停定时任务
*
* @param form 表单参数 {@link JobForm}
* @throws SchedulerException 异常
*/
@Override
public void pauseJob(JobForm form) throws SchedulerException {
scheduler.pauseJob(JobKey.jobKey(form.getJobClassName(), form.getJobGroupName()));
}
/**
* 恢复定时任务
*
* @param form 表单参数 {@link JobForm}
* @throws SchedulerException 异常
*/
@Override
public void resumeJob(JobForm form) throws SchedulerException {
scheduler.resumeJob(JobKey.jobKey(form.getJobClassName(), form.getJobGroupName()));
}
/**
* 重新配置定时任务
*
* @param form 表单参数 {@link JobForm}
* @throws Exception 异常
*/
@Override
public void cronJob(JobForm form) throws Exception {
TriggerKey triggerKey = TriggerKey.triggerKey(form.getJobClassName(), form.getJobGroupName());
// cron表达式调度构建器(任务执行时间)
CronScheduleBuilder cron = CronScheduleBuilder.cronSchedule(form.getCronExpression());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(cron).build();
try {
//按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
} catch (SchedulerException e) {
log.error("【定时任务】更新失败!", e);
throw new BusinessException(ErrorCode.SYSTEM_ERROR, "【定时任务】更新失败!");
}
}
/**
* 查询定时任务列表
*
* @param currentPage 当前页
* @param pageSize 每页条数
* @return 定时任务列表
*/
@Override
public Page<JobAndTrigger> list(Integer currentPage, Integer pageSize) {
Page<JobAndTrigger> page = new Page<>(currentPage, pageSize);
List<JobAndTrigger> list = jobMapper.list(page);
page.setRecords(list);
return page;
}
@Override
public List<Date> getNextFireTimes(String cronExpression, Integer numTimes) {
List<Date> nextFireTimes = new ArrayList<>();
try {
CronTrigger trigger = TriggerBuilder.newTrigger()
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.build();
Date now = new Date();
Date nextFireTime = trigger.getFireTimeAfter(now);
// 获取接下来的n次执行时间
for (int i = 0; i < numTimes && nextFireTime != null; i++) {
nextFireTimes.add(nextFireTime);
nextFireTime = trigger.getFireTimeAfter(nextFireTime);
}
} catch (Exception e) {
log.error("获取下次执行时间失败: {}", cronExpression, e);
throw new BusinessException(ErrorCode.PARAMS_ERROR, "Cron表达式无效");
}
return nextFireTimes;
}
@Override
public List<Date> getNextFireTimesByJob(QueryJob queryJob) {
List<Date> dateList = new ArrayList<>();
if (queryJob == null) {
return dateList;
}
JobAndTrigger jobAndTriggerQuery = new JobAndTrigger();
jobAndTriggerQuery.setJobClassName(queryJob.getJobClassName());
jobAndTriggerQuery.setJobGroup(queryJob.getJobGroupName());
List<JobAndTrigger> jobAndTriggers = jobMapper.selectQrtzCronTriggers(jobAndTriggerQuery);
if (CollectionUtil.isEmpty(jobAndTriggers)) {
return dateList;
}
JobAndTrigger jobAndTrigger = jobAndTriggers.get(0);
String cronExpression = jobAndTrigger.getCronExpression();
List<Date> nextFireTimes = getNextFireTimes(cronExpression, 5);
return nextFireTimes;
}
}controller
package com.yupi.springbootinit.example.quartz.controller;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.common.ResultUtils;
import com.yupi.springbootinit.example.quartz.model.dto.JobForm;
import com.yupi.springbootinit.example.quartz.model.dto.QueryJob;
import com.yupi.springbootinit.example.quartz.model.entity.JobAndTrigger;
import com.yupi.springbootinit.example.quartz.service.JobService;
import com.yupi.springbootinit.example.quartz.service.impl.JobServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import javax.validation.Valid;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/25
*/
@RestController
@RequestMapping("/job")
@Slf4j
public class JobController {
private final JobService jobService;
@Autowired
public JobController(JobService jobService) {
this.jobService = jobService;
}
/**
* 保存定时任务
*/
@PostMapping("/addJob")
public BaseResponse<String> addJob(@Valid JobForm form) {
try {
jobService.addJob(form);
} catch (Exception e) {
return ResultUtils.error(ErrorCode.SYSTEM_ERROR, e.getMessage());
}
return ResultUtils.success("操作成功");
}
/**
* 删除定时任务
*/
@DeleteMapping
public BaseResponse<String> deleteJob(JobForm form) throws SchedulerException {
if (StrUtil.hasBlank(form.getJobGroupName(), form.getJobClassName())) {
return ResultUtils.error(ErrorCode.PARAMS_ERROR);
}
jobService.deleteJob(form);
return ResultUtils.success("删除成功");
}
/**
* 暂停定时任务
*/
@PutMapping(params = "pause")
public BaseResponse<String> pauseJob(JobForm form) throws SchedulerException {
if (StrUtil.hasBlank(form.getJobGroupName(), form.getJobClassName())) {
return ResultUtils.error(ErrorCode.PARAMS_ERROR);
}
jobService.pauseJob(form);
return ResultUtils.success("暂停成功");
}
/**
* 恢复定时任务
*/
@PutMapping(params = "resume")
public BaseResponse<String> resumeJob(JobForm form) throws SchedulerException {
if (StrUtil.hasBlank(form.getJobGroupName(), form.getJobClassName())) {
return ResultUtils.error(ErrorCode.PARAMS_ERROR);
}
jobService.resumeJob(form);
return ResultUtils.success("恢复成功");
}
/**
* 修改定时任务,定时时间
*/
@PutMapping(params = "cron")
public BaseResponse<String> cronJob(@Valid JobForm form) {
try {
jobService.cronJob(form);
} catch (Exception e) {
return ResultUtils.error(ErrorCode.SYSTEM_ERROR, e.getMessage());
}
return ResultUtils.success("修改成功");
}
@GetMapping
public BaseResponse<List<JobAndTrigger>> jobList(Integer currentPage, Integer pageSize) {
if (ObjectUtil.isNull(currentPage)) {
currentPage = 1;
}
if (ObjectUtil.isNull(pageSize)) {
pageSize = 10;
}
Page<JobAndTrigger> all = jobService.list(currentPage, pageSize);
List<JobAndTrigger> records = all.getRecords();
return ResultUtils.success(records);
}
@PostMapping("/queryJobNextFireTimes")
public BaseResponse<List<Date>> queryJobNextFireTimes(@Valid QueryJob queryJob) {
List<Date> dateList = new ArrayList<>();
try {
dateList = jobService.getNextFireTimesByJob(queryJob);
} catch (Exception e) {
return ResultUtils.error(ErrorCode.SYSTEM_ERROR, e.getMessage());
}
return ResultUtils.success(dateList);
}
}测试



websocket
实现后端主动往前端推送数据
- Spring Boot 整合 Websocket 官方文档:https://docs.spring.io/spring/docs/5.1.2.RELEASE/spring-framework-reference/web.html#websocket
- 服务器信息采集 oshi 使用:https://github.com/oshi/oshi
目录结构

pom.xml
<!-- 用于获取操作系统和硬件信息-->
<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
<version>3.13.5</version> <!-- 或其他旧版本 -->
</dependency>
<!-- websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>WebSocketConfig
package com.yupi.springbootinit.example.websocket.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
* @author tuaofei
* @description WebSocket配置
* @date 2024/12/25
*/
@Configuration
@EnableWebSocket
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注册一个 /notification 端点,前端通过这个端点进行连接
registry.addEndpoint("/notification")
//解决跨域问题
.setAllowedOriginPatterns("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//定义了一个客户端订阅地址的前缀信息,也就是客户端接收服务端发送消息的前缀信息
registry.enableSimpleBroker("/topic");
}
}服务器相关实体
CPU
package com.yupi.springbootinit.example.websocket.model.server;
import cn.hutool.core.util.NumberUtil;
/**
* <p>
* CPU相关信息实体
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 16:09
*/
public class Cpu {
/**
* 核心数
*/
private int cpuNum;
/**
* CPU总的使用率
*/
private double total;
/**
* CPU系统使用率
*/
private double sys;
/**
* CPU用户使用率
*/
private double used;
/**
* CPU当前等待率
*/
private double wait;
/**
* CPU当前空闲率
*/
private double free;
public int getCpuNum() {
return cpuNum;
}
public void setCpuNum(int cpuNum) {
this.cpuNum = cpuNum;
}
public double getTotal() {
return NumberUtil.round(NumberUtil.mul(total, 100), 2).doubleValue();
}
public void setTotal(double total) {
this.total = total;
}
public double getSys() {
return NumberUtil.round(NumberUtil.mul(sys / total, 100), 2).doubleValue();
}
public void setSys(double sys) {
this.sys = sys;
}
public double getUsed() {
return NumberUtil.round(NumberUtil.mul(used / total, 100), 2).doubleValue();
}
public void setUsed(double used) {
this.used = used;
}
public double getWait() {
return NumberUtil.round(NumberUtil.mul(wait / total, 100), 2).doubleValue();
}
public void setWait(double wait) {
this.wait = wait;
}
public double getFree() {
return NumberUtil.round(NumberUtil.mul(free / total, 100), 2).doubleValue();
}
public void setFree(double free) {
this.free = free;
}
}package com.yupi.springbootinit.example.websocket.payload.server;
import com.google.common.collect.Lists;
import com.yupi.springbootinit.example.websocket.model.server.Cpu;
import com.yupi.springbootinit.example.websocket.payload.KV;
import lombok.Data;
import java.util.List;
/**
* <p>
* CPU相关信息实体VO
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 17:27
*/
@Data
public class CpuVO {
List<KV> data = Lists.newArrayList();
public static CpuVO create(Cpu cpu) {
CpuVO vo = new CpuVO();
vo.data.add(new KV("核心数", cpu.getCpuNum()));
vo.data.add(new KV("CPU总的使用率", cpu.getTotal()));
vo.data.add(new KV("CPU系统使用率", cpu.getSys() + "%"));
vo.data.add(new KV("CPU用户使用率", cpu.getUsed() + "%"));
vo.data.add(new KV("CPU当前等待率", cpu.getWait() + "%"));
vo.data.add(new KV("CPU当前空闲率", cpu.getFree() + "%"));
return vo;
}
}Jvm
package com.yupi.springbootinit.example.websocket.model.server;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.NumberUtil;
import java.lang.management.ManagementFactory;
import java.util.Date;
/**
* JVM相关信息实体
*/
public class Jvm {
/**
* 当前JVM占用的内存总数(M)
*/
private double total;
/**
* JVM最大可用内存总数(M)
*/
private double max;
/**
* JVM空闲内存(M)
*/
private double free;
/**
* JDK版本
*/
private String version;
/**
* JDK路径
*/
private String home;
/**
* JDK启动时间
*/
private String startTime;
/**
* JDK运行时间
*/
private String runTime;
public double getTotal() {
return NumberUtil.div(total, (1024 * 1024), 2);
}
public void setTotal(double total) {
this.total = total;
}
public double getMax() {
return NumberUtil.div(max, (1024 * 1024), 2);
}
public void setMax(double max) {
this.max = max;
}
public double getFree() {
return NumberUtil.div(free, (1024 * 1024), 2);
}
public void setFree(double free) {
this.free = free;
}
public double getUsed() {
return NumberUtil.div(total - free, (1024 * 1024), 2);
}
public double getUsage() {
return NumberUtil.mul(NumberUtil.div(total - free, total, 4), 100);
}
/**
* 获取JDK名称
*/
public String getName() {
return ManagementFactory.getRuntimeMXBean().getVmName();
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getHome() {
return home;
}
public void setHome(String home) {
this.home = home;
}
public void setStartTime(String startTime) {
this.startTime = startTime;
}
public String getStartTime() {
return DateUtil.formatDateTime(new Date(ManagementFactory.getRuntimeMXBean().getStartTime()));
}
public void setRunTime(String runTime) {
this.runTime = runTime;
}
public String getRunTime() {
long startTime = ManagementFactory.getRuntimeMXBean().getStartTime();
return DateUtil.formatBetween(DateUtil.current() - startTime);
}
}package com.yupi.springbootinit.example.websocket.payload.server;
import com.google.common.collect.Lists;
import com.yupi.springbootinit.example.websocket.model.server.Jvm;
import com.yupi.springbootinit.example.websocket.payload.KV;
import lombok.Data;
import java.util.List;
/**
* <p>
* JVM相关信息实体VO
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 17:28
*/
@Data
public class JvmVO {
List<KV> data = Lists.newArrayList();
public static JvmVO create(Jvm jvm) {
JvmVO vo = new JvmVO();
vo.data.add(new KV("当前JVM占用的内存总数(M)", jvm.getTotal() + "M"));
vo.data.add(new KV("JVM最大可用内存总数(M)", jvm.getMax() + "M"));
vo.data.add(new KV("JVM空闲内存(M)", jvm.getFree() + "M"));
vo.data.add(new KV("JVM使用率", jvm.getUsage() + "%"));
vo.data.add(new KV("JDK版本", jvm.getVersion()));
vo.data.add(new KV("JDK路径", jvm.getHome()));
vo.data.add(new KV("JDK启动时间", jvm.getStartTime()));
vo.data.add(new KV("JDK运行时间", jvm.getRunTime()));
return vo;
}
}Mem
package com.yupi.springbootinit.example.websocket.model.server;
import cn.hutool.core.util.NumberUtil;
/**
* <p>
* 內存相关信息实体
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 16:09
*/
public class Mem {
/**
* 内存总量
*/
private double total;
/**
* 已用内存
*/
private double used;
/**
* 剩余内存
*/
private double free;
public double getTotal() {
return NumberUtil.div(total, (1024 * 1024 * 1024), 2);
}
public void setTotal(long total) {
this.total = total;
}
public double getUsed() {
return NumberUtil.div(used, (1024 * 1024 * 1024), 2);
}
public void setUsed(long used) {
this.used = used;
}
public double getFree() {
return NumberUtil.div(free, (1024 * 1024 * 1024), 2);
}
public void setFree(long free) {
this.free = free;
}
public double getUsage() {
return NumberUtil.mul(NumberUtil.div(used, total, 4), 100);
}
}package com.yupi.springbootinit.example.websocket.payload.server;
import com.google.common.collect.Lists;
import com.yupi.springbootinit.example.websocket.model.server.Mem;
import com.yupi.springbootinit.example.websocket.payload.KV;
import lombok.Data;
import java.util.List;
/**
* <p>
* 內存相关信息实体VO
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 17:28
*/
@Data
public class MemVO {
List<KV> data = Lists.newArrayList();
public static MemVO create(Mem mem) {
MemVO vo = new MemVO();
vo.data.add(new KV("内存总量", mem.getTotal() + "G"));
vo.data.add(new KV("已用内存", mem.getUsed() + "G"));
vo.data.add(new KV("剩余内存", mem.getFree() + "G"));
vo.data.add(new KV("使用率", mem.getUsage() + "%"));
return vo;
}
}Sys
package com.yupi.springbootinit.example.websocket.model.server;
/**
* <p>
* 系统相关信息实体
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 16:10
*/
public class Sys {
/**
* 服务器名称
*/
private String computerName;
/**
* 服务器Ip
*/
private String computerIp;
/**
* 项目路径
*/
private String userDir;
/**
* 操作系统
*/
private String osName;
/**
* 系统架构
*/
private String osArch;
public String getComputerName() {
return computerName;
}
public void setComputerName(String computerName) {
this.computerName = computerName;
}
public String getComputerIp() {
return computerIp;
}
public void setComputerIp(String computerIp) {
this.computerIp = computerIp;
}
public String getUserDir() {
return userDir;
}
public void setUserDir(String userDir) {
this.userDir = userDir;
}
public String getOsName() {
return osName;
}
public void setOsName(String osName) {
this.osName = osName;
}
public String getOsArch() {
return osArch;
}
public void setOsArch(String osArch) {
this.osArch = osArch;
}
}package com.yupi.springbootinit.example.websocket.payload.server;
import com.google.common.collect.Lists;
import com.yupi.springbootinit.example.websocket.model.server.Sys;
import com.yupi.springbootinit.example.websocket.payload.KV;
import lombok.Data;
import java.util.List;
/**
* <p>
* 系统相关信息实体VO
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 17:28
*/
@Data
public class SysVO {
List<KV> data = Lists.newArrayList();
public static SysVO create(Sys sys) {
SysVO vo = new SysVO();
vo.data.add(new KV("服务器名称", sys.getComputerName()));
vo.data.add(new KV("服务器Ip", sys.getComputerIp()));
vo.data.add(new KV("项目路径", sys.getUserDir()));
vo.data.add(new KV("操作系统", sys.getOsName()));
vo.data.add(new KV("系统架构", sys.getOsArch()));
return vo;
}
}SysFile
package com.yupi.springbootinit.example.websocket.model.server;
/**
* <p>
* 系统文件相关信息实体
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 16:10
*/
public class SysFile {
/**
* 盘符路径
*/
private String dirName;
/**
* 盘符类型
*/
private String sysTypeName;
/**
* 文件类型
*/
private String typeName;
/**
* 总大小
*/
private String total;
/**
* 剩余大小
*/
private String free;
/**
* 已经使用量
*/
private String used;
/**
* 资源的使用率
*/
private double usage;
public String getDirName() {
return dirName;
}
public void setDirName(String dirName) {
this.dirName = dirName;
}
public String getSysTypeName() {
return sysTypeName;
}
public void setSysTypeName(String sysTypeName) {
this.sysTypeName = sysTypeName;
}
public String getTypeName() {
return typeName;
}
public void setTypeName(String typeName) {
this.typeName = typeName;
}
public String getTotal() {
return total;
}
public void setTotal(String total) {
this.total = total;
}
public String getFree() {
return free;
}
public void setFree(String free) {
this.free = free;
}
public String getUsed() {
return used;
}
public void setUsed(String used) {
this.used = used;
}
public double getUsage() {
return usage;
}
public void setUsage(double usage) {
this.usage = usage;
}
}package com.yupi.springbootinit.example.websocket.payload.server;
import com.google.common.collect.Lists;
import com.yupi.springbootinit.example.websocket.model.server.SysFile;
import com.yupi.springbootinit.example.websocket.payload.KV;
import lombok.Data;
import java.util.List;
/**
* <p>
* 系统文件相关信息实体VO
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 17:30
*/
@Data
public class SysFileVO {
List<List<KV>> data = Lists.newArrayList();
public static SysFileVO create(List<SysFile> sysFiles) {
SysFileVO vo = new SysFileVO();
for (SysFile sysFile : sysFiles) {
List<KV> item = Lists.newArrayList();
item.add(new KV("盘符路径", sysFile.getDirName()));
item.add(new KV("盘符类型", sysFile.getSysTypeName()));
item.add(new KV("文件类型", sysFile.getTypeName()));
item.add(new KV("总大小", sysFile.getTotal()));
item.add(new KV("剩余大小", sysFile.getFree()));
item.add(new KV("已经使用量", sysFile.getUsed()));
item.add(new KV("资源的使用率", sysFile.getUsage() + "%"));
vo.data.add(item);
}
return vo;
}
}Server
package com.yupi.springbootinit.example.websocket.model;
import cn.hutool.core.util.NumberUtil;
import com.yupi.springbootinit.example.websocket.model.server.*;
import com.yupi.springbootinit.example.websocket.utils.IpUtil;
import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;
import oshi.hardware.CentralProcessor.TickType;
import oshi.hardware.GlobalMemory;
import oshi.hardware.HardwareAbstractionLayer;
import oshi.software.os.FileSystem;
import oshi.software.os.OSFileStore;
import oshi.software.os.OperatingSystem;
import oshi.util.Util;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
/**
* <p>
* 服务器相关信息实体
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 16:09
*/
public class Server {
private static final int OSHI_WAIT_SECOND = 1000;
/**
* CPU相关信息
*/
private Cpu cpu = new Cpu();
/**
* 內存相关信息
*/
private Mem mem = new Mem();
/**
* JVM相关信息
*/
private Jvm jvm = new Jvm();
/**
* 服务器相关信息
*/
private Sys sys = new Sys();
/**
* 磁盘相关信息
*/
private List<SysFile> sysFiles = new LinkedList<SysFile>();
public Cpu getCpu() {
return cpu;
}
public void setCpu(Cpu cpu) {
this.cpu = cpu;
}
public Mem getMem() {
return mem;
}
public void setMem(Mem mem) {
this.mem = mem;
}
public Jvm getJvm() {
return jvm;
}
public void setJvm(Jvm jvm) {
this.jvm = jvm;
}
public Sys getSys() {
return sys;
}
public void setSys(Sys sys) {
this.sys = sys;
}
public List<SysFile> getSysFiles() {
return sysFiles;
}
public void setSysFiles(List<SysFile> sysFiles) {
this.sysFiles = sysFiles;
}
public void copyTo() throws Exception {
SystemInfo si = new SystemInfo();
HardwareAbstractionLayer hal = si.getHardware();
setCpuInfo(hal.getProcessor());
setMemInfo(hal.getMemory());
setSysInfo();
setJvmInfo();
setSysFiles(si.getOperatingSystem());
}
/**
* 设置CPU信息
*/
private void setCpuInfo(CentralProcessor processor) {
// CPU信息
long[] prevTicks = processor.getSystemCpuLoadTicks();
Util.sleep(OSHI_WAIT_SECOND);
long[] ticks = processor.getSystemCpuLoadTicks();
long nice = ticks[TickType.NICE.getIndex()] - prevTicks[TickType.NICE.getIndex()];
long irq = ticks[TickType.IRQ.getIndex()] - prevTicks[TickType.IRQ.getIndex()];
long softirq = ticks[TickType.SOFTIRQ.getIndex()] - prevTicks[TickType.SOFTIRQ.getIndex()];
long steal = ticks[TickType.STEAL.getIndex()] - prevTicks[TickType.STEAL.getIndex()];
long cSys = ticks[TickType.SYSTEM.getIndex()] - prevTicks[TickType.SYSTEM.getIndex()];
long user = ticks[TickType.USER.getIndex()] - prevTicks[TickType.USER.getIndex()];
long iowait = ticks[TickType.IOWAIT.getIndex()] - prevTicks[TickType.IOWAIT.getIndex()];
long idle = ticks[TickType.IDLE.getIndex()] - prevTicks[TickType.IDLE.getIndex()];
long totalCpu = user + nice + cSys + idle + iowait + irq + softirq + steal;
cpu.setCpuNum(processor.getLogicalProcessorCount());
cpu.setTotal(totalCpu);
cpu.setSys(cSys);
cpu.setUsed(user);
cpu.setWait(iowait);
cpu.setFree(idle);
}
/**
* 设置内存信息
*/
private void setMemInfo(GlobalMemory memory) {
mem.setTotal(memory.getTotal());
mem.setUsed(memory.getTotal() - memory.getAvailable());
mem.setFree(memory.getAvailable());
}
/**
* 设置服务器信息
*/
private void setSysInfo() {
Properties props = System.getProperties();
sys.setComputerName(IpUtil.getHostName());
sys.setComputerIp(IpUtil.getHostIp());
sys.setOsName(props.getProperty("os.name"));
sys.setOsArch(props.getProperty("os.arch"));
sys.setUserDir(props.getProperty("user.dir"));
}
/**
* 设置Java虚拟机
*/
private void setJvmInfo() throws UnknownHostException {
Properties props = System.getProperties();
jvm.setTotal(Runtime.getRuntime().totalMemory());
jvm.setMax(Runtime.getRuntime().maxMemory());
jvm.setFree(Runtime.getRuntime().freeMemory());
jvm.setVersion(props.getProperty("java.version"));
jvm.setHome(props.getProperty("java.home"));
}
/**
* 设置磁盘信息
*/
private void setSysFiles(OperatingSystem os) {
FileSystem fileSystem = os.getFileSystem();
OSFileStore[] fsArray = fileSystem.getFileStores();
for (OSFileStore fs : fsArray) {
long free = fs.getUsableSpace();
long total = fs.getTotalSpace();
long used = total - free;
SysFile sysFile = new SysFile();
sysFile.setDirName(fs.getMount());
sysFile.setSysTypeName(fs.getType());
sysFile.setTypeName(fs.getName());
sysFile.setTotal(convertFileSize(total));
sysFile.setFree(convertFileSize(free));
sysFile.setUsed(convertFileSize(used));
sysFile.setUsage(NumberUtil.mul(NumberUtil.div(used, total, 4), 100));
sysFiles.add(sysFile);
}
}
/**
* 字节转换
*
* @param size 字节大小
* @return 转换后值
*/
public String convertFileSize(long size) {
long kb = 1024;
long mb = kb * 1024;
long gb = mb * 1024;
if (size >= gb) {
return String.format("%.1f GB", (float) size / gb);
} else if (size >= mb) {
float f = (float) size / mb;
return String.format(f > 100 ? "%.0f MB" : "%.1f MB", f);
} else if (size >= kb) {
float f = (float) size / kb;
return String.format(f > 100 ? "%.0f KB" : "%.1f KB", f);
} else {
return String.format("%d B", size);
}
}
}package com.yupi.springbootinit.example.websocket.payload;
import com.google.common.collect.Lists;
import com.yupi.springbootinit.example.websocket.model.Server;
import com.yupi.springbootinit.example.websocket.payload.server.*;
import lombok.Data;
import java.util.List;
/**
* <p>
* 服务器信息VO
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 17:25
*/
@Data
public class ServerVO {
List<CpuVO> cpu = Lists.newArrayList();
List<JvmVO> jvm = Lists.newArrayList();
List<MemVO> mem = Lists.newArrayList();
List<SysFileVO> sysFile = Lists.newArrayList();
List<SysVO> sys = Lists.newArrayList();
public ServerVO create(Server server) {
cpu.add(CpuVO.create(server.getCpu()));
jvm.add(JvmVO.create(server.getJvm()));
mem.add(MemVO.create(server.getMem()));
sysFile.add(SysFileVO.create(server.getSysFiles()));
sys.add(SysVO.create(server.getSys()));
return null;
}
}utils
package com.yupi.springbootinit.example.websocket.utils;
import cn.hutool.core.lang.Dict;
import com.yupi.springbootinit.example.websocket.model.Server;
import com.yupi.springbootinit.example.websocket.payload.ServerVO;
/**
* <p>
* 服务器转换工具类
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-17 10:24
*/
public class ServerUtil {
/**
* 包装成 ServerVO
*
* @param server server
* @return ServerVO
*/
public static ServerVO wrapServerVO(Server server) {
ServerVO serverVO = new ServerVO();
serverVO.create(server);
return serverVO;
}
/**
* 包装成 Dict
*
* @param serverVO serverVO
* @return Dict
*/
public static Dict wrapServerDict(ServerVO serverVO) {
Dict dict = Dict.create().set("cpu", serverVO.getCpu().get(0).getData()).set("mem", serverVO.getMem().get(0).getData()).set("sys", serverVO.getSys().get(0).getData()).set("jvm", serverVO.getJvm().get(0).getData()).set("sysFile", serverVO.getSysFile().get(0).getData());
return dict;
}
}package com.yupi.springbootinit.example.websocket.utils;
import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* <p>
* IP 工具类
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 16:08
*/
public class IpUtil {
public static String getIpAddr(HttpServletRequest request) {
if (request == null) {
return "unknown";
}
String ip = request.getHeader("x-forwarded-for");
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("X-Forwarded-For");
}
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("WL-Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("X-Real-IP");
}
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getRemoteAddr();
}
return "0:0:0:0:0:0:0:1".equals(ip) ? "127.0.0.1" : ip;
}
public static boolean internalIp(String ip) {
byte[] addr = textToNumericFormatV4(ip);
return internalIp(addr) || "127.0.0.1".equals(ip);
}
private static boolean internalIp(byte[] addr) {
final byte b0 = addr[0];
final byte b1 = addr[1];
// 10.x.x.x/8
final byte SECTION_1 = 0x0A;
// 172.16.x.x/12
final byte SECTION_2 = (byte) 0xAC;
final byte SECTION_3 = (byte) 0x10;
final byte SECTION_4 = (byte) 0x1F;
// 192.168.x.x/16
final byte SECTION_5 = (byte) 0xC0;
final byte SECTION_6 = (byte) 0xA8;
switch (b0) {
case SECTION_1:
return true;
case SECTION_2:
if (b1 >= SECTION_3 && b1 <= SECTION_4) {
return true;
}
case SECTION_5:
switch (b1) {
case SECTION_6:
return true;
}
default:
return false;
}
}
/**
* 将IPv4地址转换成字节
*
* @param text IPv4地址
* @return byte 字节
*/
public static byte[] textToNumericFormatV4(String text) {
if (text.length() == 0) {
return null;
}
byte[] bytes = new byte[4];
String[] elements = text.split("\\.", -1);
try {
long l;
int i;
switch (elements.length) {
case 1:
l = Long.parseLong(elements[0]);
if ((l < 0L) || (l > 4294967295L)) {
return null;
}
bytes[0] = (byte) (int) (l >> 24 & 0xFF);
bytes[1] = (byte) (int) ((l & 0xFFFFFF) >> 16 & 0xFF);
bytes[2] = (byte) (int) ((l & 0xFFFF) >> 8 & 0xFF);
bytes[3] = (byte) (int) (l & 0xFF);
break;
case 2:
l = Integer.parseInt(elements[0]);
if ((l < 0L) || (l > 255L)) {
return null;
}
bytes[0] = (byte) (int) (l & 0xFF);
l = Integer.parseInt(elements[1]);
if ((l < 0L) || (l > 16777215L)) {
return null;
}
bytes[1] = (byte) (int) (l >> 16 & 0xFF);
bytes[2] = (byte) (int) ((l & 0xFFFF) >> 8 & 0xFF);
bytes[3] = (byte) (int) (l & 0xFF);
break;
case 3:
for (i = 0; i < 2; ++i) {
l = Integer.parseInt(elements[i]);
if ((l < 0L) || (l > 255L)) {
return null;
}
bytes[i] = (byte) (int) (l & 0xFF);
}
l = Integer.parseInt(elements[2]);
if ((l < 0L) || (l > 65535L)) {
return null;
}
bytes[2] = (byte) (int) (l >> 8 & 0xFF);
bytes[3] = (byte) (int) (l & 0xFF);
break;
case 4:
for (i = 0; i < 4; ++i) {
l = Integer.parseInt(elements[i]);
if ((l < 0L) || (l > 255L)) {
return null;
}
bytes[i] = (byte) (int) (l & 0xFF);
}
break;
default:
return null;
}
} catch (NumberFormatException e) {
return null;
}
return bytes;
}
public static String getHostIp() {
try {
return InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
}
return "127.0.0.1";
}
public static String getHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
}
return "未知";
}
}package com.yupi.springbootinit.example.websocket.common;
/**
* <p>
* WebSocket常量
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 16:01
*/
public interface WebSocketConsts {
String PUSH_SERVER = "/topic/server";
}KV
package com.yupi.springbootinit.example.websocket.payload;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* <p>
* 键值匹配
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-14 17:41
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class KV {
/**
* 键
*/
private String key;
/**
* 值
*/
private Object value;
}推送任务
package com.yupi.springbootinit.example.websocket.task;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Dict;
import cn.hutool.json.JSONUtil;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.example.quartz.job.base.BaseJob;
import com.yupi.springbootinit.example.websocket.common.WebSocketConsts;
import com.yupi.springbootinit.example.websocket.model.Server;
import com.yupi.springbootinit.example.websocket.payload.ServerVO;
import com.yupi.springbootinit.example.websocket.utils.ServerUtil;
import com.yupi.springbootinit.exception.BusinessException;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @author tuaofei
* @description 服务器定时执行任务
* @date 2024/12/25
*/
@Component
@Slf4j
public class ServerTask implements BaseJob {
@Autowired
private SimpMessagingTemplate wsTemplate;
@Override
public void execute(JobExecutionContext context) {
try {
log.info("【推送消息】开始执行:{}", DateUtil.formatDateTime(new Date()));
//查询服务器状态
Server server = new Server();
server.copyTo();
ServerVO serverVO = ServerUtil.wrapServerVO(server);
Dict dict = ServerUtil.wrapServerDict(serverVO);
String jsonStr = JSONUtil.toJsonStr(dict);
log.info(jsonStr);
wsTemplate.convertAndSend(WebSocketConsts.PUSH_SERVER, jsonStr);
log.info("【推送消息】执行结束:{}", DateUtil.formatDateTime(new Date()));
} catch (Exception e) {
log.error("【推送消息】出现错误", e);
throw new BusinessException(ErrorCode.SYSTEM_ERROR, e.getMessage());
}
}
}controller
package com.yupi.springbootinit.example.websocket.controller;
import cn.hutool.core.lang.Dict;
import com.yupi.springbootinit.example.websocket.model.Server;
import com.yupi.springbootinit.example.websocket.payload.ServerVO;
import com.yupi.springbootinit.example.websocket.utils.ServerUtil;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* <p>
* 服务器监控Controller
* </p>
*
* @author yangkai.shen
* @date Created in 2018-12-17 10:22
*/
@RestController
@RequestMapping("/server")
public class ServerController {
@GetMapping
public Dict serverInfo() throws Exception {
Server server = new Server();
server.copyTo();
ServerVO serverVO = ServerUtil.wrapServerVO(server);
return ServerUtil.wrapServerDict(serverVO);
}
}前端
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>服务器信息</title>
<link href="https://cdnjs.cloudflare.com/ajax/libs/element-ui/2.4.11/theme-chalk/index.css" rel="stylesheet">
<style>
.el-row {
margin-bottom: 20px;
}
.el-row:last-child {
margin-bottom: 0;
}
.sysFile {
margin-bottom: 5px;
}
.sysFile:last-child {
margin-bottom: 0;
}
</style>
</head>
<body>
<div id="app">
<el-container>
<el-header>
<el-button @click="_initSockJs" type="primary" :disabled="isConnected">手动连接</el-button>
<el-button @click="_destroySockJs" type="danger" :disabled="!isConnected">断开连接</el-button>
</el-header>
<el-main>
<el-row :gutter="20">
<el-col :span="12">
<el-card>
<div slot="header">
<span>CPU信息</span>
</div>
<el-table size="small" border :data="server.cpu" style="width: 100%">
<el-table-column prop="key" label="属性">
</el-table-column>
<el-table-column prop="value" label="值">
</el-table-column>
</el-table>
</el-card>
</el-col>
<el-col :span="12">
<el-card>
<div slot="header">
<span>内存信息</span>
</div>
<el-table size="small" border :data="server.mem" style="width: 100%">
<el-table-column prop="key" label="属性">
</el-table-column>
<el-table-column prop="value" label="值">
</el-table-column>
</el-table>
</el-card>
</el-col>
</el-row>
<el-row>
<el-col :span="24">
<el-card>
<div slot="header">
<span>服务器信息</span>
</div>
<el-table size="small" border :data="server.sys" style="width: 100%">
<el-table-column prop="key" label="属性">
</el-table-column>
<el-table-column prop="value" label="值">
</el-table-column>
</el-table>
</el-card>
</el-col>
</el-row>
<el-row>
<el-col :span="24">
<el-card>
<div slot="header">
<span>Java虚拟机信息</span>
</div>
<el-table size="small" border :data="server.jvm" style="width: 100%">
<el-table-column prop="key" label="属性">
</el-table-column>
<el-table-column prop="value" label="值">
</el-table-column>
</el-table>
</el-card>
</el-col>
</el-row>
<el-row>
<el-col :span="24">
<el-card>
<div slot="header">
<span>磁盘状态</span>
</div>
<div class="sysFile" v-for="(item,index) in server.sysFile" :key="index">
<el-table size="small" border :data="item" style="width: 100%">
<el-table-column prop="key" label="属性">
</el-table-column>
<el-table-column prop="value" label="值">
</el-table-column>
</el-table>
</div>
</el-card>
</el-col>
</el-row>
</el-main>
</el-container>
</div>
</body>
<script src="js/sockjs.min.js"></script>
<script src="js/stomp.js"></script>
<script src="https://cdn.bootcss.com/vue/2.5.21/vue.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/element-ui/2.4.11/index.js"></script>
<script src="https://cdn.bootcss.com/axios/0.19.0-beta.1/axios.min.js"></script>
<script>
const wsHost = "http://localhost:8101/api/notification";
const wsTopic = "/topic/server";
const app = new Vue({
el: '#app',
data: function () {
return {
isConnected: false,
stompClient: {},
socket: {},
server: {
cpu: [],
mem: [],
jvm: [],
sys: [],
sysFile: []
}
}
},
methods: {
_getServerInfo() {
axios.get('/api/server')
.then((response) => {
this.server = response.data
});
},
_initSockJs() {
this._getServerInfo();
this.socket = new SockJS(wsHost);
this.stompClient = Stomp.over(this.socket);
this.stompClient.connect({}, (frame) => {
console.log('websocket连接成功:' + frame);
this.isConnected = true;
this.$message('websocket服务器连接成功');
// 另外再注册一下消息推送
this.stompClient.subscribe(wsTopic, (response) => {
this.server = JSON.parse(response.body);
});
});
},
_destroySockJs() {
if (this.stompClient != null) {
this.stompClient.disconnect();
this.socket.onclose;
this.socket.close();
this.stompClient = {};
this.socket = {};
this.isConnected = false;
this.server.cpu = [];
this.server.mem = [];
this.server.jvm = [];
this.server.sys = [];
this.server.sysFile = [];
}
console.log('websocket断开成功!');
this.$message.error('websocket断开成功!');
}
},
mounted() {
this._initSockJs();
},
beforeDestroy() {
this._destroySockJs();
}
})
</script>
</html>测试


kafka
简单发送和接收信息
配置
pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>application.yml
kafka:
bootstrap-servers: localhost:9092
producer:
retries: 3
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: spring-boot-init
# 手动提交
enable-auto-commit: false
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session.timeout.ms: 60000
listener:
log-container-config: false
concurrency: 5
# 手动提交
ack-mode: manual_immediateKafkaConfig
package com.yupi.springbootinit.example.kafka.config;
import com.yupi.springbootinit.example.kafka.constants.KafkaConstants;
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/26
*/
@Configuration
@EnableConfigurationProperties({KafkaProperties.class})
@EnableKafka
@AllArgsConstructor
public class KafkaConfig {
private final KafkaProperties kafkaProperties;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(KafkaConstants.DEFAULT_PARTITION_NUM);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(KafkaConstants.DEFAULT_PARTITION_NUM);
return factory;
}
}package com.yupi.springbootinit.example.kafka.constants;
/**
* <p>
* kafka 常量池
* </p>
*
* @author yangkai.shen
* @date Created in 2019-01-07 14:52
*/
public interface KafkaConstants {
/**
* 默认分区大小
*/
Integer DEFAULT_PARTITION_NUM = 3;
/**
* Topic 名称
*/
String TOPIC_TEST = "test";
/**
* 死信队列主题
*/
String TOPIC_DLQ = "test-dlq";
/**
* 重试次数的header key
*/
String HEADER_RETRY_COUNT = "retry-count";
/**
* 最大重试次数
*/
int MAX_RETRY_COUNT = 3;
/**
* 数据采集服务 topic
*/
String RAW_DATA_TOPIC = "raw-data";
String RAW_DATA_GROUP_ID = "raw-data-process-group";
}发送消息接口
package com.yupi.springbootinit.example.kafka.controller;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.common.ResultUtils;
import com.yupi.springbootinit.example.kafka.model.dto.KafKaMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.validation.Valid;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/26
*/
@RestController
@Slf4j
public class KafKaController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 消息发送
*
* @param kafKaMsg
* @return
*/
@PostMapping("/sendMsg")
public BaseResponse<String> sendMsg(@Valid KafKaMsg kafKaMsg) {
String topic = kafKaMsg.getTopic();
String message = kafKaMsg.getMsg();
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, message);
StringBuilder resultMsg = new StringBuilder();
//异步回调方式
send.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable ex) {
// 消息发送失败
log.error("消息发送失败:topic = {}, message = {}", topic, message, ex);
resultMsg.append("消息发送失败:topic = ").append(topic).append(", message = ").append(message).append("异常信息: ").append(ex).toString();
}
@Override
public void onSuccess(SendResult<String, String> result) {
// 消息发送成功
log.info("消息发送成功:topic = {}, message = {}", topic, message);
resultMsg.append("消息发送成功:topic = ").append(topic).append(", message = ").append(message);
// 可以获取更多发送结果信息
RecordMetadata metadata = result.getRecordMetadata();
log.info("partition = {}, offset = {}", metadata.partition(), metadata.offset());
}
});
if (StringUtils.isNotBlank(resultMsg) && resultMsg.toString().contains("消息发送失败")) {
ResultUtils.error(ErrorCode.SYSTEM_ERROR, resultMsg.toString());
}
return ResultUtils.success(resultMsg.toString());
}
}接收消息处理
正常消息处理
package com.yupi.springbootinit.example.kafka.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yupi.springbootinit.example.kafka.constants.KafkaConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/26
*/
@Component(KafkaConstants.TOPIC_TEST)
@Slf4j
public class MessageHandler {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, containerFactory = "ackContainerFactory")
public void handleMessage(ConsumerRecords<String, String> records, Acknowledgment acknowledgment) {
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while (iterator.hasNext()) {
ConsumerRecord<String, String> record = iterator.next();
String message = record.value();
// 获取当前重试次数
int retryCount = getRetryCount(record);
try {
log.info("收到消息: topic = {}, partition = {}, offset = {}, retryCount = {}, message = {}",
record.topic(), record.partition(), record.offset(), retryCount, message);
// 处理消息的业务逻辑
processMessage(message);
// 处理成功,确认消息
acknowledgment.acknowledge();
log.info("消息处理成功,已提交offset");
} catch (Exception e) {
log.error("消息处理失败: topic = {}, partition = {}, offset = {}, retryCount = {}",
record.topic(), record.partition(), record.offset(), retryCount, e);
handleFailedMessage(record, retryCount, e);
// 确认原消息,因为消息已经被转发到重试队列或死信队列
acknowledgment.acknowledge();
}
}
}
/**
* 获取当前重试次数
*
* @param record
* @return
*/
private int getRetryCount(ConsumerRecord<String, String> record) {
Headers headers = record.headers();
Header retryHeader = headers.lastHeader(KafkaConstants.HEADER_RETRY_COUNT);
return retryHeader == null ? 0 : ByteBuffer.wrap(retryHeader.value()).getInt();
}
/**
* 处理失败消息
*
* @param record
* @param retryCount
* @param e
*/
private void handleFailedMessage(ConsumerRecord<String, String> record, int retryCount, Exception e) {
if (retryCount < KafkaConstants.MAX_RETRY_COUNT) {
// 未达到最大重试次数,发送到重试队列
sendToRetryTopic(record, retryCount + 1);
} else {
// 达到最大重试次数,发送到死信队列
sendToDLQ(record, e);
}
}
/**
* 发送到重试队列
*
* @param record
* @param nextRetryCount
*/
private void sendToRetryTopic(ConsumerRecord<String, String> record, int nextRetryCount) {
ProducerRecord<String, String> retryRecord = new ProducerRecord<>(
KafkaConstants.TOPIC_TEST, // 仍然发送到原主题
record.partition(),
record.key(),
record.value()
);
// 添加重试次数到header
retryRecord.headers().add(
KafkaConstants.HEADER_RETRY_COUNT,
ByteBuffer.allocate(4).putInt(nextRetryCount).array()
);
try {
kafkaTemplate.send(retryRecord).get(10, TimeUnit.SECONDS);
log.info("消息已发送到重试队列: retryCount = {}", nextRetryCount);
} catch (Exception ex) {
log.error("发送到重试队列失败", ex);
// 如果发送重试消息失败,则发送到死信队列
sendToDLQ(record, ex);
}
}
/**
* 发送到死信队列
*
* @param record
* @param e
*/
private void sendToDLQ(ConsumerRecord<String, String> record, Exception e) {
// 构建死信消息,包含原始消息信息和错误信息
Map<String, Object> dlqMessage = new HashMap<>();
dlqMessage.put("originalTopic", record.topic());
dlqMessage.put("originalPartition", record.partition());
dlqMessage.put("originalOffset", record.offset());
dlqMessage.put("originalMessage", record.value());
dlqMessage.put("error", e.getMessage());
dlqMessage.put("stackTrace", ExceptionUtils.getStackTrace(e));
dlqMessage.put("timestamp", System.currentTimeMillis());
try {
String dlqMessageJson = new ObjectMapper().writeValueAsString(dlqMessage);
kafkaTemplate.send(KafkaConstants.TOPIC_DLQ, dlqMessageJson).get(10, TimeUnit.SECONDS);
log.info("消息已发送到死信队列");
} catch (Exception ex) {
log.error("发送到死信队列失败", ex);
}
}
private void processMessage(String message) throws InterruptedException {
// 业务处理逻辑
// Thread.sleep(1000);
int i = 1/0;
}
}失败消息处理(死信队列)
package com.yupi.springbootinit.example.kafka.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yupi.springbootinit.example.kafka.constants.KafkaConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/26
*/
@Component(KafkaConstants.TOPIC_DLQ)
@Slf4j
public class DLQMessageHandler {
@KafkaListener(topics = KafkaConstants.TOPIC_DLQ, containerFactory = "ackContainerFactory")
public void handleDLQMessage(ConsumerRecords<String, String> records, Acknowledgment acknowledgment) {
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while (iterator.hasNext()) {
ConsumerRecord<String, String> record = iterator.next();
try {
String message = record.value();
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> dlqMessage = mapper.readValue(message, Map.class);
// 打印完整的错误信息
log.error("\n死信队列消息详情:\n" +
"原始主题:{}\n" +
"原始分区:{}\n" +
"原始偏移量:{}\n" +
"原始消息内容:{}\n" +
"错误信息:{}\n" +
"错误堆栈:\n{}\n" +
"失败时间:{}",
dlqMessage.get("originalTopic"),
dlqMessage.get("originalPartition"),
dlqMessage.get("originalOffset"),
dlqMessage.get("originalMessage"),
dlqMessage.get("error"),
dlqMessage.get("stackTrace"),
new Date((Long) dlqMessage.get("timestamp"))
);
// 可以将错误信息保存到数据库
saveDLQMessage(dlqMessage);
// 发送告警通知
sendAlert(dlqMessage);
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("处理死信队列消息失败", e);
// 死信队列的消息处理失败,这里选择不提交offset
}
}
}
private void saveDLQMessage(Map<String, Object> dlqMessage) {
// 示例:打印 SQL 语句
log.info("执行SQL: INSERT INTO kafka_dlq_messages (" +
"original_topic, original_partition, original_offset, " +
"original_message, error_message, error_stack_trace, failed_time" +
") VALUES ('{}', {}, {}, '{}', '{}', '{}', '{}')",
dlqMessage.get("originalTopic"),
dlqMessage.get("originalPartition"),
dlqMessage.get("originalOffset"),
dlqMessage.get("originalMessage"),
dlqMessage.get("error"),
dlqMessage.get("stackTrace"),
new Date((Long) dlqMessage.get("timestamp"))
);
}
private void sendAlert(Map<String, Object> dlqMessage) {
String alertMessage = String.format(
"Kafka消息处理失败告警\n" +
"主题:%s\n" +
"分区:%s\n" +
"偏移量:%s\n" +
"消息内容:%s\n" +
"错误原因:%s\n" +
"失败时间:%s",
dlqMessage.get("originalTopic"),
dlqMessage.get("originalPartition"),
dlqMessage.get("originalOffset"),
dlqMessage.get("originalMessage"),
dlqMessage.get("error"),
new Date((Long) dlqMessage.get("timestamp"))
);
log.error("发送告警消息:\n{}", alertMessage);
// 这里可以调用告警接口,如:
// - 发送邮件
// - 发送钉钉消息
// - 发送企业微信消息
// - 发送短信
}
}部署zookeeper集群
在kafka/config配置目录下
复制zookeeper.properties配置文件
zookeeper1.properties
dataDir=D:/tools/kafka_2.13-3.7.2/zookeeper-logs/zk1
clientPort=2181
# 集群配置
initLimit=5
syncLimit=2
tickTime=2000
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890zookeeper2.properties
dataDir=D:/tools/kafka_2.13-3.7.2/zookeeper-logs/zk1
clientPort=2182 #修改
# 集群配置
initLimit=5
syncLimit=2
tickTime=2000
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890zookeeper3.properties
dataDir=D:/tools/kafka_2.13-3.7.2/zookeeper-logs/zk1
clientPort=2183 #修改
# 集群配置
initLimit=5
syncLimit=2
tickTime=2000
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890注意配置的dataDir目录下的myid文件
里面存放的server.xx=的内容,上面的就是存放的1、2、3,注意不能有换行和空格,不然无法启动
启动脚本
@echo off
title ZooKeeper Cluster Starter
:: 设置 KAFKA_HOME
set KAFKA_HOME=D:\tools\kafka_2.13-3.7.2
:: 启动 ZooKeeper 集群
start "zookeeper1" %KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper1.properties
timeout /t 5
start "zookeeper2" %KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper2.properties
timeout /t 5
start "zookeeper3" %KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper3.properties
echo ZooKeeper cluster is starting...
echo Please wait...
timeout /t 10
echo ZooKeeper cluster has been started.部署kafka集群
在kafka/config配置目录下
复制service.properties配置文件
service1.properties
broker.id=1
listeners=PLAINTEXT://:9092
log.dirs=D:/tools/kafka_2.13-3.7.2/kafka-logs/kafka1/logs
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183service1.properties
broker.id=2
listeners=PLAINTEXT://:9093
log.dirs=D:/tools/kafka_2.13-3.7.2/kafka-logs/kafka1/logs
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183service1.properties
broker.id=3
listeners=PLAINTEXT://:9094
log.dirs=D:/tools/kafka_2.13-3.7.2/kafka-logs/kafka1/logs
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183注意配置的log.dirs里面的meta.properties文件
#
#Fri Dec 27 09:47:11 CST 2024
cluster.id=bhEmyjJsTWyNn8bD8uQNAA
version=0
broker.id=1cluster.id可能对不上,根据提示换一个
启动脚本
@echo off
title Kafka Cluster Starter
setlocal enabledelayedexpansion
:: 设置 KAFKA_HOME(请修改为你的 Kafka 安装路径)
set KAFKA_HOME=D:\tools\kafka_2.13-3.7.2
:: 启动 Kafka 集群
echo Starting Kafka Cluster...
:: 启动第一个 Kafka 节点
start "kafka1" %KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server1.properties
echo Started Kafka Broker 1
timeout /t 10
:: 启动第二个 Kafka 节点
start "kafka2" %KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server2.properties
echo Started Kafka Broker 2
timeout /t 10
:: 启动第三个 Kafka 节点
start "kafka3" %KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server3.properties
echo Started Kafka Broker 3
echo.
echo Kafka cluster is starting...
echo Please wait for all brokers to initialize...
timeout /t 10
:: 验证集群状态
echo Checking cluster status...
call %KAFKA_HOME%\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
echo.
echo Kafka cluster startup completed.
endlocalredis
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 对象池,使用redis时必须引入 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>config
package com.yupi.springbootinit.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.io.Serializable;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/27
*/
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
@EnableCaching
public class RedisConfig {
@Autowired
private RedisProperties redisProperties;
@Bean
public LettuceConnectionFactory lettuceConnectionFactory() {
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
redisStandaloneConfiguration.setHostName(redisProperties.getHost());
redisStandaloneConfiguration.setPort(redisProperties.getPort());
return new LettuceConnectionFactory(redisStandaloneConfiguration);
}
/**
* 默认情况下的模板只能支持RedisTemplate<String, String>,也就是只能存入字符串,因此支持序列化
*/
@Bean
public RedisTemplate<String, Serializable> redisCacheTemplate(LettuceConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Serializable> template = new RedisTemplate<>();
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
template.setConnectionFactory(redisConnectionFactory);
return template;
}
/**
* 配置使用注解的时候缓存配置,默认是序列化反序列化的形式,加上此配置则为 json 形式
*/
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
// 配置序列化
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
RedisCacheConfiguration redisCacheConfiguration = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())).serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
return RedisCacheManager.builder(factory).cacheDefaults(redisCacheConfiguration).build();
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
// 使用Jackson2JsonRedisSerializer作为序列化器
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
serializer.setObjectMapper(mapper);
template.setValueSerializer(serializer);
template.setHashValueSerializer(serializer);
// 使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
}package com.yupi.springbootinit.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.time.Duration;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/27
*/
@ConfigurationProperties(prefix = "spring.redis")
@Component
@Data
public class RedisProperties {
private String host;
private int port;
private Duration timeout;
}yml
redis:
host: localhost
port: 6379
timeout: 10000ms
# Redis默认情况下有16个分片,这里配置具体使用的分片
database: 1
lettuce:
pool:
# 连接池最大连接数(使用负值表示没有限制) 默认 8
max-active: 8
# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
max-wait: -1ms
# 连接池中的最大空闲连接 默认 8
max-idle: 8
# 连接池中的最小空闲连接 默认 0
min-idle: 0
cache:
# 一般来说是不用配置的,Spring Cache 会根据依赖的包自行装配
type: redis实体
package com.yupi.springbootinit.example.redis.model.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/27
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName("code_dictionary")
public class CodeDictionary implements Serializable {
@TableId(type = IdType.AUTO)
private Long id;
/**
* 是否删除
*/
@TableLogic
private Integer isDeleted;
private Date createTime;
private Date updateTime;
@TableField(exist = false)
private static final long serialVersionUID = 1L;
private String type;
private String code;
private String name;
private String attr1;
private String attr2;
private String attr3;
private String attr4;
private String attr5;
private String attr6;
private String attr7;
private String attr8;
private String attr9;
private String attr10;
private String attr11;
private String attr12;
}常量
package com.yupi.springbootinit.example.redis.constants;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/27
*/
public interface RedisKeyConstants {
String CACHE_KEY_PREFIX = "code_dictionary:";
}mapper
package com.yupi.springbootinit.example.redis.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yupi.springbootinit.example.redis.model.entity.CodeDictionary;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/27
*/
public interface CodeDictionaryMapper extends BaseMapper<CodeDictionary> {
}service
package com.yupi.springbootinit.example.redis.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.yupi.springbootinit.example.redis.model.entity.CodeDictionary;
import java.util.List;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/27
*/
public interface CodeDictionaryService extends IService<CodeDictionary> {
void loadCache();
void clearCache();
CodeDictionary getByAndTypeCode(String type, String code);
List<CodeDictionary> getByType(String type);
}package com.yupi.springbootinit.example.redis.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yupi.springbootinit.example.redis.constants.RedisKeyConstants;
import com.yupi.springbootinit.example.redis.mapper.CodeDictionaryMapper;
import com.yupi.springbootinit.example.redis.model.entity.CodeDictionary;
import com.yupi.springbootinit.example.redis.service.CodeDictionaryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/27
*/
@Service
public class CodeDictionaryServiceImpl extends ServiceImpl<CodeDictionaryMapper, CodeDictionary> implements CodeDictionaryService {
@Autowired
private RedisTemplate<String, Serializable> redisTemplate;
@Autowired
private CodeDictionaryMapper codeDictionaryMapper;
@Override
public void loadCache() {
List<CodeDictionary> codeDictionaryList = (List<CodeDictionary>) redisTemplate.opsForValue().get(RedisKeyConstants.CACHE_KEY_PREFIX);
if (CollectionUtil.isNotEmpty(codeDictionaryList)) {
List<CodeDictionary> list = list();
if (list != null && list.size() > 0) {
Map<String, List<CodeDictionary>> typeMap = list.stream().collect(Collectors.groupingBy(item -> RedisKeyConstants.CACHE_KEY_PREFIX + item.getType()));
for (Map.Entry<String, List<CodeDictionary>> entry : typeMap.entrySet()) {
redisTemplate.opsForValue().set(entry.getKey(), (Serializable) entry.getValue());
}
}
}
}
@Override
public void clearCache() {
Set<String> keys = redisTemplate.keys(RedisKeyConstants.CACHE_KEY_PREFIX + "*");
if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
}
}
@Override
public CodeDictionary getByAndTypeCode(String type, String code) {
List<CodeDictionary> codeDictionarieList = (List<CodeDictionary>) redisTemplate.opsForValue().get(RedisKeyConstants.CACHE_KEY_PREFIX + type);
if (CollectionUtil.isNotEmpty(codeDictionarieList)) {
for (CodeDictionary codeDictionary : codeDictionarieList) {
if (code.equals(codeDictionary.getCode())) {
return codeDictionary;
}
}
}
QueryWrapper<CodeDictionary> wrapper = new QueryWrapper<>();
wrapper.eq("type", type);
wrapper.eq("code", code);
CodeDictionary codeDictionary = codeDictionaryMapper.selectOne(wrapper);
if (codeDictionary != null) {
return codeDictionary;
}
return new CodeDictionary();
}
@Override
public List<CodeDictionary> getByType(String type) {
List<CodeDictionary> codeDictionarieList = (List<CodeDictionary>) redisTemplate.opsForValue().get(RedisKeyConstants.CACHE_KEY_PREFIX + type);
if (CollectionUtil.isNotEmpty(codeDictionarieList)) {
return codeDictionarieList;
}
QueryWrapper<CodeDictionary> wrapper = new QueryWrapper<>();
wrapper.eq("type", type);
List<CodeDictionary> codeDictionaries = codeDictionaryMapper.selectList(wrapper);
if (CollectionUtil.isNotEmpty(codeDictionaries)) {
return codeDictionaries;
}
return new ArrayList<>();
}
}controller
package com.yupi.springbootinit.example.redis.model.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.checkerframework.checker.units.qual.A;
import javax.validation.constraints.NotBlank;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/27
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CodeDictionaryDto {
@NotBlank(message = "类型不能为空")
private String type;
@NotBlank(message = "编码不能为空")
private String code;
@NotBlank(message = "名称不能为空")
private String name;
private String attr1;
private String attr2;
private String attr3;
private String attr4;
private String attr5;
private String attr6;
private String attr7;
private String attr8;
private String attr9;
private String attr10;
private String attr11;
private String attr12;
}package com.yupi.springbootinit.example.redis.controller;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.common.ResultUtils;
import com.yupi.springbootinit.example.redis.model.dto.CodeDictionaryDto;
import com.yupi.springbootinit.example.redis.model.entity.CodeDictionary;
import com.yupi.springbootinit.example.redis.service.impl.CodeDictionaryServiceImpl;
import com.yupi.springbootinit.exception.BusinessException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @author tuaofei
* @description TODO
* @date 2024/12/27
*/
@RestController
@RequestMapping("/cache")
public class RedisController {
@Autowired
private CodeDictionaryServiceImpl codeDictionaryService;
@PostMapping("/loadCache")
public BaseResponse<String> loadCache() {
codeDictionaryService.loadCache();
return ResultUtils.success("加载缓存操作成功");
}
@PostMapping("/clearCache")
public BaseResponse<String> clearCache() {
codeDictionaryService.clearCache();
return ResultUtils.success("清除缓存操作成功");
}
@PostMapping("/getByAndTypeCode")
public BaseResponse<CodeDictionary> getByAndTypeCode() {
CodeDictionary byAndTypeCode = codeDictionaryService.getByAndTypeCode("USER", "TUAOFEI");
return ResultUtils.success(byAndTypeCode);
}
@PostMapping("/getByType")
public BaseResponse<List<CodeDictionary>> getByType() {
List<CodeDictionary> user = codeDictionaryService.getByType("USER");
return ResultUtils.success(user);
}
@PostMapping("/addCodeDictionary")
public BaseResponse<String> addCodeDictionary(CodeDictionaryDto codeDictionaryDto) {
if (codeDictionaryDto == null) {
throw new BusinessException(ErrorCode.PARAMS_ERROR);
}
Date curDate = new Date();
CodeDictionary codeDictionary = conversion(codeDictionaryDto);
codeDictionary.setCreateTime(curDate);
codeDictionary.setUpdateTime(curDate);
boolean status = codeDictionaryService.save(codeDictionary);
if (status) {
return ResultUtils.success("新增成功");
}
return ResultUtils.error(ErrorCode.OPERATION_ERROR, "新增失败");
}
@PostMapping("/addCodeDictionaryList")
public BaseResponse<String> addCodeDictionaryList(List<CodeDictionaryDto> codeDictionaryDtoList) {
if (codeDictionaryDtoList == null) {
throw new BusinessException(ErrorCode.PARAMS_ERROR);
}
List<CodeDictionary> codeDictionaryList = new ArrayList<>(codeDictionaryDtoList.size());
Date curDate = new Date();
for (CodeDictionaryDto codeDictionaryDto : codeDictionaryDtoList) {
CodeDictionary codeDictionary = conversion(codeDictionaryDto);
codeDictionary.setCreateTime(curDate);
codeDictionary.setUpdateTime(curDate);
codeDictionaryList.add(codeDictionary);
}
boolean status = codeDictionaryService.saveBatch(codeDictionaryList);
if (status) {
return ResultUtils.success("批量新增成功");
}
return ResultUtils.error(ErrorCode.OPERATION_ERROR, "批量新增失败");
}
private CodeDictionary conversion(CodeDictionaryDto codeDictionaryDto) {
CodeDictionary codeDictionary = new CodeDictionary();
codeDictionary.setType(codeDictionaryDto.getType());
codeDictionary.setCode(codeDictionaryDto.getCode());
codeDictionary.setName(codeDictionaryDto.getName());
codeDictionary.setAttr1(codeDictionaryDto.getAttr1());
codeDictionary.setAttr2(codeDictionaryDto.getAttr2());
codeDictionary.setAttr3(codeDictionaryDto.getAttr3());
codeDictionary.setAttr4(codeDictionaryDto.getAttr4());
codeDictionary.setAttr5(codeDictionaryDto.getAttr5());
codeDictionary.setAttr6(codeDictionaryDto.getAttr6());
codeDictionary.setAttr7(codeDictionaryDto.getAttr7());
codeDictionary.setAttr8(codeDictionaryDto.getAttr8());
codeDictionary.setAttr9(codeDictionaryDto.getAttr9());
codeDictionary.setAttr10(codeDictionaryDto.getAttr10());
codeDictionary.setAttr11(codeDictionaryDto.getAttr11());
codeDictionary.setAttr12(codeDictionaryDto.getAttr12());
return codeDictionary;
}
}测试


接收接口-消费-订阅模式
类似线程池的执行流程
贡献者
更新日志
fb8bc-更新为vuepress于