/** * Copyright 2018-2118 the original author or authors. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.vci.dbsync; import org.quartz.Job; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import com.vci.corba.common.VCIError; import com.vci.dbsync.entity.DBInfo; import com.vci.dbsync.entity.JobInfo; import com.vci.dbsync.entity.JobItem; import com.vci.dbsync.entity.VMInfo; import com.vci.dbsync.filesync.FileSync; import com.vci.dbsync.log.SyncLog; import com.vci.dbsync.DBSyncFactory; import com.vci.dbsync.DBSync; //import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.ArrayList; import java.util.Date; /** * @author liuyazhuang * @date 2018/9/11 10:30 * @description 同步数据库任务的具体实现 * @version 1.0.0 */ public class JobTask implements Job { //private Logger logger = Logger.getLogger(JobTask.class); /** * 执行同步数据库任务 * */ @Override public void execute(JobExecutionContext context) throws JobExecutionException { JobDataMap data = context.getJobDetail().getJobDataMap(); JobInfo jobInfo = (JobInfo) data.get("jobInfo"); SyncLog.logger.info("开始任务【" + jobInfo.getName() + "】 调度: " + new Date()); Connection inConn = null; Connection outConn = null; DBInfo srcDb = (DBInfo) data.get("srcDb"); DBInfo destDb = (DBInfo) data.get("destDb"); VMInfo srcVM = (VMInfo) data.get("srcVM"); VMInfo destVM = (VMInfo) data.get("destVM"); String logTitle = (String) data.get("logTitle"); try { inConn = createConnection(srcDb); outConn = createConnection(destDb); if (inConn == null) { SyncLog.logger.info("请检查源数据连接!"); return; } else if (outConn == null) { SyncLog.logger.info("请检查目标数据连接!"); return; } DBSync dbHelper = DBSyncFactory.create(destDb.getDbtype()); if (dbHelper == null) { SyncLog.logger.info("不支持的数据库类型!"); return; } JobItem[] items = jobInfo.Items(); //ArrayList lstSql = new ArrayList(); for (JobItem item : items) { SyncLog.logger.info("开始同步: " + item.getName()); long start = new Date().getTime(); FileSync fileSync = null; if (item.getName().equalsIgnoreCase("PLATFORMBTM_FILEOBJECT")){ fileSync = new FileSync(srcVM, destVM); if (item.getDirection()) { fileSync.Init(true); } else { fileSync.Init(false); } } long count = 0; if (item.getDirection()) { count = dbHelper.assembleSQL(item, inConn, outConn, fileSync); } else { count = dbHelper.assembleSQL(item, outConn, inConn, fileSync); } SyncLog.logger.info("同步【" + item.getName() + "】查询到 " + count + " 条记录,同步耗时: " + (new Date().getTime() - start) + "ms"); if (fileSync != null){ SyncLog.logger.info("同步【" + item.getName() + "】文件数: " + fileSync.getCount()); fileSync.close(); } // try { // FileWriter file = new FileWriter("e:\\test.sql"); // // String sNewLine = System.getProperty("line.separator"); // for (int i = 0; i < sqls.length; i++) { // file.write(sqls[i]); // 向文件中写入数据 // file.write(sNewLine); // \r\n表示换行 // } // // file.write(sqls.toString()); // file.close(); // } catch (IOException e) { // e.printStackTrace(); // } // SyncLog.logger.info("组装SQL耗时:: " + (new Date().getTime() - start) + "ms"); // if (lstSql.size() > 0) { // SyncLog.logger.debug(lstSql); // long eStart = new Date().getTime(); // dbHelper.executeSQL(lstSql, outConn); // SyncLog.logger.info("执行SQL语句耗时: " + (new Date().getTime() - eStart) + "ms"); // } } } catch (SQLException e) { try { outConn.rollback(); } catch (SQLException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } e.printStackTrace(); SyncLog.logger.error(logTitle + e.getMessage()); SyncLog.logger.error(logTitle + " SQL执行出错,请检查是否存在语法错误"); } catch (VCIError ex){ ex.printStackTrace(); } catch (Exception ex){ ex.printStackTrace(); } finally { SyncLog.logger.info("关闭源数据库连接"); destoryConnection(inConn); SyncLog.logger.info("关闭目标数据库连接"); destoryConnection(outConn); } } /** * 创建数据库连接 * @param db * @return */ private Connection createConnection(DBInfo db) { try { Class.forName(db.getDriver()); Connection conn = DriverManager.getConnection(db.getUrl(), db.getUsername(), db.getPassword()); conn.setAutoCommit(false); return conn; } catch (Exception e) { SyncLog.logger.error(e.getMessage()); } return null; } /** * 关闭并销毁数据库连接 * @param conn */ private void destoryConnection(Connection conn) { try { if (conn != null) { conn.close(); conn = null; //SyncLog.logger.info("数据库连接关闭"); } } catch (SQLException e) { e.printStackTrace(); } } }