java 连接kafka demo程序
以下是程序
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

15public class MyProducer_9093 {
16 public static String topic = "ssl-test";
17 public static String bootstrap_server = "test.ifmx.cc:9093";
18 public static String truststore_jks = "M:\\mm\\ssl_kafka\\src\\main\\java\\kafka_uat_client_truststore.jks";
19
20 public static void main(String[] args) throws InterruptedException {
21 Properties p = new Properties();
22 p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);
23 p.put(ProducerConfig.ACKS_CONFIG, "1");
24 p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
25 p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
26
27 p.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
28 p.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststore_jks);
29 p.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "FlashKafkaSSL212");
30 p.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
31 p.put(SaslConfigs.SASL_JAAS_CONFIG,
32 "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"multi-redis-sync\" password=\"pangoo.redis@sync1\";");
33 p.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
34
35
36// p.put("security.protocol","SASL_SSL");
37// p.put("ssl.truststore.location", truststore_jks);
38// p.put("ssl.truststore.password","FlashKafkaSSL212");
39// p.put("sasl.mechanism","SCRAM-SHA-512");
40// p.put("ssl.endpoint.identification.algorithm","");
41// p.put("sasl.jaas.config",
42// "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"multi-redis-sync\" password=\"pangoo.redis@sync1\";");
43
44 try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p)) {
45 for (int x = 0; x < 10; x = x + 1) {
46 Date dNow = new Date();
47 SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
48 String msg = x + " Hello, SASL_SSL Producer message." + x + " time: " + ft.format(dNow);
49
50 ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
51 RecordMetadata metadata = kafkaProducer.send(record).get();
52
53 System.out.printf("Successful\tTopic: %s, Partition: %d, Offset %d%n",metadata.topic(), metadata.partition(), metadata.offset());
54 Thread.sleep(50);
55 }
56 } catch (ExecutionException e) {
57 throw new RuntimeException(e);
58 }
59 }
60}
python语言
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import urllib3
import json
import logging
from time import sleep

# Elasticsearch host and port used by every request in this script.
host = "10.138.4.41"
port = "9200"

# Path of the log file; use an absolute path whenever possible.
log_file = "unassigned.log"

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(filename)s [line:%(lineno)d] %(levelname)s : %(message)s")

# File handler: DEBUG and above is appended to the file named by log_file.
fh = logging.FileHandler(log_file, mode='a')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
logger.addHandler(fh)

# Console handler: the same records are also echoed through a StreamHandler, again at DEBUG.
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)


# Every function below issues HTTP requests, so one shared pool is created up front.
http = urllib3.PoolManager()


def Get_Status(host, port):
    """Return the ``status`` field of the cluster health document.

    Sends ``GET <host>:<port>/_cluster/health`` through the module-level
    ``http`` pool and decodes the JSON body.

    :param host: host name or IP; ``http://`` is assumed when no scheme is given.
    :param port: port number, as a string or an int.
    :return: the value stored under the ``"status"`` key (the caller compares it to ``"red"``).
    :raises KeyError: if the response has no ``"status"`` key.
    """
    # urllib3 deprecates scheme-less URLs, so make the scheme explicit.
    base = host if "://" in host else "http://" + host
    url = base + ":" + str(port) + "/_cluster/health"
    r = http.request('GET', url)
    index_status = json.loads(r.data.decode('utf-8'))
    return index_status["status"]


def Red_Index_name(host, port):
    """List the index name of every shard row that is reported as UNASSIGNED.

    Reads ``GET <host>:<port>/_cat/shards`` and takes the first
    whitespace-separated column of each row containing ``UNASSIGNED``.
    An index appears once per matching row, so an index with several
    unassigned shards is listed several times.

    :param host: host name or IP; ``http://`` is assumed when no scheme is given.
    :param port: port number, as a string or an int.
    :return: list of index names, possibly empty, possibly with repeats.
    """
    # urllib3 deprecates scheme-less URLs, so make the scheme explicit.
    base = host if "://" in host else "http://" + host
    url = base + ":" + str(port) + "/_cat/shards"
    r = http.request('GET', url)
    result_str = r.data.decode('utf-8')

    # splitlines() keeps the final row even when the body lacks a trailing newline.
    unassigned_list = []
    for row in result_str.splitlines():
        if "UNASSIGNED" in row:
            unassigned_list.append(row.split()[0])
    return unassigned_list


