001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.repository;
016    
017    import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
018    import com.liferay.portal.fabric.netty.fileserver.FileRequest;
019    import com.liferay.portal.fabric.netty.fileserver.FileResponse;
020    import com.liferay.portal.fabric.netty.fileserver.handlers.FileResponseChannelHandler;
021    import com.liferay.portal.fabric.netty.util.NettyUtil;
022    import com.liferay.portal.fabric.repository.Repository;
023    import com.liferay.portal.fabric.repository.RepositoryHelperUtil;
024    import com.liferay.portal.kernel.concurrent.AsyncBroker;
025    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
026    import com.liferay.portal.kernel.concurrent.FutureListener;
027    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
028    import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
029    import com.liferay.portal.kernel.log.Log;
030    import com.liferay.portal.kernel.log.LogFactoryUtil;
031    
032    import io.netty.channel.Channel;
033    import io.netty.channel.ChannelFuture;
034    import io.netty.channel.ChannelFutureListener;
035    import io.netty.channel.ChannelPipeline;
036    import io.netty.util.concurrent.EventExecutorGroup;
037    
038    import java.io.IOException;
039    
040    import java.nio.file.Files;
041    import java.nio.file.Path;
042    import java.nio.file.attribute.FileTime;
043    
044    import java.util.Map;
045    import java.util.concurrent.ConcurrentHashMap;
046    import java.util.concurrent.ExecutionException;
047    import java.util.concurrent.Future;
048    import java.util.concurrent.atomic.AtomicInteger;
049    
050    /**
051     * @author Shuyang Zhou
052     */
053    public class NettyRepository implements Repository {
054    
055            public NettyRepository(
056                    Path repositoryPath, Channel channel,
057                    EventExecutorGroup eventExecutorGroup, long getFileTimeout) {
058    
059                    if (repositoryPath == null) {
060                            throw new NullPointerException("Repository path is null");
061                    }
062    
063                    if (channel == null) {
064                            throw new NullPointerException("Channel is null");
065                    }
066    
067                    if (eventExecutorGroup == null) {
068                            throw new NullPointerException("Event executor group is null");
069                    }
070    
071                    if (!Files.isDirectory(repositoryPath)) {
072                            throw new IllegalArgumentException(
073                                    repositoryPath + " is not a directory");
074                    }
075    
076                    this.repositoryPath = repositoryPath;
077                    this.channel = channel;
078                    this.getFileTimeout = getFileTimeout;
079    
080                    ChannelPipeline channelPipeline = channel.pipeline();
081    
082                    channelPipeline.addLast(
083                            new FileResponseChannelHandler(asyncBroker, eventExecutorGroup));
084            }
085    
086            @Override
087            public void dispose(boolean delete) {
088                    for (Path path : pathMap.values()) {
089                            FileHelperUtil.delete(true, path);
090                    }
091    
092                    pathMap.clear();
093    
094                    if (delete) {
095                            FileHelperUtil.delete(true, repositoryPath);
096                    }
097            }
098    
099            @Override
100            public NoticeableFuture<Path> getFile(
101                    Path remoteFilePath, Path localFilePath, boolean deleteAfterFetch) {
102    
103                    if (localFilePath == null) {
104                            return getFile(
105                                    remoteFilePath,
106                                    RepositoryHelperUtil.getRepositoryFilePath(
107                                            repositoryPath, remoteFilePath),
108                                    deleteAfterFetch, true);
109                    }
110    
111                    return getFile(remoteFilePath, localFilePath, deleteAfterFetch, false);
112            }
113    
114            @Override
115            public NoticeableFuture<Map<Path, Path>> getFiles(
116                    Map<Path, Path> pathMap, boolean deleteAfterFetch) {
117    
118                    final DefaultNoticeableFuture<Map<Path, Path>> defaultNoticeableFuture =
119                            new DefaultNoticeableFuture<Map<Path, Path>>();
120    
121                    if (pathMap.isEmpty()) {
122                            defaultNoticeableFuture.set(pathMap);
123    
124                            return defaultNoticeableFuture;
125                    }
126    
127                    final Map<Path, Path> resultPathMap =
128                            new ConcurrentHashMap<Path, Path>();
129    
130                    final AtomicInteger counter = new AtomicInteger(pathMap.size());
131    
132                    for (Map.Entry<Path, Path> entry : pathMap.entrySet()) {
133                            final Path remoteFilePath = entry.getKey();
134    
135                            NoticeableFuture<Path> noticeableFuture = getFile(
136                                    remoteFilePath, entry.getValue(), deleteAfterFetch);
137    
138                            noticeableFuture.addFutureListener(
139                                    new FutureListener<Path>() {
140    
141                                            @Override
142                                            public void complete(Future<Path> future) {
143                                                    if (future.isCancelled()) {
144                                                            defaultNoticeableFuture.cancel(true);
145    
146                                                            return;
147                                                    }
148    
149                                                    try {
150                                                            Path localFilePath = future.get();
151    
152                                                            if (localFilePath != null) {
153                                                                    resultPathMap.put(
154                                                                            remoteFilePath, localFilePath);
155                                                            }
156    
157                                                            if (counter.decrementAndGet() <= 0) {
158                                                                    defaultNoticeableFuture.set(resultPathMap);
159                                                            }
160                                                    }
161                                                    catch (Throwable t) {
162                                                            if (t instanceof ExecutionException) {
163                                                                    t = t.getCause();
164                                                            }
165    
166                                                            defaultNoticeableFuture.setException(t);
167                                                    }
168                                            }
169    
170                                    });
171                    }
172    
173                    return defaultNoticeableFuture;
174            }
175    
176            @Override
177            public Path getRepositoryPath() {
178                    return repositoryPath;
179            }
180    
181            protected static long getLastModifiedTime(Path path) {
182                    if (path == null) {
183                            return Long.MIN_VALUE;
184                    }
185    
186                    try {
187                            FileTime fileTime = Files.getLastModifiedTime(path);
188    
189                            return fileTime.toMillis();
190                    }
191                    catch (IOException ioe) {
192                            return Long.MIN_VALUE;
193                    }
194            }
195    
196            protected NoticeableFuture<Path> getFile(
197                    final Path remoteFilePath, final Path localFilePath,
198                    boolean deleteAfterFetch, final boolean populateCache) {
199    
200                    if (_log.isDebugEnabled()) {
201                            _log.debug("Fetching remote file " + remoteFilePath);
202                    }
203    
204                    final Path cachedLocalFilePath = pathMap.get(remoteFilePath);
205    
206                    final NoticeableFuture<FileResponse> noticeableFuture =
207                            asyncBroker.post(remoteFilePath);
208    
209                    NettyUtil.scheduleCancellation(
210                            channel, noticeableFuture, getFileTimeout);
211    
212                    ChannelFuture channelFuture = channel.writeAndFlush(
213                            new FileRequest(
214                                    remoteFilePath, getLastModifiedTime(cachedLocalFilePath),
215                                    deleteAfterFetch));
216    
217                    channelFuture.addListener(
218                            new ChannelFutureListener() {
219    
220                                    @Override
221                                    public void operationComplete(ChannelFuture channelFuture) {
222                                            if (channelFuture.isSuccess()) {
223                                                    return;
224                                            }
225    
226                                            if (channelFuture.isCancelled()) {
227                                                    noticeableFuture.cancel(true);
228    
229                                                    return;
230                                            }
231    
232                                            Throwable throwable = new IOException(
233                                                    "Unable to fetch remote file " + remoteFilePath,
234                                                    channelFuture.cause());
235    
236                                            if (!asyncBroker.takeWithException(
237                                                            remoteFilePath, throwable)) {
238    
239                                                    _log.error(
240                                                            "Unable to place exception because no future " +
241                                                                    "exists with ID " + remoteFilePath,
242                                                            throwable);
243                                            }
244                                    }
245    
246                            });
247    
248                    return new NoticeableFutureConverter<Path, FileResponse>(
249                            noticeableFuture) {
250    
251                                    @Override
252                                    protected Path convert(FileResponse fileResponse)
253                                            throws IOException {
254    
255                                            if (fileResponse.isFileNotFound()) {
256                                                    if (_log.isWarnEnabled()) {
257                                                            _log.warn(
258                                                                    "Remote file " + remoteFilePath +
259                                                                            " is not found");
260                                                    }
261    
262                                                    return null;
263                                            }
264    
265                                            if (fileResponse.isFileNotModified()) {
266                                                    if (_log.isDebugEnabled()) {
267                                                            _log.debug(
268                                                                    "Remote file " + remoteFilePath +
269                                                                            " is not modified, use cached local file " +
270                                                                                    cachedLocalFilePath);
271                                                    }
272    
273                                                    return cachedLocalFilePath;
274                                            }
275    
276                                            FileHelperUtil.move(
277                                                    fileResponse.getLocalFile(), localFilePath);
278    
279                                            if (populateCache) {
280                                                    pathMap.put(remoteFilePath, localFilePath);
281                                            }
282    
283                                            if (_log.isDebugEnabled()) {
284                                                    _log.debug(
285                                                            "Fetched remote file " + remoteFilePath + " to " +
286                                                                    localFilePath);
287                                            }
288    
289                                            return localFilePath;
290                                    }
291    
292                            };
293            }
294    
295            protected final AsyncBroker<Path, FileResponse> asyncBroker =
296                    new AsyncBroker<Path, FileResponse>();
297            protected final Channel channel;
298            protected final long getFileTimeout;
299            protected final Map<Path, Path> pathMap =
300                    new ConcurrentHashMap<Path, Path>();
301            protected final Path repositoryPath;
302    
303            private static final Log _log = LogFactoryUtil.getLog(
304                    NettyRepository.class);
305    
306    }