Arthas原理系列(三):服务端启动流程

前言

本篇文章主要讲我们在终端中敲入的命令是如何被 arthas 服务器识别并且解释的。要注意这个过程是 arthas 对所有命令执行过程的抽闲,对于具体命令的执行过程我会在后面的系列文章中再说。

arthas 服务端的启动

在上一篇文章中,我们跟踪了整个 arthas 工程的入口方法:com.taobao.arthas.agent334.AgentBootstrap#main,在这个方法中,最重要的一个步骤就是启动过了一个绑定线程

private static synchronized void main(String args, final Instrumentation inst) {
    try {
        // 1. 程序运行前的校验,
        // arthas如果已经存在,则直接返回
        // 入参中必须要包含arthas core等
        // 这些代码细节不会影响我们对主流程的理解,因此暂时删除
        final ClassLoader agentLoader = getClassLoader(inst, arthasCoreJarFile);
        Thread bindingThread = new Thread() {
            @Override
            public void run() {
                try {
                    bind(inst, agentLoader, agentArgs);
                } catch (Throwable throwable) {
                    throwable.printStackTrace(ps);
                }
            }
        };

        bindingThread.setName("arthas-binding-thread");
        bindingThread.start();
        bindingThread.join();
    } catch (Throwable t) {
        t.printStackTrace(ps);
        try {
            if (ps != System.err) {
                ps.close();
            }
        } catch (Throwable tt) {
            // ignore
        }
        throw new RuntimeException(t);
    }

bind这个线程的运行时会调用com.taobao.arthas.agent334.AgentBootstrap#bind,这个方法的详细代码如下:

private static void bind(Instrumentation inst, ClassLoader agentLoader, String args) throws Throwable {
        /**
         * <pre>
         * ArthasBootstrap bootstrap = ArthasBootstrap.getInstance(inst);
         * </pre>
         */

        Class<?> bootstrapClass = agentLoader.loadClass(ARTHAS_BOOTSTRAP);
        Object bootstrap = bootstrapClass.getMethod(GET_INSTANCE, Instrumentation.classString.class).invoke(nullinstargs);
        boolean isBind = (Boolean) bootstrapClass.getMethod(IS_BIND).invoke(bootstrap);
        if (!isBind) {
            String errorMsg = "Arthas server port binding failed! Please check $HOME/logs/arthas/arthas.log for more details.";
            ps.println(errorMsg);
            throw new RuntimeException(errorMsg);
        }
        ps.println("Arthas server already bind.");
    }

这段方法用反射的方法调用了com.taobao.arthas.core.server.ArthasBootstrap的静态方法getInstance,并且把从main方法中解析到参数再传到这个getInstance中。

getInstance从这个名字看就是返回一个ArthasBootstrap的实例,事实上代码的逻辑也是这样的,其中最关键的就是ArthasBootstrap的构造函数函数:

private ArthasBootstrap(Instrumentation instrumentation, Map<String, String> args) throws Throwable {
        this.instrumentation = instrumentation;

        String outputPath = System.getProperty("arthas.output.dir""arthas-output");
        arthasOutputDir = new File(outputPath);
        arthasOutputDir.mkdirs();

        // 1. initSpy()
        // 加载SpyAPI这个类
        initSpy(instrumentation);
        // 2. ArthasEnvironment
        // 初始化arthas运行的环境变量
        initArthasEnvironment(args);
        // 3. init logger
        loggerContext = LogUtil.initLooger(arthasEnvironment);

        // 4. init beans
        // 初始化结果渲染和历史命令管理的相关类
        initBeans();

        // 5. start agent server
        // 启动server,开始监听
        bind(configure);

        // 注册一些钩子函数
        executorService = Executors.newScheduledThreadPool(1new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                final Thread t = new Thread(r, "arthas-command-execute");
                t.setDaemon(true);
                return t;
            }
        });

        shutdown = new Thread("as-shutdown-hooker") {

            @Override
            public void run() {
                ArthasBootstrap.this.destroy();
            }
        };

        transformerManager = new TransformerManager(instrumentation);
        Runtime.getRuntime().addShutdownHook(shutdown);
    }

