2017-04-19 57 views
1
package org.apache.spark.examples.kafkaToflink; 

import java.io.ByteArrayOutputStream; 
import java.io.IOException; 
import java.io.OutputStream; 
import java.io.PrintStream; 
import java.nio.charset.StandardCharsets; 
import java.util.Properties; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; 

import com.microsoft.azure.datalake.store.ADLException; 
import com.microsoft.azure.datalake.store.ADLFileOutputStream; 
import com.microsoft.azure.datalake.store.ADLStoreClient; 
import com.microsoft.azure.datalake.store.IfExists; 
import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider; 
import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider; 

import scala.util.parsing.combinator.testing.Str; 

public class App { 

    public static void main(String[] args) throws Exception { 
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "192.168.1.72:9092"); 
     properties.setProperty("group.id", "test"); 
     DataStream<String> stream = env.addSource(
       new FlinkKafkaConsumer010<String>("tenant", new SimpleStringSchema(), properties), "Kafka_Source"); 

     stream.addSink(new ADLSink()).name("Custom_Sink").setParallelism(128); 
     env.execute("App"); 
    } 
} 

class ADLSink<String> extends RichSinkFunction<String> { 

    private java.lang.String clientId = "***********"; 
    private java.lang.String authTokenEndpoint = "***************"; 
    private java.lang.String clientKey = "*****************"; 
    private java.lang.String accountFQDN = "****************"; 
    private java.lang.String filename = "/Bitfinex/ETHBTC/ORDERBOOK/ORDERBOOK.json"; 

    @Override 
    public void invoke(String value) { 

     AccessTokenProvider provider = new ClientCredsTokenProvider(authTokenEndpoint, clientId, clientKey); 
     ADLStoreClient client = ADLStoreClient.createClient(accountFQDN, provider); 
     try { 
      client.setPermission(filename, "744"); 
      ADLFileOutputStream stream = client.getAppendStream(filename); 

      System.out.println(value); 
      stream.write(value.toString().getBytes()); 

      stream.close(); 

     } catch (ADLException e) { 

      System.out.println(e.requestId); 
     } catch (Exception e) { 

      System.out.println(e.getMessage()); 
      System.out.println(e.getCause()); 
     } 

    } 

} 

我不斷地嘗試使用while循環附加一個位於Azure數據湖Store中的文件。但有時候會出現這種情況,HTTP500操作APPEND失敗,啓動錯誤或10分鐘後有時會失敗。我正在使用java使用HTTP500操作APPEND失敗?

+0

謝謝你提出的問題。 HTTP 500是一個「服務器」錯誤。我要求ADLS小組進行調查並可能與您聯繫。 –

+0

您能否提供有關您是否(a)使用append或concurrentappend(b)這是發生在單個線程還是多個線程中的信息? –

+0

@AmitKulkarni我正在使用append,這是發生在單線程 –

回答

1

Anubhav,Azure Data Lake流是單寫入器流 - 也就是說,除非您在這些線程之間進行某種形式的同步,否則無法從多個線程寫入相同的流。這是因爲每個寫指定它正在寫入的偏移量,並且對於多個線程,偏移量不一致。

你似乎是從多個線程(.setParallelism(128)呼叫您的代碼)

在你的情況寫,你有兩個選擇:

  1. 寫在每個線程不同的文件。我不知道你的用例,但我們發現,對於很多情況,這是不同線程的自然使用 - 寫入不同的文件。
  2. 如果讓所有線程都寫入同一個文件很重要,那麼您需要稍微重構一下接收器,以便所有實例都引用相同的ADLFileOutputStream,並且您需要確保調用到write()close()是同步的。

現在,這裏存在一個以上的問題 - 你得到的錯誤應該是一個HTPP 4xx錯誤(指示租賃衝突,因爲ADLFileOutputStream小號採集租賃),而不是HTTP 500,它說有一個服務器端問題。爲了解決這個問題,我需要知道您的帳戶名稱和訪問時間。該信息在StackOverflow上共享並不安全,因此請打開支持憑單並參考此問題,以便最終將問題發送給我。

相關問題