
Error using the MemSQL Spark connector

I have an application that needs to read data from MemSQL and load it into a DataFrame. I am using the memsql-spark-connector for that connection, but unfortunately I am stuck at the point where I try to connect to my MemSQL master node. What is wrong with the way I am connecting to it? I did try logging into the memsql master node with the mysql client from my local machine, and that worked, so I assume the problem is not on the server side.

Here is my exception stack trace:

java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure 

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.) 
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2294) 
at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:2039) 
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:1533) 
at com.memsql.spark.connector.MemSQLConnectionPool$.connect(MemSQLConnectionPool.scala:34) 
at com.memsql.spark.connector.MemSQLConnectionPool$.withConnection(MemSQLConnectionPool.scala:38) 
at com.memsql.spark.connector.MemSQLCluster$$anonfun$withAggregatorConn$1.apply(MemSQLCluster.scala:26) 
at com.memsql.spark.connector.MemSQLCluster$$anonfun$withAggregatorConn$1.apply(MemSQLCluster.scala:26) 
at org.apache.spark.sql.memsql.MemSQLCatalog.getDBTablePairs(MemSQLCatalog.scala:64) 
at org.apache.spark.sql.memsql.MemSQLCatalog.lookupTable(MemSQLCatalog.scala:58) 
at org.apache.spark.sql.memsql.MemSQLCatalog.lookupRelation(MemSQLCatalog.scala:24) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:257) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:268) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54) 
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54) 
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:264) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:254) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80) 
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 
at scala.collection.immutable.List.foldLeft(List.scala:84) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72) 
at scala.collection.immutable.List.foreach(List.scala:318) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72) 
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:916) 
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:916) 
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914) 
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132) 
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51) 
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725) 
at org.apache.spark.sql.memsql.MemSQLContext.sql(MemSQLContext.scala:48) 
at org.apache.spark.sql.memsql.MemSQLContext.sql(MemSQLContext.scala:39) 
at MemsqlSparkLoader.load(MemsqlSparkLoader.scala:19) 
at MemsqlSparkLoaderTest$$anonfun$1.apply$mcV$sp(MemsqlSparkLoaderTest.scala:20) 
at MemsqlSparkLoaderTest$$anonfun$1.apply(MemsqlSparkLoaderTest.scala:14) 
at MemsqlSparkLoaderTest$$anonfun$1.apply(MemsqlSparkLoaderTest.scala:14) 
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure 

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. 
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) 
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
at java.lang.reflect.Constructor.newInstance(Constructor.java:526) 
at com.mysql.jdbc.Util.handleNewInstance(Util.java:377) 
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1036) 
at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:338) 
at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2232) 
at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2265) 
at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2064) 
at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:790) 
at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:44) 
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) 
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
at java.lang.reflect.Constructor.newInstance(Constructor.java:526) 
at com.mysql.jdbc.Util.handleNewInstance(Util.java:377) 
at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:395) 
at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:325) 
at org.apache.commons.dbcp2.DriverConnectionFactory.createConnection(DriverConnectionFactory.java:39) 
at org.apache.commons.dbcp2.PoolableConnectionFactory.makeObject(PoolableConnectionFactory.java:256) 
at org.apache.commons.dbcp2.BasicDataSource.validateConnectionFactory(BasicDataSource.java:2304) 
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2290) 
... 126 more 
Caused by: java.net.SocketTimeoutException: connect timed out 
at java.net.PlainSocketImpl.socketConnect(Native Method) 
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) 
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) 
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) 
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
at java.net.Socket.connect(Socket.java:579) 
at com.mysql.jdbc.StandardSocketFactory.connect(StandardSocketFactory.java:213) 
at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:297) 
... 142 more  

Below is the code snippet; the exception is thrown on its last line:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.memsql.MemSQLContext

val masterHost: String = "XXX"
val masterPort: Int = 3307
val defaultDBName: String = "mydbtest"
val user: String = "user"
val passwd: String = "passwd"

val query: String = "select * from transactions where block_height <= 1000"

// Point the connector at the master aggregator.
val conf: SparkConf = new SparkConf().setAppName("MemsqlSparkLoaderTest").setMaster("local")
conf.set("memsql.host", masterHost)
conf.set("memsql.port", masterPort.toString)
conf.set("memsql.defaultDatabase", defaultDBName)
conf.set("memsql.user", user)
conf.set("memsql.password", passwd)

val sc: SparkContext = new SparkContext(conf)
val msc: MemSQLContext = new MemSQLContext(sc)

val df = msc.sql(query)  // <-- the exception is thrown here
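
Since the root cause in the trace is a plain java.net.SocketTimeoutException ("connect timed out") raised inside the MySQL JDBC driver, it can help to reproduce the connection outside the connector. The following is only a minimal sketch, reusing the placeholder values defined above; if it also times out, the problem is reachability from this machine to the master aggregator rather than the memsql-spark-connector itself:

import java.sql.DriverManager

// Minimal JDBC check against the master aggregator, using the placeholders above.
// connectTimeout is in milliseconds.
val url = s"jdbc:mysql://$masterHost:$masterPort/$defaultDBName?connectTimeout=5000"
val conn = DriverManager.getConnection(url, user, passwd)
try {
  val rs = conn.createStatement().executeQuery("SELECT 1")
  while (rs.next()) println(rs.getInt(1))  // expect a single row with value 1
} finally {
  conn.close()
}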

The memsql.cnf configuration file on my master node is as follows:

[server] 
basedir = . 
bind_address = 0.0.0.0 
core_file 
durability = on 
lc_messages_dir = ./share 
lock_wait_timeout = 60 
max_connections = 100000 
plan_expiration_minutes = 180 
redundancy_level = 2 
skip_name_resolve = on 
snapshot_trigger_size = 256m 
socket = memsql.sock 
ssl_cert = /var/lib/memsql/certs/server-cert.pem 
ssl_key = /var/lib/memsql/certs/server-key.pem 
tmpdir = . 
transaction_buffer = 64m 
; ------------------------------------------------------------------------ 
; MEMSQL OPS VARIABLES 
; 
; Variables below this header are controlled by MemSQL Ops. 
; Please do not edit any of these values directly. 
; ------------------------------------------------------------------------ 
master_aggregator 
port = 3307 

Could you post your MemSQL node settings (the memsql.cnf configuration)? –


Posted it in the body of my question, as requested. This is the memsql.cnf file on the master node of my memsql cluster. –


Check: http://stackoverflow.com/questions/6865538/solving-a-communications-link-failure-with-jdbc-and-mysql –

Answers


It looks like your application cannot connect to the database. Your application may not have network access to the database, or a firewall may be blocking some of the ports it needs. Also, try using the master's IP address instead of the master's hostname. I think you are right in saying that the problem is not on the server side.

Please let me know if this solves your problem. :)


I am using the IP rather than the hostname. However, when I run 'mysql -h -P -u -p' I can connect to the memsql database successfully. Given that, do you still think it is a network access problem? At least the exception does look network-related. –
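
One way to settle that is to reproduce the exact step that fails in the stack trace (a plain TCP connect with a timeout, which is what the driver's StandardSocketFactory does) from the same machine and JVM that run the Spark driver, since the mysql client test above may take a different network path. A minimal sketch, with the host and port as placeholders:

import java.net.{InetSocketAddress, Socket}

// Plain TCP reachability check against the master aggregator.
// This mirrors the call that fails in the trace: Socket.connect with a timeout.
val host = "XXX"   // placeholder: the memsql.host value from the question
val port = 3307
val socket = new Socket()
try {
  socket.connect(new InetSocketAddress(host, port), 5000)  // 5-second timeout
  println(s"TCP connect to $host:$port succeeded")
} catch {
  case e: java.io.IOException =>
    println(s"TCP connect to $host:$port failed: ${e.getMessage}")
} finally {
  socket.close()
}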


I had the same problem.

My MySQL client could connect to my database server from my Spark master, but my Spark slaves could not connect to it.

I was using the AWS RDS service. I added an authorization for ElasticMapReduce-slave to the RDS security group, and my problem was solved.
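
The connection has to succeed from every machine that opens one, not only from the machine where the mysql client was tried. If it helps, here is a small sketch (assuming the SparkContext `sc` and the host/port placeholders from the question) that runs the same TCP check on the executors, so the result reflects the workers' network access rather than the driver's:

import java.net.{InetSocketAddress, Socket}

// Run the TCP check on the executors: a failure here points at the workers'
// security groups or firewall rules even when the driver can connect.
val host = "XXX"   // placeholder: the memsql.host value from the question
val port = 3307

val results = sc.parallelize(1 to 4, 4).map { i =>
  val s = new Socket()
  try {
    s.connect(new InetSocketAddress(host, port), 5000)
    s"task $i: connect OK"
  } catch {
    case e: java.io.IOException => s"task $i: connect failed (${e.getMessage})"
  } finally {
    s.close()
  }
}.collect()

results.foreach(println)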