在这个构造函数中,最重要的就是com.taobao.arthas.core.server.ArthasBootstrap#bind这个方法

private void bind(Configure configure) throws Throwable {

    // 无关紧要的一些前置操作,先删除掉

    try {
        // 关于arthas tunnel server,请参考:
        // https://arthas.aliyun.com/doc/tunnel.html
        if (configure.getTunnelServer() != null) {
            tunnelClient = new TunnelClient();
            tunnelClient.setAppName(configure.getAppName());
            tunnelClient.setId(configure.getAgentId());
            tunnelClient.setTunnelServerUrl(configure.getTunnelServer());
            tunnelClient.setVersion(ArthasBanner.version());
            ChannelFuture channelFuture = tunnelClient.start();
            channelFuture.await(10, TimeUnit.SECONDS);
        }
    } catch (Throwable t) {
        logger().error("start tunnel client error", t);
    }

    try {
        // 将一些非常关键的参数包装成ShellServerOptions对象
        ShellServerOptions options = new ShellServerOptions()
                        .setInstrumentation(instrumentation)
                        .setPid(PidUtils.currentLongPid())
                        .setWelcomeMessage(ArthasBanner.welcome());
        if (configure.getSessionTimeout() != null) {
            options.setSessionTimeout(configure.getSessionTimeout() * 1000);
        }

        // new 一个shellServer,用于监听命令
        shellServer = new ShellServerImpl(options);

        // BuiltinCommandPack对象首次出现,包含了所有的内置命令
        BuiltinCommandPack builtinCommands = new BuiltinCommandPack();
        List<CommandResolver> resolvers = new ArrayList<CommandResolver>();
        resolvers.add(builtinCommands);

        //worker group
        workerGroup = new NioEventLoopGroup(new DefaultThreadFactory("arthas-TermServer"true));

        // TODO: discover user provided command resolver
        if (configure.getTelnetPort() != null && configure.getTelnetPort() > 0) {
            shellServer.registerTermServer(new HttpTelnetTermServer(configure.getIp(), configure.getTelnetPort(),
                    options.getConnectionTimeout(), workerGroup));
        } else {
            logger().info("telnet port is {}, skip bind telnet server.", configure.getTelnetPort());
        }
        if (configure.getHttpPort() != null && configure.getHttpPort() > 0) {
            shellServer.registerTermServer(new HttpTermServer(configure.getIp(), configure.getHttpPort(),
                    options.getConnectionTimeout(), workerGroup));
        } else {
            // listen local address in VM communication
            if (configure.getTunnelServer() != null) {
                shellServer.registerTermServer(new HttpTermServer(configure.getIp(), configure.getHttpPort(),
                        options.getConnectionTimeout(), workerGroup));
            }
            logger().info("http port is {}, skip bind http server.", configure.getHttpPort());
        }

        for (CommandResolver resolver : resolvers) {
            shellServer.registerCommandResolver(resolver);
        }

        shellServer.listen(new BindHandler(isBindRef));
        if (!isBind()) {
            throw new IllegalStateException("Arthas failed to bind telnet or http port.");
        }

        //http api session manager
        sessionManager = new SessionManagerImpl(options, shellServer.getCommandManager(), shellServer.getJobController());
        //http api handler
        httpApiHandler = new HttpApiHandler(historyManager, sessionManager);

        logger().info("as-server listening on network={};telnet={};http={};timeout={};", configure.getIp(),
                configure.getTelnetPort(), configure.getHttpPort(), options.getConnectionTimeout());

        // 异步回报启动次数
        if (configure.getStatUrl() != null) {
            logger().info("arthas stat url: {}", configure.getStatUrl());
        }
        UserStatUtil.setStatUrl(configure.getStatUrl());
        UserStatUtil.arthasStart();

        try {
            SpyAPI.init();
        } catch (Throwable e) {
            // ignore
        }

        logger().info("as-server started in {} ms", System.currentTimeMillis() - start);
    } catch (Throwable e) {
        logger().error("Error during start as-server", e);
        destroy();
        throw e;
    }
}

