/**
 * Copyright 2018-2118 the original author or authors.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
package com.vci.dbsync.dbimpl;
|
|
import com.vci.corba.common.VCIError;
import com.vci.dbsync.DBSync;
import com.vci.dbsync.entity.JobItem;
import com.vci.dbsync.entity.Operate;
import com.vci.dbsync.filesync.FileSync;
import com.vci.dbsync.log.SyncLog;

import java.sql.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
|
|
/**
 * Oracle database synchronization implementation.
 *
 * @author Jason
 * @date 2019/12/17 10:24
 * @version 1.0.0
 */
|
public class OracleSync extends AbstractDBSync implements DBSync {
|
//private Logger logger = Logger.getLogger(OracleSync.class);
|
|
@Override
|
public long assembleSQL(JobItem jobItem, Connection srcConn, Connection destConn, FileSync fileSync) throws SQLException, VCIError {
|
String srcSql = jobItem.getSrcSql();
|
String fieldStr = jobItem.getDestTableFields();
|
String[] fields = jobItem.getDestTableFields().split(",");
|
fields = this.trimArrayItem(fields);
|
String[] updateFields = jobItem.getDestTableUpdate().split(",");
|
updateFields = this.trimArrayItem(updateFields);
|
String destTableKey = jobItem.getDestTableKey();
|
String destTable = jobItem.getDestTable();
|
Statement stat = srcConn.createStatement();
|
|
ResultSet rs = stat.executeQuery(srcSql);
|
ResultSetMetaData metaData = rs.getMetaData();
|
|
Statement destStat = destConn.createStatement();
|
|
int operate = 0;
|
if (jobItem.getOperate() == Operate.update)
|
operate = 1;
|
else if (jobItem.getOperate() == Operate.insert)
|
operate = 2;
|
else if (jobItem.getOperate() == Operate.both)
|
operate = 3;
|
|
HashMap<String, Integer> mapFieldType = new HashMap<String, Integer>();
|
for (int i = 1 ; i <= metaData.getColumnCount(); i ++) {
|
int type = metaData.getColumnType(i);
|
String name = metaData.getColumnName(i);
|
name = name.toUpperCase();
|
|
mapFieldType.put(name, type);
|
}
|
|
// 文件同步功能
|
if (fileSync != null && jobItem.getName().equalsIgnoreCase("PLATFORMBTM_FILEOBJECT")){
|
//List<String> lstFile = new ArrayList<String>();
|
|
String sql = "SELECT OID, TS FROM PLATFORMBTM_FILEOBJECT f WHERE f.oid=?";
|
|
PreparedStatement pst = destConn.prepareStatement(sql);
|
|
while (rs.next()){
|
String oid = rs.getString("OID");
|
Timestamp ts = rs.getTimestamp("TS");
|
|
String volPath = rs.getString("FILEPATH");
|
String volFile = volPath + "/" + oid;
|
|
pst.setString(1, oid);
|
ResultSet rsFile = pst.executeQuery();
|
|
// 没有查询到目标对象
|
if (!rsFile.next() && (operate & 0x2) != 0){ // 插入
|
//lstFile.add(volFile);
|
fileSync.syncFile(volFile);
|
} else if (rsFile.next() && (operate & 0x1) != 0) { // 更新
|
Timestamp ts1 = rsFile.getTimestamp("TS");
|
if (ts != ts1)
|
fileSync.syncFile(volFile);
|
//lstFile.add(volFile);
|
}
|
rsFile.close();
|
}
|
pst.close();
|
}
|
rs.close();
|
|
StringBuffer sql = new StringBuffer();
|
|
long count = 0;
|
|
//String sNewLine = System.getProperty("line.separator");
|
String sField = "", sValue;
|
//ArrayList<String> lstSql = new ArrayList<String>();
|
rs = stat.executeQuery(srcSql);
|
while (rs.next()) {
|
sql.setLength(0);
|
|
String[] keys = destTableKey.split(",");
|
if (keys.length == 1)
|
{
|
String sKeyValue = rs.getString(destTableKey);
|
SyncLog.logger.debug("查询到:" + destTableKey + "=" + sKeyValue);
|
|
sql.append(String.format("MERGE INTO %s t using (select '%s' as id from dual) b on (t.%s = b.id)", destTable, sKeyValue, destTableKey));
|
}
|
else if (keys.length > 1)
|
{
|
StringBuffer sbField = new StringBuffer();
|
StringBuffer sbCond = new StringBuffer();
|
for (String key : keys)
|
{
|
if (sbField.length() > 0)
|
sbField.append(", ");
|
|
String sKeyValue = rs.getString(destTableKey);
|
sbField.append(String.format("'%s' as %s", sKeyValue, key));
|
|
if (sbCond.length() > 0)
|
sbCond.append(" AND ");
|
sbCond.append(String.format("t.%s=b.%s", key, key));
|
}
|
|
sql.append(String.format("MERGE INTO %s t using (select %s from dual) b on (%s)", destTable, sbField.toString(), sbCond.toString()));
|
}
|
|
if ((operate & 0x1) != 0) { //jobItem.getOperate() == Operate.both || jobItem.getOperate() == Operate.update) {
|
sql.append(" when matched then update set ");
|
|
|
for (int index = 0; index < updateFields.length; index++) {
|
sField = updateFields[index].toUpperCase();
|
if (!mapFieldType.containsKey(sField)) {
|
continue;
|
}
|
sValue = rs.getString(sField);
|
|
if (sValue == null)// || sValue == "null")
|
sql.append(String.format("%s=null", sField));
|
else {
|
if (sValue.indexOf('\'') > -1)
|
sValue = sValue.replaceAll("'", "''");
|
|
switch (mapFieldType.get(sField)) {
|
// case -7:// BIT
|
// case -6: //TINYINT
|
// case -5: //BIGINT
|
// case -4: //LONGVARBINARY
|
// case -3: //VARBINARY
|
// case -2: //BINARY
|
// case -1: //LONGVARCHAR
|
// case 0: //NULL
|
// case 1: //CHAR
|
// case 2: //NUMERIC
|
// case 3: //DECIMAL
|
// case 4: //INTEGER
|
// case 5: //SMALLINT
|
// case 6: //FLOAT
|
// case 7: //REAL
|
// case 8: //DOUBLE
|
// case 12:// VARCHAR
|
case 93: // TIMESTAMP
|
sql.append(
|
String.format("%s=to_timestamp('%s', 'yyyy-mm-dd hh24:mi:ss.ff')", sField, sValue));
|
break;
|
case 92: // TIME
|
sql.append(String.format("%s=to_time('%s', 'hh24:mi:ss')", sField, sValue));
|
break;
|
case 91: // DATE
|
sql.append(String.format("%s=to_date('%s', 'yyyy-mm-dd hh24:mi:ss')", sField, sValue));
|
break;
|
default:
|
sql.append(String.format("%s='%s'", sField, sValue));
|
break;
|
}
|
}
|
|
if (index != (updateFields.length - 1))
|
sql.append(",");
|
}
|
}
|
|
if ((operate & 0x2) != 0){ //jobItem.getOperate() == Operate.both || jobItem.getOperate() == Operate.insert) {
|
sql.append(" when not matched then insert ( ");
|
sql.append(fieldStr);
|
sql.append(") values(");
|
|
for (int index = 0; index < fields.length; index++) {
|
sField = fields[index].toUpperCase();
|
if (!mapFieldType.containsKey(sField)) {
|
continue;
|
}
|
|
sValue = rs.getString(sField);
|
|
if (sValue == null)
|
sql.append("null");
|
else {
|
// 单引号做转义替换
|
if (sValue.indexOf('\'') > -1)
|
sValue = sValue.replaceAll("'", "''");
|
|
switch (mapFieldType.get(sField)) {
|
// case -7:// BIT
|
// case -6: //TINYINT
|
// case -5: //BIGINT
|
// case -4: //LONGVARBINARY
|
// case -3: //VARBINARY
|
// case -2: //BINARY
|
// case -1: //LONGVARCHAR
|
// case 0: //NULL
|
// case 1: //CHAR
|
// case 2: //NUMERIC
|
// case 3: //DECIMAL
|
// case 4: //INTEGER
|
// case 5: //SMALLINT
|
// case 6: //FLOAT
|
// case 7: //REAL
|
// case 8: //DOUBLE
|
// case 12:// VARCHAR
|
case 93: // TIMESTAMP
|
sql.append(String.format("to_timestamp('%s', 'yyyy-mm-dd hh24:mi:ss.ff')", sValue));
|
break;
|
case 92: // TIME
|
sql.append(String.format("to_time('%s', 'hh24:mi:ss')", sValue));
|
break;
|
case 91: // DATE
|
sql.append(String.format("to_date('%s', 'yyyy-mm-dd hh24:mi:ss')", sValue));
|
break;
|
default:
|
sql.append(String.format("'%s'", sValue));
|
break;
|
}
|
}
|
|
if (index != (fields.length - 1))
|
sql.append(",");
|
else
|
sql.append(")");
|
}
|
} else {
|
continue;
|
}
|
|
SyncLog.logger.debug(sql.toString());
|
destStat.addBatch(sql.toString());
|
|
count++;
|
|
if ((count % 200) == 0) {
|
destStat.executeBatch();
|
}
|
}
|
|
if ((count % 200) != 0) {
|
destStat.executeBatch();
|
}
|
|
destConn.commit();
|
|
if (rs != null) {
|
rs.close();
|
}
|
|
if (stat != null) {
|
stat.close();
|
}
|
|
if (destStat != null) {
|
destStat.close();
|
}
|
|
return count;
|
}
|
|
@Override
|
public void executeSQL(List<String> sqls, Connection conn) throws SQLException {
|
//PreparedStatement pst = conn.prepareStatement("sql");
|
//pst.executeUpdate();
|
//statement.executeLargeUpdate(sql)
|
//pst.close();
|
|
Statement state=conn.createStatement();
|
for (int i = 0; i < sqls.size(); i++) {
|
state.addBatch(sqls.get(i));
|
}
|
state.executeBatch();
|
conn.commit();
|
state.close();
|
}
|
}
|