/**
* Copyright 2018-2118 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.vci.dbsync.dbimpl;
import com.vci.corba.common.VCIError;
import com.vci.dbsync.entity.JobItem;
import com.vci.dbsync.entity.Operate;
import com.vci.dbsync.filesync.FileSync;
import com.vci.dbsync.log.SyncLog;
import com.vci.dbsync.DBSync;
import java.sql.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* @author Jason
* @date 2019/12/17 10:24
* @description Oracle database synchronization implementation
* @version 1.0.0
*/
public class OracleSync extends AbstractDBSync implements DBSync {

    /** Number of MERGE statements queued before the batch is sent to the destination. */
    private static final int BATCH_SIZE = 200;

    /** Operate-mask bit: rows that already exist in the destination are updated. */
    private static final int OP_UPDATE = 0x1;

    /** Operate-mask bit: rows missing from the destination are inserted. */
    private static final int OP_INSERT = 0x2;

    /** Name of the job whose rows additionally trigger the file synchronization pass. */
    private static final String FILE_OBJECT_JOB = "PLATFORMBTM_FILEOBJECT";

    /**
     * Runs the job's source query and replays every row into the destination table as an
     * Oracle {@code MERGE} statement, executing the statements in batches of {@link #BATCH_SIZE}
     * and committing the destination connection at the end.
     *
     * <p>When {@code fileSync} is non-null and the job is named {@code PLATFORMBTM_FILEOBJECT},
     * the source rows are first compared against the destination file-object table and the
     * files that are new or whose {@code TS} differs are handed to {@code fileSync}.
     *
     * <p>Only columns that are present in the source result set take part in the generated
     * {@code update set} / {@code insert} clauses; a clause that would end up empty is omitted.
     * Rows produce no statement when neither clause applies.
     *
     * @param jobItem  job definition (source SQL, destination table, fields, keys, operate mode)
     * @param srcConn  connection the source query is executed on
     * @param destConn connection the MERGE statements are executed and committed on
     * @param fileSync optional file synchronizer; {@code null} disables the file pass
     * @return the number of MERGE statements added to the batch
     * @throws SQLException if a query or batch fails, or if the job defines no key column
     * @throws VCIError     declared for the project helpers invoked by this method
     */
    @Override
    public long assembleSQL(JobItem jobItem, Connection srcConn, Connection destConn, FileSync fileSync)
            throws SQLException, VCIError {
        String srcSql = jobItem.getSrcSql();
        String destTable = jobItem.getDestTable();
        String[] fields = this.trimArrayItem(jobItem.getDestTableFields().split(","));
        String[] updateFields = this.trimArrayItem(jobItem.getDestTableUpdate().split(","));
        List<String> keys = splitKeys(jobItem.getDestTableKey());

        int operate = 0;
        if (jobItem.getOperate() == Operate.update) {
            operate = OP_UPDATE;
        } else if (jobItem.getOperate() == Operate.insert) {
            operate = OP_INSERT;
        } else if (jobItem.getOperate() == Operate.both) {
            operate = OP_UPDATE | OP_INSERT;
        }

        long count = 0;
        // try-with-resources guarantees the statements are released even when a row fails.
        try (Statement srcStat = srcConn.createStatement();
             Statement destStat = destConn.createStatement()) {

            // File synchronization pass: needs its own scan of the source rows.
            if (fileSync != null && jobItem.getName().equalsIgnoreCase(FILE_OBJECT_JOB)) {
                try (ResultSet fileRows = srcStat.executeQuery(srcSql)) {
                    syncChangedFiles(fileRows, destConn, fileSync, operate);
                }
            }

            try (ResultSet rs = srcStat.executeQuery(srcSql)) {
                HashMap<String, Integer> columnTypes = readColumnTypes(rs.getMetaData());
                List<String> updateCols = presentColumns(updateFields, columnTypes);
                List<String> insertCols = presentColumns(fields, columnTypes);
                boolean doUpdate = (operate & OP_UPDATE) != 0 && !updateCols.isEmpty();
                boolean doInsert = (operate & OP_INSERT) != 0 && !insertCols.isEmpty();

                // The insert column list is identical for every row, so build it once.
                StringBuilder insertColumnList = new StringBuilder();
                for (String col : insertCols) {
                    if (insertColumnList.length() > 0) {
                        insertColumnList.append(",");
                    }
                    insertColumnList.append(col);
                }

                StringBuilder sql = new StringBuilder();
                while ((doUpdate || doInsert) && rs.next()) {
                    sql.setLength(0);
                    appendMergeHead(sql, destTable, keys, rs);

                    if (doUpdate) {
                        sql.append(" when matched then update set ");
                        for (int i = 0; i < updateCols.size(); i++) {
                            if (i > 0) {
                                sql.append(",");
                            }
                            String col = updateCols.get(i);
                            sql.append(col).append("=")
                               .append(toSqlLiteral(rs.getString(col), columnTypes.get(col)));
                        }
                    }

                    if (doInsert) {
                        sql.append(" when not matched then insert ( ");
                        sql.append(insertColumnList);
                        sql.append(") values(");
                        for (int i = 0; i < insertCols.size(); i++) {
                            if (i > 0) {
                                sql.append(",");
                            }
                            String col = insertCols.get(i);
                            sql.append(toSqlLiteral(rs.getString(col), columnTypes.get(col)));
                        }
                        sql.append(")");
                    }

                    String merge = sql.toString();
                    SyncLog.logger.debug(merge);
                    destStat.addBatch(merge);
                    count++;
                    if (count % BATCH_SIZE == 0) {
                        destStat.executeBatch();
                    }
                }
            }

            // Flush the statements left over from the last, partially filled batch.
            if (count % BATCH_SIZE != 0) {
                destStat.executeBatch();
            }
            destConn.commit();
        }
        return count;
    }

