自用的 Python 数据处理方法
一、字典的数据操作
# -*- coding:utf8 -*-
# create class DataDictOperate
class DataDictOperate:
    """Helper for building "one key -> list of values" dictionaries.

    The wrapped dictionary is mutated in place, so the caller's own
    reference to it sees every value that gets appended.
    """

    def __init__(self, data_dict):
        """Keep a reference to the dictionary the helper will mutate.

        Args:
            data_dict: Dictionary mapping a key to a list of values. It may
                start out empty.
        """
        self.data_dict = data_dict

    def create_one_key_to_more_value_dict(self, key, value):
        """Append ``value`` to the list stored under ``key``.

        A new list is created the first time a key is seen. For example,
        the pairs ``[[1, 'a'], [2, 'a'], [1, 'c'], [3, 'b'], [1, 'a'],
        [2, 'd']]`` can be grouped like this::

            data_dict = {}
            data_list = [[1, 'a'], [2, 'a'], [1, 'c'], [3, 'b'], [1, 'a'], [2, 'd']]
            for data in data_list:
                DataDictOperate(data_dict=data_dict).create_one_key_to_more_value_dict(
                    key=data[0], value=data[1])
            print(data_dict)
            # {1: ['a', 'c', 'a'], 2: ['a', 'd'], 3: ['b']}

        Args:
            key: Dictionary key to group under.
            value: Value appended to that key's list.

        Returns:
            The same dictionary object that was passed to the constructor.
        """
        # setdefault covers both the "key present" and "key missing" cases,
        # so no explicit membership test is needed.
        self.data_dict.setdefault(key, []).append(value)
        return self.data_dict
二、列表的数据操作
# create class DataListOperate
class DataListOperate:
    """Sorting helper for a list of subscriptable records (e.g. dicts)."""

    def __init__(self, data_list):
        """Store the list to operate on.

        Args:
            data_list: List of records; each record is indexed with the sort
                key passed to :meth:`make_list_sort`.
        """
        self.data_list = data_list
        # Sort direction chosen by the most recent change_reverse_status call.
        self.reverse_status = None

    def make_list_sort(self, input_type, key):
        """Return a new list sorted by ``record[key]``.

        For example, with::

            [{'tech': 'a', 'count': 10}, {'tech': 'b', 'count': 13},
             {'tech': 'c', 'count': 5}, {'tech': 'd', 'count': 6}]

        ``make_list_sort('min_to_max', 'count')`` orders the records by
        ascending ``count``.

        Args:
            input_type: ``'min_to_max'`` for ascending order or
                ``'max_to_min'`` for descending order.
            key: Name of the field to sort by.

        Returns:
            A new sorted list; ``self.data_list`` itself is left untouched.

        Raises:
            ValueError: If ``input_type`` is not one of the two supported
                values.
        """
        # Call through the instance so subclasses can override the hook.
        self.change_reverse_status(input_type)
        return sorted(self.data_list, key=lambda record: record[key], reverse=self.reverse_status)

    def change_reverse_status(self, input_type):
        """Translate ``input_type`` into the ``reverse`` flag used by ``sorted``.

        Args:
            input_type: ``'min_to_max'`` or ``'max_to_min'``.

        Raises:
            ValueError: If ``input_type`` is anything else. Without this check
                the flag would keep its previous value (or ``None``), so a typo
                would either crash inside ``sorted`` or silently reuse the
                direction from an earlier call.
        """
        if input_type == 'min_to_max':
            self.reverse_status = False
        elif input_type == 'max_to_min':
            self.reverse_status = True
        else:
            raise ValueError(
                "input_type must be 'min_to_max' or 'max_to_min', got {!r}".format(input_type))
