2013-04-09 64 views
2

我正在處理一個需要將數據插入到Cassandra數據庫的項目。因此,我正在使用Pelops clientjava.lang.RuntimeException:註冊MBean時發生異常com.scale7.cassandra.pelops.pool:type = PooledNode-my_keyspace-localhost

我有一個多線程代碼,它將使用Pelops client插入Cassandra數據庫。我正在使用ExecutorService

在我的程序,每個線程都工作在一定範圍內,比如

Thread1 will work on 1 to 20 
Thread2 will work on 21 to 40 
... 
... 

下面是代碼我有我使用插入到Cassandra的數據庫 -

private static int noOfThreads = 5; 
private static int noOfTasks = 100; 
private static int startRange = 1; 

    public static void main(String[] args) { 

     LOG.info("Loading data in Cassandra database..!!"); 

     ExecutorService service = Executors.newFixedThreadPool(noOfThreads); 

     try { 
      // queue some tasks 
      for (int i = 0, nextId = startRange; i < noOfThreads; i++, nextId += noOfTasks) { 

       service.submit(new CassandraTask(nextId, noOfTasks)); 
      } 

      service.shutdown(); 

      service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
     } catch (InterruptedException e) { 
      LOG.warn("Threw a Interrupted Exception in" + CNAME + ".PelopsLnPClient: boss told me to stop...Not my fault!!"); 
     } catch (Exception e) { 
      LOG.error("Threw a Exception in" + CNAME + e); 
     } 
    } 

下面是CassandraTask class實施Runnable interface

class CassandraTask implements Runnable { 

    private final int id; 
    private final int noOfTasks; 

    private final String nodes = "localhost"; 
    private final String thrift_connection_pool = "Test Cluster"; 
    private final String keyspace = "my_keyspace"; 
    private final String column_family = "PROFILE_USER"; 

     public CassandraTask(int nextId, int noOfTasks) { 
      this.id = nextId; 
      this.noOfTasks = noOfTasks; 

     } 


     public void run() { 

      try { 

       cassandraConnection(); 
       Mutator mutator = Pelops.createMutator(thrift_connection_pool); 

       for (int userId = id; userId < id + noOfTasks; userId++) { 

        mutator.writeColumns(column_family, String.valueOf(userId), 
          mutator.newColumnList(
            mutator.newColumn("unt", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"), 
            mutator.newColumn("rtising", "{\"lv\":[{\"v\":{\"thirdPartyAdsOnhostdomain\":null,\"hostdomainAdsOnThirdParty\":null,\"userId\":" + userId + "},\"cn\":2}],\"lmd\":20130206211109}"), 
            mutator.newColumn("selling_price_main_cats", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"), 
            mutator.newColumn("and_keyword_rules", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"), 
            mutator.newColumn("categories_purchased", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"), 
            mutator.newColumn("omer_service", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"), 
            mutator.newColumn("graphic", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"), 
            mutator.newColumn("rite_searches", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}") 
            )); 
       } 

       mutator.execute(ConsistencyLevel.ONE); 

      } catch (Exception e) { 
       System.err.println("Threw a Exception in " + e); 
      } finally { 
       Pelops.shutdown(); 
      } 
     } 

     /** 
     * Making a Cassandra Connection by adding nodes 
     * 
     /
     private void cassandraConnection() { 

      Cluster cluster = new Cluster(nodes, 9160); 
      Pelops.addPool(thrift_connection_pool, cluster, keyspace); 

     } 
    } 

每當我跑步ng以上程序,我得到以下例外總是 -

Threw a Exception in java.lang.RuntimeException: exception while registering MBean, com.scale7.cassandra.pelops.pool:type=PooledNode-my_keyspace-localhost 

任何人都可以幫助我與這是什麼錯我在這裏做什麼?我相信我在這裏犯了一些小錯誤?如果我緩慢地跑步,那麼我不會得到這個例外。通過在代碼中加入斷點,我的意思是慢。不知何故很奇怪。

我與卡桑德拉1.2.3

任何幫助工作將不勝感激。

+0

,而不是這個'通信System.err.println(+ E「中扔了異常」);'你實際上可以使用e.printStackTrace(),並在發帖提問,所以我們可以得到更多的細節關於這個問題? – 2013-04-09 09:30:04

回答

1

您正在使用哪個客戶端版本?據我所見,每個線程都會向cassandra創建一個池(同名!),並且每個線程都會關閉Pelops客戶端。

在主類中移動創建池,只創建一個池並從線程訪問它,並且從不調用Pelops.shutdown(),直到最後一個線程執行execute方法。

卡羅

+0

感謝卡羅的建議。之後它工作正常。但我只是想確保我做的事情是對的還是不對?因爲我可能會遺漏一些小的細節。你可以看一下,讓我知道我的更新代碼是否與我創建cassandra連接和集羣的方式一致。謝謝您的幫助。 – ferhan 2013-04-09 19:51:11

+0

據我所見,一切都很好 - 你連接到Cassandra的方式沒問題,你可以添加一些其他的配置,但只有當你需要時......例如 'String nodes =「localhost」; int port = 9_160; boolean nodeDiscovery = true; Config casconf = new Config(port,true,0); Cluster cluster = new Cluster(nodes,casconf,nodeDiscovery); Pelops.addPool(thrift_connection_pool,簇,密鑰空間);' 的Ciao,卡羅 – 2013-04-09 21:22:12

+0

至於 'mutator.newColumn( 「LMD」,將String.valueOf(新的Date()的getTime())。)'我想記住你已經有了每列的時間戳,所以你可能不需要寫它!更多請記住,使用Pelops你可以寫長文字 'mutator.newColumn(「lmd」,Bytes.fromLong(System。currenttimemillis())' – 2013-04-09 21:28:02

相關問題