0
我們不知道爲什麼運行這個簡單的測試時,DataflowAssert失敗:DataflowAssert未通過測試的TableRow
@Test
@Category(RunnableOnService.class)
public void testTableRow() throws Exception {
Pipeline p = TestPipeline.create();
PCollection<TableRow> pCollectionTable1 = p.apply("a",Create.of(TABLEROWS_ARRAY_1));
PCollection<TableRow> pCollectionTable2 = p.apply("b",Create.of(TABLEROWS_ARRAY_2));
PCollection<TableRow> joinedTables = Table.join(pCollectionTable1, pCollectionTable2);
DataflowAssert.that(joinedTables).containsInAnyOrder(TABLEROW_TEST);
p.run();
}
我們收到以下異常:
Sep 25, 2015 10:42:50 AM com.google.cloud.dataflow.sdk.testing.DataflowAssert$TwoSideInputAssert$CheckerDoFn processElement
SEVERE: DataflowAssert failed expectations.
java.lang.AssertionError:
Expected: iterable over [<{id=x}>] in any order
but: Not matched: <{id=x}>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.junit.Assert.assertThat(Assert.java:865)
at org.junit.Assert.assertThat(Assert.java:832)
at ...
爲了簡化在DataflowAssert測試中,我們硬編碼Table.join的輸出匹配DataflowAssert,具有:
private static final TableRow TABLEROW_TEST = new TableRow()
.set("id", "x");
static PCollection<TableRow> join(PCollection<TableRow> pCollectionTable1,
PCollection<TableRow> pCollectionTable2) throws Exception {
final TupleTag<String> pCollectionTable1Tag = new TupleTag<String>();
final TupleTag<String> pCollectionTable2Tag = new TupleTag<String>();
PCollection<KV<String, String>> table1Data = pCollectionTable1
.apply(ParDo.of(new ExtractTable1DataFn()));
PCollection<KV<String, String>> table2Data = pCollectionTable2
.apply(ParDo.of(new ExtractTable2DataFn()));
PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
.of(pCollectionTable1Tag, table1Data).and(pCollectionTable2Tag, table2Data)
.apply(CoGroupByKey.<String> create());
PCollection<KV<String, String>> resultCollection = kvpCollection
.apply(ParDo.named("Process join")
.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) {
// System.out.println(c);
KV<String, CoGbkResult> e = c.element();
String key = e.getKey();
String value = null;
for (String table1Value : c.element().getValue().getAll(pCollectionTable2Tag)) {
for (String table2Value : c.element().getValue().getAll(pCollectionTable2Tag)) {
value = table1Value + "," + table2Value;
}
}
c.output(KV.of(key, value));
}
}));
PCollection<TableRow> formattedResults = resultCollection.apply(
ParDo.named("Format join").of(new DoFn<KV<String, String>, TableRow>() {
private static final long serialVersionUID = 0;
public void processElement(ProcessContext c) {
TableRow row = new TableRow().set("id", "x");
c.output(row);
}
}));
return formattedResults;
}
待辦事項任何人都知道我們做錯了什麼?
望着例外它說,它預期錶行與'id = x',它沒有找到一個。 你爲什麼期望這個特定的輸出?你能否提供更多關於常量值的細節('TABLEROWS_ARRAY_1','TABLEROWS_ARRAY_2','TABLEROW_TEST')和'Table.join'的行爲? –
我添加了更多信息。 –