需求:在nginx日志清洗中根据IP查询对应地点
1,清洗后数据如下
2,需要查询出IP对应的国家,地区,省份,城市,运营商
3,自定义UDTF
1,创建maven工程
2,引入依赖
<!-- Hive dependency needed to compile the UDTF (GenericUDTF and the ObjectInspector types). -->
<!-- NOTE(review): Hive presumably supplies this library at runtime, so `provided` scope may be appropriate; confirm. -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
3,继承GenericUDTF重写initialize,process,close
package cn.dim;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.lionsoul.ip2region.DataBlock;
import org.lionsoul.ip2region.DbConfig;
import org.lionsoul.ip2region.DbSearcher;
import org.lionsoul.ip2region.Util;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
/**
 * Hive UDTF that expands an IP address into one row of six string columns:
 * ip, country, region, province, city and operator.
 *
 * <p>The location data comes from {@code getCityInfo(String)}, which returns a
 * {@code |}-separated region string, or {@code null} when the address cannot be resolved.
 *
 * @version 1.0
 * @author Yang Jie (杨杰)
 * @since 2022/6/15 15:55
 */
public class DIMNginx extends GenericUDTF {

    /** Output column names, in the order they are forwarded. */
    private static final String[] OUTPUT_COLUMNS = {
        "ip", "country", "region", "province", "city", "operator"
    };

    /**
     * Declares the output schema: six string columns named after {@code OUTPUT_COLUMNS}.
     *
     * @param argOIs inspectors for the call arguments (not examined here)
     * @return struct inspector describing the forwarded rows
     * @throws UDFArgumentException declared by the overridden method; not thrown here
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        List<String> colNames = new ArrayList<>(OUTPUT_COLUMNS.length);
        List<ObjectInspector> fieldOIs = new ArrayList<>(OUTPUT_COLUMNS.length);
        for (String column : OUTPUT_COLUMNS) {
            colNames.add(column);
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(colNames, fieldOIs);
    }

    /**
     * Resolves the IP in {@code objects[0]} and forwards exactly one row.
     *
     * <p>When the IP is missing or cannot be resolved, the location columns are left
     * {@code null} instead of failing the whole query with a NullPointerException.
     *
     * @param objects call arguments; only the first one (the IP) is used
     * @throws HiveException if the database location is malformed or forwarding fails
     */
    @Override
    public void process(Object[] objects) throws HiveException {
        String ip = null;
        if (objects != null && objects.length > 0 && objects[0] != null) {
            ip = objects[0].toString();
        }

        String[] result = new String[OUTPUT_COLUMNS.length];
        result[0] = ip;

        String cityInfo = null;
        if (ip != null) {
            try {
                cityInfo = getCityInfo(ip);
            } catch (MalformedURLException e) {
                // A bad database location is a configuration error; surface it with its cause.
                throw new HiveException("Invalid ip2region.db location", e);
            }
        }

        if (cityInfo != null) {
            // Limit -1 keeps trailing empty fields so short records do not shift or overflow.
            String[] parts = cityInfo.split("\\|", -1);
            int copied = Math.min(parts.length, result.length - 1);
            for (int i = 0; i < copied; i++) {
                result[i + 1] = parts[i];
            }
        }
        forward(result);
    }

