001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.local.agent;
016    
017    import com.liferay.portal.fabric.agent.FabricAgent;
018    import com.liferay.portal.fabric.local.worker.LocalFabricWorker;
019    import com.liferay.portal.fabric.status.FabricStatus;
020    import com.liferay.portal.fabric.status.LocalFabricStatus;
021    import com.liferay.portal.fabric.worker.FabricWorker;
022    import com.liferay.portal.kernel.concurrent.FutureListener;
023    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
024    import com.liferay.portal.kernel.process.ProcessCallable;
025    import com.liferay.portal.kernel.process.ProcessConfig;
026    import com.liferay.portal.kernel.process.ProcessException;
027    import com.liferay.portal.kernel.process.ProcessExecutor;
028    
029    import java.io.Serializable;
030    
031    import java.util.Collection;
032    import java.util.Collections;
033    import java.util.Queue;
034    import java.util.concurrent.ConcurrentLinkedQueue;
035    import java.util.concurrent.Future;
036    
037    /**
038     * @author Shuyang Zhou
039     */
040    public class LocalFabricAgent implements FabricAgent {
041    
042            public LocalFabricAgent(ProcessExecutor processExecutor) {
043                    _processExecutor = processExecutor;
044            }
045    
046            @Override
047            public <T extends Serializable> FabricWorker<T> execute(
048                            ProcessConfig processConfig, ProcessCallable<T> processCallable)
049                    throws ProcessException {
050    
051                    final FabricWorker<T> fabricWorker = new LocalFabricWorker<>(
052                            _processExecutor.execute(processConfig, processCallable));
053    
054                    _fabricWorkerQueue.add(fabricWorker);
055    
056                    NoticeableFuture<T> noticeableFuture =
057                            fabricWorker.getProcessNoticeableFuture();
058    
059                    noticeableFuture.addFutureListener(
060                            new FutureListener<T>() {
061    
062                                    @Override
063                                    public void complete(Future<T> future) {
064                                            _fabricWorkerQueue.remove(fabricWorker);
065                                    }
066    
067                            });
068    
069                    return fabricWorker;
070            }
071    
072            @Override
073            public FabricStatus getFabricStatus() {
074                    return LocalFabricStatus.INSTANCE;
075            }
076    
077            @Override
078            public Collection<? extends FabricWorker<?>> getFabricWorkers() {
079                    return Collections.unmodifiableCollection(_fabricWorkerQueue);
080            }
081    
082            private final Queue<FabricWorker<?>> _fabricWorkerQueue =
083                    new ConcurrentLinkedQueue<>();
084            private final ProcessExecutor _processExecutor;
085    
086    }