java 连接kafka demo程序

以下是程序

 1import java.util.Properties;
 2import java.util.Date;
 3import java.text.SimpleDateFormat;
 4import java.util.concurrent.ExecutionException;
 5
 6import org.apache.kafka.clients.CommonClientConfigs;
 7import org.apache.kafka.clients.producer.KafkaProducer;
 8import org.apache.kafka.clients.producer.ProducerConfig;
 9import org.apache.kafka.clients.producer.ProducerRecord;
10import org.apache.kafka.clients.producer.RecordMetadata;
11import org.apache.kafka.common.config.SaslConfigs;
12import org.apache.kafka.common.config.SslConfigs;
13import org.apache.kafka.common.serialization.StringSerializer;
14
15public class MyProducer_9093 {
16    public static String topic = "ssl-test";
17    public static String bootstrap_server = "test.ifmx.cc:9093";
18    public static String truststore_jks = "M:\\mm\\ssl_kafka\\src\\main\\java\\kafka_uat_client_truststore.jks";
19
20    public static void main(String[] args) throws InterruptedException {
21        Properties p = new Properties();
22        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);
23        p.put(ProducerConfig.ACKS_CONFIG, "1");
24        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
25        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
26
27        p.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
28        p.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststore_jks);
29        p.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "FlashKafkaSSL212");
30        p.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
31        p.put(SaslConfigs.SASL_JAAS_CONFIG,
32                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"multi-redis-sync\" password=\"pangoo.redis@sync1\";");
33        p.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
34
35
36//        p.put("security.protocol","SASL_SSL");
37//        p.put("ssl.truststore.location", truststore_jks);
38//        p.put("ssl.truststore.password","FlashKafkaSSL212");
39//        p.put("sasl.mechanism","SCRAM-SHA-512");
40//        p.put("ssl.endpoint.identification.algorithm","");
41//        p.put("sasl.jaas.config",
42//                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"multi-redis-sync\" password=\"pangoo.redis@sync1\";");
43
44        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p)) {
45            for (int x = 0; x < 10; x = x + 1) {
46                Date dNow = new Date();
47                SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
48                String msg = x + " Hello, SASL_SSL Producer message." + x + " time: " + ft.format(dNow);
49
50                ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
51                RecordMetadata metadata = kafkaProducer.send(record).get();
52
53                System.out.printf("Successful\tTopic: %s, Partition: %d, Offset %d%n",metadata.topic(), metadata.partition(), metadata.offset());
54                Thread.sleep(50);
55            }
56        } catch (ExecutionException e) {
57            throw new RuntimeException(e);
58        }
59    }
60}