def Node_Name(host, port, index_name, node_list=None):
    """Work out where the first unassigned shard of an index should be allocated.

    Reads ``GET <host>:<port>/_cat/shards/<index_name>`` and derives two values:

    * ``nodename`` - the first entry of ``node_list`` whose name does not occur
      anywhere in the response text, i.e. a node holding no shard of the index;
    * ``id`` - the second column (shard number) of the first row containing
      ``UNASSIGNED``.

    :param host: host name or IP; ``http://`` is assumed when no scheme is given.
    :param port: port number, as a string or an int.
    :param index_name: index whose shard table is inspected.
    :param node_list: candidate node names; defaults to ``node01`` .. ``node06``.
    :return: dict with the keys ``nodename`` and ``id``. A key is absent when
        no candidate node / no unassigned row was found, so callers must check.
    """
    if node_list is None:
        node_list = ['node01', 'node02', 'node03', 'node04', 'node05', 'node06']

    result_dict = {}
    # urllib3 deprecates scheme-less URLs, so make the scheme explicit.
    base = host if "://" in host else "http://" + host
    url = base + ":" + str(port) + "/_cat/shards/" + index_name
    r = http.request('GET', url)
    node_str = r.data.decode('utf-8')

    # The script assumes exactly one shard of the index is unassigned, so the
    # one node that is missing from the listing is taken as its target.
    for node in node_list:
        if node not in node_str:
            result_dict['nodename'] = node
            break

    # splitlines() keeps the final row even when the body lacks a trailing newline.
    for shard_row in node_str.splitlines():
        if "UNASSIGNED" in shard_row:
            result_dict['id'] = shard_row.split()[1]
            break
    return result_dict


def exec_unassigned(host, port, index_name, shard_data):
    """POST an ``allocate`` reroute command for one shard and return the response.

    :param host: host name or IP; ``http://`` is assumed when no scheme is given.
    :param port: port number, as a string or an int.
    :param index_name: index that owns the shard.
    :param shard_data: dict providing ``id`` (shard number) and ``nodename``
        (target node), as produced by ``Node_Name``.
    :return: the urllib3 response of ``POST <host>:<port>/_cluster/reroute``.
    :raises KeyError: if ``shard_data`` lacks ``id`` or ``nodename``.
    """
    summit_data = {
        'commands': [
            {
                'allocate': {
                    'index': index_name,
                    'shard': shard_data['id'],
                    'node': shard_data['nodename'],
                    'allow_primary': True
                }
            }
        ]
    }
    encoded_data = json.dumps(summit_data).encode('utf-8')
    # urllib3 deprecates scheme-less URLs, so make the scheme explicit.
    base = host if "://" in host else "http://" + host
    url = base + ":" + str(port) + "/_cluster/reroute"
    return http.request('POST', url, body=encoded_data, headers={'Content-Type': 'application/json'})


if __name__ == '__main__':

    # Only act when the cluster reports "red": look up the unassigned shards
    # and submit one reroute request per listed index.
    if Get_Status(host, port) == "red":
        index_name = Red_Index_name(host, port)
        for x in index_name:
            shard_data = Node_Name(host, port, x)

            # Node_Name may return a partial dict; posting it would raise KeyError
            # and abort the remaining indices, so log and move on instead.
            if 'id' not in shard_data or 'nodename' not in shard_data:
                logger.error("Skipped index " + x + ": incomplete shard data " + str(shard_data))
                continue

            r = exec_unassigned(host, port, x, shard_data)
            if r.status == 200:
                logger.info("Success! Server Return: " +
                            r.data.decode('utf-8'))
            else:
                logger.error("Failed! Server Return: " +
                             r.data.decode('utf-8'))
            # Short pause between two consecutive reroute requests.
            sleep(2)

