2011-11-05 84 views
0

我正在使用Hadoop IPC創建序列號生成服務,但無法在程序退出時停止服務器。有人能幫助我嗎?無法停止Hadoop IPC服務

import java.io.File; 
import java.io.IOException; 
import java.net.InetAddress; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Map.Entry; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.SequenceFile; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.ipc.RPC; 
import org.apache.hadoop.ipc.Server; 

import dk.aau.cs.cloudetl.common.CEConstants; 
import dk.aau.cs.cloudetl.hadoop.fs.FSUtil; 

public class SequenceServer extends Thread implements ClientProtocol { 

    Map<String, Integer> seqMap = new HashMap<String, Integer>(); 
    Configuration conf; 
    Server server; 
    Path seqFile; 
    volatile private boolean running = true;   

    public SequenceServer(Configuration conf) { 
     try { 
      this.conf = conf; 
      this.seqFile = new Path(CEConstants.META_DIR + Path.SEPARATOR 
        + "cloudETL.seq"); 

      InetAddress addr = InetAddress.getLocalHost(); 
      server = RPC.getServer(this, addr.getHostName(),CEConstants.SEQ_SERVER_PORT, 5, true, conf); 

     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void run() { 
     try { 
      readSeqsFromHDFS(); 
      server.start(); 


      System.out.println("=============Start=============="); 
      while(running){ 
       sleep(5000); 
      } 
      System.out.println("=============END=============="); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 



    private void readSeqsFromHDFS() { 
     try { 
      FileSystem fs = FileSystem.getLocal(conf); 
      if (fs.exists(seqFile)) { 
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, 
         seqFile, conf); 
       Text key = new Text(); 
       IntWritable value = new IntWritable(); 
       while (reader.next(key, value)) { 
        String name = key.toString(); 
        int seq = value.get(); 
        seqMap.put(name, seq); 
       } 
      } 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 

    private void writeSeqsToHDFS() { 
     try { 
      FileSystem fs = FileSystem.getLocal(conf); 
      Path tmp = new Path(seqFile.getParent() + Path.SEPARATOR 
        + "tmp.seq"); 
      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, 
        tmp, Text.class, IntWritable.class); 
      for (Entry<String, Integer> entry : seqMap.entrySet()) { 
       String name = entry.getKey(); 
       int seq = entry.getValue(); 
       writer.append(new Text(name), new IntWritable(seq)); 
      } 
      writer.close(); 

      FSUtil.replaceFile(new File(tmp.toString()), 
        new File(seqFile.toString())); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 

    } 

    synchronized public void stopServer() { 
     try { 
      System.out.println(server.getNumOpenConnections()); 
      server.stop(); 

      writeSeqsToHDFS(); 
      running = false; 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public long getProtocolVersion(String protocol, long clientVersion) 
      throws IOException { 
     return versionID; 
    } 

    @Override 
    synchronized public IntWritable nextSeq(Text name) { 
     String seqName = name.toString(); 
     if (!seqMap.containsKey(seqName)) { 
      seqMap.put(seqName, new Integer(CEConstants.SEQ_INCR_DELTA)); 
      return new IntWritable(0); 
     } else { 
      int ret = seqMap.get(seqName); 
      seqMap.put(seqName, ret + CEConstants.SEQ_INCR_DELTA); 
      return new IntWritable(ret); 
     } 
    } 

    public static void main(String[] args) { 
     SequenceServer server = new SequenceServer(new Configuration()); 
     server.start(); 

     server.stopServer(); 
    } 
} 

我有另一個客戶端程序來獲取唯一編號。我不會在這裏發帖。


感謝您的回答。正如你所說,我知道這個問題。但是,我目前的問題是無法停止RPC服務器。由於RPC服務器作爲守護進程模式運行,即使我運行stop(),它仍然無法退出。你可以試試這個:

import java.io.IOException; 
import java.net.InetAddress; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.ipc.RPC; 
import org.apache.hadoop.ipc.Server; 

public class Test { 
    Server server; 

    public Test() { 
     try { 
      InetAddress addr = InetAddress.getLocalHost(); 
      server = RPC.getServer(this, addr.getHostName(),16000, 5, true, new Configuration()); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 

    public void start(){ 
     server.start(); 
    } 

    public void stop(){ 
     server.stop(); 
    } 

    public static void main(String[] args) { 
     Test test = new Test(); 
     test.start(); 
     test.stop(); 
    } 
} 

謝謝!但它仍然不起作用。你能試試我的例子嗎?你只需複製並保存爲Test.java,然後運行它。你會看到它不能退出主線程。

import java.io.IOException; 
import java.net.InetAddress; 
import java.net.InetSocketAddress; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.ipc.RPC; 
import org.apache.hadoop.ipc.Server; 

interface MyProtocal extends org.apache.hadoop.ipc.VersionedProtocol { 
    public static final long versionID = 1L; 

    IntWritable nextSeq(Text name); 
} 

public class Test implements MyProtocal { 
    Server server; 

    public Test() { 
     try { 
      InetAddress addr = InetAddress.getLocalHost(); 
      server = RPC.getServer(this, addr.getHostName(), 16000, 5, true, 
        new Configuration()); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 

    public void start() { 
     server.start(); 
    } 

    public void stop() { 
     server.stop(); 
    } 

    @Override 
    public long getProtocolVersion(String protocol, long clientVersion) 
      throws IOException { 
     return versionID; 
    } 

    @Override 
    public IntWritable nextSeq(Text name) { 
     return new IntWritable(999); 
    } 

    static class SEQ { 
     MyProtocal client; 

     public SEQ() { 
      InetAddress addr; 
      try { 
       addr = InetAddress.getLocalHost(); 
       client = (MyProtocal) RPC.waitForProxy(MyProtocal.class, 
         MyProtocal.versionID, 
         new InetSocketAddress(addr.getHostName(), 16000), 
         new Configuration()); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 

     public void print() { 
      System.out.println(client.nextSeq(new Text("aa")).get()); 
     } 

     public void stop() { 
      RPC.stopProxy(client); 
     } 
    } 

    public static void main(String[] args) { 
     Test server = new Test(); 
     server.start(); 

     SEQ seq = new SEQ(); 
     seq.print(); 
     seq.stop(); 
     server.stop(); 
    } 
} 

回答

0

您的設計已損壞。 爲什麼你需要在一個單獨的線程中執行你的程序?

它已經在主線程中運行,RPC服務器也在單獨的線程中運行。

我的建議是刪除自己的線程,只需調用沒有while()循環的運行方法,然後停止服務器。

通則說明:實施Runnable,而不是如果你需要堅持你的無用線程Thread

延伸,那麼在你的主要方法server.join()調用,並從run方法的最後調用stopServer()。

+0

感謝您的回答。正如你所說,我知道這個問題。但是,我目前的問題是無法停止RPC服務器。由於RPC服務器作爲守護進程模式運行,即使我運行stop(),它仍然無法退出。你可以試試 – afancy

+0

嗨,我在幾分鐘前做了Hadoop RPC和avro之間的基準測試。這是我的代碼:https://github.com/thomasjungblut/hama-avro/blob/master/src/de/jungblut/avro/AvroRPCTest.java#L141 –

+0

在我的情況下,服務器完全關閉。 –