路虽远 行则将至

Lee


  • 主页
  • 归档
  • 分类
  • 标签
  •   

站点访客数   人

站点浏览量   次

本页浏览量   次

© 2024 辣辣辣白菜

Theme Typography by Makito

Proudly published with Hexo

记录Python的一些服务、文件操作

Posted at 2024-04-24 Coding  Python 

WebSocket Server以及接收图片和文本两种

import asyncio
import websockets
async def save_image(websocket, path):
    try:
        async for message in websocket:
            if message == 'image':
                image_data = await websocket.recv() # 图片原始数据
                await websocket.send(f"FILE")
            elif message == 'text':
                text_message = await websocket.recv()
                print(f"Received text message: {text_message}")
                await websocket.send("Text message received successfully!")
    except websockets.exceptions.ConnectionClosedOK:
        print("Connection closed.")
    except Exception as e:
        print(f"Error occurred: {e}")

async def start_websocket_server():
    server_address = "localhost"
    port = 8765
    async with websockets.serve(save_image, server_address, port,max_size=32*1024*1024): 
        # max_size为消息最大字节数,此处设为32MB,为了做图片上传服务使用
        print(f"WebSocket server started at ws://{server_address}:{port}")
        await asyncio.Future()  # 防止服务器退出
        
websocket_server_task = asyncio.run(start_websocket_server())

HTTP Server

from threading import Thread
import socketserver
import http.server

def start_http_server():
    PORT = 8766
    DIRECTORY = "./"  # 设置文件服务器的根目录

    # 定义处理 HTTP 请求的处理程序
    class Handler(http.server.SimpleHTTPRequestHandler):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, directory=DIRECTORY, **kwargs)

    # 启动 HTTP 服务器
    with socketserver.TCPServer(("", PORT), Handler) as httpd:
        print(f"HTTP server started at http://localhost:{PORT}")
        httpd.serve_forever()

# 另一个线程运行HTTP服务器
http_server_thread = Thread(target=start_http_server)
http_server_thread.start()

清空目录(包括目录自身)

import shutil
directory_to_delete = './runs/'
def delete_contents(directory):
    try:
        # 删除目录及其下所有内容
        shutil.rmtree(directory)
        print(f"Successfully deleted all contents of '{directory}'.")
    except Exception as e:
        print(f"Error occurred while deleting contents of '{directory}': {e}")

delete_contents(directory_to_delete)

新建文件夹

os.makedirs("./data/images")

删除指定文件

os.remove(f"./img.png")

공유하기 

 이전 포스트: DockerCompose部署Zookeeper+Kafka+KafkaManager 다음 포스트: Conda的环境迁移 

站点访客数   人

站点浏览量   次

本页浏览量   次

© 2024 辣辣辣白菜

Theme Typography by Makito

Proudly published with Hexo