这个方法使我们到目前为止见到的最复杂的一个方法,里面还是有很多的旁枝末节的干扰,总结一下,这个方法全都是围绕着如何构建一个ShellServer对象来进行的:

  1. 第一步会将一些非常重要的入参包装ShellServerOptions传入ShellServer
  2. 然后会在ShellerServer上注册命令解释器BuiltinCommandPack,点开BuiltinCommandPack会发现,所有的命令都已经包含在内了
  3. 根据入参的不同在ShellerServer上注册不同的TermServer,比如HttpTermServer或者是HttpTelnetTermServer
  4. 服务器开启监听指令

BuiltinCommandPack的实现如下所示:

public class  BuiltinCommandPack implements CommandResolver {

    private static List<Command> commands = new ArrayList<Command>();

    static {
        initCommands();
    }

    @Override
    public List<Command> commands() {
        return commands;
    }

    private static void initCommands() {
        commands.add(Command.create(HelpCommand.class));
        commands.add(Command.create(KeymapCommand.class));
        commands.add(Command.create(SearchClassCommand.class));
        commands.add(Command.create(SearchMethodCommand.class));
        // ...
    }
}

服务端对命令行的监听和处理

接下来我们分析arthas服务端的监听过程

@Override
public ShellServer listen(final Handler<Future<Void>> listenHandler) {
    final List<TermServer> toStart;
    synchronized (this) {
        if (!closed) {
            throw new IllegalStateException("Server listening");
        }
        toStart = termServers;
    }
    final AtomicInteger count = new AtomicInteger(toStart.size());
    if (count.get() == 0) {
        setClosed(false);
        listenHandler.handle(Future.<Void>succeededFuture());
        return this;
    }
    Handler<Future<TermServer>> handler = new TermServerListenHandler(this, listenHandler, toStart);
    for (TermServer termServer : toStart) {
        // termHandler是termServer监听命令的回调函数
        // 当有新的命令通过网络到达server时会调用这个回调函数
        termServer.termHandler(new TermServerTermHandler(this));
        termServer.listen(handler);
    }
    return this;
}

我们以HttpTermServer为例

@Override
public TermServer listen(Handler<Future<TermServer>> listenHandler) {
    // TODO: charset and inputrc from options
    bootstrap = new NettyWebsocketTtyBootstrap(workerGroup).setHost(hostIp).setPort(port);
    try {
        bootstrap.start(new Consumer<TtyConnection>() {
            @Override
            public void accept(final TtyConnection conn) {
                termHandler.handle(new TermImpl(Helper.loadKeymap(), conn));
            }
        }).get(connectionTimeout, TimeUnit.MILLISECONDS);
        listenHandler.handle(Future.<TermServer>succeededFuture());
    } catch (Throwable t) {
        logger.error("Error listening to port " + port, t);
        listenHandler.handle(Future.<TermServer>failedFuture(t));
    }
    return this;
}

会发现程序会最终去异步的调用termHandlerhandle方法,而termHandler正是前面注册的TermServerTermHandler这个类的实例:

public class TermServerTermHandler implements Handler<Term{
    private ShellServerImpl shellServer;

    public TermServerTermHandler(ShellServerImpl shellServer) {
        this.shellServer = shellServer;
    }

    @Override
    public void handle(Term term) {
        shellServer.handleTerm(term);
    }
}

handle又回调了shellServerhandleTerm方法,我们的视线随着调用流程再回到ShellServer这个类

public void handleTerm(Term term) {
    synchronized (this) {
        // That might happen with multiple ser
        if (closed) {
            term.close();
            return;
        }
    }

    ShellImpl session = createShell(term);
    tryUpdateWelcomeMessage();
    session.setWelcome(welcomeMessage);
    session.closedFuture.setHandler(new SessionClosedHandler(this, session));
    session.init();
    sessions.put(session.id, session); // Put after init so the close handler on the connection is set
    session.readline(); // Now readline
}

这个方法中的最后一行代码session.readline();是我们重点要关注的地方

