Thursday, 22 September 2016

use flume to stream from avro source and publish to logger sink


flume = a deep narrow channel or ravine with a stream running through it. 

PART 1 - set up Flume (can be dockerized)
wget --no-proxy http://apache.claz.org/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
sudo mkdir -p /opt/flume
sudo tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /opt/flume --strip-components=1
rm apache-flume-1.6.0-bin.tar.gz
sudo chmod -R 777 /opt/flume
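Keeping the Flume version in one place avoids mismatched file names between the download, extract and cleanup steps. A minimal bash sketch of that idea follows; `flume_tarball`, `flume_url` and `install_flume` are hypothetical helpers, and the mirror defaults to the one used above (override it with the hypothetical `FLUME_MIRROR` variable).

```bash
#!/usr/bin/env bash
# Hypothetical helpers: derive every Flume file name from one version string.

# Default mirror is the one used in the commands above; override via FLUME_MIRROR.
FLUME_MIRROR="${FLUME_MIRROR:-http://apache.claz.org/flume}"

flume_tarball() {
  # apache-flume-<version>-bin.tar.gz
  printf 'apache-flume-%s-bin.tar.gz\n' "$1"
}

flume_url() {
  # <mirror>/<version>/<tarball>
  printf '%s/%s/%s\n' "$FLUME_MIRROR" "$1" "$(flume_tarball "$1")"
}

install_flume() {
  local version="$1" dest="${2:-/opt/flume}"
  local tarball
  tarball="$(flume_tarball "$version")"
  # Stop at the first failing step so a failed download is never extracted.
  # --strip-components=1 drops the top-level apache-flume-<version>-bin/ directory.
  wget --no-proxy "$(flume_url "$version")" &&
    sudo mkdir -p "$dest" &&
    sudo tar -zxvf "$tarball" -C "$dest" --strip-components=1 &&
    rm "$tarball"
}
```

Usage: `install_flume 1.6.0 /opt/flume`.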

PART 2 - start the Flume agent (in a Docker container)

Create conf/supply_chain_flume.conf under the Flume home directory:

# Define a memory channel called logEventStream on supply_agent
supply_agent.channels.logEventStream.type = memory

# Define an Avro source called avro-source1 on supply_agent and tell it
# to bind to 0.0.0.0:41414. Connect it to channel logEventStream.
supply_agent.sources.avro-source1.channels = logEventStream
supply_agent.sources.avro-source1.type = avro
supply_agent.sources.avro-source1.bind = 0.0.0.0
supply_agent.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
supply_agent.sinks.log-sink1.channel = logEventStream
supply_agent.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# supply_agent which ones we want to activate.
supply_agent.channels = logEventStream
supply_agent.sources = avro-source1
supply_agent.sinks = log-sink1
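Flume wires sources, channels and sinks purely by name, so a source or sink that points at a channel the agent never declares (for example a leftover `ch1`) is an easy mistake. A minimal bash/awk sketch, assuming the simple one-property-per-line format used above, that reports every channel referenced by a source (`.channels`) or sink (`.channel`) but missing from `<agent>.channels`. `check_flume_wiring` is a hypothetical helper, not part of Flume.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print channels that an agent's sources/sinks reference
# but that are not declared in <agent>.channels. Exit status 1 if any are found.
check_flume_wiring() {
  local conf="$1" agent="$2"
  awk -F'=' -v agent="$agent" '
    # Skip comment lines and blank lines.
    /^[ \t]*#/ { next }
    /^[ \t]*$/ { next }
    {
      key = $1; val = $2
      gsub(/[ \t]+/, "", key)
      sub(/^[ \t]+/, "", val); sub(/[ \t]+$/, "", val)
      if (key == agent ".channels") {
        # Declared channels: <agent>.channels = a b c
        n = split(val, names, /[ \t]+/)
        for (i = 1; i <= n; i++) declared[names[i]] = 1
      } else if (key ~ ("^" agent "\\.sources\\.[^.]+\\.channels$")) {
        # Source binding (plural): <agent>.sources.<name>.channels = a b
        n = split(val, names, /[ \t]+/)
        for (i = 1; i <= n; i++) used[names[i]] = key
      } else if (key ~ ("^" agent "\\.sinks\\.[^.]+\\.channel$")) {
        # Sink binding (singular): <agent>.sinks.<name>.channel = a
        used[val] = key
      }
    }
    END {
      bad = 0
      for (c in used)
        if (!(c in declared)) { print used[c] " -> undeclared channel " c; bad = 1 }
      exit bad
    }
  ' "$conf"
}
```

Usage: `check_flume_wiring conf/supply_chain_flume.conf supply_agent` prints nothing for the config above.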


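Start the Flume agent from the Flume home directory. The sample output below comes from an earlier run that used conf/flume.conf and a channel named ch1, so those names differ from the config above.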

bin/flume-ng agent --conf ./conf/ -f conf/supply_chain_flume.conf -Dflume.root.logger=DEBUG,console -n supply_agent

Info: Sourcing environment configuration script /usr/local/apache-flume-1.6.0-bin/conf/flume-env.sh
+ exec /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home//bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/usr/local/apache-flume-1.6.0-bin/conf:/usr/local/apache-flume-1.6.0-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/flume.conf -n supply_agent
2016-09-22 00:08:49,909 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2016-09-22 00:08:49,912 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:78)] Configuration provider started
2016-09-22 00:08:49,913 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume.conf for changes
2016-09-22 00:08:49,914 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:conf/flume.conf
2016-09-22 00:08:49,918 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:log-sink1
2016-09-22 00:08:49,918 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1021)] Created context for log-sink1: channel
2016-09-22 00:08:49,918 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: log-sink1 Agent: supply_agent
2016-09-22 00:08:49,918 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:log-sink1
2016-09-22 00:08:49,919 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:314)] Starting validation of configuration for agent: supply_agent, initial-configuration: AgentConfiguration[supply_agent]
SOURCES: {avro-source1={ parameters:{bind=0.0.0.0, channels=ch1, port=41414, type=avro} }}
CHANNELS: {ch1={ parameters:{type=memory} }}
SINKS: {log-sink1={ parameters:{channel=ch1, type=logger} }}

2016-09-22 00:08:49,922 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:469)] Created channel ch1
2016-09-22 00:08:49,926 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:675)] Creating sink: log-sink1 using LOGGER
2016-09-22 00:08:49,927 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:372)] Post validation configuration for supply_agent
AgentConfiguration created without Configuration stubs for which only basic syntactical validation was performed[supply_agent]
SOURCES: {avro-source1={ parameters:{bind=0.0.0.0, channels=ch1, port=41414, type=avro} }}
CHANNELS: {ch1={ parameters:{type=memory} }}
AgentConfiguration created with Configuration stubs for which full validation was performed[supply_agent]
SINKS: {log-sink1=ComponentConfiguration[log-sink1]
  CONFIG:
    CHANNEL:ch1
}

2016-09-22 00:08:49,927 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Channels:ch1

2016-09-22 00:08:49,927 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sinks log-sink1

2016-09-22 00:08:49,927 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:138)] Sources avro-source1

2016-09-22 00:08:49,927 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [supply_agent]
2016-09-22 00:08:49,928 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
2016-09-22 00:08:49,933 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel ch1 type memory
2016-09-22 00:08:49,936 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel ch1
2016-09-22 00:08:49,937 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source avro-source1, type avro
2016-09-22 00:08:49,953 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: log-sink1, type: logger
2016-09-22 00:08:49,955 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel ch1 connected to [avro-source1, log-sink1]
2016-09-22 00:08:49,960 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:Avro source avro-source1: { bindAddress: 0.0.0.0, port: 41414 } }} sinkRunners:{log-sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2d9e85b0 counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} }
2016-09-22 00:08:49,969 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel ch1
2016-09-22 00:08:50,021 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.
2016-09-22 00:08:50,021 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: ch1 started
2016-09-22 00:08:50,021 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink log-sink1
2016-09-22 00:08:50,022 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source avro-source1
2016-09-22 00:08:50,022 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:228)] Starting Avro source avro-source1: { bindAddress: 0.0.0.0, port: 41414 }...
2016-09-22 00:08:50,023 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner starting
2016-09-22 00:08:50,254 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: avro-source1: Successfully registered new MBean.
2016-09-22 00:08:50,254 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: avro-source1 started
2016-09-22 00:08:50,254 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:253)] Avro source avro-source1 started.


2016-09-22 00:09:50,023 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume.conf for changes
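The client in Part 3 can only connect once the log reports `Avro source avro-source1 started.`. When both steps are scripted, a minimal bash sketch can poll the port first. `wait_for_port` is a hypothetical helper that relies on bash's `/dev/tcp` redirection; it suits a local agent, since a connect to a filtered remote port may block instead of failing fast.

```bash
#!/usr/bin/env bash
# Hypothetical helper: retry a TCP connect to host:port once per second until it
# succeeds (return 0) or the timeout in seconds expires (return 1).
# Always makes at least one attempt. Uses bash's /dev/tcp pseudo-device.
wait_for_port() {
  local host="$1" port="$2" timeout="${3:-30}"
  local deadline=$(( SECONDS + timeout ))
  # Opening fd 3 on /dev/tcp/<host>/<port> performs the connect; the subshell
  # closes it again, and its stderr (e.g. "Connection refused") is discarded.
  until (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; do
    (( SECONDS >= deadline )) && return 1
    sleep 1
  done
  return 0
}
```

Usage: `wait_for_port localhost 41414 60 && bin/flume-ng avro-client --conf conf -H localhost -p 41414 -F /var/log/supply_source.log`.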



PART 3 - publish to the Avro source listening on port 41414
Run flume-ng in avro-client mode. It sends either a file (-F) or data from stdin to the host (-H) and port (-p) where a Flume NG Avro source is listening, one event per line.

Sample input file /var/log/supply_source.log (one JSON event per line):
{"timemillis" : 2876873673, "correlation" : 1, "item" : "pants"}
{"timemillis" : 8347583748, "correlation" : 2, "item" : "shirts"}
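The avro-client turns each input line into one event, which is why a two-line file arrives as a batch of 2 events. A minimal bash sketch that appends events in the same JSON-lines shape; `append_supply_event` is a hypothetical helper, the timestamp is passed in rather than computed (to stay portable between GNU and BSD `date`), and item names are assumed to need no JSON escaping.

```bash
#!/usr/bin/env bash
# Hypothetical helper: append one JSON event per line, matching the shape of
# /var/log/supply_source.log. Assumes $item contains no quotes or backslashes.
append_supply_event() {
  local file="$1" timemillis="$2" correlation="$3" item="$4"
  printf '{"timemillis" : %d, "correlation" : %d, "item" : "%s"}\n' \
    "$timemillis" "$correlation" "$item" >> "$file"
}
```

Usage: `append_supply_event /var/log/supply_source.log 2876873673 1 pants`. To send lines from stdin instead of a file, leave out `-F`, e.g. `cat /var/log/supply_source.log | bin/flume-ng avro-client --conf conf -H localhost -p 41414`.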

bin/flume-ng avro-client --conf conf -H localhost -p 41414 -F /var/log/supply_source.log -Dflume.root.logger=DEBUG,console

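Now the logger sink receives the two events. By default it prints only the first 16 bytes of each event body (as hex, then as text), so each event appears truncated.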


2016-09-22 00:36:36,610 (New I/O server boss #1 ([id: 0x4ca942fa, /0:0:0:0:0:0:0:0:41414])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x49d18056, /127.0.0.1:50833 => /127.0.0.1:41414] OPEN
2016-09-22 00:36:36,610 (New I/O  worker #4) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x49d18056, /127.0.0.1:50833 => /127.0.0.1:41414] BOUND: /127.0.0.1:41414
2016-09-22 00:36:36,610 (New I/O  worker #4) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x49d18056, /127.0.0.1:50833 => /127.0.0.1:41414] CONNECTED: /127.0.0.1:50833
2016-09-22 00:36:36,802 (New I/O  worker #4) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 2 events.

2016-09-22 00:36:36,802 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 7B 22 63 6F 72 72 65 6C 61 74 69 6F 6E 22 20 3A {"correlation" : }
2016-09-22 00:36:36,802 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 7B 22 63 6F 72 72 65 6C 61 74 69 6F 6E 22 20 3A {"correlation" : }
2016-09-22 00:36:36,817 (New I/O  worker #4) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x49d18056, /127.0.0.1:50833 :> /127.0.0.1:41414] DISCONNECTED
2016-09-22 00:36:36,817 (New I/O  worker #4) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x49d18056, /127.0.0.1:50833 :> /127.0.0.1:41414] UNBOUND
2016-09-22 00:36:36,817 (New I/O  worker #4) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x49d18056, /127.0.0.1:50833 :> /127.0.0.1:41414] CLOSED

2016-09-22 00:36:36,818 (New I/O  worker #4) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /127.0.0.1:50833 disconnected.
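To check what the logger sink actually received, its hex dump can be decoded back to text. A small bash sketch; `hex_to_text` is a hypothetical helper and assumes well-formed, space-separated two-digit hex bytes as printed above.

```bash
#!/usr/bin/env bash
# Hypothetical helper: decode a space-separated hex dump (e.g. "7B 22 63")
# into text by building \xHH escapes and expanding them with printf %b.
hex_to_text() {
  local escaped="" byte
  for byte in $1; do
    escaped+="\\x${byte}"
  done
  printf '%b\n' "$escaped"
}
```

Usage: `hex_to_text '7B 22 63 6F 72 72 65 6C 61 74 69 6F 6E 22 20 3A'` prints the 16-byte prefix `{"correlation" :`.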

Reference
docker flume example, https://github.com/prayagupd/docker-flume
