本次主要分享结合单例模式实际应用案例:实现实时增量数据加载工具的解决方案。最关键的是实现一个可进行添加、修改、删除等操作的增量ID记录表。
单例模式:提供全局访问点,确保类有且只有一个特定类型的对象。通常用于以下场景:日志记录或数据库操作等,避免对用一资源请求冲突。
创建增量ID记录表
import?sqlite3 import?datetime import?pymssql import?pandas?as?pd import?time pd.set_option('expand_frame_repr',?False)
导入所需模块
?#?创建数据表 database_path?=?r'.\Database\ID_Record.db' from?sqlite3?import?connect with?connect(database_path)?as?conn: ????conn.execute( ????????'CREATE?TABLE?IF?NOT?EXISTS?Incremental_data_max_id_record(id?INTEGER?PRIMARY?KEY?AUTOINCREMENT,F_SDaqID_MAX?TEXT,record_date?datetime)')
增量最新记录ID-F_SDaqID_MAX数据库存储
#数据保存到本地txt def?text_save(filename,?record):#filename为写入txt文件的路径,record为要写入F_SDaqID_MAX、record_date数据列表. ????file?=?open(filename,'a')?追加方式 ????#?file?=?open(filename,?'w')??#覆盖方式 ????for?i?in?range(len(record)): ????????s?=?str(record[i]).replace('[','').replace(']','') ????????s?=?s.replace("'",'').replace(',','')?+'\n'???#去除单引号,逗号,每行末尾追加换行符 ????????file.write(s) ????file.close()
增量最新记录ID-F_SDaqID_MAX临时文件存储
增量ID记录提供了两种实现方案 ,一个是数据持久化存储模式,另一个是临时文件存储模式。数据持久化模式顾名思义,也就是说在创建对象的时候,能将操作关键信息如增量ID-F_SDaqID_MAX记录下来,这种flag记录映射是常选择的设计模式。
数据库连接类
实现实时增量数据获取需要实现两个数据库连接类:增量数据ID存储类和增量目标数据源类。这里利用单例模式实现数据库操作类,将增量服务记录信息按照顺序存储到数据库或特定的日志文件中,以维护数据的一致性。
1、增量数据ID存储sqlite连接类代码
class?Database_sqlite(metaclass=MetaSingleton): ????database_path?=?r'.\Database\energy_rc_configure.db' ????connection?=?None ????def?connect(self): ????????if?self.connection?is?None: ????????????self.connection?=?sqlite3.connect(self.database_path,check_same_thread=False,isolation_level=None) ????????????self.cursorobj?=??self.connection.cursor() ????????return?self.cursorobj,self.connection ????#?插入最大记录 ????@staticmethod ????def?Insert_Max_ID_Record(f1,?f2): ????????cursor?=?Database_sqlite().connect() ????????print(cursor) ????????sql?=?f"""insert?into?Incremental_data_max_id_record(F_SDaqID_MAX,record_date)?values("{f1}","{f2}")""" ????????cursor[0].execute(sql) ????????#?sql?=?"insert??into?Incremental_data_max_id_record(F_SDaqID_MAX,record_date)?values(?,?)" ????????#?cursor[0].execute(sql,(f"{f1}",f"{f2}")) ????????cursor[1].commit() ????????print("插入成功!") ????????#?cursor[0].close() ????????return? ????#?取出增量数据库中最新一次ID记录 ????@staticmethod ????def?View_Max_ID_Records(): ????????cursor?=?Database_sqlite().connect() ????????sql?=?"select?max(F_SDaqID_MAX)?from?Incremental_data_max_id_record" ????????cursor[0].execute(sql) ????????results?=?cursor[0].fetchone()[0] ????????#?#单例模式不用关闭数据库连接 ????????#?cursor[0].close() ????????print("最新记录ID",?results) ????????return?results ????#删除数据记录ID ????@staticmethod ????def?Del_Max_ID_Records(): ????????cursor?=?Database_sqlite().connect() ????????sql?=?"delete?from?Incremental_data_max_id_record?where?record_date?=?(select?MAX(record_date)?from?Incremental_data_max_id_record)" ????????cursor[0].execute(sql) ????????#?results?=?cursor[0].fetchone()[0] ????????#?#?cursor[0].close() ????????cursor[1].commit() ????????print("删除成功") ????????return
2、增量数据源sqlserver连接类代码
class?Database_sqlserver(metaclass=MetaSingleton): ????""" ????#实时数据库 ????""" ????connection?=?None ????#?def?connect(self): ????def?__init__(self): ????????if?self.connection?is?None: ????????????self.connection?=?pymssql.connect(host="xxxxx",user="xxxxx",password="xxxxx",database="xxxxx",charset="utf8") ????????????if?self.connection: ????????????????print("连接成功!") ????????????#?打开数据库连接 ????????????self.cursorobj?=?self.connection.cursor() ????????#?return?self.cursorobj,?self.connection ????#?获取数据源中最大ID ????@staticmethod ????def?get_F_SDaqID_MAX(): ????????#?cursor_insert?=?Database_sqlserver().connect() ????????cursor_insert?=?Database_sqlserver().cursorobj ????????sql_MAXID?=?"""select?MAX(F_SDaqID)?from?T_DaqDataForEnergy""" ????????cursor_insert.execute(sql_MAXID)??#?执行查询语句,选择表中所有数据 ????????F_SDaqID_MAX?=?cursor_insert.fetchone()[0]??#?获取记录 ????????print("最大ID值:{0}".format(F_SDaqID_MAX)) ????????return?F_SDaqID_MAX ????#?提取增量数据 ????@staticmethod ????def?get_incremental_data(incremental_Max_ID): ????????#?开始获取增量数据 ????????sql_incremental_data?=?"""select?F_ID,F_Datetime,F_Data?from?T_DaqDataForEnergy??where?F_ID?>?{0}""".format( ????????????incremental_Max_ID) ????????#?cursor_find?=?Database_sqlserver().connect() ????????cursor_find?=?Database_sqlserver().cursorobj ????????cursor_find.execute(sql_incremental_data)??#?执行查询语句,选择表中所有数据 ????????Target_data_source?=?cursor_find.fetchall()??#?获取所有数据记录 ????????#?cursor_find.close() ????????cursor_find.close() ????????df?=?pd.DataFrame( ????????????Target_data_source, ????????????columns=[ ????????????????"F_ID", ????????????????"F_Datetime", ????????????????"F_Data"]) ????????print("提取数据",?df) ????????return?df
数据资源应用服务设计主要考虑数据库操作的一致性和优化数据库的各种操作,提高内存或CPU利用率。
实现多种读取和写入操作,客户端操作调用API,执行相应的DB操作。
注:
1、使用metaclass实现创建具有单例特征的类
Database_sqlserver(metaclass=MetaSingleton)
Database_sqlite(metaclass=MetaSingleton)
使用class定义新类时,数据库类Database_sqlserver由MetaSingleton装饰后即指定了metaclass,那么MetaSingleton的特殊方法__call__方法将自动执行。
class?MetaSingleton(type): ????_instances={} ????def?__call__(cls,?*args,?**kwargs): ????????if?cls?not?in?cls._instances: ????????????cls._instances[cls]?=?super(MetaSingleton,cls).__call__(*args,**kwargs) ????????return?cls._instances[cls]
以上代码基于元类的单例实现,当客户端对数据库执行某些操作时,会多次实例化数据库类,但是只创建一个对象,所以对数据库的调用是同步的。
2、多线程使用同一数据库连接资源需采取一定同步机制
如果没采用同步机制,可能出现一些意料之外的情况
1)with cls.lock加锁
class?MetaSingleton(type): ????_instances={} ????lock?=?threading.Lock() ????def?__call__(cls,?*args,?**kwargs): ????????with?cls.lock: ????????????if?cls?not?in?cls._instances: ????????????????time.sleep(0.05)??#模拟耗时 ????????????????cls._instances[cls]?=?super(MetaSingleton,cls).__call__(*args,**kwargs) ????????????return?cls._instances[cls]
锁的创建和释放需要消耗资源,上面代码每次创建都必须获得锁。
3、如果我们开发的程序非单个应用,而是集群化的,即多个客户端共享单个数据库,导致数据库操作无法同步,而数据库连接池是更好的选择。大大节省了内存,提高了服务器地服务效率,能够支持更多的客户服务。
数据库连接池的解决方案是在应用程序启动时建立足够的数据库连接,并讲这些连接组成一个连接池,由应用程序动态地对池中的连接进行申请、使用和释放。对于多于连接池中连接数的并发请求,应该在请求队列中排队等待。
增量数据服务客户端
增量处理策略:第一次加载先判断增量数据表中是否存在最新记录,若有直接加载;否则,记录一下最大/最新的数据记录ID或时间点,保存到一个增量数据库或记录文件中。
从第二次加载开始只加载最大/最新的ID或时间点以后的数据。当加载过程全部成功完成之后并同步更新增量数据库或记录文件,更新这次数据记录的最后记录ID或时间点。
一般这类数据记录表有自增长列,那么也可以使用自增长列来实现这个标识特征。比如本次我用到数据表增长列F_ID。
class?IncrementalRecordServer: ????_servers?=?[] ????_instance?=?None ????def?__new__(cls,?*args,?**kwargs): ????????if?not?IncrementalRecordServer._instance: ????????????#?IncrementalRecordServer._instance?=?super().__new__(cls) ????????????IncrementalRecordServer._instance?=?super(IncrementalRecordServer,cls).__new__(cls) ????????return?IncrementalRecordServer._instance ????def?__init__(self,changeServersID=None): ????????""" ????????变量初始化过程 ????????""" ????????self.F_SDaqID_MAX?=?Database_sqlserver().get_F_SDaqID_MAX() ????????self.record_date?=?datetime.datetime.now().strftime('%Y-%m-%d?%H:%M:%S') ????????self.changeServersID?=?changeServersID ????#?回调更新本地记录,清空记录替换,临时记录 ????def?record(func): ????????def?Server_record(self): ????????????v?=?func(self) ????????????text_save(filename=r"F:\AutoOps_platform\Database\Server_record.txt",record=IncrementalRecordServer._servers) ????????????print("保存成功") ????????????return?v ????????return?Server_record ????#增加服务记录 ????@record ????def?addServer(self): ????????self._servers.append([int(self.F_SDaqID_MAX),self.record_date]) ????????print("添加记录") ????????Database_sqlite.Insert_Max_ID_Record(f1=self.F_SDaqID_MAX,?f2=self.record_date) ????#修改服务记录 ????@record ????def?changeServers(self): ????????#?self._servers.pop() ????????#?此处传入手动修改的记录ID ????????self._servers.append([self.changeServersID,self.record_date]) ????????#先删除再插入实现修改 ????????Database_sqlite.Del_Max_ID_Records() ????????Database_sqlite.Insert_Max_ID_Record(f1=self.changeServersID,?f2=self.record_date) ????????print("更新记录") ????#删除服务记录 ????@record ????def?popServers(self): ????????#?self._servers.pop() ????????print("删除记录") ????????Database_sqlite.Del_Max_ID_Records() ????#?最新服务记录 ????def?getServers(self): ????????#?print(self._servers[-1]) ????????Max_ID_Records?=?Database_sqlite.View_Max_ID_Records() ????????print("查看记录",Max_ID_Records) ????????return?Max_ID_Records ????#提取数据 ????def?Incremental_data_client(self): ????????""" ????????#?提取数据(增量数据MAXID获取,并提取增量数据) ????????""" ????????#?实时数据库 ????????#?第一次加载先判断是否存在最新记录 ????????if?self.getServers()?==?None: ????????????#?插入增量数据库ID ????????????self.addServer() ????????????#?提取增量数据 ????????????data?=?Database_sqlserver.get_incremental_data(self.F_SDaqID_MAX) ????????????return?data ????????#?获取增量数据库中已有的最新最大ID记录 ????????incremental_Max_ID?=?self.getServers() ????????#添加记录 ????????self.addServer() ????????#?提取增量数据 ????????Target_data_source?=?Database_sqlserver.get_incremental_data(incremental_Max_ID) ????????return?Target_data_source
优化策略:
1、延迟加载方式
以上增量记录服务类IncrementalRecordServer通过覆盖__new__方法来控制对象的创建,我们在创建对象的时候会先检查对象是否存在。也可以通过懒加载的方式实现,节约资源优化如下。
class?IncrementalRecordServer: ????_servers?=?[] ????_instance?=?None ????def?__init__(self,changeServersID=None): ????????""" ????????变量初始化过程 ????????""" ????????self.F_SDaqID_MAX?=?Database_sqlserver().get_F_SDaqID_MAX() ????????self.record_date?=?datetime.datetime.now().strftime('%Y-%m-%d?%H:%M:%S') ????????self.changeServersID?=?changeServersID ????????if?not?IncrementalRecordServer._instance: ????????????print("__init__对象创建") ????????else: ????????????print("对象已经存在:",IncrementalRecordServer._instance) ????????????self.getInstance() ????@classmethod ????def?getInstance(cls): ????????if?not?cls._instance: ????????????cls._instance?=?IncrementalRecordServer() ????????return?cls._instance
懒汉式实例化能够确保实际需要时才创建对象,实例化a= IncrementalRecordServer()时,调用初始化__init__方法,但是没有新的对象创建。懒汉式这种方式加载类对象,也称为延迟加载方式。
2、单例模式能有效利用空间资源,每次利用同一空间资源。
不同操作对象的内存地址相同,且不同对象初始化将上一个对象初始化变量覆盖,确保最新记录实时更新。表面上以上代码实现了单例模式没问题,但多线程并发情况下,存在线程安全问题,可能同时创建不同的对象空间。考虑到线程安全,也可以进一步加锁处理.
3、适用范围及注意事项
本次代码适用于部署生产指定时间点运行之后产出的增量数据,长时间未启用再启动需要清空历史记录即增量数据库或文件ID需清空,一般实时数据增量实现一次加载没有什么问题,所以这一点也不用很关注(文件方式代码可自行完善);当加载历史数据库或定时间隔产生数据量过大时,需要进一步修改代码,需要判断数据规模,指定起始节点及加载数据量,综合因素考虑,下次分享一下亿级数据量提取方案。
4、进一步了解Python垃圾回收机制;并发情况下,通过优化线程池来管理资源。
最后可以添加一个函数来释放资源
def?__del__(self): ????class_name?=?self.__class__.__name__ ????print(class_name,"销毁")
del obj 调用__del__() 销毁对象,释放其空间;只有Python 对象在不再引用对象时被释放。当程序中有其它变量引用该实例对象时,即便手动调用 __del__() 方法,该方法也不会立即执行。这和 Python 的垃圾回收机制的实现有关。
结果测试
if?__name__?==?'__main__': ????for?i?in?range(6): ????????hc1?=?IncrementalRecordServer() ????????hc1.addServer() ????????print("Record_ID",hc1._servers[i]) ????????#?del?hc1 ????????time.sleep(60) ????#Server2-客户端client ????#?最新服务记录 ????hc2?=?IncrementalRecordServer() ????hc2.getServers() ????#查看增量数据 ????hc2.Incremental_data_client()
插入记录
模拟每1分钟插入一条记录,向增量数据库插入7条
if?__name__?==?'__main__': ????#?Server3-客户端client ????#?手动添加增量起始ID记录 ????hc3?=?IncrementalRecordServer(changeServersID='346449980') ????hc3.changeServers()
if?__name__?==?'__main__': ????#删除ID ????hc3?=?IncrementalRecordServer(changeServersID='346449980') ????#?hc3.changeServers() ????hc3.popServers()
以上就是Python实现实时增量数据加载工具的解决方案的详细内容,更多关于Python增量数据加载的资料请关注其它相关文章!
查看更多关于Python实现实时增量数据加载工具的解决方案的详细内容...