2015-09-25

We don't understand why DataflowAssert fails on this simple TableRow test:

@Test
@Category(RunnableOnService.class)
public void testTableRow() throws Exception {
    Pipeline p = TestPipeline.create();
    PCollection<TableRow> pCollectionTable1 = p.apply("a", Create.of(TABLEROWS_ARRAY_1));
    PCollection<TableRow> pCollectionTable2 = p.apply("b", Create.of(TABLEROWS_ARRAY_2));
    PCollection<TableRow> joinedTables = Table.join(pCollectionTable1, pCollectionTable2);
    DataflowAssert.that(joinedTables).containsInAnyOrder(TABLEROW_TEST);
    p.run();
}

We get the following exception:

Sep 25, 2015 10:42:50 AM com.google.cloud.dataflow.sdk.testing.DataflowAssert$TwoSideInputAssert$CheckerDoFn processElement 
SEVERE: DataflowAssert failed expectations. 
java.lang.AssertionError: 
    Expected: iterable over [<{id=x}>] in any order 
    but: Not matched: <{id=x}> 
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
    at org.junit.Assert.assertThat(Assert.java:865) 
    at org.junit.Assert.assertThat(Assert.java:832) 
    at ... 

To simplify the DataflowAssert test, we hard-coded the output of Table.join so that it matches the DataflowAssert expectation:

private static final TableRow TABLEROW_TEST = new TableRow() 
     .set("id", "x"); 


static PCollection<TableRow> join(PCollection<TableRow> pCollectionTable1, 
     PCollection<TableRow> pCollectionTable2) throws Exception { 

    final TupleTag<String> pCollectionTable1Tag = new TupleTag<String>(); 
    final TupleTag<String> pCollectionTable2Tag = new TupleTag<String>(); 

    PCollection<KV<String, String>> table1Data = pCollectionTable1 
      .apply(ParDo.of(new ExtractTable1DataFn())); 
    PCollection<KV<String, String>> table2Data = pCollectionTable2 
      .apply(ParDo.of(new ExtractTable2DataFn())); 

    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple 
      .of(pCollectionTable1Tag, table1Data).and(pCollectionTable2Tag, table2Data) 
      .apply(CoGroupByKey.<String> create()); 

    PCollection<KV<String, String>> resultCollection = kvpCollection 
      .apply(ParDo.named("Process join") 
        .of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { 
         private static final long serialVersionUID = 0; 

         @Override 
         public void processElement(ProcessContext c) { 
          // System.out.println(c); 
          KV<String, CoGbkResult> e = c.element(); 
          String key = e.getKey(); 
          String value = null; 
          for (String table1Value : c.element().getValue().getAll(pCollectionTable2Tag)) { 

           for (String table2Value : c.element().getValue().getAll(pCollectionTable2Tag)) { 
            value = table1Value + "," + table2Value; 
           } 
          } 
          c.output(KV.of(key, value)); 
         } 
        })); 

    PCollection<TableRow> formattedResults = resultCollection.apply(
      ParDo.named("Format join").of(new DoFn<KV<String, String>, TableRow>() { 
       private static final long serialVersionUID = 0; 

       public void processElement(ProcessContext c) { 
        TableRow row = new TableRow().set("id", "x"); 
        c.output(row);      
       } 
      })); 

    return formattedResults; 
} 

Does anyone know what we are doing wrong?

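Looking at the exception, it says it expected a TableRow with 'id = x' and didn't find one. Why do you expect this particular output? Could you provide more details about the constant values ('TABLEROWS_ARRAY_1', 'TABLEROWS_ARRAY_2', 'TABLEROW_TEST') and the behavior of 'Table.join'?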

I added more information.

Answer

I think the error message is telling you that the actual collection contains more copies of the element than expected.

Expected: iterable over [<{id=x}>] in any order 
but: Not matched: <{id=x}> 

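This is Hamcrest's way of saying that you expected an iterable over exactly one element, but the actual collection had an item that was not matched. Because every item coming out of "Format join" has the same value, the message is harder to read than it should be.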
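To see why an extra duplicate produces "Not matched", here is a simplified model of an in-any-order check in plain Java. It is a sketch under stated assumptions, not Hamcrest's implementation: it compares elements with equals() instead of matchers and treats both collections as multisets. The class InAnyOrderModel and the method matchesInAnyOrder are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model (an assumption, not Hamcrest's code) of containsInAnyOrder:
 * every actual element must consume a distinct expected element, and every
 * expected element must be consumed exactly once.
 */
public class InAnyOrderModel {

    /** True iff both lists hold the same elements with the same multiplicities. */
    static <T> boolean matchesInAnyOrder(List<T> actual, List<T> expected) {
        Map<T, Integer> remaining = new HashMap<>();
        for (T e : expected) {
            remaining.merge(e, 1, Integer::sum);
        }
        for (T a : actual) {
            Integer count = remaining.get(a);
            if (count == null || count == 0) {
                // No expected element left for this actual element:
                // this is the situation Hamcrest reports as "Not matched: <a>".
                return false;
            }
            remaining.put(a, count - 1);
        }
        // Any expected element that was never consumed is also a failure.
        for (int count : remaining.values()) {
            if (count != 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String row = "{id=x}";
        System.out.println(matchesInAnyOrder(List.of(row, row), List.of(row))); // false
        System.out.println(matchesInAnyOrder(List.of(row), List.of(row)));      // true
    }
}
```

The second copy of {id=x} has nothing left to match against, which is why the failure message names an element that looks identical to the expected one.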

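Specifically, this is the message produced when I run the following test, which checks whether a collection containing two copies of row contains exactly one copy of row: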

@Category(RunnableOnService.class) 
@Test 
public void testTableRow() throws Exception { 
    Pipeline p = TestPipeline.create(); 

    TableRow row = new TableRow().set("id", "x"); 

    PCollection<TableRow> rows = p.apply(Create.<TableRow>of(row, row)); 
    DataflowAssert.that(rows).containsInAnyOrder(row); 

    p.run(); 
} 

To work out what your code produces, I had to take into account that both loops iterate only over the entries from table2. Concretely:

// Use these as the input tables. 
table1 = [("keyA", "A1a"), ("keyA", "A1b")]
table2 = [("keyA", "A2a"), ("keyA", "A2b"), ("keyB", "B2")] 

// The CoGroupByKey returns 
[("keyA", (["A1a", "A1b"], ["A2a", "A2b"])), 
("keyB", ([], ["B2"]))] 

// When run through "Process join" this produces. 
// For details on why see the next section. 
["A2b,A2b", 
"B2,B2"] 

// When run through "Format join" this becomes the following. 
[{id=x}, {id=x}] 
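The walk-through above can be replayed in plain Java without the Dataflow SDK. In this sketch the hypothetical groupByKey helper stands in for CoGroupByKey (with a deterministic key order, which the real transform does not guarantee), and processJoin copies the question's "Process join" loop, including iterating the table2 values in both loops and keeping only the last pair. ProcessJoinWalkthrough, groupByKey and processJoin are illustrative names, not part of the question's code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ProcessJoinWalkthrough {

    /** Hypothetical stand-in for one side of CoGroupByKey: groups values by key. */
    static Map<String, List<String>> groupByKey(List<Map.Entry<String, String>> table) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, String> kv : table) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        return grouped;
    }

    /** Replays the question's "Process join" loop; returns one value per key. */
    static List<String> processJoin(List<Map.Entry<String, String>> table1,
                                    List<Map.Entry<String, String>> table2) {
        Map<String, List<String>> grouped1 = groupByKey(table1);
        Map<String, List<String>> grouped2 = groupByKey(table2);

        // CoGroupByKey produces one result per key present in either input.
        Set<String> keys = new LinkedHashSet<>(grouped1.keySet());
        keys.addAll(grouped2.keySet());

        List<String> outputs = new ArrayList<>();
        for (String key : keys) {
            List<String> table2Values = grouped2.getOrDefault(key, List.of());
            String value = null;
            // Same as the question: both loops read the table2 values.
            for (String table1Value : table2Values) {
                for (String table2Value : table2Values) {
                    value = table1Value + "," + table2Value; // overwritten, not emitted
                }
            }
            outputs.add(value); // one output per key, holding only the last pair
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> table1 =
                List.of(Map.entry("keyA", "A1a"), Map.entry("keyA", "A1b"));
        List<Map.Entry<String, String>> table2 =
                List.of(Map.entry("keyA", "A2a"), Map.entry("keyA", "A2b"), Map.entry("keyB", "B2"));

        List<String> joined = processJoin(table1, table2);
        System.out.println(joined);        // [A2b,A2b, B2,B2]
        // "Format join" turns every element into {id=x}, so the test sees this many rows:
        System.out.println(joined.size()); // 2
    }
}
```

Each value from processJoin becomes one {id=x} row in "Format join", so with this input the pipeline yields two identical rows while the test expects exactly one.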

Note that the DoFn for "Process join" probably doesn't produce the result you expect, as annotated in the comments below:

String key = e.getKey(); 
String value = null; 
// NOTE: Both table1Value and table2Value iterate over pCollectionTable2Tag 
for (String table1Value : c.element().getValue().getAll(pCollectionTable2Tag)) { 
    for (String table2Value : c.element().getValue().getAll(pCollectionTable2Tag)) { 
     // NOTE: this updates value, and doesn't output it. So for each 
     // key there will be a single output with the *last* value 
     // rather than one for each pair. 
     value = table1Value + "," + table2Value; 
    } 
} 
c.output(KV.of(key, value));
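One way to fix the loop, sketched as a plain-Java helper so it runs without the Dataflow SDK. It assumes Table.join is meant to be an inner join that emits one "table1Value,table2Value" string per matching pair; FixedJoinLogic and joinValues are hypothetical names. Inside processElement you would pass c.element().getValue().getAll(pCollectionTable1Tag) and c.element().getValue().getAll(pCollectionTable2Tag), then call c.output(KV.of(key, joined)) once for each returned string.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a corrected "Process join" body, assuming inner-join semantics. */
public class FixedJoinLogic {

    /** Returns one "table1Value,table2Value" string per pair of values sharing a key. */
    static List<String> joinValues(Iterable<String> table1Values, Iterable<String> table2Values) {
        List<String> joined = new ArrayList<>();
        for (String table1Value : table1Values) {          // the question read pCollectionTable2Tag here
            for (String table2Value : table2Values) {
                joined.add(table1Value + "," + table2Value); // emit every pair, not just the last one
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        System.out.println(joinValues(List.of("A1a", "A1b"), List.of("A2a", "A2b")));
        // [A1a,A2a, A1a,A2b, A1b,A2a, A1b,A2b]
        System.out.println(joinValues(List.of(), List.of("B2")));
        // []
    }
}
```

Because output happens once per pair, a key with no values on one side produces no output at all, instead of a single KV carrying a null value.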