三、文件的数据操作
# create class DataPathOperate
class DataPathOperate:
    """File-system helpers bound to a single path (file or directory)."""

    def __init__(self, path):
        """Remember the path every method operates on.

        Args:
            path: File or directory path, relative or absolute.
        """
        self.path = path

    def create_dir(self):
        """Create the directory at ``self.path``; do nothing if it already exists."""
        try:
            os.mkdir(self.path)
        except FileExistsError:
            # Something already exists at this path: nothing to create.
            pass

    def judge_is_file(self):
        """Return True when ``self.path`` is an existing regular file."""
        return os.path.isfile(self.path)

    def remove_dir(self):
        """Delete the directory at ``self.path``, including any contents.

        A missing directory is ignored.
        """
        try:
            # NOTE: os.removedirs also prunes parent directories that become
            # empty once the leaf has been removed.
            os.removedirs(self.path)
        except FileNotFoundError:
            # Already gone.
            pass
        except OSError:
            # Typically "directory not empty": fall back to a recursive delete.
            shutil.rmtree(self.path)

    def remove_file(self):
        """Delete the file at ``self.path``; failures are logged, not raised."""
        try:
            os.remove(self.path)
        except Exception as e:
            LogDataOperate(message_type='error').error(message=e)

    def read_data(self, file_type, parse_type):
        """Read ``self.path`` and return its content as a list.

        Args:
            file_type: ``'csv'`` or ``'xlsx'`` when ``parse_type`` is
                ``'pandas'``; ``'xlsx'``, ``'csv'`` or ``'txt'`` when
                ``parse_type`` is ``'with open'``.
            parse_type: ``'pandas'`` to parse with pandas (one inner list per
                row), or ``'with open'`` to return the raw text lines.

        Returns:
            The parsed rows or text lines, or an empty list when the
            ``file_type``/``parse_type`` combination is not supported.
        """
        if parse_type == 'pandas':
            if file_type == 'csv':
                return np.array(pd.read_csv(self.path, encoding='utf-8-sig')).tolist()
            if file_type == 'xlsx':
                return np.array(pd.read_excel(io=self.path)).tolist()
        elif parse_type == 'with open':
            if file_type in ('xlsx', 'csv', 'txt'):
                with open(self.path, 'r') as f:
                    return f.readlines()
        return []

    def save_data(self, save_type, save_data, columns):
        """Write tabular data to ``self.path`` as an xlsx or csv file.

        Args:
            save_type: ``'xlsx'`` or ``'csv'``; any other value is a no-op.
            save_data: Row data accepted by ``pandas.DataFrame``.
            columns: Column labels, one per column of ``save_data``.
        """
        if save_type not in ('csv', 'xlsx'):
            return
        df = pd.DataFrame(save_data)
        df.columns = columns
        if save_type == 'xlsx':
            # to_excel no longer accepts ``encoding`` (removed in pandas 2.0,
            # where passing it raises TypeError), so it is not forwarded.
            df.to_excel(self.path, index=False)
        else:
            df.to_csv(self.path, index=False, encoding='utf-8-sig')

    def copy_and_paste_file(self, aim_path):
        """Copy the file at ``self.path`` to ``aim_path``."""
        shutil.copyfile(self.path, aim_path)

    def change_relative_path_to_abs_path(self):
        """Replace ``self.path`` with its absolute form and return it."""
        self.path = os.path.abspath(self.path)
        return self.path

    def write_zipfile(self, zipfile_name, write_file_list):
        """Create ``<self.path>/<zipfile_name>.zip`` containing the given files.

        Args:
            zipfile_name: Archive name without the ``.zip`` extension.
            write_file_list: Paths of the files to add to the archive.

        Errors are logged rather than raised; an error while adding a file
        stops processing of the remaining files.
        """
        abs_path = os.path.abspath(self.path)
        # Directory prefix recorded for each archive member, per platform.
        member_dir_by_system = {'Windows': abs_path, 'Linux': ''}
        try:
            member_dir = member_dir_by_system.get(RUN_SYSTEM, '')
            with ZipFile('{}/{}.zip'.format(abs_path, zipfile_name), mode="w") as f:
                for write_file in write_file_list:
                    # Name the member via ``arcname`` instead of copying the
                    # file to a temporary location first; the stored name is
                    # the same, but nothing is written outside the archive.
                    member_name = '{}/{}'.format(member_dir, os.path.basename(write_file))
                    f.write(write_file, arcname=member_name)
        except Exception as e:
            # The ``with`` block has already closed the archive (if it was
            # ever opened), so no explicit close is required here.
            LogDataOperate(message_type='error').error(message=e)
四、字符串的操作
# create class DataStringOperate
class DataStringOperate:
    """String clean-up helpers bound to a single piece of text."""

    def __init__(self, data, replace_list=None, after_replace_str=''):
        """Store the text and the replacement configuration.

        Args:
            data: The string to operate on.
            replace_list: Substrings to replace. When omitted (``None``) the
                module-level ``REPLACE_DATA_LIST`` is used.
            after_replace_str: Text substituted for every match; defaults to
                the empty string, i.e. matches are removed.
        """
        # REPLACE_DATA_LIST is documented by the author as:
        #   ['nan', '\xa0', '\ue627', '\u200a', '\u200b', '\u200c', '\u200d',
        #    '\u200e', '\u202a', '\u202b', '\u202c', '\u202d', '\u202e',
        #    '\u2060', '\u2061', '\u2062', '\n', 'xa0', 'ue627', 'u200a',
        #    'u200b', 'u200c', 'u200d', 'u200e', 'u202a', 'u202b', 'u202c',
        #    'u202d', 'u202e', 'u2060', 'u2061', 'u2062', '“', '”']
        self.data = data
        # The default is resolved at call time, not in the signature, so the
        # class does not capture the list object when it is defined.
        self.replace_list = REPLACE_DATA_LIST if replace_list is None else replace_list
        self.after_replace_str = after_replace_str

    def isname(self):
        """Return True if any segmented token of the text is tagged as a person name.

        The text is split with ``pseg.lcut`` (presumably ``jieba.posseg`` -
        confirm against the file's imports); a token whose tag is ``"nr"``
        counts as a name.
        """
        return any(tag == "nr" for _token, tag in pseg.lcut(self.data))

    def data_replace(self):
        """Replace every configured substring in the text.

        Replacements are applied in list order, and ``self.data`` is updated
        with the result.

        Returns:
            The cleaned string.
        """
        for unwanted in self.replace_list:
            self.data = self.data.replace(unwanted, self.after_replace_str)
        return self.data
