001/* 
002 * JKNIV, whinstone one contract to access your database.
003 * 
004 * Copyright (C) 2017, the original author or authors.
005 *
006 * This library is free software; you can redistribute it and/or
007 * modify it under the terms of the GNU Lesser General Public
008 * License as published by the Free Software Foundation; either
009 * version 2.1 of the License.
010 * 
011 * This library is distributed in the hope that it will be useful,
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
014 * Lesser General Public License for more details.
015 * 
016 * You should have received a copy of the GNU Lesser General Public
017 * License along with this library; if not, write to the Free Software Foundation, Inc., 
018 * 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
019 */
020package net.sf.jkniv.whinstone.cassandra;
021
022import java.io.File;
023import java.net.MalformedURLException;
024import java.net.URL;
025import java.util.Enumeration;
026import java.util.Properties;
027
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import com.datastax.driver.core.Cluster;
032import com.datastax.driver.core.Host;
033import com.datastax.driver.core.Metadata;
034import com.datastax.driver.core.ProtocolVersion;
035import com.datastax.driver.core.Session;
036
037import net.sf.jkniv.exception.HandleableException;
038import net.sf.jkniv.sqlegance.DefaultClassLoader;
039import net.sf.jkniv.sqlegance.RepositoryConfigException;
040import net.sf.jkniv.sqlegance.RepositoryProperty;
041import net.sf.jkniv.sqlegance.transaction.Isolation;
042import net.sf.jkniv.whinstone.commands.CommandAdapter;
043import net.sf.jkniv.whinstone.types.RegisterType;
044
045/**
046 * 
047 * @author Alisson Gomes
048 * @since 0.6.0
049 */
050@SuppressWarnings("rawtypes")
051public class CassandraSessionFactory //implements ConnectionFactory
052{
053    private static final Logger       LOG = LoggerFactory.getLogger(CassandraSessionFactory.class);
054    private final String              contextName;
055    private Cluster                   cluster;
056    private CassandraCommandAdapter   conn;
057    private final HandleableException handlerException;
058    private final RegisterCodec      registerCodec;
059    
060    public CassandraSessionFactory(Properties props, String contextName, RegisterType registerType, HandleableException handlerException)
061    {
062        this.handlerException = handlerException;
063        String[] urls = props.getProperty(RepositoryProperty.JDBC_URL.key(), "127.0.0.1").split(",");
064        String keyspace = props.getProperty(RepositoryProperty.JDBC_SCHEMA.key());
065        String username = props.getProperty(RepositoryProperty.JDBC_USER.key());
066        String password = props.getProperty(RepositoryProperty.JDBC_PASSWORD.key());
067        String protocol = props.getProperty(RepositoryProperty.PROTOCOL_VERSION.key());
068        ProtocolVersion version = getProtocolVersion(protocol);
069        this.registerCodec = new RegisterCodec();
070        this.contextName = contextName;
071        URL cloudSecureConnect = getCloudSecureConnect(props);
072        
073        if (cloudSecureConnect != null)
074        {
075            cluster = Cluster.builder()
076                        .withCloudSecureConnectBundle(cloudSecureConnect)
077                        .withCredentials(username, password)
078                        .build();
079        }        
080        else if (username != null)
081            cluster = Cluster.builder()
082                    .addContactPoints(urls)
083                    .withCredentials(username, password)
084                    .withProtocolVersion(version).build();
085        else
086            cluster = Cluster.builder().withProtocolVersion(version).addContactPoints(urls).build();
087        
088        settingProperties(props);
089        
090        final Metadata metadata = cluster.getMetadata();
091        if (LOG.isInfoEnabled())
092        {
093            LOG.info("Connected to cluster: {}", metadata.getClusterName());
094            LOG.info("List of hosts");
095            for (final Host host : metadata.getAllHosts())
096                LOG.info("Datacenter: {}; Host: {}; Rack: {}", host.getDatacenter(), host.getAddress(), host.getRack());
097        }
098        Session session = cluster.connect(keyspace);
099        this.conn = new CassandraCommandAdapter(contextName, cluster, session, registerType, registerCodec, handlerException);
100    }
101    
102    private ProtocolVersion getProtocolVersion(String protocol)
103    {
104        ProtocolVersion version = null;
105        for (ProtocolVersion v : ProtocolVersion.values())
106        {
107            if (v.name().equals(protocol))
108                version = v;
109        }
110        if (version == null)
111        {
112            if (protocol != null)
113                LOG.warn("Property {} has invalid value [{}] using protocol {}",
114                        RepositoryProperty.PROTOCOL_VERSION.key(), protocol, ProtocolVersion.NEWEST_SUPPORTED);
115            version = ProtocolVersion.NEWEST_SUPPORTED;
116        }
117        return version;
118    }
119    
120    private void settingProperties(Properties props)
121    {
122        Enumeration<Object> keys = props.keys();
123        while (keys.hasMoreElements())
124        {
125            String k = keys.nextElement().toString();
126            if (k.startsWith("codec."))
127            {
128                boolean enable = Boolean.valueOf(props.getProperty(k));
129                String codecName = k.substring(6);// (6) -> "codec."
130                registerCodec.register(this.cluster, codecName, enable);
131            }
132        }
133    }
134    
135    //@Override
136    public CommandAdapter open()
137    {
138        return conn;
139    }
140    
141    //@Override
142    public CommandAdapter open(Isolation isolation)
143    {
144        LOG.warn("whinstone-cassandra doesn't support isolation attribute [{}]", isolation);
145        return conn;
146    }
147    
148    //    @Override
149    //    public Transactional getTransactionManager()
150    //    {
151    //        // TODO Auto-generated method stub
152    //        return null;
153    //    }
154    //    
155    //    @Override
156    //    public String getContextName()
157    //    {
158    //        return contextName;
159    //    }
160    //    
161    //    @Override
162    //    public void close(ConnectionAdapter conn)
163    //    {
164    ////        try
165    ////        {
166    //            conn.close();
167    ////        }
168    ////        catch (SQLException e)
169    ////        {
170    ////            LOG.warn("Error to try close Cassandra session/cluster [{}]", conn, e);
171    ////        }
172    //    }
173    //    
174    //    @Override
175    //    public void close(PreparedStatement stmt)
176    //    {
177    //        // TODO Auto-generated method stub
178    //        
179    //    }
180    //    
181    //    @Override
182    //    public void close(Statement stmt)
183    //    {
184    //        // TODO Auto-generated method stub
185    //        
186    //    }
187    //    
188    //    @Override
189    //    public void close(ResultSet rs)
190    //    {
191    //        // TODO Auto-generated method stub
192    //        
193    //    }
194    //    
195    //    @Override
196    //    public void close(CallableStatement call)
197    //    {
198    //        // TODO Auto-generated method stub
199    //        
200    //    }
201    
202    private URL getCloudSecureConnect(Properties props)
203    {
204        String keyFile = props.getProperty(RepositoryProperty.KEY_FILE.key());
205        //keyFile = "file:///C:/dev/wks/wks-jkniv-git/jkniv-whinstone/jkniv-whinstone-cassandra/target/test-classes/database/astra-secure-connect-jkniv.zip";
206        URL cloudSecureConnect = null;
207        if(keyFile != null)
208        {
209            if (keyFile.startsWith("file:"))
210            {
211                try
212                {
213                    cloudSecureConnect = new URL(keyFile);
214                }
215                catch (MalformedURLException e)
216                {
217                    throw new RepositoryConfigException("Key file ["+keyFile+"]for Cassandra CloudSecureConnect is MalformedURLException [" + e.getMessage() + "]");
218                }
219                if (! new File(cloudSecureConnect.getFile()).exists())
220                    throw new RepositoryConfigException("Key file ["+keyFile+"]for Cassandra CloudSecureConnect not exists");
221            }
222            else
223                cloudSecureConnect = DefaultClassLoader.getResource(keyFile);
224        }
225        return cloudSecureConnect;
226    }
227}