PolarDB-X 源码解读系列:DML 之 INSERT IGNORE 流程

2022-12-16 0 1,001

原副标题:PolarDB-X 源代码阐释系列产品:DML 之 INSERT IGNORE 业务流程

概要: 责任编辑将更进一步如是说 PolarDB-X 中 INSERT IGNORE 的继续执行业务流程,其依照填入的表与否有 GSI 也略有变动。

译者:潜璟

在上一则源代码写作中,他们如是说了 INSERT 的继续执行业务流程。而 INSERT IGNORE 与 INSERT 相同,须要对填入值推论与否有 Unique Key 的武装冲突,并忽略有武装冲突的填入值。因而责任编辑将更进一步如是说 PolarDB-X 中 INSERT IGNORE 的继续执行业务流程,其依照填入的表与否有 GSI 也略有变动。

楚溪春继续执行

假如填入的表多于两张主表,没 GSI,所以只须要将 INSERT IGNORE 间接发送至相关联的力学单上,由 DN 另行忽视存有武装冲突的值。在此种情形下,INSERT IGNORE 的继续执行操作过程和 INSERT 大体上完全相同,听众能参照以后的源代码阅读文章。

方法论继续执行

而在有 GSI 的情形下,就无法单纯地将 INSERT IGNORE 依次印发到主表和 GSI 相关联的力学分单上,不然有可能再次出现主表和 GSI 统计数据不完全一致的情形。举个范例:

create table t1 (a int primary key, b int, global index g1(b) dbpartition by hash(b)) dbpartition by hash(a);

insert ignore into t1 values (1,1),(1,2);

对填入的四条历史记录,它在主单上坐落于同一力学表(a 完全相同),但在 GSI 上坐落于相同的力学表(b 不完全相同),假如间接印发 INSERT IGNORE 不然,主单上多于 (1,1) 能获得成功填入(换行符武装冲突),而在 GSI 上 (1,1) 和 (1,2) 都能获得成功填入,只好 GSI 比主表多了两条统计数据。

特别针对此种情形,一类软件系统是依照填入值中的 Unique Key,先到统计资料库中 SELECT 出有可能武装冲突的统计数据到 CN,接着在 CN 推论武装冲突的值并删掉。

展开 SELECT 的这时候,最单纯的形式是将大部份的是 SELECT 间接发送至主单上,但主单上可能没相关联的 Unique Key,这就导致 SELECT 的这时候会展开全表扫描,影响性能。所以在优化器阶段,他们会依照 Unique Key 是在主表还是 GSI 上定义的来确定相应的 SELECT 须要发送至主表还是 GSI,具体代码位置:

com.alibaba.polardbx.optimizer.core.planner.rule.OptimizeLogicalInsertRule#groupUkByTable

protected Map<String, List<List<String>>> groupUkByTable(LogicalInsertIgnore insertIgnore,

ExecutionContext executionContext) {

// 找到每个 Unique Key 在主表和哪些 GSI 中存有

Map<Integer, List<String>> ukAllTableMap = new HashMap<>();

for (int i = 0; i < uniqueKeys.size(); i++) {

List<String> uniqueKey = uniqueKeys.get(i);

for (Map.Entry<String, Map<String, Set<String>>> e : writableTableUkMap.entrySet()) {

String currentTableName = e.getKey().toUpperCase();

Map<String, Set<String>> currentUniqueKeys = e.getValue();

boolean found = false;

for (Set<String> currentUniqueKey : currentUniqueKeys.values()) {

if (currentUniqueKey.size() != uniqueKey.size()) {

continue;

}

boolean match = currentUniqueKey.containsAll(uniqueKey);

if (match) {

found = true;

break;

}

}

if (found) {

ukAllTableMap.computeIfAbsent(i, k -> new ArrayList<>()).add(currentTableName);

}

}

}

// 确定是在哪一个单上展开 SELECT

for (Map.Entry<Integer, List<String>> e : ukAllTableMap.entrySet()) {

List<String> tableNames = e.getValue();

if (tableNames.contains(primaryTableName.toUpperCase())) {

tableUkMap.computeIfAbsent(primaryTableName.toUpperCase(), k -> new ArrayList<>())

.add(uniqueKeys.get(e.getKey()));

} else {

final boolean onlyNonPublicGsi =

tableNames.stream().noneMatch(tn -> GlobalIndexMeta.isPublished(executionContext, sm.getTable(tn)));

boolean found = false;

for (String tableName : tableNames) {

if (!onlyNonPublicGsi && GlobalIndexMeta.isPublished(executionContext, sm.getTable(tableName))) {

tableUkMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(uniqueKeys.get(e.getKey()));

found = true;

break;

} else if (onlyNonPublicGsi && GlobalIndexMeta.canWrite(executionContext, sm.getTable(tableName))) {

tableUkMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(uniqueKeys.get(e.getKey()));

found = true;

break;

}

}

}

}

return tableUkMap;

}

