DataX Secondary Development (5): Adding greenplumwriter Based on the CopyIn Mechanism


1 Quick Introduction

The GreenplumWriter plugin writes data into target tables on a Greenplum Database master. Under the hood, GreenplumWriter connects to the remote Greenplum database over JDBC and executes the corresponding COPY ... FROM statement to load the data.

GreenplumWriter is aimed at ETL engineers who use it to load data from a data warehouse into Greenplum. It can also serve as a data-migration tool for DBAs and similar users.
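Under the hood this relies on the PostgreSQL JDBC driver's CopyManager API (Greenplum is wire-compatible with PostgreSQL), which streams rows to the server through a COPY ... FROM STDIN statement. The following is a minimal, simplified sketch of that call, assuming a reachable Greenplum master and a hypothetical demo_table(id, name); the actual plugin wires this up with piped streams and worker threads, as shown in CopyWorker.java later.

import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;

public class CopyInSketch {
    public static void main(String[] args) throws Exception {
        // placeholder connection info, for illustration only
        Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://gp-master:5432/testdb", "gpadmin", "******");

        CopyManager mgr = new CopyManager((BaseConnection) conn);
        // rows are streamed to the server exactly as they would appear in a CSV file
        String rows = "1|\"hello\"\n2|\"world\"\n";
        long copied = mgr.copyIn(
                "COPY demo_table(\"id\",\"name\") FROM STDIN WITH DELIMITER '|' CSV QUOTE '\"'",
                new ByteArrayInputStream(rows.getBytes(StandardCharsets.UTF_8)));
        System.out.println("rows copied: " + copied);

        conn.close();
    }
}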

2 Implementation Principle

GreenplumWriter receives the protocol data produced by the Reader through the DataX framework and, based on your configuration, generates the corresponding COPY load statement:

  • COPY ... FROM ...
Note:
1. Unlike MysqlWriter, GreenplumWriter does not support the writeMode parameter.
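
For example, with a target table public.t1, columns id and name, and segment_reject_limit set to 10 (table and column names here are illustrative), the getCopySql method shown later in CopyWriterTask.java produces a statement of this form:

COPY public.t1("id","name") FROM STDIN WITH DELIMITER '|' NULL '' CSV QUOTE '"' ESCAPE E'\\' LOG ERRORS SEGMENT REJECT LIMIT 10;

When segment_reject_limit is 0, the LOG ERRORS SEGMENT REJECT LIMIT clause is omitted.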

3 Code Implementation

1. Changes in the root directory

1. Add the following packaging entry to package.xml in the root directory:

        <fileSet>
            <directory>greenplumwriter/target/datax/</directory>
            <includes>
                <include>**/*.*</include>
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>

2. Register the new module in the root pom.xml:

        <module>greenplumwriter</module>

2. Code under the greenplumwriter module

Module structure (directory layout of the files listed below):

greenplumwriter
├── pom.xml
└── src
    └── main
        ├── assembly
        │   └── package.xml
        ├── java
        │   └── com/alibaba/datax/plugin/writer/greenplumwriter
        │       ├── CopyProcessor.java
        │       ├── CopyWorker.java
        │       ├── CopyWriterJob.java
        │       ├── CopyWriterTask.java
        │       └── GreenplumWriter.java
        └── resources
            ├── plugin.json
            └── plugin_template_job.json

package.xml

<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
            </includes>
            <outputDirectory>plugin/writer/greenplumwriter</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>greenplumwriter-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/writer/greenplumwriter</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/greenplumwriter/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>

CopyProcessor.java 

package com.alibaba.datax.plugin.writer.greenplumwriter;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.sql.Types;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CopyProcessor implements Callable<Long> {
	private static final char FIELD_DELIMITER = '|';
	private static final char NEWLINE = '\n';
	private static final char QUOTE = '"';
	private static final char ESCAPE = '\\';
	private static int MaxCsvSize = 4194304;
	private static final Logger LOG = LoggerFactory.getLogger(CopyProcessor.class);

	private int columnNumber;
	private CopyWriterTask task;
	private LinkedBlockingQueue<Record> queueIn;
	private LinkedBlockingQueue<byte[]> queueOut;
	private Triple<List<String>, List<Integer>, List<String>> resultSetMetaData;

	public CopyProcessor(CopyWriterTask task, int columnNumber,
                         Triple<List<String>, List<Integer>, List<String>> resultSetMetaData, LinkedBlockingQueue<Record> queueIn,
                         LinkedBlockingQueue<byte[]> queueOut) {
		this.task = task;
		this.columnNumber = columnNumber;
		this.resultSetMetaData = resultSetMetaData;
		this.queueIn = queueIn;
		this.queueOut = queueOut;
	}

	@Override
	public Long call() throws Exception {
		Thread.currentThread().setName("CopyProcessor");
		Record record = null;

		while (true) {
			record = queueIn.poll(1000L, TimeUnit.MILLISECONDS);

			if (record == null && false == task.moreRecord()) {
				break;
			} else if (record == null) {
				continue;
			}

			if (record.getColumnNumber() != this.columnNumber) {
				// the number of columns read from the source does not match the number of target columns: fail immediately
				throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
						String.format("列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 您可能配置了错误的表名称, 请检查您的配置并作出修改.",
								record.getColumnNumber(), this.columnNumber));
			}

			byte[] data = serializeRecord(record);

			if (data.length > MaxCsvSize) {
				String s = new String(data, "UTF-8").substring(0, 100) + "...";
				LOG.warn("数据元组超过 {} 字节长度限制被忽略: {}", MaxCsvSize, s);
			} else {
				queueOut.put(data);
			}
		}

		return 0L;
	}

	/**
	 * Any occurrence within the value of a QUOTE character or the ESCAPE
	 * character is preceded by the escape character.
	 */
	protected String escapeString(String data) {
		StringBuilder sb = new StringBuilder();

		for (int i = 0; i < data.length(); ++i) {
			char c = data.charAt(i);
			switch (c) {
			case 0x00:
				LOG.warn("字符串中发现非法字符 0x00,已经将其删除");
				continue;
			case QUOTE:
			case ESCAPE:
				sb.append(ESCAPE);
			}

			sb.append(c);
		}
		return sb.toString();
	}

	/**
	 * Non-printable characters are inserted as '\nnn' (octal) and '\' as '\\'.
	 */
	protected String escapeBinary(byte[] data) {
		StringBuilder sb = new StringBuilder();

		for (int i = 0; i < data.length; ++i) {
			if (data[i] == '\\') {
				sb.append('\\');
				sb.append('\\');
			} else if (data[i] < 0x20 || data[i] > 0x7e) {
				byte b = data[i];
				char[] val = new char[3];
				val[2] = (char) ((b & 07) + '0');
				b >>= 3;
				val[1] = (char) ((b & 07) + '0');
				b >>= 3;
				val[0] = (char) ((b & 03) + '0');
				sb.append('\\');
				sb.append(val);
			} else {
				sb.append((char) (data[i]));
			}
		}

		return sb.toString();
	}

	protected byte[] serializeRecord(Record record) throws UnsupportedEncodingException {
		StringBuilder sb = new StringBuilder();
		Column column;
		for (int i = 0; i < this.columnNumber; i++) {
			column = record.getColumn(i);
			int columnSqltype = this.resultSetMetaData.getMiddle().get(i);

			switch (columnSqltype) {
			case Types.CHAR:
			case Types.NCHAR:
			case Types.VARCHAR:
			case Types.LONGVARCHAR:
			case Types.NVARCHAR:
			case Types.LONGNVARCHAR: {
				String data = column.asString();

				if (data != null) {
					sb.append(QUOTE);
					sb.append(escapeString(data));
					sb.append(QUOTE);
				}

				break;
			}
			case Types.BINARY:
			case Types.BLOB:
			case Types.CLOB:
			case Types.LONGVARBINARY:
			case Types.NCLOB:
			case Types.VARBINARY: {
				byte[] data = column.asBytes();

				if (data != null) {
					sb.append(escapeBinary(data));
				}

				break;
			}
			default: {
				String data = column.asString();

				if (data != null) {
					sb.append(data);
				}

				break;
			}
			}

			if (i + 1 < this.columnNumber) {
				sb.append(FIELD_DELIMITER);
			}
		}
		sb.append(NEWLINE);
		return sb.toString().getBytes("UTF-8");
	}
}
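
To illustrate the output format, a hypothetical record holding the integer 1, the string O"Brien, and the timestamp 2022-08-11 16:26:00 would be rendered by serializeRecord as the single pipe-delimited line below (string columns are wrapped in quotes, any quote or backslash inside them is prefixed with the escape character, and a newline terminates the tuple):

1|"O\"Brien"|2022-08-11 16:26:00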

CopyWorker.java

package com.alibaba.datax.plugin.writer.greenplumwriter;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import org.postgresql.util.PSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CopyWorker implements Callable<Long> {
    private static final Logger LOG = LoggerFactory.getLogger(CopyWorker.class);

    private CopyWriterTask task = null;
    private Connection connection;
    private LinkedBlockingQueue<byte[]> queue = null;
    private FutureTask<Long> copyResult = null;
    private String sql = null;
    private PipedInputStream pipeIn = null;
    private PipedOutputStream pipeOut = null;
    private Thread copyBackendThread = null;

    public CopyWorker(CopyWriterTask task, String copySql, LinkedBlockingQueue<byte[]> queue) throws IOException {
        this.task = task;
        this.connection = task.createConnection();
        this.queue = queue;
        this.pipeOut = new PipedOutputStream();
        this.pipeIn = new PipedInputStream(pipeOut);
        this.sql = copySql;

        if (task.getMaxCsvLineSize() >= 1024) {
            changeCsvSizelimit(connection);
        }

        this.copyResult = new FutureTask<Long>(new Callable<Long>() {

            @Override
            public Long call() throws Exception {
                try {
                    CopyManager mgr = new CopyManager((BaseConnection) connection);
                    return mgr.copyIn(sql, pipeIn);
                } finally {
                    try {
                        pipeIn.close();
                    } catch (Exception ignore) {
                    }
                }
            }
        });

        copyBackendThread = new Thread(copyResult);
        copyBackendThread.setName(sql);
        copyBackendThread.setDaemon(true);
        copyBackendThread.start();
    }

    @Override
    public Long call() throws Exception {
        Thread.currentThread().setName("CopyWorker");

        byte[] data = null;
        try {
            while (true) {
                data = queue.poll(1000L, TimeUnit.MILLISECONDS);

                if (data == null && false == task.moreData()) {
                    break;
                } else if (data == null) {
                    continue;
                }

                pipeOut.write(data);
            }

            pipeOut.flush();
            pipeOut.close();
        } catch (Exception e) {
            try {
                ((BaseConnection) connection).cancelQuery();
            } catch (SQLException ignore) {
                // ignore if failed to cancel query
            }

            try {
                copyBackendThread.interrupt();
            } catch (SecurityException ignore) {
            }

            try {
                copyResult.get();
            } catch (ExecutionException exec) {
                if (exec.getCause() instanceof PSQLException) {
                    throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, exec.getCause());
                }
                // ignore others
            } catch (Exception ignore) {
            }

            throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
        } finally {
            try {
                pipeOut.close();
            } catch (Exception e) {
                // ignore if failed to close pipe
            }

            try {
                copyBackendThread.join(0);
            } catch (Exception e) {
                // ignore if thread is interrupted
            }

            DBUtil.closeDBResources(null, null, connection);
        }

        try {
            Long count = copyResult.get();
            return count;
        } catch (Exception e) {
            throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
        }
    }

    private void changeCsvSizelimit(Connection conn) {
        List<String> sqls = new ArrayList<String>();
        sqls.add("set gp_max_csv_line_length = " + Integer.toString(task.getMaxCsvLineSize()));

        try {
            WriterUtil.executeSqls(conn, sqls, task.getJdbcUrl(), DataBaseType.PostgreSQL);
        } catch (Exception e) {
            LOG.warn("Cannot set gp_max_csv_line_length to " + task.getMaxCsvLineSize());
        }
    }
}
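
A note on the design: CopyWorker pairs a PipedOutputStream with a PipedInputStream. The constructor starts a daemon thread that blocks inside CopyManager.copyIn(sql, pipeIn), while call() drains the byte queue filled by CopyProcessor into pipeOut. Closing pipeOut marks the end of the COPY stream, after which copyResult.get() returns the number of rows loaded; if anything fails mid-stream, the server-side query is cancelled and the error is rethrown as WRITE_DATA_ERROR.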

CopyWriterJob.java

package com.alibaba.datax.plugin.writer.greenplumwriter;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class CopyWriterJob extends CommonRdbmsWriter.Job {
    private static final Logger LOG = LoggerFactory.getLogger(CopyWriterJob.class);
    private List<String> tables = null;

    public CopyWriterJob() {
        super(DataBaseType.PostgreSQL);
    }

    @Override
    public void init(Configuration originalConfig) {
        super.init(originalConfig);
    }

    @Override
    public void prepare(Configuration originalConfig) {
        String username = originalConfig.getString(Key.USERNAME);
        String password = originalConfig.getString(Key.PASSWORD);

        List<Object> conns = originalConfig.getList(Constant.CONN_MARK, Object.class);
        Configuration connConf = Configuration.from(conns.get(0).toString());

        // the jdbcUrl here already has the appropriate suffix parameters appended
        String jdbcUrl = connConf.getString(Key.JDBC_URL);
        tables = connConf.getList(Key.TABLE, String.class);

        Connection conn = DBUtil.getConnection(DataBaseType.PostgreSQL, jdbcUrl, username, password);

        List<String> sqls = new ArrayList<String>();

        for (String table : tables) {
            sqls.add("SELECT gp_truncate_error_log('" + table + "');");
            LOG.info("为 {} 清理 ERROR LOG. context info:{}.", table, jdbcUrl);
        }

        WriterUtil.executeSqls(conn, sqls, jdbcUrl, DataBaseType.PostgreSQL);
        DBUtil.closeDBResources(null, null, conn);

        super.prepare(originalConfig);
    }

    @Override
    public void post(Configuration originalConfig) {
        super.post(originalConfig);

        String username = originalConfig.getString(Key.USERNAME);
        String password = originalConfig.getString(Key.PASSWORD);

        // appendJDBCSuffix has already been applied during prepare
        String jdbcUrl = originalConfig.getString(Key.JDBC_URL);

        Connection conn = DBUtil.getConnection(DataBaseType.PostgreSQL, jdbcUrl, username, password);

        for (String table : tables) {
            int errors = 0;
            ResultSet res = null;
            String sql = "SELECT count(*) from gp_read_error_log('" + table + "');";

            try {
                res = DBUtil.query(conn, sql, 10);
                if (res.next()) {
                    errors = res.getInt(1);
                }
                res.close();
                conn.commit();
            } catch (SQLException e) {
                LOG.debug("Fail to get error log info:" + e.getMessage());
            }

            if (errors > 0) {
                LOG.warn("加载表 {} 时发现 {} 条错误数据, 使用 \"SELECT * from gp_read_error_log('{}');\" 查看详情", table,
                        errors, table);
            }
        }

        DBUtil.closeDBResources(null, null, conn);
    }
}

CopyWriterTask.java

package com.alibaba.datax.plugin.writer.greenplumwriter;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CopyWriterTask extends CommonRdbmsWriter.Task {
    private static final Logger LOG = LoggerFactory.getLogger(CopyWriterTask.class);
    private Configuration writerSliceConfig = null;
    private int numProcessor;
    private int maxCsvLineSize;
    private int numWriter;
    private int queueSize;
    private volatile boolean stopProcessor = false;
    private volatile boolean stopWriter = false;

    private CompletionService<Long> cs = null;

    public CopyWriterTask() {
        super(DataBaseType.PostgreSQL);
    }

    public String getJdbcUrl() {
        return this.jdbcUrl;
    }

    public Connection createConnection() {
        Connection connection = DBUtil.getConnection(this.dataBaseType, this.jdbcUrl, username, password);
        DBUtil.dealWithSessionConfig(connection, writerSliceConfig, this.dataBaseType, BASIC_MESSAGE);
        return connection;
    }

    private String constructColumnNameList(List<String> columnList) {
        List<String> columns = new ArrayList<String>();

        for (String column : columnList) {
            if (column.endsWith("\"") && column.startsWith("\"")) {
                columns.add(column);
            } else {
                columns.add("\"" + column + "\"");
            }
        }

        return StringUtils.join(columns, ",");
    }

    public String getCopySql(String tableName, List<String> columnList, int segment_reject_limit) {
        StringBuilder sb = new StringBuilder().append("COPY ").append(tableName).append("(")
                .append(constructColumnNameList(columnList))
                .append(") FROM STDIN WITH DELIMITER '|' NULL '' CSV QUOTE '\"' ESCAPE E'\\\\'");

        if (segment_reject_limit >= 2) {
            sb.append(" LOG ERRORS SEGMENT REJECT LIMIT ").append(segment_reject_limit).append(";");
        } else {
            sb.append(";");
        }

        String sql = sb.toString();
        return sql;
    }

    private void send(Record record, LinkedBlockingQueue<Record> queue)
            throws InterruptedException, ExecutionException {
        while (queue.offer(record, 1000, TimeUnit.MILLISECONDS) == false) {
            LOG.debug("Record queue is full, increase num_copy_processor for performance.");
            Future<Long> result = cs.poll();

            if (result != null) {
                result.get();
            }
        }
    }

    public boolean moreRecord() {
        return !stopProcessor;
    }

    public boolean moreData() {
        return !stopWriter;
    }

    public int getMaxCsvLineSize() {
        return maxCsvLineSize;
    }

    @Override
    public void startWrite(RecordReceiver recordReceiver, Configuration writerSliceConfig,
                           TaskPluginCollector taskPluginCollector) {
        this.writerSliceConfig = writerSliceConfig;
        int segment_reject_limit = writerSliceConfig.getInt("segment_reject_limit", 0);
        this.queueSize = writerSliceConfig.getInt("copy_queue_size", 1000);
        this.queueSize = Math.max(this.queueSize, 10);
        this.numProcessor = writerSliceConfig.getInt("num_copy_processor", 4);
        this.numProcessor = Math.max(this.numProcessor, 1);
        this.numWriter = writerSliceConfig.getInt("num_copy_writer", 1);
        this.numWriter = Math.max(this.numWriter, 1);
        this.maxCsvLineSize = writerSliceConfig.getInt("max_csv_line_size", 0);

        String sql = getCopySql(this.table, this.columns, segment_reject_limit);
        LinkedBlockingQueue<Record> recordQueue = new LinkedBlockingQueue<Record>(queueSize);
        LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>(queueSize);
        ExecutorService threadPool;

        threadPool = Executors.newFixedThreadPool(this.numProcessor + this.numWriter);
        cs = new ExecutorCompletionService<Long>(threadPool);
        Connection connection = createConnection();

        try {

            this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table,
                    constructColumnNameList(this.columns));
            for (int i = 0; i < numProcessor; i++) {
                cs.submit(new CopyProcessor(this, this.columnNumber, resultSetMetaData, recordQueue, dataQueue));
            }

            for (int i = 0; i < numWriter; i++) {
                cs.submit(new CopyWorker(this, sql, dataQueue));
            }

            Record record;
            while ((record = recordReceiver.getFromReader()) != null) {
                send(record, recordQueue);
                Future<Long> result = cs.poll();

                if (result != null) {
                    result.get();
                }
            }

            stopProcessor = true;
            for (int i = 0; i < numProcessor; i++) {
                cs.take().get();
            }

            stopWriter = true;
            for (int i = 0; i < numWriter; i++) {
                cs.take().get();
            }
        } catch (ExecutionException e) {
            throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e.getCause());
        } catch (Exception e) {
            throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
        } finally {
            threadPool.shutdownNow();
            DBUtil.closeDBResources(null, null, connection);
        }
    }
}

GreenplumWriter.java

package com.alibaba.datax.plugin.writer.greenplumwriter;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.writer.Key;

import java.util.List;

/**
 * @Description: GreenplumWriter
 * @Author: chenweifeng
 * @Date: 2022年08月11日 下午4:26
 **/
public class GreenplumWriter extends Writer {

    public static class Job extends Writer.Job {
        private Configuration originalConfig = null;
        private CopyWriterJob copyJob;

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();

            // greenplumwriter
            String writeMode = this.originalConfig.getString(Key.WRITE_MODE);
            if (null != writeMode) {
                throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
                        String.format("写入模式(writeMode)配置有误. 因为Greenplum Database不支持配置参数项 writeMode: %s, Greenplum Database仅使用insert sql 插入数据. 请检查您的配置并作出修改.", writeMode));
            }

            int segment_reject_limit = this.originalConfig.getInt("segment_reject_limit", 0);

            if (segment_reject_limit != 0 && segment_reject_limit < 2) {
                throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "segment_reject_limit 必须为0或者大于等于2");
            }

            this.copyJob = new CopyWriterJob();
            this.copyJob.init(this.originalConfig);
        }

        @Override
        public void prepare() {
            this.copyJob.prepare(this.originalConfig);
        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            return this.copyJob.split(this.originalConfig, mandatoryNumber);
        }

        @Override
        public void post() {
            this.copyJob.post(this.originalConfig);
        }

        @Override
        public void destroy() {
            this.copyJob.destroy(this.originalConfig);
        }

    }

    public static class Task extends Writer.Task {
        private Configuration writerSliceConfig;
        private CopyWriterTask copyTask;

        @Override
        public void init() {
            this.writerSliceConfig = super.getPluginJobConf();
            this.copyTask = new CopyWriterTask();
            this.copyTask.init(this.writerSliceConfig);
        }

        @Override
        public void prepare() {
            this.copyTask.prepare(this.writerSliceConfig);
        }

        @Override
        public void startWrite(RecordReceiver recordReceiver) {
            this.copyTask.startWrite(recordReceiver, this.writerSliceConfig,
                    super.getTaskPluginCollector());
        }

        @Override
        public void post() {
            this.copyTask.post(this.writerSliceConfig);
        }

        @Override
        public void destroy() {
            this.copyTask.destroy(this.writerSliceConfig);
        }
    }
}

plugin.json

{
    "name": "greenplumwriter",
    "class": "com.alibaba.datax.plugin.writer.greenplumwriter.GreenplumWriter",
    "description": "简单插件,有待测试验证.  原理: copy in",
    "developer": "Carson"
}

plugin_template_job.json 

{
  "name": "greenplumwriter",
  "parameter": {
    "username": "",
    "password": "",
    "segment_reject_limit": 0,
    "column": [],
    "copy_queue_size": [],
    "num_copy_processor": 1,
    "num_copy_writer": 1,
    "connection": [
      {
        "jdbcUrl": "",
        "table": []
      }
    ],
    "preSql": [],
    "postSql": []
  }
}
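
For reference, the defaults applied by CopyWriterTask.startWrite and GreenplumWriter.Job.init are: segment_reject_limit 0 (must be 0 or at least 2), copy_queue_size 1000 (floored at 10), num_copy_processor 4 (floored at 1), num_copy_writer 1 (floored at 1), and max_csv_line_size 0 (a value of 1024 or more makes CopyWorker issue set gp_max_csv_line_length on its session). Note that copy_queue_size takes an integer, not a list.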

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>datax-all</artifactId>
        <groupId>com.alibaba.datax</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>greenplumwriter</artifactId>

    <properties>

    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>plugin-rdbms-util</artifactId>
            <version>${datax-project-version}</version>
        </dependency>
        <dependency>
            <groupId>com.pivotal</groupId>
            <artifactId>greenplum-jdbc</artifactId>
            <version>5.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.23</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-dbcp2</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.11</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- assembly plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>

4 Test Run

Design a table with 34 columns (id, c1 through c32, and create_time) and load roughly 3.2 million rows. The test scenario is mysql -> greenplum, with a total data size of about 300 MB.
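
A job configuration along the following lines drives such a test (connection details, column lists, and table names below are placeholders for illustration, not the actual test values):

{
  "job": {
    "setting": {
      "speed": { "channel": 4 }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "******",
            "column": ["id", "c1", "c2", "create_time"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/testdb"],
                "table": ["t_source"]
              }
            ]
          }
        },
        "writer": {
          "name": "greenplumwriter",
          "parameter": {
            "username": "gpadmin",
            "password": "******",
            "segment_reject_limit": 0,
            "column": ["id", "c1", "c2", "create_time"],
            "connection": [
              {
                "jdbcUrl": "jdbc:postgresql://gp-master:5432/testdb",
                "table": ["t_target"]
              }
            ],
            "preSql": [],
            "postSql": []
          }
        }
      }
    ]
  }
}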

Local test result:

[Screenshot: local test result]

Test result on a server in the same cluster as Greenplum:

[Screenshot: test result on a server in the same cluster]

 
