/** * Copyright 2018-2118 the original author or authors. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.vci.dbsync.dbimpl;

import com.vci.dbsync.entity.JobItem;
import com.vci.dbsync.filesync.FileSync;
import com.vci.dbsync.log.SyncLog;
import com.vci.dbsync.DBSync;

import java.sql.*;
import java.util.List;

/**
 * SQL Server implementation of the database synchronization contract.
 *
 * @author liuyazhuang
 * @date 2018/9/11 10:24
 * @description SQL Server database synchronization implementation
 * @version 1.0.0
 */
public class SQLServerSync extends AbstractDBSync implements DBSync {

    /**
     * Runs the job's source query and, for every returned row, assembles a T-SQL
     * "insert if the key is absent, otherwise update" statement for the destination table.
     *
     * <p>NOTE(review): the assembled statement is currently discarded. It is neither
     * executed nor returned, so the only observable results of this method are the row
     * count, the log line, and the column reads performed on the result set (which fail
     * if a configured column is missing from the query). An earlier revision apparently
     * collected the statements (a commented-out {@code lstSQL.add(...)} used to sit in
     * the loop). Confirm the intended contract before relying on this class to write data.
     *
     * @param jobItem  job configuration supplying the source query, destination table,
     *                 key column, insert column list and update column list
     * @param srcConn  connection the source query is executed on
     * @param destConn not used by this implementation
     * @param fileSync not used by this implementation
     * @return the number of rows returned by the source query
     * @throws SQLException if the source query fails or a configured column cannot be read
     */
    @Override
    public long assembleSQL(JobItem jobItem, Connection srcConn, Connection destConn, FileSync fileSync) throws SQLException {
        String srcSql = jobItem.getSrcSql();
        String fieldStr = jobItem.getDestTableFields();
        String[] fields = this.trimArrayItem(fieldStr.split(","));
        String[] updateFields = this.trimArrayItem(jobItem.getDestTableUpdate().split(","));
        String destTableKey = jobItem.getDestTableKey();
        String destTable = jobItem.getDestTable();

        long count = 0;
        // try-with-resources closes the result set and statement even when reading a row fails.
        try (Statement stat = srcConn.createStatement();
             ResultSet rs = stat.executeQuery(srcSql)) {
            StringBuilder sql = new StringBuilder();
            while (rs.next()) {
                sql.setLength(0);
                appendUpsert(sql, rs, destTable, destTableKey, fieldStr, fields, updateFields);
                count++;
            }
            SyncLog.logger.info("总共查询到 " + count + " 条记录");
        }
        return count;
    }

    /**
     * Executes every statement in {@code sqls} on {@code conn} as one JDBC batch and commits.
     *
     * <p>Each element is converted with {@code toString()} and run as its own statement;
     * {@code null} elements are skipped. A {@code null} or empty list is a no-op and does
     * not commit. The raw {@code List} parameter type is kept so the method signature stays
     * exactly as it was for the declaration it overrides.
     *
     * @param sqls SQL statements to execute, in order
     * @param conn connection to execute on; {@code commit()} is called on it, so it is
     *             expected not to be in auto-commit mode
     * @throws SQLException if any statement or the commit fails
     */
    @Override
    public void executeSQL(List sqls, Connection conn) throws SQLException {
        if (sqls == null || sqls.isEmpty()) {
            return;
        }
        try (Statement stat = conn.createStatement()) {
            for (Object sql : sqls) {
                if (sql != null) {
                    stat.addBatch(sql.toString());
                }
            }
            stat.executeBatch();
        }
        conn.commit();
    }

    /**
     * Appends one "if not exists (...) insert ... else update ...;" statement for the
     * current row of {@code rs} to {@code sql}.
     */
    private static void appendUpsert(StringBuilder sql, ResultSet rs, String destTable, String destTableKey,
                                     String fieldStr, String[] fields, String[] updateFields) throws SQLException {
        // Read the key once; it is needed in both the existence check and the update filter.
        String keyLiteral = toSqlLiteral(rs.getString(destTableKey));

        sql.append("if not exists (select ").append(destTableKey)
                .append(" from ").append(destTable)
                .append(" where ").append(destTableKey).append("=").append(keyLiteral).append(") ")
                .append("insert into ").append(destTable)
                .append("(").append(fieldStr).append(") values(");
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sql.append(",");
            }
            sql.append(toSqlLiteral(rs.getString(fields[i])));
        }

        sql.append(") else update ").append(destTable).append(" set ");
        for (int i = 0; i < updateFields.length; i++) {
            if (i > 0) {
                sql.append(",");
            }
            sql.append(updateFields[i]).append("=").append(toSqlLiteral(rs.getString(updateFields[i])));
        }
        sql.append(" where ").append(destTableKey).append("=").append(keyLiteral).append(";");
    }

    /**
     * Renders a column value as a SQL string literal: {@code NULL} for a null value,
     * otherwise the value in single quotes with embedded single quotes doubled.
     */
    private static String toSqlLiteral(String value) {
        if (value == null) {
            return "NULL";
        }
        return "'" + value.replace("'", "''") + "'";
    }
}