From d2d00b57e1a8ab328e2e94049cd9149ababe7e0c Mon Sep 17 00:00:00 2001
From: ludc
Date: 星期五, 10 十一月 2023 08:52:48 +0800
Subject: [PATCH] 修改历史数据导入时开始多线程分批并行执行insert
---
Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java | 2
Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java | 46 ++++++++++++++++++++++-
Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java | 24 ++++++-----
3 files changed, 58 insertions(+), 14 deletions(-)
diff --git a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java
index bbc4b11..e594481 100644
--- a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java
+++ b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java
@@ -66,6 +66,7 @@
import org.springblade.core.tool.utils.Func;
import org.springblade.core.tool.utils.StringPool;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.Cache;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -81,7 +82,7 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
import java.util.stream.Collectors;
import static com.vci.ubcs.code.constant.FrameWorkLangCodeConstant.DATA_OID_NOT_EXIST;
@@ -97,6 +98,11 @@
@Service
public class MdmEngineServiceImpl implements MdmEngineService {
+ /**
+ * 鍗曟sql鐨勬渶澶氬鍏ユ暟閲�
+ */
+ @Value("${import.maxNum}")
+ private Integer MAX_IMPORT_NUM;
/**
* 妯℃澘鐨勬湇鍔�
@@ -3549,10 +3555,46 @@
throw new VciBaseException("绫诲瀷杞崲閿欒锛�" + e.toString());
}
});
- return commonsMapper.insertByBaseModel(listR.getData().get(0).getTableName(), maps.get(0), maps);
+ try {
+ bactchExecuteInsert(listR.getData().get(0).getTableName(),maps);
+ }catch (Exception e){
+ throw new ServiceException("鍒嗘壒鎵цinsert璇彞鎶ラ敊:"+e.getMessage());
+ }
+ return maps.size();
}
/**
+ * 鍒嗘壒鎵цinsert璇彞
+ * @param tableName
+ * @param maps
+ * @throws ServiceException
+ */
+ @Transactional(rollbackFor = Exception.class)
+ void bactchExecuteInsert(String tableName, List<Map<String, String>> maps) throws ServiceException{
+ ExecutorService executor = Executors.newFixedThreadPool(10); // 鍒涘缓涓�涓浐瀹氬ぇ灏忕殑绾跨▼姹�
+ List<Map<String, String>> threadSafeMaps = new CopyOnWriteArrayList<>(maps);
+ for (int i = 0; i < threadSafeMaps.size(); i += MAX_IMPORT_NUM) {
+ final int startIndex = i;
+ final int endIndex = Math.min(i + MAX_IMPORT_NUM, maps.size());
+
+ executor.execute(() -> {
+ List<Map<String, String>> subList = threadSafeMaps.subList(startIndex, endIndex);
+ // 璋冪敤鎻掑叆鏁版嵁搴撶殑鏂规硶
+ commonsMapper.insertByBaseModel(tableName, threadSafeMaps.get(0), subList);
+ });
+ }
+ // 鍏抽棴绾跨▼姹�
+ executor.shutdown();
+ try {
+ // 绛夊緟鎵�鏈変换鍔℃墽琛屽畬鎴�
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ // 澶勭悊寮傚父
+ throw new ServiceException("澶氱嚎绋嬫柟寮忔墽琛屾壒閲忔彃鍏ユ椂浜х敓閿欒:"+e.getMessage());
+ }
+ }
+
+ /**
* 浼犲叆涓氬姟绫诲瀷浠ュ強ID鏌ヨ涓氬姟琛ㄦ暟鎹槸鍚﹂噸澶�
*
* @param btmType 涓氬姟绫诲瀷
diff --git a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java
index 28ca14e..588fe6f 100644
--- a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java
+++ b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java
@@ -1046,7 +1046,7 @@
log.error("鎵归噺浜х敓缂栫爜鐨勬椂鍊欏嚭閿欎簡", e);
thisCbos.stream().forEach(cbo -> {
String rowIndex = cbo.getAttributeValue(IMPORT_ROW_INDEX);
- errorMap.put(rowIndex, errorMap.getOrDefault(rowIndex, "") + ";绯荤粺閿欒锛屽瓨鍌ㄦ暟鎹殑鏃跺�欏嚭閿欎簡");
+ errorMap.put(rowIndex, errorMap.getOrDefault(rowIndex, "") + ";绯荤粺閿欒锛屽瓨鍌ㄦ暟鎹殑鏃跺�欏嚭閿欎簡:"+e.getMessage());
});
}
diff --git a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java
index a5058b7..f58d927 100644
--- a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java
+++ b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java
@@ -41,6 +41,7 @@
import javax.annotation.Resource;
import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -81,11 +82,11 @@
private FormulaServiceImpl formulaService;
@Override
- @Transactional(rollbackFor = VciBaseException.class)
+ @Transactional(rollbackFor = Exception.class)
public List<String> productCodeAndSaveData(CodeClassifyFullInfoBO classifyFullInfoBO, CodeClassifyTemplateVO templateVO, CodeRuleVO ruleVO, List<CodeOrderSecDTO> secDTOList, List<BaseModel> dataCBOList) throws Exception {
dataCBOList = dataCBOList.stream().sorted(((o1, o2) -> o1.getCreateTime().compareTo(o2.getCreateTime()))).collect(Collectors.toList());
List<String> codeList = new ArrayList<>();
-
+ final CodeRuleVO finalRuleVO = ruleVO;
/*****
* 淇濊瘉骞跺彂鐨勬椂鍊欙紝鏈�澶ф祦姘村彿閮藉鐨勶紝浣嗘槸杩欑鍔犻攣鏈夊紛绔�
*/
@@ -96,7 +97,7 @@
//鍘嗗彶鏁版嵁鎵ц鐨勬椂鍊欙紝杩欎釜绯荤粺浼氬緢鍗�
//涓昏鏄负浜嗗綍鍏ユ渶澶ф祦姘村彿鍜宎llcode
//SessionInfo sessionInfo = VciBaseUtil.getCurrentUserSessionInfo();
- List<CodeAllCode> allCodeDOList = new ArrayList<>();
+ List<CodeAllCode> allCodeDOList = new CopyOnWriteArrayList<>();
Map<String/**娴佹按渚濇嵁**/, Map<String/**鐮佹鐨勪富閿�**/,Double/**鏈�澶ф祦姘村彿**/>> maxSerialMap = new HashMap<>();
// TODO 澶氱嚎绋嬫祦寮曞彂鐨勯棶棰樺凡淇敼
dataCBOList.parallelStream().forEach(cbo->{
@@ -109,7 +110,7 @@
cbo.getData().remove(CODE_SEC_LENGTH_FIELD);//灏嗘key闄ゅ幓
cbo.getData().remove(IMPORT_ROW_INDEX);//灏嗘key闄ゅ幓
cbo.getData().remove("codeclassifyid");//灏嗘key闄ゅ幓
- List<CodeBasicSecVO> secVOList = ruleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList());
+ List<CodeBasicSecVO> secVOList = finalRuleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList());
Map<String/**鐮佹鐨勪富閿�**/,String/**鐮佹鐨勫��**/> serialValueMap = new HashMap<>();
Map<String, CodeBasicSecVO> secVOMap = secVOList.stream().collect(Collectors.toMap(s -> s.getOid(), t -> t));
for (int i = 0; i < secLengths.length; i++) {
@@ -168,7 +169,7 @@
CodeAllCode allCodeDO = new CodeAllCode();
DefaultAttrAssimtUtil.addDefaultAttrAssimt(allCodeDO, MdmBtmTypeConstant.CODE_ALL_CODE);
allCodeDO.setCodeClassifyOid(classifyFullInfoBO.getCurrentClassifyVO().getOid());
- allCodeDO.setCodeRuleOid(ruleVO.getOid());
+ allCodeDO.setCodeRuleOid(finalRuleVO.getOid());
allCodeDO.setId(cbo.getId());
allCodeDO.setCodeClassifyTemplateOid(templateVO.getOid());
allCodeDO.setCreateCodeBtm(cbo.getBtmname());
@@ -186,7 +187,7 @@
maxSerialMap.forEach((serialUnit,secOidMaxMap)->{
secOidMaxMap.forEach((secOid,maxSerial)->{
QueryWrapper<CodeSerialValue> queryWrapper = new QueryWrapper<>();
- queryWrapper.eq("codeRuleOid", ruleVO.getOid());
+ queryWrapper.eq("codeRuleOid", finalRuleVO.getOid());
queryWrapper.eq("serialUnit", serialUnit);
//杩欎釜瀛楁鏄负浜嗚В鍐冲涓祦姘寸殑闂
queryWrapper.eq("codeSecOid", secOid);
@@ -205,7 +206,7 @@
//娌℃湁
CodeSerialValue serialValueDO = new CodeSerialValue();
DefaultAttrAssimtUtil.addDefaultAttrAssimt(serialValueDO, MdmBtmTypeConstant.CODE_SERIAL_VALUE);
- serialValueDO.setCodeRuleOid(ruleVO.getOid());
+ serialValueDO.setCodeRuleOid(finalRuleVO.getOid());
serialValueDO.setSerialUnit(serialUnit);
serialValueDO.setCodeSecOid(secOid);
serialValueDO.setMaxSerial(maxSerial.toString());
@@ -304,11 +305,12 @@
}
codeAllCodeService.saveBatch(addCodeDOs);
}
+
mdmEngineService.insertBatchByType(dataCBOList.get(0).getBtmname(),dataCBOList);
}
return codeList;
}else {
- List<CodeBasicSecVO> secVOList = ruleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList());
+ List<CodeBasicSecVO> secVOList = finalRuleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList());
List<String> serialUnitList = new LinkedList<>();
List<String> secValueList = new ArrayList<>();
Map<String, String> secValueMap = secDTOList.stream().collect(Collectors.toMap(s -> s.getSecOid(), s -> s.getSecValue()==null?"":s.getSecValue()));
@@ -344,7 +346,7 @@
switchAttrSecValue(attrSecVOList, cbo, thisSecValueList, attrSevIsSerialDepend, thisSerialUnitList);
String serialUnitString = thisSerialUnitList.size() == 0 ? EMPTY_SERIAL_UNIT : thisSerialUnitList.stream().collect(Collectors.joining(SERIAL_UNIT_SPACE));
- switchSerialSecValue(serialSecVOList, attrSevIsSerialDepend, ruleVO, serialUnitString, maxSerialValueMap, thisSecValueList, lastMaxSerialValueMap, i == 0);
+ switchSerialSecValue(serialSecVOList, attrSevIsSerialDepend, finalRuleVO, serialUnitString, maxSerialValueMap, thisSecValueList, lastMaxSerialValueMap, i == 0);
//缁勮缂栫爜鐨勫��
cbo.setId(thisSecValueList.stream().collect(Collectors.joining()));
@@ -359,10 +361,10 @@
}
}
//瑕佸瓨鍌ㄦ渶鍚庣殑鍏ㄩ儴allcode
- wrapperAllCode(classifyFullInfoBO, ruleVO, cbo, templateVO, allCodeDOList, serialUnitString, sb.toString());
+ wrapperAllCode(classifyFullInfoBO, finalRuleVO, cbo, templateVO, allCodeDOList, serialUnitString, sb.toString());
}
//澶勭悊鏈�澶ф祦姘�
- saveSerialValue( ruleVO, lastMaxSerialValueMap, maxSerialValueMap);
+ saveSerialValue( finalRuleVO, lastMaxSerialValueMap, maxSerialValueMap);
allCodeDOList.stream().forEach(
allCode -> {DefaultAttrAssimtUtil.addDefaultAttrAssimt(allCode,"codeallcode");allCode.setLctid("codeAllCodeLC");}
--
Gitblit v1.9.3