Skip to content

Commit

Permalink
search worker
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Jan 24, 2024
1 parent 6305d7f commit 5c18389
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public AjaxResult updateOwnUser(@RequestParam("group") String group,
@RequiresPermissions(PERMISSION_CODE)
@GetMapping("/worker")
public String worker(@RequestParam("group") String group, ModelMap mmap) {
mmap.put("workers", serverMetricsService.workers(group));
mmap.put("workers", serverMetricsService.workers(group, null));
return PREFIX + "/worker";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package cn.ponfee.disjob.admin.controller;

import cn.ponfee.disjob.admin.util.PageUtils;
import cn.ponfee.disjob.common.base.Symbol.Str;
import cn.ponfee.disjob.supervisor.application.AuthorizeGroupService;
import cn.ponfee.disjob.supervisor.application.SchedGroupService;
import cn.ponfee.disjob.supervisor.application.ServerMetricsService;
Expand All @@ -24,6 +25,7 @@
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.system.service.ISysUserService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.springframework.stereotype.Controller;
import org.springframework.ui.ModelMap;
Expand Down Expand Up @@ -129,11 +131,18 @@ public AjaxResult doEdit(SchedGroupUpdateRequest req) {

@RequiresPermissions(PERMISSION_CODE)
@GetMapping("/worker")
public String worker(@RequestParam("group") String group, ModelMap mmap) {
public String worker(@RequestParam("group") String group,
@RequestParam(value = "worker", required = false) String worker,
ModelMap mmap) {
AuthorizeGroupService.authorizeGroup(getLoginName(), group);

if (StringUtils.isBlank(worker) || !worker.contains(Str.COLON)) {
worker = null;
}

mmap.put("group", group);
mmap.put("workers", serverMetricsService.workers(group));
mmap.put("worker", worker);
mmap.put("workers", serverMetricsService.workers(group, worker));
return PREFIX + "/worker";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,23 @@
<body class="gray-bg">
<div class="container-div">
<div class="row">
<div class="col-sm-12 search-collapse">
<form id="form-worker">
<div class="select-list">
<ul>
<li>
<label>Worker:</label>
<input type="text" name="worker" th:value="${worker}" placeholder="摘除后可输入`host:port`来查询" autocomplete="off" />
</li>
<li>
<a class="btn btn-primary btn-rounded btn-sm" onclick="searchWorker()"><i class="fa fa-search"></i>&nbsp;搜索</a>
<a class="btn btn-warning btn-rounded btn-sm" onclick="resetForm()"><i class="fa fa-refresh"></i>&nbsp;重置</a>
</li>
</ul>
</div>
</form>
</div>

<div class="btn-group-sm" id="toolbar" role="group">
<a th:if="${not #lists.isEmpty(workers)}" class="btn btn-success" onclick="modifyAllWorkerMaximumPoolSize()">
<i class="fa fa-edit"></i> 修改全部
Expand Down Expand Up @@ -224,7 +241,8 @@
};
$.operate.post(prefix + "/configure_one_worker", params, function (result) {
if (result.code === 0) {
location.reload();
//location.reload();
resetForm();
layer.close(index);
}
}, false);
Expand All @@ -244,11 +262,21 @@
$.modal.confirm("确认要 `" + msg + "` 吗?", function () {
$.operate.post(prefix + "/configure_one_worker", params, function (result) {
if (result.code === 0) {
location.reload();
resetForm();
}
}, false);
});
}

function searchWorker() {
const href = window.location.href;
window.location.href = href.split("\?")[0] + "?group=" + [[${group}]] + "&worker=" + $("input[name='worker']").val()
}

function resetForm() {
$("input[name='worker']").val("");
searchWorker();
}
</script>

</body>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
public class WorkerMetrics extends ToJsonString implements Serializable {
private static final long serialVersionUID = -5848721038892533810L;

/**
* Worker ID
*/
String workerId;

/**
* 启动时间
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@
public class GetMetricsParam extends AuthenticationParam {
private static final long serialVersionUID = 6100003437491314940L;

public GetMetricsParam(String supervisorToken) {
/**
* Group
*/
private String group;

public GetMetricsParam(String supervisorToken, String group) {
super.setSupervisorToken(supervisorToken);
this.group = group;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import cn.ponfee.disjob.common.concurrent.MultithreadExecutors;
import cn.ponfee.disjob.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.disjob.common.spring.RestTemplateUtils;
import cn.ponfee.disjob.common.util.Numbers;
import cn.ponfee.disjob.core.base.*;
import cn.ponfee.disjob.core.exception.AuthenticationException;
import cn.ponfee.disjob.core.exception.KeyExistsException;
Expand All @@ -28,6 +29,7 @@
import cn.ponfee.disjob.supervisor.application.response.WorkerMetricsResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpMethod;
Expand All @@ -36,6 +38,7 @@
import org.springframework.web.client.RestTemplate;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

Expand Down Expand Up @@ -78,10 +81,18 @@ public List<SupervisorMetricsResponse> supervisors() throws Exception {
return MultithreadExecutors.call(list, this::getMetrics, ThreadPoolExecutors.commonThreadPool());
}

public List<WorkerMetricsResponse> workers(String group) {
List<Worker> list = supervisorRegistry.getDiscoveredServers(group);
list = Collects.sorted(list, Comparator.comparing(e -> e.equals(Worker.current()) ? 0 : 1));
return MultithreadExecutors.call(list, this::getMetrics, ThreadPoolExecutors.commonThreadPool());
public List<WorkerMetricsResponse> workers(String group, String worker) {
if (StringUtils.isNotBlank(worker)) {
String[] array = worker.trim().split(":");
String host = array[0].trim();
int port = Numbers.toInt(array[1].trim(), -1);
WorkerMetricsResponse metrics = getMetrics(new Worker(group, "", host, port));
return StringUtils.isBlank(metrics.getWorkerId()) ? Collections.emptyList() : Collections.singletonList(metrics);
} else {
List<Worker> list = supervisorRegistry.getDiscoveredServers(group);
list = Collects.sorted(list, Comparator.comparing(e -> e.equals(Worker.current()) ? 0 : 1));
return MultithreadExecutors.call(list, this::getMetrics, ThreadPoolExecutors.commonThreadPool());
}
}

public void configureOneWorker(ConfigureOneWorkerRequest req) {
Expand All @@ -91,9 +102,7 @@ public void configureOneWorker(ConfigureOneWorkerRequest req) {
if (workers != null && workers.stream().anyMatch(worker::sameWorker)) {
throw new KeyExistsException("Worker already registered: " + worker);
}

verifyWorkerSignature(worker);

// add worker to this group
req.setData(req.getGroup());
} else {
Expand Down Expand Up @@ -129,10 +138,7 @@ private SupervisorMetricsResponse getMetrics(Supervisor supervisor) {
LOG.warn("Ping supervisor occur error: {} {}", supervisor, e.getMessage());
}

SupervisorMetricsResponse response = (metrics == null) ?
new SupervisorMetricsResponse() :
ServerMetricsConverter.INSTANCE.convert(metrics);

SupervisorMetricsResponse response = (metrics == null) ? new SupervisorMetricsResponse() : ServerMetricsConverter.INSTANCE.convert(metrics);
response.setHost(supervisor.getHost());
response.setPort(supervisor.getPort());
response.setPingTime(pingTime);
Expand All @@ -142,8 +148,9 @@ private SupervisorMetricsResponse getMetrics(Supervisor supervisor) {
private WorkerMetricsResponse getMetrics(Worker worker) {
WorkerMetrics metrics = null;
Long pingTime = null;
String group = worker.getGroup();
String url = String.format(WORKER_METRICS_URL, worker.getHost(), worker.getPort());
GetMetricsParam param = new GetMetricsParam(SchedGroupService.createSupervisorAuthenticationToken(worker.getGroup()));
GetMetricsParam param = new GetMetricsParam(SchedGroupService.createSupervisorAuthenticationToken(group), group);
try {
long start = System.currentTimeMillis();
metrics = RestTemplateUtils.invokeRpc(restTemplate, url, HttpMethod.GET, WorkerMetrics.class, null, param);
Expand All @@ -152,14 +159,15 @@ private WorkerMetricsResponse getMetrics(Worker worker) {
LOG.warn("Ping worker occur error: {} {}", worker, e.getMessage());
}

WorkerMetricsResponse response = (metrics == null) ?
new WorkerMetricsResponse() :
ServerMetricsConverter.INSTANCE.convert(metrics);
if (metrics != null && !SchedGroupService.verifyWorkerSignatureToken(metrics.getSignature(), group)) {
metrics = null;
}

WorkerMetricsResponse response = (metrics == null) ? new WorkerMetricsResponse() : ServerMetricsConverter.INSTANCE.convert(metrics);
response.setHost(worker.getHost());
response.setPort(worker.getPort());
response.setWorkerId(worker.getWorkerId());
response.setPingTime(pingTime);
response.setWorkerId(metrics != null ? metrics.getWorkerId() : worker.getWorkerId());
return response;
}

Expand All @@ -174,7 +182,7 @@ private List<Worker> getDiscoveredWorkers(String group) {
private void verifyWorkerSignature(Worker worker) {
String group = worker.getGroup();
String url = String.format(WORKER_METRICS_URL, worker.getHost(), worker.getPort());
GetMetricsParam param = new GetMetricsParam(SchedGroupService.createSupervisorAuthenticationToken(group));
GetMetricsParam param = new GetMetricsParam(SchedGroupService.createSupervisorAuthenticationToken(group), group);
WorkerMetrics metrics = RestTemplateUtils.invokeRpc(restTemplate, url, HttpMethod.GET, WorkerMetrics.class, null, param);
if (!SchedGroupService.verifyWorkerSignatureToken(metrics.getSignature(), group)) {
throw new AuthenticationException("Worker authenticated failed: " + worker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ static synchronized void setWorkerThreadPool(WorkerThreadPool threadPool) {

public static WorkerMetrics metrics() {
WorkerMetrics metrics = new WorkerMetrics();
metrics.setWorkerId(Worker.current().getWorkerId());
metrics.setStartupAt(Dates.toDate(Worker.current().getStartupAt()));
metrics.setAlsoSupervisor(Supervisor.current() != null);
metrics.setJvmThreadActiveCount(Thread.activeCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public List<SplitTask> split(JobHandlerParam param) throws JobException {

@Override
public WorkerMetrics metrics(GetMetricsParam param) {
String wGroup = Worker.current().getGroup();
String pGroup = param.getGroup();
if (!wGroup.equals(pGroup)) {
throw new IllegalArgumentException("Inconsistent get metrics group: " + wGroup + " != " + pGroup);
}
currentWork.verifySupervisorAuthenticationToken(param);
return WorkerConfigurator.metrics();
}
Expand Down

0 comments on commit 5c18389

Please sign in to comment.