    /**
     * Executes the given SQL statements as one batch on {@code conn} and commits.
     *
     * @param sqls SQL statement strings; every element must be a {@link String}
     * @param conn connection the batch is executed and committed on
     * @throws SQLException if creating the statement, executing the batch or committing fails
     */
    @Override
    public void executeSQL(List sqls, Connection conn) throws SQLException {
        try (Statement state = conn.createStatement()) {
            // The parameter is kept as a raw List so the override matches the declared
            // signature; elements are cast because addBatch only accepts String.
            for (Object sql : sqls) {
                state.addBatch((String) sql);
            }
            state.executeBatch();
            conn.commit();
        }
    }

    /**
     * Walks the source file-object rows and asks {@code fileSync} to transfer every file that
     * is missing from the destination (insert mode) or whose {@code TS} value differs from the
     * destination row (update mode).
     */
    private void syncChangedFiles(ResultSet srcRows, Connection destConn, FileSync fileSync, int operate)
            throws SQLException, VCIError {
        String sql = "SELECT OID, TS FROM PLATFORMBTM_FILEOBJECT f WHERE f.oid=?";
        try (PreparedStatement pst = destConn.prepareStatement(sql)) {
            while (srcRows.next()) {
                String oid = srcRows.getString("OID");
                Timestamp srcTs = srcRows.getTimestamp("TS");
                String volFile = srcRows.getString("FILEPATH") + "/" + oid;
                pst.setString(1, oid);
                try (ResultSet destRow = pst.executeQuery()) {
                    // Advance the cursor exactly once; a second next() would skip the row.
                    boolean exists = destRow.next();
                    if (!exists) {
                        if ((operate & OP_INSERT) != 0) {
                            fileSync.syncFile(volFile);
                        }
                    } else if ((operate & OP_UPDATE) != 0) {
                        Timestamp destTs = destRow.getTimestamp("TS");
                        // Compare by value (null-safe), not by object identity.
                        boolean changed = (srcTs == null) ? destTs != null : !srcTs.equals(destTs);
                        if (changed) {
                            fileSync.syncFile(volFile);
                        }
                    }
                }
            }
        }
    }

