
I am trying to compile the PutMerge program from Hadoop in Action, but instead of creating the file on HDFS it creates it on my local machine (I have Eclipse installed). The PutMerge example from Hadoop in Action is not working.

It looks like my conf is not picking up the correct configuration from the XML files.

Edit:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PlayWithHadoop {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        //conf.set("fs.default.name", "hdfs://localhost:54310/user/hduser");
        //conf.set("dfs.data.dir", "/user/hduser");
        conf.addResource(new Path("/home/hduser/hadoop/conf/core-site.xml"));
        conf.addResource(new Path("/home/hduser/hadoop/conf/hdfs-site.xml"));

        FileSystem hdfs = FileSystem.get(conf);
        FileSystem local = FileSystem.getLocal(conf);
        Path inputDir = new Path(args[0]);
        Path hdfsFile = new Path(args[1]);

        try {
            //hdfs.setWorkingDirectory(new Path("/user/hduser/hadoop"));
            FileStatus[] inputFiles = local.listStatus(inputDir);
            FSDataOutputStream out = hdfs.create(hdfsFile);

            for (int i = 0; i < inputFiles.length; i++) {
                System.out.println(inputFiles[i].getPath().getName());
                FSDataInputStream in = local.open(inputFiles[i].getPath());
                System.out.println();
                System.out.println(hdfs.getWorkingDirectory().toString());

                byte buffer[] = new byte[256];
                int bytesRead = 0;
                while ((bytesRead = in.read(buffer)) > 0) {
                    out.write(buffer, 0, bytesRead);
                }
                in.close();
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

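For reference, here is a minimal diagnostic sketch (my addition, not part of the original question; the configuration path is the one from the code above) to check whether the Configuration actually picks up fs.default.name. If it prints file:/// (the default), the XML file is not being read and FileSystem.get(conf) returns the local file system, which would explain the merged file ending up on the local machine:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckDefaultFs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed config location, taken from the question's code.
        conf.addResource(new Path("/home/hduser/hadoop/conf/core-site.xml"));

        // Prints the configured NameNode address, or file:/// (the default)
        // if the resource was not found or does not set the property.
        System.out.println("fs.default.name = " + conf.get("fs.default.name"));

        // Should print an hdfs:// URI when the configuration is picked up.
        FileSystem fs = FileSystem.get(conf);
        System.out.println("Resolved file system: " + fs.getUri());
    }
}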

Answers


Add these lines to your code and see if it works:

Configuration conf = new Configuration(); 
conf.addResource(new Path("/HADOOP_HOME/conf/core-site.xml")); 
conf.addResource(new Path("/HADOOP_HOME/conf/hdfs-site.xml")); 
Thanks for the reply. I added those settings, but in a different way, e.g. conf.set("fs.default.name", "hdfs://localhost:54310/user/hduser"); but when I use System.out.println(hdfs.getWorkingDirectory().toString()); it shows hdfs://localhost:54310/user/mohit. Ideally it should show hdfs://localhost:54310/user/hduser – user2378089 2013-05-13 14:35:47

That is not the same as the property I specified. What you set tells the code the NameNode's address; the properties I showed tell your code where to find the configuration files from which to read the configuration parameters. Even if you had not added "fs.default.name", your code would still work, IMHO – Tariq 2013-05-13 14:44:15

I did what you said: conf.addResource(new Path("/home/hduser/hadoop/conf/core-site.xml")); conf.addResource(new Path("/home/hduser/hadoop/conf/hdfs-site.xml")); but I still get hdfs://localhost:54310/user/mohit when it should be hdfs://localhost:54310/user/hduser – user2378089 2013-05-13 14:52:42
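To illustrate the distinction made in the comments above, here is a hedged sketch (the paths and NameNode address are taken from this thread and are assumptions about the asker's setup) contrasting the two ways of pointing the client at HDFS:

Configuration conf = new Configuration();

// Option 1: load the cluster's own config files; every property they define
// (including fs.default.name) becomes visible to this client.
conf.addResource(new Path("/home/hduser/hadoop/conf/core-site.xml"));
conf.addResource(new Path("/home/hduser/hadoop/conf/hdfs-site.xml"));

// Option 2: set only the NameNode address by hand. Note the value is just the
// authority, not a /user/... path; getWorkingDirectory() still defaults to
// /user/<current OS user>, which is why it can show /user/mohit.
//conf.set("fs.default.name", "hdfs://localhost:54310");

FileSystem hdfs = FileSystem.get(conf);
System.out.println(hdfs.getUri()); // expect hdfs://localhost:54310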


Try this code, and make sure you are passing the correct input and output paths to the jar.

import java.io.FileNotFoundException; 
import java.io.IOException; 
import java.net.URI; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.fs.permission.FsPermission; 
import org.apache.hadoop.io.FloatWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Progressable; 

/** 
* 
* @author hadoop1 
*/ 
public class PutMerge { 

    /** 
    * @param args the command line arguments 
    */ 
    public static void main(String[] args) throws IOException, ClassNotFoundException { 

      //Configuration reads resources that contain sets of name/value pairs as XML.
      //Its constructor automatically loads the properties from core-site.xml, which should contain fs.defaultFS.
      Configuration conf = new Configuration();

      //The FileSystem abstract class needs only one configuration key to connect to HDFS.
      //It used to be fs.default.name; from Hadoop 2 (YARN) onward it is fs.defaultFS,
      //so the following snippet is sufficient for the connection.
      FileSystem hdfs = FileSystem.get(conf);
      //For the local file system
      FileSystem local = FileSystem.getLocal(conf);

     Path inputDirectory = new Path(args[0]); 
     Path hdfsFiles = new Path(args[1]); 

     try { 

      FileStatus[] inputFiles = local.listStatus(inputDirectory); 
      FSDataOutputStream out = hdfs.create(hdfsFiles); 

      for (int i = 0; i < inputFiles.length; i++) { 
       //System.out.println(inputFiles[i].getPath().getName()); 

       FSDataInputStream in = local.open(inputFiles[i].getPath()); 
       byte buffer[] = new byte[256]; 
       int byteRead = 0; 

       while ((byteRead = in.read(buffer)) > 0) { 
        out.write(buffer, 0, byteRead); 
       } 
       in.close(); 
      } 
      out.close(); 
     } catch (Exception e) { 
      System.out.print("We have an error"); 
     } 

     Job job = Job.getInstance(conf, "PutMerge"); 
     job.setJarByClass(PutMerge.class); 
     job.setMapperClass(PutMergeMapper.class); 
     job.setCombinerClass(PutMergeReducer.class); 
     job.setReducerClass(PutMergeReducer.class); 
     job.setMapOutputKeyClass(Text.class); 
     job.setOutputValueClass(FloatWritable.class); 


     // HDFS path will contain the merged files 
     FileInputFormat.addInputPath(job, hdfsFiles); 
     FileOutputFormat.setOutputPath(job, new Path(args[2])); 

     try { 

      // For the job to wait unless one part of job is over 
      System.exit(job.waitForCompletion(true)? 0 : 1); 
      //To take the input path of multiple csvs 
     } catch (InterruptedException ex) { 
      System.out.print("Error 1 in main file %^#%$^&*(&&$%^&*(*^&%"); 
     } 
    } 
}
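
For completeness, a hypothetical way to run the class above once it is packaged (the jar name is a placeholder, not from the original answer): hadoop jar putmerge.jar PutMerge <localInputDir> <hdfsMergedFile> <jobOutputDir>. The first argument is the local directory to merge, the second is the HDFS file that receives the merged content, and the third is the output directory for the MapReduce job.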