【源码改造】flink JDBC connector 源码改造之 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp

导读:本篇文章讲解 【源码改造】flink JDBC connector 源码改造之 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

一、问题概述

由于mysql 驱动版本更新,导致flink在消费mysql数据时不能正确将datetime类型数据转换为flink的Timestamp。
在这里插入图片描述

通过降低mysql驱动到8.0.18自测发现问题不再出现
在这里插入图片描述

但低版本mysql驱动会有漏洞扫描问题,故此方式不能采取,进而考虑修改源码。

使用到的flink sql 如下:

CREATE TABLE `test_gao_0519_02` (
   `birthday` TIMESTAMP
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/dataflow_test',
   'username' = 'root',
   'table-name' = 'auto_mysql22'
 );

CREATE TABLE `test_gao` (
   `birthday` TIMESTAMP
 ) WITH (
   'connector' = 'print'
 );

INSERT INTO `test_gao`
 (SELECT `birthday`
 FROM `test_gao_0519_02`);

二、问题分析与解决

1. 源码分析和模型简化

1.1. 简化并解决类型转换问题

Flink jdbc connector对于类型转换逻辑主要在AbstractJdbcRowConverter类实现,通过debug定位到类型转换的错误为,如下截图位置
在这里插入图片描述

通过debug分析是因为mysql-connector-java-8.0.28对于mysql的datetime类型识别为localdatetime,使得数据强转Timestamp类型失败,那既然这样可以将问题简化为:

通过JDBC消费mysql中datetime类型识别为localdatetime,而不是Timestamp

测试jdbc程序

@Log4j2
public class FirstExample {
    // JDBC driver name and database URL
    static final String DB_URL = "jdbc:mysql://localhost:3306/dataflow_test";
    //  Database credentials
    static final String USER = "root";
    static final String PASS = "11111111";

