TenantLineInnerInterceptor源码解读

一、引言

TenantLineInnerInterceptor是MyBatis-Plus中的一个拦截器类,位于com.baomidou.mybatisplus.extension.plugins.inner.TenantLineInnerInterceptor,通过MyBatis-Plus的插件机制调用,用于实现表级的多租户功能。

本文基于MyBatis-Plus的3.5.9版本的源码,并fork了代码: https://github.com/changelzj/mybatis-plus/tree/lzj-3.5.9

public class TenantLineInnerInterceptor  extends BaseMultiTableInnerInterceptor implements InnerInterceptor {      private TenantLineHandler tenantLineHandler;      @Override     public void beforeQuery(Executor executor, MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {...}      @Override     public void beforePrepare(StatementHandler sh, Connection connection, Integer transactionTimeout) {...}      @Override     protected void processSelect(Select select, int index, String sql, Object obj) {...}      @Override     protected void processInsert(Insert insert, int index, String sql, Object obj) {...}      @Override     protected void processUpdate(Update update, int index, String sql, Object obj) {...}      @Override     protected void processDelete(Delete delete, int index, String sql, Object obj) {...}      protected void processInsertSelect(Select selectBody, final String whereSegment) {...}      protected void appendSelectItem(List<SelectItem<?>> selectItems) {...}      protected Column getAliasColumn(Table table) {...}      @Override     public void setProperties(Properties properties) {...}      @Override     public Expression buildTableExpression(final Table table, final Expression where, final String whereSegment) {...} } 

多租户和数据权限DataPermissionInterceptor的实现原理是类似的,租户本质上也是一种特殊的数据权限,不同于数据权限的是对于涉及租户的表的增、删、改、查四种操作,都需要对SQL语句进行处理,实现原理是执行SQL前进行拦截,并获取要执行的SQL,然后解析SQL语句中的表,遇到需要租户隔离的表就要进行处理,对于查询、删除和更新的场景,就在现有的SQL条件中追加一个tenant_id = ?的条件,获取当前操作的用户或要执行的某种任务所属的租户ID赋值给tenant_id,对于添加操作,则是将tenant_id字段加入到INSERT列表中并赋值。

TenantLineInnerInterceptor类也像数据权限插件一样继承了用于解析和追加条件的BaseMultiTableInnerInterceptor类,但是BaseMultiTableInnerInterceptor主要是提供了对查询SQL的解析重写能力供插件类使用,本类对于添加数据的场景采用自己实现的解析和重写INSERT SQL的逻辑。

TenantLineInnerInterceptor需要一个TenantLineHandler类型的租户处理器,TenantLineHandler是一个接口,用于给TenantLineInnerInterceptor判断某个表是否需要租户隔离,以及获取租户ID值表达式、租户字段名以及要执行的SQL的列中如果已经包含租户ID字段是否继续,我们使用MyBatis-Plus的租户插件时,需要实现这个接口并在回调方法中将这些信息封装好后返回。

com.baomidou.mybatisplus.extension.plugins.handler.TenantLineHandler

public interface TenantLineHandler {      /**      * 获取租户 ID 值表达式,只支持单个 ID 值      * <p>      *      * @return 租户 ID 值表达式      */     Expression getTenantId();      /**      * 获取租户字段名      * <p>      * 默认字段名叫: tenant_id      *      * @return 租户字段名      */     default String getTenantIdColumn() {         return "tenant_id";     }      /**      * 根据表名判断是否忽略拼接多租户条件      * <p>      * 默认都要进行解析并拼接多租户条件      *      * @param tableName 表名      * @return 是否忽略, true:表示忽略,false:需要解析并拼接多租户条件      */     default boolean ignoreTable(String tableName) {         return false;     }      /**      * 忽略插入租户字段逻辑      *      * @param columns        插入字段      * @param tenantIdColumn 租户 ID 字段      * @return      */     default boolean ignoreInsert(List<Column> columns, String tenantIdColumn) {         return columns.stream().map(Column::getColumnName).anyMatch(i -> i.equalsIgnoreCase(tenantIdColumn));     } }  

二、主要源码解读

本文指定租户ID为1001,对各种结构的INSERT SQL解析重写过程进行解读

TenantLineHandler handler = new TenantLineHandler() {     @Override     public Expression getTenantId() {         return new LongValue(1001);     } }; 

2.1 beforeQuery/beforePrepare

逻辑和DataPermissionInterceptor中的实现基本一致,唯一不同的是,租户的实现需要对INSERT类型的SQL进行解析重写。

