1 /*
   2  * Copyright (c) 2002, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.net.*;
  25 import java.io.*;
  26 import java.nio.*;
  27 import java.nio.channels.*;
  28 import sun.net.www.MessageHeader;
  29 import java.util.*;
  30 
  31 /**
  32  * This class implements a simple HTTP server. It uses multiple threads to
  33  * handle connections in parallel, and also multiple connections/requests
  34  * can be handled per thread.
  35  * <p>
  36  * It must be instantiated with a {@link HttpCallback} object to which
  37  * requests are given and must be handled.
  38  * <p>
  39  * Simple synchronization between the client(s) and server can be done
  40  * using the {@link #waitForCondition(String)}, {@link #setCondition(String)} and
  41  * {@link #rendezvous(String,int)} methods.
  42  *
  43  * NOTE NOTE NOTE NOTE NOTE NOTE NOTE
  44  *
  45  * If changes are made here, please sure they are propagated to
  46  * the HTTPS equivalent in the JSSE regression test suite.
  47  *
  48  * NOTE NOTE NOTE NOTE NOTE NOTE NOTE
  49  */
  50 
  51 public class TestHttpServer {
  52 
  53     ServerSocketChannel schan;
  54     int threads;
  55     int cperthread;
  56     HttpCallback cb;
  57     Server[] servers;
  58 
  59     /**
  60      * Create a <code>TestHttpServer<code> instance with the specified callback object
  61      * for handling requests. One thread is created to handle requests,
  62      * and up to ten TCP connections will be handled simultaneously.
  63      * @param cb the callback object which is invoked to handle each
  64      *  incoming request
  65      */
  66 
  67     public TestHttpServer (HttpCallback cb) throws IOException {
  68         this (cb, 1, 10, 0);
  69     }
  70 
  71     /**
  72      * Create a <code>TestHttpServer<code> instance with the specified callback object
  73      * for handling requests. One thread is created to handle requests,
  74      * and up to ten TCP connections will be handled simultaneously.
  75      * @param cb the callback object which is invoked to handle each
  76      *  incoming request
  77      * @param address the address to bind the server to. <code>Null</code>
  78      *  means bind to the wildcard address.
  79      * @param port the port number to bind the server to. <code>Zero</code>
  80      *  means choose any free port.
  81      */
  82 
  83     public TestHttpServer (HttpCallback cb, InetAddress address, int port) throws IOException {
  84         this (cb, 1, 10, address, 0);
  85     }
  86 
  87     /**
  88      * Create a <code>TestHttpServer<code> instance with the specified number of
  89      * threads and maximum number of connections per thread. This functions
  90      * the same as the 4 arg constructor, where the port argument is set to zero.
  91      * @param cb the callback object which is invoked to handle each
  92      *     incoming request
  93      * @param threads the number of threads to create to handle requests
  94      *     in parallel
  95      * @param cperthread the number of simultaneous TCP connections to
  96      *     handle per thread
  97      */
  98 
  99     public TestHttpServer (HttpCallback cb, int threads, int cperthread)
 100         throws IOException {
 101         this (cb, threads, cperthread, 0);
 102     }
 103 
 104     /**
 105      * Create a <code>TestHttpServer<code> instance with the specified number
 106      * of threads and maximum number of connections per thread and running on
 107      * the specified port. The specified number of threads are created to
 108      * handle incoming requests, and each thread is allowed
 109      * to handle a number of simultaneous TCP connections.
 110      * @param cb the callback object which is invoked to handle
 111      *  each incoming request
 112      * @param threads the number of threads to create to handle
 113      *  requests in parallel
 114      * @param cperthread the number of simultaneous TCP connections
 115      *  to handle per thread
 116      * @param port the port number to bind the server to. <code>Zero</code>
 117      *  means choose any free port.
 118      */
 119 
 120     public TestHttpServer (HttpCallback cb, int threads, int cperthread, int port)
 121             throws IOException {
 122         this(cb, threads, cperthread, null, port);
 123     }
 124 
 125     /**
 126      * Create a <code>TestHttpServer<code> instance with the specified number
 127      * of threads and maximum number of connections per thread and running on
 128      * the specified port. The specified number of threads are created to
 129      * handle incoming requests, and each thread is allowed
 130      * to handle a number of simultaneous TCP connections.
 131      * @param cb the callback object which is invoked to handle
 132      *  each incoming request
 133      * @param threads the number of threads to create to handle
 134      *  requests in parallel
 135      * @param cperthread the number of simultaneous TCP connections
 136      *  to handle per thread
 137      * @param address the address to bind the server to. <code>Null</code>
 138      *  means bind to the wildcard address.
 139      * @param port the port number to bind the server to. <code>Zero</code>
 140      *  means choose any free port.
 141      */
 142 
 143     public TestHttpServer (HttpCallback cb, int threads, int cperthread,
 144                            InetAddress address, int port)
 145         throws IOException {
 146         schan = ServerSocketChannel.open ();
 147         InetSocketAddress addr = new InetSocketAddress (address, port);
 148         schan.socket().setReuseAddress(true);
 149         schan.socket().bind (addr);
 150         this.threads = threads;
 151         this.cb = cb;
 152         this.cperthread = cperthread;
 153         servers = new Server [threads];
 154         for (int i=0; i<threads; i++) {
 155             servers[i] = new Server (cb, schan, cperthread);
 156             servers[i].start();
 157         }
 158     }
 159 
 160     /**
 161      * Tell all threads in the server to exit within 5 seconds.
 162      * This is an abortive termination. Just prior to the thread exiting
 163      * all channels in that thread waiting to be closed are forceably closed.
 164      * @throws InterruptedException
 165      */
 166 
 167     public void terminate () {
 168         for (int i=0; i<threads; i++) {
 169             servers[i].terminate ();
 170         }
 171 
 172         for (int i = 0; i < threads; i++) {
 173             try {
 174                 servers[i].join();
 175             } catch (InterruptedException e) {
 176                 System.err.println("Unexpected InterruptedException during terminating server");
 177                 throw new RuntimeException(e);
 178             }
 179         }
 180     }
 181 
 182     /**
 183      * return the local port number to which the server is bound.
 184      * @return the local port number
 185      */
 186 
 187     public int getLocalPort () {
 188         return schan.socket().getLocalPort ();
 189     }
 190 
 191     public String getAuthority() {
 192         InetAddress address = schan.socket().getInetAddress();
 193         String hostaddr = address.getHostAddress();
 194         if (address.isAnyLocalAddress()) hostaddr = "localhost";
 195         if (hostaddr.indexOf(':') > -1) hostaddr = "[" + hostaddr + "]";
 196         return hostaddr + ":" + getLocalPort();
 197     }
 198 
 199     static class Server extends Thread {
 200 
 201         ServerSocketChannel schan;
 202         Selector selector;
 203         SelectionKey listenerKey;
 204         SelectionKey key; /* the current key being processed */
 205         HttpCallback cb;
 206         ByteBuffer consumeBuffer;
 207         int maxconn;
 208         int nconn;
 209         ClosedChannelList clist;
 210         volatile boolean shutdown;
 211 
 212         Server (HttpCallback cb, ServerSocketChannel schan, int maxconn) {
 213             this.schan = schan;
 214             this.maxconn = maxconn;
 215             this.cb = cb;
 216             nconn = 0;
 217             consumeBuffer = ByteBuffer.allocate (512);
 218             clist = new ClosedChannelList ();
 219             try {
 220                 selector = Selector.open ();
 221                 schan.configureBlocking (false);
 222                 listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
 223             } catch (IOException e) {
 224                 System.err.println ("Server could not start: " + e);
 225                 throw new RuntimeException("Server could not start: " + e, e);
 226             }
 227         }
 228 
 229         /* Stop the thread as soon as possible */
 230         public void terminate () {
 231             shutdown = true;
 232         }
 233 
 234         public void run ()  {
 235             try {
 236                 while (true) {
 237                     selector.select(1000);
 238                     Set<SelectionKey> selected = selector.selectedKeys();
 239                     Iterator<SelectionKey> iter = selected.iterator();
 240                     while (iter.hasNext()) {
 241                         key = iter.next();
 242                         if (key.equals (listenerKey)) {
 243                             SocketChannel sock = schan.accept ();
 244                             if (sock == null) {
 245                                 /* false notification */
 246                                 iter.remove();
 247                                 continue;
 248                             }
 249                             sock.configureBlocking (false);
 250                             sock.register (selector, SelectionKey.OP_READ);
 251                             nconn ++;
 252                             System.out.println("SERVER: new connection. chan[" + sock + "]");
 253                             if (nconn == maxconn) {
 254                                 /* deregister */
 255                                 listenerKey.cancel ();
 256                                 listenerKey = null;
 257                             }
 258                         } else {
 259                             if (key.isReadable()) {
 260                                 boolean closed;
 261                                 SocketChannel chan = (SocketChannel) key.channel();
 262                                 System.out.println("SERVER: connection readable. chan[" + chan + "]");
 263                                 if (key.attachment() != null) {
 264                                     System.out.println("Server: consume");
 265                                     closed = consume (chan);
 266                                 } else {
 267                                     closed = read (chan, key);
 268                                 }
 269                                 if (closed) {
 270                                     chan.close ();
 271                                     key.cancel ();
 272                                     if (nconn == maxconn) {
 273                                         listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
 274                                     }
 275                                     nconn --;
 276                                 }
 277                             }
 278                         }
 279                         iter.remove();
 280                     }
 281                     clist.check();
 282                     if (shutdown) {
 283                         System.out.println("Force to Shutdown");
 284                         SelectionKey sKey = schan.keyFor(selector);
 285                         if (sKey != null) {
 286                             sKey.cancel();
 287                         }
 288 
 289                         clist.terminate ();
 290                         selector.close();
 291                         schan.socket().close();
 292                         schan.close();
 293                         return;
 294                     }
 295                 }
 296             } catch (IOException e) {
 297                 System.out.println ("Server exception: " + e);
 298                 // TODO finish
 299             }
 300         }
 301 
 302         /* read all the data off the channel without looking at it
 303              * return true if connection closed
 304              */
 305         boolean consume (SocketChannel chan) {
 306             try {
 307                 consumeBuffer.clear ();
 308                 int c = chan.read (consumeBuffer);
 309                 if (c == -1)
 310                     return true;
 311             } catch (IOException e) {
 312                 return true;
 313             }
 314             return false;
 315         }
 316 
 317         /* return true if the connection is closed, false otherwise */
 318 
 319         private boolean read (SocketChannel chan, SelectionKey key) {
 320             HttpTransaction msg;
 321             boolean res;
 322             try {
 323                 InputStream is = new BufferedInputStream (new NioInputStream (chan));
 324                 String requestline = readLine (is);
 325                 MessageHeader mhead = new MessageHeader (is);
 326                 String clen = mhead.findValue ("Content-Length");
 327                 String trferenc = mhead.findValue ("Transfer-Encoding");
 328                 String data = null;
 329                 if (trferenc != null && trferenc.equals ("chunked"))
 330                     data = new String (readChunkedData (is));
 331                 else if (clen != null)
 332                     data = new String (readNormalData (is, Integer.parseInt (clen)));
 333                 String[] req = requestline.split (" ");
 334                 if (req.length < 2) {
 335                     /* invalid request line */
 336                     return false;
 337                 }
 338                 String cmd = req[0];
 339                 URI uri = null;
 340                 try {
 341                     uri = new URI (req[1]);
 342                     msg = new HttpTransaction (this, cmd, uri, mhead, data, null, key);
 343                     cb.request (msg);
 344                 } catch (URISyntaxException e) {
 345                     System.err.println ("Invalid URI: " + e);
 346                     msg = new HttpTransaction (this, cmd, null, null, null, null, key);
 347                     msg.sendResponse (501, "Whatever");
 348                 }
 349                 res = false;
 350             } catch (IOException e) {
 351                 res = true;
 352             }
 353             return res;
 354         }
 355 
 356         byte[] readNormalData (InputStream is, int len) throws IOException {
 357             byte [] buf  = new byte [len];
 358             int c, off=0, remain=len;
 359             while (remain > 0 && ((c=is.read (buf, off, remain))>0)) {
 360                 remain -= c;
 361                 off += c;
 362             }
 363             return buf;
 364         }
 365 
 366         private void readCRLF(InputStream is) throws IOException {
 367             int cr = is.read();
 368             int lf = is.read();
 369 
 370             if (((cr & 0xff) != 0x0d) ||
 371                 ((lf & 0xff) != 0x0a)) {
 372                 throw new IOException(
 373                     "Expected <CR><LF>:  got '" + cr + "/" + lf + "'");
 374             }
 375         }
 376 
 377         byte[] readChunkedData (InputStream is) throws IOException {
 378             LinkedList l = new LinkedList ();
 379             int total = 0;
 380             for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) {
 381                 l.add (readNormalData(is, len));
 382                 total += len;
 383                 readCRLF(is);  // CRLF at end of chunk
 384             }
 385             readCRLF(is); // CRLF at end of Chunked Stream.
 386             byte[] buf = new byte [total];
 387             Iterator i = l.iterator();
 388             int x = 0;
 389             while (i.hasNext()) {
 390                 byte[] b = (byte[])i.next();
 391                 System.arraycopy (b, 0, buf, x, b.length);
 392                 x += b.length;
 393             }
 394             return buf;
 395         }
 396 
 397         private int readChunkLen (InputStream is) throws IOException {
 398             int c, len=0;
 399             boolean done=false, readCR=false;
 400             while (!done) {
 401                 c = is.read ();
 402                 if (c == '\n' && readCR) {
 403                     done = true;
 404                 } else {
 405                     if (c == '\r' && !readCR) {
 406                         readCR = true;
 407                     } else {
 408                         int x=0;
 409                         if (c >= 'a' && c <= 'f') {
 410                             x = c - 'a' + 10;
 411                         } else if (c >= 'A' && c <= 'F') {
 412                             x = c - 'A' + 10;
 413                         } else if (c >= '0' && c <= '9') {
 414                             x = c - '0';
 415                         }
 416                         len = len * 16 + x;
 417                     }
 418                 }
 419             }
 420             return len;
 421         }
 422 
 423         private String readLine (InputStream is) throws IOException {
 424             boolean done=false, readCR=false;
 425             byte[] b = new byte [512];
 426             int c, l = 0;
 427 
 428             while (!done) {
 429                 c = is.read ();
 430                 if (c == '\n' && readCR) {
 431                     done = true;
 432                 } else {
 433                     if (c == '\r' && !readCR) {
 434                         readCR = true;
 435                     } else {
 436                         b[l++] = (byte)c;
 437                     }
 438                 }
 439             }
 440             return new String (b);
 441         }
 442 
 443         /** close the channel associated with the current key by:
 444          * 1. shutdownOutput (send a FIN)
 445          * 2. mark the key so that incoming data is to be consumed and discarded
 446          * 3. After a period, close the socket
 447          */
 448 
 449         synchronized void orderlyCloseChannel (SelectionKey key) throws IOException {
 450             SocketChannel ch = (SocketChannel)key.channel ();
 451             System.out.println("SERVER: orderlyCloseChannel chan[" + ch + "]");
 452             ch.socket().shutdownOutput();
 453             key.attach (this);
 454             clist.add (key);
 455         }
 456 
 457         synchronized void abortiveCloseChannel (SelectionKey key) throws IOException {
 458             SocketChannel ch = (SocketChannel)key.channel ();
 459             System.out.println("SERVER: abortiveCloseChannel chan[" + ch + "]");
 460 
 461             Socket s = ch.socket ();
 462             s.setSoLinger (true, 0);
 463             ch.close();
 464         }
 465     }
 466 
 467 
 468     /**
 469      * Implements blocking reading semantics on top of a non-blocking channel
 470      */
 471 
 472     static class NioInputStream extends InputStream {
 473         SocketChannel channel;
 474         Selector selector;
 475         ByteBuffer chanbuf;
 476         SelectionKey key;
 477         int available;
 478         byte[] one;
 479         boolean closed;
 480         ByteBuffer markBuf; /* reads may be satisifed from this buffer */
 481         boolean marked;
 482         boolean reset;
 483         int readlimit;
 484 
 485         public NioInputStream (SocketChannel chan) throws IOException {
 486             this.channel = chan;
 487             selector = Selector.open();
 488             chanbuf = ByteBuffer.allocate (1024);
 489             key = chan.register (selector, SelectionKey.OP_READ);
 490             available = 0;
 491             one = new byte[1];
 492             closed = marked = reset = false;
 493         }
 494 
 495         public synchronized int read (byte[] b) throws IOException {
 496             return read (b, 0, b.length);
 497         }
 498 
 499         public synchronized int read () throws IOException {
 500             return read (one, 0, 1);
 501         }
 502 
 503         public synchronized int read (byte[] b, int off, int srclen) throws IOException {
 504 
 505             int canreturn, willreturn;
 506 
 507             if (closed)
 508                 return -1;
 509 
 510             if (reset) { /* satisfy from markBuf */
 511                 canreturn = markBuf.remaining ();
 512                 willreturn = canreturn>srclen ? srclen : canreturn;
 513                 markBuf.get(b, off, willreturn);
 514                 if (canreturn == willreturn) {
 515                     reset = false;
 516                 }
 517             } else { /* satisfy from channel */
 518                 canreturn = available();
 519                 if (canreturn == 0) {
 520                     block ();
 521                     canreturn = available();
 522                 }
 523                 willreturn = canreturn>srclen ? srclen : canreturn;
 524                 chanbuf.get(b, off, willreturn);
 525                 available -= willreturn;
 526 
 527                 if (marked) { /* copy into markBuf */
 528                     try {
 529                         markBuf.put (b, off, willreturn);
 530                     } catch (BufferOverflowException e) {
 531                         marked = false;
 532                     }
 533                 }
 534             }
 535             return willreturn;
 536         }
 537 
 538         public synchronized int available () throws IOException {
 539             if (closed)
 540                 throw new IOException ("Stream is closed");
 541 
 542             if (reset)
 543                 return markBuf.remaining();
 544 
 545             if (available > 0)
 546                 return available;
 547 
 548             chanbuf.clear ();
 549             available = channel.read (chanbuf);
 550             if (available > 0)
 551                 chanbuf.flip();
 552             else if (available == -1)
 553                 throw new IOException ("Stream is closed");
 554             return available;
 555         }
 556 
 557         /**
 558          * block() only called when available==0 and buf is empty
 559          */
 560         private synchronized void block () throws IOException {
 561             //assert available == 0;
 562             int n = selector.select ();
 563             //assert n == 1;
 564             selector.selectedKeys().clear();
 565             available ();
 566         }
 567 
 568         public void close () throws IOException {
 569             if (closed)
 570                 return;
 571             channel.close ();
 572             closed = true;
 573         }
 574 
 575         public synchronized void mark (int readlimit) {
 576             if (closed)
 577                 return;
 578             this.readlimit = readlimit;
 579             markBuf = ByteBuffer.allocate (readlimit);
 580             marked = true;
 581             reset = false;
 582         }
 583 
 584         public synchronized void reset () throws IOException {
 585             if (closed )
 586                 return;
 587             if (!marked)
 588                 throw new IOException ("Stream not marked");
 589             marked = false;
 590             reset = true;
 591             markBuf.flip ();
 592         }
 593     }
 594 
 595     static class NioOutputStream extends OutputStream {
 596         SocketChannel channel;
 597         ByteBuffer buf;
 598         SelectionKey key;
 599         Selector selector;
 600         boolean closed;
 601         byte[] one;
 602 
 603         public NioOutputStream (SocketChannel channel) throws IOException {
 604             this.channel = channel;
 605             selector = Selector.open ();
 606             key = channel.register (selector, SelectionKey.OP_WRITE);
 607             closed = false;
 608             one = new byte [1];
 609         }
 610 
 611         public synchronized void write (int b) throws IOException {
 612             one[0] = (byte)b;
 613             write (one, 0, 1);
 614         }
 615 
 616         public synchronized void write (byte[] b) throws IOException {
 617             write (b, 0, b.length);
 618         }
 619 
 620         public synchronized void write (byte[] b, int off, int len) throws IOException {
 621             if (closed)
 622                 throw new IOException ("stream is closed");
 623 
 624             buf = ByteBuffer.allocate (len);
 625             buf.put (b, off, len);
 626             buf.flip ();
 627             int n;
 628             while ((n = channel.write (buf)) < len) {
 629                 len -= n;
 630                 if (len == 0)
 631                     return;
 632                 selector.select ();
 633                 selector.selectedKeys().clear ();
 634             }
 635         }
 636 
 637         public void close () throws IOException {
 638             if (closed)
 639                 return;
 640             channel.close ();
 641             closed = true;
 642         }
 643     }
 644 
 645     /**
 646      * Utilities for synchronization. A condition is
 647      * identified by a string name, and is initialized
 648      * upon first use (ie. setCondition() or waitForCondition()). Threads
 649      * are blocked until some thread calls (or has called) setCondition() for the same
 650      * condition.
 651      * <P>
 652      * A rendezvous built on a condition is also provided for synchronizing
 653      * N threads.
 654      */
 655 
 656     private static HashMap conditions = new HashMap();
 657 
 658     /*
 659      * Modifiable boolean object
 660      */
 661     private static class BValue {
 662         boolean v;
 663     }
 664 
 665     /*
 666      * Modifiable int object
 667      */
 668     private static class IValue {
 669         int v;
 670         IValue (int i) {
 671             v =i;
 672         }
 673     }
 674 
 675 
 676     private static BValue getCond (String condition) {
 677         synchronized (conditions) {
 678             BValue cond = (BValue) conditions.get (condition);
 679             if (cond == null) {
 680                 cond = new BValue();
 681                 conditions.put (condition, cond);
 682             }
 683             return cond;
 684         }
 685     }
 686 
 687     /**
 688      * Set the condition to true. Any threads that are currently blocked
 689      * waiting on the condition, will be unblocked and allowed to continue.
 690      * Threads that subsequently call waitForCondition() will not block.
 691      * If the named condition did not exist prior to the call, then it is created
 692      * first.
 693      */
 694 
 695     public static void setCondition (String condition) {
 696         BValue cond = getCond (condition);
 697         synchronized (cond) {
 698             if (cond.v) {
 699                 return;
 700             }
 701             cond.v = true;
 702             cond.notifyAll();
 703         }
 704     }
 705 
 706     /**
 707      * If the named condition does not exist, then it is created and initialized
 708      * to false. If the condition exists or has just been created and its value
 709      * is false, then the thread blocks until another thread sets the condition.
 710      * If the condition exists and is already set to true, then this call returns
 711      * immediately without blocking.
 712      */
 713 
 714     public static void waitForCondition (String condition) {
 715         BValue cond = getCond (condition);
 716         synchronized (cond) {
 717             if (!cond.v) {
 718                 try {
 719                     cond.wait();
 720                 } catch (InterruptedException e) {}
 721             }
 722         }
 723     }
 724 
 725     /* conditions must be locked when accessing this */
 726     static HashMap rv = new HashMap();
 727 
 728     /**
 729      * Force N threads to rendezvous (ie. wait for each other) before proceeding.
 730      * The first thread(s) to call are blocked until the last
 731      * thread makes the call. Then all threads continue.
 732      * <p>
 733      * All threads that call with the same condition name, must use the same value
 734      * for N (or the results may be not be as expected).
 735      * <P>
 736      * Obviously, if fewer than N threads make the rendezvous then the result
 737      * will be a hang.
 738      */
 739 
 740     public static void rendezvous (String condition, int N) {
 741         BValue cond;
 742         IValue iv;
 743         String name = "RV_"+condition;
 744 
 745         /* get the condition */
 746 
 747         synchronized (conditions) {
 748             cond = (BValue)conditions.get (name);
 749             if (cond == null) {
 750                 /* we are first caller */
 751                 if (N < 2) {
 752                     throw new RuntimeException ("rendezvous must be called with N >= 2");
 753                 }
 754                 cond = new BValue ();
 755                 conditions.put (name, cond);
 756                 iv = new IValue (N-1);
 757                 rv.put (name, iv);
 758             } else {
 759                 /* already initialised, just decrement the counter */
 760                 iv = (IValue) rv.get (name);
 761                 iv.v --;
 762             }
 763         }
 764 
 765         if (iv.v > 0) {
 766             waitForCondition (name);
 767         } else {
 768             setCondition (name);
 769             synchronized (conditions) {
 770                 clearCondition (name);
 771                 rv.remove (name);
 772             }
 773         }
 774     }
 775 
 776     /**
 777      * If the named condition exists and is set then remove it, so it can
 778      * be re-initialized and used again. If the condition does not exist, or
 779      * exists but is not set, then the call returns without doing anything.
 780      * Note, some higher level synchronization
 781      * may be needed between clear and the other operations.
 782      */
 783 
 784     public static void clearCondition(String condition) {
 785         BValue cond;
 786         synchronized (conditions) {
 787             cond = (BValue) conditions.get (condition);
 788             if (cond == null) {
 789                 return;
 790             }
 791             synchronized (cond) {
 792                 if (cond.v) {
 793                     conditions.remove (condition);
 794                 }
 795             }
 796         }
 797     }
 798 }