0%

datax 源码学习

源码学习 (分析方法:5w2h)

  • what 数据同步工具
  • how 通过 python 执行,根据 json 配置信息,从源数据库同步到目的数据库
  • why 离线同步数据

问题

  1. 如何解析 json?
    使用 fastJson 解析 JSON.parse(json)生成 Object 对象(本质为 map 对象)
  2. 如何加载各个插件?
    使用 ClassLoader 加载 jar 包,通过配置文件 plugin.json 的 class 属性生成 Job 或者 Task 对象
  3. 具体配置是如何处理的?
    通过 Configuration 对象封装 json 转换的对象,通过 key 打平进行获取及存储, 见附录一

datax 相关命令

控制台测试命令

python D:/fmzh/datax/datax/datax/bin3.0/datax.py -p"-Dhadoop.home.dir=D:\thirdlib\hadoop-common-2.2.0-bin" jobTmp-d80defde826447bbb5f6d50e607633c2.conf > log.logging

附录

一 datax 中 json 扩展

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
* 根据用户提供的json path,寻址具体的对象。
* <p/>
* <br>
* <p/>
* NOTE: 目前仅支持Map以及List下标寻址, 例如:
* <p/>
* <br />
* <p/>
* 对于如下JSON
* <p/>
* {"a": {"b": {"c": [0,1,2,3]}}}
* <p/>
* config.get("") 返回整个Map <br>
* config.get("a") 返回a下属整个Map <br>
* config.get("a.b.c") 返回c对应的数组List <br>
* config.get("a.b.c[0]") 返回数字0
*
* @return Java表示的JSON对象,如果path不存在或者对象不存在,均返回null。
*/
public Object get(final String path) {
this.checkPath(path);
try {
return this.findObject(path);
} catch (Exception e) {
return null;
}
}

private String split(final String path) {
return StringUtils.replace(path, "[", ".[");
}

private List<String> split2List(final String path) {
return Arrays.asList(StringUtils.split(split(path), "."));
}

private boolean isPathList(final String path) {
return path.contains("[") && path.contains("]");
}

private boolean isPathMap(final String path) {
return StringUtils.isNotBlank(path) && !isPathList(path);
}

private Object findObject(final String path) {
boolean isRootQuery = StringUtils.isBlank(path);
if (isRootQuery) {
return this.root;
}

Object target = this.root;

for (final String each : split2List(path)) {
if (isPathMap(each)) {
target = findObjectInMap(target, each);
continue;
} else {
target = findObjectInList(target, each);
continue;
}
}

return target;
}

@SuppressWarnings("unchecked")
private Object findObjectInMap(final Object target, final String index) {
boolean isMap = (target instanceof Map);
if (!isMap) {
throw new IllegalArgumentException(String.format(
"您提供的配置文件有误. 路径[%s]需要配置Json格式的Map对象,但该节点发现实际类型是[%s]. 请检查您的配置并作出修改.",
index, target.getClass().toString()));
}

Object result = ((Map<String, Object>) target).get(index);
if (null == result) {
throw new IllegalArgumentException(String.format(
"您提供的配置文件有误. 路径[%s]值为null,datax无法识别该配置. 请检查您的配置并作出修改.", index));
}

return result;
}

@SuppressWarnings({ "unchecked" })
private Object findObjectInList(final Object target, final String each) {
boolean isList = (target instanceof List);
if (!isList) {
throw new IllegalArgumentException(String.format(
"您提供的配置文件有误. 路径[%s]需要配置Json格式的Map对象,但该节点发现实际类型是[%s]. 请检查您的配置并作出修改.",
each, target.getClass().toString()));
}

String index = each.replace("[", "").replace("]", "");
if (!StringUtils.isNumeric(index)) {
throw new IllegalArgumentException(
String.format(
"系统编程错误,列表下标必须为数字类型,但该节点发现实际类型是[%s] ,该异常代表系统编程错误, 请联系DataX开发团队 !",
index));
}

return ((List<Object>) target).get(Integer.valueOf(index));
}