Sunday, 24 November 2013

Data flow from source to sink using Cascading 2.2.0 and Hadoop 2.2.0


Apache Cascading is an abstraction layer on top of Hadoop MapReduce, which means I can write complex data-processing flows with Cascading instead of coding raw MapReduce jobs.

cas·cade /kasˈkād/ v. (of water) pour downward rapidly and in large quantities


In this post I build a simple example that streams data from the local filesystem (called the Source) to a destination (called the Sink) through a Cascading channel (called a Pipe).

[STEP 1] download and untar Hadoop 2.2.0
$ wget http://www.eng.lsu.edu/mirrors/apache/hadoop/common/hadoop-2.2.0/hadoop-2.2.0.tar.gz && \
  mv hadoop-2.2.0.tar.gz /usr/local && \
  cd /usr/local && tar -zxvf hadoop-2.2.0.tar.gz

[STEP 2] configure HADOOP_HOME
HADOOP_HOME=/usr/local/hadoop-2.2.0; export HADOOP_HOME
PATH=$HADOOP_HOME/bin:$PATH; export PATH
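
With $HADOOP_HOME/bin on the PATH, a quick sanity check is to print the Hadoop version:
$ hadoop version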

[STEP 3] set up Gradle and then create a Gradle project (part1) with the following build.gradle

It pulls in the Cascading dependencies from the conjars repository. Note that cascading-hadoop 2.2.0 is built against the Hadoop 1.x mapred API, hence the hadoop-core 1.1.2 dependency; as the deprecation warnings in the job log below show, that older API still runs on Hadoop 2.2.0.

apply plugin: 'java'            // provides the compile configuration and the jar task
archivesBaseName = 'logstream'  // so the jar is built as build/libs/logstream.jar (see STEP 5)

repositories {
  mavenLocal()
  mavenCentral()
  mavenRepo name: 'conjars', url: 'http://conjars.org/repo/'
}

ext.cascadingVersion = '2.2.0'
ext.hadoopVersion = '1.1.2'

dependencies {
  compile( group: 'cascading', name: 'cascading-core', version: cascadingVersion )
  compile( group: 'cascading', name: 'cascading-local', version: cascadingVersion )
  compile( group: 'cascading', name: 'cascading-hadoop', version: cascadingVersion )
  // providedCompile keeps hadoop-core out of the bundled lib/ directory; it assumes a
  // plugin or custom configuration that defines providedCompile (e.g. the war plugin).
  providedCompile( group: 'org.apache.hadoop', name: 'hadoop-core', version: hadoopVersion )
}


jar {
  description = "Assembles a Hadoop ready jar file"
  doFirst {
    into( 'lib' ) {
      from configurations.compile
    }
  }

  manifest {
    attributes( "Main-Class": "logstream/DistributedFileCopy" )
  }

}
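
Before running STEP 5, build the jar. Assuming the java plugin shown above, Gradle's jar task bundles the Cascading jars under lib/ and produces build/libs/logstream.jar (a suggested command):
$ gradle clean jar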



[STEP 4] create a flow to stream data from the local filesystem source to the sink.
package logstream;

import java.util.Properties;

import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;

public class DistributedFileCopy {

  public static void main( String[] args ){
    String inPath         = args[ 0 ]; // e.g. data/application.log
    String outPath        = args[ 1 ]; // e.g. output/application
    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, DistributedFileCopy.class );

    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // 1. create the source tap (comparable to a spout in Storm)
    Tap inTap  = new Hfs( new TextDelimited( true, "\t" ), inPath );

    // 2. create the sink tap
    Tap outTap = new Hfs( new TextDelimited( true, "\t" ), outPath );

    // 3. specify a pipe to connect the taps
    Pipe channel = new Pipe( "copy" );

    // 4. wire the taps and the pipe into a flow (comparable to a topology in Storm)
    FlowDef flowDef = FlowDef.flowDef()
                             .addSource( channel, inTap )
                             .addTailSink( channel, outTap );

    // 5. run the flow
    flowConnector.connect( flowDef ).complete();
  }
}
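
Because build.gradle already includes cascading-local, the same copy flow can also be run entirely in-process, without Hadoop. Here is a minimal sketch of that variant (the class name LocalFileCopy is mine, not part of the tutorial), swapping the Hfs taps for local-mode FileTap/TextDelimited and the HadoopFlowConnector for a LocalFlowConnector:

package logstream;

import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;

public class LocalFileCopy {

  public static void main( String[] args ){
    String inPath  = args[ 0 ]; // e.g. data/application.log
    String outPath = args[ 1 ]; // e.g. output/application.tsv

    // local-mode taps read/write the local filesystem directly, no Hadoop needed
    Tap inTap  = new FileTap( new TextDelimited( true, "\t" ), inPath );
    Tap outTap = new FileTap( new TextDelimited( true, "\t" ), outPath );

    // the pipe just passes tuples through, exactly as in DistributedFileCopy
    Pipe channel = new Pipe( "copy" );

    FlowDef flowDef = FlowDef.flowDef()
                             .addSource( channel, inTap )
                             .addTailSink( channel, outTap );

    // LocalFlowConnector runs the flow in-process instead of as a MapReduce job
    new LocalFlowConnector().connect( flowDef ).complete();
  }
}

In local mode the sink is a plain file rather than a directory of part-00000 files, since no MapReduce job is launched.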



Create an input file at data/application.log. It is tab-delimited with a header row (doc_id, text), which is what TextDelimited( true, "\t" ) expects.

doc_id        text
doc01         A rain shadow is a dry area on the lee back side of a mountainous area.
doc02         This sinking, dry air produces a rain shadow, or area in the lee of a mountain with less rain and cloudcover.
doc03         A rain shadow is an area of dry land that lies on the leeward (or downwind) side of a mountain.
doc04         This is known as the rain shadow effect and is the primary cause of leeward deserts of mountain ranges, such as California's Death Valley.
doc05         Two Women. Secrets. A Broken Land. [DVD Australia]

[STEP 5] run the app, supplying data/application.log as the input path and output/application as the output path
prayag@prayag:/backup/workspace.programming/Impatient/part1$ hadoop jar build/libs/logstream.jar data/application.log output/application
13/10/27 01:20:36 INFO util.HadoopUtil: resolving application jar from found main method on: logstream.DistributedFileCopy
13/10/27 01:20:36 INFO planner.HadoopPlanner: using application jar: /backup/workspace.programming/Impatient/part1/build/libs/logstream.jar
13/10/27 01:20:36 INFO property.AppProps: using app.id: 80EA1575557B4AD6B0677A5C78D7AB73
13/10/27 01:20:37 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
13/10/27 01:20:37 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
13/10/27 01:20:37 INFO mapred.FileInputFormat: Total input paths to process : 1
13/10/27 01:20:37 INFO Configuration.deprecation: mapred.used.genericoptionsparser is deprecated. Instead, use mapreduce.client.genericoptionsparser.used
13/10/27 01:20:37 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
13/10/27 01:20:38 INFO util.Version: Concurrent, Inc - Cascading 2.2.0
13/10/27 01:20:38 INFO flow.Flow: [] starting
13/10/27 01:20:38 INFO flow.Flow: []  source: Hfs["TextDelimited[['doc_id', 'text']]"]["data/application.log"]
13/10/27 01:20:38 INFO flow.Flow: []  sink: Hfs["TextDelimited[['doc_id', 'text']]"]["output/application"]
13/10/27 01:20:38 INFO flow.Flow: []  parallel execution is enabled: false
13/10/27 01:20:38 INFO flow.Flow: []  starting jobs: 1
13/10/27 01:20:38 INFO flow.Flow: []  allocating threads: 1
13/10/27 01:20:38 INFO flow.FlowStep: [] starting step: (1/1) output/application
13/10/27 01:20:38 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
13/10/27 01:20:38 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
13/10/27 01:20:38 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
13/10/27 01:20:38 INFO mapred.FileInputFormat: Total input paths to process : 1
13/10/27 01:20:38 INFO mapreduce.JobSubmitter: number of splits:1
13/10/27 01:20:38 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.output.key.comparator.class is deprecated. Instead, use mapreduce.job.output.key.comparator.class
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
13/10/27 01:20:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local966329123_0001
13/10/27 01:20:38 WARN conf.Configuration: file:/tmp/hadoop-prayag/mapred/staging/prayag966329123/.staging/job_local966329123_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
13/10/27 01:20:38 WARN conf.Configuration: file:/tmp/hadoop-prayag/mapred/staging/prayag966329123/.staging/job_local966329123_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
13/10/27 01:20:38 WARN conf.Configuration: file:/tmp/hadoop-prayag/mapred/local/localRunner/prayag/job_local966329123_0001/job_local966329123_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
13/10/27 01:20:38 WARN conf.Configuration: file:/tmp/hadoop-prayag/mapred/local/localRunner/prayag/job_local966329123_0001/job_local966329123_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.

13/10/27 01:20:38 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
13/10/27 01:20:38 INFO mapred.LocalJobRunner: OutputCommitter set in config null
13/10/27 01:20:38 INFO flow.FlowStep: [] submitted hadoop job: job_local966329123_0001
13/10/27 01:20:38 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
13/10/27 01:20:39 INFO mapred.LocalJobRunner: Waiting for map tasks
13/10/27 01:20:39 INFO mapred.LocalJobRunner: Starting task: attempt_local966329123_0001_m_000000_0
13/10/27 01:20:39 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
13/10/27 01:20:39 INFO io.MultiInputSplit: current split input path: file:/backup/workspace.programming/Impatient/part1/data/application.log
13/10/27 01:20:39 INFO mapred.MapTask: Processing split: cascading.tap.hadoop.io.MultiInputSplit@e8848c
13/10/27 01:20:39 INFO mapred.MapTask: numReduceTasks: 0
13/10/27 01:20:39 INFO hadoop.FlowMapper: cascading version: 2.2.0
13/10/27 01:20:39 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
13/10/27 01:20:39 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
13/10/27 01:20:39 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['doc_id', 'text']]"]["data/application.log"]
13/10/27 01:20:39 INFO hadoop.FlowMapper: sinking to: Hfs["TextDelimited[['doc_id', 'text']]"]["output/application"]
13/10/27 01:20:39 INFO mapred.LocalJobRunner:
13/10/27 01:20:39 INFO mapred.Task: Task:attempt_local966329123_0001_m_000000_0 is done. And is in the process of committing
13/10/27 01:20:39 INFO mapred.LocalJobRunner:
13/10/27 01:20:39 INFO mapred.Task: Task attempt_local966329123_0001_m_000000_0 is allowed to commit now
13/10/27 01:20:39 INFO output.FileOutputCommitter: Saved output of task 'attempt_local966329123_0001_m_000000_0' to file:/backup/workspace.programming/Impatient/part1/output/rain/_temporary/0/task_local966329123_0001_m_000000
13/10/27 01:20:39 INFO mapred.LocalJobRunner: file:/backup/workspace.programming/Impatient/part1/data/application.log:0+510
13/10/27 01:20:39 INFO mapred.Task: Task 'attempt_local966329123_0001_m_000000_0' done.
13/10/27 01:20:39 INFO mapred.LocalJobRunner: Finishing task: attempt_local966329123_0001_m_000000_0
13/10/27 01:20:39 INFO mapred.LocalJobRunner: Map task executor complete.
13/10/27 01:20:44 INFO util.Hadoop18TapUtil: deleting temp path output/application/_temporary



[STEP 6] verify the output data
prayag@prayag:/backup/workspace.programming/Impatient/part1$ more output/application/part-00000

doc_id text
doc01 A rain shadow is a dry area on the lee back side of a mountainous area.
doc02 This sinking, dry air produces a rain shadow, or area in the lee of a mountain with less rain and cloudcover.
doc03 A rain shadow is an area of dry land that lies on the leeward (or downwind) side of a mountain.
doc04 This is known as the rain shadow effect and is the primary cause of leeward deserts of mountain ranges, such as California's Death Valley.
doc05 Two Women. Secrets. A Broken Land. [DVD Australia]



References
http://docs.cascading.org/impatient/impatient1.html
https://github.com/Cascading/Impatient/tree/master/part1