而到了继续执行阶段,他们在 LogicalInsertIgnoreHandler 中处理 INSERT IGNORE。他们首先会进入 getDuplicatedValues 函数,其通过印发 SELECT 的形式查找表中已有的是武装冲突的 Unique Key 的历史记录。他们将印发的 SELECT 语句中选择的列设置为 (value_index, uk_index, pk)。其中 value_index 和 uk_index 均为的常量。

举个范例,假设有表:

CREATE TABLE `t` (

`id` int(11) NOT NULL,

`a` int(11) NOT NULL,

`b` int(11) NOT NULL,

PRIMARY KEY (`id`),

UNIQUE GLOBAL KEY `g_i_a` (`a`) COVERING (`id`) DBPARTITION BY HASH(`a`)

) DBPARTITION BY HASH(`id`)

以及两条 INSERT IGNORE 语句:

INSERT IGNORE INTO t VALUES (1,2,3),(2,3,4),(3,4,5);

假设在 PolarDB-X 中继续执行时,其会将 Unique Key 编号为

0: id

1: g_i_a

INSERT IGNORE 语句中填入的每个值依次编号为

0: (1,2,3)

1: (2,3,4)

2: (3,4,5)

所以对 (2,3,4) 的 UNIQUE KEY 构造的 GSI 上的 SELECT 即为:

查询 GSI

SELECT 1 as `value_index`, 1 as `uk_index`, `id`

FROM `g_i_a_xxxx`

WHERE `a` in 3;

假设表中已经存有 (5,3,6),所以这条 SELECT 的返回结果即为 (1,1,5)。此外,由于相同的 Unique Key 的 SELECT 返回格式是完全相同的,所以他们会将同一力学库上相同的SELECT 查询 UNION 起来发送,以一次性得到多个结果,减少 CN 和 DN 之间的交互次数。只要某个 Unique Key 有重复值,他们就能依照 value_index 和 uk_index 确定是填入值的哪一行的哪个 Unique Key 是重复的。

当得到大部份的是返回结果之后,他们对统计数据展开去重。他们将上一步得到的武装冲突的的值放入一个 SET 中,接着顺序扫描大部份的是每一行填入值,假如发现有重复的就跳过该行,不然就将该行也加入到 SET 中(因为填入值之间也有可能存有相互武装冲突)。去重完毕之后,他们就得到了大部份不存有武装冲突的值,将这些值填入到表中之后就完成了两条 INSERT IGNORE 的继续执行。

方法论继续执行的继续执行业务流程:

com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#doExecute

protected int doExecute(LogicalInsert insert, ExecutionContext executionContext,

LogicalInsert.HandlerParams handlerParams) {

// …

try {

Map<String, List<List<String>>> ukGroupByTable = insertIgnore.getUkGroupByTable();

List<Map<Integer, ParameterContext>> deduplicated;

List<List<Object>> duplicateValues;

duplicateValues = getDuplicatedValues(insertIgnore, LockMode.SHARED_LOCK, executionContext, ukGroupByTable,

(rowCount) -> memoryAllocator.allocateReservedMemory(

MemoryEstimator.calcSelectValuesMemCost(rowCount, selectRowType)), selectRowType, true,

handlerParams);

final List<Map<Integer, ParameterContext>> batchParameters =

executionContext.getParams().getBatchParameters();

// 依照上一步得到的结果,去掉 INSERT IGNORE 中的武装冲突值

deduplicated = getDeduplicatedParams(insertIgnore.getUkColumnMetas(), insertIgnore.getBeforeUkMapping(),

insertIgnore.getAfterUkMapping(), RelUtils.getRelInput(insertIgnore), duplicateValues,

batchParameters, executionContext);

if (!deduplicated.isEmpty()) {

insertEc.setParams(new Parameters(deduplicated));

} else {

// All duplicated

return affectRows;

}

// 继续执行 INSERT

try {

if (gsiConcurrentWrite) {

affectRows = concurrentExecute(insertIgnore, insertEc);

} else {

affectRows = sequentialExecute(insertIgnore, insertEc);

}

} catch (Throwable e) {

handleException(executionContext, e, GeneralUtil.isNotEmpty(insertIgnore.getGsiInsertWriters()));

}

} finally {

selectValuesPool.destroy();

}

return affectRows;

}

RETURNING 优化

