001/* 
002 * JKNIV, whinstone one contract to access your database.
003 * 
004 * Copyright (C) 2017, the original author or authors.
005 *
006 * This library is free software; you can redistribute it and/or
007 * modify it under the terms of the GNU Lesser General Public
008 * License as published by the Free Software Foundation; either
009 * version 2.1 of the License.
010 * 
011 * This library is distributed in the hope that it will be useful,
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
014 * Lesser General Public License for more details.
015 * 
016 * You should have received a copy of the GNU Lesser General Public
017 * License along with this library; if not, write to the Free Software Foundation, Inc., 
018 * 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
019 */
020package net.sf.jkniv.whinstone.cassandra.statement;
021
022import java.sql.SQLException;
023import java.util.Collections;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027
028import org.slf4j.Logger;
029
030import com.datastax.driver.core.ColumnDefinitions;
031import com.datastax.driver.core.PagingState;
032import com.datastax.driver.core.ResultSet;
033import com.datastax.driver.core.Row;
034import com.datastax.driver.core.Session;
035import com.datastax.driver.core.Statement;
036
037import net.sf.jkniv.exception.HandlerException;
038import net.sf.jkniv.experimental.TimerKeeper;
039import net.sf.jkniv.sqlegance.OneToMany;
040import net.sf.jkniv.sqlegance.RepositoryException;
041import net.sf.jkniv.sqlegance.logger.DataMasking;
042import net.sf.jkniv.whinstone.JdbcColumn;
043import net.sf.jkniv.whinstone.Param;
044import net.sf.jkniv.whinstone.Queryable;
045import net.sf.jkniv.whinstone.ResultRow;
046import net.sf.jkniv.whinstone.ResultSetParser;
047import net.sf.jkniv.whinstone.cassandra.CassandraColumn;
048import net.sf.jkniv.whinstone.cassandra.LoggerFactory;
049import net.sf.jkniv.whinstone.classification.Groupable;
050import net.sf.jkniv.whinstone.classification.GroupingBy;
051import net.sf.jkniv.whinstone.classification.NoGroupingBy;
052import net.sf.jkniv.whinstone.classification.Transformable;
053import net.sf.jkniv.whinstone.statement.AutoKey;
054import net.sf.jkniv.whinstone.statement.StatementAdapter;
055import net.sf.jkniv.whinstone.types.RegisterType;
056
057/*
058 * https://docs.datastax.com/en/developer/java-driver/3.1/manual/statements/prepared/
059 * 
060 * //FIXME unsupported method bound.setInet(...)
061 * //FIXME unsupported method bound.setConsistencyLevel(ConsistencyLevel)
062 * //FIXME unsupported method bound.setIdempotent(boolean)
063 * //FIXME unsupported method bound.setBytes(...ByteBuffer)
064 * //FIXME unsupported method bound.setInet(...InetAddress)
065 * //FIXME unsupported method bound.setPartitionKeyToken(Token)
066 * //FIXME unsupported method bound.setRoutingKey(ByteBuffer) 
067 * //FIXME unsupported method bound.setToken(...Token)
068 * //FIXME unsupported method bound.setUUID(...UUID)
069 * 
070 * @author Alisson Gomes
071 * @since 0.6.0
072 */
073@SuppressWarnings({ "unchecked", "rawtypes" })
074public class CassandraStatementAdapter<T, R> implements StatementAdapter<T, Row>
075{
076    private static final Logger SQLLOG = net.sf.jkniv.whinstone.cassandra.LoggerFactory.getLogger();
077    private static final DataMasking MASKING = LoggerFactory.getDataMasking();
078    
079    private final HandlerException   handlerException;
080    private final Statement          stmt;
081    private int                      index, indexIN;
082    private Class<T>                 returnType;
083    private ResultRow<T, Row>        resultRow;
084    private boolean                  scalar;
085    private Session                  session;
086    private Queryable                queryable;
087    private final RegisterType       registerType;
088    
089    public CassandraStatementAdapter(Session session, Statement stmt, Queryable queryable, RegisterType registerType)
090    {
091        this.stmt = stmt;
092        this.session = session;
093        this.registerType = registerType;
094        this.handlerException = new HandlerException(RepositoryException.class, "Cannot set parameter [%s] value [%s]");
095        this.queryable = queryable;
096        this.returnType = (Class<T>) queryable.getReturnType();
097        this.reset();
098    }
099    
100    @Override
101    public StatementAdapter<T, Row> with(ResultRow<T, Row> resultRow)
102    {
103        this.resultRow = resultRow;
104        return this;
105    }
106    
107    @Override
108    public StatementAdapter<T, Row> bind(String name, Object value)
109    {
110        return this;
111        /*
112        log(name, value);
113        if (name.toLowerCase().startsWith("in:"))
114        {
115            try
116            {
117                setValueIN((Object[]) value);
118                return this;
119            }
120            catch (SQLException e)
121            {
122                this.handlerException.handle(e);// FIXME handler default message with custom params
123            }
124        }
125        return bindInternal(value);
126        */
127    }
128    
129    @Override
130    public StatementAdapter<T, Row> bind(Param value)
131    {
132        return this;
133        /*
134        log(value);
135        try
136        {
137            if (value instanceof java.util.Date)
138            {
139                setInternalValue((java.util.Date) value);
140            }
141            else if (Enum.class.isInstance(value))
142            {
143                setInternalValue((Enum<?>) value);
144            }
145            else if (value instanceof java.util.Calendar)
146            {
147                setInternalValue((Calendar) value);
148            }
149            else
150            {
151                bindInternal(value);
152            }
153        }
154        catch (SQLException e)
155        {
156            this.handlerException.handle(e);// FIXME handler default message with custom params
157        }
158        return this;
159        */
160    }
161    
162    @Override
163    public StatementAdapter<T, Row> bind(Param... values)
164    {
165        return this;
166    }
167    /*
168    @Override
169    public void batch()
170    {
171        // TODO implements batch https://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/BatchStatement.html
172        // TODO implements batch https://docs.datastax.com/en/drivers/python/3.2/api/cassandra/query.html
173        // TODO implements batch https://docs.datastax.com/en/cql/3.3/cql/cql_using/useBatch.html
174        // TODO implements batch https://docs.datastax.com/en/cql/3.3/cql/cql_using/useBatchGoodExample.html
175        // TODO https://www.datastax.com/dev/blog/client-side-improvements-in-cassandra-2-0
176    }
177    */
178
179    public List<T> rows()
180    {
181        ResultSet rs = null;
182        ResultSetParser<T, ResultSet> rsParser = null;
183        Groupable<T, ?> grouping = new NoGroupingBy<T, T>();
184        List<T> list = Collections.emptyList();
185        try
186        {
187            if (queryable.getBookmark() != null)
188            {
189                PagingState pagingState = PagingState.fromString(queryable.getBookmark());
190                stmt.setPagingState(pagingState);
191            }
192            TimerKeeper.start();
193            rs = session.execute(stmt);
194            queryable.getDynamicSql().getStats().add(TimerKeeper.clear());
195            
196            JdbcColumn<Row>[] columns = getJdbcColumns(rs.getColumnDefinitions());
197            setResultRow(columns);
198            
199            Transformable<T> transformable = resultRow.getTransformable();
200            if (hasGroupingBy())
201            {
202                grouping = new GroupingBy(getGroupingBy(), returnType, transformable);
203            }
204            rsParser = new ObjectResultSetParser(resultRow, grouping);
205            list = rsParser.parser(rs);//rs.getExecutionInfo().getPagingStateUnsafe();
206            PagingState pagingState = rs.getExecutionInfo().getPagingState();
207            //LOG.info("AvailableWithoutFetching={}, FullyFetched={}, Exhausted={}", rs.getAvailableWithoutFetching(), rs.isFullyFetched(), rs.isExhausted());
208            if (pagingState != null)
209                queryable.setBookmark(pagingState.toString());
210        }
211        catch (SQLException e)
212        {
213            queryable.getDynamicSql().getStats().add(e);
214            
215            handlerException.handle(e, e.getMessage());
216        }
217        finally {
218            TimerKeeper.clear();            
219        }
220        return list;
221    }
222    
223    @Override
224    public void bindKey()
225    {
226        throw new UnsupportedOperationException("No implemented operation generatedKeys for RepositoryCassandra!");
227    }
228    
229    @Override
230    public StatementAdapter<T, Row> with(AutoKey generateKey)
231    {
232        return this;
233    }
234    
235    public int execute()
236    {
237        session.execute(stmt);
238        return java.sql.Statement.SUCCESS_NO_INFO; // FIXME design Statement.SUCCESS_NO_INFO
239    }
240    
241    @Override
242    public int reset()
243    {
244        int before = (index + indexIN);
245        index = 0;
246        indexIN = 0;
247        return before;
248    }
249    
250    /*******************************************************************************/
251    
252    private void setResultRow(JdbcColumn<Row>[] columns)
253    {
254        if (resultRow != null)
255        {
256            resultRow.setColumns(columns);
257            return;
258        }
259        
260        if (scalar)
261            resultRow = new ScalarResultRow(columns);
262        else if (Map.class.isAssignableFrom(returnType))
263            resultRow = new MapResultRow(returnType, columns);
264        else if (Number.class.isAssignableFrom(returnType)) // FIXME implements for date, calendar, boolean improve design
265            resultRow = new NumberResultRow(returnType, columns);
266        else if (String.class.isAssignableFrom(returnType))
267            resultRow = new StringResultRow(columns);
268        else if (!hasOneToMany())
269            resultRow = new FlatObjectResultRow(returnType, columns);
270        else
271            resultRow = new PojoResultRow(returnType, columns, getOneToMany());
272    }
273    
274    /**
275     * Summarize the columns from SQL result in binary data or not.
276     * @param metadata  object that contains information about the types and properties of the columns in a <code>ResultSet</code> 
277     * @return Array of columns with name and index
278     */
279    private JdbcColumn<Row>[] getJdbcColumns(ColumnDefinitions metadata)
280    {
281        JdbcColumn<Row>[] columns = new JdbcColumn[metadata.size()];
282        
283        for (int i = 0; i < columns.length; i++)
284        {
285            String columnName = metadata.getName(i);
286            columns[i] = new CassandraColumn(i, columnName, metadata.getType(i).getName(), registerType, queryable.getReturnType());
287        }
288        return columns;
289    }
290    
291    @Override
292    public void close()
293    {
294        // TODO how to close cassandra statement ?
295    }
296    
297    @Override
298    public void setFetchSize(int rows)
299    {
300        stmt.setFetchSize(rows);
301    }
302
303    private boolean hasOneToMany()
304    {
305        return !queryable.getDynamicSql().asSelectable().getOneToMany().isEmpty();
306    }
307
308    private Set<OneToMany> getOneToMany()
309    {
310        return queryable.getDynamicSql().asSelectable().getOneToMany();
311    }
312
313    private boolean hasGroupingBy()
314    {
315        return !queryable.getDynamicSql().asSelectable().getGroupByAsList().isEmpty();
316    }
317
318    private List<String> getGroupingBy()
319    {
320        return queryable.getDynamicSql().asSelectable().getGroupByAsList();        
321    }
322}