2010-02-01 92 views
40

類似的問題:包裝紙異步計算到同步(阻塞)計算

我有一個方法的對象我想暴露到庫客戶機(尤其是腳本客戶端)如下所示:

interface MyNiceInterface 
{ 
    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg); 
    public Future<Baz> doSomething(Foo fooArg, Bar barArg); 
    // doSomethingAndBlock is the straightforward way; 
    // doSomething has more control but deals with 
    // a Future and that might be too much hassle for 
    // scripting clients 
} 

但原始的「東西」我可以是一組事件驅動類:

interface BazComputationSink 
{ 
    public void onBazResult(Baz result); 
} 

class ImplementingThing 
{ 
    public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink); 
} 

其中ImplementingThing需要投入,做一些神祕的東西,就像一個任務隊列進行排隊的東西,再後來當結果出現,sink.onBazResult()被調用的線程可能與調用ImplementingThing.doSomethingAsync()的線程相同或不同。

有沒有一種方法可以使用事件驅動的函數以及併發基元來實現MyNiceInterface,以便腳本客戶端可以愉快地等待阻塞線程?

編輯:我可以使用FutureTask嗎?

回答

41

使用自己的未來實行:

public class BazComputationFuture implements Future<Baz>, BazComputationSink { 

    private volatile Baz result = null; 
    private volatile boolean cancelled = false; 
    private final CountDownLatch countDownLatch; 

    public BazComputationFuture() { 
     countDownLatch = new CountDownLatch(1); 
    } 

    @Override 
    public boolean cancel(final boolean mayInterruptIfRunning) { 
     if (isDone()) { 
      return false; 
     } else { 
      countDownLatch.countDown(); 
      cancelled = true; 
      return !isDone(); 
     } 
    } 

    @Override 
    public Baz get() throws InterruptedException, ExecutionException { 
     countDownLatch.await(); 
     return result; 
    } 

    @Override 
    public Baz get(final long timeout, final TimeUnit unit) 
      throws InterruptedException, ExecutionException, TimeoutException { 
     countDownLatch.await(timeout, unit); 
     return result; 
    } 

    @Override 
    public boolean isCancelled() { 
     return cancelled; 
    } 

    @Override 
    public boolean isDone() { 
     return countDownLatch.getCount() == 0; 
    } 

    public void onBazResult(final Baz result) { 
     this.result = result; 
     countDownLatch.countDown(); 
    } 

} 

public Future<Baz> doSomething(Foo fooArg, Bar barArg) { 
    BazComputationFuture future = new BazComputationFuture(); 
    doSomethingAsync(fooArg, barArg, future); 
    return future; 
} 

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { 
    return doSomething(fooArg, barArg).get(); 
} 

該解決方案在內部創建一個CountDownLatch,一旦收到回調就會清除它。如果用戶調用get,則CountDownLatch用於阻止調用線程,直到計算完成並調用onBazResult回調。 CountDownLatch將確保如果在調用get()之前發生回調,get()方法將立即返回結果。

+0

(+1)很好的解決方案 – skaffman 2010-02-01 22:27:34

+0

+1,因爲我認爲我理解它......但是您能解釋併發性方面嗎? (使用倒計時鎖存器) – 2010-02-01 22:58:10

+0

你也在onBazResult()中的某個地方缺少「done = true」...... – 2010-02-01 23:01:15

14

那麼,有做類似的簡單的解決方案:

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { 
    final AtomicReference<Baz> notifier = new AtomicReference(); 
    doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
    public void onBazResult(Baz result) { 
     synchronized (notifier) { 
     notifier.set(result); 
     notifier.notify(); 
     } 
    } 
    }); 
    synchronized (notifier) { 
    while (notifier.get() == null) 
     notifier.wait(); 
    } 
    return notifier.get(); 
} 

當然,這裏假設你的Baz結果永遠不會爲空...

+0

這個工程,是一個不錯的優雅模式。謝謝。 – 2013-12-11 00:23:44

+0

內部回調(BazComputationSink(){}),它會抱怨 - 初始化通知程序。如果初始化爲null,則會爲第一次命中引發NullPointerException,導致我的結果還沒有準備好回到異步任務中。 – 2014-06-19 01:46:06

+1

這是不推薦的:如果你的回調立即返回,在等待之前會通知通知,並且你會遇到死鎖(如果你有一個緩存邏輯,這可能會發生) – for3st 2015-09-14 14:05:48

11

谷歌guava library有一個易於使用的SettableFuture,使這個問題很簡單(約10行代碼)。

public class ImplementingThing { 

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { 
    try { 
     return doSomething(fooArg, barArg).get(); 
    } catch (Exception e) { 
     throw new RuntimeException("Oh dear"); 
    } 
}; 

public Future<Baz> doSomething(Foo fooArg, Bar barArg) { 
    final SettableFuture<Baz> future = new SettableFuture<Baz>(); 
    doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
     @Override 
     public void onBazResult(Baz result) { 
      future.set(result); 
     } 
    }); 
    return future; 
}; 

// Everything below here is just mock stuff to make the example work, 
// so you can copy it into your IDE and see it run. 

public static class Baz {} 
public static class Foo {} 
public static class Bar {} 

public static interface BazComputationSink { 
    public void onBazResult(Baz result); 
} 