上一节提到的 INSERT IGNORE 的方法论继续执行形式,虽然保证了统计数据的正确性,但也使得两条 INSERT IGNORE 语句至少须要 CN 和 DN 的两次交互才能完成(第一次 SELECT,第二次 INSERT),影响了 INSERT IGNORE 的继续执行性能。

目前的 DN 已经支持了 AliSQL 的 RETURNING 优化,其能在 DN 的 INSERT IGNORE 继续执行完毕之后返回获得成功填入的值。利用这一功能,PolarDB-X 对 INSERT IGNORE 展开了更进一步的优化:间接将 INSERT IGNORE 印发,假如在主表和 GSI 上全部获得成功返回,所以就说明填入值中没武装冲突,只好就获得成功完成该条 INSERT IGNORE 的继续执行;不然就将多填入的值删掉。

继续执行时,CN 首先会依照上文中的语法印发带有 RETURNING 的力学 INSERT IGNORE 语句到 DN,比如:

call dbms_trans.returning(“a”, “insert into t1_xxxx values(1,1)”);

其中返回列是换行符,用来标识填入的一批统计数据中哪些被获得成功填入了;t1_xxxx 是方法论表 t1 的一个力学该户。当主表和 GSI 上的大部份 INSERT IGNORE 继续执行完毕之后,他们计算主表和 GSI 中获得成功填入值的交集作为最后的结果,接着删掉多填入的值。这部分代码在

com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#getRowsToBeRemoved

private Map<String, List<List<Object>>> getRowsToBeRemoved(String tableName,

Map<String, List<List<Object>>> tableInsertedValues,

List<Integer> beforePkMapping,

List<ColumnMeta> pkColumnMetas) {

final Map<String, Set<GroupKey>> tableInsertedPks = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

final Map<String, List<Pair<GroupKey, List<Object>>>> tablePkRows =

new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

tableInsertedValues.forEach((tn, insertedValues) -> {

final Set<GroupKey> insertedPks = new TreeSet<>();

final List<Pair<GroupKey, List<Object>>> pkRows = new ArrayList<>();

for (List<Object> inserted : insertedValues) {

final Object[] groupKeys = beforePkMapping.stream().map(inserted::get).toArray();

final GroupKey pk = new GroupKey(groupKeys, pkColumnMetas);

insertedPks.add(pk);

pkRows.add(Pair.of(pk, inserted));

}

tableInsertedPks.put(tn, insertedPks);

tablePkRows.put(tn, pkRows);

});

// Get intersect of inserted values

final Set<GroupKey> distinctPks = new TreeSet<>();

for (GroupKey pk : tableInsertedPks.get(tableName)) {

if (tableInsertedPks.values().stream().allMatch(pks -> pks.contains(pk))) {

distinctPks.add(pk);

}

}

// Remove values which not exists in at least one insert results

final Map<String, List<List<Object>>> tableDeletePks = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

tablePkRows.forEach((tn, pkRows) -> {

final List<List<Object>> deletePks = new ArrayList<>();

pkRows.forEach(pkRow -> {

if (!distinctPks.contains(pkRow.getKey())) {

deletePks.add(pkRow.getValue());

}

});

if (!deletePks.isEmpty()) {

tableDeletePks.put(tn, deletePks);

}

});

return tableDeletePks;

}

与上一节的方法论继续执行的“悲观继续执行”相比,使用 RETURNING 优化的 INSERT IGNORE 相当于“乐观继续执行”,假如填入的值本身没武装冲突,所以两条 INSERT IGNORE 语句 CN 和 DN 间只须要一次交互即可;而在有武装冲突的情形下,他们须要印发 DELETE 语句将主表或 GSI 中多填入的值删掉,只好 CN 和 DN 间须要两次交互。能看出,即便是有武装冲突的情形,CN 和 DN 间的交互次数也不会超过上一节的方法论继续执行。因而在无法间接楚溪春的情形下,INSERT IGNORE 的继续执行策略是默认使用 RETURNING 优化继续执行。

当然 RETURNING 优化的使用也有一些限制,比如填入的 Value 有重复换行符时就无法使用,因为此种情形下无法推论具体是哪一行被获得成功填入,哪一行须要删掉;具体能写作代码中的条件判断。当无法使用 RETURNING 优化时,系统会自动选择上一节中的方法论继续执行形式继续执行该条 INSERT IGNORE 语句以保证统计数据的正确性。

使用 RETURNING 优化的继续执行业务流程:

com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#doExecute

