2016-12-01 63 views
0

我已成功實施名爲hello_sample的簡單Java Amazon SWF示例。我創建了ActivityWorker可執行文件,用於輪詢SWF以處理活動任務。我創建了WorkflowWorker可執行文件來輪詢SWF進行決策任務,並且我有一個WorkflowStarter可執行文件來啓動工作流執行。它像廣告一樣工作。我不明白的是,如何配置和添加第二個活動以在第一個活動之後運行?
WorkflowWorker:如何在Amazon SWF中添加第二個活動hello_sample示例

public class WorkflowWorker { 
private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient(); 
public static void main(String[] args) { 
    PollForDecisionTaskRequest task_request = 
     new PollForDecisionTaskRequest() 
      .withDomain(Constants.DOMAIN) 
      .withTaskList(new TaskList().withName(Constants.TASKLIST)); 

    while (true) { 
     System.out.println(
       "WorkflowWorker is polling for a decision task from the tasklist '" + 
       Constants.TASKLIST + "' in the domain '" + 
       Constants.DOMAIN + "'."); 

     DecisionTask task = swf.pollForDecisionTask(task_request); 

     String taskToken = task.getTaskToken(); 
     if (taskToken != null) { 
      try { 
       executeDecisionTask(taskToken, task.getEvents()); 
      } 
      catch (Throwable th) { 
       th.printStackTrace(); 
      } 
     } 
    } 
} 

private static void executeDecisionTask(String taskToken, List<HistoryEvent> events) throws Throwable { 
    List<Decision> decisions = new ArrayList<Decision>(); 
    String workflow_input = null; 
    int scheduled_activities = 0; 
    int open_activities = 0; 
    boolean activity_completed = false; 
    String result = null; 

    System.out.println("WorkflowWorker is executing the decision task for the history events: ["); 
    for (HistoryEvent event : events) { 
     System.out.println(" " + event); 
     switch(event.getEventType()) { 
      case "WorkflowExecutionStarted": 
       workflow_input = event.getWorkflowExecutionStartedEventAttributes().getInput(); 
       break; 
      case "ActivityTaskScheduled": 
       scheduled_activities++; 
       break; 
      case "ScheduleActivityTaskFailed": 
       scheduled_activities--; 
       break; 
      case "ActivityTaskStarted": 
       scheduled_activities--; 
       open_activities++; 
       break; 
      case "ActivityTaskCompleted": 
       open_activities--; 
       activity_completed = true; 
       result = event.getActivityTaskCompletedEventAttributes().getResult(); 
       break; 
      case "ActivityTaskFailed": 
       open_activities--; 
       break; 
      case "ActivityTaskTimedOut": 
       open_activities--; 
       break; 
     } 
    } 
    System.out.println("]"); 

    if (activity_completed) { 
     decisions.add(
      new Decision() 
       .withDecisionType(DecisionType.CompleteWorkflowExecution) 
       .withCompleteWorkflowExecutionDecisionAttributes(
        new CompleteWorkflowExecutionDecisionAttributes() 
         .withResult(result))); 
    } 
    else { 
     if (open_activities == 0 && scheduled_activities == 0) { 
      ScheduleActivityTaskDecisionAttributes attrs = 
       new ScheduleActivityTaskDecisionAttributes() 
        .withActivityType(new ActivityType() 
         .withName(Constants.ACTIVITY) 
         .withVersion(Constants.ACTIVITY_VERSION)) 
        .withActivityId(UUID.randomUUID().toString()) 
        .withInput(workflow_input); 

      decisions.add(
        new Decision() 
         .withDecisionType(DecisionType.ScheduleActivityTask) 
         .withScheduleActivityTaskDecisionAttributes(attrs)); 
     } 
     else { 
      // an instance of HelloActivity is already scheduled or running. Do nothing, another 
      // task will be scheduled once the activity completes, fails or times out 
     } 
    } 

    System.out.println("WorkflowWorker is exiting the decision task with the decisions " + decisions); 
    swf.respondDecisionTaskCompleted(
     new RespondDecisionTaskCompletedRequest() 
      .withTaskToken(taskToken) 
      .withDecisions(decisions)); 
} 

}

ActivityWorker:

public class ActivityWorker { 
private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient(); 
private static CountDownLatch waitForTermination = new CountDownLatch(1); 
private static volatile boolean terminate = false; 

private static String executeActivityTask(String g_species) throws Throwable { 
    String s = " ******** Hello, " + g_species + "!"; 
    System.out.println(s); 

    String cwd = Paths.get(".").toAbsolutePath().normalize().toString(); 
    String filename = "g_species.txt"; 
    Path filePath = Paths.get(cwd, filename); 
    String filePathName = filePath.toString(); 

    BufferedWriter output = null; 
    try { 
     File file = new File (filePathName); 
     output = new BufferedWriter(new FileWriter(file)); 
     output.write(g_species); 
    } 
    catch (IOException e) { 
     e.printStackTrace(); 
    } 
    finally { 
     if (output != null) { 
     output.close(); 
     } 
    } 

    return g_species; 
} 

public static void main(String[] args) { 
    Runtime.getRuntime().addShutdownHook(new Thread() { 
     @Override 
     public void run() { 
      try { 
       terminate = true; 
       System.out.println("ActivityWorker is waiting for the current poll request to return before shutting down."); 
       waitForTermination.await(60, TimeUnit.SECONDS); 
      } 
      catch (InterruptedException e) { 
       // ignore 
       System.out.println(e.getMessage()); 
      } 
     } 
    }); 
    try { 
     pollAndExecute(); 
    } 
    finally { 
     waitForTermination.countDown(); 
    } 
} 

public static void pollAndExecute() { 
    while (!terminate) { 
     System.out.println("ActivityWorker is polling for an activity task from the tasklist '" 
       + Constants.TASKLIST + "' in the domain '" + Constants.DOMAIN + "'."); 

     ActivityTask task = swf.pollForActivityTask(new PollForActivityTaskRequest() 
      .withDomain(Constants.DOMAIN) 
      .withTaskList(new TaskList().withName(Constants.TASKLIST))); 

     String taskToken = task.getTaskToken(); 

     if (taskToken != null) { 
      String result = null; 
      Throwable error = null; 

      try { 
       System.out.println("ActivityWorker is executing the activity task with input '" + task.getInput() + "'."); 
       result = executeActivityTask(task.getInput()); 
      } 
      catch (Throwable th) { 
       error = th; 
      } 

      if (error == null) { 
       System.out.println("The activity task succeeded with result '" + result + "'."); 
       swf.respondActivityTaskCompleted(
        new RespondActivityTaskCompletedRequest() 
         .withTaskToken(taskToken) 
         .withResult(result)); 
      } 
      else { 
       System.out.println("The activity task failed with the error '" 
         + error.getClass().getSimpleName() + "'."); 
       swf.respondActivityTaskFailed(
        new RespondActivityTaskFailedRequest() 
         .withTaskToken(taskToken) 
         .withReason(error.getClass().getSimpleName()) 
         .withDetails(error.getMessage())); 
      } 
     } 
    } 
} 

}

WorkflowStarter那踢它全部關閉:

public class WorkflowStarter { 
private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient(); 
public static final String WORKFLOW_EXECUTION = "HelloWorldWorkflowExecution"; 

public static void main(String[] args) { 

    String workflow_input = "Amazon SWF"; 
    if (args.length > 0) { 
     workflow_input = args[0]; 
    } 

    System.out.println("Starting the workflow execution '" + WORKFLOW_EXECUTION + 
      "' with input '" + workflow_input + "'."); 

    WorkflowType wf_type = new WorkflowType() 
     .withName(Constants.WORKFLOW) 
     .withVersion(Constants.WORKFLOW_VERSION); 

    Run run = swf.startWorkflowExecution(new StartWorkflowExecutionRequest() 
     .withDomain(Constants.DOMAIN) 
     .withWorkflowType(wf_type) 
     .withWorkflowId(WORKFLOW_EXECUTION) 
     .withInput(workflow_input) 
     .withExecutionStartToCloseTimeout("90")); 

    System.out.println("Workflow execution started with the run id '" + 
      run.getRunId() + "'."); 
} 

}

回答

1

我會建議不要重新發明輪子,並使用亞馬遜官方支持的AWS Flow Framework for Java。它已經實現了所有低級別的細節,並允許您直接關注工作流的業務邏輯。

這是一個使用三項活動(取自developer guide)的示例工作流程。

活動界面:

import com.amazonaws.services.simpleworkflow.flow.annotations.Activities; 
import com.amazonaws.services.simpleworkflow.flow.annotations.ActivityRegistrationOptions; 

@ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = 300, 
          defaultTaskStartToCloseTimeoutSeconds = 10) 
