From d2d00b57e1a8ab328e2e94049cd9149ababe7e0c Mon Sep 17 00:00:00 2001
From: ludc
Date: 星期五, 10 十一月 2023 08:52:48 +0800
Subject: [PATCH] 修改历史数据导入时开始多线程分批并行执行insert

---
 Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java          |    2 
 Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java      |   46 ++++++++++++++++++++++-
 Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java |   24 ++++++-----
 3 files changed, 58 insertions(+), 14 deletions(-)

diff --git a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java
index bbc4b11..e594481 100644
--- a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java
+++ b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java
@@ -66,6 +66,7 @@
 import org.springblade.core.tool.utils.Func;
 import org.springblade.core.tool.utils.StringPool;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.cache.Cache;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
@@ -81,7 +82,7 @@
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
 import java.util.stream.Collectors;
 
 import static com.vci.ubcs.code.constant.FrameWorkLangCodeConstant.DATA_OID_NOT_EXIST;
@@ -97,6 +98,11 @@
 @Service
 public class MdmEngineServiceImpl implements MdmEngineService {
 
+	/**
+	 * 鍗曟sql鐨勬渶澶氬鍏ユ暟閲�
+	 */
+	@Value("${import.maxNum}")
+	private Integer MAX_IMPORT_NUM;
 
     /**
      * 妯℃澘鐨勬湇鍔�
@@ -3549,10 +3555,46 @@
                 throw new VciBaseException("绫诲瀷杞崲閿欒锛�" + e.toString());
             }
         });
-        return commonsMapper.insertByBaseModel(listR.getData().get(0).getTableName(), maps.get(0), maps);
+		try {
+			bactchExecuteInsert(listR.getData().get(0).getTableName(),maps);
+		}catch (Exception e){
+			throw new ServiceException("鍒嗘壒鎵цinsert璇彞鎶ラ敊:"+e.getMessage());
+		}
+        return maps.size();
     }
 
 	/**
+	 * 鍒嗘壒鎵цinsert璇彞
+	 * @param tableName
+	 * @param maps
+	 * @throws ServiceException
+	 */
+	@Transactional(rollbackFor = Exception.class)
+	void bactchExecuteInsert(String tableName, List<Map<String, String>> maps) throws ServiceException{
+		ExecutorService executor = Executors.newFixedThreadPool(10); // 鍒涘缓涓�涓浐瀹氬ぇ灏忕殑绾跨▼姹�
+		List<Map<String, String>> threadSafeMaps = new CopyOnWriteArrayList<>(maps);
+		for (int i = 0; i < threadSafeMaps.size(); i += MAX_IMPORT_NUM) {
+			final int startIndex = i;
+			final int endIndex = Math.min(i + MAX_IMPORT_NUM, maps.size());
+
+			executor.execute(() -> {
+				List<Map<String, String>> subList = threadSafeMaps.subList(startIndex, endIndex);
+				// 璋冪敤鎻掑叆鏁版嵁搴撶殑鏂规硶
+				commonsMapper.insertByBaseModel(tableName, threadSafeMaps.get(0), subList);
+			});
+		}
+		// 鍏抽棴绾跨▼姹�
+		executor.shutdown();
+		try {
+			// 绛夊緟鎵�鏈変换鍔℃墽琛屽畬鎴�
+			executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+		} catch (InterruptedException e) {
+			// 澶勭悊寮傚父
+			throw new ServiceException("澶氱嚎绋嬫柟寮忔墽琛屾壒閲忔彃鍏ユ椂浜х敓閿欒:"+e.getMessage());
+		}
+	}
+
+	/**
 	 * 浼犲叆涓氬姟绫诲瀷浠ュ強ID鏌ヨ涓氬姟琛ㄦ暟鎹槸鍚﹂噸澶�
 	 *
 	 * @param btmType 涓氬姟绫诲瀷
diff --git a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java
index 28ca14e..588fe6f 100644
--- a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java
+++ b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java
@@ -1046,7 +1046,7 @@
 							log.error("鎵归噺浜х敓缂栫爜鐨勬椂鍊欏嚭閿欎簡", e);
 							thisCbos.stream().forEach(cbo -> {
 								String rowIndex = cbo.getAttributeValue(IMPORT_ROW_INDEX);
-								errorMap.put(rowIndex, errorMap.getOrDefault(rowIndex, "") + ";绯荤粺閿欒锛屽瓨鍌ㄦ暟鎹殑鏃跺�欏嚭閿欎簡");
+								errorMap.put(rowIndex, errorMap.getOrDefault(rowIndex, "") + ";绯荤粺閿欒锛屽瓨鍌ㄦ暟鎹殑鏃跺�欏嚭閿欎簡:"+e.getMessage());
 							});
 
 						}
diff --git a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java
index a5058b7..f58d927 100644
--- a/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java
+++ b/Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java
@@ -41,6 +41,7 @@
 
 import javax.annotation.Resource;
 import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -81,11 +82,11 @@
 	private FormulaServiceImpl formulaService;
 
 	@Override
-	@Transactional(rollbackFor = VciBaseException.class)
+	@Transactional(rollbackFor = Exception.class)
 	public List<String> productCodeAndSaveData(CodeClassifyFullInfoBO classifyFullInfoBO, CodeClassifyTemplateVO templateVO, CodeRuleVO ruleVO, List<CodeOrderSecDTO> secDTOList, List<BaseModel> dataCBOList) throws Exception {
 		dataCBOList = dataCBOList.stream().sorted(((o1, o2) -> o1.getCreateTime().compareTo(o2.getCreateTime()))).collect(Collectors.toList());
 		List<String> codeList = new ArrayList<>();
-
+		final CodeRuleVO finalRuleVO = ruleVO;
 		/*****
 		 * 淇濊瘉骞跺彂鐨勬椂鍊欙紝鏈�澶ф祦姘村彿閮藉鐨勶紝浣嗘槸杩欑鍔犻攣鏈夊紛绔�
 		 */
@@ -96,7 +97,7 @@
 			//鍘嗗彶鏁版嵁鎵ц鐨勬椂鍊欙紝杩欎釜绯荤粺浼氬緢鍗�
 			//涓昏鏄负浜嗗綍鍏ユ渶澶ф祦姘村彿鍜宎llcode
 			//SessionInfo sessionInfo = VciBaseUtil.getCurrentUserSessionInfo();
-			List<CodeAllCode> allCodeDOList = new ArrayList<>();
+			List<CodeAllCode> allCodeDOList = new CopyOnWriteArrayList<>();
 			Map<String/**娴佹按渚濇嵁**/, Map<String/**鐮佹鐨勪富閿�**/,Double/**鏈�澶ф祦姘村彿**/>> maxSerialMap = new HashMap<>();
 			// TODO 澶氱嚎绋嬫祦寮曞彂鐨勯棶棰樺凡淇敼
 			dataCBOList.parallelStream().forEach(cbo->{
@@ -109,7 +110,7 @@
 				cbo.getData().remove(CODE_SEC_LENGTH_FIELD);//灏嗘key闄ゅ幓
 				cbo.getData().remove(IMPORT_ROW_INDEX);//灏嗘key闄ゅ幓
 				cbo.getData().remove("codeclassifyid");//灏嗘key闄ゅ幓
-				List<CodeBasicSecVO> secVOList = ruleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList());
+				List<CodeBasicSecVO> secVOList = finalRuleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList());
 				Map<String/**鐮佹鐨勪富閿�**/,String/**鐮佹鐨勫��**/> serialValueMap = new HashMap<>();
 				Map<String, CodeBasicSecVO> secVOMap = secVOList.stream().collect(Collectors.toMap(s -> s.getOid(), t -> t));
 				for (int i = 0; i < secLengths.length; i++) {
@@ -168,7 +169,7 @@
 				CodeAllCode allCodeDO = new CodeAllCode();
 				DefaultAttrAssimtUtil.addDefaultAttrAssimt(allCodeDO, MdmBtmTypeConstant.CODE_ALL_CODE);
 				allCodeDO.setCodeClassifyOid(classifyFullInfoBO.getCurrentClassifyVO().getOid());
-				allCodeDO.setCodeRuleOid(ruleVO.getOid());
+				allCodeDO.setCodeRuleOid(finalRuleVO.getOid());
 				allCodeDO.setId(cbo.getId());
 				allCodeDO.setCodeClassifyTemplateOid(templateVO.getOid());
 				allCodeDO.setCreateCodeBtm(cbo.getBtmname());
@@ -186,7 +187,7 @@
 			maxSerialMap.forEach((serialUnit,secOidMaxMap)->{
 				secOidMaxMap.forEach((secOid,maxSerial)->{
 					QueryWrapper<CodeSerialValue> queryWrapper = new QueryWrapper<>();
-					queryWrapper.eq("codeRuleOid", ruleVO.getOid());
+					queryWrapper.eq("codeRuleOid", finalRuleVO.getOid());
 					queryWrapper.eq("serialUnit", serialUnit);
 					//杩欎釜瀛楁鏄负浜嗚В鍐冲涓祦姘寸殑闂
 					queryWrapper.eq("codeSecOid", secOid);
@@ -205,7 +206,7 @@
 						//娌℃湁
 						CodeSerialValue serialValueDO = new CodeSerialValue();
 						DefaultAttrAssimtUtil.addDefaultAttrAssimt(serialValueDO, MdmBtmTypeConstant.CODE_SERIAL_VALUE);
-						serialValueDO.setCodeRuleOid(ruleVO.getOid());
+						serialValueDO.setCodeRuleOid(finalRuleVO.getOid());
 						serialValueDO.setSerialUnit(serialUnit);
 						serialValueDO.setCodeSecOid(secOid);
 						serialValueDO.setMaxSerial(maxSerial.toString());
@@ -304,11 +305,12 @@
 					}
 					codeAllCodeService.saveBatch(addCodeDOs);
 				}
+
 				mdmEngineService.insertBatchByType(dataCBOList.get(0).getBtmname(),dataCBOList);
 			}
 			return codeList;
 		}else {
-			List<CodeBasicSecVO> secVOList = ruleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList());
+			List<CodeBasicSecVO> secVOList = finalRuleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList());
 			List<String> serialUnitList = new LinkedList<>();
 			List<String> secValueList = new ArrayList<>();
 			Map<String, String> secValueMap = secDTOList.stream().collect(Collectors.toMap(s -> s.getSecOid(), s -> s.getSecValue()==null?"":s.getSecValue()));
@@ -344,7 +346,7 @@
 				switchAttrSecValue(attrSecVOList, cbo, thisSecValueList, attrSevIsSerialDepend, thisSerialUnitList);
 
 				String serialUnitString = thisSerialUnitList.size() == 0 ? EMPTY_SERIAL_UNIT : thisSerialUnitList.stream().collect(Collectors.joining(SERIAL_UNIT_SPACE));
-				switchSerialSecValue(serialSecVOList, attrSevIsSerialDepend, ruleVO, serialUnitString, maxSerialValueMap, thisSecValueList, lastMaxSerialValueMap, i == 0);
+				switchSerialSecValue(serialSecVOList, attrSevIsSerialDepend, finalRuleVO, serialUnitString, maxSerialValueMap, thisSecValueList, lastMaxSerialValueMap, i == 0);
 
 				//缁勮缂栫爜鐨勫��
 				cbo.setId(thisSecValueList.stream().collect(Collectors.joining()));
@@ -359,10 +361,10 @@
 					}
 				}
 				//瑕佸瓨鍌ㄦ渶鍚庣殑鍏ㄩ儴allcode
-				wrapperAllCode(classifyFullInfoBO, ruleVO, cbo, templateVO, allCodeDOList, serialUnitString, sb.toString());
+				wrapperAllCode(classifyFullInfoBO, finalRuleVO, cbo, templateVO, allCodeDOList, serialUnitString, sb.toString());
 			}
 			//澶勭悊鏈�澶ф祦姘�
-			saveSerialValue( ruleVO, lastMaxSerialValueMap, maxSerialValueMap);
+			saveSerialValue( finalRuleVO, lastMaxSerialValueMap, maxSerialValueMap);
 
 			allCodeDOList.stream().forEach(
 				allCode -> {DefaultAttrAssimtUtil.addDefaultAttrAssimt(allCode,"codeallcode");allCode.setLctid("codeAllCodeLC");}

--
Gitblit v1.9.3