异步优化写入备份
This commit is contained in:
@@ -6,12 +6,16 @@
|
||||
"""
|
||||
|
||||
import os
|
||||
import asyncio
|
||||
from typing import List, Union
|
||||
from .tree_object import TreeObject
|
||||
from .blob_object import BlobObject
|
||||
from utils.log import Logger
|
||||
|
||||
logger = Logger("BackupObject")
|
||||
|
||||
|
||||
class BackupObject(ObjectBase):
|
||||
class BackupObject:
|
||||
"""
|
||||
备份对象,负责管理存储对象(tree、blob)对象、备份信息、恢复等操作
|
||||
"""
|
||||
@@ -21,19 +25,27 @@ class BackupObject(ObjectBase):
|
||||
backup_time_create: str
|
||||
backup_size: int
|
||||
backup_version_number: int
|
||||
backup_tree: List[Union[TreeObject, BlobObject]]
|
||||
backup_trees: List[Union[TreeObject, BlobObject]] = []
|
||||
new_backup_flag: bool
|
||||
|
||||
def __init__(self, backup_name: str, backup_base_path: str):
    """Open (or prepare to create) the backup *backup_name* under *backup_base_path*.

    :param backup_name: name of the backup; becomes the leaf directory name.
    :param backup_base_path: base directory (absolute or relative) that holds backups.
    """
    self.backup_name = backup_name

    # Resolve the backup directory to an absolute path.
    if os.path.isabs(backup_base_path):
        self.backup_path = os.path.join(backup_base_path, backup_name)
    else:
        self.backup_path = os.path.join(
            os.path.abspath(backup_base_path), backup_name
        )

    logger.debug(f"Backup path: {self.backup_path}")

    # A missing directory means this is a brand-new backup.
    if not os.path.exists(self.backup_path):
        self.new_backup_flag = True
    else:
        self.new_backup_flag = False
        pass  # TODO: load the existing backup's metadata
|
||||
|
||||
def createNewBackup(self, backup_dirs: List[str]):
    """Build the in-memory object list for a new backup.

    Each path in *backup_dirs* becomes a TreeObject (directories) or a
    BlobObject (files) appended to ``backup_trees``.

    :param backup_dirs: paths (files or directories) to include in the backup.
    """
    # NOTE(review): backup_trees is declared as a class-level list, so these
    # appends mutate state shared across instances — consider initializing
    # it per-instance in __init__.
    for backup_dir in backup_dirs:
        if os.path.isdir(backup_dir):
            self.backup_trees.append(TreeObject(backup_dir))
        else:
            self.backup_trees.append(BlobObject(backup_dir))

    logger.info("New backup created successfully.")
    logger.debug(f"Backup trees: {self.backup_trees}")
|
||||
|
||||
def backup(self):
    """Synchronous entry point: write all stored objects via the async writer."""
    asyncio.run(self.__writeBlobs())
|
||||
|
||||
async def __writeBlobs(self):
    """Write every stored object (trees and blobs) into the backup's object store."""
    objects_dir = os.path.join(self.backup_path, "objects")
    for entry in self.backup_trees:
        # TreeObject exposes writeBlobs(); BlobObject exposes writeBlob().
        writer = entry.writeBlobs if isinstance(entry, TreeObject) else entry.writeBlob
        await writer(objects_dir)
