原副标题:PolarDB-X 源代码阐释系列产品:DML 之 INSERT IGNORE 业务流程
概要: 责任编辑将更进一步如是说 PolarDB-X 中 INSERT IGNORE 的继续执行业务流程,其依照填入的表与否有 GSI 也略有变动。
译者:潜璟
在上一则源代码写作中,他们如是说了 INSERT 的继续执行业务流程。而 INSERT IGNORE 与 INSERT 相同,须要对填入值推论与否有 Unique Key 的武装冲突,并忽略有武装冲突的填入值。因而责任编辑将更进一步如是说 PolarDB-X 中 INSERT IGNORE 的继续执行业务流程,其依照填入的表与否有 GSI 也略有变动。
楚溪春继续执行
假如填入的表多于两张主表,没 GSI,所以只须要将 INSERT IGNORE 间接发送至相关联的力学单上,由 DN 另行忽视存有武装冲突的值。在此种情形下,INSERT IGNORE 的继续执行操作过程和 INSERT 大体上完全相同,听众能参照以后的源代码阅读文章。
方法论继续执行
而在有 GSI 的情形下,就无法单纯地将 INSERT IGNORE 依次印发到主表和 GSI 相关联的力学分单上,不然有可能再次出现主表和 GSI 统计数据不完全一致的情形。举个范例:
create table t1 (a int primary key, b int, global index g1(b) dbpartition by hash(b)) dbpartition by hash(a);
insert ignore into t1 values (1,1),(1,2);
对填入的四条历史记录,它在主单上坐落于同一力学表(a 完全相同),但在 GSI 上坐落于相同的力学表(b 不完全相同),假如间接印发 INSERT IGNORE 不然,主单上多于 (1,1) 能获得成功填入(换行符武装冲突),而在 GSI 上 (1,1) 和 (1,2) 都能获得成功填入,只好 GSI 比主表多了两条统计数据。
特别针对此种情形,一类软件系统是依照填入值中的 Unique Key,先到统计资料库中 SELECT 出有可能武装冲突的统计数据到 CN,接着在 CN 推论武装冲突的值并删掉。
展开 SELECT 的这时候,最单纯的形式是将大部份的是 SELECT 间接发送至主单上,但主单上可能没相关联的 Unique Key,这就导致 SELECT 的这时候会展开全表扫描,影响性能。所以在优化器阶段,他们会依照 Unique Key 是在主表还是 GSI 上定义的来确定相应的 SELECT 须要发送至主表还是 GSI,具体代码位置:
com.alibaba.polardbx.optimizer.core.planner.rule.OptimizeLogicalInsertRule#groupUkByTable
protected Map<String, List<List<String>>> groupUkByTable(LogicalInsertIgnore insertIgnore,
ExecutionContext executionContext) {
// 找到每个 Unique Key 在主表和哪些 GSI 中存有
Map<Integer, List<String>> ukAllTableMap = new HashMap<>();
for (int i = 0; i < uniqueKeys.size(); i++) {
List<String> uniqueKey = uniqueKeys.get(i);
for (Map.Entry<String, Map<String, Set<String>>> e : writableTableUkMap.entrySet()) {
String currentTableName = e.getKey().toUpperCase();
Map<String, Set<String>> currentUniqueKeys = e.getValue();
boolean found = false;
for (Set<String> currentUniqueKey : currentUniqueKeys.values()) {
if (currentUniqueKey.size() != uniqueKey.size()) {
continue;
}
boolean match = currentUniqueKey.containsAll(uniqueKey);
if (match) {
found = true;
break;
}
}
if (found) {
ukAllTableMap.computeIfAbsent(i, k -> new ArrayList<>()).add(currentTableName);
}
}
}
// 确定是在哪一个单上展开 SELECT
for (Map.Entry<Integer, List<String>> e : ukAllTableMap.entrySet()) {
List<String> tableNames = e.getValue();
if (tableNames.contains(primaryTableName.toUpperCase())) {
tableUkMap.computeIfAbsent(primaryTableName.toUpperCase(), k -> new ArrayList<>())
.add(uniqueKeys.get(e.getKey()));
} else {
final boolean onlyNonPublicGsi =
tableNames.stream().noneMatch(tn -> GlobalIndexMeta.isPublished(executionContext, sm.getTable(tn)));
boolean found = false;
for (String tableName : tableNames) {
if (!onlyNonPublicGsi && GlobalIndexMeta.isPublished(executionContext, sm.getTable(tableName))) {
tableUkMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(uniqueKeys.get(e.getKey()));
found = true;
break;
} else if (onlyNonPublicGsi && GlobalIndexMeta.canWrite(executionContext, sm.getTable(tableName))) {
tableUkMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(uniqueKeys.get(e.getKey()));
found = true;
break;
}
}
}
}
return tableUkMap;
}
而到了继续执行阶段,他们在 LogicalInsertIgnoreHandler 中处理 INSERT IGNORE。他们首先会进入 getDuplicatedValues 函数,其通过印发 SELECT 的形式查找表中已有的是武装冲突的 Unique Key 的历史记录。他们将印发的 SELECT 语句中选择的列设置为 (value_index, uk_index, pk)。其中 value_index 和 uk_index 均为的常量。
举个范例,假设有表:
CREATE TABLE `t` (
`id` int(11) NOT NULL,
`a` int(11) NOT NULL,
`b` int(11) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE GLOBAL KEY `g_i_a` (`a`) COVERING (`id`) DBPARTITION BY HASH(`a`)
) DBPARTITION BY HASH(`id`)
以及两条 INSERT IGNORE 语句:
INSERT IGNORE INTO t VALUES (1,2,3),(2,3,4),(3,4,5);
假设在 PolarDB-X 中继续执行时,其会将 Unique Key 编号为
0: id
1: g_i_a
INSERT IGNORE 语句中填入的每个值依次编号为
0: (1,2,3)
1: (2,3,4)
2: (3,4,5)
所以对 (2,3,4) 的 UNIQUE KEY 构造的 GSI 上的 SELECT 即为:
查询 GSI
SELECT 1 as `value_index`, 1 as `uk_index`, `id`
FROM `g_i_a_xxxx`
WHERE `a` in 3;
假设表中已经存有 (5,3,6),所以这条 SELECT 的返回结果即为 (1,1,5)。此外,由于相同的 Unique Key 的 SELECT 返回格式是完全相同的,所以他们会将同一力学库上相同的SELECT 查询 UNION 起来发送,以一次性得到多个结果,减少 CN 和 DN 之间的交互次数。只要某个 Unique Key 有重复值,他们就能依照 value_index 和 uk_index 确定是填入值的哪一行的哪个 Unique Key 是重复的。
当得到大部份的是返回结果之后,他们对统计数据展开去重。他们将上一步得到的武装冲突的的值放入一个 SET 中,接着顺序扫描大部份的是每一行填入值,假如发现有重复的就跳过该行,不然就将该行也加入到 SET 中(因为填入值之间也有可能存有相互武装冲突)。去重完毕之后,他们就得到了大部份不存有武装冲突的值,将这些值填入到表中之后就完成了两条 INSERT IGNORE 的继续执行。
方法论继续执行的继续执行业务流程:
com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#doExecute
protected int doExecute(LogicalInsert insert, ExecutionContext executionContext,
LogicalInsert.HandlerParams handlerParams) {
// …
try {
Map<String, List<List<String>>> ukGroupByTable = insertIgnore.getUkGroupByTable();
List<Map<Integer, ParameterContext>> deduplicated;
List<List<Object>> duplicateValues;
duplicateValues = getDuplicatedValues(insertIgnore, LockMode.SHARED_LOCK, executionContext, ukGroupByTable,
(rowCount) -> memoryAllocator.allocateReservedMemory(
MemoryEstimator.calcSelectValuesMemCost(rowCount, selectRowType)), selectRowType, true,
handlerParams);
final List<Map<Integer, ParameterContext>> batchParameters =
executionContext.getParams().getBatchParameters();
// 依照上一步得到的结果,去掉 INSERT IGNORE 中的武装冲突值
deduplicated = getDeduplicatedParams(insertIgnore.getUkColumnMetas(), insertIgnore.getBeforeUkMapping(),
insertIgnore.getAfterUkMapping(), RelUtils.getRelInput(insertIgnore), duplicateValues,
batchParameters, executionContext);
if (!deduplicated.isEmpty()) {
insertEc.setParams(new Parameters(deduplicated));
} else {
// All duplicated
return affectRows;
}
// 继续执行 INSERT
try {
if (gsiConcurrentWrite) {
affectRows = concurrentExecute(insertIgnore, insertEc);
} else {
affectRows = sequentialExecute(insertIgnore, insertEc);
}
} catch (Throwable e) {
handleException(executionContext, e, GeneralUtil.isNotEmpty(insertIgnore.getGsiInsertWriters()));
}
} finally {
selectValuesPool.destroy();
}
return affectRows;
}
RETURNING 优化
上一节提到的 INSERT IGNORE 的方法论继续执行形式,虽然保证了统计数据的正确性,但也使得两条 INSERT IGNORE 语句至少须要 CN 和 DN 的两次交互才能完成(第一次 SELECT,第二次 INSERT),影响了 INSERT IGNORE 的继续执行性能。
目前的 DN 已经支持了 AliSQL 的 RETURNING 优化,其能在 DN 的 INSERT IGNORE 继续执行完毕之后返回获得成功填入的值。利用这一功能,PolarDB-X 对 INSERT IGNORE 展开了更进一步的优化:间接将 INSERT IGNORE 印发,假如在主表和 GSI 上全部获得成功返回,所以就说明填入值中没武装冲突,只好就获得成功完成该条 INSERT IGNORE 的继续执行;不然就将多填入的值删掉。
继续执行时,CN 首先会依照上文中的语法印发带有 RETURNING 的力学 INSERT IGNORE 语句到 DN,比如:
call dbms_trans.returning(“a”, “insert into t1_xxxx values(1,1)”);
其中返回列是换行符,用来标识填入的一批统计数据中哪些被获得成功填入了;t1_xxxx 是方法论表 t1 的一个力学该户。当主表和 GSI 上的大部份 INSERT IGNORE 继续执行完毕之后,他们计算主表和 GSI 中获得成功填入值的交集作为最后的结果,接着删掉多填入的值。这部分代码在
com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#getRowsToBeRemoved
private Map<String, List<List<Object>>> getRowsToBeRemoved(String tableName,
Map<String, List<List<Object>>> tableInsertedValues,
List<Integer> beforePkMapping,
List<ColumnMeta> pkColumnMetas) {
final Map<String, Set<GroupKey>> tableInsertedPks = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
final Map<String, List<Pair<GroupKey, List<Object>>>> tablePkRows =
new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
tableInsertedValues.forEach((tn, insertedValues) -> {
final Set<GroupKey> insertedPks = new TreeSet<>();
final List<Pair<GroupKey, List<Object>>> pkRows = new ArrayList<>();
for (List<Object> inserted : insertedValues) {
final Object[] groupKeys = beforePkMapping.stream().map(inserted::get).toArray();
final GroupKey pk = new GroupKey(groupKeys, pkColumnMetas);
insertedPks.add(pk);
pkRows.add(Pair.of(pk, inserted));
}
tableInsertedPks.put(tn, insertedPks);
tablePkRows.put(tn, pkRows);
});
// Get intersect of inserted values
final Set<GroupKey> distinctPks = new TreeSet<>();
for (GroupKey pk : tableInsertedPks.get(tableName)) {
if (tableInsertedPks.values().stream().allMatch(pks -> pks.contains(pk))) {
distinctPks.add(pk);
}
}
// Remove values which not exists in at least one insert results
final Map<String, List<List<Object>>> tableDeletePks = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
tablePkRows.forEach((tn, pkRows) -> {
final List<List<Object>> deletePks = new ArrayList<>();
pkRows.forEach(pkRow -> {
if (!distinctPks.contains(pkRow.getKey())) {
deletePks.add(pkRow.getValue());
}
});
if (!deletePks.isEmpty()) {
tableDeletePks.put(tn, deletePks);
}
});
return tableDeletePks;
}
与上一节的方法论继续执行的“悲观继续执行”相比,使用 RETURNING 优化的 INSERT IGNORE 相当于“乐观继续执行”,假如填入的值本身没武装冲突,所以两条 INSERT IGNORE 语句 CN 和 DN 间只须要一次交互即可;而在有武装冲突的情形下,他们须要印发 DELETE 语句将主表或 GSI 中多填入的值删掉,只好 CN 和 DN 间须要两次交互。能看出,即便是有武装冲突的情形,CN 和 DN 间的交互次数也不会超过上一节的方法论继续执行。因而在无法间接楚溪春的情形下,INSERT IGNORE 的继续执行策略是默认使用 RETURNING 优化继续执行。
当然 RETURNING 优化的使用也有一些限制,比如填入的 Value 有重复换行符时就无法使用,因为此种情形下无法推论具体是哪一行被获得成功填入,哪一行须要删掉;具体能写作代码中的条件判断。当无法使用 RETURNING 优化时,系统会自动选择上一节中的方法论继续执行形式继续执行该条 INSERT IGNORE 语句以保证统计数据的正确性。
使用 RETURNING 优化的继续执行业务流程:
com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#doExecute
protected int doExecute(LogicalInsert insert, ExecutionContext executionContext,
LogicalInsert.HandlerParams handlerParams) {
// …
// 推论能否使用 RETURNING 优化
boolean canUseReturning =
executorContext.getStorageInfoManager().supportsReturning() && executionContext.getParamManager()
.getBoolean(ConnectionParams.DML_USE_RETURNING) && allDnUseXDataSource && gsiCanUseReturning
&& !isBroadcast && !ComplexTaskPlanUtils.canWrite(tableMeta);
if (canUseReturning) {
canUseReturning = noDuplicateValues(insertIgnore, insertEc);
}
if (canUseReturning) {
// 继续执行 INSERT IGNORE 并获得返回结果
final List<RelNode> allPhyPlan =
new ArrayList<>(replaceSeqAndBuildPhyPlan(insertIgnore, insertEc, handlerParams));
getPhysicalPlanForGsi(insertIgnore.getGsiInsertIgnoreWriters(), insertEc, allPhyPlan);
final Map<String, List<List<Object>>> tableInsertedValues =
executeAndGetReturning(executionContext, allPhyPlan, insertIgnore, insertEc, memoryAllocator,
selectRowType);
// …
// 生成 DELETE
final boolean removeAllInserted =
targetTableNames.stream().anyMatch(tn -> !tableInsertedValues.containsKey(tn));
if (removeAllInserted) {
affectedRows -=
removeInserted(insertIgnore, schemaName, tableName, isBroadcast, insertEc, tableInsertedValues);
if (returnIgnored) {
ignoredRows = totalRows;
}
} else {
final List<Integer> beforePkMapping = insertIgnore.getBeforePkMapping();
final List<ColumnMeta> pkColumnMetas = insertIgnore.getPkColumnMetas();
// 计算大部份填入值的交集
final Map<String, List<List<Object>>> tableDeletePks =
getRowsToBeRemoved(tableName, tableInsertedValues, beforePkMapping, pkColumnMetas);
affectedRows -=
removeInserted(insertIgnore, schemaName, tableName, isBroadcast, insertEc, tableDeletePks);
if (returnIgnored) {
ignoredRows +=
Optional.ofNullable(tableDeletePks.get(insertIgnore.getLogicalTableName())).map(List::size)
.orElse(0);
}
}
handlerParams.optimizedWithReturning = true;
if (returnIgnored) {
return ignoredRows;
} else {
return affectedRows;
}
} else {
handlerParams.optimizedWithReturning = false;
}
// …
}
最后以一个范例来展现 RETURNING 优化的继续执行业务流程与方法论继续执行的相同。通过 /+TDDL:CMD_EXTRA(DML_USE_RETURNING=TRUE)/ 这条 HINT,用户能手动控制与否使用 RETURNING 优化。
首先建表并填入两条统计数据:
CREATE TABLE `t` (
`id` int(11) NOT NULL,
`a` int(11) NOT NULL,
`b` int(11) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE GLOBAL KEY `g_i_a` (`a`) COVERING (`id`) DBPARTITION BY HASH(`a`)
) DBPARTITION BY HASH(`id`);
INSERT INTO t VALUES (1,3,3);
再继续执行两条 INSERT IGNORE:
INSERT IGNORE INTO t VALUES (1,2,3),(2,3,4),(3,4,5);
其中 (1,2,3) 与 (1,3,3) 换行符武装冲突,(2,3,4) 与 (1,3,3) 对 Unique Key g_i_a 武装冲突。假如是 RETURNING 优化:
能看到 PolarDB-X 先展开了 INSERT IGNORE,再将多填入的统计数据删掉:(1,2,3) 在主单上武装冲突在 UGSI 上获得成功填入,(2,3,4) 在 UGSI 上武装冲突在主单上获得成功填入,因而依次印发相关联的 DELETE 到 UGSI 和主单上。
假如关闭 RETURNING 优化,方法论继续执行:
能看到 PolarDB-X 先展开了 SELECT,再将没武装冲突的统计数据 (3,4,5) 填入。
小结
责任编辑如是说了 PolarDB-X 中 INSERT IGNORE 的继续执行业务流程。除了 INSERT IGNORE 之外,还有一些 DML 语句在继续执行时也须要展开重复值的推论,比如 REPLACE、INSERT ON DUPLICATE KEY UPDATE 等,这些语句在有 GSI 的情形下均采用了方法论继续执行的形式,即先展开 SELECT 再展开判重、更新等操作,感兴趣的听众能另行写作相关代码。
欢迎关注PolarDB-X知乎机构号,写作更多技术好文。
原文链接:https://click.aliyun.com/m/1000362575/
责任编辑为阿里云原创内容,未经允许不得转载。
