/** * Copyright 2018-2118 the original author or authors. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.vci.dbsync.dbimpl;

import com.vci.corba.common.VCIError;
import com.vci.dbsync.entity.JobItem;
import com.vci.dbsync.entity.Operate;
import com.vci.dbsync.filesync.FileSync;
import com.vci.dbsync.log.SyncLog;
import com.vci.dbsync.DBSync;

import java.sql.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * Oracle database synchronization implementation.
 *
 * <p>Rows read from the source connection are turned into Oracle {@code MERGE} statements
 * (one per source row) and executed in batches against the destination connection.
 *
 * @author Jason
 * @date 2019/12/17 10:24
 * @description Oracle database synchronization implementation
 * @version 1.0.0
 */
public class OracleSync extends AbstractDBSync implements DBSync {

    /** Bit set in the operate mask when matching destination rows may be updated. */
    private static final int OP_UPDATE = 0x1;

    /** Bit set in the operate mask when missing destination rows may be inserted. */
    private static final int OP_INSERT = 0x2;

    /** Number of queued MERGE statements after which the batch is sent to the destination. */
    private static final int BATCH_SIZE = 200;

    /** Job name (compared case-insensitively) for which the file synchronization pass runs. */
    private static final String FILE_OBJECT_JOB = "PLATFORMBTM_FILEOBJECT";

    /**
     * Executes the job's source query and replays every row into the destination table
     * as a {@code MERGE} statement, then commits the destination connection.
     *
     * <p>When {@code fileSync} is non-null and the job name equals
     * {@code PLATFORMBTM_FILEOBJECT} (ignoring case), a first pass over the source rows
     * calls {@link FileSync#syncFile} for files that are missing from the destination
     * (insert enabled) or whose {@code TS} differs from the destination row (update enabled).
     * The source query is then executed a second time for the data pass.
     *
     * @param jobItem  job definition: source SQL, destination table, key column(s),
     *                 insert field list, update field list and operate mode
     * @param srcConn  connection the source query runs on
     * @param destConn connection the MERGE statements run on; committed before returning
     * @param fileSync optional file synchronizer; {@code null} disables the file pass
     * @return the number of MERGE statements that were queued for execution
     * @throws SQLException if any JDBC call fails or no key column is configured
     * @throws VCIError     declared for failures raised by the file synchronizer
     */
    @Override
    public long assembleSQL(JobItem jobItem, Connection srcConn, Connection destConn, FileSync fileSync)
            throws SQLException, VCIError {
        String srcSql = jobItem.getSrcSql();
        String[] fields = this.trimArrayItem(jobItem.getDestTableFields().split(","));
        String[] updateFields = this.trimArrayItem(jobItem.getDestTableUpdate().split(","));
        String destTable = jobItem.getDestTable();
        // Split once up front; the key list does not change between rows.
        String[] keys = splitKeyColumns(jobItem.getDestTableKey());

        int operate = 0;
        if (jobItem.getOperate() == Operate.update) {
            operate = OP_UPDATE;
        } else if (jobItem.getOperate() == Operate.insert) {
            operate = OP_INSERT;
        } else if (jobItem.getOperate() == Operate.both) {
            operate = OP_UPDATE | OP_INSERT;
        }
        boolean doUpdate = (operate & OP_UPDATE) != 0;
        boolean doInsert = (operate & OP_INSERT) != 0;

        long count = 0;
        // try-with-resources: statements and result sets are released even when a row fails.
        try (Statement stat = srcConn.createStatement();
             Statement destStat = destConn.createStatement()) {

            // Upper-cased source column name -> java.sql.Types code.
            HashMap<String, Integer> columnTypes = new HashMap<>();
            try (ResultSet rs = stat.executeQuery(srcSql)) {
                ResultSetMetaData metaData = rs.getMetaData();
                for (int i = 1; i <= metaData.getColumnCount(); i++) {
                    columnTypes.put(metaData.getColumnName(i).toUpperCase(), metaData.getColumnType(i));
                }

                // File synchronization pass (consumes this result set).
                if (fileSync != null && FILE_OBJECT_JOB.equalsIgnoreCase(jobItem.getName())) {
                    syncFileObjects(rs, destConn, fileSync, doUpdate, doInsert);
                }
            }

            // Data pass: the query is executed again because the first result set may
            // already have been consumed by the file synchronization pass.
            try (ResultSet rs = stat.executeQuery(srcSql)) {
                while (rs.next()) {
                    String merge = buildOracleMerge(rs, destTable, keys, fields, updateFields,
                            columnTypes, doUpdate, doInsert);
                    if (merge == null) {
                        // Neither an update nor an insert clause applies to this row.
                        continue;
                    }
                    SyncLog.logger.debug(merge);
                    destStat.addBatch(merge);
                    count++;
                    if ((count % BATCH_SIZE) == 0) {
                        destStat.executeBatch();
                    }
                }
            }

            // Flush the final, partially filled batch.
            if ((count % BATCH_SIZE) != 0) {
                destStat.executeBatch();
            }
            destConn.commit();
        }
        return count;
    }