|
||||
|
||||
def recover(self, recover_path: str):
|
||||
"""
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
import hashlib
|
||||
import os
|
||||
import zlib
|
||||
import aiofiles
|
||||
|
||||
|
||||
class BlobObject:
|
||||
@@ -16,45 +17,45 @@ class BlobObject:
|
||||
|
||||
def __init__(self, file_path: str) -> None:
    """Create a blob for *file_path*; its object id is the SHA-1 of the content.

    :param file_path: path of the file this blob represents (read immediately
        to compute the id).
    """
    self.file_path = file_path
    self.object_id = self.__contentSha1()
|
||||
|
||||
async def writeBlob(self, base_path: str) -> None:
    """Compress this blob into ``base_path/<id[:2]>/<id[2:]>`` (git-style fan-out).

    :param base_path: root directory of the object store.
    """
    folder = os.path.join(base_path, self.object_id[:2])
    # exist_ok avoids the exists()/makedirs() race when blobs share a prefix
    # and are written concurrently.
    os.makedirs(folder, exist_ok=True)
    await self.__compressFile(
        self.file_path,
        os.path.join(folder, self.object_id[2:]),
    )
|
||||
|
||||
def __compressFile(self, file_path, save_path) -> None:
|
||||
compresser = zlib.compressobj(9)
|
||||
write_file = open(save_path, "wb")
|
||||
with open(file_path, "rb") as f:
|
||||
while True:
|
||||
data = f.read(self.__buff_size)
|
||||
if not data:
|
||||
break
|
||||
compressedData = compresser.compress(data)
|
||||
write_file.write(compressedData)
|
||||
write_file.write(compresser.flush())
|
||||
write_file.close()
|
||||
|
||||
def __getFolderName(self) -> str:
|
||||
return self.object_id[:2]
|
||||
|
||||
def __getFileName(self) -> str:
|
||||
return self.object_id[2:]
|
||||
|
||||
def getBlobAbsPath(self) -> str:
|
||||
"""
|
||||
获取blob的绝对路径,此相对路径是基于存储路径的,不是相对当前文件的路径
|
||||
|
||||
:return 相对路径,格式为xx/xxxx...
|
||||
"""
|
||||
return self.__getFolderName() + "/" + self.__getFileName()
|
||||
return self.object_id[:2] + "/" + self.object_id[2:]
|
||||
|
||||
async def __compressFile(self, file_path: str, save_path: str) -> None:
    """Asynchronously zlib-compress *file_path* into *save_path*.

    Reads and writes in ``__buff_size`` chunks through aiofiles so the event
    loop can interleave other blob writes while waiting on disk I/O.

    NOTE(review): the zlib compression itself still runs synchronously on
    the event loop; only the file I/O is offloaded.

    :param file_path: source file to compress.
    :param save_path: destination path for the compressed stream.
    """
    compresser = zlib.compressobj(9)  # level 9 = best compression
    async with aiofiles.open(save_path, mode='wb') as wf, aiofiles.open(file_path, mode='rb') as rf:
        while True:
            data = await rf.read(self.__buff_size)
            if not data:
                break
            compressedData = compresser.compress(data)
            await wf.write(compressedData)
        # Flush whatever the compressor still buffers internally.
        await wf.write(compresser.flush())
|
||||
|
||||
|
||||
def __contentSha1(self) -> str:
|
||||
"""
|
||||
计算文件内容的sha1值
|
||||
|
||||
@@ -68,3 +69,7 @@ class BlobObject:
|
||||
break
|
||||
sha1.update(data)
|
||||
return sha1.hexdigest()
|
||||
|
||||
|
||||
def newBlobObject(file_path: str) -> BlobObject:
    """Factory helper: build a BlobObject for *file_path*."""
    blob = BlobObject(file_path)
    return blob
|
||||
@@ -8,7 +8,7 @@
|
||||
from typing import Dict, List
|
||||
from .blob_object import BlobObject
|
||||
import os
|
||||
|
||||
import asyncio
|
||||
|
||||
class TreeObject:
|
||||
__children: List[BlobObject] = [] # 备份树节点列表,节点为BlobObject备份存储对象
|
||||
@@ -26,6 +26,8 @@ class TreeObject:
|
||||
self.__children.append(blob)
|
||||
self.__file_map[blob_path] = blob.object_id
|
||||
|
||||
async def writeBlobs(self, base_path: str):
    """Concurrently write every child blob under *base_path*.

    Bug fix: ``asyncio.gather`` returns an awaitable — without ``await`` the
    method returned before any task finished and the gather result was
    abandoned ("coroutine ... was never awaited").

    :param base_path: root directory of the object store.
    """
    tasks = [asyncio.create_task(child.writeBlob(base_path)) for child in self.__children]
    await asyncio.gather(*tasks)
|
||||
|
||||
Reference in New Issue
Block a user