Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BitSail] support load hive conf from specified location #390

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class HiveGeneralRowBuilder implements RowBuilder<Writable> {
private final Map<String, String> hiveProperties;
private final String db;
private final String table;
private final String hiveConfLocation;
private final Map<String, Integer> columnMapping;
private final HiveShim hiveShim;

Expand All @@ -84,11 +85,13 @@ public class HiveGeneralRowBuilder implements RowBuilder<Writable> {
/**
 * Creates a row builder for a Hive table resolved through the metastore.
 *
 * @param columnMapping    mapping from column name to its index in the produced row
 * @param database         Hive database name
 * @param table            Hive table name
 * @param hiveConfLocation optional path to a hive-site.xml; when non-null it is preferred
 *                         over {@code hiveProperties} when building the HiveConf lazily in build()
 * @param hiveProperties   metastore connection properties used when no conf location is given
 */
public HiveGeneralRowBuilder(Map<String, Integer> columnMapping,
                             String database,
                             String table,
                             String hiveConfLocation,
                             Map<String, String> hiveProperties) {
  this.columnMapping = columnMapping;
  // Hive table related
  this.db = database;
  this.table = table;
  this.hiveConfLocation = hiveConfLocation;
  this.hiveProperties = hiveProperties;
  this.hiveShim = HiveShimLoader.loadHiveShim();
}
Expand All @@ -98,6 +101,7 @@ public HiveGeneralRowBuilder() {
this.columnMapping = null;
this.db = null;
this.table = null;
this.hiveConfLocation = null;
this.hiveProperties = null;
this.hiveShim = null;
}
Expand All @@ -109,7 +113,12 @@ public void build(Writable objectValue, Row reuse, String mandatoryEncoding, Row
if (deserializer == null) {
try {
HiveMetaClientUtil.init();
HiveConf hiveConf = HiveMetaClientUtil.getHiveConf(hiveProperties);
HiveConf hiveConf;
if (hiveConfLocation != null) {
hiveConf = HiveMetaClientUtil.getHiveConf(hiveConfLocation);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can't access the hive-site file here when the job runs in a YARN or K8s environment.

} else {
hiveConf = HiveMetaClientUtil.getHiveConf(hiveProperties);
}
StorageDescriptor storageDescriptor = HiveMetaClientUtil.getTableFormat(hiveConf, db, table);
deserializer = (Deserializer) Class.forName(storageDescriptor.getSerdeInfo().getSerializationLib()).newInstance();
Configuration conf = new Configuration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,37 +26,52 @@
import com.bytedance.bitsail.connector.legacy.hive.option.HiveWriterOptions;
import com.bytedance.bitsail.connector.legacy.hive.util.HiveConfUtils;

import org.apache.hadoop.hive.conf.HiveConf;

public class HiveTableCatalogFactory implements TableCatalogFactory {

@Override
public TableCatalog createTableCatalog(BuilderGroup builderGroup,
                                       ExecutionEnviron executionEnviron,
                                       BitSailConfiguration connectorConfiguration) {
  // Reader and writer sides are symmetric; each reads the same settings from its own
  // option namespace. An explicit hive-site.xml location takes precedence over the
  // inline JSON metastore properties.
  if (BuilderGroup.READER.equals(builderGroup)) {
    String database = connectorConfiguration
        .getNecessaryOption(HiveReaderOptions.DB_NAME, FrameworkErrorCode.REQUIRED_VALUE);
    String table = connectorConfiguration
        .getNecessaryOption(HiveReaderOptions.TABLE_NAME, FrameworkErrorCode.REQUIRED_VALUE);
    HiveConf hiveConf = connectorConfiguration.fieldExists(HiveReaderOptions.HIVE_CONF_LOCATION)
        ? HiveConfUtils.fromHiveConfPath(connectorConfiguration.get(HiveReaderOptions.HIVE_CONF_LOCATION))
        : HiveConfUtils.fromJsonProperties(connectorConfiguration.get(HiveReaderOptions.HIVE_METASTORE_PROPERTIES));
    return HiveTableCatalog
        .builder()
        .database(database)
        .table(table)
        .namespace(null)
        .hiveConf(hiveConf)
        .build();
  }
  // Writer side.
  String database = connectorConfiguration
      .getNecessaryOption(HiveWriterOptions.DB_NAME, FrameworkErrorCode.REQUIRED_VALUE);
  String table = connectorConfiguration
      .getNecessaryOption(HiveWriterOptions.TABLE_NAME, FrameworkErrorCode.REQUIRED_VALUE);
  HiveConf hiveConf = connectorConfiguration.fieldExists(HiveWriterOptions.HIVE_CONF_LOCATION)
      ? HiveConfUtils.fromHiveConfPath(connectorConfiguration.get(HiveWriterOptions.HIVE_CONF_LOCATION))
      : HiveConfUtils.fromJsonProperties(connectorConfiguration.get(HiveWriterOptions.HIVE_METASTORE_PROPERTIES));
  return HiveTableCatalog
      .builder()
      .database(database)
      .table(table)
      .namespace(null)
      .hiveConf(hiveConf)
      .build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import static com.bytedance.bitsail.common.option.ConfigOptions.key;
import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX;
import static com.bytedance.bitsail.common.option.WriterOptions.WRITER_PREFIX;

/**
* Created 2022/8/16
Expand All @@ -34,4 +35,8 @@ public interface HiveReaderOptions extends ReaderOptions.BaseReaderOptions {
ConfigOption<String> HIVE_METASTORE_PROPERTIES =
key(READER_PREFIX + "metastore_properties")
.noDefaultValue(String.class);

/**
 * Optional path to a hive-site.xml used to build the reader's HiveConf.
 * Must live under the reader namespace: this interface is {@code HiveReaderOptions}
 * and its lookups use {@code HiveReaderOptions.HIVE_CONF_LOCATION}, so keying it with
 * WRITER_PREFIX would make a configured {@code reader.hive_conf_location} never resolve.
 */
ConfigOption<String> HIVE_CONF_LOCATION =
    key(READER_PREFIX + "hive_conf_location")
        .noDefaultValue(String.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@ public interface HiveWriterOptions extends WriterOptions.BaseWriterOptions {
ConfigOption<String> HIVE_METASTORE_PROPERTIES =
key(WRITER_PREFIX + "metastore_properties")
.noDefaultValue(String.class);

/**
 * Optional path to a hive-site.xml for the writer; when set it is used to build the
 * HiveConf instead of {@code HIVE_METASTORE_PROPERTIES}.
 */
ConfigOption<String> HIVE_CONF_LOCATION =
    key(WRITER_PREFIX + "hive_conf_location")
        .noDefaultValue(String.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,13 @@ private void tryDeleteFilePath(Path filePath) throws IOException {
}

/**
 * Resolves the HiveConf for the writer: an explicit hive-site.xml location wins;
 * otherwise the JSON metastore properties are required and parsed into a map.
 */
public HiveConf getHiveConf() {
  // Guard clause: explicit conf file location short-circuits the properties path.
  if (outputSliceConfig.fieldExists(HiveWriterOptions.HIVE_CONF_LOCATION)) {
    String confLocation = outputSliceConfig.get(HiveWriterOptions.HIVE_CONF_LOCATION);
    return HiveMetaClientUtil.getHiveConf(confLocation);
  }
  String rawProperties = outputSliceConfig.getNecessaryOption(
      HiveWriterOptions.HIVE_METASTORE_PROPERTIES, CommonErrorCode.CONFIG_ERROR);
  return HiveMetaClientUtil.getHiveConf(JsonSerializer.parseToMap(rawProperties));
}

public JobConf initHadoopJobConf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void initPlugin() throws Exception {
this.rowBuilder = new HiveGeneralRowBuilder(getMappingFromMetastore(hiveConf, inputSliceConfig),
db,
table,
inputSliceConfig.get(HiveReaderOptions.HIVE_CONF_LOCATION),
JsonSerializer.parseToMap(inputSliceConfig.get(HiveReaderOptions.HIVE_METASTORE_PROPERTIES)));

setMapredInputJobConf();
Expand Down Expand Up @@ -157,8 +158,12 @@ private InputFormat<Void, ArrayWritable> getMapredInputFormat(BitSailConfigurati
}

/**
 * Resolves the HiveConf for the reader: an explicit hive-site.xml location wins;
 * otherwise the JSON metastore properties are required and parsed into a map.
 */
private HiveConf getHiveConf(BitSailConfiguration readerConfiguration) {
  // Guard clause: explicit conf file location short-circuits the properties path.
  if (readerConfiguration.fieldExists(HiveReaderOptions.HIVE_CONF_LOCATION)) {
    String confLocation = readerConfiguration.get(HiveReaderOptions.HIVE_CONF_LOCATION);
    return HiveMetaClientUtil.getHiveConf(confLocation);
  }
  String rawProperties = readerConfiguration.getNecessaryOption(
      HiveReaderOptions.HIVE_METASTORE_PROPERTIES, CommonErrorCode.CONFIG_ERROR);
  return HiveMetaClientUtil.getHiveConf(JsonSerializer.parseToMap(rawProperties));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static HiveConf fromJsonProperties(String jsonProperties) {
return HiveMetaClientUtil.getHiveConf(hiveProperties);
}

public HiveConf fromHiveConfPath(String location) {
public static HiveConf fromHiveConfPath(String location) {
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(new Path(location));
return hiveConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ public interface FileSystemSinkOptions extends WriterOptions.BaseWriterOptions {
ConfigOption<String> HIVE_METASTORE_PROPERTIES =
key(WRITER_PREFIX + "metastore_properties")
.noDefaultValue(String.class);

/**
 * Optional path to a hive-site.xml for the file-system sink; when set it is preferred
 * over {@code HIVE_METASTORE_PROPERTIES} when the metastore factory builds its HiveConf.
 */
ConfigOption<String> HIVE_CONF_LOCATION =
    key(WRITER_PREFIX + "hive_conf_location")
        .noDefaultValue(String.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ public static HiveTableMetaStoreFactory getHiveMetaFromTable(BitSailConfiguratio
String dbName = jobConf.get(FileSystemSinkOptions.DB_NAME);
String tableName = jobConf.get(FileSystemSinkOptions.TABLE_NAME);
String metaStoreProperties = jobConf.get(FileSystemSinkOptions.HIVE_METASTORE_PROPERTIES);
String hiveConfLocation = jobConf.get(FileSystemSinkOptions.HIVE_CONF_LOCATION);

return new HiveTableMetaStoreFactory(dbName, tableName, metaStoreProperties);
return new HiveTableMetaStoreFactory(dbName, tableName, metaStoreProperties, hiveConfLocation);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,20 @@ public class HiveTableMetaStoreFactory implements TableMetaStoreFactory {
private final String database;
private final String tableName;
private final Map<String, String> metaStoreProperties;

private final String hiveConfLocation;
private final HiveShim hiveShim;

/**
 * Creates a factory for meta stores of one Hive table.
 *
 * @param database            Hive database name
 * @param tableName           Hive table name
 * @param metaStoreProperties metastore properties as a JSON string; parsed into a map here
 * @param hiveConfLocation    optional hive-site.xml path; when non-null it takes precedence
 *                            over the properties in {@link #createTableMetaStore()}
 */
public HiveTableMetaStoreFactory(
    String database,
    String tableName,
    String metaStoreProperties,
    String hiveConfLocation) {
  this.database = database;
  this.tableName = tableName;
  this.metaStoreProperties = JsonSerializer.parseToMap(metaStoreProperties);
  LOG.info("Meta store properties: {}.", metaStoreProperties);
  this.hiveConfLocation = hiveConfLocation;
  this.hiveShim = HiveMetaClientUtil.getHiveShim();
}

Expand All @@ -96,7 +100,11 @@ public static HiveMeta createHiveMeta(HiveTableMetaStoreFactory factory) {

/** Builds a meta store, preferring an explicit hive-site.xml location over inline properties. */
@Override
public HiveTableMetaStore createTableMetaStore() {
  // Conf-location and properties map dispatch to different HiveTableMetaStore constructors.
  return hiveConfLocation != null
      ? new HiveTableMetaStore(hiveConfLocation)
      : new HiveTableMetaStore(metaStoreProperties);
}

public class HiveTableMetaStore implements TableMetaStoreFactory.TableMetaStore {
Expand All @@ -107,6 +115,10 @@ private HiveTableMetaStore(Map<String, String> metaStoreProperties) {
hiveConf = HiveMetaClientUtil.getHiveConf(metaStoreProperties);
}

// Builds the HiveConf directly from a hive-site.xml at the given path
// (alternative to the properties-map constructor above).
private HiveTableMetaStore(String location) {
  hiveConf = HiveMetaClientUtil.getHiveConf(location);
}

private IMetaStoreClient getMetastoreClient() throws Exception {
if (client == null) {
client = initMetastoreClientWrapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ private void addSinkConf(BitSailConfiguration jobConf) {
return;
}
if (HIVE_FORMAT_TYPE_VALUE.equalsIgnoreCase(formatType)) {
//TODO: whether to load from hive conf location?
Preconditions.checkState(jobConf.fieldExists(FileSystemSinkOptions.HIVE_METASTORE_PROPERTIES));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hk-lrzy I'm not quite sure about this check — is this config a must-have option here?

}
}
Expand Down