    /** Maps every upper-cased source column name to its {@link java.sql.Types} code. */
    private static HashMap<String, Integer> readColumnTypes(ResultSetMetaData metaData) throws SQLException {
        HashMap<String, Integer> types = new HashMap<>();
        for (int i = 1; i <= metaData.getColumnCount(); i++) {
            types.put(metaData.getColumnName(i).toUpperCase(), metaData.getColumnType(i));
        }
        return types;
    }

    /** Returns the upper-cased names from {@code names} that exist in the source result set, in order. */
    private static List<String> presentColumns(String[] names, HashMap<String, Integer> columnTypes) {
        List<String> present = new ArrayList<>();
        for (String name : names) {
            String upper = name.toUpperCase();
            if (columnTypes.containsKey(upper)) {
                present.add(upper);
            }
        }
        return present;
    }

    /** Splits the comma-separated key definition into trimmed, non-empty column names. */
    private static List<String> splitKeys(String rawKeys) {
        List<String> keys = new ArrayList<>();
        if (rawKeys != null) {
            for (String part : rawKeys.split(",")) {
                String key = part.trim();
                if (!key.isEmpty()) {
                    keys.add(key);
                }
            }
        }
        return keys;
    }

    /**
     * Appends the {@code MERGE INTO ... using (...) b on (...)} head for the current row,
     * matching the destination row on every key column.
     */
    private static void appendMergeHead(StringBuilder sql, String destTable, List<String> keys, ResultSet rs)
            throws SQLException {
        if (keys.isEmpty()) {
            throw new SQLException("No destination key column defined for table " + destTable);
        }
        // NOTE(review): a NULL key value is rendered as the string 'null', as before — confirm
        // whether rows with NULL keys can occur and how they should be handled.
        if (keys.size() == 1) {
            String key = keys.get(0);
            String keyValue = rs.getString(key);
            SyncLog.logger.debug("查询到:" + key + "=" + keyValue);
            sql.append(String.format("MERGE INTO %s t using (select '%s' as id from dual) b on (t.%s = b.id)",
                    destTable, escapeQuotes(keyValue), key));
            return;
        }
        StringBuilder selectList = new StringBuilder();
        StringBuilder onCondition = new StringBuilder();
        for (String key : keys) {
            if (selectList.length() > 0) {
                selectList.append(", ");
                onCondition.append(" AND ");
            }
            // Each key column contributes its own value from the current row.
            selectList.append(String.format("'%s' as %s", escapeQuotes(rs.getString(key)), key));
            onCondition.append(String.format("t.%s=b.%s", key, key));
        }
        sql.append(String.format("MERGE INTO %s t using (select %s from dual) b on (%s)",
                destTable, selectList, onCondition));
    }

    /**
     * Renders a column value as a SQL literal: {@code null} for a missing value, a conversion
     * call for temporal types, and a quoted string for everything else.
     */
    private static String toSqlLiteral(String value, int sqlType) {
        if (value == null) {
            return "null";
        }
        String escaped = escapeQuotes(value);
        switch (sqlType) {
            case Types.TIMESTAMP:
                return String.format("to_timestamp('%s', 'yyyy-mm-dd hh24:mi:ss.ff')", escaped);
            case Types.TIME:
                // NOTE(review): to_time does not look like a built-in Oracle function — confirm
                // this branch is reachable / that the function exists in the target schema.
                return String.format("to_time('%s', 'hh24:mi:ss')", escaped);
            case Types.DATE:
                return String.format("to_date('%s', 'yyyy-mm-dd hh24:mi:ss')", escaped);
            default:
                return String.format("'%s'", escaped);
        }
    }

    /** Doubles single quotes so the value can be embedded in a quoted SQL literal; null stays null. */
    private static String escapeQuotes(String value) {
        return value == null ? null : value.replace("'", "''");
    }
}