Sunday, 24 November 2013

Data flow from source to sink using Cascading 2.2.0 and Hadoop 2.2.0


Cascading is an abstraction layer over Hadoop MapReduce, which means I can write complex data processing jobs and data flows with Cascading instead of hand-written MapReduce code.

cas·cade /kasˈkād/ v. (of water) pour downward rapidly and in large quantities


I am creating a simple example that streams data from the local filesystem (called the Source) to a destination (called the Sink) through a Cascading channel (called a Pipe).

[STEP 1] download and untar hadoop 2.2.0
$ wget http://www.eng.lsu.edu/mirrors/apache/hadoop/common/hadoop-2.2.0/hadoop-2.2.0.tar.gz \
  && mv hadoop-2.2.0.tar.gz /usr/local \
  && cd /usr/local && tar -zxvf hadoop-2.2.0.tar.gz

[STEP 2] configure HADOOP_HOME
HADOOP_HOME=/usr/local/hadoop-2.2.0; export HADOOP_HOME
PATH=$HADOOP_HOME/bin:$PATH; export PATH
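
Before moving on, it can help to confirm that the directory HADOOP_HOME points at really contains the hadoop launcher. The snippet below is only a minimal sketch: check_hadoop_home is a hypothetical helper name, and the fallback path is simply the install location used in STEP 1.

```shell
# Hypothetical helper (not part of Hadoop): report whether an install
# directory contains an executable bin/hadoop launcher.
check_hadoop_home() {
  dir="$1"
  if [ -x "$dir/bin/hadoop" ]; then
    echo "ok: $dir/bin/hadoop"
  else
    echo "missing: $dir/bin/hadoop"
    return 1
  fi
}

# Check the configured location, falling back to the path from STEP 1.
check_hadoop_home "${HADOOP_HOME:-/usr/local/hadoop-2.2.0}" || true
```

If the check reports "missing", the export lines above were probably not picked up by the current shell.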

[STEP 3] set up Gradle, then create a Gradle project (part1) with the following build.gradle

It includes the Cascading dependencies.
repositories {
  mavenLocal()
  mavenCentral()
  mavenRepo name: 'conjars', url: 'http://conjars.org/repo/'
}


ext.cascadingVersion = '2.2.0'
ext.hadoopVersion = '1.1.2'

dependencies {
  compile( group: 'cascading', name: 'cascading-core', version: cascadingVersion )
  compile( group: 'cascading', name: 'cascading-local', version: cascadingVersion )
  compile( group: 'cascading', name: 'cascading-hadoop', version: cascadingVersion )
  providedCompile( group: 'org.apache.hadoop', name: 'hadoop-core', version: hadoopVersion )
}


jar {
  description = "Assembles a Hadoop ready jar file"
  doFirst {
    into( 'lib' ) {
      from configurations.compile
    }
  }

  manifest {
    attributes( "Main-Class": "logstream/DistributedFileCopy" )
  }

}



[STEP 4] create a flow to stream data from local file system source to sink.
package logstream;
import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class DistributedFileCopy {

  public static void main( String[] args ){
    String inPath         = args[ 0 ]; //data/application.log
    String outPath        = args[ 1 ];
    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, DistributedFileCopy.class );

    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // 1. create the source tap (spout in Storm)
    Tap inTap  = new Hfs( new TextDelimited( true, "\t" ), inPath );
    // 2. create the sink tap
    Tap outTap = new Hfs( new TextDelimited( true, "\t" ), outPath );

    // 3. specify a pipe to connect the taps
    Pipe channel = new Pipe( "copy" );

    //4. connect the taps, pipes, etc., into a flow (topology in Storm)
    FlowDef flowDef = FlowDef.flowDef()
                             .addSource( channel, inTap )
                             .addTailSink( channel, outTap );
    // 5. run the flow
    flowConnector.connect( flowDef ).complete();
  }

}



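Create an input file (the input stream) at data/application.log, with a header row and tab-separated columns, as the TextDelimited( true, "\t" ) scheme expects.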

doc_id        text

doc01        A rain shadow is a dry area on the lee back side of a mountainous area.

doc02        This sinking, dry air produces a rain shadow, or area in the lee of a mountain with less rain and cloudcover.

doc03        A rain shadow is an area of dry land that lies on the leeward (or downwind) side of a mountain.

doc04        This is known as the rain shadow effect and is the primary cause of leeward deserts of mountain ranges, such as California's Death Valley.

doc05        Two Women. Secrets. A Broken Land. [DVD Australia]
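
Copying the sample from a web page tends to turn the tabs into spaces, and the tab-delimited scheme would then not split the columns. As a hedged sketch, the file can be written with explicit tabs and then checked. write_sample and bad_lines are hypothetical helper names, and only two of the five records are written here to keep the example short.

```shell
# Write a shortened copy of the sample with literal tab separators.
write_sample() {
  out="$1"
  mkdir -p "$(dirname "$out")"
  {
    printf 'doc_id\ttext\n'
    printf 'doc01\tA rain shadow is a dry area on the lee back side of a mountainous area.\n'
    printf 'doc05\tTwo Women. Secrets. A Broken Land. [DVD Australia]\n'
  } > "$out"
}

# Print every line that does not have exactly two tab-separated fields.
bad_lines() {
  awk -F '\t' 'NF != 2 { print NR ": " $0 }' "$1"
}

write_sample data/application.log
bad_lines data/application.log
```

No output from bad_lines means every line, including the header, has exactly two columns.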

[STEP 5] run the app, supplying data/application.log as the input file
prayag@prayag:/backup/workspace.programming/Impatient/part1$ hadoop jar build/libs/logstream.jar data/application.log output/application
13/10/27 01:20:36 INFO util.HadoopUtil: resolving application jar from found main method on: logstream.DistributedFileCopy
13/10/27 01:20:36 INFO planner.HadoopPlanner: using application jar: /backup/workspace.programming/Impatient/part1/build/libs/logstream.jar
13/10/27 01:20:36 INFO property.AppProps: using app.id: 80EA1575557B4AD6B0677A5C78D7AB73
13/10/27 01:20:37 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
13/10/27 01:20:37 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
13/10/27 01:20:37 INFO mapred.FileInputFormat: Total input paths to process : 1
13/10/27 01:20:37 INFO Configuration.deprecation: mapred.used.genericoptionsparser is deprecated. Instead, use mapreduce.client.genericoptionsparser.used
13/10/27 01:20:37 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
13/10/27 01:20:38 INFO util.Version: Concurrent, Inc - Cascading 2.2.0
13/10/27 01:20:38 INFO flow.Flow: [] starting
13/10/27 01:20:38 INFO flow.Flow: []  source: Hfs["TextDelimited[['doc_id', 'text']]"]["data/application.log"]
13/10/27 01:20:38 INFO flow.Flow: []  sink: Hfs["TextDelimited[['doc_id', 'text']]"]["output/application"]
13/10/27 01:20:38 INFO flow.Flow: []  parallel execution is enabled: false
13/10/27 01:20:38 INFO flow.Flow: []  starting jobs: 1
13/10/27 01:20:38 INFO flow.Flow: []  allocating threads: 1
13/10/27 01:20:38 INFO flow.FlowStep: [] starting step: (1/1) output/application
13/10/27 01:20:38 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
13/10/27 01:20:38 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
13/10/27 01:20:38 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
13/10/27 01:20:38 INFO mapred.FileInputFormat: Total input paths to process : 1
13/10/27 01:20:38 INFO mapreduce.JobSubmitter: number of splits:1
13/10/27 01:20:38 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.output.key.comparator.class is deprecated. Instead, use mapreduce.job.output.key.comparator.class
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
13/10/27 01:20:38 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
13/10/27 01:20:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local966329123_0001
13/10/27 01:20:38 WARN conf.Configuration: file:/tmp/hadoop-prayag/mapred/staging/prayag966329123/.staging/job_local966329123_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
13/10/27 01:20:38 WARN conf.Configuration: file:/tmp/hadoop-prayag/mapred/staging/prayag966329123/.staging/job_local966329123_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
13/10/27 01:20:38 WARN conf.Configuration: file:/tmp/hadoop-prayag/mapred/local/localRunner/prayag/job_local966329123_0001/job_local966329123_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
13/10/27 01:20:38 WARN conf.Configuration: file:/tmp/hadoop-prayag/mapred/local/localRunner/prayag/job_local966329123_0001/job_local966329123_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.

13/10/27 01:20:38 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
13/10/27 01:20:38 INFO mapred.LocalJobRunner: OutputCommitter set in config null
13/10/27 01:20:38 INFO flow.FlowStep: [] submitted hadoop job: job_local966329123_0001
13/10/27 01:20:38 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
13/10/27 01:20:39 INFO mapred.LocalJobRunner: Waiting for map tasks
13/10/27 01:20:39 INFO mapred.LocalJobRunner: Starting task: attempt_local966329123_0001_m_000000_0
13/10/27 01:20:39 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
13/10/27 01:20:39 INFO io.MultiInputSplit: current split input path: file:/backup/workspace.programming/Impatient/part1/data/application.log
13/10/27 01:20:39 INFO mapred.MapTask: Processing split: cascading.tap.hadoop.io.MultiInputSplit@e8848c
13/10/27 01:20:39 INFO mapred.MapTask: numReduceTasks: 0
13/10/27 01:20:39 INFO hadoop.FlowMapper: cascading version: 2.2.0
13/10/27 01:20:39 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
13/10/27 01:20:39 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
13/10/27 01:20:39 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['doc_id', 'text']]"]["data/application.log"]
13/10/27 01:20:39 INFO hadoop.FlowMapper: sinking to: Hfs["TextDelimited[['doc_id', 'text']]"]["output/application"]
13/10/27 01:20:39 INFO mapred.LocalJobRunner:
13/10/27 01:20:39 INFO mapred.Task: Task:attempt_local966329123_0001_m_000000_0 is done. And is in the process of committing
13/10/27 01:20:39 INFO mapred.LocalJobRunner:
13/10/27 01:20:39 INFO mapred.Task: Task attempt_local966329123_0001_m_000000_0 is allowed to commit now
13/10/27 01:20:39 INFO output.FileOutputCommitter: Saved output of task 'attempt_local966329123_0001_m_000000_0' to file:/backup/workspace.programming/Impatient/part1/output/rain/_temporary/0/task_local966329123_0001_m_000000
13/10/27 01:20:39 INFO mapred.LocalJobRunner: file:/backup/workspace.programming/Impatient/part1/data/application.log:0+510
13/10/27 01:20:39 INFO mapred.Task: Task 'attempt_local966329123_0001_m_000000_0' done.
13/10/27 01:20:39 INFO mapred.LocalJobRunner: Finishing task: attempt_local966329123_0001_m_000000_0
13/10/27 01:20:39 INFO mapred.LocalJobRunner: Map task executor complete.
13/10/27 01:20:44 INFO util.Hadoop18TapUtil: deleting temp path output/application/_temporary



[STEP 6] verify the output data
prayag@prayag:/backup/workspace.programming/Impatient/part1$ more output/application/part-00000

doc_id text

doc01 A rain shadow is a dry area on the lee back side of a mountainous area.

doc02 This sinking, dry air produces a rain shadow, or area in the lee of a mountain with less rain and cloudcover.

doc03 A rain shadow is an area of dry land that lies on the leeward (or downwind) side of a mountain.

doc04 This is known as the rain shadow effect and is the primary cause of leeward deserts of mountain ranges, such as California's Death Valley.

doc05 Two Women. Secrets. A Broken Land. [DVD Australia]
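
Eyeballing the file works for five records. For anything larger, a rough check is to compare line counts between the source file and the part files in the sink directory. This is only a sketch under the assumption that the flow is a plain copy with a single part file, as in the run above. count_lines and same_record_count are hypothetical helper names.

```shell
# Count the lines across all files given as arguments.
count_lines() {
  cat "$@" | wc -l | tr -d ' '
}

# Succeed when the input file and the sink's part files have the same
# number of lines.
same_record_count() {
  in_file="$1"
  out_dir="$2"
  [ "$(count_lines "$in_file")" = "$(count_lines "$out_dir"/part-*)" ]
}

# Usage:
#   same_record_count data/application.log output/application && echo "counts match"
```

A matching count does not prove the contents are identical, it only catches dropped or duplicated lines.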



References
http://docs.cascading.org/impatient/impatient1.html
https://github.com/Cascading/Impatient/tree/master/part1

Intro to cascading=flows

Saturday, 23 November 2013

Hacking on scala play framework 2.0.6


STEP 1 Download and install play framework
$ mv play-2.0.6.zip /opt
$ cd /opt
$ sudo unzip play-2.0.6.zip
$ vi /etc/profile
PLAY_PATH=/opt/play-2.0.6; export PLAY_PATH
PATH=$PLAY_PATH:$PATH; export PATH

$ sudo chmod 777 -R play-2.0.6

STEP 2 create an app
$ play new shaharma


STEP 3 project props

$ cat project/plugins.sbt
// Comment to get more information during initialization
logLevel := Level.Warn

// The Typesafe repository
resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/"

// Use the Play sbt plugin for Play projects
addSbtPlugin("play" % "sbt-plugin" % "2.0.6")


$ cat project/Build.scala
import sbt._
import Keys._
import PlayProject._

object ApplicationBuild extends Build {

    val appName         = "shaharma"
    val appVersion      = "1.0-SNAPSHOT"

    val appDependencies = Seq(
      // Add your project dependencies here,
    )

    val main = PlayProject(appName, appVersion, appDependencies, mainLang = SCALA).settings(
      // Add your own project settings here
    )

}


$ cat project/build.properties
sbt.version=0.11.3

STEP 4 run-app at port 9000
$ play run


Sunday, 17 November 2013

count lines of code (loc) using terminal


prayag@prayag:/programming//Healthcare/Webservice/src/java/com/eccount/trending/elasticsearch/rest/action/listeners/transaction$ cat *.java | grep '[;{]' | wc -l
724
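
The one-liner above counts only lines that contain a ";" or a "{", so blank lines, lone closing braces and most comment lines are skipped. It is a heuristic: a comment that happens to contain a semicolon is still counted. A small sketch of the same idea that also walks subdirectories follows. count_loc is a hypothetical function name.

```shell
# Count lines containing ';' or '{' in every .java file under a directory
# (default: the current directory), including subdirectories.
count_loc() {
  find "${1:-.}" -type f -name '*.java' -exec cat {} + | grep -c '[;{]'
}

# Usage:
#   count_loc src/java
```

Using grep -c avoids the extra wc -l, since it prints the number of matching lines directly.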




Sunday, 3 November 2013

Devkota’s Twentieth Century Prometheus


In Greek mythology, Prometheus is a Titan, culture hero, and trickster figure who is credited with the creation of man from clay, and who defies the gods and gives fire to humanity (theft of fire), an act that enabled progress and civilization. He is known for his intelligence and as a champion of mankind. (wikipedia, 2013)



In the 20th century, while working on Greek mythology, Laxmi Prasad Devkota used translation as a liberating force on the one hand and a captivating force on the other. In his efforts at liberating and captivating these characters, he created a different type of myth. Devkota was endowed with unleashing poetic gifts and the capacity for creating and recreating myths. By redescribing Greek myths from the perspective of his own socio-political and cultural environs, Devkota recreated a different mythic reality. In the process of transcreation, this myth creator has become a myth for the successive generations. (Translation and Mythology: A Case of Devkota's Promithas - Bal Ram Adhikari, 2013)

An excerpt from Devkota's masterpiece, in which Prometheus stirs human beings to chant:
"बोल हो मानव, झरोस, झरोस, गिरोस, जिउस" (Translation: Say, oh humans, may Zeus fall, may he fall.)

References
http://en.wikipedia.org/wiki/Prometheus#Prometheus_in_the_Twentieth_Century
http://en.wikipedia.org/wiki/Laxmi_Prasad_Devkota
http://www.academia.edu/4657380/Translation_and_Mythology_A_case_of_Devkotas_Promithas