python语言

  1#!/usr/bin/env python3
  2# --* coding: utf-8 *--
  3
  4import urllib3
  5import json
  6import logging
  7from time import sleep
  8
  9# 定义elasticsearch 主机和端口
 10host = "10.138.4.41"
 11port = "9200"
 12
 13# 定义日志文件路径,请尽可能的使用绝对路径
 14log_file = "unassigned.log"
 15
 16logger = logging.getLogger()
 17logger.setLevel(logging.DEBUG)
 18formatter = logging.Formatter("%(asctime)s - %(filename)s [line:%(lineno)d] %(levelname)s : %(message)s")
 19
 20# 配置日志级别,这里为DEBUG,定义日志写到文件,文件为上面的 log_file 变量
 21fh = logging.FileHandler(log_file, mode='a')
 22fh.setLevel(logging.DEBUG)
 23fh.setFormatter(formatter)
 24logger.addHandler(fh)
 25
 26# 除了把日志写入到文件,同时也打印到控制台,即 标准输出(stdout),这里级别也是 DEBUG
 27ch = logging.StreamHandler()
 28ch.setLevel(logging.DEBUG)
 29ch.setFormatter(formatter)
 30logger.addHandler(ch) 
 31
 32
 33# 由于每个函数都会用到的 HTTP 对象,所以在这里直接初始化一次
 34http = urllib3.PoolManager()
 35
 36
 37def Get_Status(host, port):
 38    url = host + ":" + port + "/_cluster/health"
 39    r = http.request('GET', url)
 40    index_status = json.loads(r.data.decode('utf-8'))
 41    return index_status["status"]
 42
 43
 44def Red_Index_name(host, port):
 45    url = host + ":" + port + "/_cat/shards"
 46    r = http.request('GET', url)
 47    result_str = r.data.decode('utf-8')
 48    result_list = result_str.split("\n")[0:-1]
 49
 50    # 获取所有索引的分片信息,找出未分配分片的索引名称,并添加到 list,
 51    # 使用 list,兼容存在多个索引出现未分配分片的情况
 52    unassigned_list = []
 53    for i in result_list:
 54        if "UNASSIGNED" in i:
 55            unassigned_list.append(i.split()[0])
 56    return unassigned_list
 57
 58
 59def Node_Name(host, port, index_name):
 60    result_dict = {}
 61    node_list = ['node01', 'node02', 'node03', 'node04', 'node05', 'node06']
 62    url = host + ":" + port + "/_cat/shards/" + index_name
 63    r = http.request('GET', url)
 64    node_str = r.data.decode('utf-8')
 65
 66    # 由于是6个节点,索引只有一个分片不被分配,
 67    # 找出那一个分片原本应该属于的节点,以及该分片的id, 添加到字典,后续提交数据
 68    for node in node_list:
 69        if not node in node_str:
 70            result_dict['nodename'] = node
 71            break
 72    format_str = node_str.split("\n")[0:-1]
 73
 74    for share_id in format_str:
 75        if "UNASSIGNED" in share_id:
 76            result_dict['id'] = share_id.split()[1]
 77            break
 78    return result_dict
 79
 80
 81def exec_unassigned(host, port, index_name, shard_data):
 82    summit_data = {
 83        'commands': [
 84            {
 85                'allocate': {
 86                    'index': index_name,
 87                    'shard': shard_data['id'],
 88                    'node': shard_data['nodename'],
 89                    'allow_primary': True
 90                }
 91            }
 92        ]
 93    }
 94    encoded_data = json.dumps(summit_data).encode('utf-8')
 95    url = host + ":" + port + "/_cluster/reroute"
 96    return http.request('POST', url, body=encoded_data, headers={'Content-Type': 'application/json'})
 97    # return summit_data
 98
 99
if __name__ == '__main__':

    # Only act when the cluster status is "red": look up the indices with
    # unassigned shards and submit a re-allocation for each of them.
    if Get_Status(host, port) == "red":
        for x in Red_Index_name(host, port):
            shard_data = Node_Name(host, port, x)
            # Node_Name may return a partial dict; submitting it would raise
            # KeyError and abort the remaining indices, so log and move on.
            if 'id' not in shard_data or 'nodename' not in shard_data:
                logger.error("Skipped index " + x +
                             ": shard id or target node not found: " + str(shard_data))
                continue
            r = exec_unassigned(host, port, x, shard_data)
            if r.status == 200:
                logger.info("Success!  Server Return: " +
                            r.data.decode('utf-8'))
            else:
                logger.error("Failed! Server Return: " +
                             r.data.decode('utf-8'))
            # Short pause between reroute requests
            sleep(2)

# Manual equivalent for moving a shard between nodes:
# curl -X POST -H 'Content-Type: application/json' -d '{"commands":[{"move":{"index":"t-data","shard":0,"from_node":"node02","to_node":"node03"}}]}' http://10.138.4.42:9200/_cluster/reroute

html语言

 1<!DOCTYPE HTML>
 2<html>
 3<meta charset="utf-8">
 4
 5<head>
 6    <title>JD_COOKIE</title>
 7    <link rel="shortcut icon" href="/favicon.ico">
 8</head>
 9<style type="text/css">
10    body {
11        text-align: center;
12    }
13
14    .div {
15        margin: 0 auto;
16        width: 600px;
17        height: 280px;
18        padding: 2px;
19    }
20
21    .inp {
22        width: 75%;
23    }
24</style>
25
26<body>
27    <div class="div">
28        <h2>表哥COOKIE自助更新</h2>
29        请仅仅填入 pt_pin 和 pt_key 的值<br />
30        <font color="red">不要包含其他字符, 比如 空格 分号等</font><br />
31        <a href="/help.html" target="_blank">点此查看帮助</a>
32        <hr />
33        <form action="/update" method="get">
34            <b>pt_pin:</b><br />
35            <input class="inp" type="text" name="pt_pin" required="required"
36                onkeyup="this.value=this.value.replace(/(\s+|;)/g,'')" /><br />
37            <b>pt_key</b><br />
38            <input class="inp" type="text" name="pt_key" required="required"
39                onkeyup="this.value=this.value.replace(/(\s+|;)/g,'')" /><br />
40            <br />
41            <input type="submit" value="Update" />
42        </form>
43    </div>
44</body>
45
46</html>

