diff --git a/mykit-data-monitor/src/main/java/io/mykit/data/monitor/oracle/dcn/BlockingQueueFactory.java b/mykit-data-monitor/src/main/java/io/mykit/data/monitor/oracle/dcn/BlockingQueueFactory.java
new file mode 100644
index 0000000..3d966e0
--- /dev/null
+++ b/mykit-data-monitor/src/main/java/io/mykit/data/monitor/oracle/dcn/BlockingQueueFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright 2020-9999 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mykit.data.monitor.oracle.dcn;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * @author binghe
+ * @version 1.0.0
+ * @description BlockingQueue队列
+ */
+public class BlockingQueueFactory {
+
+ private static volatile BlockingQueue queue;
+
+ static {
+ queue = new LinkedBlockingQueue<>(500);
+ }
+
+ public static BlockingQueue getInstance(){
+ return queue;
+ }
+}
diff --git a/mykit-data-monitor/src/main/java/io/mykit/data/monitor/oracle/dcn/DBChangeNotification.java b/mykit-data-monitor/src/main/java/io/mykit/data/monitor/oracle/dcn/DBChangeNotification.java
index 56a7b68..f9a85a0 100644
--- a/mykit-data-monitor/src/main/java/io/mykit/data/monitor/oracle/dcn/DBChangeNotification.java
+++ b/mykit-data-monitor/src/main/java/io/mykit/data/monitor/oracle/dcn/DBChangeNotification.java
@@ -42,6 +42,9 @@ public class DBChangeNotification {
private Map tables;
private List listeners;
+ //执行Task的线程,消费queue队列的数据
+ private Thread worker;
+
public DBChangeNotification(String username, String password, String url) {
this.username = username;
this.password = password;
@@ -74,6 +77,12 @@ public void start() throws SQLException {
clean(statement, regId, callback);
statement.setDatabaseChangeRegistration(dcr);
+ //设置消费信息的信息并启动
+ this.worker = new Thread(new Task());
+ worker.setName(new StringBuilder("dcn-parser-").append(host).append(":").append(port).append("_").append(regId).toString());
+ worker.setDaemon(false);
+ worker.start();
+
// 配置监听表
for (Map.Entry m : tables.entrySet()) {
statement.executeQuery(String.format(QUERY_TABLE_SQL, m.getValue()));
@@ -87,6 +96,12 @@ public void start() throws SQLException {
}
public void close() {
+
+ if(null != worker && !worker.isInterrupted()){
+ worker.interrupt();
+ worker = null;
+ }
+
try {
if (null != statement) {
statement.close();
@@ -202,7 +217,34 @@ public void onDatabaseChangeNotification(DatabaseChangeEvent event) {
RowChangeDescription[] rds = td.getRowChangeDescription();
for (RowChangeDescription rd : rds) {
RowChangeDescription.RowOperation opr = rd.getRowOperation();
- parseEvent(tables.get(td.getObjectNumber()), rd.getRowid().stringValue(), opr);
+ //parseEvent(tables.get(td.getObjectNumber()), rd.getRowid().stringValue(), opr);
+ try {
+ BlockingQueueFactory.getInstance().put(new DCNEvent(tables.get(td.getObjectNumber()), rd.getRowid().stringValue(), opr));
+ //logger.info("向队列中添加数据,当前队列长度为:{}", BlockingQueueFactory.getInstance().size());
+ } catch (InterruptedException e) {
+ logger.error("Table[{}], RowId:{}, Code:{}, Error:{}", tables.get(td.getObjectNumber()), rd.getRowid().stringValue(), rd.getRowOperation().getCode(), e.getMessage());
+ }
+ }
+ }
+ }
+ }
+
+ final class Task implements Runnable{
+
+ @Override
+ public void run() {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
+ DCNEvent event = BlockingQueueFactory.getInstance().take();
+ //logger.info("从队列中消费数据,当前队列长度为:{}", BlockingQueueFactory.getInstance().size());
+ //取出的对象不为空
+ if(null != event){
+ parseEvent(event.getTableName(), event.getRowId(), event.getEvent());
+ }
+ } catch (InterruptedException e) {
+ logger.error("程序异常:{}", e);
+ break;
}
}
}
@@ -210,32 +252,31 @@ public void onDatabaseChangeNotification(DatabaseChangeEvent event) {
private void parseEvent(String tableName, String rowId, RowChangeDescription.RowOperation event) {
List data = new ArrayList<>();
data.add(rowId);
-
if (event.getCode() != TableChangeDescription.TableOperation.DELETE.getCode()) {
ResultSet rs = null;
try {
- //修复由于并发情况下rs未关闭时再次执行executeQuery方法,导致的Oracle数据库抛出“结果集已耗尽”的异常,目前采用加锁方式解决
- if(LockUtils.tryLock()){
+ if(LockUtils.tryLock()) {
+ //logger.info("执行的SQL语句为: {}",String.format(QUERY_ROW_DATA_SQL, tableName, rowId));
rs = statement.executeQuery(String.format(QUERY_ROW_DATA_SQL, tableName, rowId));
- final int size = rs.getMetaData().getColumnCount();
- while (rs.next()) {
- for (int i = 1; i <= size; i++) {
- data.add(rs.getObject(i));
+ if(rs != null) {
+ final int size = rs.getMetaData().getColumnCount();
+ //logger.error("获取到的数据表:{}, 中的数据列数量:{}", tableName, size);
+ while (rs.next()) {
+ for (int i = 1; i <= size; i++) {
+ data.add(rs.getObject(i));
+ }
}
}
}
} catch (SQLException e) {
- logger.error(e.getMessage());
+ logger.error("异常:{}", e);
} finally {
close(rs);
rs = null;
- //解锁操作
LockUtils.unlock();
}
}
-
listeners.forEach(e -> e.onEvents(new RowChangeEvent(tableName, event.getCode(), data)));
}
}
-
}
\ No newline at end of file
diff --git a/mykit-data-monitor/src/main/java/io/mykit/data/monitor/oracle/dcn/DBChangeNotificationBak.java b/mykit-data-monitor/src/main/java/io/mykit/data/monitor/oracle/dcn/DBChangeNotificationBak.java
new file mode 100644
index 0000000..56c3f78
--- /dev/null
+++ b/mykit-data-monitor/src/main/java/io/mykit/data/monitor/oracle/dcn/DBChangeNotificationBak.java
@@ -0,0 +1,241 @@
+/**
+ * DBSyncer Copyright 2019-2024 All Rights Reserved.
+ */
+package io.mykit.data.monitor.oracle.dcn;
+
+import io.mykit.data.utils.lock.LockUtils;
+import oracle.jdbc.OracleDriver;
+import oracle.jdbc.OracleStatement;
+import oracle.jdbc.dcn.*;
+import oracle.jdbc.driver.OracleConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.*;
+
+/**
+ * 授予登录账号监听事件权限
+ * sqlplus/as sysdba
+ *
+ *
grant change notification to AE86
+ */
+public class DBChangeNotificationBak {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private static final String QUERY_ROW_DATA_SQL = "SELECT * FROM \"%s\" WHERE ROWID = '%s'";
+ private static final String QUERY_TABLE_ALL_SQL = "SELECT DATA_OBJECT_ID, OBJECT_NAME FROM DBA_OBJECTS WHERE OWNER='%S' AND OBJECT_TYPE = 'TABLE' AND OBJECT_NAME NOT IN (SELECT OBJECT_NAME FROM DBA_OBJECTS WHERE OBJECT_TYPE = 'MATERIALIZED VIEW')";
+ private static final String QUERY_TABLE_SQL = "SELECT 1 FROM \"%s\" WHERE 1=2";
+ private static final String QUERY_CALLBACK_SQL = "SELECT REGID,CALLBACK FROM USER_CHANGE_NOTIFICATION_REGS";
+ private static final String CALLBACK = "net8://(ADDRESS=(PROTOCOL=tcp)(HOST=%s)(PORT=%s))?PR=0";
+
+ private String username;
+ private String password;
+ private String url;
+ private OracleConnection conn;
+ private OracleStatement statement;
+ private DatabaseChangeRegistration dcr;
+ private Map tables;
+ private List listeners;
+
+ public DBChangeNotificationBak(String username, String password, String url) {
+ this.username = username;
+ this.password = password;
+ this.url = url;
+ this.listeners = new ArrayList<>();
+ }
+
+ public void start() throws SQLException {
+ try {
+ conn = connect();
+ statement = (OracleStatement) conn.createStatement();
+ readTables();
+
+ Properties prop = new Properties();
+ prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
+ prop.setProperty(OracleConnection.DCN_IGNORE_UPDATEOP, "false");
+ prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, "false");
+ prop.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, "false");
+
+ // add the listener:NTFDCNRegistration
+ dcr = conn.registerDatabaseChangeNotification(prop);
+ dcr.addListener(new DCNListener());
+
+ final long regId = dcr.getRegId();
+ final String host = getHost();
+ final int port = getPort(dcr);
+ final String callback = String.format(CALLBACK, host, port);
+ logger.info("regId:{}, callback:{}", regId, callback);
+ // clean the registrations
+ clean(statement, regId, callback);
+ statement.setDatabaseChangeRegistration(dcr);
+
+ // 配置监听表
+ for (Map.Entry m : tables.entrySet()) {
+ statement.executeQuery(String.format(QUERY_TABLE_SQL, m.getValue()));
+ }
+ } catch (SQLException ex) {
+ // if an exception occurs, we need to close the registration in order
+ // to interrupt the thread otherwise it will be hanging around.
+ close();
+ throw ex;
+ }
+ }
+
+ public void close() {
+ try {
+ if (null != statement) {
+ statement.close();
+ }
+ } catch (SQLException e) {
+ logger.error(e.getMessage());
+ }
+
+ try {
+ if (null != conn) {
+ conn.unregisterDatabaseChangeNotification(dcr);
+ conn.close();
+ }
+ } catch (SQLException e) {
+ logger.error(e.getMessage());
+ }
+ }
+
+ private void close(ResultSet rs) {
+ if (null != rs) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ logger.error(e.getMessage());
+ }
+ }
+ }
+
+ private void readTables() {
+ tables = new LinkedHashMap<>();
+ ResultSet rs = null;
+ try {
+ String sql = String.format(QUERY_TABLE_ALL_SQL, username);
+ rs = statement.executeQuery(sql);
+ while (rs.next()) {
+ int tableId = rs.getInt(1);
+ String tableName = rs.getString(2);
+ tables.put(tableId, tableName);
+ }
+ } catch (SQLException e) {
+ logger.error(e.getMessage());
+ } finally {
+ close(rs);
+ }
+ }
+
+ private String getHost() {
+ if (url != null) {
+ String host = url.substring(url.indexOf("@") + 1);
+ host = host.substring(0, host.indexOf(":"));
+ return host;
+ }
+ return "127.0.0.1";
+ }
+
+ private int getPort(DatabaseChangeRegistration dcr) {
+ Object obj = null;
+ try {
+ // 反射获取抽象属性 NTFRegistration
+ Class clazz = dcr.getClass().getSuperclass();
+ Method method = clazz.getDeclaredMethod("getClientTCPPort");
+ method.setAccessible(true);
+ obj = method.invoke(dcr, new Object[]{});
+ } catch (NoSuchMethodException e) {
+ logger.error(e.getMessage());
+ } catch (IllegalAccessException e) {
+ logger.error(e.getMessage());
+ } catch (InvocationTargetException e) {
+ logger.error(e.getMessage());
+ }
+ return null == obj ? 0 : Integer.parseInt(String.valueOf(obj));
+ }
+
+ private void clean(OracleStatement statement, long excludeRegId, String excludeCallback) {
+ ResultSet rs = null;
+ try {
+ rs = statement.executeQuery(QUERY_CALLBACK_SQL);
+ while (rs.next()) {
+ long regId = rs.getLong(1);
+ String callback = rs.getString(2);
+
+ if (regId != excludeRegId && callback.equals(excludeCallback)) {
+ logger.info("Clean regid:{}, callback:{}", regId, callback);
+ conn.unregisterDatabaseChangeNotification(regId, callback);
+ }
+ }
+ } catch (SQLException e) {
+ logger.error(e.getMessage());
+ } finally {
+ close(rs);
+ }
+ }
+
+ private OracleConnection connect() throws SQLException {
+ OracleDriver dr = new OracleDriver();
+ Properties prop = new Properties();
+ prop.setProperty("user", username);
+ prop.setProperty("password", password);
+ return (OracleConnection) dr.connect(url, prop);
+ }
+
+ public void addRowEventListener(RowEventListener rowEventListener) {
+ this.listeners.add(rowEventListener);
+ }
+
+ final class DCNListener implements DatabaseChangeListener {
+
+ @Override
+ public void onDatabaseChangeNotification(DatabaseChangeEvent event) {
+ TableChangeDescription[] tds = event.getTableChangeDescription();
+
+ for (TableChangeDescription td : tds) {
+ RowChangeDescription[] rds = td.getRowChangeDescription();
+ for (RowChangeDescription rd : rds) {
+ RowChangeDescription.RowOperation opr = rd.getRowOperation();
+ parseEvent(tables.get(td.getObjectNumber()), rd.getRowid().stringValue(), opr);
+ }
+ }
+ }
+
+ private void parseEvent(String tableName, String rowId, RowChangeDescription.RowOperation event) {
+ List data = new ArrayList<>();
+ data.add(rowId);
+
+ if (event.getCode() != TableChangeDescription.TableOperation.DELETE.getCode()) {
+ ResultSet rs = null;
+ try {
+ //修复由于并发情况下rs未关闭时再次执行executeQuery方法,导致的Oracle数据库抛出“结果集已耗尽”的异常,目前采用加锁方式解决
+ if(LockUtils.tryLock()){
+ rs = statement.executeQuery(String.format(QUERY_ROW_DATA_SQL, tableName, rowId));
+ final int size = rs.getMetaData().getColumnCount();
+ while (rs.next()) {
+ for (int i = 1; i <= size; i++) {
+ data.add(rs.getObject(i));
+ }
+ }
+ }
+ } catch (SQLException e) {
+ logger.error(e.getMessage());
+ } finally {
+ close(rs);
+ rs = null;
+ //解锁操作
+ LockUtils.unlock();
+ }
+ }
+
+ listeners.forEach(e -> e.onEvents(new RowChangeEvent(tableName, event.getCode(), data)));
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/mykit-data-monitor/src/main/java/io/mykit/data/monitor/oracle/dcn/DCNEvent.java b/mykit-data-monitor/src/main/java/io/mykit/data/monitor/oracle/dcn/DCNEvent.java
new file mode 100644
index 0000000..39e4f80
--- /dev/null
+++ b/mykit-data-monitor/src/main/java/io/mykit/data/monitor/oracle/dcn/DCNEvent.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright 2020-9999 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mykit.data.monitor.oracle.dcn;
+
+import oracle.jdbc.dcn.RowChangeDescription.RowOperation;
+
+/**
+ * @author binghe
+ * @version 1.0.0
+ * @description 封装监听到的Oracle事件
+ */
+public final class DCNEvent {
+
+ /**
+ * 数据表名称
+ */
+ private String tableName;
+ /**
+ * 数据表rowId
+ */
+ private String rowId;
+ /**
+ * 行操作事件
+ */
+ private RowOperation event;
+
+
+ public DCNEvent() {
+ super();
+ }
+
+
+ public DCNEvent(String tableName, String rowId, RowOperation event) {
+ super();
+ this.tableName = tableName;
+ this.rowId = rowId;
+ this.event = event;
+ }
+
+
+ public String getTableName() {
+ return tableName;
+ }
+
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+
+ public String getRowId() {
+ return rowId;
+ }
+
+
+ public void setRowId(String rowId) {
+ this.rowId = rowId;
+ }
+
+
+ public RowOperation getEvent() {
+ return event;
+ }
+
+
+ public void setEvent(RowOperation event) {
+ this.event = event;
+ }
+}
diff --git a/mykit-data-utils/src/main/java/io/mykit/data/utils/lock/LockUtils.java b/mykit-data-utils/src/main/java/io/mykit/data/utils/lock/LockUtils.java
index 472e720..ecadedf 100644
--- a/mykit-data-utils/src/main/java/io/mykit/data/utils/lock/LockUtils.java
+++ b/mykit-data-utils/src/main/java/io/mykit/data/utils/lock/LockUtils.java
@@ -38,7 +38,7 @@ public class LockUtils {
*/
public static boolean tryLock() {
try {
- return LOCK.tryLock(3, TimeUnit.SECONDS);
+ return LOCK.tryLock(15, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.error("加锁异常:{}", e);
return false;