北京芯盾时代
约 9946 字大约 33 分钟
2026-04-04
项目中数据采集的主要流程
数采原始数据存储在数据库Wonderware,数据由PLC通过OPC协议上传,数据库时序数据库,基于时间
一条生产线,对应多个工序,每个工序有多个数采点位(数据采集地址),通过自定义线程查询每个点位的数据库数据,通过kafka消息队列推送每个点位的数据,每个生产线为一个topic(一个topic可以有多个分区,分区内数据是有序的),使用消费组订阅对应的topic,通过offset偏移来保证每个消费者不会重复消费,每个数据点位(牌号,批次,工单号等信息)对各自的数据进行消费,消费时存储数据到数据库同时存储数据到内存HashMap,并计算点位的监控图形(CPK,PPK..质量参数),通过websocket和前端实时通信,推送监控图形数据
其中有一个 Topic 叫做 order_topic ,用于存储生产工单的数采消息。
现在有以下几种消费场景:
场景 1 :有一个消费组 group_a ,它订阅了 order_topic ,用于处理生产工单的数采逻辑。 group_a 有 3 个消费者实例,它们会共同消费 order_topic 中的消息,每个消费者实例负责处理一部分生产工单的数采。
场景 2 :同时还有一个消费组 group_b ,它也订阅了 order_topic ,用于处理生产工单的批次牌号逻辑。 group_b 有 2 个消费者实例,它们会独立于 group_a ,共同消费 order_topic 中的消息,负责处理生产工单的批次牌号。
场景 3 :如果 group_a 中的一个消费者实例出现故障,Kafka 会自动将该消费者实例负责的消息重新分配给 group_a 中的其他消费者实例,以保证生产工单的数采逻辑正常运行,而 group_b 的消费逻辑不受影响。
时序数据库与传统机构化存储数据库的区别
传统关系型数据库:数据存储在磁盘上的文件系统中,通常以行存储或列存储的形式组织,通过索引(如 B-Tree 索引)来加速查询,但索引的维护成本较高
时序数据库:通常采用列存储(Columnar Storage)或混合存储方式,数据按时间顺序存储,便于快速读取时间范围内的数据
介绍一下kafka集群
多个broker组成,每个 Broker 可以存储多个 Topic 的分区数据
Broker 接收生产者(Producer)发送的消息,并将消息存储到对应的分区中
Broker 之间通过 ZooKeeper(或 Kafka 自带的 KRaft 模式)进行通信,协调集群的状态和元数据。
Kafka 集群的工作原理
消息存储
生产者将消息发送到指定的 Topic 和分区。Broker 接收到消息后,将其追加到对应的分区日志文件中。每个分区的日志文件是一个有序的、不可变的消息序列。
消息在分区中按顺序存储,并通过偏移量(Offset)来标识消息的位置。
消息读取
消费者从指定的 Topic 和分区中读取消息。消费者通过偏移量来定位消息,并向 Broker 发送读取请求。
Kafka 支持多种消费模式,包括从最早的消息开始消费、从最新的消息开始消费,或者从指定的偏移量开始消费。
副本同步
Kafka 使用异步复制机制来同步分区副本。Leader 副本接收消息后,将消息写入本地日志文件,然后通知 Follower 副本进行数据同步。
Follower 副本从 Leader 副本拉取数据,并将其写入本地日志文件。如果 Follower 副本在指定的时间内没有同步数据,它将被视为“落后”副本,Kafka 会尝试重新同步。
领导选举
Kudu 的 Raft 协议是如何工作的?如何通过 Raft 实现一致性和容错? - 面试鸭 - 程序员求职面试刷题神器
什么是 ZooKeeper 集群的脑裂现象?ZooKeeper 是如何解决脑裂问题的? - 面试鸭 - 程序员求职面试刷题神器
当 Leader 副本所在的 Broker 出现故障时,Kafka 会从 Follower 副本中选举一个新的 Leader 副本。
领导选举过程由 ZooKeeper(在 ZooKeeper 模式下)或 Kafka 自身的 KRaft 模式来协调。选举算法通常基于 Raft 协议,确保在分布式环境中能够快速、一致地选举出新的 Leader。
数据均衡
Kafka 提供了数据均衡机制,用于在 Broker 之间均匀分布分区副本。当集群的负载不均衡时,可以通过 Kafka 的工具(如 kafka-reassign-partitions )手动或自动地重新分配分区副本。
数据均衡可以提高集群的性能和可用性,确保每个 Broker 的负载相对均衡。
有自己部署过Kafka 集群吗,怎么配置
https://blog.csdn.net/weixin_53269650/article/details/142088476
配置kafka集群节点
复制默认配置config/server.properties文件,分别设置3个server1~3的配置文件
3个主要配置
# The id of the broker. This must be set to a unique integer for each broker.
修改节点id
broker.id=1
修改通信端口号
# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
设置日志文件路径
# A comma separated list of directories under which to store log files
log.dirs=D:/tools/kafka_2.13-3.7.2/kafka-logs/kafka1/logs启动脚本
@echo off
title Kafka Cluster Starter
setlocal enabledelayedexpansion
:: 设置 KAFKA_HOME(请修改为你的 Kafka 安装路径)
set KAFKA_HOME=D:\tools\kafka_2.13-3.7.2
:: 启动 Kafka 集群
echo Starting Kafka Cluster...
:: 启动第一个 Kafka 节点
start "kafka1" %KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server1.properties
echo Started Kafka Broker 1
timeout /t 10
:: 启动第二个 Kafka 节点
start "kafka2" %KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server2.properties
echo Started Kafka Broker 2
timeout /t 10
:: 启动第三个 Kafka 节点
start "kafka3" %KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server3.properties
echo Started Kafka Broker 3
echo.
echo Kafka cluster is starting...
echo Please wait for all brokers to initialize...
timeout /t 10
:: 验证集群状态
echo Checking cluster status...
call %KAFKA_HOME%\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
echo.
echo Kafka cluster startup completed.
endlocalKafka 中 Zookeeper 的作用? - 消息队列面试题 - 面试鸭 - 程序员求职面试刷题神器
配置zookeeper集群节点
复制默认配置config/zookeeper.properties文件,分别设置3个zookeeper1~3的配置文件
# the directory where the snapshot is stored.
指定 ZooKeeper 数据存储的目录。
dataDir=D:/tools/kafka_2.13-3.7.2/zookeeper-logs/zk1
# the port at which the clients will connect
指定 ZooKeeper 客户端连接的端口
clientPort=2181
# 集群配置
初始化连接的超时时间(以 tickTime 为单位)
initLimit=5
Follower 与 Leader 同步的超时时间(以 tickTime 为单位)。
syncLimit=2
tickTime=2000
定义 ZooKeeper 集群中的节点信息,【集群内部通信的端口(Follower 与 Leader 之间的通信):选举 Leader 的端口(用于 ZooKeeper 的 Leader 选举机制)】
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890启动脚本
@echo off
title ZooKeeper Cluster Starter
:: 设置 KAFKA_HOME
set KAFKA_HOME=D:\tools\kafka_2.13-3.7.2
:: 启动 ZooKeeper 集群
start "zookeeper1" %KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper1.properties
timeout /t 5
start "zookeeper2" %KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper2.properties
timeout /t 5
start "zookeeper3" %KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper3.properties
echo ZooKeeper cluster is starting...
echo Please wait...
timeout /t 10
echo ZooKeeper cluster has been started.单个消费者处理不过来,怎么处理
通过消费组订阅同一个topic里面的消息,使用多个消费组进行消费,通过offset防止重复消费
kafka怎么确定这个线程是去消费哪里的消息,怎么去拉取消息的
初始化偏移量(Offset)
首次消费:当消费者首次启动时,它需要确定从哪个偏移量开始消费。
Kafka 提供了多种策略来初始化偏移量:
auto.offset.reset :这是一个消费者配置参数,用于指定在没有初始偏移量或偏移量无效时的行为。
latest :从最新的消息开始消费(默认值)。消费者将跳过所有已存在的消息,从当前时间点开始消费新消息。
earliest :从最早的消息开始消费。消费者将从分区的最开始位置读取所有消息。
none :如果找不到有效的偏移量,抛出异常。
已有偏移量:如果消费者之前已经消费过消息,Kafka 会从 __consumer_offsets 主题中获取最后提交的偏移量,并从该偏移量的下一个位置开始消费。
分区分配
消费者组(Consumer Group):消费者属于一个消费者组,Kafka 会根据消费者组的成员和订阅的 Topic 来分配分区。
拉取消息的机制
消费者轮询(Poll)
Kafka 消费者采用 拉取(Poll)模式,而不是推送模式。消费者通过调用 poll() 方法主动向 Kafka Broker 请求消息。
拉取请求的发送
消费者通过 client.send() 方法异步发送拉取请求到 Kafka Broker。
请求参数:
FetchRequest :包含要拉取的分区信息、当前偏移量、最大拉取字节数等参数。
max.poll.records :一次 poll() 调用返回的最大消息数量(默认为 500)。
fetch.max.bytes :一次拉取操作的最大字节数(默认为 50MB)。
消息的接收和缓存
接收响应:当 Kafka Broker 处理完拉取请求后,会将消息返回给消费者。消费者将这些消息存储在本地缓存区( completedFetches )中。
缓存区管理:消费者从缓存区中解析消息,并逐条处理。处理完成后,消费者会更新每个分区的偏移量( nextFetchOffset )。
消息处理:消费者通过回调方法(如 ConsumerRecord )逐条处理消息。
处理完成后,消费者可以选择提交偏移量( commit ),以记录消息的消费进度。
提交偏移量
自动提交
手动提交
多个消费者消费同一个队列里面的消息,怎么解决重复消费的问题
手动提交偏移量,消费者在处理完一批消息后,手动提交消费偏移量到 Kafka 集群。这样可以确保消费者在下一次启动时能够从上次提交的位置继续消费。
幂等性消费:幂等性消费是指消费者对同一条消息进行多次处理时,结果与处理一次相同。
使用唯一标识符,Redis 去重:使用 Redis 的 SET 操作记录已处理的消息标识符。如果标识符已存在,则跳过处理。
消费者组(Consumer Group)管理
Kafka 允许多个消费者以消费者组的形式同时消费同一个主题的消息。每个消费者组都有唯一的消费者组 ID,并且每个消费者在消费时只能消费属于该消费者组的某个分区中的消息。这样可以避免重复消费。
有使用过别的消息队列吗?
怎么通过quartz实现的执行定时器
初始化数据表
默认的sql创建语句
定义任务类
实现 org.quartz.Job 接口,并重写 execute 方法
配置jobdetail,设置类class,名称,是否分组名称
明细设置到对应的触发器,触发器设置cron表达式
调度器会将任务和触发器注册到内部的调度表中,并根据 Cron 表达式计算出的下一次执行时间,将任务放入延迟队列中。
延迟队列:调度器使用一个优先级队列来管理任务的执行时间。队列中的任务按照执行时间排序,调度器会定期检查队列,取出即将执行的任务。
任务执行:当任务的执行时间到达时,调度器会从队列中取出任务,并调用任务的 execute 方法来执行任务。
多个服务部署怎么保证只有一个定时任务执行
quartz集群模式通过共享的数据库表,通过数据库行锁加锁,保证定时任务在同一时刻只有一个服务运行
平时开发过程中事务是怎么处理的
执行一个service方法,通过一个通用的方法,需要设置请求参数,执行的方法名称,上下文等,通过反射执行对应的方法
事务通过TransactionTemplate spring的事务管理器统一管理
介绍一下AOP和IOC
AOP:面向切面,将业务无关的逻辑抽取出来,关注切面类(具体需要处理的逻辑),连接点(执行时机,方法调用/异常),通知(前置,后置),切入点(应用在哪些类或方法)
IOC:控制反转,把对象的交给spring容器管理
AOP的实现方式
动态代理:通过代理模式,生成目标对象的代理对象。分为JDK动态代理(接口),CGLIB动态代理(子类继承)
编译时织入:通过修改字节码实现,AspectJ
类加载时织入:通过修改字节码实现,AspectJ
CGLIB和JDK动态代理的区别
JDK 动态代理和 CGLIB 动态代理有什么区别? - 面试鸭 - 程序员求职面试刷题神器
CGLIB:生成目标类的子类,注意final修改的无法的代理
DK动态代理:基于接口
如果要做一个乐观锁怎么实现
获取数据不加锁,更新数据加锁,通过版本号对比,更新时版本号不一致就报错
java8 lambda表达式是怎么实现的
核心机制是 函数式接口 和 方法引用
函数式接口
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
}Lambda 表达式可以被看作是函数式接口的一个匿名实现
Consumer<String> printConsumer = (String s) -> System.out.println(s);方法引用
方法引用是 Lambda 表达式的一种特殊形式,它允许直接引用已有的方法。
Consumer<String> printConsumer = System.out::println;java8 stream流为什么可以toMap,toList,多个链式调用的执行流程
链式调用是通过方法的返回值来实现的
当一个方法返回一个对象时,你可以继续在这个对象上调用其他方法。
List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
names.stream() // 创建流
.filter(name -> name.startsWith("A")) // 过滤以 "A" 开头的名字
.map(String::toUpperCase) // 将名字转换为大写
.forEach(System.out::println); // 打印每个名字链式调用的执行流程是从左到右的。每个方法调用都会返回一个新的对象,然后在该对象上调用下一个方法。
多线程的含义
并行:多个线程同时执行
并发:多个线程争抢同一个资源
有用过的设计模式
线程池的核心参数,解释含义
你了解 Java 线程池的原理吗? - Java 并发面试题 - 面试鸭 - 程序员求职面试刷题神器
Java 线程池核心线程数在运行过程中能修改吗?如何修改? - Java 并发面试题 - 面试鸭 - 程序员求职面试刷题神器
关键参数:
核心线程数
可以动态修改
ThreadPoolExecutor.setCorePoolSize(int corePoolSize),修改核心线程数不会中断现有任务,新任务会生效;如果当前线程池中的数量超过核心线程数,多余的线程会在空闲时被回收
最大线程数
可以动态修改
setMaximumPoolSize(int maximumPoolSize)空闲存活时间
超过时间的非核心线程会被销毁
工作队列
- LinkedBlockingQueue:链表结构的阻塞队列,大小无限
- SynchronousQueue:不存储任务,直接将任务提交给线程
- ArrayBlockingQueue:数组结构的有界阻塞队列
- PriorityBlockingQueue:带优先级的无界阻塞队列
线程工厂
任务拒绝处理器
- AbortPolicy:当任务队列满且没有线程空闲,直接抛出异常,默认的拒绝策略。适合需要通知执行失败
- CallerRunsPolicy:当任务队列满且没有线程空闲,由调用者线程执行。适合通过减缓任务提交来稳定系统
- DiscardOldestPolicy:当任务队列满且没有线程空闲,会删除最早的任务,重新提交当前任务。适合丢弃最旧的任务以保证新的重要任务能够被处理
- DiscardPolicy:直接丢弃当前提交的任务,不会执行任何操作,也不会抛出异常。适合对部分任务丢弃没有影响
- 自定义拒绝策略,实现RejectedExecutionHandler接口,重写rejectedExecution
工作原理
任务提交,线程池线程数未达到核心线程数

