0%

xxl-job源码学习(2)

xxl-job && datax-web 源码内容

问题

日志相关

  • 日志文件是如何生成 ?
    通过生成日志文件,追加日志。见附录 日志流程图
  • 日志格式使用的是 logback.xml ?
    否,代码写死,见 NtiJobLogger.java
  • 读取日志时如何从某行开始读取 ?
    通过 io 流包装类,LineNumberReader.java

停止线程

  • 直接使用 interrupt 能时线程停止 ?
    能停止线程,process.waitFor()触发报中断异常,但是 datax 任务会执行任务结束
  • 如何获取 Process process = Runtime.getRuntime().exec(command)的进程 id
    通过 java.lang.UNIXProcess 获取
  • 进程 id 如何返回管理平台
    通过 ProcessCallbackThread 单独线程处理、平台提供 processCallback 接口进行回调

xxl-job 交互流程

  • 如何选择执行器 ?
    通过 TriggerParam 参数调用 Executor 的实现类 ExecutorBizImpl 的 run 方法进行判断,类型见 GlueTypeEnum
  • 运行参数是如何传递的 ?
    通过 TriggerParam 参数 executorParams\replaceParam\jvmParam\replaceParamType\extendedParam 等属性进行传递

netty 相关

  • demo 程序
  • 封包解包&&粘包 ?
    将消息分为消息头和消息体,消息头中包含表示信息的总长度(或者消息体长度)的字段
  • 交互时:客户端发送服务端 / 服务端发送客户端 封包解包 区别 ?
    TODO

附录

1、日志流程图

2、NtiJobLogger.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* append log
*
* @param callInfo
* @param appendLog
*/
private static void logDetail(StackTraceElement callInfo, String appendLog) {
// "yyyy-MM-dd HH:mm:ss [ClassName]-[MethodName]-[LineNumber]-[ThreadName] log";
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append(DateUtil.formatDateTime(new Date())).append(" ")
.append("[" + callInfo.getClassName() + "#" + callInfo.getMethodName() + "]").append("-")
.append("[" + callInfo.getLineNumber() + "]").append("-")
.append("[" + Thread.currentThread().getName() + "]").append(" ")
.append(appendLog != null ? appendLog : "");
String formatAppendLog = stringBuffer.toString();

// appendlog
String logFileName = NtiJobFileAppender.contextHolder.get();
if (logFileName != null && logFileName.trim().length() > 0) {
NtiJobFileAppender.appendLog(logFileName, formatAppendLog);
} else {
logger.info(">>>>>>>>>>> {}", formatAppendLog);
}
}

3、文件按行读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
LineNumberReader reader = null;
try {
//reader = new LineNumberReader(new FileReader(logFile));
reader = new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), "utf-8"));
String line = null;

while ((line = reader.readLine()) != null) {
// [from, to], start as 1
toLineNum = reader.getLineNumber();
if (toLineNum >= fromLineNum) {
logContentBuffer.append(line).append("\n");
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}

4、读取进程 id

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static String getProcessId(Process process) {
long pid = -1;
Field field;
if (Platform.isWindows()) {
try {
field = process.getClass().getDeclaredField("handle");
field.setAccessible(true);
pid = Kernel32.INSTANCE.GetProcessId((Long) field.get(process));
} catch (Exception ex) {
logger.error("get process id for windows error {0}", ex);
}
} else if (Platform.isLinux() || Platform.isAIX()) {
try {
Class<?> clazz = Class.forName("java.lang.UNIXProcess");
field = clazz.getDeclaredField("pid");
field.setAccessible(true);
pid = (Integer) field.get(process);
} catch (Throwable e) {
logger.error("get process id for unix error {0}", e);
}
}
return String.valueOf(pid);
}


<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>4.1.0</version>
</dependency>

import com.sun.jna.Library;
import com.sun.jna.Native;

/**
* Kernel32
*
* @author jingwk
* @version 1.0
* @since 2019/11/09
*/

public interface Kernel32 extends Library {
Kernel32 INSTANCE = (Kernel32) Native.loadLibrary("kernel32", Kernel32.class);

long GetProcessId(Long hProcess);
}

5、Executor 执行器类型

1
2
3
4
5
6
7
8
9
10
11
12
BEAN("BEAN", false, null, null),
GLUE_GROOVY("GLUE(Java)", false, null, null),
GLUE_SHELL("GLUE(Shell)", true, "bash", ".sh"),
GLUE_PYTHON("GLUE(Python)", true, "python", ".py"),
GLUE_PHP("GLUE(PHP)", true, "php", ".php"),
GLUE_NODEJS("GLUE(Nodejs)", true, "node", ".js"),
GLUE_POWERSHELL("GLUE(PowerShell)", true, "powershell", ".ps1");

private String desc;
private boolean isScript;
private String cmd;
private String suffix;

实现类

  • MethodJobHandler: bean 模式使用
  • GlueJobHandler: groovy 使用
  • ScriptJobHandler: isScript=true 使用