0

我想在春季雲數據流量配置DLQ。這裏是流認定中,我如何部署它春季雲數據流DLQ配置不工作

stream create --definition ":someTestTopic > custom-transform 
    --spring.cloud.stream.bindings.input.consumer.headerMode=raw | log --spring.cloud.stream.bindings.input.consumer.headerMode=raw" --name ticktran 


    stream deploy ticktran --properties 
    "apps.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,apps.custom-transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.bindings.output.destination=test-tran,app.log.spring.cloud.stream.bindings.input.destination=test-tran,app.custom-transform.spring.cloud.stream.kafka.bindings.test-tran.consumer.enableDlq=true" 

在定製變換 - 處理器的代碼,我已經提到

if(out.contains("ERROR")) { 
      throw new RuntimeException("Error "); 
     } 

這意味着,如果消息包含錯誤,那麼RuntimeException的,我要捕捉那些DLQ中的消息。但是,當我運行代碼時,似乎沒有得到名稱爲test-tran的任何Kafka DL隊列。

我是否需要設置更多的屬性,以使DLQ或者我需要改變代碼的東西正確使用DLQ的。

自定義轉換代碼

TransformationServiceApplication.java

import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.hateoas.config.EnableEntityLinks; 

@SpringBootApplication 
@EnableEntityLinks 
public class TransformationServiceApplication { 

    public static void main(String[] args) { 
     SpringApplication.run(TransformationServiceApplication.class, args); 
    } 
} 

TransformationMessageEndPoint.java

@EnableBinding(Processor.class) 
@MessageEndpoint 
public class TransformationMessageEndpoint { 

    private static final String NS = "http://openrisk.com/ingestion/"; 

    AtomicInteger index = new AtomicInteger(1); 
    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) 
    public Object process(Message<?> message) { 
     String out = new String((byte[])message.getPayload()); 

     System.out.println("*****" + out); 

     if(out.contains("ERROR")) { 
      throw new RuntimeException("Error "); 
     } 

     return message; 

    } 
} 

的pom.xml

<parent> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-parent</artifactId> 
     <version>1.3.6.RELEASE</version> 
     <relativePath /> <!-- lookup parent from repository --> 
    </parent> 

    <properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 
     <java.version>1.8</java.version> 
    </properties> 

    <dependencies> 
     <dependency> 
      <groupId>org.springframework.cloud</groupId> 
      <artifactId>spring-cloud-starter-dataflow-server-local</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.springframework.cloud</groupId> 
      <artifactId>spring-cloud-starter-stream-kafka</artifactId> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka --> 
     <dependency> 
      <groupId>org.springframework.cloud</groupId> 
      <artifactId>spring-cloud-stream-binder-kafka</artifactId> 
      <version>1.0.0.RELEASE</version> 
     </dependency> 
     <dependency> 
      <groupId>org.springframework.cloud</groupId> 
      <artifactId>spring-cloud-stream-test-support</artifactId> 
      <version>1.0.0.BUILD-SNAPSHOT</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.springframework.cloud.stream.module</groupId> 
      <artifactId>spring-cloud-stream-modules-test-support</artifactId> 
      <version>1.0.0.BUILD-SNAPSHOT</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.jena</groupId> 
      <artifactId>jena-core</artifactId> 
      <version>3.1.0</version> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core --> 
     <dependency> 
      <groupId>com.fasterxml.jackson.core</groupId> 
      <artifactId>jackson-core</artifactId> 
      <version>2.8.0</version> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-test</artifactId> 
      <scope>test</scope> 
     </dependency> 

添加國防部ULE

app register --name custom-transform --type processor --uri maven://com.openrisk.openmargin:TransformationService:0.0.1-SNAPSHOT 

添加流

stream create --definition ":someTesstTopic > custom-transform | log " --name ticktran 

部署流

stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq" 
+0

您正在使用什麼版本的新加坡民防部隊的工作? 'module register'命令超舊;至少6-7個月大。我們早已遠離這些條款。請嘗試最新的[1.1 M1版本](http://docs.spring.io/spring-cloud-dataflow/docs/1.1.0.M1/reference/htmlsingle/)。 –

+0

隨着新版本的工作。謝謝。 –

+0

很高興聽到這個消息。請分享您的最終發現和/或評論,也許還考慮將問題標記爲已解決。 –

回答

0

有你流定義的幾個問題。

  • 部署屬性以app.<app-name>.開頭,但您在少數地方已有apps.<app-name>.
  • 目的地是在SCDF中自動創建的,因此不推薦使用覆蓋默認值。但是,您可以在運行獨立應用程序spring-cloud-stream時執行此操作。
  • 而不是使用自定義的目的地,您可以通過直接默認通道相互作用使DLQ - 參見下面的例子。

流創建--definition 「:someTesssstTopic>變換|日誌」 --name ticktran

流部署ticktran --properties「app.log.spring.cloud.stream.bindings.input。 consumer.headerMode = raw,app.transform.spring.cloud.stream.bindings.input.consumer.headerMode = raw,app.transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq「

  • 目標test-tran不在accep表格格式在涉及app.transform.spring.cloud.stream.kafka.bindings.<channelName>.consumer.enableDlq屬性中的引用。
  • 最後,當有一個錯誤,纔會創建了error.<destination>.<group>話題。

我們將通過添加幾個DSL樣本到參考指南:#885

編輯: 我更新了流定義以反映正確的部署屬性前綴。

+0

謝謝破舊。我嘗試過上面的例子,但仍未創建DLQ。在轉換代碼中,我拋出了RunTimeException,並且由於異常,我得到了處理3次的消息,但是在完成所有處理後,我期待在DLQ中轉儲的消息和異常不會發生。我仍然缺少一些屬性? –

+0

我們可以通過編程的方式定義錯誤通道嗎,我看過一些在處理器代碼中定義錯誤通道的例子。什麼是正確的方法來做到這一點,爲什麼需要以編程方式定義錯誤通道。 –

+0

對不起,我在流定義中有錯字 - 我現在糾正了。你可以請重試嗎?此外,如果您可以共享您的自定義處理器代碼(_gh repo?_),我們可以嘗試複製此行爲。 –

0

我改變了數據流的版本1.1 M1 release與下面提及的命令來創建和部署特性,現在

stream create --definition ":someTesstTopic > transform | log " --name ticktran 


stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq" 

感謝Sabby阿南丹