001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.repository;
016    
017    import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
018    import com.liferay.portal.fabric.netty.fileserver.FileRequest;
019    import com.liferay.portal.fabric.netty.fileserver.FileResponse;
020    import com.liferay.portal.fabric.netty.util.NettyUtil;
021    import com.liferay.portal.fabric.repository.Repository;
022    import com.liferay.portal.fabric.repository.RepositoryHelperUtil;
023    import com.liferay.portal.kernel.concurrent.AsyncBroker;
024    import com.liferay.portal.kernel.concurrent.BaseFutureListener;
025    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
026    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
027    import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
028    import com.liferay.portal.kernel.log.Log;
029    import com.liferay.portal.kernel.log.LogFactoryUtil;
030    
031    import io.netty.channel.Channel;
032    import io.netty.channel.ChannelFuture;
033    import io.netty.channel.ChannelFutureListener;
034    
035    import java.io.IOException;
036    
037    import java.nio.file.Files;
038    import java.nio.file.Path;
039    import java.nio.file.StandardCopyOption;
040    import java.nio.file.attribute.FileTime;
041    
042    import java.util.Iterator;
043    import java.util.Map;
044    import java.util.Set;
045    import java.util.concurrent.ConcurrentHashMap;
046    import java.util.concurrent.Future;
047    import java.util.concurrent.atomic.AtomicInteger;
048    
049    /**
050     * @author Shuyang Zhou
051     */
052    public class NettyRepository implements Repository<Channel> {
053    
054            public NettyRepository(Path repositoryPath, long getFileTimeout) {
055                    if (repositoryPath == null) {
056                            throw new NullPointerException("Repository path is null");
057                    }
058    
059                    if (!Files.isDirectory(repositoryPath)) {
060                            throw new IllegalArgumentException(
061                                    repositoryPath + " is not a directory");
062                    }
063    
064                    this.repositoryPath = repositoryPath;
065                    this.getFileTimeout = getFileTimeout;
066            }
067    
068            @Override
069            public void dispose(boolean delete) {
070                    Set<Map.Entry<Path, Path>> entrySet = pathMap.entrySet();
071    
072                    Iterator<Map.Entry<Path, Path>> iterator = entrySet.iterator();
073    
074                    while (iterator.hasNext()) {
075                            Map.Entry<Path, Path> entry = iterator.next();
076    
077                            iterator.remove();
078    
079                            FileHelperUtil.delete(true, entry.getValue());
080                    }
081    
082                    if (delete) {
083                            FileHelperUtil.delete(true, repositoryPath);
084                    }
085            }
086    
087            @Override
088            public AsyncBroker<Path, FileResponse> getAsyncBroker() {
089                    return asyncBroker;
090            }
091    
092            @Override
093            public NoticeableFuture<Path> getFile(
094                    Channel channel, Path remoteFilePath, Path localFilePath,
095                    boolean deleteAfterFetch) {
096    
097                    if (localFilePath == null) {
098                            return getFile(
099                                    channel, remoteFilePath,
100                                    RepositoryHelperUtil.getRepositoryFilePath(
101                                            repositoryPath, remoteFilePath),
102                                    deleteAfterFetch, true);
103                    }
104    
105                    return getFile(
106                            channel, remoteFilePath, localFilePath, deleteAfterFetch, false);
107            }
108    
109            @Override
110            public NoticeableFuture<Map<Path, Path>> getFiles(
111                    Channel channel, Map<Path, Path> pathMap, boolean deleteAfterFetch) {
112    
113                    final DefaultNoticeableFuture<Map<Path, Path>> defaultNoticeableFuture =
114                            new DefaultNoticeableFuture<>();
115    
116                    if (pathMap.isEmpty()) {
117                            defaultNoticeableFuture.set(pathMap);
118    
119                            return defaultNoticeableFuture;
120                    }
121    
122                    final Map<Path, Path> resultPathMap = new ConcurrentHashMap<>();
123    
124                    final AtomicInteger counter = new AtomicInteger(pathMap.size());
125    
126                    for (Map.Entry<Path, Path> entry : pathMap.entrySet()) {
127                            final Path remoteFilePath = entry.getKey();
128    
129                            NoticeableFuture<Path> noticeableFuture = getFile(
130                                    channel, remoteFilePath, entry.getValue(), deleteAfterFetch);
131    
132                            noticeableFuture.addFutureListener(
133                                    new BaseFutureListener<Path>() {
134    
135                                            @Override
136                                            public void completeWithCancel(Future<Path> future) {
137                                                    defaultNoticeableFuture.cancel(true);
138                                            }
139    
140                                            @Override
141                                            public void completeWithException(
142                                                    Future<Path> future, Throwable throwable) {
143    
144                                                    defaultNoticeableFuture.setException(throwable);
145                                            }
146    
147                                            @Override
148                                            public void completeWithResult(
149                                                    Future<Path> future, Path localFilePath) {
150    
151                                                    if (localFilePath != null) {
152                                                            resultPathMap.put(remoteFilePath, localFilePath);
153                                                    }
154    
155                                                    if (counter.decrementAndGet() <= 0) {
156                                                            defaultNoticeableFuture.set(resultPathMap);
157                                                    }
158                                            }
159    
160                                    });
161                    }
162    
163                    return defaultNoticeableFuture;
164            }
165    
166            @Override
167            public Path getRepositoryPath() {
168                    return repositoryPath;
169            }
170    
171            protected static long getLastModifiedTime(Path path) {
172                    if (path == null) {
173                            return Long.MIN_VALUE;
174                    }
175    
176                    try {
177                            FileTime fileTime = Files.getLastModifiedTime(path);
178    
179                            return fileTime.toMillis();
180                    }
181                    catch (IOException ioe) {
182                            return Long.MIN_VALUE;
183                    }
184            }
185    
186            protected NoticeableFuture<Path> getFile(
187                    Channel channel, final Path remoteFilePath, final Path localFilePath,
188                    boolean deleteAfterFetch, final boolean populateCache) {
189    
190                    if (_log.isDebugEnabled()) {
191                            _log.debug("Fetching remote file " + remoteFilePath);
192                    }
193    
194                    final Path cachedLocalFilePath = pathMap.get(remoteFilePath);
195    
196                    final DefaultNoticeableFuture<FileResponse> defaultNoticeableFuture =
197                            new DefaultNoticeableFuture<>();
198    
199                    NoticeableFuture<FileResponse> noticeableFuture = asyncBroker.post(
200                            remoteFilePath, defaultNoticeableFuture);
201    
202                    if (noticeableFuture == null) {
203                            noticeableFuture = defaultNoticeableFuture;
204    
205                            NettyUtil.scheduleCancellation(
206                                    channel, defaultNoticeableFuture, getFileTimeout);
207    
208                            ChannelFuture channelFuture = channel.writeAndFlush(
209                                    new FileRequest(
210                                            remoteFilePath, getLastModifiedTime(cachedLocalFilePath),
211                                            deleteAfterFetch));
212    
213                            channelFuture.addListener(
214                                    new ChannelFutureListener() {
215    
216                                            @Override
217                                            public void operationComplete(ChannelFuture channelFuture) {
218                                                    if (channelFuture.isSuccess()) {
219                                                            return;
220                                                    }
221    
222                                                    if (channelFuture.isCancelled()) {
223                                                            defaultNoticeableFuture.cancel(true);
224    
225                                                            return;
226                                                    }
227    
228                                                    Throwable throwable = new IOException(
229                                                            "Unable to fetch remote file " + remoteFilePath,
230                                                            channelFuture.cause());
231    
232                                                    if (!asyncBroker.takeWithException(
233                                                                    remoteFilePath, throwable)) {
234    
235                                                            _log.error(
236                                                                    "Unable to place exception because no future " +
237                                                                            "exists with ID " + remoteFilePath,
238                                                                    throwable);
239                                                    }
240                                            }
241    
242                                    });
243                    }
244    
245                    return new NoticeableFutureConverter<Path, FileResponse>(
246                            noticeableFuture) {
247    
248                            @Override
249                            protected Path convert(FileResponse fileResponse)
250                                    throws IOException {
251    
252                                    if (fileResponse.isFileNotFound()) {
253                                            if (_log.isWarnEnabled()) {
254                                                    _log.warn(
255                                                            "Remote file " + remoteFilePath + " is not found");
256                                            }
257    
258                                            return null;
259                                    }
260    
261                                    if (fileResponse.isFileNotModified()) {
262                                            if (_log.isDebugEnabled()) {
263                                                    _log.debug(
264                                                            "Remote file " + remoteFilePath +
265                                                                    " is not modified, use cached local file " +
266                                                                            cachedLocalFilePath);
267                                            }
268    
269                                            return cachedLocalFilePath;
270                                    }
271    
272                                    Path targetLocalFilePath = localFilePath;
273    
274                                    synchronized (fileResponse) {
275                                            Path recheckCacheLocalFilePath = pathMap.get(
276                                                    remoteFilePath);
277    
278                                            if (recheckCacheLocalFilePath != null) {
279                                                    targetLocalFilePath = recheckCacheLocalFilePath;
280                                            }
281    
282                                            Path tempLocalFilePath = fileResponse.getLocalFile();
283    
284                                            if (tempLocalFilePath.startsWith(repositoryPath)) {
285                                                    Files.copy(
286                                                            fileResponse.getLocalFile(), targetLocalFilePath,
287                                                            StandardCopyOption.REPLACE_EXISTING);
288                                            }
289                                            else {
290                                                    Files.move(
291                                                            fileResponse.getLocalFile(), targetLocalFilePath,
292                                                            StandardCopyOption.REPLACE_EXISTING);
293                                            }
294    
295                                            if (populateCache) {
296                                                    pathMap.put(remoteFilePath, targetLocalFilePath);
297                                            }
298    
299                                            fileResponse.setLocalFile(targetLocalFilePath);
300                                    }
301    
302                                    if (_log.isDebugEnabled()) {
303                                            _log.debug(
304                                                    "Fetched remote file " + remoteFilePath + " to " +
305                                                            targetLocalFilePath);
306                                    }
307    
308                                    return targetLocalFilePath;
309                            }
310    
311                    };
312            }
313    
314            protected final AsyncBroker<Path, FileResponse> asyncBroker =
315                    new AsyncBroker<>();
316            protected final long getFileTimeout;
317            protected final Map<Path, Path> pathMap = new ConcurrentHashMap<>();
318            protected final Path repositoryPath;
319    
320            private static final Log _log = LogFactoryUtil.getLog(
321                    NettyRepository.class);
322    
323    }