核心线程数已满,任务队列未满

核心线程数已满,任务队列已满

线程池中线程数已达最大线程数

如果队列使用LinkedBlockingQueue,那不是任务队列不会满,永远不会创建非核心线程数
LinkedBlockingQueue默认最大为Integer.MAX_VALUE,这么大的数量不可能一直把线程放到任务队列,否则服务器内存会被打爆,必须合理设置
任务队列设置多少合适,怎么监控线程池的运行状态,去修改核心线程数
通过监控每个线程运行的平均耗时,和新线程进来的时间(比如监控任务队列每秒的增量)
ThreadPoolExecutor的钩子函数beforeExecute,afterExecute,重写该方法
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
// 用于存储任务的开始时间
private final ConcurrentHashMap<Runnable, Long> taskStartTimeMap = new ConcurrentHashMap<>();
// 用于统计任务执行时间
private final AtomicLong totalExecutionTime = new AtomicLong(0);
public MonitoredThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
// 记录任务的开始时间
taskStartTimeMap.put(r, System.currentTimeMillis());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// 获取任务的开始时间
Long startTime = taskStartTimeMap.remove(r);
if (startTime != null) {
long endTime = System.currentTimeMillis();
long executionTime = endTime - startTime;
// 累加任务的执行时间
totalExecutionTime.addAndGet(executionTime);
System.out.println("Task " + r.hashCode() + " executed in " + executionTime + " ms");
}
}
// 获取总执行时间
public long getTotalExecutionTime() {
return totalExecutionTime.get();
}
}AOP 切面监控,Aspect
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
@Aspect
@Component
public class ThreadPoolMonitorAspect {
@Pointcut("execution(* java.util.concurrent.ThreadPoolExecutor.execute(..))")
public void threadPoolExecute() {}
@Around("threadPoolExecute()")
public Object monitorThreadPoolExecute(ProceedingJoinPoint joinPoint) throws Throwable {
long startTime = System.currentTimeMillis();
Object result = joinPoint.proceed(); // 执行原始方法
long endTime = System.currentTimeMillis();
long executionTime = endTime - startTime;
System.out.println("Task executed in " + executionTime + " ms");
return result;
}
}核心线程数一般根据什么来确定
如何合理地设置 Java 线程池的线程数? - 面试鸭 - 程序员求职面试刷题神器
CPU密集型任务
单纯计算,不涉及I/O操作,【=CPU核心数 + 1】
I/O密集型任务
很多I/O操作,文件读取,数据库读取,无法利用CPU,【 =CPU核心数 * 2】
守护线程和普通线程的区别
用户线程(普通线程):是我们正常通过继承 Thread 或实现 Runnable / Callable 接口等方式创建的线程,也是线程池默认创建的线程类型。用户线程通常用于执行业务逻辑等主要任务。
守护线程(Daemon Thread):是一种特殊的线程,它的存在不会阻止 Java 虚拟机(JVM)的关闭。守护线程通常用于为用户线程提供服务,如垃圾回收、定时任务等。
如果想实现一个优先级的线程池,有什么方案
任务队列修改为优先级队列(PriorityBlockingQueue),任务类需要实现 Comparable 接口,或者在创建 PriorityBlockingQueue 时提供一个 Comparator
hibernate和mybatis的区别
MyBatis 与 Hibernate 有哪些不同? - 面试鸭 - 程序员求职面试刷题神器
mybatis
半ORM框架,需要写SQL,可灵活根据条件优化SQL
hibernate
ORM框架,通过面向对象的角度操作数据库
提供HQL语句,面向对象的查询语句
提供Criteria接口API可以实现复杂sql
你是通过什么标准来决定是使用hibernate和mybatis的
单表或主从结构的简单关联,或只有简单的增删改查可以考虑使用mybatis,可以手动优化SQL
如果设计复杂的对象关系映射可以使用hibernate,如果关联复杂且数据量大,对执行速率有要求的化,用hibernate有时候查询会很慢
如果有一个主从结构的中间表,hibernate和mybatis通过什么方式来接收这个中间表的数据
Hibernate
Hibernate 支持 @OneToMany 、 @ManyToOne 和 @ManyToMany 注解来定义关联关系。
@Entity
public class User {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String username;
@ManyToMany
@JoinTable(
name = "user_role",
joinColumns = @JoinColumn(name = "user_id"),
inverseJoinColumns = @JoinColumn(name = "role_id")
)
private Set<Role> roles = new HashSet<>();
}
@Entity
public class Role {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String name;
@ManyToMany(mappedBy = "roles")
private Set<User> users = new HashSet<>();
}Hibernate 会自动处理中间表的查询和映射。
如果需要单独查询这个表,可通过JoinTable-name直接查询
MyBatis
在 MyBatis 中,需要手动编写 SQL 语句,并使用 <resultMap> 来定义结果映射。MyBatis 提供了 <association> 和 <collection> 标签来处理关联关系。
<resultMap id="UserResultMap" type="User">
<id property="id" column="user_id"/>
<result property="username" column="username"/>
<collection property="roles" ofType="Role" resultMap="RoleResultMap"/>
</resultMap>
<resultMap id="RoleResultMap" type="Role">
<id property="id" column="role_id"/>
<result property="name" column="role_name"/>
</resultMap>
<select id="getUserWithRoles" resultMap="UserResultMap">
SELECT u.id as user_id, u.username, r.id as role_id, r.name as role_name
FROM users u
LEFT JOIN user_role ur ON u.id = ur.user_id
LEFT JOIN roles r ON ur.role_id = r.id
WHERE u.id = #{id}
</select>如果是需要一个统计的count(*)之类的统计字段,hibernate和mybatis怎么接收
Hibernate
使用HQL
使用 Criteria API 查询
使用原生SQL,通过Object数组接收
MyBatis
使用 Mapper XML,使用 resultType 指定返回类型为 java.lang.Integer 或 java.lang.Long 。
使用 resultMap来映射结果。
callback机制是怎么实现的,有什么作用
方法调用通过统一的方法调用对应的service方法,并设置前置后置方法,是否忽略错误(通过是否抛出异常),是否异步(是否通过另外的线程去执行)
这个callback信息配置在对应的数据表中,每次执行方法时查询对应的service是否有绑定的callback
callback作为bean注册到spring容器中,去进行配置
比如当生产工单完成后,需要进行上报,或者提醒某些人,可以通过单独的callback去开发,灵活扩展,不用和业务代理强关联
mysql或oracle的redis的区别
mysql或oracle
关系型数据库,基于表格、行和列的结构存储数据,数据以表格形式组织,表之间通过外键等关系进行关联
redis
数据以键值对的形式存储,每个键对应一个值,值可以是简单类型或复杂数据结构。如字符串(strings)、哈希(hashes)、列表(lists)、集合(sets)、有序集合(sorted sets)等。
redis单个命令是原子性的,如果想多个命令也保证原子性,怎么办
使用lua脚本,把多个命令打包成一个命令执行
redis集群的数据是怎么存储的,存储流程
Redis 集群的实现原理是什么? - 面试鸭 - 程序员求职面试刷题神器
哈希槽

