
没写过 Java,心里没底;我看了网上的资料,应该 OK,试了一下也没发现什么问题。现在需要并发地发送一些请求,用到了连接池,我这样写应该是线程安全的吧?
public class MyRequest { private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MyRequest.class); private final String REQUEST_URL; private ExecutorService executorService; private Queue<Map<String, String>> tasks; private static PoolingHttpClientConnectionManager cm; //<----- private static CloseableHttpClient httpClient; private MyRequest(int taskQueueSize, int executorCount, String requestRL, Map<String, Object> connConfig) { this.tasks = new ArrayBlockingQueue<>(taskQueueSize); this.executorService = Executors.newFixedThreadPool(executorCount); REQUEST_URL = requestURL; String proxyHost = connConfig.get("proxyHost").toString(); int proxyPort = Integer.parseInt(connConfig.get("proxyPort").toString()); cm = new PoolingHttpClientConnectionManager(); cm.setMaxTotal(Integer.parseInt(connConfig.get("maxTotal").toString())); cm.setDefaultMaxPerRoute(Integer.parseInt(connConfig.get("defaultMaxPerRoute").toString())); RequestConfig cOnfig= RequestConfig.custom() .setConnectTimeout(Integer.parseInt(connConfig.get("connectTimeout").toString())) .setSocketTimeout(Integer.parseInt(connConfig.get("socketTimeout").toString())) .setConnectionRequestTimeout(Integer.parseInt(connConfig.get("cxxRxxTxout").toString())) .build(); HttpClientBuilder httpClientBuilder = HttpClients.custom() .setConnectionManager(cm) .setDefaultRequestConfig(config); if (!proxyHost.equals("") && 0 != proxyPort) { httpClient = httpClientBuilder.setProxy(new HttpHost(proxyHost, proxyPort)).build(); } else { httpClient = httpClientBuilder.build(); } } private void addTask(Map<String, String> parameters) { tasks.offer(parameters); } private void flush() { List<Future> futures = this.tasks.stream() .map(this::delegate) .collect(Collectors.toList()); futures.forEach((f) -> { try { f.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); this.tasks.clear(); } private Future delegate(Map<String, String> parameters) { return this.executorService.submit(() -> { 
doRequest(parameters, REQUEST_URL); }); } private void doRequest(Map<String, String> parameters, String url) { CloseableHttpResponse resp = null; HttpGet get = null; try { URIBuilder builder = new URIBuilder(url); builder.addParameter("foo", parameters.get("bar")); get = new HttpGet(builder.build()); //<----- resp = httpClient.execute(get); //<----- if (resp.getStatusLine().getStatusCode() != 200) { LOGGER.warn("xxx"); } else { LOGGER.info("xxx"); } resp.close(); } catch (URISyntaxException e) { LOGGER.warn("xxx" + e.getMessage()); } catch (ClientProtocolException e) { LOGGER.warn("xxx" + e.getMessage()); } catch (IOException e) { LOGGER.warn("xxx" + e.getMessage()); } finally { if (resp != null) { try { resp.close(); } catch (IOException e) { e.printStackTrace(); } } } } private void shutdown() { this.executorService.shutdown(); } public static void main(String args[]) throws IOException { String logPath = System.getProperty("readLogPath"); String requestURL = System.getProperty("requestURL"); int taskQueueSize = Integer.valueOf(System.getProperty("max.requests", "2000")); int executorCount = Integer.valueOf(System.getProperty("num.executors", "100")); int interval = Integer.valueOf(System.getProperty("max.interval", "500")); String proxyHost = System.getProperty("proxyHost", ""); int proxyPort = Integer.parseInt(System.getProperty("proxyInt", "0")); int maxTotal = Integer.parseInt(System.getProperty("maxTotal", "5000")); int defaultMaxPerRoute = Integer.parseInt(System.getProperty("defaultMaxPerRoute", "1000")); int cOnnectTimeout= Integer.parseInt(System.getProperty("connectTimeout", "1000")); int socketTimeout = Integer.parseInt(System.getProperty("socketTimeout", "3000")); int cOnnectionRequestTimeout= Integer.parseInt(System.getProperty("connectionRequestTimeout", "3000")); Map<String, Object> cOnnConfig= new HashMap<>(); connConfig.put("proxyHost", proxyHost); connConfig.put("proxyPort", proxyPort); connConfig.put("maxTotal", maxTotal); 
connConfig.put("defaultMaxPerRoute", defaultMaxPerRoute); connConfig.put("connectTimeout", connectTimeout); connConfig.put("socketTimeout", socketTimeout); connConfig.put("connectionRequestTimeout", connectionRequestTimeout); try { MyRequest syncLog = new MyRequest(taskQueueSize, executorCount, requestURL, connConfig); Configuration cOnf= new Configuration(); FileSystem fs = FileSystem.get(conf); Path path = new Path(logPath); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); try { String line; int size = 0; long startTs = System.currentTimeMillis(); line = br.readLine(); while (line != null) { JsonElement root = new JsonParser().parse(line); Map<String, String> parameters = new HashMap<>(); JsonObject rootJson = root.getAsJsonObject(); for (Map.Entry entry : rootJson.entrySet()) { parameters.put(entry.getKey().toString(), rootJson.get(entry.getKey().toString()).getAsString()); } syncLog.addTask(parameters); ++size; if (size >= taskQueueSize || (System.currentTimeMillis() - startTs) > interval) { syncLog.flush(); size = 0; startTs = System.currentTimeMillis(); } line = br.readLine(); } if (0 != size) { syncLog.flush(); } } catch (IOException e) { LOGGER.error("xxx" + e.getMessage()); } finally { br.close(); } syncLog.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } 1 Finest Sep 11, 2017 oRequest 的 resp.close()重复了吧,既然你都在 finally 里 close 了 |
2 Finest Sep 11, 2017 还有就是 ArrayBlockingQueue 的用法有问题,你这用法还不如直接用 List 批量提交。 如果想用到 Queue,那直接开 N 个线程作为消费者线程。 |