Java: creating a Hive table and loading data from a local txt file (method template)
Background
The project required creating a Hive table and importing the data from a file into it.
Sample method template
public void uploadToHive(String sourcePath) throws Exception {
    String tableName = "hivetable";
    // Directory part of the file path, keeping the trailing separator
    String sourceDir = sourcePath.substring(0, sourcePath.lastIndexOf(File.separator) + 1);
    String localFileName = sourcePath.substring(sourcePath.lastIndexOf(File.separator) + 1);
    // File that will hold the field names taken from the first line
    String fieldNamePath = sourceDir + "fieldName.txt";
    // The first line holds the field names and the data starts on the second line;
    // the field names must not be loaded into the table as data
    LinuxUtil.executeCmd("head -n 1 " + sourcePath + " > " + fieldNamePath);
    // Strip the first line (the field names) from the data file
    LinuxUtil.executeCmd("sed -i '1d' " + sourcePath);
    // Upload to HDFS. The data was originally loaded with "load data local inpath", but since Hive
    // runs on a distributed cluster the local file could not be found, so the file is first
    // uploaded to HDFS and then loaded from there.
    String hdfsDir = hdfsUploadDir + File.separator + (new Random().nextInt(899999) + 100000);
    // Create a temporary HDFS directory
    LinuxUtil.executeCmd("hadoop fs -mkdir " + hdfsDir);
    // Upload the file to HDFS
    LinuxUtil.executeCmd(String.format("hadoop fs -put -f %s %s", sourcePath, hdfsDir));
    // Read the field names from the first line to use as column names when creating the table
    File fieldFile = new File(fieldNamePath);
    BufferedReader readerField = null;
    FileInputStream fileInputStreamField = null;
    Connection connection = null;
    List<String> fieldNames = Lists.newArrayList();
    Statement stmt;
    int retry = 0;
    boolean success = false;
    boolean dataFlag = false;
    try {
        fileInputStreamField = new FileInputStream(fieldFile);
        InputStreamReader inputStreamReader = new InputStreamReader(fileInputStreamField, StandardCharsets.UTF_8);
        readerField = new BufferedReader(inputStreamReader);
        String line;
        while ((line = readerField.readLine()) != null) {
            if (!Strings.isBlank(line)) {
                for (String field : line.split("\t")) {
                    fieldNames.add(field);
                }
            }
        }
    } catch (IOException e) {
        log.error("Error reading field name file: {}", e.getMessage(), e);
        throw e;
    } finally {
        if (readerField != null) {
            try {
                readerField.close();
                fileInputStreamField.close();
            } catch (IOException e) {
                log.error("Error closing field name file: {}", e.getMessage(), e);
            }
        }
    }
    // Column definitions for the table: every field becomes a String column
    StringBuilder columns = new StringBuilder();
    for (String field : fieldNames) {
        columns.append(field).append(" String,");
    }
    // HQL that creates the table
    String createSql = "create table if not exists " + tableName
            + " (" + columns.substring(0, columns.length() - 1) + ") "
            + "ROW FORMAT DELIMITED fields terminated by '\t'"
            + " lines terminated by '\n'"
            + " stored as textfile";
    // HQL that loads the data from HDFS; use "load data local inpath" instead to load from the local filesystem
    String loadSql = String.format("load data inpath '%s' into table %s",
            hdfsDir + File.separator + localFileName, tableName);
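    // For illustration only: assuming the header line of the file is "id\tname\tage", the two
    // statements built above would look roughly like:
    //   create table if not exists hivetable (id String, name String, age String)
    //     ROW FORMAT DELIMITED fields terminated by '\t' lines terminated by '\n' stored as textfile
    //   load data inpath '/tmp/hive_upload/123456/users.txt' into table hivetable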
    try {
        // Connect to Hive
        connection = getHiveConn();
        // Hive SQL statement
        stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        try {
            // Execute createSql; executeQuery cannot be used for DDL statements
            stmt.executeUpdate(createSql);
        } catch (SQLException e) {
            // Retry up to 3 times
            while (!success && retry < 3) {
                retry++;
                try {
                    stmt.executeUpdate(createSql);
                    success = true;
                } catch (SQLException ex) {
                    success = false;
                }
            }
            if (!success) {
                throw e;
            }
        }
        try {
            // Execute loadSql
            stmt.executeUpdate(loadSql);
        } catch (SQLException e) {
            // Retry up to 3 times; reset the counter that may have been used by the create retries
            retry = 0;
            while (!dataFlag && retry < 3) {
                retry++;
                try {
                    stmt.executeUpdate(loadSql);
                    dataFlag = true;
                } catch (SQLException ex) {
                    dataFlag = false;
                }
            }
            if (!dataFlag) {
                throw e;
            }
        } finally {
            // Remove the temporary HDFS directory
            LinuxUtil.executeCmd(String.format("hadoop fs -rm -r %s", hdfsDir));
        }
    } catch (Exception e) {
        log.error("Execute hive sql error, createSql: {}, loadSql: {}", createSql, loadSql, e);
        throw e;
    } finally {
        closeHiveConn(connection);
    }
}
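The template relies on a few members that are not shown: the hdfsUploadDir field, the log logger, the getHiveConn()/closeHiveConn() JDBC helpers, and LinuxUtil.executeCmd(). (Lists.newArrayList comes from Guava; Strings.isBlank from whatever string utility is already on the project's classpath.) Below is a minimal sketch of what the missing pieces might look like, assuming SLF4J for logging and the standard HiveServer2 JDBC driver; the class name, JDBC URL, credentials and hdfsUploadDir value are placeholders, not the original code.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveUploadSupport {

    protected final Logger log = LoggerFactory.getLogger(HiveUploadSupport.class);

    // HDFS directory under which the temporary upload folders are created (assumed value)
    protected String hdfsUploadDir = "/tmp/hive_upload";

    // Open a HiveServer2 JDBC connection; hive-jdbc must be on the classpath
    protected Connection getHiveConn() throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        return DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
    }

    // Close the connection quietly
    protected void closeHiveConn(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                log.warn("Failed to close hive connection: {}", e.getMessage());
            }
        }
    }
}

// One possible shape of LinuxUtil: run a shell command via bash and fail on a non-zero exit code.
class LinuxUtil {
    public static void executeCmd(String cmd) throws Exception {
        // bash -c is needed so that redirections such as "head -n 1 file > out" work
        Process process = new ProcessBuilder("bash", "-c", cmd).redirectErrorStream(true).start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        if (process.waitFor() != 0) {
            throw new RuntimeException("Command failed: " + cmd);
        }
    }
}

With something like this in place the method could be called as, for example, uploadToHive("/data/upload/users.txt"). Note that the table name is hard-coded to hivetable in the template, so in practice it would usually be passed in as a parameter as well.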
Summary
- I have been extremely busy lately, but today a lovely follower reminded me to update the blog. It was the first time I received such encouragement, so I squeezed out some time to write this post. Many thanks to this follower for lighting a lamp on my road forward.
This is my 105th article as a programmer. Each time I jot down a line of lyrics, to see how many songs' worth of time a lifetime holds, wahahaha …
Lyric: 我知道坚持要走是你受伤的借口 (I know that insisting on leaving is your excuse for being hurt)