

Develop a SQL-MR function using AsterData Developer Express, part 2: develop the SQL-MR function
Create a SQL-MapReduce function using the wizard
Aster Developer Express has a built-in wizard that helps create the Java class for a SQL-MR function.
Right-click the project and select "New", then "Other".
Then select the right wizard.
Type in the desired package and class name.
Define the input schema (the list of columns and column types that our function accepts). You can just type it in or use a previously configured nCluster connection. You can see how a column from the staging table has been selected.
Now it's time to define the output columns. We use the wizard to define the input and output columns, and it generates the corresponding Java code. You can easily skip the input and output column steps and write the code on your own.
Now let's specify the arguments for our function. Our SQL-MR function will accept two arguments: lists of EXCLUDES and INCLUDES. We can filter the data from the staging table using these arguments. For example, EXCLUDES('pdf') will exclude all Adobe Acrobat documents, and INCLUDES('html', 'htm') will leave only requests for HTML documents in the result set.
You can skip this step and write the Java code later. It's not complicated.
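The filtering that these two arguments describe can be sketched in plain Java. This is only an illustration, not the code from the downloadable project: the class name ResourceFilter, the method name accepts, and the case-insensitive substring matching are all assumptions. It also assumes that INCLUDES and EXCLUDES are never passed together, which is what the constructor shown further down enforces.

```java
import java.util.List;
import java.util.Locale;

/** Hypothetical helper: decides whether a requested resource passes the INCLUDES/EXCLUDES filter. */
final class ResourceFilter {
    private final List<String> includes;
    private final List<String> excludes;

    ResourceFilter(List<String> includes, List<String> excludes) {
        this.includes = includes;
        this.excludes = excludes;
    }

    /** Returns true when a row with this resource should stay in the result set. */
    boolean accepts(String resource) {
        if (!includes.isEmpty()) {
            // INCLUDES given: keep the row only if one of the listed substrings occurs.
            return containsAny(resource, includes);
        }
        // EXCLUDES given (or no filter at all): drop the row if a listed substring occurs.
        return !containsAny(resource, excludes);
    }

    // Assumption: matching is a case-insensitive substring test.
    private static boolean containsAny(String text, List<String> needles) {
        String lower = text.toLowerCase(Locale.ROOT);
        for (String needle : needles) {
            if (lower.contains(needle.toLowerCase(Locale.ROOT))) {
                return true;
            }
        }
        return false;
    }
}
```

With EXCLUDES('pdf'), a call such as accepts("/docs/manual.pdf") returns false, while accepts("/index.html") returns true. Because this is a substring test, 'htm' alone would also match 'html' resources.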
Create a JUnit test
We need to write JUnit tests for the methods of our SQL-MR function. You can also create some kind of mock test for the entire SQL-MR function.
Create the test class
Write the code
We are not going to reinvent the wheel, so I took part of the parsing code from here: http://javatechworld.blogspot.com/2011/08/apache-http-access-log-parser.html
The main things are:
// We have to implement the RowFunction interface.
public final class ApacheWebLogParserFunction implements RowFunction {
    /***/
}
// Here are the fields of the class.

/** If specified, rows with the listed substrings will be excluded. */
private static final String ARG_CLAUSE_EXCLUDES = "EXCLUDES";

/** If specified, only rows with the listed substrings will be included in the result set. */
private static final String ARG_CLAUSE_INCLUDES = "INCLUDES";

//
// These member variables will be populated with the values
// of the argument clauses passed to the SQL-MR function.
//
private List<String> includesArgument = new ArrayList<String>();
private List<String> excludesArgument = new ArrayList<String>();
// Constructor of our class.
// You can see the code that defines the input and output columns.
// As I said before, you can easily modify this code.
//
// The constructor establishes the RuntimeContract between
// the SQL-MR function and nCluster. During query planning,
// the function will be constructed on a single node. During
// query execution, it will be constructed and run on
// one or more nodes.
//
public ApacheWebLogParserFunction(RuntimeContract contract) {
    //
    // Verify that the function accepts the given input schema.
    //
    List<SqlType> expectedInputTypes = new ArrayList<SqlType>();
    expectedInputTypes.add(SqlType.getType("character varying"));
    List<SqlType> actualInputTypes = new ArrayList<SqlType>();
    for (ColumnDefinition d : contract.getInputInfo().getColumns()) {
        actualInputTypes.add(d.getColumnType());
    }
    if (!actualInputTypes.equals(expectedInputTypes)) {
        throw new IllegalUsageException(
            "Expected input types: (character varying)");
    }

    //
    // Construct the output schema.
    //
    List<ColumnDefinition> outputColumns = new ArrayList<ColumnDefinition>();
    outputColumns.add(new ColumnDefinition("CLIENT", SqlType.getType("character varying")));
    outputColumns.add(new ColumnDefinition("REMOTE_USER", SqlType.getType("character varying")));
    outputColumns.add(new ColumnDefinition("REQUEST_TS", SqlType.getType("timestamp with time zone")));
    outputColumns.add(new ColumnDefinition("REQUEST_VERB", SqlType.getType("character varying")));
    outputColumns.add(new ColumnDefinition("REQUEST", SqlType.getType("character varying")));
    outputColumns.add(new ColumnDefinition("PROTOCOL", SqlType.getType("character varying")));
    outputColumns.add(new ColumnDefinition("RESPONSE_CODE", SqlType.getType("integer")));
    outputColumns.add(new ColumnDefinition("CONTENT_LENGTH", SqlType.getType("integer")));
    outputColumns.add(new ColumnDefinition("USER_AGENT", SqlType.getType("character varying")));

    //
    // Read argument clauses into appropriate member variables.
    //
    ArgumentClause tmpAc;
    String tmpValue;

    //
    // Read argument clause 'INCLUDES'
    //
    if (contract.hasArgumentClause(ARG_CLAUSE_INCLUDES)) {
        tmpAc = contract.useArgumentClause(ARG_CLAUSE_INCLUDES);
        for (int i = 0; i < tmpAc.getValueCount(); i++) {
            tmpValue = tmpAc.getValues().get(i);
            includesArgument.add(tmpValue);
        }
    }

    //
    // Read argument clause 'EXCLUDES'
    //
    if (contract.hasArgumentClause(ARG_CLAUSE_EXCLUDES)) {
        tmpAc = contract.useArgumentClause(ARG_CLAUSE_EXCLUDES);
        for (int i = 0; i < tmpAc.getValueCount(); i++) {
            tmpValue = tmpAc.getValues().get(i);
            excludesArgument.add(tmpValue);
        }
    }

    // INCLUDES and EXCLUDES are mutually exclusive.
    if (!includesArgument.isEmpty() && !excludesArgument.isEmpty()) {
        throw new ClientVisibleException("You can use " + ARG_CLAUSE_EXCLUDES
            + " current values are:[" + excludesArgument + "] or "
            + ARG_CLAUSE_INCLUDES + " current values are:[" + includesArgument
            + "] at one time. You can't use them together.");
    }
    System.out.println("includes[" + includesArgument + "] excludes[" + excludesArgument + "]");

    //
    // Add any other function-specific initialization of member variables
    // or other checks here. Throw a ClientVisibleException to signal
    // an error to the client.
    //

    //
    // Complete the contract
    //
    contract.setOutputInfo(new OutputInfo(outputColumns));
    contract.complete();
}
/**
 * This method processes the input rows. We selected "ROW FUNCTION", so
 * each row is processed one by one. The set of rows can be accessed via {@link RowIterator}.
 * @param inputIterator gives access to the input set of rows.
 * @param outputEmitter collects the processed rows.
 */
public void operateOnSomeRows(RowIterator inputIterator, RowEmitter outputEmitter) {
    String logLine = null;
    List<String> values = null;
    try {
        while (inputIterator.advanceToNextRow()) {
            logLine = inputIterator.getStringAt(0);
            values = splitLogLine(logLine);
            if (!values.isEmpty() && checkArgClauseIncludeExclude(values.get(RESOURCE_COL_NUM))) {
                outputEmitter.addString(values.get(0)); // CLIENT
                outputEmitter.addString(values.get(1)); // REMOTE_USER
                Timestamp ts = Timestamp.fromSqlTimestamp(SqlType.timestamp(),
                    toSqlTimestamp(toDate(values.get(2))));
                outputEmitter.addTimestamp(ts);         // REQUEST_TS
                outputEmitter.addString(values.get(3)); // REQUEST_VERB
                outputEmitter.addString(values.get(4)); // REQUEST
                outputEmitter.addString(values.get(5)); // PROTOCOL
                outputEmitter.addInt(Integer.valueOf(values.get(6))); // RESPONSE_CODE
                //
                // The server can return code 304, which means the client should take the
                // resource from its cache because the resource hasn't been modified.
                // The content length won't be specified in the access log in that case.
                //
                if (values.get(7).matches("\\d+")) {
                    outputEmitter.addInt(Integer.valueOf(values.get(7))); // CONTENT_LENGTH
                } else {
                    outputEmitter.addInt(0);
                }
                outputEmitter.addString(values.get(8)); // USER_AGENT
                outputEmitter.emitRow();
            }
        }
    } catch (Exception e) {
        throw new ClientVisibleException("Error while parsing log line[" + logLine
            + "], parsed line[" + values + "]");
    }
}
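The splitLogLine helper called above is not listed in this post. Here is a minimal sketch of what such a helper might look like, assuming the staging table holds lines in Apache's combined log format. The class name LogLineSplitter and the regular expression are assumptions, not the code from the linked parser. It returns nine values in the order the output columns expect (the referer is matched but dropped), or an empty list when the line does not match.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical splitter for Apache combined-format access log lines. */
final class LogLineSplitter {
    // Groups: client, remote user, timestamp, verb, request, protocol,
    // response code, content length, user agent. The identd field and the
    // referer are matched but not captured.
    private static final Pattern COMBINED = Pattern.compile(
        "^(\\S+) \\S+ (\\S+) \\[([^\\]]+)\\] \"(\\S+) (\\S+) (\\S+)\" "
        + "(\\d{3}) (\\S+) \"[^\"]*\" \"([^\"]*)\"$");

    /** Returns the nine captured values, or an empty list if the line does not match. */
    static List<String> splitLogLine(String line) {
        Matcher m = COMBINED.matcher(line.trim());
        if (!m.matches()) {
            return Collections.emptyList();
        }
        List<String> values = new ArrayList<String>();
        for (int g = 1; g <= m.groupCount(); g++) {
            values.add(m.group(g));
        }
        return values;
    }
}
```

For a 304 response the content length field is typically "-", which this sketch passes through unchanged; the \d+ check in operateOnSomeRows then turns it into 0.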
I have shown you the key points: a constructor that accepts a RuntimeContract and defines the input/output columns, and a method that parses an input log line and produces a row for the result set. As you can see, the solution is rather simple and easy to understand. Feel free to download the code and modify it. The remaining part is not interesting and I won't cover it here. Please leave your questions in the comments and I'll answer them.
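For readers who want to fill in the timestamp conversion themselves, here is a rough sketch of what helpers like toDate and toSqlTimestamp could do. It assumes the timestamp text looks like 10/Oct/2000:13:55:36 -0700, the usual Apache access log form. The class name LogTimestamps, the return types, and the choice to wrap parse errors in IllegalArgumentException are assumptions, and the real project code may differ.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/** Hypothetical timestamp helpers for Apache access log values. */
final class LogTimestamps {
    // Assumed layout of the value between the square brackets in the log line.
    private static final String APACHE_PATTERN = "dd/MMM/yyyy:HH:mm:ss Z";

    /** Parses a value such as "10/Oct/2000:13:55:36 -0700". */
    static Date toDate(String value) {
        // SimpleDateFormat is not thread-safe, so a new instance is created per call.
        SimpleDateFormat format = new SimpleDateFormat(APACHE_PATTERN, Locale.ENGLISH);
        format.setLenient(false);
        try {
            return format.parse(value);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Not an access log timestamp: " + value, e);
        }
    }

    /** Converts to the JDBC timestamp type, keeping the same instant. */
    static java.sql.Timestamp toSqlTimestamp(Date date) {
        return new java.sql.Timestamp(date.getTime());
    }
}
```

Locale.ENGLISH is fixed here so that month names like "Oct" parse the same way regardless of the server's default locale.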
Deploy the function
This is a rather easy and quick process. I suggest writing an Ant script for the compilation, packaging, and deployment steps.
Now let's export the JAR file.
Specify what to export. Exclude everything except our classes.
Copy ApacheWebLogParserFunction.jar to /home/aster/demo/ext-function on the queen server.
Create an install.sql file with the following text:
-- installApacheWebLogParserFunction
\remove ApacheWebLogParserFunction.jar
\install ApacheWebLogParserFunction.jar;
Make sure that install.sql and ApacheWebLogParserFunction.jar have appropriate permissions. I suggest setting 777.
Run install.sql:
aster@linux-qvsn:~> cd /home/aster/demo/ext-function
aster@linux-qvsn:~/demo/ext-function> ls -la
total 16
drwxr-xr-x 2 root  root  4096 2012-09-25 11:30 .
drwxr-xr-x 7 aster users 4096 2012-09-25 11:29 ..
-rwxrwxrwx 1 aster root   182 2012-09-25 11:00 ApacheWebLogParserFunction.jar
-rwxrwxrwx 1 aster root    77 2012-09-25 11:30 install.sql
aster@linux-qvsn:~/demo/ext-function> act
Password:
Welcome to act 4.6.2, the Aster nCluster Terminal.
Type: \copyright for distribution terms
      \h for help with SQL commands
      \? for help with act commands
      \g or terminate with semicolon to execute query
      \q to quit
beehive=> \i install.sql
beehive=> \dF
                     List of installed files
 schemaname |            filename            | fileowner |         uploadtime
------------+--------------------------------+-----------+----------------------------
 public     | ApacheWebLogParserFunction.jar | beehive   | 2012-09-25 11:35:49.938061
It's time to run our function via the Datasource Explorer.
Oops... We got an exception! No problem, just modify the code and repeat the deployment step.
You can also inspect the logs using the AMC queen console.
That's all! We've just finished writing our first SQL-MR function. Download the code, import it, and try to run it. Ask your questions in the comments and I'll do my best to help you.
Hello,
Can you please share the complete Java Code?
Thanks
Abdul
When I run the query select * from ApacheWebLogParserFunction ( on ....) ..
I get the correct output,
but when I run select CLIENT,REMOTE_USER from ApacheWebLogParserFunction ( on ....) ..
I get an error: CLIENT does not exist???
Not sure if you got the query to work, but there are two ways to do it. Aster likes things in lower case, so select client will work. You can also quote the column names if you want to use the exact case, so select "CLIENT" will work too.