前言
本章学习Sentinel Dashboard相关源码。
Sentinel 控制台是流量控制、熔断降级规则统一配置和管理的入口,它为用户提供了机器自发现、簇点链路自发现、监控、规则配置等功能。在 Sentinel 控制台上,我们可以配置规则并实时查看流量控制效果。

重点关注几个问题:
Dashboard如何发现客户端?为什么客户端必须有资源请求经过,才能有数据展示?
Dashboard如何得到客户端的规则?
Dashboard如何更新客户端的规则?
Dashboard如何存储规则?
一、客户端接入
客户端接入Dashboard需要做两件事情。
第一步,新增sentinel-transport-simple-http依赖:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
</dependency>
第二步,配置:
指定Dashboard地址和端口:
-Dcsp.sentinel.dashboard.server=127.0.0.1:8080
指定当前应用名称:
-Dproject.name=xxx
指定客户端暴露给Dashbord调用的端口(默认是 8719):
-Dcsp.sentinel.api.port=8720
二、Dashboard服务发现
客户端启动后,Dashboard不能直接发现应用,必须要应用里有请求经过才行(SpringMVC),这是为什么呢?
1、服务注册表
先从Dashboard服务端出发,看看Dashboard的注册表在哪。
Sentinel展示应用列表,走* /app/briefinfos.json* 。
@RestController
@RequestMapping(value = "/app")
public class AppController {
@Autowired
private AppManagement appManagement;
@GetMapping("/briefinfos.json")
public Result<List<AppInfo>> queryAppInfos(HttpServletRequest request) {
List<AppInfo> list = new ArrayList<>(appManagement.getBriefApps());
Collections.sort(list, Comparator.comparing(AppInfo::getApp));
return Result.ofSuccess(list);
}
}
@Component
public class AppManagement implements MachineDiscovery {
@Autowired
private ApplicationContext context;
private MachineDiscovery machineDiscovery;
@PostConstruct
public void init() {
machineDiscovery = context.getBean(SimpleMachineDiscovery.class);
}
@Override
public Set<AppInfo> getBriefApps() {
return machineDiscovery.getBriefApps();
}
}
MachineDiscovery管理Dashboard发现的所有客户端。
@Component
public class SimpleMachineDiscovery implements MachineDiscovery {
// 应用名称 - 应用信息(包含多个实例MachineInfo)
private final ConcurrentMap<String, AppInfo> apps = new ConcurrentHashMap<>();
// 应用注册
@Override
public long addMachine(MachineInfo machineInfo) {
AssertUtil.notNull(machineInfo, "machineInfo cannot be null");
AppInfo appInfo = apps.computeIfAbsent(machineInfo.getApp(), o -> new AppInfo(machineInfo.getApp(), machineInfo.getAppType()));
appInfo.addMachine(machineInfo);
return 1;
}
// 查找应用
@Override
public Set<AppInfo> getBriefApps() {
return new HashSet<>(apps.values());
}
}
反向找到addMachine的调用入口,是Dashboard暴露的/registry/machine方法。
@Component
public class AppManagement implements MachineDiscovery {
private MachineDiscovery machineDiscovery;
@Override
public long addMachine(MachineInfo machineInfo) {
return machineDiscovery.addMachine(machineInfo);
}
}
@Controller
@RequestMapping(value = "/registry", produces = MediaType.APPLICATION_JSON_VALUE)
public class MachineRegistryController {
@Autowired
private AppManagement appManagement;
@ResponseBody
@RequestMapping("/machine")
public Result<?> receiveHeartBeat(String app, @RequestParam(value = "app_type", required = false, defaultValue = "0") Integer appType, Long version, String v, String hostname, String ip, Integer port) {
// ...
try {
// ...
appManagement.addMachine(machineInfo);
return Result.ofSuccessMsg("success");
} catch (Exception e) {
logger.error("Receive heartbeat error", e);
return Result.ofFail(-1, e.getMessage());
}
}
}
2、心跳
接着向客户端继续反向搜索,定位到sentinel-transport-simple-http模块中的SimpleHttpHeartbeatSender。
// SimpleHttpHeartbeatSender.java
@Override
public boolean sendHeartbeat() throws Exception {
if (TransportConfig.getRuntimePort() <= 0) {
RecordLog.info("[SimpleHttpHeartbeatSender] Command server port not initialized, won't send heartbeat");
return false;
}
// 心跳发送,走dashboard tomcat的端口
// -Dcsp.sentinel.dashboard.server
Endpoint addrInfo = getAvailableAddress();
if (addrInfo == null) {
return false;
}
// /registry/machine
SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());
// 心跳请求参数
request.setParams(heartBeat.generateCurrentMessage());
try {
SimpleHttpResponse response = httpClient.post(request);
if (response.getStatusCode() == OK_STATUS) {
return true;
} else if (clientErrorCode(response.getStatusCode()) || serverErrorCode(response.getStatusCode())) {
RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo
+ ", http status code: " + response.getStatusCode());
}
} catch (Exception e) {
RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo, e);
}
return false;
}
sendHeartbeat根据-Dcsp.sentinel.dashboard.server配置的Dashboard地址,调用/registry/machine发送心跳,心跳报文如下:
public class HeartbeatMessage {
private final Map<String, String> message = new HashMap<String, String>();
public HeartbeatMessage() {
message.put("hostname", HostNameUtil.getHostName());
message.put("ip", TransportConfig.getHeartbeatClientIp());
message.put("app", AppNameUtil.getAppName());
// Put application type (since 1.6.0).
message.put("app_type", String.valueOf(SentinelConfig.getAppType()));
message.put("port", String.valueOf(TransportConfig.getPort()));
}
public Map<String, String> generateCurrentMessage() {
// Version of Sentinel.
message.put("v", Constants.SENTINEL_VERSION);
// Actually timestamp.
message.put("version", String.valueOf(TimeUtil.currentTimeMillis()));
message.put("port", String.valueOf(TransportConfig.getPort()));
return message;
}
}
port字段为客户端-Dcsp.sentinel.api.port设置的端口,默认8719,客户端暴露的这个端口会用于后续Dashboard上配置的规则推送;
app字段为注册表的key,对应客户端配置的-Dproject.name;
心跳发送间隔是多少?为什么只有客户端请求资源,才会发送心跳呢?
sentinel-transport-simple-http模块依赖的sentinel-transport-common模块中,提供了HeartbeatSenderInitFunc,它的init方法开启了心跳任务。
@InitOrder(-1)
public class HeartbeatSenderInitFunc implements InitFunc {
private ScheduledExecutorService pool = null;
private void initSchedulerIfNeeded() {
if (pool == null) {
pool = new ScheduledThreadPoolExecutor(2,
new NamedThreadFactory("sentinel-heartbeat-send-task", true),
new DiscardOldestPolicy());
}
}
@Override
public void init() {
HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
if (sender == null) {
RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
return;
}
// 心跳线程池
initSchedulerIfNeeded();
// 心跳间隔
long interval = retrieveInterval(sender);
setIntervalIfNotExists(interval);
// 开启心跳任务
scheduleHeartbeatTask(sender, interval);
}
private void scheduleHeartbeatTask(final HeartbeatSender sender, long interval) {
pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
sender.sendHeartbeat();
} catch (Throwable e) {
RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
}
}
}, 5000, interval, TimeUnit.MILLISECONDS);
}
}
心跳间隔默认为10s,可以通过-Dcsp.sentinel.heartbeat.interval.ms设置。
// HeartbeatSenderInitFunc.java
long retrieveInterval(HeartbeatSender sender) {
// 优先取-Dcsp.sentinel.heartbeat.interval.ms
Long intervalInConfig = TransportConfig.getHeartbeatIntervalMs();
if (isValidHeartbeatInterval(intervalInConfig)) {
return intervalInConfig;
} else {
// 默认SimpleHttpHeartbeatSender.DEFAULT_INTERVAL = 10秒
long senderInterval = sender.intervalMs();
return senderInterval;
}
}
当用户代码调用Sentinel的核心API时,如SphU.entry方法,底层是用Env.sph实现的。
加载Env时,触发了所有InitFunc(通过SPI机制加载),包含HeartbeatSenderInitFunc。
所以只有客户端调用了Sentinel的核心API后,才能开始发送心跳给Dashboard,Dashboard页面上才能展示注册上来的应用。
public class Env {
public static final Sph sph = new CtSph();
static {
InitExecutor.doInit();
}
}
如果客户端没及时发送心跳,Dashboard会判定客户端下线。Dashboard是怎么做的呢?
Dashboard为注册表中每个实例记录了lastHeartbeat上次心跳时间。
默认60s内没收到实例心跳,认为实例isHealthy=false。
心跳超时时间可以通过sentinel.dashboard.unhealthyMachineMillis参数设置。
public class MachineInfo implements Comparable<MachineInfo> {
// 上次心跳时间戳
private long lastHeartbeat;
public boolean isHealthy() {
long delta = System.currentTimeMillis() - lastHeartbeat;
return delta < DashboardConfig.getUnhealthyMachineMillis();
}
}
三、规则增删改查
下面以流控规则为例,看看Dashboard的增删改查逻辑。
1、查询规则
GET /v1/flow/rules,查询规则。入参app, ip, port都是针对某个客户端实例。
@RestController
@RequestMapping(value = "/v1/flow")
public class FlowControllerV1 {
@Autowired
private InMemoryRuleRepositoryAdapter<FlowRuleEntity> repository;
@Autowired
private SentinelApiClient sentinelApiClient;
@GetMapping("/rules")
@AuthAction(PrivilegeType.READ_RULE)
public Result<List<FlowRuleEntity>> apiQueryMachineRules(@RequestParam String app,
@RequestParam String ip,
@RequestParam Integer port) {
try {
// #1
List<FlowRuleEntity> rules = sentinelApiClient.fetchFlowRuleOfMachine(app, ip, port);
// #2
rules = repository.saveAll(rules);
return Result.ofSuccess(rules);
} catch (Throwable throwable) {
logger.error("Error when querying flow rules", throwable);
return Result.ofThrowable(-1, throwable);
}
}
}
查询规则分为两步:
-
Dashboard查询规则,是从客户端实例实时获取的,SentinelApiClient用于与客户端通讯;
-
将实时查询结果,保存到内存中InMemoryRuleRepositoryAdapter。
InMemoryRuleRepositoryAdapter是个抽象类,实现了所有内存的增删改查逻辑。
public abstract class InMemoryRuleRepositoryAdapter<T extends RuleEntity> implements RuleRepository<T, Long> {
// 实例 - 规则id - 规则
private Map<MachineInfo, Map<Long, T>> machineRules = new ConcurrentHashMap<>(16);
// 规则id - 规则
private Map<Long, T> allRules = new ConcurrentHashMap<>(16);
// 应用名 - 规则id - 规则
private Map<String, Map<Long, T>> appRules = new ConcurrentHashMap<>(16);
abstract protected long nextId();
}
InMemoryRuleRepositoryAdapter只有一个抽象方法nextId,每个实现类都必须实现,用于存储规则时生成规则id。SentinelDashboard所有规则id是自增的。
@Component
public class InMemFlowRuleStore extends InMemoryRuleRepositoryAdapter<FlowRuleEntity> {
private static AtomicLong ids = new AtomicLong(0);
@Override
protected long nextId() {
return ids.incrementAndGet();
}
}
2、新增规则
POST /v1/flow/rule,新增规则分为两步:
-
保存规则到内存;
-
发布规则给客户端;
// FlowControllerV1.java
@PostMapping("/rule")
@AuthAction(PrivilegeType.WRITE_RULE)
public Result<FlowRuleEntity> apiAddFlowRule(@RequestBody FlowRuleEntity entity) {
Result<FlowRuleEntity> checkResult = checkEntityInternal(entity);
if (checkResult != null) {
return checkResult;
}
entity.setId(null);
Date date = new Date();
entity.setGmtCreate(date);
entity.setGmtModified(date);
entity.setLimitApp(entity.getLimitApp().trim());
entity.setResource(entity.getResource().trim());
try {
// #1
entity = repository.save(entity);
// #2
publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS);
return Result.ofSuccess(entity);
} catch (Throwable t) {
Throwable e = t instanceof ExecutionException ? t.getCause() : t;
logger.error("Failed to add new flow rule, app={}, ip={}", entity.getApp(), entity.getIp(), e);
return Result.ofFail(-1, e.getMessage());
}
}
Dashboard发布规则给客户端,并不是只发布刚新增的一个,而是将内存中所有规则都同步给客户端。
// FlowControllerV1.java
private CompletableFuture<Void> publishRules(String app, String ip, Integer port) {
List<FlowRuleEntity> rules = repository.findAllByMachine(MachineInfo.of(app, ip, port));
return sentinelApiClient.setFlowRuleOfMachineAsync(app, ip, port, rules);
}
客户端ModifyRulesCommandHandler处理所有规则变更,客户端除了将规则保存到XXXManager(内存)以外,还允许将规则写入另一个数据源,关于外部数据源下一章再看。
@CommandMapping(name = "setRules", desc = "modify the rules, accept param: type={ruleType}&data={ruleJson}")
public class ModifyRulesCommandHandler implements CommandHandler<String> {
private static final int FASTJSON_MINIMAL_VER = 0x01020C00;
@Override
public CommandResponse<String> handle(CommandRequest request) {
// ...
String type = request.getParam("type");
String data = request.getParam("data");
if (StringUtil.isNotEmpty(data)) {
try {
data = URLDecoder.decode(data, "utf-8");
} catch (Exception e) {
RecordLog.info("Decode rule data error", e);
return CommandResponse.ofFailure(e, "decode rule data error");
}
}
String result = "success";
if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
// 写入内存,加载规则
FlowRuleManager.loadRules(flowRules);
// 写入外部存储
if (!writeToDataSource(getFlowDataSource(), flowRules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
}
// 其他规则...
return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
}
}
3、修改/删除规则
PUT /v1/flow/save.json,修改流控规则;
DELETE /v1/flow/delete.json,删除流控规则;
逻辑与新增规则差不多,都是先操作内存,然后发布规则给客户度。
// FlowControllerV1.java
@PutMapping("/save.json")
@AuthAction(PrivilegeType.WRITE_RULE)
public Result<FlowRuleEntity> apiUpdateFlowRule(...) {
entity = repository.save(entity);
publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS);
return Result.ofSuccess(entity);
}
@DeleteMapping("/delete.json")
@AuthAction(PrivilegeType.WRITE_RULE)
public Result<Long> apiDeleteFlowRule(Long id) {
repository.delete(id);
publishRules(oldEntity.getApp(), oldEntity.getIp(), oldEntity.getPort()).get(5000, TimeUnit.MILLISECONDS);
return Result.ofSuccess(id);
}
总结
Sentinel客户端每10s向Dashboard发送心跳。
心跳包中包含了本机ip和接收dashboard请求的port(-Dcsp.sentinel.api.port设置的端口,默认8719);
Dashboard默认60s没收到实例心跳,认为实例非健康。
Dashboard用内存存储规则,然而内存规则并不是靠定时器或心跳同步,而是靠用户在dashboard点击后,主动查询sentinel客户端,并同步到内存。
每次界面查询规则,都会返回客户端当前有的所有配置。
每次修改规则,Dashboard会先更新内存规则,然后发布规则给对应客户端。
客户端接收规则变更,加载到RuleManager使规则生效。

原文始发于微信公众号(程序猿阿越):Sentinel源码(十)Dashboard
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/214691.html