本地文件批量上传更新至第三方接口

本地图片批量上传更新至第三方接口

@(Python笔记)[kaps_upload_image]


  • 项目背景

  • 实现思路

  • 依赖库安装

  • 代码部分


项目背景

人员组织架构管理平台,服务不提供人员照片批量上传方法,页面操作繁琐消耗人力

实现思路

1、本地图片文件遍历,输出人员编号和图片文件路径对应关系 2、调用查询接口获取人员信息,加入图片base64转码后的字符串处理生成更新接口入参 3、调用更新接口,打印上传结果并输出日志文件

依赖库安装

pip install requests
pip install base64
pip install os
pip install json

代码部分

# *_* coding : UTF-8 *_*
# 开发人员: demon
# 开发时间: 2023/05/24 11:20
# 文件名称: kaps_upload_image.py
# 开发工具: PyCharm
# 脚本用途:人员微服务批量上传照片
import hashlib
import requests
import base64
import os
import json

path = os.getcwd()
logfile_path = "kaps_upload_image.log"


def get_user_info(host, user_id, clientId, clientSecret, headPortraitName):
url = "http://{}/kaps-server/open-api/user/details/{}?clientId={}&clientSecret={}".format(host, user_id, clientId,
clientSecret)
r = requests.post(url)
r_json = r.json()
if bool(r_json.get("result")):
r_json["dataMap"] = r_json.pop("result")
del r_json["code"]
del r_json["status"]
del r_json["timeElapsed"]
del r_json["timestamp"]
r_json["dataMap"]["headPortraitName"] = headPortraitName
r_json.update({"isAudit": "", "deviceType": "kaps_user"})
return r_json
else:
return None


def get_img_bs4(file_path):
with open(file_path, "rb") as f:
base64_data = base64.b64encode(f.read())
base64_data = str(base64_data)[2:-1]
base64_data = "data:image/png;base64," + base64_data
return base64_data


def traverse_folder():
data = {}
img_path = os.path.join(path, "kaps_img")
for root, directories, files in os.walk(img_path):
for file_name in files:
file_path = os.path.join(root, file_name)
if file_path.lower().endswith(('.jpg', '.png', '.jpeg')):
user_id = os.path.splitext(os.path.basename(file_path))[0]
data[user_id] = file_path
return data


def kaps_login(host, username, passwd):
url = "http://{}/kaps-server/oauth2/password".format(host)
data = {
"username": username,
"password": passwd
}
try:
r = requests.post(url, data=data)
except Exception as e:
raise NameError(e)
return r.json()["result"]["jwt_token"]


def update_user_info(user_id, data, img_bs4, host, token):
url = "http://{}/kaps-server/devices/update".format(host)
headers = {
"Content-Type": "application/json",
"jwt-token": token
}
data["dataMap"]["headPortrait"] = img_bs4
json_data = json.dumps(data)
r = requests.put(url, data=json_data, headers=headers)
if r.json()["status"] == 200:
return "人员编号{}的用户图片上传成功".format(user_id)
else:
return "人员编号{}的用户图片上传失败".format(user_id)


def get_config():
with open("config.json", "r") as f:
config = json.loads(f.read())
return config


def append_to_file(file_path, content):
with open(file_path, 'a') as f:
f.write(content)
f.write('n')


if __name__ == "__main__":
config_json = get_config()
host = config_json["host"]
clientId = config_json["clientId"]
clientSecret = config_json["clientSecret"]
username = config_json["username"]
passwd = config_json["passwd"]
token = kaps_login(host, username, passwd)
dict_userid_img = traverse_folder()
for user_id, file_path in dict_userid_img.items():
img_bs4 = get_img_bs4(file_path)
headPortraitName = os.path.splitext(os.path.basename(file_path))
kaps_json = get_user_info(host, user_id, clientId, clientSecret, headPortraitName)
if kaps_json is None:
result = "人员编号{}的用户不存在".format(user_id)
print(result)
append_to_file(logfile_path, result)
else:
result = update_user_info(user_id, kaps_json, img_bs4, host, token)
print(result)
append_to_file(logfile_path, result)

原文始发于微信公众号(给爷整笑辣):本地文件批量上传更新至第三方接口

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/206864.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!