001
014
015 package com.liferay.portal.fabric.local.agent;
016
017 import com.liferay.portal.fabric.agent.FabricAgent;
018 import com.liferay.portal.fabric.local.worker.LocalFabricWorker;
019 import com.liferay.portal.fabric.status.FabricStatus;
020 import com.liferay.portal.fabric.status.LocalFabricStatus;
021 import com.liferay.portal.fabric.worker.FabricWorker;
022 import com.liferay.portal.kernel.concurrent.FutureListener;
023 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
024 import com.liferay.portal.kernel.process.ProcessCallable;
025 import com.liferay.portal.kernel.process.ProcessConfig;
026 import com.liferay.portal.kernel.process.ProcessException;
027 import com.liferay.portal.kernel.process.ProcessExecutor;
028
029 import java.io.Serializable;
030
031 import java.util.Collection;
032 import java.util.Collections;
033 import java.util.Queue;
034 import java.util.concurrent.ConcurrentLinkedQueue;
035 import java.util.concurrent.Future;
036
037
040 public class LocalFabricAgent implements FabricAgent {
041
042 public LocalFabricAgent(ProcessExecutor processExecutor) {
043 _processExecutor = processExecutor;
044 }
045
046 @Override
047 public <T extends Serializable> FabricWorker<T> execute(
048 ProcessConfig processConfig, ProcessCallable<T> processCallable)
049 throws ProcessException {
050
051 final FabricWorker<T> fabricWorker = new LocalFabricWorker<>(
052 _processExecutor.execute(processConfig, processCallable));
053
054 _fabricWorkerQueue.add(fabricWorker);
055
056 NoticeableFuture<T> noticeableFuture =
057 fabricWorker.getProcessNoticeableFuture();
058
059 noticeableFuture.addFutureListener(
060 new FutureListener<T>() {
061
062 @Override
063 public void complete(Future<T> future) {
064 _fabricWorkerQueue.remove(fabricWorker);
065 }
066
067 });
068
069 return fabricWorker;
070 }
071
072 @Override
073 public FabricStatus getFabricStatus() {
074 return LocalFabricStatus.INSTANCE;
075 }
076
077 @Override
078 public Collection<? extends FabricWorker<?>> getFabricWorkers() {
079 return Collections.unmodifiableCollection(_fabricWorkerQueue);
080 }
081
082 private final Queue<FabricWorker<?>> _fabricWorkerQueue =
083 new ConcurrentLinkedQueue<>();
084 private final ProcessExecutor _processExecutor;
085
086 }