    /**
     * Executes the given SQL statements as one batch and commits the connection.
     *
     * <p>NOTE(review): the parameter is a raw {@code List} in this file; elements are cast
     * to {@code String}, so a non-String element raises {@code ClassCastException}.
     * Presumably the interface declares {@code List<String>} — confirm against
     * {@code DBSync}.
     *
     * @param sqls SQL statements to execute, in order
     * @param conn connection to run and commit on
     * @throws SQLException if batching, execution or commit fails
     */
    @Override
    public void executeSQL(List sqls, Connection conn) throws SQLException {
        try (Statement state = conn.createStatement()) {
            for (Object sql : sqls) {
                state.addBatch((String) sql);
            }
            state.executeBatch();
            conn.commit();
        }
    }

    /** Splits the comma-separated key column list and trims each entry; {@code null} yields an empty array. */
    private static String[] splitKeyColumns(String destTableKey) {
        if (destTableKey == null) {
            return new String[0];
        }
        String[] keys = destTableKey.split(",");
        for (int i = 0; i < keys.length; i++) {
            keys[i] = keys[i].trim();
        }
        return keys;
    }

    /** Walks the source rows and triggers a file sync for files missing from, or differing in TS from, the destination. */
    private static void syncFileObjects(ResultSet rs, Connection destConn, FileSync fileSync,
                                        boolean doUpdate, boolean doInsert) throws SQLException, VCIError {
        String lookupSql = "SELECT OID, TS FROM PLATFORMBTM_FILEOBJECT f WHERE f.oid=?";
        try (PreparedStatement pst = destConn.prepareStatement(lookupSql)) {
            while (rs.next()) {
                String oid = rs.getString("OID");
                Timestamp srcTs = rs.getTimestamp("TS");
                String volFile = rs.getString("FILEPATH") + "/" + oid;

                pst.setString(1, oid);
                try (ResultSet rsFile = pst.executeQuery()) {
                    // next() advances the cursor, so it must be called exactly once here.
                    boolean existsInDest = rsFile.next();
                    if (!existsInDest) {
                        if (doInsert) {
                            fileSync.syncFile(volFile);
                        }
                    } else if (doUpdate) {
                        Timestamp destTs = rsFile.getTimestamp("TS");
                        // Compare by value (null-safe); reference comparison is always "different".
                        boolean changed = (srcTs == null) ? (destTs != null) : !srcTs.equals(destTs);
                        if (changed) {
                            fileSync.syncFile(volFile);
                        }
                    }
                }
            }
        }
    }

