/** * Copyright 2018-2118 the original author or authors. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.vci.dbsync.dbimpl; import com.vci.dbsync.entity.JobItem; import com.vci.dbsync.filesync.FileSync; import com.vci.dbsync.log.SyncLog; import com.vci.dbsync.DBSync; import com.vci.dbsync.utils.Tool; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; /** * @author liuyazhuang * @date 2018/9/11 10:21 * @description MySQL数据库同步实现 * @version 1.0.0 */ public class MySQLSync extends AbstractDBSync implements DBSync { //private Logger logger = Logger.getLogger(MySQLSync.class); @Override public long assembleSQL(JobItem jobItem, Connection srcConn, Connection destConn, FileSync fileSync) throws SQLException { String srcSql = jobItem.getSrcSql(); String uniqueName = Tool.generateString(6) + "_" + jobItem.getName(); String[] fields = jobItem.getDestTableFields().split(","); fields = this.trimArrayItem(fields); String[] updateFields = jobItem.getDestTableUpdate().split(","); updateFields = this.trimArrayItem(updateFields); String destTable = jobItem.getDestTable(); String destTableKey = jobItem.getDestTableKey(); PreparedStatement pst = srcConn.prepareStatement(srcSql); ResultSet rs = pst.executeQuery(); StringBuffer sql = new StringBuffer(); sql.append("insert into ").append(jobItem.getDestTable()).append(" (").append(jobItem.getDestTableFields()).append(") values "); long count = 0; while (rs.next()) { sql.append("("); for (int index = 0; index < fields.length; index++) { sql.append("'").append(rs.getString(fields[index])).append(index == (fields.length - 1) ? "'" : "',"); } sql.append("),"); count++; } if (rs != null) { rs.close(); } if (pst != null) { pst.close(); } if (count > 0) { sql = sql.deleteCharAt(sql.length() - 1); if ((!jobItem.getDestTableUpdate().equals("")) && (!jobItem.getDestTableKey().equals(""))) { sql.append(" on duplicate key update "); for (int index = 0; index < updateFields.length; index++) { sql.append(updateFields[index]).append("= values(").append(updateFields[index]).append(index == (updateFields.length - 1) ? ")" : "),"); } StringBuffer sb = new StringBuffer(); sb.append("alter table ").append(destTable).append(" add constraint ").append(uniqueName).append(" unique (").append(destTableKey).append(");").append(sql.toString()) .append(";alter table ").append(destTable).append(" drop index ").append(uniqueName); //lstSQL.add(sb.toString()); //return true;//new String[] {sb.toString()}; } SyncLog.logger.debug(sql.toString()); //lstSQL.add(sql.toString()); //return true;//new String[] {sql.toString()}; } return count; //return false;//null; } @Override public void executeSQL(List sqls, Connection conn) throws SQLException { PreparedStatement pst = conn.prepareStatement(""); //String[] sqlList = sql.split(";"); for (int index = 0; index < sqls.size(); index++) { pst.addBatch(sqls.get(index)); } pst.executeBatch(); conn.commit(); pst.close(); } }