Custom Data Transformation function in Kylo


KADHAMBARI anbalagan

Apr 15, 2018, 10:43:21 PM
to Kylo Community
Is it possible to write a custom data transformation function in Kylo? I have compressed data in a Hive table and am looking for a way to perform the decompression on Hive.



Greg Hart

Apr 17, 2018, 1:29:55 PM
to Kylo Community
Hi Kadhambari,

Here are the instructions for writing your own data transformation functions:

KADHAMBARI anbalagan

May 3, 2018, 11:23:24 PM
to Kylo Community
We have a unique use case and I would like to know whether I can write a custom transformation function for it. We have a SQL Server table where one of the columns is compressed using zlib compression and stored in hexadecimal format. We have written Java code to extract the content from the compressed bytes. Is it possible to use this code in a Kylo data transformation to convert the compressed data to uncompressed data? If so, how?

Java Code for reference:
byte[] data = hexStringToByteArray("hex string");
ByteArrayOutputStream stream2 = new ByteArrayOutputStream();
Inflater decompresser = new Inflater(true);
InflaterOutputStream inflaterOutputStream = new InflaterOutputStream(stream2, decompresser);
// Skip the first 11 bytes (header) and inflate the rest
inflaterOutputStream.write(data, 11, data.length - 11);
inflaterOutputStream.close();
DirectStream dsv7 = new DirectStream(stream2.toByteArray(), stream2.toByteArray().length, oEnvAdapter);
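[Editor's note: for context, here is a self-contained sketch of the same decompression approach using only JDK classes. The hexStringToByteArray helper and class name are illustrative assumptions; DirectStream and oEnvAdapter are specific to the poster's environment and are omitted, and no 11-byte header is skipped since the sample data has none.]

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
import java.util.zip.InflaterOutputStream;

public class ZlibHexDemo {

    // Decode a hex string such as "48656c6c6f" into raw bytes.
    static byte[] hexStringToByteArray(String s) {
        byte[] out = new byte[s.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(s.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // Encode bytes to hex, simulating the hexadecimal column in SQL Server.
    static String toHex(byte[] data, int len) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < len; i++) {
            sb.append(String.format("%02x", data[i]));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        // Compress a sample value with raw deflate (nowrap=true pairs with Inflater(true)).
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
        deflater.setInput("Hello, Kylo!".getBytes("UTF-8"));
        deflater.finish();
        byte[] buf = new byte[256];
        int n = deflater.deflate(buf);

        String hex = toHex(buf, n);

        // Decompress, mirroring the snippet above.
        byte[] data = hexStringToByteArray(hex);
        ByteArrayOutputStream stream2 = new ByteArrayOutputStream();
        InflaterOutputStream ios = new InflaterOutputStream(stream2, new Inflater(true));
        ios.write(data, 0, data.length);
        ios.close();

        System.out.println(new String(stream2.toByteArray(), "UTF-8"));
    }
}
```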


I went through all the sample Spark functions in the GitHub code (link below), but they seem to use only the existing functions within Spark. Is there any way to write our own code and integrate it into the transformation engine?


Greg Hart

May 8, 2018, 12:35:22 PM
to Kylo Community
Hi Kadhambari,

Please see these instructions for providing your own transformation functions:

You may need to write it as a Spark UDF so that it can be used with the Spark SQL API.

KADHAMBARI anbalagan

May 15, 2018, 8:29:48 AM
to Kylo Community
Thanks Greg. We are new to Spark. We wrote a simple Spark UDF as suggested by you (code below) and added the generated jar file to the kylo-ui/lib and kylo-ui/plugin folders.

Spark UDF:

import org.apache.spark.sql.api.java.UDF1;

public class LowerCase_UDF implements UDF1<String, String> {

    @Override
    public String call(String t1) throws Exception {
        return t1.toLowerCase();
    }
}

my-spark-function.json

{
  "!name": "my-spark-functions",
  "LowerCaseUDF": {
    "!type": "fn(ut: Column) -> Column",
    "!doc": "Converts to lower case.",
    "!spark": ".call(%c)",
    "!sparkType": "column"
  }
}

The above JSON file is added to the kylo-ui/conf folder and kylo-ui is restarted. Now I can see the function name in the data transformation UI, but when I apply the function to a column, it throws the following error. Can you help us resolve this?


Error:
javax.script.ScriptException: javax.script.ScriptException: error: illegal start of simple expression in <console> at line number 1
at com.thinkbiganalytics.spark.service.TransformService.createShellTask(TransformService.java:466)
at com.thinkbiganalytics.spark.service.TransformService.execute(TransformService.java:208)
at com.thinkbiganalytics.spark.rest.SparkShellTransformController.create(SparkShellTransformController.java:89)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

Greg Hart

May 15, 2018, 10:40:25 AM
to Kylo Community
Hi,

Please ensure that you've registered your UDF according to the Spark documentation:

Then use callUDF to use your UDF:

KADHAMBARI anbalagan

May 16, 2018, 9:50:31 AM
to Kylo Community
I have registered the UDF with the name LCUDF and modified the JSON file as below, but still couldn't get it to work. Any idea? It says "value callUDF not found".

{
  "!name": "my-spark-functions",
  "LCUDF": {
    "!type": "fn(ut: Column) -> Column",
    "!doc": "Converts a string column to lower case.",
    "!spark": "callUDF(\"LCUDF\", %c)",
    "!sparkType": "column"
  }
}

KADHAMBARI anbalagan

May 17, 2018, 2:35:17 AM
to Kylo Community
Registered the UDF and made a few modifications to the Spark function JSON as below.

{
  "!name": "my-spark-functions",
  "lowercase": {
    "!type": "fn(udfName: String, col: Column) -> Column",
    "!doc": "converts to lowercase",
    "!spark": "functions.callUDF(%s,%c)",
    "!sparkType": "column"
  }
}

I enter the following expression in the Kylo UI to perform the transformation:
lowercase("LCUDF", pxcreateopname)

But it throws the following error:
AnalysisException: undefined function LCUDF;

The same function seems to work in spark-shell. What could be the reason for this?

Greg Hart

May 17, 2018, 12:16:17 PM
to Kylo Community
Hi,

Could you please check a few things?

1) Please verify that you've added the jar file with your UDF to Spark. If you do not add it in spark-defaults.conf then it will need to be added to the Kylo configuration and to the Data Transformation template.

2) Please verify that you've registered the UDF in the Kylo Spark Shell. Adding it in spark-shell from the command line is not sufficient. You may need to create a wrapper function that registers the UDF and then returns the value of functions.callUDF(), and use this function in the spark-functions.json file.

KADHAMBARI anbalagan

May 22, 2018, 11:08:13 AM
to Kylo Community
Hi Greg, 

We tried everything you have mentioned with no luck.

1. We have added the jar file to spark

2. We have added it to spark-defaults.conf as follows
spark.executor.extraClassPath /var/tmp/LowerCase.jar
spark.executor.extraLibraryPath /var/tmp/LowerCase.jar

3. We have registered the UDF in the Kylo Spark Shell. Registration is done by creating a wrapper function that registers the UDF and returns the value of functions.callUDF().

Registration done using spark submit:
spark-submit --class com.tekclan.spark.udf.LowerCase /var/tmp/LowerCase.jar

The jar file is generated from the code below:
package com.tekclan.spark.udf;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class LowerCase {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Java UDF Example");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    System.out.println("Before ");

    // Register the UDF with our SQLContext
    sqlContext.udf().register("LCUDF", new UDF1<String, String>() {
      public String call(String t1) {
        System.out.println("SPARK UDF CREATED SUCCESSFULLY : " + t1.toLowerCase());
        return t1.toLowerCase();
      }
    }, DataTypes.StringType);

    System.out.println("After ");
  }
}

4. We have used this function in the spark-functions.json file

{
  "!name": "my-spark-functions",
  "lowercase": {
    "!type": "fn(udfName: String, col1: Column) -> Column",
    "!doc": "Converts to lower case",
    "!spark": "functions.callUDF(%s,%c)",
    "!sparkType": "column"
  }
}

5. This function is called in the kylo UI as follows
lowercase("LCUDF",pxcreateopname)

But we still see the same old issue: AnalysisException: undefined function LCUDF

What could be the reason for this? We are unable to get it to work.

Greg Hart

May 22, 2018, 1:46:37 PM
to Kylo Community
Hi,

It's not possible to register the UDF with spark-submit and to use it in Kylo. Please try creating a function that both registers the UDF and calls functions.callUDF(). This should be the function used in spark-functions.json.

KADHAMBARI anbalagan

May 25, 2018, 6:15:47 AM
to Kylo Community
  1. As per your suggestion, we wrote a wrapper function as below
package com.tekclan.spark.udf;

import static org.apache.spark.sql.functions.callUDF;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class LowerCase {

  public Column LowerCaseUDF(Column t1) {
    SparkConf conf = new SparkConf().setAppName("LowerCase UDF");
    JavaSparkContext sc = new JavaSparkContext(conf);
    final SQLContext sqlContext = new SQLContext(sc);

    System.out.println("Before Registering Spark Context");

    // Register the UDF with our SQLContext
    sqlContext.udf().register("LCUDF", new UDF1<String, String>() {
      public String call(String val) {
        System.out.println("SPARK UDF CREATED SUCCESSFULLY : " + val.toLowerCase());
        return val.toLowerCase();
      }
    }, DataTypes.StringType);

    return callUDF("LCUDF", t1);
  }
}

2. We added this jar to /opt/kylo/kylo-ui/lib and /opt/kylo/kylo-ui/plugin
3. Same jar is added to spark-defaults.conf as follows
spark.executor.extraClassPath /var/tmp/LowerCase.jar
spark.executor.extraLibraryPath /var/tmp/LowerCase.jar

4. my-spark-functions.json is changed to the following


{
  "!name": "my-spark-functions",
  "LowerCase":{
     "!type": "fn(name: Column) -> Column",
     "!doc": "to lower",
     "!spark": "new com.tekclan.spark.udf.LowerCase().LowerCaseUDF(%c)",
     "!sparkType": "column"
  }
}

5. Now if I invoke this function in the Kylo UI as "LowerCase(pxcreateopname)", I get the following error:
error: value LowerCaseUDF is not a member of com.tekclan.spark.udf.LowerCase in <console> at line number 9

Are we making any mistakes here?

KADHAMBARI anbalagan

May 26, 2018, 3:36:09 AM
to Kylo Community
Updates: 

  1. As per your suggestion, we wrote a wrapper function as below
package com.tekclan.spark.udf;

import static org.apache.spark.sql.functions.callUDF;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class LowerCase {

  public Column LowerCaseUDF(Column t1) {
    SparkConf conf = new SparkConf().setAppName("LowerCase UDF");
    conf.set("spark.driver.allowMultipleContexts", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);
    final SQLContext sqlContext = new SQLContext(sc);

    System.out.println("Before Registering Spark Context");

    // Register the UDF with our SQLContext
    sqlContext.udf().register("LCUDF", new UDF1<String, String>() {
      public String call(String val) {
        System.out.println("SPARK UDF CREATED SUCCESSFULLY : " + val.toLowerCase());
        return val.toLowerCase();
      }
    }, DataTypes.StringType);

    return callUDF("LCUDF", t1);
  }
}

2. We added this jar to /opt/kylo/kylo-ui/lib and /opt/kylo/kylo-ui/plugin
3. Same jar is added to spark-defaults.conf as follows
spark.executor.extraClassPath /var/tmp/LowerCase.jar
spark.executor.extraLibraryPath /var/tmp/LowerCase.jar

4. my-spark-functions.json is changed to the following


{
  "!name": "my-spark-functions",
  "LowerCase":{
     "!type": "fn(name: Column) -> Column",
     "!doc": "to lower",
     "!spark": "com.tekclan.spark.udf.LowerCase.LowerCaseUDF(%c)",
     "!sparkType": "column"
  }
}

5. Now when I invoke this function in the Kylo UI as "LowerCase(pxcreateopname)", I can see that my jar file is getting executed.
It initially said that a context was already present and asked me to set spark.driver.allowMultipleContexts to true. After I did that in the code, it executes the callUDF() function but ends up throwing the same error as before:
An error occurred AnalysisException: undefined function LCUDF

What could be the reason for this? In fact, we are registering the UDF just before calling it. Any suggestions?

KADHAMBARI anbalagan

May 26, 2018, 4:49:00 AM
to Kylo Community
Finally got it to work, Greg. Thank you so much for leading us down the right path. All I was missing was the SQLContext, which needs to be passed to this function through the spark-functions.json.
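[Editor's note: for readers who land here later, the fix described above is to reuse the Kylo Spark Shell's existing SQLContext instead of creating a new SparkContext inside the wrapper. A sketch of what that working wrapper might look like follows; the class and UDF names come from the thread, but exactly how Kylo binds the SQLContext in spark-functions.json depends on your Kylo version, so treat the signature as an assumption to verify.]

```java
package com.tekclan.spark.udf;

import static org.apache.spark.sql.functions.callUDF;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class LowerCase {

    // Accept the Kylo Spark Shell's own SQLContext rather than building a new
    // SparkContext, so the UDF is registered in the same session that runs the
    // generated transformation script.
    public Column LowerCaseUDF(SQLContext sqlContext, Column t1) {
        sqlContext.udf().register("LCUDF", new UDF1<String, String>() {
            public String call(String val) {
                return val.toLowerCase();
            }
        }, DataTypes.StringType);
        return callUDF("LCUDF", t1);
    }
}
```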

KADHAMBARI anbalagan

May 30, 2018, 4:10:28 AM
to Kylo Community
I am able to create new custom data transformation functions and use them in the Visual Query of Kylo. However, when I create a data transformation feed using these custom UDFs, it throws the following error.

Step Execution Failures:
Failed Step Execute Script
NiFi exceptions:
ExecuteSparkJob[id=73c97bc8-4762-3cbe-d925-7ee43c84b1ed] ExecuteSparkJob for Execute Script and flowfile: StandardFlowFileRecord[uuid=0470f0af-0d8a-4c7e-879e-61d0bdf384a9,claim=,offset=0,name=1888405283310336,size=0] completed with failed status 1

I am assuming the ExecuteSparkJob processor does not have a reference to the deployed jar files. Is that correct? If so, how do we deploy them for data transformation feeds?

Note: A normal data transformation feed works just fine when it uses the built-in transformation functions.

Greg Hart

May 31, 2018, 11:31:43 AM
to Kylo Community
Hi,

You may need to add the jar to the Execute Script processor in the data-transformation reusable template. Try adding it to the 'Extra JARs' property and to 'Spark Configurations' with 'spark.driver.extraClassPath=/path/to/file.jar'.

KADHAMBARI anbalagan

May 31, 2018, 11:48:18 AM
to Kylo Community
Yes, that worked. Thank you!

vipulma...@gmail.com

Feb 4, 2019, 12:48:07 PM
to Kylo Community
How did you get it to work? Could you share the solution?