From d2d00b57e1a8ab328e2e94049cd9149ababe7e0c Mon Sep 17 00:00:00 2001 From: ludc Date: 星期五, 10 十一月 2023 08:52:48 +0800 Subject: [PATCH] 修改历史数据导入时开始多线程分批并行执行insert --- Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java | 2 Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java | 46 ++++++++++++++++++++++- Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java | 24 ++++++----- 3 files changed, 58 insertions(+), 14 deletions(-) diff --git a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java index bbc4b11..e594481 100644 --- a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java +++ b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java @@ -66,6 +66,7 @@ import org.springblade.core.tool.utils.Func; import org.springblade.core.tool.utils.StringPool; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.cache.Cache; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -81,7 +82,7 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; import java.util.stream.Collectors; import static com.vci.ubcs.code.constant.FrameWorkLangCodeConstant.DATA_OID_NOT_EXIST; @@ -97,6 +98,11 @@ @Service public class MdmEngineServiceImpl implements MdmEngineService { + /** + * 鍗曟sql鐨勬渶澶氬鍏ユ暟閲� + */ + @Value("${import.maxNum}") + private Integer MAX_IMPORT_NUM; /** * 妯℃澘鐨勬湇鍔� @@ -3549,10 +3555,46 @@ throw new VciBaseException("绫诲瀷杞崲閿欒锛�" + e.toString()); } }); - return commonsMapper.insertByBaseModel(listR.getData().get(0).getTableName(), maps.get(0), maps); + try { + bactchExecuteInsert(listR.getData().get(0).getTableName(),maps); + }catch (Exception e){ + throw new ServiceException("鍒嗘壒鎵цinsert璇彞鎶ラ敊:"+e.getMessage()); + } + return maps.size(); } /** + * 鍒嗘壒鎵цinsert璇彞 + * @param tableName + * @param maps + * @throws ServiceException + */ + @Transactional(rollbackFor = Exception.class) + void bactchExecuteInsert(String tableName, List<Map<String, String>> maps) throws ServiceException{ + ExecutorService executor = Executors.newFixedThreadPool(10); // 鍒涘缓涓�涓浐瀹氬ぇ灏忕殑绾跨▼姹� + List<Map<String, String>> threadSafeMaps = new CopyOnWriteArrayList<>(maps); + for (int i = 0; i < threadSafeMaps.size(); i += MAX_IMPORT_NUM) { + final int startIndex = i; + final int endIndex = Math.min(i + MAX_IMPORT_NUM, maps.size()); + + executor.execute(() -> { + List<Map<String, String>> subList = threadSafeMaps.subList(startIndex, endIndex); + // 璋冪敤鎻掑叆鏁版嵁搴撶殑鏂规硶 + commonsMapper.insertByBaseModel(tableName, threadSafeMaps.get(0), subList); + }); + } + // 鍏抽棴绾跨▼姹� + executor.shutdown(); + try { + // 绛夊緟鎵�鏈変换鍔℃墽琛屽畬鎴� + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + // 澶勭悊寮傚父 + throw new ServiceException("澶氱嚎绋嬫柟寮忔墽琛屾壒閲忔彃鍏ユ椂浜х敓閿欒:"+e.getMessage()); + } + } + + /** * 浼犲叆涓氬姟绫诲瀷浠ュ強ID鏌ヨ涓氬姟琛ㄦ暟鎹槸鍚﹂噸澶� * * @param btmType 涓氬姟绫诲瀷 diff --git a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java index 28ca14e..588fe6f 100644 --- a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java +++ b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java @@ -1046,7 +1046,7 @@ log.error("鎵归噺浜х敓缂栫爜鐨勬椂鍊欏嚭閿欎簡", e); thisCbos.stream().forEach(cbo -> { String rowIndex = cbo.getAttributeValue(IMPORT_ROW_INDEX); - errorMap.put(rowIndex, errorMap.getOrDefault(rowIndex, "") + ";绯荤粺閿欒锛屽瓨鍌ㄦ暟鎹殑鏃跺�欏嚭閿欎簡"); + errorMap.put(rowIndex, errorMap.getOrDefault(rowIndex, "") + ";绯荤粺閿欒锛屽瓨鍌ㄦ暟鎹殑鏃跺�欏嚭閿欎簡:"+e.getMessage()); }); } diff --git a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java index a5058b7..f58d927 100644 --- a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java +++ b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java @@ -41,6 +41,7 @@ import javax.annotation.Resource; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -81,11 +82,11 @@ private FormulaServiceImpl formulaService; @Override - @Transactional(rollbackFor = VciBaseException.class) + @Transactional(rollbackFor = Exception.class) public List<String> productCodeAndSaveData(CodeClassifyFullInfoBO classifyFullInfoBO, CodeClassifyTemplateVO templateVO, CodeRuleVO ruleVO, List<CodeOrderSecDTO> secDTOList, List<BaseModel> dataCBOList) throws Exception { dataCBOList = dataCBOList.stream().sorted(((o1, o2) -> o1.getCreateTime().compareTo(o2.getCreateTime()))).collect(Collectors.toList()); List<String> codeList = new ArrayList<>(); - + final CodeRuleVO finalRuleVO = ruleVO; /***** * 淇濊瘉骞跺彂鐨勬椂鍊欙紝鏈�澶ф祦姘村彿閮藉鐨勶紝浣嗘槸杩欑鍔犻攣鏈夊紛绔� */ @@ -96,7 +97,7 @@ //鍘嗗彶鏁版嵁鎵ц鐨勬椂鍊欙紝杩欎釜绯荤粺浼氬緢鍗� //涓昏鏄负浜嗗綍鍏ユ渶澶ф祦姘村彿鍜宎llcode //SessionInfo sessionInfo = VciBaseUtil.getCurrentUserSessionInfo(); - List<CodeAllCode> allCodeDOList = new ArrayList<>(); + List<CodeAllCode> allCodeDOList = new CopyOnWriteArrayList<>(); Map<String/**娴佹按渚濇嵁**/, Map<String/**鐮佹鐨勪富閿�**/,Double/**鏈�澶ф祦姘村彿**/>> maxSerialMap = new HashMap<>(); // TODO 澶氱嚎绋嬫祦寮曞彂鐨勯棶棰樺凡淇敼 dataCBOList.parallelStream().forEach(cbo->{ @@ -109,7 +110,7 @@ cbo.getData().remove(CODE_SEC_LENGTH_FIELD);//灏嗘key闄ゅ幓 cbo.getData().remove(IMPORT_ROW_INDEX);//灏嗘key闄ゅ幓 cbo.getData().remove("codeclassifyid");//灏嗘key闄ゅ幓 - List<CodeBasicSecVO> secVOList = ruleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList()); + List<CodeBasicSecVO> secVOList = finalRuleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList()); Map<String/**鐮佹鐨勪富閿�**/,String/**鐮佹鐨勫��**/> serialValueMap = new HashMap<>(); Map<String, CodeBasicSecVO> secVOMap = secVOList.stream().collect(Collectors.toMap(s -> s.getOid(), t -> t)); for (int i = 0; i < secLengths.length; i++) { @@ -168,7 +169,7 @@ CodeAllCode allCodeDO = new CodeAllCode(); DefaultAttrAssimtUtil.addDefaultAttrAssimt(allCodeDO, MdmBtmTypeConstant.CODE_ALL_CODE); allCodeDO.setCodeClassifyOid(classifyFullInfoBO.getCurrentClassifyVO().getOid()); - allCodeDO.setCodeRuleOid(ruleVO.getOid()); + allCodeDO.setCodeRuleOid(finalRuleVO.getOid()); allCodeDO.setId(cbo.getId()); allCodeDO.setCodeClassifyTemplateOid(templateVO.getOid()); allCodeDO.setCreateCodeBtm(cbo.getBtmname()); @@ -186,7 +187,7 @@ maxSerialMap.forEach((serialUnit,secOidMaxMap)->{ secOidMaxMap.forEach((secOid,maxSerial)->{ QueryWrapper<CodeSerialValue> queryWrapper = new QueryWrapper<>(); - queryWrapper.eq("codeRuleOid", ruleVO.getOid()); + queryWrapper.eq("codeRuleOid", finalRuleVO.getOid()); queryWrapper.eq("serialUnit", serialUnit); //杩欎釜瀛楁鏄负浜嗚В鍐冲涓祦姘寸殑闂 queryWrapper.eq("codeSecOid", secOid); @@ -205,7 +206,7 @@ //娌℃湁 CodeSerialValue serialValueDO = new CodeSerialValue(); DefaultAttrAssimtUtil.addDefaultAttrAssimt(serialValueDO, MdmBtmTypeConstant.CODE_SERIAL_VALUE); - serialValueDO.setCodeRuleOid(ruleVO.getOid()); + serialValueDO.setCodeRuleOid(finalRuleVO.getOid()); serialValueDO.setSerialUnit(serialUnit); serialValueDO.setCodeSecOid(secOid); serialValueDO.setMaxSerial(maxSerial.toString()); @@ -304,11 +305,12 @@ } codeAllCodeService.saveBatch(addCodeDOs); } + mdmEngineService.insertBatchByType(dataCBOList.get(0).getBtmname(),dataCBOList); } return codeList; }else { - List<CodeBasicSecVO> secVOList = ruleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList()); + List<CodeBasicSecVO> secVOList = finalRuleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList()); List<String> serialUnitList = new LinkedList<>(); List<String> secValueList = new ArrayList<>(); Map<String, String> secValueMap = secDTOList.stream().collect(Collectors.toMap(s -> s.getSecOid(), s -> s.getSecValue()==null?"":s.getSecValue())); @@ -344,7 +346,7 @@ switchAttrSecValue(attrSecVOList, cbo, thisSecValueList, attrSevIsSerialDepend, thisSerialUnitList); String serialUnitString = thisSerialUnitList.size() == 0 ? EMPTY_SERIAL_UNIT : thisSerialUnitList.stream().collect(Collectors.joining(SERIAL_UNIT_SPACE)); - switchSerialSecValue(serialSecVOList, attrSevIsSerialDepend, ruleVO, serialUnitString, maxSerialValueMap, thisSecValueList, lastMaxSerialValueMap, i == 0); + switchSerialSecValue(serialSecVOList, attrSevIsSerialDepend, finalRuleVO, serialUnitString, maxSerialValueMap, thisSecValueList, lastMaxSerialValueMap, i == 0); //缁勮缂栫爜鐨勫�� cbo.setId(thisSecValueList.stream().collect(Collectors.joining())); @@ -359,10 +361,10 @@ } } //瑕佸瓨鍌ㄦ渶鍚庣殑鍏ㄩ儴allcode - wrapperAllCode(classifyFullInfoBO, ruleVO, cbo, templateVO, allCodeDOList, serialUnitString, sb.toString()); + wrapperAllCode(classifyFullInfoBO, finalRuleVO, cbo, templateVO, allCodeDOList, serialUnitString, sb.toString()); } //澶勭悊鏈�澶ф祦姘� - saveSerialValue( ruleVO, lastMaxSerialValueMap, maxSerialValueMap); + saveSerialValue( finalRuleVO, lastMaxSerialValueMap, maxSerialValueMap); allCodeDOList.stream().forEach( allCode -> {DefaultAttrAssimtUtil.addDefaultAttrAssimt(allCode,"codeallcode");allCode.setLctid("codeAllCodeLC");} -- Gitblit v1.9.3