protected int doExecute(LogicalInsert insert, ExecutionContext executionContext,

LogicalInsert.HandlerParams handlerParams) {

// …

// 推论能否使用 RETURNING 优化

boolean canUseReturning =

executorContext.getStorageInfoManager().supportsReturning() && executionContext.getParamManager()

.getBoolean(ConnectionParams.DML_USE_RETURNING) && allDnUseXDataSource && gsiCanUseReturning

&& !isBroadcast && !ComplexTaskPlanUtils.canWrite(tableMeta);

if (canUseReturning) {

canUseReturning = noDuplicateValues(insertIgnore, insertEc);

}

if (canUseReturning) {

// 继续执行 INSERT IGNORE 并获得返回结果

final List<RelNode> allPhyPlan =

new ArrayList<>(replaceSeqAndBuildPhyPlan(insertIgnore, insertEc, handlerParams));

getPhysicalPlanForGsi(insertIgnore.getGsiInsertIgnoreWriters(), insertEc, allPhyPlan);

final Map<String, List<List<Object>>> tableInsertedValues =

executeAndGetReturning(executionContext, allPhyPlan, insertIgnore, insertEc, memoryAllocator,

selectRowType);

// …

// 生成 DELETE

final boolean removeAllInserted =

targetTableNames.stream().anyMatch(tn -> !tableInsertedValues.containsKey(tn));

if (removeAllInserted) {

affectedRows -=

removeInserted(insertIgnore, schemaName, tableName, isBroadcast, insertEc, tableInsertedValues);

if (returnIgnored) {

ignoredRows = totalRows;

}

} else {

final List<Integer> beforePkMapping = insertIgnore.getBeforePkMapping();

final List<ColumnMeta> pkColumnMetas = insertIgnore.getPkColumnMetas();

// 计算大部份填入值的交集

final Map<String, List<List<Object>>> tableDeletePks =

getRowsToBeRemoved(tableName, tableInsertedValues, beforePkMapping, pkColumnMetas);

affectedRows -=

removeInserted(insertIgnore, schemaName, tableName, isBroadcast, insertEc, tableDeletePks);

if (returnIgnored) {

ignoredRows +=

Optional.ofNullable(tableDeletePks.get(insertIgnore.getLogicalTableName())).map(List::size)

.orElse(0);

}

}

handlerParams.optimizedWithReturning = true;

if (returnIgnored) {

return ignoredRows;

} else {

return affectedRows;

}

} else {

handlerParams.optimizedWithReturning = false;

}

// …

}

最后以一个范例来展现 RETURNING 优化的继续执行业务流程与方法论继续执行的相同。通过 /+TDDL:CMD_EXTRA(DML_USE_RETURNING=TRUE)/ 这条 HINT,用户能手动控制与否使用 RETURNING 优化。

首先建表并填入两条统计数据:

CREATE TABLE `t` (

`id` int(11) NOT NULL,

`a` int(11) NOT NULL,

`b` int(11) NOT NULL,

PRIMARY KEY (`id`),

UNIQUE GLOBAL KEY `g_i_a` (`a`) COVERING (`id`) DBPARTITION BY HASH(`a`)

) DBPARTITION BY HASH(`id`);

INSERT INTO t VALUES (1,3,3);

再继续执行两条 INSERT IGNORE:

INSERT IGNORE INTO t VALUES (1,2,3),(2,3,4),(3,4,5);

其中 (1,2,3) 与 (1,3,3) 换行符武装冲突,(2,3,4) 与 (1,3,3) 对 Unique Key g_i_a 武装冲突。假如是 RETURNING 优化:

能看到 PolarDB-X 先展开了 INSERT IGNORE,再将多填入的统计数据删掉:(1,2,3) 在主单上武装冲突在 UGSI 上获得成功填入,(2,3,4) 在 UGSI 上武装冲突在主单上获得成功填入,因而依次印发相关联的 DELETE 到 UGSI 和主单上。

假如关闭 RETURNING 优化,方法论继续执行:

能看到 PolarDB-X 先展开了 SELECT,再将没武装冲突的统计数据 (3,4,5) 填入。

小结

责任编辑如是说了 PolarDB-X 中 INSERT IGNORE 的继续执行业务流程。除了 INSERT IGNORE 之外,还有一些 DML 语句在继续执行时也须要展开重复值的推论,比如 REPLACE、INSERT ON DUPLICATE KEY UPDATE 等,这些语句在有 GSI 的情形下均采用了方法论继续执行的形式,即先展开 SELECT 再展开判重、更新等操作,感兴趣的听众能另行写作相关代码。

欢迎关注PolarDB-X知乎机构号,写作更多技术好文。

原文链接:https://click.aliyun.com/m/1000362575/

责任编辑为阿里云原创内容,未经允许不得转载。

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务