connector:
作用:从how tomcat works中解读,connector的作用应该是创建一个连接(其实在tomcat6中,创建请求的工作被下放到 JIoEndpoint等类处理了,connector只是扮演的一个包装器的作用。)然后让processor处理。然后processor从socket输入流中读取http请求。而在tomcat6里,ProtocolHandler 应该就是扮演者处理器的角色。connector只扮演者创建连接的角色,至于如何从输入流中解析客户端parameters,ProtocolHandler来解析。
connector的构造函数:
public Connector(String protocol) throws Exception { setProtocol(protocol); // Instantiate protocol handler try { Class clazz = Class.forName(protocolHandlerClassName); this.protocolHandler = (ProtocolHandler) clazz.newInstance(); } catch (Exception e) { log.error (sm.getString ("coyoteConnector.protocolHandlerInstantiationFailed", e)); } }
?
?
从构造函数中我们可以看到connector通过协议(http,ajp等等)初始化了protocolHandler 。
?
connector的start():
1. initialize();
??? 此方法做了什么工作?
?
adapter = new CoyoteAdapter(this); protocolHandler.setAdapter(adapter);
IntrospectionUtils.setProperty(protocolHandler, "jkHome", ?????????????????????????????????????? System.getProperty("catalina.base"));
?protocolHandler.init();
以http请求为例,讲讲protocolHandler.init()到底是什么?
??????? if (initialized) ??????????? return; ??????? ??????? // Initialize thread count defaults for acceptor ??????? if (acceptorThreadCount == 0) { ??????????? acceptorThreadCount = 1; ??????? } ??????? if (serverSocketFactory == null) { ??????????? serverSocketFactory = ServerSocketFactory.getDefault(); ??????? } ??????? if (serverSocket == null) { ??????????? try { ??????????????? if (address == null) { ??????????????????? serverSocket = serverSocketFactory.createSocket(port, backlog); ??????????????? } else { ??????????????????? serverSocket = serverSocketFactory.createSocket(port, backlog, address); ??????????????? } ??????????? } catch (BindException orig) { ??????????????? String msg; ??????????????? if (address == null) ??????????????????? msg = orig.getMessage() + " <null>:" + port; ??????????????? else ??????????????????? msg = orig.getMessage() + " " + ??????????????????????????? address.toString() + ":" + port; ??????????????? BindException be = new BindException(msg); ??????????????? be.initCause(orig); ??????????????? throw be; ??????????? } ??????? } ??????? //if( serverTimeout >= 0 ) ??????? //??? serverSocket.setSoTimeout( serverTimeout ); ??????? ??????? initialized = true;
?由此看出,此方法初始化socketserver,用来响应客户请求。
?
2 ?注册jmx
3.protocolHandler.start()。
if (!initialized) { init(); } if (!running) { running = true; paused = false; // Create worker collection if (executor == null) { workers = new WorkerStack(maxThreads); } // Start acceptor threads for (int i = 0; i < acceptorThreadCount; i++) { Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i); acceptorThread.setPriority(threadPriority); acceptorThread.setDaemon(daemon); acceptorThread.start(); } }
?开启线程,响应客户请求?
Acceptor类是什么作用呢?
开启线程,监听客户端请求。
public void run() { // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } } // Accept the next incoming connection from the server socket try { Socket socket = serverSocketFactory.acceptSocket(serverSocket); serverSocketFactory.initSocket(socket); // Hand this socket off to an appropriate processor if (!processSocket(socket)) { // Close socket right away try { socket.close(); } catch (IOException e) { // Ignore } } }catch ( IOException x ) { if ( running ) log.error(sm.getString("endpoint.accept.fail"), x); } catch (Throwable t) { log.error(sm.getString("endpoint.accept.fail"), t); } // The processor will recycle itself when it finishes } }
?
由此看出:acceptor是socket线程启动器。
?
看看processsocket方法
if (executor == null) { getWorkerThread().assign(socket); } else { executor.execute(new SocketProcessor(socket)); }
?
getworkthread方法:
protected Worker createWorkerThread() { synchronized (workers) { if (workers.size() > 0) { curThreadsBusy++; return workers.pop(); } if ((maxThreads > 0) && (curThreads < maxThreads)) { curThreadsBusy++; if (curThreadsBusy == maxThreads) { log.info(sm.getString("endpoint.info.maxThreads", Integer.toString(maxThreads), address, Integer.toString(port))); } return (newWorkerThread()); } else { if (maxThreads < 0) { curThreadsBusy++; return (newWorkerThread()); } else { return (null); } } } }
?
接着看newWorkerThread方法:
Worker workerThread = new Worker(); workerThread.start(); return (workerThread);
?workerThread.start():
public void start() { thread = new Thread(this); thread.setName(getName() + "-" + (++curThreads)); thread.setDaemon(true); thread.start(); }
?看看workerThread做了什么工作:
public void run() { // Process requests until we receive a shutdown signal while (running) { // Wait for the next socket to be assigned Socket socket = await(); if (socket == null) continue; // Process the request from this socket if (!setSocketOptions(socket) || !handler.process(socket)) { // Close socket try { socket.close(); } catch (IOException e) { } } // Finish up this request socket = null; recycleWorkerThread(this); } }
?接着process方法,到此才看到了如何真正处理http请求
public void process(Socket theSocket) throws IOException { RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); // Set the remote address remoteAddr = null; remoteHost = null; localAddr = null; localName = null; remotePort = -1; localPort = -1; // Setting up the I/O this.socket = theSocket; inputBuffer.setInputStream(socket.getInputStream()); outputBuffer.setOutputStream(socket.getOutputStream()); // Error flag error = false; keepAlive = true; int keepAliveLeft = maxKeepAliveRequests; int soTimeout = endpoint.getSoTimeout(); // When using an executor, these values may return non-positive values int curThreads = endpoint.getCurrentThreadsBusy(); int maxThreads = endpoint.getMaxThreads(); if (curThreads > 0 && maxThreads > 0) { // Only auto-disable keep-alive if the current thread usage % can be // calculated correctly if ((curThreads*100)/maxThreads > 75) { keepAliveLeft = 1; } } try { socket.setSoTimeout(soTimeout); } catch (Throwable t) { log.debug(sm.getString("http11processor.socket.timeout"), t); error = true; } boolean keptAlive = false; while (started && !error && keepAlive) { // Parsing the request header try { if (keptAlive) { if (keepAliveTimeout > 0) { socket.setSoTimeout(keepAliveTimeout); } else if (soTimeout > 0) { socket.setSoTimeout(soTimeout); } } inputBuffer.parseRequestLine(); request.setStartTime(System.currentTimeMillis()); keptAlive = true; if (disableUploadTimeout) { socket.setSoTimeout(soTimeout); } else { socket.setSoTimeout(timeout); } inputBuffer.parseHeaders(); } catch (IOException e) { error = true; break; } catch (Throwable t) { if (log.isDebugEnabled()) { log.debug(sm.getString("http11processor.header.parse"), t); } // 400 - Bad Request response.setStatus(400); error = true; } if (!error) { // Setting up filters, and parse some request headers rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE); try { prepareRequest(); } catch (Throwable t) { if (log.isDebugEnabled()) { log.debug(sm.getString("http11processor.request.prepare"), t); } // 400 - Internal Server Error response.setStatus(400); error = true; } } if (maxKeepAliveRequests > 0 && --keepAliveLeft == 0) keepAlive = false; // Process the request in the adapter if (!error) { try { rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); adapter.service(request, response); // Handle when the response was committed before a serious // error occurred. Throwing a ServletException should both // set the status to 500 and set the errorException. // If we fail here, then the response is likely already // committed, so we can't try and set headers. if(keepAlive && !error) { // Avoid checking twice. error = response.getErrorException() != null || statusDropsConnection(response.getStatus()); } } catch (InterruptedIOException e) { error = true; } catch (Throwable t) { log.error(sm.getString("http11processor.request.process"), t); // 500 - Internal Server Error response.setStatus(500); error = true; } } // Finish the handling of the request try { rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT); // If we know we are closing the connection, don't drain input. // This way uploading a 100GB file doesn't tie up the thread // if the servlet has rejected it. if(error) inputBuffer.setSwallowInput(false); inputBuffer.endRequest(); } catch (IOException e) { error = true; } catch (Throwable t) { log.error(sm.getString("http11processor.request.finish"), t); // 500 - Internal Server Error response.setStatus(500); error = true; } try { rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT); outputBuffer.endRequest(); } catch (IOException e) { error = true; } catch (Throwable t) { log.error(sm.getString("http11processor.response.finish"), t); error = true; } // If there was an error, make sure the request is counted as // and error, and update the statistics counter if (error) { response.setStatus(500); } request.updateCounters(); rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE); // Don't reset the param - we'll see it as ended. Next request // will reset it // thrA.setParam(null); // Next request inputBuffer.nextRequest(); outputBuffer.nextRequest(); } rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); // Recycle inputBuffer.recycle(); outputBuffer.recycle(); this.socket = null; // Recycle ssl info sslSupport = null; }
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?