From d2d00b57e1a8ab328e2e94049cd9149ababe7e0c Mon Sep 17 00:00:00 2001 From: ludc Date: 星期五, 10 十一月 2023 08:52:48 +0800 Subject: [PATCH] 修改历史数据导入时开始多线程分批并行执行insert --- Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java | 46 ++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 44 insertions(+), 2 deletions(-) diff --git a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java index bbc4b11..e594481 100644 --- a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java +++ b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java @@ -66,6 +66,7 @@ import org.springblade.core.tool.utils.Func; import org.springblade.core.tool.utils.StringPool; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.cache.Cache; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -81,7 +82,7 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; import java.util.stream.Collectors; import static com.vci.ubcs.code.constant.FrameWorkLangCodeConstant.DATA_OID_NOT_EXIST; @@ -97,6 +98,11 @@ @Service public class MdmEngineServiceImpl implements MdmEngineService { + /** + * 鍗曟sql鐨勬渶澶氬鍏ユ暟閲� + */ + @Value("${import.maxNum}") + private Integer MAX_IMPORT_NUM; /** * 妯℃澘鐨勬湇鍔� @@ -3549,10 +3555,46 @@ throw new VciBaseException("绫诲瀷杞崲閿欒锛�" + e.toString()); } }); - return commonsMapper.insertByBaseModel(listR.getData().get(0).getTableName(), maps.get(0), maps); + try { + bactchExecuteInsert(listR.getData().get(0).getTableName(),maps); + }catch (Exception e){ + throw new ServiceException("鍒嗘壒鎵цinsert璇彞鎶ラ敊:"+e.getMessage()); + } + return maps.size(); } /** + * 鍒嗘壒鎵цinsert璇彞 + * @param tableName + * @param maps + * @throws ServiceException + */ + @Transactional(rollbackFor = Exception.class) + void bactchExecuteInsert(String tableName, List<Map<String, String>> maps) throws ServiceException{ + ExecutorService executor = Executors.newFixedThreadPool(10); // 鍒涘缓涓�涓浐瀹氬ぇ灏忕殑绾跨▼姹� + List<Map<String, String>> threadSafeMaps = new CopyOnWriteArrayList<>(maps); + for (int i = 0; i < threadSafeMaps.size(); i += MAX_IMPORT_NUM) { + final int startIndex = i; + final int endIndex = Math.min(i + MAX_IMPORT_NUM, maps.size()); + + executor.execute(() -> { + List<Map<String, String>> subList = threadSafeMaps.subList(startIndex, endIndex); + // 璋冪敤鎻掑叆鏁版嵁搴撶殑鏂规硶 + commonsMapper.insertByBaseModel(tableName, threadSafeMaps.get(0), subList); + }); + } + // 鍏抽棴绾跨▼姹� + executor.shutdown(); + try { + // 绛夊緟鎵�鏈変换鍔℃墽琛屽畬鎴� + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + // 澶勭悊寮傚父 + throw new ServiceException("澶氱嚎绋嬫柟寮忔墽琛屾壒閲忔彃鍏ユ椂浜х敓閿欒:"+e.getMessage()); + } + } + + /** * 浼犲叆涓氬姟绫诲瀷浠ュ強ID鏌ヨ涓氬姟琛ㄦ暟鎹槸鍚﹂噸澶� * * @param btmType 涓氬姟绫诲瀷 -- Gitblit v1.9.3