    public static void main(String[] args) {
        Connection conn = null;
        PreparedStatement stmt1 = null;
        PreparedStatement stmt2 = null;
        try {
            //STEP 2: Register JDBC driver
            Class.forName("com.mysql.cj.jdbc.Driver");
            //STEP 3: Open a connection
            log.info("Connecting to database...");
            conn = DriverManager.getConnection(DB_URL, USER, PASS);

            //STEP 4: Execute a query
            log.info("Creating statement...");
            String sql = "SELECT * FROM dataflow_test.auto_mysql22";
            stmt1 = conn.prepareStatement(sql);
            ResultSet resultSet = stmt1.executeQuery();


            //STEP 5: Extract data from result set
            while (resultSet.next()) {
                //Retrieve by column name
                Date birthday = resultSet.getDate("birthday");
                Object object = resultSet.getObject(3);
                LocalDateTime object1 = (LocalDateTime) object;
                long time = Timestamp.valueOf(object1).getTime();
                System.out.println(time);

                //Display values
                log.info("birthday: {}", birthday);
            }
            //STEP 6: Clean-up environment
            。。。
}

看到数据能够通过JDBC消费成功,那到此就解决第一步:数据类型的转换。类型转换的关键代码是:

Date birthday = resultSet.getDate("birthday");
Object object = resultSet.getObject(3);
LocalDateTime object1 = (LocalDateTime) object;
long time = Timestamp.valueOf(object1).getTime();

2. 源码改造

解决了前面一小步数据类型的转换,下面尝试修改源码。
思路很简单,就是在AbstractJdbcRowConverter类中对于timestamp这个类型下的数据类型转换逻辑下进行适配,添加以下代码

return new JdbcDeserializationConverter() {
                    @Override
                    public Object deserialize(Object jdbcField) throws SQLException {
                        if (jdbcField instanceof LocalDateTime) {
                            return TimestampData.fromTimestamp(Timestamp.valueOf((LocalDateTime) jdbcField));
                        }
                        return TimestampData.fromTimestamp((Timestamp) jdbcField);
                    }
                };

需要注意的两点:
1.修改的程序不能影响原有源码执行逻辑
2.能够预想到修改完逻辑会影响的范围

2.1. 修改尝试

适配完逻辑之后,适配发现AbstractJdbcRowConverter似乎并不能兼容我对源码的修改,测试发现直接报类似类型转换错误。。。
在这里插入图片描述
观察到AbstractJdbcRowConverter是一个abstract类,那继续尝试在MySQLRowConverter,完善类型转换逻辑。

2.2. 正解


package org.apache.flink.connector.jdbc.internal.converter;
。。。

/**
 * Runtime converter that responsible to convert between JDBC object and Flink internal object for
 * MySQL.
 */
public class MySQLRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    @Override
    public String converterName() {
        return "MySQL";
    }

    public MySQLRowConverter(RowType rowType) {
        super(rowType);
    }


    @Override
    protected JdbcDeserializationConverter createNullableInternalConverter(LogicalType type) {
        return wrapIntoNullableInternalConverter(createInternalConverter(type));
    }


    /**
     * 重新实现JdbcDeserializationConverter实例:添加处理 数据为null时的处理逻辑。
     *
     * @param jdbcDeserializationConverter
     * @return
     */
    @Override
    protected JdbcDeserializationConverter wrapIntoNullableInternalConverter(
            JdbcDeserializationConverter jdbcDeserializationConverter) {
        return val -> {
            if (val == null) {
                return null;
            } else {
                return jdbcDeserializationConverter.deserialize(val);
            }
        };
    }


    /**
     * 用于 jdbc 数据 转为 row type 的实例
     * <p>
     * 用于识别并处理mysql自己的数据类型
     *
     * @param type
     * @return
     */
    @Override
    protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case NULL:
                return val -> null;
            case BOOLEAN:
            case FLOAT:
            case DOUBLE:
            case INTERVAL_YEAR_MONTH:
            case INTERVAL_DAY_TIME:
                return val -> val;
            case TINYINT:
                return val -> ((Integer) val).byteValue();
            case SMALLINT:
                // Converter for small type that casts value to int and then return short value,
                // since
                // JDBC 1.0 use int type for small values.
                return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
            case INTEGER:
                return val -> val;
            case BIGINT:
                return val -> val;
            case DECIMAL:
                final int precision = ((DecimalType) type).getPrecision();
                final int scale = ((DecimalType) type).getScale();
                // using decimal(20, 0) to support db type bigint unsigned, user should define
                // decimal(20, 0) in SQL,
                // but other precision like decimal(30, 0) can work too from lenient consideration.
                return val ->
                        val instanceof BigInteger
                                ? DecimalData.fromBigDecimal(
                                new BigDecimal((BigInteger) val, 0), precision, scale)
                                : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
            case DATE:
                return val -> (int) (((Date) val).toLocalDate().toEpochDay());
            case TIME_WITHOUT_TIME_ZONE:
                return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
            case TIMESTAMP_WITH_TIME_ZONE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return new JdbcDeserializationConverter() {
                    @Override
                    public Object deserialize(Object jdbcField) throws SQLException {
                        if (jdbcField instanceof LocalDateTime) {
                            return TimestampData.fromTimestamp(Timestamp.valueOf((LocalDateTime) jdbcField));
                        }
                        return TimestampData.fromTimestamp((Timestamp) jdbcField);
                    }
                };

            case CHAR:
            case VARCHAR:
                return val -> StringData.fromString((String) val);
            case BINARY:
            case VARBINARY:
                return val -> (byte[]) val;
            case ARRAY:
            case ROW:
            case MAP:
            case MULTISET:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }
}

再次测试发现:数据消费成功了!
在这里插入图片描述

三、总结-bug自测收尾

类型转换并消费成功后,还需要考虑是否能再次将数据写入到数据库中,测试:

CREATE TABLE `test_gao_0519_02` (
   `birthday` TIMESTAMP
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/dataflow_test',
   'username' = 'root',
   'table-name' = 'auto_mysql22'
 );

CREATE TABLE `test_gao` (
   `birthday` TIMESTAMP
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/dataflow_test',
   'username' = 'root',
   'table-name' = 'auto_mysql22'
 );
INSERT INTO `test_gao`
 (SELECT `birthday`
 FROM `test_gao_0519_02`);

观察能消费,并能写入到数据库:
在这里插入图片描述
到此问题解决。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/65413.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!