五、日期的操作
# create class DateOperate
class DateOperate:
    """Helpers that produce formatted timestamps relative to "now"."""

    def __init__(self):
        # Most recently generated timestamp string.
        self.date = ''

    def get_now_date(self):
        """Return the current local time as an underscore-separated stamp.

        The format is ``YYYY_MM_DD_HH_MM_SS_ffffff``, for example
        ``"2022_01_01_20_00_00_083624"``; the last field is microseconds.

        Returns:
            The stamp, which is also stored in ``self.date``.
        """
        # strftime always emits the six-digit microsecond field, whereas
        # str(datetime) drops it when the microsecond value is exactly 0.
        self.date = datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f')
        return self.date

    def get_last_date(self, last_date_type):
        """Return the time one unit before now as ``"%Y-%m-%d %H:%M:%S"``.

        Args:
            last_date_type: ``'days'`` for yesterday, ``'weeks'`` for one week
                ago, ``'months'`` for one month ago.

        Returns:
            The formatted timestamp, also stored in ``self.date``. For any
            other ``last_date_type`` the previously stored ``self.date`` is
            returned unchanged.
        """
        if last_date_type in ('days', 'weeks', 'months'):
            # The unit name doubles as the relativedelta keyword argument.
            previous = datetime.now() + relativedelta(**{last_date_type: -1})
            self.date = previous.strftime("%Y-%m-%d %H:%M:%S")
        return self.date
六、日志的操作
# create class LogDataOperate
class LogDataOperate:
    """Thin wrapper around ``logging`` that writes to a per-day log file.

    The file is ``<LOG_PATH>/<message_type>/<YYYY_MM_DD>.log``.
    """

    def __init__(self, message_type, clevel=None, flevel=logging.DEBUG):
        """Set up (or reuse) the logger for today's file of ``message_type``.

        Args:
            message_type: Sub-directory of ``LOG_PATH`` the log file goes in.
            clevel: Level for the optional console handler. It is stored but
                unused while the console handler below stays commented out.
            flevel: Level of the file handler. It only takes effect for the
                first instance created for a given log file, because later
                instances reuse the handler that is already attached.
        """
        # Function-scope import so this class does not depend on the
        # surrounding file's import block.
        import os

        self.clevel = clevel
        # get path
        path = '{}/{}/{}.log'.format(LOG_PATH, message_type, str(datetime.now())[:10].replace('-', '_'))
        self.logger = logging.getLogger(path)
        self.logger.setLevel(logging.DEBUG)
        # getLogger returns the same logger object for the same name, so a
        # new handler must only be attached once; otherwise every further
        # instance would duplicate each log line and open another file handle.
        if any(isinstance(handler, logging.FileHandler) for handler in self.logger.handlers):
            return
        fmt = logging.Formatter(
            '[%(asctime)s] [%(created)f] [%(levelname)s] [%(funcName)s] [%(process)d] [%(processName)s] '
            '[%(relativeCreated)d] [%(thread)d] [%(threadName)s] %(message)s')
        # To also print log records to the console, uncomment the block below.
        # # setting CMD log
        # sh = logging.StreamHandler()
        # sh.setFormatter(fmt)
        # sh.setLevel(self.clevel)
        # self.logger.addHandler(sh)
        # To stop writing a log file, comment out the block below.
        # setting file log
        # FileHandler does not create missing directories, so make sure the
        # target directory exists first.
        os.makedirs(os.path.dirname(path), exist_ok=True)
        fh = logging.FileHandler(path)
        fh.setFormatter(fmt)
        fh.setLevel(flevel)
        self.logger.addHandler(fh)

    def debug(self, message):
        """Log ``message`` at DEBUG level."""
        self.logger.debug(message)

    def info(self, message):
        """Log ``message`` at INFO level."""
        self.logger.info(message)

    def warn(self, message):
        """Log ``message`` at WARNING level."""
        self.logger.warning(message)

    def error(self, message):
        """Log ``message`` at ERROR level."""
        self.logger.error(message)

    def cri(self, message):
        """Log ``message`` at CRITICAL level."""
        self.logger.critical(message)
- 持续更新中
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/199451.html