@Activities(version="1.0") 

public interface GreeterActivities { 
    public String getName(); 
    public String getGreeting(String name); 
    public void say(String what); 
} 

活動實施:

public class GreeterActivitiesImpl implements GreeterActivities { 
    @Override 
    public String getName() { 
     return "World"; 
    } 
    @Override 
    public String getGreeting(String name) { 
     return "Hello " + name; 
    } 
    @Override 
    public void say(String what) { 
     System.out.println(what); 
    } 
} 

工作流程界面:

import com.amazonaws.services.simpleworkflow.flow.annotations.Execute; 
import com.amazonaws.services.simpleworkflow.flow.annotations.Workflow; 
import com.amazonaws.services.simpleworkflow.flow.annotations.WorkflowRegistrationOptions; 

@Workflow 
@WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 3600) 
public interface GreeterWorkflow { 
    @Execute(version = "1.0") 
    public void greet(); 
} 

工作流程實現:

import com.amazonaws.services.simpleworkflow.flow.core.Promise; 

public class GreeterWorkflowImpl implements GreeterWorkflow { 
    private GreeterActivitiesClient operations = new GreeterActivitiesClientImpl(); 

    public void greet() { 
    Promise<String> name = operations.getName(); 
    Promise<String> greeting = operations.getGreeting(name); 
    operations.say(greeting); 
    } 
} 

承載他們兩個的工作人員。顯然,它可以分成獨立的活動和工作流程工人:

import com.amazonaws.ClientConfiguration; 
import com.amazonaws.auth.AWSCredentials; 
import com.amazonaws.auth.BasicAWSCredentials; 
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow; 
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient; 
import com.amazonaws.services.simpleworkflow.flow.ActivityWorker; 
import com.amazonaws.services.simpleworkflow.flow.WorkflowWorker; 

public class GreeterWorker { 
    public static void main(String[] args) throws Exception { 
    ClientConfiguration config = new ClientConfiguration().withSocketTimeout(70*1000); 

    String swfAccessId = System.getenv("AWS_ACCESS_KEY_ID"); 
    String swfSecretKey = System.getenv("AWS_SECRET_KEY"); 
    AWSCredentials awsCredentials = new BasicAWSCredentials(swfAccessId, swfSecretKey); 

    AmazonSimpleWorkflow service = new AmazonSimpleWorkflowClient(awsCredentials, config); 
    service.setEndpoint("https://swf.us-east-1.amazonaws.com"); 

    String domain = "helloWorldWalkthrough"; 
    String taskListToPoll = "HelloWorldList"; 

    ActivityWorker aw = new ActivityWorker(service, domain, taskListToPoll); 
    aw.addActivitiesImplementation(new GreeterActivitiesImpl()); 
    aw.start(); 

    WorkflowWorker wfw = new WorkflowWorker(service, domain, taskListToPoll); 
    wfw.addWorkflowImplementationType(GreeterWorkflowImpl.class); 
    wfw.start(); 
    } 
} 

工作流程首發:

import com.amazonaws.ClientConfiguration; 
import com.amazonaws.auth.AWSCredentials; 
import com.amazonaws.auth.BasicAWSCredentials; 
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow; 
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient; 

public class GreeterMain { 

    public static void main(String[] args) throws Exception { 
    ClientConfiguration config = new ClientConfiguration().withSocketTimeout(70*1000); 

    String swfAccessId = System.getenv("AWS_ACCESS_KEY_ID"); 
    String swfSecretKey = System.getenv("AWS_SECRET_KEY"); 
    AWSCredentials awsCredentials = new BasicAWSCredentials(swfAccessId, swfSecretKey); 

    AmazonSimpleWorkflow service = new AmazonSimpleWorkflowClient(awsCredentials, config); 
    service.setEndpoint("https://swf.us-east-1.amazonaws.com"); 

    String domain = "helloWorldWalkthrough"; 

    GreeterWorkflowClientExternalFactory factory = new GreeterWorkflowClientExternalFactoryImpl(service, domain); 
    GreeterWorkflowClientExternal greeter = factory.getClient("someID"); 
    greeter.greet(); 
    } 
} 
+0

我使用的可能是舊的代碼示例,它既不使用也不承諾@Actvities –

+0

然後添加更多信息給你的問題。你使用什麼語言和框架?你的代碼看起來像什麼? –

+0

我正在使用Java和com.amazonaws.services.simpleworkflow –

相關問題