好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

PostgreSQL 数据同步到ES 搭建操作

安装 Python 和 python-devel 开发包

?

1

2

3

4

5

[root@rtm2 Packages]# rpm -ivh python-devel-2.7.5-58.el7.x86_64.rpm

准备中...       ################################# [100%]

正在升级/安装...

  1:python-devel-2.7.5-58.el7  ################################# [100%]

[root@rtm2 Packages]# ls

安装 multicorn

?

1

2

3

4

5

6

7

8

9

10

11

12

[root@rtm2 multicorn-1.3.5]# make

Python version is 2.7

gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/errors.o src/errors.c

gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/python.o src/python.c

gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/query.o src/query.c

gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/multicorn.o src/multicorn.c

gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -shared -o multicorn.so src/errors.o src/python.o src/query.o src/multicorn.o -L/opt/pgsql-10/lib -Wl,--as-needed -Wl,-rpath,'/opt/pgsql-10/lib',--enable-new-dtags -lpthread -ldl -lutil -lm -lpython2.7 -lpthread -ldl -lutil -lm -lpython2.7 -Xlinker -export-dynamic

.//preflight-check.sh

cp sql/multicorn.sql sql/multicorn--1.3.5.sql

[root@rtm2 multicorn-1.3.5]# make install

Python version is 2.7

...

安装pg-es-fdw-master

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

[root@rtm2 multicorn-1.3.5]# cd pg-es-fdw-master

[root@rtm2 pg-es-fdw-master]# ls

demo.sh dite LICENSE README.md setup.py

[root@rtm2 pg-es-fdw-master]# python setup.py build

running build

running build_py

creating build

creating build/lib

creating build/lib/dite

copying dite/__init__.py -> build/lib/dite

[root@rtm2 pg-es-fdw-master]# python setup.py install

running install

running bdist_egg

running egg_info

creating dite.egg-info

writing dite.egg-info/PKG-INFO

在数据库中创建 multicorn 扩展及外部服务器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

[postgres@rtm2 ~]$ psql

psql (10.3)

Type "help" for help.

postgres=# select * from pg_extension;

  extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition

---------+----------+--------------+----------------+------------+-----------+--------------

  plpgsql |  10 |   11 | f    | 1.0  |   |

(1 row)

postgres=# CREATE EXTENSION multicorn;

CREATE EXTENSION

postgres=# psql

postgres=# select * from pg_extension;

  extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition

-----------+----------+--------------+----------------+------------+-----------+--------------

  plpgsql |  10 |   11 | f    | 1.0  |   |

  multicorn |  10 |   2200 | t    | 1.3.5  |   |

(2 rows)

postgres=# CREATE SERVER multicorn_es FOREIGN DATA WRAPPER multicorn OPTIONS(wrapper 'dite.ElasticsearchFDW' );

CREATE SERVER

postgres=#

配置 Elasticsearch

?

1

2

3

4

[root@rtm2 config]# vi elasticsearch.yml

node.name: "es-node1"

network.host: 192.168.31.121

discovery.zen.ping.unicast.hosts: [ "192.168.31.121" ]

?

1

2

3

4

5

6

7

8

9

10

[root@rtm2 config]# vi /etc/sysctl.conf

vm.max_map_count=262144

sysctl -p

[root@rtm2 config]# vi /etc/security/limits.conf

# End of file

  root soft nofile 65536

root hard nofile 65536

root soft nproc 4096

root hard nproc 4096

~

启动es

?

1

2

3

4

[root@rtm2 bin]# ls

elasticsearch  elasticsearch.in.bat elasticsearch-service-mgr.exe elasticsearch-service-x86.exe plugin.bat

elasticsearch.bat elasticsearch.in.sh elasticsearch-service-x64.exe plugin       service.bat

[root@rtm2 bin]# ./elasticsearch

?

1

2

3

4

test=# CREATE FOREIGN TABLE pp_es (id bigint ,age bigint ) SERVER multicorn_es OPTIONS (host

test(# '192.168.31.121' , port '9200' , node 'es-node1' , index 'pp' );

CREATE FOREIGN TABLE

test=#

创建插入触发器(外部表 pp_es 已在上一步创建)

?

1

2

3

4

5

6

7

8

9

10

11

test=# CREATE OR REPLACE FUNCTION index_pp() RETURNS trigger AS $def$

test$# BEGIN

test$# INSERT INTO pp_es (id, age) VALUES

test$# (NEW.id, NEW.age);

test$# RETURN NEW;

test$# END ;

test$# $def$ LANGUAGE plpgsql;

CREATE FUNCTION

test=# CREATE TRIGGER es_insert_pp AFTER INSERT ON pp FOR EACH ROW EXECUTE PROCEDURE index_pp();

CREATE TRIGGER

test=#

新增数据测试

?

1

2

3

4

5

6

7

8

test=# insert into pp (id,age) values (1,11);

INSERT 0 1

test=# select * from pp;

  id | age

----+-----

  1 | 11

(1 row)

test=#

检查es数据

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'

{

  "took" : 104,

  "timed_out" : false ,

  "_shards" : {

  "total" : 5,

  "successful" : 5,

  "failed" : 0

  },

  "hits" : {

  "total" : 2,

  "max_score" : 1.0,

  "hits" : [ {

   "_index" : "es-node1" ,

   "_type" : "pp" ,

   "_id" : "1" ,

   "_score" : 1.0,

   "_source" :{ "age" : "11" }

  }, {

   "_index" : "es-node1" ,

   "_type" : "pp" ,

   "_id" : "2" ,

   "_score" : 1.0,

   "_source" :{ "age" : "22" }

  } ]

  }

}

[root@rtm2 ~]#

创建更新触发器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

test=# CREATE OR REPLACE FUNCTION updadeIndex_pp() RETURNS trigger AS $def$

BEGIN

UPDATE pp_es SET

id = NEW.id,

age = NEW.age

where id = OLD.id;  -- match the ES doc by its old id so an update that changes id still finds it

RETURN NEW;

END ;

$def$ LANGUAGE plpgsql;

CREATE FUNCTION

test=# ^C

test=#

test=# CREATE TRIGGER es_update_pp AFTER UPDATE OF id, age ON pp FOR EACH ROW WHEN (OLD.* IS DISTINCT

test(# FROM NEW.*) EXECUTE PROCEDURE updadeIndex_pp();

CREATE TRIGGER

test=#

更新表数据

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

test=# select * from pp;

  id | age

----+-----

  1 | 11

  2 | 22

  3 | 22

(3 rows)

test=# update pp a set a.age = 33 where a.id = 3;

ERROR: column "a" of relation "pp" does not exist

LINE 1: update pp a set a.age = 33 where a.id = 3;

       ^

test=# update pp set age = 33 where id = 3;

UPDATE 1

test=# select * from pp;

  id | age

----+-----

  1 | 11

  2 | 22

  3 | 33

(3 rows)

test=#

在 ES 中查询变更后的数据

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'

{

  "took" : 4,

  "timed_out" : false ,

  "_shards" : {

  "total" : 5,

  "successful" : 5,

  "failed" : 0

  },

  "hits" : {

  "total" : 3,

  "max_score" : 1.0,

  "hits" : [ {

   "_index" : "es-node1" ,

   "_type" : "pp" ,

   "_id" : "1" ,

   "_score" : 1.0,

   "_source" :{ "age" : "11" }

  }, {

   "_index" : "es-node1" ,

   "_type" : "pp" ,

   "_id" : "2" ,

   "_score" : 1.0,

   "_source" :{ "age" : "22" }

  }, {

   "_index" : "es-node1" ,

   "_type" : "pp" ,

   "_id" : "3" ,

   "_score" : 1.0,

   "_source" :{ "age" : "33" }

  } ]

  }

}

[root@rtm2 ~]#

补充:logstash同步pgsql数据到Elasticsearch

一、logstash 的配置这里不再赘述,主要包括 input、filter、output 三部分。

二、配置步骤

1、input配置

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

input {

  stdin {

  }

  jdbc {

   jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/world"

   jdbc_user => "postgres"

   jdbc_password => "zhang123"

   jdbc_driver_library => "D:\logstash-6.4.0\bin\pgsql\postgresql-42.2.5.jar"

   jdbc_driver_class => "org.postgresql.Driver"

   jdbc_paging_enabled => "true"

   jdbc_page_size => "300000"

   use_column_value => "true"

   tracking_column => "id"

   statement_filepath => "D:\logstash-6.4.0\bin\pgsql\jdbc.sql"

  schedule => "* * * * *"

  type => "jdbc"

  jdbc_default_timezone => "Asia/Shanghai"

  }

}

2、filter配置

?

1

2

3

4

5

6

filter {

  json {

   source => "message"

   remove_field => [ "message" ]

  }

}

3、output 配置,就是elasticsearch的基本配置

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

output {

  elasticsearch {

   hosts => [ "localhost:9200" ]

   index => "test_out"

  template => "D:\logstash-6.4.0\bin\pgsql\es-template.json"

  template_name => "t-statistic-out-logstash"

  template_overwrite => true

  document_type => "out"

   document_id => "%{id}"

  }

  stdout {

   codec => json_lines

  }

}

以上就是整个logstash 的jdbc.conf

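4、es-template.json的配置(注意:模板中的 "template" 字段是索引名匹配模式,必须能匹配 output 中配置的索引名 test_out,例如写成 "test_out*",否则模板不会生效;另外 Elasticsearch 6.x 及以上版本已不再支持 "string" 类型,应改用 "keyword"。)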

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

{

  "template" : "t-statistis-out-template" ,

  "order" :1,

  "settings" : {

    "index" : {

     "refresh_interval" : "5s"

    }

   },

  "mappings" : {

    "_default_" : {

  "_all" : { "enabled" : false },

     "dynamic_templates" : [

      {

     "message_field" : {

     "match" : "message" ,

     "match_mapping_type" : "string" ,

     "mapping" : { "type" : "string" , "index" : "not_analyzed" }

     }

    }, {

     "string_fields" : {

     "match" : "*" ,

     "match_mapping_type" : "string" ,

     "mapping" : { "type" : "string" , "index" : "not_analyzed" }

     }

    }

     ],

     "properties" : {

      "@timestamp" : {

       "type" : "date"

      },

      "@version" : {

       "type" : "keyword"

      },    

   "id" : {

       "type" : "keyword"

      },

   "name" : {

       "type" : "keyword"

      },

   "pp" : {

       "type" : "keyword"

      } 

     }

    }

   },

   "aliases" : {}

 

}

最后,下载好 PostgreSQL 的 JDBC 连接驱动(可从官网下载),并准备好自己数据库表中的数据。

启动命令:进入 logstash 的 bin 目录后执行下面的命令。这里把自己的 logstash 配置文件都放在 bin 下的 pgsql 目录中(目录位置可自行决定,命令中的路径相应修改即可)。

?

1

logstash.bat -f ./pgsql/jdbc.conf

以上为个人经验,希望能给大家一个参考,也希望大家多多支持服务器之家。如有错误或未考虑完全的地方,望不吝赐教。

原文链接:https://blog.csdn.net/yaoqiancuo3276/article/details/80612090

查看更多关于PostgreSQL 数据同步到ES 搭建操作的详细内容...

  阅读:45次