
Commit 3b1de41

Remove dolphinscheduler-data-quality from dolphinscheduler-task-dataquality (apache#15791)

ruanwenjun authored Apr 4, 2024
1 parent 98bc9ce commit 3b1de41
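
This change moves the dolphinscheduler-data-quality dependency out of the dolphinscheduler-task-dataquality plugin and into dolphinscheduler-api, and drops the plugin's last code-level tie to the module: calls to the module's ParserUtils helper are replaced with the JDK's java.net.URLEncoder/URLDecoder, with Lombok's @SneakyThrows absorbing the checked UnsupportedEncodingException that the String-charset overloads declare. A minimal sketch of the substituted pattern, assuming only what the diff below shows (PasswordCodecSketch is an illustrative name, not a class in this repository):

    import static java.nio.charset.StandardCharsets.UTF_8;

    import java.net.URLDecoder;
    import java.net.URLEncoder;

    import lombok.SneakyThrows;

    // Illustrative sketch of the codec pattern inlined at each call site below.
    public class PasswordCodecSketch {

        @SneakyThrows // rethrows UnsupportedEncodingException undeclared
        public static String encode(String rawPassword) {
            return URLEncoder.encode(rawPassword, UTF_8.name());
        }

        @SneakyThrows
        public static String decode(String encodedPassword) {
            return URLDecoder.decode(encodedPassword, UTF_8.name());
        }
    }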
Showing 6 changed files with 32 additions and 56 deletions.
5 changes: 5 additions & 0 deletions dolphinscheduler-api/pom.xml
@@ -61,6 +61,11 @@
             <artifactId>dolphinscheduler-meter</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-data-quality</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
             <artifactId>dolphinscheduler-datasource-all</artifactId>
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.data.quality.flow.batch.reader;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE;
 import static org.apache.dolphinscheduler.data.quality.Constants.DB_TABLE;
 import static org.apache.dolphinscheduler.data.quality.Constants.DOTS;
@@ -32,17 +33,19 @@
 import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment;
 import org.apache.dolphinscheduler.data.quality.flow.batch.BatchReader;
 import org.apache.dolphinscheduler.data.quality.utils.ConfigUtils;
-import org.apache.dolphinscheduler.data.quality.utils.ParserUtils;
 
 import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.net.URLDecoder;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
+import lombok.SneakyThrows;
+
 /**
  * AbstractJdbcSource
  */
@@ -74,14 +77,15 @@ public Dataset<Row> read(SparkRuntimeEnvironment env) {
         return jdbcReader(env.sparkSession()).load();
     }
 
+    @SneakyThrows
     private DataFrameReader jdbcReader(SparkSession sparkSession) {
 
         DataFrameReader reader = sparkSession.read()
                 .format(JDBC)
                 .option(URL, config.getString(URL))
                 .option(DB_TABLE, config.getString(DATABASE) + "." + config.getString(TABLE))
                 .option(USER, config.getString(USER))
-                .option(PASSWORD, ParserUtils.decode(config.getString(PASSWORD)))
+                .option(PASSWORD, URLDecoder.decode(config.getString(PASSWORD), UTF_8.name()))
                 .option(DRIVER, config.getString(DRIVER));
 
         Config jdbcConfig = ConfigUtils.extractSubConfig(config, JDBC + DOTS, false);
@@ -33,13 +33,16 @@
 import org.apache.dolphinscheduler.data.quality.config.ValidateResult;
 import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment;
 import org.apache.dolphinscheduler.data.quality.flow.batch.BatchWriter;
-import org.apache.dolphinscheduler.data.quality.utils.ParserUtils;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
+import lombok.SneakyThrows;
+
 import com.google.common.base.Strings;
 
 /**
@@ -70,6 +73,7 @@ public void prepare(SparkRuntimeEnvironment prepareEnv) {
         }
     }
 
+    @SneakyThrows
     @Override
     public void write(Dataset<Row> data, SparkRuntimeEnvironment env) {
         if (!Strings.isNullOrEmpty(config.getString(SQL))) {
@@ -82,7 +86,7 @@ public void write(Dataset<Row> data, SparkRuntimeEnvironment env) {
                 .option(URL, config.getString(URL))
                 .option(DB_TABLE, config.getString(DATABASE) + "." + config.getString(TABLE))
                 .option(USER, config.getString(USER))
-                .option(PASSWORD, ParserUtils.decode(config.getString(PASSWORD)))
+                .option(PASSWORD, URLDecoder.decode(config.getString(PASSWORD), StandardCharsets.UTF_8.name()))
                 .mode(config.getString(SAVE_MODE))
                 .save();
     }

This file was deleted.
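
The capture does not name the deleted file; the imports removed above suggest it is the module's org.apache.dolphinscheduler.data.quality.utils.ParserUtils helper. A hypothetical reconstruction, inferred only from its call sites (the encode/decode names and the URL-codec behavior are implied by the diff; everything else, error handling in particular, is a guess):

    package org.apache.dolphinscheduler.data.quality.utils;

    import java.io.UnsupportedEncodingException;
    import java.net.URLDecoder;
    import java.net.URLEncoder;
    import java.nio.charset.StandardCharsets;

    // Hypothetical reconstruction: the deleted source is not shown in this
    // diff. The old call sites declared no checked exceptions, so the helper
    // presumably wrapped UnsupportedEncodingException in an unchecked one.
    public final class ParserUtils {

        private ParserUtils() {
        }

        public static String encode(String str) {
            try {
                return URLEncoder.encode(str, StandardCharsets.UTF_8.name());
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException("UTF-8 is always supported", e);
            }
        }

        public static String decode(String str) {
            try {
                return URLDecoder.decode(str, StandardCharsets.UTF_8.name());
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException("UTF-8 is always supported", e);
            }
        }
    }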

@@ -43,11 +43,6 @@
             <artifactId>dolphinscheduler-datasource-all</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-data-quality</artifactId>
-            <version>${project.version}</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-collections4</artifactId>
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.plugin.task.dq.utils;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_BUSINESS_DATE;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_CURRENT_DATE;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_DATETIME;
@@ -62,7 +63,6 @@
 import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.USER;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.data.quality.utils.ParserUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
@@ -80,12 +80,15 @@
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
+import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import lombok.SneakyThrows;
+
 import com.fasterxml.jackson.databind.node.ArrayNode;
 
 /**
@@ -102,9 +105,10 @@ private RuleParserUtils() {
     private static final String AND_TARGET_FILTER = "AND (${target_filter})";
     private static final String WHERE_TARGET_FILTER = "WHERE (${target_filter})";
 
+    @SneakyThrows
     public static List<BaseConfig> getReaderConfigList(
                                                        Map<String, String> inputParameterValue,
-                                                       DataQualityTaskExecutionContext dataQualityTaskExecutionContext) throws DataQualityException {
+                                                       DataQualityTaskExecutionContext dataQualityTaskExecutionContext) {
 
         List<BaseConfig> readerConfigList = new ArrayList<>();
 
@@ -123,7 +127,7 @@ public static List<BaseConfig> getReaderConfigList(
             config.put(URL, DataSourceUtils.getJdbcUrl(DbType.of(dataQualityTaskExecutionContext.getSourceType()),
                     sourceDataSource));
             config.put(USER, sourceDataSource.getUser());
-            config.put(PASSWORD, ParserUtils.encode(sourceDataSource.getPassword()));
+            config.put(PASSWORD, URLEncoder.encode(sourceDataSource.getPassword(), UTF_8.name()));
             config.put(DRIVER, DataSourceUtils
                     .getDatasourceDriver(DbType.of(dataQualityTaskExecutionContext.getSourceType())));
             String outputTable = inputParameterValue.get(SRC_DATABASE) + "_" + inputParameterValue.get(SRC_TABLE);
@@ -150,7 +154,7 @@ public static List<BaseConfig> getReaderConfigList(
             config.put(URL, DataSourceUtils.getJdbcUrl(DbType.of(dataQualityTaskExecutionContext.getTargetType()),
                     targetDataSource));
             config.put(USER, targetDataSource.getUser());
-            config.put(PASSWORD, ParserUtils.encode(targetDataSource.getPassword()));
+            config.put(PASSWORD, URLEncoder.encode(targetDataSource.getPassword(), UTF_8.name()));
             config.put(DRIVER, DataSourceUtils
                     .getDatasourceDriver(DbType.of(dataQualityTaskExecutionContext.getTargetType())));
             String outputTable =
@@ -264,9 +268,10 @@ public static Map<String, String> getInputParameterMapFromEntryList(List<DqRuleI
         return defaultInputParameterValue;
     }
 
+    @SneakyThrows
     public static List<BaseConfig> getWriterConfigList(
                                                        String sql,
-                                                       DataQualityTaskExecutionContext dataQualityTaskExecutionContext) throws DataQualityException {
+                                                       DataQualityTaskExecutionContext dataQualityTaskExecutionContext) {
 
         List<BaseConfig> writerConfigList = new ArrayList<>();
         if (StringUtils.isNotEmpty(dataQualityTaskExecutionContext.getWriterConnectorType())) {
@@ -284,7 +289,7 @@ public static List<BaseConfig> getWriterConfigList(
             config.put(URL, DataSourceUtils.getJdbcUrl(DbType.of(dataQualityTaskExecutionContext.getWriterType()),
                     writerDataSource));
             config.put(USER, writerDataSource.getUser());
-            config.put(PASSWORD, ParserUtils.encode(writerDataSource.getPassword()));
+            config.put(PASSWORD, URLEncoder.encode(writerDataSource.getPassword(), UTF_8.name()));
             config.put(DRIVER, DataSourceUtils
                     .getDatasourceDriver(DbType.of(dataQualityTaskExecutionContext.getWriterType())));
             config.put(SQL, sql);
@@ -336,8 +341,9 @@ public static List<BaseConfig> getStatisticsValueConfigReaderList(
         return readerConfigList;
     }
 
+    @SneakyThrows
     public static BaseConfig getStatisticsValueConfig(
-                                                      DataQualityTaskExecutionContext dataQualityTaskExecutionContext) throws DataQualityException {
+                                                      DataQualityTaskExecutionContext dataQualityTaskExecutionContext) {
         BaseConfig baseConfig = null;
         if (StringUtils.isNotEmpty(dataQualityTaskExecutionContext.getStatisticsValueConnectorType())) {
             BaseConnectionParam writerDataSource =
@@ -354,7 +360,7 @@ public static BaseConfig getStatisticsValueConfig(
             config.put(URL, DataSourceUtils.getJdbcUrl(
                     DbType.of(dataQualityTaskExecutionContext.getStatisticsValueType()), writerDataSource));
             config.put(USER, writerDataSource.getUser());
-            config.put(PASSWORD, ParserUtils.encode(writerDataSource.getPassword()));
+            config.put(PASSWORD, URLEncoder.encode(writerDataSource.getPassword(), UTF_8.name()));
             config.put(DRIVER, DataSourceUtils
                     .getDatasourceDriver(DbType.of(dataQualityTaskExecutionContext.getWriterType())));
         }
@@ -544,6 +550,7 @@ public static BaseConfig getErrorOutputWriter(Map<String, String> inputParameter
 
     /**
      * the unique code use to get the same type and condition task statistics value
+     *
      * @param inputParameterValue
      * @return
      */
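
A note on the recurring signature change in this file: the throws DataQualityException clauses could be dropped because these methods no longer call the removed ParserUtils helper, and the one checked exception the new code introduces (UnsupportedEncodingException, declared by the String-charset URLEncoder overload) is absorbed by @SneakyThrows, which rethrows checked exceptions without declaring them. A self-contained illustration (SneakyEncodeDemo is a demo class, not project code):

    import java.net.URLEncoder;
    import java.nio.charset.StandardCharsets;

    import lombok.SneakyThrows;

    // Demo: calls the checked String-charset overload of URLEncoder.encode
    // from a method with no throws clause.
    class SneakyEncodeDemo {

        @SneakyThrows // UnsupportedEncodingException is rethrown undeclared
        static String encode(String s) {
            return URLEncoder.encode(s, StandardCharsets.UTF_8.name());
        }

        public static void main(String[] args) {
            System.out.println(encode("p@ss word")); // prints: p%40ss+word
        }
    }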
