2016-01-22 95 views
1

比方說,我有一個相對較大的文件(大約100MB),我想要多播到羣集的所有成員。我如何使用jgroups(最好是代碼演示)以塊的形式發送文件?文件應該在接收端以塊讀取。另外我怎樣才能確保數據塊的順序在接收端保持不變。如何在jgroups中多播大文件

編輯1 這是我到目前爲止嘗試過的。我只是在接收器的側發送文件作爲一個整體和寫入其內容的臨時文件

public class SimpleFileTransfer extends ReceiverAdapter { 

    JChannel channel; 

    private void start() throws Exception{ 
     channel = new JChannel(); 
     channel.setReceiver(this); 
     channel.connect("FileCluster"); 
//  channel.getState(null, 10000); 
     File file = new File("/res/test.txt"); //the file to be sent 
     eventLoop(file); 
     channel.close(); 
    } 

    private void eventLoop(File file) throws IOException{ 
     BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(file))); 
     try { 
      Message msg = new Message(null, null, in); 
      channel.send(msg); 
     } 
     catch (Exception e){ 
      e.printStackTrace(); 
     } 
    } 


    public void receive(Message msg) 
    { 
     try { 
      File temp = new File("/res/temp.txt"); 
      FileWriter writer = new FileWriter(temp); 
      InputStream in = new ByteArrayInputStream(msg.getBuffer()); 
      int next = in.read(); 
      while (next != -1){ 
       writer.write(next); 
       next = in.read(); 
      } 
     } 
     catch (IOException ie) 
     { 
      ie.printStackTrace(); 
     } 


    } 

} 
+0

這不是人們編寫代碼對你的站點。顯示你已經嘗試了什麼,然後人們會幫助你。 Downvoting。 –

+0

@我們是博格,夠公平的。我用迄今爲止的嘗試更新了這個問題。 – avidProgrammer

+0

@WeareBorg我認爲你提出了你的觀點,你不需要冷靜下來一個非常好的問題。 – kimathie

回答

3

下面是更好的版本,其口吃了起來大文件到8K的塊。 將文件X寫入/ tmp/X。需要注意的是/home/bela/fast.xml配置必須被改變,當然,:

public class SimpleFileTransfer extends ReceiverAdapter { 
protected String filename; 
protected JChannel channel; 
protected Map<String,OutputStream> files=new ConcurrentHashMap<>(); 
protected static final short ID=3500; 

private void start(String name, String filename) throws Exception { 
    ClassConfigurator.add((short)3500, FileHeader.class); 
    this.filename=filename; 
    channel=new JChannel("/home/bela/fast.xml").name(name); 
    channel.setReceiver(this); 
    channel.connect("FileCluster"); 
    eventLoop(); 
} 

private void eventLoop() throws Exception { 
    while(true) { 
     Util.keyPress(String.format("<enter to send %s>\n", filename)); 
     sendFile(); 
    } 
} 

protected void sendFile() throws Exception { 
    FileInputStream in=new FileInputStream(filename); 
    try { 
     for(;;) { 
      byte[] buf=new byte[8096]; 
      int bytes=in.read(buf); 
      if(bytes == -1) 
       break; 
      sendMessage(buf, 0, bytes, false); 
     } 
    } 
    catch(Exception e) { 
     e.printStackTrace(); 
    } 
    finally { 
     sendMessage(null, 0, 0, true); 
    } 
} 


public void receive(Message msg) { 
    byte[] buf=msg.getRawBuffer(); 
    FileHeader hdr=(FileHeader)msg.getHeader(ID); 
    if(hdr == null) 
     return; 
    OutputStream out=files.get(hdr.filename); 
    try { 
     if(out == null) { 
      File tmp=new File(hdr.filename); 
      String fname=tmp.getName(); 
      fname="/tmp/" + fname; 
      out=new FileOutputStream(fname); 
      files.put(hdr.filename, out); 
     } 
     if(hdr.eof) { 
      Util.close(files.remove(hdr.filename)); 
     } 
     else { 
      out.write(msg.getRawBuffer(), msg.getOffset(), msg.getLength()); 
     } 
    } 
    catch(Throwable t) { 
     System.err.println(t); 
    } 
} 


protected void sendMessage(byte[] buf, int offset, int length, boolean eof) throws Exception { 
    Message msg=new Message(null, buf, offset, length).putHeader(ID, new FileHeader(filename, eof)); 
    // set this if the sender doesn't want to receive the file 
    // msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK); 
    channel.send(msg); 
} 

protected static class FileHeader extends Header { 
    protected String filename; 
    protected boolean eof; 

