ludc
2023-11-10 d2d00b57e1a8ab328e2e94049cd9149ababe7e0c
修改历史数据导入时开始多线程分批并行执行insert
已修改3个文件
72 ■■■■ 文件已修改
Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmEngineServiceImpl.java
@@ -66,6 +66,7 @@
import org.springblade.core.tool.utils.Func;
import org.springblade.core.tool.utils.StringPool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.Cache;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -81,7 +82,7 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import static com.vci.ubcs.code.constant.FrameWorkLangCodeConstant.DATA_OID_NOT_EXIST;
@@ -97,6 +98,11 @@
@Service
public class MdmEngineServiceImpl implements MdmEngineService {
    /**
     * 单次sql的最多导入数量
     */
    @Value("${import.maxNum}")
    private Integer MAX_IMPORT_NUM;
    /**
     * 模板的服务
@@ -3549,10 +3555,46 @@
                throw new VciBaseException("类型转换错误:" + e.toString());
            }
        });
        return commonsMapper.insertByBaseModel(listR.getData().get(0).getTableName(), maps.get(0), maps);
        try {
            bactchExecuteInsert(listR.getData().get(0).getTableName(),maps);
        }catch (Exception e){
            throw new ServiceException("分批执行insert语句报错:"+e.getMessage());
        }
        return maps.size();
    }
    /**
     * 分批执行insert语句
     * @param tableName
     * @param maps
     * @throws ServiceException
     */
    @Transactional(rollbackFor = Exception.class)
    void bactchExecuteInsert(String tableName, List<Map<String, String>> maps) throws ServiceException{
        ExecutorService executor = Executors.newFixedThreadPool(10); // 创建一个固定大小的线程池
        List<Map<String, String>> threadSafeMaps = new CopyOnWriteArrayList<>(maps);
        for (int i = 0; i < threadSafeMaps.size(); i += MAX_IMPORT_NUM) {
            final int startIndex = i;
            final int endIndex = Math.min(i + MAX_IMPORT_NUM, maps.size());
            executor.execute(() -> {
                List<Map<String, String>> subList = threadSafeMaps.subList(startIndex, endIndex);
                // 调用插入数据库的方法
                commonsMapper.insertByBaseModel(tableName, threadSafeMaps.get(0), subList);
            });
        }
        // 关闭线程池
        executor.shutdown();
        try {
            // 等待所有任务执行完成
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            // 处理异常
            throw new ServiceException("多线程方式执行批量插入时产生错误:"+e.getMessage());
        }
    }
    /**
     * 传入业务类型以及ID查询业务表数据是否重复
     *
     * @param btmType 业务类型
Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmIOServiceImpl.java
@@ -1046,7 +1046,7 @@
                            log.error("批量产生编码的时候出错了", e);
                            thisCbos.stream().forEach(cbo -> {
                                String rowIndex = cbo.getAttributeValue(IMPORT_ROW_INDEX);
                                errorMap.put(rowIndex, errorMap.getOrDefault(rowIndex, "") + ";系统错误,存储数据的时候出错了");
                                errorMap.put(rowIndex, errorMap.getOrDefault(rowIndex, "") + ";系统错误,存储数据的时候出错了:"+e.getMessage());
                            });
                        }
Source/UBCS/ubcs-service/ubcs-code/src/main/java/com/vci/ubcs/code/service/impl/MdmProductCodeServiceImpl.java
@@ -41,6 +41,7 @@
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -81,11 +82,11 @@
    private FormulaServiceImpl formulaService;
    @Override
    @Transactional(rollbackFor = VciBaseException.class)
    @Transactional(rollbackFor = Exception.class)
    public List<String> productCodeAndSaveData(CodeClassifyFullInfoBO classifyFullInfoBO, CodeClassifyTemplateVO templateVO, CodeRuleVO ruleVO, List<CodeOrderSecDTO> secDTOList, List<BaseModel> dataCBOList) throws Exception {
        dataCBOList = dataCBOList.stream().sorted(((o1, o2) -> o1.getCreateTime().compareTo(o2.getCreateTime()))).collect(Collectors.toList());
        List<String> codeList = new ArrayList<>();
        final CodeRuleVO finalRuleVO = ruleVO;
        /*****
         * 保证并发的时候,最大流水号都对的,但是这种加锁有弊端
         */
@@ -96,7 +97,7 @@
            //历史数据执行的时候,这个系统会很卡
            //主要是为了录入最大流水号和allcode
            //SessionInfo sessionInfo = VciBaseUtil.getCurrentUserSessionInfo();
            List<CodeAllCode> allCodeDOList = new ArrayList<>();
            List<CodeAllCode> allCodeDOList = new CopyOnWriteArrayList<>();
            Map<String/**流水依据**/, Map<String/**码段的主键**/,Double/**最大流水号**/>> maxSerialMap = new HashMap<>();
            // TODO 多线程流引发的问题已修改
            dataCBOList.parallelStream().forEach(cbo->{
@@ -109,7 +110,7 @@
                cbo.getData().remove(CODE_SEC_LENGTH_FIELD);//将此key除去
                cbo.getData().remove(IMPORT_ROW_INDEX);//将此key除去
                cbo.getData().remove("codeclassifyid");//将此key除去
                List<CodeBasicSecVO> secVOList = ruleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList());
                List<CodeBasicSecVO> secVOList = finalRuleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList());
                Map<String/**码段的主键**/,String/**码段的值**/> serialValueMap = new HashMap<>();
                Map<String, CodeBasicSecVO> secVOMap = secVOList.stream().collect(Collectors.toMap(s -> s.getOid(), t -> t));
                for (int i = 0; i < secLengths.length; i++) {
@@ -168,7 +169,7 @@
                CodeAllCode allCodeDO = new CodeAllCode();
                DefaultAttrAssimtUtil.addDefaultAttrAssimt(allCodeDO, MdmBtmTypeConstant.CODE_ALL_CODE);
                allCodeDO.setCodeClassifyOid(classifyFullInfoBO.getCurrentClassifyVO().getOid());
                allCodeDO.setCodeRuleOid(ruleVO.getOid());
                allCodeDO.setCodeRuleOid(finalRuleVO.getOid());
                allCodeDO.setId(cbo.getId());
                allCodeDO.setCodeClassifyTemplateOid(templateVO.getOid());
                allCodeDO.setCreateCodeBtm(cbo.getBtmname());
@@ -186,7 +187,7 @@
            maxSerialMap.forEach((serialUnit,secOidMaxMap)->{
                secOidMaxMap.forEach((secOid,maxSerial)->{
                    QueryWrapper<CodeSerialValue> queryWrapper = new QueryWrapper<>();
                    queryWrapper.eq("codeRuleOid", ruleVO.getOid());
                    queryWrapper.eq("codeRuleOid", finalRuleVO.getOid());
                    queryWrapper.eq("serialUnit", serialUnit);
                    //这个字段是为了解决多个流水的问题
                    queryWrapper.eq("codeSecOid", secOid);
@@ -205,7 +206,7 @@
                        //没有
                        CodeSerialValue serialValueDO = new CodeSerialValue();
                        DefaultAttrAssimtUtil.addDefaultAttrAssimt(serialValueDO, MdmBtmTypeConstant.CODE_SERIAL_VALUE);
                        serialValueDO.setCodeRuleOid(ruleVO.getOid());
                        serialValueDO.setCodeRuleOid(finalRuleVO.getOid());
                        serialValueDO.setSerialUnit(serialUnit);
                        serialValueDO.setCodeSecOid(secOid);
                        serialValueDO.setMaxSerial(maxSerial.toString());
@@ -304,11 +305,12 @@
                    }
                    codeAllCodeService.saveBatch(addCodeDOs);
                }
                mdmEngineService.insertBatchByType(dataCBOList.get(0).getBtmname(),dataCBOList);
            }
            return codeList;
        }else {
            List<CodeBasicSecVO> secVOList = ruleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList());
            List<CodeBasicSecVO> secVOList = finalRuleVO.getSecVOList().stream().sorted(((o1, o2) -> o1.getOrderNum().compareTo(o2.getOrderNum()))).collect(Collectors.toList());
            List<String> serialUnitList = new LinkedList<>();
            List<String> secValueList = new ArrayList<>();
            Map<String, String> secValueMap = secDTOList.stream().collect(Collectors.toMap(s -> s.getSecOid(), s -> s.getSecValue()==null?"":s.getSecValue()));
@@ -344,7 +346,7 @@
                switchAttrSecValue(attrSecVOList, cbo, thisSecValueList, attrSevIsSerialDepend, thisSerialUnitList);
                String serialUnitString = thisSerialUnitList.size() == 0 ? EMPTY_SERIAL_UNIT : thisSerialUnitList.stream().collect(Collectors.joining(SERIAL_UNIT_SPACE));
                switchSerialSecValue(serialSecVOList, attrSevIsSerialDepend, ruleVO, serialUnitString, maxSerialValueMap, thisSecValueList, lastMaxSerialValueMap, i == 0);
                switchSerialSecValue(serialSecVOList, attrSevIsSerialDepend, finalRuleVO, serialUnitString, maxSerialValueMap, thisSecValueList, lastMaxSerialValueMap, i == 0);
                //组装编码的值
                cbo.setId(thisSecValueList.stream().collect(Collectors.joining()));
@@ -359,10 +361,10 @@
                    }
                }
                //要存储最后的全部allcode
                wrapperAllCode(classifyFullInfoBO, ruleVO, cbo, templateVO, allCodeDOList, serialUnitString, sb.toString());
                wrapperAllCode(classifyFullInfoBO, finalRuleVO, cbo, templateVO, allCodeDOList, serialUnitString, sb.toString());
            }
            //处理最大流水
            saveSerialValue( ruleVO, lastMaxSerialValueMap, maxSerialValueMap);
            saveSerialValue( finalRuleVO, lastMaxSerialValueMap, maxSerialValueMap);
            allCodeDOList.stream().forEach(
                allCode -> {DefaultAttrAssimtUtil.addDefaultAttrAssimt(allCode,"codeallcode");allCode.setLctid("codeAllCodeLC");}