Apache Cascading is an abstraction layer on top of Hadoop MapReduce, which means complex data-processing flows can be written against the Cascading API instead of raw MapReduce jobs.
cas·cade /kasˈkād/ v. (of water) pour downward rapidly and in large quantities
In this post I create a simple example that streams data from the local filesystem (called the source) to a destination (called the sink) through a Cascading channel (called a pipe).
[STEP 1] Download and extract Hadoop 2.2.0
$ wget http://www.eng.lsu.edu/mirrors/apache/hadoop/common/hadoop-2.2.0/hadoop-2.2.0.tar.gz && mv hadoop-2.2.0.tar.gz /usr/local && cd /usr/local && tar -zxvf hadoop-2.2.0.tar.gz
[STEP 2] Configure HADOOP_HOME
HADOOP_HOME=/usr/local/hadoop-2.2.0; export HADOOP_HOME; PATH=$HADOOP_HOME/bin:$PATH; export PATH
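The same setup as a re-runnable snippet, with a sanity check at the end (the install path is assumed from STEP 1; the check is guarded so it does nothing if hadoop is not yet on the PATH):

```shell
HADOOP_HOME=/usr/local/hadoop-2.2.0
export HADOOP_HOME
PATH=$HADOOP_HOME/bin:$PATH
export PATH

# Sanity check: print the Hadoop version, but only if the binary is reachable.
command -v hadoop >/dev/null 2>&1 && hadoop version
```

Adding the two export lines to ~/.bashrc makes the setting survive new shells.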
[STEP 3] Set up Gradle, then create a Gradle project (part1) with the following build.gradle, which pulls in the Cascading dependencies.
repositories {
    mavenLocal()
    mavenCentral()
    mavenRepo name: 'conjars', url: 'http://conjars.org/repo/'
}

ext.cascadingVersion = '2.2.0'
ext.hadoopVersion = '1.1.2'

dependencies {
    compile( group: 'cascading', name: 'cascading-core', version: cascadingVersion )
    compile( group: 'cascading', name: 'cascading-local', version: cascadingVersion )
    compile( group: 'cascading', name: 'cascading-hadoop', version: cascadingVersion )
    providedCompile( group: 'org.apache.hadoop', name: 'hadoop-core', version: hadoopVersion )
}

jar {
    description = "Assembles a Hadoop ready jar file"
    doFirst {
        into( 'lib' ) {
            from configurations.compile
        }
    }
    manifest {
        attributes( "Main-Class": "logstream/DistributedFileCopy" )
    }
}
[STEP 4] Create a flow to stream data from the local filesystem source to the sink.
package logstream;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class DistributedFileCopy {

    public static void main( String[] args ) {
        String inPath = args[ 0 ];   // e.g. data/application.log
        String outPath = args[ 1 ];

        Properties properties = new Properties();
        AppProps.setApplicationJarClass( properties, DistributedFileCopy.class );
        HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

        // 1. create the source tap (comparable to a spout in Storm)
        Tap inTap = new Hfs( new TextDelimited( true, "\t" ), inPath );

        // 2. create the sink tap
        Tap outTap = new Hfs( new TextDelimited( true, "\t" ), outPath );

        // 3. specify a pipe to connect the taps
        Pipe channel = new Pipe( "copy" );

        // 4. connect the taps and pipe into a flow (comparable to a topology in Storm)
        FlowDef flowDef = FlowDef.flowDef()
            .addSource( channel, inTap )
            .addTailSink( channel, outTap );

        // 5. run the flow
        flowConnector.connect( flowDef ).complete();
    }
}
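Conceptually, this flow is just a distributed file copy: a source tap reads tab-delimited tuples (with the first line as the header), an identity pipe passes them through, and a sink tap writes them back out. Stripped of Hadoop, the same source → pipe → sink shape can be sketched in plain Java; the class and file names here are illustrative, not part of the Cascading API:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class LocalCopySketch {

    public static void main( String[] args ) throws IOException {
        Path source = Files.createTempFile( "application", ".log" );
        Path sink = Files.createTempFile( "part", "-00000" );

        // source tap: tab-delimited lines, first line is the header
        Files.write( source,
            List.of( "doc_id\ttext", "doc01\tA rain shadow is a dry area." ),
            StandardCharsets.UTF_8 );

        // pipe: an identity pass-through, like the bare Pipe( "copy" ) above
        List<String> tuples = Files.readAllLines( source, StandardCharsets.UTF_8 );

        // sink tap: write the tuples back out unchanged
        Files.write( sink, tuples, StandardCharsets.UTF_8 );

        // the sink should now match the source byte-for-byte
        System.out.println( Files.readAllLines( sink ).equals( Files.readAllLines( source ) ) );
    }
}
```

What Cascading adds on top of this shape is the planner: it turns the tap/pipe graph into MapReduce jobs, so the same code runs unchanged on a cluster.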
Create an input file (the input stream) at data/application.log:
doc_id	text
doc01	A rain shadow is a dry area on the lee back side of a mountainous area.
doc02	This sinking, dry air produces a rain shadow, or area in the lee of a mountain with less rain and cloudcover.
doc03	A rain shadow is an area of dry land that lies on the leeward (or downwind) side of a mountain.
doc04	This is known as the rain shadow effect and is the primary cause of leeward deserts of mountain ranges, such as California's Death Valley.
doc05	Two Women. Secrets. A Broken Land. [DVD Australia]
[STEP 5] Build the jar (gradle clean jar), then run the app, supplying data/application.log as the input file and output/application as the output directory
prayag@prayag:/backup/workspace.programming/Impatient/part1$ hadoop jar build/libs/logstream.jar data/application.log output/application
13/10/27 01:20:36 INFO util.HadoopUtil: resolving application jar from found main method on: logstream.DistributedFileCopy
13/10/27 01:20:36 INFO planner.HadoopPlanner: using application jar: /backup/workspace.programming/Impatient/part1/build/libs/logstream.jar
13/10/27 01:20:36 INFO property.AppProps: using app.id: 80EA1575557B4AD6B0677A5C78D7AB73
13/10/27 01:20:37 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
13/10/27 01:20:37 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
13/10/27 01:20:37 INFO mapred.FileInputFormat: Total input paths to process : 1
13/10/27 01:20:37 INFO Configuration.deprecation: mapred.used.genericoptionsparser is deprecated. Instead, use mapreduce.client.genericoptionsparser.used
13/10/27 01:20:37 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
13/10/27 01:20:38 INFO util.Version: Concurrent, Inc - Cascading 2.2.0
13/10/27 01:20:38 INFO flow.Flow: [] starting
13/10/27 01:20:38 INFO flow.Flow: [] source: Hfs["TextDelimited[['doc_id', 'text']]"]["data/application.log"]
13/10/27 01:20:38 INFO flow.Flow: [] sink: Hfs["TextDelimited[['doc_id', 'text']]"]["output/application"]
13/10/27 01:20:38 INFO flow.Flow: [] parallel execution is enabled: false
13/10/27 01:20:38 INFO flow.Flow: [] starting jobs: 1
13/10/27 01:20:38 INFO flow.Flow: [] allocating threads: 1
13/10/27 01:20:38 INFO flow.FlowStep: [] starting step: (1/1) output/application
13/10/27 01:20:38 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
13/10/27 01:20:38 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
13/10/27 01:20:38 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
13/10/27 01:20:38 INFO mapred.FileInputFormat: Total input paths to process : 1
13/10/27 01:20:38 INFO mapreduce.JobSubmitter: number of splits:1
13/10/27 01:20:38 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.output.key.comparator.class is deprecated. Instead, use mapreduce.job.output.key.comparator.class
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
13/10/27 01:20:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local966329123_0001
13/10/27 01:20:38 WARN conf.Configuration: file:/tmp/hadoop-prayag/mapred/staging/prayag966329123/.staging/job_local966329123_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
13/10/27 01:20:38 WARN conf.Configuration: file:/tmp/hadoop-prayag/mapred/staging/prayag966329123/.staging/job_local966329123_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
13/10/27 01:20:38 WARN conf.Configuration: file:/tmp/hadoop-prayag/mapred/local/localRunner/prayag/job_local966329123_0001/job_local966329123_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
13/10/27 01:20:38 WARN conf.Configuration: file:/tmp/hadoop-prayag/mapred/local/localRunner/prayag/job_local966329123_0001/job_local966329123_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
13/10/27 01:20:38 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
13/10/27 01:20:38 INFO mapred.LocalJobRunner: OutputCommitter set in config null
13/10/27 01:20:38 INFO flow.FlowStep: [] submitted hadoop job: job_local966329123_0001
13/10/27 01:20:38 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
13/10/27 01:20:39 INFO mapred.LocalJobRunner: Waiting for map tasks
13/10/27 01:20:39 INFO mapred.LocalJobRunner: Starting task: attempt_local966329123_0001_m_000000_0
13/10/27 01:20:39 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
13/10/27 01:20:39 INFO io.MultiInputSplit: current split input path: file:/backup/workspace.programming/Impatient/part1/data/application.log
13/10/27 01:20:39 INFO mapred.MapTask: Processing split: cascading.tap.hadoop.io.MultiInputSplit@e8848c
13/10/27 01:20:39 INFO mapred.MapTask: numReduceTasks: 0
13/10/27 01:20:39 INFO hadoop.FlowMapper: cascading version: 2.2.0
13/10/27 01:20:39 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
13/10/27 01:20:39 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
13/10/27 01:20:39 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['doc_id', 'text']]"]["data/application.log"]
13/10/27 01:20:39 INFO hadoop.FlowMapper: sinking to: Hfs["TextDelimited[['doc_id', 'text']]"]["output/application"]
13/10/27 01:20:39 INFO mapred.LocalJobRunner:
13/10/27 01:20:39 INFO mapred.Task: Task:attempt_local966329123_0001_m_000000_0 is done. And is in the process of committing
13/10/27 01:20:39 INFO mapred.LocalJobRunner:
13/10/27 01:20:39 INFO mapred.Task: Task attempt_local966329123_0001_m_000000_0 is allowed to commit now
13/10/27 01:20:39 INFO output.FileOutputCommitter: Saved output of task 'attempt_local966329123_0001_m_000000_0' to file:/backup/workspace.programming/Impatient/part1/output/rain/_temporary/0/task_local966329123_0001_m_000000
13/10/27 01:20:39 INFO mapred.LocalJobRunner: file:/backup/workspace.programming/Impatient/part1/data/application.log:0+510
13/10/27 01:20:39 INFO mapred.Task: Task 'attempt_local966329123_0001_m_000000_0' done.
13/10/27 01:20:39 INFO mapred.LocalJobRunner: Finishing task: attempt_local966329123_0001_m_000000_0
13/10/27 01:20:39 INFO mapred.LocalJobRunner: Map task executor complete.
13/10/27 01:20:44 INFO util.Hadoop18TapUtil: deleting temp path output/application/_temporary
[STEP 6] Verify the output data
prayag@prayag:/backup/workspace.programming/Impatient/part1$ more output/application/part-00000
doc_id	text
doc01	A rain shadow is a dry area on the lee back side of a mountainous area.
doc02	This sinking, dry air produces a rain shadow, or area in the lee of a mountain with less rain and cloudcover.
doc03	A rain shadow is an area of dry land that lies on the leeward (or downwind) side of a mountain.
doc04	This is known as the rain shadow effect and is the primary cause of leeward deserts of mountain ranges, such as California's Death Valley.
doc05	Two Women. Secrets. A Broken Land. [DVD Australia]
References
http://docs.cascading.org/impatient/impatient1.html
https://github.com/Cascading/Impatient/tree/master/part1
Intro to Cascading flows