 @Override public void beforeQuery(Executor executor, MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {     if (InterceptorIgnoreHelper.willIgnoreTenantLine(ms.getId())) {         return;     }     PluginUtils.MPBoundSql mpBs = PluginUtils.mpBoundSql(boundSql);     mpBs.sql(parserSingle(mpBs.sql(), null)); } 
@Override public void beforePrepare(StatementHandler sh, Connection connection, Integer transactionTimeout) {     PluginUtils.MPStatementHandler mpSh = PluginUtils.mpStatementHandler(sh);     MappedStatement ms = mpSh.mappedStatement();     SqlCommandType sct = ms.getSqlCommandType();     if (sct == SqlCommandType.INSERT || sct == SqlCommandType.UPDATE || sct == SqlCommandType.DELETE) {         if (InterceptorIgnoreHelper.willIgnoreTenantLine(ms.getId())) {             return;         }         PluginUtils.MPBoundSql mpBs = mpSh.mPBoundSql();         mpBs.sql(parserMulti(mpBs.sql(), null));     } }  

2.2 processSelect

对SELECT语句的解析和重写,已经在父类BaseMultiTableInnerInterceptor中实现

@Override protected void processSelect(Select select, int index, String sql, Object obj) {     final String whereSegment = (String) obj;     processSelectBody(select, whereSegment);     List<WithItem> withItemsList = select.getWithItemsList();     if (!CollectionUtils.isEmpty(withItemsList)) {         withItemsList.forEach(withItem -> processSelectBody(withItem, whereSegment));     } }  

2.3 processInsert

该方法是本类中一个很重要的方法,用于对INSERT语句进行解析和重写以实现租户隔离。

@Override protected void processInsert(Insert insert, int index, String sql, Object obj) {     if (tenantLineHandler.ignoreTable(insert.getTable().getName())) {         // 过滤退出执行         return;     }     List<Column> columns = insert.getColumns();     if (CollectionUtils.isEmpty(columns)) {         // 针对不给列名的insert 不处理         return;     }     String tenantIdColumn = tenantLineHandler.getTenantIdColumn();     if (tenantLineHandler.ignoreInsert(columns, tenantIdColumn)) {         // 针对已给出租户列的insert 不处理         return;     }     columns.add(new Column(tenantIdColumn));     Expression tenantId = tenantLineHandler.getTenantId();     // fixed gitee pulls/141 duplicate update     List<UpdateSet> duplicateUpdateColumns = insert.getDuplicateUpdateSets();     if (CollectionUtils.isNotEmpty(duplicateUpdateColumns)) {         EqualsTo equalsTo = new EqualsTo();         equalsTo.setLeftExpression(new StringValue(tenantIdColumn));         equalsTo.setRightExpression(tenantId);         duplicateUpdateColumns.add(new UpdateSet(new Column(tenantIdColumn), tenantId));     }      Select select = insert.getSelect();     if (select instanceof PlainSelect) { //fix github issue 4998  修复升级到4.5版本的问题         this.processInsertSelect(select, (String) obj);     } else if (insert.getValues() != null) {         // fixed github pull/295         Values values = insert.getValues();         ExpressionList<Expression> expressions = (ExpressionList<Expression>) values.getExpressions();         if (expressions instanceof ParenthesedExpressionList) {             expressions.addExpression(tenantId);         } else {             if (CollectionUtils.isNotEmpty(expressions)) {//fix github issue 4998 jsqlparse 4.5 批量insert ItemsList不是MultiExpressionList 了,需要特殊处理                 int len = expressions.size();                 for (int i = 0; i < len; i++) {                     Expression expression = expressions.get(i);                     if (expression instanceof Parenthesis) {                         ExpressionList rowConstructor = new RowConstructor<>()                             .withExpressions(new ExpressionList<>(((Parenthesis) expression).getExpression(), tenantId));                         expressions.set(i, rowConstructor);                     } else if (expression instanceof ParenthesedExpressionList) {                         ((ParenthesedExpressionList) expression).addExpression(tenantId);                     } else {                         expressions.add(tenantId);                     }                 }             } else {                 expressions.add(tenantId);             }         }     } else {         throw ExceptionUtils.mpe("Failed to process multiple-table update, please exclude the tableName or statementId");     } }  

首先判断if (CollectionUtils.isEmpty(columns)):如SQL没有指明要更新的列,则不处理

然后判断if (tenantLineHandler.ignoreInsert(columns, tenantIdColumn)),如要执行的SQL中已经包含租户ID字段,则可能是已经明确指定了具体的租户ID,同样不处理

然后调用tenantLineHandlergetTenantIdColumn()获取租户列的字段名,先把租户的字段名添加到INSERT INTO后面原有的字段名的最后

之后针对不同结构的SQL,会分别走到不同的分支,针对几种常见的INSERT SQL,分别进行解读:

2.3.1 最常见的新增SQL语句

insert into t_user (name, age) values ('liming', 15) 

首先会尝试获取INSERT语句中的查询结构Select select = insert.getSelect(),并判断是否带有查询结构,这种情况是不带查询结构的,会走到else if (insert.getValues() != null)这个分支,然后insert.getValues()获取代表一组值的对象values

紧接着获取values的结构ExpressionList<Expression> expressions = (ExpressionList<Expression>) values.getExpressions()得到('liming', 15)

然后,通过if (expressions instanceof ParenthesedExpressionList)判断是否为带着括号的Expression结构,很显然是,通过expressions.addExpression(tenantId);将租户ID的值追加到('liming', 15)的最后,得到SQL:

INSERT INTO t_user (name, age, tenant_id) VALUES ('liming', 15, 1001) 

2.3.2 批量新增数据的SQL语句

insert into t_user (name, age) values ('liming', 15), ('zhaoying', 16) 

与2.3.1不同的是,这种SQL在通过if (expressions instanceof ParenthesedExpressionList)判断是否为带着括号的Expression结构时结果为false,因为这种SQL的VALUES部分结构是('liming', 15), ('zhaoying', 16)显然不符合,因此会走到else分支,分别取出其中每个元素(...),再去判断每个元素是否为带着括号的Expression结构,显然每个(...)都符合,因此对每个(...)中最后一个值后面再追加上租户ID即可,相当于将大的拆散分别处理,最终得到SQL:

INSERT INTO t_user (name, age, tenant_id)  VALUES ('liming', 15, 1001), ('zhaoying', 16, 1001) 

2.3.3 ON DUPLICATE KEY UPDATE的SQL

INSERT INTO table_name (col1, col2)  VALUES (val1, val2)  ON DUPLICATE KEY UPDATE col1 = val3, col2 = col4 + 1; 

这种SQL,在if (CollectionUtils.isNotEmpty(duplicateUpdateColumns))处为true,属于添加发生冲突时对冲突的字段进行更新的SQL结构,会先进入这个if分支处理ON DUPLICATE的部分,意思是如果insert.getDuplicateUpdateSets()不为空,则会先将tenant_id = 1001追加到ON DUPLICATE KEY UPDATE后面,再后面的VALUES (val1, val2, 1001)的结构和2.3.1处理方式相同

INSERT INTO table_name (col1, col2, tenant_id)  VALUES (val1, val2, 1001)  ON DUPLICATE KEY UPDATE col1 = val3, col2 = col4 + 1, tenant_id = 1001 

2.3.4 INSERT SELECT的SQL

INSERT INTO table_name (col1, col2) SELECT col1, col2 FROM another_table  

与2.3.1情况相反,这种情况是带查询结构的,这种SQL要添加的值在一个查询结果集中,该方法在获取查询结构Select select = insert.getSelect()并判断是否带有查询结构时,就会走到if (select instanceof PlainSelect)中,调用processInsertSelect()方法并将SQL上获取到的Select结构传入,对SQL中的查询结构进行处理,processInsertSelect方法解读详见2.6,最终得到SQL:

INSERT INTO table_name (col1, col2, tenant_id)  SELECT col1, col2, tenant_id FROM another_table WHERE tenant_id = 1001 

2.3.5 SELECT INTO的结构

SELECT col1,col2  INTO table_name2 FROM table_name1 

这种会被当成select语句进行处理

2.4 processUpdate

该方法用于解析重写update语句,针对租户的processUpdate方法和数据权限的实现类似但也有区别

/**  * update 语句处理  */ @Override protected void processUpdate(Update update, int index, String sql, Object obj) {     final Table table = update.getTable();     if (tenantLineHandler.ignoreTable(table.getName())) {         // 过滤退出执行         return;     }     List<UpdateSet> sets = update.getUpdateSets();     if (!CollectionUtils.isEmpty(sets)) {         sets.forEach(us -> us.getValues().forEach(ex -> {             if (ex instanceof Select) {                 processSelectBody(((Select) ex), (String) obj);             }         }));     }     update.setWhere(this.andExpression(table, update.getWhere(), (String) obj)); }  

用于解析和重写update语句的租户逻辑,对于常规的update语句处理较为简单,直接在where后面追加租户过滤条件:update.setWhere(this.andExpression(table, update.getWhere(), (String) obj)),例如:

UPDATE user SET username = 5 WHERE id = 1  

重写后:

UPDATE user SET username = 5 WHERE id = 1 AND tenant_id = 1001 

和数据权限拦截器插件的实现不同的是,多租户对于update语句更新后的值是子查询的情况进行了额外处理,对子查询SQL也进行了解析和重写,通过sets.forEach(us -> us.getValues().forEach(ex -> {获取所有要更新的值并遍历,如果某个值属于子查询结构(ex instanceof Select)则处理子查询,例如:

UPDATE user  SET username = (SELECT name FROM employee WHERE emp_no = 'UA001')  WHERE id = 1  

重写后:

UPDATE user  SET username = (SELECT name FROM employee WHERE emp_no = 'UA001' AND tenant_id = 1001)  WHERE id = 1 AND tenant_id = 1001 

2.5 processDelete

删除语句,处理较为简单,处理方式类似简单的update语句,直接追加过滤条件在where后面即可

 /**  * delete 语句处理  */ @Override protected void processDelete(Delete delete, int index, String sql, Object obj) {     if (tenantLineHandler.ignoreTable(delete.getTable().getName())) {         // 过滤退出执行         return;     }     delete.setWhere(this.andExpression(delete.getTable(), delete.getWhere(), (String) obj)); }  

2.6 processInsertSelect

该方法用于对INSERT...SELECT...结构后面的SELECT部分进行处理

 /**  * 处理 insert into select  * <p>  * 进入这里表示需要 insert 的表启用了多租户,则 select 的表都启动了  *  * @param selectBody SelectBody  */ protected void processInsertSelect(Select selectBody, final String whereSegment) {     if(selectBody instanceof PlainSelect){         PlainSelect plainSelect = (PlainSelect) selectBody;         FromItem fromItem = plainSelect.getFromItem();         if (fromItem instanceof Table) {             // fixed gitee pulls/141 duplicate update             processPlainSelect(plainSelect, whereSegment);             appendSelectItem(plainSelect.getSelectItems());         } else if (fromItem instanceof Select) {             Select subSelect = (Select) fromItem;             appendSelectItem(plainSelect.getSelectItems());             processInsertSelect(subSelect, whereSegment);         }     } else if(selectBody instanceof ParenthesedSelect){         ParenthesedSelect parenthesedSelect = (ParenthesedSelect) selectBody;         processInsertSelect(parenthesedSelect.getSelect(), whereSegment);      } } 

解读:

1.表:if (fromItem instanceof Table)针对的是SELECT部分查询的是表的情况

INSERT INTO table_name (col1, col2) SELECT col1, col2 FROM another_table 

直接调用父类processPlainSelect对表where条件追加租户过滤条件,再将租户ID字段名添加到查询字段名列表中即可,得到如下SQL:

INSERT INTO table_name (col1, col2, tenant_id)  SELECT col1, col2, tenant_id FROM another_table WHERE tenant_id = 1001 

2.子查询:else if (fromItem instanceof Select)针对的是SELECT部分查询的是子查询的情况

INSERT INTO table_name (col1, col2)  SELECT col1, col2 FROM (select col1, col2 from  another_table) t 

appendSelectItem()将租户ID字段名添加到查询字段名列表中,然后获取子查询再递归调用当前processInsertSelect方法,如果子查询中查询的是表,则将租户ID字段名添加到子查询的字段名列表中然后追加租户过滤条件在子查询的where条件上,如果子查询中的查询来源还是子查询,则继续递归解析,最终会得到如下SQL:

INSERT INTO table_name (col1, col2, tenant_id)  SELECT col1, col2, tenant_id FROM (     SELECT col1, col2, tenant_id FROM another_table WHERE tenant_id = 1001 ) t 

2.7 appendSelectItem

该方法配合processInsertSelect使用,用于将租户ID字段名插入到select后的字段名列表中,使得结果集可以直接作为要添加的值进行批量insert,如果select的字段是模糊的select *表示的,则不处理,直接跳过

/**  * 追加 SelectItem  *  * @param selectItems SelectItem  */ protected void appendSelectItem(List<SelectItem<?>> selectItems) {     if (CollectionUtils.isEmpty(selectItems)) {         return;     }     if (selectItems.size() == 1) {         SelectItem item = selectItems.get(0);         Expression expression = item.getExpression();         if (expression instanceof AllColumns) {             return;         }     }     selectItems.add(new SelectItem<>(new Column(tenantLineHandler.getTenantIdColumn()))); } 

结束语

该类是MyBatis-Plus的多租户插件实现源码,基本上和数据权限插件的实现逻辑类似,本质上讲租户也是一种特殊的数据权限,根据租户的业务逻辑,本类针对INSERT SQL的解析和重写进行了实现,并对UPDATE SQL做了和数据权限插件不一样的处理:针对更新后的值是子查询的情况也对子查询SQL进行了租户隔离。

原文首发:https://blog.liuzijian.com/post/mybatis-plus-source-tenant-line-inner-interceptor.html

发表评论

评论已关闭。

相关文章