为什么使用hash槽来存储,有什么作用
数据分片:每个 Redis 节点负责一部分哈希槽,从而将数据分布到多个节点上,实现水平扩展。
负载均衡:通过合理分配哈希槽,每个节点承担的读写请求量大致相同,避免了某些节点过载而其他节点闲置的情况。
高可用性:每个主节点负责一部分哈希槽,其从节点作为备份。当主节点发生故障时,从节点可以自动升级为主节点,继续提供服务。
有了解redis的持久化机制吗?为了解决什么问题
Redis 的持久化机制有哪些? - 面试鸭 - 程序员求职面试刷题神器
RDB:生成某一时刻的数据快照
AOF:每个写操作追加到日志文件
你使用到的es主要做什么业务
搜索文件
es是怎么进行搜索的
Elasticsearch 底层如何执行文档的更新和删除操作?详细流程是什么? - ElasticSearch 面试题 - 面试鸭 - 程序员求职面试刷题神器
Elasticsearch 的搜索流程可以分为两个主要阶段:Query 阶段 和 Fetch 阶段。
Query 阶段
主要负责确定哪些文档满足查询条件
解析查询语句:协调节点(Coordinating Node)接收到客户端的查询请求后,解析查询语句,确定需要访问的分片(Shard)。
分发查询请求:协调节点将查询请求分发到各个数据节点(Data Node),每个数据节点负责处理其上的分片。
执行倒排索引查询:数据节点使用 Lucene 的倒排索引(Inverted Index)来快速查找满足条件的文档 ID 集合。
返回中间结果:每个数据节点将查询结果(文档 ID 集合)返回给协调节点。
Fetch 阶段
主要负责获取满足条件的文档内容,并进行排序和分页等处理。
聚合结果:协调节点将来自各个数据节点的中间结果进行聚合,生成最终的文档 ID 集合。
获取文档内容:协调节点根据文档 ID 集合,从数据节点中获取完整的文档内容。
排序和分页:协调节点对获取的文档内容进行排序和分页处理,生成最终的搜索结果。
返回结果:协调节点将最终的搜索结果返回给客户端。
es的存储结构是什么
Elasticsearch 底层是如何实现数据存储的?比如数据的存储流程和管理机制 - ElasticSearch 面试题 - 面试鸭 - 程序员求职面试刷题神器
倒排索引结构
[‘xxx’,'xxx'] -> [id1,di2]
es分词策略是什么,不同的分词策略对中文和英文分词有什么区别
Analyzer(分析器)
Tokenizer(分词器):负责将原始文本切割成 tokens。
Token Filters(令牌过滤器):对分词器产生的 tokens 进行进一步处理,如大小写转换、去除停用词、词干提取等。
Char Filters(字符过滤器):在分词之前对原始文本进行预处理,如去除 HTML 标签、转义特殊字符等。
分词策略的类型
标准分词(Standard Tokenization)
语言分词(Language Tokenization)
中文和英文分词的区别
英文分词方式:英文单词之间有明显的分隔符(如空格、标点符号),分词相对简单。
中文分词方式:中文没有明显的分隔符,需要使用复杂的分词算法来识别词语边界。
英文分词器
Standard Tokenizer:默认分词器,适用于大多数英文文本。
English Analyzer:专门针对英文的分词器,支持词性还原和词干提取。
中文分词器
IK Analyzer:开源的中文分词器,支持多粒度分词和自定义词典。
es的存储结构和redis有什么区别
redis
键值存储:Redis 是一个键值存储系统,支持多种数据结构,如字符串(Strings)、哈希(Hashes)、列表(Lists)、集合(Sets)、有序集合(Sorted Sets)等。
存储方式:数据以键值对的形式存储在内存中,键是唯一的标识符,值可以是简单的字符串,也可以是复杂的数据结构。
Elasticsearch
文档存储:Elasticsearch 是一个分布式搜索引擎,以文档的形式存储数据。每个文档都是一个结构化的 JSON 对象。
存储方式:文档存储在索引(Index)中,索引被划分为多个分片(Shard),每个分片是一个独立的 Lucene 索引。分片分布在不同的节点上,实现水平扩展。
接口安全性怎么保证
AK/SK认证机制
如何接口防重放,防篡改,防劫持
防重放:时间戳+请求唯一id redis存储每次请求id 过期时间和时间戳保持一致
防篡改:请求参数 散列加密生成签名
防劫持:请求参数对称加密
AK/SK认证机制概述
AK (Access Key): 访问标识,相当于用户名
SK (Secret Key): 密钥,用于生成签名的密钥,必须保密
// 1. 获取请求头中的关键参数
String accessKey = headers.getFirst("accessKey"); // 访问密钥
String secretKey = user.getSecretKey(); // 密钥(从用户信息中获取)
String timestamp = headers.getFirst("timestamp"); // 时间戳
String sign = headers.getFirst("sign"); // 请求签名
String body = headers.getFirst("body"); // 请求体复制❌!已复制!accessKey/secretKey创建用户时生成:
/**
* Return a hexadecimal string representation of the MD5 digest of the given bytes.
* @param bytes the bytes to calculate the digest over
* @return a hexadecimal digest string
*/
public static String md5DigestAsHex(byte[] bytes) {
return digestAsHexString(MD5_ALGORITHM_NAME, bytes);
}
/**
* 盐值,混淆密码
*/
String SALT = "api";
/**
* ak/sk 混淆
*/
String VOUCHER = "accessKey_secretKey";
// ak/sk
String accessKey = DigestUtils.md5DigestAsHex((userAccount + UserConstant.SALT + UserConstant.VOUCHER).getBytes());
String secretKey = DigestUtils.md5DigestAsHex((UserConstant.SALT + UserConstant.VOUCHER + userAccount).getBytes());复制❌!已复制!安全校验步骤
1.参数完整性校验
if (StringUtils.isAnyBlank(body, sign, accessKey, timestamp)) {
throw new BusinessException(ResponseCode.FORBIDDEN_ERROR);
}复制❌!已复制!2.防重放攻击
/**
* 五分钟过期时间
*/
private static final long FIVE_MINUTES = 5L * 60;
long currentTime = System.currentTimeMillis() / 1000;
if (currentTime - Long.parseLong(timestamp) >= FIVE_MINUTES) {
throw new BusinessException(ResponseCode.NOT_LOGIN_ERROR, "会话已过期,请重试!");
}复制❌!已复制!3.AK验证
//通过访问密钥获取用户信息
user = dubboUserService.getInvokeUserByAccessKey(accessKey);
if (!user.getAccessKey().equals(accessKey)) {
throw new BusinessException(ResponseCode.NO_AUTH_ERROR, "请先获取请求密钥");
}复制❌!已复制!4.签名验证
public static String getSign(String body, String secretKey) {
return MD5.create().digestHex(JSONUtil.toJsonStr(body) + '.' + secretKey);
}
if (!SignUtil.getSign(body, user.getSecretKey()).equals(sign)) {
throw new BusinessException(ResponseCode.NO_AUTH_ERROR, "非法请求");
}复制❌!已复制!签名生成原理
- 客户端使用相同的算法,用 body 和 SK 生成签名
- 服务端用相同的参数和算法生成签名,与客户端传来的签名比对
- 签名一致才说明:
请求确实来自持有正确 SK 的客户端
请求参数在传输过程中未被篡改
安全特点
防篡改:任何参数被修改都会导致签名验证失败
防重放:使用时间戳确保请求的时效性
身份认证:通过 AK/SK 确保调用者身份
密钥安全:SK 不在网络传输,只用于本地签名
最佳实践
SK 必须妥善保管,不能泄露
时间戳应使用标准时间
签名算法要足够安全(通常使用 HMAC-SHA256 等)
关键操作要使用 HTTPS
https和http的区别
HTTP 和 HTTPS 有什么区别? - 计算机网络面试题 - 面试鸭 - 程序员求职面试刷题神器
| 数据传输 | 端口号 | 性能 | SEO | |
|---|---|---|---|---|
| HTTP | 明文 | 80 | 无加密过程,连接建立速度稍快 | 搜索引擎会降低未加密站点的排名 |
| HTTPS | SSL/TLS加密传输 | 443 | 多了SSL/TLS实现加密增加计算开销,握手时间较长 | 搜索引擎会优先展示HTTPS网站 |
https怎么保证安全性
HTTPS 提供了数据加密、身份验证和完整性保护,确保数据在传输过程中的安全性
防窃听
信息加密:混合加密(对称加密+非对称加密)
1.通信建立前使用非对称加密交换会话密钥
2.通信过程使用对称加密的会话密钥加密明文数据
防篡改
校验机制,摘要算法确保数据完整性
内容计算出一个指纹,和内容一起发送给对方
公钥加密,私钥解密:只有拥有私钥的人才能机密,保证内容安全
私钥加密,公钥解密:如果公钥能正常解密出私钥加密的内容,证明消息的来源是这个有私钥的人发送的
防冒充
身份证书,服务器公钥加入数字证书
这个公/私钥可以被冒充,为了解决这个问题,就需要【个人信息+公钥+数字签名】打包成一份证书
需要验证时,通过公钥去解密,如果解密成功,就证明这个证书是合法注册过的
TLS用到的证书包含哪些内容
证书主题信息(域名,组织名称)
证书颁发者信息(颁发机构名称)
公钥信息(公钥,公钥算法)
签名算法(用于生成证书签名的算法)
数字签名(证书颁发机构使用其私钥对证书内容进行签名的密文)
证书有效期
证书序列号(证书的唯一标识符)
TLS用到的证书和密钥在网络请求中有什么作用
证书:用于验证服务器的身份,防止中间人攻击,并用于加密对称密钥的协商。
密钥:用于加密和解密数据,确保数据在传输过程中的安全性和完整性。
TLS 握手过程中的证书和密钥的作用
请解释 HTTP 和 HTTPS 之间的区别及工作原理。 - 面试鸭 - 程序员求职面试刷题神器

