
How do I configure Apache Flume 1.4.0 to pull data from Twitter and write it into HDFS (Apache Hadoop 2.5)? I am using Ubuntu 14.04, and my configuration file is as follows:

TwitterAgent.sources = Twitter 
TwitterAgent.channels = MemChannel 
TwitterAgent.sinks = HDFS 
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource 
TwitterAgent.sources.Twitter.channels = MemChannel 
TwitterAgent.sources.Twitter.consumerKey = Q5JF4gVmrahNk93C913GjgJgB 
TwitterAgent.sources.Twitter.consumerSecret = GFM6F0QuqEHn1eKpL1k4CHwdecEp626xLepajp9CAbtRBxEVCC 
TwitterAgent.sources.Twitter.accessToken = 152956374-hTFXO9g1RBSn1yikmi2mQClilZe2PqnyqphFQh9t 
TwitterAgent.sources.Twitter.accessTokenSecret = SODGEbkQvHYzZMtPsWoI2k9ZKiAd7q21ebtG3SNMu3Y0a 
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing 

TwitterAgent.sinks.HDFS.type = hdfs 
TwitterAgent.sinks.HDFS.channel = MemChannel 
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/flume/tweets/ 
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream 
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text 
#number of events written to file before it is flushed to HDFS/default 100 
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000 
#File size to trigger roll, in bytes (0: never roll based on file size) 
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 
#Number of events written to file before it rolled (0 = never roll based #on number of events) 
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000 
TwitterAgent.channels.MemChannel.type = memory 
#The maximum number of events stored in the channel 
TwitterAgent.channels.MemChannel.capacity = 10000 
#The maximum number of events the channel will take from a source or give to a sink per #transaction 
TwitterAgent.channels.MemChannel.transactionCapacity = 100 

I run the following command in my terminal:

[email protected]:/usr/lib/flume-ng/apache-flume-1.4.0-bin/bin$ ./flume-ng agent --conf ./conf/ -f /usr/lib/flume-ng/apache-flume-1.4.0-bin/conf/flume.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent 

I get the following error:

14/10/10 17:24:12 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: HDFS started 
14/10/10 17:24:12 INFO twitter4j.TwitterStreamImpl: Establishing connection. 
14/10/10 17:24:22 INFO twitter4j.TwitterStreamImpl: Connection established. 
14/10/10 17:24:22 INFO twitter4j.TwitterStreamImpl: Receiving status stream. 
14/10/10 17:24:22 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false 
14/10/10 17:24:22 INFO hdfs.BucketWriter: Creating hdfs://localhost:9000/user/flume/tweets//FlumeData.1412942062375.tmp 
14/10/10 17:24:22 ERROR hdfs.HDFSEventSink: process failed 
java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$RecoverLeaseRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet; 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800) 
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) 
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 
    at java.lang.Class.getDeclaredMethods0(Native Method) 
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2570) 
    at java.lang.Class.privateGetPublicMethods(Class.java:2690) 
    at java.lang.Class.privateGetPublicMethods(Class.java:2700) 
    at java.lang.Class.getMethods(Class.java:1467) 
    at sun.misc.ProxyGenerator.generateClassFile(ProxyGenerator.java:426) 
    at sun.misc.ProxyGenerator.generateProxyClass(ProxyGenerator.java:323) 
    at java.lang.reflect.Proxy$ProxyClassFactory.apply(Proxy.java:672) 
    at java.lang.reflect.Proxy$ProxyClassFactory.apply(Proxy.java:592) 
    at java.lang.reflect.WeakCache$Factory.get(WeakCache.java:244) 
    at java.lang.reflect.WeakCache.get(WeakCache.java:141) 
    at java.lang.reflect.Proxy.getProxyClass0(Proxy.java:455) 
    at java.lang.reflect.Proxy.newProxyInstance(Proxy.java:738) 
    at org.apache.hadoop.ipc.ProtobufRpcEngine.getProxy(ProtobufRpcEngine.java:92) 
    at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:537) 
    at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:366) 
    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:262) 
    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:153) 
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:602) 
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:547) 
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:139) 
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591) 
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) 
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2625) 
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2607) 
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) 
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) 
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:226) 
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:220) 
    at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:536) 
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:160) 
    at org.apache.flume.sink.hdfs.BucketWriter.access$1000(BucketWriter.java:56) 
    at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:533) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Is there a compatibility problem between these versions of Apache Flume and Apache Hadoop? I have not found any good resource to help me install Apache Flume 1.5.1. If there is no compatibility problem, what should I do to get the tweets into my HDFS?

Answer


Hadoop 2.5 is built with protobuf 2.5:

hadoop-project/pom.xml: <protobuf.version>2.5.0</protobuf.version> 

The generated code is binary-incompatible with older protobuf libraries. Unfortunately, the current stable Flume 1.4 release bundles protobuf 2.4.1. You can work around this by moving the protobuf (and guava) jars out of Flume's lib directory.
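
A minimal sketch of that workaround, assuming Flume is installed under /usr/lib/flume-ng/apache-flume-1.4.0-bin as in the command above; the exact jar names and the backup directory (/tmp/flume-lib-backup) are illustrative and may differ on your system:

cd /usr/lib/flume-ng/apache-flume-1.4.0-bin/lib 
# check which protobuf and guava jars Flume 1.4 bundles 
ls | grep -E 'protobuf|guava' 
# move them aside so the newer protobuf/guava on the Hadoop classpath are picked up instead 
mkdir -p /tmp/flume-lib-backup 
mv protobuf-java-*.jar guava-*.jar /tmp/flume-lib-backup/ 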


The earlier error is now resolved, but we get a new one: 14/10/14 15:57:19 WARN hdfs.HDFSEventSink: HDFS IO error java.net.ConnectException: Call From Hotshot/127.0.1.1 to localhost:9000 failed on connection exception: java.net.ConnectException: Connection refused; for more details see: http://wiki.apache.org/hadoop/ConnectionRefused – 2014-10-14 10:51:53


The above error was resolved by first starting the Hadoop NameNode and DataNodes and only then running the command that fetches data from Twitter – 2014-10-14 11:08:26
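
A minimal sketch of that sequence, assuming a pseudo-distributed Hadoop 2.5 setup with fs.defaultFS pointing at hdfs://localhost:9000 and the Hadoop sbin scripts on the PATH; the target directory matches the hdfs.path in the configuration above:

# start the HDFS daemons (NameNode, DataNode, SecondaryNameNode) 
start-dfs.sh 
# verify that the daemons are running 
jps 
# create the directory the Flume HDFS sink writes to 
hdfs dfs -mkdir -p /user/flume/tweets 
# then start the Flume agent as before 
./flume-ng agent --conf ./conf/ -f /usr/lib/flume-ng/apache-flume-1.4.0-bin/conf/flume.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent 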