public void readline() {
    // 这里要注意ShellLineHandler这个类,后面readLine的回调最终会回到这里来
    term.readline(prompt, new ShellLineHandler(this),
            new CommandManagerCompletionHandler(commandManager));
}

我们以TermImpl的实现为例

public void readline(String prompt, Handler<String> lineHandler, Handler<Completion> completionHandler) {
    if (conn.getStdinHandler() != echoHandler) {
        throw new IllegalStateException();
    }
    if (inReadline) {
        throw new IllegalStateException();
    }
    inReadline = true;
    readline.readline(conn, prompt, new RequestHandler(this, lineHandler), new CompletionHandler(completionHandler, session));
}

这个方法调用了readline.readline方法,并把之前传进来的ShellLineHandler也包进了RequestHandler传到了readline.readline中,我们继续往进看

public void readline(TtyConnection conn, String prompt, Consumer<String> requestHandler, Consumer<Completion> completionHandler) {
    synchronized (this) {
        if (interaction != null) {
        throw new IllegalStateException("Already reading a line");
        }
        interaction = new Interaction(conn, prompt, requestHandler, completionHandler);
    }
    interaction.install();
    conn.write(prompt);
    schedulePendingEvent();
}

interaction.install();以及schedulePendingEvent();这两汉代码最终都会调用下面的一段方法

private void deliver() {
    while (true) {
      Interaction handler;
      KeyEvent event;
      synchronized (this) {
        if (decoder.hasNext() && interaction != null && !interaction.paused) {
          event = decoder.next();
          handler = interaction;
        } else {
          return;
        }
      }
      handler.handle(event);
    }
}

InteractionReadLine的一个内部类,他的handle方法比较长,我们截取这个方法的关键部分如下所示:

private void handle(KeyEvent event) {
    if (event instanceof FunctionEvent) {
    FunctionEvent fname = (FunctionEvent) event;
    Function function = functions.get(fname.name());
    if (function != null) {
        synchronized (this) {
        paused = true;
        }
        function.apply(this);
    } else {
        Logging.READLINE.warn("Unimplemented function " + fname.name());
    }
    } else {
    LineBuffer buf = buffer.copy();
    for (int i = 0;i < event.length();i++) {
        int codePoint = event.getCodePointAt(i);
        try {
        buf.insert(codePoint);
        } catch (IllegalArgumentException e) {
        conn.stdoutHandler().accept(new int[]{'07'});
        }
    }
    refresh(buf);
    }
}

在这段代码中,会首先判断输入时间是否在预存的functions这个变量中已经定义,如果有的话,则执行相应apply方法,否则做缓存相关的操作。 在ReadLine这个类新建的时候,值预定义了一个方法,那就是ACCEPT_LINE

public Readline(Keymap keymap) {
    // https://github.com/alibaba/termd/issues/42
    // this.device = TermInfo.defaultInfo().getDevice("xterm"); // For now use xterm
    this.decoder = new EventQueue(keymap);
    this.history = new ArrayList<int[]>();
    addFunction(ACCEPT_LINE);
}

ACCEPT_LINE的定义如下:

private final Function ACCEPT_LINE = new Function() {

    @Override
    public String name() {
      return "accept-line";
    }

    @Override
    public void apply(Interaction interaction) {
      interaction.line.insert(interaction.buffer.toArray());
      LineStatus pb = new LineStatus();
      for (int i = 0;i < interaction.line.getSize();i++) {
        pb.accept(interaction.line.getAt(i));
      }
      interaction.buffer.clear();
      if (pb.isEscaping()) {
        interaction.line.delete(-1); // Remove 
        interaction.currentPrompt = "> ";
        interaction.conn.write("n> ");
        interaction.resume();
      } else {
        if (pb.isQuoted()) {
          interaction.line.insert('n');
          interaction.conn.write("n> ");
          interaction.currentPrompt = "> ";
          interaction.resume();
        } else {
          String raw = interaction.line.toString();
          if (interaction.line.getSize() > 0) {
            addToHistory(interaction.line.toArray());
          }
          interaction.line.clear();
          interaction.conn.write("n");
          interaction.end(raw);
        }
      }
    }
}