    public FileHeader() {} // for de-serialization 

    public FileHeader(String filename, boolean eof) { 
     this.filename=filename; 
     this.eof=eof; 
    } 

    public int size() { 
     return Util.size(filename) + Global.BYTE_SIZE; 
    } 

    public void writeTo(DataOutput out) throws Exception { 
     Util.writeObject(filename, out); 
     out.writeBoolean(eof); 
    } 

    public void readFrom(DataInput in) throws Exception { 
     filename=(String)Util.readObject(in); 
     eof=in.readBoolean(); 
    } 
} 

public static void main(String[] args) throws Exception { 
    if(args.length != 2) { 
     System.out.printf("%s <name> <filename>\n", SimpleFileTransfer.class.getSimpleName()); 
     return; 
    } 
    new SimpleFileTransfer().start(args[0], args[1]); // name and file 
} 

}

0

沒有人會寫代碼你,但:

  1. 打開文件到字節陣列
  2. 分段陣列成塊
  3. 裹在說,這是其片
  4. 發送塊的包絡每個塊
  5. 閱讀信封再次將它們放回到一起

這些事情都不是很難。

+0

請參閱我的問題 – avidProgrammer

+0

的更新您的更新對我給出的建議沒有任何影響 –

1

下面是不好的解決方案。要運行它,配置需要有bundler_type =「發送者發送」(在UDP中)並且應用程序需要足夠的內存。 這個解決方案不好,因爲它將整個文件讀入一個緩衝區,該緩衝區也在JGroups中複製一次。 我將發佈的下一個解決方案更好,因爲它將大文件分成多個小塊。請注意,發送大文件JGroups也會執行內部分塊(碎片),但您仍然必須在應用程序級別創建大型字節[]緩衝區,這很糟糕。

public class SimpleFileTransfer extends ReceiverAdapter { 
protected String filename; 
protected JChannel channel; 

private void start(String name, String filename) throws Exception { 
    this.filename=filename; 
    channel=new JChannel("/home/bela/fast.xml").name(name); 
    channel.setReceiver(this); 
    channel.connect("FileCluster"); 
    eventLoop(); 
    channel.close(); 
} 

private void eventLoop() throws Exception { 
    while(true) { 
     Util.keyPress(String.format("<enter to send %s>\n", filename)); 
     sendFile(); 
    } 
} 

protected void sendFile() throws Exception { 
    Buffer buffer=readFile(filename); 
    try { 
     Message msg=new Message(null, buffer); 
     channel.send(msg); 
    } 
    catch(Exception e) { 
     e.printStackTrace(); 
    } 
} 


public void receive(Message msg) { 
    System.out.printf("received %s from %s\n", Util.printBytes(msg.getLength()), msg.src()); 
    try { 
     File temp=new File("/tmp/temp.txt"); 
     FileWriter writer=new FileWriter(temp); 
     InputStream in=new ByteArrayInputStream(msg.getBuffer()); 
     int next=in.read(); 
     while(next != -1) { 
      writer.write(next); 
      next=in.read(); 
     } 
    } 
    catch(IOException ie) { 
     ie.printStackTrace(); 
    } 
} 


protected static Buffer readFile(String filename) throws Exception { 
    File file=new File(filename); 
    int size=(int)file.length(); 
    FileInputStream input=new FileInputStream(file); 
    ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(size); 
    byte[] read_buf=new byte[1024]; 
    int bytes; 
    while((bytes=input.read(read_buf)) != -1) 
     out.write(read_buf, 0, bytes); 
    return out.getBuffer(); 
} 


public static void main(String[] args) throws Exception { 
    if(args.length != 2) { 
     System.out.printf("%s <name> <filename>\n", SimpleFileTransfer.class.getSimpleName()); 
     return; 
    } 
    new SimpleFileTransfer().start(args[0], args[1]); // name and file 
} 

}