# Manual equivalent for moving a shard between nodes:
# curl -X POST -H 'Content-Type: application/json' -d '{"commands":[{"move":{"index":"t-data","shard":0,"from_node":"node02","to_node":"node03"}}]}' http://10.138.4.42:9200/_cluster/reroute
html语言
<!DOCTYPE HTML>
<html lang="zh-CN">

<head>
  <meta charset="utf-8">
  <title>JD_COOKIE</title>
  <link rel="shortcut icon" href="/favicon.ico">
  <style type="text/css">
    body {
      text-align: center;
    }

    .div {
      margin: 0 auto;
      width: 600px;
      height: 280px;
      padding: 2px;
    }

    .inp {
      width: 75%;
    }

    /* Replaces the obsolete <font color="red"> element. */
    .warn {
      color: red;
    }
  </style>
</head>

<body>
  <div class="div">
    <h2>表哥COOKIE自助更新</h2>
    请仅仅填入 pt_pin 和 pt_key 的值<br />
    <span class="warn">不要包含其他字符, 比如 空格 分号等</span><br />
    <a href="/help.html" target="_blank" rel="noopener">点此查看帮助</a>
    <hr />
    <!-- NOTE(review): method="get" places pt_key in the URL, where it can end up
         in browser history and server logs. Switching to POST needs a matching
         change in the /update handler, so it is left unchanged here. -->
    <form action="/update" method="get">
      <b>pt_pin:</b><br />
      <!-- oninput (rather than onkeyup) also strips whitespace and semicolons
           from text that is pasted or dropped with the mouse. -->
      <input class="inp" type="text" name="pt_pin" required="required"
        oninput="this.value=this.value.replace(/(\s+|;)/g,'')" /><br />
      <b>pt_key</b><br />
      <input class="inp" type="text" name="pt_key" required="required"
        oninput="this.value=this.value.replace(/(\s+|;)/g,'')" /><br />
      <br />
      <input type="submit" value="Update" />
    </form>
  </div>
</body>

</html>
js 语言
/**
 * Wires up site search on top of a Fuse index.
 *
 * Two modes are supported:
 *  - live search: results are rendered into `.search_results` while the user types;
 *  - passive search: on the page containing `#searchpage`, the query is read via
 *    `findQuery()` and results are rendered into that element.
 *
 * Relies on helpers and constants defined outside this function: `elem`, `emptyEl`,
 * `createEl`, `pushClass`, `findQuery`, `wrapText`, `highlightSearchTerms`, `Fuse`,
 * `quickLinks`, `searchResultsLabel`, `goBackClass`, `noMatchesFound`,
 * `shortSearchQuery`, `typeToSearch`, `searchFieldClass`, `searchClass`, `baseURL`.
 *
 * @param {Array<Object>} index Raw search records; wrapped in a Fuse instance here.
 */
