""" ******************************************** * @Date: 2024 09 27 * @Description: BlobObject存储对象 ******************************************** """ import hashlib import os import zlib class BlobObject: object_id: str __buff_size: int = 8192 def __init__(self, file_path: str) -> None: self.file_path = file_path self.object_id = self.contentSha1() def writeBlob(self, base_path) -> None: Folder = base_path + "/" + self.__getFolderName() if not os.path.exists(Folder): os.makedirs(Folder) self.__compressFile( self.file_path, base_path + "/" + self.__getFolderName() + "/" + self.__getFileName(), ) def __compressFile(self, file_path, save_path) -> None: compresser = zlib.compressobj(9) write_file = open(save_path, "wb") with open(file_path, "rb") as f: while True: data = f.read(self.__buff_size) if not data: break compressedData = compresser.compress(data) write_file.write(compressedData) write_file.write(compresser.flush()) write_file.close() def __getFolderName(self) -> str: return self.object_id[:2] def __getFileName(self) -> str: return self.object_id[2:] def getBlobAbsPath(self) -> str: """ 获取blob的绝对路径,此相对路径是基于存储路径的,不是相对当前文件的路径 :return 相对路径,格式为xx/xxxx... """ return self.__getFolderName() + "/" + self.__getFileName() def contentSha1(self) -> str: """ 计算文件内容的sha1值 :return sha1的hex值 """ with open(self.file_path, "rb") as f: sha1 = hashlib.sha1() while True: data = f.read(self.__buff_size) if not data: break sha1.update(data) return sha1.hexdigest()