001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.dao.jdbc;
016    
import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
import com.liferay.portal.kernel.concurrent.FutureListener;
import com.liferay.portal.kernel.concurrent.NoticeableFuture;
import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
import com.liferay.portal.kernel.util.GetterUtil;
import com.liferay.portal.kernel.util.PropsKeys;
import com.liferay.portal.kernel.util.PropsUtil;
import com.liferay.portal.kernel.util.ProxyUtil;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
039    
040    /**
041     * @author Shuyang Zhou
042     * @author Preston Crary
043     */
044    public class AutoBatchPreparedStatementUtil {
045    
046            public static PreparedStatement autoBatch(
047                            PreparedStatement preparedStatement)
048                    throws SQLException {
049    
050                    Connection connection = preparedStatement.getConnection();
051    
052                    DatabaseMetaData databaseMetaData = connection.getMetaData();
053    
054                    if (databaseMetaData.supportsBatchUpdates()) {
055                            return (PreparedStatement)ProxyUtil.newProxyInstance(
056                                    ClassLoader.getSystemClassLoader(), _interfaces,
057                                    new BatchInvocationHandler(preparedStatement));
058                    }
059    
060                    return (PreparedStatement)ProxyUtil.newProxyInstance(
061                            ClassLoader.getSystemClassLoader(), _interfaces,
062                            new NoBatchInvocationHandler(preparedStatement));
063            }
064    
065            public static PreparedStatement concurrentAutoBatch(
066                            Connection connection, String sql)
067                    throws SQLException {
068    
069                    DatabaseMetaData databaseMetaData = connection.getMetaData();
070    
071                    if (databaseMetaData.supportsBatchUpdates()) {
072                            return (PreparedStatement)ProxyUtil.newProxyInstance(
073                                    ClassLoader.getSystemClassLoader(), _interfaces,
074                                    new ConcurrentBatchInvocationHandler(connection, sql));
075                    }
076    
077                    return (PreparedStatement)ProxyUtil.newProxyInstance(
078                            ClassLoader.getSystemClassLoader(), _interfaces,
079                            new ConcurrentNoBatchInvocationHandler(connection, sql));
080            }
081    
082            private static final int _HIBERNATE_JDBC_BATCH_SIZE = GetterUtil.getInteger(
083                    PropsUtil.get(PropsKeys.HIBERNATE_JDBC_BATCH_SIZE));
084    
085            private static final Method _addBatchMethod;
086            private static final Method _closeMethod;
087            private static final Method _executeBatch;
088            private static final Class<?>[] _interfaces =
089                    new Class<?>[] {PreparedStatement.class};
090    
091            static {
092                    try {
093                            _addBatchMethod = PreparedStatement.class.getMethod("addBatch");
094                            _closeMethod = PreparedStatement.class.getMethod("close");
095                            _executeBatch = PreparedStatement.class.getMethod("executeBatch");
096                    }
097                    catch (NoSuchMethodException nsme) {
098                            throw new ExceptionInInitializerError(nsme);
099                    }
100            }
101    
102            private static class BatchInvocationHandler implements InvocationHandler {
103    
104                    @Override
105                    public Object invoke(Object proxy, Method method, Object[] args)
106                            throws Throwable {
107    
108                            if (method.equals(_executeBatch)) {
109                                    if (_count > 0) {
110                                            _count = 0;
111    
112                                            return _preparedStatement.executeBatch();
113                                    }
114    
115                                    return new int[0];
116                            }
117    
118                            if (!method.equals(_addBatchMethod)) {
119                                    return method.invoke(_preparedStatement, args);
120                            }
121    
122                            _preparedStatement.addBatch();
123    
124                            if (++_count >= _HIBERNATE_JDBC_BATCH_SIZE) {
125                                    _preparedStatement.executeBatch();
126    
127                                    _count = 0;
128                            }
129    
130                            return null;
131                    }
132    
133                    private BatchInvocationHandler(PreparedStatement preparedStatement) {
134                            _preparedStatement = preparedStatement;
135                    }
136    
137                    private int _count;
138                    private final PreparedStatement _preparedStatement;
139    
140            }
141    
142            private static class ConcurrentBatchInvocationHandler
143                    implements InvocationHandler {
144    
145                    @Override
146                    public Object invoke(Object proxy, Method method, Object[] args)
147                            throws Throwable {
148    
149                            if (method.equals(_addBatchMethod)) {
150                                    _preparedStatement.addBatch();
151    
152                                    if (++_count >= _HIBERNATE_JDBC_BATCH_SIZE) {
153                                            _executeBatch();
154                                    }
155    
156                                    return null;
157                            }
158    
159                            if (method.equals(_executeBatch)) {
160                                    if (_count > 0) {
161                                            _executeBatch();
162                                    }
163    
164                                    return new int[0];
165                            }
166    
167                            if (method.equals(_closeMethod)) {
168                                    Throwable throwable = null;
169    
170                                    for (Future<Void> future : _futures) {
171                                            try {
172                                                    future.get();
173                                            }
174                                            catch (Throwable t) {
175                                                    if (t instanceof ExecutionException) {
176                                                            t = t.getCause();
177                                                    }
178    
179                                                    if (throwable == null) {
180                                                            throwable = t;
181                                                    }
182                                                    else {
183                                                            throwable.addSuppressed(t);
184                                                    }
185                                            }
186                                    }
187    
188                                    if (throwable != null) {
189                                            throw throwable;
190                                    }
191                            }
192    
193                            return method.invoke(_preparedStatement, args);
194                    }
195    
196                    private ConcurrentBatchInvocationHandler(
197                                    Connection connection, String sql)
198                            throws SQLException {
199    
200                            _connection = connection;
201                            _sql = sql;
202    
203                            _preparedStatement = _connection.prepareStatement(_sql);
204                    }
205    
206                    private void _executeBatch() throws SQLException {
207                            _count = 0;
208    
209                            final PreparedStatement preparedStatement = _preparedStatement;
210    
211                            NoticeableFuture<Void> noticeableFuture =
212                                    _threadPoolExecutor.submit(
213                                            new Callable<Void>() {
214    
215                                                    @Override
216                                                    public Void call() throws SQLException {
217                                                            try {
218                                                                    preparedStatement.executeBatch();
219                                                            }
220                                                            finally {
221                                                                    synchronized (_connection) {
222                                                                            preparedStatement.close();
223                                                                    }
224                                                            }
225    
226                                                            return null;
227                                                    }
228    
229                                            });
230    
231                            _futures.add(noticeableFuture);
232    
233                            noticeableFuture.addFutureListener(
234                                    new FutureListener<Void>() {
235    
236                                            @Override
237                                            public void complete(Future<Void> future) {
238                                                    try {
239                                                            future.get();
240    
241                                                            _futures.remove(future);
242                                                    }
243                                                    catch (Throwable t) {
244                                                    }
245                                            }
246    
247                                    });
248    
249                            _preparedStatement = _connection.prepareStatement(_sql);
250                    }
251    
252                    private final Connection _connection;
253                    private int _count;
254                    private final Set<Future<Void>> _futures = new ConcurrentHashSet<>();
255                    private PreparedStatement _preparedStatement;
256                    private final String _sql;
257                    private final ThreadPoolExecutor _threadPoolExecutor =
258                            PortalExecutorManagerUtil.getPortalExecutor(
259                                    ConcurrentBatchInvocationHandler.class.getName());
260    
261            }
262    
263            private static class ConcurrentNoBatchInvocationHandler
264                    implements InvocationHandler {
265    
266                    @Override
267                    public Object invoke(Object proxy, Method method, Object[] args)
268                            throws Throwable {
269    
270                            if (method.equals(_addBatchMethod)) {
271                                    _executeUpdate();
272    
273                                    return null;
274                            }
275    
276                            if (method.equals(_executeBatch)) {
277                                    return new int[0];
278                            }
279    
280                            if (method.equals(_closeMethod)) {
281                                    Throwable throwable = null;
282    
283                                    for (Future<Void> future : _futures) {
284                                            try {
285                                                    future.get();
286                                            }
287                                            catch (Throwable t) {
288                                                    if (t instanceof ExecutionException) {
289                                                            t = t.getCause();
290                                                    }
291    
292                                                    if (throwable == null) {
293                                                            throwable = t;
294                                                    }
295                                                    else {
296                                                            throwable.addSuppressed(t);
297                                                    }
298                                            }
299                                    }
300    
301                                    if (throwable != null) {
302                                            throw throwable;
303                                    }
304                            }
305    
306                            return method.invoke(_preparedStatement, args);
307                    }
308    
309                    private ConcurrentNoBatchInvocationHandler(
310                                    Connection connection, String sql)
311                            throws SQLException {
312    
313                            _connection = connection;
314                            _sql = sql;
315    
316                            _preparedStatement = _connection.prepareStatement(_sql);
317                    }
318    
319                    private void _executeUpdate() throws SQLException {
320                            final PreparedStatement preparedStatement = _preparedStatement;
321    
322                            NoticeableFuture<Void> noticeableFuture =
323                                    _threadPoolExecutor.submit(
324                                            new Callable<Void>() {
325    
326                                                    @Override
327                                                    public Void call() throws SQLException {
328                                                            try {
329                                                                    preparedStatement.executeUpdate();
330                                                            }
331                                                            finally {
332                                                                    synchronized (_connection) {
333                                                                            preparedStatement.close();
334                                                                    }
335                                                            }
336    
337                                                            return null;
338                                                    }
339    
340                                            });
341    
342                            _futures.add(noticeableFuture);
343    
344                            noticeableFuture.addFutureListener(
345                                    new FutureListener<Void>() {
346    
347                                            @Override
348                                            public void complete(Future<Void> future) {
349                                                    try {
350                                                            future.get();
351    
352                                                            _futures.remove(future);
353                                                    }
354                                                    catch (Throwable t) {
355                                                    }
356                                            }
357    
358                                    });
359    
360                            _preparedStatement = _connection.prepareStatement(_sql);
361                    }
362    
363                    private final Connection _connection;
364                    private final Set<Future<Void>> _futures = new ConcurrentHashSet<>();
365                    private PreparedStatement _preparedStatement;
366                    private final String _sql;
367                    private final ThreadPoolExecutor _threadPoolExecutor =
368                            PortalExecutorManagerUtil.getPortalExecutor(
369                                    ConcurrentNoBatchInvocationHandler.class.getName());
370    
371            }
372    
373            private static class NoBatchInvocationHandler implements InvocationHandler {
374    
375                    @Override
376                    public Object invoke(Object proxy, Method method, Object[] args)
377                            throws Throwable {
378    
379                            if (method.equals(_addBatchMethod)) {
380                                    _preparedStatement.executeUpdate();
381    
382                                    return null;
383                            }
384    
385                            if (method.equals(_executeBatch)) {
386                                    return new int[0];
387                            }
388    
389                            return method.invoke(_preparedStatement, args);
390                    }
391    
392                    private NoBatchInvocationHandler(PreparedStatement preparedStatement) {
393                            _preparedStatement = preparedStatement;
394                    }
395    
396                    private final PreparedStatement _preparedStatement;
397    
398            }
399    
400    }