function initializeSearch(index) {
  const searchKeys = ['title', 'link', 'body', 'id', 'section', 'tags'];

  const searchPageElement = elem('#searchpage');

  const searchOptions = {
    ignoreLocation: true,
    findAllMatches: true,
    includeScore: true,
    shouldSort: true,
    keys: searchKeys,
    threshold: 0.0
  };

  index = new Fuse(index, searchOptions);

  /**
   * Minimum number of characters a query needs before results are shown:
   * 1 for queries that start with a number, 2 otherwise.
   */
  function minQueryLen(query) {
    query = query.trim();
    // parseFloat("0") is 0, which is falsy, so test for NaN explicitly;
    // otherwise the query "0" would be treated as non-numeric.
    const queryIsNumeric = !Number.isNaN(parseFloat(query));
    return queryIsNumeric ? 1 : 2;
  }

  /**
   * Renders `results` for `query`.
   * In passive mode the output goes into the search page element and each hit
   * shows a title plus a body excerpt; otherwise a compact list of titles is
   * rendered into `.search_results`.
   */
  function searchResults(results=[], query="", passive = false) {
    let resultsFragment = new DocumentFragment();
    let showResults = elem('.search_results');
    if(passive) {
      showResults = searchPageElement;
    }
    emptyEl(showResults);

    const queryLen = query.length;
    const requiredQueryLen = minQueryLen(query);

    if(results.length && queryLen >= requiredQueryLen) {
      let resultsTitle = createEl('h3');
      resultsTitle.className = 'search_title';
      resultsTitle.innerText = quickLinks;

      let goBackButton = createEl('button');
      goBackButton.textContent = 'Go Back';
      goBackButton.className = goBackClass;
      if(passive) {
        resultsTitle.innerText = searchResultsLabel;
      }
      if(!searchPageElement) {
        results = results.slice(0,8);
      } else {
        resultsFragment.appendChild(goBackButton);
        results = results.slice(0,12);
      }
      resultsFragment.appendChild(resultsTitle);

      // Encode once: characters such as & or # would otherwise cut the query short.
      const encodedQuery = encodeURIComponent(query);

      results.forEach(function(result){
        let item = createEl('a');
        item.href = `${result.link}?query=${encodedQuery}`;
        item.className = 'search_result';
        item.style.order = result.score;
        if(passive) {
          pushClass(item, 'passive');
          let itemTitle = createEl('h3');
          itemTitle.textContent = result.title;
          item.appendChild(itemTitle);

          let itemDescription = createEl('p');
          // position of first search term instance
          let queryInstance = result.body.indexOf(query);
          itemDescription.textContent = `${result.body.substring(queryInstance, queryInstance + 200)}`;
          item.appendChild(itemDescription);
        } else {
          item.textContent = result.title;
        }
        resultsFragment.appendChild(item);
      });
    }

    if(queryLen >= requiredQueryLen) {
      if (!results.length) {
        showResults.innerHTML = `<span class="search_result">${noMatchesFound}</span>`;
      }
    } else {
      showResults.innerHTML = `<label for="find" class="search_result">${ queryLen > 1 ? shortSearchQuery : typeToSearch }</label>`
    }

    showResults.appendChild(resultsFragment);
  }

  /**
   * Runs `searchTerm` against the index, optionally keeps only hits whose
   * `section` equals `scope`, and renders them (passively when `passive` is set).
   */
  function search(searchTerm, scope = null, passive = false) {
    if(searchTerm.length) {
      let rawResults = index.search(searchTerm);
      rawResults = rawResults.map(function(result){
        const score = result.score;
        const resultItem = result.item;
        // Scaled, rounded score; searchResults() assigns it to the item's CSS `order`.
        resultItem.score = (parseFloat(score) * 50).toFixed(0);
        return resultItem ;
      })

      if(scope) {
        rawResults = rawResults.filter(resultItem => {
          return resultItem.section == scope;
        });
      }

      searchResults(rawResults, searchTerm, passive);

    } else {
      searchResults([], "", passive);
    }
  }

  /** Live mode: search on every input event; on the `search` event, go to the search page. */
  function liveSearch() {
    const searchField = elem(searchFieldClass);

    if (searchField) {
      const searchScope = searchField.dataset.scope;
      searchField.addEventListener('input', function() {
        const searchTerm = searchField.value.trim().toLowerCase();
        search(searchTerm, searchScope);
      });

      if(!searchPageElement) {
        searchField.addEventListener('search', function(){
          const searchTerm = searchField.value.trim().toLowerCase();
          if(searchTerm.length) {
            const scopeParameter = searchScope ? `&scope=${encodeURIComponent(searchScope)}` : '';
            window.location.href = new URL(baseURL + `search/?query=${encodeURIComponent(searchTerm)}${ scopeParameter }`).href;
          }
        });
      }
    }
  }

  /** Passive mode: on the search page, render the URL query and keep searching as the user types. */
  function passiveSearch() {
    if(searchPageElement) {
      const searchTerm = findQuery();
      const searchScope = findQuery('scope');
      // search actively after search page has loaded
      const searchField = elem(searchFieldClass);

      search(searchTerm, searchScope, true);

      if(searchField) {
        searchField.addEventListener('input', function() {
          const searchTerm = searchField.value.trim().toLowerCase();
          // Pass the scope and the passive flag in their own positions; previously
          // `true` landed in `scope`, so hits were filtered by `section == true`
          // and rendering fell back to live mode.
          search(searchTerm, searchScope, true);
          wrapText(searchTerm, main);
        });
      }
    }
  }

  /** Returns `[element, innerHTML length]` for the `.results` element, or `false` when it is absent. */
  function hasSearchResults() {
    const searchResults = elem('.results');
    if(searchResults) {
      const body = searchResults.innerHTML.length;
      return [searchResults, body];
    }
    return false
  }

  /** Empties the `.results` element and the search field, when they exist. */
  function clearSearchResults() {
    let searchResults = hasSearchResults();
    if(searchResults) {
      searchResults = searchResults[0];
      searchResults.innerHTML = "";
      // clear search field
      const searchField = elem(searchFieldClass);
      if(searchField) {
        searchField.value = "";
      }
    }
  }

  /** Calls `fn` whenever the Escape key is pressed. */
  function onEscape(fn){
    window.addEventListener('keydown', function(event){
      if(event.code === "Escape") {
        fn();
      }
    });
  }

  let main = elem('main');
  if(!main) {
    main = elem('.main');
  }

  if(!searchPageElement) {
    liveSearch();
  }
  passiveSearch();

  highlightSearchTerms(findQuery(), '.post_body', 'mark', 'search-term');

  onEscape(clearSearchResults);

  // Clicking outside the search widget dismisses the results (live mode only).
  window.addEventListener('click', function(event){
    const target = event.target;
    const isSearch = target.closest(searchClass) || target.matches(searchClass);
    if(!isSearch && !searchPageElement) {
      clearSearchResults();
    }
  });
}
shell脚本
1
2
3curl -X PUT http://10.47.56.10:9200/_cluster/settings?pretty -H 'Content-Type: application/json' -d'{"transient": {"cluster.routing.allocation.enable": "none"}}'
4
5
6curl -X PUT http://10.47.56.10:9200/_cluster/settings?pretty -H 'Content-Type: application/json' -d'{"transient": {"cluster.routing.allocation.enable": "all"}}'
7
8ansible -i shaoew.ip.txt shaoew -m copy -a 'src=log4j-1.2-api-2.16.0.jar dest=/tmp/log4j-1.2-api-2.16.0.jar mode=0644'
9ansible -i shaoew.ip.txt shaoew -m copy -a 'src=log4j-api-2.16.0.jar dest=/tmp/log4j-api-2.16.0.jar mode=0644'
10ansible -i shaoew.ip.txt shaoew -m copy -a 'src=log4j-core-2.16.0.jar dest=/tmp/log4j-core-2.16.0.jar mode=0644'
11
12mv log4j-1.2-api-2.17.0.jar log4j-1.2-api-2.17.0.jar.bak
13mv log4j-api-2.17.0.jar log4j-api-2.17.0.jar.bak
14mv log4j-core-2.17.0.jar log4j-core-2.17.0.jar.bak
15cp /tmp/log4j-*.1.jar .
16mv log4j-*.jar.bak ~
17
18
19
20
21mv log4j-1.2-api-2.9.1.jar log4j-1.2-api-2.9.1.jar.bak
22mv log4j-api-2.9.1.jar log4j-api-2.9.1.jar.bak
23mv log4j-core-2.9.1.jar log4j-core-2.9.1.jar.bak
24cp /tmp/log4j* .
25
26mv log4j-1.2-api-2.8.2.jar log4j-1.2-api-2.8.2.jar.bak
27mv log4j-api-2.8.2.jar log4j-api-2.8.2.jar.bak
28mv log4j-core-2.8.2.jar log4j-core-2.8.2.jar.bak
29cp /tmp/log4j* .
30
31
32mv log4j-1.2-api-2.16.0.jar log4j-1.2-api-2.16.0.jar.bak
33mv log4j-api-2.16.0.jar log4j-api-2.16.0.jar.bak
34mv log4j-core-2.16.0.jar log4j-core-2.16.0.jar.bak
35cp /tmp/log4j-*2.17.0.jar .
36rm -f log4j-1.2-api-2.17.0.jar
37
38
39[nec-es]
4010.138.4.37
4110.138.4.38
4210.138.4.39
4310.138.8.8
44
45nec-promotionadmin-web|nec-promotionweb-web
46
47
48NEC-Dev 调整为0的 deployment
49nec-message-service
50
51
52rm -f log4j-[ac]*0.jar
53cp /tmp/log4j-[ac]*1.jar .
54
55
56for i in $(kubectl get pod -n uat | awk 'NR>1{print $1}'); do echo $i $(kubectl exec -it $i -n uat -- find /data -name 'log4j-core*.jar');done 2>/dev/null | tee nec-uat-pod.log
57
58
59for i in $(kubectl get pod -n product | awk 'NR>1{print $1}'); do echo $i $(kubectl exec -it $i -n product -- find /data -name 'log4j-core*.jar');done 2>/dev/null | tee nec-prod-pod.log
60
61for i in $(kubectl get pod -n dev-jp | awk 'NR>1{print $1}'); do echo $i $(kubectl exec -it $i -n dev-jp -- find /data -name 'log4j-core*.jar');done 2>/dev/null | tee nec-dev-pod.log
62
63
64for i in $(kubectl get pod -n uat | awk 'NR>1{print $1}'); do echo "$i $(kubectl exec -it $i -n uat -- sh -c 'ls -la /proc/*/fd/ 2>/dev/null' | grep 'log4j-core' | awk '{print $NF}')";done 2>/dev/null | tee nec-pod.log
65
66ls -la /proc/*/fd/ 2>/dev/null|grep -E "log4j-core" | awk '{print $NF}'
67
68echo $i, $(kubectl exec -it $i -n uat -- find /data -name 'log4j-core*.jar')
69
70/api/v1/notify/getVideoId
71
72
73fio -ioengine=libaio -bs=4k -direct=1 -thread -rw=randwrite -filename=/data/test1 -name="BS 4KB write test" -iodepth=16 -runtime=60
74
75sudo fio --name=randwrite --ioengine=libaio --iodepth=8 --rw=randwrite --bs=4k --direct=1 --size=512M --numjobs=2 --runtime=240 --group_reporting --filename=/data/test1
76
77
78
79proxy_set_header X-Real-IP $remote_addr;
80proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
81
82docker 节点
8310.96.83.185 nexus
84
85
86curl -X PUT http://10.120.131.166:9200/orderdiscount/_settings -d '{"number_of_replicas":0}'
87
88while read line;do ssh -o ConnectTimeout=5 -n $line "free -h | awk 'NR==2{print \$2}'"; done < s.prod.ip.txt
89
90while read line;do ssh -o ConnectTimeout=5 -n $line "lscpu | awk '/^CPU\(s\)/'"; done < s.pre.ip.txt
91
92
93proxy_set_header X-Real-IP $remote_addr;
94proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
95
9610.138.4.37
9710.138.4.38
9810.138.4.39
9910.138.4.31
10010.138.4.32
10110.138.8.9
10210.138.8.8
103
104mv log4j-1.2-api-2.17.0.jar log4j-1.2-api-2.17.0.jar.bak
105mv log4j-api-2.17.0.jar log4j-api-2.17.0.jar.bak
106mv log4j-core-2.17.0.jar log4j-core-2.17.0.jar.bak
107cp /tmp/log4j-*.jar .
108mv log4j-*.jar.bak ~
109
110
111
112
113for i in $(kubectl get pod -n product | awk '{print $1}' | sed '1d' ); do echo "$i, $(kubectl exec -it $i -n product -- find /data/ -name '*log4j-core-*.jar' )"; echo ""; done