Doris写入数据异常提示actual column number in csv file is less than schema column number

版本信息:

  • Flink 1.17.1
  • Doris 1.2.3
  • Flink Doris Connector 1.4.0

写入方式

采用 String 数据流,依照社区网站的样例代码,在sink之前将数据转换为DataStream,分隔符采用"t"。

运行异常

通过Stream Load返回结果json中的ErrorUrl可以看到如题的异常

Reason: actual column number in csv file is less than schema column number. actual number: 10, ..., schema column number: 11; src line: [...] 

数据库表明明只有10个字段,提示schema column number却是11个。是自己眼花数错字段了吗?经过反复确认及同事确认,没有错,目标表就是10个字段,我写入的也是10个字段,是Flink Doris Connector 的bug吗?

分析过程

既然怀疑是bug,那就去扒代码。
实际数据写入逻辑封装在org.apache.doris.flink.sink.writer.DorisWriter,该类实现了org.apache.flink.api.connector.sink.SinkWriter接口。查看该类发现,写入Doris的过程实际是使用微批写入的。

    @Override     public void write(IN in, Context context) throws IOException {         checkLoadException();         byte[] serialize = serializer.serialize(in);         if(Objects.isNull(serialize)){             //ddl record             return;         }         if(!loading) {             //Start streamload only when there has data             dorisStreamLoad.startLoad(currentLabel);             loading = true;         }         dorisStreamLoad.writeRecord(serialize);     }     @Override     public List<DorisCommittable> prepareCommit(boolean flush) throws IOException {         if(!loading){             //There is no data during the entire checkpoint period             return Collections.emptyList();         }         // disable exception checker before stop load.         loading = false;         Preconditions.checkState(dorisStreamLoad != null);         RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);         if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {             String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());             throw new DorisRuntimeException(errMsg);         }         if (!executionOptions.enabled2PC()) {             return Collections.emptyList();         }         long txnId = respContent.getTxnId();         return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));     } 

每一条记录都会触发write操作,从上述代码可以看到根据boolean变量loading的值,程序将会触发dorisStreamLoad.startLoad(currentLabel);,而loading的状态在preCommit方法中进行修改,而preCommit是在checkpoint时触发,所以数据提交动作是通过checkpoint触发的。查看startLoad源代码

     /**      * start write data for new checkpoint.      * @param label      * @throws IOException      */     public void startLoad(String label) throws IOException{         loadBatchFirstRecord = true;         HttpPutBuilder putBuilder = new HttpPutBuilder();         recordStream.startInput();         LOG.info("stream load started for {} on host {}", label, hostPort);         try {             InputStreamEntity entity = new InputStreamEntity(recordStream);             putBuilder.setUrl(loadUrlStr)                     .baseAuth(user, passwd)                     .addCommonHeader()                     .addHiddenColumns(enableDelete)                     .setLabel(label)                     .setEntity(entity)                     .addProperties(streamLoadProp);             if (enable2PC) {                putBuilder.enable2PC();             }             pendingLoadFuture = executorService.submit(() -> {                 LOG.info("start execute load");                 return httpClient.execute(putBuilder.build());             });         } catch (Exception e) {             String err = "failed to stream load data with label: " + label;             LOG.warn(err, e);             throw e;         }     } 

DorisStreamLoad类负责将数据实际写入Doris,在上面的代码中我看到了一个陌生的词汇HiddenColumns,“隐藏列”,什么是隐藏列?.addHiddenColumns(enableDelete)的参数enableDelete 是一个boolean值,继续扒代码发现,默认值enableDelete = true;,addHiddenColumn(true)?是否意味着我的put操作数据中必须包含隐藏列?继续扒

    public HttpPutBuilder addHiddenColumns(boolean add) {         if(add){             header.put("hidden_columns", LoadConstants.DORIS_DELETE_SIGN);         }         return this;     } 

在http请求header中添加了一个配置,似乎是指明了"hidden_columns"="DORIS_DELETE_SIGN",看着好像是一个列名称,使用IDEA的跟踪调用功能,查看下哪里用到了这个变量。
Doris写入数据异常提示actual column number in csv file is less than schema column number
跟踪这些代码更确信,这是一个列名称。我的10列加上这一列就是11列啊,设置enableDelete = false,是否意味着我的put操作不再包含这一隐含列?

解决方案

修改构造DorisSink的代码添加.setDeletable(false);

        DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();         executionBuilder.setLabelPrefix(labelPrefix) //streamload label prefix                 .setDeletable(false); 

重新运行代码,写入成功,问题解决。

总结

出现该异常是因为,Flink Doris Connector 在构造Sink时默认用户写入数据中包含了隐藏列__DORIS_DELETE_SIGN__
尽管问题解决了,但是还是有很多疑问,什么是隐藏列,__DORIS_DELETE_SIGN__这个隐藏列是什么意思,从前面的代码中可以看出其取值为0或1,导入数据时为什么默认需要传递该列,该列在最前面还是在最后面?不传递该列是否会有问题?

发表评论

评论已关闭。

相关文章