Friday, 3 June 2016

How to write Spark jobs in Java for Spark Job Server

In the previous post, we learnt how to set up Spark Job Server and run Spark jobs on it. So far, we have used Scala programs on the job server. Now we'll see how to write Spark jobs in Java to run on the job server.

In Scala, a job must implement the SparkJob trait, so a job looks like this:

object SampleJob extends SparkJob {
    override def runJob(sc:SparkContext, jobConfig: Config): Any = ???
    override def validate(sc:SparkContext, config: Config): SparkJobValidation = ???
}

What these methods do:
  • runJob method contains the implementation of the job. The SparkContext is managed by the job server and is provided to the job through this method. This relieves the developer of the boilerplate configuration management that comes with creating a Spark job and allows the job server to manage and re-use contexts.
  • validate method allows for an initial validation of the context and any provided configuration. If the context and configuration are OK to run the job, returning spark.jobserver.SparkJobValid will let the job execute, otherwise returning spark.jobserver.SparkJobInvalid(reason) prevents the job from running and provides means to convey the reason of failure. In this case, the call immediately returns an HTTP/1.1 400 Bad Request status code.
    validate helps prevent running jobs that would eventually fail due to missing or wrong configuration, and saves both time and resources.

In Java, we need to extend the JavaSparkJob class. It has the following methods, which can be overridden in the program:
  • runJob(jsc: JavaSparkContext, jobConfig: Config)
  • validate(sc: SparkContext, config: Config)
  • invalidate(jsc: JavaSparkContext, config: Config)

The JavaSparkJob class is available in the job-server-api package. Build the job-server-api source code and add the resulting jar to your project. Add Spark and the other required dependencies to your pom.xml.
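For orientation, here is a minimal sketch of a Java job before the full example below. The class name SampleJavaJob is just a placeholder, and the package is chosen to match the WordCount example that follows:

package spark.jobserver;

import org.apache.spark.api.java.JavaSparkContext;

import com.typesafe.config.Config;

public class SampleJavaJob extends JavaSparkJob {

    // Contains the job logic; whatever is returned becomes the job result
    public Object runJob(JavaSparkContext jsc, Config config) {
        return "OK";
    }

    // Return null when the configuration is valid, or a reason string to
    // reject the job (the job server then responds with 400 Bad Request)
    public String invalidate(JavaSparkContext jsc, Config config) {
        return null;
    }
}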

Let’s start with the basic WordCount example:

WordCount.java:

package spark.jobserver;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import spark.jobserver.JavaSparkJob;
import spark.jobserver.SparkJobInvalid;
import spark.jobserver.SparkJobValid$;
import spark.jobserver.SparkJobValidation;
import com.typesafe.config.Config;
public class WordCount extends JavaSparkJob implements Serializable {

    private static final long serialVersionUID = 1L;
    private static final Pattern SPACE = Pattern.compile(" ");

    public Object runJob(JavaSparkContext jsc, Config config) {
        try {
            // The JavaSparkContext is provided by the job server; the input file
            // name comes from the job configuration (key: input.filename)
            JavaRDD<String> lines = jsc.textFile(
                    config.getString("input.filename"), 1);

            // Split every line into words
            JavaRDD<String> words = lines
                    .flatMap(new FlatMapFunction<String, String>() {
                        public Iterable<String> call(String s) {
                            return Arrays.asList(SPACE.split(s));
                        }
                    });

            // Pair every word with 1, then sum the counts per word
            JavaPairRDD<String, Integer> counts = words.mapToPair(
                    new PairFunction<String, String, Integer>() {
                        public Tuple2<String, Integer> call(String s) {
                            return new Tuple2<String, Integer>(s, 1);
                        }
                    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                        public Integer call(Integer i1, Integer i2) {
                            return i1 + i2;
                        }
                    });

            // The collected counts are returned as the job result
            List<Tuple2<String, Integer>> output = counts.collect();
            System.out.println(output);
            return output;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    public SparkJobValidation validate(SparkContext sc, Config config) {
        // hasPath avoids an exception when the key is missing entirely
        if (config.hasPath("input.filename")
                && !config.getString("input.filename").isEmpty()) {
            return SparkJobValid$.MODULE$;
        } else {
            return new SparkJobInvalid(
                    "Input parameter is missing. Please mention the filename");
        }
    }

    public String invalidate(JavaSparkContext jsc, Config config) {
        // Not used here because validate is overridden; null means "valid"
        return null;
    }
}
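
To sanity-check the job outside of the job server (purely optional, and not part of the job server workflow itself), you can call runJob yourself with a local JavaSparkContext and a hand-built Config. The master URL, class name and file path below are placeholders:

package spark.jobserver;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class WordCountLocalCheck {
    public static void main(String[] args) {
        // Local Spark context, only for a quick test outside the job server
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("wordcount-local-check");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Build the same kind of Config that the job server would pass in
        Config config = ConfigFactory.parseString("input.filename = \"/tmp/sample.txt\"");

        System.out.println(new WordCount().runJob(jsc, config));
        jsc.stop();
    }
}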

The next step is to compile the code and build the jar, then upload it to the job server, as shown below.
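
For reference, with a default job server setup the upload and run steps look roughly like the commands below; the port, app name, jar path and input file are placeholders, and the previous post covers these steps in detail:

# upload the packaged jar under the app name "wordcount"
curl --data-binary @target/wordcount-example.jar localhost:8090/jars/wordcount

# run the job, passing input.filename through the job configuration
curl -d "input.filename = /tmp/sample.txt" 'localhost:8090/jobs?appName=wordcount&classPath=spark.jobserver.WordCount'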

So your Spark job is ready to run on the Job Server!

Comments:

  1. I am getting an error when I copied this example to my project:
    The return types are incompatible for the inherited methods SparkJobBase.validate(Object, JobEnvironment, Config), JavaSparkJob.validate(Object, JobEnvironment, Config)


    Replies
    1. Hey Krish, I'm having the same issue, were you able to solve it?

  2. @Nishu Tayal, thank you for this example! It's helped me wrap my head around getting this thing going. Would be nice to have a blog post on running StreamingContext as well!