    /** Builds the MERGE statement for the current row, or returns {@code null} when no update/insert clause applies. */
    private static String buildOracleMerge(ResultSet rs, String destTable, String[] keys, String[] fields,
                                           String[] updateFields, HashMap<String, Integer> columnTypes,
                                           boolean doUpdate, boolean doInsert) throws SQLException {
        if (keys.length == 0) {
            throw new SQLException("No destination key column configured for table " + destTable);
        }

        StringBuilder sql = new StringBuilder();
        if (keys.length == 1) {
            String key = keys[0];
            String keyValue = rs.getString(key);
            SyncLog.logger.debug("查询到:" + key + "=" + keyValue);
            sql.append(String.format("MERGE INTO %s t using (select %s as id from dual) b on (t.%s = b.id)",
                    destTable, renderOracleLiteral(keyValue, java.sql.Types.VARCHAR), key));
        } else {
            StringBuilder selectList = new StringBuilder();
            StringBuilder joinCond = new StringBuilder();
            for (String key : keys) {
                if (selectList.length() > 0) {
                    selectList.append(", ");
                    joinCond.append(" AND ");
                }
                // Each key column contributes its own value from the current row.
                String keyValue = rs.getString(key);
                selectList.append(String.format("%s as %s",
                        renderOracleLiteral(keyValue, java.sql.Types.VARCHAR), key));
                joinCond.append(String.format("t.%s=b.%s", key, key));
            }
            sql.append(String.format("MERGE INTO %s t using (select %s from dual) b on (%s)",
                    destTable, selectList.toString(), joinCond.toString()));
        }

        boolean hasClause = false;

        if (doUpdate) {
            // Collect assignments first so skipped columns cannot leave a dangling comma.
            StringBuilder assignments = new StringBuilder();
            for (String field : updateFields) {
                String column = field.toUpperCase();
                if (!columnTypes.containsKey(column)) {
                    continue; // not produced by the source query
                }
                if (assignments.length() > 0) {
                    assignments.append(",");
                }
                assignments.append(column).append("=")
                        .append(renderOracleLiteral(rs.getString(column), columnTypes.get(column)));
            }
            if (assignments.length() > 0) {
                sql.append(" when matched then update set ").append(assignments);
                hasClause = true;
            }
        }

        if (doInsert) {
            // Column list and value list are built together so they always have equal length.
            StringBuilder columns = new StringBuilder();
            StringBuilder values = new StringBuilder();
            for (String field : fields) {
                String column = field.toUpperCase();
                if (!columnTypes.containsKey(column)) {
                    continue; // not produced by the source query
                }
                if (columns.length() > 0) {
                    columns.append(",");
                    values.append(",");
                }
                columns.append(field);
                values.append(renderOracleLiteral(rs.getString(column), columnTypes.get(column)));
            }
            if (columns.length() > 0) {
                sql.append(" when not matched then insert ( ").append(columns)
                        .append(") values(").append(values).append(")");
                hasClause = true;
            }
        }

        return hasClause ? sql.toString() : null;
    }

    /** Renders a column value as an Oracle SQL literal for the given {@code java.sql.Types} code, escaping single quotes. */
    private static String renderOracleLiteral(String value, int jdbcType) {
        if (value == null) {
            return "null";
        }
        // Double embedded single quotes so the value cannot terminate the literal.
        String escaped = value.replace("'", "''");
        switch (jdbcType) {
            case java.sql.Types.TIMESTAMP:
                return String.format("to_timestamp('%s', 'yyyy-mm-dd hh24:mi:ss.ff')", escaped);
            case java.sql.Types.TIME:
                // NOTE(review): Oracle does not appear to provide a built-in TO_TIME function;
                // kept as in the original — confirm against the destination schema.
                return String.format("to_time('%s', 'hh24:mi:ss')", escaped);
            case java.sql.Types.DATE:
                return String.format("to_date('%s', 'yyyy-mm-dd hh24:mi:ss')", escaped);
            default:
                return String.format("'%s'", escaped);
        }
    }
}