2017-06-30 54 views
1

我試着去實現Redis的特定於集合類型下沉= MAP彈簧XD Redis的水槽與收藏型MAP

stream create tTEST_GF_SINK --definition "trigger --initialDelay=0 --fixedDelay=1 --timeUnit=MINUTES --payload= 'new Date().toString()' --outputType=application/json | 
    transform --expression='new java.util.Date().toString()'| 
    redis --collectionType=MAP --key=1" --deploy 

對於所有其他集合類型,如LIST,SET,ZSET流能夠寫入這些redis下沉,但是當我使用MAP作爲集合類型時,它會在redis_mapkey上拋出以下錯誤。

2017-06-30T16:47:59-0400 1.3.1.RELEASE ERROR任務調度-3- handler.LoggingHandler - org.springframework.messaging.MessageHandlingException:無法在Redis的集合存儲消息數據; 嵌套的異常是org.springframework.expression.spel.SpelEvaluationException:EL1008E :(pos 8):在'org.springframework.messaging.MessageHeaders'類型的對象上找不到屬性或字段'redis_mapKey' - 可能不公開? 在org.springframework.integration.redis.outbound.RedisStoreWritingMessageHandler.handleMessageInternal(RedisStoreWritingMessageHandler.java:278) 在org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 在org.springframework.integration .dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 在org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) 在org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java :120) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) at org.springframework.integration.ch annel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java: 115) 在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 在org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 在org.springframework.integration。 handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) at org.springfr amework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) 在org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) 在org.springframework.integration.handler.AbstractMessageHandler.handleMessage( AbstractMessageHandler.java:127) 在org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 在org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) 的組織。 springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscriba (org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) at org。在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send( AbstractMessageSendingTemplate。的java:105) 在org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) 在org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) 在org.springframework。 integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) 在org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) 在org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler。 java:127) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) at org.springframework.integration.channel。在org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)處, at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springfr amework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 在org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) 在org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput( AbstractMessageProducingHandler.java:154) 在org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) 在org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) 的組織。 springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) at org.springframework.integration.dispatcher.Abstra ctDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 在org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) 在org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) 在org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 在org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) 在org.springframework.integration.channel。 AbstractMessageChannel.send(AbstractMessageChannel.java:392) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate .doSend(GenericMessagingTemplate.java:45) 在org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 在org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) 在org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) 在org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) 在org.springframework.integration.handler.AbstractReplyProducingMessageHandler .handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler。的java:127)在 org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 在org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) 在org.springframework。 integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) 在org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 在org.springframework.integration.channel.AbstractMessageChannel.send(摘要消息信道。的java:442)在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) 在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 在org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 在org.springframework.integration.handler。 AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) 在org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) 在org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) 在org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) 在org.spring framework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 在org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 在org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch( UnicastingDispatcher.java:147) 在org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) 在org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 的組織。 springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) 在org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) 在org.springfr amework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 在org.springframework.messaging.core.AbstractMessageSendingTemplate.send( AbstractMessageSendingTemplate.java:105) 在org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:161) 在org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:251) 的組織。 springframework.integration.endpoint.AbstractPollingEndpoint.access $ 000(AbstractPollingEndpoint.java:57) 在org.springframework.integration.endpoint.AbstractPollingEndpoint $ 1.call(AbstractPollingEndpoint.java:176) 在組織.springframework.integration.endpoint.AbstractPollingEndpoint $ 1.call(AbstractPollingEndpoint.java:173) 在org.springframework.integration.endpoint.AbstractPollingEndpoint $波蘭德$ 1.run(AbstractPollingEndpoint.java:330) 在org.springframework.integration.util .ErrorHandlingTaskExecutor $ 1.run(ErrorHandlingTaskExecutor.java:55) 在org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 在org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java: 51) 在org.springframework.integration.endpoint.AbstractPollingEndpoint $ Poller.run(AbstractPollingEndpoint.java:324) 在org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 的組織。SpringFramework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) at java.util.concurrent.Executors $ RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask。的java:266) 在java.util.concurrent.ScheduledThreadPoolExecutor中$ $ ScheduledFutureTask.access 201(ScheduledThreadPoolExecutor.java:180) 在java.util.concurrent.ScheduledThreadPoolExecutor中$ ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 在java.util中.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 引起:org.springframework.expression.spel.SpelEvaluationExcept離子:EL1008E :(pos 8):在'org.springframework.messaging.MessageHeaders'類型的對象上找不到屬性或字段'redis_mapKey' - 可能不公開? 在org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:224) 在org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:94) 在org.springframework。 expression.pel.ast.PropertyOrFieldReference.access $ 000(PropertyOrFieldReference.java:46) at org.springframework.expression.spel.ast.PropertyOrFieldReference $ AccessorLValue.getValue(PropertyOrFieldReference.java:374) at org.springframework.expression.spel .ast.CompoundExpression.getValueInternal(CompoundExpression.java:88) at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:120) at org.springframework.expression.spel.standard.SpelExpression.getValue (SpelExpression.java:267) 在org.springframework.integration.redis.outbound.RedisStoreWritingMessageHandler.determineMapKey(RedisStoreWritingMessageHandler.java:415) 在org.springframework.integration.redis.outbound.RedisStoreWritingMessageHandler.writeToMap(RedisStoreWritingMessageHandler.java:379) 在org.springframework。 integration.redis.outbound.RedisStoreWritingMessageHandler.handleMessageInternal(RedisStoreWritingMessageHandler.java:271) ...... 99多個

任何一個可以幫助我

回答

0

與地圖收藏,也有涉及到兩個鍵;地圖存儲的關鍵字以及元素存儲在地圖中的關鍵字。

XD模塊不會將mapKey作爲屬性公開。您可以添加一個上游頭來設置redis_mapKey頭,也可以編輯模塊的配置以添加map-key-expression

例如,編輯xd/modules/sink/redis/config/redis.xml是這樣的...

<beans:beans profile="use-store-expression"> 
    <redis:store-outbound-channel-adapter 
     map-key-expression="payload.substring(17)" 
     key-expression="${keyExpression}" collection-type="${collectionType}" 
     channel="input" connection-factory="redisConnectionFactory" /> 
</beans:beans> 

與...

xd:>stream create foo --definition "time | redis --collectionType=MAP --key='foo'" --deploy 

結果...

127.0.0.1:6379> hgetall "foo" 
1) "52" 
2) "2017-07-01 08:43:52" 
3) "53" 
4) "2017-07-01 08:43:53" 
5) "54" 
6) "2017-07-01 08:43:54" 
7) "55" 
8) "2017-07-01 08:43:55" 
... 
0

@Gary。我試着爲header-richher添加redis_mapkey的建議。它的工作正常。

流創建tTEST_GF_SINK --definition「觸發--initialDelay = 0 --fixedDelay = 1 --timeUnit = MINUTES --payload = '新日期()。的toString()' --outputType =應用/ JSON | header-enricher --headers = {\「redis_mapKey \」:\「payload.substring(17)\」} | transform --expression ='new java.util.Date()。的toString()「| redis的--collectionType = MAP --key = 1" --deploy

127.0.0.1:6379> hgetall 1

1) 「NG()」

2)「星期六7月1 23:29:16 EDT 2017「