数据存储库通常是超高要求系统的瓶颈。在这些系统中,正在执行的查询数量非常大。DelayedBatchExecutor是一个用于减少所需查询数量的组件,通过在Java多线程应用程序中对所需查询进行批处理。
1个参数的n个查询Vs. n个参数的1个查询
假设有一个对关系数据库执行查询的Java应用程序,以便在给定其唯一标识符(id)的情况下检索Product实体(row)。
查询如下所示:
SELECT * FROM PRODUCT WHERE ID =
现在,检索n个Products,有如下两种方法:
- 执行1个参数的n个独立查询:
SELECT*FROMPRODUCTWHEREID= SELECT*FROMPRODUCTWHEREID= ... SELECT*FROMPRODUCTWHEREID=
- 使用IN运算符或ORs的串联,对n个参数执行1个查询以便同时检索n个 Products
--ExampleusingINOPERATOR SELECT*FROMPRODUCTWHEREIDIN(,,...,)
后者在网络流量和数据库服务器资源(CPU和磁盘)方面更为有效,因为:
- 往返数据库的次数为1,而不是n。
- 数据库引擎优化了n个参数的数据遍历过程,即每个表格可能只需要扫描1次,而不是n次。
这不仅适用于SELECT操作,而且适用于其他操作,例如 INSERTs,UPDATEs和DELETEs。实际上,JDBC API包括上述操作的批量处理操作。
同样的情况也适用于NoSQL存储库,其中大多都明确提供BULK操作。
DelayedBatchExecutor
需要从数据库中检索数据的Java应用程序,如REST微服务或异步消息处理器,通常以多线程应用程序(*1)实现,其中:
- 每个线程在其执行的某个时刻执行相同的查询(每个查询具有不同的参数)。
- 并发线程数很高(每秒数十或数百)。
在这种场景下,数据库很可能在较短的时间间隔内多次执行相同的查询。
如前所述,如果将1个参数的n个查询替换为具有n个参数的单个等效查询,那么则应用程序将使用较少的数据库服务器和网络资源。
好消息是它可以通过timewindows(时间窗口)的机制来实现,如下所示:
第一个尝试执行查询的线程会打开一个时间窗口,因此其参数被存储在一个列表中,同时该线程被暂停。在时间窗口内执行相同查询的其余线程会将其参数添加到列表中,并且也会被暂停。此时,数据库上未执行任何查询。
时间窗口结束或列表已满(先前已定义最大容量限制)后,将使用列表中存储的所有参数执行单个查询。最后,一旦数据库提供了该查询的结果,每个线程将接收相应的结果,同时所有线程将自动恢复。
笔者构建了一个简单而轻量级的应用机制(DelayedBatchExecutor),很容易在新的或现有的应用程序中使用。它基于Reactor库,并且为参数列表使用超时的Flux缓冲发布器。
运用DelayedBatchExecutor的吞吐量和延迟分析
假设针对Products的REST微服务公开了一个端点,用于检索数据库中给定的 productId的Product数据。在没有DelayedBatchExecutor的情况下,如果每秒对端点命中200次,则数据库每秒执行200个查询。如果端点使用的DelayedBatchExecutor 配置了50毫秒的时间窗口且最大容量=10个参数,数据库每秒钟将只执行10个参数的20个查询,代价是每执行一个线程,最多在50毫秒内增加延时(*2)。
换句话说,为了将延时增加50毫秒(* 2),数据库每秒接收的查询减少了10倍,然而保持了系统的整体吞吐量。还不错!!
其他有趣的配置:
- 窗口时间= 100毫秒,最大容量= 20个参数→20个参数的10个查询(查询减少20倍)
- 窗口时间= 500毫秒,最大容量= 100个参数→2个查询100个参数(查询减少100倍)
执行中的DelayedBatchExecutor
深入研究Product微服务示例。假设对于每个传入的HTTP请求,微服务的控制器都要求检索已有id的Product(Java Bean),因此将调用以下方法:
DAO组件(ProductDAO)的public Product getProductById(IntegerproductId) .
以下分别是有和没有 DelayedBatchExecutor的DAO执行。
没有 DelayedBatchExecutor
publicclassProductDAO{ publicProductgetProductById(Integerid){ Productproduct=...//executethequerySELECT*FROMPRODUCTWHEREID= //usingyourfavouriteAPI:JDBC,JPA,Hibernate... returnproduct; } ... }
有DelayedBatchExecutor
//Singleton publicclassProductDAO{ DelayedBatchExecutor2delayedBatchExecutorProductById= DelayedBatchExecutor.define(Duration.ofMillis(50),10,this::retrieveProductsByIds); publicProductgetProductById(Integerid){ Productproduct=delayedBatchExecutorProductById.execute(id); returnproduct; } privateListretrieveProductsByIds(ListidList){ ListproductList=...//executequery:SELECT*FROMPRODUCTWHEREIDIN(idList.get(0),...,idList.get(n)); //usingyourfavouriteAPI:JDBC,JPA,Hibernate... //Thepositionsoftheelementsofthelisttoreturnmustmatchtheonesintheparameterslist. //Forinstance,thefirstProductofthelisttobereturnedmustbetheonewith //theIdinthefirstpositionofproductIdsListandsoon... //NOTE:nullcouldbeusedasvalue,meaningthatnoProductexistforthegivenproductId returnproductList; } ... }
首先,必须在DAO中创建一个DelayedBatchExecutor实例,在本例中为 delayedBatchExecutorProductById。需要以下三个参数:
- 时间窗口(在此示例中为50毫秒)
- 参数列表的最大容量(在此示例中为10个参数)
- 将使用参数列表调用的方法(详细信息见后文)。在此示例中,方法为retrieveProductsByIds
其次,已经重构了DAO方法 publicProduct getProductById(Integer productId),以简单调用delayedBatchExecutorProductById 实例的execute 方法。所有的“magic”都是由 DelayedBatchExecutor完成的。
之所以delayedBatchExecutorProductById是DelayedBatchExecutor2
如果execute方法需要接收两个参数(例如,一个 Integer和一个String)并返回Product实例,则定义为 DelayedBatchExecutor3
最终,retrieveProductsByIds方法必须返回List 并接收List作为参数。
如果使用的是 DelayedBatchExecutor3
就是这样。
一旦运行,执行控制器逻辑的并发线程会在某时刻调用方法 getProductById(Integerid) ,并且此方法将返回对应的Product。并发线程不知自己已经被 DelayedBatchExecutor暂停并恢复了。
由数据存储库延伸的“题外话”
尽管本文与数据存储库有关,但 DelayedBatchExecutor也可以用在其他地方,例如:对REST进行微服务请求。再说,用1个参数启动n个GET请求要比使用n个参数启动1个GET昂贵得多。
DelayedBatchExecutor的优化
笔者创建了 DelayedBatchExecutor并使用了一段时间,有效地解决了个人项目中并发线程启动的多个查询的执行问题。因此相信它对其他人可能也有用处,所以决定将其公开。
话虽如此,DelayedBatchExecutor改进和功能扩展的空间还很大。最有趣的是能够根据执行的特定条件动态更改DelayedBatchExecutor参数(窗口时间和最大容量)的功能,以便在利用带有n个参数的查询时很大程度地减少延时。