spring boot starter怎么实现的,实现步骤
API 开放平台-开发sdk(spring-boot-starter)
有了解过java的响应式API?
spring boot starter怎么知道需要什么参数
自动配置(SPI),spring.factories
@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "api.client")
public class ApiClient {
/**
* 访问密码
*/
private String accessKey;
/**
* 安全密钥
*/
private String secretKey;
/**
* 网关地址
*/
private String gatewayHost = "http://xiaofei.site:9000";
public ApiClient(String accessKey, String secretKey) {
this.accessKey = accessKey;
this.secretKey = secretKey;
}
}spring-configuration-metadata.json配置项,配置提示
{
"groups": [
{
"name": "api.client",
"type": "site.xiaofei.apisdk.client.ApiClient",
"sourceType": "site.xiaofei.apisdk.client.ApiClient"
}
],
"properties": [
{
"name": "api.client.access-key",
"type": "java.lang.String",
"description": "访问密码",
"sourceType": "site.xiaofei.apisdk.client.ApiClient"
},
{
"name": "api.client.gateway-host",
"type": "java.lang.String",
"description": "网关地址",
"defaultValue": "http://localhost:9000",
"sourceType": "site.xiaofei.apisdk.client.ApiClient"
},
{
"name": "api.client.secret-key",
"type": "java.lang.String",
"description": "安全密钥",
"sourceType": "site.xiaofei.apisdk.client.ApiClient"
}
],
"hints": []
}spring boot starter怎么实现的一行代码实现调用
通过统一的ApiClient里面的AK/SK
每一个对应的方法都是一个远程调用,就像调用本地方法一样
springcloud主要组件,哪些组件是必须的
服务注册发现(必须):nacos,Eureka
负载均衡:Ribbon
断路器:Hystrix
远程调用(必须):fegin,Dubbo
Gateway:API 网关
Config:配置管理
nginx怎么配置负载均衡
配置项stream指定负载的服务器:IP,可指定优先级
nginx负载均衡的常用策略,怎么各自的使用场景
负载均衡算法有哪些? - 面试鸭 - 程序员求职面试刷题神器
轮询(Round Robin):后端服务器处理能力差不多
加权轮询:性能高的服务器权重更高
随机:后端服务器被选中的概率不一致
加权随机:考虑服务器性能的随机算法
最少连接:选择当前连接最少的服务器
哈希算法:根据特定关键字进行负载均衡,如URL,IP地址,如果使用session,同一个用户多次请求到同一台服务器
有遇到CPU过高或内存不足的问题吗
服务器无法处理请求,nginx代理其中一台服务器处理不了请求,直接访问也打不开页面
1.日志查看出现GC
jmap -dump 查看是否内存溢出
发现某个对象频繁创建2.查看进程占用高达1400%
top -Hp
printf %\n
jstack 线程 获取堆栈信息
发现某个方法一直在执行,分析可能是数据量大/数据库死锁这个方法是统计报表用到,每个月底会执行,数据每天20w作业,统计实验室的温度,湿度;获取数据湖的数据
本地debug 直接卡死,GC
多线程并行分批处理,发现一天一天执行时间还是10分钟左右
排查sql问题,sql执行1-3s
方法里面打印执行时间,发现查询数据慢
可能是sql字段过多,映射实体较慢
修改xml指定具体字段,换指定jdbc类型和java类型转换
发现问题还是没有解决
可能是mybatis默认的反射(运行时动态解析)填充字段慢
修改为构造器(编译时,调用类的构造函数)方式映射
修改完执行时间减少到1-2分钟(每一天的数据)
怎么排查网络抖动导致的问题/怎么判断不是我们服务器的问题
请求是否到达服务器
请求到达网关,没有转发的具体的tomcat服务器,无日志
从网络角度来看,用户从输入网址到网页显示,期间发生了什么? - 计算机网络面试题 - 面试鸭 - 程序员求职面试刷题神器
- 检查网络连接:使用 ping 、 traceroute 和 mtr 命令检测网络延迟和丢包情况。
- 检查服务器性能:查看服务器的资源使用情况和日志文件。
- 检查应用程序日志:查看应用程序的日志文件,使用 Arthas 工具监控和诊断应用程序。
- 检查网络配置:检查服务器的网络配置和防火墙规则
- 使用网络监控工具:使用 Wireshark 和 tcpdump 捕获和分析网络流量。
- 判断是否为服务器问题:排除网络问题,检查服务器性能和应用程序。
gateway 还是网络层的网关 如果是网络层的话 可以用 ping telnet nc 等命令查看网络联通性 如果是微服务,那么就要先看注册中心的服务是不是有问题了
网络不可达
比如微服务机器的ip有多个,注册了一个不可达的内部Ip ,网关过不来
网络抖动
网络抖动的话 多试几次 是好是坏就可能是这个原因 时好时坏还有一种可能性就是注册的微服务节点有一部分是好的一部分是坏的,如果是这种情况 ,那么时好时坏是一个非常有规律的现象
项目是怎么打包及发布的
使用Linux脚本,nginx负载均衡切换发布
贡献者
更新日志
fb8bc-更新为vuepress于