2016-08-18

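Set SNS WaitTime in Spring AWS Cloud Framework

I am using the Spring AWS Cloud Framework to poll for S3 event notifications on a queue, and I am using QueueMessagingTemplate to do this. I would like to be able to set the maximum number of messages and the wait time for polling, see: http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html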

import com.amazonaws.services.s3.event.S3EventNotification; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate; 

public class MyQueue { 

    private QueueMessagingTemplate queueMsgTemplate; 

    @Autowired 
    public MyQueue(QueueMessagingTemplate queueMsgTemplate) { 
     this.queueMsgTemplate = queueMsgTemplate; 

    } 

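    // Polls the queue once and converts the payload into an S3 event notification.
    // No @Override here: MyQueue does not declare a supertype in this snippet.
    public S3EventNotification poll() {
        return queueMsgTemplate
                .receiveAndConvert("myQueueName", S3EventNotification.class);
    }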
} 

Context

@Bean 
public AWSCredentialsProviderChain awsCredentialsProviderChain() { 
    return new AWSCredentialsProviderChain(
      new DefaultAWSCredentialsProviderChain()); 
} 

@Bean 
public ClientConfiguration clientConfiguration() { 
    return new ClientConfiguration(); 
} 

@Bean 
public AmazonSQS sqsClient(ClientConfiguration clientConfiguration,// UserData userData, 
          AWSCredentialsProviderChain credentialsProvider) { 
    AmazonSQSClient amazonSQSClient = new AmazonSQSClient(credentialsProvider, clientConfiguration); 
    amazonSQSClient.setEndpoint("http://localhost:9324"); 
    return amazonSQSClient; 
} 

@Bean 
public QueueMessagingTemplate queueMessagingTemplate(AmazonSQS sqsClient) { 
    return new QueueMessagingTemplate(sqsClient); 
} 

Any idea how to configure these? Thanks

Answer

QueueMessagingTemplate.receiveAndConvert() is based on the QueueMessageChannel.receive() method, where you can find the relevant code:

@Override 
public Message<String> receive() { 
    return this.receive(0); 
} 

@Override 
public Message<String> receive(long timeout) { 
    ReceiveMessageResult receiveMessageResult = this.amazonSqs.receiveMessage(
      new ReceiveMessageRequest(this.queueUrl). 
        withMaxNumberOfMessages(1). 
        withWaitTimeSeconds(Long.valueOf(timeout).intValue()). 
        withAttributeNames(ATTRIBUTE_NAMES). 
        withMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES)); 
    if (receiveMessageResult.getMessages().isEmpty()) { 
     return null; 
    } 
    com.amazonaws.services.sqs.model.Message amazonMessage = receiveMessageResult.getMessages().get(0); 
    Message<String> message = createMessage(amazonMessage); 
    this.amazonSqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, amazonMessage.getReceiptHandle())); 
    return message; 
} 

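So, as you can see, withMaxNumberOfMessages(1) is hard-coded to 1. That is correct, because receive() can only poll one message. withWaitTimeSeconds(Long.valueOf(timeout).intValue()) is exactly equal to the provided timeout, in seconds. Unfortunately, in the case of receiveAndConvert() you can't modify it, because it goes through the no-argument receive(), which passes 0.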
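
Because the timeout is handed straight to withWaitTimeSeconds(), it helps to keep it within what ReceiveMessage accepts. Per the SQS API reference linked in the question, WaitTimeSeconds ranges from 0 to 20 and MaxNumberOfMessages from 1 to 10. Below is a minimal sketch of that range check in plain Java; the class and method names are hypothetical and are not part of Spring Cloud AWS or the AWS SDK.

```java
final class SqsReceiveLimits {

    // Limits documented for the SQS ReceiveMessage action.
    static final int MAX_WAIT_TIME_SECONDS = 20;
    static final int MIN_MESSAGES = 1;
    static final int MAX_MESSAGES = 10;

    private SqsReceiveLimits() {
    }

    // Clamps a timeout (in seconds) to the 0..20 range of WaitTimeSeconds.
    static int clampWaitTimeSeconds(long timeoutSeconds) {
        return (int) Math.max(0L, Math.min(MAX_WAIT_TIME_SECONDS, timeoutSeconds));
    }

    // Clamps a requested batch size to the 1..10 range of MaxNumberOfMessages.
    static int clampMaxNumberOfMessages(int requested) {
        return Math.max(MIN_MESSAGES, Math.min(MAX_MESSAGES, requested));
    }

    public static void main(String[] args) {
        System.out.println(clampWaitTimeSeconds(30));     // prints 20
        System.out.println(clampMaxNumberOfMessages(0));  // prints 1
    }
}
```

The clamped wait time could then be passed to QueueMessageChannel.receive(long timeout). The batch-size helper only matters if you call AmazonSQS.receiveMessage() yourself, since QueueMessageChannel always asks for a single message.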

Consider using QueueMessageChannel.receive(long timeout) together with the messageConverter from QueueMessagingTemplate, similar to how it is done in:

public <T> T receiveAndConvert(QueueMessageChannel destination, Class<T> targetClass) throws MessagingException { 
    Message<?> message = destination.receive(); 
    if (message != null) { 
     return (T) getMessageConverter().fromMessage(message, targetClass); 
    } else { 
     return null; 
    } 
} 

The appropriate QueueMessageChannel can be obtained with code like this:

String physicalResourceId = this.destinationResolver.resolveDestination(destination); 
new QueueMessageChannel(this.amazonSqs, physicalResourceId);
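
Putting the pieces together, the idea is "receive with an explicit timeout, then convert, returning null when nothing arrived". The sketch below shows only that shape in plain Java so it can run without Spring on the classpath. The class TimedReceiveSketch and its parameters are hypothetical: `receiver` stands in for QueueMessageChannel.receive(long timeout) and `converter` stands in for getMessageConverter().fromMessage(message, targetClass).

```java
import java.util.function.BiFunction;
import java.util.function.LongFunction;

final class TimedReceiveSketch {

    private TimedReceiveSketch() {
    }

    // receiver:  stand-in for QueueMessageChannel.receive(long timeout)
    // converter: stand-in for MessageConverter.fromMessage(message, targetClass)
    static <M, T> T receiveAndConvert(LongFunction<M> receiver,
                                      BiFunction<M, Class<T>, Object> converter,
                                      long timeoutSeconds,
                                      Class<T> targetClass) {
        M message = receiver.apply(timeoutSeconds);
        if (message == null) {
            return null; // nothing arrived within the wait time
        }
        return targetClass.cast(converter.apply(message, targetClass));
    }

    public static void main(String[] args) {
        // Fake channel that always has one message, regardless of the timeout.
        LongFunction<String> channel = timeout -> "42";
        BiFunction<String, Class<Integer>, Object> toInt =
                (msg, type) -> Integer.valueOf(msg);

        Integer value = receiveAndConvert(channel, toInt, 20L, Integer.class);
        System.out.println(value); // prints 42
    }
}
```

In a real application the two lambdas would presumably be `timeout -> queueMessageChannel.receive(timeout)` and `(msg, type) -> queueMessagingTemplate.getMessageConverter().fromMessage(msg, type)`, with S3EventNotification.class as the target type.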