Skip to content

Commit

Permalink
Merge pull request #6 from civitaspo/feature/param_eval
Browse files Browse the repository at this point in the history
Feature/param eval
  • Loading branch information
civitaspo authored Oct 23, 2018
2 parents 3e68d98 + d86c39d commit b7b91db
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 12 deletions.
48 changes: 48 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,47 @@ _export:
+show3:
echo>: "hoge: ${typeof(hoge) == 'undefined' ? 'None' : hoge}, a.b: ${typeof(a) == 'undefined' ? 'None' : typeof(a.b) == 'undefined' ? 'None' : a.b}"

+eval:
_export:
a: aaa
b: bbb
c: ccc
d: ${a}-${b}-${c}

+a:
_export:
e: ${d}
f:
g: ${d}
h:
- ${d}
+b:
echo>: ${d}
+c:
echo>: ${e}
+d:
echo>: ${f.g}
+e:
param_eval>: f.g
+f:
echo>: ${f.g}
+g:
+h:
for_each>: {i: "${f.h}"}
_do:
echo>: ${i}
+i:
echo>: ${f.h}
+j:
param_eval>: f.h
+k:
+l:
for_each>: {i: "${f.h}"}
_do:
echo>: ${i}
+m:
echo>: ${f.h}

```

# Configuration
Expand All @@ -56,6 +97,13 @@ _export:

- **param_reset>**: Param name to reset. (string, required)

## Configuration for `param_eval>` operator

### Options

- **param_eval>**: Param name to eval. (string, required)
- **NOTE**: This operator is a workaround for the issue: [Exported vars are not evaluated recursively in the context of nested params](https://github.com/treasure-data/digdag/issues/862)

# Development

## Run an Example
Expand Down
41 changes: 41 additions & 0 deletions example/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,44 @@ _export:
+show3:
echo>: "hoge: ${typeof(hoge) == 'undefined' ? 'None' : hoge}, a.b: ${typeof(a) == 'undefined' ? 'None' : typeof(a.b) == 'undefined' ? 'None' : a.b}"

+eval:
_export:
a: aaa
b: bbb
c: ccc
d: ${a}-${b}-${c}

+a:
_export:
e: ${d}
f:
g: ${d}
h:
- ${d}
+b:
echo>: ${d}
+c:
echo>: ${e}
+d:
echo>: ${f.g}
+e:
param_eval>: f.g
+f:
echo>: ${f.g}
+g:
+h:
for_each>: {i: "${f.h}"}
_do:
echo>: ${i}
+i:
echo>: ${f.h}
+j:
param_eval>: f.h
+k:
+l:
for_each>: {i: "${f.h}"}
_do:
echo>: ${i}
+m:
echo>: ${f.h}

17 changes: 12 additions & 5 deletions src/main/scala/pro/civitaspo/digdag/plugin/param/ParamPlugin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,30 @@ package pro.civitaspo.digdag.plugin.param
import java.lang.reflect.Constructor
import java.util.{Arrays => JArrays, List => JList}

import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin}
import pro.civitaspo.digdag.plugin.param.operator.{AbstractParamOperator, ParamResetOperator, ParamStoreOperator}
import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin, TemplateEngine}
import javax.inject.Inject
import pro.civitaspo.digdag.plugin.param.operator.{AbstractParamOperator, ParamEvalOperator, ParamResetOperator, ParamStoreOperator}

object ParamPlugin {

class AthenaOperatorProvider extends OperatorProvider {

@Inject protected var templateEngine: TemplateEngine = null

override def get(): JList[OperatorFactory] = {
JArrays.asList(operatorFactory("param_reset", classOf[ParamResetOperator]), operatorFactory("param_store", classOf[ParamStoreOperator]))
JArrays.asList(
operatorFactory("param_reset", classOf[ParamResetOperator]),
operatorFactory("param_store", classOf[ParamStoreOperator]),
operatorFactory("param_eval", classOf[ParamEvalOperator])
)
}

private def operatorFactory[T <: AbstractParamOperator](operatorName: String, klass: Class[T]): OperatorFactory = {
new OperatorFactory {
override def getType: String = operatorName
override def newOperator(context: OperatorContext): Operator = {
val constructor: Constructor[T] = klass.getConstructor(classOf[String], classOf[OperatorContext])
constructor.newInstance(operatorName, context)
val constructor: Constructor[T] = klass.getConstructor(classOf[String], classOf[OperatorContext], classOf[TemplateEngine])
constructor.newInstance(operatorName, context, templateEngine)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package pro.civitaspo.digdag.plugin.param.operator

import io.digdag.client.config.Config
import io.digdag.spi.OperatorContext
import io.digdag.client.config.{Config, ConfigFactory}
import io.digdag.spi.{OperatorContext, TemplateEngine}
import io.digdag.util.BaseOperator
import org.slf4j.{Logger, LoggerFactory}

abstract class AbstractParamOperator(operatorName: String, context: OperatorContext) extends BaseOperator(context) {
abstract class AbstractParamOperator(operatorName: String, context: OperatorContext, templateEngine: TemplateEngine) extends BaseOperator(context) {

protected val logger: Logger = LoggerFactory.getLogger(operatorName)
protected val cf: ConfigFactory = request.getConfig.getFactory
protected val params: Config = request.getConfig

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package pro.civitaspo.digdag.plugin.param.operator
import java.nio.charset.StandardCharsets.UTF_8

import com.google.common.collect.ImmutableList
import io.digdag.client.config.{Config, ConfigKey}
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}

class ParamEvalOperator(operatorName: String, context: OperatorContext, templateEngine: TemplateEngine)
extends AbstractParamOperator(operatorName, context, templateEngine) {

protected val key: String = params.get("_command", classOf[String])

override def runTask(): TaskResult = {
val evaluated: Config = eval(params)
val paramsToStore: Config = cf.create()

val elems: Seq[String] = key.split("\\.")
val parents: Seq[String] = elems.reverse.tail.reverse
val child: String = elems.last

if (parents.isEmpty) paramsToStore.set(child, evaluated.get(child, classOf[Object]))
else {
val getter = parents.foldLeft(evaluated) { (nested: Config, k: String) =>
nested.getNested(k)
}
val setter = parents.foldLeft(paramsToStore) { (nested: Config, k: String) =>
nested.getNestedOrSetEmpty(k)
}
setter.set(child, getter.get(child, classOf[Object]))
}

val builder = TaskResult.defaultBuilder(cf)
builder.resetStoreParams(ImmutableList.of(ConfigKey.parse(key)))
builder.storeParams(paramsToStore)
builder.build()
}

protected def eval(params: Config): Config = {
val tmpFile: String = workspace.createTempFile("param_eval", ".json")

val writer = workspace.newBufferedWriter(tmpFile, UTF_8)
try writer.write(params.toString)
finally writer.close()

val content = workspace.templateFile(templateEngine, tmpFile, UTF_8, params)
cf.fromJsonString(content)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package pro.civitaspo.digdag.plugin.param.operator

import com.google.common.collect.ImmutableList
import io.digdag.client.config.ConfigKey
import io.digdag.spi.{OperatorContext, TaskResult}
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}

class ParamResetOperator(operatorName: String, context: OperatorContext) extends AbstractParamOperator(operatorName, context) {
class ParamResetOperator(operatorName: String, context: OperatorContext, templateEngine: TemplateEngine)
extends AbstractParamOperator(operatorName, context, templateEngine) {

protected val resetKey: String = params.get("_command", classOf[String])

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package pro.civitaspo.digdag.plugin.param.operator

import io.digdag.client.config.Config
import io.digdag.spi.{OperatorContext, TaskResult}
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}

class ParamStoreOperator(operatorName: String, context: OperatorContext) extends AbstractParamOperator(operatorName, context) {
class ParamStoreOperator(operatorName: String, context: OperatorContext, templateEngine: TemplateEngine)
extends AbstractParamOperator(operatorName, context, templateEngine) {

protected val newParams: Config = params.get("_command", classOf[Config])

Expand Down

0 comments on commit b7b91db

Please sign in to comment.