js 语言

  1function initializeSearch(index) {
  2  const searchKeys = ['title', 'link', 'body', 'id', 'section', 'tags'];
  3
  4  const searchPageElement = elem('#searchpage');
  5
  6  const searchOptions = {
  7    ignoreLocation: true,
  8    findAllMatches: true,
  9    includeScore: true,
 10    shouldSort: true,
 11    keys: searchKeys,
 12    threshold: 0.0
 13  };
 14
 15  index = new Fuse(index, searchOptions);
 16
 17  function minQueryLen(query) {
 18    query = query.trim();
 19    const queryIsFloat = parseFloat(query);
 20    const minimumQueryLength = queryIsFloat ? 1 : 2;
 21    return minimumQueryLength;
 22  }
 23
 24  function searchResults(results=[], query="", passive = false) {
 25    let resultsFragment = new DocumentFragment();
 26    let showResults = elem('.search_results');
 27    if(passive) {
 28      showResults = searchPageElement;
 29    }
 30    emptyEl(showResults);
 31
 32    const queryLen = query.length;
 33    const requiredQueryLen = minQueryLen(query);
 34
 35    if(results.length && queryLen >= requiredQueryLen) {
 36      let resultsTitle = createEl('h3');
 37      resultsTitle.className = 'search_title';
 38      resultsTitle.innerText = quickLinks;
 39
 40      let goBackButton = createEl('button');
 41      goBackButton.textContent = 'Go Back';
 42      goBackButton.className = goBackClass;
 43      if(passive) {
 44        resultsTitle.innerText = searchResultsLabel;
 45      }
 46      if(!searchPageElement) {
 47        results = results.slice(0,8);
 48      } else {
 49        resultsFragment.appendChild(goBackButton);
 50        results = results.slice(0,12);
 51      }
 52      resultsFragment.appendChild(resultsTitle);
 53
 54      results.forEach(function(result){
 55        let item = createEl('a');
 56        item.href = `${result.link}?query=${query}`;
 57        item.className = 'search_result';
 58        item.style.order = result.score;
 59        if(passive) {
 60          pushClass(item, 'passive');
 61          let itemTitle = createEl('h3');
 62          itemTitle.textContent = result.title;
 63          item.appendChild(itemTitle);
 64
 65          let itemDescription = createEl('p');
 66          // position of first search term instance
 67          let queryInstance = result.body.indexOf(query);
 68          itemDescription.textContent = `${result.body.substring(queryInstance, queryInstance + 200)}`;
 69          item.appendChild(itemDescription);
 70        } else {
 71          item.textContent = result.title;
 72        }
 73        resultsFragment.appendChild(item);
 74      });
 75    }
 76
 77    if(queryLen >= requiredQueryLen) {
 78      if (!results.length) {
 79        showResults.innerHTML = `<span class="search_result">${noMatchesFound}</span>`;
 80      }
 81    } else {
 82      showResults.innerHTML = `<label for="find" class="search_result">${ queryLen > 1 ? shortSearchQuery : typeToSearch }</label>`
 83    }
 84
 85    showResults.appendChild(resultsFragment);
 86  }
 87
 88  function search(searchTerm, scope = null, passive = false) {
 89    if(searchTerm.length) {
 90      let rawResults = index.search(searchTerm);
 91      rawResults = rawResults.map(function(result){
 92        const score = result.score;
 93        const resultItem = result.item;
 94        resultItem.score = (parseFloat(score) * 50).toFixed(0);
 95        return resultItem ;
 96      })
 97
 98      if(scope) {
 99        rawResults = rawResults.filter(resultItem => {
100          return resultItem.section == scope;
101        });
102      }
103
104      passive ? searchResults(rawResults, searchTerm, true) : searchResults(rawResults, searchTerm);
105
106    } else {
107      passive ? searchResults([], "", true) : searchResults();
108    }
109  }
110
111  function liveSearch() {
112    const searchField = elem(searchFieldClass);
113
114    if (searchField) {
115      const searchScope = searchField.dataset.scope;
116      searchField.addEventListener('input', function() {
117        const searchTerm = searchField.value.trim().toLowerCase();
118        search(searchTerm, searchScope);
119      });
120
121      if(!searchPageElement) {
122        searchField.addEventListener('search', function(){
123          const searchTerm = searchField.value.trim().toLowerCase();
124          if(searchTerm.length)  {
125            const scopeParameter = searchScope ? `&scope=${searchScope}` : '';
126            window.location.href = new URL(baseURL + `search/?query=${searchTerm}${ scopeParameter }`).href;
127          }
128        });
129      }
130    }
131  }
132
133  function passiveSearch() {
134    if(searchPageElement) {
135      const searchTerm = findQuery();
136      const searchScope = findQuery('scope');
137      // search actively after search page has loaded
138      const searchField = elem(searchFieldClass);
139
140      search(searchTerm, searchScope, true);
141
142      if(searchField) {
143        searchField.addEventListener('input', function() {
144          const searchTerm = searchField.value.trim().toLowerCase();
145          search(searchTerm, true);
146          wrapText(searchTerm, main);
147        });
148      }
149    }
150  }
151
152  function hasSearchResults() {
153    const searchResults = elem('.results');
154    if(searchResults) {
155        const body = searchResults.innerHTML.length;
156        return [searchResults, body];
157    }
158    return false
159  }
160
161  function clearSearchResults() {
162    let searchResults = hasSearchResults();
163    if(searchResults) {
164      searchResults = searchResults[0];
165      searchResults.innerHTML = "";
166      // clear search field
167      const searchField = elem(searchFieldClass);
168      searchField.value = "";
169    }
170  }
171
172  function onEscape(fn){
173    window.addEventListener('keydown', function(event){
174      if(event.code === "Escape") {
175        fn();
176      }
177    });
178  }
179
180  let main = elem('main');
181  if(!main) {
182    main = elem('.main');
183  }
184
185  searchPageElement ? false : liveSearch();
186  passiveSearch();
187
188  highlightSearchTerms(findQuery(), '.post_body', 'mark', 'search-term');
189
190  onEscape(clearSearchResults);
191
192  window.addEventListener('click', function(event){
193    const target = event.target;
194    const isSearch = target.closest(searchClass) || target.matches(searchClass);
195    if(!isSearch && !searchPageElement) {
196      clearSearchResults();
197    }
198  });
199}

