Note: Flume's dependency on Hadoop and ZooKeeper is only a jar (classpath) dependency; starting Flume does not require the Hadoop or ZooKeeper services to be running.
Source: <http://www.cnblogs.com/oubo/archive/2012/05/25/2517751.html>
Start the Flume server agent with the server-side configuration file:
flume-ng agent --conf conf --conf-file /mysoftware/flume-1.7.0/conf/flume-server.properties --name a1 -Dflume.root.logger=INFO,console > /mysoftware/flume-1.7.0/logs/flume-server.log 2>&1 &
Start the Flume client agent with the client-side configuration file:
flume-ng agent --conf conf --conf-file /mysoftware/flume-1.7.0/conf/flume-client.properties --name agent1 -Dflume.root.logger=INFO,console > /mysoftware/flume-1.7.0/logs/flume-client.log 2>&1 &
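Both commands redirect stdout/stderr, so to confirm an agent started cleanly, follow its log file (paths as in the commands above):
tail -f /mysoftware/flume-1.7.0/logs/flume-server.log
tail -f /mysoftware/flume-1.7.0/logs/flume-client.log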
Avro is used to link the two agents, and the server must be started first: the client's avro sink connects to the server's avro source, so the source has to be listening before the client can deliver events (messages flow from the client to the server).
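To check that the server's avro source is reachable before starting the client agent, you can send a one-off test event with Flume's built-in avro-client. A minimal sketch, using the source address from example 1 below; /tmp/test.txt is a hypothetical file created just for the test:
echo "hello flume" > /tmp/test.txt
flume-ng avro-client --conf conf -H master -p 52020 -F /tmp/test.txt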
Two examples follow. Note: all of the configuration files must be created by hand (each example uses two, one per agent). Also note that Flume's .properties files do not support inline comments after a value (everything after the = is read as the value), so comments must sit on their own lines.
1. Receive from a network port, store into HDFS
flume-server.properties, the server side (from the avro port to HDFS):
# name the agent's sources, channels, and sinks
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# avro source: receives events sent from the other node (the client agent)
a1.sources.r1.type = avro
a1.sources.r1.bind = master
a1.sources.r1.port = 52020
# interceptors set headers on each event in transit; a timestamp header is the most common use
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = master
a1.sources.r1.channels = c1
#set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://master:9000/flume/logdfs
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=1
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
a1.sinks.k1.hdfs.fileSuffix=.txt
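Once events flow, the output can be verified with standard hdfs dfs commands (the path matches hdfs.path above; assumes the NameNode at master:9000 as configured). Note that hdfs.rollInterval=1 rolls a new file every second, so expect many small files while testing:
hdfs dfs -ls /flume/logdfs
hdfs dfs -cat /flume/logdfs/*.txt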
flume-client.properties, the client side (from a command's output to the avro port):
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1
#set sink group
#agent1.sinkgroups = g1
#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
#set source
agent1.sources.r1.channels = c1
# exec source: runs a shell command and turns each line of its output into an event
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /mysoftware/flume-1.7.0/logdfs/flumetest.log
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp
# set sink1
agent1.sinks.k1.channel = c1
# avro sink: acts as the forwarder to the next agent; avro gives fast inter-agent transport
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = master
agent1.sinks.k1.port = 52020
# set sink2 (an agent may have multiple sinks)
#agent1.sinks.k2.channel = c1
#agent1.sinks.k2.type = avro
#agent1.sinks.k2.hostname = slave1
#agent1.sinks.k2.port = 52020
#set sink group
#agent1.sinkgroups.g1.sinks = k1 k2
#set failover (fault tolerance; the other available processor type is load_balance)
#agent1.sinkgroups.g1.processor.type = failover
#agent1.sinkgroups.g1.processor.priority.k1 = 10
#agent1.sinkgroups.g1.processor.priority.k2 = 1
#agent1.sinkgroups.g1.processor.maxpenalty = 10000
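With both agents running, appending lines to the tailed file produces events end-to-end (the path matches the exec source's command above):
echo "test event" >> /mysoftware/flume-1.7.0/logdfs/flumetest.log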
2. Receive from a tailed file, store into Kafka
From a command's output to Kafka (start a Kafka consumer to view the results; a consumer command is sketched after the server config below).
#agent (the client side)
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target
agent.sources.origin.type = exec
agent.sources.origin.command = tail -F /home/hadoop/orderslog/app.log
agent.sources.origin.channels = memorychannel
agent.sources.origin.interceptors = i1
agent.sources.origin.interceptors.i1.type = static
agent.sources.origin.interceptors.i1.key = topic
agent.sources.origin.interceptors.i1.value = ordersInfo
# leftover from the stock template: loggerSink is not declared in agent.sinks, so these lines are inert
#agent.sinks.loggerSink.type = logger
#agent.sinks.loggerSink.channel = memorychannel
agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000
agent.sinks.target.type = avro
agent.sinks.target.channel = memorychannel
agent.sinks.target.hostname = master
agent.sinks.target.port = 4545
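Before starting these agents, the Kafka topic should exist. A sketch using the Kafka 0.9/0.10-era tooling that matches Flume 1.7; it assumes ZooKeeper on master:2181, and the replication factor and partition count are placeholders to adjust for your cluster:
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 3 --topic ordersInfo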
#collect (the server side)
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target
agent.sources.origin.type = avro
agent.sources.origin.channels = memorychannel
agent.sources.origin.bind = master
agent.sources.origin.port = 4545
# leftover from the stock template: loggerSink is not declared in agent.sinks, so these lines are inert
#agent.sinks.loggerSink.type = logger
#agent.sinks.loggerSink.channel = memorychannel
agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 5000000
agent.channels.memorychannel.transactionCapacity = 1000000
agent.sinks.target.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka topic to publish to (a per-event "topic" header, like the one set by the client's interceptor, overrides it)
agent.sinks.target.kafka.topic = ordersInfo
agent.sinks.target.kafka.bootstrap.servers=master:9092,slave01:9092,slave02:9092,slave03:9092
# legacy Kafka 0.8-style producer properties; the Flume 1.7 KafkaSink ignores these, kept only for reference
#agent.sinks.target.metadata.broker.list=master:9092,slave01:9092,slave02:9092,slave03:9092
#agent.sinks.target.producer.type=sync
#agent.sinks.target.kafka.producer.value.serializer=kafka.serializer.DefaultEncoder
agent.sinks.target.kafka.producer.acks=1
agent.sinks.target.flumeBatchSize=100
agent.sinks.target.channel = memorychannel
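To watch events arrive in Kafka, run a console consumer on the topic (standard Kafka tooling; --bootstrap-server matches the sink's kafka.bootstrap.servers and requires a Kafka 0.10+ console consumer):
kafka-console-consumer.sh --bootstrap-server master:9092 --topic ordersInfo --from-beginning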