public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) { 
    new Thread(new Runnable() { 
     @Override 
     public void run() { 
      try { 
       Thread.sleep(4000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      Baz baz = new Baz(); 
      sink.onBazResult(baz); 
     } 
    }).start(); 
}; 

public static void main(String[] args) { 
    System.err.println("Starting Main"); 
    System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null)); 
    System.err.println("Ending Main"); 
} 
3

一個很簡單的例子,只是爲了瞭解CountDownLatch沒有任何 額外的代碼。

A java.util.concurrent.CountDownLatch是一個併發結構,它允許一個或多個線程等待給定的一組操作來完成。

A CountDownLatch用給定的計數初始化。該計數通過調用countDown()方法遞減。等待此計數達到零的線程可以調用await()方法之一。調用await()會阻塞該線程,直到計數達到零。

下面是一個簡單的例子。在減價者在CountDownLatch上呼叫countDown()三次後,等待的服務員從await()呼叫中釋放。

你也可以提一些TimeOut等待。

CountDownLatch latch = new CountDownLatch(3); 

Waiter  waiter  = new Waiter(latch); 
Decrementer decrementer = new Decrementer(latch); 

new Thread(waiter)  .start(); 
new Thread(decrementer).start(); 

Thread.sleep(4000); 
public class Waiter implements Runnable{ 

    CountDownLatch latch = null; 

    public Waiter(CountDownLatch latch) { 
     this.latch = latch; 
    } 

    public void run() { 
     try { 
      latch.await(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     System.out.println("Waiter Released"); 
    } 
} 

// --------------

public class Decrementer implements Runnable { 

    CountDownLatch latch = null; 

    public Decrementer(CountDownLatch latch) { 
     this.latch = latch; 
    } 

    public void run() { 

     try { 
      Thread.sleep(1000); 
      this.latch.countDown(); 

      Thread.sleep(1000); 
      this.latch.countDown(); 

      Thread.sleep(1000); 
      this.latch.countDown(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

Reference

如果你不想使用CountDownLatch或你的要求是什麼與Facebook相似並且與功能不同。意思是如果一個方法被調用,那麼不要調用其他方法。

在這種情況下,你可以聲明

private volatile Boolean isInprocessOfLikeOrUnLike = false; 

然後你就可以在你的方法調用的開頭,如果它是false然後調用方法,否則返回..取決於你的實施檢查。

+0

volatile布爾值?我想你想要一個易變的布爾值(lowecase b),或者更好:AtomicReference – 2016-06-02 15:36:58

2

下面是基於保羅Wagland的回答更通用的解決方案:

public abstract class AsyncRunnable<T> { 
    protected abstract void run(AtomicReference<T> notifier); 

    protected final void finish(AtomicReference<T> notifier, T result) { 
     synchronized (notifier) { 
      notifier.set(result); 
      notifier.notify(); 
     } 
    } 

    public static <T> T wait(AsyncRunnable<T> runnable) { 
     final AtomicReference<T> notifier = new AtomicReference<>(); 

     // run the asynchronous code 
     runnable.run(notifier); 

     // wait for the asynchronous code to finish 
     synchronized (notifier) { 
      while (notifier.get() == null) { 
       try { 
        notifier.wait(); 
       } catch (InterruptedException ignore) {} 
      } 
     } 

     // return the result of the asynchronous code 
     return notifier.get(); 
    } 
} 

下面是一個例子,如何使用它::

String result = AsyncRunnable.wait(new AsyncRunnable<String>() { 
     @Override 
     public void run(final AtomicReference<String> notifier) { 
      // here goes your async code, e.g.: 
      new Thread(new Runnable() { 
       @Override 
       public void run() { 
        finish(notifier, "This was a asynchronous call!"); 
       } 
      }).start(); 
     } 
    }); 

代碼的更詳細的版本可以在這裏找到: http://pastebin.com/hKHJUBqE

編輯: 與問題相關的示例將是:

public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) { 
    return AsyncRunnable.wait(new AsyncRunnable<Baz>() { 
     @Override 
     protected void run(final AtomicReference<Baz> notifier) { 
      doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
       public void onBazResult(Baz result) { 
        synchronized (notifier) { 
         notifier.set(result); 
         notifier.notify(); 
        } 
       } 
      }); 
     } 
    }); 
} 
+0

我理解你的答案,但我不知道如何將它映射到我六年前的問題。 – 2016-03-15 03:12:11

+0

我更新了答案,所以你可以將它映射到問題 – 2016-03-15 03:31:18

+0

順便說一句,如果你必須在可能的地方這樣做,我會考慮使用這個:https://github.com/ReactiveX/RxJava。儘管它的學習曲線陡峭(至少對我而言),但ReactiveX仍然是您的問題的完美選擇。這是一個很好的介紹:https://www.youtube.com/watch?v = _t06LRX0DV0。 – 2016-03-15 03:41:04

2

這是死的簡單與RxJava 2.X:

try { 
    Baz baz = Single.create((SingleEmitter<Baz> emitter) -> 
      doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result))) 
      .toFuture().get(); 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} catch (ExecutionException e) { 
    e.printStackTrace(); 
} 

或者不LAMBDA符號:

Baz baz = Single.create(new SingleOnSubscribe<Baz>() { 
       @Override 
       public void subscribe(SingleEmitter<Baz> emitter) { 
        doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
         @Override 
         public void onBazResult(Baz result) { 
          emitter.onSuccess(result); 
         } 
        }); 
       } 
      }).toFuture().get(); 

更簡單:

Baz baz = Single.create((SingleEmitter<Baz> emitter) -> 
       doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result))) 
       .blockingGet();