    /** Nothing to release: this class holds no per-query state. */
    @Override
    public void close() throws HiveException {
    }
}
1,引入ip2region(一个离线IP地址定位库:准确率99.9%,查询耗时为0.0x毫秒级,ip2region.db数据库只有数MB;提供java、php、c、python、nodejs、golang、c#等查询绑定,以及Binary、B树、内存三种查询算法,比以往自己调用网上的接口方便得多)
<!-- ip2region library providing DbSearcher, DbConfig, DataBlock and Util used by getCityInfo. -->
<!-- NOTE(review): this library must also be available to Hive at runtime; confirm the uploaded jar bundles it. -->
<dependency>
<groupId>org.lionsoul</groupId>
<artifactId>ip2region</artifactId>
<version>1.7.2</version>
</dependency>
2,下载ip2region.db(需要的可以在评论区留言)
放在resources目录下(即src/main/resources)
3,编写代码
/**
 * Looks up the region string for an IP address in the offline ip2region database.
 *
 * <p>The result is whatever {@code DataBlock.getRegion()} returns; callers in this file
 * split it on {@code |}.
 *
 * @param ip textual IP address to resolve
 * @return the region string, or {@code null} when the database file is missing, the
 *         address is invalid, or the lookup fails or finds nothing
 * @throws MalformedURLException if the database location is not a valid URL
 */
public static String getCityInfo(String ip) throws MalformedURLException {
    // For local tests, load the copy on the classpath instead:
    // URL url = DIMNginx.class.getClassLoader().getResource("ip2region.db");
    URL url = new URL("file:/root/awz/soft/hive/cusjar/ip2region.db");
    // getResource() yields null for a missing resource, so the guard matters for that variant.
    if (url == null) {
        return null;
    }
    File file = new File(url.getFile());
    if (!file.exists()) {
        System.out.println("Error: Invalid ip2region.db file, filePath:" + file.getPath());
        return null;
    }
    // Validate before opening the database so an invalid address never leaks a file handle.
    if (ip == null || !Util.isIpAddress(ip)) {
        System.out.println("Error: Invalid ip address");
        return null;
    }

    // Lookup algorithm; the alternatives are DbSearcher.BINARY_ALGORITHM and
    // DbSearcher.MEMORY_ALGORITYM (spelled that way in the library).
    int algorithm = DbSearcher.BTREE_ALGORITHM;

    DbSearcher searcher = null;
    try {
        searcher = new DbSearcher(new DbConfig(), file.getPath());
        DataBlock dataBlock;
        switch (algorithm) {
            case DbSearcher.BTREE_ALGORITHM:
                dataBlock = searcher.btreeSearch(ip);
                break;
            case DbSearcher.BINARY_ALGORITHM:
                dataBlock = searcher.binarySearch(ip);
                break;
            case DbSearcher.MEMORY_ALGORITYM:
                dataBlock = searcher.memorySearch(ip);
                break;
            default:
                return null;
        }
        // No match is reported as null rather than via a caught NullPointerException.
        return dataBlock == null ? null : dataBlock.getRegion();
    } catch (Exception e) {
        // Best-effort lookup: report the failure and let the caller treat it as "unknown".
        e.printStackTrace();
        return null;
    } finally {
        if (searcher != null) {
            try {
                searcher.close();
            } catch (IOException closeError) {
                closeError.printStackTrace();
            }
        }
    }
}
注意:在服务器上要用 URL url = new URL("file:/root/awz/soft/hive/cusjar/ip2region.db"); 指定绝对路径,并且ip2region.db也要上传到该路径下;本地测试时改用 DIMNginx.class.getClassLoader().getResource("ip2region.db")。如果在服务器上仍然使用getResource,会报找不到静态资源ip2region.db的错误。
完整代码
package cn.dim;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.lionsoul.ip2region.DataBlock;
import org.lionsoul.ip2region.DbConfig;
import org.lionsoul.ip2region.DbSearcher;
import org.lionsoul.ip2region.Util;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
/**
* @version 1.0
* @Author:杨杰
* @Date:2022/6/15 15:55
*/
public class DIMNginx extends GenericUDTF {
// public static void main(String[] args) throws IOException, HiveException {
// String[] str = new String[]{"209.17.97.122"};
//
DIMNginx dwNginx = new DIMNginx();
dwNginx.process(str);
//
// // province-1.0-SNAPSHOT.jar!
//
// URL url2 = new URL("D:\\HadoopDevelopment\\nginxLog\\province\\src\\main\\resources\\ip2region.db");
//
// System.out.println(url2);
URL url = DIMNginx.class.getClassLoader().getResource("ip2region.db");
System.out.println(url.toString().replace("classes","d"));
URL url1 = DIMNginx.class.getClassLoader().getResource(url.toString().replace("classes","d"));
System.out.println(url1);
//
String cityInfo = getCityInfo("117.187.32.84");
System.out.println(cityInfo);
for (String s : cityInfo.split("\\|")) {
System.out.println(s);
}
// }
public static String getCityInfo(String ip) throws MalformedURLException {
// URL url = DIMNginx.class.getClassLoader().getResource("ip2region.db");
URL url = new URL("file:/root/awz/soft/hive/cusjar/ip2region.db");
File file;
if (url != null) {
file = new File(url.getFile());
} else {
return null;
}
if (!file.exists()) {
System.out.println("Error: Invalid ip2region.db file, filePath:" + file.getPath());
return null;
}
//查询算法
int algorithm = DbSearcher.BTREE_ALGORITHM; //B-tree
//DbSearcher.BINARY_ALGORITHM //Binary
//DbSearcher.MEMORY_ALGORITYM //Memory
try {
DbConfig config = new DbConfig();
DbSearcher searcher = new DbSearcher(config, file.getPath());
Method method;
switch ( algorithm )
{
case DbSearcher.BTREE_ALGORITHM:
method = searcher.getClass().getMethod("btreeSearch", String.class);
break;
case DbSearcher.BINARY_ALGORITHM:
method = searcher.getClass().getMethod("binarySearch", String.class);
break;
case DbSearcher.MEMORY_ALGORITYM:
method = searcher.getClass().getMethod("memorySearch", String.class);
break;
default:
return null;
}
DataBlock dataBlock;
if (!Util.isIpAddress(ip)) {
System.out.println("Error: Invalid ip address");
return null;
}
dataBlock = (DataBlock) method.invoke(searcher, ip);
return dataBlock.getRegion();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
List<String> colNames = new ArrayList<>();
List<ObjectInspector> fieldIOs = new ArrayList<>();
colNames.add("ip");
colNames.add("country");
colNames.add("region");
colNames.add("province");
colNames.add("city");
colNames.add("operator");
fieldIOs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldIOs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldIOs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldIOs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldIOs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldIOs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(colNames,fieldIOs);
}
@Override
public void process(Object[] objects) throws HiveException {
String cityInfo = null;
try {
cityInfo = getCityInfo(objects[0].toString());
} catch (MalformedURLException e) {
e.printStackTrace();
}
String[] result = new String[6];
result[0] = objects[0].toString();
result[1] = cityInfo.split("\\|")[0];
result[2] = cityInfo.split("\\|")[1];
result[3] = cityInfo.split("\\|")[2];
result[4] = cityInfo.split("\\|")[3];
result[5] = cityInfo.split("\\|")[4];
forward(result);
}
@Override
public void close() throws HiveException {
}
}
打包上传
hive添加jar包
-- Register the UDTF jar with the current Hive session (adjust the path to where it was uploaded).
add jar /root/awz/soft/hive/cusjar/province-1.0-SNAPSHOT.jar;
-- Create the temporary function bound to the UDTF class.
create temporary function dim_province as "cn.dim.DIMNginx";
查询地址
-- Run the UDTF over every ip in dwd_data and write the rows, comma-delimited, to the target directory.
INSERT OVERWRITE DIRECTORY "/user/hive/warehouse/nginx.db/dim_province"
ROW FORMAT DELIMITED FIELDS TERMINATED BY ","
SELECT dim_province(ip) FROM dwd_data;
结果
OK
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/133779.html