ACCEPT_LINEapply方法中,如果程序判定到达服务器的是一个合法的命令行,则会调用io.termd.core.readline.Readline.Interaction#end方法,而这个方法,最终会调用requestHandler.accept(s);,这个RequestHandler其实就是封装了一层ShellLineHandler

private boolean end(String s) {
    synchronized (Readline.this) {
    if (interaction == null) {
        return false;
    }
    interaction = null;
    conn.setStdinHandler(prevReadHandler);
    conn.setSizeHandler(prevSizeHandler);
    conn.setEventHandler(prevEventHandler);
    }
    requestHandler.accept(s);
    return true;
}

通过上面的分析可以看到,后续我们对命令的处理直接看ShellLineHandler就可以了

命令的执行

public void handle(String line) {
    String name = first.value();
    if (name.equals("exit") || name.equals("logout") || name.equals("q") || name.equals("quit")) {
        handleExit();
        return;
    } else if (name.equals("jobs")) {
        handleJobs();
        return;
    } else if (name.equals("fg")) {
        handleForeground(tokens);
        return;
    } else if (name.equals("bg")) {
        handleBackground(tokens);
        return;
    } else if (name.equals("kill")) {
        handleKill(tokens);
        return;
    }

    Job job = createJob(tokens);
    if (job != null) {
        job.run();
    }
}

com.taobao.arthas.core.shell.handlers.shell.ShellLineHandler#handle的设计中,如果是一些简单的命令,比如说exit, logout,jobs,fg,bg,kill等,都是直接执行的,而其他的命令都是直接通过创建一个Job来执行的,这一小节,我们主要看arthas是怎么抽象命令的执行的:从创建Job开始

@Override
public synchronized Job createJob(List<CliToken> args) {
    Job job = jobController.createJob(commandManager, args, session, new ShellJobHandler(this), term, null);
    return job;
}

会转发到:

@Override
public Job createJob(InternalCommandManager commandManager, List<CliToken> tokens, Session session, JobListener jobHandler, Term term, ResultDistributor resultDistributor) {
    int jobId = idGenerator.incrementAndGet();
    StringBuilder line = new StringBuilder();
    for (CliToken arg : tokens) {
        line.append(arg.raw());
    }
    boolean runInBackground = runInBackground(tokens);
    Process process = createProcess(tokens, commandManager, jobId, term, resultDistributor);
    process.setJobId(jobId);
    JobImpl job = new JobImpl(jobId, this, process, line.toString(), runInBackground, session, jobHandler);
    jobs.put(jobId, job);
    return job;
}

Jobrun方法是完全委托给Process的,所以接下来就直接看createProcess的过程:

private Process createProcess(List<CliToken> line, InternalCommandManager commandManager, int jobId, Term term, ResultDistributor resultDistributor) {
    try {
        ListIterator<CliToken> tokens = line.listIterator();
        while (tokens.hasNext()) {
            CliToken token = tokens.next();
            if (token.isText()) {
                Command command = commandManager.getCommand(token.value());
                if (command != null) {
                    return createCommandProcess(command, tokens, jobId, term, resultDistributor);
                } else {
                    throw new IllegalArgumentException(token.value() + ": command not found");
                }
            }
        }
        throw new IllegalArgumentException();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

这段代码的意图比较明显,根据输入的命令去找相应的Command对象,如果找到则创建Process对象,根据文本找相应Command的逻辑如下:

public Command getCommand(String commandName) {
    Command command = null;
    for (CommandResolver resolver : resolvers) {
        // 内建命令在ShellLineHandler里提前处理了,所以这里不需要再查找内建命令
        if (resolver instanceof BuiltinCommandPack) {
            command = getCommand(resolver, commandName);
            if (command != null) {
                break;
            }
        }
    }
    return command;
}

private static Command getCommand(CommandResolver commandResolver, String name) {
    List<Command> commands = commandResolver.commands();
    if (commands == null || commands.isEmpty()) {
        return null;
    }

    for (Command command : commands) {
        if (name.equals(command.name())) {
            return command;
        }
    }
    return null;
}

这块的逻辑还是比较清晰的,我们再看看在找到对应Command之后如何创建Process

private Process createCommandProcess(Command command, ListIterator<CliToken> tokens, int jobId, Term term, ResultDistributor resultDistributor) throws IOException {
    List<CliToken> remaining = new ArrayList<CliToken>();
    List<CliToken> pipelineTokens = new ArrayList<CliToken>();
    boolean isPipeline = false;
    RedirectHandler redirectHandler = null;
    List<Function<String, String>> stdoutHandlerChain = new ArrayList<Function<String, String>>();
    String cacheLocation = null;
    // 删除中间处理管道和后台进程的代码
    ProcessOutput ProcessOutput = new ProcessOutput(stdoutHandlerChain, cacheLocation, term);
    ProcessImpl process = new ProcessImpl(command, remaining, command.processHandler(), ProcessOutput, resultDistributor);
    process.setTty(term);
    return process;
}

在删除了中间的处理管道和后台命令的代码之后这段代码的逻辑也非常清晰,就是根据解析好的Command对象创建一个Process对象,值得注意的是,这里把command.processHandler()传进了Process的构造函数中。查看com.taobao.arthas.core.shell.system.impl.ProcessImpl#run(),可以看到最终会调用到com.taobao.arthas.core.shell.system.impl.ProcessImpl.CommandProcessTask#run

private class CommandProcessTask implements Runnable {

    private CommandProcess process;

    public CommandProcessTask(CommandProcess process) {
        this.process = process;
    }

    @Override
    public void run() {
        try {
            handler.handle(process);
        } catch (Throwable t) {
            logger.error("Error during processing the command:", t);
            process.end(1"Error during processing the command: " + t.getClass().getName() + ", message:" + t.getMessage()
                    + ", please check $HOME/logs/arthas/arthas.log for more details." );
        }
    }
}

这里的handler正是创建Process对象时调用command.processHandler()传进去的

// processHandler 初始化
private Handler<CommandProcess> processHandler = new ProcessHandler();
@Override
public Handler<CommandProcess> processHandler() {
    return processHandler;
}

private class ProcessHandler implements Handler<CommandProcess{
    @Override
    public void handle(CommandProcess process) {
        process(process);
    }
}

private void process(CommandProcess process) {
    AnnotatedCommand instance;
    try {
        instance = clazz.newInstance();
    } catch (Exception e) {
        process.end();
        return;
    }
    CLIConfigurator.inject(process.commandLine(), instance);
    instance.process(process);
    UserStatUtil.arthasUsageSuccess(name(), process.args());
}

通过instance.process(process);就可以调用到具体Command类的process方法了,比如说我们以watch命令为例,如果客户端输入的是这条命令,则会触发代码的插装

@Override
public void process(final CommandProcess process) {
    // ctrl-C support
    process.interruptHandler(new CommandInterruptHandler(process));
    // q exit support
    process.stdinHandler(new QExitHandler(process));

    // start to enhance
    enhance(process);
}

小结一下

整个启动过程还是比较清晰的,需要注意的是在这个过程中有好多的回调函数,这些回调函数中才包含真正处理事件的逻辑,需要多翻几遍上下文才能完全理解

Arthas原理系列(三):服务端启动流程

本文详细的跟了上面这个类图中类之间的交互,服务器抽象这个模块主要负责建立起完整的服务器环境并监听到达服务端的命令,到达的命令经过初步解析之后通过建立的任务类去执行,在任务的执行中通过在ShellImpl中持有的ShellServer引用,可以解析出具体的Command类,最后,命令的执行会调用对应Command类中的process方法,从而完成了整个命令的执行。


– END –
Arthas原理系列(三):服务端启动流程

OGNL语法规范

消失的堆栈

Arthas原理系列(一):利用JVM的attach机制实现一个极简的watch命令

Arthas原理系列(二):总体架构和项目入口

Arthas原理系列(三):服务端启动流程

Arthas原理系列(三):服务端启动流程

扫描二维码关注

我领取面试资料





原文始发于微信公众号(苦味代码):Arthas原理系列(三):服务端启动流程

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/21598.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!