Bu yazıda MSSQL üzerinde yer alan bir tablomuzu ElasticSearch’e nasıl aktarabileceğimiz konusuna değineceğiz.
Bu aktarımları, alanında en iyi çözüm olarak gördüğüm Logstash kullanarak gerçekleştireceğiz. Ayrıca daha sonraki güncellemeleri de yapabilmemiz için aktarımını yapacağımız tabloda arayacağımız bir özelliğimiz olacak. Bu özellikten kastım otomatik artan bir integer alan yada her kaydın eklendiği tarihi ihtiva eden bir datetime alan.
Öncelikle aktaracağımız tabloyu tanıyarak başlayalım ve adım adım ilerleyelim.
Farz edelim ki okuldaki öğretmenlerin kayıtlarını içeren Ogretmen adında bir tablomuz var. Bu tablomuzda otomatik artan bir Id alanımız mevcut ve yine her kayıt ile birlikte otomatik olarak eklenen bir KayitTarihi alanı bulunmakta.
Adım adım devam edeceğiz.
1. Adım : Logstash Nedir?
Logstash açık kaynak kodlu ve bir çok kaynaktan data okuyarak, bu datayı isteğinize göre dönüştürüp çeşitli kaynaklara yazabilmenizi sağlayan bir araçtır. Genellikle ElasticSearch için kullanılmakta olup diğer birçok ortam içinde tercih edilebilir. Logstashi tıklayarak indirebilirsiniz. Hem Windows hem de Linux ortamında çalışabilmektedir.
Logstash temelde iki parametreden oluşur, bunlar input ve output parametreleridir. Input parametresi verinin okunacağı yeri output parametresi ise yazılacağı yeri ifade etmektedir.
Öncelikle Logstash’in MSSQL ortamından dataya erişebilmesi için jdbc driverini indirelim.
2. Adım : Config Yapılandırması.
Logstash’i ve JDBC Driver’ı indirdik gelelim config yapılandırmasına. İndirmiş olduğunuz Logstash Zip dosyasını açtığınızda ve içerisinde yer alan config klasörünün içinde logstash-sample.conf dosyasyını göreceksiniz.
Gördüğünüz config üzerinde değişiklikler yapmaya başlayalım.
2.1. JDBC Driver
İndirdiğiniz jdbc driver’ı uygun bir klasör içerisine atın ve config içerisine gerekli kodları eklemeye başlayalım.
1234567891011121314151617 input{jdbc_driver_library =>C:\sqljdbc_7.2\enu\mssql-jdbc-7.2.1.jre8.jar # Bu kısımda jdbc dosyamızın yolunu belirtiyoruz.jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"jdbc_connection_string =>"jdbc:sqlserver://SunucuIpAdresi:SunucuPort;" #Bu kısımda sunucumuzun ip adresini veya listener ismini ve ardından portunu belirtmeniz gerekmektedir.jdbc_user => "KullaniciAdi" # SQL sunucusuna bağlanmak için kullanıcı adıjdbc_password => "Parola" # Kullanıcıya ait parola bilgisischedule => "*/10 * * * *" # Logstash’in ne kadar sürede bir çalışacağını bu alanda ifade edebilirsiniz. Örneğin şuan ki ayarlamada her 10 dakikada bir çalışacağını belirtiyoruz. İstediğiniz zamanlamayı yapabilmek için corntab.guruweb adresini kullanabilirsiniz.statement => " Select * FROM test.dbo.Ogretmen with(nolock)" # Bu alan ise çalışmasını istediğiniz sql kodunu içermektedir.}
2.2. Sql_last_value Değerini Kullanarak Sadece Yeni Kayıtların Aktarılması
Yukardaki şekilde ayarlanan bir input alanı sql sunucusuna gidecek ve tüm tabloyu okuyacaktır. Ancak bazı tablolar çok büyük olacağı için sadece gelen yeni kayıtları aktarmak isteyebiliriz. Sql_last_value parametresi ise tam bu noktada yardımımıza yetişiyor. Sql last value parametresi belirlediğimiz bir kolonun son aktarılan değerini bir dosyaya yazıyor ve sonrasında o dosyadan geri okuyarak kaldığımız yerden devam etmemizi sağlıyor. Hemen config üzerinde görelim.
Yine jdbc parantezleri arasına aşağıda yer alan parametreleri eklemeliyiz.
1 2 3 4 5 6 7 |
tracking_column => "kayittarihi" # Bu alanda aktardığımız bir kolonu takip edilecek kolon olarak belirtiyoruz. Bu kolon integer yada datetime olabilir. Ben bu örnekte datetime alan olan kayittarihi alanını tercih ettim use_column_value => true tracking_column_type =>"timestamp" # Bu parametrede ise kolonumuzun hangi tipte olduğunu belirtmemiz gerekiyor. Sadece iki tip desteklenmektedir. Datetime alanları karşılamak için timestamp ve integer alanları karşılamak için numeric. last_run_metadata_path =>"C:logstashAktarimlariaktarim.lastid" # Bu parametre ise takip edilen kolonun yazılacağı dosyayı belirtir. Böyle bir dosya yoksa bile aktarım işlemi bittiğinde logstash kendisi oluşturacaktır. |
Tüm bu işlemleri yaptıktan sonra Sql komutumuzu da biraz değiştirmemiz gerekmektedir.
1 2 3 |
statement => " Select * FROM test.dbo.Ogretmen with(nolock)" kısmına bir where koşulu koyarak en son aktarılan tarihten itibaren devam etmesini sağlayacağız. statement => " Select * FROM test.dbo.Ogretmen with(nolock) Where KayitTarihi>=:sql_last_value" |
2.3. Output
Output parametresi ise ElasticSearch bağlantı komutlarının olduğu kısımdır.
123456789101112131415 output {elasticsearch {hosts => ["sunucuipadresi:9200"] # Birden fazla node’a sahipseniz araya virgül ekleyerek ipyi çoğaltabilirsiniz.index => "indexadi"user => "elastic"password => "password"document_id => "%{id}" # ElasticSearch ortamında her doküman için doküman id oluşturulur. Üstelik aynı id ile bir kayıt daha gelirse bu otomatik olarak eski dökümanın üzerine versiyon olarak yazılır. Sql sorgusu üzerinden gelen id kolonunu bu şekilde doküman idsi olarak kaydedebiliriz.}}
3. Logstash’i Çalıştırmak
Config dosyamızı da ayarladıktan sonra geriye sadece çalıştırmak kaldı. Logstash Zip dosyasını çıkardığınız klasöre gidin “bin” klastörü içerisinde Komut istemi (CMD) aracılığı ile şu komutu yazın.
logstash –f C:config.conf
İşte hepsi bu kadardı. Config dosyasında belirttiğiniz zamanlama ile logstash çalışacak ve dataları aktaracaktır. Sadece JDBC Driver’ı değiştirerek Postgresql ve Mysql gibi ilişkisel veri tabanlarından da aynı mantık ile veri aktarabilirsiniz.