结语
Apache Atlas 是代销于 Apache 旗下的这款元统计数据处理和环境治理的产品,目前在大统计数据领域应用极为广泛,能较好的协助民营企业管理工作统计数据金融资产,并对那些金融资产进行分类和环境治理,为统计数据预测,统计数据环境治理提供高效率的元统计数据重要信息。
是一件异常郁闷的事情,倘若某一各个环节出了难题,上溯的成本将是巨大的,于是 Atlas 在这种背景下不断涌现了,透过它,他们能非常方便的管理工作元统计数据,并且能上溯表等级,列等级间的亲密关系(亲属亲密关系),为企业的统计数据金融资产提供更多有力的支撑和保障。Atlas 全力支持从 HBase 、Hive、Sqoop、Storm、Kafka 中抽取和管理工作元统计数据,与此同时也能透过 Rest Api 的形式另行表述元统计数据源,聚合元统计数据。
责任编辑他们着重于介绍一下 Atlas 的有关基本概念,协助大家更好的理解 Atlas,与此同时详细传授如何透过 Rest Api 的形式自订统计数据源,聚合亲属亲密关系,以便开发他们的个人化需求。
Atlas 基本原理及有关基本概念
元统计数据
元统计数据只不过是描述统计数据的统计数据,比如表,表头,快照等,每个销售业务控制系统可能单厢他们表述表,表头,快照,那些统计数据从哪来到往哪去,统计数据间与否存有关连,和其它控制系统的统计数据与否存有多次重复和矛盾表头,那些是元统计数据处理要化解的难题,也是 Atlas 要化解的难题。
运转基本原理
Atlas 的基本原理只不过并不足为奇,主要是透过内部提供更多的JAVA加载数仓中的统计资料库内部结构,聚合统计数据源,储存到 Atlas的 Hbase 中,与此同时透过 hook 的形式窃听数仓中的统计数据变动,预测继续执行的 sql 句子,从而聚合Sonbhadra表,列与列的亲属亲密关系倚赖,在后台展现给使用者查阅。
数仓全力支持
Atlas 对 Hive 全力支持最好,他们都知道,Hive 是倚赖 Hadoop 的,统计数据储存有 HDFS 中,Atlas 有专门的 shell JAVA能直接运转加载 Hive 的表内部结构等元统计数据重要信息并行到 Atlas 的储存库中,自动聚合元统计数据源,与此同时 Atlas 提供更多的 HiveHook 能窃听 Hive 的统计数据变动,根据 Hive 继续执行的 sql 推断统计数据与统计数据间的亲密关系,聚合亲属亲密关系图,如果他们想预测其它统计数据储存介质的元统计数据和血缘关系,Atlas 的全力支持并并非很理想。但通常情况下,他们会间歇把销售业务库如 mysql,oracle 中的统计数据间歇并行到数仓中整合预测,而数仓他们一般单厢采用 Hadoop 的自然生态体系,所以这一点并并非问题。
架构图解
以下是 Atlas 的架构图解,能看出,Atlas 所倚赖的自然生态体系是异常庞大的,这也直接导致了它部署起来十分繁琐,责任编辑不再传授 Atlas 的部署,网上有关的教程有很多,感兴趣的朋友可以他们搜索尝试。
核心组件基本概念
Atlas 中主要有以下核心组件,那些需要他们着重于了解,接下来他们透过 Rest Api 自订建模只不过是对以下组件的增删查改操作。
1. Type
元统计数据类型表述,这里能是统计资料库,表,列等,还能细分 mysql 表( mysql_table ),oracle 表( oracle_table )等,atlas自带了很多类型,如 DataSet,Process 等,一般情况下,统计数据有关的类型在表述类型的时候单厢继承 DataSet,而流程有关的类型则会继承 Process,便于聚合亲属亲密关系。他们也能透过调用 api 自订类型。这是一切的起点,表述完类型之后,才能聚合不同类型的元统计数据实体,聚合亲属亲密关系,我个人更喜欢把元统计数据类型称之为建模。
2. Classification
分类,通俗点是给元统计数据打标签,分类是能传递的,比如 A 快照是基于 A 表聚合的,那么如果 A 表打上了 a 这个标签,A 快照也会自动打上 a 标签,这样的好处是便于统计数据的追踪。
3. Entity
实体,表示具体的元统计数据,Atlas 管理工作的对象是各种 Type 的 Entity。
4. Lineage
统计数据血缘,表示统计数据间的传递亲密关系,透过 Lineage 他们能清晰的知道统计数据的从何而来又流向何处,中间经历了哪些操作,这样一旦统计数据出现难题,能迅速上溯,定位是哪个各个环节出现错误。
Altas 使用
Altas 成功部署之后,使用还是很简单的,这是登录界面,使用者名密码默认是 admin,admin:
进入主页,点击右上角 switch to new ,使用新版界面,更直观:
页面左侧便是 Atlas 的类型树,点击树节点的某一类型,能查阅下面的实体,这里他们点击 mysql_table:
能看到下面有很多表,那些都是我之前他们利用 Rest Api 上传表述的,下面他们来传授一下如何透过 Rest Api 的形式自订类型,聚合实体,创建亲属亲密关系。
Atlas Rest Api 详解及示例
他们点击主页上方的 Help-》API Documentation,便能查阅 Atlas 所有的开放接口:
有一点他们需要注意,Atlas 的接口在使用的时候是需要鉴权的,所以他们构建 http 请求的时候需要带上使用者名和密码认证重要信息,本次示例中他们使用 atlas-client-v2 开源组件来进行 Atlas 的 api 调用。
本次示例他们表述一个 my_db 类型,my_table 类型,并且让 my_db 一对多 my_table,然后创建 test_db 实体到 my_db 下,创建 test_table_source 和 test_table_target 实体到 my_table 下,并且表述 test_table_target 的统计数据来自 test_table_source,聚合两个实体的亲属亲密关系倚赖。
自订 my_db 和 my_table 类型
他们对 my_db 和 my_table 类型进行表述,在 Atlas 的Rest Api 中,允许一个请求表述多种类型,在这里他们先构建 json 请求体,然后再透过编码形式实现,二者对比,更容易理解,json 请求体如下(关键地方有注释):
{
“enumDefs”: [],
“structDefs”: [],
“classificationDefs”: [],
//类型表述
“entityDefs”: [
{
“name”: “my_db”,
//统计数据类型的表述,约定俗成,继承Atlas自带的DataSet
“superTypes”: [
“DataSet”
],
//服务类型(便于在界面分组显示类型)
“serviceType”: “my_type”,
“typeVersion”: “1.1”,
“attributeDefs”: []
},
{
“name”: “my_table”,
“superTypes”: [
“DataSet”
],
“serviceType”: “my_type”,
“typeVersion”: “1.1”,
“attributeDefs”: []
}
],
//表述类型间的亲密关系
“relationshipDefs”: [
{
“name”: “my_table_db”,
“serviceType”: “my_type”,
“typeVersion”: “1.1”,
//亲密关系类型:ASSOCIATION:关连亲密关系,没有容器存有,1对1
//AGGREGATION:容器亲密关系,1对多,而且彼此能相互独立存有
//COMPOSITION:容器亲密关系,1对多,但是容器中的实例不能脱离容器存有
“relationshipCategory”: “AGGREGATION”,
//节点一
“endDef1”: {
“type”: “my_table”,
//表中关连的属性名称,对应下面的 my_db
“name”: “db”,
//代表这头是并非容器
“isContainer”: false,
//cardinality: 三种类型SINGLE, LIST, SET
“cardinality”: “SINGLE”
},
// 节点2
“endDef2”: {
“type”: “my_db”,
“name”: “tables”,
“isContainer”: true,
// db 包含 table,table不能多次重复,所以类型设置为 SET
“cardinality”: “SET”
},
// 推导tag NONE 不推导
“propagateTags”: “NONE”
}
]
}
编码实现:
引入 pom 倚赖,注意,如果要集成到他们的销售业务控制系统之中,销售业务控制系统如果使用了其它的日志框架,需要去除 slf4j-log4j12 倚赖,否则日志框架会起冲突,导致启动失败,另外 atlas-client-common 中倚赖了 commons-configuration 1.10,如果销售业务控制系统中有低版本倚赖,记得排除,不然二者会冲突,导致 client 初始化失败。
<dependencies>
<!– Apache Atlas –>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client-common</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!– Apache Atlas Client Version2 –>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client-v2</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>
引入 atlas-application.properties(必须得有,否则会初始化失败):
atlas.rest.address=http://127.0.0.1:21000
代码实现如下(对照json非常容易理解):
AtlasClientV2 atlasClientV2 = new AtlasClientV2(new String[]{“http://127.0.0.1:21000”}, new String[]{“admin”, “admin”});
//父类集合
Set<String> superTypes = new HashSet<>();
superTypes.add(AtlasBaseTypeDef.ATLAS_TYPE_DATASET);
//表述myType
AtlasTypesDef myType = new AtlasTypesDef();
//表述myDb
AtlasEntityDef myDb = new AtlasEntityDef();
myDb.setName(“my_db”);
myDb.setServiceType(“my_type”);
myDb.setSuperTypes(superTypes);
myDb.setTypeVersion(“1.1”);
//表述mytable
AtlasEntityDef myTable = new AtlasEntityDef();
myTable.setName(“my_table”);
myTable.setServiceType(“my_type”);
myTable.setSuperTypes(superTypes);
myTable.setTypeVersion(“1.1”);
//表述relationshipDef
AtlasRelationshipDef relationshipDef = new AtlasRelationshipDef();
relationshipDef.setName(“my_table_db”);
relationshipDef.setServiceType(“my_type”);
relationshipDef.setTypeVersion(“1.1”);
relationshipDef.setRelationshipCategory(AtlasRelationshipDef.RelationshipCategory.AGGREGATION);
relationshipDef.setPropagateTags(AtlasRelationshipDef.PropagateTags.NONE);
//表述endDef1 AtlasRelationshipEndDef endDef1 = new AtlasRelationshipEndDef();
endDef1.setType(“my_table”);
endDef1.setName(“db”);
endDef1.setIsContainer(false);
endDef1.setCardinality(AtlasStructDef.AtlasAttributeDef.Cardinality.SINGLE);
relationshipDef.setEndDef1(endDef1);
//表述endDef2
AtlasRelationshipEndDef endDef2 = new AtlasRelationshipEndDef();
endDef2.setType(“my_db”);
endDef2.setName(“tables”);
endDef2.setIsContainer(true);
endDef2.setCardinality(AtlasStructDef.AtlasAttributeDef.Cardinality.SET);
relationshipDef.setEndDef2(endDef2);
//entityDefs
List<AtlasEntityDef> entityDefs = new ArrayList<>(2);
entityDefs.add(myDb);
entityDefs.add(myTable);
myType.setEntityDefs(entityDefs);
//relationshipDefs
List<AtlasRelationshipDef> relationshipDefs = new ArrayList<>(1);
relationshipDefs.add(relationshipDef);
myType.setRelationshipDefs(relationshipDefs);
//查询与否已有my_db类型,没有则创建
SearchFilter filter = new SearchFilter();
filter.setParam(“name”, “my_db”);
AtlasTypesDef allTypeDefs = atlasClientV2.getAllTypeDefs(filter);
if (allTypeDefs.getEntityDefs().isEmpty()) {
//请求 rest api
atlasClientV2.createAtlasTypeDefs(myType);
}
继续执行以上代码,继续执行完毕后,前往 Atlas 主页查阅,类型已成功创建:
查阅类型模型图:
类型创建完毕,接下来他们进行实体的创建。
创建实体 test_db,test_table_source 和 test_table_target
json 如下:
//my_db 实体
{
“typeName”: “my_db”,
“attributes”: {
“qualifiedName”: “test_db”,
“name”: “test_db”,
“description”: “测试创建db”
}
}
//test_table_source 实体
{
“typeName”: “my_table”,
“attributes”: {
“qualifiedName”: “test_table_source”,
“name”: “test_table_source”,
“description”: “测试创建test_table_source”
},
“relationshipAttributes”: {
“db”: {
“typeName”: “my_db”,
//my_db的guid(创建完my_db后会返回)
“guid”: “xxxx”
}
}
}
//test_table_target 实体
{
“typeName”: “my_table”,
“attributes”: {
“qualifiedName”: “test_table_target”,
“name”: “test_table_target”,
“description”: “测试创建test_table_target”
},
“relationshipAttributes”: {
“db”: {
“typeName”: “my_db”,
“guid”: “xxx”
}
}
}
代码实现如下:
//创建实体 test_db
AtlasEntity testDb = new AtlasEntity();
testDb.setTypeName(“my_db”);
Map<String, Object> attributes = new HashMap<>();
attributes.put(“qualifiedName”, “test_db”);
attributes.put(“name”, “test_db”);
attributes.put(“description”, “测试创建db”);
testDb.setAttributes(attributes);
Map<String, String> queryAttributes = new HashMap<>();
queryAttributes.put(“qualifiedName”, “test_db”);
String myDbGuid = null;
try {
//查询不到会报错
AtlasEntity.AtlasEntityWithExtInfo extInfo = atlasClientV2.getEntityByAttribute(“my_db”, queryAttributes);
myDbGuid = extInfo.getEntity().getGuid();
} catch (AtlasServiceException e) {
if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
AtlasEntity.AtlasEntityWithExtInfo extInfo = new AtlasEntity.AtlasEntityWithExtInfo(testDb);
//请求
EntityMutationResponse response = atlasClientV2.createEntity(extInfo);
myDbGuid = response.getGuidAssignments().values().toArray(new String[]{})[0];
}
}
//创建与db的亲密关系
Map<String, Object> relationShipAttr = new HashMap<>();
Map<String, String> dbMap = new HashMap<>();
dbMap.put(“guid”, myDbGuid);
dbMap.put(“typeName”, “my_db”);
relationShipAttr.put(“db”, dbMap);
//创建实体 test_table_source
AtlasEntity testTableSource = new AtlasEntity();
testTableSource.setTypeName(“my_table”);
attributes.put(“qualifiedName”, “test_table_source”);
attributes.put(“name”, “test_table_source”);
attributes.put(“description”, “测试创建test_table_source”);
testTableSource.setAttributes(attributes);
testTableSource.setRelationshipAttributes(relationShipAttr);
queryAttributes.put(“qualifiedName”, “test_table_source”);
try {
//atlasClientV2.updateEntity(new AtlasEntity.AtlasEntityWithExtInfo(testTableSource)); AtlasEntity.AtlasEntityWithExtInfo extInfo = atlasClientV2.getEntityByAttribute(“my_table”, queryAttributes);
testTableSource = extInfo.getEntity();
} catch (AtlasServiceException e) {
if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
AtlasEntity.AtlasEntityWithExtInfo extInfo = new AtlasEntity.AtlasEntityWithExtInfo(testTableSource);
//请求
EntityMutationResponse response = atlasClientV2.createEntity(extInfo);
testTableSource.setGuid(response.getGuidAssignments().values().toArray(new String[]{})[0]);
}
}
//创建实体 test_table_target
AtlasEntity testTableTarget = new AtlasEntity();
testTableTarget.setTypeName(“my_table”);
attributes.put(“qualifiedName”, “test_table_target”);
attributes.put(“name”, “test_table_target”);
attributes.put(“description”, “测试创建test_table_target”);
testTableTarget.setAttributes(attributes);
testTableTarget.setRelationshipAttributes(relationShipAttr);
queryAttributes.put(“qualifiedName”, “test_table_target”);
try {
//atlasClientV2.updateEntity(new AtlasEntity.AtlasEntityWithExtInfo(testTableTarget)); AtlasEntity.AtlasEntityWithExtInfo extInfo = atlasClientV2.getEntityByAttribute(“my_table”, queryAttributes);
testTableTarget = extInfo.getEntity();
} catch (AtlasServiceException e) {
if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
AtlasEntity.AtlasEntityWithExtInfo extInfo = new AtlasEntity.AtlasEntityWithExtInfo(testTableTarget);
//请求
EntityMutationResponse response = atlasClientV2.createEntity(extInfo);
testTableTarget.setGuid(response.getGuidAssignments().values().toArray(new String[]{})[0]);
}
}
继续执行代码完毕后,查阅类的树形图,发现已经产生了实体:
他们点击右侧的 test_db 实体,能看到它的基本重要信息,也能看到它的 relationship 重要信息,包含了 test_table_source 和 test_table_target 两个实体:
查阅 relationship 重要信息,包含 test_table_source 和 test_table_target:
创建 test_table_source 和 test_table_target 的亲属亲密关系倚赖
前面他们提到,表述 test_table_target 的统计数据来自 test_table_source,亲属亲密关系倚赖在 Atlas 中只不过也是作为实体 Entity 存有,只不过继承的父类是 Process,这样能表述 inputs 和 outputs 属性,构建亲属亲密关系,json 如下:
{
“typeName”: “Process”,
“attributes”: {
“name”: “test_process”,
“qualifiedName”: “test_process”,
“description”: “test_table_target 的统计数据来自 test_table_source”,
“inputs”: [{
“typeName”: “my_table”,
//test_table_sou
“guid”: “xxx”
}],
“outputs”: [{
“typeName”: “my_table”,
////test_table_target的guid,创建
“guid”: “xxx”
}]
}
}
代码实现如下:
AtlasEntity lineage = new AtlasEntity();
//设置为process类型构建血缘
lineage.setTypeName(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
attributes.put(“qualifiedName”, “test_process”);
attributes.put(“name”, “test_process”);
attributes.put(“description”, “test_table_target 的统计数据来自 test_table_source”);
attributes.put(“inputs”, getLineAgeInfo(testTableSource));
attributes.put(“outputs”, getLineAgeInfo(testTableTarget));
lineage.setAttributes(attributes);
queryAttributes.put(“qualifiedName”, “test_process”);
System.out.println(SingletonObject.OBJECT_MAPPER.writeValueAsString(lineage));
try {
//查询与否存有
atlasClientV2.getEntityByAttribute(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS, queryAttributes);
} catch (AtlasServiceException e) {
if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
//创建
AtlasEntity.AtlasEntityWithExtInfo extInfo = new AtlasEntity.AtlasEntityWithExtInfo(lineage);
atlasClientV2.createEntity(extInfo);
}
}
//构建inputs和outputs
private static List<Map<String, String>> getLineAgeInfo(AtlasEntity entity) {
List<Map<String, String>> list = new ArrayList<>();
Map<String, String> map = new HashMap<>();
map.put(“guid”, entity.getGuid());
map.put(“typeName”, entity.getTypeName());
list.add(map);
return list;
}
继续执行以上代码,然后打开主页,点击 my_table 中的 test_table_source,查阅 lineage 标签,亲属亲密关系已成功构建:
至此,他们透过 Atlas Rest Api 的形式另行建模,创建实体,构建亲属亲密关系就完成了。
结语
透过此文,他们能循序渐进,理解 Atlas 的基本概念和基本原理,与此同时也能掌握 Atlas 的 Rest Api,日后如果有二次开发的需求,能为此奠定良好的基础,我所在的公司是基于 Atlas 搭建了他们的统计数据中台,UI 界面也都是他们重新实现的,比原生的界面好看很多,并且实现了他们的许多个人化需求。
网上 Atlas 的教程相对较少,特别是 Rest Api 这块很是缺乏,所以特地写了这篇文章。元统计数据处理,统计数据环境治理,在当下仍然是一个热门的话题,与此同时,它也能协助他们更好的支撑民营企业的统计数据金融资产,更好的预测统计数据,为民营企业的发展决策提供更多有效的协助。
希望责任编辑能对你有所感悟,有所启发,他们下次更新,再见!
关注公众号