2017-08-28 63 views

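DataStreamer does not work properly

I am using Ignite 2.1.0 and wrote a simple program to try out DataStreamer, but I get errors such as "[diagnostic] Failed to wait for partition map exchange" or "Attempted to release write lock while not holding it". I started two local nodes: one from the Windows CMD using the example XML configuration, and the other from Eclipse. My code in Eclipse: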

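import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

public class TestDataStreamer {

    public static void main(String[] args) {
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setPeerClassLoadingEnabled(true);
        Ignite ignite = Ignition.start(cfg);
        // Key/value types match the streamer in loadByStreamer (Long -> TestObj).
        CacheConfiguration<Long, TestObj> cacheConf = new CacheConfiguration<>();
        cacheConf.setName("TestDataStreamer").setCacheMode(CacheMode.REPLICATED);
        cacheConf.setBackups(0); // only meaningful for PARTITIONED caches
        IgniteCache<Long, TestObj> cache = ignite.getOrCreateCache(cacheConf);
        cache.clear();
        File dataFile = new File("D:/data/1503307171374.data"); // 10,000,000 rows of text data
        long bgn = System.currentTimeMillis();
        try {
            loadByStreamer(dataFile, ignite, "TestDataStreamer");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Report the elapsed time whether or not loading succeeded.
            long end = System.currentTimeMillis();
            System.out.println("---------------");
            System.out.println((end - bgn) / 1000.0 + " s");
        }
        cache.destroy();
        System.out.println("cache destroy...");
        ignite.close();
        System.out.println("finish");
    }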

    private static void loadByStreamer(File dataFile, Ignite ignite, String cacheName) throws Exception {
        String[] fields =
            "id,sn,type_code,trade_ts,bill_ts,company_code,company_name,biz_type,charge_amt,pay_mode".split(",");
        // try-with-resources closes the reader and then the streamer, even if a row fails to parse.
        try (IgniteDataStreamer<Long, TestObj> ds = ignite.dataStreamer(cacheName);
             BufferedReader br = new BufferedReader(new InputStreamReader(
                 new FileInputStream(dataFile), "UTF-8"))) {
            //ds.allowOverwrite(true);
            ds.autoFlushFrequency(10000);
            ds.perNodeBufferSize(4096);
            String line;
            long count = 0;
            while ((line = br.readLine()) != null) {
                // Note: System.currentTimeMillis() is not unique per row, so rows read
                // within the same millisecond share a key.
                // Constants.DEFAULT_SEPARATOR is defined elsewhere in the project (not shown).
                ds.addData(System.currentTimeMillis(), parseData(line, Constants.DEFAULT_SEPARATOR, fields));
                if (++count % 10000 == 0) {
                    System.out.println(count + " loaded...");
                }
            }
            System.out.println("flushing...");
            ds.flush();
            System.out.println("flushed");
        }
        System.out.println("file handled...");
    }

    // Splits one text row into a TestObj. The fields argument is currently unused;
    // columns are mapped by position. String.split treats the separator as a regex.
    private static TestObj parseData(String data, String separator, String[] fields) {
        TestObj obj = new TestObj();
        if (data != null && separator.trim().length() > 0) {
            String[] values = data.split(separator);
            obj.setId(values[0]);
            obj.setSn(values[1]);
            obj.setType_code(values[2]);
            obj.setTrade_ts(values[3]);
            obj.setBill_ts(values[4]);
            obj.setCompany_code(values[5]);
            obj.setCompany_name(values[6]);
            obj.setBiz_type(values[7]);
            obj.setCharge_amt(values[8]);
            obj.setPay_mode(values[9]);
        }
        return obj;
    }
} 

class TestObj { 
    private String id; 
    private String sn; 
    private String type_code; 
    private String trade_ts; 
    private String bill_ts; 
    private String company_code; 
    private String company_name; 
    private String biz_type; 
    private String charge_amt; 
    private String pay_mode; 
    public String getId() { 
     return id; 
    } 
    public void setId(String id) { 
     this.id = id; 
    } 
    public String getSn() { 
     return sn; 
    } 
    public void setSn(String sn) { 
     this.sn = sn; 
    } 
    public String getType_code() { 
     return type_code; 
    } 
    public void setType_code(String type_code) { 
     this.type_code = type_code; 
    } 
    public String getTrade_ts() { 
     return trade_ts; 
    } 
    public void setTrade_ts(String trade_ts) { 
     this.trade_ts = trade_ts; 
    } 
    public String getBill_ts() { 
     return bill_ts; 
    } 
    public void setBill_ts(String bill_ts) { 
     this.bill_ts = bill_ts; 
    } 
    public String getCompany_code() { 
     return company_code; 
    } 
    public void setCompany_code(String company_code) { 
     this.company_code = company_code; 
    } 
    public String getCompany_name() { 
     return company_name; 
    } 
    public void setCompany_name(String company_name) { 
     this.company_name = company_name; 
    } 
    public String getBiz_type() { 
     return biz_type; 
    } 
    public void setBiz_type(String biz_type) { 
     this.biz_type = biz_type; 
    } 
    public String getCharge_amt() { 
     return charge_amt; 
    } 
    public void setCharge_amt(String charge_amt) { 
     this.charge_amt = charge_amt; 
    } 
    public String getPay_mode() { 
     return pay_mode; 
    } 
    public void setPay_mode(String pay_mode) { 
     this.pay_mode = pay_mode; 
    } 
} 

If I stop the node started from CMD and run the program on a single node only, it works fine. Can anyone help me?

Which JDK version are you using?

1.8.0_144, 64-bit – CrazyRen

Are you sure both nodes use the same version, 1.8.0_144, 64-bit?

Answer

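Update the JDK on both nodes to the same version, for example 1.8.0_144 (since you already have it installed), or at least try updating the JDK used in Eclipse to the latest 1.7 release. That should help.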

There is a thread on Ignite user list where people hit the same exception, and updating the Java version fixed it for them.
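To check whether the two nodes really run on the same JDK, one option is to print the JVM's identifying system properties from each environment (the CMD session and the Eclipse run configuration) and compare the output. The following is only a minimal sketch using standard Java system properties; the class `JvmInfo` and its methods are hypothetical helpers and not part of Ignite.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Ignite): run it with the same JVM that starts each node.
class JvmInfo {
    /** Collects standard system properties that identify the running JVM. */
    static Map<String, String> collect() {
        Map<String, String> info = new LinkedHashMap<>();
        for (String key : new String[] {"java.version", "java.vendor", "java.vm.name", "os.arch"}) {
            info.put(key, System.getProperty(key));
        }
        return info;
    }

    /** Assumption for this sketch: two nodes "match" when their java.version strings are equal. */
    static boolean sameVersion(Map<String, String> a, Map<String, String> b) {
        String va = a.get("java.version");
        return va != null && va.equals(b.get("java.version"));
    }

    public static void main(String[] args) {
        // Print one "key = value" line per property so the two outputs can be compared by eye.
        collect().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Run it once from the command line that launches the standalone node and once from Eclipse. If the `java.version` lines differ, the nodes are not on the same JDK.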

I tried starting the node on Linux in a VM instead of from the Windows CMD, with both Linux and Eclipse using JDK 1.8.0_144, and it works fine. Thank you very much. – CrazyRen