Base64转存文件
def save_base64_image(base64_string, output_dir, filename):
# 确保输出目录存在
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 解码Base64字符串
image_data = base64.b64decode(base64_string)
# 将解码的图像数据转换为PIL Image对象
image = Image.open(BytesIO(image_data))
# 构建完整的输出路径
output_path = os.path.join(output_dir, filename)
# 保存图像到指定路径
image.save(output_path)
logging.info(f"Image saved to {output_path}")
return f"{output_path}/{filename}"
output_path = "./images"
filename = "test.jpg"
file_path = save_base64_image("xxxxx",output_path,filename)
使用logging打印日志
import logging
logging.basicConfig(level=logging.INFO)
logging.error(f"Kafka error: {msg.error()}")
logging.info(f"Processing message: {msgJson}")
使用JSON存储配置,并读取
import json
CONFIG_FILE = 'config.json'
with open(CONFIG_FILE) as config_file:
config = json.load(config_file)
mqtt_broker = config['mqtt_broker']
{
"mqtt_broker": "192.168.1.5"
}
任务队列
定时任务
MySQL连接池封装