shell脚本

  1
  2
  3curl -X PUT http://10.47.56.10:9200/_cluster/settings?pretty -H 'Content-Type: application/json' -d'{"transient": {"cluster.routing.allocation.enable": "none"}}'
  4
  5
  6curl -X PUT http://10.47.56.10:9200/_cluster/settings?pretty -H 'Content-Type: application/json' -d'{"transient": {"cluster.routing.allocation.enable": "all"}}'
  7
  8ansible -i shaoew.ip.txt shaoew -m copy -a 'src=log4j-1.2-api-2.16.0.jar dest=/tmp/log4j-1.2-api-2.16.0.jar mode=0644'
  9ansible -i shaoew.ip.txt shaoew -m copy -a 'src=log4j-api-2.16.0.jar dest=/tmp/log4j-api-2.16.0.jar mode=0644'
 10ansible -i shaoew.ip.txt shaoew -m copy -a 'src=log4j-core-2.16.0.jar dest=/tmp/log4j-core-2.16.0.jar mode=0644'
 11
 12mv log4j-1.2-api-2.17.0.jar log4j-1.2-api-2.17.0.jar.bak
 13mv log4j-api-2.17.0.jar log4j-api-2.17.0.jar.bak
 14mv log4j-core-2.17.0.jar  log4j-core-2.17.0.jar.bak
 15cp /tmp/log4j-*.1.jar .
 16mv log4j-*.jar.bak ~
 17
 18
 19
 20
 21mv  log4j-1.2-api-2.9.1.jar  log4j-1.2-api-2.9.1.jar.bak
 22mv log4j-api-2.9.1.jar log4j-api-2.9.1.jar.bak
 23mv  log4j-core-2.9.1.jar  log4j-core-2.9.1.jar.bak
 24cp /tmp/log4j* .
 25
 26mv log4j-1.2-api-2.8.2.jar log4j-1.2-api-2.8.2.jar.bak
 27mv log4j-api-2.8.2.jar log4j-api-2.8.2.jar.bak
 28mv log4j-core-2.8.2.jar log4j-core-2.8.2.jar.bak
 29cp  /tmp/log4j* .
 30
 31
 32mv log4j-1.2-api-2.16.0.jar log4j-1.2-api-2.16.0.jar.bak
 33mv log4j-api-2.16.0.jar log4j-api-2.16.0.jar.bak
 34mv log4j-core-2.16.0.jar log4j-core-2.16.0.jar.bak
 35cp  /tmp/log4j-*2.17.0.jar .
 36rm -f log4j-1.2-api-2.17.0.jar
 37
 38
 39[nec-es]
 4010.138.4.37
 4110.138.4.38
 4210.138.4.39
 4310.138.8.8
 44
 45nec-promotionadmin-web|nec-promotionweb-web
 46
 47
 48NEC-Dev 调整为0的 deployment
 49nec-message-service
 50
 51
 52rm -f log4j-[ac]*0.jar
 53cp /tmp/log4j-[ac]*1.jar .
 54
 55
 56for i in $(kubectl get pod -n uat | awk 'NR>1{print $1}'); do echo $i  $(kubectl exec -it $i -n uat -- find /data -name 'log4j-core*.jar');done 2>/dev/null | tee nec-uat-pod.log
 57
 58
 59for i in $(kubectl get pod -n product | awk 'NR>1{print $1}'); do echo $i  $(kubectl exec -it $i -n product -- find /data -name 'log4j-core*.jar');done 2>/dev/null | tee nec-prod-pod.log
 60
 61for i in $(kubectl get pod -n dev-jp | awk 'NR>1{print $1}'); do echo $i  $(kubectl exec -it $i -n dev-jp -- find /data -name 'log4j-core*.jar');done 2>/dev/null | tee nec-dev-pod.log
 62
 63
 64for i in $(kubectl get pod -n uat | awk 'NR>1{print $1}'); do echo "$i $(kubect exec -it $i -n uat -- ls -la /proc/*/fd/ 2>/dev/null|grep 'log4j-core' | awk '{print $NF}')";done 2>/dev/null | tee nec-pod.log
 65
 66ls -la /proc/*/fd/ 2>/dev/null|grep -E "log4j-core" | awk '{print $NF}'
 67
 68echo $i, $(kubectl exec -it $i -n uat -- find /data -name 'log4j-core*.jar')
 69
 70/api/v1/notify/getVideoId
 71
 72
 73fio -ioengine=libaio -bs=4k -direct=1 -thread -rw=randwrite -filename=/data/test1 -name="BS 4KB write test" -iodepth=16 -runtime=60
 74
 75sudo fio --name=randwrite --ioengine=libaio --iodepth=8 --rw=randwrite --bs=4k --direct=1 --size=512M --numjobs=2 --runtime=240 --group_reporting --filename=/data/test1
 76
 77
 78
 79proxy_set_header X-Real-IP $remote_addr;
 80proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
 81
 82docker 节点 
 8310.96.83.185 nexus
 84
 85
 86curl -X PUT http://10.120.131.166:9200/orderdiscount/_settings -d '{"number_of_replicas":0}'
 87
 88while read line;do ssh -o ConnectTimeout=5 -n $line "free -h | awk 'NR==2{print $2}'"; done < s.prod.ip.txt
 89
 90while read line;do ssh -o ConnectTimeout=5 -n $line "lscpu | awk '/^CPU\(s\)/'"; done < s.pre.ip.txt
 91
 92
 93proxy_set_header X-Real-IP $remote_addr;
 94proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; 
 95
 9610.138.4.37
 9710.138.4.38
 9810.138.4.39
 9910.138.4.31
10010.138.4.32
10110.138.8.9
10210.138.8.8
103
104mv log4j-1.2-api-2.17.0.jar log4j-1.2-api-2.17.0.jar.bak
105mv log4j-api-2.17.0.jar log4j-api-2.17.0.jar.bak
106mv log4j-core-2.17.0.jar  log4j-core-2.17.0.jar.bak
107cp /tmp/log4j-*.jar .
108mv log4j-*.jar.bak ~
109
110
111
112
113for i in $(kubectl get pod -n product | awk '{print $1}' | sed '1d' ); do echo "$i, $(kubectl exec -it $i -n product -- find /data/ -name '*log4j-core-*.jar' )"; echo ""; done